A development guide for building loosely coupled, event-driven microservices using Event Driven .NET abstractions and reference architecture.
The following steps illustrate how to create microservices based on the principles of Domain Driven Design (DDD) and Command Query Responsibility Segregation (CQRS) with services that communicate asynchronously over an Event Bus abstraction that uses Dapr for publish-subscribe with an underlying message broker.
- Create a CustomerService Web API project.
- Add the following packages.
- AutoMapper.Extensions.Microsoft.DependencyInjection
- EventDriven.CQRS.Abstractions
- EventDriven.CQRS.Extensions
- EventDriven.DependencyInjection.URF.Mongo
- EventDriven.EventBus.Dapr
- EventDriven.EventBus.EventCache.Mongo
- MongoDB.Driver
- URF.Core.Mongo
- Add Domain/CustomerAggregate folders to the project, then add a `Customer` class that extends `Entity`.
- Add properties representing entity state.

      public class Customer : Entity
      {
          public string FirstName { get; set; } = null!;
          public string LastName { get; set; } = null!;
          public Address ShippingAddress { get; set; } = null!;
      }

- Create commands that are C# records and extend a `Command` base class.

      public record CreateCustomer(Customer? Entity) : Command<Customer>(Entity);
      public record UpdateCustomer(Customer? Entity) : Command<Customer>(Entity);
      public record RemoveCustomer(Guid EntityId) : Command(EntityId);

- Create an Events folder in Domain/CustomerAggregate and add events that extend `DomainEvent`.

      public record CustomerCreated(Customer? Entity) : DomainEvent<Customer>(Entity);
      public record CustomerUpdated(Customer? Entity) : DomainEvent<Customer>(Entity);
      public record CustomerRemoved(Guid EntityId) : DomainEvent(EntityId);

- Update the `Customer` entity to implement the `ICommandProcessor` and `IEventApplier` interfaces, so that it processes commands by emitting domain events and applies those events to mutate entity state.
- Implementing entity behavior by means of `Process` and `Apply` methods allows for easier migration to event sourcing in the future.
- Implement `ICommandProcessor<CreateCustomer, Customer, CustomerCreated>` to add a `Process` method that accepts a `CreateCustomer` command and returns a `CustomerCreated` event.
- Implement `IEventApplier<CustomerCreated>` to mutate entity state based on a `CustomerCreated` event.

      public class Customer : Entity,
          ICommandProcessor<CreateCustomer, Customer, CustomerCreated>,
          IEventApplier<CustomerCreated>
      {
          public string FirstName { get; set; } = null!;
          public string LastName { get; set; } = null!;
          public Address ShippingAddress { get; set; } = null!;

          // To process command, return one or more domain events
          public CustomerCreated Process(CreateCustomer command)
              => new(command.Entity);

          // Set Id
          public void Apply(CustomerCreated domainEvent) =>
              Id = domainEvent.EntityId != default ? domainEvent.EntityId : Guid.NewGuid();
      }

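
The process-then-apply sequence can be exercised in isolation. The following is a minimal sketch, not the library's implementation: `Entity`, `Command<TEntity>`, `DomainEvent<TEntity>`, `ICommandProcessor` and `IEventApplier` are simplified stand-ins declared locally for the EventDriven.CQRS.Abstractions types, whose actual definitions may differ, and `ProcessApplyDemo` is a hypothetical helper added for illustration.

```csharp
using System;

// Simplified stand-ins for the EventDriven.CQRS.Abstractions types (assumptions).
public abstract class Entity
{
    public Guid Id { get; set; }
}

public abstract record Command<TEntity>(TEntity? Entity) where TEntity : Entity;

public abstract record DomainEvent<TEntity>(TEntity? Entity) where TEntity : Entity
{
    // Assumed: the event exposes the id of the entity it carries.
    public Guid EntityId => Entity?.Id ?? Guid.Empty;
}

public interface ICommandProcessor<in TCommand, TEntity, out TEvent>
    where TCommand : Command<TEntity>
    where TEntity : Entity
    where TEvent : DomainEvent<TEntity>
{
    TEvent Process(TCommand command);
}

public interface IEventApplier<in TEvent>
{
    void Apply(TEvent domainEvent);
}

public record CreateCustomer(Customer? Entity) : Command<Customer>(Entity);
public record CustomerCreated(Customer? Entity) : DomainEvent<Customer>(Entity);

public class Customer : Entity,
    ICommandProcessor<CreateCustomer, Customer, CustomerCreated>,
    IEventApplier<CustomerCreated>
{
    public string FirstName { get; set; } = string.Empty;

    // Processing a command only produces an event; it does not change state.
    public CustomerCreated Process(CreateCustomer command) => new(command.Entity);

    // Applying the event is the only place where state is mutated.
    public void Apply(CustomerCreated domainEvent) =>
        Id = domainEvent.EntityId != default ? domainEvent.EntityId : Guid.NewGuid();
}

// Hypothetical helper that runs the sequence a command handler would run.
public static class ProcessApplyDemo
{
    public static Customer Create(Customer customer)
    {
        var domainEvent = customer.Process(new CreateCustomer(customer));
        customer.Apply(domainEvent);
        return customer;
    }
}
```

Calling `ProcessApplyDemo.Create(new Customer())` assigns a new `Id`, while a customer that already carries an `Id` keeps it. Because state changes only happen in `Apply`, replaying stored events through `Apply` is what a later move to event sourcing would build on.
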
- Add a `CreateCustomerHandler` class to a CommandHandlers folder in Domain/CustomerAggregate, and implement `ICommandHandler<Customer, CreateCustomer>`.
- Inject `ICustomerRepository` into the constructor.
- In the `Handle` method, write code to process the command, apply events, and persist the entity.

      public async Task<CommandResult<Customer>> Handle(CreateCustomer command, CancellationToken cancellationToken)
      {
          // Process command
          if (command.Entity == null) return new CommandResult<Customer>(CommandOutcome.InvalidCommand);
          var domainEvent = command.Entity.Process(command);

          // Apply events
          command.Entity.Apply(domainEvent);

          // Persist entity
          var entity = await _repository.AddAsync(command.Entity);
          if (entity == null) return new CommandResult<Customer>(CommandOutcome.InvalidCommand);
          return new CommandResult<Customer>(CommandOutcome.Accepted, entity);
      }

- Add a Common class library project to the solution.
- Add the following packages:
- EventDriven.CQRS.Abstractions
- EventDriven.EventBus.Abstractions
- Microsoft.Extensions.Logging.Abstractions
- Reference the Common project from the CustomerService project.
- Create a `LoggingBehavior<TRequest, TResponse>` class that implements `IBehavior<TRequest, TResponse>`.
- In the `Handle` method, perform pre- and post-handler logging.

      public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken,
          RequestHandlerDelegate<TResponse> next)
      {
          string requestType = string.Empty;
          if (typeof(TRequest).IsCommandType())
              requestType = "command";
          else if (typeof(TRequest).IsQueryType())
              requestType = "query";

          _logger.LogInformation("----- Handling {RequestType} '{CommandName}'. Request: {@Request}",
              requestType, request.GetGenericTypeName(), request);
          var response = await next();
          _logger.LogInformation("----- Handled {RequestType} '{CommandName}'. Response: {@Response}",
              requestType, request.GetGenericTypeName(), response);
          return response;
      }

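
To make the role of `next` concrete, here is a small self-contained sketch of a behavior wrapping a handler. It does not use the EventDriven packages: `HandlerDelegate<TResponse>`, `LoggingStep` and `PipelineDemo` are hypothetical names introduced for illustration, and a plain list stands in for the logger.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the "next" delegate passed to a behavior.
public delegate Task<TResponse> HandlerDelegate<TResponse>();

public class LoggingStep<TRequest, TResponse>
{
    private readonly List<string> _log;

    public LoggingStep(List<string> log) => _log = log;

    // Log before and after invoking the next step in the pipeline.
    public async Task<TResponse> Handle(TRequest request, HandlerDelegate<TResponse> next)
    {
        _log.Add($"Handling {typeof(TRequest).Name}");
        var response = await next();
        _log.Add($"Handled {typeof(TRequest).Name}");
        return response;
    }
}

public static class PipelineDemo
{
    public static async Task<(string Response, List<string> Log)> RunAsync(string request)
    {
        var log = new List<string>();
        var behavior = new LoggingStep<string, string>(log);

        // The innermost delegate plays the role of the command or query handler.
        HandlerDelegate<string> handler = () =>
        {
            log.Add("Handler invoked");
            return Task.FromResult(request.ToUpperInvariant());
        };

        var response = await behavior.Handle(request, handler);
        return (response, log);
    }
}
```

The behavior never needs to know which handler it wraps, which is why a single logging behavior can be registered once and applied to every command and query.
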
- Create a `CustomerAddressUpdated` record that extends `IntegrationEvent`.

      public record CustomerAddressUpdated(Guid CustomerId, Address ShippingAddress) : IntegrationEvent;

- Create a Repositories folder in the CustomerService project to contain repositories.
- Add an `ICustomerRepository` interface.

      public interface ICustomerRepository
      {
          Task<IEnumerable<Customer>> GetAsync();
          Task<Customer?> GetAsync(Guid id);
          Task<Customer?> AddAsync(Customer entity);
          Task<Customer?> UpdateAsync(Customer entity);
          Task<int> RemoveAsync(Guid id);
      }

- Add a `CustomerRepository` class that implements `ICustomerRepository` and extends `DocumentRepository<Customer>`.

      public class CustomerRepository : DocumentRepository<Customer>, ICustomerRepository
      {
          public CustomerRepository(IMongoCollection<Customer> collection) : base(collection)
          {
          }

          public async Task<IEnumerable<Customer>> GetAsync() =>
              await FindManyAsync();

          public async Task<Customer?> GetAsync(Guid id) =>
              await FindOneAsync(e => e.Id == id);

          public async Task<Customer?> AddAsync(Customer entity)
          {
              var existing = await FindOneAsync(e => e.Id == entity.Id);
              if (existing != null) return null;
              if (string.IsNullOrWhiteSpace(entity.ETag))
                  entity.ETag = Guid.NewGuid().ToString();
              return await InsertOneAsync(entity);
          }

          public async Task<Customer?> UpdateAsync(Customer entity)
          {
              var existing = await GetAsync(entity.Id);
              if (existing == null) return null;
              if (string.Compare(entity.ETag, existing.ETag, StringComparison.OrdinalIgnoreCase) != 0)
                  throw new ConcurrencyException();
              entity.ETag = Guid.NewGuid().ToString();
              return await FindOneAndReplaceAsync(e => e.Id == entity.Id, entity);
          }

          public async Task<int> RemoveAsync(Guid id) =>
              await DeleteOneAsync(e => e.Id == id);
      }

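
The ETag check in `UpdateAsync` is a form of optimistic concurrency: an update succeeds only if the caller still holds the latest ETag. The sketch below reproduces that rule against an in-memory dictionary so it can be run without MongoDB. `Doc`, `InMemoryStore` and `ETagDemo` are hypothetical names for illustration, and the local `ConcurrencyException` merely stands in for the one used by the repository.

```csharp
using System;
using System.Collections.Generic;

public class ConcurrencyException : Exception { }

public class Doc
{
    public Guid Id { get; set; }
    public string ETag { get; set; } = string.Empty;
    public string Name { get; set; } = string.Empty;
}

// In-memory stand-in for the Mongo-backed repository (illustration only).
public class InMemoryStore
{
    private readonly Dictionary<Guid, Doc> _docs = new();

    public Doc? Add(Doc doc)
    {
        if (_docs.ContainsKey(doc.Id)) return null;
        if (string.IsNullOrWhiteSpace(doc.ETag)) doc.ETag = Guid.NewGuid().ToString();
        _docs[doc.Id] = Clone(doc);
        return Clone(doc);
    }

    public Doc? Update(Doc doc)
    {
        if (!_docs.TryGetValue(doc.Id, out var existing)) return null;
        // A stale ETag means another writer replaced the document first.
        if (!string.Equals(doc.ETag, existing.ETag, StringComparison.OrdinalIgnoreCase))
            throw new ConcurrencyException();
        doc.ETag = Guid.NewGuid().ToString();
        _docs[doc.Id] = Clone(doc);
        return Clone(doc);
    }

    private static Doc Clone(Doc d) => new() { Id = d.Id, ETag = d.ETag, Name = d.Name };
}

public static class ETagDemo
{
    // Returns true when the second, stale update is rejected.
    public static bool StaleUpdateIsRejected()
    {
        var store = new InMemoryStore();
        var created = store.Add(new Doc { Id = Guid.NewGuid(), Name = "Ann" })!;

        // Two clients read the same version of the document.
        var first = new Doc { Id = created.Id, ETag = created.ETag, Name = "Ann B." };
        var second = new Doc { Id = created.Id, ETag = created.ETag, Name = "Ann C." };

        store.Update(first); // succeeds and rotates the ETag
        try
        {
            store.Update(second); // still carries the old ETag
            return false;
        }
        catch (ConcurrencyException)
        {
            return true;
        }
    }
}
```

In the handler that calls the repository, catching the concurrency exception and returning a conflict outcome lets the client reload the entity and retry with a fresh ETag.
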
- Add an `UpdateCustomerHandler` class to the CommandHandlers folder.
- Inject `ICustomerRepository`, `IEventBus`, `IMapper` and a logger into the constructor.
- In the `Handle` method, check whether the shipping address has changed and, if so, publish a `CustomerAddressUpdated` integration event so that the order service can update the shipping address in the customer's orders.

      public async Task<CommandResult<Customer>> Handle(UpdateCustomer command, CancellationToken cancellationToken)
      {
          // Process command
          if (command.Entity == null) return new CommandResult<Customer>(CommandOutcome.InvalidCommand);
          var domainEvent = command.Entity.Process(command);

          // Apply events
          command.Entity.Apply(domainEvent);

          // Compare shipping addresses
          var existing = await _repository.GetAsync(command.EntityId);
          if (existing == null) return new CommandResult<Customer>(CommandOutcome.NotHandled);
          var addressChanged = command.Entity.ShippingAddress != existing.ShippingAddress;

          try
          {
              // Persist entity
              var entity = await _repository.UpdateAsync(command.Entity);
              if (entity == null) return new CommandResult<Customer>(CommandOutcome.NotFound);

              // Publish events
              if (addressChanged)
              {
                  var shippingAddress = _mapper.Map<Integration.Models.Address>(entity.ShippingAddress);
                  _logger.LogInformation("----- Publishing event: {EventName}", $"v1.{nameof(CustomerAddressUpdated)}");
                  await _eventBus.PublishAsync(
                      new CustomerAddressUpdated(entity.Id, shippingAddress),
                      null, "v1");
              }
              return new CommandResult<Customer>(CommandOutcome.Accepted, entity);
          }
          catch (ConcurrencyException)
          {
              return new CommandResult<Customer>(CommandOutcome.Conflict);
          }
      }

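
The `!=` comparison of shipping addresses detects a real change only if `Address` has value equality. For a plain class without overloaded operators, `!=` compares references, so two separately loaded addresses with identical data would count as different and an event would be published on every update. This guide does not show the `Address` type, so the sketch below assumes it can be declared as a record; the property names and the contrasting `AddressClass` are hypothetical.

```csharp
using System;

// Assumed shape; the actual Address type is not shown in this guide.
public record Address(string Street, string City, string State, string Country, string PostalCode);

// A plain class with similar data, for contrast.
public class AddressClass
{
    public string Street { get; init; } = string.Empty;
    public string City { get; init; } = string.Empty;
}

public static class AddressEqualityDemo
{
    // Records compare by value, so equal data means "not changed".
    public static bool RecordsDiffer(Address a, Address b) => a != b;

    // Classes compare by reference unless equality is overridden.
    public static bool ClassesDiffer(AddressClass a, AddressClass b) => a != b;
}
```

With the record, two instances holding the same values are treated as unchanged, while `a with { City = "Austin" }` is treated as a change. With the class, two distinct instances always differ, even when every property matches.
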
- Create queries and query handlers to retrieve entities from the customer repository.
- Add `GetCustomer` and `GetCustomers` records to a Queries folder.

      public record GetCustomer(Guid Id) : Query<Customer?>;
      public record GetCustomers : Query<IEnumerable<Customer>>;

- Add `GetCustomerHandler` to a QueryHandlers folder.

      public class GetCustomerHandler : IQueryHandler<GetCustomer, Customer?>
      {
          private readonly ICustomerRepository _repository;

          public GetCustomerHandler(ICustomerRepository repository)
          {
              _repository = repository;
          }

          public async Task<Customer?> Handle(GetCustomer query, CancellationToken cancellationToken)
          {
              var result = await _repository.GetAsync(query.Id);
              return result;
          }
      }

- Add `GetCustomersHandler` to the QueryHandlers folder.

      public class GetCustomersHandler : IQueryHandler<GetCustomers, IEnumerable<Customer>>
      {
          private readonly ICustomerRepository _repository;

          public GetCustomersHandler(ICustomerRepository repository)
          {
              _repository = repository;
          }

          public async Task<IEnumerable<Customer>> Handle(GetCustomers query, CancellationToken cancellationToken)
          {
              var result = await _repository.GetAsync();
              return result;
          }
      }

- Add read and write models to a DTO folder. Note that read and write models may differ from one another.
- Add an `AutoMapperProfile` class that extends `Profile` and maps DTOs to entities.
- Add a `CustomerCommandController` to the project that injects `ICommandBroker` and `IMapper` into the constructor.
- Add Post, Put and Delete actions. Post and Put accept a `Customer` DTO, map it to a `Customer` entity and call `SendAsync` on the command broker, passing the appropriate command; Delete accepts the customer id.
- Map input and output entities to corresponding DTOs.

      // POST api/customer
      [HttpPost]
      public async Task<IActionResult> Create([FromBody] DTO.Write.Customer customerDto)
      {
          var customerIn = _mapper.Map<Customer>(customerDto);
          var result = await _commandBroker.SendAsync(new CreateCustomer(customerIn));

          if (result.Outcome != CommandOutcome.Accepted)
              return result.ToActionResult();
          var customerOut = _mapper.Map<DTO.Write.Customer>(result.Entity);
          return new CreatedResult($"api/customer/{customerOut.Id}", customerOut);
      }

      // PUT api/customer
      [HttpPut]
      public async Task<IActionResult> Update([FromBody] DTO.Write.Customer customerDto)
      {
          var customerIn = _mapper.Map<Customer>(customerDto);
          var result = await _commandBroker.SendAsync(new UpdateCustomer(customerIn));

          if (result.Outcome != CommandOutcome.Accepted)
              return result.ToActionResult();
          var customerOut = _mapper.Map<DTO.Write.Customer>(result.Entity);
          return result.ToActionResult(customerOut);
      }

      // DELETE api/customer/id
      [HttpDelete]
      [Route("{id}")]
      public async Task<IActionResult> Remove([FromRoute] Guid id)
      {
          var result = await _commandBroker.SendAsync(new RemoveCustomer(id));
          return result.Outcome != CommandOutcome.Accepted
              ? result.ToActionResult()
              : new NoContentResult();
      }

- Add a `CustomerQueryController` to the project that injects `IQueryBroker` and `IMapper` into the constructor.
- Use the query broker to retrieve entities, then map those to `CustomerView` DTO objects.

      // GET api/customer
      [HttpGet]
      public async Task<IActionResult> GetCustomers()
      {
          var customers = await _queryBroker.SendAsync(new GetCustomers());
          var result = _mapper.Map<IEnumerable<CustomerView>>(customers);
          return Ok(result);
      }

      // GET api/customer/id
      [HttpGet]
      [Route("{id:guid}")]
      public async Task<IActionResult> GetCustomer([FromRoute] Guid id)
      {
          var customer = await _queryBroker.SendAsync(new GetCustomer(id));
          if (customer == null) return NotFound();
          var result = _mapper.Map<CustomerView>(customer);
          return Ok(result);
      }

- Register dependencies for CustomerService in `Program`.

      // Add automapper
      builder.Services.AddAutoMapper(typeof(Program));

      // Add command and query handlers
      builder.Services.AddHandlers(typeof(Program));

      // Add behaviors
      builder.Services.AddTransient(typeof(IPipelineBehavior<,>), typeof(LoggingBehavior<,>));

      // Add database settings
      builder.Services.AddSingleton<ICustomerRepository, CustomerRepository>();
      builder.Services.AddMongoDbSettings<CustomerDatabaseSettings, Customer>(builder.Configuration);

      // Add Dapr event bus
      builder.Services.AddDaprEventBus(builder.Configuration, true);
      builder.Services.AddMongoEventCache(builder.Configuration);

- Add configuration entries to appsettings.json.

      "CustomerDatabaseSettings": {
        "ConnectionString": "mongodb://localhost:27017",
        "DatabaseName": "CustomersDb",
        "CollectionName": "Customers"
      },
      "DaprEventBusOptions": {
        "PubSubName": "pubsub"
      },
      "MongoEventCacheOptions": {
        "AppName": "order-service"
      },
      "MongoStoreDatabaseSettings": {
        "ConnectionString": "mongodb://localhost:27017",
        "DatabaseName": "daprStore",
        "CollectionName": "daprCollection"
      },
      "DaprEventBusSchemaOptions": {
        "UseSchemaRegistry": true,
        "SchemaValidatorType": "Json",
        "SchemaRegistryType": "Mongo",
        "AddSchemaOnPublish": true,
        "MongoStateStoreOptions": {
          "ConnectionString": "mongodb://localhost:27017",
          "DatabaseName": "schema-registry",
          "SchemasCollectionName": "schemas"
        }
      }

- Repeat these steps for the Order service.
- Reference the Common project.
- Add Integration/EventHandlers folders with a `CustomerAddressUpdatedEventHandler` class that extends `IntegrationEventHandler<CustomerAddressUpdated>`.
- Override `HandleAsync` to update the order addresses for the customer.

      public override async Task HandleAsync(CustomerAddressUpdated @event)
      {
          var orders = await _orderRepository.GetCustomerOrders(@event.CustomerId);
          foreach (var order in orders)
          {
              var shippingAddress = _mapper.Map<Address>(@event.ShippingAddress);
              await _orderRepository.UpdateOrderAddress(order.Id, shippingAddress);
          }
      }

- In `Program`, register `CustomerAddressUpdatedEventHandler` and add the Dapr Event Bus.

      builder.Services.AddSingleton<CustomerAddressUpdatedEventHandler>();
      builder.Services.AddDaprEventBus(builder.Configuration, true);
      builder.Services.AddMongoEventCache(builder.Configuration);

- Also in `Program`, use Cloud Events, map subscribe handlers, and map Dapr Event Bus endpoints.

      app.UseCloudEvents();
      app.UseEndpoints(endpoints =>
      {
          endpoints.MapControllers();
          endpoints.MapSubscribeHandler();
          endpoints.MapDaprEventBus(eventBus =>
          {
              var customerAddressUpdatedEventHandler =
                  app.Services.GetRequiredService<CustomerAddressUpdatedEventHandler>();
              eventBus.Subscribe(customerAddressUpdatedEventHandler, null, "v1");
          });
      });

- Add configuration entries for `MongoEventCacheOptions` and `MongoStoreDatabaseSettings` to appsettings.json.

      "MongoEventCacheOptions": {
        "AppName": "order-service"
      },
      "MongoStoreDatabaseSettings": {
        "ConnectionString": "mongodb://localhost:27017",
        "DatabaseName": "daprStore",
        "CollectionName": "daprCollection"
      }

- Lastly, add a dapr/components directory to the reference-architecture folder.
- Add the following dapr component yaml files:
- pubsub.yaml
- statestore.yaml
- statestore-mongodb.yaml
- Files not in use should be placed in a separate folder.