How to Read/Write Data Between Azure Service Bus Topics and MSMQ

One of our clients had a requirement to perform the following operations:

  1. Data is read from an Azure Service Bus topic and written to Windows MSMQ.
  2. Similarly, data is read from Windows MSMQ and written to the Azure Service Bus topic.

Introduction

The client had a third-party solution that sent data from a portal to Dynamics AX. Because there was no direct way to send the data to AX, they did some custom development and used a worker role to send it.

Similarly, they sent data back to the custom portal.

They wanted to use Scribe Insight to integrate the two systems instead of the worker role they had deployed.

Description

The data from the third-party application is available in an Azure Service Bus topic. We can implement logic that reads this data and sends it to the Scribe In queue.

Azure Service Bus (Topic) -> Microsoft MSMQ

To send the data from the Azure Service Bus topic to MSMQ, follow these steps.

  1. Read the data from the Azure Service Bus topic and write it to Microsoft MSMQ (the Scribe Insight queue). You need the following:
    1. The connection string of the service bus.
    2. The topic name.
    3. The subscription name.
    4. The MSMQ name.

    You can use the below code to read the data from the Azure Service Bus topic:

    // Requires: System, System.IO, System.Text, System.Messaging,
    // Microsoft.ServiceBus.Messaging (WindowsAzure.ServiceBus package), Newtonsoft.Json.
    private void ReadTopic()
    {
        string connectionString = "Endpoint=sb://sample.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=d7wsaddYK1unQjy1WiIdX/4t8M+vtFZPKDluMSzelFpGdadsad=";

        SubscriptionClient client = SubscriptionClient.CreateFromConnectionString(connectionString, "topic", "subscription");

        // Configure the callback options.
        OnMessageOptions options = new OnMessageOptions();
        options.AutoComplete = false; // Complete explicitly, only after the MSMQ write succeeds.
        options.AutoRenewTimeout = TimeSpan.FromMinutes(1);

        client.OnMessage((message) =>
        {
            try
            {
                // Process message from subscription.
                eventLog.WriteEntry("Processing data");

                // Read the JSON body and convert it to XML under the configured root element.
                string bodyJson = new StreamReader(message.GetBody<Stream>(), Encoding.UTF8).ReadToEnd();
                var document = JsonConvert.DeserializeXmlNode(bodyJson, Constants.ROOT);
                eventLog.WriteEntry(Constants.MESSAGEID + message.MessageId);

                // Write the XML document to the Scribe Insight input queue.
                using (MessageQueue rmTxnQ = new MessageQueue("FormatName:Direct=OS:" + MachineName + "\\private$\\scribein"))
                {
                    rmTxnQ.DefaultPropertiesToSend.Recoverable = true;
                    rmTxnQ.Send(document, MessageQueueTransactionType.Automatic);
                }

                message.Complete();
            }
            catch (Exception ex)
            {
                // Indicates a problem: log it and unlock the message in the subscription.
                eventLog.WriteEntry("Message: " + ex.Message + "Trace: " + ex.StackTrace);
                message.Abandon();
            }
        }, options);
    }
    
  2. The above function keeps listening. Whenever data arrives on the specified topic/subscription, it reads the data and writes it to the specified queue.
  3. Once the data is in MSMQ, you can read it and pass it to the AX system, since the connector is available in Scribe Insight.
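
The callback above converts the JSON body to XML before writing it to MSMQ, and the reverse direction converts XML back to JSON. The following is a minimal sketch of just that conversion step, assuming the Newtonsoft.Json package that the post's code already calls. The `JsonXmlBridge` class and its method names are hypothetical and only serve as illustration.

```csharp
using System.Xml;
using Newtonsoft.Json;

// Hypothetical helper (not part of the original solution) that isolates
// the JSON <-> XML conversion used on both sides of the integration.
public static class JsonXmlBridge
{
    // JSON -> XML: the JSON properties are wrapped in an element named rootName.
    // A root name is needed when the JSON object has more than one top-level
    // property, because an XML document can only have a single root element.
    public static XmlDocument ToXml(string json, string rootName)
    {
        return JsonConvert.DeserializeXmlNode(json, rootName);
    }

    // XML -> JSON: the root element becomes the single top-level JSON property.
    public static string ToJson(XmlDocument document)
    {
        return JsonConvert.SerializeXmlNode(document);
    }
}
```

For example, calling `JsonXmlBridge.ToXml` with a JSON object and the root name `Order` (an invented name) yields a document whose root element is `Order`, and `JsonXmlBridge.ToJson` on that document produces JSON with `Order` as the outer property. If the receiving system does not expect that outer property, it would need to be stripped on one side.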

Microsoft MSMQ -> Azure Service Bus (Topic)

  1. Read the data from Microsoft MSMQ (the AzureOut queue) and write it to the Azure Service Bus topic.
  2. There is no listener on this side, so data arriving in the MSMQ (AzureOut) queue is not processed immediately. The function has to be called to pick up the records waiting in the queue.
  3. You can use the below function to read the data from MSMQ. You need the following:
    1. The connection string of the service bus.
    2. The topic name (no subscription name is needed when sending).
    3. The MSMQ name.

    // Requires: System, System.Text, System.Xml, System.Messaging,
    // Microsoft.ServiceBus.Messaging (WindowsAzure.ServiceBus package), Newtonsoft.Json.
    private void ReceiveMessageFromQueue()
    {
        MessageQueue messageQueue = null;
        string formattedData = string.Empty;
        string connectionString = "Endpoint=sb://sample.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=d7wsaddYK1unQjy1WiIdX/4t8M+vtFZPKDluMSzelFpGdadsad=";
        string topicName = "topic";
        try
        {
            messageQueue = new MessageQueue("FormatName:Direct=OS:" + MachineName + "\\private$\\azureout", false);

            // GetAllMessages returns a snapshot without removing anything;
            // it is used here only to know how many messages to receive.
            Message[] messages = messageQueue.GetAllMessages();

            foreach (Message msmqMessage in messages)
            {
                // Receive removes the next message from the queue.
                Message message = messageQueue.Receive();

                // Read the raw message body as UTF-8 XML text.
                byte[] b = new byte[message.BodyStream.Length];
                message.BodyStream.Read(b, 0, (int)message.BodyStream.Length);
                string returnVal = Encoding.UTF8.GetString(b);

                XmlDocument doc = new XmlDocument();
                doc.LoadXml(returnVal);

                // Convert the XML to JSON before sending it to the topic.
                formattedData = JsonConvert.SerializeXmlNode(doc);

                TopicClient client = null;
                try
                {
                    client = TopicClient.CreateFromConnectionString(connectionString, topicName);

                    client.Send(new BrokeredMessage(formattedData));
                    client.Close();
                }
                catch (Exception ex)
                {
                    this.eventLog.WriteEntry("Message: " + ex.Message + "Trace: " + ex.StackTrace + "Data: " + formattedData);
                }
            }
        }
        catch (MessageQueueException ex)
        {
            this.eventLog.WriteEntry("Message: " + ex.Message + "Trace: " + ex.StackTrace + "Data: " + formattedData);
        }
        catch (Exception ex)
        {
            this.eventLog.WriteEntry("Message: " + ex.Message + "Trace: " + ex.StackTrace + "Data: " + formattedData);
        }
        finally
        {
            // The constructor may have thrown, so guard against null.
            if (messageQueue != null)
            {
                messageQueue.Close();
            }
        }
    }
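
Because nothing listens on the AzureOut queue, something has to call the function above at regular intervals. One possible approach, shown here as a minimal sketch and not as the original solution's implementation, is a `System.Timers.Timer` that invokes the function periodically. The `QueuePoller` class name and the interval are assumptions made for illustration.

```csharp
using System;
using System.Timers;

// Hypothetical helper: runs a callback at a fixed interval and never
// starts a new run while the previous one is still executing.
public sealed class QueuePoller : IDisposable
{
    private readonly Timer timer;
    private readonly Action drainQueue;
    private readonly object gate = new object();
    private bool running;

    public QueuePoller(Action drainQueue, TimeSpan interval)
    {
        if (drainQueue == null) throw new ArgumentNullException("drainQueue");
        this.drainQueue = drainQueue;

        // AutoReset = false: the timer fires once and is re-armed manually
        // after the callback returns, so slow runs cannot overlap.
        timer = new Timer(interval.TotalMilliseconds) { AutoReset = false };
        timer.Elapsed += OnElapsed;
    }

    public void Start()
    {
        lock (gate) { running = true; timer.Start(); }
    }

    public void Stop()
    {
        lock (gate) { running = false; timer.Stop(); }
    }

    private void OnElapsed(object sender, ElapsedEventArgs e)
    {
        try
        {
            drainQueue();
        }
        catch (Exception ex)
        {
            // Keep polling even if one run fails.
            Console.Error.WriteLine("Polling failed: " + ex.Message);
        }
        finally
        {
            // Re-arm only if Stop has not been called in the meantime.
            lock (gate) { if (running) timer.Start(); }
        }
    }

    public void Dispose()
    {
        Stop();
        timer.Dispose();
    }
}
```

In a Windows service, for instance, this could be wired up in `OnStart` as `new QueuePoller(ReceiveMessageFromQueue, TimeSpan.FromSeconds(30)).Start()`, with `Dispose` called in `OnStop`. The 30-second interval is only a placeholder; the right value depends on how quickly records need to reach the topic.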
    

Using the above two functions, you can read data from an Azure Service Bus topic and write it to MSMQ, and similarly write data from MSMQ back to the Azure Service Bus topic.

Hope this helps you understand how to send data from an Azure Service Bus topic to MSMQ and vice versa.

 

