C#中使用Channel实现生产者-消费者通信
in C# with 0 comment

在 C# 中,Channel 是 .NET Core 3.0+ 引入的一个高性能、线程安全的异步消息队列机制,用于在生产者和消费者之间传递数据,特别适合用于异步任务之间的通信与协作。

它类似于 Go 语言中的 channel 或者 Java 中的 BlockingQueue,但专为 async/await 设计,支持背压(backpressure)、缓冲、无界/有界队列等特性。

引入命名空间

using System.Threading.Channels;

需要 .NET Core 3.0+ 或 .NET 5/6/7/8+,或安装 NuGet 包 System.Threading.Channels

创建 Channel

Channel 有两种主要类型:

无界 Channel(Unbounded):不限制队列大小(类似 Queue
有界 Channel(Bounded):限制最大容量,支持阻塞或丢弃策略

创建无界 Channel

var channel = Channel.CreateUnbounded<int>();

创建有界 Channel(容量为 10)

var channel = Channel.CreateBounded<int>(10);

有界 Channel 在满时,写入操作可以等待(默认)或丢弃旧/新数据,通过 BoundedChannelOptions 配置:

var options = new BoundedChannelOptions(10)
{
    FullMode = BoundedChannelFullMode.Wait // 默认:写入者等待
    // FullMode = BoundedChannelFullMode.DropOldest
    // FullMode = BoundedChannelFullMode.DropNewest
};
var channel = Channel.CreateBounded<int>(options);

消费者 - 读取数据(Reader)

使用 Channel.Reader.ReadAsync() 异步读取:

async Task Consumer(ChannelReader<int> reader, CancellationToken token = default)
{
    await foreach (var item in reader.ReadAllAsync(token))
    {
        Console.WriteLine($"Consumed: {item}");
        await Task.Delay(250); // 模拟处理时间
    }
    Console.WriteLine("消费完成。");
}

ReadAllAsync() 是推荐方式,它会一直读取直到 Writer.Complete() 被调用。

也可以手动循环:

while (await reader.WaitToReadAsync(token))
{
    while (reader.TryRead(out var item))
    {
        Console.WriteLine($"Consumed: {item}");
    }
}

完整示例:生产者-消费者通信

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

class Program
{
    static async Task Main()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(5)
        {
            FullMode = BoundedChannelFullMode.Wait
        });

        var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); // 10秒后自动取消

        // 启动生产者和消费者
        var producerTask = Producer(channel.Writer, cts.Token);
        var consumerTask = Consumer(channel.Reader, cts.Token);

        await Task.WhenAll(producerTask, consumerTask);

        Console.WriteLine("程序结束。");
    }

    static async Task Producer(ChannelWriter<int> writer, CancellationToken token)
    {
        try
        {
            for (int i = 0; i < 20; i++)
            {
                await writer.WriteAsync(i, token);
                Console.WriteLine($"Produced: {i}");
                await Task.Delay(300, token);
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("生产者被取消。");
        }
        finally
        {
            writer.Complete(); // 重要:通知消费者没有更多数据了
        }
    }

    static async Task Consumer(ChannelReader<int> reader, CancellationToken token)
    {
        try
        {
            await foreach (var item in reader.ReadAllAsync(token))
            {
                Console.WriteLine($"Consumed: {item}");
                await Task.Delay(500, token); // 模拟耗时处理
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("消费者被取消。");
        }
    }
}

典型应用场景

场景说明
后台任务处理生产者生成任务,消费者异步处理(如日志、邮件、图像处理)
限流/背压控制有界 Channel 可防止生产者压垮消费者
多生产者/多消费者Channel 是线程安全的,支持多个并发写入/读取
管道式处理(Pipeline)多个 Channel 串联,形成数据处理流水线

多生产者 + 多消费者示例

var channel = Channel.CreateUnbounded<string>();

// 启动多个生产者
var producers = Enumerable.Range(1, 3).Select(i =>
    Task.Run(() => Producer(channel.Writer, i, cts.Token)));

// 启动多个消费者
var consumers = Enumerable.Range(1, 2).Select(i =>
    Task.Run(() => Consumer(channel.Reader, i, cts.Token)));

await Task.WhenAll(producers.Concat(consumers));

注意事项

小技巧:转换为 IAsyncEnumerable
IAsyncEnumerable<T> GetData(ChannelReader<T> reader)
    => reader.ReadAllAsync();

可直接用于 await foreach 或 LINQ(配合 System.Linq.Async)。

我的自己的简单代码

using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<Message>();

var sender = SendMessageAsync(channel.Writer);
var receive = ReceiveMessageAsync(channel.Reader);

Console.ReadKey();

channel.Writer.Complete();

await sender;
await receive;

static async Task SendMessageAsync(ChannelWriter<Message> channelWriter)
{
    try
    {
        for (global::System.Int32 i = 0; i < 20; i++)
        {
            await channelWriter.WriteAsync(new(i));
            await Task.Delay(400);
        }
    }
    catch (ChannelClosedException)
    {
        Console.WriteLine("发送消息器关闭");
    }
}

static async Task ReceiveMessageAsync(ChannelReader<Message> channelReader)
{
    //try
    //{
    //    while (true)
    //    {
    //        var message = await channelReader.ReadAsync();
    //        Console.WriteLine(message);
    //    }
    //}
    //catch (ChannelClosedException)
    //{
    //    Console.WriteLine("接收消息器关闭");
    //}
    // 简单版本
    await foreach (var message in channelReader.ReadAllAsync()) {
        Console.WriteLine(message);
    }
}


public record Message(int Id);

参考链接