Weekend Fun 01: Building Application Request Routing/Reverse Proxy-like Components Using OWIN
For the fun of it, let us build an IIS ARR without IIS. (There might be actual requirements for this, such as running in Azure Web Sites, where you don't have access to set that up, or hosting in a worker role.)
How can we build that? If you strip out all the fatty details, what we need is the following:
A pipeline that processes the request in the following stages (in order):
1- An HTTP listener.
2- A few components that perform AuthN and AuthZ.
3- A component that looks at the incoming request and decides where it goes. For example, requests coming in for https://domainA.com should go to https://domainB.com (similar to IIS URL Rewrite).
4- A component that takes the request and executes it.
OWIN is a natural fit for this, as it has the pipeline (and an HTTP listener based on the .NET Framework HttpListener).
The code below is hosted in a console app:
Program Main
    static void Main(string[] args)
    {
        string baseUrl = "https://localhost:5001/";

        // WebApp.Start comes from Microsoft.Owin.Hosting
        using (var server = WebApp.Start<Startup>(new StartOptions(baseUrl)))
        {
            Console.WriteLine("Server Started... Press Enter to quit.");
            Console.ReadLine(); // block until Enter is pressed
        }
    }
The above just runs the server.
The Startup class sets up two components: the URL rewriter and the request router.
    public class Startup
    {
        public void Configuration(IAppBuilder app)
        {
            app.UseURLRewrite();
            app.UseRR();
        }
    }
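The post does not show how UseURLRewrite and UseRR are defined. As a rough sketch only, an extension method like this usually wraps a middleware class whose constructor receives the next stage of the pipeline. The names UrlRewriteMiddleware and ProxyAppBuilderExtensions are assumptions made for illustration, not the names used in the attached code, and the sketch assumes the Owin NuGet package is referenced.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Owin;

// Alias for the OWIN application delegate.
using AppFunc = System.Func<System.Collections.Generic.IDictionary<string, object>, System.Threading.Tasks.Task>;

// Hypothetical middleware skeleton (name is an assumption).
public class UrlRewriteMiddleware
{
    private readonly AppFunc _next;

    // The OWIN builder passes the next stage of the pipeline to the constructor.
    public UrlRewriteMiddleware(AppFunc next)
    {
        _next = next;
    }

    public async Task Invoke(IDictionary<string, object> env)
    {
        // A real component would decide here where the request goes;
        // this skeleton only hands the request to the next stage.
        await _next.Invoke(env);
    }
}

// Hypothetical extension class (name is an assumption).
public static class ProxyAppBuilderExtensions
{
    public static IAppBuilder UseURLRewrite(this IAppBuilder app)
    {
        // IAppBuilder.Use accepts a middleware type and constructs it with "next".
        return app.Use(typeof(UrlRewriteMiddleware));
    }
}
```

UseRR would presumably follow the same shape with its own middleware class.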
Because this is a fun project, I wanted to do more. Let us assume we want to implement two very simple patterns: Broadcast (the request goes to multiple servers and we wait for all of them) or First Responder (the request goes to multiple servers and we return as soon as any of them responds).
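To make the two patterns concrete, here is a minimal sketch built only on Task.WhenAll and Task.WhenAny. FanOut, BroadcastAsync, FirstResponderAsync and the call delegate are hypothetical names used for illustration; they are not part of the attached code. The sketch assumes at least one target, since Task.WhenAny throws on an empty list.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class FanOut
{
    // Broadcast: run the call against every target and wait for all results.
    public static Task<T[]> BroadcastAsync<T>(
        IEnumerable<string> targets,
        Func<string, CancellationToken, Task<T>> call)
    {
        return Task.WhenAll(targets.Select(t => call(t, CancellationToken.None)));
    }

    // First Responder: return the first result and cancel the remaining calls.
    public static async Task<T> FirstResponderAsync<T>(
        IEnumerable<string> targets,
        Func<string, CancellationToken, Task<T>> call)
    {
        using (var cts = new CancellationTokenSource())
        {
            // Start every call with the shared token.
            var tasks = targets.Select(t => call(t, cts.Token)).ToList();

            var first = await Task.WhenAny(tasks);
            cts.Cancel();       // signal the slower calls to stop
            return await first; // rethrows if the first task to finish failed
        }
    }
}
```

Note that "first" here means the first task to complete, whether it succeeded or failed. A stricter First Responder would keep waiting until one of the calls succeeds.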
My URL rewriter does the following (the OWIN Invoke method): it picks a mode (either Broadcast or First Responder) pseudo-randomly, based on whether the second of the current time is odd or even, and stores the list of target URLs and the current mode in the OWIN environment dictionary.
    public async Task Invoke(IDictionary<string, object> env)
    {
        var response = env["owin.ResponseBody"] as Stream;
        List<string> TargetUrls = new List<string>();

        // The current code doesn't handle GZIP. BBC is not using it;
        // other sites will give an octet-stream response that times out in the proxy.
        if (DateTime.Now.Second % 2 > 0)
        {
            // scatter and gather approach
            TargetUrls.Add("https://www.bbc.com");
            TargetUrls.Add("https://www.bbc.com");
            TargetUrls.Add("https://www.bbc.com");
            TargetUrls.Add("https://www.bbc.com");
            TargetUrls.Add("https://www.bbc.com");
            TargetUrls.Add("https://www.bbc.com");
            TargetUrls.Add("https://www.bbc.com");
            TargetUrls.Add("https://www.bbc.com");
            env.Add("rr.Mode", "ALL");
            Debug.WriteLine("routed first case");
        }
        else
        {
            env.Add("rr.Mode", "ANY");
            Debug.WriteLine("routed second case");
            TargetUrls.Add("https://www.bbc.com");
        }

        env.Add("rr.targetURLs", TargetUrls);
        await _next.Invoke(env);
    }
My request router performs the following:
1- Copies the original request stream.
2- Prepares an HttpRequestMessage/HttpClient instance for each URL.
3- Copies the original request headers.
4- Executes the request.
5- Waits for the results (either All or Any).
6- Copies the response headers back to the downstream headers.
7- Copies the response body back into the downstream body.
    public async Task Invoke(IDictionary<string, object> env)
    {
        var Tasks = new List<Task<HttpResponseMessage>>();
        var Clients = new List<HttpClient>();
        var outStream = new MemoryStream();
        var inStream = env["owin.RequestBody"] as Stream;
        string rrmode = env["rr.Mode"] as string;
        List<string> sTargetURs = env["rr.targetURLs"] as List<string>;

        // copy the original request body once, before fanning out
        if (null != inStream)
            await inStream.CopyToAsync(outStream);

        var sMethod = env["owin.RequestMethod"] as string;
        var sPath = (env["owin.RequestPath"] as string) ?? string.Empty;
        var sQueryString = (env["owin.RequestQueryString"] as string) ?? string.Empty;
        if (string.Empty != sQueryString)
            sQueryString = "?" + sQueryString;

        foreach (var s in sTargetURs)
        {
            var client = new System.Net.Http.HttpClient();

            // copy headers; TryAddWithoutValidation skips headers that HttpClient
            // does not accept as request headers instead of throwing
            foreach (var header in env["owin.RequestHeaders"] as IDictionary<string, string[]>)
                client.DefaultRequestHeaders.TryAddWithoutValidation(header.Key, header.Value);

            HttpRequestMessage request = new HttpRequestMessage();

            // set body - not all types take a body.
            // each request gets its own copy of the bytes, because outStream
            // is positioned at its end once the copy has finished
            if (!string.Equals("get", sMethod, StringComparison.OrdinalIgnoreCase))
                request.Content = new ByteArrayContent(outStream.ToArray());

            // set method
            request.Method = new HttpMethod(sMethod);
            request.RequestUri = new Uri(s + sPath + sQueryString);

            // add it to our bag
            // you should track cancellation tokens as well.
            Tasks.Add(client.SendAsync(request));
            Clients.Add(client);
        }

        HttpResponseMessage selectedResponse = null; // :-(
        HttpStatusCode _responseCode = HttpStatusCode.InternalServerError;
        HttpResponseHeaders _responseHeaders = null;
        HttpContent _responseContent = null;
        string _responseStatusMessage = "Proxy!"; // obviously we are not following the RFC - review spec: RFC 2616 section 6.1.1

        // wait for response
        try
        {
            if (rrmode == "ALL")
            {
                Debug.WriteLine(string.Format("waiting for - ALL - {0} requests", sTargetURs.Count));
                var completedTasks = await Task.WhenAll<HttpResponseMessage>(Tasks);
                Debug.WriteLine(string.Format("{0} requests completed..", sTargetURs.Count));
                // you can do fun stuff here like composing a response, concatenating it etc..
                selectedResponse = completedTasks[0];
            }
            else
            {
                Debug.WriteLine(string.Format("waiting for - ANY - {0} requests", sTargetURs.Count));
                // this could be "any" or "Single" mode
                var completedTask = await Task.WhenAny<HttpResponseMessage>(Tasks);
                // you should keep a list of cancellation tokens and cancel all once one has returned.
                // in this example all tasks will keep running and return to nothing
                Debug.WriteLine("1 request completed..");
                selectedResponse = await completedTask;
            }

            selectedResponse.EnsureSuccessStatusCode();
            _responseCode = selectedResponse.StatusCode;
            _responseHeaders = selectedResponse.Headers;
            _responseContent = selectedResponse.Content;
        }
        catch (AggregateException ae)
        {
            // log ae
            _responseStatusMessage = "error executing the request: " + ae.Flatten().Message;
        }
        catch (Exception e) // we are assuming since we are here we actually executed a request and we got a response
        {
            // log e
            _responseStatusMessage = "general internal error: " + e.Message;
        }
        finally
        {
            // release all clients
            Clients.ForEach(c => c.Dispose());
        }

        await setResponse(env, _responseCode, _responseHeaders, _responseContent, _responseStatusMessage);
        await _next.Invoke(env);
    }
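The setResponse helper called at the end is not shown in the post. A minimal sketch of what steps 6 and 7 could look like follows, using the standard OWIN environment keys for the status code, reason phrase, response headers and response body. The class name ProxyResponse and the body of the method are assumptions, not the attached implementation, and the choice to skip Transfer-Encoding is mine.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;

public static class ProxyResponse
{
    // Hypothetical stand-in for the setResponse helper called by the request router.
    public static async Task SetResponse(
        IDictionary<string, object> env,
        HttpStatusCode code,
        HttpResponseHeaders headers,
        HttpContent content,
        string statusMessage)
    {
        env["owin.ResponseStatusCode"] = (int)code;
        env["owin.ResponseReasonPhrase"] = statusMessage;

        var outHeaders = (IDictionary<string, string[]>)env["owin.ResponseHeaders"];

        if (headers != null)
        {
            foreach (var h in headers)
            {
                // Assumption: the host decides how the downstream body is framed,
                // so the upstream Transfer-Encoding header is not forwarded.
                if (h.Key.Equals("Transfer-Encoding", StringComparison.OrdinalIgnoreCase))
                    continue;
                outHeaders[h.Key] = h.Value.ToArray();
            }
        }

        if (content != null)
        {
            // Content headers (Content-Type, Content-Length, ...) live on the content object.
            foreach (var h in content.Headers)
                outHeaders[h.Key] = h.Value.ToArray();

            // Copy the upstream body into the downstream body.
            var body = (Stream)env["owin.ResponseBody"];
            await content.CopyToAsync(body);
        }
    }
}
```

When the upstream call failed, headers and content are null, so only the status code and reason phrase are written.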
While this is a fun project, there are a lot of applications for it. Typically, high-scale write operations are based on eventual consistency, using platforms such as Event Hubs/Service Bus/CQRS or databases (name your favorite eventual consistency DB here). Read operations, however, especially those across multiple shards/systems, are harder to implement. My favorite implementation for this is a command broker, where the client (imagine an SPA page) sends one request that this pipeline intercepts and expands into multiple sub-commands, each targeting a Web API/system.
The code is attached to this post. Hope you had a fun weekend.