HAVE DATA - WILL TRAIN (havedatawilltrain.com)

Tick on the Edge
Tue, 12 May 2020 00:05:07 GMT

One of the biggest challenges in IoT Edge is data visualization. A standard approach is to move the data upstream and use various storage, querying, and rendering tools to visualize it. This becomes particularly challenging when dealing with high-frequency data over a low-bandwidth connection. Here, we examine how to set up the TICK stack on Azure IoT Edge for storing, querying, processing, and visualizing your data on the edge.

The TICK stack consists of four individual services that have been developed to function as a complete framework, but are abstract enough to be interchangeable with third-party services of similar capabilities.

According to InfluxData, the company that makes and supports the stack, TICK "is a loosely coupled yet tightly integrated set of open source projects designed to handle massive amounts of time-stamped information to support your metrics analysis needs".

TICK consists of four services:
1. Telegraf, a telemetry and metrics collection mechanism,
2. InfluxDB, a time series database,
3. Chronograf, a time series visualization engine, and
4. Kapacitor, a time series data processing engine.

Although many variants of this stack exist, these four services combined cover most of the typical scenarios in the edge compute space.

The easiest way to get started with the TICK stack is by using the TICK sandbox repo. This repo is a quick way to get the entire TICK Stack spun up and working together using docker containers through docker-compose.

The docker-compose file structure is somewhat similar to the Azure IoT Edge deployment manifest. The TICK sandbox converted for Azure IoT Edge is here; the rest of the post deals with how to convert the original sandbox, or any docker-compose application, to run in Azure IoT Edge.

The Azure IoT Edge TICK stack has been published in this repo.
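Before going module by module, it is worth seeing how mechanical the conversion is. Below is a rough Python sketch of the mapping (the helper is ours, purely illustrative, and not part of any official tooling): a compose service's environment becomes Env, its volumes become HostConfig.Binds, and its port mappings become HostConfig.PortBindings inside the module's createOptions.

```python
def compose_service_to_module(name, service):
    """Illustrative sketch: map a docker-compose service definition
    (as a dict) onto an IoT Edge manifest module entry."""
    create_options = {"Hostname": name}
    env = service.get("environment", {})
    if env:
        create_options["Env"] = [f"{k}={v}" for k, v in env.items()]
    host_config = {}
    if "volumes" in service:
        # compose "host:container" volume strings map directly to Binds
        host_config["Binds"] = list(service["volumes"])
    if "ports" in service:
        bindings = {}
        for mapping in service["ports"]:
            host, _, container = mapping.partition(":")
            proto = "udp" if container.endswith("/udp") else "tcp"
            container = container.removesuffix("/udp")
            bindings[f"{container}/{proto}"] = [{"HostPort": host}]
        host_config["PortBindings"] = bindings
    if host_config:
        create_options["HostConfig"] = host_config
    return {
        "version": "1.0",
        "type": "docker",
        "status": "running",
        "restartPolicy": "always",
        "settings": {"image": service["image"], "createOptions": create_options},
    }
```

Note that links and depends_on have no counterpart in the output: all modules share one network and there is no startup ordering.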

Modules

The docker-compose services correspond to the manifest modules, with two prerequisite system modules: edgeAgent and edgeHub. The former is responsible for reading the manifest and starting up the containers on the edge; the latter facilitates intra-module communication between the custom modules. In fact, if your modules do not use this communication scheme, you can omit the edgeHub system module entirely.

Let's take a look at the docker-compose chronograf service. This service definition is based on the original sandbox chronograf service, with a few minor modifications:

  • we've renamed the images folder to modules,
  • changed the image tagging schema, and
  • added a registry name to the images.
chronograf:
  # Full tag list: https://hub.docker.com/r/library/chronograf/tags/
  build:
    context: ./modules/chronograf
    dockerfile: ./Dockerfile
    args:
      CHRONOGRAF_TAG: ${CHRONOGRAF_TAG}
  image: "${CONTAINER_REGISTRY_ADDRESS}/chrono_config:${VER}.${BLD}-${ARCH}"
  environment:
    RESOURCES_PATH: "/usr/share/chronograf/resources"
  volumes:
    # Mount for chronograf database
    - /tmp/chronograf/data/:/var/lib/chronograf/
  links:
    # Chronograf requires network access to InfluxDB and Kapacitor
    - influxdb
    - kapacitor
  ports:
    # The WebUI for Chronograf is served on port 8888
    - "8888:8888"
  depends_on:
    - kapacitor
    - influxdb
    - telegraf

Here is the corresponding module in the deployment manifest. Azure IoT Edge has a few fundamental differences compared to docker-compose. First, all modules operate in the same network, so no special configuration is required for two modules to connect. Second, there is no startup ordering; the modules need to be resilient enough to handle every possible startup order.

"chronograf": {
  "version": "1.0",
  "type": "docker",
  "status": "running",
  "restartPolicy": "always",
  "settings": {
    "image": "${MODULES.chronograf}",
    "createOptions": {
      "Hostname": "chronograf",
      "Env": [
        "RESOURCES_PATH=/usr/share/chronograf/resources"
      ],
      "HostConfig": {
        "Binds": [
          "/tmp/chronograf/data/:/var/lib/chronograf/"
        ],
        "PortBindings": {
          "8888/tcp": [
            {
              "HostPort": "8888"
            }
          ]
        }
      }
    }
  }
}
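Because there is no startup ordering, a module that depends on another (telegraf on influxdb; chronograf on influxdb and kapacitor) must be able to retry its connections until its peers come up. The sketch below illustrates that resilience requirement in Python (illustrative only; `connect` stands in for whatever client call the module actually makes):

```python
import time

def connect_with_retry(connect, attempts=5, base_delay=1.0):
    """Retry `connect` with exponential backoff until it succeeds
    or the attempts are exhausted."""
    for attempt in range(attempts):
        try:
            return connect()
        except ConnectionError:
            if attempt == attempts - 1:
                raise  # give up after the last attempt
            time.sleep(base_delay * 2 ** attempt)
```

In a real module this would wrap, for example, the first InfluxDB write after startup.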

Module definition files

We can see that the docker image build configuration is completely missing from the manifest. Nevertheless, the VS Code Azure IoT Edge extension helps manage the container build process. This extension assumes there is a template manifest that points to module definition files, like this one:

{
    "$schema-version": "0.0.1",
    "description": "",
    "image": {
        "repository": "${CONTAINER_REGISTRY_ADDRESS}/chronograf",
        "tag": {
            "version": "${VER}.${BLD}",
            "platforms": {
                "amd64": "./Dockerfile"
            }
        },
        "buildOptions": ["--build-arg CHRONOGRAF_TAG=${CHRONOGRAF_TAG}" ],
        "contextPath": "./"
    },
    "language": "other"
}

The above module.json file lives inside the modules\chronograf folder, and the manifest "points" to it with this line:

"image": "${MODULES.chronograf}"
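At build time, the extension expands this placeholder into a full image name assembled from the module.json repository and tag and the variables defined in the .env file. Roughly, and only as an illustrative sketch (this is not the extension's actual code):

```python
def expand_module_placeholder(module_json, env, platform):
    """Resolve ${MODULES.<name>} into <repository>:<version>-<platform>,
    substituting any ${VAR} references from the .env values."""
    image = module_json["image"]
    repo = image["repository"]
    version = image["tag"]["version"]
    for var, value in env.items():
        repo = repo.replace("${%s}" % var, value)
        version = version.replace("${%s}" % var, value)
    return f"{repo}:{version}-{platform}"
```

For the chronograf module.json above, with VER=0.0, BLD=1 and platform amd64, this yields something like `myregistry.azurecr.io/chronograf:0.0.1-amd64`.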

Parallel docker-compose

Although VS Code has commands to easily build and push the module container images, it is a good idea to keep a parallel docker-compose definition, primarily for experimenting outside the Edge runtime.

Here's the complete parallel docker-compose file for the TICK stack:

version: '3'
services:
  influxdb:
    # Full tag list: https://hub.docker.com/r/library/influxdb/tags/
    build:
      context: ./modules/influxdb/
      dockerfile: ./Dockerfile
      args:
        INFLUXDB_TAG: ${INFLUXDB_TAG}
    image: "${CONTAINER_REGISTRY_ADDRESS}/influxdb:${VER}.${BLD}-${ARCH}"
    volumes:
      # Mount for influxdb data directory
      - /tmp/influxdb/data:/var/lib/influxdb
    ports:
      # The API for InfluxDB is served on port 8086
      - "8086:8086"
      - "8082:8082"
      # UDP Port
      - "8089:8089/udp"

  telegraf:
    # Full tag list: https://hub.docker.com/r/library/telegraf/tags/
    build:
      context: ./modules/telegraf/
      dockerfile: ./Dockerfile
      args:
        TELEGRAF_TAG: ${TELEGRAF_TAG}
    image: "${CONTAINER_REGISTRY_ADDRESS}/telegraf:${VER}.${BLD}-${ARCH}"
    environment:
      HOSTNAME: "telegraf-getting-started"
    # Telegraf requires network access to InfluxDB
    links:
      - influxdb
    volumes:
      # Mount for Docker API access
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - influxdb

  kapacitor:
  # Full tag list: https://hub.docker.com/r/library/kapacitor/tags/
    build:
      context: ./modules/kapacitor/
      dockerfile: ./Dockerfile
      args:
        KAPACITOR_TAG: ${KAPACITOR_TAG}
    image: "${CONTAINER_REGISTRY_ADDRESS}/kapacitor:${VER}.${BLD}-${ARCH}"
    volumes:
      # Mount for kapacitor data directory
      - /tmp/kapacitor/data/:/var/lib/kapacitor
    # Kapacitor requires network access to Influxdb
    links:
      - influxdb
    ports:
      # The API for Kapacitor is served on port 9092
      - "9092:9092"

  chronograf:
    # Full tag list: https://hub.docker.com/r/library/chronograf/tags/
    build:
      context: ./modules/chronograf
      dockerfile: ./Dockerfile
      args:
        CHRONOGRAF_TAG: ${CHRONOGRAF_TAG}
    image: "${CONTAINER_REGISTRY_ADDRESS}/chrono_config:${VER}.${BLD}-${ARCH}"
    environment:
      RESOURCES_PATH: "/usr/share/chronograf/resources"
    volumes:
      # Mount for chronograf database
      - /tmp/chronograf/data/:/var/lib/chronograf/
    links:
      # Chronograf requires network access to InfluxDB and Kapacitor
      - influxdb
      - kapacitor
    ports:
      # The WebUI for Chronograf is served on port 8888
      - "8888:8888"
    depends_on:
      - kapacitor
      - influxdb
      - telegraf

And here is the Azure IoT Edge template manifest:

{
  "$schema-template": "2.0.0",
  "modulesContent": {
    "$edgeAgent": {
      "properties.desired": {
        "schemaVersion": "1.0",
        "runtime": {
          "type": "docker",
          "settings": {
            "minDockerVersion": "v1.25",
            "loggingOptions": "",
            "registryCredentials": {
              "${CONTAINER_REGISTRY_GROUP}": {
                "username": "${CONTAINER_REGISTRY_USER_NAME}",
                "password": "${CONTAINER_REGISTRY_PASSWORD}",
                "address": "${CONTAINER_REGISTRY_ADDRESS}"
              }
            }
          }
        },
        "systemModules": {
          "edgeAgent": {
            "type": "docker",
            "settings": {
              "image": "mcr.microsoft.com/azureiotedge-agent:1.0",
              "createOptions": {}
            }
          },
          "edgeHub": {
            "type": "docker",
            "status": "running",
            "restartPolicy": "always",
            "settings": {
              "image": "mcr.microsoft.com/azureiotedge-hub:1.0",
              "createOptions": {
                "HostConfig": {
                  "PortBindings": {
                    "5671/tcp": [
                      {
                        "HostPort": "5671"
                      }
                    ],
                    "8883/tcp": [
                      {
                        "HostPort": "8883"
                      }
                    ],
                    "443/tcp": [
                      {
                        "HostPort": "443"
                      }
                    ]
                  }
                }
              }
            }
          }
        },
        "modules": {
          "chronograf": {
            "version": "1.0",
            "type": "docker",
            "status": "running",
            "restartPolicy": "always",
            "settings": {
              "image": "${MODULES.chronograf}",
              "createOptions": {
                "Hostname": "chronograf",
                "Env": [
                  "RESOURCES_PATH=/usr/share/chronograf/resources"
                ],
                "HostConfig": {
                  "Binds": [
                    "/tmp/chronograf/data/:/var/lib/chronograf/"
                  ],
                  "PortBindings": {
                    "8888/tcp": [
                      {
                        "HostPort": "8888"
                      }
                    ]
                  }
                }
              }
            }
          },
          "influxdb": {
            "version": "1.0",
            "type": "docker",
            "status": "running",
            "restartPolicy": "always",
            "settings": {
              "image": "${MODULES.influxdb}",
              "createOptions": {
                "Hostname": "influxdb",
                "Env": [],
                "HostConfig": {
                  "Binds": [
                    "/tmp/influxdb/data:/var/lib/influxdb"
                  ],
                  "PortBindings": {
                    "8086/tcp": [
                      {
                        "HostPort": "8086"
                      }
                    ],
                    "8082/tcp": [
                      {
                        "HostPort": "8082"
                      }
                    ],
                    "8089/udp": [
                      {
                        "HostPort": "8089"
                      }
                    ]
                  }
                }
              }
            }
          },
          "kapacitor": {
            "version": "1.0",
            "type": "docker",
            "status": "running",
            "restartPolicy": "always",
            "settings": {
              "image": "${MODULES.kapacitor}",
              "createOptions": {
                "Hostname": "kapacitor",
                "Env": [],
                "HostConfig": {
                  "Binds": [
                    "/tmp/kapacitor/data:/var/lib/kapacitor"
                  ],
                  "PortBindings": {
                    "9092/tcp": [
                      {
                        "HostPort": "9092"
                      }
                    ]
                  }
                }
              }
            }
          },
          "telegraf": {
            "version": "1.0",
            "type": "docker",
            "status": "running",
            "restartPolicy": "always",
            "settings": {
              "image": "${MODULES.telegraf}",
              "createOptions": {
                "Hostname": "telegraf",
                "Env": [
                  "HOSTNAME=telegraf-getting-started"
                ],
                "HostConfig": {
                  "Binds": [
                    "/var/run/docker.sock:/var/run/docker.sock"
                  ]
                }
              }
            }
          }
        }
      }
    },
    "$edgeHub": {
      "properties.desired": {
        "schemaVersion": "1.0",
        "routes": {
          "allToIoTHub": "FROM /messages/modules/* INTO $upstream"
        },
        "storeAndForwardConfiguration": {
          "timeToLiveSecs": 7200
        }
      }
    }
  }
}

Assuming all other modules are properly defined with a module.json definition file, running the Build and Push IoT Edge Solution command will build your module containers and push them to the specified registry.

Make sure you specify your registry and the registry access credentials in the .env file first.
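A minimal .env could look like the following; every value here is a placeholder that you must replace with your own registry details and build settings:

```shell
# .env -- all values below are placeholders, substitute your own
CONTAINER_REGISTRY_ADDRESS=myregistry.azurecr.io
CONTAINER_REGISTRY_GROUP=myregistry
CONTAINER_REGISTRY_USER_NAME=myregistry
CONTAINER_REGISTRY_PASSWORD="my-registry-password"
VER=0.0
BLD=1
ARCH=amd64
INFLUXDB_TAG=latest
TELEGRAF_TAG=latest
KAPACITOR_TAG=latest
CHRONOGRAF_TAG=latest
```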

Similarly, running the Generate IoT Edge Deployment Manifest command in VS Code will generate the actual manifest to use on a device. In the explorer view of VS Code, locate the Azure IoT Hub pane, right-click an edge-enabled device, run Create Deployment for Single Device, and select the generated manifest. The manifest will be pushed down to the running device. Wait a few minutes for all the modules to be downloaded, then visit the device's HTTP port 8888 and you'll see the Chronograf UI:

Chronograf running on the Edge

Conclusion

We saw how to transform any docker-compose file into an Azure IoT Edge solution, and how to run the TICK stack on Azure IoT Edge. Using this as a starting point, we will next see how to read and process high-frequency device telemetry data.

The beloved man's objection
Tue, 28 Apr 2020 23:31:01 GMT

In this mini-series we have analyzed different challenges around IoT Edge development and explored a few best practices that help us avoid unnecessary roughness when developing Edge applications. In the last post we saw how to use most of these best practices to compose a loosely coupled Azure IoT Edge application, and some tooling that allows a true F5 development experience. Today we will see the main driver behind all these patterns: unit testing.

The main challenge with any modern application is finding ways to test most of its code, and then automating that testing. This automated testing capability can be perceived as the holy grail of every production system, because it greatly accelerates software development life cycle iterations and eventually increases overall product quality.

The challenge with Azure IoT Edge development is that the security and communication protocols are tightly coupled to the IoT C# SDK, making it very difficult to run a test without the corresponding infrastructure. Nevertheless, as we saw in this post, there is an easy way to abstract this SDK dependency and run the business code against a mock implementation of the SDK. We will use the same approach to mock all other dependencies, and eventually test a stand-alone business method: a unit test.

Unit Testing Setup

We will be using the xunit unit testing framework, a very popular option in .NET Core. We will build two projects from scratch, one IoT Edge module project similar to this template, and an xunit unit testing project that references the former.

  1. Create the solution and project folders
mkdir UnitTestingExample
cd UnitTestingExample

mkdir IoTModule
mkdir IoTModuleTests

2. Create the IoT Edge Module project and add the required nugets

cd IoTModule
dotnet new console
dotnet add package Microsoft.Azure.Devices.Client
dotnet add package Microsoft.Extensions.Hosting
dotnet add package Serilog.Extensions.Hosting
dotnet add package Serilog.Settings.Configuration
dotnet add package Serilog.Sinks.Console
dotnet add package Serilog.Enrichers.Thread

3. Add the module code. We will replace the Program.cs code with:

namespace IoTModule
{
    using System.Threading.Tasks;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Serilog;

    class Program
    {
        static async Task Main(string[] args)
        {
            using (var host = Host.CreateDefaultBuilder(args)
                 .ConfigureServices((hostContext, services) =>
                 {
                    services.AddSingleton<IModuleClient, ModuleClientWrapper>();
                    services.AddHostedService<EventHandlerModule>();
                 })
                 .UseSerilog()
                 .UseConsoleLifetime()
                 .Build())
            {
                await host.RunAsync();
            }
        }
    }
}

This code references the IModuleClient, the ModuleClientWrapper, and the module class, EventHandlerModule. We have seen the first two before: they create an abstraction layer over the SDK and can be found in this post.

The EventHandlerModule.cs code is this:

namespace IoTModule
{
    using Microsoft.Azure.Devices.Client;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    public class EventHandlerModule : IHostedService
    {
        readonly IModuleClient moduleClient;
        readonly ILogger logger;

        public EventHandlerModule(IModuleClient moduleClient,
            ILogger<EventHandlerModule> logger)
        {
            this.moduleClient = moduleClient;
            this.logger = logger;
        }

        public async Task StartAsync(CancellationToken cancellationToken)
        {
            await moduleClient.OpenAsync(cancellationToken);

            await moduleClient.SetInputMessageHandlerAsync("input",
                new MessageHandler(async (message, context) =>
                {
                    var messageText = Encoding.UTF8.GetString(message.GetBytes());
                    logger.LogInformation($"Message received: {messageText}");
                    return await Task.FromResult(MessageResponse.Completed);
                }), this);

            logger.LogInformation("Started.");
        }

        public async Task StopAsync(CancellationToken cancellationToken)
        {
            await moduleClient.CloseAsync(cancellationToken);
            logger.LogInformation("Stopped.");
        }
    }
}

This module is fairly simple: it just expects messages on the input named input and logs the message content as a string. When we try to run this application, we get an exception from the IoT SDK complaining about a missing environment variable, required to connect to the edge daemon:

Environment variable IOTEDGE_WORKLOADURI is required.

It is clear that to test this message handler callback, we need to mock out the SDK.

Unit test project

Let's setup our unit testing project now. We'll add the Moq nuget too:

cd ..
cd IoTModuleTests
dotnet new xunit
dotnet add package Moq
dotnet add reference ..\IoTModule

Let's replace the default test code with this empty test:

namespace ModuleTests
{
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Xunit;
    using IoTModule;
    using Moq;
    using System.Threading;
    using Microsoft.Extensions.Logging;
    using Microsoft.Azure.Devices.Client;
    using System.Text;
    using System.Collections.Generic;
    using System.Threading.Tasks;

    public class EventHandlerModuleTester
    {
        [Fact]
        public async Task TestInputMessageHandler()
        {
            Assert.True(true);
        }
    }
}
This is a good moment to verify your code: you should be able to run this test successfully. Simply run dotnet test inside the IoTModuleTests directory.

This will be our first unit test: it sends a message to the input named input and expects a successful execution of the callback.

Mocking the IoT SDK

Our test needs to set up a similar dependency injection mechanism and register all involved services, except for the ModuleClientWrapper, which we will mock. This mocking is based on the SDK abstraction layer we created in this post.

Abstracting an SDK has the added benefit of controlling our codebase's area of exposure to that SDK. By abstracting only the useful portion of the SDK, we reduce the impact future versions have on our application.
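The same pattern can be seen in miniature outside C#. The sketch below (in Python, with hypothetical names) shows the idea: the business logic depends only on a thin abstraction, and the test supplies a fake that records calls instead of talking to the edge daemon.

```python
class ModuleClient:
    """Minimal abstraction over the SDK surface our module actually uses."""
    def send_event(self, output, message):
        raise NotImplementedError

class FakeModuleClient(ModuleClient):
    """Test double: records messages instead of hitting the network."""
    def __init__(self):
        self.sent = []
    def send_event(self, output, message):
        self.sent.append((output, message))

def report_temperature(client, reading):
    # Business logic under test depends only on the abstraction.
    client.send_event("temperatureOutput", f"{reading:.1f}")

fake = FakeModuleClient()
report_temperature(fake, 21.367)
```

Because `report_temperature` never sees the real SDK, the test runs anywhere, with no edge daemon present.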

The test with just the dependency injection mechanism becomes:

[Fact]
public void TestInputMessageHandler()
{
    var services = new ServiceCollection();

    services.AddHostedService<EventHandlerModule>();

    var serviceProvider = services.BuildServiceProvider();
    var hostedService = serviceProvider.GetService<IHostedService>();
    hostedService.StartAsync(new CancellationToken());
}

This code will try to create and start our module, the EventHandlerModule hosted service class. Looking at this class constructor we can see that this service depends on two other services, the IModuleClient and the ILogger<EventHandlerModule> services.

Let's add a mock version of them:

[Fact]
public async Task TestInputMessageHandler()
{
    var services = new ServiceCollection();

    services.AddHostedService<EventHandlerModule>();
    services.AddSingleton((s) =>
    {
        var moduleClient = new Mock<IModuleClient>();
        return moduleClient.Object;
    });

    services.AddSingleton((s) =>
    {
        var mockLogger = new Mock<ILogger<EventHandlerModule>>();
        return mockLogger.Object;
    });

    var serviceProvider = services.BuildServiceProvider();

    var hostedService = serviceProvider.GetService<IHostedService>();
    await hostedService.StartAsync(new CancellationToken());
}

Now this test is complete in terms of having a valid dependency hierarchy in place. In fact, running this test will yield a successful result. We are still missing two final tasks: injecting a message into the module's input and validating a successful callback execution. In other words, we will override the IModuleClient's SendEventAsync and SetInputMessageHandlerAsync methods to send a message to the registered callback. To do this, we will use a Dictionary<string, (MessageHandler, object)> structure to keep the callback in memory and invoke it when we get a message:

[Fact]
public void _TestInputMessageHandler()
{
    var inputMessageHandlers = new Dictionary<string, (MessageHandler, object)>();

    var services = new ServiceCollection();

    services.AddHostedService<EventHandlerModule>();
    services.AddSingleton((s) =>
    {
        var moduleClient = new Mock<IModuleClient>();

        moduleClient.Setup(e => e.SetInputMessageHandlerAsync(
            It.IsAny<string>(),
            It.IsAny<MessageHandler>(),
            It.IsAny<object>()))
        .Callback<string, MessageHandler, object>((inputName, messageHandler, userContext) =>
        {
            inputMessageHandlers[inputName] = (messageHandler, userContext);
        }).Returns(Task.FromResult(0));

        moduleClient.Setup(e => e.SendEventAsync(
            It.IsAny<string>(),
            It.IsAny<Message>()))
        .Callback<string, Message>((output, message) =>
        {
            var result = inputMessageHandlers[output].Item1(message,
                        inputMessageHandlers[output].Item2).GetAwaiter().GetResult();

            Assert.Equal(MessageResponse.Completed, result);

        }).Returns(Task.FromResult(0));

        return moduleClient.Object;
    });

    services.AddSingleton((s) =>
    {
        var mockLogger = new Mock<ILogger<EventHandlerModule>>();
        return mockLogger.Object;
    });

    var serviceProvider = services.BuildServiceProvider();

    var hostedService = serviceProvider.GetService<IHostedService>();
    hostedService.StartAsync(new CancellationToken()).GetAwaiter().GetResult();

    serviceProvider.GetService<IModuleClient>()
        .SendEventAsync("input",
            new Message(Encoding.UTF8.GetBytes("This is a mocked message!"))).GetAwaiter().GetResult();
} 

By calling Assert.Equal(MessageResponse.Completed, result); inside the SendEventAsync mock, our unit test validates that the callback executed successfully.
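Stripped of the Moq syntax, the core trick is just a handler registry: the mocked SetInputMessageHandlerAsync stores the callback under the input name, and the mocked SendEventAsync looks it up and invokes it. The same idea in a compact Python sketch (hypothetical names, illustration only):

```python
class InMemoryModuleClient:
    """Routes 'sent' messages straight to registered input handlers,
    mimicking what the Moq setup does for the C# IModuleClient."""
    def __init__(self):
        self.handlers = {}

    def set_input_message_handler(self, input_name, handler, user_context):
        # Keep the callback (and its context) in memory, keyed by input name.
        self.handlers[input_name] = (handler, user_context)

    def send_event(self, output, message):
        # Dispatch the message to the handler registered for that name.
        handler, context = self.handlers[output]
        return handler(message, context)

client = InMemoryModuleClient()
client.set_input_message_handler("input", lambda msg, ctx: f"completed:{msg}", None)
result = client.send_event("input", "This is a mocked message!")
```

The C# version adds Moq's It.IsAny matchers and Task plumbing around exactly this dictionary lookup.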

Although this test might initially seem too complicated for such a simple callback, most of this code is common to any other unit test we might implement, and it can be hidden in a dependency initialization step shared across all other tests for the same module. Indeed, this common initialization step can be moved into this Init method:

public async Task<ServiceProvider> Init(Action<MessageResponse> messageResultCallback)
{
    var inputMessageHandlers = new Dictionary<string, (MessageHandler, object)>();

    var services = new ServiceCollection();

    services.AddHostedService<EventHandlerModule>();
    services.AddSingleton((s) =>
    {
        var moduleClient = new Mock<IModuleClient>();

        moduleClient.Setup(e => e.SetInputMessageHandlerAsync(
            It.IsAny<string>(),
            It.IsAny<MessageHandler>(),
            It.IsAny<object>()))
        .Callback<string, MessageHandler, object>((inputName, messageHandler, userContext) =>
        {
            inputMessageHandlers[inputName] = (messageHandler, userContext);
        }).Returns(Task.FromResult(0));

        moduleClient.Setup(e => e.SendEventAsync(
            It.IsAny<string>(),
            It.IsAny<Message>()))
        .Callback<string, Message>((output, message) =>
        {
            var result = inputMessageHandlers[output].Item1(message,
                        inputMessageHandlers[output].Item2).GetAwaiter().GetResult();

            messageResultCallback(result);

        }).Returns(Task.FromResult(0));

        return moduleClient.Object;
    });

    services.AddSingleton((s) =>
    {
        var mockLogger = new Mock<ILogger<EventHandlerModule>>();
        return mockLogger.Object;
    });

    var serviceProvider = services.BuildServiceProvider();

    var hostedService = serviceProvider.GetService<IHostedService>();
    await hostedService.StartAsync(new CancellationToken());
    return serviceProvider;
}

and in this case, our message handler unit test code becomes:

[Fact]
public async Task TestInputMessageHandler()
{
    MessageResponse messageResponse = MessageResponse.Abandoned;
    var serviceProvider = await Init((e) => messageResponse = e);

    await serviceProvider.GetService<IModuleClient>()
        .SendEventAsync("input",
            new Message(Encoding.UTF8.GetBytes("This is a mocked message!")));

    Assert.Equal(MessageResponse.Completed, messageResponse);
}
The code of this example has been published here.

Recap

We saw how to use the default .NET Core unit testing frameworks to build unit tests for our IoT Edge application. Following this approach, we can define any other type of IoT Edge unit test, such as twin updates, direct method calls, etc.

Lysis
Wed, 22 Apr 2020 23:29:26 GMT

In the last post we saw how to set up an F5 development experience for an IoT Edge application without installing the IoT Edge runtime or any other tool on our localhost, by mocking the runtime's behavior in one of our mock dependencies. In this post, we will see how to set up the same F5 experience, but using the actual IoT Edge runtime.

This is an alternative approach to what the official Azure IoT Edge development tools offer. We will set up a development IoT Edge server that allows us to debug our modules with a simple F5 experience: no redeployment, no remote debugging, no runtime installation and troubleshooting.

Emulating IoT Edge Runtime  

Unfortunately, there is currently no development IoT Edge server, but we can build one!

I have found that the effort of installing and troubleshooting the runtime on my development machine exceeds the effort of building this custom emulator.

To set up this development server, we will need to build a custom tool in C#.

Let's assume we would like to debug the SimulatedTemperatureSensor we built in the last post, but this time using the actual IoT Edge runtime instead of mocking it out. Let's create a new .NET Core console app next to the SimulatedTemperatureSensor that will hold all of our development tools:

cd ..
mkdir DevelopmentTools
cd DevelopmentTools
dotnet new console
dotnet add package Microsoft.Azure.Devices
dotnet add package Docker.DotNet
code .

This tool needs to perform three tasks:

  1. Provision a development IoT device.
  2. Create and push the appropriate development deployment manifest.
  3. Run this device locally.

Here's the self-explanatory code of our tool's Program.cs:

namespace DevelopmentTools
{
    using System;
    using System.IO;
    using System.Threading.Tasks;

    class Program
    {
        static async Task Main(string[] args)
        {
            if (args.Length != 3)
            {
                Console.WriteLine("Usage: dotnet run {MANIFEST_FILE} {DEVICE_ID} " +
                "{IOT_HUB_OWNER_CONNECTION_STRING}");
                return;
            }
            var developmentManifest = Utilities.CreateDevelopmentManifest(
                File.ReadAllText(args[0]));

            var deviceConnectionString =
                await Utilities.ProvisionDeviceAsync(args[1],
                developmentManifest,
                args[2]);

            await Utilities.StartEmulatorAsync(deviceConnectionString);
        }
    }
}

We need to create a Utilities.cs file containing the three functions referenced above:

namespace DevelopmentTools
{
    using Docker.DotNet;
    using Docker.DotNet.Models;
    using Microsoft.Azure.Devices;
    using Microsoft.Azure.Devices.Shared;
    using Newtonsoft.Json;
    using Newtonsoft.Json.Linq;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Runtime.InteropServices;
    using System.Threading.Tasks;

    public static class Utilities
    {
        public static async Task StartEmulatorAsync(string deviceConnectionString)
        {
            string deviceContainerImage = "toolboc/azure-iot-edge-device-container";
            int[] exposedPorts = new[] { 15580, 15581, 443, 8883, 5671 };
            string imageName = "dev_iot_edge";

            var localDockerSocket = RuntimeInformation.IsOSPlatform(OSPlatform.Windows) ?
                @"npipe://./pipe/docker_engine" :
                @"unix:///var/run/docker.sock";

            var dockerClient = new DockerClientConfiguration(new Uri(localDockerSocket))
                .CreateClient();

            Console.WriteLine($"Downloading the latest image:{deviceContainerImage}..");

            await dockerClient.Images.CreateImageAsync(
                new ImagesCreateParameters
                {
                    FromImage = deviceContainerImage,
                    Tag = "latest"
                },
                new AuthConfig(),
                new Progress<JSONMessage>((e) =>
                {
                    if (!string.IsNullOrEmpty(e.Status))
                        Console.Write($"{e.Status}");
                    if (!string.IsNullOrEmpty(e.ProgressMessage))
                        Console.Write($" {e.ProgressMessage}");
                    if (!string.IsNullOrEmpty(e.ErrorMessage))
                        Console.Write($" {e.ErrorMessage}");
                    Console.WriteLine("");
                }));

            var containers = await dockerClient.Containers
                .ListContainersAsync(new ContainersListParameters() { All = true });

            foreach (var _container in containers)
            {
                if (_container.Names.Contains(imageName) || 
                    _container.Names.Contains($@"/{imageName}"))
                {
                    if (_container.State == "running")
                    {
                        Console.WriteLine($"Stopping container {_container.ID}..");
                        await dockerClient.Containers.StopContainerAsync(_container.ID,
                            new ContainerStopParameters());
                    }
                    Console.WriteLine($"Removing container {_container.ID}..");
                    await dockerClient.Containers.RemoveContainerAsync(_container.ID,
                        new ContainerRemoveParameters());
                    break;
                }
            }

            Console.WriteLine($"Creating {imageName} container..");
            var container = await dockerClient.Containers
                .CreateContainerAsync(new CreateContainerParameters
            {
                AttachStderr = true,
                AttachStdin = true,
                AttachStdout = true,
                Tty = true,
                Env = new List<string>() { $"connectionString={deviceConnectionString}" },

                Name = imageName,
                Image = deviceContainerImage,
                ExposedPorts = exposedPorts
                    .ToDictionary(x => x.ToString(), x => default(EmptyStruct)),
                HostConfig = new HostConfig
                {
                    Privileged = true,
                    PortBindings = exposedPorts.ToDictionary(
                        x => x.ToString(),
                        x => (IList<PortBinding>)new List<PortBinding> {
                            new PortBinding {
                                HostPort = x.ToString()
                            }
                        }),
                    PublishAllPorts = true
                }
            });
            Console.WriteLine($"Starting {imageName} container..");
            var startResult = await dockerClient.Containers.StartContainerAsync(
                container.ID, null);

            if (!startResult)
                throw new Exception($"Could not start the {imageName} container!");

            Console.WriteLine("Done.");
        }
        public static string CreateDevelopmentManifest(string template)
        {
            var templateContent = JsonConvert
                .DeserializeObject<ConfigurationContent>(template);
            
            var agentDesired = JObject.FromObject(
                   templateContent.ModulesContent["$edgeAgent"]["properties.desired"]);

            if (!agentDesired.TryGetValue("modules", out var modulesSection))
                throw new Exception("Cannot read modules config from $edgeAgent");

            foreach (var module in modulesSection as JObject)
            {
                var moduleSettings = JObject.FromObject(modulesSection[module.Key]["settings"]);
                moduleSettings["image"] = "wardsco/sleep:latest";
                modulesSection[module.Key]["settings"] = moduleSettings;
            }
            agentDesired["modules"] = modulesSection;
            templateContent.ModulesContent["$edgeAgent"]["properties.desired"]
                = agentDesired;


            return JsonConvert.SerializeObject(templateContent, Formatting.Indented,
                new JsonSerializerSettings
                {
                    NullValueHandling = NullValueHandling.Ignore
                });
        }
        public static async Task<string> ProvisionDeviceAsync(
            string deviceId,
            string manifest,
            string ioTHubConnectionString)
        {
            var registryManager = RegistryManager
                .CreateFromConnectionString(ioTHubConnectionString);

            var hostName = ioTHubConnectionString.Split(";")
                .SingleOrDefault(e => e.Contains("HostName="));

            if (string.IsNullOrEmpty(hostName))
                throw new ArgumentException(
                    $"Invalid ioTHubConnectionString: {ioTHubConnectionString}");
            hostName = hostName.Replace("HostName=", "");

            var device = await registryManager.GetDeviceAsync(deviceId) ??
                await registryManager.AddDeviceAsync(
                    new Device(deviceId)
                    {
                        Capabilities = new DeviceCapabilities() { IotEdge = true }
                    });

            var sasKey = device.Authentication.SymmetricKey.PrimaryKey;

            var manifestContent = JsonConvert
                .DeserializeObject<ConfigurationContent>(manifest);

            // remove all old modules
            foreach (var oldModule in await registryManager.GetModulesOnDeviceAsync(deviceId))
                if (!oldModule.Id.StartsWith("$"))
                    await registryManager.RemoveModuleAsync(oldModule);
            // create new modules
            foreach (var module in manifestContent.ModulesContent.Keys)
                if (!module.StartsWith("$"))
                    await registryManager.AddModuleAsync(new Module(deviceId, module));

            await registryManager
                .ApplyConfigurationContentOnDeviceAsync(deviceId, manifestContent);

            return $"HostName={hostName};DeviceId={deviceId};SharedAccessKey={sasKey}";
        }
    }
}

We need access to a running edge daemon and an edgeHub that our code will communicate with. Because this communication is secured, we first need to create the module identity our code will use. To create this module identity, we simply describe a module in the deployment manifest.

Here are the contents of the application manifest.json (I prefer putting this file at the root directory of the solution):
{
	"modulesContent": {
		"$edgeAgent": {
			"properties.desired": {
				"schemaVersion": "1.0",
				"runtime": {
					"type": "docker",
					"settings": {
						"minDockerVersion": "v1.25",
						"loggingOptions": "",
						"registryCredentials": {
						}
					}
				},
				"systemModules": {
					"edgeAgent": {
						"type": "docker",
						"settings": {
							"image": "mcr.microsoft.com/azureiotedge-agent:1.0.9",
							"createOptions": ""
						}
					},
					"edgeHub": {
						"type": "docker",
						"status": "running",
						"restartPolicy": "always",
						"settings": {
							"image": "mcr.microsoft.com/azureiotedge-hub:1.0.9",
							"createOptions": "{\"HostConfig\":{\"PortBindings\":{\"443/tcp\":[{\"HostPort\":\"443\"}],\"5671/tcp\":[{\"HostPort\":\"5671\"}],\"8883/tcp\":[{\"HostPort\":\"8883\"}]}}}"
						}
					}
				},
				"modules": {
					"SimulatedTemperatureSensor": {
						"version": "1.0",
						"type": "docker",
						"status": "running",
						"restartPolicy": "always",
						"settings": {
							"image": "your.azurecr.io/simulatedtemperaturesensor:latest",
							"createOptions": "{}"
						}
					}
				}
			}
		},
		"$edgeHub": {
			"properties.desired": {
				"schemaVersion": "1.0",
				"routes": {
					"allToIoTHub": "FROM /* INTO $upstream"
				},
				"storeAndForwardConfiguration": {
					"timeToLiveSecs": 10
				}
			}
		},
		"SimulatedTemperatureSensor": {
			"properties.desired": {
				"SendInterval": 1
			}
		}
	}
}
Note: we can modify this manifest to match our application requirements, for example by adding more modules, defining module twins, or changing the routes. The tool will simply replace each module's container image with a mock container.
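As a sketch of such a modification (the FilterModule name, its twin property, and the route below are illustrative, not part of the sample): a second module would get its own twin section and a dedicated route, merged into the manifest's modulesContent alongside a matching entry under $edgeAgent's modules:

```json
{
    "$edgeHub": {
        "properties.desired": {
            "routes": {
                "sensorToFilter": "FROM /messages/modules/SimulatedTemperatureSensor/outputs/telemetry INTO BrokeredEndpoint(\"/modules/FilterModule/inputs/input1\")"
            }
        }
    },
    "FilterModule": {
        "properties.desired": {
            "TemperatureThreshold": 25
        }
    }
}
```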

Done! Let's run this console application passing the required arguments:

dotnet run ../manifest.json dev_device "IOT HUB OWNER CONNECTION STRING"

The effect of this execution is that we have a development IoT Edge device running in a local container, called dev_iot_edge. This container exposes the edge daemon's and the edgeHub's ports to our host. All we have to do now is connect to it from our module.

To do that, we need to set some environment variables so that the IoT SDK can connect securely to the emulator. These variables have the IOTEDGE_ prefix, and we can read them from inside the emulator's running mock module:

docker exec dev_iot_edge bash -c "docker exec SimulatedTemperatureSensor env | grep IOTEDGE_"
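The output lists the runtime-provided variables; it typically looks like the following (the variable names come from the IoT Edge runtime, but every value below is an illustrative placeholder):

```
IOTEDGE_DEVICEID=dev_device
IOTEDGE_MODULEID=SimulatedTemperatureSensor
IOTEDGE_MODULEGENERATIONID=<generation id>
IOTEDGE_IOTHUBHOSTNAME=<your hub>.azure-devices.net
IOTEDGE_GATEWAYHOSTNAME=<edge device hostname>
IOTEDGE_WORKLOADURI=<workload endpoint>
IOTEDGE_APIVERSION=<api version>
IOTEDGE_AUTHSCHEME=sasToken
```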

We need to override IOTEDGE_GATEWAYHOSTNAME with our host's name, and replace the host in IOTEDGE_WORKLOADURI with 127.0.0.1.

Setting these environment variables in our application will allow us to connect to the running emulator!

In practice, it's very easy to automate this process with this helper function:

namespace SimulatedTemperatureSensor
{
    using System;
    using System.Diagnostics;
    using System.Linq;
    using System.Net;
    internal static class Utilities
    {
        internal static void InjectIoTEdgeVariables(string containerName)
        {
            var dockerCommand =
                $"docker exec dev_iot_edge bash -c \"docker exec {containerName} env | grep IOTEDGE_\"";
            Process p = new Process()
            {
                // dockerCommand is passed via Arguments; no need to also
                // pass it to the ProcessStartInfo constructor
                StartInfo = new ProcessStartInfo()
                {
                    FileName = "cmd.exe",
                    Arguments = $"/C {dockerCommand}",
                    RedirectStandardError = true,
                    RedirectStandardOutput = true,
                    UseShellExecute = false,
                    CreateNoWindow = true
                },
            };
            p.Start();

            // Read all output before waiting for exit, to avoid a deadlock
            // if the child process fills the output pipe
            var output = p.StandardOutput.ReadToEnd();
            p.WaitForExit();

            var lines = output.Split(new[] { "\n" }, StringSplitOptions.None)
                .Select(e => e.Trim())
                .Where(e => e.Contains("="));

            // Split on the first '=' only, in case a value contains '='
            var variables = lines.ToDictionary(
                e => e.Split("=", 2)[0],
                e => e.Split("=", 2)[1]);

            // Overwrite these settings
            variables["IOTEDGE_WORKLOADURI"] = "http://127.0.0.1:15581/";
            variables["IOTEDGE_GATEWAYHOSTNAME"] = Dns.GetHostName();
            foreach (var variable in variables)
            {
                Environment.SetEnvironmentVariable(variable.Key, variable.Value);
                Console.WriteLine($"Injected {variable.Key}={variable.Value}");
            }
        }
    }
}

Then, when we're in the Emulated environment, we can call this method automatically at application startup:

namespace SimulatedTemperatureSensor
{
    using System.Threading.Tasks;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Serilog;

    class Program
    {
        static async Task Main(string[] args)
        {
            using (var host = Host.CreateDefaultBuilder(args)
                 .ConfigureServices((hostContext, services) =>
                 {
                     if (hostContext.HostingEnvironment.EnvironmentName == "Emulated")
                         Utilities.InjectIoTEdgeVariables("SimulatedTemperatureSensor");

                     services.AddSingleton<IModuleClient, ModuleClientWrapper>();
                     services.AddHostedService<TemperatureSensorModule>();
                 })
                 .UseSerilog((hostingContext, log) =>
                 {
                     log.ReadFrom.Configuration(hostingContext.Configuration);
                 })
                 .UseConsoleLifetime()
                 .Build())
            {
                await host.RunAsync();
            }
        }
    }
}
Note: you don't need to design your IoT Edge module project in the specific structure we used; even the monolithic C# template works with this approach. In fact, all supported programming languages will work!

Module Container

After completing our implementation and testing, we need to build the module container image. This is easily done in both VS Code and Visual Studio.

Both Visual Studio and VS Code expose an image-build command for the module project through their Docker tooling (screenshots of the Visual Studio and VS Code steps omitted).

Both tools will automatically populate the required Dockerfile and set up the build and run tasks. Simply hit CTRL+SHIFT+B to trigger the image build.

You can even debug the same application running inside a Linux container! This is very useful when we have system dependencies that we cannot install on our host.

The last step is to push this container image to your container registry and update the manifest accordingly (currently set to your.azurecr.io).

You need to include the container registry credentials, if any.
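For a private registry, these credentials go in the manifest's registryCredentials section (left empty earlier); here's a sketch, with an illustrative registry name and placeholder values:

```json
"registryCredentials": {
    "myRegistry": {
        "address": "your.azurecr.io",
        "username": "<service principal id or admin username>",
        "password": "<service principal key or admin password>"
    }
}
```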

The entire application is published in this GitHub repo.

Recap

We saw how to set up an F5 development experience using nothing but Docker and our C# code. This approach lets us keep working in our favorite development environment, without making any compromises.

]]>
<![CDATA[Genesis]]>https://havedatawilltrain.com/genesis/5e9dd9c7d47c36006e770c3fWed, 22 Apr 2020 23:29:14 GMT

In this blog post mini-series we have been exploring the gradual evolution of an IoT Edge application, starting from a tightly coupled, monolithic approach based on the default C# module template, to a more loosely coupled and composable application. In this post we will see how we can use this design to set up a true F5 development experience.

In the process of evolving our IoT Edge application, we explored many best practices on various nontrivial IoT Edge topics, like controlling the application lifetime, logging, configuration, telemetry pumps and more. Now we will use an IoT Edge application example to see how all these things come together when setting up a flexible development environment. Although no prior knowledge is required to follow along, a review of the previous posts is highly recommended for a better understanding of the design decisions.

Improving the current CLI-based development experience

In the past few years we've witnessed the re-emergence of CLI-oriented development, where people use various CLIs to compose, compile, containerize and publish their IoT Edge applications. As a result, today we can find a CLI for every single thing. And although these CLIs were meant to replace opinionated, platform-specific IDEs like Visual Studio, they ended up being integrated into development tools like VS Code, hidden behind their UI.

Specifically for the Azure IoT Edge development experience, the tooling story has been imperfect. It is good for getting an introductory understanding of the technology, but a better approach is required for building an application beyond Hello World-level complexity. In the following example, we will take a code-first approach: we will include most of the development tooling inside the application source code, so that people can get started by just cloning a repo.

Simulated Temperature Sensor v2.0

Let's say we want to build the runtime's simulated temperature sensor example from scratch, without any prior setup, starting from a clean Linux, macOS or Windows installation; I prefer using Visual Studio on Windows.

  1. Install the latest .NET Core SDK.
  2. Install the latest VS Code.
  3. Create a folder:
    mkdir SimulatedTemperatureSensor
  4. Create a new console app in this folder:
    cd SimulatedTemperatureSensor
    dotnet new console
  5. Install the following packages:
    dotnet add package Microsoft.Azure.Devices.Client
    dotnet add package Microsoft.Extensions.Hosting
    dotnet add package Serilog.Extensions.Hosting
    dotnet add package Serilog.Settings.Configuration
    dotnet add package Serilog.Sinks.Console
    dotnet add package Serilog.Enrichers.Thread
  6. Open the project in VS Code:
    code .
  7. Replace the Program.cs with the following code:
namespace SimulatedTemperatureSensor
{
    using System.Threading.Tasks;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Serilog;

    class Program
    {
        static async Task Main(string[] args)
        {
            using (var host = Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    if(hostContext.HostingEnvironment.EnvironmentName == "Development")
                        services.AddSingleton<IModuleClient, MockModuleClientWrapper>();
                    else
                        services.AddSingleton<IModuleClient, ModuleClientWrapper>();

                    services.AddHostedService<TemperatureSensorModule>();
                })
                .UseSerilog((hostingContext, log) =>
                {
                    log.ReadFrom.Configuration(hostingContext.Configuration);
                })
                .UseConsoleLifetime()
                .Build())
            {
                await host.RunAsync();
            }
        }
    }
}
Note that we switch from ModuleClientWrapper to MockModuleClientWrapper when in development. This is controlled by the DOTNET_ENVIRONMENT environment variable.
I'm using launchSettings.json to set any variables at launch.
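For reference, a minimal Properties/launchSettings.json that sets DOTNET_ENVIRONMENT on F5 could look like this (the profile name is illustrative):

```json
{
    "profiles": {
        "SimulatedTemperatureSensor": {
            "commandName": "Project",
            "environmentVariables": {
                "DOTNET_ENVIRONMENT": "Development"
            }
        }
    }
}
```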

We've seen this code in the previous post, and in the one before that, we saw how to define an abstraction interface for the SDK's ModuleClient so that we can use it in a dependency injection setup. Next, we will add this IModuleClient interface and the two ModuleClientWrapper and MockModuleClientWrapper classes:

namespace SimulatedTemperatureSensor
{
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.Devices.Shared;
    using Microsoft.Azure.Devices.Client;
    
    public interface IModuleClient
    {
        Task OpenAsync(CancellationToken cancellationToken);
        Task CloseAsync(CancellationToken cancellationToken);
        Task SendEventAsync(string outputName,
            Message message);
        Task SetInputMessageHandlerAsync(string inputName,
            MessageHandler messageHandler,
            object userContext);
        Task SetMethodHandlerAsync(string methodName,
            MethodCallback methodHandler,
            object userContext);
        Task<Twin> GetTwinAsync(CancellationToken cancellationToken);
        Task<Twin> GetTwinAsync();
    }
}

and the class ModuleClientWrapper:

namespace SimulatedTemperatureSensor
{
    using Microsoft.Azure.Devices.Client;
    using Microsoft.Azure.Devices.Client.Transport.Mqtt;
    using Microsoft.Azure.Devices.Shared;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.Logging;
    using System.Threading;
    using System.Threading.Tasks;

    public class ModuleClientWrapper : IModuleClient
    {
        readonly ModuleClient moduleClient;
        readonly ILogger logger;

        public ModuleClientWrapper(IConfiguration configuration,
            ILogger<ModuleClientWrapper> logger)
        {
            this.logger = logger;

            var transportType =
                configuration.GetValue("ClientTransportType", TransportType.Mqtt_Tcp_Only);

            // MqttTransportSettings accepts only MQTT transport types,
            // so the default must be an MQTT variant
            ITransportSettings[] settings = { new MqttTransportSettings(transportType) };

            moduleClient = ModuleClient.CreateFromEnvironmentAsync(settings).Result;
        }

        public async Task SendEventAsync(string outputName, Message message)
        {
            await moduleClient.SendEventAsync(outputName, message);
        }

        public async Task SetInputMessageHandlerAsync(string inputName,
            MessageHandler messageHandler,
            object userContext)
        {
            await moduleClient.SetInputMessageHandlerAsync(inputName,
                messageHandler,
                userContext);
        }

        public async Task SetMethodHandlerAsync(string methodName,
            MethodCallback methodHandler,
            object userContext)
        {
            await moduleClient.SetMethodHandlerAsync(methodName,
                methodHandler,
                userContext);
        }
        public async Task OpenAsync(CancellationToken cancellationToken)
        {
            await moduleClient.OpenAsync(cancellationToken);
        }
        public async Task CloseAsync(CancellationToken cancellationToken)
        {
            await moduleClient.CloseAsync(cancellationToken);
        }
        public async Task<Twin> GetTwinAsync(CancellationToken cancellationToken)
        {
            return await moduleClient.GetTwinAsync(cancellationToken);
        }
        public async Task<Twin> GetTwinAsync()
        {
            return await moduleClient.GetTwinAsync();
        }
    }
}

And finally, here's a mock implementation of the same interface. The purpose of this class is to simulate the IoT Edge runtime's behavior for development and testing purposes.

namespace SimulatedTemperatureSensor
{
    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.Devices.Client;
    using Microsoft.Azure.Devices.Shared;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;

    internal class MockModuleClientWrapper : IModuleClient
    {
        readonly ILogger logger;
        readonly IHostApplicationLifetime application;

        readonly TaskTimer taskTimer;
        readonly Dictionary<string, List<Message>> messageQueues;
        readonly Dictionary<string, ValueTuple<MessageHandler, object>> inputMessageHandlers;
        readonly Dictionary<string, MethodCallback> methodMessageHandlers;

        public MockModuleClientWrapper(IHostApplicationLifetime application,
            ILogger<MockModuleClientWrapper> logger)
        {
            this.logger = logger;
            this.application = application;

            messageQueues = new Dictionary<string, List<Message>>();
            inputMessageHandlers = new Dictionary<string, (MessageHandler, object)>();
            methodMessageHandlers = new Dictionary<string, MethodCallback>();

            taskTimer = new TaskTimer(OnTimer, TimeSpan.FromSeconds(1), logger);
        }

        private void OnTimer()
        {
            lock (messageQueues)
                foreach (var queue in messageQueues)
                {
                    if (inputMessageHandlers.ContainsKey(queue.Key))
                        foreach (var message in queue.Value)
                            inputMessageHandlers[queue.Key].Item1(message, inputMessageHandlers[queue.Key].Item2);
                    messageQueues[queue.Key].Clear();
                }
            // TODO: Process method messages too
        }

        public Task SendEventAsync(string outputName, Message message)
        {
            lock (messageQueues)
            {
                if (!messageQueues.ContainsKey(outputName))
                    messageQueues[outputName] = new List<Message>();
                messageQueues[outputName].Add(message);
            }
            logger.LogInformation($"Message Sent to {outputName}");
            return Task.CompletedTask;
        }

        public Task SetInputMessageHandlerAsync(string inputName, MessageHandler messageHandler, object userContext)
        {
            inputMessageHandlers[inputName] = (messageHandler, userContext);
            logger.LogInformation($"Message Handler Set for {inputName}");
            return Task.CompletedTask;
        }

        public Task SetMethodHandlerAsync(string methodName, MethodCallback methodHandler, object userContext)
        {
            methodMessageHandlers[methodName] = methodHandler;
            logger.LogInformation($"Method Handler Set for {methodName}");
            return Task.CompletedTask;
        }

        public Task OpenAsync(CancellationToken token)
        {
            logger.LogInformation("Opened ModuleClient");
            taskTimer.Start(application.ApplicationStopping);
            return Task.CompletedTask;
        }

        public Task CloseAsync(CancellationToken token)
        {
            logger.LogInformation("Closed ModuleClient");
            return Task.CompletedTask;
        }

        public Task<Twin> GetTwinAsync(CancellationToken cancellationToken)
        {
            logger.LogInformation("GetTwinAsync");
            return Task.FromResult<Twin>(null);
        }

        public Task<Twin> GetTwinAsync()
        {
            logger.LogInformation("GetTwinAsync");
            return Task.FromResult<Twin>(null);
        }
    }
}

We have almost everything in place. The last missing piece is to implement the TemperatureSensorModule class, the class that will contain the business logic of our IoT Edge application. This module will periodically emit messages, simulating a temperature sensor.

Following the same approach as last time, this class will implement IHostedService. We will include an IHostApplicationLifetime dependency to gain control over the application lifetime and trigger a restart in case something goes terribly wrong. Finally, we'll use an updated version of the TaskTimer we saw in a previous post to drive the telemetry loop.

Here's the TaskTimer class:

namespace SimulatedTemperatureSensor
{
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Logging;

    public class TaskTimer
    {
        readonly Action onTimer;
        readonly TimeSpan timerPeriod;
        readonly Action onError;
        readonly ILogger logger;

        public TaskTimer(Action onTimer,
            TimeSpan timerPeriod,
            ILogger logger,
            Action onError = null)
        {
            this.timerPeriod = timerPeriod;
            this.onTimer = onTimer;
            this.logger = logger;
            this.onError = onError;
        }

        public void Start(CancellationToken token)
        {
            Task elapsedTask = null;
            elapsedTask = new Task((x) =>
            {
                OnTimer(elapsedTask, token);
            }, token);

            HandleError(elapsedTask, token);

            elapsedTask.Start();
        }

        private void OnTimer(Task task, object objParam)
        {
            var start = DateTime.Now;
            var token = (CancellationToken)objParam;

            if (token.IsCancellationRequested)
            {
                logger.LogInformation("A cancellation has been requested.");
                return;
            }

            onTimer();

            var delay = timerPeriod - (DateTime.Now - start);
            if (delay.Ticks > 0)
            {
                task = Task.Delay(delay);
            }
            HandleError(task.ContinueWith(OnTimer, token), token);
        }

        private void HandleError(Task task, CancellationToken token)
        {
            task.ContinueWith((e) =>
            {
                logger.LogError(
                    $"Exception when running timer callback: {e.Exception}");

                onError?.Invoke();
                if (!token.IsCancellationRequested)
                    task.ContinueWith(OnTimer, token);

            }, TaskContinuationOptions.OnlyOnFaulted);
        }
    }
}

Here's the code of the TemperatureSensorModule:


namespace SimulatedTemperatureSensor
{
    using Microsoft.Azure.Devices.Client;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;
    using System;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;

    public class TemperatureSensorModule : IHostedService
    {
        const string SAMPLING_PERIOD_CONFIG_NAME = "TelemetryPeriodSeconds";
        const int SAMPLING_PERIOD_DEFAULT = 1;
        
        readonly IModuleClient moduleClient;
        readonly ILogger logger;
        readonly IHostApplicationLifetime application;
        
        readonly Random random = new Random();
        readonly TaskTimer telemetryPump;

        public TemperatureSensorModule(IModuleClient moduleClient,
            IConfiguration config,
            IHostApplicationLifetime application, 
            ILogger<TemperatureSensorModule> logger)
        {
            this.moduleClient = moduleClient;
            this.logger = logger;
            this.application = application;

            var period = TimeSpan.FromSeconds(
                config.GetValue(SAMPLING_PERIOD_CONFIG_NAME, SAMPLING_PERIOD_DEFAULT));

            telemetryPump = new TaskTimer(OnTimer, period, logger, application.StopApplication);
            
            application.ApplicationStopping.Register(() =>
            {
                logger.LogWarning("Stop-draining application for 3 seconds...");
                Task.Delay(TimeSpan.FromSeconds(3)).Wait();
            });
        }

        private async void OnTimer()
        {
            await moduleClient.SendEventAsync("telemetry",
                new Message(Encoding.UTF8.GetBytes($"Current temperature: {random.Next(0, 100)}")));
        }

        public async Task StartAsync(CancellationToken cancellationToken)
        {
            await moduleClient.OpenAsync(cancellationToken);
            telemetryPump.Start(application.ApplicationStopping);

            logger.LogInformation("Started.");
        }

        public async Task StopAsync(CancellationToken cancellationToken)
        {
            await moduleClient.CloseAsync(cancellationToken);
            logger.LogInformation("Stopped.");
        }
    }
}

This is the project's appSettings.json content, containing just our Serilog config:

{
  "Serilog": {
    "Using": [ "Serilog.Sinks.Console" ],
    "MinimumLevel": {
      "Default": "Debug"
    },
    "WriteTo": [
      {
        "Name": "Console",
        "Args": {
          "outputTemplate": "{Timestamp:yyyy-MM-dd HH:mm:ss.fff zzz} [{Level:u3}] [{SourceContext}] [{ThreadId}] - {Message}{NewLine}{Exception}"
        }
      }
    ],
    "Enrich": [ "FromLogContext", "WithThreadId" ]
  }
}

At this point the project contains Program.cs, the IModuleClient interface with its ModuleClientWrapper and MockModuleClientWrapper implementations, the TaskTimer, the TemperatureSensorModule, and appSettings.json (file-structure screenshot omitted).

Our first version is ready. Set the DOTNET_ENVIRONMENT environment variable to Development and hit F5 to run the application. Hit CTRL+C to gracefully stop it.


Recap

Following this approach, we can develop most of the Edge application as a plain console app, with the easiest possible F5 development experience and without any prior setup.

Next, we will see how we can gradually transform this standalone development environment into our Azure IoT Edge development environment, without sacrificing any flexibility whatsoever!

]]>
<![CDATA[The Man Who Wouldn't Test]]>https://havedatawilltrain.com/the-man-who-wouldnt-test/5e98d5add47c36006e770848Sat, 18 Apr 2020 00:28:49 GMT

In the last post we refactored the default Azure IoT Edge Module C# template code with a couple of dependency abstractions. In this post we will add a dependency injection mechanism in the mix and we will evolve this application example to a level that can be a good starting point for every Azure IoT Edge application.

In the final version of the previous post's Program.cs, we reached a point where we could compose the dependencies together and create a business instance of the PipeModule. A simplified summary of this composition:

IConfiguration configuration = new ConfigurationBuilder()
    .AddJsonFile("appsettings.json", optional: true)
    .AddEnvironmentVariables()
    .AddCommandLine(args)
    .Build();

var cancellationTokenSource = new CancellationTokenSource();

IModuleClient moduleClient = new ModuleClientWrapper();

IDatabaseClient databaseClient = 
	new DatabaseClientWrapper(configuration);

PipeModule pipeModule = new PipeModule(moduleClient, databaseClient);

In this dependency hierarchy, we had to explicitly pass the previously created instances to create more complex ones, all the way to creating the PipeModule. Although this approach is far better than the monolithic approach of the original template code, we still have hardcoded dependencies: to replace a dependency, we would still need to change this code, recompile and redeploy.

Wouldn't it be nice if there was a mechanism that satisfied this dependency hierarchy composition dynamically, so that we could drive our changes via the configuration?

The Microsoft.Extensions.Hosting namespace contains many utility classes like the HostBuilder. This library is a good starting point because it references all other required libraries, like Microsoft.Extensions.Configuration, Microsoft.Extensions.DependencyInjection and Microsoft.Extensions.Logging, and brings all these abstractions and functionality together under a few simple interfaces.

Note: to install this extension library, run:
dotnet add package Microsoft.Extensions.Hosting

In contrast to the Azure IoT SDK, this library excels in distinguishing the application from the host. The HostBuilder is a very useful utility that helps us to compose together our application services and our hosting infrastructure. A service is a component of our application, an element in our application dependency hierarchy, and the hosting infrastructure is anything specific to the hosting environment, like for example the environment variables configuration.

Let's now refactor our previous example by using the HostBuilder to configure our dependencies. We'll start first by implementing the IHostedService interface in our PipeModule class. This interface defines just two methods:

public interface IHostedService
{
    Task StartAsync(CancellationToken cancellationToken);
    Task StopAsync(CancellationToken cancellationToken);
}

And yes, the PipeModule from the last post already implements these two methods, so we should just add this interface in the class signature:

public class PipeModule : IHostedService

Now, we can replace the entire Program class with this code:

class Program
{
    static async Task Main(string[] args)
    {
        // Read the configuration
        IConfiguration configuration = new ConfigurationBuilder()
                .AddJsonFile("appsettings.json", optional: true)
                .AddEnvironmentVariables()
                .AddCommandLine(args)
                .Build();

        // Create the cancelation token source
        var cancellationTokenSource = new CancellationTokenSource();

        // Register to the application lifetime events
        AssemblyLoadContext.Default.Unloading +=
        (cts) => cancellationTokenSource.Cancel();

        Console.CancelKeyPress +=
            (sender, cts) =>
            {
                Console.WriteLine("Ctrl+C detected.");
                cts.Cancel = true;
                cancellationTokenSource.Cancel();
            };

        // Build the host
        using (var host = new HostBuilder()
            .ConfigureServices((hostContext, services) =>
            {
                services.AddSingleton(cancellationTokenSource);
                services.AddSingleton<IModuleClient, ModuleClientWrapper>();
                services.AddSingleton<IDatabaseClient, DatabaseClientWrapper>();
                services.AddHostedService<PipeModule>();
            })
            .Build())
        {

            // Start the application
            host.Start();

            // Wait until the app unloads or is cancelled
            await WhenCancelled(cancellationTokenSource.Token);
        }

    }
    public static Task WhenCancelled(CancellationToken cancellationToken)
    {
        var taskCompletionSource = new TaskCompletionSource<bool>();
        cancellationToken.Register(
            s => ((TaskCompletionSource<bool>)s).SetResult(true),
            taskCompletionSource);
        return taskCompletionSource.Task;
    }
}
Note: AddSingleton registers a service with a single instance shared across the entire application. Good singleton candidates are classes that carry state or identity, like the cancellationTokenSource and the IModuleClient respectively.
Note: AddHostedService registers services that need to be started and stopped by the host.
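
Taken in isolation, the mechanism behind these registrations can be sketched like this (a minimal sketch, assuming only the Microsoft.Extensions.DependencyInjection package; ServiceCollection is what HostBuilder.ConfigureServices populates under the hood):

```csharp
using Microsoft.Extensions.DependencyInjection;

// The container inspects the PipeModule constructor, sees that it needs an
// IModuleClient and an IDatabaseClient, and supplies the registered
// singleton instances automatically.
var services = new ServiceCollection();
services.AddSingleton<IModuleClient, ModuleClientWrapper>();
services.AddSingleton<IDatabaseClient, DatabaseClientWrapper>();
services.AddSingleton<PipeModule>();

using (var provider = services.BuildServiceProvider())
{
    // Roughly equivalent to:
    // new PipeModule(new ModuleClientWrapper(), new DatabaseClientWrapper(...))
    PipeModule pipeModule = provider.GetRequiredService<PipeModule>();
}
```

Swapping DatabaseClientWrapper for another IDatabaseClient implementation is now a single registration change; no other code needs to be touched.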

We already saw how to control the application lifetime with the CancellationTokenSource in this post. The truth is that in the early versions of .NET Core, the AppDomain.CurrentDomain.ProcessExit event was replaced by the AssemblyLoadContext.Default.Unloading event, but now both of them exist and can be used interchangeably.

In the IHostBuilder we can find the UseConsoleLifetime() extension method, which simply adds an IHostLifetime service which is suitable for console apps, the Microsoft.Extensions.Hosting.Internal.ConsoleLifetime service. The IHostLifetime services control when the host starts and when it stops.

This ConsoleLifetime service listens for Ctrl+C (SIGINT) and SIGTERM, and signals the host to shut down gracefully when either is received, replacing our manual event registrations.

Let's refactor the above code now using this lifecycle service:

class Program
{
    static async Task Main(string[] args)
    {
        // Read the configuration
        IConfiguration configuration = new ConfigurationBuilder()
                .AddJsonFile("appsettings.json", optional: true)
                .AddEnvironmentVariables()
                .AddCommandLine(args)
                .Build();

        // Build the host
        using (var host = new HostBuilder()
            .ConfigureServices((hostContext, services) =>
            {
                services.AddSingleton<IModuleClient, ModuleClientWrapper>();
                services.AddSingleton<IDatabaseClient, DatabaseClientWrapper>();
                services.AddHostedService<PipeModule>();
            })
            .UseConsoleLifetime()
            .Build())
        {
            // Run the application and wait for the application to exit
            await host.RunAsync();
        }
    }
}
What do you know! After seven posts, we're back to a single-page long Main!

In the above version, instead of passing the CancellationTokenSource to our services to signal an application termination, we can inject the IHostApplicationLifetime service:

public DatabaseClientWrapper(IHostApplicationLifetime hostApplicationLifetime)

Then, we can register to the ApplicationStopping event to gracefully stop the service, and call StopApplication() to trigger an application termination from inside the service.
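
A sketch of both directions (the flush message and the fatal-condition check are illustrative; OpenAsync/CloseAsync stay as in the previous post):

```csharp
public class DatabaseClientWrapper : IDatabaseClient
{
    private readonly IHostApplicationLifetime _lifetime;

    public DatabaseClientWrapper(IHostApplicationLifetime lifetime)
    {
        _lifetime = lifetime;

        // Called when the host begins shutting down: a chance to flush
        // pending writes before the process exits.
        _lifetime.ApplicationStopping.Register(
            () => Console.WriteLine("Flushing pending writes..."));
    }

    public Task RecordMessageAsync(byte[] message)
    {
        if (message == null)
        {
            // Hypothetical unrecoverable condition: request a graceful
            // application termination from inside the service.
            _lifetime.StopApplication();
        }
        return Task.CompletedTask;
    }

    // ... OpenAsync/CloseAsync as before ...
}
```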

A very good description of the startup and shutdown sequence can be found here.

We've also analyzed the difference between the application and the system configuration, and we examined a few best practices on managing this configuration. Similarly to the UseConsoleLifetime() extension, the ConfigureHostConfiguration and ConfigureAppConfiguration extension methods are there to help us read the application and the host configuration.

Let's see what this looks like:

class Program
{
    static async Task Main(string[] args)
    {
        // Build the host
        using (var host = new HostBuilder()
            .ConfigureHostConfiguration(configHost =>
            {
                configHost.SetBasePath(Directory.GetCurrentDirectory());
                configHost.AddJsonFile("hostsettings.json", optional: true);
                configHost.AddEnvironmentVariables(prefix: "PREFIX_");
                configHost.AddCommandLine(args);
            })
            .ConfigureAppConfiguration((hostContext, configApp) =>
            {
                configApp.AddJsonFile("appsettings.json", optional: true);
                configApp.AddJsonFile(
                    $"appsettings.{hostContext.HostingEnvironment.EnvironmentName}.json",
                    optional: true);
                configApp.AddEnvironmentVariables();
                configApp.AddCommandLine(args);
            })
            .ConfigureServices((hostContext, services) =>
            {
                services.AddSingleton<IModuleClient, ModuleClientWrapper>();
                services.AddSingleton<IDatabaseClient, DatabaseClientWrapper>();
                services.AddHostedService<PipeModule>();
            })
            .UseConsoleLifetime()
            .Build())
        {
            // Run the application and wait for the application to exit
            await host.RunAsync();
        }
    }
}
Note that when we read the application configuration in the ConfigureAppConfiguration, we have access to the host configuration via the hostContext.Configuration property.

The application configuration now will automatically be injected to any service that depends on it, like the DatabaseClientWrapper we implemented in the last post.

A final step is to configure and hook up our logging logic. One alternative is to use the corresponding extension method, like for example:

.ConfigureLogging((logging) =>
{
    logging.AddFilter("Microsoft", LogLevel.Warning);
    logging.AddFilter("System", LogLevel.Warning);
    logging.AddFilter("DependencyInjection.Program", LogLevel.Debug);
    logging.AddConsole();
    logging.AddDebug();
    logging.AddEventSourceLogger();
    logging.AddEventLog();
})

If we do this, we've just replicated the behavior of the .NET Generic Host, which is just a pre-defined host configuration, as described here.

Then our code can be simplified to:

class Program
{
    static async Task Main(string[] args)
    {
        // Build default host
        using (var host = Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddSingleton<IModuleClient, ModuleClientWrapper>();
                services.AddSingleton<IDatabaseClient, DatabaseClientWrapper>();
                services.AddHostedService<PipeModule>();
            })
            .UseConsoleLifetime()
            .Build())
        {
            // Run the application and wait for the application to exit
            await host.RunAsync();
        }
    }
}

Alternatively, we can use a Serilog logger, driven by configuration. To do this, we will add the Serilog.Extensions.Hosting NuGet package:

class Program
{
    static async Task Main(string[] args)
    {
        // Build default host
        using (var host = Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                // Configure our services
                services.AddSingleton<IModuleClient, ModuleClientWrapper>();
                services.AddSingleton<IDatabaseClient, DatabaseClientWrapper>();
                services.AddHostedService<PipeModule>();
            })
            .UseSerilog((hostingContext, log) =>
            {
                log.ReadFrom.Configuration(hostingContext.Configuration);
            })
            .UseConsoleLifetime()
            .Build())
        {
            // Run the application and wait for the application to exit
            await host.RunAsync();
        }
    }
}

By doing so, we get the expected logger injection behavior by adding a logger dependency in a constructor:

public DatabaseClientWrapper(ILogger<DatabaseClientWrapper> logger)

Also, the Serilog Log.Logger property is set implicitly, so the logger can be accessed from anywhere through the static Serilog.Log class.
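
As a sketch, the injected logger can then emit structured events from inside the wrapper (the message text and property name are illustrative):

```csharp
public class DatabaseClientWrapper : IDatabaseClient
{
    private readonly ILogger<DatabaseClientWrapper> _logger;

    public DatabaseClientWrapper(ILogger<DatabaseClientWrapper> logger)
    {
        _logger = logger;
        // ... _databaseClient initialization omitted ...
    }

    public async Task RecordMessageAsync(byte[] message)
    {
        // MessageLength becomes a structured, queryable property on the event
        _logger.LogDebug("Recording message of {MessageLength} bytes",
            message.Length);
        await _databaseClient.RecordMessageAsync(message);
    }
}
```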


It is impressive how much plumbing code is hidden in this last version, but without losing the ability to override the default host builder if we want to, as we did with the Serilog logger.

Recap

In this mini-series of blog posts we have explored various patterns and best practices, and we managed to evolve the monolithic Azure IoT Edge module C# template code into a loosely coupled application that allows us to have a true F5 development experience and to define unit tests.

In the next post we will explore how we can use this loosely coupled design to set up our development environment.

]]>
<![CDATA[The Reasonable Man]]>https://havedatawilltrain.com/the-reasonable-man/5e95fe62e73bbc006e59f48eThu, 16 Apr 2020 21:52:33 GMT

So far in this mini-series we have explored various approaches to some standard issues that every IoT Edge application needs to address. In this post we will start exploring the big picture: a complete, production-ready example that can act as a good starting point for Azure IoT Edge application development.

In the previous posts we meticulously avoided introducing any technology-specific dependencies in our evolving thermostat application example, either by temporarily omitting the code, or by abstracting the various libraries' behavior behind predefined interfaces. The benefit of this is that we keep our business code technology-agnostic, and as such, we can easily test it. We have also preserved the freedom to switch platforms any time we choose to. Let's see how we can get there.

But first...


The Azure IoT Module Template

Let's use an example to make this easier to understand. Let's assume that we want to implement an Azure IoT Edge module, starting from the default Azure IoT Edge Module template. This template is a simple .NET Core console app, with all the required IoT SDK references in it, that simply echoes all messages coming from the input input1 to the output output1. Let's assume we'd like to extend this template by recording every message in a database.

The original template source code is here.

This is a modified version that includes the message database recording:

namespace DependencyInjection
{
    using System;
    using System.IO;
    using System.Runtime.InteropServices;
    using System.Runtime.Loader;
    using System.Security.Cryptography.X509Certificates;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.Amqp.Framing;
    using Microsoft.Azure.Devices.Client;
    using Microsoft.Azure.Devices.Client.Transport.Mqtt;

    class Program
    {
        static int counter;
        static void Main(string[] args)
        {
            Init().Wait();

            // Wait until the app unloads or is cancelled
            var cts = new CancellationTokenSource();
            AssemblyLoadContext.Default.Unloading += (ctx) => cts.Cancel();
            Console.CancelKeyPress += (sender, cpe) => cts.Cancel();
            WhenCancelled(cts.Token).Wait();
        }

        /// <summary>
        /// Handles cleanup operations when app is cancelled or unloads
        /// </summary>
        public static Task WhenCancelled(CancellationToken cancellationToken)
        {
            var tcs = new TaskCompletionSource<bool>();
            cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).SetResult(true), tcs);
            return tcs.Task;
        }

        /// <summary>
        /// Initializes the ModuleClient and sets up the callback to receive
        /// messages containing temperature information
        /// </summary>
        static async Task Init()
        {
            MqttTransportSettings mqttSetting = new MqttTransportSettings(TransportType.Mqtt_Tcp_Only);
            ITransportSettings[] settings = { mqttSetting };

            // Open a connection to the Edge runtime
            ModuleClient ioTHubModuleClient = await ModuleClient.CreateFromEnvironmentAsync(settings);
            await ioTHubModuleClient.OpenAsync();
            Console.WriteLine("IoT Hub module client initialized.");

            // Initialize the database client
            var connectionString = Environment.GetEnvironmentVariable("DbConnectionString");
            DatabaseClient databaseClient = DatabaseClient.CreateFromConnectionString(connectionString);

            // Register callback to be called when a message is received by the module
            await ioTHubModuleClient.SetInputMessageHandlerAsync("input1", PipeMessage, (ioTHubModuleClient, databaseClient));
        }

        /// <summary>
        /// This method is called whenever the module is sent a message from the EdgeHub. 
        /// It just pipes the messages without any change.
        /// It prints all the incoming messages.
        /// </summary>
        static async Task<MessageResponse> PipeMessage(Message message, object userContext)
        {
            int counterValue = Interlocked.Increment(ref counter);

            var (moduleClient, databaseClient) = ((ModuleClient, DatabaseClient))userContext;
            

            byte[] messageBytes = message.GetBytes();
            string messageString = Encoding.UTF8.GetString(messageBytes);
            Console.WriteLine($"Received message: {counterValue}, Body: [{messageString}]");

            if (!string.IsNullOrEmpty(messageString))
            {
                using (var pipeMessage = new Message(messageBytes))
                {
                    foreach (var prop in message.Properties)
                    {
                        pipeMessage.Properties.Add(prop.Key, prop.Value);
                    }
                    
                    await databaseClient.RecordMessageAsync(pipeMessage);
                    
                    await moduleClient.SendEventAsync("output1", pipeMessage);

                    Console.WriteLine("Received message sent");
                }
            }
            return MessageResponse.Completed;
        }
    }
}

We've made just a few small changes to the original template code:

  1. Inside the Init() we now create our DatabaseClient instance:
// Initialize the database client
var connectionString = Environment.GetEnvironmentVariable("DbConnectionString");
DatabaseClient databaseClient = DatabaseClient.CreateFromConnectionString(connectionString);

2. Inside the PipeMessage we get the databaseClient instance from the method userContext with:

var (moduleClient, databaseClient) = ((ModuleClient, DatabaseClient))userContext;

3. In the same PipeMessage we record the message with:

await databaseClient.RecordMessageAsync(pipeMessage);

This is clearly a monolithic and tightly-coupled implementation. This code will fail if we don't have a valid connection string and the required environment variables to create the ModuleClient. Even if we did have the required environment variables, we would still need a running database and the entire IoT Edge Device Runtime in place to get it working.

So, the only way to test this PipeMessage code is to:

  • set up an IoT Edge device in your development environment,
  • set up a database server in your development environment,
  • build and deploy a new version of the edge application, and
  • either attach a debugger to the running container or start searching through the logs.

This apparently leaves unit testing out of the question.

Welcome to 1985!

Congrats! You just traveled 35 years back in software history!

Inversion of Control

In its simplest form, dependency injection is an abstraction strategy that allows us to decouple our business code from any technical implementation. The idea is fairly simple: for every dependency, we define an interface that describes the dependency's public behavior well. Then, for each module, we declare its dependencies in the constructor, by adding the dependency interfaces as constructor arguments.

A dependency is something that we use but don't control, e.g. the ModuleClient and the DatabaseClient, because their behavior depends on external processes.

Let's define these two abstractions. We'll start with the DatabaseClient class.

For the purpose of simplicity, this is an imaginary database SDK.

We start by defining a simple interface that contains all useful behavior:

public interface IDatabaseClient
{
    Task OpenAsync(CancellationToken cancellationToken);
    Task CloseAsync(CancellationToken cancellationToken);
    Task RecordMessageAsync(byte[] message);
}

Then, we define a wrapper class that implements this interface and wraps the original DatabaseClient object:

internal class DatabaseClientWrapper : IDatabaseClient
{
    const string CONNECTION_STRING_NAME = "dbConnectionString";
    DatabaseClient _databaseClient { get; }

    public DatabaseClientWrapper(IConfiguration configuration)
    {
        var connectionString =
            configuration.GetConnectionString(CONNECTION_STRING_NAME);

        _databaseClient = DatabaseClient.
                CreateFromConnectionString(connectionString);
    }
    public async Task OpenAsync(CancellationToken cancellationToken)
    {
        await _databaseClient.OpenAsync(cancellationToken);
    }

    public async Task CloseAsync(CancellationToken cancellationToken)
    {
        await _databaseClient.CloseAsync(cancellationToken);
    }

    public async Task RecordMessageAsync(byte[] message)
    {
        await _databaseClient.RecordMessageAsync(message);
    }
}
Note: this wrapper class "depends" on IConfiguration

What have we achieved? We can now replace the DatabaseClient references with IDatabaseClient, which means we no longer have any hardcoded references to this database SDK inside Program.cs.

Let's do the same for the ModuleClient. A first version of the interface can be:

public interface IModuleClient
{
    Task OpenAsync(CancellationToken cancellationToken);
    Task CloseAsync(CancellationToken cancellationToken);
    Task SendEventAsync(string outputName, 
        Message message);
    Task SetInputMessageHandlerAsync(string inputName, 
        MessageHandler messageHandler, 
        object userContext);
    Task SetMethodHandlerAsync(string methodName, 
        MethodCallback methodHandler, 
        object userContext);
    Task<Twin> GetTwinAsync(CancellationToken cancellationToken);
    Task<Twin> GetTwinAsync();
}

and the wrapper class can be:

public class ModuleClientWrapper : IModuleClient
{
    private ModuleClient ModuleClient { get; }

    public ModuleClientWrapper()
    {
        MqttTransportSettings mqttSetting = 
                new MqttTransportSettings(TransportType.Mqtt_Tcp_Only);
        ITransportSettings[] settings = { mqttSetting };

        ModuleClient = ModuleClient.CreateFromEnvironmentAsync(settings).Result;
    }

    public async Task SendEventAsync(string outputName, Message message)
    {
        await ModuleClient.SendEventAsync(outputName, message);
    }

    public async Task SetInputMessageHandlerAsync(string inputName, 
        MessageHandler messageHandler, 
        object userContext)
    {
        await ModuleClient.SetInputMessageHandlerAsync(inputName, 
            messageHandler, 
            userContext);
    }

    public async Task SetMethodHandlerAsync(string methodName, 
        MethodCallback methodHandler, 
        object userContext)
    {
        await ModuleClient.SetMethodHandlerAsync(methodName, 
            methodHandler, 
            userContext);
    }
    public async Task OpenAsync(CancellationToken cancellationToken)
    {
        await ModuleClient.OpenAsync(cancellationToken);
    }
    public async Task CloseAsync(CancellationToken cancellationToken)
    {
        await ModuleClient.CloseAsync(cancellationToken);
    }
    public async Task<Twin> GetTwinAsync(CancellationToken cancellationToken)
    {
        return await ModuleClient.GetTwinAsync(cancellationToken);
    }
    public async Task<Twin> GetTwinAsync()
    {
        return await ModuleClient.GetTwinAsync();
    }
}

After abstracting our dependencies, the final step is to isolate our business code in a new class that depends on these two interfaces.

Let's call this class PipeModule:

public class PipeModule
{
    IModuleClient _moduleClient { get; }
    IDatabaseClient _databaseClient { get; }
    public PipeModule(IModuleClient moduleClient, 
        IDatabaseClient databaseClient)
    {
        _moduleClient = moduleClient;
        _databaseClient = databaseClient;
    }
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        await _moduleClient.OpenAsync(cancellationToken);
        await _databaseClient.OpenAsync(cancellationToken);

        await _moduleClient.SetInputMessageHandlerAsync("input1",
            new MessageHandler(async (message, context) =>
            {
                await _databaseClient.RecordMessageAsync(message.GetBytes());
                await _moduleClient.SendEventAsync("output1", message);
                return await Task.FromResult(MessageResponse.Completed);
            }), this);
    }
        
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        await _moduleClient.CloseAsync(cancellationToken);
        await _databaseClient.CloseAsync(cancellationToken);
    }
}

This class contains some initialization and termination logic, and our piping business code, which is just these two lines:

await _databaseClient.RecordMessageAsync(message.GetBytes());
await _moduleClient.SendEventAsync("output1", message);
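
This is exactly what makes the module unit-testable. With hand-written fakes for the two interfaces, we can drive a message through the pipe without an edge runtime or a database (a sketch; the fake and test names are illustrative, xUnit is assumed but any test framework works):

```csharp
// Hand-written fakes: they compile against the Azure IoT SDK types used by the
// interfaces (Message, MessageHandler, Twin), but need no running infrastructure.
class FakeModuleClient : IModuleClient
{
    public MessageHandler Handler;
    public List<(string Output, Message Message)> Sent =
        new List<(string, Message)>();

    public Task OpenAsync(CancellationToken cancellationToken) => Task.CompletedTask;
    public Task CloseAsync(CancellationToken cancellationToken) => Task.CompletedTask;

    public Task SendEventAsync(string outputName, Message message)
    {
        Sent.Add((outputName, message));
        return Task.CompletedTask;
    }

    public Task SetInputMessageHandlerAsync(string inputName,
        MessageHandler messageHandler, object userContext)
    {
        Handler = messageHandler; // capture the handler so the test can invoke it
        return Task.CompletedTask;
    }

    public Task SetMethodHandlerAsync(string methodName,
        MethodCallback methodHandler, object userContext) => Task.CompletedTask;

    public Task<Twin> GetTwinAsync(CancellationToken cancellationToken) =>
        Task.FromResult(new Twin());
    public Task<Twin> GetTwinAsync() => Task.FromResult(new Twin());
}

class FakeDatabaseClient : IDatabaseClient
{
    public List<byte[]> Recorded = new List<byte[]>();

    public Task OpenAsync(CancellationToken cancellationToken) => Task.CompletedTask;
    public Task CloseAsync(CancellationToken cancellationToken) => Task.CompletedTask;

    public Task RecordMessageAsync(byte[] message)
    {
        Recorded.Add(message);
        return Task.CompletedTask;
    }
}

public class PipeModuleTests
{
    [Fact]
    public async Task Records_and_forwards_incoming_messages()
    {
        var moduleClient = new FakeModuleClient();
        var databaseClient = new FakeDatabaseClient();
        var pipeModule = new PipeModule(moduleClient, databaseClient);

        await pipeModule.StartAsync(CancellationToken.None);
        var response = await moduleClient.Handler(
            new Message(new byte[] { 1, 2, 3 }), null);

        Assert.Equal(MessageResponse.Completed, response);
        Assert.Single(databaseClient.Recorded);
        Assert.Single(moduleClient.Sent);
        Assert.Equal("output1", moduleClient.Sent[0].Output);
    }
}
```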

Now, with these abstractions in mind, let's see what the original example looks like:

class Program2
{
    static async Task Main(string[] args)
    {
        // Read the configuration
        IConfiguration configuration = new ConfigurationBuilder()
                .AddJsonFile("appsettings.json", optional: true)
                .AddEnvironmentVariables()
                .AddCommandLine(args)
                .Build();

        // Create the cancelation token source
        var cancellationTokenSource = new CancellationTokenSource();
        AssemblyLoadContext.Default.Unloading +=
            (cts) => cancellationTokenSource.Cancel();

        Console.CancelKeyPress +=
            (sender, cts) =>
            {
                Console.WriteLine("Ctrl+C detected.");
                cts.Cancel = true;
                cancellationTokenSource.Cancel();
            };

        await Init(configuration, cancellationTokenSource.Token);

        // Wait until the app unloads or is cancelled
        await WhenCancelled(cancellationTokenSource.Token);
    }
    public static Task WhenCancelled(CancellationToken cancellationToken)
    {
        var taskCompletionSource = new TaskCompletionSource<bool>();
        cancellationToken.Register(
            s => ((TaskCompletionSource<bool>)s).SetResult(true),
            taskCompletionSource);
        return taskCompletionSource.Task;
    }
    static async Task Init(IConfiguration configuration, 
        CancellationToken cancellationToken)
    {
        IModuleClient moduleClient = new ModuleClientWrapper();
        IDatabaseClient databaseClient = 
                new DatabaseClientWrapper(configuration);

        await moduleClient.OpenAsync(cancellationToken);
        await databaseClient.OpenAsync(cancellationToken);
        
        PipeModule pipeModule = new PipeModule(moduleClient, 
            databaseClient);
        await pipeModule.StartAsync(cancellationToken);

        Console.WriteLine("Module initialized.");
    }
}

Besides the PipeModule, this version includes some of the best practices from the previous posts. Although the code is much simpler, this is just an interim version; we still have a few missing pieces to take care of, namely the dependency injection mechanism.

In the next post we will include a dependency injection mechanism that is based on the Microsoft.Extensions.Hosting library and we will see the benefits of doing so.

]]>
<![CDATA[The Log Hunt]]>https://havedatawilltrain.com/the-log-hunt/5e9086adca768b2661b5cbb7Tue, 14 Apr 2020 16:06:18 GMT

In the last post, we touched on the topic of application logging. We talked about the need to separate the configuration information into two distinct categories: application and systemic. To emphasize this, we used one of the archetypal tasks in software engineering: extracting diagnostic information in a debugging environment.

In this post we will explore the best practices of application logging in depth, and extend them to the broader IoT space.

Sources and Sinks

Historically, the various logging libraries have been inspired by the underlying logging implementation of the hosting operating system. The reason for this is somewhat obvious if seen from the perspective of the hardware (device): the operating system itself is just a collection of applications, some of them running in the kernel space, like drivers, and some in the user space, like any user application. In mission-critical applications, the related logging information might span a cross-section of OS components, drivers and other applications, so having a consolidated logging mechanism makes more sense.

And although minor differences existed between the different operating systems' logging infrastructures, all of them shared the same publisher/subscriber pattern: the log stream is published by sources, and sinks subscribe to these sources. On Windows, the inherent logging infrastructure is called ETW, and on Linux, LTTng, both of them being the de facto best approach for platform-specific tracing.

Nowadays, in the era of cloud computing, anything platform-specific sounds like a bad idea, and rightfully so, people try to avoid it. Logging in the .NET ecosystem has traditionally been confusing, with many alternatives and, for a long time, no clear winner. But at last, we have one! Today, Serilog is the second most popular NuGet package, right after Newtonsoft's JSON library. This practically means that the most important logging features are supported, and even if there is a missing feature or limitation in this library compared to other options, it is most probably a matter of time for the Serilog open source community to catch up. As of now, most of the important tracing sinks exist: flat files, databases, cloud-managed tracing services, and so on.

Now let's see how this looks in source code. Let's modify the default .NET Core console app to use Serilog. To create a new Console app in .NET Core run:

mkdir Logging
cd Logging
dotnet new console

dotnet add package Microsoft.Extensions.Hosting

dotnet add package Serilog.Sinks.Console
dotnet add package Serilog.Settings.Configuration
dotnet add package Serilog.Enrichers.Thread

Replace the default Program.cs code with this:

using Microsoft.Extensions.Configuration;
using Serilog;
using Serilog.Context;

namespace Logging
{
    class Program
    {
        static void Main(string[] args)
        {
            IConfiguration configuration = new ConfigurationBuilder()
                .AddJsonFile("appsettings.json")
                .Build();

            Log.Logger = new LoggerConfiguration()
                .ReadFrom.Configuration(configuration)
                .CreateLogger();

            LogContext.PushProperty("ClassId", typeof(Program).Name);
            LogContext.PushProperty("FunctionId", nameof(Main));

            Log.Information("Hello World!");
            Log.Error("Something is wrong..");
            Log.Information("Goodbye!");
        }
    }
}

and add this appsettings.json file and set it to "copy if newer"

{
  "Serilog": {
    "Using": [ "Serilog.Sinks.Console" ],
    "MinimumLevel": {
      "Default": "Debug"
    },
    "WriteTo": [
      {
        "Name": "Console",
        "Args": {
          "outputTemplate": "{Timestamp:yyyy-MM-dd HH:mm:ss.fff zzz} [{Level:u3}] [{ClassId}] [{FunctionId}] [{ThreadId}] - {Message}{NewLine}{Exception}"
        }
      }
    ],
    "Enrich": [ "FromLogContext", "WithThreadId" ]
  }
}
Note: in VS Code, you can have appsettings.json copied to the output directory by adding this snippet to the .csproj file:
  <ItemGroup>
    <None Update="appsettings.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
    </None>
  </ItemGroup>

Running this application prints the three log events to the console, formatted according to the output template.

A few important notes about the above example:

  1. We put the Serilog settings in the appsettings.json file. A better approach would be to to have a dedicated file, just for the purpose of logging.
  2. More elaborate log event enrichment alternatives exist, removing or adding more information to each log event is a matter of preference (and performance).
  3. We set the static Log.Logger property with the new logger. This way we can access the logger by simply calling Log.Information. This simple approach works well only if we need a single logger instance in our entire application.

The canonical example

For better control over the logger creation, a good example exists in the IoT Edge runtime source code. It demonstrates a good way to remove the Serilog-specific implementation dependency from the rest of the source code: implement the ILoggerFactory interface of the Microsoft.Extensions.Logging namespace and hide the Serilog-specific references behind this implementation.

A usage example of the above approach, using a slightly modified version of the aforementioned logger of the IoT Edge Runtime looks like:

using Microsoft.Extensions.Logging;

namespace Logging
{
    class Program
    {
        static readonly ILogger Log = 
            Logger.Factory.CreateLogger(typeof(Program).Name);

        static void Main(string[] args)
        {
            Log.LogInformation("Hello World!");
            Log.LogError("Something is wrong..");
            Log.LogInformation("Goodbye!");
        }
    }
}
Note that there is no reference to the Serilog namespace anymore. The benefit of this is that we've made our code technology-agnostic, that is, we can replace the logging library by simply changing our Logger class implementation.

Running the above code produces similar output:

[Screenshot: console log output]

Besides the log output style, the major difference between these two examples is that now we can pass this logger to third-party libraries that use the same logging abstraction. This is a subtle difference, but it can prove very helpful in various scenarios, such as integrating with ASP.NET Core.
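A minimal sketch of what such a Logger wrapper could look like (illustrative only: it assumes the Serilog, Serilog.Sinks.Console and Serilog.Extensions.Logging packages, and is not the actual IoT Edge runtime implementation):

```csharp
using Microsoft.Extensions.Logging;
using Serilog.Extensions.Logging;

namespace Logging
{
    // Hides all Serilog-specific configuration behind the
    // Microsoft.Extensions.Logging abstractions.
    public static class Logger
    {
        static readonly ILoggerFactory factory =
            new SerilogLoggerFactory(
                new Serilog.LoggerConfiguration()
                    .MinimumLevel.Debug()
                    .Enrich.FromLogContext()
                    .WriteTo.Console()
                    .CreateLogger());

        // Consumers only ever see the ILoggerFactory abstraction;
        // swapping the logging library means changing only this class.
        public static ILoggerFactory Factory => factory;
    }
}
```

With a wrapper shaped like this, the Program class can call Logger.Factory.CreateLogger(...) without referencing Serilog anywhere.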

Note: we will see how this becomes important in the next post, where we introduce dependency injection.

Docker and Azure IoT Edge

So far, the focus has been intentionally limited to generating application logs and sending them to a console sink. Obviously, many other sink options exist, some of them pushing the logs to remote cloud sinks like Azure Event Hubs or Blob Storage. The console sink approach works well both in development environments and when deploying as a container, because Docker can be configured to save these logs in flat files on the host and then combine them at the system level with other OS logs.

The related IoT Edge deployment manifest section is:

"createOptions": {
    "HostConfig": {
        "LogConfig": {
            "Type": "json-file",
            "Config": {
                "max-size": "10m",
                "max-file": "3"
            }
        }
    }
}
Note: by default the Moby container engine does not set container log size limits!

Finally, a couple of notable Azure IoT Edge log features are the log collation and upload capability and the Azure Log Analytics integration module.

To summarize the logging story, the rules of thumb are:

  1. Use the Serilog library for logging. If you configure Serilog via configuration, use a dedicated file. Write to at least a console sink.
  2. Reference and use the Serilog.Log if your code is relatively simple and does not integrate with other external libraries.
  3. Implement an ILoggerFactory and use dependency injection if you have integration with external libraries, like ASP.NET Core.
  4. Set a limit to the docker logs.
  5. Never push logs over the same transport channel as the application payload; use a dedicated transport channel if you push your logs to the upstream.
  6. If your Edge application is part of a broader distributed solution, use the Application Insights sink and correlation IDs.
  7. For ad hoc log retrieval use the log collation and upload capability.
  8. For continuously pushing edge logs, use the Azure Log Analytics integration module.
  9. Configure the OS with a host management solution to retrieve the system logs.

In the next post, we will explore a dependency injection approach that combines all the concepts we've seen so far in this mini-series.

]]>
<![CDATA[The Service Room]]>https://havedatawilltrain.com/the-service-room/5e8ba7eaca768b2661b5c8d7Fri, 10 Apr 2020 18:38:20 GMT

Reading configuration settings is one of the most fundamental tasks of any IoT Edge application. We will postulate a configuration categorization and explore how this categorization maps to .NET. Although these examples are specific to Azure IoT Edge applications, the best practices presented here are generic and apply to all other application types.  

Application Configuration

A well-designed application needs to be able to adjust its behavior based on the environment it is running in. And although the definition of the environment can become a matter of debate, there are some commonly accepted standard environments in Software Engineering: Development, Test, Staging and Production. Naturally, some variability exists in the cardinality of these environments; sometimes some of them are merged together (e.g. Test and Staging), and similarly, an environment can be split into more specialized cases.

In reality, besides the environments related to the application lifecycle, an environment can be defined arbitrarily: it can be based on geographical location; on physical characteristics, like ambient temperature or water pH; or on business parameters, like whether an item has been sold or not. This diversity of all possible environments makes the environment modeling difficult.

To shed some light, we'll draw an example from real life, something that almost all people can relate to: cars.

Cars can be one of the best analogy examples one can find, mostly because cars are complex machines that most people have interacted with. Cars carry many well-designed features that many software engineers draw inspiration from when designing things.

Environments

Most modern ECUs (the main computer of a car) are designed to behave differently when in maintenance mode, that is, when the service mechanic plugs in the diagnostic tool. This can be seen as analogous to a Debugging environment: the ECU architects made sure that the ECU will emit diagnostic information to help the mechanic understand if there are any issues with the car. These environments are called lifecycle environments.

A second category is the operating environment, the environment a car is driven in. Many SUVs allow the drivers to choose between 4x4 and rear-wheel drive based on the operating environment traction requirements. Similarly, the vehicle A/C can be turned on and off according to the environment conditions. You can either define the environment explicitly, via a push button, or infer it by reading the sensors; for example, infer that it's currently raining by reading the rain sensor. Many of these environment-based decisions have been automated today: the vehicle lights will turn on when it's dark, the A/C will kick in when it's hot, etc. Nevertheless, the point here is that the vehicle designers made the car configurable at run time to conform to the operating environment conditions.

If seen from this standpoint, the application configuration is effectively information that defines an environment!

Device Twin

The long intro and the emphasis on the application configuration definition are necessary because having a clear understanding of what is and what is not configuration information affects the abstraction decisions that, down the line, make the difference between a well-designed and a poorly-designed application.

The notion of a device twin exists in the IoT space. Specifically, in Azure IoT Edge, the definition of the device twin is: "Device twins are JSON documents that store device state information including metadata, configurations, and conditions". The device twin is practically a collection of name-value properties that fall into the two self-explanatory categories, the desired and the reported properties.
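For illustration, a simplified (hypothetical) module twin document with one desired and one reported property could look like:

```json
{
  "deviceId": "thermostat-01",
  "moduleId": "temperatureModule",
  "properties": {
    "desired": {
      "IntervalInSeconds": "5",
      "$version": 4
    },
    "reported": {
      "Temperature": 21.7,
      "$version": 11
    }
  }
}
```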

Let's go back to the car example. Congrats! You just got hired by an automotive company to write the ECU software of their new connected vehicle model. Would you store any lifecycle environment information in the device twin? How about the operating environment information, for example, whether it is currently raining?

The lines between device state, device configuration, conditions and metadata can easily get blurred. Using the car analogy, we can draw a clear line to separate the configuration into two major categories:

System configuration
  • System (device) configuration, the configuration that the mechanic (administrator) can change, for example, set the engine idle RPM. The system configuration maps to the lifecycle environment definition. These environments are related to the lifecycle of the vehicle, are clearly defined and changing them usually requires an expert, tools and a vehicle restart.

User configuration
  • User configuration, the configuration that the vehicle drivers (users) can change, for example, the steering wheel height or the radio station, usually through a user interface. The user configuration maps to the operating environment definition. This environment can change dynamically, no restart is required, and the vehicle driver (user) has the power to define it.
Note: In IoT, there's always an administrator, and devices that do not interact with users don't have user configuration.

By using these definitions, the rest of the story around configuration falls into place:

  1. The system configuration is kept in the system itself, and changing it requires a restart, perhaps even a new deployment. The system configuration is enforced: the device either conforms, or fails to start.
  2. On the other hand, the user configuration is persisted in the desired properties of the device twin. These can change on the fly, perhaps by external systems, and the device should do its best to honor these updates, although this configuration should be seen more as hints.
  3. Finally, all the read-only sensory values are the reported properties of the device twin.
Food for thought: Where would you store the default values of the user configuration?

Let's see now how all these look written in source code.

Reading System Configuration

For system configuration we'll use the following packages:

  • Microsoft.Extensions.Configuration
  • Microsoft.Extensions.Configuration.Abstractions
  • Microsoft.Extensions.Configuration.CommandLine
  • Microsoft.Extensions.Configuration.EnvironmentVariables
  • Microsoft.Extensions.Configuration.Json

Using these packages we can write a simple application that reads the system configuration:

using System;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;

namespace Example5
{
    class Program
    {
        // This is the program entry point
        static async Task Main(string[] args)
        {
            // Read the system configuration
            IConfiguration configuration;
            configuration = new ConfigurationBuilder()
                .AddJsonFile("appsettings.json", optional: true)
                .AddEnvironmentVariables()
                .AddCommandLine(args)
                .Build();

            if (configuration["Environment"] == "Debug")
            {
                Console.WriteLine($"Configuring for debugging environment..");
                // Code omitted
            }
            Console.WriteLine("Exiting..");
        }
    }
}
Reading the System Configuration using Microsoft.Extensions.Configuration

The IConfiguration instance will hold the union of the configuration settings, based on the order of composition we define. In this example, we start with an optional local JSON file, appsettings.json, then we add the environment variables and the command line arguments. Each newly added configuration source overrides any variables with the same name from the previous sources.

A good practice is to keep the hosting environment configuration last. This allows us to change the configuration without redeploying a new file system version.
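This override behavior can be demonstrated in isolation with two in-memory sources (a sketch assuming the Microsoft.Extensions.Configuration package; the keys are made up):

```csharp
using System;
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

// The source added last wins for keys that appear in both sources.
IConfiguration configuration = new ConfigurationBuilder()
    .AddInMemoryCollection(new Dictionary<string, string>
    {
        ["Environment"] = "Development",    // e.g. from appsettings.json
        ["IntervalInSeconds"] = "1"
    })
    .AddInMemoryCollection(new Dictionary<string, string>
    {
        ["Environment"] = "Debug"           // e.g. from the command line
    })
    .Build();

Console.WriteLine(configuration["Environment"]);        // Debug
Console.WriteLine(configuration["IntervalInSeconds"]);  // 1
```

Keys present only in earlier sources survive the merge; keys present in both resolve to the last source added.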

Running the application with command line arguments makes it easier to test our code:

[Screenshot: running the application with command line arguments]

Writing System Configuration

To set this configuration, you can either push an updated appsettings.json file through a new deployment, or set the settings.createOptions.Env section of a module in the deployment manifest:

          "module2": {
            ...
            "settings": {
              "createOptions": {
                "Env": [
                  "DB_SERVER_URL=${DB_SERVER_URL}"
                ]
              }
            }
          }

Reading User Configuration

To read the user configuration from the device twin, we can use this code snippet:

var moduleClient = await ModuleClient.CreateFromEnvironmentAsync();
await moduleClient.OpenAsync();
var twin = await moduleClient.GetTwinAsync();
if (twin != null && twin.Properties.Desired.Contains("IntervalInSeconds"))
    int.TryParse(
        twin.Properties.Desired["IntervalInSeconds"].ToString(),
        out IntervalInSeconds);
Reading user configuration from the device twin 

Writing User Configuration  

You have two options to update the user configuration: via deployment manifest or by using the service-side IoT SDK.

The deployment manifest has a properties.desired section per module. Setting this section requires a redeployment and an application restart. You can think of this section as holding the default values of the module twin.

        "module1": {
            "properties.desired": {
                // desired properties of module1
            }
        },

Alternatively, you can set the module twin using the service (cloud) side of the Azure IoT SDK. A complete example can be found here.
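A sketch of what that service-side update could look like, assuming the Microsoft.Azure.Devices (service) NuGet package; the connection string, device ID and module ID are placeholders, and a real IoT Hub is required to actually run the update:

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.Devices;

public static class DesiredPropertyUpdater
{
    // The desired-properties patch we want to apply.
    public const string Patch =
        @"{ ""properties"": { ""desired"": { ""IntervalInSeconds"": ""5"" } } }";

    public static async Task UpdateAsync(
        string connectionString, string deviceId, string moduleId)
    {
        var registryManager =
            RegistryManager.CreateFromConnectionString(connectionString);

        // Read the current twin to obtain its ETag for optimistic concurrency.
        var twin = await registryManager.GetTwinAsync(deviceId, moduleId);

        // Apply the patch; the module observes the new desired property.
        await registryManager.UpdateTwinAsync(
            deviceId, moduleId, Patch, twin.ETag);
    }
}
```

Passing the twin's ETag ensures the update fails cleanly if someone else modified the twin in between.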

To better understand the different categories and their corresponding location throughout the system, we can visualize everything in a single diagram:

[Diagram: configuration categories and their corresponding locations in the system]

The Thermostat Example

Revisiting the original thermostat example we saw in the previous post, now the thermostat code looks like:

using System;
using System.Runtime.Loader;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;

namespace Example6
{
    class Program
    {
        // This is the program entry point
        static async Task Main(string[] args)
        {
            // Read the system configuration
            IConfiguration configuration;
            configuration = new ConfigurationBuilder()
                .AddJsonFile("appsettings.json", optional: true)
                .AddEnvironmentVariables()
                .AddCommandLine(args)
                .Build();

            // Create the cancellation token source
            var cancellationTokenSource = new CancellationTokenSource();
            AssemblyLoadContext.Default.Unloading +=
                (cts) => cancellationTokenSource.Cancel();

            Console.CancelKeyPress +=
                (sender, cts) =>
                {
                    Console.WriteLine("Ctrl+C detected.");
                    cts.Cancel = true;
                    cancellationTokenSource.Cancel();
                };

            // Register the Reset command callback 
            await RegisterCommandCallbackAsync("Reset",
                OnReset,
                cancellationTokenSource.Token);

            // Read the system configuration or default to 1
            int interval;
            if (!int.TryParse(
                configuration["IntervalInSeconds"],
                out interval))
                interval = 1;

            // Create and start the TaskTimer
            TaskTimer taskTimer = new TaskTimer(
                EmitTelemetryMessage,
                TimeSpan.FromSeconds(interval), 
                cancellationTokenSource);

            // A non-blocking call to start the task-timer
            taskTimer.Start();

            // Wait until the app unloads or gets cancelled
            await WhenCancelled(cancellationTokenSource.Token);

            // Let the other threads drain
            Console.WriteLine("Waiting for 2 seconds..");
            await Task.Delay(2 * 1000);

            Console.WriteLine("Exiting..");
        }

        public static Task WhenCancelled(CancellationToken cancellationToken)
        {
            var taskCompletionSource = new TaskCompletionSource<bool>();
            cancellationToken.Register(
                s => ((TaskCompletionSource<bool>)s).SetResult(true),
                taskCompletionSource);
            return taskCompletionSource.Task;
        }
        private static async Task RegisterCommandCallbackAsync(string command,
            Action callback,
            CancellationToken cancellationToken)
        {
            // Perform the command registration
            // Code omitted
            return;
        }

        // A method exposed for RPC
        static void OnReset()
        {
            // Perform a temperature sensor reset
            // Code omitted
        }

        // Emit telemetry message
        static void EmitTelemetryMessage()
        {
            Console.WriteLine($"Sending telemetry message..");
        }
    }
}
So far in this mini-series we have deliberately avoided using the Azure IoT SDK, the idea being that we want to build a hosting-environment-agnostic application, where the code is not coupled to a specific hosting technology. With this approach, hosting it on the Edge becomes a choice, and it helps us develop most of the code in our preferred development environment.

As alluded to in this post, one of the most popular reasons for reconfiguring an IoT application is debugging. In the next post we will explore the best practices on application logging.

]]>
<![CDATA[The Waiting Loop]]>https://havedatawilltrain.com/the-waiting-loop/5e860b6b188f260074a3084eSat, 04 Apr 2020 00:48:28 GMT

In IoT, a very common coding pattern is an endless loop that does some periodic work and emits telemetry based on the work result. In the example we saw in the previous post, we used EmitTelemetryMessagesAsync to simulate a temperature sensor, but in real life, this function would read the value of an actual hardware temperature sensor and send out the temperature telemetry event. From now on, we will call this pattern the telemetry pump.

The telemetry pump pattern is very old. In fact, every Windows application has a similar pump, but for UI events instead of telemetry events. This pump is the dispatching mechanism of the OS events to your application. Similarly in the IoT context, a telemetry pump acts as the dispatching interface between the hardware sensors and the cloud.

Publisher/Subscriber

In an ideal world, a publisher/subscriber pattern based on hardware interrupts would be a more elegant approach. Nevertheless, the hardware world is full of uncertainty: originating in Heisenberg's uncertainty principle, there is always some degree of noise when we try to measure any physical phenomenon. This means that consecutive measurement values will jitter (try measuring voltage with a high-accuracy voltmeter). For this reason, most of the time we're forced to use periodic polling instead of change-driven updates. And depending on the required data resolution, we can derive the required minimum polling frequency.

Timers

Many telemetry pump designs exist, depending on the depth of the technology stack we're working on. The periodic nature of the sensor polling often inspires people to use timers. Although timers may seem a conceptually suitable and easy approach, they usually get you in trouble: there is no guarantee that the previous iteration has completed before the timer fires again. For example, if we're emitting a telemetry message every second, and every iteration of reading the sensor and sending the message takes roughly 300 msec to complete, then the invocation callback appears guaranteed to complete before the next period starts.

Graphically, this timeline looks like:

A timer-based periodic invocation timeline

But as we briefly touched on this in a previous post, there is no guarantee of an IO operation duration. Let's say for example that because of a network glitch, the t+1 callback invocation takes more time to complete:

A timer-based periodic invocation timeline during a network glitch

The danger in this case is that on the cloud side we might end up with out-of-order messages, and we'll then need special care to determine their actual order (we'll see next that we might have to do this either way). But most importantly, because we have parallel execution of the same code, we need to make sure that the code is thread safe; in other words, to ensure that we use semaphores when we access shared (heap) memory.
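For instance, guarding a shared list with a SemaphoreSlim could look like the following self-contained sketch (the StoreReadingAsync name and the simulated readings are made up for illustration):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var gate = new SemaphoreSlim(1, 1);   // at most one writer in the critical section
var readings = new List<double>();    // shared (heap) state, not thread safe

async Task StoreReadingAsync(double value)
{
    await gate.WaitAsync();           // asynchronous, non-blocking wait
    try
    {
        readings.Add(value);          // safe: only one task is in here at a time
    }
    finally
    {
        gate.Release();
    }
}

// Simulate overlapping callback invocations racing on the shared list.
var tasks = new List<Task>();
for (int i = 0; i < 100; i++)
    tasks.Add(StoreReadingAsync(i));
await Task.WhenAll(tasks);

Console.WriteLine($"Stored {readings.Count} readings.");  // Stored 100 readings.
```

Without the semaphore, concurrent List.Add calls can corrupt the list's internal state or silently drop entries.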

An even more dangerous case leads to thread pool exhaustion. This can happen when, instead of just a network glitch, we run into a network bottleneck that significantly delays the completion of all callback invocations:

A timer-based periodic invocation timeline during a network bottleneck

In this case, a catastrophic program failure is inevitable.
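The overlap itself is easy to reproduce with a plain System.Threading.Timer; this self-contained sketch simulates slow callback work with Thread.Sleep and records how many callbacks run at the same time (the 100 ms period and 350 ms work duration are arbitrary illustration values):

```csharp
using System;
using System.Threading;

// Make sure the pool can actually run callbacks concurrently.
ThreadPool.SetMinThreads(8, 8);

int concurrent = 0;
int maxConcurrent = 0;

// A timer that fires every 100 ms while each callback takes ~350 ms,
// so invocations pile up on separate thread-pool threads.
using var timer = new Timer(_ =>
{
    int now = Interlocked.Increment(ref concurrent);

    // Record the highest overlap observed so far.
    int seen;
    while (now > (seen = Volatile.Read(ref maxConcurrent)))
        Interlocked.CompareExchange(ref maxConcurrent, now, seen);

    Thread.Sleep(350);                  // simulated slow sensor read + IO
    Interlocked.Decrement(ref concurrent);
}, null, dueTime: 0, period: 100);

Thread.Sleep(1500);                     // let several periods elapse
timer.Change(Timeout.Infinite, Timeout.Infinite);
Thread.Sleep(400);                      // let in-flight callbacks finish

Console.WriteLine($"Max concurrent callbacks: {Volatile.Read(ref maxConcurrent)}");
```

With a 350 ms callback and a 100 ms period, typically three or four copies of the callback end up executing simultaneously: exactly the pile-up a chained-task design avoids.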

Task-timer


An alternative approach that mitigates these issues is to use chained tasks. Although this approach might seem more complex initially, in reality it is very straightforward. In C#, a Task can have a continuation Task, which simply is another task that starts right after the former completes. By chaining together multiple callback invocation tasks and adding the necessary Task.Delay between them, we can have the same periodic behavior as before:

A task-timer periodic invocation timeline

Going back to the previous network glitch scenario, the chained tasks will never produce any thread-safety or out-of-order message issues, but rather just a period drift in a single iteration:

A task-timer periodic invocation timeline during a network glitch.

The downside with this approach is that we lose our polling periodicity. Note that the delay in Callback #2 has shifted all the subsequent callbacks.

A task-timer implementation, as described above, is shown in the code snippet below:

using System;
using System.Threading;
using System.Threading.Tasks;

public class TaskTimer
{
    CancellationTokenSource cancellationTokenSource;
    TimeSpan timerPeriod;
    Action onElapsedCallback;
    bool continueOnError;

    public TaskTimer(Action onElapsedCallback,
        TimeSpan timerPeriod,
        CancellationTokenSource cancellationTokenSource,
        bool continueOnError = true)
    {
        this.cancellationTokenSource = cancellationTokenSource;
        this.timerPeriod = timerPeriod;
        this.onElapsedCallback = onElapsedCallback;
        this.continueOnError = continueOnError;
    }

    public void Start()
    {
        Task elapsedTask = null;
        elapsedTask = new Task((x) =>
        {
            Elapsed(elapsedTask, cancellationTokenSource);
        }, cancellationTokenSource.Token);

        HandleError(elapsedTask);

        elapsedTask.Start();
    }

    private void Elapsed(Task task, object objParam)
    {
        var start = DateTime.Now;
        var cancellationTokenSource = (CancellationTokenSource)objParam;
        if (cancellationTokenSource.Token.IsCancellationRequested)
        {
            Console.WriteLine("TaskTimer: A cancellation has been requested.");
            return;
        }

        onElapsedCallback();

        var delay = timerPeriod - (DateTime.Now - start);
        if (delay.Ticks > 0)
        {
            task = Task.Delay(delay);
        }
        HandleError(task.ContinueWith(Elapsed, cancellationTokenSource));
    }

    private void HandleError(Task task)
    {
        task.ContinueWith((e) =>
        {
            Console.WriteLine(
                $"Exception when running timer callback: {e.Exception}");
            if (!continueOnError)
                cancellationTokenSource.Cancel();
            else
                task.ContinueWith(Elapsed, cancellationTokenSource);
        }, TaskContinuationOptions.OnlyOnFaulted);
    }
}

A few notes about this TaskTimer:

The endless task-chaining is achieved by creating a Task whose execution entry point is the Elapsed method (our callback), and by passing a reference to this task through the Elapsed arguments:

Task elapsedTask = null;
elapsedTask = new Task((x) =>
{
	Elapsed(elapsedTask, cancellationTokenSource);
}, cancellationTokenSource.Token);

Next, inside this Elapsed method, we continue the argument task with the same method:

task.ContinueWith(Elapsed, cancellationTokenSource)

Between each continuation task, a delay might be added, based on the duration of the last iteration and the configured period:

var delay = timerPeriod - (DateTime.Now - start);
if (delay.Ticks > 0)
{
    task = Task.Delay(delay);
}

This implementation will break the task-chaining if the passed CancellationToken signals a cancellation:

if (cancellationTokenSource.Token.IsCancellationRequested)
{
    Console.WriteLine("TaskTimer: A cancellation has been requested.");
    return;
}

Finally, the task-timer will trigger a cancellation signal if an exception is thrown by the callback, and the continueOnError is set to false:

private void HandleError(Task task)
{
    task.ContinueWith((e) =>
    {
		Console.WriteLine(
            $"Exception when running timer callback: {e.Exception}");
        if (!continueOnError)
	        cancellationTokenSource.Cancel();
        else
    	    task.ContinueWith(Elapsed, cancellationTokenSource);
    }, TaskContinuationOptions.OnlyOnFaulted);
}

Let's see now how our sample temperature module evolves using this TaskTimer:

using System;
using System.Runtime.Loader;
using System.Threading;
using System.Threading.Tasks;

namespace Example4
{
    class Program
    {
        // This is the program entry point
        static async Task Main(string[] args)
        {
            // Create the cancellation token source
            var cancellationTokenSource = new CancellationTokenSource();
            AssemblyLoadContext.Default.Unloading +=
                (cts) => cancellationTokenSource.Cancel();

            Console.CancelKeyPress +=
                (sender, cts) =>
                {
                    Console.WriteLine("Ctrl+C detected.");
                    cts.Cancel = true;
                    cancellationTokenSource.Cancel();
                };

            // Register the Reset command callback 
            await RegisterCommandCallbackAsync("Reset",
                OnReset,
                cancellationTokenSource.Token);

            // Create and start the TaskTimer
            TaskTimer taskTimer = new TaskTimer(
                EmitTelemetryMessage,
                TimeSpan.FromSeconds(1), cancellationTokenSource);

            // A non-blocking call to start the task-timer
            taskTimer.Start();

            // Wait until the app unloads or gets cancelled
            await WhenCancelled(cancellationTokenSource.Token);

            // Let the other threads drain
            Console.WriteLine("Waiting for 2 seconds..");
            await Task.Delay(TimeSpan.FromSeconds(2));

            Console.WriteLine("Exiting..");
        }

        public static Task WhenCancelled(CancellationToken cancellationToken)
        {
            var taskCompletionSource = new TaskCompletionSource<bool>();
            cancellationToken.Register(
                s => ((TaskCompletionSource<bool>)s).SetResult(true),
                taskCompletionSource);
            return taskCompletionSource.Task;
        }
        private static async Task RegisterCommandCallbackAsync(string command,
            Action callback,
            CancellationToken cancellationToken)
        {
            // Perform the command registration
            // Code omitted
            return;
        }

        // A method exposed for RPC
        static void OnReset()
        {
            // Perform a temperature sensor reset
            // Code omitted
        }

        // Emit telemetry message
        static void EmitTelemetryMessage()
        {
            Console.WriteLine($"Sending telemetry message..");
        }
    }
}

Arguably, this version is more readable and, at the same time, we've avoided all the aforementioned pitfalls. Running this version produces similar output as before:

[Screenshot: console output]

So far, we've been using the Console.WriteLine function to print messages and we've avoided reading any application configuration. In the next post, we'll examine the best approaches on the topic of configuration.

]]>
<![CDATA[Restarted by request]]>https://havedatawilltrain.com/restarted-by-request/5e86096c188f260074a3083dSat, 04 Apr 2020 00:03:31 GMT

In the previous post we briefly touched on keeping the main method from returning, and as a trick, we used the Console.ReadLine() blocking call. But as we postulated, it's good to avoid any thread-blocking function calls. Furthermore, process termination and recycling is a more complicated story, especially for code that is running on the edge, perhaps on a remote device with no keyboard, mouse or monitor to interact with. A better strategy in these cases is to control the process termination and use it as a recovery mechanism from unexpected situations, commonly called fatal exceptions.

Application termination and recycling

[Image: The best troubleshooting strategy]

Recycling

The Azure IoT Edge hosting mechanism can be configured to restart a stopped module through the restartPolicy configuration in the application manifest. The main point here is that the hosting runtime assumes that the application might crash and exit, and allowing the application to restart is a very appealing crash recovery mechanism that we should definitely leverage.

The restartPolicy configuration dictates how the IoT Edge agent restarts a module. Possible values include:

  • never – The IoT Edge agent never restarts the module.
  • on-failure - If the module crashes, the IoT Edge agent restarts it. If the module shuts down cleanly, the IoT Edge agent doesn't restart it.
  • on-unhealthy - If the module crashes or is considered unhealthy, the IoT Edge agent restarts it.
  • always - If the module crashes, is considered unhealthy, or shuts down in any way, the IoT Edge agent restarts it.

The product has other ways to remotely stop and start any module in an ad-hoc manner, so setting the restartPolicy to always is, generally speaking, a good strategy.
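In the deployment manifest, the policy is set per module; for example (the module name and image below are placeholders):

```json
"modules": {
  "temperatureModule": {
    "version": "1.0",
    "type": "docker",
    "status": "running",
    "restartPolicy": "always",
    "settings": {
      "image": "myregistry.azurecr.io/temperature-module:1.0"
    }
  }
}
```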

Cancellation

In the second code sample of the previous post, we saw how to use the async/await pattern to implement non-blocking code parallelism. But in that approach, there was no way to signal the application to exit gracefully, and this can become particularly challenging in highly parallel code. The .NET Core runtime provides a parallel operation cancellation mechanism through the CancellationTokenSource.

When a fatal exception occurs, a graceful exit is always preferred over an application crash, because it allows debugging information to be written to the application logs for post-mortem analysis.

Now, let's see how our code evolves with the usage of a CancellationTokenSource:

using System;
using System.Runtime.Loader;
using System.Threading;
using System.Threading.Tasks;

namespace Example3
{
    class Program
    {
        // This is the program entry point
        static async Task Main(string[] args)
        {
            // Create the cancellation token source
            var cancellationTokenSource = new CancellationTokenSource();
            AssemblyLoadContext.Default.Unloading +=
                (context) => cancellationTokenSource.Cancel();

            Console.CancelKeyPress +=
                (sender, eventArgs) =>
                {
                    Console.WriteLine("Ctrl+C detected.");
                    // Prevent immediate termination so we can exit gracefully
                    eventArgs.Cancel = true;
                    cancellationTokenSource.Cancel();
                };

            // Register the Reset command callback 
            await RegisterCommandCallbackAsync("Reset", 
                OnReset, 
                cancellationTokenSource.Token);

            // Start the telemetry pump without awaiting it,
            // so it runs concurrently with the cancellation wait below
            var telemetryTask = EmitTelemetryMessagesAsync(cancellationTokenSource.Token);

            // Wait until the app unloads or gets cancelled
            await WhenCancelled(cancellationTokenSource.Token);

            // Let the other threads drain
            Console.WriteLine("Waiting for 2 seconds..");
            await Task.Delay(TimeSpan.FromSeconds(2));

            Console.WriteLine("Exiting..");
        }

        public static Task WhenCancelled(CancellationToken cancellationToken)
        {
            var taskCompletionSource = new TaskCompletionSource<bool>();
            cancellationToken.Register(
                s => ((TaskCompletionSource<bool>)s).SetResult(true), 
                taskCompletionSource);
            return taskCompletionSource.Task;
        }
        private static async Task RegisterCommandCallbackAsync(string command,
            Action callback,
            CancellationToken cancellationToken)
        {
            // Perform the command registration
            // Code omitted
            return;
        }

        // A method exposed for RPC
        static void OnReset()
        {
            // Perform a temperature sensor reset
            // Code omitted
        }

        // Emit telemetry messages
        static async Task EmitTelemetryMessagesAsync(
            CancellationToken cancellationToken)
        {
            while(true)
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    Console.WriteLine($"Exiting telemetry pump..");
                    break;
                }
                Console.WriteLine($"Sending telemetry message..");
                await Task.Delay(TimeSpan.FromSeconds(1));
            }
        }
    }
}

The cancellation token source allows us to signal a cancellation by invoking the Cancel() method. We do this when the AssemblyLoadContext.Default.Unloading event fires, which generally means the application is already exiting, or when the Console.CancelKeyPress event fires (Ctrl+C), which is useful for testing the cancellation mechanism in our development environment. A requested cancellation can be detected through the cancellationToken.IsCancellationRequested property, which lets us gracefully terminate any running parallel operation, as we do in EmitTelemetryMessagesAsync.

Now, in EmitTelemetryMessagesAsync, we can loop forever and break based on the cancellation token signal.

Finally, we can register for this cancellation event and, when it fires, let the Main method return by awaiting the WhenCancelled function. In other words, we now have an event-driven mechanism to gracefully exit from all active threads of our program.


The rules of thumb here are:

  1. Pass a reference to the CancellationToken to every async function and to every long-running synchronous function. Use this token's signal to gracefully return from these functions.
  2. Pass a reference to the CancellationTokenSource to every function that performs critical operations that cannot recover from possible exceptions, e.g. initialization of used dependencies. Use the source to cancel the execution of the program.
  3. Let the main function return when a cancellation is signaled. Allow some time for the other threads to complete their shutdown process.
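These rules are language-agnostic. As an illustration (a Python sketch, not the post's C# code), threading.Event can stand in for the CancellationTokenSource/CancellationToken pair:

```python
import threading
import time

def emit_telemetry(cancelled: threading.Event) -> int:
    """Telemetry pump: loops until cancellation is signaled (rule 1)."""
    sent = 0
    while not cancelled.is_set():
        sent += 1
        # In a real pump we would send a message here
        cancelled.wait(0.01)  # non-busy sleep that wakes early on cancel

    return sent

def main() -> int:
    cancelled = threading.Event()  # plays the role of the token source (rule 2)
    result = {}
    worker = threading.Thread(
        target=lambda: result.update(sent=emit_telemetry(cancelled)))
    worker.start()

    time.sleep(0.05)     # let the pump run briefly
    cancelled.set()      # signal cancellation, like Cancel()
    worker.join(timeout=1)  # allow the other thread to drain (rule 3)
    return result["sent"]

if __name__ == "__main__":
    print(main())
```

The wait(0.01) call doubles as the loop delay and as a cancellation-aware sleep, so the pump exits almost immediately after the event is set.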

In the next post we will examine more elegant ways to implement telemetry pumps without the usage of while loops.

]]>
<![CDATA[The Last Judgment]]>https://havedatawilltrain.com/the-last-judgment/5e83c71e0b72be03aba4a5ddFri, 03 Apr 2020 23:44:19 GMT

Word is you like to live on the Edge, stranger.

Sometimes I like it too. I wish I didn't, but I do. I do it because that's where most of the interesting things happen, and for a good reason: zero latency. I wasn't always like that. I used to struggle with my equipment, didn't know where to start from, I was intimidated, I felt completely overwhelmed and powerless by this "technology". It took me some time to build up my skills and find my comfort zone, but to get there, the ride was rough.

For the past two years I've been working on various IoT Edge technologies. For the most part, my work has centered on the Azure IoT Edge technology stack. Through this work, I came up with some ground rules that I personally consider the minimum prerequisites for any IoT development environment, especially for developers who write hardware-specific code, and some design principles that I've found great value in over time.

In this mini-series of posts, I'll try to demonstrate these tools and patterns incrementally, starting from a simple console app that will gradually evolve into a production-ready Azure IoT Edge application.

Azure IoT Edge: the main gist

The purpose of every edge technology is to help you achieve one thing: running your code on a remote (networked) device. This boils down to two distinct capabilities:

  1. Deploy and manage your code on a device.
  2. Establish communication with the device.

The industry has just experienced the proliferation of containers and, as expected, containers are the main deployment mechanism of Azure IoT Edge too. In Azure IoT Edge, you deploy an application by creating the application manifest: a file that describes which containers compose your application and how these containers should be instantiated.

The development tools landscape

There are development tools that help you build your containers and compose this manifest. Unfortunately, these tools rely heavily on the solution file system structure to operate, and come with unintuitive commands that mostly confuse rather than help. They also rely on a thick stack of dependencies that makes them flaky and slow, and they print cryptic error messages. Even if you manage to properly install them on your development machine and use them to successfully compose your first IoT Edge application, the next challenge is to figure out how to debug this application. Between constant log searching, lengthy redeployments, and remote debugging tricks, the whole development experience is frustrating.

The problem lies in the containerized nature of the deployment mechanism, and in the fact that this deployment mechanism was never abstracted away from the development experience. In addition, the deployment manifest is not generated by user code, and as a consequence, no code validation checks can apply to it. This means, for example, that tools like compilers and linters are useless. If you decide to rename a class, any decent IDE/editor is clever enough to propagate this rename to the entire solution. But because this technology-unique manifest is exposed to the user, the application often breaks. Furthermore, it's difficult to have environment solution variants (debug, release etc.), because compiler directives do not apply inside the manifest, and the tooling approach is to keep dedicated replicas per configuration, which makes maintaining a large application very difficult.

An incremental approach

RPC happiness

In its most fundamental form, an IoT Edge application performs two functions:

  1. It processes device sensor data and emits telemetry.
  2. It performs some form of RPC (remote command invocation).

The typical documentation example of Azure IoT Edge is a temperature sensor that emits the device temperature (telemetry) and has a reset method (command). As of writing this article, the source code of this example is here.

A simple console app that demonstrates this simulated temperature example looks like this:

using System;
using System.Threading;

namespace Example1
{
    class Program
    {
        // This is the program entry point
        static void Main(string[] args)
        {
            // Register the Reset command callback 
            RegisterCommandCallback("Reset", OnReset);

            // A non-blocking telemetry emission invocation
            new Thread(EmitTelemetryMessages).Start();

            // Wait until the app unloads or gets cancelled
            Console.ReadLine();
        }

        private static void RegisterCommandCallback(string command,
            Action callback)
        {
            // Perform the command registration
            // Code omitted
            return;
        }

        // A method exposed for RPC
        static void OnReset()
        {
            // Perform a temperature sensor reset
            // Code omitted
            return;
        }

        // Emit 500 telemetry messages
        static void EmitTelemetryMessages()
        {
            for (int i = 1; i <= 500; i++)
            {
                Console.WriteLine($"Sending telemetry message {i} ..");
                Thread.Sleep(1000);
            }
        }
    }
}
A simple IoT Edge Temperature Sensor

Let's break down what happens here. This example code performs three things:

  1. Registers a command callback (the actual registration code is omitted for clarity),
  2. Sends some messages on a second thread, and
  3. Waits for the user to press Enter to exit.

The last step ensures that the program does not exit immediately, before the other thread has a chance to send any messages. In practice though, in a typical IoT Edge scenario there is no user to press Enter; there isn't even a monitor to print these messages to. We'll see later what the best pattern is for dealing with this issue.

As with most containerized applications, this is a command-line initiated app, a console app. Console apps come from a time when all applications were single threaded and there was no GUI to interact with. The operating system would load the application binary code into a memory block, and start a thread at the program entry point, the main function. When this function returned, the program would exit.

When multithreaded applications were eventually allowed by modern operating systems, programmers could manually create a new thread and start it at any function. This felt like running multiple applications that shared the same memory block. This capability allowed the creation of a new programming style, called multithreaded event-driven programming, which was very useful in scenarios like computer networking or graphical user interfaces, or, in general, when functions had to be executed in a non-predefined order and timing, based on external events.

Fast forward a few decades, and modern languages have made it easier to deal with multithreaded event-driven programming, generally called asynchronous programming. Tons of literature have been written about this topic, but the fundamental idea is simple: writing code that can react to external events of undetermined timing and order. C# in particular has the async/await pattern, which is an elegant way to execute some code in parallel (code that runs in a different thread).

Using this pattern, the above example becomes:

using System;
using System.Threading.Tasks;

namespace Example2
{
    class Program
    {
        // This is the program entry point
        static async Task Main(string[] args)
        {
            // Register the Reset command callback 
            await RegisterCommandCallbackAsync("Reset", OnReset);

            // Start the telemetry emission without awaiting it,
            // so it runs concurrently with the console read below
            var telemetryTask = EmitTelemetryMessagesAsync();

            // Wait until the app unloads or gets cancelled
            Console.ReadLine();
        }

        private static async Task RegisterCommandCallbackAsync(string command,
            Action callback)
        {
            // Perform the command registration
            // Code omitted 
            return;
        }

        // A method exposed for RPC
        static void OnReset()
        {
            // Perform a temperature sensor reset
            // Code omitted
            return;
        }

        // Emit 500 telemetry messages
        static async Task EmitTelemetryMessagesAsync()
        {
            for (int i = 1; i <= 500; i++)
            {
                Console.WriteLine($"Sending telemetry message {i} ..");
                await Task.Delay(TimeSpan.FromSeconds(1));
            }
        }
    }
}
Note that we had to refactor our main function signature to be async

These two versions are very similar in functionality, but with a major difference: in the latter example, there is no code segment that runs for an extensive period; in other words, no thread is blocked waiting for something to complete. Indeed, in the first example, the thread we created to run the EmitTelemetryMessages function would periodically wait, blocked for a second at the Thread.Sleep(1000). Blocked in this context means that the thread, although running, is waiting for something else in order to continue. This is not too important for now, but in real production systems it becomes very important and helps avoid the thread pool starvation issue, a.k.a. running out of threads.

The rule of thumb here is: anything that might take a considerable amount of time to complete, has to be implemented in the async/await pattern. The important word here is might: generally speaking, things that require IO, like reading a file from the disk, sending a message over the network etc. fall into this category. Practically anything that does not run only in the context of the CPU should be awaited, because of the uncertainty of the completion time of the required additional hardware resource. But even for code that runs solely in the boundaries of the CPU, if it could take more than a few milliseconds to run, it should be awaited.
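The same rule of thumb applies across languages. As a language-neutral illustration (a Python asyncio sketch, not part of the original post), awaiting two I/O-like delays lets them run concurrently on a single thread instead of blocking it:

```python
import asyncio

async def fake_io(name: str, delay: float) -> str:
    # Stands in for a network or disk operation of uncertain duration
    await asyncio.sleep(delay)
    return name

async def main() -> list:
    # Both operations are awaited concurrently on a single thread;
    # the thread is released at each await point instead of blocking
    return await asyncio.gather(fake_io("read-file", 0.1),
                                fake_io("send-message", 0.1))

print(asyncio.run(main()))
```

The total runtime is roughly one delay, not the sum of both, which is exactly the benefit the await pattern buys over blocking sleeps.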

In the next article we will examine the best approach to keep the main function from returning.

]]>
<![CDATA[Stream of the Jest]]>https://havedatawilltrain.com/stream-of-the-jest/5d9cdad332f8e9003c17823eFri, 11 Oct 2019 21:29:44 GMT

In a previous post we explored a high performance implementation of a Python Object Detector based on the SSD Inception V2 COCO model running on an NVIDIA Jetson TX2. In this post we will explore how we can implement the same Object Detector using NVIDIA's DeepStream SDK.

Previously, in the process of increasing the observed detector Frames Per Second (FPS), we saw how to optimize the model with TensorRT and, at the same time, replace the simple synchronous while-loop implementation with an asynchronous multi-threaded one.

We noticed the increased FPS and the introduced trade-offs: the increased inference latency and the increased CPU and GPU utilization. Reading the web camera input frame-by-frame and pushing the data to the GPU for inference can be very challenging.

DeepStream Object Detector on a Jetson TX2

The data generated by a web camera are streaming data. By definition, data streams are continuous sources of data, in other words, sources that you cannot pause in any way. One of the strategies in processing data streams is to record the data and run an offline processing pipeline. This is called batch processing.
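To make the stream-versus-batch distinction concrete, here is a small illustrative Python sketch (not from the post): a stream is consumed element by element as it arrives, while batch processing records a bounded chunk first and processes it offline:

```python
import itertools

def frame_stream():
    """An unbounded source, like a camera: you cannot pause it, only consume it."""
    frame_id = 0
    while True:
        yield frame_id
        frame_id += 1

# Stream processing: handle each element as it arrives
processed = [f * 2 for f in itertools.islice(frame_stream(), 5)]

# Batch processing: record a bounded chunk first, then process it offline
recorded = list(itertools.islice(frame_stream(), 5))
batch_result = [f * 2 for f in recorded]

print(processed, batch_result)
```

The results are the same here, but the latency profile differs: the stream variant produces each output as soon as its input arrives, while the batch variant waits for the whole chunk.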

In our case we are interested in minimizing the latency of inference. A more appropriate strategy then would be a real time stream processing pipeline. The DeepStream SDK offers different hardware accelerated plugins to compose different real time stream processing pipelines.

NVIDIA also offers pre-built containers for running a containerized DeepStream application. Unfortunately, at the time of this blog post, the containers had some missing dependencies. For this reason, this application is run directly on the TX2 device.

DeepStream Object Detector application setup

To run a custom DeepStream object detector pipeline with the SSD Inception V2 COCO model on a TX2, run the following commands.

I'm using JetPack 4.2.1

Step 1: Get the model.

This command will download the same model we used in the Python implementation and extract it in /tmp.

wget -qO- http://download.tensorflow.org/models/object_detection/ssd_inception_v2_coco_2017_11_17.tar.gz | tar xvz -C /tmp

Step 2: Optimize the model with TensorRT

This command will convert this downloaded frozen graph to a UFF MetaGraph model.

python3 /usr/lib/python3.6/dist-packages/uff/bin/convert_to_uff.py \
  /tmp/ssd_inception_v2_coco_2017_11_17/frozen_inference_graph.pb -O NMS \
  -p /usr/src/tensorrt/samples/sampleUffSSD/config.py \
  -o /tmp/sample_ssd_relu6.uff

The generated UFF file is here: /tmp/sample_ssd_relu6.uff.

Step 3: Compile the custom object detector application

This will build the nvdsinfer_custom_impl sample that comes with the SDK.

cd /opt/nvidia/deepstream/deepstream-4.0/sources/objectDetector_SSD
CUDA_VER=10.0 make -C nvdsinfer_custom_impl

We need to copy the UFF MetaGraph along with the label names file inside this folder.

cp /usr/src/tensorrt/data/ssd/ssd_coco_labels.txt .
cp /tmp/sample_ssd_relu6.uff .

Step 4: Edit the application configuration file to use your camera

Located in the same directory, the file deepstream_app_config_ssd.txt contains information about the DeepStream pipeline components, including the input source. The original example has been configured to use a static file as source.

[source0]
enable=1
#Type - 1=CameraV4L2 2=URI 3=MultiURI
type=3
num-sources=1
uri=file://../../samples/streams/sample_1080p_h264.mp4
gpu-id=0
cudadec-memtype=0

If you want to use a USB camera, make a copy of this file and change the above section to:

[source0]
enable=1
#Type - 1=CameraV4L2 2=URI 3=MultiURI
type=1
camera-width=1280
camera-height=720
camera-fps-n=30
camera-fps-d=1
camera-v4l2-dev-node=1

Save this as deepstream_app_config_ssd_USB.txt.

The TX2 comes with an on-board CSI camera. If you want to use the embedded CSI camera, change the source to:

[source0]
enable=1
#Type - 1=CameraV4L2 2=URI 3=MultiURI 4=RTSP 5=CSI
type=5
camera-width=1280
camera-height=720
camera-fps-n=30
camera-fps-d=1

Save this file as deepstream_app_config_ssd_CSI.txt.

Both cameras are configured to run at 30 FPS with 1280x720 resolution.

Execute the application

To execute the Object Detection application using the USB camera, run:

deepstream-app -c deepstream_app_config_ssd_USB.txt
DeepStream detector @16 FPS with the USB Camera

The application prints the FPS in the terminal: it's ~16 FPS. This result is very similar to the TensorRT Python implementation, where we had achieved 15 FPS in a simple Python while loop.

So what's the fuss about DeepStream?

On closer look, we can tell there is a substantial difference. The tegrastats output shows a semi-utilized GPU (~50%) and an under-utilized CPU (~25%).


The USB camera throughput is the obvious bottleneck in this pipeline. The Jetson TX2 development kit comes with an on-board 5 MP fixed-focus MIPI CSI camera.

The Camera Serial Interface (CSI) is a specification of the Mobile Industry Processor Interface (MIPI) Alliance. It defines an interface between a camera and a host processor. This means that the CSI camera can move data into the GPU faster than the USB port can.

Let's try the CSI camera.

deepstream-app -c deepstream_app_config_ssd_CSI.txt
DeepStream detector @24 FPS with the CSI Camera

The average performance now is ~24 FPS. Note that the theoretical maximum we can get is 30 FPS, since this is the camera frame rate.

We can see an obvious increase of the system utilization:


Let's try running both applications side by side:

~12 FPS for each application

The GPU now is operating at full capacity.


The observed frame rates are ~12 FPS for each application. Apparently, the maximum capacity of this GPU is ~24 inferences per second for this setup.

Note: NVIDIA advertises that a DeepStream ResNet-based object detector application can concurrently handle 14 independent 1080p 30fps video streams on a TX2. Here we see that this is far from true when using an industry-standard detection model like SSD Inception V2 COCO.

What have we achieved: We've explored an optimized streaming Object Detection application pipeline using the DeepStream SDK and we've achieved the maximum detection throughput possible, as defined by the device's hardware limits.

]]>
<![CDATA[Three Threads to Perdido]]>https://havedatawilltrain.com/three-threads-to-perdido/5d321251435c041126c989a3Fri, 04 Oct 2019 18:56:00 GMT

Welcome stranger, I've been expecting you.

I know what brought you here, it's despair. I know what is out there,  I've seen it, and it's not pretty. Bad designs, broken code samples, no container definitions, missing dependency libraries, poor performance, you name it.

I was recently exploring ways to do real time object detection on my Nvidia Jetson TX2. The term real time here simply means low latency and high throughput. It's a very loosely defined term, but it's used here in contrast to the store-and-process pattern, where storage is used as an interim stage.


High Performance Object Detection on a Jetson TX2

Starting simple

We'll explore a simple program that detects human faces using the camera input and renders the camera input with the bounding boxes. This one is based on Haar Cascades and is one of the simplest ways to get started with Object Detection on the Edge. This code has no Jetson platform dependency, only a dependency on OpenCV.

I'm using a container-based remote development setup for all of my coding. This way you can experiment with all the code samples yourself without having to set up any runtime dependencies on your device.

Here is how I setup my device and my remote development environment with VSCode.

Start by cloning the example code. After cloning, you need to build and run the container that we'll be using to run our code.

Clone the example repo

git clone https://github.com/paloukari/jetson-detectors
cd jetson-detectors

To build and run the development container

sudo docker build . -f ./docker/Dockerfile.cpu -t object-detection-cpu
sudo docker run --rm --runtime nvidia --privileged -ti -e DISPLAY=$DISPLAY -v "$PWD":/src -p 32001:22 object-detection-cpu

The --privileged flag is required for accessing all the devices. Alternatively, you can use the --device /dev/video0 flag.

Here's the code we'll be running. Simply open the cpudetector.py file in VSCode and hit F5, or just run python3 src/cpudetector.py. In both cases, you'll need to set up X forwarding. See Step 2: X forwarding on how to do this.

~23 FPS with OpenCV

We get about 23 FPS. Use tegrastats to see what's happening in the GPU:


We're interested in the GR3D_FREQ values. It's clear that this code runs only on the device CPUs, with more than 75% utilization per core and 0% GPU utilization.

Next up, we go deep

Haar Cascades are good, but how about detecting more things at once? For that, we need to use Deep Neural Networks. From now on, we will need to use another container to run the following code.

To build and run the GPU accelerated container

sudo docker build . -f ./docker/Dockerfile.gpu -t object-detection-gpu
sudo docker run --rm --runtime nvidia --privileged -ti -e DISPLAY=$DISPLAY -v "$PWD":/src -p 32001:22 object-detection-gpu
WARNING: This build takes a few hours to complete on a TX2. The main reason is that we build the Protobuf library from source to improve the model loading performance. To reduce the build time, you can build the same container on an x64 workstation.

In the first attempt, we'll be using the official TensorFlow pre-trained networks. The code we'll be running is here.

When you run python3 src/gpudetector.py --model-name ssd_inception_v2_coco, the code will try to download the specified model into the /models folder and start the object detection in a very similar fashion as before. The --model-name default value is ssd_inception_v2_coco, so you can omit it.

This model has been trained to detect 90 classes (you can see the details in the downloaded pipeline.config file). Our performance plummeted to ~8 FPS.

Run python3 src/gpudetector.py

~8 FPS with the TensorFlow SSD Inception V2 COCO model

What happened? We've started running the inference on the GPU, which now makes a single inference round trip take more time. We also now move a lot of data from the camera to RAM, and from there to the GPU. This has an obvious performance penalty.

We can start by optimizing the inference. We'll use the TensorRT optimization to speed up the inference. Run the same file as before, but now with the --trt-optimize flag. This flag will convert the specified TensorFlow model to a TensorRT one and save it to a local file for the next time.

Run python3 gpudetector.py --trt-optimize:

~15 FPS with TensorRT optimization

Better, but still far from perfect. We can tell by looking at the GPU utilization in the background: it periodically drops to 0%. This happens because the code executes sequentially. In other words, for each frame we have to wait to get the bits from the camera, create an in-memory copy, push it to the GPU, perform the inference, and render the original frame with the scoring results.

We can break down this sequential execution into an asynchronous parallel version: a dedicated thread for reading the data from the camera, one for running inference on the GPU, and one for rendering the results.
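The three-stage split described above can be sketched with standard library primitives; the dummy stages below stand in for camera capture, GPU inference, and rendering (this is an illustrative sketch, not the repo's actual code):

```python
import queue
import threading

def run_pipeline(num_frames: int) -> list:
    frames = queue.Queue(maxsize=4)   # camera -> inference hand-off
    results = queue.Queue()           # inference -> rendering hand-off
    rendered = []

    def capture():                    # stands in for the camera reader thread
        for i in range(num_frames):
            frames.put(i)
        frames.put(None)              # sentinel: end of stream

    def infer():                      # stands in for the GPU inference thread
        while (frame := frames.get()) is not None:
            results.put((frame, f"detections-for-{frame}"))
        results.put(None)

    def render():                     # stands in for the rendering thread
        while (item := results.get()) is not None:
            rendered.append(item)

    threads = [threading.Thread(target=t) for t in (capture, infer, render)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return rendered

print(len(run_pipeline(10)))
```

The bounded queue between capture and inference provides backpressure, and because each stage runs independently, a rendered result can lag slightly behind the live frame, which is exactly the trade-off discussed below.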

To test this version, run  python3 gpudetectorasync.py --trt-optimize

~40 FPS with async TensorRT

By parallelizing expensive calculations, we've achieved better performance than the first OpenCV example. The trade-off is that we've introduced a slight delay between the current frame and the corresponding inference result. To be precise, because the inference now runs on an independent thread, the observed FPS does not match the number of inferences per second.

What we have achieved: We've explored different ways of improving the performance of the typical pedagogic object detection while-loop.

Next, in this post, we'll explore how we can improve our detector even more by using the DeepStream SDK.

]]>
<![CDATA[Drama of entropy]]>https://havedatawilltrain.com/drama-of-entropy/5d8aad58bea92c00581d20fcMon, 30 Sep 2019 20:17:07 GMT

I used to mess up my development environment over time by installing all the different experimentation dependencies. This is why I decided to switch to Docker containers.

The Jetson Nano default image comes with Docker runtime pre-installed. To package your app and avoid polluting your device OS with various dependencies, you can use Docker containers as development environments.

In fact, using a container as a development environment is very similar to using a Jetson device directly for remote development. Here's my post on how to do the latter.


Remote Debugging inside a Container running on a Jetson Device

The NVIDIA Container Runtime on Jetson repo has great information on how to use GPU-accelerated containers on both the Jetson family and an x64 workstation.

Once you choose your base container, you'll need to set up a few more things.

You can either clone this repo, which contains all the code below, or start from scratch. The assumed file structure of all the following code is:

/keys: Here's where I keep my public ssh key.
/.vscode: Here's where I keep my debugging configuration.
/docker: Inside the docker folder I put the container definitions.
/src: Here's the application source code.

/keys

You will need to set up passwordless SSH between your host's VSCode and the container. To do this, you'll need to copy your SSH public key into the container's authorized_keys file. To get your public SSH key, run: cat ~/.ssh/id_rsa.pub. Copy the key value into the id_rsa.pub key file inside the /keys folder.

My host is a Windows 10 machine, and I generally make sure that my Windows and WSL keys are the same. To do this, I've copied my keys from ~/.ssh/ to /mnt/c/users/[YOUR USERNAME]/.ssh/. This way I get a convenient security symmetry.

/.vscode

Here's my launch.json and how to set up an X server for forwarding any GUI running in the container to your host (here's more information on how to set this up):

{    
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "program": "${file}",
            "console": "integratedTerminal", 
            "env":
            {
            "DISPLAY": "10.135.62.79:0.0" 
            }
        }
    ]
}
Replace the IP with your host's IP. I'm using the X410 X Server on Windows 10.

/docker

In this example, I've started from a bare minimum, the L4T base OS. This Dockerfile is self-explanatory.

FROM nvcr.io/nvidia/l4t-base:r32.2

ENV DEBIAN_FRONTEND=noninteractive

# Install Python3, Git and OpenCV
RUN apt-get update && apt-get --yes install openssh-server python3-dev python3-pip python3-opencv git
RUN pip3 install --upgrade pip

RUN pip3 install click

ENV LC_ALL C.UTF-8
ENV LANG C.UTF-8

# Set the WORKDIR
WORKDIR /src

ENTRYPOINT service ssh restart && bash

# Install the ssh public key - Remove this in a production deployment
COPY ./keys/id_rsa.pub /tmp/tmp.pub
RUN mkdir -p ~/.ssh && chmod 700 ~/.ssh && cat /tmp/tmp.pub >> ~/.ssh/authorized_keys && chmod 600 ~/.ssh/authorized_keys && rm -f /tmp/tmp.pub

/src

This is where I put all the application code. You can test that the X forwarding works by running xtest.py. You'll have to install the eog application in the container first by running apt-get install eog in a new VSCode terminal.

Here's the xtest.py code:

import subprocess
subprocess.run(["eog"])

Wiring everything up

On your Jetson device, run:

# Clone the example repo
git clone https://github.com/paloukari/jetson-detectors
cd jetson-detectors

# Build the dev container
sudo docker build . -f ./docker/Dockerfile.cpu -t object-detection-cpu

# Run the container
sudo docker run --rm --runtime nvidia --privileged -ti -e DISPLAY=$DISPLAY -v "$PWD":/src -p 32001:22 object-detection-cpu

In your host's VSCode, add this information in the Remote-SSH configuration file:

Host Nano
    User spyros
    HostName spyros-nano2
    IdentityFile ~/.ssh/id_rsa

Host NanoContainer
    User root
    HostName spyros-nano2
    IdentityFile ~/.ssh/id_rsa
    Port 32001

Now you should be able to connect to the NanoContainer Remote-SSH host. The last step is to install the Python VSCode extension on the remote host.

The VSCode extensions need to be installed on both machines in a Remote-SSH session

Open the xtest.py file and hit F5.

You need to install the eog app in the container.

What we achieved: We can now do remote development from a Windows host to a container running on a Jetson Nano, using VSCode.

]]>
<![CDATA[Got Nano, Wanna Code]]>https://havedatawilltrain.com/got-nano-will-code/5d7bf22d3694ea005ad2a5f8Wed, 25 Sep 2019 20:39:41 GMT

So, you just got your Jetson Nano and you're wondering how to get started? In this blog post you'll learn how to setup a remote development environment from your Windows 10 machine to a Jetson device.

Device Setup

First, let's start with the device initial setup. To prepare my device, I followed the official getting started guide from Nvidia. I used a 128 GB microSD and a power adapter to power up my device, along with the required jumper. I connected a Dell P2415Q monitor using an HDMI cable and my USB receiver for my Logitech Triathlon keyboard and mouse. My development laptop is a Windows 10 machine.

Desktop Sharing

The desktop sharing app is broken. Follow this article to fix it. You can also find instructions there on how to use Microsoft's Remote Desktop.

Development Environment

The latest trend in developer experience in the software industry is working from CLIs and code editors. I generally enjoy having all the moving parts together when I'm coding, or at least in close proximity.

Visual Studio Code succeeded in combining a cross-platform code editor, community-driven plugins, and a decent UI/UX for development. But what I really like about Visual Studio Code is the recent Remote Development capability, which lets you keep a local UI/UX experience while working on a remote host or container.

The other thing I like having in my development environment is idempotency: not having to deal with the host's dependencies. Containers let me keep multiple dependency configurations on the same host and, moreover, share code between different hosts. For this reason, every code sample I present in this blog includes the corresponding container.

Step 1: Remote Development on a Jetson Nano

Because the ARM64 architecture is not yet officially supported, to install Visual Studio Code on your Jetson Nano you'll have to use the community binaries for now.

Installing Visual Studio Code on a Jetson Nano

# Start an elevated session
sudo -s

# Install the community repo key
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 0CC3FD642696BFC8

# Run the installation script
. <( wget -O - https://code.headmelted.com/installers/apt.sh )

If all goes well, you should get this output:

Installation complete!

You can start code at any time by calling "code-oss" within a terminal.

A shortcut should also now be available in your desktop menus (depending on your distribution).

Next, you'll need to install Visual Studio Code Insiders on your host and add the Remote Development extension.

As a final step, you'll need to set up passwordless SSH between your host and the Nano. To do this, create an SSH public-private key pair and configure your device to trust your public key.

To set up your local SSH key on Windows, run:

Add-WindowsCapability -Online -Name OpenSSH.Client~~~~0.0.1.0
ssh-keygen -t rsa -b 4096 

The keys will be created here: %USERPROFILE%\.ssh

To copy your public key to the device:

SET REMOTEHOST=user@device
scp %USERPROFILE%\.ssh\id_rsa.pub %REMOTEHOST%:~/tmp.pub
ssh %REMOTEHOST% "mkdir -p ~/.ssh && chmod 700 ~/.ssh && cat ~/tmp.pub >> ~/.ssh/authorized_keys && chmod 600 ~/.ssh/authorized_keys && rm -f ~/tmp.pub"

Replace user@device with your own username and device hostname

In your host's VSCode, hit F1 and type Remote-SSH: Open Configuration File...

Add your configuration:

Host Nano
    User user
    HostName device
    IdentityFile ~/.ssh/id_rsa

Replace the values user and device

Connect to your Nano by clicking the green icon in the lower-left corner:


Congrats! Once connected, you can open a remote folder and open remote terminals directly in VSCode.

Step 2: X forwarding

What about applications that have a GUI? No worries, you can set up X forwarding from the Nano to your host. To do this, I'm using the X410 X server for Windows. After installing and running the server, make sure you allow public access.
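Before exporting DISPLAY, it can help to confirm that the host's X server is actually reachable from the Nano. Here's a minimal sketch (the helper name is an assumption; the port math follows the X11 convention that display :N listens on TCP port 6000 + N when TCP connections are enabled):

```python
import socket

def x_server_reachable(host, display=0, timeout=3):
    """Return True if something is listening on the X11 TCP port for the
    given display. Display :N maps to TCP port 6000 + N when the X server
    accepts TCP connections."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(timeout)
        # connect_ex returns 0 on success instead of raising
        return s.connect_ex((host, 6000 + display)) == 0

# Example (replace with your Windows host's IP):
# x_server_reachable("10.135.62.79")  # display :0 -> port 6000
```

If this returns False, check the X410 public-access setting and any firewall rules before debugging the DISPLAY variable itself.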


All that's left to do is configure X forwarding on your Nano device.

Open a terminal in VSCode (Ctrl+Shift+` on Windows) and run:

export DISPLAY=10.135.62.79:0.0

Make sure you replace the above IP with your host IP

To verify everything works, from the terminal run:

eog

If all goes well, you should see this window:


To automatically set the DISPLAY variable when debugging your app, you can modify your launch.json and set the variable there:

{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Python: Current File",
            "type": "python",
            "request": "launch",
            "program": "${file}",
            "console": "integratedTerminal",
            "env": {
                "DISPLAY": "10.135.62.79:0.0"
            }
        }
    ]
}
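If you'd rather not hard-code the host IP, the script itself can derive it from the SSH session, since sshd sets SSH_CONNECTION to "client_ip client_port server_ip server_port" and the first field is your host's IP. A sketch (the helper name and the :0.0 display suffix are assumptions):

```python
import os

def display_from_ssh(default="10.135.62.79:0.0"):
    """Build a DISPLAY value from the SSH client's IP, falling back to a default."""
    # SSH_CONNECTION looks like: "client_ip client_port server_ip server_port"
    conn = os.environ.get("SSH_CONNECTION", "")
    if conn:
        return conn.split()[0] + ":0.0"
    return default

os.environ["DISPLAY"] = display_from_ssh()
```

This only works when the process runs inside an SSH session, so the hard-coded launch.json value is still the simpler option for VSCode debugging.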

To test all this, create a new python file containing the following code:

# Launch eog through Python; the viewer window should open on your host.
import subprocess
subprocess.run(["eog"])

Now you can hit F5 and get the same development experience as on your local host, but running on a remote ARM machine!

Starting a remote debugging session on Jetson Nano

What we have achieved: We can now do remote development from a Windows host to a Jetson Nano, using VSCode.

Next, we'll set up the same remote environment, but in a Docker container running on the Jetson device.

]]>