Integrating Service Bus Stack With Logic Apps And Azure Functions

Service Bus Queues

Service Bus is a great way to send and receive messages in a decoupled way, and it can be used from many languages. When you send a message with the C# SDK and read it from a Service Bus queue in Logic Apps or Azure Functions, you might notice that the incoming message has a namespace prepended to it, along with some stray characters before and after the payload. This is caused by how the message is serialized when a BrokeredMessage is created from an object. According to the documentation, BrokeredMessage has constructors that take either an object or a Stream. We will be sending a simple message, represented by the following Payload class.

/// <summary>
/// The payload to be sent.
/// </summary>
public class Payload
{
    /// <summary>
    /// Gets or sets the name.
    /// </summary>
    public string Name { get; set; }

    /// <summary>
    /// Gets or sets the value.
    /// </summary>
    public string Value { get; set; }
}

Let's start by creating an application that uses the constructor taking an object and sends an instance of the Payload class serialized as JSON. We assume you have already created a Service Bus queue called directqueuetest.

using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;

/// <summary>
/// The program which will help us test various Service Bus scenarios.
/// </summary>
public class Program
{
    /// <summary>
    /// Client used to connect to queue.
    /// </summary>
    private static readonly QueueClient queueClient =
        QueueClient.CreateFromConnectionString(
            "Endpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=Send;SharedAccessKey=<youraccesskey>;EntityPath=directqueuetest");

    /// <summary>
    /// The main method, this is where our application will start.
    /// </summary>
    public static void Main(string[] args)
    {
        SendToQueue().Wait();

        Console.WriteLine("Press any key to continue...");
        Console.ReadKey();
    }

    /// <summary>
    /// Send a message to a queue.
    /// </summary>
    public static async Task SendToQueue()
    {
        // Inform user
        Console.WriteLine("Sending to queue directly");

        // Send to ServiceBus, the byte array is passed to the constructor that takes an object
        await
            queueClient.SendAsync(
                new BrokeredMessage(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new Payload { Name = "Direct Queue", Value = "Object - UTF8 GetBytes - JsonConvert" }))));

        // Inform user
        Console.WriteLine("Finished sending to queue directly");
    }
}

We now want to use this message in a Logic App, so let's create the following Logic App.

In this Logic App, we will be receiving a message from the queue, and send the JSON payload to an email. Now if we send a message from our application, we will get an email with a body like the following.

3http://schemas.microsoft.com/2003/10/Serialization/�E{"Name":"Direct Queue","Value":"Object - UTF8 GetBytes - JsonConvert"� }

As you can see, it indeed contains the namespace and stray characters. Luckily, there is an easy fix: switch the constructor from an object to a stream. To do this, we change the BrokeredMessage constructor in our SendToQueue method as follows.

// Send to ServiceBus (MemoryStream requires "using System.IO;")
await
    queueClient.SendAsync(
        new BrokeredMessage(
            new MemoryStream(
                Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new Payload { Name = "Direct Queue", Value = "Stream - UTF8 GetBytes - JsonConvert" })))));

We now wrap our object converted to JSON in a MemoryStream, and pass this into our constructor. We will be using the same Logic App as before, and after running our application we should now get an email with the following body.

{"Name":"Direct Queue","Value":"Stream - UTF8 GetBytes - JsonConvert"}

By changing just one line of code, we now receive a valid JSON message, which the Logic App can also parse to get values from our payload.
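To see where the prefix comes from, the sketch below reproduces the two serialization paths outside of Service Bus, using only the .NET base class library. It assumes, as the BrokeredMessage documentation describes, that the object constructor serializes the body with a DataContractSerializer and a binary XmlDictionaryWriter, while the Stream constructor passes the bytes through unchanged. The SerializationDemo class and its method names are hypothetical and exist only for illustration.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;
using System.Text;
using System.Xml;

public static class SerializationDemo
{
    // Approximates what the object constructor is assumed to do:
    // DataContractSerializer output written through a binary XML writer.
    public static byte[] SerializeAsObject(object body)
    {
        var serializer = new DataContractSerializer(body.GetType());
        using (var stream = new MemoryStream())
        using (var writer = XmlDictionaryWriter.CreateBinaryWriter(stream))
        {
            serializer.WriteObject(writer, body);
            writer.Flush();
            return stream.ToArray();
        }
    }

    // Approximates the Stream constructor: the UTF-8 bytes are used as-is.
    public static byte[] SerializeAsStream(string json)
    {
        using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(json)))
        {
            return stream.ToArray();
        }
    }
}
```

Decoding the result of SerializeAsObject as UTF-8 text shows the serialization namespace in front of the JSON, surrounded by bytes that are not printable text. The result of SerializeAsStream decodes back to exactly the JSON string that went in.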

Event Hubs And Stream Analytics

Another great service offered by Service Bus is Event Hubs, which is designed for processing large volumes of messages at high throughput. If you're working with Event Hubs, chances are you will also be using Stream Analytics to process the messages. In Stream Analytics you can have various outputs, such as Power BI, DocumentDB, Azure Storage, and Service Bus queues. When writing from Stream Analytics to a Service Bus queue, you will again find that a namespace and stray characters have been added. Unfortunately, this cannot be solved as easily as when we send to the queue ourselves, because the namespace is added by Stream Analytics itself. Luckily, we can use an Azure Function to clean the message before it is used in our Logic App. For this example, we will use the following application to send Payload messages (the class we created in the previous chapter) to Event Hubs. You will need to have created an Event Hub called eventhubstest.

using System;
using System.Runtime.Serialization.Json;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

/// <summary>
/// The program which will help us test various Service Bus scenarios.
/// </summary>
public class Program
{
    /// <summary>
    /// Client used to connect to event hubs.
    /// </summary>
    private static readonly EventHubClient eventHubsClient =
        EventHubClient.CreateFromConnectionString(
            "Endpoint=sb://<yournamespace>.servicebus.windows.net/;SharedAccessKeyName=Send;SharedAccessKey=<youraccesskey>;EntityPath=eventhubstest");

    /// <summary>
    /// The main method, this is where our application will start.
    /// </summary>
    public static void Main(string[] args)
    {
        SendToEventHubs().Wait();

        Console.WriteLine("Press any key to continue...");
        Console.ReadKey();
    }

    /// <summary>
    /// Send message to EventHubs.
    /// </summary>
    public static async Task SendToEventHubs()
    {
        // Inform user
        Console.WriteLine("Sending via Event Hubs");

        // Send to Event Hubs, serializing the payload as JSON
        await
            eventHubsClient.SendAsync(
                new EventData(new Payload { Name = "Event Hubs", Value = "Object - JsonSerializer" }, new DataContractJsonSerializer(typeof(Payload))));

        // Inform user
        Console.WriteLine("Finished sending via Event Hubs");
    }
}
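As a quick local check of what the event body should look like before Stream Analytics touches it, the following sketch runs the same DataContractJsonSerializer against a Payload instance without sending anything. It assumes that EventData simply writes the serializer output into the event body. The EventBodyPreview class is a hypothetical helper, and the Payload class is repeated here only to keep the sketch self-contained.

```csharp
using System.IO;
using System.Runtime.Serialization.Json;
using System.Text;

// Same shape as the Payload class used throughout this article.
public class Payload
{
    public string Name { get; set; }
    public string Value { get; set; }
}

public static class EventBodyPreview
{
    // Serializes the payload to JSON text with DataContractJsonSerializer.
    public static string ToJson(Payload payload)
    {
        var serializer = new DataContractJsonSerializer(typeof(Payload));
        using (var stream = new MemoryStream())
        {
            serializer.WriteObject(stream, payload);
            return Encoding.UTF8.GetString(stream.ToArray());
        }
    }
}
```

Calling EventBodyPreview.ToJson with the payload from the application above yields plain JSON containing only the Name and Value properties. Any namespace or extra characters seen later on the queue are therefore added further down the pipeline.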

Now create a Stream Analytics job with the Event Hub as input (name = eventhubs), and the directqueuetest queue from our previous chapter as an output (name = queue). Use the query shown below.

SELECT
    *
INTO
    [queue]
FROM
    [eventhubs]

This will simply pass all the data from our incoming message to a message on the queue. In normal situations, you will probably be doing some filtering and slicing on the data. Start Stream Analytics, and make sure the Logic App from our previous chapter is still up and running. Now when we run our application, we will get an email with a body as follows.

@ string 3http://schemas.microsoft.com/2003/10/Serialization/��{"Name":"Event Hubs","Value":"Object - JsonSerializer","EventProcessedUtcTime":"2016-07-07T09:52:00.2021627Z","PartitionId":0,"EventEnqueuedUtcTime":"2016-07-07T09:52:00.4660000Z"}

As you can see, there was indeed a namespace being added, as well as some erroneous characters. Now to strip these off, we will create an Azure Function. Make sure to use Generic Webhook - C# as your template, and to create your Function in the same region as your Logic App. Replace the default code with the following code, which will extract the actual JSON message from its input. 

#r "Newtonsoft.Json"

using System;
using System.Net;
using Newtonsoft.Json;

public static async Task<object> Run(HttpRequestMessage req, TraceWriter log)
{
    // Get message as string
    var incomingMessage = await req.Content.ReadAsStringAsync();

    // Inform user
    log.Info($"Received message: {incomingMessage}");

    // Get body, use a dynamic so we can access the body
    dynamic incomingJson = JsonConvert.DeserializeObject(incomingMessage);
    string jsonString = incomingJson.body;

    // Get indices of actual message
    var start = jsonString.IndexOf("{");
    var end = jsonString.LastIndexOf("}") + 1;
    var length = end - start;

    // Stop if the body does not contain a JSON object
    if (start < 0 || length <= 0)
    {
        return req.CreateErrorResponse(HttpStatusCode.BadRequest, "No JSON object found in body");
    }

    // Get actual message
    string cleanedJsonString = jsonString.Substring(start, length);

    // Inform user
    log.Info($"Cleaned message: {cleanedJsonString}");

    return req.CreateResponse(HttpStatusCode.OK, cleanedJsonString);
}

With this function, we extract the message string from the body of the incoming data. Let's now adjust our Logic App to wrap the message from our queue in some JSON and call our Azure Function. First delete the Send Email action (we will recreate it later). Then add an action, switch to Show Azure Functions in the same region, select the container in which you created your Function, and select the Function you just created.

Next, we will set the Input Payload Object to wrap your incoming data, so that the message content is passed in a property named body, which is what the Function reads. Now add a new Send Email action, and set its body to use the Body output from your Azure Functions action.

Run the application again, and wait for the Logic App to retrieve the message from the queue. Once processed, we will now receive an email with the following body.

{"Name":"Event Hubs","Value":"Object - JsonSerializer","EventProcessedUtcTime":"2016-07-07T10:34:57.0542901Z","PartitionId":2,"EventEnqueuedUtcTime":"2016-07-07T10:34:57.3390000Z"}

The JSON is now clean with no namespaces or invalid characters, and again, can also be used inside your Logic App.
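The extraction step in the Function can also be tried locally, without Azure. The sketch below isolates the same first-brace to last-brace logic in a hypothetical JsonCleaner.ExtractJson helper (not part of any SDK) and returns null when no JSON object is found instead of throwing.

```csharp
using System;

public static class JsonCleaner
{
    // Returns the text from the first '{' through the last '}',
    // or null when the input does not contain such a span.
    public static string ExtractJson(string raw)
    {
        if (string.IsNullOrEmpty(raw))
        {
            return null;
        }

        int start = raw.IndexOf('{');
        int end = raw.LastIndexOf('}');

        if (start < 0 || end < start)
        {
            return null;
        }

        return raw.Substring(start, end - start + 1);
    }
}
```

This approach relies on the payload being a single JSON object and on the serialization prefix not containing a curly brace. Both hold for the messages shown in this article, but they are assumptions to verify for other payloads, such as JSON arrays.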

Code

The code for this article can be found here.
