Skip to content

Observers

Observers are a lightweight alternative to middleware for cross-cutting concerns. They receive lifecycle notifications at fixed points (before, after, on error) without needing to call next. Use them when you want to observe the pipeline, not intercept it.

Exceptions thrown by observers are caught and logged individually. A failing observer never affects other observers or the message pipeline.

IConsumeObserver

Receives notifications for every inbound message across all transport consumers:

public class AuditConsumeObserver(IAuditLog audit) : IConsumeObserver
{
public Task OnConsumingAsync<T>(ConsumeContext<T> context)
{
audit.Record($"Consuming {typeof(T).Name} at {context.Timestamp}");
return Task.CompletedTask;
}
public Task OnConsumedAsync<T>(ConsumeContext<T> context)
{
audit.Record($"Consumed {typeof(T).Name}: success");
return Task.CompletedTask;
}
public Task OnConsumeErrorAsync<T>(ConsumeContext<T> context, Exception exception)
{
audit.Record($"Consume error: {exception.Message}");
return Task.CompletedTask;
}
}

All three methods have default implementations that return Task.CompletedTask, so you only need to implement what you care about.

Register at the global level:

emit.AddConsumeObserver<AuditConsumeObserver>();

IConsumeObserver fires for Kafka consumers. It does not fire for mediator requests; use IMediatorObserver for those.

IProduceObserver

Fires for every produce call through the outbound pipeline.

MethodWhen it fires
OnProducingAsync<T>(SendContext<T>)Before the outbound pipeline executes
OnProducedAsync<T>(SendContext<T>)Produce succeeded
OnProduceErrorAsync<T>(SendContext<T>, Exception)Pipeline threw before or during the outbox write

Register globally:

emit.AddProduceObserver<MyProduceObserver>();

IOutboxObserver

Fires at outbox entry lifecycle points: enqueue and the worker’s processing stages.

MethodWhen it fires
OnEnqueuedAsyncEntry was written to the outbox (fires from the outbound pipeline)
OnProcessingAsyncWorker is about to attempt delivery
OnProcessedAsyncDelivery succeeded; entry will be deleted
OnProcessErrorAsyncDelivery failed; entry stays for the next poll cycle

Register globally:

emit.AddOutboxObserver<MyOutboxObserver>();

OnEnqueuedAsync fires in your request scope. The remaining three fire from the background worker.

IMediatorObserver

Fires for mediator request/response operations.

MethodWhen it fires
OnHandlingAsync<T>(MediatorContext<T>)Before the handler is invoked
OnHandledAsync<T>(MediatorContext<T>)After the handler completes successfully
OnHandleErrorAsync<T>(MediatorContext<T>, Exception)If the handler throws

IMediatorObserver is registered on the mediator builder, not on EmitBuilder:

emit.AddMediator(mediator =>
{
mediator.AddObserver<MyMediatorObserver>();
});

IDaemonObserver

Fires when daemon assignments change on the current node.

MethodWhen it fires
OnDaemonAssignedAsyncA daemon was assigned to this node (before it starts)
OnDaemonStartedAsyncThe daemon has started running on this node
OnDaemonStoppedAsyncThe daemon stopped cleanly on this node
OnDaemonRevokedAsyncThe leader revoked the daemon from this node

All four methods receive (string daemonId, Guid nodeId, CancellationToken).

Register globally:

emit.AddDaemonObserver<MyDaemonObserver>();

The built-in outbox daemon (emit:outbox) is the most common daemon you will observe. Use OnDaemonRevokedAsync to detect when outbox processing shifts to another node during a rebalance.

Observer vs. middleware

Observers are the right tool when you need simple before/after/error hooks and do not need to:

  • Intercept or modify the message
  • Short-circuit the pipeline
  • Execute async code at a specific position relative to other middleware

Use middleware when you need any of the above, or when execution order matters relative to other pipeline stages.