MongoDB
The Emit.Persistence.MongoDB package provides MongoDB-backed implementations of the outbox repository and the distributed lock provider. You supply the connection; Emit handles everything else.
Installation
dotnet add package Emit.Persistence.MongoDB
Registration
// Register your IMongoClient however you normally would.
builder.Services.AddSingleton<IMongoClient>(new MongoClient("mongodb://localhost:27017"));

builder.Services.AddEmit(emit =>
{
    emit.AddMongoDb(mongo =>
    {
        mongo.Configure((sp, ctx) =>
        {
            ctx.Client = sp.GetRequiredService<IMongoClient>();
            ctx.Database = ctx.Client.GetDatabase("pizzeria-db");
        })
        .UseOutbox()
        .UseDistributedLock();
    });
});

Configure is required and is the only place you hand Emit your IMongoClient and IMongoDatabase. The callback is deferred to resolution time, so you can safely pull services from the IServiceProvider.
UseOutbox() and UseDistributedLock() are independent. Enable whichever you need. When UseOutbox() is called, the background delivery worker starts and all producers route through the outbox by default. See Producers for how to opt out with UseDirect().
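Because the two features are independent, a registration that enables only the distributed lock might look like the sketch below. It reuses only the calls shown in the registration example above. The `using` directives for Emit are omitted because this page does not show its namespaces, and the database name is the same illustrative value.

```csharp
// Sketch: lock-only registration. No UseOutbox() call, so (per the text above)
// the delivery worker is not started and producers are not routed through the outbox.
builder.Services.AddSingleton<IMongoClient>(new MongoClient("mongodb://localhost:27017"));

builder.Services.AddEmit(emit =>
{
    emit.AddMongoDb(mongo =>
    {
        mongo.Configure((sp, ctx) =>
        {
            ctx.Client = sp.GetRequiredService<IMongoClient>();
            ctx.Database = ctx.Client.GetDatabase("pizzeria-db");
        })
        .UseDistributedLock();
    });
});
```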
Collections
Emit creates and manages its own collections inside the database you provide. You do not need to create them manually: indexes and TTL configuration are applied automatically on startup. All collection names are prefixed with emit. to avoid collisions with your own collections.
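To check what Emit created after startup, you can list the collections whose names start with the emit. prefix. The following is a minimal sketch using only the MongoDB.Driver API. The helper name EmitCollectionInspector is hypothetical and not part of Emit, and the exact collection names it returns are not specified on this page.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Driver;

// Hypothetical diagnostic helper (not part of Emit).
public static class EmitCollectionInspector
{
    // Returns the names of all collections in the database that start with "emit.".
    public static async Task<List<string>> ListEmitCollectionsAsync(
        IMongoDatabase database,
        CancellationToken ct = default)
    {
        var options = new ListCollectionNamesOptions
        {
            // listCollections filters on the "name" field of each collection descriptor.
            Filter = Builders<BsonDocument>.Filter.Regex("name", new BsonRegularExpression(@"^emit\."))
        };

        using var cursor = await database.ListCollectionNamesAsync(options, ct);
        return await cursor.ToListAsync(ct);
    }
}
```

Pass it the same IMongoDatabase you assign to ctx.Database in Configure.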
Transactional outbox
For the outbox to work correctly, your business write and your produce call must share the same MongoDB session. This page covers two ways to manage that: the [Transactional] attribute and IUnitOfWork. For the full explanation of the outbox and the three transaction approaches, see Transactional Outbox.
[Transactional] attribute
The simplest option for consumers and handlers. The pipeline begins a transaction before invoking your handler and commits (or rolls back) after it returns. Inject IMongoSessionAccessor to pass the active session to your MongoDB operations:
[Transactional]
public sealed class PizzaOrderedConsumer(
    IMongoSessionAccessor sessionAccessor,
    IEventProducer<string, PizzaOrderConfirmed> producer,
    IMongoCollection<PizzaOrderDocument> orders) : IConsumer<PizzaOrdered>
{
    public async Task ConsumeAsync(ConsumeContext<PizzaOrdered> ctx, CancellationToken ct)
    {
        var session = sessionAccessor.Session!;
        await orders.InsertOneAsync(session, new PizzaOrderDocument(ctx.Message), cancellationToken: ct);
        await producer.ProduceAsync(..., ct);
        // Transaction committed automatically by middleware.
    }
}
IUnitOfWork
For hosted services, background jobs, or anywhere outside the consumer pipeline, manage the transaction explicitly:
public async Task HandleAsync(PlacePizzaOrder request, CancellationToken ct)
{
    await using var tx = await unitOfWork.BeginAsync(ct);

    var session = sessionAccessor.Session!;
    await ordersCollection.InsertOneAsync(session, new PizzaOrderDocument(request), cancellationToken: ct);

    await producer.ProduceAsync(
        new EventMessage<string, PizzaOrdered>(request.PizzaId.ToString(), new PizzaOrdered(request)),
        ct);

    await tx.CommitAsync(ct);
}
IUnitOfWork and IMongoSessionAccessor are registered as scoped services automatically when UseOutbox() is called.
IMongoSessionAccessor.Session is null until a transaction is started. Within a [Transactional] handler or after IUnitOfWork.BeginAsync() is called, Session holds the active IClientSessionHandle.
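The examples above use the null-forgiving operator (Session!), which would surface a missing transaction as a NullReferenceException somewhere inside the driver call. One alternative is a small guard that fails early with a clearer message. This is a sketch only: MongoSessionGuard is a hypothetical helper, not part of Emit, and it depends on nothing but MongoDB.Driver's IClientSessionHandle.

```csharp
using System;
using MongoDB.Driver;

// Hypothetical helper (not part of Emit): turns a null session into an explicit error.
public static class MongoSessionGuard
{
    public static IClientSessionHandle RequireSession(IClientSessionHandle? session) =>
        session ?? throw new InvalidOperationException(
            "No active MongoDB session. Use [Transactional] or call IUnitOfWork.BeginAsync() first.");
}
```

In a handler, this would replace `sessionAccessor.Session!` with `MongoSessionGuard.RequireSession(sessionAccessor.Session)`.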
Serialization
Emit uses the official MongoDB.Driver BSON conventions. OutboxEntry fields are stored with camelCase keys by default, consistent with the rest of your MongoDB documents.
Outbox options
Pass a configuration action to UseOutbox() to tune the worker:
.UseOutbox(options =>
{
    options.PollingInterval = TimeSpan.FromSeconds(3);
    options.BatchSize = 200;
})
See Transactional Outbox for the full option reference.