REDUCE Expression (U-SQL)
Summary
U-SQL provides the ability to write custom rowset reducers in C# using the user-defined operator extensibility framework by implementing an IReducer
. See U-SQL Programmability Guide: User-Defined Reducer for more information.
A reducer, unlike an aggregator that operates on columns and produces a scalar value, provides a way to implement custom grouping and aggregation on a rowset and returning a rowset.
A reducer is being invoked with the REDUCE
expression that provide the necessary information about both the input rowset, the grouping columns, the expected result schema as well as additional information that is useful for optimization.
A reducer is invoked once per group and produces zero or one or multiple rows per group in return.
A reducer provides limited optimization support, because an optimizer cannot reason about the procedural C# code defining the reducer. For example, it cannot push predicates through to earlier statements unless the column used in the predicate is marked as read only. Therefore, it is recommended to instead use a user-defined aggregator or use the optional READONLY or REQUIRED clauses.
Syntax
Reduce_Expression := 'REDUCE' Input_Rowset ['PRESORT' Identifier_List] ('ALL' | 'ON' Identifier_List) Produce_Clause [Readonly_Clause] [Required_Clause] USING_Clause.
Remarks
Input_Rowset
Specifies the input rowset that the reducer will operate on as either the reference to a rowset name or by a nested rowset expression:
Syntax
Input_Rowset := Rowset | Rowset_Expression.
with the following semantics:
Rowset
The two simplest rowset sources are a rowset variable such as@rowset
that has been defined in a previous statement of the script or a table that has been created in the account’s catalog:
Syntax
Rowset := Rowset_Variable | Identifier.
A table can be referenced either with its fully qualified 3-part name, within the current database context with a 2-part name, or within the current database and schema context with a single-part name.
Rowset_Expression
U-SQL also provides the ability to reduce nested query expressions, table-valued function calls or querying external rowsets. Follow the links for more details on each.
Syntax
Rowset_Expression := '(' Query_Expression ')' | Function_Call | External_Rowset_Expression.
The UDO programming model makes both the values and the schema of the input rowset available in the context of the reducer's implementation.
PRESORT
The optionalPRESORT
clause guarantees the rows are ordered by the given identifier.ALL
The optionalALL
indicates that the whole input rowset will become the group to be reduced. Similar to aGROUP BY ALL
.ON Identifier_List
This option specifies the list of columns that define the groups.
Syntax
Identifier_List := Quoted_or_Unquoted_Identifier {',' Quoted_or_Unquoted_Identifier}.
If the columns are not part of the input rowset’s columns or are not comparable, an error is raised.
Syntax
Produce_Clause := 'PRODUCE' Column_Definition_List.
Column_Definition_List
This list defines the schema of the reducer. The returned columns are defined as a pair of column names and column types:
Syntax
Column_Definition_List := Column_Definition { ',' Column_Definition}.
Column_Definition := Quoted_or_Unquoted_Identifier Built_in_Type.
Each column has an identifier that can be either a quoted or unquoted identifier. A column is typed with one of the U-SQL types that the reducer supports.
The UDO programming model makes the specified rowset schema available to the implementation of the reducer. An error is raised if the reducer is producing a schema that is incompatible with the specified return schema.
Readonly_Clause
The optionalREADONLY
clause can help the UDO programmer to write more efficient code. For more information on how the UDO programmer can take advantage of this hint, see the U-SQL C# Developer’s Guide.The optional
READONLY
clause specifies the columns are read only for the reducer and will be passed through to the output using either the same name or the specified column name in parenthesis. Only columns in the reduce expression’s ON clause can be markedREADONLY
, otherwise the error“E_CSC_USER_UDOREADONLYNOTKEYCOLUMN: Column '…' cannot be marked as READONLY”
is raised.
Syntax
Readonly_Clause := 'READONLY' Star_Or_Readonly_Column_List.
Star_Or_Readonly_Column_List := '*' | Readonly_Column_List.
Readonly_Column_List := Readonly_Column { ',' Readonly_Column }.
Readonly_Column := Column_Identifier [Output_Column_Dependency_Alias].
Output_Column_Dependency_Alias := '(' Quoted_or_Unquoted_Identifier ')'.
Required_Clause
The optionalREQUIRED
clause can help the UDO programmer to write more efficient code. For more information on how the UDO programmer can take advantage of this hint, see the U-SQL C# Developer’s Guide.The optional
REQUIRED
clause specifies that either all columns are required on input for the reducer (if specified with*
) or the specified columns are required. If a specified column is followed by a list of columns in parenthesis, then the input column is only required if the columns in that list are referenced from the output.
Syntax
Required_Clause := 'REQUIRED' Star_Or_Required_Column_List.
Star_Or_Required_Column_List := '*' | Required_Column_List.
Required_Column_List := Required_Column { ',' Required_Column}.
Required_Column := Column_Identifier [Required_Output_Column_Dependency_List].
Required_Output_Column_Dependency_List := '(' Identifier_List ')'.
Syntax
USING_Clause := 'USING' udo_expression.
The USING
clause takes a C# expression that returns an instance of IReducer
. Users can write their own by implementing an IReducer
(see U-SQL Programmability Guide: User-Defined Reducer for more detail on how to write your own reducer). Most commonly, the UDO expression is either the instantiation of a reducer class of the form
USING new MyNameSpace.MyReducer(parameter:"value")
or the invocation of a factory method
USING MyNameSpace.MyReducerFactory(parameter:"value")
where parameter
is a parameter of the reducer.
Examples
- The examples can be executed in Visual Studio with the Azure Data Lake Tools plug-in.
- The scripts can be executed locally. An Azure subscription and Azure Data Lake Analytics account is not needed when executed locally.
- For simplicity, the example(s) with user-defined code make use of Code-Behind for assembly management. The main advantage of Code-Behind is that the tooling will register the assembly file and add the REFERENCE ASSEMBLY statement automatically. To use Assembly registration instead of Code-Behind, see Using Assemblies: Code-Behind vs. Assembly Registration Walkthrough.
User-Defined Reducer - RangeReducer
Example is a slightly modified version of the example given at How do I combine overlapping ranges using U-SQL? Introducing U-SQL Reducer UDOs and usql/Examples/RangeReducer/RangeReducer/. Please review the reducer article for details.
c# code is placed in the associated Code-Behind .cs file. See usage in next section, below.
using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ReduceSample
{
[SqlUserDefinedReducer(IsRecursive = true)] // not sure if it can run recursive yet. Need to test with large data sets.
public class RangeReducer : IReducer
{
public override IEnumerable<IRow> Reduce(IRowset input, IUpdatableRow output)
{
// Init aggregation values
bool first_row_processed = false;
var begin = DateTime.MaxValue; // Dummy value to make compiler happy
var end = DateTime.MinValue; // Dummy value to make compiler happy
// requires that the reducer is PRESORTED on begin and READONLY on the reduce key.
foreach (var row in input.Rows)
{
// Initialize the first interval with the first row if i is 0
if (!first_row_processed)
{
first_row_processed = true; // mark that we handled the first row
begin = row.Get<DateTime>("begin");
end = row.Get<DateTime>("end");
// If the end is just a time and not a date, it can be earlier than the begin, indicating it is on the next day.
// This let's fix up the end to the next day in that case
if (end < begin) { end = end.AddDays(1); }
}
else
{
var b = row.Get<DateTime>("begin");
var e = row.Get<DateTime>("end");
// fix up the date if end is earlier than begin
if (e < b) { e = e.AddDays(1); }
// if the begin is still inside the interval, increase the interval if it is longer
if (b <= end)
{
// if the new end time is later than the current, extend the interval
if (e > end) { end = e; }
}
else // output the previous interval and start a new one
{
output.Set<DateTime>("begin", begin);
output.Set<DateTime>("end", end);
yield return output.AsReadOnly();
begin = b; end = e;
} // if
} // if
} // foreach
// now output the last interval
output.Set<DateTime>("begin", begin);
output.Set<DateTime>("end", end);
yield return output.AsReadOnly();
} // Reduce
} // RangeReducer
} // ReduceSample
Using User-Defined Reducer - RangeReducer
Using Code-Behind from previous section, above.
// Dataset
@aLog =
SELECT * FROM
( VALUES
("ABC", new DateTime(2017,01,01, 05, 00, 00), new DateTime(2017,01,01, 06, 00, 00)),
("XYZ", new DateTime(2017,01,01, 05, 00, 00), new DateTime(2017,01,01, 06, 00, 00)),
("ABC", new DateTime(2017,01,01, 08, 00, 00), new DateTime(2017,01,01, 09, 00, 00)),
("ABC", new DateTime(2017,01,01, 08, 00, 00), new DateTime(2017,01,01, 10, 00, 00)),
("ABC", new DateTime(2017,01,01, 10, 00, 00), new DateTime(2017,01,01, 14, 00, 00)),
("ABC", new DateTime(2017,01,01, 07, 00, 00), new DateTime(2017,01,01, 11, 00, 00)),
("ABC", new DateTime(2017,01,01, 09, 00, 00), new DateTime(2017,01,01, 11, 00, 00)),
("ABC", new DateTime(2017,01,01, 11, 00, 00), new DateTime(2017,01,01, 11, 30, 00)),
("FOO", new DateTime(2017,01,01, 23, 40, 00), new DateTime(2017,01,01, 23, 59, 00)),
("FOO", new DateTime(2017,01,01, 23, 50, 00), new DateTime(2017,01,02, 00, 40, 00))
) AS T(user, begin, end);
// Data as is
@results =
SELECT user,
String.Format("{0:t}", begin) AS StartTime,
String.Format("{0:t}", end) AS EndTime
FROM @aLog;
OUTPUT @results
TO "/ReferenceGuide/QSE/PrimaryRowsetExpressions/Reducer/RangeReducerA.csv"
USING Outputters.Csv();
// Using Reducer
@results =
REDUCE @aLog
PRESORT begin
ON user
PRODUCE user string,
begin DateTime,
end DateTime
READONLY user
REQUIRED begin, end
USING new ReduceSample.RangeReducer();
// Some formatting
@results =
SELECT user,
String.Format("{0:t}", begin) AS StartTime,
String.Format("{0:t}", end) AS EndTime
FROM @results
ORDER BY begin OFFSET 0 ROWS;
OUTPUT @results
TO "/ReferenceGuide/QSE/PrimaryRowsetExpressions/Reducer/RangeReducerB.csv"
USING Outputters.Csv();
User-Defined Reducer - SalesReducer
c# code is placed in the associated Code-Behind .cs file. See usage in next section, below.
using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ReferenceGuide_Examples
{
[SqlUserDefinedReducer]
public class SalesReducer : IReducer
{
public override IEnumerable<IRow> Reduce(IRowset input, IUpdatableRow output)
{
decimal SalesAmount;
decimal TaxAmt;
foreach (IRow row in input.Rows)
{
SalesAmount = row.Get<decimal>("SalesAmount");
TaxAmt = row.Get<decimal>("TaxAmt");
if (SalesAmount + TaxAmt > 1000m)
{
yield return output.AsReadOnly();
}
}
}
}
}
Using User-Defined Reducer - SalesReducer
Using Code-Behind from previous section, above.
// Dataset
@sales =
SELECT * FROM
( VALUES
("SO43659", 349, 5, 2024.99m, 162.00m),
("SO43660", 326, 5, 419.46m, 33.56m),
("SO43661", 300, 6, 809.76m, 64.78m),
("SO43662", 330, 6, 1258.38m, 100.67m),
("SO43663", 322, 4, 419.46m, 33.56m),
("SO43664", 345, 1, 2039.99m, 163.20m),
("SO43665", 220, 1, 40.37m, 3.23m),
("SO43666", 330, 4, 419.46m, 33.56m),
("SO43667", 219, 3, 17.10m, 1.37m),
("SO43668", 317, 6, 2624.38m, 209.95m)
) AS T(SalesOrderNumber, ProductKey, SalesTerritoryKey, SalesAmount, TaxAmt);
// Using reducer
@reducer =
REDUCE @sales
ON SalesOrderNumber, ProductKey, SalesTerritoryKey, SalesAmount, TaxAmt
PRODUCE SalesOrderNumber,
ProductKey,
SalesTerritoryKey,
SalesAmount,
TaxAmt
READONLY *
REQUIRED SalesAmount, TaxAmt
USING new ReferenceGuide_Examples.SalesReducer();
OUTPUT @reducer
TO "/ReferenceGuide/QSE/PrimaryRowsetExpressions/Reducer/SalesReducerC.csv"
USING Outputters.Csv(outputHeader: true);
// NOT using reducer
@result =
SELECT *
FROM @sales
WHERE SalesAmount + TaxAmt > 1000m;
OUTPUT @result
TO "/ReferenceGuide/QSE/PrimaryRowsetExpressions/Reducer/SalesReducerD.csv"
USING Outputters.Csv(outputHeader: true);
Reducer with ORDER BY and FETCH
The ORDER BY clause with FETCH allows the selection of a limited number of rows based on the specified order.
This examples continues to use SalesReducer
defined earlier.
// Same as previous example but only returns top 3 records ordered by SalesAmount
@reducer =
REDUCE @sales
PRESORT SalesOrderNumber
ON SalesOrderNumber, ProductKey, SalesTerritoryKey, SalesAmount, TaxAmt
PRODUCE SalesOrderNumber,
ProductKey,
SalesTerritoryKey,
SalesAmount,
TaxAmt
READONLY *
REQUIRED SalesAmount, TaxAmt
USING new ReferenceGuide_Examples.SalesReducer()
ORDER BY SalesAmount DESC FETCH 3 ROWS;
OUTPUT @reducer
TO "/ReferenceGuide/QSE/PrimaryRowsetExpressions/Reducer/SalesReducer_fetch3.txt"
USING Outputters.Tsv(outputHeader: true);