Skip to content

Consumers

IConsumer

IConsumer<TValue> is where your message handling lives. Implement it, register it on a consumer group, and Emit manages the rest:

public class PizzaOrderedConsumer(ILogger<PizzaOrderedConsumer> logger)
: IConsumer<PizzaOrdered>
{
public async Task ConsumeAsync(ConsumeContext<PizzaOrdered> context, CancellationToken cancellationToken)
{
var pizza = context.Message;
logger.LogInformation("Preparing pizza {PizzaId} for {CustomerId}", pizza.PizzaId, pizza.CustomerId);
}
}
kafka.Topic<string, PizzaOrdered>("pizzas", topic =>
{
topic.ConsumerGroup("pizza-kitchen", group =>
{
group.AddConsumer<PizzaOrderedConsumer>();
});
});

Consumers are resolved from the DI container per message, so constructor injection works normally.

ConsumeContext

ConsumeContext<TValue> carries the message and the metadata your handler needs most:

MemberTypeDescription
MessageTValueThe deserialized message value
HeadersIReadOnlyList<...>Message headers
RetryAttemptintCurrent retry count (0 on first attempt)
TransportContextTransportContextKafka-specific metadata
TransactionITransactionContext?Active transaction, if any
ServicesIServiceProviderScoped provider for this message

For the full list of properties and the payload bag API, see Feature Collection.

Accessing Kafka-specific data

TransportContext is a KafkaTransportContext<TKey> on the consume side. Cast it to read the deserialized key and broker metadata:

if (context.TransportContext is KafkaTransportContext<string> kafka)
{
logger.LogInformation("Key={Key} Topic={Topic} Partition={Partition} Offset={Offset}",
kafka.Key, kafka.Topic, kafka.Partition, kafka.Offset);
}

Worker pool

The worker pool controls how many messages your instance processes concurrently. Sizing it correctly requires a working understanding of how Kafka distributes work.

Partitions: the unit of parallelism

A Kafka topic is not a single queue. It is divided into partitions: independent, ordered logs. Inside a single partition, messages are always delivered in write order. Across partitions, there is no ordering guarantee.

When a producer sends a message, Kafka assigns it to one partition, typically based on the message key. More partitions means more consumers can read simultaneously.

Consumer groups and partition assignment

A consumer group is a named set of consumers that share the work of reading a topic. Kafka’s contract: each partition is consumed by exactly one member of the group at a time. Two consumers in the same group never read the same partition simultaneously.

When you run multiple instances with the same group ID, Kafka divides partitions between them automatically. This redistribution (a rebalance) happens whenever an instance joins or leaves. No configuration required; Kafka handles it.

Scale to three instances and each gets two partitions. Take one down and Kafka reassigns its partitions to the survivors within seconds. Horizontal scaling with zero code changes.

Workers within an instance

Each application instance runs one Kafka consumer per group. That consumer polls its assigned partitions and feeds messages into Emit’s worker pool: a set of concurrent processing channels within the instance.

Each worker handles one message at a time. With three workers, the instance processes up to three messages simultaneously. A fourth message waits for a slot to open, or queues in the channel buffer (up to BufferSize) while waiting.

topic.ConsumerGroup("pizza-kitchen", group =>
{
group.WorkerCount = 3;
group.BufferSize = 32; // per-worker channel capacity
group.AddConsumer<PizzaOrderedConsumer>();
});

How many workers?

The rule of thumb: match WorkerCount to the number of partitions your instance will be assigned.

If orders has 6 partitions and you run 2 instances, Kafka assigns 3 partitions to each. Set WorkerCount = 3 and each partition gets a dedicated worker, full parallelism, no contention.

More workers than partitions? Extra workers stay idle. They hold a small buffer channel in memory but cause no harm. If you scale dynamically and can’t predict the exact partition count per instance, erring slightly high is safe.

Fewer workers than partitions? Multiple partitions compete for the same workers. A slow message from Partition 0 can hold up Partition 1 and Partition 2, even though those messages are already in the buffer. You lose the parallelism the partitions were designed to provide.

Distribution strategies

The distribution strategy answers one question: when a message arrives, which worker gets it? It only matters when WorkerCount > 1.

ByKeyHash (default)

Emit computes hash(key) % WorkerCount and routes the message to that worker. Because the hash of a key never changes, the same key always maps to the same worker, for every message, without exception.

Think of it as giving each customer a dedicated lane. Customer 42’s orders always land on Worker 2. Customer 99’s orders always land on Worker 3. Three orders for Customer 42 arrive in quick succession, Worker 2 processes them in sequence. Worker 3 handles Customer 99 in parallel. Neither lane waits for the other.

order-1 and order-3 both belong to cust-42, so both route to Worker 2 and process in arrival order. order-2 runs on Worker 3 simultaneously; the two customers are independent.

The tradeoff: load is not perfectly balanced. A very active key monopolizes its worker while others run lighter. That is usually the right tradeoff; correctness is worth more than perfect load distribution.

Use ByKeyHash when ordering within a key matters: state transitions for the same entity, inventory updates for the same product SKU, anything where two messages about the same subject must not process out of order.

RoundRobin

Workers are assigned in rotation. Message 1 goes to Worker 0, message 2 to Worker 1, message 3 to Worker 2, then back to Worker 0. The message key is ignored entirely.

This spreads load as evenly as possible and maximizes throughput across the pool. The cost: two messages from the same source can land on different workers and complete in a different order than they arrived.

Use RoundRobin when messages are independent and throughput matters more than ordering: audit events, telemetry records, fire-and-forget notifications, anything where arrival-order completion is not required.

Offset commit

Emit commits offsets automatically using a watermark algorithm. The offset advances only when all messages up to that point have been processed. A slow message at offset 100 blocks the commit at 99 even if offsets 101 through 200 have already finished.

Configure the commit interval:

group.CommitInterval = TimeSpan.FromSeconds(5); // default

Commits happen in the background on this interval. You do not call commit manually.

Transactional consumers

Decorate your consumer class with [Transactional] to have the pipeline wrap each invocation in a unit-of-work transaction. All outbox entries produced within the handler commit atomically with any business data writes. Each retry attempt gets a fresh transaction.

[Transactional]
public sealed class PizzaOrderedConsumer(
IEventProducer<string, PizzaDispatched> producer) : IConsumer<PizzaOrdered>
{
public async Task ConsumeAsync(ConsumeContext<PizzaOrdered> context, CancellationToken ct)
{
await producer.ProduceAsync(..., ct);
// Transaction commits on success, rolls back on failure.
}
}

For the full transactional outbox story, see Outbox.

Per-consumer middleware

Register middleware that wraps only a specific consumer:

group.AddConsumer<PizzaOrderedConsumer>(consumer =>
{
consumer.Use<RetryLoggingMiddleware>();
});

This runs inside any group-level middleware.

Consumer group middleware

Middleware registered on the group itself wraps all consumers in that group:

topic.ConsumerGroup("pizza-kitchen", group =>
{
group.Use<TracingMiddleware>();
group.AddConsumer<PizzaOrderedConsumer>();
});

For error handling, circuit breakers, and rate limiting at the group level, see Error Policies and Resilience.

Graceful shutdown

When the application stops, Emit waits up to WorkerStopTimeout for in-flight messages to complete before forcing shutdown:

group.WorkerStopTimeout = TimeSpan.FromSeconds(30); // default

Flow control

IConsumerFlowControl lets you pause and resume message delivery without dropping the Kafka connection. The consumer stays in the group and holds its partition assignments; it simply stops receiving new messages until you resume.

public class ExternalHealthMonitor(IConsumerFlowControl flowControl) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
var healthy = await CheckExternalDependency(ct);
if (!healthy)
await flowControl.PauseAsync(ct);
else
await flowControl.ResumeAsync(ct);
await Task.Delay(TimeSpan.FromSeconds(10), ct);
}
}
}

The primary use case is back-pressure from external dependencies. When a downstream service becomes unhealthy, pausing consumption prevents a flood of failed messages from piling up in your retry queue. No rebalance occurs, so partition assignments are preserved for when you resume.

Both PauseAsync and ResumeAsync are idempotent: calling PauseAsync when already paused, or ResumeAsync when already running, is a no-op.

Inject IConsumerFlowControl into any service or middleware that needs to influence consumption rate.

Message filtering

IConsumerFilter<TMessage> lets you skip messages before they reach the handler. Return false to short-circuit the pipeline without triggering the error policy; the offset is committed and the message is silently dropped.

public class ActiveOrderFilter : IConsumerFilter<PizzaOrdered>
{
public ValueTask<bool> ShouldConsumeAsync(ConsumeContext<PizzaOrdered> context, CancellationToken ct)
=> ValueTask.FromResult(context.Message.Status == OrderStatus.Active);
}

Register it on the consumer:

group.AddConsumer<PizzaOrderedConsumer>(consumer =>
{
consumer.Filter<ActiveOrderFilter>();
});

For inline predicates without a dedicated class:

// Synchronous
consumer.Filter<PizzaOrdered>(ctx => ctx.Message.Status == OrderStatus.Active);
// Async
consumer.Filter<PizzaOrdered>(async (ctx, ct) =>
await featureFlags.IsEnabledAsync("pizza-kitchen", ct));

Multiple filters can be registered on the same consumer. All registered filters must return true for the message to proceed; AND logic. If any filter returns false, the message is skipped and no further filters are evaluated.

Consumer observers

IKafkaConsumerObserver provides Kafka-specific lifecycle events. All methods have default no-op implementations; implement only the ones you care about:

MethodEvent recordWhen it fires
OnConsumerStartedAsyncConsumerStartedEvent(GroupId, Topic, WorkerCount)Worker begins polling
OnConsumerStoppedAsyncConsumerStoppedEvent(GroupId, Topic)Worker stops cleanly
OnConsumerFaultedAsyncConsumerFaultedEvent(GroupId, Topic, Exception)Worker crashes and triggers a restart
OnPartitionsAssignedAsyncPartitionsAssignedEvent(GroupId, Topic, Partitions)Rebalance assigns new partitions
OnPartitionsRevokedAsyncPartitionsRevokedEvent(GroupId, Topic, Partitions)Rebalance gracefully revokes partitions
OnPartitionsLostAsyncPartitionsLostEvent(GroupId, Topic, Partitions)Unclean partition loss
OnOffsetsCommittedAsyncOffsetsCommittedEvent(GroupId, Offsets)Offsets successfully committed
OnOffsetCommitErrorAsyncOffsetCommitErrorEvent(GroupId, Offsets, Exception)Offset commit fails
OnDeserializationErrorAsyncDeserializationErrorEvent(GroupId, Topic, Partition, Offset, Exception)Message fails deserialization before pipeline entry

OnConsumerFaultedAsync and OnPartitionsLostAsync are the most operationally significant; they signal conditions that require attention rather than normal lifecycle transitions.

The three partition events each represent a distinct situation. OnPartitionsAssignedAsync fires during a normal rebalance when new partitions become ready; use it to initialize per-partition state such as caches or counters. OnPartitionsRevokedAsync fires during a cooperative (planned) rebalance before the partition moves to another consumer; you have a window here to flush state or commit any side effects. OnPartitionsLostAsync is different in kind: it fires after an unclean loss, typically a session timeout, where the partition has already been reassigned. Any in-progress work on those partitions may be duplicated by the new owner, so do not attempt to flush; just clean up local state and move on.

Register on the Kafka builder:

kafka.AddConsumerObserver<MyKafkaObserver>();

For transport-agnostic consume lifecycle hooks, use IConsumeObserver at the global level. See Observers for details on both.