Consulta de secuencias observables mediante operadores LINQ

En Puente con eventos de .NET existentes, hemos convertido los eventos de .NET existentes en secuencias observables para suscribirlos. En este tema, veremos la naturaleza de primera clase de las secuencias observables como objetos T> IObservables<, en los que los ensamblados Rx proporcionan los operadores LINQ genéricos para manipular estos objetos. La mayoría de los operadores toman una secuencia observable y realizan alguna lógica en ella y generan otra secuencia observable. Además, como puede ver en nuestros ejemplos de código, incluso puede encadenar varios operadores en una secuencia de origen para ajustar la secuencia resultante a su requisito exacto.

Uso de operadores diferentes

Ya hemos usado los operadores Create y Generate en los temas anteriores para crear y devolver secuencias sencillas. También hemos usado el operador FromEventPattern para convertir los eventos de .NET existentes en secuencias observables. En este tema, usaremos otros operadores LINQ estáticos del tipo Observable para que pueda filtrar, agrupar y transformar datos. Estos operadores toman secuencias observables como entrada y generan secuencias observables como salida.

Combinación de diferentes secuencias

En esta sección, examinaremos algunos de los operadores que combinan varias secuencias observables en una sola secuencia observable. Observe que los datos no se transforman cuando combinamos secuencias.

En el ejemplo siguiente, usamos el operador Concat para combinar dos secuencias en una sola secuencia y suscribirla. Para fines ilustrativos, usaremos el operador Range(x, y) muy sencillo para crear una secuencia de enteros que comienza con x y genera números secuenciales y posteriormente.

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Concat(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Observe que la secuencia resultante es 1,2,3,1,2,3. Esto se debe a que cuando se usa el operador Concat, la segunda secuencia (source2) no estará activa hasta después de que la primera secuencia (source1) haya terminado de insertar todos sus valores. Solo es después source1 de que se haya completado y, a continuación source2 , empezará a insertar valores en la secuencia resultante. A continuación, el suscriptor obtendrá todos los valores de la secuencia resultante.

Compare esto con el operador Merge. Si ejecuta el código de ejemplo siguiente, obtendrá 1,1,2,2,3,3. Esto se debe a que las dos secuencias están activas al mismo tiempo y los valores se insertan a medida que se producen en los orígenes. La secuencia resultante solo se completa cuando la última secuencia de origen ha terminado de insertar valores.

Tenga en cuenta que para que Merge funcione, todas las secuencias observables de origen deben ser del mismo tipo de T> IObservable<. La secuencia resultante será del tipo IObservable<T>. Si source1 genera un OnError en medio de la secuencia, la secuencia resultante se completará inmediatamente.

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(1, 3);
source1.Merge(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Otra comparación se puede realizar con el operador Catch. En este caso, si source1 se completa sin ningún error, source2 no se iniciará. Por lo tanto, si ejecuta el código de ejemplo siguiente, solo obtendrá 1,2,3 , ya source2 que (que genera 4,5,6) se omite.

var source1 = Observable.Range(1, 3);
var source2 = Observable.Range(4, 3);
source1.Catch(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Por último, echemos un vistazo a OnErrorResumeNext. Este operador pasará a source2 incluso si source1 no se puede completar debido a un error. En el ejemplo siguiente, aunque source1 representa una secuencia que finaliza con una excepción (mediante el operador Throw), el suscriptor recibirá valores (1,2,3) publicados por source2. Por lo tanto, si espera que cualquiera de las secuencias de origen produzca algún error, es una apuesta más segura usar OnErrorResumeNext para garantizar que el suscriptor seguirá recibiendo algunos valores.

var source1 = Observable.Throw<int>(new Exception("An error has occurred."));
var source2 = Observable.Range(4, 3);
source1.OnErrorResumeNext(source2)
       .Subscribe(Console.WriteLine);
Console.ReadLine();

Tenga en cuenta que, para que todos estos operadores de combinación funcionen, todas las secuencias observables deben ser del mismo tipo de T.

Proyección

El operador Select puede traducir cada elemento de una secuencia observable en otro formulario.

En el ejemplo siguiente, proyectamos una secuencia de enteros en cadenas de longitud n respectivamente.

var seqNum = Observable.Range(1, 5);
var seqString = from n in seqNum
                select new string('*', (int)n);
seqString.Subscribe(str => { Console.WriteLine(str); });
Console.ReadKey();

En el ejemplo siguiente, que es una extensión del ejemplo de conversión de eventos de .NET que vimos en el tema Puente con eventos .NET existentes , usamos el operador Select para proyectar el tipo de datos IEventPattern<MouseEventArgs> en un tipo point . De este modo, estamos transformando una secuencia de eventos de movimiento del mouse en un tipo de datos que se puede analizar y manipular aún más, como se puede ver en la siguiente sección "Filtrado".

var frm = new Form();
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
                                          select evt.EventArgs.Location;
points.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);

Por último, echemos un vistazo al operador SelectMany. El operador SelectMany tiene muchas sobrecargas, una de las cuales toma un argumento de función selector. La función selectora se invoca en cada valor insertado por el observable de origen. Para cada uno de estos valores, el selector lo proyecta en una secuencia mini observable. Al final, el operador SelectMany aplana todas estas mini secuencias en una sola secuencia resultante, que luego se inserta en el suscriptor.

El observable devuelto de SelectMany publica OnCompleted después de que se hayan completado la secuencia de origen y todas las secuencias mini observables generadas por el selector. Se desencadena OnError cuando se ha producido un error en la secuencia de origen, cuando la función selector produjo una excepción o cuando se produjo un error en cualquiera de las secuencias mini observables.

En el ejemplo siguiente, primero creamos una secuencia de origen que genera un entero cada 5 segundos y decidimos tomar solo los dos primeros valores generados (mediante el operador Take). A continuación, se usa SelectMany para proyectar cada uno de estos enteros mediante otra secuencia de {100, 101, 102}. Al hacerlo, se generan dos secuencias mini observables y {100, 101, 102}{100, 101, 102}. Por último, se aplanan en una sola secuencia de enteros de {100, 101, 102, 100, 101, 102} y se insertan en el observador.

var source1 = Observable.Interval(TimeSpan.FromSeconds(5)).Take(2);
var proj = Observable.Range(100, 3);
var resultSeq = source1.SelectMany(proj);

var sub = resultSeq.Subscribe(x => Console.WriteLine("OnNext : {0}", x.ToString()),
                              ex => Console.WriteLine("Error : {0}", ex.ToString()),
                              () => Console.WriteLine("Completed"));
Console.ReadKey();

Filtros

En el ejemplo siguiente, usamos el operador Generate para crear una secuencia observable simple de números. El operador Generate tiene varias sobrecargas. En nuestro ejemplo, se toma un estado inicial (0 en nuestro ejemplo), una función condicional para finalizar (menos de 10 veces), un iterador (+1), un selector de resultados (una función cuadrada del valor actual). , e imprime solo los menores de 15 con los operadores Where y Select.

  
IObservable<int> seq = Observable.Generate(0, i => i < 10, i => i + 1, i => i * i);
IObservable<int> source = from n in seq
                          where n < 5
                          select n;
source.Subscribe(x => {Console.WriteLine(x);});   // output is 0, 1, 4, 9
Console.ReadKey();

El ejemplo siguiente es una extensión del ejemplo de proyección que ha visto anteriormente en este tema. En ese ejemplo, hemos usado el operador Select para proyectar el tipo de datos IEventPattern<MouseEventArgs> en un tipo Point . En el ejemplo siguiente, usamos el operador Where y Select para seleccionar solo los movimientos del mouse que nos interesan. En este caso, filtramos los movimientos del mouse a los del primer bisector (donde las coordenadas x e y son iguales).

var frm = new Form(); 
IObservable<EventPattern<MouseEventArgs>> move = Observable.FromEventPattern<MouseEventArgs>(frm, "MouseMove");
IObservable<System.Drawing.Point> points = from evt in move
                                          select evt.EventArgs.Location;
var overfirstbisector = from pos in points
                        where pos.X == pos.Y 
                        select pos;
var movesub = overfirstbisector.Subscribe(pos => Console.WriteLine("mouse at " + pos));
Application.Run(frm);

Operación basada en el tiempo

Puede usar los operadores de búfer para realizar operaciones basadas en tiempo.

Almacenar en búfer una secuencia observable significa que los valores de una secuencia observable se colocan en un búfer en función de un intervalo de tiempo especificado o por un umbral de recuento. Esto es especialmente útil en situaciones en las que se espera que la secuencia inserte una gran cantidad de datos y el suscriptor no tenga el recurso para procesar estos valores. Al almacenar en búfer los resultados en función del tiempo o el recuento, y devolver solo una secuencia de valores cuando se superen los criterios (o cuando se haya completado la secuencia de origen), el suscriptor puede procesar llamadas OnNext a su propio ritmo. 

En el ejemplo siguiente, primero se crea una secuencia simple de enteros para cada segundo. A continuación, usamos el operador Buffer y especificamos que cada búfer contendrá 5 elementos de la secuencia. Se llama a OnNext cuando el búfer está lleno. A continuación, se cuenta la suma del búfer mediante el operador Sum. El búfer se vacía automáticamente y comienza otro ciclo. La impresión estará 10, 35, 60… en la que 10=0+1+2+3+4, 35=5+6+7+8+9, etc.

var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(5);
bufSeq.Subscribe(values => Console.WriteLine(values.Sum()));
Console.ReadKey();

También podemos crear un búfer con un intervalo de tiempo especificado. En el ejemplo siguiente, el búfer contendrá los elementos que se han acumulado durante 3 segundos. La impresión será 3, 12, 21... en el que 3=0+1+2, 12=3+4+5, etc.

var seq = Observable.Interval(TimeSpan.FromSeconds(1));
var bufSeq = seq.Buffer(TimeSpan.FromSeconds(3));
bufSeq.Subscribe(value => Console.WriteLine(value.Sum()));  
Console.ReadKey();

Tenga en cuenta que si usa búfer o ventana, debe asegurarse de que la secuencia no está vacía antes de filtrarla.

Operadores LINQ por categorías

En el tema Operadores LINQ by Categories se enumeran todos los operadores LINQ principales implementados por el tipo Observable por sus categorías; específicamente: creación, conversión, combinación, funcional, matemática, tiempo, excepciones, varios, selección y primitivos.

Consulte también

Referencia

Observable

Conceptos

Operadores LINQ por categorías