Introduction to System.Threading.Channels

Problems of producer/consumer are all around us, in every sphere of our lives. A fast food line cook cuts tomatoes and then passes them to another cook to make a hamburger. The register worker will fulfill your order. You happily eat the burger. You may see postal drivers delivering mail along their routes. If you’re home at night, you might check your mailbox later to make sure. A flight attendant unloads suitcases from a plane’s cargo hold and places them on a conveyor belt. Then another employee transfers them to a van, which then drives them to another conveyor that will transport them to you. A happy couple is getting ready to send their invites. One partner addresses an envelope, and the other hand it to the other.

Software developers often see everyday happenings in our software. “Producer/consumer” problems are no exception. Anyone who has piped together commands from a command line has used producer/consumer. The stdout of one program is fed as the stdin. Anybody who has launched multiple workers to calculate discrete values, or download data from multiple sources has used producer/consumer. A consumer aggregates the results and displays them for further processing. Anybody who has tried to parallelize an entire pipeline has explicitly used producer/consumer. So on.

These scenarios, in real life or in software, all have one thing in common: they all use a vehicle to transfer the results from the producer back to the consumer. Fast food worker places the burgers on a stand, which the register worker pulls out to fill the customer’s bag. The postal worker puts mail in a mailbox. To transfer the materials between the engaged couple, the postal worker places mail into a mailbox. A hand-off in software requires some data structure to facilitate the transaction. This can be used by the producer to transfer results and possibly buffer more. It also allows the consumer to be notified when one or more results have been made available. Enter System.Threading.Channels.

What is a Channel?

Sometimes, it is easier to grasp technology when I implement it myself. This allows me to learn about the problems that implementers of the technology might have to face, the trade-offs they had to make, as well as the best way to use the functionality. To that end, let’s start learning about System.Threading.Channels by implementing a “channel” from scratch.

A channel is simply a data format that stores data that consumers can retrieve. It also allows for safe synchronization and appropriate notifications in both directions. There are many design options available. Is it possible for a channel to store an unlimited number of items? What should you do if it is full? Performance is critical. Is it possible to reduce synchronization? Are there any assumptions that allow us to make about the number of consumers and producers allowed simultaneously? To quickly create a channel, we will assume that there is no need to set any specific limit and that overheads are not an issue. A simple API will also be created.

We need to know our type first. To that we will add some simple methods

Public sealed class Channel
{ 
  public Void Write(T value); 
  public ValuTask ReadAsync(CancellationToken cancellationToken = default); 
}

Our Write Method gives us a way to create data into the channel using a method that we can use. ReadAsync Method gives us a way to consume it. Because our channel is unlimited, data will be produced into it successfully and synchronously. This is just like calling. Add On a Liste We have made it non-asynchronous, void-returning and therefore, Our method of consuming is, however, ReadAsyncThis is because data may not be available at the moment we need it. We’ll have to wait until it arrives if there is no data available. While we don’t care about performance in the initial design, we do not want excessive overheads. Our expectations are that we will be reading often, and to read data when it is available, so our overheads won’t be excessive. ReadAsync Method returns a ValueTask Instead of a Task So that it can be made allocation-free once it is completed synchronously.

We now need to implement the two methods. We’ll start by adding two fields to our type. One to store the product and one to coordinate between producers and consumers.

Private readonly ConcurrentQueue _queue = new ConcurrentQueue(); 
Private Read Only SemaphoreSlim = New SemaphoreSlim(0)

We use a ConcurrentQueue The data can be stored, which eliminates the need to lock the buffering data structure. ConcurrentQueue It is thread-safe enough for all producers to have simultaneous access as well as any number of consumers. We use a SempahoreSlim To coordinate between consumers and producers, and to notify consumers who might be waiting for additional information to arrive.

The Write procedure is very simple. It simply needs to “release” the SemaphoreSlim data and store it in the queue.

public void Write(T value) _queue.Enqueue(value); 
// store the data _semaphore.Release(); 
// notify any consumers that more data is available

Our ReadAsync process is nearly as easy. It will wait for the data to become available, then it will take it out.

ValueTask public async ReadAsync(CancellationToken cancellationToken = default) await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false); 
// wait bool gotOne = _queue.TryDequeue(out T item); 
// retrieve the data Debug.Assert(gotOne); return item;

We note that no code can manipulate the queue or the semaphore. Once we have successfully waited on it, we know that the queue will have data. This is why we can simply assert that the TryDequeue method returned one. These assumptions could change, so this implementation would have to be more complex.

That’s all. We now have our basic channel. This implementation works well if you only need the core features described here. However, there are more important requirements, in terms of performance as well as APIs that enable more scenarios.

Now that we understand the basics of what a channel provides, we can switch to looking at the actual System.Threading.Channel APIs.

Introducing System.Threading.Channels

The core abstractions exposed from the System.Threading.Channels library are a writer:

ChannelWriter public abstract class
{
  public abstract valueTask: WriteAsync (T item, CancellationToken cancellationToken = default) 
  WaitToWriteAsync(CancellationToken cancellationToken = default); 
  public void Complete(Exception error); 
  public virtual bool TryComplete(Exception error); 
}

A reader

ChannelReader public abstract class
{ 
  public abstract Bool TryRead(outT item); 
  public virtual ValueTask ReadAsync(CancellationToken cancellationToken = default) 
  public abstract ValueTask WaitToReadAsync(CancellationToken cancellationToken = default); 
  public virtual IAsyncEnumerable ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default); 
  public virtual Task Completion get; 
}

We are familiar with most of the API surface area, having just completed our channel design and implementation. ChannelWriter Provides a TryWrite This method is very similar to the Write method, but it’s abstracted and a Try method that returns an answer. Boolean To account for the possibility that some implementations might be limited in the number of items they can store and if there was a channel full enough that writing could not occur simultaneously, TryWrite Would need to return False To indicate that writing was unsuccessful. However, ChannelWriter Also, the WriteAsync Method; In such cases where the channel is full, writing would be required to wait (often called “back pressure”). WriteAsync Can be used with the producer waiting for the result WriteAsync You will only be allowed to continue if there is enough room.

There are times when code might not want to immediately produce a result. For example, if the value is too expensive or if it represents a costly resource (e.g. a large object that takes up much of the memory or has many open files), the producer may delay producing the value until it is certain it will succeed immediately. WaitToWriteAsync is available for this and other situations. Producers can wait for WaitToWriteAsync to return true and then decide to produce a value that TryWrites, or WriteAsyncs for the channel.

WriteAsync can be virtual. While some implementations might choose to offer a more optimized implementation than others, with abstract TryWrite or WaitToWriteAsync the base type can offer a reasonable implementation that is slightly less complex than these:

public async ValueTask WriteAsync(T item, CancellationToken cancellationToken) while (await WaitToWriteAsync(cancellationToken).ConfigureAwait(false)) if (TryWrite(item)) return; 
throw new ChannelCompletedException();

It also shows how WaitToWriteAsync And TryWrite This highlights some additional interesting facts. The while loop is used because channels can be used simultaneously by both producers and consumers. Two threads can be told via the while loop “yes, there is space”, if a channel has a limit on the number of items it can store. WaitToWriteAsync However, it is possible for one of them to lose the race condition and to have TryWrite Return False This is why it is necessary to keep trying again and again. This is another example of why WaitToWriteAsync Returns a ValueTask Instead of only ValueTask These include situations that exceed a buffer. TryWrite Could be back False. The notion of completion is supported by channels. A producer can signal to consumers that no more items will be produced. This allows them to gracefully end their attempts to consume. This is accomplished via the Complete Or TryComplete Methods that were previously demonstrated ChannelWriter ( Complete It is only implemented to call TryComplete Throw it if it comes back False ). However, if one producer considers the channel complete, all producers should be aware that they are not welcome to post into the channel. TryWrite Returns False, WaitToWriteAsync Also, returns False And WriteAsync throws a ChannelCompletedException.

The majority of members are ChannelReader They are also likely to be self-explanatory. TryRead It will attempt to extract the next element from each channel synchronously, and return whether it succeeded or not. ReadAsync It will also extract the next element in the channel. However, if the element is not synchronized, it will return a task. And WaitToReadAsync Returns a ValueTask This serves as an indicator that the element is available for consumption. Similar to ChannelWriter ‘s WriteAsync, ReadAsync Virtual, the base implementation is possible in terms of the abstract TryRead And WaitToReadAsync This is not the exact implementation of the base class, but it is close.

ValueTask public async ReadAsync(CancellationToken cancellationToken) 
while (true) if (!await WaitToReadAsync(cancellationToken).ConfigureAwait(false)) throw new ChannelClosedException(); 
if (TryRead(out T item)) return item;

There are many ways to consume food from one source. ChannelReader. One way to view a channel as an endless stream of value is to simply consume via infinity. ReadAsync :

while (true) T item = await channelReader.ReadAsync(); Use(item); 

However, the stream of values may not be infinite, and the channel will be marked complete at some point. After consumers have cleared the channel of all data, subsequent attempts to ReadAsync will throw. TryRead and WaitToReadAsync will return false. A nested loop is a common way to consume alcohol.

while (await channelReader.WaitToReadAsync()) 
while (channelReader.TryRead(out T item)) Use(item);

Although the inner “while” could be a simple “if”, the tight inner loop allows a cost-conscious developer avoid small overheads. WaitToReadAsync If an item is available, TryRead will consume it. This is actually the exact method used by the ReadAllAsync method. ReadAllAsync .NET Core 3.0 introduced this feature, and it returns an IAsyncEnumerable. It allows all data to be read via a channel that uses familiar language constructs.

await foreach (T item in channelReader.ReadAllAsync()) Use(item);

The base implementation of the virtual method uses the same pattern nested loop pattern as the WaitToReadAsync or TryRead.

IAsyncEnumerable public virtual async ReadAllAsync( [EnumeratorCancellation] CancellationToken cancellationToken = default) 
while (await WaitToReadAsync(cancellationToken).ConfigureAwait(false)) 
while (TryRead(out T item)) yield return item;

The last member of ChannelReader Is Completion. This just returns a Task This will make the channel complete once the channel reader has been completed.

Built-In Channel Implementations

Okay, we’re able to read from readers and write to them… but where can we find those readers and writers?

The Channel Type exposes a Writer property, and a Reader property which returns a ChannelWriter And a ChannelReader, respectively:

public abstract class Channel
{ 
  public ChannelReaderReader get; 
  public ChannelWriter Writer get; 
}

This base abstract class can be used for niche use cases, where a channel may transform written data into another type for consumption. However, the vast majority of use cases have TWrite as well as TRead being identical. That is why most use occurs via the derived Channel type which is basically:

public abstract class ChannelChannel{ }

This non-generic channel type allows for factories to be used in multiple implementations. Channel:

{
  public static class Channel Public static Class Channel public Static ChannelCreateUnbounded(); 
  public static ChannelCreateUnbounded(UnboundedChannelOptions options); 
  public static Channel CreateBounded(int capacity); 
  public stat Channel CreateBounded(BoundedChannelOptions options); 
}

The CreateUnbounded Method creates a channel that allows for unlimited storage. However, it is possible to store more than one item at a time. Liste It is very similar to the Channel-like type we used at the beginning. It’s TryWrite Will always return True Both it’s WriteAsync Its. WaitToWriteAsync Will always be completed synchronously.

The CreateBounded method, on the other hand, creates a channel with a limit that is explicitly maintained by the implementation. Just like CreateUnbounded before reaching this limit, TryWrite will return true, and WriteAsync or WaitToWriteAsync both will finish synchronously. However, TryWrite won’t return falseWriteAsync or WaitToWriteAsync both will finish asynchronously.

They will only complete their tasks when there is enough space, or if another producer signals that the channel has finished. It should be noted that these APIs accept a CancellationToken and can be interrupted by cancellation.

Both CreateUnbounded and CreateBounded have overloads that accept a ChannelOptions-derived type. The base channelOptions allows for options to control the behavior of any channel. It exposes SingleWriter, SingleReader, and other properties that allow creators to specify constraints they are willing to accept. A creator can set SingleWriter to true so that only one producer has access to the writer, and SingleReader to true so that only one consumer has access to the reader at any given time. Factory methods can then specialize the implementation created by the creator, optimizing it according to the provided options.

For example, CreateUnbounded sets SingleWriter to true to indicate that only one producer will be accessing the writer at a time, and singleReader to true to indicate that only one consumer will have access to the reader at a time. This implementation not only avoids locks while reading but also avoids interlocked operations during reading, greatly reducing overheads The base ChannelOptions also exposes an AllowSynchronousContinuations property. This property is similar to SingleReader or SingleWriter.

A creator can set it to true to get some optimizations. These optimizations have significant implications on how code is produced and consumed. Specifically, AllowSynchronousContinuations in a sense allows a producer to temporarily become a consumer.

Let’s suppose there is no data in a channel. A consumer calls ReadAsync. The consumer hooks up a callback that will be invoked when data has been written to the channel by waiting for the task to be returned from . This callback will by default be invoked asynchronously. The producer writes the data to the channel, then queues the invocation of the callback. This allows the producer and consumer to simultaneously go about their business while the consumer is being processed by another thread. In some cases, however, it might be beneficial for performance for the producer writing the data and then processing the callback itself, e.g.

It invokes the callback by itself, rather than TryWrite waiting for the invocation. This can significantly cut down on overheads, but also requires great understanding of the environment, as, for example, if you were holdling a lock while calling TryWrite, with AllowSynchronousContinuations set to true, you might end up invoking the callback while holding your lock, which (depending on what the callback tried to do) could end up observing some broken invariants your lock was trying to maintain.

The BoundedChannelOptions passed to CreateBounded layers on additional options specific to bounding. In addition to the maximum capacity supported by the channel, it also exposes a BoundedChannelFullMode enum that indicates the behavior writes should experience when the channel is full:

public enum BoundedChannelFullMode  Wait, DropNewest, DropOldest, DropWrite

Wait is the default mode. This has the semantics discussed above: TryWrite on full channels returns false. WriteAsync returns a task that can only be completed when there is enough space available. WaitToWriteAsync also returns a task that can only be completed when space becomes available. Instead, the other three modes allow writes to complete synchronously and drop an element if there is not enough space. DropOldest will remove an “oldest” item from the queue (wall-clock time), meaning that the next element to be dequeued would be removed by a consumer. DropNewest on the other hand will remove the newest item. This is the element that was written most recently to the channel. DropWrite drops any item that is currently being written. TryWrite will return true, but the item will be immediately removed.

Performance

This is the API perspective. The library’s abstractions are very simple which is a big part of its power. A few simple implementations and abstracts should be sufficient to meet 99.9% of developers’ use cases. Although the library’s surface might seem simple, it is actually quite complex in its implementation. The implementation is complex, with a lot of focus on high throughput and simple consumption patterns. For example, the implementation takes great care to minimize allocations. Many of the surface area methods have a return. ValueTask And ValueTask Instead of Task And Task. We can use, as we have seen in the trivial example at the beginning of this article. ValueTask to avoid allocations when methods complete synchronously, but the System.Threading.Channels implementation also takes advantage of the advanced IValueTaskSource And IValueTaskSource Interfaces can be used to prevent allocations, even if the different methods are completed synchronously and return tasks.

This is a benchmark:

using BenchmarkDotNet.Attributes; 
using BenchmarkDotNet.Running; 
using System.Threading.Channels; 
using System.Threading.Tasks; [MemoryDiagnoser] 
public class Program 
{ 
  static void Main() => BenchmarkRunner.Run(); 
  private readonly Channel s_channel = Channel.CreateUnbounded
  {(); 
    [Benchmark] public async Task WriteThenRead() (); 
    [Benchmark] public Async Task WriteThenRead() ChannelWriterwriter = s_channel.Writer reader = s_channel.Reader; 
    for (int i = 0; i < 10_000_000; i++) writer.TryWrite(i); 
    await reader.ReadAsync(); 
   } 
}

This is a test of the throughput and memory allocation for an unbounded channel. When writing an element, and then reading it out 10 million times, this means that an element will always have the ability to be read. The following results were obtained on my machine. (The 72 bytes in the Allocated column are for WriteThenRead’s single Task).

System.Threading.Task

Let’s make it a little more simple: first issue the read, then write the element that will fulfill it. This will ensure that reads are completed asynchronously, since the data required to complete them is not available.

Using BenchmarkDotNet.Attributes; 
using BenchmarkDotNet.Running; 
using System.Threading.Channels; 
using System.Threading.Tasks; [MemoryDiagnoser] p
ublic class Program 
{ 
  static void Main() => BenchmarkRunner.Run(); 
  private readonly Channel s_channel = Channel.CreateUnbounded
  {(); [Benchmark] 
    public async Task ReadThenWrite() (); [Benchmark] 
    public Async Task ReadThenWrite() ChannelWriterwriter = s_channel.Writer reader = s_channel.Reader; 
    for (int I = 0; I < 10_000_000; i++) 
    { 
      ValueTask vt = reader.ReadAsync(); 
      writer.TryWrite(i); await vt; 
    }
  } 
}

This is what I got after running it for 10,000,000 writes and readings.

ReadThenWrite

So, there’s some more overhead when every read completes asynchronously, but even here we see zero allocations for the 10 million asynchronously-completing reads (again, the 72 bytes shown in the Allocated column is for the Task returned from ReadThenWrite)!

Combinators

The majority of channels can be consumed using one of these methods. However, it is possible to execute different types of operations across channels in order to achieve a specific goal. Let’s take, for example, that I want to wait until the first element arrives from one of two readers. I could write this:

Public static async ValueTask> WhenAnyChannelReaderChannelReader reader1 reader2) 
{
  var cts = new CancellationTokenSource(); 
  Task t1 = reader1.WaitToReadAsync(cts.Token).AsTask(); 
  Task t2 = reader2.WaitToReadAsync(cts.Token).AsTask(); 
  TaskCompleted = Await Task.WhenAny(t1,t2); cts.Cancel(). Return Complete == t1 reader1 : reader2; 
}

We’re only calling here WaitToReadAsync Both channels and return the reader for the one that is completed first. This example has one interesting thing to notice. ChannelReader Many similarities exist between them. IEnumerator This example is not a good idea. IEnumerator (or IAsyncEnumerator ). IAsyncEnumerator Exposes a MoveNextAsync Method, which moves your cursor to the next item. This is where you can see it. Current. We could implement this if we tried. WhenAny On top of IAsyncEnumerator Invoke the following: MoveNextAsync Each. We could move each item ahead by doing this. We could end up with missing items if we used the looped method, as we might have advanced an enumerator we didn’t return to.

Relationship to the rest.NET Core

System.Threading.Channels is part of the .NET Core shared framework, meaning a .NET Core app can start using it without installing anything additional. You can also download it as a NuGet package. However, the separate implementation does not have the same optimizations as the built-in one. This is due to the fact that the built-in implementation can take advantage of additional library and runtime support in.NET Core.

It is also used in a number of other systems within.NET. ASP.NET, for example, uses channels in SignalR and its Libuv-based Kestrel transportation. The upcoming QUIC implementation for.NET 5 will also use channels.

If you squint, the System.Threading.Channels library also looks a bit similar to the System.Threading.Tasks.Dataflow library that’s been available with .NET for years. The dataflow library can be thought of as a superset or the channels library. In particular, it includes the BufferBlock The dataflow library provides much of the same functionality as type. The dataflow library is focused on a different programming paradigm. It links blocks together so that data flows from one to another. Advanced functionality is also included, such as a two-phase commit that allows multiple blocks to be linked to the same consumer and consumers to be able to atomically withdraw from multiple blocks without deadlocking. These mechanisms are more complex and more costly, but they are more powerful. You can see this by simply writing the same benchmark. BufferBlock As we did for Channels.

using BenchmarkDotNet.Attributes; 
using BenchmarkDotNet.Running; 
using System.Threading.Channels; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; [MemoryDiagnoser] 
public class Program 
{ 
  static void Main() => BenchmarkRunner.Run(); 
  private readonly Channel _channel = Channel.CreateUnbounded(); 
  private, readonly BufferBlock _bufferBlock = new BufferBlock(); [Benchmark] 
  public async Task Channel_ReadThenWrite() 
  { 
    ChannelWriterWriter = _channel.Writer. ChannelReader reader = _channel.Reader; 
    for (int i = 0; i < 10_000_000; i++) 
    { 
      ValueTask vt = reader.ReadAsync(); 
      writer.TryWrite(i); await vt; 
    } 
  } [Benchmark] public async Task BufferBlock_ReadThenWrite() 
    { 
      for (int i = 0; i < 10_000_000; i++) 
      { 
        Task t = _bufferBlock.ReceiveAsync(); 
        _bufferBlock.Post(i); await t; 
      } 
   }
}

System.Threading.Tasks.Dataflow

This is in no way meant to suggest that the System.Threading.Tasks.Dataflow library shouldn’t be used. It allows developers to communicate succinctly large numbers of concepts and can show very high performance when used to solve the best problems. However, when all one needs is a hand-off data structure between one or more producers and one or more consumers you’ve manually implemented, System.Threading.Channels is a much simpler, leaner bet.

At this point, hopefully, you have a better grasp of the System.Threading.Channels enough channels in the library to see how they might fit in and enhance your applications.