Testing

Install the Emit.Testing package:
dotnet add package Emit.Testing

MessageSink and SinkConsumer

MessageSink<T> collects consumed messages in memory. Pair it with SinkConsumer<T> as a test double for your real consumer:

[Fact]
public async Task Consumer_writes_to_sink()
{
    // Arrange
    var sink = new MessageSink<PizzaOrdered>();
    var services = new ServiceCollection();
    services.AddEmit(emit =>
    {
        emit.AddKafka(kafka =>
        {
            kafka.ConfigureClient(cfg => cfg.BootstrapServers = "localhost:9092");
            kafka.Topic<string, PizzaOrdered>("pizzas", topic =>
            {
                topic.SetKeyDeserializer(Confluent.Kafka.Deserializers.Utf8);
                // myDeserializer: the value deserializer for PizzaOrdered, defined elsewhere in the test
                topic.SetValueDeserializer(myDeserializer);
                topic.ConsumerGroup("test-group", group =>
                {
                    group.AddConsumer<SinkConsumer<PizzaOrdered>>();
                });
            });
        });
    });
    services.AddSingleton(sink);

    // Act: produce a message using your real producer or inject one directly

    // Assert (expectedPizzaId is the ID of the message produced in the Act step)
    var context = await sink.WaitForMessageAsync(TimeSpan.FromSeconds(10));
    Assert.Equal(expectedPizzaId, context.Message.PizzaId);
}

For integration tests against a real broker, Testcontainers spins up the container automatically. See tests/Emit.IntegrationTests/INSTRUCT.md for the integration test patterns used in this repository.

WaitForMessageAsync

WaitForMessageAsync completes when a message arrives or the timeout elapses, whichever comes first. It throws TimeoutException if the timeout expires before a message arrives, or OperationCanceledException if the cancellation token passed to it is canceled.
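To make the difference between the two exceptions concrete, here is a minimal stand-in built on System.Threading.Channels. SketchSink<T> and its Add method are hypothetical names invented for this illustration; this is not how Emit implements MessageSink<T>, only one plausible way to get the same observable behavior.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for illustration only; not Emit's MessageSink<T>.
public sealed class SketchSink<T>
{
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();
    private readonly List<T> _received = new();
    private readonly object _gate = new();

    // Snapshot of every message recorded so far.
    public IReadOnlyList<T> ReceivedMessages
    {
        get { lock (_gate) { return _received.ToArray(); } }
    }

    // Called by the consuming side once per message.
    public void Add(T message)
    {
        lock (_gate) { _received.Add(message); }
        _channel.Writer.TryWrite(message); // always succeeds on an unbounded channel
    }

    public async Task<T> WaitForMessageAsync(
        TimeSpan timeout, CancellationToken cancellationToken = default)
    {
        // One source fires on timeout; the linked source fires on either cause.
        using var timeoutCts = new CancellationTokenSource(timeout);
        using var linked = CancellationTokenSource.CreateLinkedTokenSource(
            timeoutCts.Token, cancellationToken);
        try
        {
            return await _channel.Reader.ReadAsync(linked.Token);
        }
        catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
        {
            // The caller's token did not fire, so the timeout did.
            throw new TimeoutException($"No message arrived within {timeout}.");
        }
    }
}
```

In this sketch each call dequeues one message in arrival order, while ReceivedMessages keeps the full history, which is why both styles of assertion can coexist.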

To assert on multiple messages, call WaitForMessageAsync once per expected message, or wait and then inspect sink.ReceivedMessages:

var first = await sink.WaitForMessageAsync(timeout);
var second = await sink.WaitForMessageAsync(timeout);
// or after collecting N messages:
Assert.Equal(3, sink.ReceivedMessages.Count);
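Sequential calls each get the full timeout, so waiting for N messages can take up to N times that long in the worst case. If you would rather bound the total wait, a small helper along these lines can share one time budget across calls. SinkCollecting and CollectAsync are hypothetical names, and the helper takes a delegate so that it makes no assumptions about Emit's types.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

public static class SinkCollecting
{
    // Hypothetical helper, not part of Emit. `waitOne` wraps a single wait,
    // for example: remaining => sink.WaitForMessageAsync(remaining).
    public static async Task<IReadOnlyList<T>> CollectAsync<T>(
        Func<TimeSpan, Task<T>> waitOne, int count, TimeSpan totalTimeout)
    {
        var results = new List<T>(count);
        var clock = Stopwatch.StartNew();

        while (results.Count < count)
        {
            // Each wait only gets whatever is left of the shared budget.
            var remaining = totalTimeout - clock.Elapsed;
            if (remaining <= TimeSpan.Zero)
            {
                throw new TimeoutException(
                    $"Collected {results.Count} of {count} messages within {totalTimeout}.");
            }

            results.Add(await waitOne(remaining));
        }

        return results;
    }
}
```

A call might look like `await SinkCollecting.CollectAsync(t => sink.WaitForMessageAsync(t), 3, TimeSpan.FromSeconds(10))`, assuming WaitForMessageAsync returns a Task of the message context.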

Testing the mediator

The mediator is a plain IMediator resolved from DI:

[Fact]
public async Task PlacePizzaOrder_saves_order()
{
    // Arrange
    var services = new ServiceCollection();
    services.AddEmit(emit =>
    {
        emit.AddMediator(mediator =>
        {
            mediator.AddHandler<PlacePizzaOrderHandler>();
        });
    });
    // Singleton so the handler and the assertion below share one in-memory store,
    // even if the handler runs in its own DI scope.
    services.AddSingleton<IPizzaRepository, InMemoryPizzaRepository>();
    await using var provider = services.BuildServiceProvider();
    var mediator = provider.GetRequiredService<IMediator>();

    // Act
    await mediator.SendAsync(new PlacePizzaOrder("customer-1", ["margherita", "extra-cheese"], "123 Main St"));

    // Assert
    var repo = provider.GetRequiredService<IPizzaRepository>();
    Assert.Single(await repo.GetAllAsync());
}

Testing error handling

Register SinkConsumer<T> as the consumer, then produce a message that triggers the error condition. Then check sink.ReceivedMessages: the message should be present if processing succeeded and absent if it was discarded. For dead-letter queue (DLQ) assertions, register a second sink on the DLQ topic.
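Asserting that a message did not arrive means waiting for a bounded window and treating the timeout as the expected outcome. The helper below sketches that idea; SinkAssertions and AssertNoMessageAsync are hypothetical names, not part of Emit, and the delegate parameter stands in for a call to WaitForMessageAsync.

```csharp
using System;
using System.Threading.Tasks;

public static class SinkAssertions
{
    // Hypothetical helper, not part of Emit. `wait` wraps a single wait,
    // for example: window => sink.WaitForMessageAsync(window).
    public static async Task AssertNoMessageAsync(Func<TimeSpan, Task> wait, TimeSpan window)
    {
        try
        {
            await wait(window);
        }
        catch (TimeoutException)
        {
            // Nothing arrived inside the window, which is what a discard looks like.
            return;
        }

        throw new InvalidOperationException(
            $"Expected no message within {window}, but one arrived.");
    }
}
```

An absence check can only ever be as strong as its window: too short and a slow delivery passes unnoticed, too long and the test suite slows down. A few seconds is a common compromise against a local broker.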

Testing outbox behavior

The outbox adds a database write to the mix. Use real containers via Testcontainers rather than in-memory fakes; the outbox depends on database-level atomicity that no in-memory implementation can replicate.
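As one way to provide that real database, the sketch below starts a throwaway PostgreSQL container with the Testcontainers.PostgreSql package. The choice of PostgreSQL, the image tag, and the OutboxDatabaseFixture name are assumptions made for illustration; substitute the Testcontainers module for whichever database your outbox uses. Wiring the connection string into Emit is left out because it depends on your persistence setup.

```csharp
using System;
using System.Threading.Tasks;
using Testcontainers.PostgreSql;

// Hypothetical fixture; assumes the outbox is backed by PostgreSQL.
public sealed class OutboxDatabaseFixture : IAsyncDisposable
{
    private readonly PostgreSqlContainer _container = new PostgreSqlBuilder()
        .WithImage("postgres:16-alpine") // assumed image tag; pin whatever you run in production
        .Build();

    // Valid only after StartAsync has completed, because the host port is mapped at startup.
    public string ConnectionString => _container.GetConnectionString();

    // Pulls the image if needed and waits until the container is ready.
    public Task StartAsync() => _container.StartAsync();

    // Stops and removes the container.
    public ValueTask DisposeAsync() => _container.DisposeAsync();
}
```

Starting one container per test class, rather than per test, usually keeps the suite fast while still giving each class a clean database.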

Testing with IUnitOfWork:

using var scope = host.Services.CreateScope();
var uow = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
var producer = scope.ServiceProvider.GetRequiredService<IEventProducer<string, string>>();
await using var tx = await uow.BeginAsync();
await producer.ProduceAsync(new EventMessage<string, string>("key", "value"));
await tx.CommitAsync();
var msg = await sink.WaitForMessageAsync();
Assert.Equal("value", msg.Message);

Testing the EF Core implicit approach:

using var scope = host.Services.CreateScope();
var producer = scope.ServiceProvider.GetRequiredService<IEventProducer<string, string>>();
var dbContext = scope.ServiceProvider.GetRequiredService<MyDbContext>();
await producer.ProduceAsync(new EventMessage<string, string>("key", "value"));
await dbContext.SaveChangesAsync();
var msg = await sink.WaitForMessageAsync();
Assert.Equal("value", msg.Message);

Testing a [Transactional] consumer: produce to the input topic and assert on the output sink. Transaction management is automatic.

await producer.ProduceAsync(new EventMessage<string, string>("key", "trigger"));
var msg = await outputSink.WaitForMessageAsync();
Assert.Equal("expected-output", msg.Message);