Need to archive messages when they enter BizTalk 2004? Use the archiving pipeline component!

Regularly, in microsoft.public.biztalk newsgroups, folks would like to archive messages as they are received by BizTalk, before any further processing. There are numerous ways to achieve this and the most obvious ones (non exhaustive list):

  1. When the orchestration starts, have it send the message to a "archive" port.
  2. Create a custom pipeline decoder (or encoder) that just saves whatever it sees and insert it into the receive/send pipeline of interest.

Solution 1 is easy to implement and has very good performances. After all, BizTalk is optimized to send messages. It is great if you want to save messages as they start to be processed. However, if you receive flat files and disassemble them in a pipeline, this solution will only save the message received by the orchestration, not the actual flat file at the time of reception. Also, if a message never reaches an orchestration, it won't be archived. Also, maybe the most obvious shortcoming is that to implement this solution, you need an orchestration. In some Content Based Routing, BizTalk is used without any orchestration and writing an orchestration for this might be overkill.

Solution 2 is more complex to implement (you need to write a custom pipeline) but offer a greater level of flexibility. Firs of all, you save the message as it enters, so you will get it archived. Second, you can reuse the same component for send and receive. Third, you do not need to modify the orchestration which is very useful if you cannot re-compile/re-deploy the orchestration.

Anyway, Eric (in this post) seems to need a custom pipeline component which archives messages. Let's give him a hand.

We would like to be able to save messages when they enter a pipeline as well as when they are about to leave a pipeline. We will then mark our component to run as a decoder or an encoder. This is achieved by marking the class which will implement our custom pipeline with the following attributes:

[ComponentCategory(CategoryTypes.CATID_Decoder)]
[ComponentCategory(CategoryTypes.CATID_Encoder)]

As with most pipeline components, the Execute method is where the work happens. A possible implementation among many others is presented below:

public IBaseMessage Execute(IPipelineContext pc, IBaseMessage inmsg)
{
Stream originalStrm = null;
FileStream fileArchive = null;
BinaryWriter binWriter = null;

try
{
IBaseMessagePart bodyPart = inmsg.BodyPart;
if (bodyPart != null)
{
// Build the archive file path, processing embedded macros, if any
string path = UniqueFileName + "." + Extension;
path = path.Replace("%MessageID%", inmsg.MessageID.ToString());

// Get a *copy* of the original input stream [EDIT 09/17/2004: Use a copy of the stream]
originalStrm = bodyPart.Data;

// Dump the data to the file, for archiving
fileArchive = new FileStream(path, FileMode.Create, FileAccess.Write);
binWriter = new BinaryWriter(fileArchive);

// Read the stream and send bytes to the file
byte[] buffer = new byte[bufferSize];
int sizeRead = 0;

while ((sizeRead = originalStrm.Read(buffer, 0, bufferSize)) != 0)
{
binWriter.Write(buffer, 0, sizeRead);
}
}
}
catch(Exception ex)
{
// Do not fail the pipeline if we cannot archive
// Just log the failure to the event log
System.Diagnostics.EventLog.WriteEntry(ex.Source, ex.Message, EventLogEntryType.Error);
}
finally
{
// Flush()/Close() the output stream
if (binWriter != null) { binWriter.Flush(); binWriter.Close(); }

// [EDIT 09/17/2004 No need to rewind the stream since we used a copy of it]
}

return inmsg;
}

Basically, we save the content of the body to a file which is specified by "UniqueFileName" and "Extension". One important things: we use a finally construct to ensure that the output stream is closed even in case of an error. Also, observe that we swallow most errors (not all, but most) and we log them to the event log. This means that the message can be processed even if we did not archive it. You can change this behavior if it is not adequate for your application.

[EDIT 09/17/2004] We do not need to rewind any streams since BodyPart.Data returns a copy of the inbound stream (for pipelines only) as explained here.

What could be improved in this sample?

You can download the zipped solution here . This is sample quality and iot is provided "as is" with no warranty of any sort. There are a few things that could be improved:

  • Save context properties with the file,
  • Save all parts of the message and not only the body,
  • More file name macros,
  • Implement some kind of filtering (i.e. save files which are > 100Kb for instance),
  • Expose "bufferSize" as a property or enve better, attempt to determine the optimal size of the buffer depending on the size of the incoming message.

Comments

  • Anonymous
    July 12, 2004
    The comment has been removed
  • Anonymous
    July 12, 2004
    The comment has been removed
  • Anonymous
    July 21, 2004
    Gilles this is great. Just what I was looking for for a long time. Now I am able to archive messages before they enter my orchestration. (which in my opinion is the wrong place for an achive action!)
  • Anonymous
    April 06, 2008
    The comment has been removed
  • Anonymous
    January 21, 2009
    PingBack from http://www.keyongtech.com/323062-receive-port-question