在 C# 中,Channel
它类似于 Go 语言中的 channel 或者 Java 中的 BlockingQueue,但专为 async/await 设计,支持背压(backpressure)、缓冲、无界/有界队列等特性。
引入命名空间
using System.Threading.Channels;
需要 .NET Core 3.0+ 或 .NET 5/6/7/8+,或安装 NuGet 包 System.Threading.Channels
。
创建 Channel
Channel 有两种主要类型:
无界 Channel(Unbounded):不限制队列大小(类似 Queue
有界 Channel(Bounded):限制最大容量,支持阻塞或丢弃策略
创建无界 Channel
var channel = Channel.CreateUnbounded<int>();
创建有界 Channel(容量为 10)
var channel = Channel.CreateBounded<int>(10);
有界 Channel 在满时,写入操作可以等待(默认)或丢弃旧/新数据,通过 BoundedChannelOptions 配置:
var options = new BoundedChannelOptions(10)
{
FullMode = BoundedChannelFullMode.Wait // 默认:写入者等待
// FullMode = BoundedChannelFullMode.DropOldest
// FullMode = BoundedChannelFullMode.DropNewest
};
var channel = Channel.CreateBounded<int>(options);
消费者 - 读取数据(Reader)
使用 Channel.Reader.ReadAsync() 异步读取:
async Task Consumer(ChannelReader<int> reader, CancellationToken token = default)
{
await foreach (var item in reader.ReadAllAsync(token))
{
Console.WriteLine($"Consumed: {item}");
await Task.Delay(250); // 模拟处理时间
}
Console.WriteLine("消费完成。");
}
ReadAllAsync() 是推荐方式,它会一直读取直到 Writer.Complete() 被调用。
也可以手动循环:
while (await reader.WaitToReadAsync(token))
{
while (reader.TryRead(out var item))
{
Console.WriteLine($"Consumed: {item}");
}
}
完整示例:生产者-消费者通信
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
class Program
{
static async Task Main()
{
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(5)
{
FullMode = BoundedChannelFullMode.Wait
});
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); // 10秒后自动取消
// 启动生产者和消费者
var producerTask = Producer(channel.Writer, cts.Token);
var consumerTask = Consumer(channel.Reader, cts.Token);
await Task.WhenAll(producerTask, consumerTask);
Console.WriteLine("程序结束。");
}
static async Task Producer(ChannelWriter<int> writer, CancellationToken token)
{
try
{
for (int i = 0; i < 20; i++)
{
await writer.WriteAsync(i, token);
Console.WriteLine($"Produced: {i}");
await Task.Delay(300, token);
}
}
catch (OperationCanceledException)
{
Console.WriteLine("生产者被取消。");
}
finally
{
writer.Complete(); // 重要:通知消费者没有更多数据了
}
}
static async Task Consumer(ChannelReader<int> reader, CancellationToken token)
{
try
{
await foreach (var item in reader.ReadAllAsync(token))
{
Console.WriteLine($"Consumed: {item}");
await Task.Delay(500, token); // 模拟耗时处理
}
}
catch (OperationCanceledException)
{
Console.WriteLine("消费者被取消。");
}
}
}
典型应用场景
场景 | 说明 |
---|---|
后台任务处理 | 生产者生成任务,消费者异步处理(如日志、邮件、图像处理) |
限流/背压控制 | 有界 Channel 可防止生产者压垮消费者 |
多生产者/多消费者 | Channel 是线程安全的,支持多个并发写入/读取 |
管道式处理(Pipeline) | 多个 Channel 串联,形成数据处理流水线 |
多生产者 + 多消费者示例
var channel = Channel.CreateUnbounded<string>();
// 启动多个生产者
var producers = Enumerable.Range(1, 3).Select(i =>
Task.Run(() => Producer(channel.Writer, i, cts.Token)));
// 启动多个消费者
var consumers = Enumerable.Range(1, 2).Select(i =>
Task.Run(() => Consumer(channel.Reader, i, cts.Token)));
await Task.WhenAll(producers.Concat(consumers));
注意事项
- Writer.Complete() 是优雅关闭的关键 —— 消费者会读完剩余数据后退出。
- 使用 CancellationToken 可随时取消生产/消费。
- 有界 Channel 的 FullMode 要根据业务选择(等待、丢弃等)。
- Channel 是线程安全的,无需额外加锁。
- 使用 ReadAllAsync() 是最简洁安全的消费方式。
小技巧:转换为 IAsyncEnumerable
IAsyncEnumerable<T> GetData(ChannelReader<T> reader)
=> reader.ReadAllAsync();
可直接用于 await foreach 或 LINQ(配合 System.Linq.Async)。
我的自己的简单代码
using System.Threading.Channels;
using System.Threading.Tasks;
var channel = Channel.CreateUnbounded<Message>();
var sender = SendMessageAsync(channel.Writer);
var receive = ReceiveMessageAsync(channel.Reader);
Console.ReadKey();
channel.Writer.Complete();
await sender;
await receive;
static async Task SendMessageAsync(ChannelWriter<Message> channelWriter)
{
try
{
for (global::System.Int32 i = 0; i < 20; i++)
{
await channelWriter.WriteAsync(new(i));
await Task.Delay(400);
}
}
catch (ChannelClosedException)
{
Console.WriteLine("发送消息器关闭");
}
}
static async Task ReceiveMessageAsync(ChannelReader<Message> channelReader)
{
//try
//{
// while (true)
// {
// var message = await channelReader.ReadAsync();
// Console.WriteLine(message);
// }
//}
//catch (ChannelClosedException)
//{
// Console.WriteLine("接收消息器关闭");
//}
// 简单版本
await foreach (var message in channelReader.ReadAllAsync()) {
Console.WriteLine(message);
}
}
public record Message(int Id);
参考链接
本文由 jxxxy 创作,采用 知识共享署名4.0 国际许可协议进行许可。
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名。