Learnixo

.NET & C# Development · Lesson 191 of 229

OrderFlow Part 3: CQRS with MediatR — Commands, Queries & Pipeline Behaviours

OrderFlow: CQRS with MediatR — Commands, Queries, and Pipeline Behaviours

This is part 3 of the OrderFlow series. Auth is wired up. Now we refactor the flat endpoint handlers into a proper CQRS structure: commands change state, queries read state, and cross-cutting concerns (validation, logging) live in pipeline behaviours — not scattered through handlers.

Starting point: OrderFlow auth complete.


Why CQRS Here?

Before (flat endpoint handler):
  app.MapPost("/api/orders", async (CreateOrderRequest req, OrderFlowDbContext db, ...) =>
  {
      // validation here
      // business logic here
      // persistence here
      // logging here — all mixed together
  });

After (CQRS):
  app.MapPost("/api/orders", async (CreateOrderRequest req, ISender mediator) =>
      Results.Ok(await mediator.Send(new CreateOrderCommand(req))));

  // Concerns separated into:
  //   CreateOrderCommand        — the intent
  //   CreateOrderCommandHandler — the business logic
  //   ValidationBehaviour       — validates every command (pipeline)
  //   LoggingBehaviour          — logs every request (pipeline)

Step 1: Install MediatR

XML
<PackageReference Include="MediatR" Version="12.*" />
<PackageReference Include="FluentValidation.DependencyInjectionExtensions" Version="11.*" />
C#
// Program.cs
builder.Services.AddMediatR(cfg =>
    cfg.RegisterServicesFromAssembly(typeof(CreateOrderCommand).Assembly));

builder.Services.AddValidatorsFromAssembly(typeof(CreateOrderCommand).Assembly);

// Register pipeline behaviours — order matters (first registered = outermost)
builder.Services.AddTransient(typeof(IPipelineBehavior<,>), typeof(LoggingBehaviour<,>));
builder.Services.AddTransient(typeof(IPipelineBehavior<,>), typeof(ValidationBehaviour<,>));

Step 2: Commands

C#
// src/OrderFlow.Application/Orders/Commands/CreateOrder/CreateOrderCommand.cs
public record CreateOrderCommand(
    int     CustomerId,
    List<CreateOrderLineDto> Lines) : IRequest<CreateOrderResult>;

public record CreateOrderLineDto(int ProductId, int Quantity);
public record CreateOrderResult(int OrderId, decimal Total, string Status);

// src/OrderFlow.Application/Orders/Commands/CreateOrder/CreateOrderCommandHandler.cs
public class CreateOrderCommandHandler(
    OrderFlowDbContext db,
    IProductRepository products,
    ILogger<CreateOrderCommandHandler> logger)
    : IRequestHandler<CreateOrderCommand, CreateOrderResult>
{
    public async Task<CreateOrderResult> Handle(
        CreateOrderCommand cmd,
        CancellationToken ct)
    {
        // Load products to get prices
        var productIds = cmd.Lines.Select(l => l.ProductId).ToList();
        var catalogue  = await products.GetByIdsAsync(productIds, ct);

        var lines = cmd.Lines.Select(l =>
        {
            var product = catalogue.First(p => p.Id == l.ProductId);
            return new OrderLine
            {
                ProductId   = l.ProductId,
                ProductName = product.Name,
                Quantity    = l.Quantity,
                UnitPrice   = product.Price,
            };
        }).ToList();

        var order = new Order
        {
            CustomerId = cmd.CustomerId,
            Lines      = lines,
            Total      = lines.Sum(l => l.Quantity * l.UnitPrice),
            Status     = "Pending",
            CreatedAt  = DateTime.UtcNow,
        };

        db.Orders.Add(order);
        await db.SaveChangesAsync(ct);

        logger.LogInformation("Order {OrderId} created for customer {CustomerId}",
            order.Id, order.CustomerId);

        return new CreateOrderResult(order.Id, order.Total, order.Status);
    }
}
C#
// Cancel order command
public record CancelOrderCommand(int OrderId, int RequestingUserId, string Reason)
    : IRequest<Unit>;

public class CancelOrderCommandHandler(OrderFlowDbContext db)
    : IRequestHandler<CancelOrderCommand, Unit>
{
    public async Task<Unit> Handle(CancelOrderCommand cmd, CancellationToken ct)
    {
        var order = await db.Orders.FindAsync([cmd.OrderId], ct)
            ?? throw new NotFoundException(nameof(Order), cmd.OrderId);

        if (order.CustomerId != cmd.RequestingUserId)
            throw new ForbiddenException("You can only cancel your own orders.");

        if (order.Status is "Shipped" or "Delivered")
            throw new ConflictException($"Cannot cancel an order with status '{order.Status}'.");

        order.Status      = "Cancelled";
        order.CancelledAt = DateTime.UtcNow;
        order.CancelReason = cmd.Reason;

        await db.SaveChangesAsync(ct);
        return Unit.Value;
    }
}

Step 3: Queries

C#
// src/OrderFlow.Application/Orders/Queries/GetOrders/GetOrdersQuery.cs
public record GetOrdersQuery(
    int?    CustomerId,
    string? Status,
    int     Page     = 1,
    int     PageSize = 20) : IRequest<PagedResult<OrderSummaryDto>>;

public record OrderSummaryDto(
    int      Id,
    string   Status,
    decimal  Total,
    int      ItemCount,
    DateTime CreatedAt);

public class GetOrdersQueryHandler(OrderFlowDbContext db)
    : IRequestHandler<GetOrdersQuery, PagedResult<OrderSummaryDto>>
{
    public async Task<PagedResult<OrderSummaryDto>> Handle(
        GetOrdersQuery query,
        CancellationToken ct)
    {
        var q = db.Orders.AsNoTracking().AsQueryable();

        if (query.CustomerId.HasValue)
            q = q.Where(o => o.CustomerId == query.CustomerId.Value);

        if (!string.IsNullOrWhiteSpace(query.Status))
            q = q.Where(o => o.Status == query.Status);

        var total = await q.CountAsync(ct);

        var items = await q
            .OrderByDescending(o => o.CreatedAt)
            .Skip((query.Page - 1) * query.PageSize)
            .Take(query.PageSize)
            .Select(o => new OrderSummaryDto(
                o.Id, o.Status, o.Total, o.Lines.Count, o.CreatedAt))
            .ToListAsync(ct);

        return new PagedResult<OrderSummaryDto>(items, total, query.Page, query.PageSize);
    }
}

// Single order with full detail
public record GetOrderByIdQuery(int OrderId, int RequestingUserId, string RequestingRole)
    : IRequest<OrderDetailDto?>;

public class GetOrderByIdQueryHandler(OrderFlowDbContext db)
    : IRequestHandler<GetOrderByIdQuery, OrderDetailDto?>
{
    public async Task<OrderDetailDto?> Handle(GetOrderByIdQuery query, CancellationToken ct)
    {
        var order = await db.Orders
            .AsNoTracking()
            .Include(o => o.Lines)
            .FirstOrDefaultAsync(o => o.Id == query.OrderId, ct);

        if (order is null) return null;

        // Customers can only see their own orders
        if (query.RequestingRole != "Admin" && order.CustomerId != query.RequestingUserId)
            throw new ForbiddenException("You do not have access to this order.");

        return new OrderDetailDto(
            order.Id, order.Status, order.Total, order.CreatedAt,
            order.Lines.Select(l => new OrderLineDto(l.ProductName, l.Quantity, l.UnitPrice)).ToList());
    }
}

Step 4: Validation Pipeline Behaviour

C#
// src/OrderFlow.Application/Common/Behaviours/ValidationBehaviour.cs
public class ValidationBehaviour<TRequest, TResponse>(
    IEnumerable<IValidator<TRequest>> validators)
    : IPipelineBehavior<TRequest, TResponse>
    where TRequest : notnull
{
    public async Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken ct)
    {
        if (!validators.Any()) return await next();

        var context = new ValidationContext<TRequest>(request);
        var failures = validators
            .Select(v => v.Validate(context))
            .SelectMany(r => r.Errors)
            .Where(f => f is not null)
            .ToList();

        if (failures.Count > 0)
            throw new ValidationException(failures);

        return await next();
    }
}

// Validator for CreateOrderCommand
public class CreateOrderCommandValidator : AbstractValidator<CreateOrderCommand>
{
    public CreateOrderCommandValidator()
    {
        RuleFor(x => x.CustomerId).GreaterThan(0);
        RuleFor(x => x.Lines).NotEmpty().WithMessage("Order must have at least one line.");
        RuleForEach(x => x.Lines).ChildRules(line =>
        {
            line.RuleFor(l => l.ProductId).GreaterThan(0);
            line.RuleFor(l => l.Quantity).InclusiveBetween(1, 1000);
        });
    }
}

Step 5: Logging Pipeline Behaviour

C#
// src/OrderFlow.Application/Common/Behaviours/LoggingBehaviour.cs
public class LoggingBehaviour<TRequest, TResponse>(
    ILogger<LoggingBehaviour<TRequest, TResponse>> logger)
    : IPipelineBehavior<TRequest, TResponse>
    where TRequest : notnull
{
    public async Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken ct)
    {
        var name = typeof(TRequest).Name;
        logger.LogInformation("Handling {RequestName}: {@Request}", name, request);

        var sw = Stopwatch.StartNew();
        try
        {
            var response = await next();
            sw.Stop();
            logger.LogInformation("Handled {RequestName} in {Ms}ms", name, sw.ElapsedMilliseconds);
            return response;
        }
        catch (Exception ex)
        {
            sw.Stop();
            logger.LogError(ex, "Error handling {RequestName} after {Ms}ms", name, sw.ElapsedMilliseconds);
            throw;
        }
    }
}

Step 6: Update the Endpoints

C#
// src/OrderFlow.Api/Endpoints/OrderEndpoints.cs
public static class OrderEndpoints
{
    public static void MapOrderEndpoints(this WebApplication app)
    {
        var group = app.MapGroup("/api/orders")
            .WithTags("Orders")
            .RequireAuthorization();

        group.MapGet("/",       GetOrders);
        group.MapGet("/{id}",   GetOrderById);
        group.MapPost("/",      CreateOrder);
        group.MapPost("/{id}/cancel", CancelOrder);
    }

    static async Task<IResult> GetOrders(
        [AsParameters] GetOrdersQueryParams p,
        ClaimsPrincipal user,
        ISender mediator,
        CancellationToken ct)
    {
        var userId = user.GetUserId();
        var role   = user.GetRole();

        var query = new GetOrdersQuery(
            CustomerId: role == "Admin" ? p.CustomerId : userId,
            Status:     p.Status,
            Page:       p.Page,
            PageSize:   p.PageSize);

        return Results.Ok(await mediator.Send(query, ct));
    }

    static async Task<IResult> GetOrderById(
        int id,
        ClaimsPrincipal user,
        ISender mediator,
        CancellationToken ct)
    {
        var result = await mediator.Send(
            new GetOrderByIdQuery(id, user.GetUserId(), user.GetRole()), ct);

        return result is null ? Results.NotFound() : Results.Ok(result);
    }

    static async Task<IResult> CreateOrder(
        CreateOrderRequest req,
        ClaimsPrincipal user,
        ISender mediator,
        CancellationToken ct)
    {
        var command = new CreateOrderCommand(user.GetUserId(), req.Lines);
        var result  = await mediator.Send(command, ct);
        return Results.Created($"/api/orders/{result.OrderId}", result);
    }

    static async Task<IResult> CancelOrder(
        int id,
        CancelOrderRequest req,
        ClaimsPrincipal user,
        ISender mediator,
        CancellationToken ct)
    {
        await mediator.Send(new CancelOrderCommand(id, user.GetUserId(), req.Reason), ct);
        return Results.NoContent();
    }
}

// Extension helpers on ClaimsPrincipal
public static class ClaimsPrincipalExtensions
{
    public static int    GetUserId(this ClaimsPrincipal u) =>
        int.Parse(u.FindFirst(ClaimTypes.NameIdentifier)!.Value);
    public static string GetRole(this ClaimsPrincipal u) =>
        u.FindFirst(ClaimTypes.Role)!.Value;
}

What's Next

OrderFlow now has clean CQRS separation — commands change state, queries read state, and validation + logging run automatically for every request. The endpoints are thin: one line each.

Next: OrderFlow Domain Events — publish events when an order is placed, paid, or cancelled, and handle side effects (sending email, updating analytics) without coupling the command handler to downstream services.