Custom Middleware
Middleware in Emit follows the same pattern as ASP.NET Core middleware: each component receives the context and a next delegate. Call next to continue the pipeline; skip it to short-circuit. See Concepts: Pipeline for the overall model.
Implementing middleware
    using System.Diagnostics;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Logging;

    public class TimingMiddleware<T> : IMiddleware<ConsumeContext<T>>
    {
        private readonly ILogger<TimingMiddleware<T>> _logger;

        public TimingMiddleware(ILogger<TimingMiddleware<T>> logger)
        {
            _logger = logger;
        }

        public async Task InvokeAsync(ConsumeContext<T> context, IMiddlewarePipeline<ConsumeContext<T>> next)
        {
            var sw = Stopwatch.StartNew();
            try
            {
                await next.InvokeAsync(context);
            }
            finally
            {
                // The finally block logs the elapsed time even when a later stage throws.
                _logger.LogDebug("Message handled in {ElapsedMs}ms", sw.ElapsedMilliseconds);
            }
        }
    }
For outbound middleware, replace ConsumeContext<T> with SendContext<T>.
IMiddlewarePipeline<in TContext> is contravariant, so a pipeline typed for a base context works with any derived context. Emit closes the open generic at registration time for each message type it handles.
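The contravariance rule can be seen in isolation with a minimal sketch. The types below (IPipeline, BaseContext, DerivedContext, BasePipeline) are hypothetical stand-ins, not Emit's own types; the only thing carried over from the text above is the `in` modifier on the context type parameter.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-ins for illustration; these are not Emit's types.
public class BaseContext { public string Id { get; init; } = ""; }
public class DerivedContext : BaseContext { }

// 'in' makes the interface contravariant in TContext.
public interface IPipeline<in TContext>
{
    Task InvokeAsync(TContext context);
}

public sealed class BasePipeline : IPipeline<BaseContext>
{
    public string LastId { get; private set; } = "";

    public Task InvokeAsync(BaseContext context)
    {
        LastId = context.Id;
        Console.WriteLine($"handled {context.Id}");
        return Task.CompletedTask;
    }
}

public static class Program
{
    public static async Task Main()
    {
        var general = new BasePipeline();

        // Legal only because TContext is declared 'in': a pipeline that accepts
        // any BaseContext can stand in for one that accepts DerivedContext.
        IPipeline<DerivedContext> specific = general;

        await specific.InvokeAsync(new DerivedContext { Id = "order-1" });
    }
}
```

Removing the `in` modifier turns the assignment to `specific` into a compile error, which is why the modifier matters for reusing one pipeline across derived contexts.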
Registering middleware
Global (all messages, all providers)
    builder.Services.AddEmit(emit =>
    {
        emit.InboundPipeline.Use<TimingMiddleware<>>(); // open generic
        emit.OutboundPipeline.Use<SigningMiddleware<>>();
    });
Provider level (all messages for one provider)
    emit.AddKafka(kafka =>
    {
        kafka.InboundPipeline.Use<KafkaHeaderMiddleware<>>();
    });
Consumer group level
    topic.ConsumerGroup("order-processor", group =>
    {
        group.Use<OrderValidationMiddleware<OrderPlaced>>(); // closed generic is fine here
        group.AddConsumer<OrderPlacedConsumer>();
    });
Per-consumer level
    group.AddConsumer<OrderPlacedConsumer>(consumer =>
    {
        consumer.Use<ConsumerSpecificMiddleware<OrderPlaced>>();
    });
Open generics work at the global and provider levels. Closed generics are required at the group and consumer levels, since the message type is known there.
Middleware lifetimes
    group.Use<MyMiddleware>(MiddlewareLifetime.Singleton); // one shared instance (default)
    group.Use<MyMiddleware>(MiddlewareLifetime.Scoped);    // one instance per message
Singleton is the default. One instance is shared across all concurrent workers, so it must be thread-safe and cannot hold per-message state. Scoped middleware gets a fresh instance per message and can safely hold state across its InvokeAsync call.
See Configuration Reference: Middleware lifetimes for the enum values.
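As a rough illustration of the thread-safety requirement for a shared instance, the sketch below uses a hypothetical CountingMiddleware class that does not implement Emit's IMiddleware interface; it only mimics the shape of an InvokeAsync call. Shared fields are updated atomically, and anything specific to one message stays in local variables.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a singleton middleware: one instance, many concurrent calls.
public sealed class CountingMiddleware
{
    // Shared across all callers, so these fields are only touched atomically.
    private long _handled;
    private long _totalTicks;

    public long Handled => Interlocked.Read(ref _handled);
    public long TotalTicks => Interlocked.Read(ref _totalTicks);

    public async Task InvokeAsync(string message, Func<string, Task> next)
    {
        // Per-message state lives in a local, never in a field.
        var sw = Stopwatch.StartNew();
        await next(message);
        Interlocked.Add(ref _totalTicks, sw.ElapsedTicks);
        Interlocked.Increment(ref _handled);
    }
}

public static class Demo
{
    // Runs 'count' concurrent invocations against a single shared instance.
    public static async Task<long> RunAsync(int count)
    {
        var shared = new CountingMiddleware();
        var calls = Enumerable.Range(0, count)
            .Select(i => Task.Run(() => shared.InvokeAsync($"msg-{i}", _ => Task.Yield().AsTask())));
        await Task.WhenAll(calls);
        return shared.Handled;
    }
}

public static class Program
{
    public static async Task Main()
    {
        Console.WriteLine(await Demo.RunAsync(1000)); // prints 1000
    }
}
```

Replacing the Interlocked calls with plain `_handled++` would make the final count unreliable under concurrency, which is the kind of bug a Scoped lifetime avoids by giving each message its own instance.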
Factory-based registration
Sometimes you need to construct middleware manually instead of letting the DI container resolve it. Use the factory overload:
    group.Use<OrderPlaced>(sp => new CustomMiddleware<OrderPlaced>(
        sp.GetRequiredService<ILogger<CustomMiddleware<OrderPlaced>>>(),
        sp.GetRequiredService<IFeatureFlags>()));
The factory receives IServiceProvider and returns the middleware instance. A MiddlewareLifetime can be passed as the second parameter; the default is Singleton.
This works at all registration levels (global, provider, group, consumer) for both inbound and outbound pipelines.
Accessing services
Middleware is resolved from the DI container. For scoped dependencies in Scoped middleware, constructor injection works directly. For singleton middleware that needs per-message scoped services, resolve from context.Services:
    public async Task InvokeAsync(ConsumeContext<T> context, IMiddlewarePipeline<ConsumeContext<T>> next)
    {
        var repo = context.Services.GetRequiredService<IOrderRepository>();
        // ...
        await next.InvokeAsync(context);
    }
Short-circuiting
Returning without calling next aborts the pipeline. The message is acknowledged and not retried:
    public async Task InvokeAsync(ConsumeContext<T> context, IMiddlewarePipeline<ConsumeContext<T>> next)
    {
        if (context.TransportContext is KafkaTransportContext<string> { Key: "__test__" })
        {
            _logger.LogDebug("Skipping test message");
            return; // pipeline ends here
        }

        await next.InvokeAsync(context);
    }
To treat the message as failed and trigger the error policy, throw instead of returning.
Execution order
Middleware executes in registration order, outermost first. Emit’s built-in instrumentation (tracing, metrics, observers) always runs before your global middleware.
The transactional middleware wraps only the handler invocation, running inside the retry loop. Custom middleware registered at the consumer level runs outside the transaction unless it is placed after the transactional middleware in the pipeline.
The last-registered middleware at each level is closest to the handler.
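The ordering rule can be sketched with a miniature pipeline. Everything here (the Next and Middleware delegates, MiniPipeline, the string message) is a hypothetical model written for illustration and makes no claim about how Emit composes its pipeline internally; it only shows how "first registered is outermost, last registered is closest to the handler" plays out, including a short-circuit.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical miniature pipeline; not Emit's implementation.
public delegate Task Next(string message);
public delegate Task Middleware(string message, Next next);

public static class MiniPipeline
{
    // Wraps the handler from the inside out, so registered[0] ends up outermost.
    public static Next Build(IReadOnlyList<Middleware> registered, Next handler)
    {
        Next current = handler;
        for (int i = registered.Count - 1; i >= 0; i--)
        {
            Middleware mw = registered[i];
            Next inner = current;
            current = msg => mw(msg, inner);
        }
        return current;
    }

    // Runs one message through three middleware and returns the order of events.
    public static async Task<List<string>> TraceAsync(string message)
    {
        var log = new List<string>();

        Middleware Named(string name) => async (msg, next) =>
        {
            log.Add($"{name}:before");
            await next(msg);
            log.Add($"{name}:after");
        };

        // Short-circuits by returning without calling next.
        Middleware skipTests = (msg, next) =>
            msg == "__test__" ? Task.CompletedTask : next(msg);

        Next pipeline = Build(
            new[] { Named("first"), skipTests, Named("last") },
            msg => { log.Add("handler"); return Task.CompletedTask; });

        await pipeline(message);
        return log;
    }
}

public static class Program
{
    public static async Task Main()
    {
        // first:before, last:before, handler, last:after, first:after
        Console.WriteLine(string.Join(", ", await MiniPipeline.TraceAsync("order-1")));
        // first:before, first:after
        Console.WriteLine(string.Join(", ", await MiniPipeline.TraceAsync("__test__")));
    }
}
```

In the second run, the middleware registered after the short-circuiting one never executes, while the one registered before it still completes its "after" step.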