BizTalk : Analysis of Direct Mapping vs XDocument Pipeline vs Streaming Pipeline To Process Large Messages for SQL Bulk Insert

Introduction

BizTalk developers sometimes have to write custom code to meet a functional requirement, for example to process data from an Excel sheet or to process PDF documents. In other cases, custom components are needed to keep a BizTalk application performing well, for example when BizTalk has to process large messages. If such an application is not designed properly, it can put a lot of strain on the BizTalk environment, which in turn degrades the performance of other applications. This article discusses one such scenario, where BizTalk handles large messages, and shows how developers can handle the load better by writing more efficient code.


Fictional Scenario

EmployeeManager is an application used by the onboarding team to onboard employees into the company ABC Pvt. Ltd. EmployeeManager uses BizTalk as middleware to communicate with multiple systems in the organization. One of its processes uploads newly onboarded employees in bulk. Another uploads the employees' onboarding documents to the data store. In both cases EmployeeManager relies on BizTalk to communicate with the SQL data store where the system keeps the employees' documents.

Because many employees can be onboarded in a single day, BizTalk needs an efficient way to handle large messages. This article uses the example of saving employee information such as user ID, location, and avatar (profile picture) to the database. Each call to BizTalk can contain multiple employee profiles to be inserted or updated in the data store. The messages that BizTalk receives can reach 30 to 40 MB, and at times even more. The application is hosted on a BizTalk group that also runs other applications processing high message loads. It is therefore important that the code is efficient enough not to cause problems for those applications.


Conventional Approach

The basic approach is to use standard BizTalk development as is: BizTalk exposes a schema that accepts details such as the user ID, location, and avatar, and maps them to the schema generated for inserting the data into the SQL store. This approach has obvious disadvantages. BizTalk publishes every message to the message box, where a subscriber picks it up. Here the entire message, which can be 30 to 40 MB, is published to the message box and picked up by the orchestration, which applies the business logic and then publishes the transformed message for the SQL send port to pick up. The large message is therefore published to the message box twice (once when the receive location acts as publisher and once when the orchestration acts as publisher). If many such large messages arrive, this is bound to cause problems for other applications down the line.


Conventional Solution

The conventional solution to this direct Publish-Map-Publish scenario is:

  1. Identify the parts of the message that are large (in this case, the avatars of the employees).
  2. Create a temporary data store on the file system of the BizTalk application server.
  3. On the receive location, use a custom pipeline component that removes the large data from the incoming message and saves it as a temporary file in the data store.
  4. Replace the large chunk in the message with the path to the temporary file.
  5. Publish the message to the message box (the message is now substantially smaller, because the large parts have been stripped and replaced by file paths).
  6. Let the orchestration subscribe to the message, apply the business logic, and map directly to the SQL insert message.
  7. On the SQL send port, use a custom pipeline component in the last stage of the pipeline (the Encode stage) that reads each file from the data store and replaces the file path with the actual data.
  8. Delete the file from the store and send the message to SQL for the insert.

The advantage of this solution is that it removes the large data from the message and stores it temporarily on the file system. The message that BizTalk actually processes is therefore very small, which allows efficient processing.
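
The store-and-replace idea behind steps 3, 4, 7 and 8 can be illustrated outside BizTalk. The following is a minimal sketch, not the pipeline component itself: the class name ClaimCheck and the methods Stash and Restore are hypothetical, and the payload is held in memory for brevity.

```csharp
using System;
using System.IO;

// Hypothetical helper (not part of BizTalk): illustrates steps 3-4 and 7-8 only.
public static class ClaimCheck
{
    // Receive side: decode the base64 payload, park it on disk, and return the
    // file path that takes the payload's place in the message.
    public static string Stash(string base64Payload, string storeFolder, string extension)
    {
        string filePath = Path.Combine(storeFolder, Guid.NewGuid().ToString() + "." + extension);
        File.WriteAllBytes(filePath, Convert.FromBase64String(base64Payload));
        return filePath;
    }

    // Send side: read the parked payload back as base64 and delete the temp file.
    public static string Restore(string filePath)
    {
        string base64Payload = Convert.ToBase64String(File.ReadAllBytes(filePath));
        File.Delete(filePath);
        return base64Payload;
    }

    public static void Main()
    {
        string store = Path.GetTempPath();
        string original = Convert.ToBase64String(new byte[100000]);

        // The token (a file path) is what would travel through the message box.
        string token = Stash(original, store, "jpg");
        Console.WriteLine("Payload chars: " + original.Length + ", token chars: " + token.Length);

        string restored = Restore(token);
        Console.WriteLine("Round trip intact: " + (restored == original));
    }
}
```

The file path is only a few dozen characters long regardless of the payload size, which is why the message published to the message box stays small.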


Problem With The Conventional Solution?

The conventional solution discussed above is the right way to design the solution, but problems arise if it is not implemented carefully. Most of the time, developers use the XDocument class to do the work: they load the incoming message in the pipeline and perform the necessary operations on it. While this is a perfectly valid approach, the problem lies in how XDocument works. It loads the entire BizTalk message into process memory before any changes are made, which creates a memory footprint considerably larger than the size of the message received by BizTalk. This defeats the purpose of the conventional solution.
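
A rough calculation shows why the footprint grows. The sketch below is only an estimate under stated assumptions: a hypothetical 30 MB of raw image data, base64 encoding (4 characters per 3 bytes) and .NET strings (2 bytes per character). It ignores the XML markup and object overhead.

```csharp
using System;

public static class FootprintEstimate
{
    // Base64 encodes every 3 raw bytes as 4 characters (the last group is padded).
    public static long Base64Chars(long rawBytes)
    {
        return 4 * ((rawBytes + 2) / 3);
    }

    // A .NET string stores each character as a 2-byte UTF-16 code unit.
    public static long StringBytes(long chars)
    {
        return 2 * chars;
    }

    public static void Main()
    {
        long raw = 30L * 1024 * 1024;        // hypothetical 30 MB of image data
        long chars = Base64Chars(raw);       // size of the base64 text in the message
        long asString = StringBytes(chars);  // the same text held as a .NET string

        Console.WriteLine("Raw image bytes:     " + raw);
        Console.WriteLine("Base64 characters:   " + chars);
        Console.WriteLine("As in-memory string: " + asString);
    }
}
```

Under these assumptions, 30 MB of image data becomes about 40 MB of base64 text and about 80 MB once that text is held as a string, before counting any further copies made while decoding or serializing the document.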

How To Overcome This Problem?

BizTalk Server ships the Microsoft.BizTalk.Streaming.dll assembly, which contains various streaming classes that can be used in lieu of XDocument. These classes derive from the System.IO stream classes and process the message as it is read, writing message data to a file where needed, instead of loading the whole message into memory as XDocument does. Using the classes in the Microsoft.BizTalk.Streaming namespace therefore yields code that goes easier on process memory. The main obstacle is the lack of documentation and general knowledge about these classes. This article tries to help the reader understand a few of them.
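
The streaming idea can be tried without the BizTalk assemblies. The sketch below is an illustration only, using the plain .NET XmlReader and XmlWriter rather than the Microsoft.BizTalk.Streaming classes: it copies a document node by node and decodes the base64 content of one element to a temp file through a 4 KB buffer. The class name, method names and the ".bin" extension are hypothetical.

```csharp
using System;
using System.IO;
using System.Text;
using System.Xml;

public static class StreamingExtractSketch
{
    // Copies XML from input to output. The content of each <nodeName> element is
    // decoded to a temp file in 4 KB chunks and replaced by that file's path.
    public static void Extract(Stream input, Stream output, string nodeName, string storeFolder)
    {
        XmlWriterSettings settings = new XmlWriterSettings { OmitXmlDeclaration = true };
        using (XmlReader reader = XmlReader.Create(input))
        using (XmlWriter writer = XmlWriter.Create(output, settings))
        {
            reader.Read();
            while (!reader.EOF)
            {
                bool isTarget = reader.NodeType == XmlNodeType.Element
                                && reader.LocalName == nodeName
                                && !reader.IsEmptyElement;
                if (isTarget)
                {
                    string filePath = Path.Combine(storeFolder, Guid.NewGuid().ToString() + ".bin");
                    // Attributes of the target element are not copied in this sketch.
                    writer.WriteStartElement(reader.Prefix, reader.LocalName, reader.NamespaceURI);
                    using (FileStream fs = File.Create(filePath))
                    {
                        byte[] buffer = new byte[4096];
                        int read;
                        // Returns 0 once the content is exhausted; the reader is then
                        // positioned after the element's end tag.
                        while ((read = reader.ReadElementContentAsBase64(buffer, 0, buffer.Length)) > 0)
                        {
                            fs.Write(buffer, 0, read);
                        }
                    }
                    writer.WriteString(filePath);
                    writer.WriteEndElement();
                    continue; // the reader has already advanced
                }
                CopyCurrentNode(reader, writer);
                reader.Read();
            }
        }
    }

    // Shallow copy of the node the reader is on. The XML declaration, comments and
    // processing instructions are dropped to keep the sketch short.
    private static void CopyCurrentNode(XmlReader reader, XmlWriter writer)
    {
        switch (reader.NodeType)
        {
            case XmlNodeType.Element:
            {
                bool isEmpty = reader.IsEmptyElement;
                writer.WriteStartElement(reader.Prefix, reader.LocalName, reader.NamespaceURI);
                writer.WriteAttributes(reader, false);
                if (isEmpty) writer.WriteEndElement();
                break;
            }
            case XmlNodeType.Text:
                writer.WriteString(reader.Value);
                break;
            case XmlNodeType.CDATA:
                writer.WriteCData(reader.Value);
                break;
            case XmlNodeType.Whitespace:
            case XmlNodeType.SignificantWhitespace:
                writer.WriteWhitespace(reader.Value);
                break;
            case XmlNodeType.EndElement:
                writer.WriteFullEndElement();
                break;
        }
    }

    public static void Main()
    {
        string xml = "<Request><Profile><UserId>1</UserId><Avatar>"
                   + Convert.ToBase64String(new byte[10000])
                   + "</Avatar></Profile></Request>";
        using (MemoryStream input = new MemoryStream(Encoding.UTF8.GetBytes(xml)))
        using (MemoryStream output = new MemoryStream())
        {
            // The temp file is left in place here; a real flow would delete it later.
            Extract(input, output, "Avatar", Path.GetTempPath());
            Console.WriteLine(Encoding.UTF8.GetString(output.ToArray()));
        }
    }
}
```

At no point does this hold more than one 4 KB chunk of the image in memory, which is the property that matters for large messages.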


Implementation

The implementation for the demo for this article can be divided into following categories.

  1. Design SQL Tables and Stored Procedure to Save Employee Data
  2. Design the BizTalk Solution to Do a Bulk Insert of Employee Profiles to SQL Table
  3. Create the Custom Receive Pipeline Component Using XDocument To Load Large Message
  4. Create the Custom Send Pipeline Component Using XDocument To Load Large Message
  5. Create the Custom Receive Pipeline Component Using BizTalk Streaming Classes To Load Large Message
  6. Create the Custom Send Pipeline Component Using BizTalk Streaming Classes To Load Large Message

SQL Design

Following code can be used to create the table that stores the employee profile details.

USE [POCDatabase]
GO
/****** Object:  Table [dbo].[Employee_Profile]    Script Date: 2/24/2018 7:49:24 PM ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE TABLE [dbo].[Employee_Profile](
    [UserId] [int] NOT NULL,
    [LOCATION] [varchar](10) NOT NULL,
    [AVATAR] [image] NOT NULL,
UNIQUE NONCLUSTERED
(
    [UserId] ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
GO

Following is the Stored Procedure that is used to insert the data into the table created above.

USE [POCDatabase]
GO
/****** Object:  StoredProcedure [dbo].[usp_UpdateEmployeeProfile]    Script Date: 2/24/2018 7:55:03 PM ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
-- Name and parameters follow the usp_UpdateEmployeeProfile schema generated from this procedure;
-- the VARBINARY(MAX) type of @avatar is an assumption.
CREATE PROCEDURE [dbo].[usp_UpdateEmployeeProfile]
@USERId INT, @location VARCHAR (10), @avatar VARBINARY (MAX)
AS
BEGIN
    INSERT  INTO [dbo].[Employee_Profile] ([UserId], [LOCATION], [AVATAR])
    VALUES (@USERId, @location, @avatar);
END
GO

BizTalk Design

The following schema is used for the BizTalk service (the BizTalk schemas are exposed as a WCF service).

<?xml version="1.0" encoding="utf-16"?>
  <xs:element name="Request">
    <xs:complexType>
      <xs:sequence>
        <xs:element minOccurs="1" maxOccurs="unbounded" name="Profile">
          <xs:complexType>
            <xs:sequence>
              <xs:element name="UserId" type="xs:string" />
              <xs:element name="Location" type="xs:string" />
              <xs:element name="Avatar" type="xs:base64Binary" />
            </xs:sequence>
          </xs:complexType>
        </xs:element>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
  <xs:element name="Response">
    <xs:complexType>
      <xs:sequence>
        <xs:element minOccurs="1" maxOccurs="unbounded" name="Profile">
          <xs:complexType>
            <xs:sequence>
              <xs:element name="UserId" type="xs:string" />
              <xs:element name="TransactionId" type="xs:string" />
            </xs:sequence>
          </xs:complexType>
        </xs:element>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
</xs:schema>

The schemas generated after consuming the SQL stored procedure are as follows (refer to the See Also section to understand how to consume the stored procedure in a Visual Studio BizTalk application).

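<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" elementFormDefault="qualified" targetNamespace="http://schemas.microsoft.com/Sql/2008/05/TypedProcedures/dbo">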
  <xs:annotation>
    <xs:appinfo>
      <fileNameHint xmlns="http://schemas.microsoft.com/servicemodel/adapters/metadata/xsd">TypedProcedure.dbo</fileNameHint>
    </xs:appinfo>
  </xs:annotation>
  <xs:element name="usp_UpdateEmployeeProfile">
    <xs:annotation>
      <xs:documentation>
        <doc:action xmlns:doc="http://schemas.microsoft.com/servicemodel/adapters/metadata/documentation">TypedProcedure/dbo/usp_UpdateEmployeeProfile</doc:action>
      </xs:documentation>
    </xs:annotation>
    <xs:complexType>
      <xs:sequence>
        <xs:element minOccurs="0" maxOccurs="1" name="USERId" nillable="true" type="xs:int" />
        <xs:element minOccurs="0" maxOccurs="1" name="location" nillable="true">
          <xs:simpleType>
            <xs:restriction base="xs:string">
              <xs:maxLength value="10" />
            </xs:restriction>
          </xs:simpleType>
        </xs:element>
        <xs:element minOccurs="0" maxOccurs="1" name="avatar" nillable="true" type="xs:base64Binary" />
      </xs:sequence>
    </xs:complexType>
  </xs:element>
  <xs:element name="usp_UpdateEmployeeProfileResponse">
    <xs:annotation>
      <xs:documentation>
        <doc:action xmlns:doc="http://schemas.microsoft.com/servicemodel/adapters/metadata/documentation">TypedProcedure/dbo/usp_UpdateEmployeeProfile/response</doc:action>
      </xs:documentation>
    </xs:annotation>
    <xs:complexType>
      <xs:sequence>
        <xs:element minOccurs="1" maxOccurs="1" name="ReturnValue" type="xs:int" />
      </xs:sequence>
    </xs:complexType>
  </xs:element>
</xs:schema>

In order to do a bulk insert using the composite operation facility of the WCF-Custom/WCF-SQL adapter, the following custom schema is created, and the request and response types generated above are imported into it as child records. The Min Occurs and Max Occurs properties of these records are set to 1 and unbounded respectively, as the schema shows. Following is the schema created for the bulk insert.

<?xml version="1.0" encoding="utf-16"?>
  <xs:import schemaLocation=".\TypedProcedure.dbo.xsd" namespace="http://schemas.microsoft.com/Sql/2008/05/TypedProcedures/dbo" />
  <xs:annotation>
    <xs:appinfo>
      <b:references>
        <b:reference targetNamespace="http://schemas.microsoft.com/Sql/2008/05/TypedProcedures/dbo" />
      </b:references>
    </xs:appinfo>
  </xs:annotation>
  <xs:element name="UpdateEmployeeProfiles">
    <xs:complexType>
      <xs:sequence>
        <xs:element minOccurs="1" maxOccurs="unbounded" ref="ns0:usp_UpdateEmployeeProfile" />
      </xs:sequence>
    </xs:complexType>
  </xs:element>
  <xs:element name="UpdateEmployeeProfilesResponse">
    <xs:complexType>
      <xs:sequence>
        <xs:element minOccurs="1" maxOccurs="unbounded" ref="ns0:usp_UpdateEmployeeProfileResponse" />
      </xs:sequence>
    </xs:complexType>
  </xs:element>
</xs:schema>

Following screen shot shows the schema as visible in the BizTalk schema editor.

Following are the maps used to create the SQL insert message and the response returned by the BizTalk WCF service (the Scripting functoid just returns a GUID as the transaction ID).


The Orchestration Flow is as Follows.

Custom Receive Pipeline Using XDocument

The code for the receive pipeline component using XDocument is as follows. The component loads the entire message into memory, which causes high memory utilization.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.BizTalk.Message.Interop;
using Microsoft.BizTalk.Component.Interop;
using System.IO;
using System.Xml.Linq;
namespace ProcessLargeFilesInBizTalk.Components
{
    [ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
    [ComponentCategory(CategoryTypes.CATID_Decoder)]
    [System.Runtime.InteropServices.Guid("2B62CE58-5C73-483D-8CB2-76E2E3014057")]
    public class ExtractorClassesUsingXDocument : IBaseComponent, IComponentUI, IComponent, IPersistPropertyBag
    {
        #region IBaseComponent
        private const string _Description = "Decode Pipeline Component To Extract the Data from Node and save To File";
        private const string _Name = "ImageExtractorXDocument";
        private const string _Version = "1.0.0.0";
        public string Description
        {
            get { return _Description; }
        }
        public string Name
        {
            get { return _Name; }
        }
        public string Version
        {
            get { return _Version; }
        }
        #endregion
        #region IComponentUI
        private IntPtr _Icon = new IntPtr();
        public IntPtr Icon
        {
            get { return _Icon; }
        }
        public System.Collections.IEnumerator Validate(object projectSystem)
        {
            return null;
        }
        #endregion
        #region IpersistPropertyBag
        private string _ImageDataNodeName;
        private string _LocalTempDataStore;
        private string _FileExtension;
        public string ImageDataNodeName
        {
            get { return _ImageDataNodeName; }
            set { _ImageDataNodeName = value; }
        }
        public string LocalTempDataStore
        {
            get { return _LocalTempDataStore; }
            set { _LocalTempDataStore = value; }
        }
        public string FileExtension
        {
            get { return _FileExtension; }
            set { _FileExtension = value; }
        }
        public void GetClassID(out Guid classID)
        {
            classID = new Guid("2B62CE58-5C73-483D-8CB2-76E2E3014057");
        }
        public void InitNew()
        {
        }
        public void Load(IPropertyBag pBag, int errorLog)
        {
            object val1 = null;
            object val2 = null;
            object val3 = null;
            try
            {
                pBag.Read("ImageDataNodeName", out val1, 0);
                pBag.Read("LocalTempDataStore", out val2, 0);
                pBag.Read("FileExtension", out val3, 0);
            }
            catch (ArgumentException)
            {
            }
            catch (Exception ex)
            {
                throw new ApplicationException("Error Reading Property Bag. Error Message " + ex.Message);
            }
            if (val1 != null)
            {
                _ImageDataNodeName = (string)val1;
            }
            if (val2 != null)
            {
                _LocalTempDataStore = (string)val2;
            }
            if (val3 != null)
            {
                _FileExtension = (string)val3;
            }
        }
        public void Save(IPropertyBag pBag, bool clearDirty, bool saveAllProperties)
        {
            object val1 = (object)_ImageDataNodeName;
            object val2 = (object)_LocalTempDataStore;
            object val3 = (object)_FileExtension;
            pBag.Write("ImageDataNodeName", val1);
            pBag.Write("LocalTempDataStore", val2);
            pBag.Write("FileExtension", val3);
        }
        #endregion
        #region IComponent
        public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage inMsg)
        {
            Stream s = inMsg.BodyPart.GetOriginalDataStream();
            // XDocument.Load materializes the whole message, base64 payloads included, in memory.
            XDocument xdoc = XDocument.Load(s);
            // Descendants(string) only matches elements that are in no namespace.
            IEnumerable<XElement> largeDataNodes = xdoc.Descendants(ImageDataNodeName);
            foreach (XElement xe in largeDataNodes)
            {
                try
                {
                    string uniqueId = System.Guid.NewGuid().ToString();
                    string filePath = Path.Combine(_LocalTempDataStore, uniqueId + "." + _FileExtension);
                    byte[] imageData = Convert.FromBase64String(xe.Value);
                    File.WriteAllBytes(filePath, imageData);
                    // Replace the base64 string with the file path.
                    xe.SetValue(filePath);
                }
                catch (Exception)
                {
                    // Swallow the exception and move on to the next node; the node keeps its original content.
                }
            }
            // The document is not written to the event log: a large message would exceed the entry size limit.
            byte[] outBytes = System.Text.Encoding.UTF8.GetBytes(xdoc.ToString());
            MemoryStream ms = new MemoryStream(outBytes);
            // Let the pipeline dispose the stream once the message has been processed.
            pContext.ResourceTracker.AddResource(ms);
            inMsg.BodyPart.Data = ms;
            return inMsg;
        }
        #endregion
    }
}

The receive pipeline is created by making the component above available to BizTalk (typically by copying its assembly to the Pipeline Components folder) and then adding it to the Decode stage, followed by an XML disassembler in the Disassemble stage of the pipeline. The pipeline is shown below.

Custom Send Pipeline Using XDocument

The code for the send pipeline component using XDocument is as follows. The component loads the entire message into memory, which causes high memory utilization.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.BizTalk.Message.Interop;
using Microsoft.BizTalk.Component.Interop;
using System.IO;
using System.Xml;
using System.Xml.Linq;
namespace ProcessLargeFilesInBizTalk.Components
{
    [ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
    [ComponentCategory(CategoryTypes.CATID_Encoder)]
    [System.Runtime.InteropServices.Guid("A5A1F8AD-3C4B-4CAB-A51D-D68EEAD56D67")]
    public class InjectorClassesUsingXDocument : IBaseComponent, IComponentUI, IComponent, IPersistPropertyBag
    {
        #region IBaseComponent
        private const string _Description = "Encode Pipeline Component To Inject the Data into Node from file saved By Extractor";
        private const string _Name = "ImageInjectorXDocument";
        private const string _Version = "1.0.0.0";
        public string Description
        {
            get { return _Description; }
        }
        public string Name
        {
            get { return _Name; }
        }
        public string Version
        {
            get { return _Version; }
        }
        #endregion
        #region IComponentUI
        private IntPtr _Icon = new IntPtr();
        public IntPtr Icon
        {
            get { return _Icon; }
        }
        public System.Collections.IEnumerator Validate(object projectSystem)
        {
            return null;
        }
        #endregion
        #region IpersistPropertyBag
        private string _ImageDataNodeName;
        public string ImageDataNodeName
        {
            get { return _ImageDataNodeName; }
            set { _ImageDataNodeName = value; }
        }
        public void GetClassID(out Guid classID)
        {
            classID = new Guid("A5A1F8AD-3C4B-4CAB-A51D-D68EEAD56D67");
        }
        public void InitNew()
        {
        }
        public void Load(IPropertyBag pBag, int errorLog)
        {
            object val1 = null;
            try
            {
                pBag.Read("ImageDataNodeName", out val1, 0);
            }
            catch (ArgumentException)
            {
            }
            catch (Exception ex)
            {
                throw new ApplicationException("Error Reading Property Bag. Error Message " + ex.Message);
            }
            if (val1 != null)
            {
                _ImageDataNodeName = (string)val1;
            }
        }
        public void Save(IPropertyBag pBag, bool clearDirty, bool saveAllProperties)
        {
            object val1 = (object)_ImageDataNodeName;
            pBag.Write("ImageDataNodeName", val1);
        }
        #endregion
        #region IComponent
        public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage inMsg)
        {
            Stream s = inMsg.BodyPart.GetOriginalDataStream();
            // XDocument.Load materializes the whole message in memory.
            XDocument xDoc = XDocument.Load(s);
            XNamespace ns0 = "http://schemas.microsoft.com/Sql/2008/05/TypedProcedures/dbo";
            IEnumerable<XElement> imageDataNodes = xDoc.Descendants(ns0 + _ImageDataNodeName);
            foreach (XElement xe in imageDataNodes)
            {
                try
                {
                    // The node holds the path written by the extractor on the receive side.
                    string filePath = xe.Value;
                    byte[] imageData = File.ReadAllBytes(filePath);
                    string image = Convert.ToBase64String(imageData);
                    xe.SetValue(image);
                    File.Delete(filePath);
                }
                catch (Exception)
                {
                    // Swallow the exception and move to the next node.
                    // Note that the node then still contains the file path rather than image data.
                }
            }
            // The document and file paths are not written to the event log: a large message would exceed the entry size limit.
            byte[] outBytes = System.Text.Encoding.UTF8.GetBytes(xDoc.ToString());
            MemoryStream ms = new MemoryStream(outBytes);
            // Let the pipeline dispose the stream once the message has been processed.
            pContext.ResourceTracker.AddResource(ms);
            inMsg.BodyPart.Data = ms;
            return inMsg;
        }
        #endregion
    }
}

The send pipeline is created by making the component above available to BizTalk (typically by copying its assembly to the Pipeline Components folder) and then adding it to the Encode stage of the send pipeline. Following is a sample send pipeline.
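
The injector above reads each file fully and builds the complete base64 string in memory. For comparison, the sketch below shows how the same element content could be written in small chunks with the plain .NET XmlWriter.WriteBase64 method. It is an illustration under stated assumptions, not the article's component: the class name, method names and the element name "avatar" used in Main are hypothetical.

```csharp
using System;
using System.IO;
using System.Text;
using System.Xml;

public static class ChunkedInjectSketch
{
    // Writes <elementName> whose content is the base64 form of the file at
    // filePath, reading the file through a small fixed-size buffer.
    public static void WriteFileAsBase64Element(XmlWriter writer, string elementName, string filePath)
    {
        // A multiple of 3, so every full chunk maps to whole 4-character base64 groups.
        byte[] buffer = new byte[3 * 1024];
        writer.WriteStartElement(elementName);
        using (FileStream fs = File.OpenRead(filePath))
        {
            int filled;
            while ((filled = Fill(fs, buffer)) > 0)
            {
                writer.WriteBase64(buffer, 0, filled);
            }
        }
        writer.WriteEndElement();
    }

    // Reads until the buffer is full or the stream ends; returns the bytes read.
    private static int Fill(Stream source, byte[] buffer)
    {
        int total = 0;
        int read;
        while (total < buffer.Length
               && (read = source.Read(buffer, total, buffer.Length - total)) > 0)
        {
            total += read;
        }
        return total;
    }

    public static void Main()
    {
        // Stand-in for a file previously saved on the receive side.
        string filePath = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString() + ".bin");
        File.WriteAllBytes(filePath, new byte[10000]);

        StringBuilder xml = new StringBuilder();
        XmlWriterSettings settings = new XmlWriterSettings { OmitXmlDeclaration = true };
        using (XmlWriter writer = XmlWriter.Create(xml, settings))
        {
            WriteFileAsBase64Element(writer, "avatar", filePath);
        }
        File.Delete(filePath);
        Console.WriteLine("Element length in characters: " + xml.Length);
    }
}
```

Only one 3 KB chunk of the file is held in memory at a time. Writing to a StringBuilder in Main is for demonstration only; against a real output stream the encoded text would not be accumulated either.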

 

Custom Receive Pipeline Using BizTalk Streaming Classes

The streaming classes are located in the Microsoft.BizTalk.Streaming.dll assembly and should be used when dealing with large messages. The following code uses an XmlTranslatorStream to modify the message, and it does so at the boundary of BizTalk so that only small messages are published to the message box. Following is the code for the receive pipeline decoder component.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.BizTalk.Streaming;
using Microsoft.BizTalk.Message.Interop;
using Microsoft.BizTalk.Component.Interop;
using System.IO;
using System.Xml;
namespace ProcessLargeFilesInBizTalk.Components
{
    /// <summary>
    /// Class to implement the custom decoder pipeline component, which in turn uses the XmlTranslatorStream to modify message contents.
    /// This gives the advantage of modifying the message at the boundary of BizTalk and avoids publishing big messages to the message box database.
    /// </summary>
    [ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
    [ComponentCategory(CategoryTypes.CATID_Decoder)]
    [System.Runtime.InteropServices.Guid("FA8EC02F-A33C-4F2A-A804-44E57FA4EEA3")]
    public class Extractor : IBaseComponent, IComponentUI, IComponent, IPersistPropertyBag
    {
        #region IBaseComponent
        private const string _Description = "Decode Pipeline Component To Extract the Data from Node and save To File";
        private const string _Name = "ImageExtractor";
        private const string _Version = "1.0.0.0";
        public string Description
        {
            get { return _Description; }
        }
        public string Name
        {
            get { return _Name; }
        }
        public string Version
        {
            get { return _Version; }
        }
        #endregion
        #region IComponentUI
        private IntPtr _Icon = new IntPtr();
        public IntPtr Icon
        {
            get { return _Icon; }
        }
        public System.Collections.IEnumerator Validate(object projectSystem)
        {
            return null;
        }
        #endregion
        #region IpersistPropertyBag
        private string _ImageDataNodeName;
        private string _LocalTempDataStore;
        private string _FileExtension;
        public string ImageDataNodeName
        {
            get { return _ImageDataNodeName; }
            set { _ImageDataNodeName = value; }
        }
        public string LocalTempDataStore
        {
            get { return _LocalTempDataStore; }
            set { _LocalTempDataStore = value; }
        }
        public string FileExtension
        {
            get { return _FileExtension; }
            set { _FileExtension = value; }
        }
        public void GetClassID(out Guid classID)
        {
            classID = new Guid("FA8EC02F-A33C-4F2A-A804-44E57FA4EEA3");
        }
        public void InitNew()
        {
        }
        public void Load(IPropertyBag pBag, int errorLog)
        {
            object val1 = null;
            object val2 = null;
            object val3 = null;
            try
            {
                pBag.Read("ImageDataNodeName", out val1, 0);
                pBag.Read("LocalTempDataStore", out val2, 0);
                pBag.Read("FileExtension", out val3, 0);
            }
            catch (ArgumentException)
            {
            }
            catch (Exception ex)
            {
                throw new ApplicationException("Error Reading Property Bag. Error Message " + ex.Message);
            }
            if (val1 != null)
            {
                _ImageDataNodeName = (string)val1;
            }
            if (val2 != null)
            {
                _LocalTempDataStore = (string)val2;
            }
            if (val3 != null)
            {
                _FileExtension = (string)val3;
            }
        }
        public void Save(IPropertyBag pBag, bool clearDirty, bool saveAllProperties)
        {
            object val1 = (object)_ImageDataNodeName;
            object val2 = (object)_LocalTempDataStore;
            object val3 = (object)_FileExtension;
            pBag.Write("ImageDataNodeName", val1);
            pBag.Write("LocalTempDataStore", val2);
            pBag.Write("FileExtension", val3);
        }
        #endregion
        #region IComponent
        public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage inMsg)
        {
            VirtualStream vs = new VirtualStream(inMsg.BodyPart.GetOriginalDataStream());
            XmlReader xReader = XmlReader.Create(vs);
            inMsg.BodyPart.Data = new ExtractorStream(xReader, _FileExtension, _ImageDataNodeName, _LocalTempDataStore);
            return inMsg;
        }
        #endregion
    }
    /// <summary>
    /// This class is derived from the XmlTranslatorStream class (present in the Microsoft.BizTalk.Streaming namespace).
    /// It overrides some of the methods to replace the image received as a base64 string in the request to BizTalk
    /// with the path to the file in the temp folder where the image is temporarily stored.
    /// </summary>
    public class ExtractorStream : XmlTranslatorStream
    {
        private XmlReader reader;
        private XmlWriter writer;
        private string fileExtension;
        private string imageNodeName;
        private string localTempDataStorePath;
        private XmlReader xReader;
        public ExtractorStream(XmlReader xReader, string fileExtension, string imageNodeName, string localTempDataStorePath)
            : base(xReader)
        {
            this.fileExtension = fileExtension;
            this.imageNodeName = imageNodeName;
            this.localTempDataStorePath = localTempDataStorePath;
            Initialise();
        }
        private void Initialise()
        {
            reader = m_reader;
            writer = m_writer;
        }
        /// <summary>
        /// Overrides the TranslateStartElement method to check whether the current node is the node that contains the image (as a base64 string).
        /// If so, the method reads the value of the node in 4 KB chunks and saves the decoded image as a file in the temporary data store.
        /// It then replaces the image string with the path to that file.
        /// </summary>
        /// <param name="prefix"></param>
        /// <param name="localName"></param>
        /// <param name="nsURI"></param>
        protected override void TranslateStartElement(string prefix, string localName, string nsURI)
        {
            if (reader.LocalName == imageNodeName)
            {
                var document = reader.ReadSubtree();
                document.Read();
                var data = new byte[4096];
                string uniqueId = System.Guid.NewGuid().ToString();
                string filePath = Path.Combine(localTempDataStorePath, uniqueId + "." + fileExtension);
                using (FileStream fs = File.Open(filePath, FileMode.CreateNew, FileAccess.Write, FileShare.Read))
                {
                    int bytesRead;
                    while ((bytesRead = document.ReadElementContentAsBase64(data, 0, data.Length)) > 0)
                    {
                        fs.Write(data, 0, bytesRead);
                    }
                    fs.Flush();
                }
                writer.WriteElementString(prefix, imageNodeName, nsURI, filePath);
            }
            else
            {
                base.TranslateStartElement(prefix, localName, nsURI);
            }
        }
        protected override void TranslateEndElement(bool full)
        {
            if (reader.LocalName == imageNodeName)
            {
            }
            else
            {
                base.TranslateEndElement(full);
            }
        }
    }
}

The receive pipeline is created by making the component above available to BizTalk (typically by copying its assembly to the Pipeline Components folder) and then adding it to the Decode stage, followed by an XML disassembler in the Disassemble stage of the pipeline. The pipeline is shown below.

Custom Send Pipeline Using BizTalk Streaming Classes

The streaming classes are located in the Microsoft.BizTalk.Streaming.dll assembly and should be used when dealing with large messages. On the send side, the component uses an XmlTranslatorStream to put the image data back into the message at the boundary of BizTalk, so that only the small message ever passes through the message box. Following is the code for the send pipeline encoder component.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.BizTalk.Streaming;
using Microsoft.BizTalk.Message.Interop;
using Microsoft.BizTalk.Component.Interop;
using System.IO;
using System.Xml;
namespace ProcessLargeFilesInBizTalk.Components
{
    /// <summary>
    /// Class to implement the custom encoder pipeline component, which in turn uses the XmlTranslatorStream to modify message contents.
    /// This gives the advantage of modifying the message at the boundary of BizTalk and avoids publishing big messages to the message box database.
    /// </summary>
    [ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
    [ComponentCategory(CategoryTypes.CATID_Encoder)]
    [System.Runtime.InteropServices.Guid("DD061000-A85D-4339-BF2A-531F9F4D529D")]
    public class Injector : IBaseComponent, IComponentUI, IComponent, IPersistPropertyBag
    {
        #region IBaseComponent
        private const string _Description = "Encode Pipeline Component To Inject the Data into Node from file saved By Extractor";
        private const string _Name = "ImageInjector";
        private const string _Version = "1.0.0.0";
        public string Description
        {
            get { return _Description; }
        }
        public string Name
        {
            get { return _Name; }
        }
        public string Version
        {
            get { return _Version; }
        }
        #endregion
        #region IComponentUI
        private IntPtr _Icon = new IntPtr();
        public IntPtr Icon
        {
            get { return _Icon; }
        }
        public System.Collections.IEnumerator Validate(object projectSystem)
        {
            return null;
        }
        #endregion
        #region IPersistPropertyBag
        private string _ImageDataNodeName;
         
        public string ImageDataNodeName
        {
            get { return _ImageDataNodeName; }
            set { _ImageDataNodeName = value; }
        }
         
        public void GetClassID(out Guid classID)
        {
            classID = new Guid("DD061000-A85D-4339-BF2A-531F9F4D529D");
        }
        public void InitNew()
        {
        }
        public void Load(IPropertyBag pBag, int errorLog)
        {
            object val1 = null;
            
            try
            {
                pBag.Read("ImageDataNodeName", out val1, 0);
                 
            }
            catch (ArgumentException)
            {
            }
            catch (Exception ex)
            {
                throw new ApplicationException("Error Reading Property Bag. Error Message " + ex.Message);
            }
            if (val1 != null)
            {
                _ImageDataNodeName = (string)val1;
            }
        }
        public void Save(IPropertyBag pBag, bool clearDirty, bool saveAllProperties)
        {
            object val1 = (object)_ImageDataNodeName;
            pBag.Write("ImageDataNodeName", val1);
            
        }
        #endregion
        #region IComponent
        public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage inMsg)
        {
            VirtualStream vs = new VirtualStream(inMsg.BodyPart.GetOriginalDataStream());
            XmlReader xReader = XmlReader.Create(vs);
            inMsg.BodyPart.Data = new InjectorStream(xReader,_ImageDataNodeName);
            return inMsg;
        }
        #endregion
    }
    /// <summary>
    /// This class is derived from the XmlTranslatorStream class (present in the Microsoft.BizTalk.Streaming namespace).
    /// It overrides some of the functions to replace the image file location with the actual file stored at the
    /// physical data store. Once the file is converted to a base64 string and written to the message, the file in the temp data store is deleted.
    /// </summary>
    public class InjectorStream : XmlTranslatorStream
    {
        private XmlReader reader;
        private XmlWriter writer;
        private string imageDataNodeName;
        /// <summary>
        /// Creates the stream and captures the reader and writer exposed by the base class.
        /// </summary>
        /// <param name="xReader">Reader created from the virtual stream created from the BizTalk message to the SQL adapter</param>
        /// <param name="imageDataNodeName">Node of type base64Binary which stores the image data to be inserted into the SQL table</param>
        public InjectorStream(XmlReader xReader, string imageDataNodeName)
            :base(xReader)
        {
            this.imageDataNodeName = imageDataNodeName;
            Initialise();
        }
        private void Initialise()
        {
            reader = m_reader;
            writer = m_writer;
        }
        /// <summary>
        /// Overrides the TranslateStartElement method to check whether the current node is the node which contains the path to the image stored in the file store.
        /// If the current node matches the image node, the method reads the file from the data store, updates the value of the node
        /// using the writer and, once done, deletes the file from the temp folder.
        /// </summary>
        /// <param name="prefix"></param>
        /// <param name="localName"></param>
        /// <param name="nsURI"></param>
        protected override void TranslateStartElement(string prefix, string localName, string nsURI)
        {
            if (reader.LocalName == imageDataNodeName)
            {
                var document = reader.ReadSubtree();
                document.Read();
                var imageData = String.Empty;
                string tempFilePath = document.ReadElementContentAsString();
                if (File.Exists(tempFilePath))
                {
                    byte[] imageBytes = File.ReadAllBytes(tempFilePath);
                    imageData = Convert.ToBase64String(imageBytes);
                    // Delete only a file that was actually found and read; File.Delete throws on an empty or malformed path
                    File.Delete(tempFilePath);
                }
                writer.WriteElementString(localName, nsURI, imageData);
            }
            else
            {
                base.TranslateStartElement(prefix, localName, nsURI);
            }
        }
        protected override void TranslateEndElement(bool full)
        {
            base.TranslateEndElement(full);
        }
         
    }
}
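
The injector above reads the whole temporary file with File.ReadAllBytes and converts it with Convert.ToBase64String, so each image is held in memory twice for a moment. A possible variation, sketched below under the assumption that the images themselves can be large, reads the file in chunks and lets XmlWriter.WriteBase64 do the encoding. The class and method names (Base64Injection, WriteFileAsBase64Element) are hypothetical and not part of the original solution; the sketch uses only System.IO and System.Xml so that it can be tried outside BizTalk.

```csharp
using System;
using System.IO;
using System.Xml;

// Hypothetical helper, not part of the original component.
public static class Base64Injection
{
    // Writes one element whose content is the base64 form of the file.
    // The file is read in fixed-size chunks, so the full image is never
    // held in memory at once. XmlWriter carries any leftover bytes between
    // WriteBase64 calls, so chunk sizes need not be a multiple of three.
    public static void WriteFileAsBase64Element(
        XmlWriter writer, string prefix, string localName, string nsUri, string filePath)
    {
        writer.WriteStartElement(prefix, localName, nsUri);
        using (FileStream fs = File.OpenRead(filePath))
        {
            byte[] buffer = new byte[3 * 1024];
            int read;
            while ((read = fs.Read(buffer, 0, buffer.Length)) > 0)
            {
                writer.WriteBase64(buffer, 0, read);
            }
        }
        writer.WriteEndElement();
    }
}
```

Inside TranslateStartElement, a call such as Base64Injection.WriteFileAsBase64Element(writer, prefix, localName, nsURI, tempFilePath) could take the place of the ReadAllBytes, ToBase64String and WriteElementString sequence. Whether this is worth the change depends on the size of the individual images.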

The send pipeline is created by registering the above component with COM and adding it to the Encode stage of the send pipeline. A sample pipeline is shown below.

This completes the implementation part of the walkthrough.



Deploying BizTalk Solution

The general steps to deploy and configure the BizTalk solution are as follows.

  1. Deploy the BizTalk solution.
  2. Expose the BizTalk schema BizTalkServiceTypes as a WCF service.
  3. Configure the IIS location by mapping the IIS application to the correct application pool.
  4. Once the schemas are exposed as a WCF service, a receive port and a receive location are created in the BizTalk application. The next step is to set the maximum size allowed for the messages. Refer to the following sample screenshot.

    The configuration of the receive pipeline using the XDocument class is shown below.

    The configuration of the receive pipeline using the Streaming classes is shown below.

  5. Import the binding file generated by consuming the stored procedure. This creates the send port that inserts the data into SQL. Set the action mapping to CompositeOperation as shown below.

    The configuration for the send pipeline using the XDocument class is shown below.

    The configuration for the send pipeline using the streaming classes is shown below.

  6. To take care of the stripped image files left in the temporary file store, a PowerShell script can be scheduled to clear them out at the end of the business day. The following sample script deletes files older than the given number of minutes (pass 1440 for one day).
    param
    (
        # Age in minutes beyond which a file is deleted
        [Parameter(Mandatory=$true)]
        [int]$ThresholdMinutes,
        # Folder that holds the temporary image files
        [Parameter(Mandatory=$true)]
        [string]$PathFromWhereToDelete
    )
    if (Test-Path $PathFromWhereToDelete)
    {
        $cutoff = (Get-Date).AddMinutes(-$ThresholdMinutes)

        # Remove only files (not folders) last written before the cutoff
        Get-ChildItem -Path $PathFromWhereToDelete -File | Where-Object { $_.LastWriteTime -lt $cutoff } | Remove-Item
    }
    else
    {
        Write-Host "Path not found to delete files from: $PathFromWhereToDelete"
    }

 


Testing

The application was tested by sending multiple calls to the BizTalk service, each containing a message of around 20 MB. The memory usage and response times were noted for each of the following scenarios.

  1. Direct Mapping allowing the entire message to publish into the message box
  2. Using the Pipeline Components with the XDocument Class
  3. Using the Pipeline Components with the Streaming Classes
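
A test payload of roughly this size can be produced with a small generator. The sketch below is only an illustration: the element names (Employees, Employee, UserId, Location, Avatar) are placeholders and do not come from the schema used in this article, and the avatar data is random bytes rather than a real picture. The generated content would still have to be wrapped in whatever request the WCF service expects.

```csharp
using System;
using System.IO;
using System.Xml;

// Hypothetical test helper; element names are placeholders, not the real schema.
public static class TestPayloadGenerator
{
    // Writes employeeCount records, each carrying avatarBytes of random data
    // encoded as base64, and returns the size of the resulting file in bytes.
    public static long Write(string path, int employeeCount, int avatarBytes)
    {
        var random = new Random(42);
        byte[] avatar = new byte[avatarBytes];
        using (XmlWriter writer = XmlWriter.Create(path))
        {
            writer.WriteStartElement("Employees");
            for (int i = 0; i < employeeCount; i++)
            {
                random.NextBytes(avatar);
                writer.WriteStartElement("Employee");
                writer.WriteElementString("UserId", "user" + i);
                writer.WriteElementString("Location", "Site" + (i % 5));
                writer.WriteStartElement("Avatar");
                writer.WriteBase64(avatar, 0, avatar.Length);
                writer.WriteEndElement(); // Avatar
                writer.WriteEndElement(); // Employee
            }
            writer.WriteEndElement(); // Employees
        }
        return new FileInfo(path).Length;
    }
}
```

Base64 encoding grows the data by a third, so 40 employees with 384 KB of avatar data each come to roughly 20 MB, for example TestPayloadGenerator.Write(path, 40, 384 * 1024).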

The results are mentioned below.

Direct Mapping Results

Response Time: 32308ms

The response time and the properties of the large message were taken from the SoapUI request made to the BizTalk WCF service.

Memory Utilization

The following screenshot captures the Perfmon counter.

Pipeline Components Using XDocument Class

Response Time: 5227ms

The response time and the properties of the large message were taken from the SoapUI request made to the BizTalk WCF service.

Memory Utilization
The following screenshot captures the Perfmon counter.

Pipeline Components Using Streaming Class

Response Time: 5227ms

The response time and the properties of the large message were taken from the SoapUI request made to the BizTalk WCF service.

Memory Utilization
The following screenshot captures the Perfmon counter.


Conclusion

Based on the test results, moving the image data out of the message in a pipeline component cut the response time from about 32 seconds (direct mapping) to about 5 seconds. The streaming classes should be used when dealing with large messages, because they process the message as it is read instead of loading it into memory in full. When a choice has to be made between XDocument or XmlDocument and the streaming classes, it can be based on the size of the messages to be processed.
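
The difference between the two styles can be seen outside BizTalk as well. The sketch below, which is an illustration and not code from the solution, collects the values of a leaf node in two ways: once by loading the whole document with XDocument, and once by walking it forward-only with XmlReader, the reader type that XmlTranslatorStream is constructed from. The class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Xml;
using System.Xml.Linq;

// Hypothetical comparison helper, not part of the original solution.
public static class NodeReaders
{
    // Loads the entire document into memory, then queries it.
    public static List<string> ReadWithXDocument(Stream xml, string localName)
    {
        return XDocument.Load(xml)
            .Descendants()
            .Where(e => e.Name.LocalName == localName)
            .Select(e => e.Value)
            .ToList();
    }

    // Walks the document forward-only; only the current node is buffered.
    // Assumes the matched elements are leaf nodes (text content only).
    public static List<string> ReadWithXmlReader(Stream xml, string localName)
    {
        var values = new List<string>();
        using (XmlReader reader = XmlReader.Create(xml))
        {
            bool more = reader.Read();
            while (more)
            {
                if (reader.NodeType == XmlNodeType.Element && reader.LocalName == localName)
                {
                    values.Add(reader.ReadElementContentAsString());
                    // The call above already moved past the end tag,
                    // so the reader must not be advanced again here.
                    more = !reader.EOF;
                }
                else
                {
                    more = reader.Read();
                }
            }
        }
        return values;
    }
}
```

Both methods return the same values. The XDocument version is shorter and is usually fine for small messages, while the memory used by the XmlReader version does not grow with the size of the document.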


See Also

The following articles provide further reading on the topics discussed in this article.

  1. How to Use the BizTalk WCF Service Publishing Wizard to Publish Schemas as WCF Services
  2. How to Develop BizTalk Custom Pipeline Components – Part1
  3. Run composite operations on SQL Server using BizTalk Server
  4. BizTalk Server 2013: CRUD Operation With WCF-SQL Adapter and Correlation
  5. BizTalk Server: SQL Patterns for Polling and Batch Retrieve
  6. Windows PowerShell Basics

To learn more about BizTalk, a good place to start is BizTalk Server Resources on the TechNet Wiki.


References

The following articles were consulted while writing this article.

  1. BizTalk Server: Processing large files (streaming) by Eldert Grootenboer
  2. BizTalk Server: Custom pipeline optimization using the Virtual Stream class by Steef-Jan Wiggers
  3. Microsoft.BizTalk.Streaming Namespace
  4. Optimizing Memory Usage with Streaming

 


 
