Skip to content

Pipeline Model

Every message in Emit flows through a layered middleware pipeline before reaching its handler or destination. The pipeline model is the same regardless of which messaging provider you use: inbound messages go through a consume pipeline, outbound messages go through a produce pipeline, and your middleware slots into both.

Understanding this model makes it straightforward to reason about ordering, error handling, and where to put things.

Inbound pipeline

The inbound path has two distinct phases: transport processing, then per-consumer pipeline execution. The split exists because some work belongs to the infrastructure (deserializing bytes, distributing messages across workers) and some belongs to your application logic.

Transport phase

Before any middleware runs, Emit handles the mechanics of getting a message from the broker into a usable form:

  1. Poll. The consumer group reads raw bytes from the broker and distributes them across worker channels.
  2. Deserialize. Key and value are deserialized using the registered IDeserializer<T> or IAsyncDeserializer<T>. Deserialization errors are handled separately via OnDeserializationError on the consumer group and never enter the consumer pipeline. A message that cannot be deserialized cannot be processed, so routing it through retry and dead-letter logic would be misleading.
  3. Fan-out. For each consumer or router registered on the consumer group, Emit creates a fresh DI scope with its own ConsumeContext<TValue>. Each entry runs its pipeline independently.

Consumer pipeline layers

After fan-out, each consumer runs its own fully composed pipeline from outermost to innermost:

LayerWhat it doesAlways present?
Error handlingCatches exceptions, evaluates the error policy (dead-letter, discard), notifies the circuit breakerYes
ObserversInvokes registered IConsumeObserver callbacksYes
TracingCreates an emit.consume activity with consumer identity tagsYes
MetricsRecords duration and success/failure countersYes
User middlewareYour custom middleware, composed Global to Provider to GroupOnly if registered
ValidationRuns IMessageValidator<T> before the handlerOnly if configured
RetryRe-executes the handler on failure with configurable backoffOnly if configured
TransactionalWraps the handler in a unit-of-work transaction when the handler class carries [Transactional]Only if [Transactional] is used
Per-consumer middlewareMiddleware registered on AddConsumer or per-routeOnly if registered
HandlerYour IConsumer<T>.ConsumeAsync()Yes

The ordering is deliberate. Error handling sits outermost so it catches exceptions from every other layer. Validation runs outside retry so a malformed message fails fast without burning retry attempts. User middleware runs inside tracing and metrics so your code is automatically observed without any extra plumbing.

Fan-out and independent failure

When a consumer group has multiple consumers or routers, each gets independent pipeline execution per message. They share the deserialized message but each receives a fresh IServiceScope, its own context instances, and isolated error handling. One consumer failing does not affect the others.

Content-based routing

Routers add a second level of dispatch. A route selector evaluates the message content and dispatches to the matched route’s sub-pipeline. Each route can carry its own per-route middleware and handler.

Unmatched messages are either discarded silently (if configured via WhenRouteUnmatched) or throw an UnmatchedRouteException for the error policy to handle.

Outbound pipeline

The outbound path is simpler: no fan-out, no deserialization. When you call IEventProducer<TKey, TValue>.ProduceAsync(), the message flows through a single pipeline:

LayerWhat it doesAlways present?
ObserversInvokes registered IProduceObserver callbacksYes
TracingCreates an emit.produce activityYes
MetricsRecords produce duration and countersYes
User middlewareYour custom middleware, composed Global to Provider to per-producerOnly if registered
TerminalDelivers the message to its destinationYes

The terminal is selected at registration time. When outbox infrastructure is configured, the outbox terminal is the default. Producers can opt out with UseDirect():

  • Outbox (default when outbox infrastructure is configured): enqueues to the database outbox within the current transaction scope. A background worker delivers to the broker asynchronously.
  • Direct (p.UseDirect()): serializes and produces to the broker immediately.

Context objects

Middleware receives a typed context that carries everything relevant to one message’s journey. The context type determines which pipeline direction the middleware belongs to.

ConsumeContext<T> is the inbound context. It extends MessageContext<T>, which extends MessageContext. Beyond the common properties, it adds a TransportContext reference, the raw headers from the broker, a RetryAttempt counter, and an optional Transaction when a unit-of-work is active.

SendContext<T> is the outbound context. It also extends MessageContext<T> and adds a mutable headers list you can write to before the message leaves.

All contexts share the MessageContext base: MessageId, Timestamp, DestinationAddress, SourceAddress, CancellationToken, Services (the current DI scope), a feature collection, and a payload bag.

Transport context

ConsumeContext<T> holds a reference to its parent TransportContext, which carries transport-level data: raw bytes, headers, and the provider ID. For Kafka specifically, this is KafkaTransportContext<TKey> and includes topic, partition, offset, group ID, and the deserialized key:

if (context.TransportContext is KafkaTransportContext<string> kafka)
logger.LogDebug("Key: {Key}, Partition: {P}", kafka.Key, kafka.Partition);

See Feature Collection & Payload Bag for the full list of built-in payloads.

Middleware levels

User middleware composes across four levels, each narrowing scope. Higher levels run outside lower levels, exactly like ASP.NET Core middleware wrapping your endpoints.

LevelScopeConfigured on
GlobalEvery message, every providerEmitBuilder
ProviderAll messages for one providerProvider builder (e.g. KafkaBuilder)
GroupOne consumer group or producerGroup or producer builder
LeafOne consumer, route, or per-producer registrationConsumer, route, or producer builder

Within each level, middleware executes in registration order.

Middleware lifetimes

group.Use<MyMiddleware>(MiddlewareLifetime.Singleton); // one shared instance (default)
group.Use<MyMiddleware>(MiddlewareLifetime.Scoped); // one instance per message

Singleton is the default. One instance is shared across all concurrent workers, so it must be thread-safe and cannot hold per-message state in fields.

Scoped creates a fresh instance for each message. It can safely hold per-message state in fields and is resolved from the message’s DI scope.

Built-in middleware

Emit automatically registers error handling, tracing, metrics, and observer middleware for every pipeline. You do not add them manually and cannot change their order relative to each other. Your custom middleware always runs inside these built-in layers, which means it is automatically traced and measured with no extra work on your part.

For more on writing middleware, see Custom Middleware.