BizTalk: Analysis of Direct Mapping vs XDocument Pipeline vs Streaming Pipeline to Process Large Messages for SQL Bulk Insert
Introduction
Sometimes, while developing BizTalk applications, developers have to write custom code to achieve the required functionality, e.g., processing data from an Excel sheet or processing PDF documents. Other requirements make it necessary to write custom components so that the BizTalk application performs optimally, e.g., when BizTalk has to process large messages. In such cases, if the application is not designed properly, the solution can put a lot of strain on the BizTalk environment, which can in turn degrade the performance of other applications. This article discusses one such scenario, where BizTalk handles large messages, and shows how BizTalk developers can handle this load better by writing better code.
Fictional Scenario
EmployeeManager is an application used by the Onboarding team to onboard employees into the company ABC Pvt Ltd. EmployeeManager uses BizTalk as middleware to communicate with multiple systems in the organization. One process in EmployeeManager uploads onboarded employees in bulk. Another process uploads the employees' onboarding documents to the data store. In such cases, EmployeeManager relies on BizTalk to communicate with the SQL data store where the system stores the employees' documents.
As multiple employees can be onboarded in a day, BizTalk needs an efficient way to manage large messages. This article uses the example of saving employee information such as user ID, location and avatar (profile picture) to the database. Each call to BizTalk can contain multiple employee profiles to be inserted or updated in the data store. The messages that BizTalk receives can reach 30 to 40 MB, and at times even more. This BizTalk application is hosted on a BizTalk group where other applications also process high message loads. Hence it is of paramount importance that the code is efficient enough not to cause issues for those other applications.
Conventional Approach
The basic approach is to use the standard BizTalk development approach as is. BizTalk exposes a schema that accepts details like the UserId, Location and Avatar, and then maps them to the schema generated for inserting the data into the SQL store. This approach has an obvious disadvantage.

BizTalk publishes messages to the MessageBox and subscribers pick them up. The entire message, which can be 30 to 40 MB, is therefore published to the MessageBox and then picked up by the orchestration. The orchestration runs the business logic and publishes the transformed message again for the SQL send port to pick up for the insertion. As a result, the large message is published to the MessageBox twice: once when the receive location acts as the publisher, and a second time when the orchestration acts as the publisher. If many calls carry such large messages, this is bound to cause problems for other applications down the line.
Conventional Solution
A conventional solution to this direct publish-map-publish scenario is as follows:
- Identify the parts of the message which are big in size (in this case the avatars of the employees)
- Create a temporary data store on the file system of the BizTalk application server.
- On the receive location, use a custom pipeline component that removes the large data from the incoming message and saves it as a temporary file in the data store.
- Replace the large chunk in the message with the path to the file in the data store.
- Publish the message to the MessageBox (the message is substantially smaller now, because the large parts are stripped and replaced by file paths).
- Let the orchestration subscribe to the message, run the business logic and map it directly to the SQL insert message.
- On the SQL send port, use a custom pipeline component in the encode stage (the last stage of the send pipeline). This component reads each file from the data store and replaces the file path with the actual data.
- Delete the file from the store and send the message to SQL for the insert.
The advantage of this solution is that it removes the large data from the message and stores it on the file system temporarily. As a result, the message that BizTalk actually processes (publishes to the MessageBox, routes and maps in the orchestration) is very small, which keeps processing efficient.
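The store-and-restore steps above follow what is commonly called the claim-check pattern. The following minimal C# sketch shows only the file-store part. It assumes a local folder is used as the temporary data store. The ClaimCheckStore class and its Store/Redeem methods are hypothetical names used for illustration; they are not part of BizTalk.

```csharp
using System;
using System.IO;

// Hypothetical helper illustrating the claim-check file store; not part of BizTalk.
public static class ClaimCheckStore
{
    // Receive side: save the large payload to the temporary store and return its path (the "claim").
    public static string Store(string storeDirectory, byte[] payload, string extension)
    {
        Directory.CreateDirectory(storeDirectory);
        // A GUID-based file name avoids collisions between concurrent messages.
        string path = Path.Combine(storeDirectory, Guid.NewGuid().ToString() + "." + extension);
        File.WriteAllBytes(path, payload);
        return path;
    }

    // Send side: read the payload back, then delete the temporary file.
    public static byte[] Redeem(string path)
    {
        byte[] payload = File.ReadAllBytes(path);
        File.Delete(path);
        return payload;
    }
}
```

In the pipeline components below, the "claim" is the file path that replaces the base64 avatar in the message.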
Problem With The Conventional Solution?
The conventional solution discussed above is the correct way to design the solution, but it can still cause problems if it is not implemented carefully. Most of the time, developers use the XDocument class: they load the incoming message in the pipeline component and perform the necessary operations on it.

While this is a perfectly valid approach, the problem lies in how XDocument works. XDocument.Load builds the entire message as an in-memory object tree inside the BizTalk host process before any change can be made. The resulting memory footprint is considerably larger than the message itself. Text content is held as UTF-16 .NET strings, and serializing the modified document back into the message creates further full copies. This defeats the purpose of implementing the conventional solution.
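To get a feel for this overhead, the following rough C# sketch (hypothetical XDocumentFootprint class) builds a message with one large base64 node and loads it with XDocument. It then compares the managed heap growth with the raw message size. GC.GetTotalMemory gives only an approximation and the numbers vary by runtime. Even so, the growth is typically well above the message size, because the text is held as UTF-16 strings.

```csharp
using System;
using System.IO;
using System.Text;
using System.Xml.Linq;

public static class XDocumentFootprint
{
    // Builds a message with one large base64 node, loads it with XDocument.Load,
    // and returns the raw message size together with the approximate growth of the managed heap.
    public static (long MessageBytes, long HeapGrowth) Measure(int payloadBytes)
    {
        var payload = new byte[payloadBytes];
        new Random(1).NextBytes(payload);
        string xml = "<Request><Profile><Avatar>" + Convert.ToBase64String(payload) +
                     "</Avatar></Profile></Request>";
        byte[] message = Encoding.UTF8.GetBytes(xml);
        xml = null;      // release the helper copies so only the raw message bytes remain
        payload = null;

        using (var stream = new MemoryStream(message))
        {
            long before = GC.GetTotalMemory(true);
            XDocument doc = XDocument.Load(stream);   // materializes the whole tree in memory
            long after = GC.GetTotalMemory(true);
            GC.KeepAlive(doc);                        // keep the tree alive until after the measurement
            return (message.Length, after - before);
        }
    }
}
```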
How To Overcome This Problem?
BizTalk Server ships the Microsoft.BizTalk.Streaming.dll assembly, which contains streaming classes that can be used in place of the XDocument class. These classes derive from System.IO.Stream:
- VirtualStream keeps data in memory only up to a size threshold and spills anything larger to a temporary file.
- XmlTranslatorStream rewrites XML node by node as the stream is read, so the whole message never has to be loaded at once.

Code built on these classes is therefore much lighter on the host process memory. The main obstacle is that these classes are sparsely documented. This article tries to help the reader understand a few classes of the Microsoft.BizTalk.Streaming namespace.
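As a simplified model of the threshold behaviour described above, the following hypothetical SpillingBuffer class keeps data in a MemoryStream until it exceeds a threshold. It then moves the data to a temporary file that is deleted when the buffer is disposed. It illustrates the idea only. It is not the actual VirtualStream implementation, which exposes a full Stream API.

```csharp
using System;
using System.IO;

// Hypothetical illustration of "memory until a threshold, then a temp file".
// This is NOT the BizTalk VirtualStream implementation.
public sealed class SpillingBuffer : IDisposable
{
    private readonly long threshold;
    private Stream inner = new MemoryStream();

    public SpillingBuffer(long thresholdBytes)
    {
        threshold = thresholdBytes;
    }

    // True once the data has been moved from memory to a temporary file.
    public bool IsOnDisk => inner is FileStream;

    public long Length => inner.Length;

    public void Write(byte[] buffer, int offset, int count)
    {
        if (!IsOnDisk && inner.Length + count > threshold)
        {
            // Move what has been buffered so far into a temp file that is deleted on close.
            var file = new FileStream(Path.GetTempFileName(), FileMode.Create, FileAccess.ReadWrite,
                                      FileShare.None, 4096, FileOptions.DeleteOnClose);
            inner.Position = 0;
            inner.CopyTo(file);
            inner.Dispose();
            inner = file;
        }
        inner.Write(buffer, offset, count);
    }

    // Rewinds to the start so the buffered data can be read back.
    public Stream Rewind()
    {
        inner.Position = 0;
        return inner;
    }

    public void Dispose() => inner.Dispose();
}
```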
Implementation
The implementation of the demo for this article can be divided into the following parts.
- Design SQL Tables and Stored Procedure to Save Employee Data
- Design the BizTalk Solution to Do a Bulk Insert of Employee Profiles to SQL Table
- Create the Custom Receive Pipeline Component Using XDocument To Load Large Message
- Create the Custom Send Pipeline Component Using XDocument To Load Large Message
- Create the Custom Receive Pipeline Component Using BizTalk Streaming Classes To Load Large Message
- Create the Custom Send Pipeline Component Using BizTalk Streaming Classes To Load Large Message
SQL Design
The following script creates the table that stores the employee profile details.
USE [POCDatabase]
GO
/****** Object:  Table [dbo].[Employee_Profile]    Script Date: 2/24/2018 7:49:24 PM ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE TABLE [dbo].[Employee_Profile](
    [UserId] [int] NOT NULL,
    [LOCATION] [varchar](10) NOT NULL,
    [AVATAR] [image] NOT NULL,
    UNIQUE NONCLUSTERED
    (
        [UserId] ASC
    ) WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
GO
The following stored procedure inserts an employee profile into the table created above. Its name and parameters match the schema generated from it later in this article.
USE [POCDatabase]
GO
/****** Object:  StoredProcedure [dbo].[usp_UpdateEmployeeProfile]    Script Date: 2/24/2018 7:55:03 PM ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE PROCEDURE [dbo].[usp_UpdateEmployeeProfile]
    @USERId INT,
    @location VARCHAR(10),
    @avatar IMAGE
AS
BEGIN
    INSERT INTO [dbo].[Employee_Profile] ([UserId], [LOCATION], [AVATAR])
    VALUES (@USERId, @location, @avatar);
END
GO
BizTalk Design
The following schema is used for the BizTalk service (the BizTalk schemas are exposed as a WCF service).
<?xml version="1.0" encoding="utf-16"?>
<xs:schema xmlns="http://ProcessLargeFilesInBizTalk.BizTalkServiceTypes" xmlns:b="http://schemas.microsoft.com/BizTalk/2003" targetNamespace="http://ProcessLargeFilesInBizTalk.BizTalkServiceTypes" xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:element name="Request">
    <xs:complexType>
      <xs:sequence>
        <xs:element minOccurs="1" maxOccurs="unbounded" name="Profile">
          <xs:complexType>
            <xs:sequence>
              <xs:element name="UserId" type="xs:string" />
              <xs:element name="Location" type="xs:string" />
              <xs:element name="Avatar" type="xs:base64Binary" />
            </xs:sequence>
          </xs:complexType>
        </xs:element>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
  <xs:element name="Response">
    <xs:complexType>
      <xs:sequence>
        <xs:element minOccurs="1" maxOccurs="unbounded" name="Profile">
          <xs:complexType>
            <xs:sequence>
              <xs:element name="UserId" type="xs:string" />
              <xs:element name="TransactionId" type="xs:string" />
            </xs:sequence>
          </xs:complexType>
        </xs:element>
      </xs:sequence>
    </xs:complexType>
  </xs:element>
</xs:schema>
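For load testing, a large request message matching this schema can be generated without building it in memory. The following C# sketch (hypothetical SampleRequestWriter class) streams Profile records through an XmlWriter and writes each Avatar with XmlWriter.WriteBase64 in 4 KB chunks. The user IDs, locations and random avatar bytes are made-up test data. Profile and its children are written unqualified, because the schema does not set elementFormDefault.

```csharp
using System;
using System.IO;
using System.Xml;

public static class SampleRequestWriter
{
    private const string Ns = "http://ProcessLargeFilesInBizTalk.BizTalkServiceTypes";

    // Writes a Request message with profileCount profiles; each Avatar holds avatarBytes of random data.
    // Writing to a FileStream keeps the generator's own memory use small even for very large messages.
    public static void Write(Stream output, int profileCount, int avatarBytes)
    {
        var rnd = new Random(42);
        var chunk = new byte[4096];
        using (XmlWriter w = XmlWriter.Create(output))   // the output stream is left open
        {
            w.WriteStartElement("ns0", "Request", Ns);
            for (int i = 1; i <= profileCount; i++)
            {
                w.WriteStartElement("Profile");            // unqualified, as the schema expects
                w.WriteElementString("UserId", i.ToString());
                w.WriteElementString("Location", "LOC" + i);
                w.WriteStartElement("Avatar");
                for (int remaining = avatarBytes; remaining > 0; remaining -= chunk.Length)
                {
                    rnd.NextBytes(chunk);
                    // XmlWriter carries leftover bytes between calls, so chunks concatenate correctly.
                    w.WriteBase64(chunk, 0, Math.Min(chunk.Length, remaining));
                }
                w.WriteEndElement(); // Avatar
                w.WriteEndElement(); // Profile
            }
            w.WriteEndElement(); // Request
        }
    }
}
```

For example, `SampleRequestWriter.Write(File.Create(@"C:\Temp\request.xml"), 100, 300 * 1024)` would produce a message of roughly 40 MB (100 avatars of 300 KB each, inflated by base64). The path is only an example.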
The schemas generated after consuming the SQL stored procedure are as follows (refer to the See Also section to learn how to consume the stored procedure in a Visual Studio BizTalk project).
<?xml version="1.0" encoding="utf-8"?>
<xs:schema xmlns:tns="http://schemas.microsoft.com/Sql/2008/05/TypedProcedures/dbo" elementFormDefault="qualified" targetNamespace="http://schemas.microsoft.com/Sql/2008/05/TypedProcedures/dbo" version="1.0" xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:annotation>
    <xs:appinfo>
      <fileNameHint xmlns="http://schemas.microsoft.com/servicemodel/adapters/metadata/xsd">TypedProcedure.dbo</fileNameHint>
    </xs:appinfo>
  </xs:annotation>
  <xs:element name="usp_UpdateEmployeeProfile">
    <xs:annotation>
      <xs:documentation>
        <doc:action xmlns:doc="http://schemas.microsoft.com/servicemodel/adapters/metadata/documentation">TypedProcedure/dbo/usp_UpdateEmployeeProfile</doc:action>
      </xs:documentation>
    </xs:annotation>
    <xs:complexType>
      <xs:sequence>
        <xs:element minOccurs="0" maxOccurs="1" name="USERId" nillable="true" type="xs:int" />
        <xs:element minOccurs="0" maxOccurs="1" name="location" nillable="true">
          <xs:simpleType>
            <xs:restriction base="xs:string">
              <xs:maxLength value="10" />
            </xs:restriction>
          </xs:simpleType>
        </xs:element>
        <xs:element minOccurs="0" maxOccurs="1" name="avatar" nillable="true" type="xs:base64Binary" />
      </xs:sequence>
    </xs:complexType>
  </xs:element>
  <xs:element name="usp_UpdateEmployeeProfileResponse">
    <xs:annotation>
      <xs:documentation>
        <doc:action xmlns:doc="http://schemas.microsoft.com/servicemodel/adapters/metadata/documentation">TypedProcedure/dbo/usp_UpdateEmployeeProfile/response</doc:action>
      </xs:documentation>
    </xs:annotation>
    <xs:complexType>
      <xs:sequence>
        <xs:element minOccurs="1" maxOccurs="1" name="ReturnValue" type="xs:int" />
      </xs:sequence>
    </xs:complexType>
  </xs:element>
</xs:schema>
To do a bulk insert using the composite operation facility of the WCF-Custom/WCF-SQL adapter, the following custom schema is created. The request/response types generated above are imported as child records in the schema. The Min Occurs and Max Occurs properties of these records are set to 1 and unbounded, respectively. The following is the schema created for the bulk insert.
<?xml version="1.0" encoding="utf-16"?>
<xs:schema xmlns="http://ProcessLargeFilesInBizTalk.SQLSchemas.BulkInsertTypes" xmlns:b="http://schemas.microsoft.com/BizTalk/2003" xmlns:ns0="http://schemas.microsoft.com/Sql/2008/05/TypedProcedures/dbo" targetNamespace="http://ProcessLargeFilesInBizTalk.SQLSchemas.BulkInsertTypes" xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:import schemaLocation=".\TypedProcedure.dbo.xsd" namespace="http://schemas.microsoft.com/Sql/2008/05/TypedProcedures/dbo" />
  <xs:annotation>
    <xs:appinfo>
      <b:references>
        <b:reference targetNamespace="http://schemas.microsoft.com/Sql/2008/05/TypedProcedures/dbo" />
      </b:references>
    </xs:appinfo>
  </xs:annotation>
  <xs:element name="UpdateEmployeeProfiles">
    <xs:complexType>
      <xs:sequence>
        <xs:element minOccurs="1" maxOccurs="unbounded" ref="ns0:usp_UpdateEmployeeProfile" />
      </xs:sequence>
    </xs:complexType>
  </xs:element>
  <xs:element name="UpdateEmployeeProfilesResponse">
    <xs:complexType>
      <xs:sequence>
        <xs:element minOccurs="1" maxOccurs="unbounded" ref="ns0:usp_UpdateEmployeeProfileResponse" />
      </xs:sequence>
    </xs:complexType>
  </xs:element>
</xs:schema>
The following screenshot shows the schema in the BizTalk Schema Editor.
The following maps are used to create the SQL insert message and the response returned by the BizTalk WCF service (the Scripting functoid simply returns a GUID as the transaction ID).
The orchestration flow is as follows.
Custom Receive Pipeline Using XDocument
The code for the receive pipeline component using XDocument is as follows. The component loads the entire message into memory, which causes high memory utilization.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.BizTalk.Message.Interop;
using Microsoft.BizTalk.Component.Interop;
using System.IO;
using System.Xml.Linq;

namespace ProcessLargeFilesInBizTalk.Components
{
    [ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
    [ComponentCategory(CategoryTypes.CATID_Decoder)]
    [System.Runtime.InteropServices.Guid("2B62CE58-5C73-483D-8CB2-76E2E3014057")]
    public class ExtractorClassesUsingXDocument : IBaseComponent, IComponentUI, IComponent, IPersistPropertyBag
    {
        #region IBaseComponent
        private const string _Description = "Decode Pipeline Component To Extract the Data from Node and save To File";
        private const string _Name = "ImageExtractorXDocument";
        private const string _Version = "1.0.0.0";

        public string Description { get { return _Description; } }
        public string Name { get { return _Name; } }
        public string Version { get { return _Version; } }
        #endregion

        #region IComponentUI
        private IntPtr _Icon = new IntPtr();

        public IntPtr Icon { get { return _Icon; } }

        public System.Collections.IEnumerator Validate(object projectSystem)
        {
            return null;
        }
        #endregion

        #region IPersistPropertyBag
        // Name of the node holding the base64 image, folder used as the temporary data store,
        // and extension of the files written to it.
        private string _ImageDataNodeName;
        private string _LocalTempDataStore;
        private string _FileExtension;

        public string ImageDataNodeName
        {
            get { return _ImageDataNodeName; }
            set { _ImageDataNodeName = value; }
        }

        public string LocalTempDataStore
        {
            get { return _LocalTempDataStore; }
            set { _LocalTempDataStore = value; }
        }

        public string FileExtension
        {
            get { return _FileExtension; }
            set { _FileExtension = value; }
        }

        public void GetClassID(out Guid classID)
        {
            classID = new Guid("2B62CE58-5C73-483D-8CB2-76E2E3014057");
        }

        public void InitNew()
        {
        }

        public void Load(IPropertyBag pBag, int errorLog)
        {
            object val1 = null;
            object val2 = null;
            object val3 = null;
            try
            {
                pBag.Read("ImageDataNodeName", out val1, 0);
                pBag.Read("LocalTempDataStore", out val2, 0);
                pBag.Read("FileExtension", out val3, 0);
            }
            catch (ArgumentException)
            {
                // Thrown when a property is not present in the property bag.
            }
            catch (Exception ex)
            {
                throw new ApplicationException("Error Reading Property Bag. Error Message " + ex.Message);
            }
            if (val1 != null) { _ImageDataNodeName = (string)val1; }
            if (val2 != null) { _LocalTempDataStore = (string)val2; }
            if (val3 != null) { _FileExtension = (string)val3; }
        }

        public void Save(IPropertyBag pBag, bool clearDirty, bool saveAllProperties)
        {
            object val1 = (object)_ImageDataNodeName;
            object val2 = (object)_LocalTempDataStore;
            object val3 = (object)_FileExtension;
            pBag.Write("ImageDataNodeName", val1);
            pBag.Write("LocalTempDataStore", val2);
            pBag.Write("FileExtension", val3);
        }
        #endregion

        #region IComponent
        public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage inMsg)
        {
            // XDocument.Load builds the complete message as an in-memory tree; this drives memory usage up.
            Stream s = inMsg.BodyPart.GetOriginalDataStream();
            XDocument xdoc = XDocument.Load(s);
            IEnumerable<XElement> largeDataNodes = xdoc.Descendants(_ImageDataNodeName);
            foreach (XElement xe in largeDataNodes)
            {
                try
                {
                    string uniqueId = Guid.NewGuid().ToString();
                    string filePath = Path.Combine(_LocalTempDataStore, uniqueId + "." + _FileExtension);
                    // Decode the base64 text and save the image to the temporary data store.
                    byte[] imageData = Convert.FromBase64String(xe.Value);
                    File.WriteAllBytes(filePath, imageData);
                    // Replace the base64 string with the file path.
                    xe.SetValue(filePath);
                }
                catch (Exception)
                {
                    // Swallow the exception and move on to the next node.
                }
            }
            // Serialize the modified document into a new body stream
            // (ToString and GetBytes create further full in-memory copies of the message).
            byte[] outBytes = System.Text.Encoding.UTF8.GetBytes(xdoc.ToString());
            MemoryStream ms = new MemoryStream(outBytes);
            inMsg.BodyPart.Data = ms;
            return inMsg;
        }
        #endregion
    }
}
To create the receive pipeline, first deploy the above component; its assembly is typically placed in the Pipeline Components folder of the BizTalk installation. Then add the component to the decode stage, followed by an XML disassembler in the disassemble stage of the pipeline. The pipeline is shown below.
Custom Send Pipeline Using XDocument
The code for the send pipeline component using XDocument is as follows. Like the receive component, it loads the entire message into memory, which causes high memory utilization.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.BizTalk.Message.Interop;
using Microsoft.BizTalk.Component.Interop;
using System.IO;
using System.Xml;
using System.Xml.Linq;

namespace ProcessLargeFilesInBizTalk.Components
{
    [ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
    [ComponentCategory(CategoryTypes.CATID_Encoder)]
    [System.Runtime.InteropServices.Guid("A5A1F8AD-3C4B-4CAB-A51D-D68EEAD56D67")]
    public class InjectorClassesUsingXDocument : IBaseComponent, IComponentUI, IComponent, IPersistPropertyBag
    {
        #region IBaseComponent
        private const string _Description = "Encode Pipeline Component To Inject the Data into Node from file saved By Extractor";
        private const string _Name = "ImageInjectorXDocument";
        private const string _Version = "1.0.0.0";

        public string Description { get { return _Description; } }
        public string Name { get { return _Name; } }
        public string Version { get { return _Version; } }
        #endregion

        #region IComponentUI
        private IntPtr _Icon = new IntPtr();

        public IntPtr Icon { get { return _Icon; } }

        public System.Collections.IEnumerator Validate(object projectSystem)
        {
            return null;
        }
        #endregion

        #region IPersistPropertyBag
        // Name of the node that holds the temp file path and must receive the base64 image data.
        private string _ImageDataNodeName;

        public string ImageDataNodeName
        {
            get { return _ImageDataNodeName; }
            set { _ImageDataNodeName = value; }
        }

        public void GetClassID(out Guid classID)
        {
            classID = new Guid("A5A1F8AD-3C4B-4CAB-A51D-D68EEAD56D67");
        }

        public void InitNew()
        {
        }

        public void Load(IPropertyBag pBag, int errorLog)
        {
            object val1 = null;
            try
            {
                pBag.Read("ImageDataNodeName", out val1, 0);
            }
            catch (ArgumentException)
            {
                // Thrown when the property is not present in the property bag.
            }
            catch (Exception ex)
            {
                throw new ApplicationException("Error Reading Property Bag. Error Message " + ex.Message);
            }
            if (val1 != null) { _ImageDataNodeName = (string)val1; }
        }

        public void Save(IPropertyBag pBag, bool clearDirty, bool saveAllProperties)
        {
            object val1 = (object)_ImageDataNodeName;
            pBag.Write("ImageDataNodeName", val1);
        }
        #endregion

        #region IComponent
        public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage inMsg)
        {
            // Loads the entire message into memory as an XML tree.
            Stream s = inMsg.BodyPart.GetOriginalDataStream();
            XDocument xDoc = XDocument.Load(s);
            // The SQL adapter schema uses elementFormDefault="qualified", so the image node is in this namespace.
            XNamespace ns0 = "http://schemas.microsoft.com/Sql/2008/05/TypedProcedures/dbo";
            IEnumerable<XElement> imageDataNodes = xDoc.Descendants(ns0 + _ImageDataNodeName);
            foreach (XElement xe in imageDataNodes)
            {
                try
                {
                    // The node currently holds the path written by the receive-side extractor.
                    string filePath = xe.Value;
                    byte[] imageData = File.ReadAllBytes(filePath);
                    xe.SetValue(Convert.ToBase64String(imageData));
                    File.Delete(filePath);
                }
                catch (Exception)
                {
                    // Swallow the exception and move on to the next node.
                }
            }
            byte[] outBytes = System.Text.Encoding.UTF8.GetBytes(xDoc.ToString());
            MemoryStream ms = new MemoryStream(outBytes);
            inMsg.BodyPart.Data = ms;
            return inMsg;
        }
        #endregion
    }
}
To create the send pipeline, first deploy the above component; its assembly is typically placed in the Pipeline Components folder of the BizTalk installation. Then add the component to the encode stage of the send pipeline. The following is a sample send pipeline.
Custom Receive Pipeline Using BizTalk Streaming Classes
The streaming classes are located in the Microsoft.BizTalk.Streaming.dll assembly and should be used when dealing with large messages. The following code uses the XmlTranslatorStream class to modify the message. It does so at the boundary of BizTalk, so that the messages published to the MessageBox are small. The following is the code for the receive pipeline decoder component.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.BizTalk.Streaming;
using Microsoft.BizTalk.Message.Interop;
using Microsoft.BizTalk.Component.Interop;
using System.IO;
using System.Xml;

namespace ProcessLargeFilesInBizTalk.Components
{
    /// <summary>
    /// Custom decoder pipeline component that uses an XmlTranslatorStream to modify the message contents.
    /// The message is modified at the boundary of BizTalk, which avoids publishing big messages to the MessageBox database.
    /// </summary>
    [ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
    [ComponentCategory(CategoryTypes.CATID_Decoder)]
    [System.Runtime.InteropServices.Guid("FA8EC02F-A33C-4F2A-A804-44E57FA4EEA3")]
    public class Extractor : IBaseComponent, IComponentUI, IComponent, IPersistPropertyBag
    {
        #region IBaseComponent
        private const string _Description = "Decode Pipeline Component To Extract the Data from Node and save To File";
        private const string _Name = "ImageExtractor";
        private const string _Version = "1.0.0.0";

        public string Description { get { return _Description; } }
        public string Name { get { return _Name; } }
        public string Version { get { return _Version; } }
        #endregion

        #region IComponentUI
        private IntPtr _Icon = new IntPtr();

        public IntPtr Icon { get { return _Icon; } }

        public System.Collections.IEnumerator Validate(object projectSystem)
        {
            return null;
        }
        #endregion

        #region IPersistPropertyBag
        private string _ImageDataNodeName;
        private string _LocalTempDataStore;
        private string _FileExtension;

        public string ImageDataNodeName
        {
            get { return _ImageDataNodeName; }
            set { _ImageDataNodeName = value; }
        }

        public string LocalTempDataStore
        {
            get { return _LocalTempDataStore; }
            set { _LocalTempDataStore = value; }
        }

        public string FileExtension
        {
            get { return _FileExtension; }
            set { _FileExtension = value; }
        }

        public void GetClassID(out Guid classID)
        {
            classID = new Guid("FA8EC02F-A33C-4F2A-A804-44E57FA4EEA3");
        }

        public void InitNew()
        {
        }

        public void Load(IPropertyBag pBag, int errorLog)
        {
            object val1 = null;
            object val2 = null;
            object val3 = null;
            try
            {
                pBag.Read("ImageDataNodeName", out val1, 0);
                pBag.Read("LocalTempDataStore", out val2, 0);
                pBag.Read("FileExtension", out val3, 0);
            }
            catch (ArgumentException)
            {
                // Thrown when a property is not present in the property bag.
            }
            catch (Exception ex)
            {
                throw new ApplicationException("Error Reading Property Bag. Error Message " + ex.Message);
            }
            if (val1 != null) { _ImageDataNodeName = (string)val1; }
            if (val2 != null) { _LocalTempDataStore = (string)val2; }
            if (val3 != null) { _FileExtension = (string)val3; }
        }

        public void Save(IPropertyBag pBag, bool clearDirty, bool saveAllProperties)
        {
            object val1 = (object)_ImageDataNodeName;
            object val2 = (object)_LocalTempDataStore;
            object val3 = (object)_FileExtension;
            pBag.Write("ImageDataNodeName", val1);
            pBag.Write("LocalTempDataStore", val2);
            pBag.Write("FileExtension", val3);
        }
        #endregion

        #region IComponent
        public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage inMsg)
        {
            VirtualStream vs = new VirtualStream(inMsg.BodyPart.GetOriginalDataStream());
            XmlReader xReader = XmlReader.Create(vs);
            // The ExtractorStream rewrites the message lazily, node by node, as downstream components read it.
            inMsg.BodyPart.Data = new ExtractorStream(xReader, _FileExtension, _ImageDataNodeName, _LocalTempDataStore);
            return inMsg;
        }
        #endregion
    }

    /// <summary>
    /// Derived from the XmlTranslatorStream class (Microsoft.BizTalk.Streaming namespace).
    /// Overrides some of its methods to replace the image received as a base64 string in the request to BizTalk
    /// with the path of the temporary file where the image is stored.
    /// </summary>
    public class ExtractorStream : XmlTranslatorStream
    {
        private XmlReader reader;
        private XmlWriter writer;
        private string fileExtension;
        private string imageNodeName;
        private string localTempDataStorePath;

        public ExtractorStream(XmlReader xReader, string fileExtension, string imageNodeName, string localTempDataStorePath)
            : base(xReader)
        {
            this.fileExtension = fileExtension;
            this.imageNodeName = imageNodeName;
            this.localTempDataStorePath = localTempDataStorePath;
            Initialise();
        }

        private void Initialise()
        {
            // Keep references to the reader and writer managed by the base XmlTranslatorStream.
            reader = m_reader;
            writer = m_writer;
        }

        /// <summary>
        /// Overrides TranslateStartElement to check whether the current node contains the image (as a base64 string).
        /// If so, decodes the node value in 4 KB chunks into a file (with the configured extension) in the
        /// temporary data store, and writes the file path in place of the image string.
        /// </summary>
        /// <param name="prefix">Namespace prefix of the current element.</param>
        /// <param name="localName">Local name of the current element.</param>
        /// <param name="nsURI">Namespace URI of the current element.</param>
        protected override void TranslateStartElement(string prefix, string localName, string nsURI)
        {
            if (reader.LocalName == imageNodeName)
            {
                var document = reader.ReadSubtree();
                document.Read();
                var data = new byte[4096];
                string uniqueId = System.Guid.NewGuid().ToString();
                string filePath = Path.Combine(localTempDataStorePath, uniqueId + "." + fileExtension);
                using (FileStream fs = File.Open(filePath, FileMode.CreateNew, FileAccess.Write, FileShare.Read))
                {
                    int bytesRead;
                    // Decode chunk by chunk so the image is never held in memory as a whole.
                    while ((bytesRead = document.ReadElementContentAsBase64(data, 0, data.Length)) > 0)
                    {
                        fs.Write(data, 0, bytesRead);
                    }
                    fs.Flush();
                }
                // Write the element back with the file path as its value.
                writer.WriteElementString(prefix, imageNodeName, nsURI, filePath);
            }
            else
            {
                base.TranslateStartElement(prefix, localName, nsURI);
            }
        }

        protected override void TranslateEndElement(bool full)
        {
            // The image element was already closed by WriteElementString, so its end tag is skipped.
            if (reader.LocalName != imageNodeName)
            {
                base.TranslateEndElement(full);
            }
        }
    }
}
To create the receive pipeline, first deploy the above component; its assembly is typically placed in the Pipeline Components folder of the BizTalk installation. Then add the component to the decode stage, followed by an XML disassembler in the disassemble stage of the pipeline. The pipeline is shown below.
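To unit test the extraction logic outside BizTalk, the same idea can be expressed with plain System.Xml classes. The following sketch (hypothetical StreamingExtractor class) is not the XmlTranslatorStream-based component above. It copies an XML stream node by node with XmlReader and XmlWriter. For each element whose local name matches nodeName, it decodes the base64 content in 4 KB chunks straight into a file and writes the file path in its place. It assumes the store directory already exists.

```csharp
using System;
using System.IO;
using System.Xml;

// Hypothetical BCL-only equivalent of the Extractor logic, for testing outside BizTalk.
public static class StreamingExtractor
{
    public static void Extract(Stream input, Stream output, string nodeName, string storeDirectory, string extension)
    {
        var buffer = new byte[4096];
        using (XmlReader reader = XmlReader.Create(input))
        using (XmlWriter writer = XmlWriter.Create(output))
        {
            while (!reader.EOF)
            {
                if (reader.NodeType == XmlNodeType.Element && reader.LocalName == nodeName)
                {
                    string path = Path.Combine(storeDirectory, Guid.NewGuid().ToString() + "." + extension);
                    writer.WriteStartElement(reader.Prefix, reader.LocalName, reader.NamespaceURI);
                    writer.WriteAttributes(reader, true);
                    using (FileStream fs = File.Create(path))
                    {
                        int read;
                        // Decode the base64 content chunk by chunk directly into the file.
                        while ((read = reader.ReadElementContentAsBase64(buffer, 0, buffer.Length)) > 0)
                        {
                            fs.Write(buffer, 0, read);
                        }
                    }
                    // The reader is now past the element's end tag; write the path as the new content.
                    writer.WriteString(path);
                    writer.WriteEndElement();
                    continue;
                }
                CopyShallow(reader, writer);
                reader.Read();
            }
        }
    }

    // Copies only the current node (not its children) from the reader to the writer.
    private static void CopyShallow(XmlReader reader, XmlWriter writer)
    {
        switch (reader.NodeType)
        {
            case XmlNodeType.Element:
                writer.WriteStartElement(reader.Prefix, reader.LocalName, reader.NamespaceURI);
                writer.WriteAttributes(reader, true);
                if (reader.IsEmptyElement) writer.WriteEndElement();
                break;
            case XmlNodeType.Text:
                writer.WriteString(reader.Value);
                break;
            case XmlNodeType.CDATA:
                writer.WriteCData(reader.Value);
                break;
            case XmlNodeType.Whitespace:
            case XmlNodeType.SignificantWhitespace:
                writer.WriteWhitespace(reader.Value);
                break;
            case XmlNodeType.Comment:
                writer.WriteComment(reader.Value);
                break;
            case XmlNodeType.ProcessingInstruction:
                writer.WriteProcessingInstruction(reader.Name, reader.Value);
                break;
            case XmlNodeType.EndElement:
                writer.WriteFullEndElement();
                break;
            // The XML declaration is emitted by the writer itself, so it is not copied.
        }
    }
}
```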
Custom Send Pipeline Using BizTalk Streaming Classes
The send side uses the same classes from the Microsoft.BizTalk.Streaming.dll assembly. The following code uses an XmlTranslatorStream to restore the image data only at the outbound boundary of BizTalk. By then the small message has already left the MessageBox, so the large payload is never published there. The following is the code for the send pipeline encoder component.
using
System;
using
System.Collections.Generic;
using
System.Linq;
using
System.Text;
using
System.Threading.Tasks;
using
Microsoft.BizTalk.Streaming;
using
Microsoft.BizTalk.Message.Interop;
using
Microsoft.BizTalk.Component.Interop;
using
System.IO;
using
System.Xml;
namespace
ProcessLargeFilesInBizTalk.Components
{
/// <summary>
/// Class to implement the Custom Encoder Pipeline component which in turn uses the XMLTranlsatorStream to modify message contents.
/// This gives the advantage of modifying the message at the boundary of BizTalk and avoid publishing big messages to MSgBoxDatabase
/// </summary>
[ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
[ComponentCategory(CategoryTypes.CATID_Encoder)]
[System.Runtime.InteropServices.Guid(
"DD061000-A85D-4339-BF2A-531F9F4D529D"
)]
public
class
Injector : IBaseComponent, IComponentUI, IComponent, IPersistPropertyBag
{
#region IBaseComponent
private
const
string
_Description =
"Encode Pipeline Component To Inject the Data into Node from file saved By Extractor"
;
private
const
string
_Name =
"ImageInjector"
;
private
const
string
_Version =
"1.0.0.0"
;
public
string
Description
{
get
{
return
_Description; }
}
public
string
Name
{
get
{
return
_Name; }
}
public
string
Version
{
get
{
return
_Version; }
}
#endregion
#region IComponentUI
private
IntPtr _Icon =
new
IntPtr();
public
IntPtr Icon
{
get
{
return
_Icon; }
}
public
System.Collections.IEnumerator Validate(
object
projectSystem)
{
return
null
;
}
#endregion
#region IpersistPropertyBag
private
string
_ImageDataNodeName;
public
string
ImageDataNodeName
{
get
{
return
_ImageDataNodeName; }
set
{ _ImageDataNodeName = value; }
}
public
void
GetClassID(
out
Guid classID)
{
classID =
new
Guid(
"DD061000-A85D-4339-BF2A-531F9F4D529D"
);
}
public
void
InitNew()
{
}
public
void
Load(IPropertyBag pBag,
int
errorLog)
{
object
val1 =
null
;
try
{
pBag.Read(
"ImageDataNodeName"
,
out
val1, 0);
}
catch
(ArgumentException)
{
}
catch
(Exception ex)
{
throw
new
ApplicationException(
"Error Reading Property Bag. Error Message "
+ ex.Message);
}
if
(val1 !=
null
)
{
_ImageDataNodeName = (
string
)val1;
}
}
public
void
Save(IPropertyBag pBag,
bool
clearDirty,
bool
saveAllProperties)
{
object
val1 = (
object
)_ImageDataNodeName;
pBag.Write(
"ImageDataNodeName"
, val1);
}
#endregion
#region IComponent
public
IBaseMessage Execute(IPipelineContext pContext, IBaseMessage inMsg)
{
VirtualStream vs =
new
VirtualStream(inMsg.BodyPart.GetOriginalDataStream());
XmlReader xReader = XmlReader.Create(vs);
inMsg.BodyPart.Data =
new
InjectorStream(xReader,_ImageDataNodeName);
return
inMsg;
}
#endregion
}
/// <summary>
/// This class is derived from the XmlTranslatorStream class (present in the Microsoft.BizTalk.Streaming namespace).
/// It overrides some of the functions to replace the image file location with the content of the actual file stored at the
/// physical data store. Once the file is converted to a base64 string and written to the message, the file in the temporary data store is deleted.
/// </summary>
public class InjectorStream : XmlTranslatorStream
{
    private XmlReader reader;
    private XmlWriter writer;
    private string imageDataNodeName;

    /// <summary>
    /// Initialises a new InjectorStream over the supplied reader.
    /// </summary>
    /// <param name="xReader">Reader created from the virtual stream created from the BizTalk message sent to the SQL adapter</param>
    /// <param name="imageDataNodeName">Node with the base64Binary type which stores the image data to be inserted into the SQL table</param>
    public InjectorStream(XmlReader xReader, string imageDataNodeName)
        : base(xReader)
    {
        this.imageDataNodeName = imageDataNodeName;
        Initialise();
    }

    private void Initialise()
    {
        // m_reader and m_writer are the protected reader and writer inherited from XmlTranslatorStream
        reader = m_reader;
        writer = m_writer;
    }

    /// <summary>
    /// Overrides the TranslateStartElement method to check whether the current node is the node which contains the path to the image stored at the file store.
    /// If the current node matches the image node, the method reads the file from the data store, writes its base64 content as the value of the node
    /// using the writer and, once done, deletes the file from the temporary folder.
    /// </summary>
    /// <param name="prefix">Namespace prefix of the current element</param>
    /// <param name="localName">Local name of the current element</param>
    /// <param name="nsURI">Namespace URI of the current element</param>
    protected override void TranslateStartElement(string prefix, string localName, string nsURI)
    {
        if (reader.LocalName == imageDataNodeName)
        {
            var document = reader.ReadSubtree();
            document.Read();
            var imageData = String.Empty;
            string tempFilePath = document.ReadElementContentAsString();
            bool fileFound = File.Exists(tempFilePath);
            if (fileFound)
            {
                byte[] imageBytes = File.ReadAllBytes(tempFilePath);
                imageData = Convert.ToBase64String(imageBytes);
            }
            writer.WriteElementString(localName, nsURI, imageData);
            // Delete the temporary file only if it existed; File.Delete throws for an empty path
            if (fileFound)
            {
                File.Delete(tempFilePath);
            }
        }
        else
        {
            base.TranslateStartElement(prefix, localName, nsURI);
        }
    }

    /// <summary>
    /// End elements are passed through unchanged.
    /// </summary>
    protected override void TranslateEndElement(bool full)
    {
        base.TranslateEndElement(full);
    }
}
}
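To experiment with the node-replacement idea used by InjectorStream without a BizTalk environment, the following is a minimal sketch of the same technique written against plain System.Xml. It is not the XmlTranslatorStream-based implementation above: instead of relying on the translation callbacks, it copies the document node by node from an XmlReader to an XmlWriter and, for each element with the given local name, swaps the temporary file path for the base64 content of the file and then deletes the file. The names ImageInjector and Inject are hypothetical; as a simplification, attributes on the image element, comments and processing instructions are not copied.

```csharp
using System;
using System.IO;
using System.Text;
using System.Xml;

// Hypothetical helper (not part of BizTalk): a plain System.Xml version of the
// InjectorStream idea. Copies XML from reader to writer and replaces the text of
// every <imageNodeName> element (a temp file path) with the file's base64 content.
public static class ImageInjector
{
    public static void Inject(XmlReader reader, XmlWriter writer, string imageNodeName)
    {
        reader.MoveToContent(); // skip the XML declaration and leading whitespace
        while (!reader.EOF)
        {
            if (reader.NodeType == XmlNodeType.Element && reader.LocalName == imageNodeName)
            {
                string prefix = reader.Prefix, localName = reader.LocalName, ns = reader.NamespaceURI;
                // Reads the path text and leaves the reader on the node after the end tag
                string tempFilePath = reader.ReadElementContentAsString();
                bool fileFound = File.Exists(tempFilePath);
                string imageData = fileFound
                    ? Convert.ToBase64String(File.ReadAllBytes(tempFilePath))
                    : string.Empty;
                writer.WriteElementString(prefix, localName, ns, imageData);
                if (fileFound)
                    File.Delete(tempFilePath); // delete only after the data has been written
                continue; // the reader is already positioned on the next node
            }

            switch (reader.NodeType)
            {
                case XmlNodeType.Element:
                    writer.WriteStartElement(reader.Prefix, reader.LocalName, reader.NamespaceURI);
                    writer.WriteAttributes(reader, true);
                    if (reader.IsEmptyElement)
                        writer.WriteEndElement();
                    break;
                case XmlNodeType.EndElement:
                    writer.WriteFullEndElement();
                    break;
                case XmlNodeType.Text:
                    writer.WriteString(reader.Value);
                    break;
                case XmlNodeType.CDATA:
                    writer.WriteCData(reader.Value);
                    break;
                case XmlNodeType.Whitespace:
                case XmlNodeType.SignificantWhitespace:
                    writer.WriteWhitespace(reader.Value);
                    break;
                // Other node types (comments, processing instructions) are skipped in this sketch
            }
            reader.Read();
        }
        writer.Flush();
    }
}
```

Like the streaming pipeline, this only ever holds the current node (plus one image) in memory, whereas loading the message into an XDocument would materialise every base64 image at once.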
The send pipeline is created by registering the above component in COM and adding it to the Encode stage of the send pipeline. The following is a sample pipeline.
This completes the implementation part of the walkthrough.
Deploying BizTalk Solution
The following are the general steps to deploy and configure the BizTalk solution.
- Deploy the BizTalk solution.
- Expose the BizTalk schema BizTalkServiceTypes as a WCF service.
- Configure the IIS location by mapping the IIS application to the correct application pool.
- Once the schemas are exposed as a WCF service, a receive location and receive port will be created in the BizTalk application. The next step is to set the maximum allowed size for the messages. Refer to the following sample screenshot.
The configuration of the receive pipeline using the XDocument class is shown below.
The configuration of the receive pipeline using the Streaming classes is shown below.
- Import the binding file generated by consuming the stored procedure. This will create the send port that inserts the data into SQL. Set the Action Mapping to CompositeOperation as shown below.
The configuration for the send pipeline using the XDocument class is shown below.
The configuration for the send pipeline using the streaming classes is shown below.
- To take care of any stripped image files left behind in the temporary file store, a PowerShell script can be scheduled to clear the files out at the end of the business day. The following sample script deletes the files older than a given number of minutes; passing 1440 as ThresholdMinutes deletes the files older than one day.
# Deletes the files in $PathFromWhereToDelete that were last written more than $ThresholdMinutes minutes ago
param
(
    [Parameter(Mandatory=$true)]
    [int]$ThresholdMinutes,
    [Parameter(Mandatory=$true)]
    [string]$PathFromWhereToDelete
)
if (Test-Path $PathFromWhereToDelete)
{
    $currentDate = Get-Date
    Get-ChildItem -Path $PathFromWhereToDelete -File |
        Where-Object { $_.LastWriteTime -lt $currentDate.AddMinutes(-$ThresholdMinutes) } |
        Remove-Item
}
else
{
    Write-Host "Path not found to delete files from"
}
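Relating to the step above that sets the maximum allowed message size: because the employee avatars travel as base64 text, the limit has to cover the encoded size rather than the raw image size. The following is a minimal sizing sketch under stated assumptions; MessageSizeEstimator, SuggestedMaxMessageSize and the 25% default headroom are hypothetical, and it assumes the message is UTF-8 encoded so that each base64 character takes one byte.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sizing helper: estimates the inbound message size once images are
// base64 encoded, as a starting point for the maximum received message size.
public static class MessageSizeEstimator
{
    // Base64 emits 4 characters for every 3 input bytes, padding the final group.
    // Assumes UTF-8, where each base64 character occupies one byte.
    public static long Base64Length(long rawBytes) => 4 * ((rawBytes + 2) / 3);

    // rawImageSizes: raw byte sizes of the images carried in one message.
    // otherXmlBytes: estimated size of the rest of the XML (IDs, locations, tags).
    // headroom: safety factor; 1.25 (25% extra) is an assumed default, not a recommendation.
    public static long SuggestedMaxMessageSize(IEnumerable<long> rawImageSizes, long otherXmlBytes, double headroom = 1.25)
    {
        long total = otherXmlBytes;
        foreach (long size in rawImageSizes)
            total += Base64Length(size);
        return (long)Math.Ceiling(total * headroom);
    }
}
```

For example, 30 MB of raw image data grows to roughly 40 MB once base64 encoded, so a limit set only from the raw image sizes would reject valid messages.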
Testing
The application was tested by sending multiple calls to the BizTalk service, each containing a message of around 20 MB, and noting the memory utilization and response time for each of the following scenarios.
- Direct mapping, allowing the entire message to be published into the MessageBox
- Using the Pipeline Components with the XDocument Class
- Using the Pipeline Components with the Streaming Classes
The results are mentioned below.
Direct Mapping Results
Response Time: 32308 ms
The response time for the large message and its properties were noted from the SoapUI request made to the BizTalk WCF service.
Memory Utilization
The following screenshot captures the Perfmon counter.
Pipeline Components Using XDocument Class
Response Time: 5227 ms
The response time for the large message and its properties were noted from the SoapUI request made to the BizTalk WCF service.
Pipeline Components Using Streaming Class
Response Time: 5227 ms
The response time for the large message and its properties were noted from the SoapUI request made to the BizTalk WCF service.
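Working only from the response times reported in this section, the gain of either pipeline approach over direct mapping can be put in numbers. Because the XDocument and streaming runs report the same 5227 ms, response time alone does not separate those two approaches; that comparison rests on the memory figures.

```latex
\text{speed-up} = \frac{32308\ \text{ms}}{5227\ \text{ms}} \approx 6.18
\qquad
\text{reduction} = \frac{32308 - 5227}{32308} = \frac{27081}{32308} \approx 83.8\%
```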
Conclusion
Based on the test results, the streaming classes should be used when dealing with large messages to improve performance, because they process the message as a stream instead of loading it completely into memory. When a decision needs to be made between XDocument (or XmlDocument) and the streaming classes, it can be based on the size of the messages that need to be processed.
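The size-based decision described in the conclusion can be sketched as follows. This is an illustration under assumptions, not a rule from the testing above: MessageProcessingSelector is a hypothetical name, the 1 MB threshold is an assumed value to be tuned against your own measurements, and counting elements stands in for whatever processing the pipeline actually performs. Streams whose length cannot be determined are treated as large, since pipeline streams may be non-seekable.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Text;
using System.Xml;
using System.Xml.Linq;

// Hypothetical sketch: choose in-memory (XDocument) or forward-only (XmlReader)
// processing based on message size.
public static class MessageProcessingSelector
{
    // Assumed threshold, not a value measured in this article.
    public const long InMemoryThresholdBytes = 1L * 1024 * 1024;

    // A non-seekable stream has no reliable Length, so treat it as large.
    public static bool ShouldStream(Stream body) =>
        !body.CanSeek || body.Length > InMemoryThresholdBytes;

    // Example workload: count the elements with a given local name.
    public static int CountElements(Stream body, string localName)
    {
        if (ShouldStream(body))
        {
            // Streaming: only the current node is held in memory
            int count = 0;
            using (XmlReader reader = XmlReader.Create(body))
            {
                while (reader.Read())
                {
                    if (reader.NodeType == XmlNodeType.Element && reader.LocalName == localName)
                        count++;
                }
            }
            return count;
        }
        // In memory: the whole document tree is built before it is queried
        XDocument document = XDocument.Load(body);
        return document.Descendants().Count(e => e.Name.LocalName == localName);
    }
}
```

Both branches return the same result; only the memory profile differs, which matches the observation that the response times of the two pipeline approaches were the same.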
See Also
The following articles provide further reading on the topics discussed in this article.
- How to Use the BizTalk WCF Service Publishing Wizard to Publish Schemas as WCF Services
- How to Develop BizTalk Custom Pipeline Components – Part1
- Run composite operations on SQL Server using BizTalk Server
- BizTalk Server 2013: CRUD Operation With WCF-SQL Adapter and Correlation
- BizTalk Server: SQL Patterns for Polling and Batch Retrieve
- Windows PowerShell Basics
To learn more about the BizTalk product, a good place to start is BizTalk Server Resources on the TechNet Wiki.
References
The following articles were referred to while writing this article.
- BizTalk Server: Processing large files (streaming) by Eldert Grootenboer
- BizTalk Server: Custom pipeline optimization using the Virtual Stream class by Steef-Jan Wiggers
- Microsoft.BizTalk.Streaming Namespace
- Optimizing Memory Usage with Streaming