In IoT, a very common coding pattern is an endless loop that does some periodic work and emits telemetry based on the result. In the example from the previous post, we used the EmitTelemetryMessagesAsync method to simulate a temperature sensor. In real life, this function would read the value of an actual hardware temperature sensor and send out a temperature telemetry event. From now on, we will call this pattern the telemetry pump.
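As a point of reference, a minimal telemetry pump can be sketched as an async loop: poll, emit, wait, repeat until cancelled. This is only an illustration, not the previous post's code; `readSensor` and `emit` are hypothetical delegates standing in for real hardware access and a real cloud client.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TelemetryPump
{
    // Minimal telemetry pump sketch. readSensor and emit are hypothetical
    // stand-ins for a real sensor driver and a real cloud client.
    public static async Task RunAsync(
        Func<double> readSensor,
        Action<double> emit,
        TimeSpan period,
        CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            double value = readSensor();   // the periodic work
            emit(value);                   // telemetry based on the result

            try
            {
                // Wait for the next period; cancellation interrupts the wait
                await Task.Delay(period, token);
            }
            catch (OperationCanceledException)
            {
                break;                     // stop pumping once cancelled
            }
        }
    }
}
```

Calling `RunAsync(() => 21.5, v => Console.WriteLine(v), TimeSpan.FromSeconds(1), token)` would print one reading per second until `token` is cancelled.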

The pump pattern itself is very old. In fact, every Windows application has a similar pump (its message loop), but for UI events instead of telemetry events: it dispatches OS events to your application. Similarly, in the IoT context, a telemetry pump acts as the dispatching interface between the hardware sensors and the cloud.

Publisher/Subscriber

In an ideal world, a publisher/subscriber pattern based on hardware interrupts would be a more elegant approach. However, the hardware world is full of uncertainty: there is always some degree of noise when we measure any physical phenomenon (thermal noise, quantization error and, at the extreme, the limits set by Heisenberg's uncertainty principle). This means that consecutive measurements will jitter even when the measured quantity doesn't change (try measuring a voltage with a high-accuracy voltmeter). For this reason, most of the time we're forced to use periodic polling instead of change-driven updates, and the required data resolution dictates the minimum polling frequency.
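To make the jitter argument concrete, here is a small simulation, an illustration only and not a model of any real sensor: a constant 21.5 °C temperature is read through uniform noise of ±0.05 °C (both numbers are assumptions). A naive change-driven publisher that fires whenever the reading differs from the previous one fires on almost every sample, even though nothing physical has changed.

```csharp
using System;

public static class NoisySensorDemo
{
    // Counts how many "change" events a naive change-driven publisher would
    // emit for a physically constant quantity measured with additive noise.
    // The uniform noise model and its amplitude are illustrative assumptions.
    public static int CountChangeEvents(int samples, double trueValue,
        double noiseAmplitude, int seed)
    {
        var random = new Random(seed);
        double previous = double.NaN;
        int changes = 0;

        for (int i = 0; i < samples; i++)
        {
            // Uniform noise in [-noiseAmplitude, +noiseAmplitude)
            double reading = trueValue + (random.NextDouble() * 2 - 1) * noiseAmplitude;

            // A change-driven publisher fires whenever the reading differs
            if (!double.IsNaN(previous) && reading != previous)
                changes++;

            previous = reading;
        }
        return changes;
    }
}
```

For 100 samples, `CountChangeEvents(100, 21.5, 0.05, 42)` reports a change on (practically) every one of the 99 consecutive pairs, which is why a fixed polling rate is usually the saner choice.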

Timers

Many telemetry pump designs exist, depending on how deep in the technology stack we're working. The periodic nature of sensor polling often inspires people to use timers. Although timers may seem conceptually suitable and easy, they usually get you into trouble: there is no guarantee that the previous iteration has completed before the timer fires again. In the normal case, this is not a problem. For example, if we emit a telemetry message every second and each iteration (reading the sensor and sending the message) takes roughly 300 ms, each callback invocation completes well before the next period starts.

Graphically, this timeline looks like:

A timer-based periodic invocation timeline

But, as we briefly touched on in a previous post, there is no guarantee on how long an I/O operation will take. Say, for example, that because of a network glitch the t+1 callback invocation takes longer than the period to complete:

A timer-based periodic invocation timeline during a network glitch
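The overlap is easy to reproduce. The sketch below (an illustration, with `Thread.Sleep` standing in for a slow network send) runs a `System.Threading.Timer` whose callback deliberately outlasts the timer period and records how many callbacks were executing at the same time.

```csharp
using System;
using System.Threading;

public static class TimerOverlapDemo
{
    // Runs a System.Threading.Timer whose callback takes longer than the
    // timer period and returns the maximum number of callbacks observed
    // running at the same time.
    public static int MaxConcurrentCallbacks(TimeSpan period,
        TimeSpan callbackDuration, TimeSpan observeFor)
    {
        int running = 0;
        int maxRunning = 0;

        using (var timer = new Timer(_ =>
        {
            int now = Interlocked.Increment(ref running);

            // Lock-free "max": retry until maxRunning >= now
            int seen;
            while (now > (seen = Volatile.Read(ref maxRunning)) &&
                   Interlocked.CompareExchange(ref maxRunning, now, seen) != seen)
            {
            }

            Thread.Sleep(callbackDuration);   // simulates a slow network send
            Interlocked.Decrement(ref running);
        }, null, TimeSpan.Zero, period))
        {
            Thread.Sleep(observeFor);
        }

        return Volatile.Read(ref maxRunning);
    }
}
```

With a 100 ms period and a 350 ms callback, the timer keeps firing while earlier callbacks are still running, so the result is greater than 1 on a typical multi-core machine.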

The danger in this case is that on the cloud side we might receive messages out of order and need special care to determine their actual order (as we'll see next, we might have to do this either way). More importantly, because the same code now runs in parallel, it must be thread safe: every access to shared (heap) memory has to be synchronized, for example with semaphores.
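As a sketch of what that extra care can look like, the hypothetical `TelemetryBuffer` below guards shared state with a `SemaphoreSlim(1, 1)` (an async-friendly mutual-exclusion lock) and stamps each reading with a sequence number from `Interlocked.Increment`, so a receiver could sort messages back into call order. The class and the sequence-number scheme are our own assumptions; the original sample prescribes neither.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical buffer shared by overlapping timer callbacks
public class TelemetryBuffer
{
    // Shared (heap) state touched by every callback invocation
    private readonly List<(long Sequence, double Value)> pending =
        new List<(long Sequence, double Value)>();

    // SemaphoreSlim(1, 1) allows one caller at a time and supports await
    private readonly SemaphoreSlim gate = new SemaphoreSlim(1, 1);

    // Unique, increasing number so the receiver can restore call order
    private long sequence;

    public async Task AddAsync(double value)
    {
        long seq = Interlocked.Increment(ref sequence);

        await gate.WaitAsync();
        try
        {
            pending.Add((seq, value));
        }
        finally
        {
            gate.Release();
        }
    }

    public IReadOnlyList<(long Sequence, double Value)> Snapshot()
    {
        gate.Wait();
        try { return pending.ToArray(); }
        finally { gate.Release(); }
    }
}
```

Even when many callbacks call `AddAsync` concurrently, no update is lost and every entry gets a distinct sequence number.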

An even more dangerous case leads to thread pool exhaustion. This can happen when, instead of a short network glitch, we run into a network bottleneck that delays the completion of every callback invocation:

A timer-based periodic invocation timeline during a network bottleneck

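In this case, callbacks are started faster than they complete, so the number of in-flight invocations (and the threads and memory they hold) keeps growing until the program eventually fails.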

Task-timer

An alternative approach that mitigates these issues is to use chained tasks. Although this approach may initially seem more complex, in reality it is very straightforward. In C#, a Task can have a continuation Task, which is simply another task that starts right after its antecedent completes. By chaining together multiple callback invocation tasks and adding the necessary Task.Delay between them, we get the same periodic behavior as before:

A task-timer periodic invocation timeline
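The chaining mechanism on its own can be sketched with `ContinueWith` and `Unwrap`: the second callback is a continuation of a delay, which is itself a continuation of the first callback, so the two can never overlap. This is a minimal illustration of continuations, not the TaskTimer itself.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    // callback #1 -> delay -> callback #2, chained with ContinueWith.
    // Each step starts only after the previous one has completed.
    public static List<string> Run(TimeSpan delay)
    {
        var log = new List<string>();

        Task chain = Task.Run(() => log.Add("callback #1"))
            .ContinueWith(_ => Task.Delay(delay))   // yields Task<Task>
            .Unwrap()                               // completes when the delay completes
            .ContinueWith(_ => log.Add("callback #2"));

        chain.Wait();
        return log;
    }
}
```

`Unwrap` is needed because a continuation that returns a Task produces a `Task<Task>`; without it, callback #2 would start as soon as the delay had been *created* rather than when it finished.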

Going back to the previous network glitch scenario, the chained tasks never run concurrently, so they can't produce thread-safety issues or out-of-order messages; the glitch only stretches the period of that single iteration:

A task-timer periodic invocation timeline during a network glitch

The downside of this approach is that we lose strict polling periodicity: note how the delay in Callback #2 has shifted all the subsequent callbacks.
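A small worked example makes the trade-off visible. The helper below (an illustrative sketch; the durations are made up) computes callback start times in milliseconds for a fixed-rate timer, where start = i × period, and for chained tasks that wait only for what is left of the period, where the next start = previous start + max(period, duration).

```csharp
using System;
using System.Collections.Generic;

public static class StartTimes
{
    // Fixed-rate timer: invocation i starts at i * period, no matter
    // how long earlier callbacks are still running.
    public static List<double> FixedRateTimer(IReadOnlyList<double> durations, double period)
    {
        var starts = new List<double>();
        for (int i = 0; i < durations.Count; i++)
            starts.Add(i * period);
        return starts;
    }

    // Chained tasks that delay (period - elapsed) when a callback finishes
    // early: the next start is the previous start plus max(period, duration).
    public static List<double> ChainedTasks(IReadOnlyList<double> durations, double period)
    {
        var starts = new List<double>();
        double t = 0;
        foreach (double d in durations)
        {
            starts.Add(t);
            t += Math.Max(period, d);
        }
        return starts;
    }
}
```

With a 1000 ms period and callback durations of 300, 1500, 300 and 300 ms, the fixed-rate timer starts callbacks at 0, 1000, 2000 and 3000 ms, so callback #3 starts while callback #2 is still running (until 2500 ms). The chained version starts them at 0, 1000, 2500 and 3500 ms: no overlap, but every callback after the glitch is shifted by 500 ms.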

The code snippet below implements a task-timer as described above:

using System;
using System.Threading;
using System.Threading.Tasks;

public class TaskTimer
{
    CancellationTokenSource cancellationTokenSource;
    TimeSpan timerPeriod;
    Action onElapsedCallback;
    bool continueOnError;

    public TaskTimer(Action onElapsedCallback,
        TimeSpan timerPeriod,
        CancellationTokenSource cancellationTokenSource,
        bool continueOnError = true)
    {
        this.cancellationTokenSource = cancellationTokenSource;
        this.timerPeriod = timerPeriod;
        this.onElapsedCallback = onElapsedCallback;
        this.continueOnError = continueOnError;
    }

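    public void Start()
    {
        // The lambda captures a reference to the task itself, so that
        // Elapsed can chain the next iteration onto it
        Task elapsedTask = null;
        elapsedTask = new Task(() =>
        {
            Elapsed(elapsedTask, cancellationTokenSource);
        }, cancellationTokenSource.Token);

        HandleError(elapsedTask);

        elapsedTask.Start();
    }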

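    private void Elapsed(Task task, object objParam)
    {
        // Remember when this iteration started
        var start = DateTime.Now;
        var cancellationTokenSource = (CancellationTokenSource)objParam;

        // Stop chaining once a cancellation has been requested
        if (cancellationTokenSource.Token.IsCancellationRequested)
        {
            Console.WriteLine("TaskTimer: A cancellation has been requested.");
            return;
        }

        // Do the periodic work (e.g. read the sensor and send telemetry)
        onElapsedCallback();

        // Wait only for whatever is left of the period, if anything
        var delay = timerPeriod - (DateTime.Now - start);
        if (delay.Ticks > 0)
        {
            task = Task.Delay(delay);
        }

        // Chain the next iteration and watch it for errors
        HandleError(task.ContinueWith(Elapsed, cancellationTokenSource));
    }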

    private void HandleError(Task task)
    {
        // Runs only if the callback threw an exception
        task.ContinueWith((faultedTask) =>
        {
            Console.WriteLine(
                $"Exception when running timer callback: {faultedTask.Exception}");
            if (!continueOnError)
            {
                cancellationTokenSource.Cancel();
            }
            else
            {
                // Resume the chain and keep watching it for further errors
                HandleError(task.ContinueWith(Elapsed, cancellationTokenSource));
            }
        }, TaskContinuationOptions.OnlyOnFaulted);
    }
}

A few notes about this TaskTimer:

The endless task chaining is achieved by creating a Task whose body calls the Elapsed method (the wrapper around our callback), passing it a reference to the task itself, which the lambda captures:

Task elapsedTask = null;
elapsedTask = new Task(() =>
{
    Elapsed(elapsedTask, cancellationTokenSource);
}, cancellationTokenSource.Token);

Next, inside the Elapsed method, we chain a continuation onto the task received as an argument, using Elapsed itself as the continuation:

task.ContinueWith(Elapsed, cancellationTokenSource);

Between consecutive iterations, a delay is added only when the last iteration finished early; it equals the configured period minus the duration of that iteration:

var delay = timerPeriod - (DateTime.Now - start);
if (delay.Ticks > 0)
{
    task = Task.Delay(delay);
}

The task chain stops as soon as the passed CancellationTokenSource signals a cancellation:

if (cancellationTokenSource.Token.IsCancellationRequested)
{
    Console.WriteLine("TaskTimer: A cancellation has been requested.");
    return;
}

Finally, if the callback throws an exception, the task-timer logs it and then either triggers a cancellation (when continueOnError is false) or resumes the chain (the default):

private void HandleError(Task task)
{
    // Runs only if the callback threw an exception
    task.ContinueWith((faultedTask) =>
    {
        Console.WriteLine(
            $"Exception when running timer callback: {faultedTask.Exception}");
        if (!continueOnError)
        {
            cancellationTokenSource.Cancel();
        }
        else
        {
            // Resume the chain and keep watching it for further errors
            HandleError(task.ContinueWith(Elapsed, cancellationTokenSource));
        }
    }, TaskContinuationOptions.OnlyOnFaulted);
}

Let's now see how our sample temperature module evolves using this TaskTimer:

using System;
using System.Runtime.Loader;
using System.Threading;
using System.Threading.Tasks;

namespace Example4
{
    class Program
    {
        // This is the program entry point
        static async Task Main(string[] args)
        {
            // Create the cancellation token source
            var cancellationTokenSource = new CancellationTokenSource();
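            // Cancel when the default load context unloads (e.g. on process shutdown)
            AssemblyLoadContext.Default.Unloading +=
                (context) => cancellationTokenSource.Cancel();

            // Cancel on Ctrl+C, without letting the runtime terminate the process
            Console.CancelKeyPress +=
                (sender, args) =>
                {
                    Console.WriteLine("Ctrl+C detected.");
                    args.Cancel = true;
                    cancellationTokenSource.Cancel();
                };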

            // Register the Reset command callback 
            await RegisterCommandCallbackAsync("Reset",
                OnReset,
                cancellationTokenSource.Token);

            // Create and start the TaskTimer
            TaskTimer taskTimer = new TaskTimer(
                EmitTelemetryMessage,
                TimeSpan.FromSeconds(1), cancellationTokenSource);

            // A non-blocking call to start the task-timer
            taskTimer.Start();

            // Wait until the app unloads or gets cancelled
            await WhenCancelled(cancellationTokenSource.Token);

            // Let the other threads drain
            Console.WriteLine("Waiting for 2 seconds..");
            await Task.Delay(TimeSpan.FromSeconds(2));

            Console.WriteLine("Exiting..");
        }

        // Returns a task that completes when the token is cancelled
        public static Task WhenCancelled(CancellationToken cancellationToken)
        {
            var taskCompletionSource = new TaskCompletionSource<bool>();
            cancellationToken.Register(
                s => ((TaskCompletionSource<bool>)s).SetResult(true),
                taskCompletionSource);
            return taskCompletionSource.Task;
        }

        private static async Task RegisterCommandCallbackAsync(string command,
            Action callback,
            CancellationToken cancellationToken)
        {
            // Perform the command registration
            // Code omitted
            return;
        }

        // A method exposed for RPC
        static void OnReset()
        {
            // Perform a temperature sensor reset
            // Code omitted
        }

        // Emit telemetry message
        static void EmitTelemetryMessage()
        {
            Console.WriteLine("Sending telemetry message..");
        }
    }
}

Arguably, this version is more readable, and at the same time it avoids the overlapping-callback pitfalls described above. Running this version produces output similar to the previous version's.

So far, we've been using the Console.WriteLine function to print messages, and we've avoided reading any application configuration. In the next post, we'll examine the best approaches to application configuration.