Showing posts with label Azure Service Bus. Show all posts
Showing posts with label Azure Service Bus. Show all posts

Friday, May 13, 2016

Retrieving the Message from Service Bus Queue

This is a follow up to the previous post where I sent a strongly typed XML message from a CRM plugin to a Service Bus Queue. This is what the Xml looks like that I sent to the queue.
Note that the root node is the name of the entity. This is a snippet to show the attributes and then the formatted values.  You can easily manipulate the Xml by changing the way it populates the datatable.

 <contact>
  <attributes>
    <address1_addressid>c6fd930f-512a-42fb-a645-af4a672c740f</address1_addressid>
    <address2_addressid>6a280e36-c009-463b-84f1-8666c2dfc5ba</address2_addressid>
    <address2_addresstypecode>1</address2_addresstypecode>
    <address2_freighttermscode>1</address2_freighttermscode>
    <address2_shippingmethodcode>1</address2_shippingmethodcode>
    <address3_addressid>5475a20e-3c79-479a-a1d5-05b0f144a8fc</address3_addressid>
    <contactid>1d50bcf0-2519-e611-80da-5065f38b46e1</contactid>
    <createdby>Charles Emes</createdby>
    <createdon>13/05/2016 16:15:51</createdon>
;... more attributes
;... now the formatted values
    <address2_addresstypecodename>Default Value</address2_addresstypecodename>
    <address2_freighttermscodename>Default Value</address2_freighttermscodename>
    <address2_shippingmethodcodename>Default Value</address2_shippingmethodcodename>
    <createdonname>13/05/2016 17:15</createdonname>
  </attributes></contact>

What I'm missing is a namespace and that is easy enough to add once I load it into an XmlDocument. I'm a BizTalk developer so right now I'm happy with a message that I can transform into anything I want. The attributes are in alphabetical order so I can cope with missing attributes easily enough in the XSD by making Nillable=true and MinOccurs=0. But I can also deserialize this into a contact object by creating a class form the XSD using the XSD.EXE.  Note this is absolutely not a CRM Contact object but my own object. Check out my usings because there is no Microsoft.Xrm.SDK anywhere to be seen. 

This receiver is just a console app and I ripped the code from another blogger. I'm just using it show what you can do with the message now you've got it.

     
static void Main(string[] args)
{
string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
QueueClient Client = QueueClient.CreateFromConnectionString(connectionString, "anoqueue");
//Console.WriteLine("\nReceiving message from Queue...");
BrokeredMessage message = null;
NamespaceManager namespaceManager = NamespaceManager.Create();
while (true)
{
try
{
//receive messages from Queue
message = Client.Receive(TimeSpan.FromHours(10));
if (message != null)
{
Console.WriteLine(string.Format("Message received: Id = {0}", message.MessageId));
string s = new StreamReader(message.GetBody<Stream>(), Encoding.ASCII).ReadToEnd();
// load into an XML Document to s
System.Xml.XmlDocument xmldoc = new System.Xml.XmlDocument();
xmldoc.LoadXml(s);
// need to add a namespace because it doesn't have one
xmldoc.DocumentElement.SetAttribute("xmlns", "http://xrm.generic.schemas");
string mydocpath = @"C:\Projects\Test\ReceiveFromSB\";
xmldoc.Save(mydocpath + "saved.xml");
// Deserialize to an object too
XmlRootAttribute xRoot = new XmlRootAttribute();
xRoot.ElementName = message.Properties["EntityName"].ToString();
XmlSerializer serializer = new XmlSerializer(typeof(contact),xRoot);
StringReader rdr = new StringReader(s);
contact myContact = (contact)serializer.Deserialize(rdr);
// Now delete the message
message.Complete();

}
else
{
//no more messages in the queue
break;
}
}
catch (MessagingException me)
{
if (!me.IsTransient)
{
Console.WriteLine(me.Message);
throw;
}
else
{
// HandleTransientErrors(e);
}
}
catch (Exception e)
{
Console.WriteLine(e.Message);
throw;
}
}
Client.Close();
}
}
 

Dynamics CRM Online: Posting Messages to the Azure Service Bus

I have to say I've never really liked the way Dynamics CRM integrates with the Azure Service Bus. I've posted on this before here and here. What I dislike is the way it posts the Context to the Service Bus.  In order to do anything with the Context I need to use the RemotePluginContext and then extract the entity that I want. That means using the XrmSDK and being familiar with the way to handle the Context. But I like a loosely coupled architecture. I would expect a CRM plugin to place a strongly typed Xml message on the Service Bus. Then processing of the message requires no knowledge of CRM. I'm a BizTalk developer and for interfaces are always about messages that comply with XML schemas because that acts as the contract.  In a development environment where you have different teams of developers with different skill sets this separation I believe is vital.

Now you may know that Dynamics CRM only supports the ACS authentication of the Service Bus and you have to create this using PowerShell commands because it is not possible via the portal. See this post. You can find the details of setting up the Service Endpoint elsewhere but the bottom line is that you are using a Guid that points to the Service Endpoint record. So what about deploying to another environment where you will use a different Service Endpoint? Well you have to go through the manual process again. What in a Production environment? Are you kidding me? No, I want to be able to deploy and then just configure the endpoint url.

The challenge in doing this online is that the plugin runs in Sandbox mode and it restricts what .Net assemblies I can use, System.Security being one of them. That rules out SAS authentication because it requires on System.Security.Cryptography.

My solution is a plugin that does two things. It populates a data table with the attributes from the entity and produces strongly typed Xml.  It then uses WebClient to POST the xml as a string to the Service Bus using the REST API. The authentication uses the ACS token and the only configuration parameters I need are the service bus namespace, the queue name, and the issuer secret. The great thing about the solution is it is totally generic, it works with any entity, all you need is to configure a plugin step. A couple of points to note.
1. I'm using a queue.
2. My plugin is registered as synchronous because I want to maintain ordered delivery
3. I'm sending to the queue synchronously, because I want to know I successfully sent it.
4. I can choose if I want to use the PreImage, PostImage or Target
5.The plugin calls my EntitySerialize class
6. I am indebted to other bloggers for much of this code

The next post shows you can retrieve the message form the queue.
using System;

using System.Collections.Specialized;
using System.Data;
using System.Linq;
using System.Text;
using System.Net;
using Microsoft.Xrm.Sdk;

public class EntityHelper
    {
        // **************Use bits from  the Service bus namespace below ******************
        //
        static string ServiceNamespace = "yournamespace";
        static string ServiceHttpAddress = "https://yournamespace.servicebus.windows.net/queuename/messages";
        const string acsHostName = "accesscontrol.windows.net";
        const string sbHostName = "servicebus.windows.net";
        const string issuerName = "owner";
        const string issuerSecret = "your_issuer_secret_goes_here";
        public static void EntitySerialize(ITracingService tracingService,  IOrganizationService service, Entity entity, string orgName, string msgName, string correlation)
        {
            try
            {
                DataSet ds = new DataSet(entity.LogicalName);
                DataTable dt = new DataTable("attributes");
                DataTable dataTable = new DataTable();
                ConvertEntityToDataTable(dt, entity);
                ds.Tables.Add(dt);
                string xml = ds.GetXml();
                PostMessageToBus(entity, orgName, msgName, correlation, xml);
            }
            catch (System.Net.WebException  we)
            {
                tracingService.Trace(we.Message);
                throw new Exception("Web Exception: " +we.Message);
            }
            catch (Exception e)
            {
                tracingService.Trace(e.Message);
                throw e;
            }
            return ;
        }
        private static void PostMessageToBus(Entity entity, string orgName, string msgName, string correlation, string xml)
        {
            WebClient webClient = new WebClient();
            webClient.Headers[HttpRequestHeader.Authorization] = GetToken();
            webClient.Headers[HttpRequestHeader.ContentType] = "application/atom+xml;type=entry;charset=utf-8";
            // Set brokered Properties this way
            webClient.Headers.Add("BrokerProperties", "{ \"MessageId\":\"123456789\", \"Label\":\"M1\"}");
            // example of the parameters that can be passed and available to receiver as mesage.Properties collection
            webClient.Headers["Correlation"] = correlation;
            webClient.Headers["OrganisationName"] = orgName;
            webClient.Headers["MessageName"] = msgName;
            webClient.Headers["EntityName"] = entity.LogicalName;
            var response = webClient.UploadData(ServiceHttpAddress, "POST", System.Text.Encoding.UTF8.GetBytes(xml));
            string responseString = Encoding.UTF8.GetString(response);
        }
        ///////
          private static void ConvertEntityToDataTable(DataTable dataTable, Entity entity)
         {
             DataRow row = dataTable.NewRow();
             foreach (var attribute in entity.Attributes.OrderBy(a=>a.Key))
             {
                 if (!dataTable.Columns.Contains(attribute.Key))
                 {
                     dataTable.Columns.Add(attribute.Key);
                 }
                 if (getAttributeValue(attribute.Value) != null)
                 {
                     row[attribute.Key] = getAttributeValue(attribute.Value).ToString();
                 }
             }
             foreach (var fv in entity.FormattedValues.OrderBy(a=>a.Key))
             {
                 if (!dataTable.Columns.Contains(fv.Key + "name"))
                 {
                     dataTable.Columns.Add(fv.Key + "name");
                 }
                 row[fv.Key + "name"] = fv.Value;
             }
             dataTable.Rows.Add(row);
         }
        ///////
         private static object getAttributeValue(object entityValue)
         {
             object output = "";
             switch (entityValue.ToString())
             {
                 case "Microsoft.Xrm.Sdk.EntityReference":
                     output = ((EntityReference)entityValue).Name;
                     break;
                 case "Microsoft.Xrm.Sdk.OptionSetValue":
                     output = ((OptionSetValue)entityValue).Value.ToString();
                     break;
                 case "Microsoft.Xrm.Sdk.Money":
                     output = ((Money)entityValue).Value.ToString();
                     break;
                 case "Microsoft.Xrm.Sdk.AliasedValue":
                     output = getAttributeValue(((Microsoft.Xrm.Sdk.AliasedValue)entityValue).Value);
                     break;
                 default:
                     output = entityValue.ToString();
                     break;
             }
             return output;
         }
         private static string GetToken()
         {
             var acsEndpoint = "https://" + ServiceNamespace + "-sb." + acsHostName + "/WRAPv0.9/";
             // Note that the realm used when requesting a token uses the HTTP scheme, even though
             // calls to the service are always issued over HTTPS
             var realm = "http://" + ServiceNamespace + "." + sbHostName + "/";
             NameValueCollection values = new NameValueCollection();
             values.Add("wrap_name", issuerName);
             values.Add("wrap_password", issuerSecret);
             values.Add("wrap_scope", realm);
             WebClient webClient = new WebClient();
             byte[] response = webClient.UploadValues(acsEndpoint, values);
             string responseString = Encoding.UTF8.GetString(response);
             var responseProperties = responseString.Split('&');
             var tokenProperty = responseProperties[0].Split('=');
             var token = Uri.UnescapeDataString(tokenProperty[1]);
             return "WRAP access_token=\"" + token + "\"";
         }

     }
}

Sunday, August 16, 2015

Dynamics CRM 2015 Online and Azure Service Bus Messages

If you've configured Dynamics CRM 2015 Online with Azure Service Bus you will know that the "message" it puts on the queue is the plugin execution context. This is the same context that you use within a plugin.  You will know that to do anything with the context you have to use the Microsoft.Xrm.Sdk and extract the Pre or Post Image and the so called Target image. "Target" is s stupid name I always think. It contains the delta - the attributes that were changed during the operation.  The Post Image of say an Update event will contain all the attributes that have values in it and excludes any attributes with null values. 

The reason for using the Azure Service Bus is usually to get data from CRM to your on-premise systems and more often than not you may be using an ESB to read messages from the Azure Service Bus.  BizTalk for example has an adaptor that you just need to configure to read messages from Azure Service Bus.  However any BizTalk developer is going to be very disappointed if you provide them with just a Plugin Execution Context because they will have to use the CRM SDK to convert it into an XML message.  Even then the entity objects when serialized will result in a collection of KeyValuePairs.  The biggest problem with that is trying to map it to a strongly typed schema particularly because the structure varies so much when attributes that are null are set to a value and vice versa.  I've tried using the BizTalk mapper and failed.

UPDATE: CRM Online restricts plugins to Sandbox mode and you cannot serialize objects to XML because this is prohibited. I have recently stumbled on a way of getting round this. This may provide a much simpler solution then described in the rest of this article.

One solution is to have custom code that will read the context, convert it to a strongly typed schema and then put it back in another queue.  This needs to be done serially to ensure that Ordered Delivery is maintained but at least it gets to a schema that is worthy of the name.

Now you have to be careful when using CRM Online because if your plugins consume a lot of CPU they will be terminated with extreme prejudice.  It is best to keep the code within your plugin as minimal as possible and then have the bulk of your code execute on an Azure VM where you don't need to worry about CPU usage because you can size the VM to suit.

The best way to architect this is
a) Have a Plugin that sends the Plugin Execution Context to a Azure Service Bus Queue
b) Create an Azure Worker Process that reads messages from the Queue, creates an XML message and puts it into another Azure Service Bus Queue (be sure to maintain the order)
c) If using BizTalk create a Receive Port that will read XML messages from the second Queue

The Azure Worker Process is a lot like a Windows Service (in Azure its called a Cloud Service) and will automatically restart if it is stopped.  When you deploy the Azure Worker Process it will create its own VM.  You can supply configuration settings which in our case would include the EndPoint of the Queue we are reading from and the EndPoint of the Queue we are writing to.

While you messages should be placed on the first queue using a plugin running synchronously, the Azure Worker Process is running asynchronously.  If you want to post messages back to CRM because of a failure then you need to setup another asynchronous process to send the messages back to CRM. 

Saturday, July 4, 2015

Dynamics CRM 2015 Online and Azure Service Bus

In the previous post I talked about sending messages to another system via a message queue.  This is the design pattern Microsoft recommends when you want to use Dynamics CRM Online to update systems that are on-premise. They recommend using Azure Service Bus and the integration is built into the Online version by creating a Service Endpoint. This is also available for the on-premise version.

The plugin registration tool for Dynamics CRM  offers the ability to register a Service EndPoint when you are connected to CRM Online.  What is not clear though is that there are two ways you can configure it.
First though there are two gotchas when setting up the Azure Service Bus.

GOTCHA #1: You set up the Azure Service Bus Namespace using the Portal.  Wrong.
Doing it that way you no longer have the option to use ACS authentication and that is what the Plugin Registration tool uses.  Delete it. Download the PowerShell Azure Commands add-in and run this command:
New-AzureSBNamespace -Name yourservice -Location "West Europe" -CreateACSNamespace $true -NamespaceType Messaging

The response you get returned is

You need to use the DefaultKey when the Plugin Registration tool prompts you for the Management Key!

GOTCHA #2: You create the Queue (or whatever) using the Portal or PowerShell. Wrong.
You need to leave this for the Plugin Registration tool to do.

I won't give the rest of the details for configuring the endpoint because that is covered in other blogs.

Once you have the Service EndPoint registered there are two ways forward.

The first and seemingly the most attractive option is you can register steps and images right there under the endpoint just as you would do with a plugin.  The advantage is that this is a zero code solution. Just by configuring an Entity with the appropriate step and image you can get messages in your queue (or whatever). The thing is though is this method only supports Asynchronous operations.  That may be fine if you have a very simple CRM solution and want to configure only one or two entities. In more real world scenarios this is not going to work for you because it won't guarantee ordered delivery.  That is what I covered in my previous post.  To maintain Ordered Delivery you must use synchronous plugins steps.

The second route is to create an Azure-aware plugin. There is sample code in the SDK for doing this and out there in blogosphere.  In this case you just create the service endpoint and copy the Id that it creates. Create your Azure-aware plugin and paste the Id into the Unsecure Configuration section.  Register your plugin steps and images as usual. The plugin uses an instance of the IServiceEndpointNotificationService and essentially posts the context (using the Execute method) to the Service Bus endpoint.  The point here though is that you have full choice over how to register your steps, so if you need Ordered Delivery you can choose Synchronous.

Personally I find the whole method of configuring a Service EndPoint sucks. What about when I want to deploy this to other environments?  I am going to have to repeat the manual steps for each environment and when I deploy my Azure aware plugin I am going to have to amend the Id each time. Now you might argue this will be a one off process and its no big deal.  But I prefer my deployments not to involve manual steps so I'm inclined to post messages to the Azure Service Bus using code and have the connection string stored in a configuration entity along with other environment settings.  Remember though that you have to use the REST API to post messages because the plugin runs in Sandbox mode. 

Dynamics CRM, Plugins, Ordered Delivery and Queues

This post applies to both Dynamics CRM Online and On-Premise. The scenario is where you need to keep another system synchronized with changes to Dynamics CRM entities.  To make this loosely coupled you can write messages to a queue. If your target system is unavailable, the queue can store messages until it comes back online. The same design pattern is recommended for Dynamics CRM Online where messages are written to Azure Service Bus queue. You then have a process on-premise (it my be an ESB) that reads messages from the queue and sends then on to the target system.

This post stems from work I did on a previous project where we used CRM on-premise to write messages to MSMQ.  If you are reading this far I assume you already know about Ordered Delivery but here is the bottom line:

If you want to maintain Ordered Delivery you must use Synchronous Plugins.

If you use a plugin registered for asynchronous it may appear to give you Ordered Delivery 4 out of 5 times, but you cannot guarantee it for all messages.

You can spend the time proving it for yourself or read this explanation.

We had a custom entity for address that meant you could create an address that was the primary address or the regulatory address or both. The business rule was that you could only have one active address for primary and regulatory. To achieve this we created a plugin on that fires on Create of an address and as a Pre-Operation, if you set the both primary and regulatory flags on the address to true it checks if any existing addresses are primary or regulatory, sets them to false and then deactivates the address(es). Now the target system has to obey the same logic so we need to send any messages to it n the correct order, i.e. in ordered delivery.
So I created a plugin that was generic and would write a message out to MSMQ. I registered it to run as a Post Operation on Create and Update of an Address and set it to run Asynchronously.
In one test case we have two existing addresses one set as primary, the other set as regulatory  (lets call them 'Primary'  and 'Regulatory'),  We create a new address ('New') and set it to both primary and regulatory.  That creates 5 messages.
1. Update of Primary to set the primary flag to false
2. Update of Primary when status is set to deactivate
3. Update of Regulatory to set the regulatory flag to false
4. Update of Regulatory when status is set to deactivate
5. Create of the New address

Now you want to maintain the order that the addresses were written to the database. The Create must come last or you've broken the business rule about only have one active primary or regulatory address. With the on-premise CRM I could examine the Asynchronous table and could see the 5 messages there. They were flagged as belonging to the same transaction but when you looked at the processed time they were all identical.  All five records are executed simultaneously and its a matter of chance which message gets in the queue first.  There is an order, but its not consistent.

BizTalk works in a similar way to the CRM Asynchronous Service and its architected that way for performance reasons. 

When I changed the plugin to work synchronously then it does maintain the correct order of the messages.  You do need to pay attention though to the Rank when you have multiple plugins registered on the same entity for the same stage. By default, Rank is zero but you can put any integer up to 99 into it, and this will set the order that the plugins fire in.  I wanted my message to be the last plugin to execute so I set it to 99.  Remember though that it affects the order
of the plugins within the same stage. The plugin pipeline always executes as
1. Pre-Validation
2. Pre-Operation (before the database write)
3. Post-Operation(after the database write)

Here is the bottom line again

If you want to maintain Ordered Delivery you must use Synchronous Plugins.