Steve Spencer's Blog

Blogging on Azure Stuff

Processing data from IoT Hub in Azure Functions

If you have been following my previous posts (Part 1, Part 2, Part 3) you will know that I’m using an ESP8266 to send data to the Azure IoT Hub. This post shows how to receive that data and store it in Azure Storage, and how to forward it on to Azure Service Bus.

I’m going to use Visual Studio and C# to write my function. If you are unfamiliar with Azure Functions, you can set up bindings to a variety of Azure resources. These bindings make it easy to interface with those resources without writing a lot of boilerplate code. A trigger binding lets your function fire when something happens on the resource, and an output binding lets it write data to the resource. For example, there are bindings for Blob and Table storage, Service Bus, timers etc. We’re interested in the IoT Hub binding. The IoT Hub trigger fires when an event is sent to the underlying Event Hub. You can also use an output binding to put messages into the IoT Hub event stream. We’re going to use the Table storage and Service Bus output bindings.

To get started you need to create a new Function project in Visual Studio.

Select IoT hub trigger and browse to a storage account you wish to use (for logging) plus add in the setting name you want to use to store the IoT hub connection string.

This will generate your empty function with your preconfigured IoT hub trigger.

You need to add your IoT hub connection string to your settings file. Open local.settings.json and add a new line below the AzureWebJobs settings with the same name you entered in the dialog (ConnectionStringSetting in my example). Your connection string can be found in the Azure Portal.

Navigate to your IoT hub, then click Shared Access Policies

Select the policy you want to use to access the IoT hub and click the copy icon next to the primary key connection string.

You can run this in the Visual Studio debugger and when messages are sent to your IoT hub you should see a log appearing in the output window.

What I want to do is to receive the temperature and humidity readings from my ESP 8266 and store the data in Azure storage so that we can process it later.

For that I need to use the Table storage output binding. Add the binding attribute to your function below the FunctionName attribute.

[return: Table("MyTable", Connection = "StorageConnectionAppSetting")]

Again, you will need to add the storage setting to your config file. Find your storage account in the Azure portal, click Access keys, then copy the key1 connection string and paste it into your config file.

To use the Azure Storage output binding you will need to create a class that represents the columns in your table.

I included a device id so that I can identify which device the reading was associated with. You will need to change the return type of your function to be TempHumidityIoTTableEntity, then add the code to extract the data from the message.
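The class itself was shown as a screenshot that is not reproduced here, so the following is only a sketch reconstructed from the properties that the function later in this post assigns. It assumes every column is stored as a string; the original class may well have differed, for example by deriving from a table entity base class.

```csharp
// Sketch of the table row type, inferred from the properties the function sets.
// Storing every column as a string is an assumption, not the original class.
public class TempHumidityIoTTableEntity
{
    // PartitionKey and RowKey are the two key columns every Table storage row needs.
    public string PartitionKey { get; set; }
    public string RowKey { get; set; }

    // Columns holding the reading itself.
    public string DeviceId { get; set; }
    public string Temperature { get; set; }
    public string Humidity { get; set; }
    public string DateMeasured { get; set; }
}
```

Keeping the values as strings matches what the device sends, at the cost of having to parse them again when you query the table later.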

Firstly, I changed the Python code on my ESP8266 to send the data as JSON so we can process it more easily. I’ve also added a message identifier so that we can send different messages from the ESP8266 and process them differently.

sensor.measure()
dataDict = {'partitionKey': 'r',
      'rowkey':'recneptiot'+str(utime.ticks_ms()),
      'message':'temphumidity',
      'temperature':str(sensor.temperature()),
      'humidity': str(sensor.humidity())}
mqtt.publish(sendTopic,ujson.dumps(dataDict),True)

That means we can deserialise the IoT Hub message into something we can easily access. The whole function is below:

[FunctionName("Function1")]
[return: Table("yourtablename", Connection = "StorageConnectionAppSetting")]
public static TempHumidityIoTTableEntity Run([IoTHubTrigger("messages/events", Connection = "ConnectionStringSetting")]EventData message, TraceWriter log)
{
    var messageAsJson = Encoding.UTF8.GetString(message.GetBytes());
    log.Info($"C# IoT Hub trigger function processed a message: {messageAsJson}");

    var data = JsonConvert.DeserializeObject<Dictionary<string, string>>(messageAsJson);

    var deviceid = message.SystemProperties["iothub-connection-device-id"];

    return new TempHumidityIoTTableEntity
    {
        PartitionKey = deviceid.ToString(),
        RowKey = $"{deviceid}{message.EnqueuedTimeUtc.Ticks}",
        DeviceId = deviceid.ToString(),
        Humidity = data.ContainsKey("humidity") ? data["humidity"] : "",
        Temperature = data.ContainsKey("temperature") ? data["temperature"] : "",
        DateMeasured = message.EnqueuedTimeUtc.ToString("O")
    };
}

Providing your config is correct, you should be able to run this in the Visual Studio debugger and view your data in Table Storage.

I mentioned at the start that I wanted to pass some messages on to the Azure Service Bus. For example, we may want to do something if the humidity goes above 60 percent. In this example we could add a HighHumidity message to Service Bus for some other service or function to respond to. We’ll send the message as a JSON string so that we can action it later in a different service. You can easily add a Service Bus output binding to your function. However, the binding documentation shows it as the return value, and our return value is already used by the Table storage binding. There is an alternative binding that allows you to set the message contents through a string out parameter. This can be used as follows:

[FunctionName("Function1")]
[return: Table("yourtablename", Connection = "StorageConnectionAppSetting")]
public static TempHumidityIoTTableEntity Run([IoTHubTrigger("messages/events", Connection = "ConnectionStringSetting")]EventData message,
    [ServiceBus("yourQueueOrTopicName", Connection = "ServiceBusConnectionSetting", EntityType = EntityType.Topic)]out string queueMessage,
    TraceWriter log)
{
    var messageAsJson = Encoding.UTF8.GetString(message.GetBytes());
    log.Info($"C# IoT Hub trigger function processed a message: {messageAsJson}");

    var data = JsonConvert.DeserializeObject<Dictionary<string, string>>(messageAsJson);

    var deviceid = message.SystemProperties["iothub-connection-device-id"];

    // Leaving the out parameter as null means no Service Bus message is sent.
    queueMessage = null;

    // The device sends humidity as a string and some sensors report decimals,
    // so parse it as a double and skip the check if it is not a number.
    double humidity;
    if (data.ContainsKey("humidity") &&
        double.TryParse(data["humidity"], System.Globalization.NumberStyles.Float,
            System.Globalization.CultureInfo.InvariantCulture, out humidity))
    {
        if (humidity > 60)
        {
            Dictionary<string, string> overHumidityThresholdMessage = new Dictionary<string, string>
            {
                { "deviceId", deviceid.ToString() },
                { "humidity", data["humidity"] },
                { "message", "HighHumidityThreshold" }
            };
            queueMessage = JsonConvert.SerializeObject(overHumidityThresholdMessage);
        }
    }

    return new TempHumidityIoTTableEntity
    {
        PartitionKey = deviceid.ToString(),
        RowKey = $"{deviceid}{message.EnqueuedTimeUtc.Ticks}",
        DeviceId = deviceid.ToString(),
        Humidity = data.ContainsKey("humidity") ? data["humidity"] : "",
        Temperature = data.ContainsKey("temperature") ? data["temperature"] : "",
        DateMeasured = message.EnqueuedTimeUtc.ToString("O")
    };
}

We now have a function that writes the device temperature and humidity readings into table storage and then sends a message to a Service Bus Topic if the humidity goes above a threshold value.

Azure Relay Hybrid Connections

If you are using the Azure App Service to host your web site and you want to connect to an on-premises server then there are a number of ways you can do this. One of the simplest is to use a hybrid connection. Hybrid connections have had a bit of a revamp lately: they used to require a BizTalk service to be created, but now you just need a Service Bus Relay. You can generally use the hybrid connection to communicate with your back-end server over TCP. You will need to install an agent called the Hybrid Connection Manager (HCM) on your server (or on a server that can reach the one you want to connect to). HCM makes an outbound connection to the Service Bus Relay over ports 80 and 443, so you are unlikely to need any firewall ports changing.

Hybrid connections are limited to a specific server name and port and your code in the Azure App Service will address the service as if it was in your local network, but will only be able to connect to the machine and port configured in the Hybrid Connection. Instructions for configuring your hybrid connection and HCM are here.

I have set up a number of the old BizTalk style hybrid connections and the new way is a lot easier to do. I ran into a few connectivity issues when I first created the Relay hybrid connection, and there were a few things that helped me to find out where the issues were. Firstly, the link I provided to configure the hybrid connection has a troubleshooting section which talks about tcpping. You can run this in the debug console in Azure and it will check whether your HCM is talking to the same relay as the one in your app service. To get to the debug console, log in to your Azure portal and select the app service you want to diagnose. Scroll down to Advanced Tools and click Go.

This will take you to your Kudu dashboard, where you can do a lot of nice things such as process explorer, diagnostic dumps, log streaming and the debug console.

The address will be https://[your app name].scm.azurewebsites.net/

The debug console will allow you to browse and edit files directly in your application without the need to ftp. This is really useful when trying to check configuration issues.

If you want to check connectivity from your server machine to the Azure Relay then you can use telnet. You might need to add the telnet feature to Windows by using:

dism /online /Enable-Feature /FeatureName:TelnetClient (From https://www.rootusers.com/how-to-enable-the-telnet-client-in-windows-10/)

In a command prompt, type:

telnet [your relay namespace].servicebus.windows.net 80 or

telnet [your relay namespace].servicebus.windows.net 443

Then a blank screen denotes successful connectivity (from: https://social.technet.microsoft.com/wiki/contents/articles/2055.troubleshooting-connectivity-issues-in-the-azure-appfabric-service-bus.aspx)

You can also use PowerShell to check:

Test-Netconnection -ComputerName [your relay namespace].servicebus.windows.net -Port 443
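If you would rather run the same check from code on the server, it can be sketched in C# as below. This is only an illustration using the standard TcpClient class; the class and method names are my own, and it tests nothing more than whether a TCP connection can be opened within a timeout.

```csharp
using System;
using System.Net.Sockets;
using System.Threading.Tasks;

public static class RelayConnectivityCheck
{
    // Returns true if a TCP connection to host:port is established within the timeout.
    public static async Task<bool> CanConnectAsync(string host, int port, TimeSpan timeout)
    {
        using (var client = new TcpClient())
        {
            try
            {
                Task connectTask = client.ConnectAsync(host, port);
                Task finished = await Task.WhenAny(connectTask, Task.Delay(timeout));
                if (finished != connectTask)
                {
                    return false; // the connection attempt did not finish in time
                }

                await connectTask; // rethrows any connection error
                return client.Connected;
            }
            catch (SocketException)
            {
                return false; // host unreachable, connection refused, DNS failure etc.
            }
        }
    }
}
```

It would be called with something like `await RelayConnectivityCheck.CanConnectAsync("[your relay namespace].servicebus.windows.net", 443, TimeSpan.FromSeconds(5))`.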

This all checks that you are connected to the relay. The final thing you need to check is whether you can actually resolve the DNS name of the target service from the server where HCM is running. This needs to be the host name of the server and not the fully qualified name. It also needs to match the machine name you configured in the hybrid connection. The easiest way for me to do this was to put the address of the WCF service I wanted to connect to into a browser on the machine running HCM.

Hopefully I’ve given you a few pointers to help identify why your hybrid connection does not connect.

How to emulate Azure Service Bus Topic Subscription Filtering in RabbitMQ

When creating a subscription to an Azure Service Bus Topic you can add a filter which will determine which messages to send to the subscription based upon the properties of the message.

This is done by passing a SqlFilter to the Create Subscription method

e.g.

if (!_NamespaceManager.SubscriptionExists(topic, subscription))
{
    if (!String.IsNullOrEmpty(filter))
    {
        SqlFilter strFilter = new SqlFilter(filter);
        await _NamespaceManager.CreateSubscriptionAsync(topic, subscription, strFilter);
        bSuccess = true;
    }
    else
    {
        await _NamespaceManager.CreateSubscriptionAsync(topic, subscription);
        bSuccess = true;
    }
}

Where filter is a string representing the properties that you want to filter on, e.g.

// Create a "LowMessages" filtered subscription.
SqlFilter lowMessagesFilter = new SqlFilter("MessageNumber <= 3");
namespaceManager.CreateSubscription("TestTopic","LowMessages",lowMessagesFilter);

Applying properties to messages makes it easier to configure multiple subscribers to sets of messages rather than having multiple subscribers that receive all the messages, providing you with a flexible approach to building your messaging applications.

Subscriptions are effectively individual queues that each subscriber uses to hold the messages that are relevant to the subscription.

When a message is pushed onto a Topic the Service Bus will look at all the subscriptions for the Topic and determine which messages are relevant to the subscription. If it is relevant then the subscription will receive the message into its queue. If there are no subscriptions capable of receiving the message then the message will be lost unless the topic is configured to throw an exception when there are no subscriptions to receive the message.

This approach is useful if most of the message data is stored in the properties (which are subject to a size limit of 64KB) and the body content is serialised to the same object (or the body object types are known).

Receiving messages on a Service Bus Subscription is as follows:

MessagingFactory messageFactory = MessagingFactory.CreateFromConnectionString(_ConnectionString);
SubscriptionClient client = messageFactory.CreateSubscriptionClient(topic, subscription);
message = await client.ReceiveAsync(new TimeSpan(0, 5, 0));
if (message != null)
{
    properties = message.Properties;
    body = message.GetBody<MyCustomBodyData>();
    if (processMessage != null)
    {
        // do some work
    }
    message.Complete();
}

Over the past few months I have been looking at RabbitMQ and trying to apply my Service Bus knowledge, as well as looking at the differences. Routing messages based upon the message properties rather than a routing key defined in the message is still applicable in the RabbitMQ world and RabbitMQ is configurable enough to work in this way. RabbitMQ requires more configuration than Service Bus but there is a mechanism called Header Exchange which can be used to route messages based upon the properties of the message.

The first thing to do is to create the exchange, then assign a queue to it based upon a set of filter criteria. I’ve been creating my exchanges with an alternate exchange so that messages that are not handled are received in a default queue. The code below creates the exchange and a queue that subscribes to messages where the ClientId property is “Client1” and the FileType property is “transaction”.

// Create Header Exchange with alternate-exchange
IDictionary<String, Object> args4 = new Dictionary<String, Object>();
args4.Add("alternate-exchange", alternateExchangeNameForHeaderExchange);
channel.ExchangeDeclare(HeaderExchangeName, "headers", true, false, args4);
channel.ExchangeDeclare(alternateExchangeNameForHeaderExchange, "fanout");

//Queue for Header Exchange Client1 & transaction
Dictionary<string, object> bindingArgs = new Dictionary<string, object>();
bindingArgs.Add("x-match", "all"); //any or all
bindingArgs.Add("ClientId", "Client1");
bindingArgs.Add("FileType", "transaction");
// The queue itself needs no extra arguments; the filter lives on the binding
channel.QueueDeclare(HeaderQueueName, true, false, false, null);
channel.QueueBind(HeaderQueueName, HeaderExchangeName, "", bindingArgs);

//queue for Header Exchange alternate exchange (all other)
channel.QueueDeclare(unroutedMessagesQueueNameForHeaderExchange, true, false, false, null);
channel.QueueBind(unroutedMessagesQueueNameForHeaderExchange, alternateExchangeNameForHeaderExchange, "");

This will set up the exchange and queue in RabbitMQ. You can now send a message to the exchange with the correct properties as follows:

IBasicProperties properties = channel.CreateBasicProperties();
properties.Headers = new Dictionary<string, object>();
properties.Headers.Add("ClientId", "Client1");
properties.Headers.Add("FileType", "transaction");

// A headers exchange routes on the headers above and ignores the routing key
string routingkey = "header.key";
var message = "Hello World";
var body = Encoding.UTF8.GetBytes(message);

// Publish to the headers exchange declared earlier
channel.BasicPublish(exchange: HeaderExchangeName,
                                routingKey: routingkey,
                                basicProperties: properties,
                                body: body);

Receiving messages from the queue is as follows:

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);
    var routingKey = ea.RoutingKey;
    Byte[] FileTypeBytes = (Byte[])ea.BasicProperties.Headers["FileType"];
    Byte[] ClientIDBytes = (Byte[])ea.BasicProperties.Headers["ClientId"];
    string FileType = System.Text.Encoding.ASCII.GetString(FileTypeBytes);
    string ClientID = System.Text.Encoding.ASCII.GetString(ClientIDBytes);
    Console.WriteLine(" [x] Received '{0}':'{1}' [{2}] [{3}]",
                        routingKey,
                        message,
                        ClientID,
                        FileType);
    EventingBasicConsumer c = model as EventingBasicConsumer;
    if (c != null)
    {
        c.Model.BasicAck(ea.DeliveryTag, false);
        Console.WriteLine(" [x] Received {0} rk {1} ex {2} ct {3}", message, ea.RoutingKey, ea.Exchange, ea.ConsumerTag);
    }
};
channel.BasicConsume(queue: queueProcessorBaseName + textBox1.Text,
                        noAck: false,
                        consumer: consumer);

Again an out of the box feature for Service Bus can also be implemented in RabbitMQ but it is much simpler to use in Service Bus. The use of properties to route messages offers a much more flexible approach but does require that the body of the messages are either not used or are understood by each consumer. Service Bus offers more flexibility as the query string can contain a variety of operators whereas RabbitMQ matches all or some of the header values and not a range.
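To make the "all or some of the header values" behaviour concrete, here is a small in-memory model of how a headers binding is compared with a message. It is a simplified sketch of the matching rule and not RabbitMQ's implementation; the class and method names are made up for illustration, and it only handles simple equality on values.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class HeaderMatchSketch
{
    // "x-match" = "all": every binding header must be present with an equal value.
    // "x-match" = "any": at least one binding header must be present with an equal value.
    public static bool Matches(IDictionary<string, object> bindingArgs,
                               IDictionary<string, object> messageHeaders)
    {
        // A binding without x-match behaves as "all".
        bool matchAll = true;
        object mode;
        if (bindingArgs.TryGetValue("x-match", out mode))
        {
            matchAll = !string.Equals(mode as string, "any", StringComparison.Ordinal);
        }

        // Keys starting with "x-" (such as x-match itself) are not compared.
        var criteria = bindingArgs
            .Where(kv => !kv.Key.StartsWith("x-", StringComparison.Ordinal))
            .ToList();

        Func<KeyValuePair<string, object>, bool> isMatch = kv =>
        {
            object value;
            return messageHeaders.TryGetValue(kv.Key, out value) && object.Equals(value, kv.Value);
        };

        return matchAll ? criteria.All(isMatch) : criteria.Any(isMatch);
    }
}
```

With the binding used earlier (ClientId = Client1, FileType = transaction, x-match = all), a message with FileType = invoice does not match; switching x-match to any lets it through because ClientId still matches. There is no equivalent of a SqlFilter expression such as MessageNumber <= 3.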

Dead Letters with Azure Service Bus and RabbitMQ

Firstly, what are dead letters?

When a message is received in a messaging system, something tries to process it. The message is normally understood by the system and can be processed. Sometimes, however, a message is not understood and can cause the receiving process to fail. The failure could be caught by the system and dealt with, but in extreme situations the message could cause the receiving process to crash. Messages that cannot be delivered or that fail when processed need to be removed from the queue and stored somewhere for later analysis. A message that fails in this way is called a dead letter and the location where these dead letters reside is called a dead letter queue. Queuing systems such as Azure Service Bus, RabbitMQ and others have mechanisms to handle this type of failure. Some systems handle them automatically and others require configuration.

Dead letter queues are the same as any other queue except that they contain dead letters. As they are queues they can be processed in the same way as the normal queues except that they have a different address to the normal queue. I’ve already discussed Service Bus Dead Letter Queue addressing in a previous post and this is still relevant today.

On RabbitMQ a Dead Letter queue is just another queue and is addressed in the same way as any other queue. The difference is in the way the Dead Letter queue is set up. Firstly you create a dead letter queue and then you add it to the queue you want to use it with.

To set up the dead letter queue, declare a “direct” exchange and bind a queue to it:

channel.ExchangeDeclare(DeadLetterExchangeName, "direct");
channel.QueueDeclare(DeadLetterQueueName, true, false, false, null);
channel.QueueBind(DeadLetterQueueName, DeadLetterExchangeName, DeadLetterRoutingKey, null);

I’ve used a dead letter routing key that is related to the queue I want to use it from with an additional “dl”. The routing key needs to be unique so that only messages you want to go to this specific dead letter queue will be delivered. e.g. Payments.Received.DL

Now we need to attach the dead letter queue to the correct queue, so when I created my new queue I needed to add the dead letter exchange and routing key to it:

IDictionary<String, Object> args3 = new Dictionary<String, Object>();
args3.Add("x-dead-letter-exchange", DeadLetterExchangeName);
args3.Add("x-dead-letter-routing-key", DeadLetterRoutingKey);
channel.QueueDeclare(queueName, true, false, false, args3);
channel.QueueBind(queueName, TopicName, paymentsReceivedRoutingKey);

Whilst there is a lot of flexibility with RabbitMQ, Dead Letter queues come out of the box with Azure Service Bus. Each topic and queue has one, and it is enabled by default. RabbitMQ, however, allows each topic subscription to have its own dead letter queue, which gives you finer-grained control over what to do with each type of failed message.

Now we have these dead letter queues and we know how to access them, how do we get messages into them?

In Azure Service Bus, there is a mechanism that will automatically put the message in the dead letter queue if delivery of the message fails 10 times (the default). However, you may wish to handle bad messages yourself in code without relying upon the system to do this for you. If a message is delivered 10 times before it is dead lettered, system resources are spent on every attempt, and these resources could be used to process valid messages. When a message is received and its validation fails, or there is an error whilst processing that you have caught, you can explicitly send the message to the dead letter queue by calling the DeadLetter method on the message object.

BrokeredMessage receivedMessage = subscriptionClient.EndReceive(result);

if (receivedMessage != null)
{
    Random rdm = new Random();
    int num = rdm.Next(100);
    Console.WriteLine("Random={0}", num);
    if (num < 10)
    {
        receivedMessage.DeadLetter("Randomly picked for deadletter", "error 123");
        Console.WriteLine("Deadlettered");
    }
    else
    {
        receivedMessage.Complete();
    }
}

My test code, above, randomly sends 10% of my messages to the dead letter queue.

In RabbitMQ a message will be published to the dead letter queue if one of the following occurs:

  1. The message is rejected by calling BasicNack or BasicReject
  2. The TTL (Time to Live) expires
  3. The queue length limit is exceeded

I’ve written a similar piece of test code for RabbitMQ

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body;
                       
    var message = Encoding.UTF8.GetString(body);
    Random random = new Random((int)DateTime.Now.Ticks);
    int randomNumber = random.Next(0, 100);
    if (randomNumber > 30)
    {
        channel.BasicAck(ea.DeliveryTag, false);
        Console.WriteLine(" [x] Received {0} rk {1} ex {2} ct {3}", message, ea.RoutingKey, ea.Exchange, ea.ConsumerTag);
    }
    else
    {
        if (randomNumber > 10)
        {
            channel.BasicNack(ea.DeliveryTag,false, true);
            Console.WriteLine(" [xxxxx] NAK {0} rk {1} ex {2} ct {3}", message, ea.RoutingKey, ea.Exchange, ea.ConsumerTag);
        }
        else
        {
            Console.WriteLine(" [xxxxx] DeadLetter {0} rk {1} ex {2} ct {3}", message, ea.RoutingKey, ea.Exchange, ea.ConsumerTag);
            channel.BasicNack(ea.DeliveryTag, false, false);
        }
    }
    Thread.Sleep(200);
};
channel.BasicConsume(queue: "hello",
                        noAck: false,
                        consumer: consumer);

If you look at the code you will see that there are two places where BasicNack is called and only one of them sends them to the dead letter queue. BasicNack takes 3 parameters and the last one is “requeue”. Setting requeue to true will put the message back on the originating queue whereas setting requeue to false will publish the message on the dead letter queue.
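The three outcomes in the test consumer can be pulled out into a small function, which makes the thresholds easier to see. This is just a restatement of the branching above with names I have made up for illustration; it does not talk to RabbitMQ.

```csharp
public enum DeliveryAction
{
    Ack,        // BasicAck
    Requeue,    // BasicNack with requeue = true
    DeadLetter  // BasicNack with requeue = false
}

public static class DeadLetterSketch
{
    // Mirrors the thresholds used in the test consumer:
    // above 30 is acknowledged, 11 to 30 is requeued, 0 to 10 is dead lettered.
    public static DeliveryAction Decide(int randomNumber)
    {
        if (randomNumber > 30)
        {
            return DeliveryAction.Ack;
        }

        if (randomNumber > 10)
        {
            return DeliveryAction.Requeue;
        }

        return DeliveryAction.DeadLetter;
    }
}
```

Because random.Next(0, 100) returns values from 0 to 99, each delivery has 69 chances in 100 of being acknowledged, 20 of being requeued and 11 of being dead lettered. Requeued messages are delivered again and go through the same draw.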

Both RabbitMQ and Service Bus have the dead letter queue concept and can be used in a similar way. Service Bus has one configured by default and has both an automatic and a manual mechanism for publishing messages to the dead letter queue. RabbitMQ requires more configuration and does not have the same automation for dead lettering, but it can be configured with more flexibility.

Unhandled Messages with Azure Service Bus and RabbitMQ

One of the requirements for our messaging system is to be able to build a system to process messages and either

  1. Have a default handler and then add custom handlers as and when they are required without needing to recode the main system.
  2. Be notified if a message is put onto a topic and there isn’t a process to handle the message.

In RabbitMQ this is relatively straightforward and requires creating an alternate-exchange, adding it as a property to your main exchange and then creating a queue to service the alternate-exchange:

 

IDictionary<String, Object> args2 = new Dictionary<String, Object>();
args2.Add("alternate-exchange", alternateExchangeName);
channel.ExchangeDeclare(mainExchangeName, "direct", false, false, args2);
channel.ExchangeDeclare(alternateExchangeName, "fanout");

// Adds a queue bound to the unhandled messages exchange
channel.QueueDeclare(unroutedMessagesQueueName, true, false, false, null);
channel.QueueBind(unroutedMessagesQueueName, alternateExchangeName, "");

Now when a message is published on the main exchange and there is no subscription to handle it, the message will automatically appear on the unrouted message queue. This solves both of the scenarios we were looking for.

I was interested, however, in understanding how to do this in Azure Service Bus. Whilst it is possible, it is not as straightforward and requires some code to set up. Topics can be configured to throw an exception if there is no subscription available to process the message when the message is sent, so when the topic is created it needs to be configured to enable this exception to be thrown.

NamespaceManager namespaceManager =
               NamespaceManager.CreateFromConnectionString(_ConnectionString);

TopicDescription td = new TopicDescription(topic)
{
          EnableFilteringMessagesBeforePublishing = true
};

await namespaceManager.CreateTopicAsync(td);

 

Now when a message is sent we need to handle the exception and do something with the message. This is the difference between RabbitMQ and Service Bus. In RabbitMQ the message will automatically end up in the unhandled message queue. In service bus we will need to actually add it to the unhandled message queue when the message is sent. This means that at each message producer, the code will need to handle the exception:

try
{
     client.Send(message);
}
catch(NoMatchingSubscriptionException ex)
{
     // Do something here to process the unhandled message
     // Probably put it on an unhandled message queue
}

Note, however, that if you had a subscription that was a catch all (for example logging all the messages) then unhandled messages would not appear as they are already being handled by the catch all subscription.
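Since every producer needs the same try/catch, it can be wrapped once and reused. The sketch below shows the shape of that wrapper using plain delegates so that it stands alone; the names are hypothetical. In a real producer the exception type would be NoMatchingSubscriptionException and the two delegates would wrap your topic and unhandled-queue clients.

```csharp
using System;

public static class UnhandledMessageSketch
{
    // Tries the normal send first. If it throws TNoSubscription, the message is
    // handed to the fallback instead. Returns true when the normal send succeeded.
    public static bool SendWithFallback<TNoSubscription>(
        string message,
        Action<string> sendToTopic,
        Action<string> sendToUnhandledQueue)
        where TNoSubscription : Exception
    {
        try
        {
            sendToTopic(message);
            return true;
        }
        catch (TNoSubscription)
        {
            // Nothing is subscribed to this message, so park it for later inspection.
            sendToUnhandledQueue(message);
            return false;
        }
    }
}
```

Any other exception from the send is left to propagate, so genuine failures are not silently redirected to the unhandled queue.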

Unlock The Door Demo Software on GitHub

If you attended my DDD East Anglia talk “A Raspberry Pi2, Azure ML and Project Oxford to unlock that door!”, where I integrated a Raspberry Pi running Windows 10 IoT Core with the Service Bus, Project Oxford for face recognition and a Windows Store App to take my picture and hopefully unlock my door (yes, I did bring a door with me), thanks for attending and for your nice comments.

I have started to put my code up on GitHub. The code for the Raspberry Pi is already there - https://github.com/sdspencer-mvp/RaspberryPi2-UnlockTheDoor. More will appear later as I tidy it up and remove all my config secrets.

I will be repeating this talk at Smart Devs in Hereford on 12 October 2015 and again at DDD North in Sunderland on 24 October 2015.

Raspberry Pi2, IoT Core and Azure Service Bus

Using Windows 10 IoT Core on a Raspberry Pi2 has a number of challenges, mainly due to the limitations of the universal app APIs and the lack of APIs that currently run on the platform. I specifically wanted to utilise Azure Service Bus Topics to send/receive messages on my Raspberry Pi2. After a bit of searching around I decided that the easiest way to achieve this was to use the Service Bus REST API. There are a number of samples included in the documentation:

Receiving a message: https://msdn.microsoft.com/en-us/library/azure/hh690923.aspx

Sending a message: https://msdn.microsoft.com/en-us/library/azure/hh690922.aspx

The full code for the sample uses WebClient but I needed to use HttpClient so I converted the samples accordingly.

[EDIT] The above links don't work anymore so I've published my code on GitHub https://github.com/sdspencer-mvp/RaspberryPi2-UnlockTheDoor/blob/master/UnlockTheDoor/MainPage.xaml.cs 

Sending a message to the Service Bus requires a POST, and receive and delete requires a DELETE. The following code shows how this was achieved using HttpClient:

// Sends a message to the given queue or topic using an HTTP POST request.
private async System.Threading.Tasks.Task SendMessage(string baseAddress, string queueTopicName, string token, string body, IDictionary<string, string> properties)
{
    string fullAddress = baseAddress + queueTopicName + "/messages" + "?timeout=60&api-version=2013-08";
    await SendViaHttp(token, body, properties, fullAddress, HttpMethod.Post);
}

// Receives and deletes the next message from the given subscription
// using an HTTP DELETE request.
private static async System.Threading.Tasks.Task<string> ReceiveAndDeleteMessageFromSubscription(string baseAddress, string topic, string subscription, string token, IDictionary<string, string> properties)
{
    string fullAddress = baseAddress + topic + "/Subscriptions/" + subscription + "/messages/head" + "?timeout=60";
    HttpResponseMessage response = await SendViaHttp(token, "", properties, fullAddress, HttpMethod.Delete);
    string content = "";
    if (response.IsSuccessStatusCode)
    {
        // we should have retrieved a message
        content = await response.Content.ReadAsStringAsync();
    }
    return content;
}

// Builds the request, adds the SAS token and message properties as headers and sends it.
private static async System.Threading.Tasks.Task<HttpResponseMessage> SendViaHttp(string token, string body, IDictionary<string, string> properties, string fullAddress, HttpMethod httpMethod)
{
    HttpClient webClient = new HttpClient();
    HttpRequestMessage request = new HttpRequestMessage()
    {
        RequestUri = new Uri(fullAddress),
        Method = httpMethod,
    };
    webClient.DefaultRequestHeaders.Add("Authorization", token);

    if (properties != null)
    {
        foreach (string property in properties.Keys)
        {
            request.Headers.Add(property, properties[property]);
        }
    }
    request.Content = new FormUrlEncodedContent(new[] { new KeyValuePair<string, string>("", body) });
    HttpResponseMessage response = await webClient.SendAsync(request);
    if (!response.IsSuccessStatusCode)
    {
        string error = string.Format("{0} : {1}", response.StatusCode, response.ReasonPhrase);
        throw new Exception(error);
    }
    return response;
}

 

There was an issue with the GetSASToken method as some of the encryption classes weren't supported on the Universal App so I converted it to the following:

private string GetSASToken(string baseAddress, string SASKeyName, string SASKeyValue)

{

    TimeSpan fromEpochStart = DateTime.UtcNow - new DateTime(1970, 1, 1);

    string expiry = Convert.ToString((int)fromEpochStart.TotalSeconds + 3600);

    string stringToSign = WebUtility.UrlEncode(baseAddress) + "\n" + expiry;

    string hash = HmacSha256(SASKeyValue, stringToSign);

    string sasToken = String.Format(CultureInfo.InvariantCulture, "SharedAccessSignature sr={0}&sig={1}&se={2}&skn={3}",

        WebUtility.UrlEncode(baseAddress), WebUtility.UrlEncode(hash), expiry, SASKeyName);

    return sasToken;

}

 

 

public string HmacSha256(string secretKey, string value)

{

    // Move strings to buffers.

    var key = CryptographicBuffer.ConvertStringToBinary(secretKey, BinaryStringEncoding.Utf8);

    var msg = CryptographicBuffer.ConvertStringToBinary(value, BinaryStringEncoding.Utf8);

 

    // Create HMAC.

    var objMacProv = MacAlgorithmProvider.OpenAlgorithm(MacAlgorithmNames.HmacSha256);

    var hash = objMacProv.CreateHash(key);

    hash.Append(msg);

    return CryptographicBuffer.EncodeToBase64String(hash.GetValueAndReset());

}
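For comparison, here is a minimal sketch of the same signing step on the full .NET Framework, assuming System.Security.Cryptography.HMACSHA256 is available (it is not what the Universal App code above uses). The class name SasTokenSketch and the BuildToken helper are illustrative names only, and the expiry is passed in so that the output is repeatable. Given the same key and string to sign it should produce the same signature as the HmacSha256 method above, since both treat the key and message as UTF-8.

```csharp
using System;
using System.Globalization;
using System.Net;
using System.Security.Cryptography;
using System.Text;

// Illustrative helper, not part of the original sample.
public static class SasTokenSketch
{
    // Base64-encoded HMAC-SHA256 of 'value', keyed with 'secretKey' (both read as UTF-8).
    public static string HmacSha256(string secretKey, string value)
    {
        using (var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(secretKey)))
        {
            byte[] hash = hmac.ComputeHash(Encoding.UTF8.GetBytes(value));
            return Convert.ToBase64String(hash);
        }
    }

    // Builds a token in the same format as GetSASToken.
    // 'expiryUnixSeconds' is seconds since 1970-01-01 UTC.
    public static string BuildToken(string resourceUri, string keyName, string key, long expiryUnixSeconds)
    {
        string encodedUri = WebUtility.UrlEncode(resourceUri);
        string expiry = expiryUnixSeconds.ToString(CultureInfo.InvariantCulture);

        // The string to sign is the encoded URI, a newline, then the expiry.
        string signature = HmacSha256(key, encodedUri + "\n" + expiry);

        return string.Format(CultureInfo.InvariantCulture,
            "SharedAccessSignature sr={0}&sig={1}&se={2}&skn={3}",
            encodedUri, WebUtility.UrlEncode(signature), expiry, keyName);
    }
}
```

Passing the expiry in rather than reading the clock inside the method makes the token easy to check in a unit test.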

 

This allowed me to send and receive messages on my Raspberry Pi2 using IoT core. I created the subscriptions for the topic using a separate app using the .NET SDK which is cheating I guess, but I’ll get around to converting it at some point.

 

In order to use this the following parameters are used:

 

SendMessage( BaseAddress, Username, Token, MessageBody, MessageProperties)

 

BaseAddress is “https://<yournamespace>.servicebus.windows.net/”

 

Token is the return value from the GetSASToken method, called with the same base address as above. The KeyName and Key are obtained from the connection string in the Azure portal, which is of the format

 

Endpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=<KeyName>;SharedAccessKey=<Key>.

 

MessageBody – This is the string value of the message body

 

MessageProperties is a Dictionary containing name/value pairs that will get added to the Request headers. For example, I set the following message properties when I press the door bell button on my Raspberry Pi2

 

Dictionary<string, string> properties = new Dictionary<string, string>();

properties.Add("Priority", "High");

properties.Add("MessageType", "Command");

properties.Add("Command", "BingBong");

 

These are added to the service bus message and allow me to have subscriptions that filter on Command message types as well as the specific command of BingBong
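The KeyName and Key passed to GetSASToken can be pulled out of the portal connection string rather than copied by hand. The following is a small sketch of one way to do that; the ConnectionStringSketch class and its Parse method are hypothetical names, not part of the original app. It splits each segment on the first "=" only, because Base64 keys often end in "=" padding.

```csharp
using System;
using System.Collections.Generic;

// Illustrative helper, not part of the original sample.
public static class ConnectionStringSketch
{
    // Splits "Name=Value;Name=Value" into a case-insensitive dictionary.
    public static IDictionary<string, string> Parse(string connectionString)
    {
        var parts = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);

        foreach (string segment in connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries))
        {
            // Split on the first '=' only, so '=' padding in the key survives.
            int separator = segment.IndexOf('=');
            if (separator <= 0)
            {
                continue; // skip segments with no name
            }

            string name = segment.Substring(0, separator).Trim();
            string value = segment.Substring(separator + 1).Trim();
            parts[name] = value;
        }

        return parts;
    }
}
```

With this in place, the values for the SASKeyName and SASKeyValue parameters would be the "SharedAccessKeyName" and "SharedAccessKey" entries of the returned dictionary.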

 

Receiving messages is a bit trickier as we need to create a separate task that is continually running. Once a message is received we need to get back to the main thread to execute the action for the message

await Task.Run(async () =>
{
    .
    .
    .
    string message = await ReceiveAndDeleteMessageFromSubscription(_BaseAddress, _TopicName, _SubscriptionName, token, null);

    if (message.Contains("Unlock"))
    {
        // Switch back to the UI thread before acting on the message
        await Windows.ApplicationModel.Core.CoreApplication.MainView.CoreWindow.Dispatcher.RunAsync(
            CoreDispatcherPriority.Normal,
            () =>
            {
                SwitchLED(false);
            });
    }
    .
    .
});

 

You may want to put a delay in this if receiving the messages causes the app to slow down due to the message loop hogging all the resources. There’s a default timeout in the call to SendAsync and this will automatically slow the thread down.
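As a rough sketch of what such a delay could look like, the loop below waits for a short period whenever a receive comes back empty and stops when a CancellationToken is signalled. ReceiveLoopSketch, RunAsync and the receive/handle delegates are illustrative names; in the app above, receive would wrap ReceiveAndDeleteMessageFromSubscription and handle would dispatch to the UI thread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Illustrative helper, not part of the original sample.
public static class ReceiveLoopSketch
{
    public static async Task RunAsync(
        Func<Task<string>> receive,   // returns "" when no message was waiting
        Action<string> handle,        // called once per received message
        TimeSpan idleDelay,           // pause after an empty receive
        CancellationToken cancel)
    {
        while (!cancel.IsCancellationRequested)
        {
            string message = await receive();

            if (!string.IsNullOrEmpty(message))
            {
                handle(message);
                continue; // there may be more messages, so poll again straight away
            }

            try
            {
                // Nothing arrived: back off so the loop does not hog resources.
                await Task.Delay(idleDelay, cancel);
            }
            catch (OperationCanceledException)
            {
                break; // cancelled while waiting
            }
        }
    }
}
```

Only pausing after an empty receive keeps the app responsive when messages are queued up, while still backing off when the subscription is idle.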

 

I now have a working Raspberry Pi2 that can send and receive messages to and from the Azure Service bus. I’ve created a test win forms app that sends messages to the Service bus, which allows me to control the Raspberry Pi2 remotely. The next phase is to build a workflow engine that hooks up to the service bus and allows me to automatically control the Raspberry Pi.

5 Tips for using Azure Web Jobs

1. Use public on the main program class. In order for web jobs to initialise correctly, the main class that contains the web jobs needs to be made public. Once this has been added the individual jobs can then be read and should be visible in the output when running locally.

clip_image001

2. In order to store and view the invocation details for each web job you need to configure AzureWebJobsDashboard in the configure tab of the website you have deployed the web job to, even if you have already configured this in your app.config file.

clip_image003

If this is not configured in the website then you will receive the following error when you try and view the web jobs dashboard

clip_image002

3. Debug using Visual Studio. One of the nice features of the web jobs SDK is the ability to run and debug the web job locally in Visual Studio. Following the Getting Started guide, you create a console application which you can debug in Visual Studio before deploying it to Azure

4. Use TextWriter for debugging. The Azure Web Jobs SDK (see the logging section) provides a mechanism to log out information that can be viewed through the Azure Web jobs dashboard. By adding a TextWriter as an input parameter to your web job method, you can use WriteLine to output the information you wish to log.
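A minimal sketch of tip 4 is shown below. The Functions class and ProcessMessage method are illustrative names, and the trigger attribute is deliberately left out so the snippet compiles without the SDK; in a real web job the first parameter would carry whichever trigger attribute your SDK version provides.

```csharp
using System.IO;

// Illustrative job class, not taken from a real project.
public static class Functions
{
    // In a real web job, 'message' would be decorated with a trigger
    // attribute from the Web Jobs SDK; the TextWriter is supplied by the SDK.
    public static void ProcessMessage(string message, TextWriter log)
    {
        log.WriteLine("Received message: {0}", message);

        // ... do the real work here ...

        log.WriteLine("Processed {0} characters", message.Length);
    }
}
```

Because the method only depends on TextWriter, it can also be called from a test with a StringWriter to check what would be logged.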

5. Make your Blob Triggers more efficient by triggering them using BlobOutput. The mechanism that the BlobInput trigger uses has a 10-20 minute lag before the trigger can fire, but each time BlobOutput is used it triggers a rescan for Blob input.

“There is an optimization where any blob written via a [BlobOutput] (as opposed to being written by some external source) will optimistically check for any matching [BlobInputs],” See How does [BlobInput] work?. Storage Queues and Service Bus topics and Queues are generally processed within seconds so if you can use a queue to trigger a BlobOutput then use this to trigger any subsequent BlobInputs

Azure Service Bus Event Hub Firewall Port

I’m investigating the Azure Service Bus Event Hub using the getting started tutorial and I didn’t seem to be able to receive any data. It turns out that our firewall was blocking an outbound port. After some investigation I found a post which hinted at a port for the on premise service bus. Our IT guys kindly enabled the outbound port 5671 and I now can receive data from the event hub.
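If you suspect a similar firewall issue, a quick way to check is to try opening a TCP connection to the port from the affected machine. The sketch below is an assumption about how you might do that; PortCheckSketch and CanConnectAsync are illustrative names, and the host you pass would be your own namespace (for example <yournamespace>.servicebus.windows.net on port 5671).

```csharp
using System;
using System.Net.Sockets;
using System.Threading.Tasks;

// Illustrative helper, not part of any SDK.
public static class PortCheckSketch
{
    // Returns true if a TCP connection to host:port opens within the timeout.
    public static async Task<bool> CanConnectAsync(string host, int port, TimeSpan timeout)
    {
        using (var client = new TcpClient())
        {
            try
            {
                Task connect = client.ConnectAsync(host, port);
                Task finished = await Task.WhenAny(connect, Task.Delay(timeout));

                if (finished != connect)
                {
                    return false; // timed out, which is typical of a silently dropped port
                }

                await connect; // rethrows if the connection was refused
                return client.Connected;
            }
            catch (SocketException)
            {
                return false; // refused or unreachable
            }
        }
    }
}
```

A successful connection only shows that the port is open; it does not prove that the event hub client will authenticate.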

For completeness the following site has details of the other firewall ports required for service bus : http://msdn.microsoft.com/en-us/library/ee732535.aspx

Internet of Things (IoT): Gadgeteer and Service Bus

Internet of Things seems to bring together two of my favourite topics: Gadgeteer and Service Bus. Whilst researching IoT I came across an article in MSDN magazine written by Clemens Vasters (http://msdn.microsoft.com/en-us/magazine/jj190807.aspx). This article is from July 2012 and things have moved on a little since then, but the fact that he has Gadgeteer talking to service bus meant that I had to give it a go myself. The first port of call was the previous article (http://msdn.microsoft.com/en-us/magazine/jj133819.aspx - note the link is wrong in the current article). This explains the architecture that the sample is based upon, using one service bus topic to send commands to the device and a different topic to allow the device to send data. There is also a provisioning service that allows the devices to be initialised with the correct configuration. This provisioning service also configures the service bus access control service (ACS) to allow each device to have its own security key. This means you can turn off individual devices using ACS.

Before you start take a look at the Service Bus Explorer as this is a useful tool when you are trying to diagnose why things aren’t working.

As I’m using a GHI Electronics Fez Spider main board I am using the .Net Micro-Framework 4.2. Upgrading the project to 4.2 produced a couple of errors which needed resolving. Firstly, you will need to change GetJoystickPosition to GetPosition; secondly, change ConvertBase.ToBase64String to Convert.ToBase64String. This allowed me to run the project on my Gadgeteer board. However, I kept getting a Bad Request error whenever I tried to call the provisioning service. I immediately assumed that my configuration was wrong, but after a bit of searching and then turning WCF tracing on I found that the service could not load the service bus assembly, so I removed and then re-added it to solve the problem. As I mentioned configuration, it’s probably a good idea to say what each of the settings in the provisioning service is used for:

sharedSignature: Go to https://manage.windowsazure.com/ and log in. Click on service bus and then select the service bus you are using. On the bottom menu click the Connection Information button. This will pop up a configuration window. There are two keys in here. The first is part of the connection string and is under the SAS section. Copy the connection string and find the key. This is the sharedSignature for this configuration setting.

servicebusNamespace: This is the name of the service bus as it appears in the management portal, i.e. sb://<servicebusNamespace>.servicebus.windows.net

managementKey: In the same connection information popup where you found the shared signature there is a section at the bottom labelled ACS. The managementKey is the Default Key

Microsoft.ServiceBus.ConnectionString: I used the connection string that appears in the SAS section of the connection information popup.

The other configuration you need to do is to change the url for the provisioning service. This is hard coded on the Gadgeteer board and is located in Program.cs, serverAddress variable in the ServiceBusApp project.

The provisioning service should be ready to go. However, I had problems connecting to the service from the Gadgeteer board as I kept receiving NotSupportedException each time I called GetRequestStream. This was due to an issue with the Ethernet configuration when trying to connect over https. This can be solved by updating the SSL seed using the Fez Config tool (https://www.ghielectronics.com/community/forum/topic?id=13927). This is done by clicking the Deployment (Advanced) button and then clicking Update SSL Seed.

image

Once complete I could then connect to the provisioning service. The provisioning service should only be called once per device and it is up to the device to store its configuration in a persistent store. This did not appear to be working on my device. Some of the settings were being stored but the topic urls were not. I changed their type from a Uri to a string and the persistence then seemed to work, so the device only went to provision once. Each time the provisioning service is called a new subscription is created and a new access control identity and rule are also created.

With all this fixed I could now send messages, but I could not see them. This was because I didn’t have a subscriber to the topic where the data was published. This is easily resolved by creating one, but it will only receive new messages. Any messages sent before the subscription is created will be lost.

The provisioning service also has a web page that allows you to send commands to each device. It will broadcast a message to all devices by putting a single message into the devices topic with the Broadcast property set to true. During provisioning the subscription that is created has a SQL Filter applied, so that the subscription only receives messages that are targeted specifically at the device or that are broadcast. The web page puts a message into the topic to tell the device to set its temperature to a specific value. The device should be listening for messages on its subscription and will act on the command once it is received.

The device never seemed to receive the message even though the Service Bus Explorer showed that the message was waiting in the subscription. Whenever we tried to connect to the subscription “Bad Request” was being returned. After investigation it turns out that the sample only ever sets the events topic uri and not the devices topic uri. When we try to retrieve the device commands we are trying to connect to the events topic, which is not a subscription. The sample needs modifying in the MessagingClient class of the Microsoft.ServiceBus.Micro project. I added an extra Uri to the constructor and modified the CreateReceiveRequest and CreateLockRequest methods to use this Uri.

The final thing I changed was the command that is sent from the web page and how it was received:

The sender code in Default.aspx.cs in the BackEndWebrole Project

deviceSender.Broadcast(new Dictionary<string, object> { { "Temperature", this.TextBox1.Text } }, "SetTemperature");

And the receiver code in Program.cs in the ServiceBusApp project:

switch (commandType)
{
    case "SetTemperature":
        if (cmd.Properties.Contains("Temperature"))
        {
            this.settings.TargetTemperature = double.Parse((string)cmd.Properties["Temperature"]);
            StoreSettings(this.settings);
        }
        break;
}

I now have a Gadgeteer device talking to the service bus with the ability to send data and receive commands. My next steps are to create a webjob to process the event data (see my previous post) and also look into event hubs.