Skip to content

Validation

Emit validates messages after deserialization and before they reach your handler. If a message fails validation, the pipeline short-circuits: no transaction starts, no retry loop fires, and the handler never sees it. You decide whether invalid messages are discarded or dead-lettered.

You can call Validate once per consumer group. Pick the approach that fits your situation.

Delegate validators

For quick, self-contained checks, pass a function directly:

group.Validate(
msg => msg.Amount > 0
? MessageValidationResult.Success
: MessageValidationResult.Fail("Amount must be positive"),
onFailure => onFailure.Discard());

MessageValidationResult.Success is a singleton. MessageValidationResult.Fail accepts a single string or an IEnumerable<string> when you have multiple errors to report.

Async delegates

When your validation needs I/O (checking a database, calling an external service), use the async overload:

group.Validate(
async (msg, cancellationToken) =>
{
var exists = await db.ExistsAsync(msg.Id, cancellationToken);
return exists
? MessageValidationResult.Success
: MessageValidationResult.Fail("Entity not found");
},
onFailure => onFailure.DeadLetter());

A word of caution: async validators that touch external state run on every incoming message, unconditionally. If you’re checking a database, that query fires even on messages that would later fail for unrelated reasons. Keep your async validators cheap or accept the cost.

Class-based validators

For validation logic that you want to unit test independently or share across consumer groups, implement IMessageValidator<T>:

public class OrderValidator : IMessageValidator<OrderCreated>
{
public Task<MessageValidationResult> ValidateAsync(
OrderCreated message, CancellationToken cancellationToken)
{
if (string.IsNullOrWhiteSpace(message.CustomerId))
return Task.FromResult(MessageValidationResult.Fail("CustomerId is required"));
if (message.Amount <= 0)
return Task.FromResult(MessageValidationResult.Fail("Amount must be positive"));
return Task.FromResult(MessageValidationResult.Success);
}
}

Register it on the consumer group:

group.Validate<OrderValidator>(onFailure => onFailure.Discard());

The validator is resolved from DI, so constructor injection works for any dependencies you need. Emit registers it as transient automatically.

FluentValidation

If your team already uses FluentValidation, install the integration package:

Terminal window
dotnet add package Emit.FluentValidation

Write a standard AbstractValidator<T>:

public class OrderCreatedValidator : AbstractValidator<OrderCreated>
{
public OrderCreatedValidator()
{
RuleFor(x => x.CustomerId).NotEmpty();
RuleFor(x => x.Amount).GreaterThan(0);
}
}

Register your validators in DI and call ValidateWithFluentValidation on the consumer group:

services.AddValidatorsFromAssemblyContaining<OrderCreatedValidator>();
services.AddEmit(emit =>
{
emit.AddKafka(kafka =>
{
kafka.Topic<string, OrderCreated>("orders", topic =>
{
topic.ConsumerGroup("order-processors", group =>
{
group.ValidateWithFluentValidation(onFailure => onFailure.DeadLetter());
group.AddConsumer<OrderCreatedHandler>();
});
});
});
});

Under the hood, ValidateWithFluentValidation resolves IValidator<TMessage> from DI for each message. If no validator is registered for the message type, you get a clear InvalidOperationException at runtime rather than silent passthrough. Both synchronous and asynchronous FluentValidation rules work correctly.

What happens on failure

Every Validate call requires an error action that determines what Emit does with invalid messages:

ActionBehavior
Discard()The message is silently dropped and acknowledged. The handler is never invoked.
DeadLetter()The message is forwarded to the configured dead-letter topic with diagnostic headers.

When a message is dead-lettered due to validation failure, the x-emit-exception-type header contains ValidationException and the individual error messages appear in x-emit-exception-message.

Validation failures vs. validator exceptions

There is an important distinction between a message that fails validation and a validator that throws an exception.

A validation failure is a normal, expected outcome. The message is structurally wrong, a required field is missing, a value is out of range. Emit applies the error action you configured on Validate (discard or dead-letter). The error policy is not involved.

A validator exception is unexpected. Your validator tried to hit a database and the connection timed out, or an internal bug caused a NullReferenceException. Emit treats this the same as any other unhandled exception: it falls through to the group-level OnError policy. If you configured retries there, the message (including the validation step) will be retried.

The rule of thumb: return MessageValidationResult.Fail(...) for deterministic business failures. Let exceptions propagate for transient infrastructure errors that a retry might fix.

Where validation runs in the pipeline

Validation sits early in the inbound pipeline, after deserialization but before everything else:

  1. Message arrives and is deserialized
  2. Validation runs here
  3. Error policy evaluation (retry loop)
  4. Transaction begins (if [Transactional])
  5. Handler executes

Because validation runs before the retry loop, a message that fails validation is never retried. It goes straight to the configured error action. This is by design: if a message has a missing required field, retrying it three times won’t grow one.