WCF Extensibility – Transport Channels – Request Channels, part 3
This post is part of a series about WCF extensibility points. For a list of all previous posts and planned future ones, go to the index page .
This is the part 3 of 3 of a “mini-series” inside the main series. For the other parts, here’s the list.
- Transport Channels – Request Channels, part 1: Basic, synchronous-only IRequestChannel implementation
- Transport Channels – Request Channels, part 2: Service model support for the sample
- Transport Channels – Request Channels, part 3: Asynchronous support for the channel
Just like we did on the previous post, let’s jump right back into the scenario (for a scenario description, go back to the part 1). After the detour through the service model, we’ll dive back to the transport channel, to implement the asynchronous support for the request channel used to consume the service. I’ll also share some ideas for testing such operations.
Real world scenario: consuming a JSON-RPC service
At the previous post, we had a transport channel plus an endpoint behavior, which, together, could be used to call a simple JSON-RPC service. Now, let’s finish the implementation of the request channel with support for asynchronous requests. This is important in scenarios such as when that transport is to be used in UI threads (so we don’t block that thread and make the application “hang” while the request is being made), or in platforms such as Silverlight, in which synchronous networking requests simply don’t exist.
Since we’re going to enter the asynchronous world, testing each part of the communication as it’s implemented is really important. I’ve made the mistake in the past of trying to implement all the asynchronous operations, then trying to use it within a WCF channel. Things would blow up in a worker thread, and tracking down what was going on after all the asynchronous callbacks wasn’t something fun. So I strongly recommend to have some sort of unit test framework which can be used to validate the individual pieces of the system. I’d usually go with MSTest (the unit test framework which comes with Visual Studio), but since at home I use the express editions and it’s not supported, I decided to go with xUnit (one I’m use to and it’s free / open source; others would also be good) for this.
But before going further, here goes the usual disclaimer: this is a sample for illustrating the topic of this post, this is not production-ready code. I tested it for a few contracts and it worked, but I cannot guarantee that it will work for all scenarios (please let me know if you find a bug or something missing). I really kept the error checking to a minimum (especially in the “simple service” project and in many other helper functions whose inputs should be sanitized, such as the reflection helper), to make the sample small (or as small as a fully synchronous and asynchronous transport channel can be). Also, the timeout handling in this sample is far from ideal (in some cases they’re simply ignored, in other they’re interpreted very liberally).
So before I even attempted to write the first asynchronous operation, I started a simple unit test project. And since most of the operations (and the base class itself) were private / internal, I used a simple reflection helper to test the internal methods, as shown below.
- static class ReflectionHelper
- {
- public static object CreateInstance(Type publicTypeFromAssembly, string typeName,
- params object[] constructorArgs)
- {
- Type type = publicTypeFromAssembly.Assembly.GetType(typeName);
- ConstructorInfo ctor = FindConstructor(type, constructorArgs);
- return ctor.Invoke(constructorArgs);
- }
- public static object CallMethod(object obj, string methodName, params object[] methodArgs)
- {
- MethodInfo method = FindMethod(obj.GetType(), methodName, methodArgs);
- return method.Invoke(obj, methodArgs);
- }
- public static void SetField(object obj, string fieldName, object fieldValue)
- {
- FieldInfo field = FindField(obj.GetType(), fieldName);
- field.SetValue(obj, fieldValue);
- }
- public static object GetField(object obj, string fieldName)
- {
- FieldInfo field = FindField(obj.GetType(), fieldName);
- return field.GetValue(obj);
- }
- }
In order to test the socket operations, I also wrote a simple socket server which simply returned whatever data was sent to it (according to the same framing protocol, 4-byte length + data, as the transport channel), to validate our client implementation. I’ll leave the simple server out of this post, but it is in the code from the gallery. Also, to test socket operations we need both send / receive, so to have a starting point for the test, the first test I wrote didn’t use anything asynchronous at all, was a simple synchronous send/receive (it was useful to test my simple server as well).
- [Fact]
- public void SynchronousSendReceive()
- {
- byte[] length = new byte[4];
- byte[] data = new byte[36];
- Random rndGen = new Random();
- rndGen.NextBytes(data);
- Formatting.SizeToBytes(data.Length, length, 0);
- byte[] toSend = new byte[length.Length + data.Length];
- Array.Copy(length, 0, toSend, 0, length.Length);
- Array.Copy(data, 0, toSend, length.Length, data.Length);
- object channel = ReflectionHelper.CreateInstance(
- typeof(SizedTcpTransportBindingElement),
- "JsonRpcOverTcp.Channels.SizedTcpBaseChannel",
- null,
- BufferManager.CreateBufferManager(int.MaxValue, int.MaxValue),
- Mocks.GetChannelManagerBase());
- Socket socket = Mocks.GetConnectedSocket(Port);
- ReflectionHelper.SetField(channel, "socket", socket);
- ReflectionHelper.CallMethod(channel, "SocketSend", toSend);
- ReflectionHelper.CallMethod(channel, "SocketReceiveBytes", toSend.Length);
- Assert.Equal(2, this.server.ReceivedBytes.Count);
- Assert.Equal(length, this.server.ReceivedBytes[0], new ArrayComparer<byte>());
- Assert.Equal(data, this.server.ReceivedBytes[1], new ArrayComparer<byte>());
- socket.Close();
- }
Having that test ready gave me more confidence that the synchronous operations were working, so I could start testing the asynchronous ones individually (i.e., first test / implement the asynchronous Send, and test it with the synchronous receive, which I “knew” was working). And here goes the first test prior to the implementation (in a TDD fashion which I should have used from the start, but that’s another story). The test is exactly like the previous one, but the SocketSend method is split in a Begin/End pair. A simple event is used to wait in the main thread until the asynchronous part is done.
- [Fact]
- public void AsynchronousSendSynchronousReceive()
- {
- byte[] length = new byte[4];
- byte[] data = new byte[36];
- Random rndGen = new Random();
- rndGen.NextBytes(data);
- Formatting.SizeToBytes(data.Length, length, 0);
- byte[] toSend = new byte[length.Length + data.Length];
- Array.Copy(length, 0, toSend, 0, length.Length);
- Array.Copy(data, 0, toSend, length.Length, data.Length);
- ManualResetEvent evt = new ManualResetEvent(false);
- object channel = ReflectionHelper.CreateInstance(
- typeof(SizedTcpTransportBindingElement),
- "JsonRpcOverTcp.Channels.SizedTcpBaseChannel",
- null,
- BufferManager.CreateBufferManager(int.MaxValue, int.MaxValue),
- Mocks.GetChannelManagerBase());
- Socket socket = Mocks.GetConnectedSocket(Port);
- ReflectionHelper.SetField(channel, "socket", socket);
- object state = new object();
- bool success = true;
- ReflectionHelper.CallMethod(channel, "BeginSocketSend", toSend, new AsyncCallback(delegate(IAsyncResult asyncResult)
- {
- try
- {
- if (!Object.ReferenceEquals(asyncResult.AsyncState, state))
- {
- success = false;
- Console.WriteLine("Error, state not preserved");
- }
- else
- {
- ReflectionHelper.CallMethod(channel, "EndSocketSend", asyncResult);
- ReflectionHelper.CallMethod(channel, "SocketReceiveBytes", toSend.Length);
- try
- {
- Assert.Equal(2, this.server.ReceivedBytes.Count);
- Assert.Equal(length, this.server.ReceivedBytes[0], new ArrayComparer<byte>());
- Assert.Equal(data, this.server.ReceivedBytes[1], new ArrayComparer<byte>());
- }
- catch (Exception e)
- {
- Console.WriteLine("Error: " + e);
- success = false;
- }
- }
- }
- finally
- {
- evt.Set();
- }
- }), state);
- evt.WaitOne();
- Assert.True(success, "Error in callback");
- socket.Close();
- }
And we can now finally start implementing the operations. First, [Begin/End]SocketSend. As will be the pattern in all the implementations here, we’ll create a new class which implements IAsyncResult (actually, a class derived from the helper class AsyncResult, shown in the first post, which in turn implements that interface). And that class will be the one which will do all the work.
- IAsyncResult BeginSocketSend(byte[] buffer, AsyncCallback callback, object state)
- {
- return BeginSocketSend(new ArraySegment<byte>(buffer), callback, state);
- }
- IAsyncResult BeginSocketSend(ArraySegment<byte> buffer, AsyncCallback callback, object state)
- {
- return new SocketSendAsyncResult(buffer, this, callback, state);
- }
- void EndSocketSend(IAsyncResult result)
- {
- SocketSendAsyncResult.End(result);
- }
The SocketSendAsyncResult class is show in its entirety below. Right at its constructor (after storing some variables), we start the asynchronous operation (via the StartSending) method. In StartSending, we call the Socket.BeginSend to start that. One thing which is often overlooked is that asynchronous operations can actually be completed synchronously, so to prevent race conditions between chained asynchronous calls, it’s recommended that we check that and take different actions based on that. If it’s a “normal” asynchronous operation, we simply return and wait for the callback; otherwise we go into the CompleteSend function to see if the sending is completed, and complete the AsyncResult itself if it’s the case. Otherwise, we start sending it again. On CompleteSend, we need to see if all the bytes which we asked the socket to send were actually sent; if not, we need to start sending again from the point where we stopped. In the asynchronous case, the callback is invoked when the socket operation is complete (it’s also invoked if the operation completed synchronously, but since we already dealt with that case, the first thing in the operation is to check that and return if that’s indeed the case). In the callback we take a similar path as in the “completed synchronously” case: call CompleteSend to see if all bytes were indeed sent, and if not, start sending again.
- class SocketSendAsyncResult : AsyncResult
- {
- SizedTcpBaseChannel channel;
- ArraySegment<byte> buffer;
- public SocketSendAsyncResult(ArraySegment<byte> buffer, SizedTcpBaseChannel channel, AsyncCallback callback, object state)
- : base(callback, state)
- {
- this.channel = channel;
- this.buffer = buffer;
- this.StartSending();
- }
- void StartSending()
- {
- IAsyncResult sendResult = this.channel.socket.BeginSend(this.buffer.Array, this.buffer.Offset, this.buffer.Count, SocketFlags.None, OnSend, this);
- if (!sendResult.CompletedSynchronously)
- {
- return;
- }
- if (this.CompleteSend(sendResult))
- {
- base.Complete(true);
- }
- else
- {
- this.StartSending();
- }
- }
- bool CompleteSend(IAsyncResult result)
- {
- try
- {
- int bytesSent = channel.socket.EndSend(result);
- if (bytesSent == this.buffer.Count)
- {
- return true;
- }
- else
- {
- this.buffer = new ArraySegment<byte>(this.buffer.Array, this.buffer.Offset + bytesSent, this.buffer.Count - bytesSent);
- return false;
- }
- }
- catch (SocketException socketException)
- {
- throw ConvertSocketException(socketException, "Send");
- }
- }
- static void OnSend(IAsyncResult result)
- {
- if (result.CompletedSynchronously)
- {
- return;
- }
- SocketSendAsyncResult thisPtr = (SocketSendAsyncResult)result.AsyncState;
- Exception completionException = null;
- bool shouldComplete = false;
- try
- {
- if (thisPtr.CompleteSend(result))
- {
- shouldComplete = true;
- }
- else
- {
- thisPtr.StartSending();
- }
- }
- catch (Exception e)
- {
- completionException = e;
- }
- if (shouldComplete)
- {
- thisPtr.Complete(false, completionException);
- }
- }
- public static void End(IAsyncResult result)
- {
- AsyncResult.End<SocketSendAsyncResult>(result);
- }
- }
If you didn’t understand that class, please go back and look at it again. Classes like that will show up a lot over the next paragraphs…
So after validating that the unit test works (I had to fix a few things myself), so that was already a good thing to have those, on to the next operation: socket receive. Just like the socket send case, I’ll start with a unit test to validate it, using the synchronous send operation (which is a lot less error-prone than its asynchronous counterpart).
- [Fact]
- public void SynchronousSendAsynchronousReceive()
- {
- byte[] length = new byte[4];
- byte[] data = new byte[36];
- Random rndGen = new Random();
- rndGen.NextBytes(data);
- Formatting.SizeToBytes(data.Length, length, 0);
- byte[] toSend = new byte[length.Length + data.Length];
- Array.Copy(length, 0, toSend, 0, length.Length);
- Array.Copy(data, 0, toSend, length.Length, data.Length);
- object channel = ReflectionHelper.CreateInstance(
- typeof(SizedTcpTransportBindingElement),
- "JsonRpcOverTcp.Channels.SizedTcpBaseChannel",
- null,
- BufferManager.CreateBufferManager(int.MaxValue, int.MaxValue),
- Mocks.GetChannelManagerBase());
- Socket socket = Mocks.GetConnectedSocket(Port);
- ReflectionHelper.SetField(channel, "socket", socket);
- ReflectionHelper.CallMethod(channel, "SocketSend", toSend);
- object state = new object();
- bool success = true;
- ManualResetEvent evt = new ManualResetEvent(false);
- ReflectionHelper.CallMethod(channel, "BeginSocketReceiveBytes", toSend.Length, new AsyncCallback(delegate(IAsyncResult asyncResult)
- {
- try
- {
- if (!Object.ReferenceEquals(asyncResult.AsyncState, state))
- {
- success = false;
- Console.WriteLine("Error, state not preserved");
- }
- else
- {
- try
- {
- byte[] recvd = (byte[])ReflectionHelper.CallMethod(channel, "EndSocketReceiveBytes", asyncResult);
- Assert.NotNull(recvd);
- Assert.True(recvd.Length >= toSend.Length);
- if (recvd.Length > toSend.Length)
- {
- // maybe buffer manager returned a bigger buffer
- byte[] temp = new byte[toSend.Length];
- Array.Copy(recvd, 0, temp, 0, temp.Length);
- recvd = temp;
- }
- Assert.Equal(recvd, toSend, new ArrayComparer<byte>());
- }
- catch (Exception e)
- {
- Console.WriteLine("Error: " + e);
- success = false;
- }
- }
- }
- finally
- {
- evt.Set();
- }
- }), state);
- evt.WaitOne();
- Assert.True(success, "Error in callback");
- socket.Close();
- }
And the pattern continues. The Begin/End operations are fairly simple, just delegating the bulk of the work to the new class derived from AsyncResult.
- IAsyncResult BeginSocketReceiveBytes(int size, AsyncCallback callback, object state)
- {
- return BeginSocketReceiveBytes(size, true, callback, state);
- }
- IAsyncResult BeginSocketReceiveBytes(int size, bool throwOnEmpty, AsyncCallback callback, object state)
- {
- return new SocketReceiveAsyncResult(size, throwOnEmpty, this, callback, state);
- }
- byte[] EndSocketReceiveBytes(IAsyncResult result)
- {
- return SocketReceiveAsyncResult.End(result);
- }
Like on the previous AsyncResult class, the SocketReceiveAsyncResult starts off by storing some internal data, then firing off the socket request. Whether the request completed synchronously or not, the method CompleteReadBytes is called (by the constructor or the OnReadBytes callback, respectively). There we check whether the requested number of bytes has already been read. If that’s the case, we complete the result (which causes the client callback to be called). Otherwise we ask for more bytes in another BeginSocketReceive call.
- class SocketReceiveAsyncResult : AsyncResult
- {
- SizedTcpBaseChannel channel;
- int size;
- int bytesReadTotal;
- byte[] buffer;
- bool throwOnEmpty;
- public SocketReceiveAsyncResult(int size, bool throwOnEmpty, SizedTcpBaseChannel channel, AsyncCallback callback, object state)
- : base(callback, state)
- {
- this.size = size;
- this.channel = channel;
- this.throwOnEmpty = throwOnEmpty;
- this.bytesReadTotal = 0;
- this.buffer = channel.bufferManager.TakeBuffer(size);
- bool success = false;
- try
- {
- IAsyncResult socketReceiveResult = channel.BeginSocketReceive(this.buffer, bytesReadTotal, size, OnReadBytes, this);
- if (socketReceiveResult.CompletedSynchronously)
- {
- if (CompleteReadBytes(socketReceiveResult))
- {
- base.Complete(true);
- }
- }
- success = true;
- }
- finally
- {
- if (!success)
- {
- this.Cleanup();
- }
- }
- }
- void Cleanup()
- {
- if (this.buffer != null)
- {
- channel.bufferManager.ReturnBuffer(this.buffer);
- this.buffer = null;
- }
- }
- bool CompleteReadBytes(IAsyncResult result)
- {
- int bytesRead = channel.EndSocketReceive(result);
- bytesReadTotal += bytesRead;
- if (bytesRead == 0)
- {
- if (size == 0 || !throwOnEmpty)
- {
- channel.bufferManager.ReturnBuffer(this.buffer);
- this.buffer = null;
- return true;
- }
- else
- {
- throw new CommunicationException("Premature EOF reached");
- }
- }
- while (bytesReadTotal < size)
- {
- IAsyncResult socketReceiveResult = channel.BeginSocketReceive(buffer, bytesReadTotal, size - bytesReadTotal, OnReadBytes, this);
- if (!socketReceiveResult.CompletedSynchronously)
- {
- return false;
- }
- }
- return true;
- }
- static void OnReadBytes(IAsyncResult result)
- {
- if (result.CompletedSynchronously)
- {
- return;
- }
- SocketReceiveAsyncResult thisPtr = (SocketReceiveAsyncResult)result.AsyncState;
- Exception completionException = null;
- bool completeSelf = false;
- try
- {
- completeSelf = thisPtr.CompleteReadBytes(result);
- }
- catch (Exception e)
- {
- completeSelf = true;
- completionException = e;
- thisPtr.Cleanup();
- }
- if (completeSelf)
- {
- thisPtr.Complete(false, completionException);
- }
- }
- public static byte[] End(IAsyncResult result)
- {
- try
- {
- SocketReceiveAsyncResult thisPtr = AsyncResult.End<SocketReceiveAsyncResult>(result);
- return thisPtr.buffer;
- }
- catch (ObjectDisposedException)
- {
- return null;
- }
- }
- }
And now that the base operation is done, we can move onto the functions which call it. And that’s essentially the pattern for writing out asynchronous channels. Start from the bottom, and build up the stack until the function which we need (in this case [Begin/End]Request from the IRequestChannel interface). Next up, WriteData and ReadData. The first one, however, is fairly simple – it doesn’t need to iterate, doesn’t need to call multiple asynchronous requests, it simply needs to add the length to the buffer (a synchronous operation), and then send the bytes. So for this case, we don’t need to create a new AsyncResult implementation, simply chaining the call directly after modifying the input, as shown below.
- private IAsyncResult BeginWriteData(ArraySegment<byte> data, TimeSpan timeout,
- AsyncCallback callback, object state)
- {
- ArraySegment<byte> toSend = this.AddLengthToBuffer(data);
- return new SocketSendAsyncResult(toSend, this, callback, state);
- }
- void EndWriteData(IAsyncResult result)
- {
- SocketSendAsyncResult.End(result);
- }
Reading data, however, goes back to the asynchronous chain. To read a properly framed request, we need to make two read calls: one for the 4-byte length, and one for the actual data (all as cascading asynchronous calls). So we do need a new AsyncResult class at this time.
- IAsyncResult BeginReadData(AsyncCallback callback, object state)
- {
- return new ReadDataAsyncResult(this, callback, state);
- }
- ArraySegment<byte> EndReadData(IAsyncResult result)
- {
- return ReadDataAsyncResult.End(result);
- }
The ReadDataAsyncResult goes like the previous ones. The constructor initializes some variables, then makes the first asynchronous call (to receive the 4 length bytes). When that operation completes (on CompleteDrainLength), the object makes the second asynchronous call (to receive the actual bytes), and finally when that is done, the AsyncResult completes itself, saving the data read in an instance variable to be returned in the End call.
- class ReadDataAsyncResult : AsyncResult
- {
- ArraySegment<byte> buffer;
- SizedTcpBaseChannel channel;
- int dataLength;
- byte[] lengthBytes;
- byte[] data;
- public ReadDataAsyncResult(SizedTcpBaseChannel channel, AsyncCallback callback, object state)
- : base(callback, state)
- {
- this.channel = channel;
- bool success = false;
- try
- {
- IAsyncResult drainLengthResult = channel.BeginSocketReceiveBytes(4, false, OnDrainLength, this);
- if (drainLengthResult.CompletedSynchronously)
- {
- if (CompleteDrainLength(drainLengthResult))
- {
- base.Complete(true);
- }
- }
- success = true;
- }
- catch (CommunicationException e)
- {
- base.Complete(true, e);
- }
- finally
- {
- if (!success)
- {
- this.Cleanup();
- }
- }
- }
- bool CompleteDrainLength(IAsyncResult result)
- {
- this.lengthBytes = channel.EndSocketReceiveBytes(result);
- if (this.lengthBytes == null)
- {
- this.buffer = new ArraySegment<byte>();
- return true;
- }
- this.dataLength = Formatting.BytesToSize(this.lengthBytes, 0);
- IAsyncResult readDataResult = channel.BeginSocketReceiveBytes(this.dataLength, OnReadData, this);
- if (!readDataResult.CompletedSynchronously)
- {
- return false;
- }
- return CompleteReadData(result);
- }
- bool CompleteReadData(IAsyncResult result)
- {
- data = channel.EndSocketReceiveBytes(result);
- this.buffer = new ArraySegment<byte>(this.data, 0, this.dataLength);
- CleanupLength();
- return true;
- }
- static void OnDrainLength(IAsyncResult result)
- {
- if (result.CompletedSynchronously)
- {
- return;
- }
- ReadDataAsyncResult thisPtr = (ReadDataAsyncResult)result.AsyncState;
- Exception completionException = null;
- bool completeSelf = false;
- try
- {
- completeSelf = thisPtr.CompleteDrainLength(result);
- }
- catch (Exception e)
- {
- completeSelf = true;
- completionException = e;
- thisPtr.Cleanup();
- }
- if (completeSelf)
- {
- thisPtr.Complete(false, completionException);
- }
- }
- static void OnReadData(IAsyncResult result)
- {
- if (result.CompletedSynchronously)
- {
- return;
- }
- ReadDataAsyncResult thisPtr = (ReadDataAsyncResult)result.AsyncState;
- Exception completionException = null;
- bool completeSelf = false;
- try
- {
- completeSelf = thisPtr.CompleteReadData(result);
- }
- catch (Exception e)
- {
- completeSelf = true;
- completionException = e;
- thisPtr.Cleanup();
- }
- if (completeSelf)
- {
- thisPtr.Complete(false, completionException);
- }
- }
- public static ArraySegment<byte> End(IAsyncResult result)
- {
- ReadDataAsyncResult thisPtr = AsyncResult.End<ReadDataAsyncResult>(result);
- return thisPtr.buffer;
- }
- void Cleanup()
- {
- if (this.data != null)
- {
- this.channel.bufferManager.ReturnBuffer(data);
- this.data = null;
- }
- CleanupLength();
- }
- void CleanupLength()
- {
- if (this.lengthBytes != null)
- {
- this.channel.bufferManager.ReturnBuffer(lengthBytes);
- this.lengthBytes = null;
- }
- }
- }
So at this point we’re done with the “base” operations (those dealing with sockets only), and can start moving on to the channel operations themselves (those which deal with the Message type): SendMessage and ReceiveMessage. Those are quite similar: Send first encodes the Message into the data bytes, then writes the data to the wire; Receive first reads data from the wire, then decodes them into a Message. This is similar to the WriteData operation: a combination of one synchronous and one asynchronous operation. So thankfully we don’t need another AsyncResult class for those. As with the other operations, I also have unit tests for those, but I’ll skip them here (they’re in the code gallery download).
- public IAsyncResult BeginSendMessage(
- Message message, TimeSpan timeout, AsyncCallback callback, object state)
- {
- base.ThrowIfDisposedOrNotOpen();
- ArraySegment<byte> encodedMessage = this.EncodeMessage(message);
- return this.BeginWriteData(encodedMessage, timeout, callback, state);
- }
- public void EndSendMessage(IAsyncResult result)
- {
- this.EndWriteData(result);
- }
- public IAsyncResult BeginReceiveMessage(
- TimeSpan timeout, AsyncCallback callback, object state)
- {
- base.ThrowIfDisposedOrNotOpen();
- return this.BeginReadData(callback, state);
- }
- public Message EndReceiveMessage(IAsyncResult result)
- {
- ArraySegment<byte> encodedBytes = this.EndReadData(result);
- return this.DecodeMessage(encodedBytes);
- }
And now all the basic operations are done. All we need now is to (finally!) implement the Begin/End request operations, in the request channel itself. A request operation is a simple pair of Send/Receive calls (i.e., two asynchronous calls), so we need, yes, a new AsyncResult class to handle this case. First the methods themselves, trivial:
- public IAsyncResult BeginRequest(
- Message message, TimeSpan timeout, AsyncCallback callback, object state)
- {
- return new RequestAsyncResult(message, timeout, this, callback, state);
- }
- public IAsyncResult BeginRequest(Message message, AsyncCallback callback, object state)
- {
- return this.BeginRequest(message, base.DefaultSendTimeout, callback, state);
- }
- public Message EndRequest(IAsyncResult result)
- {
- return RequestAsyncResult.End(result);
- }
And the RequestAsyncResult class. As usual, the constructor calls the first asynchronous operation (Send), and when it’s complete (synchronously or not), the completion of the operation calls the second operation (Receive).
- class RequestAsyncResult : AsyncResult
- {
- private Message response;
- private TimeSpan timeout;
- private SizedTcpRequestChannel channel;
- public RequestAsyncResult(Message message, TimeSpan timeout, SizedTcpRequestChannel channel, AsyncCallback callback, object state)
- : base(callback, state)
- {
- this.channel = channel;
- this.timeout = timeout;
- IAsyncResult sendResult = channel.BeginSendMessage(message, timeout, OnSend, this);
- if (sendResult.CompletedSynchronously)
- {
- this.CompleteSend(sendResult);
- }
- }
- static void OnSend(IAsyncResult asyncResult)
- {
- if (asyncResult.CompletedSynchronously)
- {
- return;
- }
- RequestAsyncResult thisPtr = (RequestAsyncResult)asyncResult.AsyncState;
- Exception completeException = null;
- bool completeSelf = false;
- try
- {
- completeSelf = thisPtr.CompleteSend(asyncResult);
- }
- catch (Exception e)
- {
- completeException = e;
- completeSelf = true;
- }
- if (completeSelf)
- {
- thisPtr.Complete(false, completeException);
- }
- }
- bool CompleteSend(IAsyncResult asyncResult)
- {
- this.channel.EndSendMessage(asyncResult);
- IAsyncResult receiveResult = this.channel.BeginReceiveMessage(this.timeout, OnReceive, this);
- if (!receiveResult.CompletedSynchronously)
- {
- return false;
- }
- this.CompleteReceive(asyncResult);
- return false;
- }
- static void OnReceive(IAsyncResult asyncResult)
- {
- if (asyncResult.CompletedSynchronously)
- {
- return;
- }
- RequestAsyncResult thisPtr = (RequestAsyncResult)asyncResult.AsyncState;
- Exception completeException = null;
- try
- {
- thisPtr.CompleteReceive(asyncResult);
- }
- catch (Exception e)
- {
- completeException = e;
- }
- thisPtr.Complete(false, completeException);
- }
- void CompleteReceive(IAsyncResult asyncResult)
- {
- this.response = this.channel.EndReceiveMessage(asyncResult);
- }
- internal static Message End(IAsyncResult result)
- {
- RequestAsyncResult thisPtr = AsyncResult.End<RequestAsyncResult>(result);
- return thisPtr.response;
- }
- }
And finally the request asynchronous operation is complete. For this case, it’s actually interesting to show the unit test. Although the class is internal (so we need to use the reflection helper to instantiate it), we know that the type inherits from ChannelBase and implements IRequestChannel, so we can use those public types to write the tests in a cleaner way.
- [Fact]
- public void AsynchronousRequest()
- {
- byte[] data = new byte[36];
- Random rndGen = new Random();
- rndGen.NextBytes(data);
- Message input = Formatting.BytesToMessage(data);
- ManualResetEvent evt = new ManualResetEvent(false);
- Uri serverUri = new Uri(SizedTcpTransportBindingElement.SizedTcpScheme + "://" + Environment.MachineName + ":" + Port);
- object channel = ReflectionHelper.CreateInstance(
- typeof(SizedTcpTransportBindingElement),
- "JsonRpcOverTcp.Channels.SizedTcpRequestChannel",
- new ByteStreamMessageEncodingBindingElement().CreateMessageEncoderFactory().Encoder,
- BufferManager.CreateBufferManager(int.MaxValue, int.MaxValue),
- Mocks.GetChannelManagerBase(),
- new EndpointAddress(serverUri),
- serverUri);
- ChannelBase channelBase = (ChannelBase)channel;
- IRequestChannel requestChannel = (IRequestChannel)channel;
- channelBase.Open();
- object state = new object();
- bool success = true;
- requestChannel.BeginRequest(input, new AsyncCallback(delegate(IAsyncResult asyncResult)
- {
- try
- {
- if (!Object.ReferenceEquals(asyncResult.AsyncState, state))
- {
- success = false;
- Console.WriteLine("Error, state not preserved");
- }
- else
- {
- Message output = requestChannel.EndRequest(asyncResult);
- try
- {
- byte[] outputBytes = Formatting.MessageToBytes(output);
- Assert.Equal(data, outputBytes, new ArrayComparer<byte>());
- }
- catch (Exception e)
- {
- Console.WriteLine("Error: " + e);
- success = false;
- }
- }
- }
- finally
- {
- evt.Set();
- }
- }), state);
- evt.WaitOne();
- Assert.True(success, "Error in callback");
- channelBase.Close();
- }
So are we done? Not quite. Notice that opening the channel actually involves some networking calls (first get the IP address of the server from the name, a DNS call, then connect the socket to the server, with the TCP handshaking). So we need to override the asynchronous open calls (BeginOpen and EndOpen) to make sure that we don’t block the calling thread. And since we’re dealing with at least two asynchronous calls… you know the drill by now.
- protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
- {
- return new ConnectAsyncResult(timeout, this, callback, state);
- }
- protected override void OnEndOpen(IAsyncResult result)
- {
- ConnectAsyncResult.End(result);
- }
And the ConnectAsyncResult class. Notice that a machine may have multiple IP addresses, so we need to account for that when trying to connect the socket.
- class ConnectAsyncResult : AsyncResult
- {
- TimeSpan timeout;
- SizedTcpRequestChannel channel;
- IPHostEntry hostEntry;
- Socket socket;
- bool connected;
- int currentEntry;
- int port;
- public ConnectAsyncResult(TimeSpan timeout, SizedTcpRequestChannel channel, AsyncCallback callback, object state)
- : base(callback, state)
- {
- // production code should use this timeout
- this.timeout = timeout;
- this.channel = channel;
- IAsyncResult dnsGetHostResult = Dns.BeginGetHostEntry(channel.Via.Host, OnDnsGetHost, this);
- if (!dnsGetHostResult.CompletedSynchronously)
- {
- return;
- }
- if (this.CompleteDnsGetHost(dnsGetHostResult))
- {
- base.Complete(true);
- }
- }
- static void OnDnsGetHost(IAsyncResult result)
- {
- if (result.CompletedSynchronously)
- {
- return;
- }
- ConnectAsyncResult thisPtr = (ConnectAsyncResult)result.AsyncState;
- Exception completionException = null;
- bool completeSelf = false;
- try
- {
- completeSelf = thisPtr.CompleteDnsGetHost(result);
- }
- catch (Exception e)
- {
- completeSelf = true;
- completionException = e;
- }
- if (completeSelf)
- {
- thisPtr.Complete(false, completionException);
- }
- }
- bool CompleteDnsGetHost(IAsyncResult result)
- {
- try
- {
- this.hostEntry = Dns.EndGetHostEntry(result);
- }
- catch (SocketException socketException)
- {
- throw new EndpointNotFoundException("Unable to resolve host" + channel.Via.Host, socketException);
- }
- port = this.channel.Via.Port;
- if (port == -1)
- {
- port = 8000; // Let's call it the default port for our protocol
- }
- IAsyncResult socketConnectResult = this.BeginSocketConnect();
- if (!socketConnectResult.CompletedSynchronously)
- {
- return false;
- }
- return this.CompleteSocketConnect(socketConnectResult);
- }
- IAsyncResult BeginSocketConnect()
- {
- IPAddress address = this.hostEntry.AddressList[this.currentEntry];
- this.socket = new Socket(address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
- while (true)
- {
- try
- {
- return this.socket.BeginConnect(new IPEndPoint(address, this.port), OnSocketConnect, this);
- }
- catch (SocketException socketException)
- {
- if (this.currentEntry == this.hostEntry.AddressList.Length - 1)
- {
- throw ConvertSocketException(socketException, "Connect");
- }
- this.currentEntry++;
- }
- }
- }
- static void OnSocketConnect(IAsyncResult result)
- {
- if (result.CompletedSynchronously)
- {
- return;
- }
- ConnectAsyncResult thisPtr = (ConnectAsyncResult)result.AsyncState;
- Exception completionException = null;
- bool completeSelf = false;
- try
- {
- completeSelf = thisPtr.CompleteSocketConnect(result);
- }
- catch (Exception e)
- {
- completeSelf = true;
- completionException = e;
- }
- if (completeSelf)
- {
- thisPtr.Complete(false, completionException);
- }
- }
- bool CompleteSocketConnect(IAsyncResult result)
- {
- while (!this.connected && this.currentEntry < this.hostEntry.AddressList.Length)
- {
- try
- {
- socket.EndConnect(result);
- connected = true;
- break;
- }
- catch (SocketException socketException)
- {
- if (this.currentEntry == this.hostEntry.AddressList.Length - 1)
- {
- throw ConvertSocketException(socketException, "Connect");
- }
- this.currentEntry++;
- }
- result = BeginSocketConnect();
- if (!result.CompletedSynchronously)
- {
- return false;
- }
- }
- this.channel.InitializeSocket(socket);
- return true;
- }
- public static void End(IAsyncResult result)
- {
- AsyncResult.End<ConnectAsyncResult>(result);
- }
- }
Now, we’re finally done! Now for the client application to test it. An asynchronous interface (which can be typed if the endpoint is using the behavior we implemented in the previous post) gives us the starting point for testing it.
- [ServiceContract]
- public interface ITypedTestAsync
- {
- [OperationContract(AsyncPattern = true)]
- IAsyncResult BeginAdd(int x, int y, AsyncCallback callback, object state);
- int EndAdd(IAsyncResult asyncResult);
- [OperationContract(AsyncPattern = true)]
- IAsyncResult BeginSubtract(int x, int y, AsyncCallback callback, object state);
- int EndSubtract(IAsyncResult asyncResult);
- [OperationContract(AsyncPattern = true)]
- IAsyncResult BeginMultiply(int x, int y, AsyncCallback callback, object state);
- int EndMultiply(IAsyncResult asyncResult);
- [OperationContract(AsyncPattern = true)]
- IAsyncResult BeginDivide(int x, int y, AsyncCallback callback, object state);
- int EndDivide(IAsyncResult asyncResult);
- }
And using that interface in a program to verify that it worked (which, in my case, it did, since I had fixed the errors while creating the unit tests).
- Console.WriteLine("Now using the typed asynchronous interface");
- var asyncTypedFactory = new ChannelFactory<ITypedTestAsync>(binding, address);
- asyncTypedFactory.Endpoint.Behaviors.Add(new JsonRpcEndpointBehavior());
- ITypedTestAsync asyncTypedProxy = asyncTypedFactory.CreateChannel();
- AutoResetEvent evt = new AutoResetEvent(false);
- Console.WriteLine("Calling BeginAdd");
- asyncTypedProxy.BeginAdd(5, 8, delegate(IAsyncResult ar)
- {
- result = asyncTypedProxy.EndAdd(ar);
- Console.WriteLine(" ==> Result: {0}", result);
- Console.WriteLine();
- evt.Set();
- }, null);
- evt.WaitOne();
- Console.WriteLine("Calling BeginMultiply");
- asyncTypedProxy.BeginMultiply(5, 8, delegate(IAsyncResult ar)
- {
- result = asyncTypedProxy.EndMultiply(ar);
- Console.WriteLine(" ==> Result: {0}", result);
- Console.WriteLine();
- evt.Set();
- }, null);
- evt.WaitOne();
- Console.WriteLine("Calling BeginDivide (throws)");
- asyncTypedProxy.BeginDivide(5, 0, delegate(IAsyncResult ar)
- {
- try
- {
- result = asyncTypedProxy.EndDivide(ar);
- Console.WriteLine(" ==> Result: {0}", result);
- }
- catch (JsonRpcException e)
- {
- Console.WriteLine("Error: {0}", e.JsonException);
- }
- Console.WriteLine();
- evt.Set();
- }, null);
- evt.WaitOne();
And now, we’re really, really done. Here’s a recap of this “mini-series”:
- Transport Channels – Request Channels, part 1: Basic, synchronous-only IRequestChannel implementation
- Transport Channels – Request Channels, part 2: Service model support for the sample
- Transport Channels – Request Channels, part 3: Asynchronous support for the channel
Coming up
Next up, a different kind of channel – let’s move to the server side with an IReplyChannel.