Batch Consuming
IBatchConsumer
IBatchConsumer<TValue> receives a batch of messages in a single invocation. Implement it when per-message overhead dominates your processing time, or when your domain logic benefits from seeing multiple messages at once (aggregation, bulk writes, cross-message deduplication).

    public class ScanBatchConsumer(ILogger<ScanBatchConsumer> logger) : IBatchConsumer<PackageScan>
    {
        public async Task ConsumeAsync(
            ConsumeContext<MessageBatch<PackageScan>> context,
            CancellationToken cancellationToken)
        {
            var batch = context.Message;
            logger.LogInformation("Processing {Count} scans", batch.Count);

            foreach (var item in batch.Items)
            {
                var scan = item.Message;         // deserialized payload
                var tc = item.TransportContext;  // raw bytes, headers, partition, offset
                // process...
            }
        }
    }

Register it on a consumer group with AddBatchConsumer<T>():

    kafka.Topic<string, PackageScan>("sorting.scans", topic =>
    {
        topic.ConsumerGroup("sorting.sorter", group =>
        {
            group.AddBatchConsumer<ScanBatchConsumer>();
        });
    });

The batch flows through the consume pipeline as ConsumeContext<MessageBatch<TValue>>. Your handler has full access to context.RetryAttempt, context.Transaction, context.Services, and context.Headers, the same as a single-message consumer.
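Cross-message deduplication is one case where seeing the whole batch helps. The following is a minimal sketch, not part of Emit: the PackageScan record is a stand-in type, Barcode is taken from the outbox example later on this page, and ScannedAt is a hypothetical field added for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in payload type for illustration; ScannedAt is a hypothetical field.
public sealed record PackageScan(string Barcode, DateTime ScannedAt);

public static class ScanDeduplicator
{
    // Keeps only the most recent scan per barcode, so a bulk write
    // touches each package at most once per batch.
    public static IReadOnlyList<PackageScan> LatestPerBarcode(IEnumerable<PackageScan> scans) =>
        scans
            .GroupBy(s => s.Barcode)
            .Select(g => g.MaxBy(s => s.ScannedAt)!)
            .ToList();
}
```

Inside ConsumeAsync, a helper like this could be fed with batch.Items.Select(i => i.Message) before the bulk write.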
MessageBatch and BatchItem
MessageBatch<T> is an immutable, read-only list of BatchItem<T>. You can iterate it directly or access items by index:

    foreach (var item in batch.Items) { /* ... */ }
    var first = batch[0];
    var count = batch.Count;

Each BatchItem<T> carries two properties:
| Member | Type | Description |
|---|---|---|
| Message | T | The deserialized message payload |
| TransportContext | TransportContext | Raw key/value bytes, headers, and Kafka metadata (partition, offset) |
Batch configuration
Pass a configuration delegate to AddBatchConsumer<T>() to control batch sizing:

    group.AddBatchConsumer<ScanBatchConsumer>(batch =>
    {
        batch.MaxSize = 500;
        batch.Timeout = TimeSpan.FromSeconds(2);
    });

| Option | Type | Default | Description |
|---|---|---|---|
| MaxSize | int | 100 | Maximum messages per batch. Must be greater than zero. |
| Timeout | TimeSpan | 5 seconds | Maximum wait time after the first message before dispatching a partial batch. Must be greater than zero. |
When omitted, both use their defaults:

    group.AddBatchConsumer<ScanBatchConsumer>(); // MaxSize = 100, Timeout = 5s

How accumulation works
Each worker in the pool accumulates messages independently. The accumulator follows a two-phase algorithm:
- Block until at least one message arrives. No CPU is consumed while waiting.
- Drain all messages that are immediately available in the worker's channel buffer.
- If the batch reaches MaxSize, dispatch it immediately.
- Otherwise, wait up to Timeout for more messages. If more arrive, drain again. If the timeout elapses, dispatch whatever has accumulated.
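The steps above can be sketched with System.Threading.Channels. This is an illustration under stated assumptions, not Emit's internal code: BatchAccumulator and NextBatchAsync are hypothetical names, and the worker's buffer is assumed to be exposed as a ChannelReader<T>.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BatchAccumulator
{
    // Hypothetical helper that mirrors the accumulation steps described above.
    public static async Task<List<T>> NextBatchAsync<T>(
        ChannelReader<T> reader, int maxSize, TimeSpan timeout, CancellationToken ct)
    {
        var batch = new List<T>(maxSize);

        // Block (asynchronously) until at least one message arrives.
        batch.Add(await reader.ReadAsync(ct));

        // The timeout starts counting after the first message.
        using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        timeoutCts.CancelAfter(timeout);

        while (true)
        {
            // Drain whatever is immediately available in the buffer.
            while (batch.Count < maxSize && reader.TryRead(out var item))
                batch.Add(item);

            // Full batch: dispatch immediately.
            if (batch.Count >= maxSize)
                return batch;

            // Otherwise wait for more messages or for the timeout.
            try
            {
                if (!await reader.WaitToReadAsync(timeoutCts.Token))
                    return batch; // channel completed: dispatch what we have
            }
            catch (OperationCanceledException) when (!ct.IsCancellationRequested)
            {
                return batch; // timeout elapsed: dispatch the partial batch
            }
        }
    }
}
```

Cancellation through the caller's token propagates as an exception, while the timeout only ends the wait and returns the partial batch.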
Accumulation is per-worker, not per-group. With WorkerCount = 4 and MaxSize = 100, each worker independently accumulates up to 100 messages. The actual batch size depends on how the distribution strategy routes messages to workers. Under low load, you will see small partial batches dispatched on timeout.

    group.WorkerCount = 4;
    group.AddBatchConsumer<ScanBatchConsumer>(batch =>
    {
        batch.MaxSize = 100;
        batch.Timeout = TimeSpan.FromSeconds(1);
    });

Middleware behavior
All group-level middleware works with batch consumers. Several behave differently because the pipeline message is a MessageBatch<T> rather than a single T.
Validation
Validation runs per item, not per batch. The existing IMessageValidator<TValue> or FluentValidation validator evaluates each item individually. Invalid items are removed from the batch and handled according to the configured error action (dead-letter or discard). Valid items continue as a reduced batch.

    group.AddBatchConsumer<ScanBatchConsumer>();
    group.ValidateWithFluentValidation(onFailure => onFailure.DeadLetter());

If every item in a batch fails validation, the handler is not invoked. See Validation for validator registration options. SkipNullPayloads() and allowNullMessages: true on ValidateWithFluentValidation also apply per item; see Validation → Null payloads.
Filtering
Filters run per item, not per batch. Both class-based IConsumerFilter<TValue> and predicate-based .Filter(...) evaluate each item individually. Items where any filter returns false are silently dropped from the batch; survivors continue as a reduced batch. AND semantics apply across multiple filters at the item level.

    group.AddBatchConsumer<ScanBatchConsumer>();
    group.Filter<ActiveOnlyFilter>();
    group.Filter((ctx, _) => ValueTask.FromResult(ctx.Message.Region != "test"));

If every item in a batch is filtered out, the handler is not invoked. Offsets still advance for filtered items: filtering is a silent skip, not an error.
Per-consumer (leaf-level) filters remain unsupported in batch mode for the same reason as per-consumer middleware: see Constraints below.
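The per-item AND semantics can be pictured with a small stand-alone sketch. BatchFiltering and Survivors are hypothetical names, and the predicates are synchronous for brevity, whereas the filters shown above return a ValueTask.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BatchFiltering
{
    // Returns the items that pass every filter (AND semantics).
    // An empty result corresponds to the "handler is not invoked" case.
    public static List<T> Survivors<T>(IEnumerable<T> items, params Func<T, bool>[] filters) =>
        items.Where(item => filters.All(f => f(item))).ToList();
}
```

For example, Survivors(new[] { 1, 2, 3, 4, 5, 6 }, x => x % 2 == 0, x => x > 2) keeps only 4 and 6, because an item has to pass both predicates.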
Retry
Retry operates at the batch level. If your handler throws, the entire batch is retried, not individual items. Each retry attempt gets a fresh DI scope.

    group.OnError(e => e.Default(d =>
        d.Retry(3, Backoff.Exponential(TimeSpan.FromMilliseconds(200))).DeadLetter()));

When retries are exhausted, the error action applies per item: each message in the batch is individually dead-lettered or discarded.
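The difference between batch-level retry and per-item error handling can be sketched as follows. This is a simplified model, not Emit's pipeline: BatchRetrySketch, RunAsync, and the deadLetter callback are hypothetical, and the sketch assumes that maxRetries counts retries after the initial attempt and that the delay doubles on each retry.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BatchRetrySketch
{
    // Retries the handler with the whole batch; once retries are exhausted,
    // applies the error action to each item individually.
    public static async Task RunAsync<T>(
        IReadOnlyList<T> batch,
        Func<IReadOnlyList<T>, Task> handler,
        Func<T, Exception, Task> deadLetter,
        int maxRetries,
        TimeSpan baseDelay)
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                await handler(batch); // the entire batch is passed on every attempt
                return;
            }
            catch (Exception ex) when (attempt >= maxRetries)
            {
                // Retries exhausted: one dead-letter call per item.
                foreach (var item in batch)
                    await deadLetter(item, ex);
                return;
            }
            catch (Exception)
            {
                // Exponential backoff: baseDelay, then 2x, 4x, ...
                await Task.Delay(baseDelay * Math.Pow(2, attempt));
            }
        }
    }
}
```

Because the whole batch is re-run on each attempt, handlers benefit from being idempotent: items that succeeded in a failed attempt are processed again.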
Dead-letter queue
When a batch fails after retry exhaustion, Emit dead-letters each item separately. Every item gets its own DLQ message with the original key, value, headers, and diagnostic headers (source topic, partition, offset, exception). See Dead Letter Queue for DLQ configuration.
Rate limiting
The rate limiter acquires one permit per message in the batch, not one per batch invocation. A batch of 50 messages consumes 50 permits. This means the effective throughput limit applies to message count, regardless of batch size.

    group.RateLimit(rl => rl.TokenBucket(permitsPerSecond: 500, burstSize: 200));
    group.AddBatchConsumer<ScanBatchConsumer>(batch => batch.MaxSize = 100);

Circuit breaker
The circuit breaker counts batch invocations, not individual messages. One failed batch increments the failure counter by one.

    group.CircuitBreaker(cb => cb
        .FailureThreshold(5)
        .SamplingWindow(TimeSpan.FromSeconds(30))
        .PauseDuration(TimeSpan.FromSeconds(15)));

When the circuit opens, all workers in the group pause. See Resilience for circuit breaker configuration.
Transactional outbox
Decorate your batch consumer with [Transactional] to wrap each batch invocation in a transaction. All outbox entries produced within the handler commit atomically.

    [Transactional]
    public sealed class ScanBatchConsumer(
        IEventProducer<string, RerouteCommand> producer) : IBatchConsumer<PackageScan>
    {
        public async Task ConsumeAsync(
            ConsumeContext<MessageBatch<PackageScan>> context,
            CancellationToken cancellationToken)
        {
            foreach (var item in context.Message.Items)
            {
                if (NeedsReroute(item.Message))
                {
                    await producer.ProduceAsync(item.Message.Barcode, /* ... */, cancellationToken);
                }
            }
            // All produced messages commit atomically on success.
        }
    }

Each retry attempt gets a fresh transaction. See Outbox for persistence setup.
Offset commit
Offsets for all messages in a batch are committed together after the handler completes (or after retry exhaustion). The watermark algorithm ensures offsets advance only when all prior messages have been processed. A failed batch does not block the watermark for other workers; each worker tracks its own offsets independently.
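A watermark of this kind can be modelled with a small tracker per partition. The sketch below is an assumption about the general technique, not Emit's implementation; OffsetWatermark and its members are hypothetical names, and the class is not thread-safe.

```csharp
using System.Collections.Generic;

// Tracks completed offsets for one partition and exposes the point
// below which every message has been processed.
public sealed class OffsetWatermark
{
    private readonly SortedSet<long> _completed = new();
    private long _next; // lowest offset not yet known to be complete

    public OffsetWatermark(long firstOffset) => _next = firstOffset;

    // Safe value to commit: Kafka commits the next offset to read,
    // i.e. the last fully processed offset plus one.
    public long CommittableOffset => _next;

    public void MarkCompleted(long offset)
    {
        if (offset < _next) return; // already covered by the watermark

        _completed.Add(offset);

        // Advance across the contiguous run of completed offsets.
        while (_completed.Remove(_next))
            _next++;
    }
}
```

Starting at offset 10, completing 12 leaves the committable offset at 10; completing 10 moves it to 11, and completing 11 then moves it to 13, because 12 was already done.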
The commit interval is the same as single-message mode:

    group.CommitInterval = TimeSpan.FromSeconds(5); // default

Constraints
Batch consuming has explicit restrictions that are enforced at registration time.
One batch consumer per group. You cannot register multiple IBatchConsumer<T> implementations on the same consumer group.
No fan-out. AddBatchConsumer<T>() and AddConsumer<T>() are mutually exclusive on the same group. If you need both batch and single-message consumers for the same topic, use separate consumer groups.
No content-based routing. AddBatchConsumer<T>() and AddRouter() are mutually exclusive. Routing dispatches individual messages to different handlers; this is incompatible with batch accumulation.
No per-consumer middleware. Unlike AddConsumer<T>(consumer => { consumer.Use<T>(); }), batch consumers do not support per-handler middleware configuration. Register middleware at the group level instead.