WCF Extensibility – Transport Channels – Request Channels, part 1
This post is part of a series about WCF extensibility points. For a list of all previous posts and planned future ones, go to the index page .
This is the part 1 of 3 of a “mini-series” inside the main series. For the other parts, here’s the list.
- Transport Channels – Request Channels, part 1: Basic, synchronous-only IRequestChannel implementation
- Transport Channels – Request Channels, part 2: Service model support for the sample
- Transport Channels – Request Channels, part 3: Asynchronous support for the channel
Back to the series after a longer than expected delay, we’ll conclude the section about the WCF channels with the likely most complex part of the WCF pipeline – transport channels. I’ve been struggling mightily trying to create an example of a complete transport channel (and I decided that, although I work literally in the same building / floor as the people in the WCF team which own this area, I wouldn’t ask them for help because a “normal” developer wouldn’t have this direct connection to rely on), so to prevent further delays I decided to break this post in multiple ones (since I already have some scenarios working and I didn’t want to delay even more). So this first part is about the client part of a transport channel (which will be a request channel, the simplest of them), and only with synchronous messaging support; as this “mini-series” progresses, I’ll make the example more complete with asynchronous support, and then onto more complex channel shapes, including reply channels (server side) and duplex channels.
Transport channels are at the base of the channel stack in WCF (see post about channels for a quick recap). Unlike the other protocol channels which I exemplified in the channel posts, transports cannot simply wrap an underlying channel and just pass messages around with some modifications. Actually, you could in theory wrap the transport channels themselves, but transport channels are supposed to be written to support a different protocol (i.e., a socket-based one), so wrapping an existing transport doesn’t solve this issue.
The biggest issue with writing transport channels in general is that, besides working with the underlying transport (bytes), there is a lot of asynchronous code – and cascading asynchronous calls. As I’ve mentioned before, chaining asynchronous calls is hard, and quite error-prone (thankfully, with the new Task Parallel Library this is made a lot simpler, but since WCF was created before that, we still have to deal with lots of Begin/End calls and IAsyncResults). Transport channels also follow the “normal” channel pattern, so we need to have first a binding element, then a channel factory (or listener for the server side, although IRequestChannels are only used in client scenarios), which then can create the actual channel. The sample in this post will go through the steps required to create one such channel.
Public implementations in WCF
None. As with most channel extensibility points, there are no public implementations in WCF. There are many internal implementations, such as the one used by the HTTP transport.
Interface definition
- public interface IRequestChannel : IChannel, ICommunicationObject
- {
- IAsyncResult BeginRequest(Message message, AsyncCallback callback, object state);
- IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state);
- Message EndRequest(IAsyncResult result);
- Message Request(Message message);
- Message Request(Message message, TimeSpan timeout);
- EndpointAddress RemoteAddress { get; }
- Uri Via { get; }
- }
Let’s start by the properties. RemoteAddress returns the address to which the channel is sending messages to, while Via is the transport address to which the request is sent. They are usually the same, but in cases where a message goes through multiple hops before reaching its final destination (i.e., in a proxying or routing scenario), the Via property points to the “next step”, while RemoteAddress points to the final destination of the message.
The methods are straightforward – take a message, return a message. We have two versions, one synchronous and another asynchronous. The synchronous Request methods (one which takes an explicit timeout, another which assumes the channel default timeout), and the asynchronous pair BeginRequest (again, one with an explicit timeout, one with the default one) / EndRequest, take a message as an input, and after sending it to the server, return the response it received.
Besides those members, classes which implement IRequestChannel must also implement the members from the two derived interfaces (IChannel and ICommunicationObject). In practice, however, most implementations of channels simply derive from ChannelBase, which implements does most of the heavy lifting required for implementing ICommunicationObject (such as dealing with events and some timeouts). But there is still a lot of work to be done, as shown in the abstract methods which need to be implemented below.
- public abstract class ChannelBase : CommunicationObject, IChannel, ICommunicationObject, IDefaultCommunicationTimeouts
- {
- protected ChannelBase(ChannelManagerBase channelManager);
- // From CommunicationObject
- protected abstract void OnAbort();
- protected abstract IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state);
- protected abstract IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state);
- protected abstract void OnClose(TimeSpan timeout);
- protected virtual void OnClosed();
- protected virtual void OnClosing();
- protected abstract void OnEndClose(IAsyncResult result);
- protected abstract void OnEndOpen(IAsyncResult result);
- protected virtual void OnFaulted();
- protected abstract void OnOpen(TimeSpan timeout);
- protected virtual void OnOpened();
- protected virtual void OnOpening();
- }
The abstract methods are all about the channel lifecycle management: opening, closing, aborting and faulting. As with other methods at this layer, there are both the synchronous and asynchronous variations for opening and closing (aborting and faulting are synchronous, which means those operations are meant to be executed quite fast.
How to add a transport channel
Like other channels, transport channels are added by using a channel factory (client) / or channel listener (server), which are created by a custom binding element. The binding element corresponding to a transport channel must be derived from the TransportBindingElement class. Besides the normal properties from the base BindingElement class, the transport binding element also defines a Scheme property, which needs to return the URI scheme for the transport. Take a look at the sample code for an example of defining a request transport channel.
Real world scenario: consuming a JSON-RPC service
A while back a customer was trying to find a way to create a service which could send JSON data over TCP (not HTTP), to implement the JSON-RPC protocol. It’s a simple protocol which uses JSON messages to invoke operations in a RPC fashion. For this post, I’ll have a simple service implemented using plain sockets, and implement a transport request channel which can be used to communicate to that server. On the next posts I’ll move to the server side as well.
To start, we need to decide how the messages will be framed. Since TCP is a byte-oriented protocol (instead of a message-based protocol, such as HTTP), we need to have some sort of framing (or protocol) to delimit the messages. In HTTP, for example, the message is composed of header and body; the header is terminated with a pair of CRLF bytes, and the body is terminated based on a size information from the header (or in the body itself, in case of chunked transfer). Since all requests (and responses) in a JSON-RPC interaction are JSON objects (as opposed to arrays or primitives) we could use the object delimiter (the closing ‘}’ after the last member) itself as the message boundary, but this would make the parsing logic more complex. In this example, I’ll define a simpler protocol which can be used not only for JSON-RPC, but for any communication, in which the first 4 bytes of every message represent the size S of the payload, and then there are additional S bytes following the length (I think this is actually a valid implementation of the JSON-RPC specification, since it doesn’t enforce any framing mechanism). So below you can see one request for a typical calculator service. The first 4 bytes indicate the length of the payload (0x29 bytes), and the next 41 bytes consist of the JSON-RPC request itself.
00 00 00 29 7B 22 6D 65 74 68 6F 64 22 3A 22 41 ...){"me thod":"A
64 64 22 2C 22 70 61 72 61 6D 73 22 3A 5B 34 34 dd","par ams":[44
2C 20 35 35 5D 2C 22 69 64 22 3A 31 7D , 55],"i d":1}
Now we can start implementing the transport itself. But before going further, here goes the usual disclaimer: this is a sample for illustrating the topic of this post, this is not production-ready code. I tested it for a few contracts and it worked, but I cannot guarantee that it will work for all scenarios (please let me know if you find a bug or something missing). I really kept the error checking to a minimum (especially in the “simple service” project and in many other helper functions whose inputs should be sanitized, to make the sample small.
First, the binding element. As I mentioned before, we need a type which derives from TransportBindingElement, and a scheme for this new protocol. Since our payload is based on a size, I’ll use “sized.tcp” as the scheme for this new transport.
- public class SizedTcpTransportBindingElement : TransportBindingElement
- {
- public const string SizedTcpScheme = "sized.tcp";
- public SizedTcpTransportBindingElement()
- : base()
- {
- }
- public SizedTcpTransportBindingElement(SizedTcpTransportBindingElement other)
- : base(other)
- {
- }
- public override string Scheme
- {
- get { return SizedTcpScheme; }
- }
- public override BindingElement Clone()
- {
- return new SizedTcpTransportBindingElement(this);
- }
- }
Now the binding element class is done, but it doesn’t do anything useful. We need then to create the appropriate channel factory when BuildChannelFactory<TChannel> is invoked. Another thing which I’m overriding in this class is the GetProperty<T> property to tell WCF that this transport cannot be used with SOAP messages, only with non-SOAP ones.
- public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
- {
- return typeof(TChannel) == typeof(IRequestChannel);
- }
- public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
- {
- return (IChannelFactory<TChannel>)(object)new SizedTcpChannelFactory(this, context);
- }
- public override T GetProperty<T>(BindingContext context)
- {
- if (typeof(T) == typeof(MessageVersion))
- {
- return (T)(object)MessageVersion.None;
- }
- return base.GetProperty<T>(context);
- }
Now moving on to the channel factory. As I mentioned in the post about encoders, a transport channel can use the encoder from the context binding parameters, but it doesn’t really have to do that. In this case, all the messages don’t follow any specific format, they’re simply bytes which are preceded by a length, so I really don’t want to let the user choose any encoding of his/her choice. The implementation could simply ignore the message encoder in the context, but it’s always good practice to warn the user if we’re ignoring something that they do, so that’s what is being done here. The new encoding ByteStreamMessageEncodingBindingElement (added in .NET 4.0) is perfect for this scenario, since it simply converts between bytes and a non-SOAP message which holds on to those bytes (using the same format as the raw encoder from the WebMessageEncodingBindingElement: a <Binary> XML element wrapping the bytes as a xs:base64Binary node). If the user uses another encoding in their binding, we’ll throw to warn them that this is an invalid combination. And if the user doesn’t choose any encoder, we’ll add one ourselves. Another thing which the channel factory will create is a BufferManager, which will manage a pool of buffers as required by the WCF channel model.
- class SizedTcpChannelFactory : ChannelFactoryBase<IRequestChannel>
- {
- BufferManager bufferManager;
- MessageEncoderFactory encoderFactory;
- public SizedTcpChannelFactory(SizedTcpTransportBindingElement bindingElement, BindingContext context)
- : base(context.Binding)
- {
- // populate members from binding element
- int maxBufferSize = (int)bindingElement.MaxReceivedMessageSize;
- this.bufferManager = BufferManager.CreateBufferManager(bindingElement.MaxBufferPoolSize, maxBufferSize);
- Collection<MessageEncodingBindingElement> messageEncoderBindingElements
- = context.BindingParameters.FindAll<MessageEncodingBindingElement>();
- if (messageEncoderBindingElements.Count > 1)
- {
- throw new InvalidOperationException("More than one MessageEncodingBindingElement was found in the BindingParameters of the BindingContext");
- }
- else if (messageEncoderBindingElements.Count == 1)
- {
- if (!(messageEncoderBindingElements[0] is ByteStreamMessageEncodingBindingElement))
- {
- throw new InvalidOperationException("This transport must be used with the ByteStreamMessageEncodingBindingElement.");
- }
- this.encoderFactory = messageEncoderBindingElements[0].CreateMessageEncoderFactory();
- }
- else
- {
- this.encoderFactory = new ByteStreamMessageEncodingBindingElement().CreateMessageEncoderFactory();
- }
- }
- }
Now it’s time for managing the lifecycle of the channel factory. The ChannelFactoryBase<TChannel> class deals with most of the internal boilerplates, but we still need to override the methods to open the factory. Since our transport will only create a socket when we actually need to create a channel, we don’t need to do anything while opening the factory. But here’s where the asynchronous operations start to make things more complex – we need to deal with IAsyncResult objects even if we don’t want to do anything. Fortunately, some of the official WCF samples provide some nice implementations of a base async result class (for example, in the UDP Transport sample), and one of the classes is a “CompletedAsyncResult”, which signals the completion right after it’s created. For this sample, I copied the code almost verbatim (I only changed the namespace in the file) from that sample.
- protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
- {
- this.OnOpen(timeout);
- return new CompletedAsyncResult(callback, state);
- }
- protected override void OnEndOpen(IAsyncResult result)
- {
- CompletedAsyncResult.End(result);
- }
- protected override void OnOpen(TimeSpan timeout)
- {
- }
Then, we move on to creating the channel, to which we pass the encoder which will be used to convert between the bytes in the socket and the Message objects, the buffer manager used to create the buffers used to read from the socket and to write to it, and the remote address and via, which were passed by the caller.
- protected override IRequestChannel OnCreateChannel(EndpointAddress address, Uri via)
- {
- return new SizedTcpRequestChannel(
- this.encoderFactory.Encoder,
- this.bufferManager,
- this,
- address,
- via);
- }
Finally onto the transport channel itself. Since I mentioned I’ll have more complex shapes in the next posts (including reply channels and duplex channels), I’ll create a base class first which deals with the socket communication and the encoding part, leaving the actual channel class (in this case, SizedTcpRequestChannel) quite simple and (as much as a transport channel can be) easy to understand. Below is the boilerplate for this base channel. It contains a reference to a socket which will be set by the derived classes (the initialization is different depending on whether we’re dealing with a channel in the client or in the server side). The ChannelBase class deals with most of the hard code, but we still need to deal with open / close / abort scenarios, which, in this case, are delegated to the socket itself.
- class SizedTcpBaseChannel : ChannelBase
- {
- const int maxBufferSize = 64 * 1024;
- Socket socket;
- MessageEncoder encoder;
- BufferManager bufferManager;
- public SizedTcpBaseChannel(MessageEncoder encoder, BufferManager bufferManager, ChannelManagerBase channelManager)
- : base(channelManager)
- {
- this.encoder = encoder;
- this.bufferManager = bufferManager;
- }
- protected void InitializeSocket(Socket socket)
- {
- if (this.socket != null)
- {
- throw new InvalidOperationException("Socket is already set");
- }
- this.socket = socket;
- }
- protected override void OnAbort()
- {
- if (this.socket != null)
- {
- socket.Close(0);
- }
- }
- protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
- {
- this.OnClose(timeout);
- return new CompletedAsyncResult(callback, state);
- }
- protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
- {
- this.OnOpen(timeout);
- return new CompletedAsyncResult(callback, state);
- }
- protected override void OnClose(TimeSpan timeout)
- {
- this.socket.Close((int)timeout.TotalMilliseconds);
- }
- protected override void OnEndClose(IAsyncResult result)
- {
- CompletedAsyncResult.End(result);
- }
- protected override void OnEndOpen(IAsyncResult result)
- {
- CompletedAsyncResult.End(result);
- }
- protected override void OnOpen(TimeSpan timeout)
- {
- }
- }
But this base channel still doesn’t do anything useful. Let’s now start adding some functions which can be used to send a message from this channel to the other party. SendMessage first encodes the message using the encoder passed to the class by the channel factory / listener. It then calls WriteData to write the encoded message to the wire. On WriteData we first update the buffer with the 4-byte length prefix, then we send it over the socket. The socket send function is trivial and can be found in the code for this post on the gallery.
- public void SendMessage(Message message, TimeSpan timeout)
- {
- base.ThrowIfDisposedOrNotOpen();
- ArraySegment<byte> encodedBytes = default(ArraySegment<byte>);
- try
- {
- encodedBytes = this.EncodeMessage(message);
- this.WriteData(encodedBytes);
- }
- catch (SocketException socketException)
- {
- throw ConvertSocketException(socketException, "Send");
- }
- finally
- {
- if (encodedBytes.Array != null)
- {
- this.bufferManager.ReturnBuffer(encodedBytes.Array);
- }
- }
- }
- ArraySegment<byte> EncodeMessage(Message message)
- {
- try
- {
- return encoder.WriteMessage(message, maxBufferSize, bufferManager);
- }
- finally
- {
- // we've consumed the message by serializing it, so clean up
- message.Close();
- }
- }
- private void WriteData(ArraySegment<byte> data)
- {
- ArraySegment<byte> toSend = this.AddLengthToBuffer(data);
- try
- {
- this.SocketSend(toSend);
- }
- finally
- {
- this.bufferManager.ReturnBuffer(toSend.Array);
- }
- }
- ArraySegment<byte> AddLengthToBuffer(ArraySegment<byte> data)
- {
- byte[] fullBuffer = this.bufferManager.TakeBuffer(data.Count + 4);
- Formatting.SizeToBytes(data.Count, fullBuffer, 0);
- Array.Copy(data.Array, data.Offset, fullBuffer, 4, data.Count);
- this.bufferManager.ReturnBuffer(data.Array);
- return new ArraySegment<byte>(fullBuffer, 0, 4 + data.Count);
- }
Receiving messages is similar. We first read the 4 bytes from the socket which indicate the length of the “payload”, then we read that many bytes from the payload and ask the encoder to decode those bytes into a Message object. Again, the socket methods are left for the reader to look in the gallery.
- public Message ReceiveMessage(TimeSpan timeout)
- {
- base.ThrowIfDisposedOrNotOpen();
- try
- {
- ArraySegment<byte> encodedBytes = this.ReadData();
- return this.DecodeMessage(encodedBytes);
- }
- catch (SocketException socketException)
- {
- throw ConvertSocketException(socketException, "Receive");
- }
- }
- ArraySegment<byte> ReadData()
- {
- // 4 bytes length
- byte[] preambleBytes = this.SocketReceiveBytes(4, false);
- if (preambleBytes == null)
- {
- return new ArraySegment<byte>();
- }
- int dataLength = Formatting.BytesToSize(preambleBytes, 0);
- byte[] data = this.SocketReceiveBytes(dataLength);
- this.bufferManager.ReturnBuffer(preambleBytes);
- return new ArraySegment<byte>(data, 0, dataLength);
- }
- protected virtual Message DecodeMessage(ArraySegment<byte> data)
- {
- if (data.Array == null)
- {
- return null;
- }
- else
- {
- return this.encoder.ReadMessage(data, bufferManager);
- }
- }
We can finally look into the request channel. The first thing that this channel need to do is to set the socket in the base channel, and this is done when the channel is being opened – we find one of the addresses of the host in the Via property (whose value was passed to it by the factory), and connect a socket to it. Once we found the actual IP address which is being used by the server, we can then initialize the base channel with the connected socket.
Next, we can implement the Request method (the overload without the timeout simply calls the one with it passing the channel default send timeout, which was inherited from the binding object). A synchronous implementation is fairly straightforward – first send the request to the server, then receive its response. Notice that I’m cheating here a little in which I’m using the same timeout for the two operations (send / receive), effectively doubling the actual timeout passed by the caller. A better implementation would only pass to the ReceiveMessage the remaining time after discounting the time spend sending the message, but for the sake of simplicity, I’ll allow myself to take this shortcut here.
- class SizedTcpRequestChannel : SizedTcpBaseChannel, IRequestChannel
- {
- Uri via;
- EndpointAddress remoteAddress;
- public SizedTcpRequestChannel(MessageEncoder encoder, BufferManager bufferManager, ChannelManagerBase channelManager, EndpointAddress remoteAddress, Uri via)
- : base(encoder, bufferManager, channelManager)
- {
- this.via = via;
- this.remoteAddress = remoteAddress;
- }
- #region IRequestChannel Members
- public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)
- {
- throw new NotImplementedException("Still to be implemented");
- }
- public IAsyncResult BeginRequest(Message message, AsyncCallback callback, object state)
- {
- return this.BeginRequest(message, base.DefaultSendTimeout, callback, state);
- }
- public Message EndRequest(IAsyncResult result)
- {
- throw new NotImplementedException("Still to be implemented");
- }
- public EndpointAddress RemoteAddress
- {
- get { return this.remoteAddress; }
- }
- public Message Request(Message message, TimeSpan timeout)
- {
- base.SendMessage(message, timeout);
- return base.ReceiveMessage(timeout);
- }
- public Message Request(Message message)
- {
- return this.Request(message, base.DefaultSendTimeout);
- }
- public Uri Via
- {
- get { return this.via; }
- }
- #endregion
- protected override void OnOpen(TimeSpan timeout)
- {
- this.Connect();
- base.OnOpen(timeout);
- }
- void Connect()
- {
- Socket socket = null;
- int port = Via.Port;
- if (port == -1)
- {
- port = 8000; // the default port for sized.tcp
- }
- IPHostEntry hostEntry;
- try
- {
- hostEntry = Dns.GetHostEntry(this.Via.Host);
- }
- catch (SocketException socketException)
- {
- throw new EndpointNotFoundException("Unable to resolve host: " + this.Via.Host, socketException);
- }
- for (int i = 0; i < hostEntry.AddressList.Length; i++)
- {
- try
- {
- IPAddress address = hostEntry.AddressList[i];
- socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
- socket.Connect(new IPEndPoint(address, port));
- break;
- }
- catch (SocketException socketException)
- {
- if (i == hostEntry.AddressList.Length - 1)
- {
- throw ConvertSocketException(socketException, "Connect");
- }
- }
- }
- base.InitializeSocket(socket);
- }
- }
Now the channel is ready to be used. As long as we don’t use any of its asynchronous capabilities, it should work just fine, so let’s craft a contract to try it out. We also need a service – and I wrote a simple sockets-based service (i.e., no WCF in the picture) to validate that we can use this channel in a WCF-based client to it. I won’t paste the code for it here, but it can be seen in the code gallery. As far as the client goes, since we don’t have any formatters which know how to convert between the JSON-RPC format and operations, we need to work with messages themselves at this point.
- [ServiceContract]
- public interface IUntypedTest
- {
- [OperationContract(Action = "*", ReplyAction = "*")]
- Message Process(Message input);
- }
Now we can put that client in place:
- static void Main(string[] args)
- {
- int port = 8000;
- SocketsServer server = new SocketsServer(port, new CalculatorService());
- server.StartServing();
- Console.WriteLine("Started the simple server");
- CustomBinding binding = new CustomBinding(new SizedTcpTransportBindingElement());
- EndpointAddress address = new EndpointAddress(
- SizedTcpTransportBindingElement.SizedTcpScheme + "://localhost:" + port);
- ChannelFactory<IUntypedTest> factory = new ChannelFactory<IUntypedTest>(binding, address);
- IUntypedTest proxy = factory.CreateChannel();
- string[] allInputs = new string[]
- {
- "{\"method\":\"Add\",\"params\":[5, 8],\"id\":1}",
- "{\"method\":\"Multiply\",\"params\":[5, 8],\"id\":2}",
- "{\"method\":\"Divide\",\"params\":[5, 0],\"id\":3}",
- };
- foreach (string input in allInputs)
- {
- byte[] inputBytes = Encoding.UTF8.GetBytes(input);
- Console.WriteLine("Input: {0}", input);
- Message inputMessage = Formatting.BytesToMessage(inputBytes);
- Message outputMessage = proxy.Process(inputMessage);
- Console.WriteLine("Received output: {0}", outputMessage);
- byte[] outputBytes = Formatting.MessageToBytes(outputMessage);
- Console.WriteLine("Output bytes:");
- Debugging.PrintBytes(outputBytes);
- }
- }
Running it, we get this output below (I also added some tracing in the server) for the first input.
Started the simple server
Input: {"method":"Add","params":[5, 8],"id":1}
Connection from 127.0.0.1.54424
Length: 39
7B 22 6D 65 74 68 6F 64 22 3A 22 41 64 64 22 2C {"method ":"Add",
22 70 61 72 61 6D 73 22 3A 5B 35 2C 20 38 5D 2C "params" :[5, 8],
22 69 64 22 3A 31 7D "id":1}
Received output: <Binary>eyJyZXN1bHQiOjAsImVycm9yIjpudWxsLCJpZCI6MX0=</Binary>
Output bytes:
7B 22 72 65 73 75 6C 74 22 3A 30 2C 22 65 72 72 {"result ":0,"err
6F 72 22 3A 6E 75 6C 6C 2C 22 69 64 22 3A 31 7D or":null ,"id":1}
What do we have here? A full WCF client, using our new transport channel, sending a message and receiving a response from a JSON-RPC server. This is still quite coarse, but at least it works.
Coming up
We’ll finish the request channel implementation with full asynchronous support. And we’ll add some service model support to sample so that we can call the operations themselves instead of having to deal with messages directly.
Comments
- Anonymous
December 11, 2011
Thanks for your good illustration and demonstrate the actual usage of 'ByteStreamMessageEncodingBindingElement' in the post.