.NET & C# Development · Lesson 191 of 229
OrderFlow Part 3: CQRS with MediatR — Commands, Queries & Pipeline Behaviours
OrderFlow: CQRS with MediatR — Commands, Queries, and Pipeline Behaviours
This is part 3 of the OrderFlow series. Auth is wired up. Now we refactor the flat endpoint handlers into a proper CQRS structure: commands change state, queries read state, and cross-cutting concerns (validation, logging) live in pipeline behaviours — not scattered through handlers.
Starting point: OrderFlow auth complete.
Why CQRS Here?
Before (flat endpoint handler):
app.MapPost("/api/orders", async (CreateOrderRequest req, OrderFlowDbContext db, ...) =>
{
// validation here
// business logic here
// persistence here
// logging here — all mixed together
});
After (CQRS):
app.MapPost("/api/orders", async (CreateOrderRequest req, ISender mediator) =>
Results.Ok(await mediator.Send(new CreateOrderCommand(req))));
// Concerns separated into:
// CreateOrderCommand — the intent
// CreateOrderCommandHandler — the business logic
// ValidationBehaviour — validates every command (pipeline)
// LoggingBehaviour — logs every request (pipeline)Step 1: Install MediatR
<PackageReference Include="MediatR" Version="12.*" />
<PackageReference Include="FluentValidation.DependencyInjectionExtensions" Version="11.*" />// Program.cs
builder.Services.AddMediatR(cfg =>
cfg.RegisterServicesFromAssembly(typeof(CreateOrderCommand).Assembly));
builder.Services.AddValidatorsFromAssembly(typeof(CreateOrderCommand).Assembly);
// Register pipeline behaviours — order matters (first registered = outermost)
builder.Services.AddTransient(typeof(IPipelineBehavior<,>), typeof(LoggingBehaviour<,>));
builder.Services.AddTransient(typeof(IPipelineBehavior<,>), typeof(ValidationBehaviour<,>));Step 2: Commands
// src/OrderFlow.Application/Orders/Commands/CreateOrder/CreateOrderCommand.cs
public record CreateOrderCommand(
int CustomerId,
List<CreateOrderLineDto> Lines) : IRequest<CreateOrderResult>;
public record CreateOrderLineDto(int ProductId, int Quantity);
public record CreateOrderResult(int OrderId, decimal Total, string Status);
// src/OrderFlow.Application/Orders/Commands/CreateOrder/CreateOrderCommandHandler.cs
public class CreateOrderCommandHandler(
OrderFlowDbContext db,
IProductRepository products,
ILogger<CreateOrderCommandHandler> logger)
: IRequestHandler<CreateOrderCommand, CreateOrderResult>
{
public async Task<CreateOrderResult> Handle(
CreateOrderCommand cmd,
CancellationToken ct)
{
// Load products to get prices
var productIds = cmd.Lines.Select(l => l.ProductId).ToList();
var catalogue = await products.GetByIdsAsync(productIds, ct);
var lines = cmd.Lines.Select(l =>
{
var product = catalogue.First(p => p.Id == l.ProductId);
return new OrderLine
{
ProductId = l.ProductId,
ProductName = product.Name,
Quantity = l.Quantity,
UnitPrice = product.Price,
};
}).ToList();
var order = new Order
{
CustomerId = cmd.CustomerId,
Lines = lines,
Total = lines.Sum(l => l.Quantity * l.UnitPrice),
Status = "Pending",
CreatedAt = DateTime.UtcNow,
};
db.Orders.Add(order);
await db.SaveChangesAsync(ct);
logger.LogInformation("Order {OrderId} created for customer {CustomerId}",
order.Id, order.CustomerId);
return new CreateOrderResult(order.Id, order.Total, order.Status);
}
}// Cancel order command
public record CancelOrderCommand(int OrderId, int RequestingUserId, string Reason)
: IRequest<Unit>;
public class CancelOrderCommandHandler(OrderFlowDbContext db)
: IRequestHandler<CancelOrderCommand, Unit>
{
public async Task<Unit> Handle(CancelOrderCommand cmd, CancellationToken ct)
{
var order = await db.Orders.FindAsync([cmd.OrderId], ct)
?? throw new NotFoundException(nameof(Order), cmd.OrderId);
if (order.CustomerId != cmd.RequestingUserId)
throw new ForbiddenException("You can only cancel your own orders.");
if (order.Status is "Shipped" or "Delivered")
throw new ConflictException($"Cannot cancel an order with status '{order.Status}'.");
order.Status = "Cancelled";
order.CancelledAt = DateTime.UtcNow;
order.CancelReason = cmd.Reason;
await db.SaveChangesAsync(ct);
return Unit.Value;
}
}Step 3: Queries
// src/OrderFlow.Application/Orders/Queries/GetOrders/GetOrdersQuery.cs
public record GetOrdersQuery(
int? CustomerId,
string? Status,
int Page = 1,
int PageSize = 20) : IRequest<PagedResult<OrderSummaryDto>>;
public record OrderSummaryDto(
int Id,
string Status,
decimal Total,
int ItemCount,
DateTime CreatedAt);
public class GetOrdersQueryHandler(OrderFlowDbContext db)
: IRequestHandler<GetOrdersQuery, PagedResult<OrderSummaryDto>>
{
public async Task<PagedResult<OrderSummaryDto>> Handle(
GetOrdersQuery query,
CancellationToken ct)
{
var q = db.Orders.AsNoTracking().AsQueryable();
if (query.CustomerId.HasValue)
q = q.Where(o => o.CustomerId == query.CustomerId.Value);
if (!string.IsNullOrWhiteSpace(query.Status))
q = q.Where(o => o.Status == query.Status);
var total = await q.CountAsync(ct);
var items = await q
.OrderByDescending(o => o.CreatedAt)
.Skip((query.Page - 1) * query.PageSize)
.Take(query.PageSize)
.Select(o => new OrderSummaryDto(
o.Id, o.Status, o.Total, o.Lines.Count, o.CreatedAt))
.ToListAsync(ct);
return new PagedResult<OrderSummaryDto>(items, total, query.Page, query.PageSize);
}
}
// Single order with full detail
public record GetOrderByIdQuery(int OrderId, int RequestingUserId, string RequestingRole)
: IRequest<OrderDetailDto?>;
public class GetOrderByIdQueryHandler(OrderFlowDbContext db)
: IRequestHandler<GetOrderByIdQuery, OrderDetailDto?>
{
public async Task<OrderDetailDto?> Handle(GetOrderByIdQuery query, CancellationToken ct)
{
var order = await db.Orders
.AsNoTracking()
.Include(o => o.Lines)
.FirstOrDefaultAsync(o => o.Id == query.OrderId, ct);
if (order is null) return null;
// Customers can only see their own orders
if (query.RequestingRole != "Admin" && order.CustomerId != query.RequestingUserId)
throw new ForbiddenException("You do not have access to this order.");
return new OrderDetailDto(
order.Id, order.Status, order.Total, order.CreatedAt,
order.Lines.Select(l => new OrderLineDto(l.ProductName, l.Quantity, l.UnitPrice)).ToList());
}
}Step 4: Validation Pipeline Behaviour
// src/OrderFlow.Application/Common/Behaviours/ValidationBehaviour.cs
public class ValidationBehaviour<TRequest, TResponse>(
IEnumerable<IValidator<TRequest>> validators)
: IPipelineBehavior<TRequest, TResponse>
where TRequest : notnull
{
public async Task<TResponse> Handle(
TRequest request,
RequestHandlerDelegate<TResponse> next,
CancellationToken ct)
{
if (!validators.Any()) return await next();
var context = new ValidationContext<TRequest>(request);
var failures = validators
.Select(v => v.Validate(context))
.SelectMany(r => r.Errors)
.Where(f => f is not null)
.ToList();
if (failures.Count > 0)
throw new ValidationException(failures);
return await next();
}
}
// Validator for CreateOrderCommand
public class CreateOrderCommandValidator : AbstractValidator<CreateOrderCommand>
{
public CreateOrderCommandValidator()
{
RuleFor(x => x.CustomerId).GreaterThan(0);
RuleFor(x => x.Lines).NotEmpty().WithMessage("Order must have at least one line.");
RuleForEach(x => x.Lines).ChildRules(line =>
{
line.RuleFor(l => l.ProductId).GreaterThan(0);
line.RuleFor(l => l.Quantity).InclusiveBetween(1, 1000);
});
}
}Step 5: Logging Pipeline Behaviour
// src/OrderFlow.Application/Common/Behaviours/LoggingBehaviour.cs
public class LoggingBehaviour<TRequest, TResponse>(
ILogger<LoggingBehaviour<TRequest, TResponse>> logger)
: IPipelineBehavior<TRequest, TResponse>
where TRequest : notnull
{
public async Task<TResponse> Handle(
TRequest request,
RequestHandlerDelegate<TResponse> next,
CancellationToken ct)
{
var name = typeof(TRequest).Name;
logger.LogInformation("Handling {RequestName}: {@Request}", name, request);
var sw = Stopwatch.StartNew();
try
{
var response = await next();
sw.Stop();
logger.LogInformation("Handled {RequestName} in {Ms}ms", name, sw.ElapsedMilliseconds);
return response;
}
catch (Exception ex)
{
sw.Stop();
logger.LogError(ex, "Error handling {RequestName} after {Ms}ms", name, sw.ElapsedMilliseconds);
throw;
}
}
}Step 6: Update the Endpoints
// src/OrderFlow.Api/Endpoints/OrderEndpoints.cs
public static class OrderEndpoints
{
public static void MapOrderEndpoints(this WebApplication app)
{
var group = app.MapGroup("/api/orders")
.WithTags("Orders")
.RequireAuthorization();
group.MapGet("/", GetOrders);
group.MapGet("/{id}", GetOrderById);
group.MapPost("/", CreateOrder);
group.MapPost("/{id}/cancel", CancelOrder);
}
static async Task<IResult> GetOrders(
[AsParameters] GetOrdersQueryParams p,
ClaimsPrincipal user,
ISender mediator,
CancellationToken ct)
{
var userId = user.GetUserId();
var role = user.GetRole();
var query = new GetOrdersQuery(
CustomerId: role == "Admin" ? p.CustomerId : userId,
Status: p.Status,
Page: p.Page,
PageSize: p.PageSize);
return Results.Ok(await mediator.Send(query, ct));
}
static async Task<IResult> GetOrderById(
int id,
ClaimsPrincipal user,
ISender mediator,
CancellationToken ct)
{
var result = await mediator.Send(
new GetOrderByIdQuery(id, user.GetUserId(), user.GetRole()), ct);
return result is null ? Results.NotFound() : Results.Ok(result);
}
static async Task<IResult> CreateOrder(
CreateOrderRequest req,
ClaimsPrincipal user,
ISender mediator,
CancellationToken ct)
{
var command = new CreateOrderCommand(user.GetUserId(), req.Lines);
var result = await mediator.Send(command, ct);
return Results.Created($"/api/orders/{result.OrderId}", result);
}
static async Task<IResult> CancelOrder(
int id,
CancelOrderRequest req,
ClaimsPrincipal user,
ISender mediator,
CancellationToken ct)
{
await mediator.Send(new CancelOrderCommand(id, user.GetUserId(), req.Reason), ct);
return Results.NoContent();
}
}
// Extension helpers on ClaimsPrincipal
public static class ClaimsPrincipalExtensions
{
public static int GetUserId(this ClaimsPrincipal u) =>
int.Parse(u.FindFirst(ClaimTypes.NameIdentifier)!.Value);
public static string GetRole(this ClaimsPrincipal u) =>
u.FindFirst(ClaimTypes.Role)!.Value;
}What's Next
OrderFlow now has clean CQRS separation — commands change state, queries read state, and validation + logging run automatically for every request. The endpoints are thin: one line each.
Next: OrderFlow Domain Events — publish events when an order is placed, paid, or cancelled, and handle side effects (sending email, updating analytics) without coupling the command handler to downstream services.