Steve Spencer's Blog

Blogging on Azure Stuff

Processing data from IoT Hub in Azure Functions

If you have been following my previous posts (Part 1, Part 2, Part 3) you will know that I’m using an ESP8266 to send data to the Azure IoT hub. This post shows how to receive that data and store it in Azure Storage, and how to forward the data on to the Azure Service Bus.

I’m going to use Visual Studio and C# to write my function. If you are unfamiliar with Azure Functions, they let you set up bindings to a variety of Azure resources. These bindings make it easy to interface with those resources without writing a lot of boilerplate code. A trigger binding causes your function to run when something happens on the resource, and an output binding lets the function write data to a resource. For example, there are bindings for Blob and Table storage, Service Bus, timers, etc. We’re interested in the IoT hub binding. The IoT hub trigger fires when an event is sent to the underlying Event hub. You can also use an output binding to put messages into the IoT hub event stream. We’re going to use the Table storage and Service Bus output bindings.

To get started you need to create a new Function project in Visual Studio.

image

Select the IoT hub trigger, browse to the storage account you wish to use (for logging), and enter the name of the setting that will hold the IoT hub connection string.

image

This will generate an empty function with your preconfigured IoT hub trigger.

You need to add your IoT hub connection string to your settings file. Open local.settings.json and add a new line below the AzureWebJobs settings with the same name you entered in the dialog (ConnectionStringSetting in my example). Your connection string can be found in the Azure Portal.
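As a rough guide, the settings file might end up looking something like the sketch below. The connection string values are placeholders that you replace with your own, and ConnectionStringSetting is simply the name I typed into the dialog. The exact set of AzureWebJobs entries depends on the template version, so treat those two lines as an assumption rather than a requirement.

```json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "<your storage account connection string>",
    "AzureWebJobsDashboard": "<your storage account connection string>",
    "ConnectionStringSetting": "<your IoT hub connection string>"
  }
}
```

Any further settings used by other bindings go in the same Values section, one line per setting name.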

Navigate to your IoT hub, then click Shared Access Policies.

image

Select the policy you want to use to access the IoT hub and click the copy icon next to the primary key connection string.

image

You can run this in the Visual Studio debugger and when messages are sent to your IoT hub you should see a log appearing in the output window.

What I want to do is receive the temperature and humidity readings from my ESP8266 and store the data in Azure Storage so that we can process it later.

For that I need to use the Table storage output binding. Add the binding attribute to your function below the FunctionName attribute.

[return: Table("MyTable", Connection = "StorageConnectionAppSetting")]

Again, you will need to add the storage setting to your config file. Find your storage account in the Azure portal, click Access keys, then copy the key1 connection string and paste it into your config file.

image

To use the Azure Storage output binding you will need to create a class that represents the columns in your table.

image
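Since the class only appears as a screenshot, here is a minimal sketch of what it might look like, reconstructed from the properties the function sets later in this post. Treating every column as a string is an assumption based on how the values are assigned, as is using a plain class rather than deriving from TableEntity. As far as I know the Table output binding only needs PartitionKey and RowKey to be present.

```csharp
// A sketch of the table entity, reconstructed from the properties used in the function.
// Assumption: all columns are stored as strings, and a plain class (POCO) is sufficient.
public class TempHumidityIoTTableEntity
{
    // Required by Table storage: together these uniquely identify a row.
    public string PartitionKey { get; set; }
    public string RowKey { get; set; }

    // The device that sent the reading.
    public string DeviceId { get; set; }

    // The sensor values, kept as the strings sent by the device.
    public string Temperature { get; set; }
    public string Humidity { get; set; }

    // When the message was enqueued, stored as a round-trip ("O") formatted string.
    public string DateMeasured { get; set; }
}
```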

I included a device id so that I can identify which device the reading was associated with. You will need to change the return type of your function to TempHumidityIoTTableEntity and then add the code to extract the data from the message.

Firstly, I changed the Python code on my ESP8266 to send the data as JSON so we can process it more easily. I’ve also added a message identifier so that we can send different messages from the ESP8266 and process them differently.

sensor.measure()
dataDict = {'partitionKey': 'r',
            'rowkey': 'recneptiot' + str(utime.ticks_ms()),
            'message': 'temphumidity',
            'temperature': str(sensor.temperature()),
            'humidity': str(sensor.humidity())}
mqtt.publish(sendTopic, ujson.dumps(dataDict), True)

That means we can deserialise the IoT Hub message into something we can easily access. The whole function is below:

[FunctionName("Function1")]
[return: Table("yourtablename", Connection = "StorageConnectionAppSetting")]
public static TempHumidityIoTTableEntity Run([IoTHubTrigger("messages/events", Connection = "ConnectionStringSetting")]EventData message, TraceWriter log)
{
    // The message body is the JSON string published by the ESP8266.
    var messageAsJson = Encoding.UTF8.GetString(message.GetBytes());
    log.Info($"C# IoT Hub trigger function processed a message: {messageAsJson}");

    var data = JsonConvert.DeserializeObject<Dictionary<string, string>>(messageAsJson);

    // IoT hub adds the id of the sending device as a system property.
    var deviceid = message.SystemProperties["iothub-connection-device-id"];

    // The returned entity is written to Table storage by the output binding.
    return new TempHumidityIoTTableEntity
    {
        PartitionKey = deviceid.ToString(),
        RowKey = $"{deviceid}{message.EnqueuedTimeUtc.Ticks}",
        DeviceId = deviceid.ToString(),
        Humidity = data.ContainsKey("humidity") ? data["humidity"] : "",
        Temperature = data.ContainsKey("temperature") ? data["temperature"] : "",
        DateMeasured = message.EnqueuedTimeUtc.ToString("O")
    };
}

Providing your config is correct you should be able to run this in the Visual Studio debugger and view your data in Table Storage:

image

I mentioned at the start that I wanted to pass some messages on to the Azure Service Bus. For example, we may want to do something if the humidity goes above 60 percent. In this example we could add a HighHumidity message to Service Bus for some other service or function to respond to. We’ll send the message as a JSON string so that we can action it later in a different service. You can easily add a Service Bus output binding to your function. However, the binding documentation shows it as a return value, and our return value is already used by the Table binding. There is an alternative binding that lets you set the message contents through a string out parameter. This can be used as follows:

[FunctionName("Function1")]
[return: Table("yourtablename", Connection = "StorageConnectionAppSetting")]
public static TempHumidityIoTTableEntity Run([IoTHubTrigger("messages/events", Connection = "ConnectionStringSetting")]EventData message,
    [ServiceBus("yourQueueOrTopicName", Connection = "ServiceBusConnectionSetting", EntityType = EntityType.Topic)]out string queueMessage,
    TraceWriter log)
{
    var messageAsJson = Encoding.UTF8.GetString(message.GetBytes());
    log.Info($"C# IoT Hub trigger function processed a message: {messageAsJson}");

    var data = JsonConvert.DeserializeObject<Dictionary<string, string>>(messageAsJson);

    var deviceid = message.SystemProperties["iothub-connection-device-id"];

    // Leaving the out parameter as null means no Service Bus message is sent.
    queueMessage = null;
    if (data.ContainsKey("humidity"))
    {
        int humidity = int.Parse(data["humidity"]);

        if (humidity > 60)
        {
            // Build the alert as a JSON string for another service to pick up.
            Dictionary<string, string> overHumidityThresholdMessage = new Dictionary<string, string>
            {
                { "deviceId", deviceid.ToString() },
                { "humidity", humidity.ToString() },
                { "message", "HighHumidityThreshold" }
            };
            queueMessage = JsonConvert.SerializeObject(overHumidityThresholdMessage);
        }
    }

    // Every reading is still written to Table storage via the return value.
    return new TempHumidityIoTTableEntity
    {
        PartitionKey = deviceid.ToString(),
        RowKey = $"{deviceid}{message.EnqueuedTimeUtc.Ticks}",
        DeviceId = deviceid.ToString(),
        Humidity = data.ContainsKey("humidity") ? data["humidity"] : "",
        Temperature = data.ContainsKey("temperature") ? data["temperature"] : "",
        DateMeasured = message.EnqueuedTimeUtc.ToString("O")
    };
}
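One thing to watch in the function above is that int.Parse throws a FormatException if the device sends a value such as "61.5", which may happen depending on the sensor you use. The sketch below shows one possible way to make the threshold check more tolerant. HumidityCheck and IsOverThreshold are hypothetical names of my own, not part of the Functions SDK, and parsing the value as a double is an assumption about what the device might send.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

public static class HumidityCheck
{
    // Hypothetical helper: returns true only when a "humidity" value is present,
    // parses as a number, and is greater than the threshold.
    public static bool IsOverThreshold(IDictionary<string, string> data, double threshold)
    {
        if (data == null || !data.TryGetValue("humidity", out var raw))
        {
            return false;
        }

        // InvariantCulture so "61.5" parses the same way whatever the server locale is.
        return double.TryParse(raw, NumberStyles.Float, CultureInfo.InvariantCulture, out var humidity)
            && humidity > threshold;
    }

    public static void Main()
    {
        var reading = new Dictionary<string, string> { { "humidity", "61.5" } };
        Console.WriteLine(IsOverThreshold(reading, 60));
    }
}
```

With a helper like this, a missing or malformed humidity value results in no Service Bus message instead of a failed function execution.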

We now have a function that writes the device temperature and humidity readings to Table storage and then sends a message to a Service Bus topic if the humidity goes above a threshold value.