Saga Development Guide
December 4, 2023 ยท View on GitHub
A development guide for building a saga orchestrator based on EventDriven.Sagas abstractions, libraries and reference architecture.
Introduction
The following diagram illustrates how various classes are used in the execution of a saga.
Classes
- Saga Configuration: Class the implements
ISagaConfigDefinition.- This class resides in a separate class library and has a
CreateSagaConfigmethod that accepts a saga config id and returns aSagaConfigurationDtowith steps, actions and commands which define the flow of a saga orchestration.
- This class resides in a separate class library and has a
- Saga: Class that extends
PersistableSaga.- Resides in the service which initiates the saga.
- Overrides
ExecuteCurrentActionAsyncandExecuteCurrentCompensatingActionAsyncmethods to define and dispatch commands. - Implement
ISagaCommandResultHandlerto handle reponses from services which publish results to the event bus.
- Evaluators: Classes which inherit from
SagaCommandResultEvaluatorand overrideEvaluateCommandResultAsyncto determine whether a command result matches its expected result. - Commands: Records which extend
SagaCommandand contain required data. - Dispatchers: Class responsible for dispatching saga commands.
- Command Handlers: Classes which inherit from
ResultDispatchingSagaCommandHandlerand overrideHandleCommandAsyncto publish integration events to the event bus. - Event Handlers: Subscribe to event bus topics and implement
ISagaCommandResultDispatcherto dispatch responses to the saga class.
Steps
A. Common Project: Integration Models and Events
- Create a new class library project called Common.
- This will contain classes that are shared between a sagas in the Order Service and other services which participate in the saga.
- Create an Integration folder.
- Create the following request and response models in an Integration/Models folder.
- For example:
public record CustomerCreditReserveResponse(Guid CustomerId, decimal CreditRequested, decimal CreditAvailable, bool Success);- CustomerCreditReserveRequest
- CustomerCreditReserveResponse
- CustomerCreditReleaseRequest
- CustomerCreditReleaseResponse
- ProductInventoryReserveRequest
- ProductInventoryReserveResponse
- ProductInventoryReleaseRequest
- ProductInventoryReleaseResponse
- Create the following integration events in an Integration/Events folder.
- For example:
public record CustomerCreditReserveFulfilled(CustomerCreditReserveResponse CustomerCreditReserveResponse) : IntegrationEvent;- CustomerCreditReserveRequested
- CustomerCreditReserveFulfilled
- CustomerCreditReleaseRequested
- CustomerCreditReleaseFulfilled
- ProductInventoryReserveRequested
- ProductInventoryReserveFulfilled
- ProductInventoryReleaseRequested
- ProductInventoryReleaseFulfilled
B. Order Service: Create Order Saga
- Create a new Web API project which will contain one or more sagas.
- Remove WeatherForecast class and controller.
dotnet new webapi --name OrderService - Add NuGet packages.
- MongoDB.Driver
- URF.Core.Mongo
- EventDriven.EventBus.Dapr
- EventDriven.DependencyInjection.URF.Mongo
- EventDriven.EventBus.Dapr.EventCache.Mongo
- AutoMapper.Extensions.Microsoft.DependencyInjection
- EventDriven.Sagas.Abstractions
- EventDriven.Sagas.DependencyInjection
- EventDriven.Sagas.Persistence.Mongo
- EventDriven.Sagas.Configuration.Mongo
- Add a Domain folder.
- Add an OrderAggregate folder to the Domain folder.
- Place
OrderState,OrderItemandOrderclasses in this folder.
- The
OrderStateenum serves as a semantic lock, which prevents duplicate sagas for a specific order from taking place while the order is in a pending state.public enum OrderState { Initial, Pending, Created } - Add a Repositories folder at the projet root with an
IOrderRepositoryinterface and aOrderRepositoryclass.OrderRepositoryaccepts anIDocumentRepository<Order>constructor parameter, which is uses to retrieve and persistOrderentities to MongoDB.
public interface IOrderRepository { Task<Order?> GetAsync(Guid id); Task<Order?> AddOAsync(Order entity); Task<Order?> UpdateAsync(Order entity); Task<Order?> AddUpdateAsync(Order entity); Task<int> RemoveAsync(Guid id); Task<OrderState?> GetOrderStateAsync(Guid id); Task<Order?> UpdateOrderStateAsync(Guid id, OrderState orderState); }public class OrderRepository : DocumentRepository<Order>, IOrderRepository { private readonly AsyncLock _syncRoot = new(); public OrderRepository( IMongoCollection<Order> collection) : base(collection) { } public async Task<IEnumerable<Order>> GetOrders() => await FindManyAsync(); public async Task<IEnumerable<Order>> GetCustomerOrders(Guid customerId) => await FindManyAsync(e => e.CustomerId == customerId); public async Task<Order?> GetAsync(Guid id) => await FindOneAsync(e => e.Id == id); public async Task<Order?> AddOAsync(Order entity) { using (await _syncRoot.LockAsync()) { var existing = await GetAsync(entity.Id); if (existing != null) throw new ConcurrencyException(entity.Id); entity.ETag = Guid.NewGuid().ToString(); return await InsertOneAsync(entity); } } public async Task<Order?> UpdateAsync(Order entity) { using (await _syncRoot.LockAsync()) { var existing = await GetAsync(entity.Id); if (existing == null) return null; if (string.Compare(entity.ETag, existing.ETag, StringComparison.OrdinalIgnoreCase) != 0) throw new ConcurrencyException(entity.Id); entity.ETag = Guid.NewGuid().ToString(); return await FindOneAndReplaceAsync(e => e.Id == entity.Id, entity); } } public async Task<Order?> AddUpdateAsync(Order entity) { using (await _syncRoot.LockAsync()) { Order? result; var existing = await GetAsync(entity.Id); if (existing == null) result = await AddOAsync(entity); else { entity.ETag = existing.ETag; result = await UpdateAsync(entity); } return result; } } public async Task<int> RemoveAsync(Guid id) => await DeleteOneAsync(e => e.Id == id); public async Task<OrderState?> GetOrderStateAsync(Guid id) { using (await _syncRoot.LockAsync()) { var existing = await GetAsync(id); return existing?.State; } } public async Task<Order?> UpdateOrderStateAsync(Guid id, OrderState orderState) { using (await _syncRoot.LockAsync()) { var existing = await GetAsync(id); if (existing == null) return null; existing.State = orderState; return await UpdateAsync(existing); } } } - Add a Sagas folder to the project root. Place a CreateOrder folder within it.
- Add a
CreateOrderSagaclass to the CreateOrder folder. - Derive the class from the
PersistableSagaabstract class. - Override the
CheckLockmethod to handleCheckSagaLockCommand. - Add a ctor that accepts
ISagaCommandDispatcher,IEnumerable<ISagaCommandResultEvaluator>. - Override the
ExecuteCurrentActionAsyncmethod, switching on the action command.
protected override async Task ExecuteCurrentActionAsync() { var action = GetCurrentAction(); if (Entity is Order order) { switch (action.Command) { case Commands.CreateOrder: SetActionStateStarted(action); SetActionCommand(action, order); await SagaCommandDispatcher.DispatchCommandAsync(action.Command, false); break; case ReserveCustomerCredit command: command.CustomerId = order.CustomerId; command.CreditRequested = order.Quantity * order.OrderItems.Sum(e => e.ProductPrice); SetActionStateStarted(action); SetActionCommand(action); await SagaCommandDispatcher.DispatchCommandAsync(action.Command, false); break; case ReserveProductInventory command: command.InventoryId = order.InventoryId; command.AmountRequested = order.Quantity; SetActionStateStarted(action); SetActionCommand(action); await SagaCommandDispatcher.DispatchCommandAsync(action.Command, false); break; case SetOrderStateCreated: SetActionStateStarted(action); SetActionCommand(action, order); await SagaCommandDispatcher.DispatchCommandAsync(action.Command, false); break; } return; } await base.ExecuteCurrentActionAsync(); }- Override
ExecuteCurrentCompensatingActionAsync.
protected override async Task ExecuteCurrentCompensatingActionAsync() { var action = GetCurrentCompensatingAction(); if (Entity is Order order) { switch (action.Command) { case ReleaseProductInventory command: command.InventoryId = order.InventoryId; command.AmountRequested = order.Quantity; SetActionStateStarted(action); SetActionCommand(action); await SagaCommandDispatcher.DispatchCommandAsync(action.Command, true); break; case ReleaseCustomerCredit command: command.CustomerId = order.CustomerId; command.CreditReleased = order.Quantity * order.OrderItems.Sum(e => e.ProductPrice); SetActionStateStarted(action); SetActionCommand(action); await SagaCommandDispatcher.DispatchCommandAsync(action.Command, true); break; case SetOrderStateInitial: SetActionStateStarted(action); SetActionCommand(action); await SagaCommandDispatcher.DispatchCommandAsync(action.Command, true); break; } return; } await base.ExecuteCurrentCompensatingActionAsync(); }- Override
ExecuteAfterStepto callPersistAsync. - Implement
ISagaCommandResultHandler<OrderState>to handle the command result. Then do the same for the command result handlers for customer credit.
public async Task HandleCommandResultAsync(OrderState result, bool compensating) { SetCurrentActionCommandResult(result); await HandleCommandResultForStepAsync<CreateOrderSaga, OrderState, OrderState>(compensating); } - Add a
- Add a Commands folder to the CreateOrder folder in Sagas.
- Add records for the following commands. Each command extends
SagaCommand.- CreateOrder
- GetOrderState
- ReserveCustomerCredit
- ReleaseCustomerCredit
- ReserveProductInventory
- ReleaseProductInventory
- SetOrderStateCreated
- SetOrderStateInitial
- Add records for the following commands. Each command extends
- Add a Dispatchers folder to the CreateOrder folder in Sagas.
- Create a
CreateOrderSagaCommandDispatcherclass that extendsSagaCommandDispatcher. - Override
DispatchCommandAsyncto dispatch various commands.
public class CreateOrderSagaCommandDispatcher : SagaCommandDispatcher { public CreateOrderSagaCommandDispatcher(IEnumerable<ISagaCommandHandler> sagaCommandHandlers) : base(sagaCommandHandlers) { } public override async Task DispatchCommandAsync(SagaCommand command, bool compensating) { switch (command.GetType().Name) { case nameof(Commands.CreateOrder): await DispatchSagaCommandHandlerAsync<Commands.CreateOrder>(command); break; case nameof(SetOrderStateInitial): await DispatchSagaCommandHandlerAsync<SetOrderStateInitial>(command); break; case nameof(ReserveCustomerCredit): await DispatchSagaCommandHandlerAsync<ReserveCustomerCredit>(command); break; case nameof(ReleaseCustomerCredit): await DispatchSagaCommandHandlerAsync<ReleaseCustomerCredit>(command); break; case nameof(ReserveProductInventory): await DispatchSagaCommandHandlerAsync<ReserveProductInventory>(command); break; case nameof(ReleaseProductInventory): await DispatchSagaCommandHandlerAsync<ReleaseProductInventory>(command); break; case nameof(SetOrderStateCreated): await DispatchSagaCommandHandlerAsync<SetOrderStateCreated>(command); break; } } } - Create a
- Add a Handlers folder to the CreateOrder folder in Sagas.
- Add classes for the following:
- CheckSagaLockCommandHandler
- CreateOrderCommandHandler
- ReserveCustomerCreditCommandHandler
- ReleaseCustomerCreditCommandHandler
- ReserveProductInventoryCommandHandler
- ReleaseProductInventoryCommandHandler
- SetOrderStateCreatedCommandHandler
- SetOrderStateInitialCommandHandler
- Add classes for the following:
- Add a Evaluators folder to the CreateOrder folder in Sagas.
- Add classes for the following:
- ReserveCustomerCreditResultEvaluator
- ReleaseCustomerCreditResultEvaluator
- ReserveProductInventoryResultEvaluator
- ReleaseProductInventoryResultEvaluator
- SetOrderStateResultEvaluator
- Add classes for the following:
- Add a Commands folder to the OrderAggregate folder.
- Add a
StartCreateOrderSagarecord.public record StartCreateOrderSaga(Order Entity) : Command<Order>(Entity); - Place a
StartCreateOrderSagaCommandHandlerclass in a Handlers folder under Commands. - Implement
ICommandHandler<Order, StartCreateOrderSaga>. - Inject
IOrderRepository,CreateOrderSagainto the constructor. - In the
Handlemethod, callStartSagaAsyncon the saga, then query the order repository to return the newly created order.
public class StartCreateOrderSagaHandler : ICommandHandler<Order, StartCreateOrderSaga> { private readonly IOrderRepository _repository; private readonly ISagaPool<CreateOrderSaga> _sagaPool; private readonly ILogger<StartCreateOrderSagaHandler> _logger; public StartCreateOrderSagaHandler( IOrderRepository repository, ISagaPool<CreateOrderSaga> sagaPool, ILogger<StartCreateOrderSagaHandler> logger) { _repository = repository; _sagaPool = sagaPool; _logger = logger; } public async Task<CommandResult<Order>> Handle(StartCreateOrderSaga command, CancellationToken cancellationToken) { if (command.Entity == null) return new CommandResult<Order>(CommandOutcome.InvalidCommand); var domainEvent = command.Entity.Process(command); command.Entity.Apply(domainEvent); try { // Create saga var saga = _sagaPool.CreateSaga(); // Start create order saga await saga.StartSagaAsync(command.Entity, command.OrderMetadata, cancellationToken); // Return created order var order = await _repository.GetAsync(command.Entity.Id); return order == null ? new CommandResult<Order>(CommandOutcome.NotFound) : new CommandResult<Order>(CommandOutcome.Accepted, order); } catch (SagaLockedException e) { _logger.LogError(e, "{Message}", e.Message); return new CommandResult<Order>(CommandOutcome.Conflict, e.ToErrors()); } } } - Add a
- Add a Handler folder to an Integration folder at the project root.
- Add a
CustomerCreditReserveFulfilledEventHandlerclass. - Override
HandleAsyncto dispatch the command result to the saga when a message is received. - Do the same for
CustomerCreditReleaseFulfilledEventHandler.
public class CustomerCreditReserveFulfilledEventHandler : IntegrationEventHandler<CustomerCreditReserveFulfilled> { private readonly ISagaPool<CreateOrderSaga> _sagaPool; private readonly ILogger<CustomerCreditReserveFulfilledEventHandler> _logger; public CustomerCreditReserveFulfilledEventHandler( ISagaPool<CreateOrderSaga> sagaPool, ILogger<CustomerCreditReserveFulfilledEventHandler> logger) { _sagaPool = sagaPool; _logger = logger; } public async Task DispatchCommandResultAsync(CustomerCreditReserveResponse commandResult, bool compensating) { // Get saga from pool to handle command result var saga = _sagaPool[commandResult.CorrelationId]; await saga.HandleCommandResultAsync(commandResult, compensating); } public override async Task HandleAsync(CustomerCreditReserveFulfilled @event) { _logger.LogInformation("Handling event: {EventName}", $"v1.{nameof(CustomerCreditReserveFulfilled)}"); await DispatchCommandResultAsync(new CustomerCreditReserveResponse( @event.CustomerCreditReserveResponse.CustomerId, @event.CustomerCreditReserveResponse.CreditRequested, @event.CustomerCreditReserveResponse.CreditAvailable, @event.CustomerCreditReserveResponse.Success, @event.CustomerCreditReserveResponse.CorrelationId ), false); } } - Add a
- Create DTO's with an auto mapper profile.
- Add a DTO folder to the project root with
OrderandOrderItemDTO's.- Include the same properties as the domain classes, but add
IdandETagfrom theEntitybase class.
- Include the same properties as the domain classes, but add
- Add a
AutoMapperProfileclass to a Mapping folder at the project root.
public class AutoMapperProfile : Profile { public AutoMapperProfile() { CreateMap<Entities.Order, Order>(); CreateMap<Entities.Order, Order>().ReverseMap(); CreateMap<Entities.OrderItem, OrderItem>(); CreateMap<Entities.OrderItem, OrderItem>().ReverseMap(); } } - Add a DTO folder to the project root with
- Add controllers to the Controllers folder.
- Add a
OrderQueryControllerclass that uses anIQueryBrokerto execute queries and retrieve orders, mapping the result to DTO's using anIMapper. - Add a
OrderCommandControllerclass that accepts a constructor parameter ofICommandBrokerto execute aStartCreateOrderSagacommand.
- Add a
- Configure services and endpoints in
Program.- Automapper
builder.Services.AddAutoMapper(typeof(Program));- Repositories and database settings
builder.Services.AddSingleton<IOrderRepository, OrderRepository>(); builder.Services.AddMongoDbSettings<OrderDatabaseSettings, Order>(builder.Configuration); builder.Services.AddMongoDbSettings<SagaConfigDatabaseSettings, SagaConfigurationDto>(builder.Configuration); builder.Services.AddMongoDbSettings<SagaSnapshotDatabaseSettings, SagaSnapshotDto>(builder.Configuration);- Command and query handlers
builder.Services.AddHandlers(typeof(Program));- Behaviors
builder.Services.AddTransient(typeof(IPipelineBehavior<,>), typeof(LoggingBehavior<,>));- App settings
builder.Services.AddAppSettings<SagaConfigSettings>(builder.Configuration);- Sagas
builder.Services.AddSaga<CreateOrderSaga, CreateOrderSagaCommandDispatcher, SagaConfigRepository, SagaSnapshotRepository, SagaConfigSettings>( builder.Configuration);- Event Bus and event handlers
builder.Services.AddDaprEventBus(builder.Configuration); builder.Services.AddDaprMongoEventCache(builder.Configuration); builder.Services.AddSingleton<CustomerCreditReserveFulfilledEventHandler>(); builder.Services.AddSingleton<CustomerCreditReleaseFulfilledEventHandler>(); builder.Services.AddSingleton<ProductInventoryReserveFulfilledEventHandler>(); builder.Services.AddSingleton<ProductInventoryReleaseFulfilledEventHandler>();- Routing
- Place prior to
app.UseAuthorization();
- Place prior to
app.UseRouting();- Map Dapr Event Bus subscribers
app.UseCloudEvents(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); endpoints.MapSubscribeHandler(); endpoints.MapDaprEventBus(eventBus => { var customerCreditReservedEventHandler = app.Services.GetRequiredService<CustomerCreditReserveFulfilledEventHandler>(); var customerCreditReleasedEventHandler = app.Services.GetRequiredService<CustomerCreditReleaseFulfilledEventHandler>(); var productInventoryReservedEventHandler = app.Services.GetRequiredService<ProductInventoryReserveFulfilledEventHandler>(); var productInventoryReleasedEventHandler = app.Services.GetRequiredService<ProductInventoryReleaseFulfilledEventHandler>(); eventBus.Subscribe(customerCreditReservedEventHandler, nameof(CustomerCreditReserveFulfilled), "v1"); eventBus.Subscribe(customerCreditReleasedEventHandler, nameof(CustomerCreditReleaseFulfilled), "v1"); eventBus.Subscribe(productInventoryReservedEventHandler, nameof(ProductInventoryReserveFulfilled), "v1"); eventBus.Subscribe(productInventoryReleasedEventHandler, nameof(ProductInventoryReleaseFulfilled), "v1"); }); }); - Add entries to appsettings.json.
{ "Logging": { "LogLevel": { "Default": "Information", "Microsoft.AspNetCore": "Warning" } }, "AllowedHosts": "*", "SagaConfigSettings" : { "SagaConfigId": "d89ffb1e-7481-4111-a4dd-ac5123217293", "OverrideLockCheck": false }, "OrderDatabaseSettings": { "ConnectionString": "mongodb://localhost:27017", "DatabaseName": "OrderDb", "CollectionName": "Orders" }, "SagaConfigDatabaseSettings": { "ConnectionString": "mongodb://localhost:27017", "DatabaseName": "SagaConfigDb", "CollectionName": "SagaConfigurations" }, "SagaSnapshotDatabaseSettings": { "ConnectionString": "mongodb://localhost:27017", "DatabaseName": "SagaSnapshotDb", "CollectionName": "SagaSnapshots" }, "DaprEventBusOptions": { "PubSubName": "pubsub" }, "DaprEventCacheOptions": { "DaprStateStoreOptions": { "StateStoreName": "statestore-mongodb" }, "EventCacheTimeout": "00:01:00", "EventCacheCleanupInterval": "00:05:00" }, "DaprStoreDatabaseSettings": { "ConnectionString": "mongodb://localhost:27017", "DatabaseName": "daprStore", "CollectionName": "daprCollection" }, "DaprEventBusSchemaOptions": { "UseSchemaRegistry": true, "SchemaValidatorType": "Json", "SchemaRegistryType": "Mongo", "AddSchemaOnPublish": true, "MongoStateStoreOptions": { "ConnectionString": "mongodb://localhost:27017", "DatabaseName": "schema-registry", "SchemasCollectionName": "schemas" } } }
C. Customer Service: Handle integration events
- Create a new Web API project for a customer service.
- Remove WeatherForecast class and controller.
dotnet new webapi --name CustomerService - Add NuGet packages.
- MongoDB.Driver
- URF.Core.Mongo
- EventDriven.EventBus.Dapr
- EventDriven.DDD.Abstractions
- EventDriven.Sagas.DependencyInjection
- EventDriven.DependencyInjection.URF.Mongo
- EventDriven.EventBus.Dapr.EventCache.Mongo
- Microsoft.VisualStudio.Web.CodeGeneration.Design
- AutoMapper.Extensions.Microsoft.DependencyInjection
- Add a
Customerentity to a Domain/CustomerAggregate folder.- Add the following commands to a Commands folder.
- ReserveCredit
- ReleaseCredit
- Add the following events to an Events folder.
- CreditReserveSucceeded
- CreditReserveFailed
- CreditReleased
- Update
Customerto implement:ICommandProcessor<ReserveCredit, Customer, CreditReserved>IEventApplier<CreditReserveSucceeded>IEventApplier<CreditReserveFailed>ICommandProcessor<ReleaseCredit, Customer, CreditReleased>IEventApplier<CreditReleased>
public IDomainEvent Process(ReserveCredit command) { // If customer has sufficient credit, return CreditReserveSucceeded event if (CreditAvailable >= command.CreditRequested) return new CreditReserveSucceeded(command.EntityId, command.CreditRequested) { EntityETag = ETag }; // Otherwise, return CreditReserveFailed event return new CreditReserveFailed(command.EntityId, command.CreditRequested) { EntityETag = ETag }; } - Add the following commands to a Commands folder.
- Add a
CustomerCreditReserveRequestedEventHandlerclass to an Integration/Handlers folder.- Inject
ICommandHandler<Customer, ReserveCredit>into the constructor. - Extend
IntegrationEventHandler<CustomerCreditReserveRequested>. - Override
HandleAsyncto pass aReserveCreditcommand to the command handler.
public override async Task HandleAsync(CustomerCreditReserveRequested @event) { _logger.LogInformation("Handling event: {EventName}", $"v1.{nameof(CustomerCreditReserveRequested)}"); var command = new ReserveCredit( @event.CustomerCreditReserveRequest.CustomerId, @event.CustomerCreditReserveRequest.CreditReserved, @event.CustomerCreditReserveRequest.CorrelationId); await _commandHandler.Handle(command, CancellationToken.None); }- Repeat with a
CustomerCreditReserveReleaseEventHandlerclass.
- Inject
- Add
ICustomerRepositoryandCustomerRepositoryclasses to a Repositories folder at the project root.public interface ICustomerRepository { Task<IEnumerable<Customer>> GetAsync(); Task<Customer?> GetAsync(Guid id); Task<Customer?> AddAsync(Customer entity); Task<Customer?> UpdateAsync(Customer entity); Task<int> RemoveAsync(Guid id); }public class CustomerRepository : DocumentRepository<Customer>, ICustomerRepository { private readonly AsyncLock _syncRoot = new(); public CustomerRepository( IMongoCollection<Customer> collection) : base(collection) { } public async Task<IEnumerable<Customer>> GetAsync() => await FindManyAsync(); public async Task<Customer?> GetAsync(Guid id) => await FindOneAsync(e => e.Id == id); public async Task<Customer?> AddAsync(Customer entity) { using (await _syncRoot.LockAsync()) { var existing = await FindOneAsync(e => e.Id == entity.Id); if (existing != null) return null; entity.ETag = Guid.NewGuid().ToString(); return await InsertOneAsync(entity); } } public async Task<Customer?> UpdateAsync(Customer entity) { using (await _syncRoot.LockAsync()) { var existing = await GetAsync(entity.Id); if (existing == null) return null; if (string.Compare(entity.ETag, existing.ETag, StringComparison.OrdinalIgnoreCase) != 0) throw new ConcurrencyException(); entity.ETag = Guid.NewGuid().ToString(); return await FindOneAndReplaceAsync(e => e.Id == entity.Id, entity); } } public async Task<int> RemoveAsync(Guid id) => await DeleteOneAsync(e => e.Id == id); } - Create a
CustomerCommandHandlerclass in a Domain/CustomerAggregate/Handlers folder.- Inject an
IEventBusinto the constructor. - Create a private
PublishCreditReservedResponsehelper method to publish aCustomerCreditReserveFulfilledintegration event to the event bus.
private async Task<CommandResult<Customer>> PublishCreditReservedResponse(Customer customer, decimal creditRequested, bool success) { try { var @event = new CustomerCreditReserveFulfilled( new CustomerCreditReserveResponse(customer.Id, creditRequested, customer.CreditAvailable, success)); await _eventBus.PublishAsync(@event, nameof(CustomerCreditReserveFulfilled), "v1"); return new CommandResult<Customer>(CommandOutcome.Accepted, customer); } catch (SchemaValidationException e) { _logger.LogError("{Message}", e.Message); return new CommandResult<Customer>(CommandOutcome.NotHandled); } }- Implement
ICommandHandler<Customer, ReserveCredit>to process theReserveCreditcommand, pubish theCustomerCreditReserveFulfilledevent, and persist the credit reservation.
public async Task<CommandResult<Customer>> Handle(ReserveCredit command) { // Process command to determine if customer has sufficient credit var customer = await _repository.GetAsync(command.EntityId); if (customer == null) return new CommandResult<Customer>(CommandOutcome.InvalidCommand); var domainEvent = customer.Process(command); // Return if credit reservation unsuccessful if (domainEvent is not CreditReserveSucceeded succeededEvent) return await PublishCreditReservedResponse(customer, command.CreditRequested, false); // Apply events to reserve credit customer.Apply(succeededEvent); Customer? entity = null; CommandResult<Customer> result; try { // Persist credit reservation entity = await _repository.UpdateAsync(customer); if (entity == null) return new CommandResult<Customer>(CommandOutcome.InvalidCommand); result = await PublishCreditReservedResponse(entity, command.CreditRequested, true); // Reverse persistence if publish is unsuccessful if (result.Outcome != CommandOutcome.Accepted) { var creditReleasedEvent = customer.Process( new ReleaseCredit(customer.Id, command.CreditRequested)); customer.Apply(creditReleasedEvent); entity = await _repository.UpdateAsync(customer); if (entity == null) return new CommandResult<Customer>(CommandOutcome.InvalidCommand); } } catch (ConcurrencyException e) { _logger.LogError("{Message}", e.Message); result = await PublishCreditReservedResponse(entity ?? customer, command.CreditRequested, false); } return result; } - Inject an
- Add DTO and Mapping folders.
- Copy properties from
Customer. - Add
IdandETagproperties. - Add an
AutoMapperProfileclass to Mapping that extendsProfile.
- Copy properties from
- Add a
CustomerQueryControllerclass to the Controllers folder.- Inject
IQueryBrokerandIMapperinto the constructor. - Flesh out
Getmethods. - Map results to DTO's.
- Inject
- Add a
CustomerCommandControllerclass to the Controllers folder.- Inject
ICommandBrokerandIMapperinto the constructor. - Call
Handleron the command handler to process commands. - Map parameters and results to DTO's.
- Inject
- Add a
CustomerDatabaseSettingsclass to a Configuration folder.- Extend
MongoDbSettings.
- Extend
- Configure services and endpoints in
Program.- Automapper
builder.Services.AddAutoMapper(typeof(Program));- Repositories and database settings
builder.Services.AddSingleton<ICustomerRepository, CustomerRepository>(); builder.Services.AddMongoDbSettings<CustomerDatabaseSettings, Customer>(builder.Configuration);- Command and query handlers
builder.Services.AddHandlers(typeof(Program));- Event Bus and event handlers
builder.Services.AddDaprEventBus(builder.Configuration); builder.Services.AddDaprMongoEventCache(builder.Configuration); builder.Services.AddSingleton<CustomerCreditReserveRequestedEventHandler>(); builder.Services.AddSingleton<CustomerCreditReleaseRequestedEventHandler>();- Routing
- Place prior to
app.UseAuthorization();
- Place prior to
app.UseRouting();- Map Dapr Event Bus subscribers
app.UseCloudEvents(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); endpoints.MapSubscribeHandler(); endpoints.MapDaprEventBus(eventBus => { var customerCreditRequestedEventHandler = app.Services.GetRequiredService<CustomerCreditReserveRequestedEventHandler>(); var customerCreditReleasedEventHandler = app.Services.GetRequiredService<CustomerCreditReleaseRequestedEventHandler>(); eventBus.Subscribe(customerCreditRequestedEventHandler, nameof(CustomerCreditReserveRequested), "v1"); eventBus.Subscribe(customerCreditReleasedEventHandler, nameof(CustomerCreditReleaseRequested), "v1"); }); }); - Add entries to appsettings.json.
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*",
"CustomerDatabaseSettings": {
"ConnectionString": "mongodb://localhost:27017",
"DatabaseName": "CustomerDb",
"CollectionName": "Customers"
},
"DaprEventBusOptions": {
"PubSubName": "pubsub"
},
"DaprEventCacheOptions": {
"DaprStateStoreOptions": {
"StateStoreName": "statestore-mongodb"
},
"EventCacheTimeout": "00:01:00",
"EventCacheCleanupInterval": "00:05:00"
},
"DaprStoreDatabaseSettings": {
"ConnectionString": "mongodb://localhost:27017",
"DatabaseName": "daprStore",
"CollectionName": "daprCollection"
},
"DaprEventBusSchemaOptions": {
"UseSchemaRegistry": true,
"SchemaValidatorType": "Json",
"SchemaRegistryType": "Mongo",
"AddSchemaOnPublish": true,
"MongoStateStoreOptions": {
"ConnectionString": "mongodb://localhost:27017",
"DatabaseName": "schema-registry",
"SchemasCollectionName": "schemas"
}
}
}
- Update tye.yaml at reference-architecture to specify participating services.
- Specify the same port number as in Properties/launchSettings.json.
- Also update tye.yaml in test/OrderService.Sagas.Specs.
- Update
CreateOrderSagaConfigDefinition.CreateSagaConfigto include steps from all participating services.- Run the
sagaconfigglobal tool to re-create CreateOrderSagaConfigDefinition.json in the json folder of the SagaConfigDefinitions project and post it to the SagaConfigService.
- Run the
- Refactor the feature, hook and step definition files in OrderService.Sagas.Specs to include participating services.
- Copy the contents of CreateOrderSagaConfigDefinition.json from SagaConfigDefinitions to the sagaconfig.json file in the json folder.