Observers
Observers are a lightweight alternative to middleware for cross-cutting concerns. They receive lifecycle notifications at fixed points (before, after, on error) without needing to call next. Use them when you want to observe the pipeline, not intercept it.
Exceptions thrown by observers are caught and logged individually. A failing observer never affects other observers or the message pipeline.
IConsumeObserver
Receives notifications for every inbound message across all transport consumers:
public class AuditConsumeObserver(IAuditLog audit) : IConsumeObserver{ public Task OnConsumingAsync<T>(ConsumeContext<T> context) { audit.Record($"Consuming {typeof(T).Name} at {context.Timestamp}"); return Task.CompletedTask; }
public Task OnConsumedAsync<T>(ConsumeContext<T> context) { audit.Record($"Consumed {typeof(T).Name}: success"); return Task.CompletedTask; }
public Task OnConsumeErrorAsync<T>(ConsumeContext<T> context, Exception exception) { audit.Record($"Consume error: {exception.Message}"); return Task.CompletedTask; }}All three methods have default implementations that return Task.CompletedTask, so you only need to implement what you care about.
Register at the global level:
emit.AddConsumeObserver<AuditConsumeObserver>();IConsumeObserver fires for Kafka consumers. It does not fire for mediator requests; use IMediatorObserver for those.
IProduceObserver
Fires for every produce call through the outbound pipeline.
| Method | When it fires |
|---|---|
OnProducingAsync<T>(SendContext<T>) | Before the outbound pipeline executes |
OnProducedAsync<T>(SendContext<T>) | Produce succeeded |
OnProduceErrorAsync<T>(SendContext<T>, Exception) | Pipeline threw before or during the outbox write |
Register globally:
emit.AddProduceObserver<MyProduceObserver>();IOutboxObserver
Fires at outbox entry lifecycle points: enqueue and the worker’s processing stages.
| Method | When it fires |
|---|---|
OnEnqueuedAsync | Entry was written to the outbox (fires from the outbound pipeline) |
OnProcessingAsync | Worker is about to attempt delivery |
OnProcessedAsync | Delivery succeeded; entry will be deleted |
OnProcessErrorAsync | Delivery failed; entry stays for the next poll cycle |
Register globally:
emit.AddOutboxObserver<MyOutboxObserver>();OnEnqueuedAsync fires in your request scope. The remaining three fire from the background worker.
IMediatorObserver
Fires for mediator request/response operations.
| Method | When it fires |
|---|---|
OnHandlingAsync<T>(MediatorContext<T>) | Before the handler is invoked |
OnHandledAsync<T>(MediatorContext<T>) | After the handler completes successfully |
OnHandleErrorAsync<T>(MediatorContext<T>, Exception) | If the handler throws |
IMediatorObserver is registered on the mediator builder, not on EmitBuilder:
emit.AddMediator(mediator =>{ mediator.AddObserver<MyMediatorObserver>();});IDaemonObserver
Fires when daemon assignments change on the current node.
| Method | When it fires |
|---|---|
OnDaemonAssignedAsync | A daemon was assigned to this node (before it starts) |
OnDaemonStartedAsync | The daemon has started running on this node |
OnDaemonStoppedAsync | The daemon stopped cleanly on this node |
OnDaemonRevokedAsync | The leader revoked the daemon from this node |
All four methods receive (string daemonId, Guid nodeId, CancellationToken).
Register globally:
emit.AddDaemonObserver<MyDaemonObserver>();The built-in outbox daemon (emit:outbox) is the most common daemon you will observe. Use OnDaemonRevokedAsync to detect when outbox processing shifts to another node during a rebalance.
Observer vs. middleware
Observers are the right tool when you need simple before/after/error hooks and do not need to:
- Intercept or modify the message
- Short-circuit the pipeline
- Execute async code at a specific position relative to other middleware
Use middleware when you need any of the above, or when execution order matters relative to other pipeline stages.