So far in this mini-series we have explored various approaches to some standard issues that every IoT Edge application needs to address. In this post we start exploring the big picture: a complete, production-ready example that can act as a good starting point for Azure IoT Edge application development.

In the previous posts we meticulously avoided introducing any technology-specific dependencies in our evolving thermostat application example, either by temporarily omitting the code, or by abstracting the various libraries' behavior behind predefined interfaces. The benefit of this is that we keep our business code technology-agnostic, and as such, we can easily test it. We have also preserved the freedom to switch platforms any time we choose to. Let's see how we can get there.

But first...

The Azure IoT Module Template

Let's use an example to make this easier to understand. Let's assume that we want to implement an Azure IoT Edge module, starting from the default Azure IoT Edge Module template. This template is a simple .NET Core console app, with all the required IoT SDK references in it, that simply echoes all messages coming from the input input1 to the output output1. Let's assume we'd like to extend this template by recording every message in a database.

The original template source code is here.

This is a modified version that includes the message database recording:

namespace DependencyInjection
{
    using System;
    using System.IO;
    using System.Runtime.InteropServices;
    using System.Runtime.Loader;
    using System.Security.Cryptography.X509Certificates;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.Amqp.Framing;
    using Microsoft.Azure.Devices.Client;
    using Microsoft.Azure.Devices.Client.Transport.Mqtt;

    class Program
    {
        static int counter;
        static void Main(string[] args)
        {
            Init().Wait();

            // Wait until the app unloads or is cancelled
            var cts = new CancellationTokenSource();
            AssemblyLoadContext.Default.Unloading += (ctx) => cts.Cancel();
            Console.CancelKeyPress += (sender, cpe) => cts.Cancel();
            WhenCancelled(cts.Token).Wait();
        }

        /// <summary>
        /// Handles cleanup operations when app is cancelled or unloads
        /// </summary>
        public static Task WhenCancelled(CancellationToken cancellationToken)
        {
            var tcs = new TaskCompletionSource<bool>();
            cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).SetResult(true), tcs);
            return tcs.Task;
        }

        /// <summary>
        /// Initializes the ModuleClient and sets up the callback to receive
        /// messages containing temperature information
        /// </summary>
        static async Task Init()
        {
            MqttTransportSettings mqttSetting = new MqttTransportSettings(TransportType.Mqtt_Tcp_Only);
            ITransportSettings[] settings = { mqttSetting };

            // Open a connection to the Edge runtime
            ModuleClient ioTHubModuleClient = await ModuleClient.CreateFromEnvironmentAsync(settings);
            await ioTHubModuleClient.OpenAsync();
            Console.WriteLine("IoT Hub module client initialized.");

            // Initialize the database client
            var connectionString = Environment.GetEnvironmentVariable("DbConnectionString");
            DatabaseClient databaseClient = DatabaseClient.CreateFromConnectionString(connectionString);

            // Register callback to be called when a message is received by the module
            await ioTHubModuleClient.SetInputMessageHandlerAsync("input1", PipeMessage, (ioTHubModuleClient, databaseClient));
        }

        /// <summary>
        /// This method is called whenever the module is sent a message from the EdgeHub.
        /// It prints each incoming message, records it in the database,
        /// and pipes it unchanged to output1.
        /// </summary>
        static async Task<MessageResponse> PipeMessage(Message message, object userContext)
        {
            int counterValue = Interlocked.Increment(ref counter);

            var (moduleClient, databaseClient) = ((ModuleClient, DatabaseClient))userContext;
            

            byte[] messageBytes = message.GetBytes();
            string messageString = Encoding.UTF8.GetString(messageBytes);
            Console.WriteLine($"Received message: {counterValue}, Body: [{messageString}]");

            if (!string.IsNullOrEmpty(messageString))
            {
                using (var pipeMessage = new Message(messageBytes))
                {
                    foreach (var prop in message.Properties)
                    {
                        pipeMessage.Properties.Add(prop.Key, prop.Value);
                    }
                    
                    await databaseClient.RecordMessageAsync(pipeMessage);
                    
                    await moduleClient.SendEventAsync("output1", pipeMessage);

                    Console.WriteLine("Received message sent");
                }
            }
            return MessageResponse.Completed;
        }
    }
}

We've made just a few small changes to the original template code:

  1. Inside Init() we now create our DatabaseClient instance:

// Initialize the database client
var connectionString = Environment.GetEnvironmentVariable("DbConnectionString");
DatabaseClient databaseClient = DatabaseClient.CreateFromConnectionString(connectionString);

  2. Inside PipeMessage we retrieve the databaseClient instance from the userContext argument, which Init() now passes as a tuple:

var (moduleClient, databaseClient) = ((ModuleClient, DatabaseClient))userContext;

  3. In the same PipeMessage method we record the message:

await databaseClient.RecordMessageAsync(pipeMessage);

This is clearly a monolithic and tightly coupled implementation. The code fails if the connection string is missing or invalid, or if the environment variables required to create the ModuleClient are not set. Even with all of those in place, we would still need a running database and the entire IoT Edge runtime on the device to get it working.

So, the only way to test this PipeMessage code is to:

  • set up an IoT Edge device in your development environment
  • set up a database server in your development environment
  • build and deploy a new version of the edge application
  • either attach a debugger to the running container or search through the logs.

This effectively leaves unit testing out of the question.

Welcome to 1985!

Congrats! You just traveled 35 years back in software history!

Inversion of Control

In its simplest form, dependency injection is an abstraction strategy that allows us to decouple our business code from any technical implementation. The idea is fairly simple: for every dependency, we define an interface that describes the dependency's public behavior. Then each module declares its dependencies in its constructor, by taking the dependency interfaces as constructor arguments.

A dependency is something that we use but don't control, e.g. the ModuleClient and the DatabaseClient, because their behavior depends on external processes.

Let's define these two abstractions. We'll start with the DatabaseClient class.

For simplicity, DatabaseClient here belongs to an imaginary database SDK.

We start by defining a simple interface that contains all useful behavior:

public interface IDatabaseClient
{
    Task OpenAsync(CancellationToken cancellationToken);
    Task CloseAsync(CancellationToken cancellationToken);
    Task RecordMessageAsync(byte[] message);
}

Then, we define a wrapper class that implements this interface and wraps the original DatabaseClient object:

internal class DatabaseClientWrapper : IDatabaseClient
{
    const string CONNECTION_STRING_NAME = "dbConnectionString";
    DatabaseClient _databaseClient { get; }

    public DatabaseClientWrapper(IConfiguration configuration)
    {
        var connectionString =
            configuration.GetConnectionString(CONNECTION_STRING_NAME);

        _databaseClient = DatabaseClient.
                CreateFromConnectionString(connectionString);
    }
    public async Task OpenAsync(CancellationToken cancellationToken)
    {
        await _databaseClient.OpenAsync(cancellationToken);
    }

    public async Task CloseAsync(CancellationToken cancellationToken)
    {
        await _databaseClient.CloseAsync(cancellationToken);
    }

    public async Task RecordMessageAsync(byte[] message)
    {
        await _databaseClient.RecordMessageAsync(message);
    }
}
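
Note: this wrapper class "depends" on IConfiguration. GetConnectionString looks the name up under the ConnectionStrings configuration section, so when the value comes from an environment variable, that variable has to be named ConnectionStrings__dbConnectionString.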

What have we achieved? We can now replace the DatabaseClient references with IDatabaseClient, which means we no longer have any hardcoded references to this database SDK inside Program.cs.
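One practical consequence is that code written against IDatabaseClient can run with a stand-in instead of a real database. The following is a minimal sketch of such a stand-in; InMemoryDatabaseClient and InMemoryDatabaseClientDemo are hypothetical names that are not part of the original example, and the interface is repeated only so that the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Same shape as the IDatabaseClient interface defined above,
// repeated here so that the sketch compiles on its own.
public interface IDatabaseClient
{
    Task OpenAsync(CancellationToken cancellationToken);
    Task CloseAsync(CancellationToken cancellationToken);
    Task RecordMessageAsync(byte[] message);
}

// Hypothetical test double: keeps recorded messages in memory
// instead of talking to a real database.
public class InMemoryDatabaseClient : IDatabaseClient
{
    private readonly List<byte[]> _messages = new List<byte[]>();

    public bool IsOpen { get; private set; }
    public IReadOnlyList<byte[]> Messages => _messages;

    public Task OpenAsync(CancellationToken cancellationToken)
    {
        IsOpen = true;
        return Task.CompletedTask;
    }

    public Task CloseAsync(CancellationToken cancellationToken)
    {
        IsOpen = false;
        return Task.CompletedTask;
    }

    public Task RecordMessageAsync(byte[] message)
    {
        // Mimic a real client by refusing to record before OpenAsync.
        if (!IsOpen)
            throw new InvalidOperationException("The client is not open.");
        _messages.Add(message);
        return Task.CompletedTask;
    }
}

public static class InMemoryDatabaseClientDemo
{
    public static async Task Main()
    {
        var client = new InMemoryDatabaseClient();
        await client.OpenAsync(CancellationToken.None);
        await client.RecordMessageAsync(
            Encoding.UTF8.GetBytes("{\"temperature\": 21.5}"));
        await client.CloseAsync(CancellationToken.None);

        Console.WriteLine($"Recorded messages: {client.Messages.Count}");
        // prints "Recorded messages: 1"
    }
}
```

Any class that takes an IDatabaseClient in its constructor can be given this in-memory version in a test and the DatabaseClientWrapper in production.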

Let's do the same for the ModuleClient. A first version of the interface can be:

public interface IModuleClient
{
    Task OpenAsync(CancellationToken cancellationToken);
    Task CloseAsync(CancellationToken cancellationToken);
    Task SendEventAsync(string outputName, 
        Message message);
    Task SetInputMessageHandlerAsync(string inputName, 
        MessageHandler messageHandler, 
        object userContext);
    Task SetMethodHandlerAsync(string methodName, 
        MethodCallback methodHandler, 
        object userContext);
    Task<Twin> GetTwinAsync(CancellationToken cancellationToken);
    Task<Twin> GetTwinAsync();
}

and the wrapper class can be:

public class ModuleClientWrapper : IModuleClient
{
    private ModuleClient ModuleClient { get; }

    public ModuleClientWrapper()
    {
        MqttTransportSettings mqttSetting = 
                new MqttTransportSettings(TransportType.Mqtt_Tcp_Only);
        ITransportSettings[] settings = { mqttSetting };

        ModuleClient = ModuleClient.CreateFromEnvironmentAsync(settings).Result;
    }

    public async Task SendEventAsync(string outputName, Message message)
    {
        await ModuleClient.SendEventAsync(outputName, message);
    }

    public async Task SetInputMessageHandlerAsync(string inputName, 
        MessageHandler messageHandler, 
        object userContext)
    {
        await ModuleClient.SetInputMessageHandlerAsync(inputName, 
            messageHandler, 
            userContext);
    }

    public async Task SetMethodHandlerAsync(string methodName, 
        MethodCallback methodHandler, 
        object userContext)
    {
        await ModuleClient.SetMethodHandlerAsync(methodName, 
            methodHandler, 
            userContext);
    }
    public async Task OpenAsync(CancellationToken cancellationToken)
    {
        await ModuleClient.OpenAsync(cancellationToken);
    }
    public async Task CloseAsync(CancellationToken cancellationToken)
    {
        await ModuleClient.CloseAsync(cancellationToken);
    }
    public async Task<Twin> GetTwinAsync(CancellationToken cancellationToken)
    {
        return await ModuleClient.GetTwinAsync(cancellationToken);
    }
    public async Task<Twin> GetTwinAsync()
    {
        return await ModuleClient.GetTwinAsync();
    }
}

After abstracting our dependencies, the final step is to isolate our business code in a new class that depends on these two interfaces.

Let's call this class PipeModule:

public class PipeModule
{
    IModuleClient _moduleClient { get; }
    IDatabaseClient _databaseClient { get; }
    public PipeModule(IModuleClient moduleClient, 
        IDatabaseClient databaseClient)
    {
        _moduleClient = moduleClient;
        _databaseClient = databaseClient;
    }
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        await _moduleClient.OpenAsync(cancellationToken);
        await _databaseClient.OpenAsync(cancellationToken);

        await _moduleClient.SetInputMessageHandlerAsync("input1",
            new MessageHandler(async (message, context) =>
            {
                await _databaseClient.RecordMessageAsync(message.GetBytes());
                await _moduleClient.SendEventAsync("output1", message);
                return MessageResponse.Completed;
            }), this);
    }
        
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        await _moduleClient.CloseAsync(cancellationToken);
        await _databaseClient.CloseAsync(cancellationToken);
    }
}

This class contains some initialization and termination logic, and our piping business code, which is just these two lines:

await _databaseClient.RecordMessageAsync(message.GetBytes());
await _moduleClient.SendEventAsync("output1", message);
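Because this logic only sees interfaces, it can be checked without an IoT Edge runtime or a database. The sketch below is deliberately simplified so that it compiles with nothing but the base class library. IMessageSender and IMessageStore are hypothetical, trimmed-down stand-ins for IModuleClient and IDatabaseClient that take a raw byte[] instead of the SDK's Message type, and PipeLogic stands in for the message handler inside PipeModule. None of these names come from the original example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical, trimmed-down stand-in for IModuleClient.
public interface IMessageSender
{
    Task SendEventAsync(string outputName, byte[] message);
}

// Hypothetical, trimmed-down stand-in for IDatabaseClient.
public interface IMessageStore
{
    Task RecordMessageAsync(byte[] message);
}

// Stand-in for the handler body inside PipeModule: record, then forward.
public class PipeLogic
{
    private readonly IMessageSender _sender;
    private readonly IMessageStore _store;

    public PipeLogic(IMessageSender sender, IMessageStore store)
    {
        _sender = sender;
        _store = store;
    }

    public async Task HandleAsync(byte[] message)
    {
        await _store.RecordMessageAsync(message);
        await _sender.SendEventAsync("output1", message);
    }
}

// A single fake implements both interfaces and logs the calls in order.
public class RecordingFake : IMessageSender, IMessageStore
{
    public List<string> Calls { get; } = new List<string>();

    public Task RecordMessageAsync(byte[] message)
    {
        Calls.Add($"record:{message.Length}");
        return Task.CompletedTask;
    }

    public Task SendEventAsync(string outputName, byte[] message)
    {
        Calls.Add($"send:{outputName}:{message.Length}");
        return Task.CompletedTask;
    }
}

public static class PipeLogicCheck
{
    public static async Task Main()
    {
        var fake = new RecordingFake();
        var logic = new PipeLogic(fake, fake);

        await logic.HandleAsync(new byte[] { 1, 2, 3 });

        // The message is recorded first and then forwarded to output1.
        Console.WriteLine(string.Join(", ", fake.Calls));
        // prints "record:3, send:output1:3"
    }
}
```

In a real test project the same check would be written against IModuleClient and IDatabaseClient with a unit testing framework, but the principle is the same: the test supplies the dependencies, so nothing external has to be running.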

Now, with these abstractions in place, let's see what the original example looks like after the refactoring:

class Program2
{
    static async Task Main(string[] args)
    {
        // Read the configuration
        IConfiguration configuration = new ConfigurationBuilder()
                .AddJsonFile("appsettings.json", optional: true)
                .AddEnvironmentVariables()
                .AddCommandLine(args)
                .Build();

        // Create the cancellation token source
        var cancellationTokenSource = new CancellationTokenSource();
        AssemblyLoadContext.Default.Unloading +=
            (context) => cancellationTokenSource.Cancel();

        Console.CancelKeyPress +=
            (sender, eventArgs) =>
            {
                Console.WriteLine("Ctrl+C detected.");
                // Keep the process alive so that we can shut down gracefully
                eventArgs.Cancel = true;
                cancellationTokenSource.Cancel();
            };

        await Init(configuration, cancellationTokenSource.Token);

        // Wait until the app unloads or is cancelled
        await WhenCancelled(cancellationTokenSource.Token);
    }
    public static Task WhenCancelled(CancellationToken cancellationToken)
    {
        var taskCompletionSource = new TaskCompletionSource<bool>();
        cancellationToken.Register(
            s => ((TaskCompletionSource<bool>)s).SetResult(true),
            taskCompletionSource);
        return taskCompletionSource.Task;
    }
    static async Task Init(IConfiguration configuration, 
        CancellationToken cancellationToken)
    {
        IModuleClient moduleClient = new ModuleClientWrapper();
        IDatabaseClient databaseClient = 
                new DatabaseClientWrapper(configuration);

        // No OpenAsync calls here: PipeModule.StartAsync opens both clients.

        PipeModule pipeModule = new PipeModule(moduleClient, 
            databaseClient);
        await pipeModule.StartAsync(cancellationToken);

        Console.WriteLine("Module initialized.");
    }
}
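The WhenCancelled helper in Program2 is what keeps Main alive until a shutdown is requested. As a small illustration, the following sketch runs the same helper in isolation. WhenCancelledDemo is a hypothetical name, and the CancelAfter call only simulates the Ctrl+C or unload event that would trigger the cancellation in the real module.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class WhenCancelledDemo
{
    // Same helper as in Program2: returns a task that completes
    // once the token is cancelled.
    public static Task WhenCancelled(CancellationToken cancellationToken)
    {
        var taskCompletionSource = new TaskCompletionSource<bool>();
        cancellationToken.Register(
            s => ((TaskCompletionSource<bool>)s).SetResult(true),
            taskCompletionSource);
        return taskCompletionSource.Task;
    }

    public static async Task Main()
    {
        using var cancellationTokenSource = new CancellationTokenSource();
        Task waiter = WhenCancelled(cancellationTokenSource.Token);
        Console.WriteLine($"Completed before cancel: {waiter.IsCompleted}");

        // Simulate Ctrl+C or an unload after a short delay.
        cancellationTokenSource.CancelAfter(TimeSpan.FromMilliseconds(100));
        await waiter;
        Console.WriteLine($"Completed after cancel: {waiter.IsCompleted}");
    }
}
```

The first line prints False and the second prints True: the awaited task stays pending until the token is cancelled, which is exactly the moment the module should start shutting down.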

Besides the PipeModule, Program2 includes some of the best practices from the previous posts. Although the code is much simpler, this is just an interim version. We still have a few missing things to take care of, namely the dependency injection mechanism.

In the next post we will include a dependency injection mechanism that is based on the Microsoft.Extensions.Hosting library and we will see the benefits of doing so.