In addition to @MartinJaffer-MSFT's comments, the following picture shows an example of a subscriber that invokes an ADF pipeline:
Note that the subscriber is responsible for returning the validation response for all delivery schemas (EventGridSchema, CloudEventSchemaV1_0 and CustomInputSchema). It is implemented as a webhook, which allows the AAD Authorization header to be passed to the subscriber logic.
The following code snippet shows an implementation of the subscriber that invokes the ADF pipeline via a REST POST request:
#r "Newtonsoft.Json"
using System.Net;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System.Text;
// Shared across invocations: creating a new HttpClient per request can exhaust
// outbound sockets under load. No per-request state is stored on this instance.
private static readonly HttpClient s_httpClient = new HttpClient();

/// <summary>
/// Event Grid webhook subscriber that answers subscription-validation handshakes and,
/// for notifications, starts an Azure Data Factory pipeline run through the
/// management REST API.
/// </summary>
/// <param name="req">
/// Incoming webhook request. The <c>aeg-event-type</c> header selects the handling path,
/// the <c>pipeline</c> query parameter names the pipeline to run, and the
/// <c>Authorization</c> header is forwarded unchanged to the management API.
/// </param>
/// <param name="log">Logger used for diagnostics.</param>
/// <returns>
/// 200 for a successful validation handshake or pipeline invocation (the body is the
/// management API response); 400 for malformed input, an unknown event type, or a failed
/// call to the management API.
/// </returns>
public static async Task<IActionResult> Run(HttpRequest req, ILogger log)
{
    // Header names are case-insensitive, so match them without regard to case.
    string headers = string.Join(" | ", req.Headers
        .Where(h => h.Key.StartsWith("aeg-", StringComparison.OrdinalIgnoreCase)
                 || h.Key.StartsWith("Content-Type", StringComparison.OrdinalIgnoreCase))
        .Select(h => $"{h.Key}={h.Value.FirstOrDefault()}"));
    log.LogInformation($"Method: {req.Method} Headers: {headers}");

    // An OPTIONS request is treated as the CloudEvents validation handshake:
    // echo the requesting origin back in the Webhook-Allowed-Origin header.
    if (string.Equals(req.Method, HttpMethod.Options.Method, StringComparison.OrdinalIgnoreCase))
    {
        log.LogInformation("CloudEventSchema validation");
        // Indexer assignment instead of Add: Add throws if the header is already present.
        req.HttpContext.Response.Headers["Webhook-Allowed-Origin"] =
            req.Headers["WebHook-Request-Origin"].FirstOrDefault()?.Trim();
        return new OkResult();
    }

    // Logs the problem and produces the 400 response used for every rejected request.
    IActionResult Reject(string message)
    {
        log.LogError(message);
        return new BadRequestObjectResult(message);
    }

    string body;
    // leaveOpen: the request stream is owned by the host, not by this function.
    using (var reader = new StreamReader(req.Body, Encoding.UTF8, true, 1024, leaveOpen: true))
    {
        body = await reader.ReadToEndAsync();
    }

    JToken jtoken;
    try
    {
        jtoken = JToken.Parse(body);
    }
    catch (JsonReaderException ex)
    {
        return Reject($"Request body is not valid JSON: {ex.Message}");
    }

    // A JSON array is unwrapped to its single element. Anything other than exactly
    // one element is rejected explicitly instead of failing later with an
    // unhandled exception.
    if (jtoken is JArray events)
    {
        if (events.Count != 1)
        {
            return Reject($"Expected exactly one event in the request body but received {events.Count}");
        }
        jtoken = events[0];
    }

    string eventTypeHeader = req.Headers["aeg-event-type"].FirstOrDefault()?.Trim();

    if (eventTypeHeader == "SubscriptionValidation")
    {
        // SelectToken returns null (rather than throwing) when a path segment is missing.
        string validationCode = jtoken.SelectToken("data.validationCode")?.ToString();
        if (jtoken is JObject validationEvent
            && validationEvent["eventType"] is JValue eventType
            && (eventType.Value as string) == "Microsoft.EventGrid.SubscriptionValidationEvent"
            && !string.IsNullOrEmpty(validationCode))
        {
            log.LogInformation("EventGridSchema validation");
            return new OkObjectResult(new { validationResponse = validationCode });
        }
        return new BadRequestObjectResult($"Not valid event schema");
    }

    if (eventTypeHeader == "Notification")
    {
        // TryGetValue on the header dictionary is a case-insensitive lookup.
        if (!req.Headers.TryGetValue("Authorization", out StringValues authorization)
            || StringValues.IsNullOrEmpty(authorization))
        {
            return Reject("Missing Authorization header");
        }

        string pipeline = req.Query["pipeline"].FirstOrDefault();
        if (string.IsNullOrWhiteSpace(pipeline))
        {
            return Reject("Missing url query parameter: pipeline");
        }

        string resourceId = Environment.GetEnvironmentVariable("AzureDataFactory_resourceId");
        if (string.IsNullOrWhiteSpace(resourceId))
        {
            return Reject("Missing application setting: AzureDataFactory_resourceId");
        }

        // The pipeline name comes from the caller, so escape it before placing it in
        // the URL path; otherwise it could inject extra path segments or query parameters.
        string address = $"https://management.azure.com{resourceId}/pipelines/{Uri.EscapeDataString(pipeline)}/createrun?api-version=2018-06-01";

        try
        {
            using (var request = new HttpRequestMessage(HttpMethod.Post, address))
            {
                // The caller's token goes on this request only, never on the shared
                // client's default headers, so concurrent invocations cannot mix tokens.
                request.Headers.Add("Authorization", authorization.First());
                request.Content = new StringContent(jtoken.ToString(), Encoding.UTF8, "application/json");

                using (var response = await s_httpClient.SendAsync(request))
                {
                    response.EnsureSuccessStatusCode();
                    string result = await response.Content.ReadAsStringAsync();
                    log.LogInformation(result);
                    return new OkObjectResult(result);
                }
            }
        }
        catch (Exception ex)
        {
            // Boundary handler: any failure while calling the management API is reported
            // to the caller as a 400 carrying the exception message.
            log.LogError(ex, "{Message}", ex.Message);
            return new BadRequestObjectResult($"{ex.Message}");
        }
    }

    return new BadRequestObjectResult($"{eventTypeHeader} is not a valid type");
}
The following screenshot shows the part of the subscription that configures the destination properties, such as AAD authentication and the URL query parameter carrying the pipeline name:
Note that the AAD Authorization header is required to invoke a pipeline via the REST POST request. The address of this request is constructed from the ADF resource ID stored in the configuration settings.
Thanks
Roman