Pipeline Model
Every message in Emit flows through a layered middleware pipeline before reaching its handler or destination. The pipeline model is the same regardless of which messaging provider you use: inbound messages go through a consume pipeline, outbound messages go through a produce pipeline, and your middleware slots into both.
Understanding this model makes it straightforward to reason about ordering, error handling, and where to put things.
Inbound pipeline
The inbound path has two distinct phases: transport processing, then per-consumer pipeline execution. The split exists because some work belongs to the infrastructure (deserializing bytes, distributing messages across workers) and some belongs to your application logic.
Transport phase
Before any middleware runs, Emit handles the mechanics of getting a message from the broker into a usable form:
- Poll. The consumer group reads raw bytes from the broker and distributes them across worker channels.
- Deserialize. Key and value are deserialized using the registered IDeserializer<T> or IAsyncDeserializer<T>. Deserialization errors are handled separately via OnDeserializationError on the consumer group and never enter the consumer pipeline. A message that cannot be deserialized cannot be processed, so routing it through retry and dead-letter logic would be misleading.
- Fan-out. For each consumer or router registered on the consumer group, Emit creates a fresh DI scope with its own ConsumeContext<TValue>. Each entry runs its pipeline independently.
Consumer pipeline layers
After fan-out, each consumer runs its own fully composed pipeline from outermost to innermost:
| Layer | What it does | Always present? |
|---|---|---|
| Error handling | Catches exceptions, evaluates the error policy (dead-letter, discard), notifies the circuit breaker | Yes |
| Observers | Invokes registered IConsumeObserver callbacks | Yes |
| Tracing | Creates an emit.consume activity with consumer identity tags | Yes |
| Metrics | Records duration and success/failure counters | Yes |
| User middleware | Your custom middleware, composed Global to Provider to Group | Only if registered |
| Validation | Runs IMessageValidator<T> before the handler | Only if configured |
| Retry | Re-executes the handler on failure with configurable backoff | Only if configured |
| Transactional | Wraps the handler in a unit-of-work transaction when the handler class carries [Transactional] | Only if [Transactional] is used |
| Per-consumer middleware | Middleware registered on AddConsumer or per-route | Only if registered |
| Handler | Your IConsumer<T>.ConsumeAsync() | Yes |
The ordering is deliberate. Error handling sits outermost so it catches exceptions from every other layer. Validation runs outside retry so a malformed message fails fast without burning retry attempts. User middleware runs inside tracing and metrics so your code is automatically observed without any extra plumbing.
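The effect of this ordering can be modeled with plain delegates. The sketch below does not use Emit's API: `Step`, `errorHandling`, `validation`, `retry`, and `handler` are hypothetical stand-ins, and the three-attempt limit is an assumption chosen for illustration. It only shows why an invalid message never consumes retry attempts while a failing handler does.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Step = System.Func<string, System.Threading.Tasks.Task>;

// Hypothetical model of layer nesting; none of these names are Emit types.
var log = new List<string>();

// Outermost layer: catches anything thrown by the layers it wraps.
Func<Step, Step> errorHandling = next => async msg =>
{
    try { await next(msg); }
    catch (Exception ex) { log.Add($"error policy: {ex.Message}"); }
};

// Sits outside retry, so a rejected message is never retried.
Func<Step, Step> validation = next => async msg =>
{
    if (string.IsNullOrWhiteSpace(msg)) throw new ArgumentException("invalid message");
    await next(msg);
};

// Re-runs the inner step; the attempt limit of 3 is an assumption.
Func<Step, Step> retry = next => async msg =>
{
    const int maxAttempts = 3;
    for (var attempt = 1; ; attempt++)
    {
        log.Add($"attempt {attempt}");
        try { await next(msg); return; }
        catch (Exception) when (attempt < maxAttempts) { /* swallow and try again */ }
    }
};

Step handler = msg =>
{
    if (msg == "boom") throw new InvalidOperationException("handler failed");
    log.Add($"handled {msg}");
    return Task.CompletedTask;
};

// Compose from outermost to innermost.
Step pipeline = errorHandling(validation(retry(handler)));

await pipeline("");        // rejected by validation, no attempts logged
await pipeline("order-1"); // handled on the first attempt
await pipeline("boom");    // three attempts, then the error policy sees the failure

foreach (var line in log) Console.WriteLine(line);
```

Swapping `validation` and `retry` in the composition line would make the empty message go through all three attempts before failing, which is the waste the documented ordering avoids.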
Fan-out and independent failure
When a consumer group has multiple consumers or routers, each gets independent pipeline execution per message. They share the deserialized message but each receives a fresh IServiceScope, its own context instances, and isolated error handling. One consumer failing does not affect the others.
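As a rough illustration of independent failure, the sketch below fans one message out to three hypothetical consumers. It is not Emit code: a plain `object` stands in for the fresh `IServiceScope`, the consumer names are invented, and the consumers run sequentially only to keep the example deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical consumers; the "billing" one always fails.
Func<object, string, Task> audit = (scope, msg) => Task.CompletedTask;
Func<object, string, Task> billing = (scope, msg) => throw new InvalidOperationException("billing unavailable");
Func<object, string, Task> email = (scope, msg) => Task.CompletedTask;

var consumers = new List<(string Name, Func<object, string, Task> Handle)>
{
    ("audit", audit),
    ("billing", billing),
    ("email", email),
};

var outcomes = new Dictionary<string, string>();
var scopes = new HashSet<object>();

foreach (var (name, handle) in consumers)
{
    var scope = new object(); // stand-in for a fresh DI scope per consumer
    scopes.Add(scope);
    try
    {
        await handle(scope, "order-1"); // every consumer sees the same message
        outcomes[name] = "ok";
    }
    catch (Exception ex)
    {
        // Error handling is isolated: the failure is recorded for this consumer only.
        outcomes[name] = $"failed: {ex.Message}";
    }
}

foreach (var (name, result) in outcomes) Console.WriteLine($"{name}: {result}");
```

The `email` consumer still runs after `billing` throws, because each entry has its own try/catch around its own scope.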
Content-based routing
Routers add a second level of dispatch. A route selector evaluates the message content and dispatches to the matched route’s sub-pipeline. Each route can carry its own per-route middleware and handler.
Unmatched messages are either discarded silently (if configured via WhenRouteUnmatched) or cause an UnmatchedRouteException to be thrown, which the error policy then handles.
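The dispatch logic can be pictured as a lookup keyed by the selector's result. The following is a minimal sketch under stated assumptions, not Emit's router: the `type:payload` message shape, the `selector`, the `routes` table, and the `Dispatch` helper are all hypothetical, and `InvalidOperationException` stands in for `UnmatchedRouteException`.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical route table: route key -> handler for that route.
var routes = new Dictionary<string, Func<string, string>>
{
    ["order"] = body => $"order handler got {body}",
    ["refund"] = body => $"refund handler got {body}",
};

// Hypothetical selector: the route key is the text before the first ':'.
Func<string, string> selector = body => body.Split(':')[0];

string Dispatch(string body, bool discardUnmatched)
{
    var key = selector(body);
    if (routes.TryGetValue(key, out var handler)) return handler(body);
    if (discardUnmatched) return "discarded";
    // Stand-in for UnmatchedRouteException, left for an outer error policy to handle.
    throw new InvalidOperationException($"no route matched '{key}'");
}

Console.WriteLine(Dispatch("order:42", discardUnmatched: false));
Console.WriteLine(Dispatch("unknown:1", discardUnmatched: true));
```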
Outbound pipeline
The outbound path is simpler: no fan-out, no deserialization. When you call IEventProducer<TKey, TValue>.ProduceAsync(), the message flows through a single pipeline:
| Layer | What it does | Always present? |
|---|---|---|
| Observers | Invokes registered IProduceObserver callbacks | Yes |
| Tracing | Creates an emit.produce activity | Yes |
| Metrics | Records produce duration and counters | Yes |
| User middleware | Your custom middleware, composed Global to Provider to per-producer | Only if registered |
| Terminal | Delivers the message to its destination | Yes |
The terminal is selected at registration time. When outbox infrastructure is configured, the outbox terminal is the default. Producers can opt out with UseDirect():
- Outbox (default when outbox infrastructure is configured): enqueues to the database outbox within the current transaction scope. A background worker delivers to the broker asynchronously.
- Direct (p.UseDirect()): serializes and produces to the broker immediately.
Context objects
Middleware receives a typed context that carries everything relevant to one message’s journey. The context type determines which pipeline direction the middleware belongs to.
ConsumeContext<T> is the inbound context. It extends MessageContext<T>, which extends MessageContext. Beyond the common properties, it adds a TransportContext reference, the raw headers from the broker, a RetryAttempt counter, and an optional Transaction when a unit-of-work is active.
SendContext<T> is the outbound context. It also extends MessageContext<T> and adds a mutable headers list you can write to before the message leaves.
All contexts share the MessageContext base: MessageId, Timestamp, DestinationAddress, SourceAddress, CancellationToken, Services (the current DI scope), a feature collection, and a payload bag.
Transport context
ConsumeContext<T> holds a reference to its parent TransportContext, which carries transport-level data: raw bytes, headers, and the provider ID. For Kafka specifically, this is KafkaTransportContext<TKey> and includes topic, partition, offset, group ID, and the deserialized key:

    if (context.TransportContext is KafkaTransportContext<string> kafka)
    {
        logger.LogDebug("Key: {Key}, Partition: {P}", kafka.Key, kafka.Partition);
    }

See Feature Collection & Payload Bag for the full list of built-in payloads.
Middleware levels
User middleware composes across four levels, each narrowing scope. Higher levels run outside lower levels, exactly like ASP.NET Core middleware wrapping your endpoints.
| Level | Scope | Configured on |
|---|---|---|
| Global | Every message, every provider | EmitBuilder |
| Provider | All messages for one provider | Provider builder (e.g. KafkaBuilder) |
| Group | One consumer group or producer | Group or producer builder |
| Leaf | One consumer, route, or per-producer registration | Consumer, route, or producer builder |
Within each level, middleware executes in registration order.
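To make the nesting concrete, the sketch below composes hypothetical middleware from the four levels and records the order in which each one is entered and exited. It models the rule described above with plain delegates; the `named` helper and the middleware names are invented for illustration and are not part of Emit.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var trace = new List<string>();

// Hypothetical middleware factory: logs on the way in and on the way out.
Func<string, Func<Action, Action>> named = name => next => () =>
{
    trace.Add($"enter {name}");
    next();
    trace.Add($"exit {name}");
};

// Levels listed from widest to narrowest scope, each in registration order.
var levels = new[]
{
    new[] { named("global-1"), named("global-2") },
    new[] { named("provider") },
    new[] { named("group") },
    new[] { named("leaf") },
};

Action handler = () => trace.Add("handler");

// Wrap from the innermost middleware outward so the first global middleware ends up outermost.
Action pipeline = levels
    .SelectMany(level => level)
    .Reverse()
    .Aggregate(handler, (next, middleware) => middleware(next));

pipeline();

foreach (var line in trace) Console.WriteLine(line);
```

In this model the trace starts with `enter global-1` and ends with `exit global-1`, with the handler in the middle.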
Middleware lifetimes

    group.Use<MyMiddleware>(MiddlewareLifetime.Singleton); // one shared instance (default)
    group.Use<MyMiddleware>(MiddlewareLifetime.Scoped);    // one instance per message

Singleton is the default. One instance is shared across all concurrent workers, so it must be thread-safe and cannot hold per-message state in fields.
Scoped creates a fresh instance for each message. It can safely hold per-message state in fields and is resolved from the message’s DI scope.
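The difference in per-message state can be shown with a closure standing in for a middleware instance. This is a simplified, single-threaded model rather than Emit code: `create` is a hypothetical factory, and the captured `seen` variable plays the role of an instance field.

```csharp
using System;
using System.Linq;

// Hypothetical factory: each call builds one "instance" with its own field-like state.
Func<Func<string, int>> create = () =>
{
    var seen = 0;             // plays the role of an instance field
    return msg => ++seen;     // returns how many messages this instance has seen
};

var messages = new[] { "a", "b", "c" };

// Singleton-style: one instance created once and reused for every message.
var singleton = create();
var singletonCounts = messages.Select(m => singleton(m)).ToArray();

// Scoped-style: a new instance per message, so state starts fresh each time.
var scopedCounts = messages.Select(m => create()(m)).ToArray();

Console.WriteLine(string.Join(",", singletonCounts));
Console.WriteLine(string.Join(",", scopedCounts));
```

The shared instance accumulates state across messages, which is why field state in a singleton middleware is not per-message. With concurrent workers, the same field would also need synchronization.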
Built-in middleware
Emit automatically registers error handling, tracing, metrics, and observer middleware for every pipeline. You do not add them manually and cannot change their order relative to each other. Your custom middleware always runs inside these built-in layers, which means it is automatically traced and measured with no extra work on your part.
For more on writing middleware, see Custom Middleware.