Content-based message routing using schema metadata in Azure Logic Apps & Service Bus

1. Introduction

In a previous article (see XML Message Routing with BizTalk Schema Promoted Properties in Azure Service Bus & Logic Apps) an implementation of a message routing solution was described that made use of the promoted properties contained within a BizTalk schema and uploaded to a Logic App Integration Account. Clearly a solution that completely replicates the BizTalk Server message routing functionality can not be implemented without some complex development; the purpose was to provide a relatively simple mechanism that could perform a content-based message routing function that would suffice in a great many cases. In this article an alternative solution is described, one that can used by any XML schema uploaded to an Integration Account, and one that can route messages using a Logic App rather than an Azure Function.

When certain artefacts are uploaded to an Integration Account artefact metadata is created and stored. For example, in the Schema blade, click on a Schema and then, after clicking the Edit as JSON button, the schema metadata can be viewed:

Custom metadata can be added to the metadata token. The solution, then, is to use this mechanism to store promoted property information as an alternative to, as in the previous article, utilizing an Azure Function to load a schema, retrieve the property information and persist it in a Redis cache. The solution will adopt the same generalisations and assumptions given in the previous article, and also assumes the schemas, integration account, Service Bus namespace, subscription and topic created are in extant resources.

2. Solution Overview

The promoted property information can obviously be manually copied and typed into the schema metadata, however, this would be soon become rather tedious and is liable to typing errors. One solution is to automated process whenever a schema is uploaded to an Integration Account. Within Azure there is a service, Event Grid, that sends HTTP requests to notify about events which occur in event publishers. For example, an Azure blob storage account is a publisher and when a blob is uploaded a BlobCreated event is generated and routed to event subscribers (see /en-us/azure/storage/blobs/storage-blob-event-overview ).

Resource Groups is also a publisher and hence will generate events when artefacts are added, updated, and deleted, and this includes Integration Account artefacts. The solution is as follows and consists of one function to copy the promoted properties to the schema metadata and a Logic App that will receive the incoming XML message, promote the properties, and send the message to Azure Service Bus. The steps are:

  1. Create an Event Grid trigger to subscribe to Resource Group events;
  2. Filter for schema upload events;
  3. Get the schema from the Integration Account and extract the promoted properties;
  4. Add the promoted properties to the schema metadata. For BizTalk schemas this information is stored within the schema itself. For other XSD schemas a separate file is needed to define which properties are to be promoted (see Appendix A 8.2).;
  5. Create Logic App to receive an XML message;
  6. Get the appropriate schema metadata;
  7. Promoted the properties and send the XML message to Azure Service Bus. 

3. Event Grid Trigger

Create a New Event Grid Trigger

Firstly, select an existing or create a new Azure Function resource in the Azure portal and ensure Managed service identity is enabled. Once this is done perform the following steps:

1.       Click on New Function and select and Event Grid Trigger (C#) as shown below:

 

2.       Create a new file project.json and upload the file to the root folder (i.e. the same folder as run.cx). Adding this file will cause the relevant NuGet packages to be loaded. The project.json file should contain the following:

{

 "frameworks": {

   "net46":{

     "dependencies": {

       "Microsoft.Azure.Management.Logic": "3.0.0",

       "Microsoft.Rest.ClientRuntime": "2.3.10",

       "Microsoft.IdentityModel.Clients.ActiveDirectory": "3.14.2",

       "Microsoft.Azure.Services.AppAuthentication": "1.1.0-preview",

     }

   }

 }

}

3.       In the run.csx file add the following code (you will need to update the three values: subscriptionId, resourceGroup, integrationAccount to match your Azure subscription ID, and the name of the resource group and integration account:

#r "Newtonsoft.Json"
using Microsoft.Azure.Management.Logic;
using Microsoft.Azure.Management.Logic.Models;
using Microsoft.Azure.Services.AppAuthentication;
using Microsoft.Rest;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System.Xml;
 
private static  LogicManagementClient _client = null;
 
public static  async Task<object> Run(JObject EventGridTrigger, TraceWriter log)
{
    string  opName = (string)EventGridTrigger.SelectToken("data.operationName");    
    string  evType = (string)EventGridTrigger.SelectToken("eventType");
    string  resUri = (string)EventGridTrigger.SelectToken("data.resourceUri");
 
    string  subscriptionId = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx";
    string  resourceGroup = "xxxxx";
    string  integrationAccount = "xxxxx";
    string  promotedPropertiesXPathBizTalk = "/*[local-name()='schema' and namespace-uri()='http://www.w3.org/2001/XMLSchema']/*[local-name()='element' and namespace-uri()='http://www.w3.org/2001/XMLSchema' and @name='{0}']/*[local-name()='annotation' and namespace-uri()='http://www.w3.org/2001/XMLSchema']/*[local-name()='appinfo' and namespace-uri()='http://www.w3.org/2001/XMLSchema']/*[local-name()='properties' and namespace-uri()='http://schemas.microsoft.com/BizTalk/2003']/*[local-name()='property' and namespace-uri()='http://schemas.microsoft.com/BizTalk/2003' and not(@distinguished='true')]";
    string  promotedPropertiesXPath = "/*[local-name()='schema' and namespace-uri()='http://www.w3.org/2001/XMLSchema']/*[local-name()='annotation' and namespace-uri()='http://www.w3.org/2001/XMLSchema']/*[local-name()='appinfo' and namespace-uri()='http://www.w3.org/2001/XMLSchema']/*[local-name()='schema' and namespace-uri()='PropertyPromotionsDemo.PropertySchema' and @name='{0}']/*";
 
    if  (opName == "Microsoft.Logic/integrationAccounts/schemas/write" && evType == "Microsoft.Resources.ResourceWriteSuccess")
    {
        // Check for LogicManagementClient
        if  (_client == null)
        {
            // Get LogicManagementClient object to access the Integration Account artefacts
            var provider = new  AzureServiceTokenProvider();
            var token = await provider.GetAccessTokenAsync("https://management.azure.com/");
            _client = new  LogicManagementClient(new TokenCredentials(token)) { SubscriptionId = subscriptionId };
        }
 
        // Get schema promoted properties
        string  schemaName = resUri.Substring(resUri.LastIndexOf('/') + 1);
        if  (schemaName == "PropertiesSchema") {return null;};
        var  ias = _client.Schemas.Get(resourceGroup, integrationAccount, schemaName);
        XmlDocument xdias = new  XmlDocument();
        xdias.Load(ias.ContentLink.Uri);
        JObject joMetaData = (JObject)ias.Metadata;
 
        // Get list of properties to promote
        XmlNodeList xnlProps = null;
        if(xdias.DocumentElement.Attributes["xmlns:b"] != null)
        {
            // It is a BizTalk schema
            xnlProps = xdias.SelectNodes(String.Format(promotedPropertiesXPathBizTalk, schemaName));                   
        }
        else
        {
            // Promoted properties are stored in the PropertiesSchema xsd
            var  iasProps = _client.Schemas.Get(resourceGroup, integrationAccount, "PropertiesSchema");
            XmlDocument xdiasProps = new  XmlDocument();
            xdiasProps.Load(iasProps.ContentLink.Uri);
            xnlProps = xdiasProps.SelectNodes(String.Format(promotedPropertiesXPath, schemaName));   
        };  
 
        // Check if there are properties to promote and that they don't already exist in the meta-data
        if  (xnlProps != null  && joMetaData["promotions"] == null)
        {
            // Construct JSON array of properties
            string  jsonProps = "["; 
            foreach  (XmlNode xn in  xnlProps)
            {
                jsonProps += "{\"" + xn.Attributes["name"].Value + "\": \"" + xn.Attributes["xpath"].Value + "\"}, ";
            }
            jsonProps = jsonProps.Remove(jsonProps.Length - 2) + "]";
            // Update schema metadata properties
            joMetaData.Add("promotions", JToken.Parse(jsonProps));
            // Update schema
            var iasUpdate = new  IntegrationAccountSchema(SchemaType.Xml, ias.Id, ias.Name, ias.Type, ias.Location, ias.Tags, ias.TargetNamespace, ias.DocumentName, ias.FileName, ias.CreatedTime, ias.ChangedTime, joMetaData, xdias.OuterXml, "application/xml", ias.ContentLink);
            _client.Schemas.CreateOrUpdate(resourceGroup, integrationAccount, schemaName, iasUpdate);
        }
    }
 
    return  null;
}

 

4.       Click Add Event Grid Subscription and Enter new subscription details. Enter a name for the new subscription, ensure the Topic Type is set to Resource Groups, and deselect Subscribe to all event types and only select Resource Write Success as shown below:

 

5.       In the Integration Account Access control (IAM) grant the Azure Function access to the Integration Account:

 

Testing the Event Grid Trigger

Test 1: Test with a BizTalk schema

  1. Create an XSD schema file given in Appendix A 8.1;

  2. Upload the XSD file to the Logic App Integration Account;

  3. On the Event Grid Trigger Monitor blade watch for a Success execution of the function;

  4. Once the Event Grid Trigger has executed view the Order schema metadata. It should appear as below with a promotions token:

 

Test 2: Test with a schema and custom property schema

  1. Remove the schema created and uploaded in test 1.
  2. Create the two XSD schema files given in Appendix A 8.2;
  3. Upload the PropertiesSchema XSD file to the Logic App Integration Account first then upload the Order schema;
  4. On the Event Grid Trigger Monitor blade watch for a Success execution of the function;
  5. Once the Event Grid Trigger has executed view the Order schema metadata. It should appear as below with a promotions token, note that the namespace prefix is missing from the two property names:

  

4. Logic App

Create a New Logic App

The steps required to perform this are as follows:

1.       First, create an XSLT map (name it GetPromotions) given below and upload it to an Integration Account:

<?xml version="1.0" encoding="utf-8"?>
<xsl:stylesheet version="1.0"
  xmlns:xsl="http://www.w3.org/1999/XSL/Transform"
  xmlns:msxsl="urn:schemas-microsoft-com:xslt"
  xmlns:userCSharp="http://schemas.microsoft.com/BizTalk/2003/userCSharp"
  exclude-result-prefixes="xsl msxsl userCSharp">
 
  <xsl:output omit-xml-declaration="yes" media-type ="application/json" version="1.0" />
  
  <xsl:param name="promotions" />
 
  <xsl:variable name="setNavigator" select="userCSharp:SetNavigator(/)" />
  <xsl:variable name="messageType" select="concat(namespace-uri(/*),'#',local-name(/*))" />
   
  <xsl:template match="/">
    <xsl:text>{</xsl:text>
    <xsl:for-each select ="userCSharp:JsonArrayToXml($promotions)/Root/Row">
      <xsl:text>"</xsl:text>
      <xsl:value-of select="name(./*[1])"/>
      <xsl:text>":"</xsl:text>
      <xsl:value-of select="userCSharp:Evaluate(./*[1])"/>
      <xsl:text>",</xsl:text>
      <xsl:if test="position()=last()">
        <xsl:text>"BTS:MessageType":"</xsl:text>
        <xsl:value-of select="$messageType"/>
        <xsl:text>"</xsl:text>
      </xsl:if>
    </xsl:for-each>
    <xsl:text>}</xsl:text>
  </xsl:template>
 
  <msxsl:script language="C#" implements-prefix="userCSharp">   
    <msxsl:assembly name="Newtonsoft.Json" />
    <msxsl:using namespace="System.Xml.XPath" />
    <msxsl:using namespace="Newtonsoft.Json" />
    <![CDATA[
       private XPathNavigator _navigator;       
       public void SetNavigator(XPathNodeIterator nodes)
       {
         if (nodes.MoveNext())
         {
           _navigator = nodes.Current;
         }
       }
       public XmlNode JsonArrayToXml(string json)
       {
         return JsonConvert.DeserializeXmlNode("{\"Row\":" + json + "}", "Root");
       }
       public string Evaluate(string xpath)
       {
         return (string)_navigator.Evaluate("string(" + xpath + ")");
       }       
     ]]>
  </msxsl:script>
</xsl:stylesheet>

 

2.       Now create a new Logic App resource, go to the resource, and in Settings | Workflow settings | Ensure an Integration Account is selected. In the Logic App designer select When a HTTP request is received trigger;

3.       Add an Integration Account Artifact Lookup action as shown below:

The artifact name should contain the expression highlighted to retrieve the XML message root node name (in Code view the value  would be: "@{xpath(xml(triggerBody()),'local-name(/*)')}");

4.       Add a Transform XML action and set the Content, Map, and Transform options values as shown below:

 

The promotions value should contain a reference to the schema metadata returned by the previous step (in Code view the value would be: "@{outputs('Integration_Account_Artifact_Lookup')['properties']['metadata']['promotions']}");

5.       Add a service Bus Send Message action configured as below:

   

Testing the Logic App

At this point the Azure Logic App can now be unit tested.

1.       Use Postman or similar to call the Logic App passing the XML message given in Appendix B in the request body;

2.       View the Orders subscription in Service Bus Explorer. The message sent and the custom properties should appear in the appropriate windows:

 

 

5. Some Further Considerations

There are a few things to bear in mind when using this method of Property Promotion:

  • The Event Grid trigger will itself cause the Resource Group publisher to create an event when the schema metadata is modified. Care must be taken not to get caught in an endless updating loop;
  • Any external source could be used to store the properties to be promoted. In this instance a separate ‘properties’ schema was used but it could easily have been some other configuration resource (e.g. a text file);
  • The solution assumes the root node name of an incoming message matches the schema name of a schema in Integration Account;
  • An XML Validation action could be added to the Logic App and any failed message routed to the dead letter queue;
  • The XSLT transform demonstrates how the Newtonsoft framework can be used to convert a JSON value to XML in XSLT and how to implement dynamic XPath;
  • An alternate Azure Function to the Logic App, which searches for a schema based on both the incoming message root node name and namespace, is given in Appendix D;

6. Summary

The purpose of this article is to provide a mechanism for the content-based routing XML of messages using metadata of schemas uploaded to an Azure Logic App’s integration account. It differs from the previous article in that it can be used by any XML schema and all the heavy lifting is done during deployment which result is a simple and light-weight implementation. It also serves to introduce the developer to Azure Event Grid, Triggers resources and Integration Account artefact metadata.

7. References

An introduction to Azure Event Grid
Manage artifact metadata in integration accounts for logic apps
How to use Azure Managed Service Identity (public preview) in App Service and Azure Functions
Use Resource Manager authentication API to access subscriptions
Microsoft.IdentityModel.Clients.ActiveDirectory Namespace
Getting started with the Logic Apps SDK
LogicManagementClient class
System.Collections.Concurrent Namespace

](https://msdn.microsoft.com/en-us/library/system.collections.concurrent(v=vs.110).aspx)

8. Appendix A: Order Schema File

8.1 BizTalk Order Schema

 

Order.xsd

<?xml version="1.0"?>

<xs:schema xmlns="PropertyPromotionsDemo.Order" xmlns:b="http://schemas.microsoft.com/BizTalk/2003" xmlns:ns0="OrderPropertySchema" targetNamespace="PropertyPromotionsDemo.Order" xmlns:xs="http://www.w3.org/2001/XMLSchema">

  <xs:annotation>

    <xs:appinfo>

      <b:imports>

        <b:namespace prefix="ns0" uri="OrderPropertySchema" location=".\OrderPropertySchema.xsd" />

      </b:imports>

    </xs:appinfo>

  </xs:annotation>

  <xs:element name="Order">

    <xs:annotation>

      <xs:appinfo>

        <b:properties>

          <b:property name="ns0:CustomerID" xpath="/*[local-name()='Order' and namespace-uri()='PropertyPromotionsDemo.Order']/*[local-name()='CustomerID' and namespace-uri()='']" />

          <b:property name="ns0:ShippingAddressID" xpath="/*[local-name()='Order' and namespace-uri()='PropertyPromotionsDemo.Order']/*[local-name()='ShippingAddressID' and namespace-uri()='']" />

        </b:properties>

      </xs:appinfo>

    </xs:annotation>

    <xs:complexType>

      <xs:sequence>

        <xs:element name="OrderID" type="xs:string" />

        <xs:element name="ItemName" type="xs:string" />

        <xs:element name="ItemCode" type="xs:string" />

        <xs:element name="Price" type="xs:string" />

        <xs:element name="Quantity" type="xs:string" />

        <xs:element name="CustomerID" type="xs:string" />

        <xs:element name="ShippingAddressID" type="xs:string" />

      </xs:sequence>

    </xs:complexType>

  </xs:element>

</xs:schema>

  

8.2 PropertiesSchema & Order schemas

 

PropertiesSchema.xsd

<?xml version="1.0"?>

<xs:schema xmlns:p="PropertyPromotionsDemo.PropertySchema" xmlns:xs="http://www.w3.org/2001/XMLSchema">

  <xs:annotation>

    <xs:appinfo>

      <p:schema name="Order">

        <p:property name="CustomerID" xpath="/*[local-name()='Order' and namespace-uri()='PropertyPromotionsDemo.Order']/*[local-name()='CustomerID' and namespace-uri()='']" />

        <p:property name="ShippingAddressID" xpath="/*[local-name()='Order' and namespace-uri()='PropertyPromotionsDemo.Order']/*[local-name()='ShippingAddressID' and namespace-uri()='']" />

      </p:schema>

    </xs:appinfo>

  </xs:annotation>

</xs:schema>

 

Order.xsd

<?xml version="1.0"?>

<xs:schema xmlns="PropertyPromotionsDemo.Order" xmlns:ns0="OrderPropertySchema" targetNamespace="PropertyPromotionsDemo.Order" xmlns:xs="http://www.w3.org/2001/XMLSchema">

  <xs:element name="Order">

    <xs:complexType>

      <xs:sequence>

        <xs:element name="OrderID" type="xs:string" />

        <xs:element name="ItemName" type="xs:string" />

        <xs:element name="ItemCode" type="xs:string" />

        <xs:element name="Price" type="xs:string" />

        <xs:element name="Quantity" type="xs:string" />

        <xs:element name="CustomerID" type="xs:string" />

        <xs:element name="ShippingAddressID" type="xs:string" />

      </xs:sequence>

    </xs:complexType>

  </xs:element>

</xs:schema>

 

9. Appendix B: Sample Messages File

 Order.xml

<ns0:Order xmlns:ns0="PropertyPromotionsDemo.Order">

  <OrderID>123456789</OrderID>

  <ItemName>Microsoft Surface Pro</ItemName>

  <ItemCode>5855919</ItemCode>

  <Price>699</Price>

  <Quantity>1</Quantity>

  <CustomerID>1357911</CustomerID>

  <ShippingAddressID>24681012</ShippingAddressID>

</ns0:Order>

  

10. Appendix C: Alternate Azure Function

Below is an Azure Function that performs the same action as the Logic App. There are a couple of differences to note (highlighted in green). First, the Integration Account schemas are stored in a thread-safe static variable for temporary caching. Second the schema is search for using both the root node name and namespace of the message (i.e. a BizTalk message type value) rather than the Integration Account schema name.

Create a new file project.json and upload the file to the root folder (i.e. the same folder as run.cx). Adding this file will cause the relevant NuGet packages to be loaded. The project.json file should contain the following:

{

 "frameworks": {

   "net46":{

     "dependencies": {

       "Microsoft.Azure.Management.Logic": "3.0.0",

       "Microsoft.Rest.ClientRuntime": "2.3.10",

       "Microsoft.IdentityModel.Clients.ActiveDirectory": "3.14.2",

       "Microsoft.Azure.Services.AppAuthentication": "1.1.0-preview",

     }

   }

 }

}

 

 

In the run.csx file add the following code (you will need to update the three values: subscriptionId, resourceGroup, integrationAccount, and serviceBusConnectionString to match your Azure subscription ID, and the name of the resource group and integration account:

 

#r "Newtonsoft.Json"

#r "Microsoft.ServiceBus"

using Microsoft.Azure.Management.Logic;

using Microsoft.Azure.Management.Logic.Models;

using Microsoft.Azure.Services.AppAuthentication;

using Microsoft.Rest;

using Microsoft.ServiceBus;

using Microsoft.ServiceBus.Messaging;

using Newtonsoft.Json;

using Newtonsoft.Json.Linq;

using System;

using System.Collections.Concurrent;

using System.Linq;

using System.Net;

using System.Text;

using System.Xml;

 

public static ConcurrentBag<IntegrationAccountSchema> _schemas = null;

 

public static async Task<HttpResponseMessage> Run(HttpRequestMessage req, TraceWriter log)

{

    string subscriptionId = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx";

    string resourceGroup = "xxxxx";

    string integrationAccount = "xxxxx";

    string serviceBusConnectionString = "Endpoint=sb://xxxxxx.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx";

    string topicName = "BizTalkDemo";

 

    // Get Integration Account Schemas

    if (_schemas == null)

    {

        // Get LogicManagementClient object to access the Integration Account artefacts

        var provider = new AzureServiceTokenProvider();

        var token = await provider.GetAccessTokenAsync("https://management.azure.com/");

        var client = new LogicManagementClient(new TokenCredentials(token)) { SubscriptionId = subscriptionId };

        _schemas = new ConcurrentBag<IntegrationAccountSchema>(client.Schemas.ListByIntegrationAccounts(resourceGroup, integrationAccount));

    }

 

    // Load XML message

    string content = await req.Content.ReadAsStringAsync();

    XmlDocument xd = new XmlDocument();

    xd.LoadXml(content);

    BrokeredMessage bm = new BrokeredMessage(content);

 

    // Get promoted properties

    var ias = from s in _schemas where s.DocumentName == xd.DocumentElement.LocalName && s.TargetNamespace == xd.DocumentElement.NamespaceURI select s;

    if (ias.Count() != 0)

    {

        JObject jo = (JObject)ias.First<IntegrationAccountSchema>().Metadata;

        JArray ja = (JArray)jo.SelectToken("promotions");

        foreach (JToken jt in ja)

        {

            string name = jt.First.Value<JProperty>().Name;

            string xpath = (string)jt.First.Value<JProperty>().Value;

            bm.Properties.Add(name, xd?.SelectSingleNode(xpath)?.InnerText ?? String.Empty);

        }

    }

 

    // Promote message type

    bm.Properties["BTS:MessageType"] = string.Concat(xd.DocumentElement.NamespaceURI, "#", xd.DocumentElement.LocalName);

 

    // Create MessagingFactory, Service Bus NamespaceManager and send to Topic

    MessagingFactory mf = MessagingFactory.CreateFromConnectionString(serviceBusConnectionString);

    NamespaceManager nm = NamespaceManager.CreateFromConnectionString(serviceBusConnectionString);

    TopicClient tc = mf.CreateTopicClient(topicName);

    tc.Send(bm);

    mf.Close();

 

    return req.CreateResponse(HttpStatusCode.OK, "success", "application/xml");  

}