How to Create Distributed Transaction Using MassTransit in your .Net Core Micro-services

Mortaza Ghahremani
5 min readJul 24, 2020

--

MassTransit is a free, open-source, lightweight distributed application framework for .Net which makes it easy to create applications and services that leverage message-based, loosely-coupled asynchronous communication for higher availability, reliability, and scalability.

In fact, it is an abstraction over message transport technologies such as RabbitMQ, Azure Service Bus, and it provides an extensive set of developer-friendly features to build durable asynchronous services. There are a lot of description details about it in :

Anyway, suppose that in your microservice ecosystem, there are numerous loosely coupled services that integrated with each other by a transport mechanism such as rabbitMQ, and you have a scenario that multiple loosely coupled services must be called as a transaction to fulfill the required functionality. It could be submitting an order or deposit money from accounts or any other examples of transactions that you can imagine in your business.

In this article, I want to describe how can we implement a transactional scenario using MassTransit.At first, I will describe the scenario:

Imagine we have 3 micro-services: Order, Customer, and Product. We have a Submit order API that its completion is composed of changing the status of the order, withdraw credit from customers, and reduce the product count. Ok, three micro-service has affected to this API.

I’ve uploaded the code into the GitHub repository in the below link so I don’t want to describe configurations in this article and you can refer to this project about the details.

MassTransit has three powerful feature which helps us to implement distributed transaction a: Automatonymous State Machine, Saga, and Courier. By the combination of these cumbersome features, you can implement strength transactional scenarios.

Automatonymous is a powerful state machine syntax to create sagas in MassTransit. Sagas are initiated by an event, sagas orchestrate events, and sagas maintain the state of the overall transaction. Sagas are designed to manage the complexity of a distributed transaction without locking and immediate consistency. They manage state and track any compensations required if a partial failure occurs.

MassTransit Courier is a framework that implements the routing slip pattern.it’s a mechanism for creating and executing distributed transactions with fault compensation that can be used to meet the requirements previously within the domain of database transactions but built to scale across a large system of distributed services.

But How can we use these features in Our code to implement a transactional functionality?

At first, I will start with defining a state machine with MassTransit, then I will combine it with MassTransit Courier to create a transactional functionality.

public OrderStateMachine(ILogger<OrderStateMachine> logger)
{
this.InstanceState(x => x.CurrentState);
this.ConfigureCorrelationIds();
Initially(
When(OrderSubmitted)
.Then(x => x.Instance.OrderId = x.Data.OrderId)
.Then(x => logger.LogInformation($"Order {x.Instance.OrderId} submitted"))
.ThenAsync(c => WithdrawCustomerCreditCommand(c))
.TransitionTo(Submitted)
);
During(Submitted,
When(OrderAccepted)
.Then(x => logger.LogInformation($"Order {x.Instance.OrderId} accepted"))
.ThenAsync(c => TakeProductCommand(c))
.TransitionTo(Accepted));
DuringAny(
When(OrderRejected)
.Then(x => logger.LogInformation($"Order {x.Instance.OrderId} rejected! because {x.Data.Reason}"))
.TransitionTo(Rejected)
.Finalize());
During(Accepted,
When(OrderCompleted)
.Then(x => logger.LogInformation($"Order {x.Instance.OrderId} completed"))
.TransitionTo(Completed)
.Finalize());
}

In the above example, we’ve defined some states and events that lead to a transition to another state. We can use Activity classes instead of calling methods for actions. In the above, TakeProductCommand is a method but it could be encapsulated into an Activity class.

Imagine that we have a sequence of functionalities that should act as a transaction. One good strategy is encapsulating this sequence into an acting class that uses courier.

public OrderCourierStateMachine(ILogger<OrderCourierStateMachine> logger)
{
this.logger = logger;
this.InstanceState(x => x.CurrentState);
this.ConfigureCorrelationIds();
Initially(
When(OrderSubmitted)
.Then(x => x.Instance.OrderId = x.Data.OrderId)
.Then(x => logger.LogInformation($"Order Transaction {x.Instance.OrderId} submitted"))
.Activity(c =>c.OfType<OrderTransactionSubmittedActivity>())
.TransitionTo(Submitted)
);
}

In the above state machine, we have one state but it could be more, In the action corresponding to the submitted state, we’ve defined an activity that activates a courier.

public class OrderTransactionSubmittedActivity : 
Activity<OrderTransactionState, OrderTransactionSubmittedEvent>
{
private readonly ILogger<OrderTransactionSubmittedActivity> logger;
public OrderTransactionSubmittedActivity(ILogger<OrderTransactionSubmittedActivity> logger)
{
this.logger = logger;
}
public void Accept(StateMachineVisitor visitor)
{
visitor.Visit(this);
}
public async Task Execute(BehaviorContext<OrderTransactionState, OrderTransactionSubmittedEvent> context, Behavior<OrderTransactionState, OrderTransactionSubmittedEvent> next)
{
var sendEndpoint = await context.GetSendEndpoint(QueueNames.GetMessageUri(nameof(FullfillOrderMessage)));
logger.LogInformation($"Order Transaction activity for sendEndpoint {sendEndpoint} will be called");
await sendEndpoint.Send<FullfillOrderMessage>(new
{
OrderId = context.Data.OrderId,
Credit = context.Data.Credit,
CustomerId = context.Data.CustomerId
});
}
public Task Faulted<TException>(BehaviorExceptionContext<OrderTransactionState, OrderTransactionSubmittedEvent, TException> context, Behavior<OrderTransactionState, OrderTransactionSubmittedEvent> next) where TException : Exception
{
return next.Faulted(context);
}
public void Probe(ProbeContext context)
{
context.CreateScope("submit-order");
}
}

In the execute method of activity, we activate the courier that is responsible for executing the transaction. We have implemented the courier in a command that enables an event message(FullfillOrderMessage).

public class FullfillOrderConsumer : IConsumer<FullfillOrderMessage>
{
private readonly ILogger<FullfillOrderConsumer> logger;
public FullfillOrderConsumer(ILogger<FullfillOrderConsumer> logger)
{
this.logger = logger;
}
public async Task Consume(ConsumeContext<FullfillOrderMessage> context)
{
logger.LogInformation($"Fullfilled order {context.Message.OrderId}");
var builder = new RoutingSlipBuilder(NewId.NextGuid());
var submitOrderUrl = QueueNames.GetActivityUri(nameof(SubmitOrderActivity));
builder.AddActivity("SubmitOrder", submitOrderUrl, new
{
context.Message.OrderId
});;
builder.AddActivity("Payment", QueueNames.GetActivityUri(nameof(PaymentActivity)), new {
context.Message.OrderId,
context.Message.CustomerId,
context.Message.Credit
});
builder.AddVariable("OrderId", context.Message.OrderId);
var routingSlip = builder.Build();
await context.Execute(routingSlip);
}
}

As you see, In the above command, we are building a courier that comprise of multiple courier activity (courier activities are different from saga activities mention in the previews OrderCourierStateMachine class, courier activities have the execute and compensate method but saga activities doesn’t have any compensate method).

public class PaymentActivity : IActivity<PaymentArgument, PaymentLog>
{
private readonly ILogger<PaymentActivity> logger;
public PaymentActivity(ILogger<PaymentActivity> logger)
{
this.logger = logger;
}
public async Task<CompensationResult> Compensate(CompensateContext<PaymentLog> context)
{
logger.LogInformation($"Payment cpmensated Courier called for customer {context.Log.CustomerId}");
var uri = QueueNames.GetMessageUri(nameof(ReturnCustomerCreditMessage));
var sendEndpoint = await context.GetSendEndpoint(uri);
await sendEndpoint.Send<ReturnCustomerCreditMessage>(new
{
Credit = context.Log.Credit,
CustomerId = context.Log.CustomerId
});
return context.Compensated();
}
public async Task<ExecutionResult> Execute(ExecuteContext<PaymentArgument> context)
{
logger.LogInformation($"Payment Courier called for order {context.Arguments.OrderId}");
var uri = QueueNames.GetMessageUri(nameof(WithdrawCustomerCreditMessage));
var sendEndpoint = await context.GetSendEndpoint(uri);
await sendEndpoint.Send<WithdrawCustomerCreditMessage>(new
{
Credit = context.Arguments.Credit,
CustomerId = context.Arguments.CustomerId,
OrderId = context.Arguments.OrderId
});
return context.Completed(new { CustomerId = context.Arguments.CustomerId, Credit = context.Arguments.Credit });
}
}

Imagine that in our courier we have three activities A, B, and C

(A -> B -> C).So when activity C fails,then for each previews steps automatically compensate method will be called. It means that the compensate method for B and then for A will be called.

It is important to note that when the execute method of a courier activity called, Log Information for compensation method should be provided. (

context.Completed(new { CustomerId  = context.Arguments.CustomerId, Credit = context.Arguments.Credit })

)

public interface PaymentArgument
{
int OrderId { get; }
int CustomerId { get; }
decimal Credit { get; }
}
public interface PaymentLog
{
int CustomerId { get; }
decimal Credit { get; }
}

We’ve explored a way to use saga in a combination of courier framework in MassTransit to define transactions. Courier is a good tool in MassTransit to automate transaction handling in a distributed environment.

The saga state machine has static nature in defining steps but by a combination of dynamic nature of courier, it gives you much flexibility to dynamically define transaction steps.

--

--

Mortaza Ghahremani
Mortaza Ghahremani

Written by Mortaza Ghahremani

A Passionate software developer & engineer with many enthusiasm to learn new technologies and concepts to solve big challenges

Responses (1)