当前博文未提及该问题,以下是基于C知道的知识内容为您回答
为 `MultiConsumerDataBuffer<T>` 类增加环形缓冲区消费者注册、普通消费者以及线程安全的增加数据接口,需要设计一个灵活且线程安全的架构。以下是一个完整的实现方案,包含环形缓冲区、普通消费者、线程安全的生产者接口,并确保在 WinForm 环境下可用。
### 1. 核心架构设计
我们将设计一个支持两种消费者模式的数据缓冲区:
- **环形缓冲区消费者**:基于固定大小的环形缓冲区,消费者按固定窗口消费数据。
- **普通消费者**:基于队列的实时消费者,立即处理新到达的数据。
| 消费者类型 | 数据传递方式 | 适用场景 | 线程安全机制 |
|------------|-------------|---------|-------------|
| 环形缓冲区消费者 | 固定大小数组快照 | 固定频率图表更新、批量数据处理 | 锁保护缓冲区操作 + 阻塞队列 |
| 普通消费者 | 单条或批量数据实时传递 | 实时日志、即时通知、流式处理 | 并发队列 + 事件驱动 |
### 2. 完整实现代码
```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
/// <summary>
/// 多功能消费者数据缓冲区
/// 支持环形缓冲区消费者和普通消费者两种模式
/// </summary>
/// <typeparam name="T">数据类型</typeparam>
public class MultiConsumerDataBuffer<T> : IDisposable
{
#region 内部数据结构
// 环形缓冲区
private readonly T[] _circularBuffer;
private int _head;
private int _tail;
private int _count;
private readonly object _bufferLock = new object();
// 消费者注册表
private readonly Dictionary<string, IConsumerInfo> _consumers = new();
private readonly object _consumerLock = new object();
// 处理线程控制
private readonly CancellationTokenSource _cts = new();
private Task _processingTask;
#endregion
#region 构造函数
/// <summary>
/// 初始化数据缓冲区
/// </summary>
/// <param name="circularBufferCapacity">环形缓冲区容量(仅用于环形消费者)</param>
public MultiConsumerDataBuffer(int circularBufferCapacity = 1000)
{
if (circularBufferCapacity <= 0)
throw new ArgumentException("缓冲区容量必须大于0", nameof(circularBufferCapacity));
_circularBuffer = new T[circularBufferCapacity];
_head = 0;
_tail = 0;
_count = 0;
// 启动消费者处理线程
StartConsumerProcessing();
}
#endregion
#region 消费者接口定义
/// <summary>
/// 消费者信息基接口
/// </summary>
private interface IConsumerInfo
{
string ConsumerId { get; }
bool IsUIThreadConsumer { get; }
bool IsActive { get; set; }
void ProcessData(object data);
}
/// <summary>
/// 环形缓冲区消费者信息
/// </summary>
private class CircularBufferConsumerInfo : IConsumerInfo
{
public string ConsumerId { get; set; }
public Action<T[]> ProcessingAction { get; set; }
public bool IsUIThreadConsumer { get; set; }
public bool IsActive { get; set; } = true;
public int SnapshotSize { get; set; } = 100; // 每次处理的数据快照大小
public BlockingCollection<T[]> DataQueue { get; set; }
public void ProcessData(object data)
{
if (data is T[] arrayData)
{
ProcessingAction?.Invoke(arrayData);
}
}
}
/// <summary>
/// 普通消费者信息
/// </summary>
private class RegularConsumerInfo : IConsumerInfo
{
public string ConsumerId { get; set; }
public Action<T> ProcessingAction { get; set; } // 处理单条数据
public Action<IEnumerable<T>> BatchProcessingAction { get; set; } // 处理批量数据
public bool IsUIThreadConsumer { get; set; }
public bool IsActive { get; set; } = true;
public int BatchSize { get; set; } = 1; // 批量处理大小
public BlockingCollection<T> DataQueue { get; set; }
public void ProcessData(object data)
{
if (BatchProcessingAction != null && data is IEnumerable<T> batchData)
{
BatchProcessingAction(batchData);
}
else if (ProcessingAction != null && data is T singleData)
{
ProcessingAction(singleData);
}
}
}
#endregion
#region 公共接口 - 数据添加
/// <summary>
/// 线程安全地添加数据到缓冲区
/// </summary>
/// <param name="item">要添加的数据项</param>
public void AddData(T item)
{
// 1. 添加到环形缓冲区
AddToCircularBuffer(item);
// 2. 通知普通消费者
NotifyRegularConsumers(item);
// 3. 为环形缓冲区消费者创建快照
CreateSnapshotForCircularConsumers();
}
/// <summary>
/// 线程安全地批量添加数据
/// </summary>
/// <param name="items">数据集合</param>
public void AddDataRange(IEnumerable<T> items)
{
if (items == null) return;
foreach (var item in items)
{
AddData(item);
}
}
/// <summary>
/// 异步添加数据(非阻塞)
/// </summary>
/// <param name="item">数据项</param>
public Task AddDataAsync(T item)
{
return Task.Run(() => AddData(item));
}
#endregion
#region 公共接口 - 消费者注册
/// <summary>
/// 注册环形缓冲区消费者
/// </summary>
/// <param name="consumerId">消费者标识</param>
/// <param name="processingAction">处理委托(接收数据数组)</param>
/// <param name="isUIThreadConsumer">是否在UI线程执行</param>
/// <param name="maxQueueSize">最大队列大小</param>
/// <param name="snapshotSize">快照大小</param>
public void RegisterCircularBufferConsumer(
string consumerId,
Action<T[]> processingAction,
bool isUIThreadConsumer = false,
int maxQueueSize = 100,
int snapshotSize = 100)
{
if (string.IsNullOrEmpty(consumerId))
throw new ArgumentException("消费者标识不能为空", nameof(consumerId));
if (processingAction == null)
throw new ArgumentNullException(nameof(processingAction));
lock (_consumerLock)
{
if (_consumers.ContainsKey(consumerId))
throw new ArgumentException($"消费者 '{consumerId}' 已注册");
var consumerInfo = new CircularBufferConsumerInfo
{
ConsumerId = consumerId,
ProcessingAction = processingAction,
IsUIThreadConsumer = isUIThreadConsumer,
SnapshotSize = Math.Max(1, Math.Min(snapshotSize, _circularBuffer.Length)),
DataQueue = new BlockingCollection<T[]>(maxQueueSize)
};
_consumers[consumerId] = consumerInfo;
}
}
/// <summary>
/// 注册普通消费者(单条数据处理)
/// </summary>
public void RegisterRegularConsumer(
string consumerId,
Action<T> processingAction,
bool isUIThreadConsumer = false,
int maxQueueSize = 1000)
{
RegisterRegularConsumerInternal(consumerId, processingAction, null, isUIThreadConsumer, maxQueueSize, 1);
}
/// <summary>
/// 注册普通消费者(批量数据处理)
/// </summary>
public void RegisterRegularConsumer(
string consumerId,
Action<IEnumerable<T>> batchProcessingAction,
bool isUIThreadConsumer = false,
int maxQueueSize = 1000,
int batchSize = 10)
{
RegisterRegularConsumerInternal(consumerId, null, batchProcessingAction, isUIThreadConsumer, maxQueueSize, batchSize);
}
private void RegisterRegularConsumerInternal(
string consumerId,
Action<T> processingAction,
Action<IEnumerable<T>> batchProcessingAction,
bool isUIThreadConsumer,
int maxQueueSize,
int batchSize)
{
if (string.IsNullOrEmpty(consumerId))
throw new ArgumentException("消费者标识不能为空", nameof(consumerId));
if (processingAction == null && batchProcessingAction == null)
throw new ArgumentException("必须提供处理委托");
lock (_consumerLock)
{
if (_consumers.ContainsKey(consumerId))
throw new ArgumentException($"消费者 '{consumerId}' 已注册");
var consumerInfo = new RegularConsumerInfo
{
ConsumerId = consumerId,
ProcessingAction = processingAction,
BatchProcessingAction = batchProcessingAction,
IsUIThreadConsumer = isUIThreadConsumer,
BatchSize = Math.Max(1, batchSize),
DataQueue = new BlockingCollection<T>(maxQueueSize)
};
_consumers[consumerId] = consumerInfo;
}
}
/// <summary>
/// 注销消费者
/// </summary>
public void UnregisterConsumer(string consumerId)
{
lock (_consumerLock)
{
if (_consumers.TryGetValue(consumerId, out var consumer))
{
consumer.IsActive = false;
if (consumer is CircularBufferConsumerInfo circularConsumer)
{
circularConsumer.DataQueue.CompleteAdding();
}
else if (consumer is RegularConsumerInfo regularConsumer)
{
regularConsumer.DataQueue.CompleteAdding();
}
_consumers.Remove(consumerId);
}
}
}
#endregion
#region 私有实现方法
/// <summary>
/// 线程安全地添加到环形缓冲区
/// </summary>
private void AddToCircularBuffer(T item)
{
lock (_bufferLock)
{
_circularBuffer[_tail] = item;
_tail = (_tail + 1) % _circularBuffer.Length;
if (_count == _circularBuffer.Length)
{
// 缓冲区已满,覆盖最旧的数据
_head = (_head + 1) % _circularBuffer.Length;
}
else
{
_count++;
}
}
}
/// <summary>
/// 通知普通消费者
/// </summary>
private void NotifyRegularConsumers(T item)
{
List<RegularConsumerInfo> regularConsumers;
lock (_consumerLock)
{
regularConsumers = _consumers.Values
.OfType<RegularConsumerInfo>()
.Where(c => c.IsActive)
.ToList();
}
foreach (var consumer in regularConsumers)
{
// 非阻塞添加,如果队列满则跳过
consumer.DataQueue.TryAdd(item);
}
}
/// <summary>
/// 为环形缓冲区消费者创建快照
/// </summary>
private void CreateSnapshotForCircularConsumers()
{
List<CircularBufferConsumerInfo> circularConsumers;
lock (_consumerLock)
{
circularConsumers = _consumers.Values
.OfType<CircularBufferConsumerInfo>()
.Where(c => c.IsActive)
.ToList();
}
if (circularConsumers.Count == 0) return;
// 获取当前缓冲区快照
T[] snapshot = GetBufferSnapshot();
foreach (var consumer in circularConsumers)
{
// 根据快照大小截取数据
int takeSize = Math.Min(consumer.SnapshotSize, snapshot.Length);
if (takeSize > 0)
{
T[] consumerSnapshot = new T[takeSize];
Array.Copy(snapshot, snapshot.Length - takeSize, consumerSnapshot, 0, takeSize);
// 非阻塞添加
consumer.DataQueue.TryAdd(consumerSnapshot);
}
}
}
/// <summary>
/// 获取环形缓冲区当前快照(线程安全)
/// </summary>
private T[] GetBufferSnapshot()
{
lock (_bufferLock)
{
if (_count == 0) return Array.Empty<T>();
T[] snapshot = new T[_count];
int current = _head;
for (int i = 0; i < _count; i++)
{
snapshot[i] = _circularBuffer[current];
current = (current + 1) % _circularBuffer.Length;
}
return snapshot;
}
}
/// <summary>
/// 启动消费者处理线程
/// </summary>
private void StartConsumerProcessing()
{
_processingTask = Task.Run(async () =>
{
while (!_cts.Token.IsCancellationRequested)
{
try
{
await ProcessAllConsumersAsync();
await Task.Delay(10, _cts.Token); // 10ms处理间隔
}
catch (OperationCanceledException)
{
// 任务取消,正常退出
break;
}
catch (Exception ex)
{
// 记录日志,继续运行
System.Diagnostics.Debug.WriteLine($"消费者处理异常: {ex.Message}");
await Task.Delay(100, _cts.Token);
}
}
}, _cts.Token);
}
/// <summary>
/// 处理所有消费者
/// </summary>
private async Task ProcessAllConsumersAsync()
{
List<IConsumerInfo> activeConsumers;
lock (_consumerLock)
{
activeConsumers = _consumers.Values
.Where(c => c.IsActive)
.ToList();
}
var processingTasks = new List<Task>();
foreach (var consumer in activeConsumers)
{
processingTasks.Add(ProcessConsumerAsync(consumer));
}
await Task.WhenAll(processingTasks);
}
/// <summary>
/// 处理单个消费者
/// </summary>
private async Task ProcessConsumerAsync(IConsumerInfo consumer)
{
try
{
if (consumer is CircularBufferConsumerInfo circularConsumer)
{
await ProcessCircularConsumerAsync(circularConsumer);
}
else if (consumer is RegularConsumerInfo regularConsumer)
{
await ProcessRegularConsumerAsync(regularConsumer);
}
}
catch (Exception ex)
{
System.Diagnostics.Debug.WriteLine($"处理消费者 '{consumer.ConsumerId}' 时出错: {ex.Message}");
}
}
/// <summary>
/// 处理环形缓冲区消费者
/// </summary>
private async Task ProcessCircularConsumerAsync(CircularBufferConsumerInfo consumer)
{
if (consumer.DataQueue.TryTake(out var data, 0))
{
if (consumer.IsUIThreadConsumer)
{
await InvokeOnUIThreadAsync(() => consumer.ProcessingAction(data));
}
else
{
consumer.ProcessingAction(data);
}
}
}
/// <summary>
/// 处理普通消费者
/// </summary>
private async Task ProcessRegularConsumerAsync(RegularConsumerInfo consumer)
{
if (consumer.BatchProcessingAction != null)
{
// 批量处理模式
var batch = new List<T>();
while (batch.Count < consumer.BatchSize && consumer.DataQueue.TryTake(out var item, 0))
{
batch.Add(item);
}
if (batch.Count > 0)
{
if (consumer.IsUIThreadConsumer)
{
await InvokeOnUIThreadAsync(() => consumer.BatchProcessingAction(batch));
}
else
{
consumer.BatchProcessingAction(batch);
}
}
}
else if (consumer.ProcessingAction != null)
{
// 单条处理模式
if (consumer.DataQueue.TryTake(out var item, 0))
{
if (consumer.IsUIThreadConsumer)
{
await InvokeOnUIThreadAsync(() => consumer.ProcessingAction(item));
}
else
{
consumer.ProcessingAction(item);
}
}
}
}
/// <summary>
/// 在UI线程上执行操作
/// </summary>
private async Task InvokeOnUIThreadAsync(Action action)
{
if (Application.OpenForms.Count > 0)
{
var mainForm = Application.OpenForms[0];
if (mainForm.InvokeRequired)
{
await Task.Run(() => mainForm.Invoke(action));
}
else
{
action();
}
}
else
{
// 如果没有UI线程,直接执行
action();
}
}
#endregion
#region 公共工具方法
/// <summary>
/// 获取缓冲区当前数据计数
/// </summary>
public int GetBufferCount()
{
lock (_bufferLock)
{
return _count;
}
}
/// <summary>
/// 获取活跃消费者数量
/// </summary>
public int GetActiveConsumerCount()
{
lock (_consumerLock)
{
return _consumers.Values.Count(c => c.IsActive);
}
}
/// <summary>
/// 清空缓冲区
/// </summary>
public void ClearBuffer()
{
lock (_bufferLock)
{
_head = 0;
_tail = 0;
_count = 0;
Array.Clear(_circularBuffer, 0, _circularBuffer.Length);
}
}
#endregion
#region IDisposable 实现
private bool _disposed = false;
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
// 取消处理任务
_cts?.Cancel();
try
{
_processingTask?.Wait(1000);
}
catch (AggregateException)
{
// 任务取消异常,忽略
}
// 清理消费者资源
lock (_consumerLock)
{
foreach (var consumer in _consumers.Values)
{
if (consumer is CircularBufferConsumerInfo circularConsumer)
{
circularConsumer.DataQueue.CompleteAdding();
circularConsumer.DataQueue.Dispose();
}
else if (consumer is RegularConsumerInfo regularConsumer)
{
regularConsumer.DataQueue.CompleteAdding();
regularConsumer.DataQueue.Dispose();
}
}
_consumers.Clear();
}
_cts?.Dispose();
}
_disposed = true;
}
}
~MultiConsumerDataBuffer()
{
Dispose(false);
}
#endregion
}
```
### 3. 使用示例(WinForm应用)
以下是在 WinForm 中使用该数据缓冲区的完整示例:
```csharp
using System;
using System.Collections.Generic;
using System.Drawing;
using System.Threading;
using System.Windows.Forms;
using System.Windows.Forms.DataVisualization.Charting;
public partial class MainForm : Form
{
private MultiConsumerDataBuffer<double> _dataBuffer;
private Random _random = new Random();
private Thread _dataGeneratorThread;
private bool _isGeneratingData = false;
// UI控件
private Chart _chart1;
private ListBox _logListBox;
private Button _btnStart;
private Button _btnStop;
private Label _lblStatus;
public MainForm()
{
InitializeComponent();
InitializeDataBuffer();
SetupUI();
}
private void InitializeComponent()
{
this.Text = "多消费者数据缓冲区示例";
this.Size = new Size(800, 600);
// 创建图表控件
_chart1 = new Chart
{
Location = new Point(10, 10),
Size = new Size(600, 400),
Anchor = AnchorStyles.Top | AnchorStyles.Left | AnchorStyles.Right
};
var chartArea = new ChartArea("MainArea");
_chart1.ChartAreas.Add(chartArea);
var series = new Series("实时数据")
{
ChartType = SeriesChartType.FastLine,
Color = Color.Blue
};
_chart1.Series.Add(series);
// 创建日志列表框
_logListBox = new ListBox
{
Location = new Point(620, 10),
Size = new Size(160, 400),
Anchor = AnchorStyles.Top | AnchorStyles.Right
};
// 创建按钮
_btnStart = new Button
{
Text = "开始生成数据",
Location = new Point(10, 420),
Size = new Size(120, 30)
};
_btnStart.Click += BtnStart_Click;
_btnStop = new Button
{
Text = "停止生成数据",
Location = new Point(140, 420),
Size = new Size(120, 30),
Enabled = false
};
_btnStop.Click += BtnStop_Click;
// 状态标签
_lblStatus = new Label
{
Text = "就绪",
Location = new Point(270, 425),
Size = new Size(300, 30),
ForeColor = Color.Green
};
// 添加到窗体
this.Controls.AddRange(new Control[] { _chart1, _logListBox, _btnStart, _btnStop, _lblStatus });
}
private void SetupUI()
{
// UI初始化代码
}
private void InitializeDataBuffer()
{
// 创建数据缓冲区(环形缓冲区容量1000)
_dataBuffer = new MultiConsumerDataBuffer<double>(1000);
// 注册环形缓冲区消费者(用于图表更新)
_dataBuffer.RegisterCircularBufferConsumer(
consumerId: "ChartConsumer",
processingAction: UpdateChartWithData,
isUIThreadConsumer: true,
maxQueueSize: 50,
snapshotSize: 50 // 每次显示最近50个数据点
);
// 注册普通消费者 - 单条数据处理(用于日志记录)
_dataBuffer.RegisterRegularConsumer(
consumerId: "LogConsumer",
processingAction: LogData,
isUIThreadConsumer: true,
maxQueueSize: 200
);
// 注册普通消费者 - 批量数据处理(用于统计分析)
_dataBuffer.RegisterRegularConsumer(
consumerId: "StatsConsumer",
batchProcessingAction: ProcessStatistics,
isUIThreadConsumer: false, // 在后台线程处理
maxQueueSize: 500,
batchSize: 20 // 每20条数据批量处理一次
);
}
// 环形缓冲区消费者处理方法
private void UpdateChartWithData(double[] dataPoints)
{
if (InvokeRequired)
{
Invoke(new Action<double[]>(UpdateChartWithData), dataPoints);
return;
}
try
{
_chart1.Series["实时数据"].Points.Clear();
for (int i = 0; i < dataPoints.Length; i++)
{
_chart1.Series["实时数据"].Points.AddXY(i, dataPoints[i]);
}
_chart1.Update();
}
catch (Exception ex)
{
LogMessage($"图表更新错误: {ex.Message}");
}
}
// 普通消费者 - 单条数据处理
private void LogData(double data)
{
if (InvokeRequired)
{
Invoke(new Action<double>(LogData), data);
return;
}
string logEntry = $"[{DateTime.Now:HH:mm:ss.fff}] 数据: {data:F4}";
_logListBox.Items.Add(logEntry);
// 保持日志列表不超过100条
if (_logListBox.Items.Count > 100)
{
_logListBox.Items.RemoveAt(0);
}
_logListBox.TopIndex = _logListBox.Items.Count - 1;
}
// 普通消费者 - 批量数据处理
private void ProcessStatistics(IEnumerable<double> dataBatch)
{
// 在后台线程执行统计计算
var batchList = dataBatch.ToList();
if (batchList.Count == 0) return;
double average = batchList.Average();
double max = batchList.Max();
double min = batchList.Min();
// 更新UI(需要线程安全)
if (InvokeRequired)
{
Invoke(new Action(() =>
{
LogMessage($"统计: 平均={average:F4}, 最大={max:F4}, 最小={min:F4}");
}));
}
}
private void LogMessage(string message)
{
_logListBox.Items.Add($"[{DateTime.Now:HH:mm:ss}] {message}");
if (_logListBox.Items.Count > 100)
{
_logListBox.Items.RemoveAt(0);
}
_logListBox.TopIndex = _logListBox.Items.Count - 1;
}
private void BtnStart_Click(object sender, EventArgs e)
{
if (_isGeneratingData) return;
_isGeneratingData = true;
_btnStart.Enabled = false;
_btnStop.Enabled = true;
_lblStatus.Text = "正在生成数据...";
_lblStatus.ForeColor = Color.Blue;
// 启动数据生成线程
_dataGeneratorThread = new Thread(GenerateData)
{
IsBackground = true
};
_dataGeneratorThread.Start();
}
private void BtnStop_Click(object sender, EventArgs e)
{
_isGeneratingData = false;
_btnStart.Enabled = true;
_btnStop.Enabled = false;
_lblStatus.Text = "已停止";
_lblStatus.ForeColor = Color.Red;
}
private void GenerateData()
{
int counter = 0;
while (_isGeneratingData)
{
try
{
// 生成模拟数据(正弦波 + 随机噪声)
double time = counter * 0.1;
double value = Math.Sin(time) + (_random.NextDouble() * 0.2 - 0.1);
// 线程安全地添加数据
_dataBuffer.AddData(value);
counter++;
Thread.Sleep(50); // 20Hz 数据频率
// 每100次更新一次状态
if (counter % 100 == 0)
{
UpdateStatus(counter);
}
}
catch (Exception ex)
{
LogMessage($"数据生成错误: {ex.Message}");
Thread.Sleep(1000);
}
}
}
private void UpdateStatus(int counter)
{
if (InvokeRequired)
{
Invoke(new Action<int>(UpdateStatus), counter);
return;
}
int bufferCount = _dataBuffer.GetBufferCount();
int consumerCount = _dataBuffer.GetActiveConsumerCount();
_lblStatus.Text = $"已生成 {counter} 条数据 | 缓冲区: {bufferCount} | 消费者: {consumerCount}";
}
protected override void OnFormClosing(FormClosingEventArgs e)
{
// 停止数据生成
_isGeneratingData = false;
// 清理资源
_dataBuffer?.Dispose();
base.OnFormClosing(e);
}
}
// 使用示例 - 控制台测试
public class Program
{
public static void Main()
{
Application.EnableVisualStyles();
Application.SetCompatibleTextRenderingDefault(false);
Application.Run(new MainForm());
}
}
```
### 4. 关键特性说明
#### 4.1 线程安全机制
| 组件 | 线程安全策略 | 实现方式 |
|------|-------------|---------|
| 环形缓冲区 | 细粒度锁 | `lock (_bufferLock)` 保护所有缓冲区操作 |
| 消费者注册表 | 细粒度锁 | `lock (_consumerLock)` 保护消费者集合 |
| 数据队列 | 并发集合 | `BlockingCollection<T>` 提供线程安全队列 |
| UI线程调用 | 异步调用 | `InvokeOnUIThreadAsync` 方法确保UI操作安全 |
#### 4.2 性能优化
1. **非阻塞队列操作**:使用 `TryAdd` 和 `TryTake` 避免线程阻塞
2. **批量处理**:普通消费者支持批量处理,减少上下文切换
3. **异步处理**:消费者处理采用异步模式,提高吞吐量
4. **内存管理**:及时清理队列,避免内存泄漏
#### 4.3 错误处理
1. **异常隔离**:每个消费者的异常不会影响其他消费者
2. **资源清理**:实现完整的 `IDisposable` 模式
3. **状态监控**:提供缓冲区计数和消费者计数方法
### 5. 扩展建议
```csharp
// 1. 添加数据过滤支持
public void RegisterConsumerWithFilter<T>(
string consumerId,
Action<T[]> processingAction,
Func<T, bool> filterPredicate)
{
// 实现过滤逻辑
}
// 2. 添加数据转换支持
public void RegisterConsumerWithTransform<TInput, TOutput>(
string consumerId,
Action<TOutput[]> processingAction,
Func<TInput, TOutput> transformFunc)
{
// 实现数据转换
}
// 3. 添加性能监控
public class ConsumerMetrics
{
public string ConsumerId { get; set; }
public long ProcessedCount { get; set; }
public double AverageLatency { get; set; }
public DateTime LastProcessTime { get; set; }
}
// 4. 添加动态配置
public void UpdateConsumerConfig(
string consumerId,
int? newBatchSize = null,
bool? newUIThreadSetting = null)
{
// 动态更新消费者配置
}
```
这个实现提供了完整的环形缓冲区消费者和普通消费者支持,具有线程安全性、良好的性能和易用性,特别适合 WinForm 应用程序中的实时数据处理场景。通过合理的锁策略和并发集合,确保了在多线程环境下的数据一致性。