Skip to content

Custom Middleware

Middleware in Emit follows the same pattern as ASP.NET: each component receives the context and a next delegate. Call next to continue the pipeline; skip it to short-circuit. See Concepts: Pipeline for the overall model.

Implementing middleware

public class TimingMiddleware<T> : IMiddleware<ConsumeContext<T>>
{
private readonly ILogger<TimingMiddleware<T>> _logger;
public TimingMiddleware(ILogger<TimingMiddleware<T>> logger)
{
_logger = logger;
}
public async Task InvokeAsync(ConsumeContext<T> context, IMiddlewarePipeline<ConsumeContext<T>> next)
{
var sw = Stopwatch.StartNew();
try
{
await next.InvokeAsync(context);
}
finally
{
_logger.LogDebug("Message handled in {ElapsedMs}ms", sw.ElapsedMilliseconds);
}
}
}

For outbound middleware, replace ConsumeContext<T> with SendContext<T>.

IMiddlewarePipeline<in TContext> is contravariant, so a pipeline typed for a base context works with any derived context. Emit closes the open generic at registration time for each message type it handles.

Registering middleware

Global (all messages, all providers)

builder.Services.AddEmit(emit =>
{
emit.InboundPipeline.Use<TimingMiddleware<>>(); // open generic
emit.OutboundPipeline.Use<SigningMiddleware<>>();
});

Provider level (all messages for one provider)

emit.AddKafka(kafka =>
{
kafka.InboundPipeline.Use<KafkaHeaderMiddleware<>>();
});

Consumer group level

topic.ConsumerGroup("order-processor", group =>
{
group.Use<OrderValidationMiddleware<OrderPlaced>>(); // closed generic is fine here
group.AddConsumer<OrderPlacedConsumer>();
});

Per-consumer level

group.AddConsumer<OrderPlacedConsumer>(consumer =>
{
consumer.Use<ConsumerSpecificMiddleware<OrderPlaced>>();
});

Open generics work at the global and provider levels. Closed generics are required at the group and consumer levels, since the message type is known there.

Middleware lifetimes

group.Use<MyMiddleware>(MiddlewareLifetime.Singleton); // one shared instance (default)
group.Use<MyMiddleware>(MiddlewareLifetime.Scoped); // one instance per message

Singleton is the default. One instance is shared across all concurrent workers, so it must be thread-safe and cannot hold per-message state. Scoped middleware gets a fresh instance per message and can safely hold state across its InvokeAsync call.

See Configuration Reference: Middleware lifetimes for the enum values.

Factory-based registration

Sometimes you need to construct middleware manually instead of letting the DI container resolve it. Use the factory overload:

group.Use<OrderPlaced>(sp => new CustomMiddleware<OrderPlaced>(
sp.GetRequiredService<ILogger<CustomMiddleware<OrderPlaced>>>(),
sp.GetRequiredService<IFeatureFlags>()));

The factory receives IServiceProvider and returns the middleware instance. A MiddlewareLifetime can be passed as the second parameter; the default is Singleton.

This works at all registration levels (global, provider, group, consumer) for both inbound and outbound pipelines.

Accessing services

Middleware is resolved from the DI container. For scoped dependencies in Scoped middleware, constructor injection works directly. For singleton middleware that needs per-message scoped services, resolve from context.Services:

public async Task InvokeAsync(ConsumeContext<T> context, IMiddlewarePipeline<ConsumeContext<T>> next)
{
var repo = context.Services.GetRequiredService<IOrderRepository>();
// ...
await next.InvokeAsync(context);
}

Short-circuiting

Returning without calling next aborts the pipeline. The message is acknowledged and not retried:

public async Task InvokeAsync(ConsumeContext<T> context, IMiddlewarePipeline<ConsumeContext<T>> next)
{
if (context.TransportContext is KafkaTransportContext<string> { Key: "__test__" })
{
_logger.LogDebug("Skipping test message");
return; // pipeline ends here
}
await next.InvokeAsync(context);
}

To treat the message as failed and trigger the error policy, throw instead of returning.

Execution order

Middleware executes in registration order, outermost first. Emit’s built-in instrumentation (tracing, metrics, observers) always runs before your global middleware.

The transactional middleware wraps only the handler invocation, running inside the retry loop. Custom middleware registered at the consumer level runs outside the transaction unless it is placed after the transactional middleware in the pipeline.

The last-registered middleware at each level is closest to the handler.