Framework for .Net Hadoop MapReduce Job Submission TextOutput Type
Recent changes to the “Generics based Framework for .Net Hadoop MapReduce Job Submission” code added support for Json and Binary serialization out of the Mapper, in and out of Combiners, and out of the Reducer. However, this precluded one from controlling the format of the text output. Say one wanted to emit a tab-delimited string from the Reducer: the string could only be written through Json serialization, which escapes the tabs. To let one construct the final text output directly, I have created a new TextOutput type.
This TextOutput type is simple in structure. However, when it is encountered during the serialization process, both Json and Binary serialization are bypassed and the text is written out in its raw format, including tabs and other characters usually escaped by the Json serializer.
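To make the bypass concrete, here is a minimal sketch of how such a dispatch could look. It is not the framework's actual code: TextOutputSketch and OutputFormatter are hypothetical names introduced only for illustration, and DataContractJsonSerializer is assumed as the Json serializer.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization.Json;
using System.Text;

// Hypothetical stand-in for the framework's TextOutput type: a thin wrapper over a string.
public sealed class TextOutputSketch
{
    public string Text { get; }
    public TextOutputSketch(string text) { Text = text ?? String.Empty; }
}

// Hypothetical helper showing the idea of bypassing serialization for raw text.
public static class OutputFormatter
{
    // Returns the text to emit for a value: raw text for TextOutputSketch, Json otherwise.
    public static string Format(object value)
    {
        if (value == null) throw new ArgumentNullException(nameof(value));

        var raw = value as TextOutputSketch;
        if (raw != null)
        {
            // Serialization is skipped, so tabs and quotes are written unescaped.
            return raw.Text;
        }

        // Assumption: every other value goes through the Json serializer.
        var serializer = new DataContractJsonSerializer(value.GetType());
        using (var stream = new MemoryStream())
        {
            serializer.WriteObject(stream, value);
            return Encoding.UTF8.GetString(stream.ToArray());
        }
    }
}

public static class Program
{
    public static void Main()
    {
        // Raw text keeps the literal tab character.
        Console.WriteLine(OutputFormatter.Format(new TextOutputSketch("min\tmax")));
        // The same string sent through Json comes back quoted, with the tab escaped.
        Console.WriteLine(OutputFormatter.Format("min\tmax"));
    }
}
```

The type check comes first so that a raw text value never reaches the serializer. Everything else follows the normal serialization path.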
As an example, here is a modified version of one of the C# Reducer samples that supports both Json and Text output:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
// A using directive for the framework namespace that defines ReducerBase and TextOutput is also required.

namespace MSDN.Hadoop.MapReduceCSharp
{
    [DataContract]
    public class MobilePhoneRange
    {
        [DataMember] public TimeSpan MinTime { get; set; }
        [DataMember] public TimeSpan MaxTime { get; set; }

        public MobilePhoneRange(TimeSpan minTime, TimeSpan maxTime)
        {
            this.MinTime = minTime;
            this.MaxTime = maxTime;
        }

        // Formats the range as raw text, bypassing Json serialization.
        public TextOutput ToText(string format)
        {
            return new TextOutput(String.Format(@"({0}, {1})", this.MinTime.ToString(format), this.MaxTime.ToString(format)));
        }
    }

    // Emits the range as a MobilePhoneRange, which the framework serializes as Json.
    public class MobilePhoneRangeReducer : ReducerBase<TimeSpan, MobilePhoneRange>
    {
        public override IEnumerable<Tuple<string, MobilePhoneRange>> Reduce(string key, IEnumerable<TimeSpan> value)
        {
            // Start from an inverted range so the first value replaces both bounds.
            var baseRange = new MobilePhoneRange(TimeSpan.MaxValue, TimeSpan.MinValue);
            var rangeValue = value.Aggregate(baseRange, (accSpan, timespan) =>
                new MobilePhoneRange(
                    (timespan < accSpan.MinTime) ? timespan : accSpan.MinTime,
                    (timespan > accSpan.MaxTime) ? timespan : accSpan.MaxTime));

            yield return new Tuple<string, MobilePhoneRange>(key, rangeValue);
        }
    }

    // Emits the same range as a TextOutput, which is written out as raw text.
    public class MobilePhoneRangeTextReducer : ReducerBase<TimeSpan, TextOutput>
    {
        public override IEnumerable<Tuple<string, TextOutput>> Reduce(string key, IEnumerable<TimeSpan> value)
        {
            var baseRange = new MobilePhoneRange(TimeSpan.MaxValue, TimeSpan.MinValue);
            var rangeValue = value.Aggregate(baseRange, (accSpan, timespan) =>
                new MobilePhoneRange(
                    (timespan < accSpan.MinTime) ? timespan : accSpan.MinTime,
                    (timespan > accSpan.MaxTime) ? timespan : accSpan.MaxTime));

            yield return new Tuple<string, TextOutput>(key, rangeValue.ToText("G"));
        }
    }
}
For the MobilePhoneRangeReducer above, the Json serialization output would be:
Android {"MaxTime":"PT23H59M54S","MinTime":"PT6S"}
RIM OS {"MaxTime":"PT23H59M58S","MinTime":"PT1M7S"}
Unknown {"MaxTime":"PT23H52M36S","MinTime":"PT36S"}
Windows Phone {"MaxTime":"PT23H55M17S","MinTime":"PT32S"}
iPhone OS {"MaxTime":"PT23H59M50S","MinTime":"PT1S"}
The corresponding text output from the MobilePhoneRangeTextReducer would be:
Android (0:00:00:06.0000000, 0:23:59:54.0000000)
RIM OS (0:00:01:07.0000000, 0:23:59:58.0000000)
Unknown (0:00:00:36.0000000, 0:23:52:36.0000000)
Windows Phone (0:00:00:32.0000000, 0:23:55:17.0000000)
iPhone OS (0:00:00:01.0000000, 0:23:59:50.0000000)
As mentioned, the definition of the TextOutput type is simple: it is just a wrapper over a string, although this may change depending on needs. The F# definition is:
open System
open System.Text

type TextOutput(value:string) =

    // Internal text value
    let mutable text = value

    /// String value of the TextOutput class
    member this.Text
        with get () = text
        and set (value) = text <- value

    /// Byte array value of the TextOutput class
    member this.Bytes
        with get () = Encoding.UTF8.GetBytes(text)
        and set (value:byte array) = text <- Encoding.UTF8.GetString(value)

    new(value:byte array) =
        TextOutput(Encoding.UTF8.GetString(value))

    new(value:TextOutput) =
        TextOutput(value.Text)

    new() =
        TextOutput(String.Empty)

    /// Clear Text
    member this.Clear() =
        text <- String.Empty

    /// Append Text
    member this.Append(value:string) =
        text <- text + value
        text

    /// ToString override
    override this.ToString() =
        text

    /// Equals override (the type test also returns false for null)
    override this.Equals(value:obj) =
        match value with
        | :? TextOutput as other -> text.Equals(other.Text)
        | _ -> false

    /// GetHashCode override
    override this.GetHashCode() =
        text.GetHashCode()
One of the main rationales for adding TextOutput support is that data output by the framework can then be consumed easily by Hive tables defined with CREATE TABLE statements.
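As an illustration of Hive-friendly output, the sketch below builds a tab-delimited value string from the two range bounds. HiveRowFormatter and ToTabDelimited are hypothetical names, not part of the framework. The sketch also assumes the Hive table is declared with ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t', so that the key and the two times land in separate columns.

```csharp
using System;
using System.Globalization;

// Hypothetical helper: formats a min/max range as a tab-delimited value string.
public static class HiveRowFormatter
{
    public static string ToTabDelimited(TimeSpan minTime, TimeSpan maxTime)
    {
        // The "c" (constant) format is culture-insensitive, which keeps the files stable across machines.
        return String.Join("\t",
            minTime.ToString("c", CultureInfo.InvariantCulture),
            maxTime.ToString("c", CultureInfo.InvariantCulture));
    }
}

public static class Program
{
    public static void Main()
    {
        // Uses the Android range from the sample output: 6 seconds to 23:59:54.
        var row = HiveRowFormatter.ToTabDelimited(
            TimeSpan.FromSeconds(6), new TimeSpan(23, 59, 54));
        Console.WriteLine(row); // prints "00:00:06", a tab, then "23:59:54"
    }
}
```

Inside MobilePhoneRange, the result could be wrapped as new TextOutput(HiveRowFormatter.ToTabDelimited(this.MinTime, this.MaxTime)) and returned from a Reducer in the same way as ToText.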
Hope you find this change useful.