OrderFlow: CQRS with MediatR — Commands, Queries, and Pipeline Behaviours
Introduce CQRS into the OrderFlow API using MediatR: separate command and query handlers, validation pipeline behaviour, logging behaviour, and structured request/response contracts.
OrderFlow: CQRS with MediatR — Commands, Queries, and Pipeline Behaviours
This is part 3 of the OrderFlow series. Auth is wired up. Now we refactor the flat endpoint handlers into a proper CQRS structure: commands change state, queries read state, and cross-cutting concerns (validation, logging) live in pipeline behaviours — not scattered through handlers.
Starting point: OrderFlow auth complete.
Why CQRS Here?
Before (flat endpoint handler):
// Anti-pattern sketch: a single endpoint lambda owns every concern listed below.
app.MapPost("/api/orders", async (CreateOrderRequest req, OrderFlowDbContext db, ...) =>
{
// validation here
// business logic here
// persistence here
// logging here — all mixed together
});
After (CQRS):
// Thin endpoint: build the command from the caller's identity and the request lines,
// then hand it to MediatR. CreateOrderCommand takes (CustomerId, Lines), not the raw request.
app.MapPost("/api/orders", async (CreateOrderRequest req, ClaimsPrincipal user, ISender mediator) =>
    Results.Ok(await mediator.Send(new CreateOrderCommand(user.GetUserId(), req.Lines))));
// Concerns separated into:
// CreateOrderCommand — the intent
// CreateOrderCommandHandler — the business logic
// ValidationBehaviour — validates every command (pipeline)
// LoggingBehaviour — logs every request (pipeline)
Step 1: Install MediatR
<PackageReference Include="MediatR" Version="12.*" />
<PackageReference Include="FluentValidation.DependencyInjectionExtensions" Version="11.*" />
// Program.cs
// MediatR handlers and FluentValidation validators are both discovered by scanning
// the assembly that contains CreateOrderCommand.
var applicationAssembly = typeof(CreateOrderCommand).Assembly;
builder.Services.AddMediatR(cfg => cfg.RegisterServicesFromAssembly(applicationAssembly));
builder.Services.AddValidatorsFromAssembly(applicationAssembly);

// Register pipeline behaviours — order matters (first registered = outermost),
// so logging wraps validation, which wraps the handler.
builder.Services.AddTransient(typeof(IPipelineBehavior<,>), typeof(LoggingBehaviour<,>));
builder.Services.AddTransient(typeof(IPipelineBehavior<,>), typeof(ValidationBehaviour<,>));
Step 2: Commands
// src/OrderFlow.Application/Orders/Commands/CreateOrder/CreateOrderCommand.cs
/// <summary>Command asking for a new order to be created for a customer.</summary>
/// <param name="CustomerId">Identifier of the customer the order belongs to.</param>
/// <param name="Lines">Requested order lines, each naming a product and a quantity.</param>
public record CreateOrderCommand(
int CustomerId,
List<CreateOrderLineDto> Lines) : IRequest<CreateOrderResult>;
/// <summary>One requested order line.</summary>
/// <param name="ProductId">Identifier of the product being ordered.</param>
/// <param name="Quantity">Number of units requested.</param>
public record CreateOrderLineDto(int ProductId, int Quantity);
/// <summary>Response of <see cref="CreateOrderCommand"/>.</summary>
/// <param name="OrderId">Identifier of the created order.</param>
/// <param name="Total">Order total.</param>
/// <param name="Status">Status of the order at creation time.</param>
public record CreateOrderResult(int OrderId, decimal Total, string Status);
// src/OrderFlow.Application/Orders/Commands/CreateOrder/CreateOrderCommandHandler.cs
/// <summary>
/// Handles <see cref="CreateOrderCommand"/>: prices the requested lines from the product
/// catalogue, persists the new order, and returns its id, total, and status.
/// </summary>
public class CreateOrderCommandHandler(
    OrderFlowDbContext db,
    IProductRepository products,
    ILogger<CreateOrderCommandHandler> logger)
    : IRequestHandler<CreateOrderCommand, CreateOrderResult>
{
    /// <summary>Creates and saves an order in the "Pending" status.</summary>
    /// <param name="cmd">Customer id and requested lines.</param>
    /// <param name="ct">Token forwarded to the repository and to <c>SaveChangesAsync</c>.</param>
    /// <returns>The id, total, and status of the saved order.</returns>
    /// <exception cref="NotFoundException">
    /// A line references a product the repository did not return.
    /// </exception>
    public async Task<CreateOrderResult> Handle(
        CreateOrderCommand cmd,
        CancellationToken ct)
    {
        // Ask for each product once, even when several lines share a product.
        var productIds = cmd.Lines.Select(l => l.ProductId).Distinct().ToList();
        var catalogue = await products.GetByIdsAsync(productIds, ct);

        // Index by id so each line is a single lookup; DistinctBy tolerates a
        // repository that returns the same product more than once.
        var productsById = catalogue
            .DistinctBy(p => p.Id)
            .ToDictionary(p => p.Id);

        var lines = cmd.Lines.Select(l =>
        {
            // An unknown product is a caller error, not an internal failure.
            if (!productsById.TryGetValue(l.ProductId, out var product))
                throw new NotFoundException("Product", l.ProductId);

            return new OrderLine
            {
                ProductId = l.ProductId,
                ProductName = product.Name,
                Quantity = l.Quantity,
                // Price is copied onto the line when the order is created.
                UnitPrice = product.Price,
            };
        }).ToList();

        var order = new Order
        {
            CustomerId = cmd.CustomerId,
            Lines = lines,
            Total = lines.Sum(l => l.Quantity * l.UnitPrice),
            Status = "Pending",
            CreatedAt = DateTime.UtcNow,
        };

        db.Orders.Add(order);
        await db.SaveChangesAsync(ct);

        logger.LogInformation("Order {OrderId} created for customer {CustomerId}",
            order.Id, order.CustomerId);

        return new CreateOrderResult(order.Id, order.Total, order.Status);
    }
}
// Cancel order command
/// <summary>Command asking for an existing order to be cancelled.</summary>
/// <param name="OrderId">Identifier of the order to cancel.</param>
/// <param name="RequestingUserId">Identifier of the user asking for the cancellation.</param>
/// <param name="Reason">Reason recorded on the order.</param>
public record CancelOrderCommand(int OrderId, int RequestingUserId, string Reason)
: IRequest<Unit>;
/// <summary>
/// Handles <see cref="CancelOrderCommand"/>: checks ownership and status, then marks
/// the order as cancelled.
/// </summary>
public class CancelOrderCommandHandler(OrderFlowDbContext db)
    : IRequestHandler<CancelOrderCommand, Unit>
{
    /// <summary>Cancels the order identified by the command.</summary>
    /// <param name="cmd">Order id, requesting user id, and cancellation reason.</param>
    /// <param name="ct">Token forwarded to the database calls.</param>
    /// <returns><see cref="Unit.Value"/> once the order is cancelled.</returns>
    /// <exception cref="NotFoundException">No order has the given id.</exception>
    /// <exception cref="ForbiddenException">The order belongs to another customer.</exception>
    /// <exception cref="ConflictException">The order is already shipped or delivered.</exception>
    public async Task<Unit> Handle(CancelOrderCommand cmd, CancellationToken ct)
    {
        var order = await db.Orders.FindAsync([cmd.OrderId], ct)
            ?? throw new NotFoundException(nameof(Order), cmd.OrderId);

        if (order.CustomerId != cmd.RequestingUserId)
            throw new ForbiddenException("You can only cancel your own orders.");

        // Idempotent: a repeated cancel must not overwrite the original
        // cancellation timestamp and reason.
        if (order.Status is "Cancelled")
            return Unit.Value;

        if (order.Status is "Shipped" or "Delivered")
            throw new ConflictException($"Cannot cancel an order with status '{order.Status}'.");

        order.Status = "Cancelled";
        order.CancelledAt = DateTime.UtcNow;
        order.CancelReason = cmd.Reason;
        await db.SaveChangesAsync(ct);

        return Unit.Value;
    }
}
Step 3: Queries
// src/OrderFlow.Application/Orders/Queries/GetOrders/GetOrdersQuery.cs
/// <summary>Query for one page of order summaries, optionally filtered.</summary>
/// <param name="CustomerId">When set, only orders of this customer are returned.</param>
/// <param name="Status">When not blank, only orders with exactly this status are returned.</param>
/// <param name="Page">1-based page number; defaults to 1.</param>
/// <param name="PageSize">Number of orders per page; defaults to 20.</param>
public record GetOrdersQuery(
int? CustomerId,
string? Status,
int Page = 1,
int PageSize = 20) : IRequest<PagedResult<OrderSummaryDto>>;
/// <summary>List-view projection of an order.</summary>
/// <param name="Id">Order identifier.</param>
/// <param name="Status">Current order status.</param>
/// <param name="Total">Order total.</param>
/// <param name="ItemCount">Number of lines on the order.</param>
/// <param name="CreatedAt">When the order was created.</param>
public record OrderSummaryDto(
int Id,
string Status,
decimal Total,
int ItemCount,
DateTime CreatedAt);
/// <summary>
/// Handles <see cref="GetOrdersQuery"/>: applies the optional filters and returns one
/// page of order summaries, newest first.
/// </summary>
public class GetOrdersQueryHandler(OrderFlowDbContext db)
    : IRequestHandler<GetOrdersQuery, PagedResult<OrderSummaryDto>>
{
    // Upper bound on rows per page so a single request cannot pull the whole table.
    private const int MaxPageSize = 100;

    /// <summary>Runs the filtered, paged read against the orders table.</summary>
    /// <param name="query">Filters and paging parameters.</param>
    /// <param name="ct">Token forwarded to the database calls.</param>
    /// <returns>
    /// The page of summaries, the total number of matching orders, and the page and
    /// page size that were actually applied after clamping.
    /// </returns>
    public async Task<PagedResult<OrderSummaryDto>> Handle(
        GetOrdersQuery query,
        CancellationToken ct)
    {
        // Out-of-range paging values would otherwise reach Skip/Take unchanged
        // (negative offset, zero or unbounded page size).
        var page = Math.Max(query.Page, 1);
        var pageSize = Math.Clamp(query.PageSize, 1, MaxPageSize);
        // Multiply as long so a very large page number cannot overflow into a negative offset.
        var skip = (int)Math.Min((long)(page - 1) * pageSize, int.MaxValue);

        var q = db.Orders.AsNoTracking().AsQueryable();

        if (query.CustomerId.HasValue)
            q = q.Where(o => o.CustomerId == query.CustomerId.Value);

        if (!string.IsNullOrWhiteSpace(query.Status))
            q = q.Where(o => o.Status == query.Status);

        var total = await q.CountAsync(ct);

        var items = await q
            .OrderByDescending(o => o.CreatedAt)
            // Tie-breaker: orders sharing a CreatedAt must keep a stable position across pages.
            .ThenByDescending(o => o.Id)
            .Skip(skip)
            .Take(pageSize)
            .Select(o => new OrderSummaryDto(
                o.Id, o.Status, o.Total, o.Lines.Count, o.CreatedAt))
            .ToListAsync(ct);

        return new PagedResult<OrderSummaryDto>(items, total, page, pageSize);
    }
}
// Single order with full detail
/// <summary>Query for a single order with its lines.</summary>
/// <param name="OrderId">Identifier of the order to load.</param>
/// <param name="RequestingUserId">Identifier of the user asking for the order.</param>
/// <param name="RequestingRole">Role of the requesting user; "Admin" may read any order.</param>
public record GetOrderByIdQuery(int OrderId, int RequestingUserId, string RequestingRole)
: IRequest<OrderDetailDto?>;
/// <summary>
/// Handles <see cref="GetOrderByIdQuery"/>: loads one order with its lines and enforces
/// that non-admin callers can only read their own orders.
/// </summary>
public class GetOrderByIdQueryHandler(OrderFlowDbContext db)
    : IRequestHandler<GetOrderByIdQuery, OrderDetailDto?>
{
    /// <summary>Loads the order and maps it to its detail DTO.</summary>
    /// <param name="query">Order id plus the caller's user id and role.</param>
    /// <param name="ct">Token forwarded to the database call.</param>
    /// <returns>The order detail, or <c>null</c> when no order has that id.</returns>
    /// <exception cref="ForbiddenException">
    /// The caller is not an admin and the order belongs to another customer.
    /// </exception>
    public async Task<OrderDetailDto?> Handle(GetOrderByIdQuery query, CancellationToken ct)
    {
        var ordersWithLines = db.Orders
            .AsNoTracking()
            .Include(o => o.Lines);

        var found = await ordersWithLines.FirstOrDefaultAsync(o => o.Id == query.OrderId, ct);
        if (found is null)
        {
            return null;
        }

        // Customers can only see their own orders; admins can see any.
        var isAdmin = query.RequestingRole == "Admin";
        var isOwner = found.CustomerId == query.RequestingUserId;
        if (!(isAdmin || isOwner))
        {
            throw new ForbiddenException("You do not have access to this order.");
        }

        var lineDtos = found.Lines
            .Select(line => new OrderLineDto(line.ProductName, line.Quantity, line.UnitPrice))
            .ToList();

        return new OrderDetailDto(found.Id, found.Status, found.Total, found.CreatedAt, lineDtos);
    }
}
Step 4: Validation Pipeline Behaviour
// src/OrderFlow.Application/Common/Behaviours/ValidationBehaviour.cs
/// <summary>
/// Pipeline behaviour that runs every registered FluentValidation validator for the
/// request before the rest of the pipeline, and stops the pipeline on any failure.
/// </summary>
/// <typeparam name="TRequest">Request type being validated.</typeparam>
/// <typeparam name="TResponse">Response type produced by the pipeline.</typeparam>
public class ValidationBehaviour<TRequest, TResponse>(
    IEnumerable<IValidator<TRequest>> validators)
    : IPipelineBehavior<TRequest, TResponse>
    where TRequest : notnull
{
    /// <summary>Validates the request, then invokes the next pipeline step.</summary>
    /// <param name="request">Request to validate.</param>
    /// <param name="next">Continuation that runs the remaining pipeline.</param>
    /// <param name="ct">Token passed to each validator.</param>
    /// <returns>The response produced by <paramref name="next"/>.</returns>
    /// <exception cref="ValidationException">At least one validator reported a failure.</exception>
    public async Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken ct)
    {
        // Requests without validators pass straight through.
        if (!validators.Any()) return await next();

        var context = new ValidationContext<TRequest>(request);
        var failures = new List<ValidationFailure>();

        // ValidateAsync is required for validators with async rules, which throw when
        // invoked synchronously. Validators run one at a time so that validators
        // sharing a scoped dependency never use it concurrently.
        foreach (var validator in validators)
        {
            var result = await validator.ValidateAsync(context, ct);
            failures.AddRange(result.Errors.Where(f => f is not null));
        }

        if (failures.Count > 0)
            throw new ValidationException(failures);

        return await next();
    }
}
// Validator for CreateOrderCommand
/// <summary>
/// Validation rules for <see cref="CreateOrderCommand"/>: a positive customer id, at
/// least one line, and a positive product id with a quantity of 1–1000 on every line.
/// </summary>
public class CreateOrderCommandValidator : AbstractValidator<CreateOrderCommand>
{
    public CreateOrderCommandValidator()
    {
        RuleFor(command => command.CustomerId).GreaterThan(0);

        RuleFor(command => command.Lines)
            .NotEmpty()
            .WithMessage("Order must have at least one line.");

        RuleForEach(command => command.Lines).ChildRules(ConfigureLineRules);
    }

    // Rules applied to each individual order line.
    private static void ConfigureLineRules(InlineValidator<CreateOrderLineDto> line)
    {
        line.RuleFor(l => l.ProductId).GreaterThan(0);
        line.RuleFor(l => l.Quantity).InclusiveBetween(1, 1000);
    }
}
Step 5: Logging Pipeline Behaviour
// src/OrderFlow.Application/Common/Behaviours/LoggingBehaviour.cs
/// <summary>
/// Pipeline behaviour that logs the start of every request, then either its duration
/// on success or the exception and duration on failure.
/// </summary>
/// <typeparam name="TRequest">Request type being handled.</typeparam>
/// <typeparam name="TResponse">Response type produced by the pipeline.</typeparam>
public class LoggingBehaviour<TRequest, TResponse>(
    ILogger<LoggingBehaviour<TRequest, TResponse>> logger)
    : IPipelineBehavior<TRequest, TResponse>
    where TRequest : notnull
{
    /// <summary>Logs around the invocation of the next pipeline step.</summary>
    /// <param name="request">Request being handled; logged in destructured form.</param>
    /// <param name="next">Continuation that runs the remaining pipeline.</param>
    /// <param name="ct">Cancellation token for the request.</param>
    /// <returns>The response produced by <paramref name="next"/>.</returns>
    public async Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken ct)
    {
        var requestName = typeof(TRequest).Name;
        logger.LogInformation("Handling {RequestName}: {@Request}", requestName, request);

        var timer = new Stopwatch();
        timer.Start();

        // Stops the timer and reads the elapsed time in whole milliseconds.
        long StopAndRead()
        {
            timer.Stop();
            return timer.ElapsedMilliseconds;
        }

        try
        {
            var result = await next();
            var elapsedMs = StopAndRead();
            logger.LogInformation("Handled {RequestName} in {Ms}ms", requestName, elapsedMs);
            return result;
        }
        catch (Exception failure)
        {
            var elapsedMs = StopAndRead();
            logger.LogError(failure, "Error handling {RequestName} after {Ms}ms", requestName, elapsedMs);
            // Rethrow unchanged so callers still see the original exception and stack trace.
            throw;
        }
    }
}
Step 6: Update the Endpoints
// src/OrderFlow.Api/Endpoints/OrderEndpoints.cs
/// <summary>
/// Minimal-API endpoints for orders. Each endpoint only translates HTTP input and the
/// caller's claims into a MediatR request and maps the response to an HTTP result.
/// </summary>
public static class OrderEndpoints
{
    /// <summary>Registers the order routes under <c>/api/orders</c>; all require authorization.</summary>
    public static void MapOrderEndpoints(this WebApplication app)
    {
        var orders = app
            .MapGroup("/api/orders")
            .WithTags("Orders")
            .RequireAuthorization();

        orders.MapGet("/", GetOrders);
        orders.MapGet("/{id}", GetOrderById);
        orders.MapPost("/", CreateOrder);
        orders.MapPost("/{id}/cancel", CancelOrder);
    }

    // Lists orders. Admins may filter by any customer; everyone else is pinned to their own id.
    static async Task<IResult> GetOrders(
        [AsParameters] GetOrdersQueryParams p,
        ClaimsPrincipal user,
        ISender mediator,
        CancellationToken ct)
    {
        var callerId = user.GetUserId();
        var callerRole = user.GetRole();

        int? customerFilter = callerId;
        if (callerRole == "Admin")
        {
            customerFilter = p.CustomerId;
        }

        var page = await mediator.Send(
            new GetOrdersQuery(
                CustomerId: customerFilter,
                Status: p.Status,
                Page: p.Page,
                PageSize: p.PageSize),
            ct);

        return Results.Ok(page);
    }

    // Returns one order, or 404 when the handler reports that it does not exist.
    static async Task<IResult> GetOrderById(
        int id,
        ClaimsPrincipal user,
        ISender mediator,
        CancellationToken ct)
    {
        var query = new GetOrderByIdQuery(id, user.GetUserId(), user.GetRole());
        var order = await mediator.Send(query, ct);

        if (order is null)
        {
            return Results.NotFound();
        }

        return Results.Ok(order);
    }

    // Creates an order for the authenticated user and answers 201 with its location.
    static async Task<IResult> CreateOrder(
        CreateOrderRequest req,
        ClaimsPrincipal user,
        ISender mediator,
        CancellationToken ct)
    {
        var created = await mediator.Send(
            new CreateOrderCommand(user.GetUserId(), req.Lines),
            ct);

        var location = $"/api/orders/{created.OrderId}";
        return Results.Created(location, created);
    }

    // Cancels an order on behalf of the authenticated user; answers 204 on success.
    static async Task<IResult> CancelOrder(
        int id,
        CancelOrderRequest req,
        ClaimsPrincipal user,
        ISender mediator,
        CancellationToken ct)
    {
        var command = new CancelOrderCommand(id, user.GetUserId(), req.Reason);
        await mediator.Send(command, ct);
        return Results.NoContent();
    }
}
// Extension helpers on ClaimsPrincipal
/// <summary>Helpers that read the caller's identity from a <see cref="ClaimsPrincipal"/>.</summary>
public static class ClaimsPrincipalExtensions
{
    /// <summary>Reads the user id from the <see cref="ClaimTypes.NameIdentifier"/> claim.</summary>
    /// <param name="u">Principal to read from.</param>
    /// <returns>The claim value parsed as an integer.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="u"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// The claim is missing or its value is not an integer.
    /// </exception>
    public static int GetUserId(this ClaimsPrincipal u)
    {
        ArgumentNullException.ThrowIfNull(u);

        var raw = u.FindFirst(ClaimTypes.NameIdentifier)?.Value;

        // Claim values are machine data: parse culture-invariantly and report a
        // missing or malformed claim explicitly instead of a NullReferenceException.
        if (!int.TryParse(
                raw,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out var userId))
        {
            throw new InvalidOperationException(
                "The principal has no valid integer NameIdentifier claim.");
        }

        return userId;
    }

    /// <summary>Reads the value of the first <see cref="ClaimTypes.Role"/> claim.</summary>
    /// <param name="u">Principal to read from.</param>
    /// <returns>The role claim value.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="u"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The principal has no role claim.</exception>
    public static string GetRole(this ClaimsPrincipal u)
    {
        ArgumentNullException.ThrowIfNull(u);

        return u.FindFirst(ClaimTypes.Role)?.Value
            ?? throw new InvalidOperationException("The principal has no Role claim.");
    }
}
What's Next
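OrderFlow now has clean CQRS separation — commands change state, queries read state, and logging runs automatically for every request, with validation applied to any request that has a registered validator. The endpoints are thin: each one only builds a request from the HTTP input and the caller's claims, sends it through MediatR, and maps the result to an HTTP response.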
Next: OrderFlow Domain Events — publish events when an order is placed, paid, or cancelled, and handle side effects (sending email, updating analytics) without coupling the command handler to downstream services.
Enjoyed this article?
Explore the Backend Systems learning path for more.
Found this helpful?
Leave a comment
Have a question, correction, or just found this helpful? Leave a note below.