The Transactional Outbox Pattern implementation in .Net Core

Mortaza Ghahremani
5 min readApr 12, 2021

You could find the Sample Project code here :

https://github.com/mogh64/OutboxPatternSample

A service command typically needs to update the database and send messages/events. For example, a service that participates in a saga needs to atomically update the database and sends messages/events. Similarly, a service that publishes a domain event must atomically update an aggregate and publish an event. The database update and sending of the message must be atomic in order to avoid data inconsistencies and bugs. However, it is not viable to use a distributed transaction that spans the database and the message broker to atomically update the database and publish messages/events.

Suppose that in a microservice implemented architecture, you change some data in one of your services that has a relational database such as MSSQL Server as an underlying database, so you would want to propagate this change to another service with the elastic-search database as a search engine.

So the problem here is that how to reliably/atomically update the database and publish messages/events?

The solution is, a service that uses a relational database inserts messages/events into an outbox table (e.g. MESSAGE) as part of the local transaction. A service that uses a NoSQL database appends the messages/events to attribute of the record (e.g. document or item) being updated. A separate Message Relay process publishes the events inserted into the database to a message broker.

I have implemented a sample project to describe it in practice, so you could find it here: https://github.com/mogh64/OutboxPatternSample

So, for implementing this idea we should add below architectural components to our basic design:

Let’s go into the code!

At first, I will define IMessagePublisher Interface that has two separate implementations: OutboxPublisher which publishes messages into the Outbox table in the database, and RabbitMessagePublisher that publish messages from the outbox table into RabbitMQ message broker in the Worker service.

public interface IMessagePublisher{Task PublishAsync(object @event);}

So the OutboxPublisher would be :

public class OutboxPublisher : IMessagePublisher {private readonly IOutboxRepository _repository;public OutboxPublisher(IOutboxRepository repository){   this._repository = repository;}public async Task PublishAsync(object @event) 
{
Guid eventId = (Guid)@event.GetType().GetProperty("EventId").GetValue(@event, null);DateTime publishDateTime = (DateTime)@event.GetType().GetProperty("PublishDateTime").GetValue(@event, null);var outboxMessage = new OutboxMessage(@event, eventId, publishDateTime);_repository.CreateOutboxMessage(outboxMessage);await _repository.SaveChange();}}

OutboxPublisher creates an OutboxMessage from the received event and stores it in the database using a repository.

The OutboxMessage could be defined like the below code:

public class OutboxMessage{protected OutboxMessage(){}public OutboxMessage(object message, Guid eventId, DateTime eventDate){Data = JsonConvert.SerializeObject(message);Type = message.GetType().FullName + ", " +message.GetType().Assembly.GetName().Name;EventId = eventId;EventDate = eventDate;State = OutboxMessageState.ReadyToSend;ModifiedDate = DateTime.Now;}public long Id { get; protected set; }public string Data { get; protected set; }public string Type { get; protected set; }public Guid EventId { get; protected set; }public DateTime EventDate { get; protected set; }public OutboxMessageState State { get; private set; }public DateTime ModifiedDate { get; set; }public void ChangeState(OutboxMessageState state){State = state;this.ModifiedDate = DateTime.Now;}public object RecreateMessage() =>JsonConvert.DeserializeObject(Data, System.Type.GetType(Type));}

Data in OutboxMessage is a JSON structure of the event which is stored in the database, so by RecreateMessage, the event object is reshaped using the stored JSON data.

public interface IOutboxRepository{void CreateOutboxMessage(OutboxMessage outboxMessage);Task UpdateOutboxMesageSatate(Guid eventId, OutboxMessageState state);Task<List<OutboxMessage>> GetAllReadyToSend();Task SaveChange();}

OutboxRepository Implementation could be like this:

public class OutboxRepository : IOutboxRepository{private readonly SampleDbContext _context;public OutboxRepository(SampleDbContext context){_context = context;}public void CreateOutboxMessage(OutboxMessage outboxMessage){_context.OutboxMessages.Add(outboxMessage);}public async Task UpdateOutboxMesageSatate(Guid eventId, OutboxMessageState state){var outbox = await _context.OutboxMessages.FirstOrDefaultAsync(m => m.EventId == eventId);outbox.ChangeState(state);}public Task<List<OutboxMessage>> GetAllReadyToSend(){return _context.OutboxMessages.Where(m => m.State == OutboxMessageState.ReadyToSend).ToListAsync();}public Task SaveChange(){return _context.SaveChangesAsync();}}

Afterward, on the worker side, we have an implementation of IMessagePublisher that publishes messages to RabbitMQ. For abstracting communication with RabbitMQ message broker I have used Masstransit message bus:

public class MasstransitRabbitMessagePublisher : IMessagePublisher{private readonly IPublishEndpoint publisher;public MasstransitRabbitMessagePublisher(IPublishEndpoint publisher){this.publisher = publisher;}public Task PublishAsync(object @event){return publisher.Publish(@event);}}

So the worker has a job scheduler like Quartz that in a specified period reads from the outbox table and publish events to the message broker :

[DisallowConcurrentExecution]public class OutboxJob : IJob{private readonly ILogger<OutboxJob> logger;private readonly IWorkerOutboxRepository repository;private readonly IMessagePublisher messagePublisher;public OutboxJob(ILogger<OutboxJob> logger,IWorkerOutboxRepository repository,IMessagePublisher messagePublisher){this.logger = logger;this.repository = repository;this.messagePublisher = messagePublisher;}public async Task Execute(IJobExecutionContext context){var readyToSendItems = await repository.GetReadyToSendMessages();foreach (var item in readyToSendItems){var eventMessage = item.RecreateMessage();await messagePublisher.PublishAsync(eventMessage);item.ChangeState(OutboxMessageState.SendToQueue);}await repository.SaveAsync();}}

The next step is consuming messages from the message broker and store them in the elastic-search database. So it is the responsibility of the Consumer component to do this. MassTransit give you an abstraction to consume messages by implementing a specific Consumer for each message.

public class ProductRegisteredConsumer : IConsumer<ProductRegistered>{private readonly ISyncRepository<Product> syncRepository;public ProductRegisteredConsumer(ISyncRepository<Product> syncRepository){this.syncRepository = syncRepository;}public Task Consume(ConsumeContext<ProductRegistered> context){return syncRepository.AddAsync(new Product(){Id = context.Message.Id,Name = context.Message.Name,CategoryName= context.Message.CategoryName,Price = context.Message.Price});}}

GenericSyncRepository has implemented to store documents into elastic-search :

public class GenericSyncRepository<TDocument> : ISyncRepository<TDocument> where TDocument : IElasticIndex{ElasticClient client;public GenericSyncRepository(ElasticSearchClientFactory clientFactory){client = clientFactory.CreateClient();}public ElasticClient Client => client;public async Task AddAsync<TInput>(TInput item) where TInput : class{var indexResponse = await client.IndexAsync(item, i => i.Index(typeof(TDocument)));if (!indexResponse.IsValid){throw new Exception("ElasticSearch Fialed To Write!");}}public async Task BulkAsync<TInput>(IEnumerable<TInput> items) where TInput : class{var asyncBulkIndexResponse = await client.BulkAsync(b => b.Index(typeof(TDocument)).IndexMany(items));if (asyncBulkIndexResponse.Errors){throw new Exception("ElasticSearch Fialed To Write!");}}}

Also, we could have a GenericReadRepository for Search API:

public interface IReadRepository<TDocument> where TDocument : class, IElasticIndex{Task<ISearchResponse<TDocument>> PrefixSearchAsync(Expression<Func<TDocument, object>> matchPrefixExpression,string search);}

With this Implementation to connect to elastic-search with Nest library:

public class GenericReadRepository<TDocument> : IReadRepository<TDocument> where TDocument : class, IElasticIndex{private readonly ElasticClient client;public GenericReadRepository(ElasticSearchClientFactory clientFactory){client = clientFactory.CreateClient();}public Task<ISearchResponse<TDocument>> PrefixSearchAsync(Expression<Func<TDocument, object>> matchPrefixExpression, string search){var queryContainer = new QueryContainerDescriptor<TDocument>();queryContainer.Prefix(t => t.Field(matchPrefixExpression.KeywordSuffix()).Value(search));var searchDescriptor = new SearchDescriptor<TDocument>();searchDescriptor = searchDescriptor.Query(q => q.Bool(b => b.Should(queryContainer)));return  client.SearchAsync<TDocument>(searchDescriptor);}}

Finally, just we will call it in the API:

[Route("api/[controller]")][ApiController]public class ProductController : ControllerBase{private readonly IReadRepository<Product> readRepository;public ProductController(IReadRepository<Product> readRepository){this.readRepository = readRepository;}[HttpGet]public async Task<IActionResult> GetProducts([FromQuery]string search){var response = await readRepository.PrefixSearchAsync(x => x.Name, search);return Ok(response.Documents.ToList());}}

Resources:

https://microservices.io/patterns/data/transactional-outbox.html

--

--

Mortaza Ghahremani

A Passionate software developer & engineer with many enthusiasm to learn new technologies and concepts to solve big challenges