Hadoop XML Streaming and F# MapReduce
So, to round out the Hadoop Streaming samples I thought I would put together an XML Streaming sample. As always the code can be found here:
https://code.msdn.microsoft.com/Hadoop-Streaming-and-F-f2e76850
XML Streaming Reader
So how does one stream in XML? If you read the Hadoop Streaming documentation you will notice the following FAQ:
You can use the record reader StreamXmlRecordReader to process XML documents.
hadoop jar hadoop-streaming.jar -inputreader "StreamXmlRecord,begin=BEGIN_STRING,end=END_STRING" ..... (rest of the command)
Anything found between BEGIN_STRING and END_STRING would be treated as one record for map tasks.
This should allow one to define a start and end tag using commands such as:
-inputreader "StreamXmlRecord,begin=<Row>,end=</Row>"
However, if one tries to run this from the command console one gets an error because of the “<” character, which the console treats as standard input redirection. It seems that escaping it with the usual caret character (^) is not enough.
In this example, to parse the XML one could just specify the XML element name, “Row” in this case, and the start and end tags could easily be derived. To assist in XML processing I have therefore provided an XML reader that does exactly this. I was going to modify the base Hadoop stream reader, but there seems to be traction behind the usage of a Mahout class called “org.apache.mahout.classifier.bayes.XmlInputFormat”.
I have changed this base class to accept a configuration value, “xmlinput.element”, that allows one to define just the XML element to be processed. This supports an XML Streaming job with the following configuration:
"-D xmlinput.element=Store" -inputformat org.apache.mahout.classifier.bayes.XmlElementStreamingInputFormat
As a note one has to remember to place the job configuration setting in quotes.
The changes to support this are quite minimal. Firstly the Input Format is defined as:
public class XmlElementStreamingInputFormat
    extends FileInputFormat<NullWritable, Text> {
    public static final String XML_ELEMENT_KEY = "xmlinput.element";
    @Override
    public RecordReader<NullWritable, Text> getRecordReader(
        InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
        return new XmlElementStreamingRecordReader((FileSplit) inputSplit, jobConf);
    }
}
Secondly the Record Reader is modified to define the start and end tags based on the new configuration value:
String elementTagName = jobConf.get(XML_ELEMENT_KEY);
startTag = ("<" + elementTagName + ">").getBytes("UTF8");
endTag = ("</" + elementTagName + ">").getBytes("UTF8");
This code carries the restriction that the element being searched for must be well-formed and written with a plain opening tag that has no attributes. If this is not the case, one can easily create a new XML Input Format class specific to one's needs.
The complete original source can be found in various places, including GitHub. My modifications are in the source code download.
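To make the restriction concrete, here is a minimal F# sketch of the same tag derivation. The helper name deriveTags and the sample strings are hypothetical and exist only for illustration; the real matching happens in the Java record reader.

```fsharp
// Hypothetical helper: derive the start and end tags from an element name,
// mirroring what the record reader does with the "xmlinput.element" value.
let deriveTags (elementName: string) =
    ("<" + elementName + ">", "</" + elementName + ">")

let startTag, endTag = deriveTags "Store"

// A plain opening tag contains the derived start tag; one with attributes does not.
let plain = "<Store><Name>A</Name></Store>"
let withAttribute = "<Store id=\"1\"><Name>A</Name></Store>"

let plainMatches = plain.Contains(startTag)
let attributeMatches = withAttribute.Contains(startTag)

printfn "%s %s" startTag endTag
printfn "plain: %b, with attribute: %b" plainMatches attributeMatches
```

An element written as <Store id="1"> is never found, because the literal text "<Store>" does not occur in it.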
With the new XML Input Format class in place we are good to go with our F# code.
Map and Reduce Classes
As always, let's start with the Map and Reduce functions. The goal of the sample code is to support a Map and Reduce with the following prototypes:
Map : XElement -> seq<(string * string) * obj>
Reduce : string * string -> seq<string> -> obj option
The idea is that the Mapper takes in an XElement and projects it into a sequence of keys and values. The Reducer, as in the text streaming case, takes in a key and the sequence of values for that key, and returns an optional reduced value.
The main difference that you will notice in this sample is that the key is a tuple, which in turn is mapped onto multiple map output key fields. Streaming applications support this through the configuration value “stream.num.map.output.key.fields”. The Map in the previous samples returned a single string key.
For the sample I process a series of XML nodes with the following structure (created by running a T-SQL SELECT against the Adventure Works Store table):
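As a rough illustration of how a tuple key becomes multiple key fields, the sketch below flattens a two-part key and a value into one tab-separated line. The name formatOutput and the sample values are hypothetical. With “stream.num.map.output.key.fields” set to 2, Hadoop Streaming treats the first two tab-separated fields as the key.

```fsharp
// Hypothetical helper: flatten a two-part key and a value into a single
// tab-separated line, the shape a streaming Mapper writes to standard output.
let formatOutput (key: string * string) (value: obj) =
    let key1, key2 = key
    sprintf "%s\t%s\t%O" key1 key2 value

// Illustrative values only.
let line = formatOutput ("BM", "International Bank") (box 800000M)

// Splitting on tabs recovers the two key fields followed by the value.
let fields = line.Split('\t')
printfn "key fields: %s | %s, value: %s" fields.[0] fields.[1] fields.[2]
```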
<Store>
  <BusinessEntityID>294</BusinessEntityID>
  <Name>Professional Sales and Service</Name>
  <SalesPersonID>276</SalesPersonID>
  <Demographics>
    <StoreSurvey xmlns="https://schemas.microsoft.com/sqlserver/2004/07/adventure-works/StoreSurvey">
      <AnnualSales>800000</AnnualSales>
      <AnnualRevenue>80000</AnnualRevenue>
      <BankName>International Bank</BankName>
      <BusinessType>BM</BusinessType>
      <YearOpened>1991</YearOpened>
      <Specialty>Touring</Specialty>
      <SquareFeet>18000</SquareFeet>
      <Brands>4+</Brands>
      <Internet>T1</Internet>
      <NumberEmployees>14</NumberEmployees>
    </StoreSurvey>
  </Demographics>
  <ModifiedDate>2008-10-13T11:15:07.497</ModifiedDate>
</Store>
The Map code is going to extract the Sales Amount for each Business Type and Bank:
let Map (element:XElement) =
    let aw = "https://schemas.microsoft.com/sqlserver/2004/07/adventure-works/StoreSurvey"
    let demographics = element.Element(XName.Get("Demographics")).Element(XName.Get("StoreSurvey", aw))
    if not(demographics = null) then
        let business = demographics.Element(XName.Get("BusinessType", aw)).Value
        let bank = demographics.Element(XName.Get("BankName", aw)).Value
        let key = (business, bank)
        let sales = Decimal.Parse(demographics.Element(XName.Get("AnnualSales", aw)).Value)
        Seq.singleton (key, sales)
    else
        Seq.empty
The Reduce then sums the Sales across each of the Business Types and Banks:
let Reduce (key:(string*string)) (values:seq<string>) =
    let totalRevenue =
        values |>
        Seq.fold (fun revenue value -> revenue + Int32.Parse(value)) 0
    Some(totalRevenue)
The Map returns a sequence rather than a single value because the Map calling code allows the Map function to return multiple values per XML node.
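To show what returning several values per node could look like, here is a sketch of a variant that emits one pair per measure. The name mapMeasures, the choice of measure names as the second key part, and the trimmed sample element are all assumptions made for illustration; this is not the Map used in the sample.

```fsharp
open System
open System.Xml.Linq

let aw = "https://schemas.microsoft.com/sqlserver/2004/07/adventure-works/StoreSurvey"

// Hypothetical variant of Map: yields one key/value pair per measure,
// so a single XML node produces two output records.
let mapMeasures (element: XElement) : seq<(string * string) * obj> =
    let survey = element.Descendants(XName.Get("StoreSurvey", aw)) |> Seq.tryHead
    match survey with
    | None -> Seq.empty
    | Some s ->
        let valueOf (name: string) = s.Element(XName.Get(name, aw)).Value
        let business = valueOf "BusinessType"
        seq {
            yield ((business, "AnnualSales"), box (Decimal.Parse(valueOf "AnnualSales")))
            yield ((business, "AnnualRevenue"), box (Decimal.Parse(valueOf "AnnualRevenue")))
        }

// Trimmed-down version of the Store element shown earlier.
let sampleXml =
    String.concat "" [
        "<Store><Demographics>"
        "<StoreSurvey xmlns=\"" + aw + "\">"
        "<AnnualSales>800000</AnnualSales>"
        "<AnnualRevenue>80000</AnnualRevenue>"
        "<BusinessType>BM</BusinessType>"
        "</StoreSurvey></Demographics></Store>"
    ]

let pairs = mapMeasures (XElement.Parse sampleXml) |> Seq.toList
pairs |> List.iter (fun ((k1, k2), v) -> printfn "%s\t%s\t%O" k1 k2 v)
```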
All simple processing. So how are the Map and Reduce functions called?
Mapper and Reducer Executable
The complexity in the Mapper executable, which calls the Map function, comes from processing the XML input stream. In text streaming each record to be processed arrives as a single line; with XML Streaming a record will more than likely be presented over multiple lines. As such, the Mapper executable needs to parse the input, in my case line by line, and extract the required nodes to construct the XElement type. The requirement here is not that different from that of the Java XML Input Format class.
In the sample code the construction of the XElement sequence is achieved through the XMLElements sequence definition. The full code listing is as follows:
module Controller =
    let Run (args:string array) =
        // Define what arguments are expected
        let defs = [
            {ArgInfo.Command="input"; Description="Input XML File"; Required=false };
            {ArgInfo.Command="output"; Description="Output File"; Required=false } ]
        // Parse Arguments into a Dictionary
        let parsedArgs = Arguments.ParseArgs args defs
        // Ensure Standard Input/Output and allow for debug configuration
        use reader =
            if parsedArgs.ContainsKey("input") then
                new StreamReader(Path.GetFullPath(parsedArgs.["input"]))
            else
                new StreamReader(Console.OpenStandardInput())
        use writer =
            if parsedArgs.ContainsKey("output") then
                new StreamWriter(Path.GetFullPath(parsedArgs.["output"]))
            else
                new StreamWriter(Console.OpenStandardOutput(), AutoFlush = true)
        // Combine the name/value output into a string
        let outputCollector ((outputKey1, outputKey2) , outputValue) =
            let output = sprintf "%s\t%s\t%O" outputKey1 outputKey2 outputValue
            writer.WriteLine(output)
        // Write a counter entry
        let counterReporter docType =
            stderr.WriteLine (sprintf "reporter:counter:Elements Processed,%s,1" docType)
        let nodename = StoreXmlElementMapper.MapNode
        let nodestart = "<" + nodename + ">"
        let nodeend = "</" + nodename + ">"
        // Read the next line of the input stream
        let readLine() =
            reader.ReadLine()
        // Parse the input stream into a sequence of XElement types
        let elementBuilder = new StringBuilder(1024)
        let rec xmlElements inContent (someContent:string option) = seq {
            let line =
                match someContent with
                | Some(content) -> content
                | None -> readLine()
            if not (box line = null) then
                if (inContent) then
                    // Try to find the end element and yield accordingly
                    let offset = line.IndexOf(nodeend, 0, StringComparison.InvariantCultureIgnoreCase)
                    if (offset > -1) then
                        // Found the endnode so append and start new XElement
                        let content = line.Substring(0, offset + nodeend.Length)
                        elementBuilder.Append(content) |> ignore
                        let nextContent =
                            if (offset + nodeend.Length = line.Length) then
                                None
                            else
                                Some(line.Substring(offset + nodeend.Length))
                        yield XElement.Parse(elementBuilder.ToString())
                        elementBuilder.Clear() |> ignore
                        yield! xmlElements false nextContent
                    else
                        // Just a content line so append
                        elementBuilder.AppendLine(line) |> ignore
                        yield! xmlElements true None
                else
                    // Find the start node element and start building
                    let offset = line.IndexOf(nodestart, 0, StringComparison.InvariantCultureIgnoreCase)
                    if (offset > -1) then
                        yield! xmlElements true (Some(line.Substring(offset)))
                    else
                        yield! xmlElements false None
            }
        // Process the XElement sequence and report on successes
        let elementProcessed value =
            outputCollector value
            counterReporter "Successfully Processed"
        try
            xmlElements false None
            |> Seq.map StoreXmlElementMapper.Map
            |> Seq.iter (Seq.iter elementProcessed)
        with
        | :? System.Xml.XmlException ->
            // Ignore invalid xml elements but report on count
            counterReporter "Invalid Elements"
        // Close the streams
        reader.Close()
        writer.Close()
The sequence of XElement types is built by using a StringBuilder to accumulate the text for each element. Once the opening tag is located, any content that follows is appended to the StringBuilder. When the end tag is located and appended, the StringBuilder contents are yielded as an XElement and the process repeats. One has to remember that after finding the end tag, the remainder of the line needs to be fed back into the search for the next opening tag.
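The same approach can be tried in isolation with the sketch below, which reads from any TextReader and returns a list instead of a lazy sequence. The function name readElements and the sample input are hypothetical. It also assumes ordinal (case-sensitive) tag matching, which differs from the case-insensitive search in the listing above.

```fsharp
open System
open System.IO
open System.Text
open System.Xml.Linq

// Hypothetical helper: split a text stream into XElement values for one element name.
let readElements (nodeName: string) (reader: TextReader) : XElement list =
    let nodeStart = "<" + nodeName + ">"
    let nodeEnd = "</" + nodeName + ">"
    let builder = StringBuilder()
    let results = ResizeArray<XElement>()
    // Consume one piece of text; the result says whether an element is still open.
    let rec consume (inContent: bool) (text: string) : bool =
        if inContent then
            let offset = text.IndexOf(nodeEnd, StringComparison.Ordinal)
            if offset > -1 then
                let cut = offset + nodeEnd.Length
                builder.Append(text.Substring(0, cut)) |> ignore
                results.Add(XElement.Parse(builder.ToString()))
                builder.Clear() |> ignore
                // Feed the remainder of the line back in to look for the next start tag.
                consume false (text.Substring(cut))
            else
                builder.AppendLine(text) |> ignore
                true
        else
            let offset = text.IndexOf(nodeStart, StringComparison.Ordinal)
            if offset > -1 then consume true (text.Substring(offset)) else false
    let mutable inContent = false
    let mutable line = reader.ReadLine()
    while not (isNull line) do
        inContent <- consume inContent line
        line <- reader.ReadLine()
    List.ofSeq results

// Two elements spread unevenly over several lines, including an end tag
// and a start tag that share one line.
let input = "<Stores>\n<Store><Name>A</Name>\n</Store><Store>\n<Name>B</Name></Store>\n</Stores>"
let elements = readElements "Store" (new StringReader(input) :> TextReader)
let names = elements |> List.map (fun e -> e.Element(XName.Get "Name").Value)
printfn "%A" names
```

The third input line is the interesting case: it closes one Store element and opens the next, so the text after the end tag has to be searched again for a start tag.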
As this processing is dependent on knowing the element name to process, I decided to have the module containing the Map function return the node name. Other methods could be used, but this seemed the cleanest, as the Mapper needs this knowledge to process the XML.
In addition to handling the XML processing, this sample code also demonstrates writing out the composite key from the tuple and generating counters showing the number of elements processed, both successes and failures.
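The counter mechanism relies on Hadoop Streaming reading lines of the form reporter:counter:<group>,<counter>,<amount> from standard error. A small sketch follows; the helper name counterLine is hypothetical, while the group and counter names are the ones used in the Mapper listing.

```fsharp
// Hypothetical helper: build a Hadoop Streaming counter update line.
let counterLine (group: string) (counter: string) (amount: int) =
    sprintf "reporter:counter:%s,%s,%d" group counter amount

let success = counterLine "Elements Processed" "Successfully Processed" 1
let invalid = counterLine "Elements Processed" "Invalid Elements" 1

// Counter updates go to standard error, not standard output.
eprintfn "%s" success
eprintfn "%s" invalid
```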
With the XML previously mentioned the output from the Mapper executable will be:
BM Guardian Bank 1100000
BM International Bank 3000000
BM International Security 2700000
BM Primary Bank & Reserve 2200000
BM Primary International 1100000
BM Reserve Security 1100000
BM United Security 2900000
BS International Security 1500000
BS Primary Bank & Reserve 1500000
BS Reserve Security 800000
OS Guardian Bank 6000000
OS International Bank 4500000
OS International Security 4500000
OS Primary Bank & Reserve 10500000
OS Primary International 6000000
OS Reserve Security 6000000
OS United Security 4500000
So onto the Reducer. The Reducer, as in the previous samples, just has to read in this text stream and call the Reduce function. The full code listing is as follows:
module Controller =
    let Run (args:string array) =
        // Define what arguments are expected
        let defs = [
            {ArgInfo.Command="input"; Description="Input File"; Required=false };
            {ArgInfo.Command="output"; Description="Output File"; Required=false } ]
        // Parse Arguments into a Dictionary
        let parsedArgs = Arguments.ParseArgs args defs
        // Ensure Standard Input/Output and allow for debug configuration
        use reader =
            if parsedArgs.ContainsKey("input") then
                new StreamReader(Path.GetFullPath(parsedArgs.["input"]))
            else
                new StreamReader(Console.OpenStandardInput())
        use writer =
            if parsedArgs.ContainsKey("output") then
                new StreamWriter(Path.GetFullPath(parsedArgs.["output"]))
            else
                new StreamWriter(Console.OpenStandardOutput(), AutoFlush = true)
        // Combine the name/value output into a string
        let outputCollector (outputKey1, outputKey2) outputValue =
            let output = sprintf "%s\t%s\t%O" outputKey1 outputKey2 outputValue
            writer.WriteLine(output)
        // Read the next line of the input stream
        let readLine() =
            reader.ReadLine()
        // Parse the input into the required key/value pair
        let parseLine (input:string) =
            let keyValue = input.Split('\t')
            ((keyValue.[0].Trim(), keyValue.[1].Trim()), keyValue.[2].Trim())
        // Converts an input line into an option
        let getInput() =
            let input = readLine()
            if not(String.IsNullOrWhiteSpace(input)) then
                Some(parseLine input)
            else
                None
        // Creates a sequence of the input based on the provided key
        let lastInput = ref None
        let continueDo = ref false
        let inputsByKey key firstValue = seq {
            // Yield any value from previous read
            yield firstValue
            continueDo := true
            while !continueDo do
                match getInput() with
                | Some(input) when (fst input) = key ->
                    // Yield found value and remainder of sequence
                    yield (snd input)
                | Some(input) ->
                    // Have a value but different key
                    lastInput := Some(input)
                    continueDo := false
                | None ->
                    // Have no more entries
                    lastInput := None
                    continueDo := false
            }
        // Controls the calling of the reducer
        let rec processInput (input:((string * string) * string) option) =
            if input.IsSome then
                let (key, value) = input.Value
                let reduced = StoreXmlElementReducer.Reduce key (inputsByKey key value)
                if reduced.IsSome then
                    outputCollector key reduced.Value
                processInput lastInput.contents
        processInput (getInput())
The main difference in this code is the support for a tuple as a key. As in previous samples the Reducer has to create a sequence of sequences from the input data, in effect a lazily evaluated groupBy over the key values. In this instance, to find a match one needs to ensure both key elements are the same. This works in F# because tuple types support structural equality (check out Equality and Comparison Constraints in F# by Don Syme for more details).
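The effect of structural equality on the grouping can be seen in the following sketch, which groups consecutive rows of already sorted input by their composite key and then sums each group. The helper groupConsecutive and the row values are hypothetical. Unlike the Reducer above, it works eagerly on an in-memory list.

```fsharp
// Hypothetical helper: group consecutive rows that share the same composite key.
// The comparison lastKey = key uses structural equality, so both parts must match.
let groupConsecutive (rows: ((string * string) * string) list) =
    let folder (key, value) groups =
        match groups with
        | (lastKey, values) :: rest when lastKey = key -> (lastKey, value :: values) :: rest
        | _ -> (key, [ value ]) :: groups
    List.foldBack folder rows []

// Illustrative rows, sorted by key as Hadoop would deliver them.
let rows =
    [ (("BM", "International Bank"), "800000")
      (("BM", "International Bank"), "1500000")
      (("BM", "Primary International"), "300000")
      (("BS", "International Bank"), "800000") ]

let totals =
    groupConsecutive rows
    |> List.map (fun (key, values) ->
        key, values |> List.sumBy (fun (v: string) -> System.Int32.Parse v))

totals |> List.iter (fun ((k1, k2), total) -> printfn "%s\t%s\t%d" k1 k2 total)
```

Rows that share only the first key element, such as the two different banks under “BM”, end up in separate groups.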
Of course one could generalize this code to support N key elements; but maybe more on this at another time.
Finally onto running the job. The command to run this MapReduce job is as follows:
hadoop.cmd jar lib/hadoop-streaming-ms.jar
"-D xmlinput.element=Store" "-D stream.num.map.output.key.fields=2"
-input "/stores/demographics" -output "/stores/banking/release"
-mapper "..\..\jars\FSharp.Hadoop.MapperXml.exe"
-combiner "..\..\jars\FSharp.Hadoop.ReducerXml.exe"
-reducer "..\..\jars\FSharp.Hadoop.ReducerXml.exe"
-file "C:\bin\Release\FSharp.Hadoop.MapperXml.exe"
-file "C:\bin\Release\FSharp.Hadoop.ReducerXml.exe"
-file "C:\bin\Release\FSharp.Hadoop.MapReduce.dll"
-file "C:\bin\Release\FSharp.Hadoop.Utilities.dll"
-inputformat org.apache.mahout.classifier.bayes.XmlElementStreamingInputFormat
The configuration values one has to set are the number of output key fields, “stream.num.map.output.key.fields” with a value of 2, and the XML element name to be processed, “xmlinput.element” with a value of “Store”.
As in the Binary Document Streaming sample, I created a new Streaming JAR called “hadoop-streaming-ms.jar” which contains the new Java classes on top of the base Hadoop streaming classes. The downloadable code has the full listing of the Java classes along with a command file to compile the source code.
Tester Application
As always I have put together a tester application for calling the Mapper and Reducer executables:
module MapReduceConsole =
    let Run args =
        // Define what arguments are expected
        let defs = [
            {ArgInfo.Command="input"; Description="Input File"; Required=true };
            {ArgInfo.Command="output"; Description="Output File"; Required=true };
            {ArgInfo.Command="tempPath"; Description="Temp File Path"; Required=true };
            {ArgInfo.Command="mapper"; Description="Mapper EXE"; Required=true };
            {ArgInfo.Command="reducer"; Description="Reducer EXE"; Required=true };
            {ArgInfo.Command="nodename"; Description="XML Node"; Required=true }; ]
        // Parse Arguments into a Dictionary
        let parsedArgs = Arguments.ParseArgs args defs
        Arguments.DisplayArgs parsedArgs
        // define the executables
        let mapperExe = Path.GetFullPath(parsedArgs.["mapper"])
        let reducerExe = Path.GetFullPath(parsedArgs.["reducer"])
        let nodename = parsedArgs.["nodename"]
        let nodestart = "<" + nodename + ">"
        let nodeend = "</" + nodename + ">"
        Console.WriteLine()
        Console.WriteLine (sprintf "The Mapper file is:\t%O" mapperExe)
        Console.WriteLine (sprintf "The Reducer file is:\t%O" reducerExe)
        Console.WriteLine (sprintf "Processing Nodename:\t%O" nodename)
        // Get the file names
        let inputfile = Path.GetFullPath(parsedArgs.["input"])
        let outputfile = Path.GetFullPath(parsedArgs.["output"])
        let tempPath = Path.GetFullPath(parsedArgs.["tempPath"])
        let tempFile = Path.Combine(tempPath, Path.GetFileName(outputfile))
        let mappedfile = Path.ChangeExtension(tempFile, "mapped")
        let reducefile = Path.ChangeExtension(tempFile, "reduced")
        Console.WriteLine()
        Console.WriteLine (sprintf "The input file is:\t\t%O" inputfile)
        Console.WriteLine (sprintf "The mapped temp file is:\t%O" mappedfile)
        Console.WriteLine (sprintf "The reduced temp file is:\t%O" reducefile)
        Console.WriteLine (sprintf "The output file is:\t\t%O" outputfile)
        // Give the user an option to continue
        Console.WriteLine()
        Console.WriteLine("Hit ENTER to continue...")
        Console.ReadLine() |> ignore
        // Call the mapper process, streaming the XML elements into it
        let mapperProcess() =
            use mapper = new Process()
            mapper.StartInfo.FileName <- mapperExe
            mapper.StartInfo.UseShellExecute <- false
            mapper.StartInfo.RedirectStandardInput <- true
            mapper.StartInfo.RedirectStandardOutput <- true
            mapper.Start() |> ignore
            use mapperInput = mapper.StandardInput
            use mapperOutput = mapper.StandardOutput
            // Map the reader to a background thread so processing can happen in parallel
            Console.WriteLine "Mapper Processing Starting..."
            let taskMapperFunc() =
                use mapperWriter = File.CreateText(mappedfile)
                while not mapperOutput.EndOfStream do
                    mapperWriter.WriteLine(mapperOutput.ReadLine())
            let taskMapperWriting = Task.Factory.StartNew(Action(taskMapperFunc))
            // Pass the file into the mapper process and close input stream when done
            use mapperReader = new StreamReader(File.OpenRead(inputfile))
            let elementBuilder = new StringBuilder(1024)
            let rec xmlElements inContent (someContent:string option) = seq {
                let line =
                    match someContent with
                    | Some(content) -> content
                    | None -> mapperReader.ReadLine()
                if not (box line = null) then
                    if (inContent) then
                        // Try to find the end element and yield accordingly
                        let offset = line.IndexOf(nodeend, 0, StringComparison.InvariantCultureIgnoreCase)
                        if (offset > -1) then
                            // Found the endnode so always add a new line
                            let content = line.Substring(0, offset + nodeend.Length)
                            elementBuilder.AppendLine(content) |> ignore
                            let nextContent =
                                if (offset + nodeend.Length = line.Length) then
                                    None
                                else
                                    Some(line.Substring(offset + nodeend.Length))
                            yield elementBuilder.ToString()
                            elementBuilder.Clear() |> ignore
                            yield! xmlElements false nextContent
                        else
                            // Just a content line so append
                            elementBuilder.AppendLine(line) |> ignore
                            yield! xmlElements true None
                    else
                        // Find the start node element and start building
                        let offset = line.IndexOf(nodestart, 0, StringComparison.InvariantCultureIgnoreCase)
                        if (offset > -1) then
                            yield! xmlElements true (Some(line.Substring(offset)))
                        else
                            yield! xmlElements false None
                }
            xmlElements false None
            |> Seq.iter mapperInput.WriteLine
            mapperInput.Close()
            taskMapperWriting.Wait()
            mapperOutput.Close()
            mapper.WaitForExit()
            let result = match mapper.ExitCode with | 0 -> true | _ -> false
            mapper.Close()
            result
        // Sort the mapped file by both key fields - mimic the role of Hadoop
        let hadoopProcess() =
            Console.WriteLine "Hadoop Processing Starting..."
            let unsortedValues = seq {
                use reader = new StreamReader(File.OpenRead(mappedfile))
                while not reader.EndOfStream do
                    let input = reader.ReadLine()
                    let keyValue = input.Split('\t')
                    yield (keyValue.[0].Trim(), keyValue.[1].Trim(), keyValue.[2].Trim())
                reader.Close()
                }
            use writer = File.CreateText(reducefile)
            unsortedValues
            |> Seq.sortBy (fun (key1, key2, value) -> (key1, key2))
            |> Seq.iter (fun (key1, key2, value) -> writer.WriteLine (sprintf "%O\t%O\t%O" key1 key2 value))
            writer.Close()
        // Finally call the reducer process
        let reducerProcess() =
            use reducer = new Process()
            reducer.StartInfo.FileName <- reducerExe
            reducer.StartInfo.UseShellExecute <- false
            reducer.StartInfo.RedirectStandardInput <- true
            reducer.StartInfo.RedirectStandardOutput <- true
            reducer.Start() |> ignore
            use reducerInput = reducer.StandardInput
            use reducerOutput = reducer.StandardOutput
            // Map the reader to a background thread so processing can happen in parallel
            Console.WriteLine "Reducer Processing Starting..."
            let taskReducerFunc() =
                use reducerWriter = File.CreateText(outputfile)
                while not reducerOutput.EndOfStream do
                    reducerWriter.WriteLine(reducerOutput.ReadLine())
            let taskReducerWriting = Task.Factory.StartNew(Action(taskReducerFunc))
            // Pass the file into the reducer process and close input stream when done
            use reducerReader = new StreamReader(File.OpenRead(reducefile))
            while not reducerReader.EndOfStream do
                reducerInput.WriteLine(reducerReader.ReadLine())
            reducerInput.Close()
            taskReducerWriting.Wait()
            reducerOutput.Close()
            reducer.WaitForExit()
            let result = match reducer.ExitCode with | 0 -> true | _ -> false
            reducer.Close()
            result
        // Finish test
        if mapperProcess() then
            Console.WriteLine "Mapper Processing Complete..."
            hadoopProcess()
            Console.WriteLine "Hadoop Processing Complete..."
            if reducerProcess() then
                Console.WriteLine "Reducer Processing Complete..."
        Console.WriteLine "Processing Complete..."
        Console.ReadLine() |> ignore
Again, the challenge in this code is the processing of the XML. This sample code processes a single XML input file, parsing the XML into the required elements and then streaming these into the Mapper executable. If you review the code you will see a strong similarity between the Mapper executable's XElement sequence generation and how the XML is parsed and streamed into the Mapper.
Finally, a required argument into the tester application is the “nodename” value. This is analogous to the job configuration parameter when running the job within a Hadoop cluster.
Conclusion
So this wraps up the Streaming examples for a while. I have enjoyed putting it all together, and I hope the code proves useful.