System.IO.Pipelines в .NET

System.IO.Pipelines — это библиотека, предназначенная для упрощения работы с высокопроизводительной операцией ввода-вывода в .NET. Это библиотека, предназначенная для .NET Standard, которая работает во всех реализациях .NET.

Библиотека доступна в пакете Nuget System.IO.Pipelines .

Какие задачи решает System.IO.Pipelines

Приложения, которые анализируют потоковые данные, состоят из стандартного кода, имеющего множество специализированных и необычных потоков кода. Стандартный и специальный код сложен, и его трудно поддерживать.

Возможности System.IO.Pipelines:

  • Высокопроизводительный анализ потоковых данных.
  • Упрощение кода.

Следующий код является типичным для TCP-сервера, который получает сообщения с разделенными строками (разделитель — '\n') от клиента:

async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}

В предыдущем коде несколько проблем:

  • Не все сообщение (конец строки) может быть получено в одном вызове ReadAsync.
  • Игнорируется результат stream.ReadAsync. stream.ReadAsync возвращает объем считанных данных.
  • Он не обрабатывает случай, когда несколько строк считываются в одном вызове ReadAsync.
  • Он выделяет массив byte при каждом чтении.

Чтобы устранить предыдущие проблемы, необходимо внести следующие изменения:

  • Помещение входящих данных в буфер до тех пор, пока не будет найдена новая строка.

  • Синтаксический анализ всех строк, возвращенных в буфер.

  • Возможно, длина строки превышает 1 КБ (1024 байт). Код должен изменять размер входного буфера до тех пор, пока не будет найден разделитель, чтобы вместить всю строку в буфере.

    • Если размер буфера изменить, создаются дополнительные буферные копии, так как во входных данных отображаются более длинные строки.
    • Чтобы уменьшить объем неиспользуемого пространства, необходимо сжать буфер, используемый для чтения строк.
  • Рекомендуется использовать буферные пулы, чтобы избежать повторного выделения памяти.

  • В следующем коде решаются некоторые из этих проблем:

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}

Предыдущий код является сложным и не предназначен для устранения всех обнаруженных проблем. Высокопроизводительная сеть обычно означает написание сложного кода для повышения производительности. System.IO.Pipelines был разработан для упрощения написания этого типа кода.

Pipe

Класс Pipe можно использовать для создания пары PipeWriter/PipeReader. Все данные, записанные в PipeWriter, доступны в PipeReader:

var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;

Основное использование канала

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

     // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}

Существует два цикла:

  • FillPipeAsync считывает из Socket и выполняет запись в PipeWriter.
  • ReadPipeAsync считывает из PipeReader и анализирует входящие строки.

Буферы явно не выделяются. Управление всеми буферами делегируется реализациям PipeReader и PipeWriter. Делегирование управления буфером упрощает использование кода, чтобы сосредоточиться только на бизнес-логике.

В первом цикле:

  • PipeWriter.GetMemory(Int32) вызывается для получения памяти от базового модуля записи.
  • PipeWriter.Advance(Int32) вызывается, чтобы сообщить PipeWriter, сколько данных было записано в буфер.
  • PipeWriter.FlushAsync вызывается, чтобы сделать данные доступными для PipeReader.

Во втором цикле PipeReader использует буферы, записанные PipeWriter. Буферы поступают из сокета. Вызов PipeReader.ReadAsync:

  • Возвращает ReadResult, который содержит два важных элемента информации:

    • Данные, считанные в форме ReadOnlySequence<byte>.
    • Логическое значение IsCompleted, указывающее, достигнут ли конец данных (EOF).

После нахождения разделителя конца строки (EOL) и синтаксического анализа строки:

  • Логика обрабатывает буфер, чтобы пропустить уже обработанные данные.
  • PipeReader.AdvanceTo вызывается, чтобы сообщить PipeReader, сколько данных было обработано и проверено.

Циклы чтения и записи заканчиваются вызовом Complete. Complete позволяет базовому каналу освободить выделенную память.

Обратная реакция и управление потоком

В идеале чтение и анализ работают вместе:

  • Поток чтения потребляет данные из сети и помещает его в буферы.
  • Поток анализа отвечает за построение соответствующих структур данных.

Обычно синтаксический анализ занимает больше времени, чем копирование блоков данных из сети:

  • Поток чтения идет впереди потока синтаксического анализа.
  • Поток чтения должен либо замедлить работу, либо выделить больше памяти для хранения данных для потока синтаксического анализа.

Для оптимальной производительности необходим баланс между частыми паузами и выделением большего объема памяти.

Чтобы устранить описанную выше проблему, Pipe имеет два параметра для управления потоком данных:

  • PauseWriterThreshold: определяет, сколько данных следует буферизовать перед приостановкой вызовов FlushAsync .
  • ResumeWriterThreshold: определяет, сколько данных читатель должен наблюдать перед возобновлением вызовов PipeWriter.FlushAsync .

Схема с ResumeWriterThreshold и PauseWriterThreshold

PipeWriter.FlushAsync:

  • Возвращает неполный ValueTask<FlushResult>, если объем данных в Pipe пересекает PauseWriterThreshold.
  • Завершает ValueTask<FlushResult>, когда он становится меньше ResumeWriterThreshold.

Для предотвращения быстрого цикла, который может возникнуть при одном значении, используются два значения.

Примеры

// The Pipe will start returning incomplete tasks from FlushAsync until
// the reader examines at least 5 bytes.
var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
var pipe = new Pipe(options);

PipeScheduler

Обычно при использовании async и асинхронном коде возобновляется либо в текущем, ни awaitв TaskScheduler текущем SynchronizationContext.

При выполнении операций ввода-вывода важно иметь точный контроль над местом выполнения операций ввода-вывода. Этот контроль позволяет эффективно использовать кэш ЦП. Эффективное кэширование является критически важным для высокопроизводительных приложений, таких как веб-серверы. PipeScheduler предоставляет контроль над тем, где выполняются асинхронные обратные вызовы. По умолчанию:

  • используется текущий SynchronizationContext.
  • Если SynchronizationContext отсутствует, он использует пул потоков для выполнения обратных вызовов.
public static void Main(string[] args)
{
    var writeScheduler = new SingleThreadPipeScheduler();
    var readScheduler = new SingleThreadPipeScheduler();

    // Tell the Pipe what schedulers to use and disable the SynchronizationContext.
    var options = new PipeOptions(readerScheduler: readScheduler,
                                  writerScheduler: writeScheduler,
                                  useSynchronizationContext: false);
    var pipe = new Pipe(options);
}

// This is a sample scheduler that async callbacks on a single dedicated thread.
public class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object> Action, object State)> _queue =
     new BlockingCollection<(Action<object> Action, object State)>();
    private readonly Thread _thread;

    public SingleThreadPipeScheduler()
    {
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    private void DoWork()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            item.Action(item.State);
        }
    }

    public override void Schedule(Action<object?> action, object? state)
    {
        if (state is not null)
        {
            _queue.Add((action, state));
        }
        // else log the fact that _queue.Add was not called.
    }
}

PipeScheduler.ThreadPool — это реализация PipeScheduler, которая позволяет поместить обратные вызовы в очередь пула потоков. PipeScheduler.ThreadPool является параметром по умолчанию и, как правило, лучшим выбором. PipeScheduler.Inline может привести к непредвиденным последствиям, например к взаимоблокировкам.

Сброс канала

Зачастую бывает эффективно повторно использовать объект Pipe. Чтобы сбросить канал, вызовите PipeReader Reset при завершении PipeReader и PipeWriter.

PipeReader

PipeReader управляет памятью от имени вызывающего объекта. Всегда вызывайте PipeReader.AdvanceTo после вызова PipeReader.ReadAsync. Это позволяет PipeReader узнать, когда вызывающий объект закончил использовать память, чтобы его можно было отследить. ReadOnlySequence<byte>, возвращенный из PipeReader.ReadAsync, допустим только до вызова PipeReader.AdvanceTo. Использование ReadOnlySequence<byte> после вызова PipeReader.AdvanceTo недопустимо.

PipeReader.AdvanceTo принимает два аргумента SequencePosition:

  • Первый аргумент определяет объем используемой памяти.
  • Второй аргумент определяет, какая часть буфера наблюдалась.

Если пометить данные как потребленные, канал сможет вернуть память в базовый буферный пул. Пометка данных как отмеченных управляет тем, что делает следующий вызов PipeReader.ReadAsync. Пометка всех элементов как отмеченных означает, что следующий вызов PipeReader.ReadAsync не вернется, пока в канал не будет записано больше данных. Любое другое значение заставит следующий вызов PipeReader.ReadAsync вернуться немедленно с отмеченными и неотмеченными, но не с уже потребленными данными.

Сценарии чтения потоковых данных

Существует несколько типичных шаблонов, которые возникают при попытке чтения потоковых данных:

  • При получении потока данных анализировать одно сообщение.
  • При получении потока данных анализировать все доступные сообщения.

В следующих примерах используется метод TryParseLines для синтаксического анализа сообщений из ReadOnlySequence<byte>. TryParseLines анализирует одно сообщение и обновляет входной буфер, чтобы обрезать проанализированное сообщение из буфера. TryParseLines не является частью .NET, это пользовательский письменный метод, используемый в следующих разделах.

bool TryParseLines(ref ReadOnlySequence<byte> buffer, out Message message);

Чтение одного сообщения

Следующий код считывает одно сообщение из PipeReader и возвращает его вызывающему объекту.

async ValueTask<Message?> ReadSingleMessageAsync(PipeReader reader,
 CancellationToken cancellationToken = default)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        // In the event that no message is parsed successfully, mark consumed
        // as nothing and examined as the entire buffer.
        SequencePosition consumed = buffer.Start;
        SequencePosition examined = buffer.End;

        try
        {
            if (TryParseLines(ref buffer, out Message message))
            {
                // A single message was successfully parsed so mark the start of the
                // parsed buffer as consumed. TryParseLines trims the buffer to
                // point to the data after the message was parsed.
                consumed = buffer.Start;

                // Examined is marked the same as consumed here, so the next call
                // to ReadSingleMessageAsync will process the next message if there's
                // one.
                examined = consumed;

                return message;
            }

            // There's no more data to be processed.
            if (result.IsCompleted)
            {
                if (buffer.Length > 0)
                {
                    // The message is incomplete and there's no more data to process.
                    throw new InvalidDataException("Incomplete message.");
                }

                break;
            }
        }
        finally
        {
            reader.AdvanceTo(consumed, examined);
        }
    }

    return null;
}

Предыдущий код:

  • анализирует одно сообщение.
  • Обновляет потребленный SequencePosition и изученный SequencePosition, чтобы указать на начало обрезанного входного буфера.

Два аргумента SequencePosition обновляются, так как TryParseLines удаляет проанализированное сообщение из входного буфера. Как правило, при синтаксическом анализе одного сообщения из буфера изученное расположение должно быть одним из следующих:

  • Конец сообщения.
  • Конец полученного буфера, если сообщение не найдено.

В случае с одним сообщением риск ошибок наиболее велик. Передача неверных значений в изученное может привести к исключению нехватки памяти или бесконечному циклу. Дополнительные сведения см. в разделе Распространенные проблемы PipeReader в этой статье.

Чтение нескольких сообщений

Следующий код считывает все сообщения из PipeReader и вызывает ProcessMessageAsync для каждого из них.

async Task ProcessMessagesAsync(PipeReader reader, CancellationToken cancellationToken = default)
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // The message is incomplete and there's no more data to process.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Отмена

PipeReader.ReadAsync:

  • Поддерживает передачу CancellationToken.
  • Создает исключение OperationCanceledException, если CancellationToken отменяется при ожидании чтения.
  • Поддерживает способ отмены текущей операции чтения с помощью PipeReader.CancelPendingRead, что позволяет избежать исключения. Вызов PipeReader.CancelPendingRead приводит к тому, что текущий или следующий вызов PipeReader.ReadAsync возвращает ReadResult с IsCanceled со значением true. Это может быть полезно для остановки существующего цикла чтения без сбоев и исключений.
private PipeReader reader;

public MyConnection(PipeReader reader)
{
    this.reader = reader;
}

public void Abort()
{
    // Cancel the pending read so the process loop ends without an exception.
    reader.CancelPendingRead();
}

public async Task ProcessMessagesAsync()
{
    try
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            try
            {
                if (result.IsCanceled)
                {
                    // The read was canceled. You can quit without reading the existing data.
                    break;
                }

                // Process all messages from the buffer, modifying the input buffer on each
                // iteration.
                while (TryParseLines(ref buffer, out Message message))
                {
                    await ProcessMessageAsync(message);
                }

                // There's no more data to be processed.
                if (result.IsCompleted)
                {
                    break;
                }
            }
            finally
            {
                // Since all messages in the buffer are being processed, you can use the
                // remaining buffer's Start and End position to determine consumed and examined.
                reader.AdvanceTo(buffer.Start, buffer.End);
            }
        }
    }
    finally
    {
        await reader.CompleteAsync();
    }
}

Распространенные проблемы PipeReader

  • Передача неверных значений в consumed или examined может привести к считыванию уже считанных данных.

  • Передача buffer.End как изученного может привести к следующему:

    • Остановленные данные
    • Может возникать исключение нехватки памяти, если данные не потребляются. Например, PipeReader.AdvanceTo(position, buffer.End) при обработке одного сообщения за раз из буфера.
  • Передача неверных значений в consumed или examined может привести к бесконечному циклу. Например, PipeReader.AdvanceTo(buffer.Start), если buffer.Start не изменился, приведет к тому, что следующий вызов PipeReader.ReadAsync вернется немедленно перед поступлением новых данных.

  • Передача неверных значений в consumed или examined может привести к бесконечной буферизации (и нехватке памяти в конечном итоге).

  • Использование ReadOnlySequence<byte> после вызова PipeReader.AdvanceTo может привести к повреждению памяти (используйте после освобождения).

  • Отсутствие вызова PipeReader.Complete/CompleteAsync может привести к утечке памяти.

  • Проверка ReadResult.IsCompleted и выход из логики чтения до обработки буфера приводит к потере данных. Условие выхода цикла должно основываться на ReadResult.Buffer.IsEmpty и ReadResult.IsCompleted. Неправильное выполнение этого действия может привести к бесконечному циклу.

Проблемный код

Потеря данных

ReadResult может возвращать окончательный сегмент данных, если IsCompleted имеет значение true. Если не считать эти данные до выхода из цикла чтения, данные будут утеряны.

Предупреждение

НЕ используйте следующий код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Ниже приведен пример, поясняющий Распространенные проблемы с PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> dataLossBuffer = result.Buffer;

    if (result.IsCompleted)
        break;

    Process(ref dataLossBuffer, out Message message);

    reader.AdvanceTo(dataLossBuffer.Start, dataLossBuffer.End);
}

Предупреждение

НЕ используйте приведенный выше код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Выше приведен пример, поясняющий Распространенные проблемы с PipeReader.

Бесконечный цикл

Следующая логика может привести к бесконечному циклу, если Result.IsCompleted равно true, но в буфере никогда нет полного сообщения.

Предупреждение

НЕ используйте следующий код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Ниже приведен пример, поясняющий Распространенные проблемы с PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;
    if (result.IsCompleted && infiniteLoopBuffer.IsEmpty)
        break;

    Process(ref infiniteLoopBuffer, out Message message);

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Предупреждение

НЕ используйте приведенный выше код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Выше приведен пример, поясняющий Распространенные проблемы с PipeReader.

Вот еще один фрагмент кода с такой же проблемой. Проверяется наличие непустого буфера перед проверкой ReadResult.IsCompleted. Так как он находится в else if, цикл будет повторяться бесконечно, если в буфере никогда не будет полного сообщения.

Предупреждение

НЕ используйте следующий код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Ниже приведен пример, поясняющий Распространенные проблемы с PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> infiniteLoopBuffer = result.Buffer;

    if (!infiniteLoopBuffer.IsEmpty)
        Process(ref infiniteLoopBuffer, out Message message);

    else if (result.IsCompleted)
        break;

    reader.AdvanceTo(infiniteLoopBuffer.Start, infiniteLoopBuffer.End);
}

Предупреждение

НЕ используйте приведенный выше код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Выше приведен пример, поясняющий Распространенные проблемы с PipeReader.

Неответственное приложение

Без каких-то условий вызов PipeReader.AdvanceTo с buffer.End examined позицией может привести к тому, что приложение не отвечает при анализе одного сообщения. Следующий вызов PipeReader.AdvanceTo вернет значение только в следующих условиях:

  • В канал записано больше данных.
  • Новые данные не были проверены ранее.

Предупреждение

НЕ используйте следующий код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Ниже приведен пример, поясняющий Распространенные проблемы с PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> hangBuffer = result.Buffer;

    Process(ref hangBuffer, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(hangBuffer.Start, hangBuffer.End);

    if (message != null)
        return message;
}

Предупреждение

НЕ используйте приведенный выше код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Выше приведен пример, поясняющий Распространенные проблемы с PipeReader.

Нехватка памяти

В следующих случаях приведенный ниже код сохраняет буферизацию до тех пор, пока не произойдет OutOfMemoryException:

  • Максимальный размер сообщения не указан.
  • Данные, возвращенные из PipeReader, не составляют полное сообщение. Например, они не составляют полное сообщение, поскольку другая сторона записывает большое сообщение (например, сообщение размером 4 ГБ).

Предупреждение

НЕ используйте следующий код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Ниже приведен пример, поясняющий Распространенные проблемы с PipeReader.

Environment.FailFast("This code is terrible, don't use it!");
while (true)
{
    ReadResult result = await reader.ReadAsync(cancellationToken);
    ReadOnlySequence<byte> thisCouldOutOfMemory = result.Buffer;

    Process(ref thisCouldOutOfMemory, out Message message);

    if (result.IsCompleted)
        break;

    reader.AdvanceTo(thisCouldOutOfMemory.Start, thisCouldOutOfMemory.End);

    if (message != null)
        return message;
}

Предупреждение

НЕ используйте приведенный выше код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Выше приведен пример, поясняющий Распространенные проблемы с PipeReader.

Повреждение памяти

При написании вспомогательных методов, считывающих буфер, все возвращенные полезные данные должны быть скопированы перед вызовом Advance. В следующем примере возвращается память, которая была удалена Pipe и может использоваться повторно для следующей операции (чтение или запись).

Предупреждение

НЕ используйте следующий код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Ниже приведен пример, поясняющий Распространенные проблемы с PipeReader.

public class Message
{
    public ReadOnlySequence<byte> CorruptedPayload { get; set; }
}
    Environment.FailFast("This code is terrible, don't use it!");
    Message message = null;

    while (true)
    {
        ReadResult result = await reader.ReadAsync(cancellationToken);
        ReadOnlySequence<byte> buffer = result.Buffer;

        ReadHeader(ref buffer, out int length);

        if (length <= buffer.Length)
        {
            message = new Message
            {
                // Slice the payload from the existing buffer
                CorruptedPayload = buffer.Slice(0, length)
            };

            buffer = buffer.Slice(length);
        }

        if (result.IsCompleted)
            break;

        reader.AdvanceTo(buffer.Start, buffer.End);

        if (message != null)
        {
            // This code is broken since reader.AdvanceTo() was called with a position *after* the buffer
            // was captured.
            break;
        }
    }

    return message;
}

Предупреждение

НЕ используйте приведенный выше код. Использование этого примера приведет к утрате данных, зависанию и проблемам безопасности. Его НЕ следует копировать. Выше приведен пример, поясняющий Распространенные проблемы с PipeReader.

PipeWriter

PipeWriter управляет буферами для записи от имени вызывающего объекта. PipeWriter реализует IBufferWriter<byte>. IBufferWriter<byte> позволяет получить доступ к буферам для выполнения операций записи без дополнительных копий буфера.

async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    // Request at least 5 bytes from the PipeWriter.
    Memory<byte> memory = writer.GetMemory(5);

    // Write directly into the buffer.
    int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

    // Tell the writer how many bytes were written.
    writer.Advance(written);

    await writer.FlushAsync(cancellationToken);
}

Предыдущий код:

  • Запрашивает буфер длиной не менее 5 байт у PipeWriter с помощью GetMemory.
  • Записывает байты для строки ASCII "Hello" в возвращенный Memory<byte>.
  • Вызывает Advance, чтобы указать, сколько байтов было записано в буфер.
  • Очищает PipeWriter, который отправляет байты на базовое устройство.

Предыдущий метод записи использует буферы, предоставленные PipeWriter. Он также мог бы использовать PipeWriter.WriteAsync, что:

  • Копирует существующий буфер в PipeWriter.
  • Вызывает GetSpan, выполняет Advance надлежащим образом и вызывает FlushAsync.
async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
{
    byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

    // Write helloBytes to the writer, there's no need to call Advance here
    // (Write does that).
    await writer.WriteAsync(helloBytes, cancellationToken);
}

Отмена

FlushAsync поддерживает передачу CancellationToken. Передача CancellationToken приводит к исключению OperationCanceledException, если маркер отменяется в ожидании освобождения. PipeWriter.FlushAsync поддерживает способ отмены текущей операции освобождения с помощью PipeWriter.CancelPendingFlush, не вызывая исключение. Вызов PipeWriter.CancelPendingFlush приводит к тому, что текущий или следующий вызов PipeWriter.FlushAsync или PipeWriter.WriteAsync возвращает FlushResult с IsCanceled со значением true. Это может быть полезно для остановки освобождения без сбоев и исключений.

Распространенные проблемы PipeWriter

  • GetSpan и GetMemory возвращают буфер по крайней мере с запрошенным объемом памяти. Не рассчитывайте на точный размер буфера.
  • Нет никакой гарантии, что последовательные вызовы будут возвращать один и тот же буфер или буфер того же размера.
  • Чтобы продолжить запись дополнительных данных, необходимо запросить новый буфер после вызова Advance. Запись в ранее полученный буфер невозможна.
  • Вызов GetMemory или GetSpan при наличии незавершенного вызова FlushAsync не является надежным.
  • Вызов Complete или CompleteAsync при наличии неосвобожденных данных может привести к повреждению памяти.

Советы по использованию PipeReader и PipeWriter

Следующие советы помогут вам успешно использовать System.IO.Pipelines классы:

  • Всегда заполняйте PipeReader и PipeWriter, включая исключение, в котором применимо.
  • Всегда вызывайте PipeReader.AdvanceTo после вызова PipeReader.ReadAsync.
  • Периодически при написании await PipeWriter.FlushAsync и всегда проверяйте FlushResult.IsCompleted. Прерывание записи, если IsCompleted это trueозначает, что читатель завершен и больше не заботится о том, что пишется.
  • Вызовите PipeWriter.FlushAsync после написания чего-то, к которому вы хотите PipeReader получить доступ.
  • Не вызывайте FlushAsync , если читатель не может начать работу до FlushAsync завершения, так как это может привести к взаимоблокировке.
  • Убедитесь, что только один контекст владеет PipeReader или PipeWriter обращается к ним. Эти типы не являются потокобезопасными.
  • Никогда не обращаться к телефону ReadResult.Buffer после вызова AdvanceTo или завершения PipeReaderработы.

IDuplexPipe

IDuplexPipe является контрактом для типов, поддерживающих чтение и запись. Например, сетевое подключение будет представлено IDuplexPipe.

В отличие от Pipe, который содержит PipeReader и PipeWriter, IDuplexPipe представляет собой одну сторону полного дуплексного подключения. Это означает, что запись в PipeWriter не будет считываться из PipeReader.

Потоки

При чтении или записи потоковых данных данные обычно считываются с помощью десериализатора и записываются с помощью сериализатора. Большая часть API потока чтения и записи имеет параметр Stream. Чтобы упростить интеграцию с существующими API, PipeReader и PipeWriter предоставляют AsStream. AsStream возвращает реализацию Stream на основе PipeReader или PipeWriter.

Примеры потоков

Экземпляры PipeReader и PipeWriter могут быть созданы с помощью статических методов Create, для которых задан объект Stream и необязательные соответствующие параметры создания.

StreamPipeReaderOptions позволяют контролировать создание экземпляра PipeReader со следующими параметрами:

  • StreamPipeReaderOptions.BufferSize — минимальный размер буфера в байтах, используемый при аренде памяти из пула, и значение по умолчанию 4096.
  • Флаг StreamPipeReaderOptions.LeaveOpen определяет, остается ли базовый поток открытым после завершения PipeReader, и его значение по умолчанию — false.
  • StreamPipeReaderOptions.MinimumReadSize представляет пороговое значение оставшихся байтов в буфере до выделения нового буфера, и его значение по умолчанию — 1024.
  • StreamPipeReaderOptions.Pool — это MemoryPool<byte>, используемый при выделении памяти, а значение по умолчанию — null.

StreamPipeWriterOptions позволяют контролировать создание экземпляра PipeWriter со следующими параметрами:

  • Флаг StreamPipeWriterOptions.LeaveOpen определяет, остается ли базовый поток открытым после завершения PipeWriter, и его значение по умолчанию — false.
  • StreamPipeWriterOptions.MinimumBufferSize представляет минимальный размер буфера, используемый при аренде памяти из Pool, и его значение по умолчанию — 4096.
  • StreamPipeWriterOptions.Pool — это MemoryPool<byte>, используемый при выделении памяти, а значение по умолчанию — null.

Внимание

При создании экземпляров PipeReader и PipeWriter с помощью методов Create необходимо учитывать время существования объекта Stream. Если требуется доступ к потоку после того, как модуль чтения или записи завершит работу, необходимо установить для флага LeaveOpen значение true для параметров создания. В противном случае поток будет закрыт.

В следующем коде показано создание экземпляров PipeReader и PipeWriter с помощью методов Create из потока.

using System.Buffers;
using System.IO.Pipelines;
using System.Text;

class Program
{
    static async Task Main()
    {
        using var stream = File.OpenRead("lorem-ipsum.txt");

        var reader = PipeReader.Create(stream);
        var writer = PipeWriter.Create(
            Console.OpenStandardOutput(), 
            new StreamPipeWriterOptions(leaveOpen: true));

        WriteUserCancellationPrompt();

        var processMessagesTask = ProcessMessagesAsync(reader, writer);
        var userCanceled = false;
        var cancelProcessingTask = Task.Run(() =>
        {
            while (char.ToUpperInvariant(Console.ReadKey().KeyChar) != 'C')
            {
                WriteUserCancellationPrompt();
            }

            userCanceled = true;

            // No exceptions thrown
            reader.CancelPendingRead();
            writer.CancelPendingFlush();
        });

        await Task.WhenAny(cancelProcessingTask, processMessagesTask);

        Console.WriteLine(
            $"\n\nProcessing {(userCanceled ? "cancelled" : "completed")}.\n");
    }

    static void WriteUserCancellationPrompt() =>
        Console.WriteLine("Press 'C' to cancel processing...\n");

    static async Task ProcessMessagesAsync(
        PipeReader reader,
        PipeWriter writer)
    {
        try
        {
            while (true)
            {
                ReadResult readResult = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = readResult.Buffer;

                try
                {
                    if (readResult.IsCanceled)
                    {
                        break;
                    }

                    if (TryParseLines(ref buffer, out string message))
                    {
                        FlushResult flushResult =
                            await WriteMessagesAsync(writer, message);

                        if (flushResult.IsCanceled || flushResult.IsCompleted)
                        {
                            break;
                        }
                    }

                    if (readResult.IsCompleted)
                    {
                        if (!buffer.IsEmpty)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        catch (Exception ex)
        {
            Console.Error.WriteLine(ex);
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }

    static bool TryParseLines(
        ref ReadOnlySequence<byte> buffer,
        out string message)
    {
        SequencePosition? position;
        StringBuilder outputMessage = new();

        while(true)
        {
            position = buffer.PositionOf((byte)'\n');

            if (!position.HasValue)
                break;

            outputMessage.Append(Encoding.ASCII.GetString(buffer.Slice(buffer.Start, position.Value)))
                        .AppendLine();

            buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        };

        message = outputMessage.ToString();
        return message.Length != 0;
    }

    static ValueTask<FlushResult> WriteMessagesAsync(
        PipeWriter writer,
        string message) =>
        writer.WriteAsync(Encoding.ASCII.GetBytes(message));
}

Приложение использует StreamReader для чтения файла lorem-ipsum.txt в виде потока, и оно должно заканчиваться пустой строкой. FileStream передается в PipeReader.Create, который создает экземпляр объекта PipeReader. Затем консольное приложение передает стандартный выходной поток в PipeWriter.Create с помощью Console.OpenStandardOutput(). Пример поддерживает отмену.