前言
在分布式系統(tǒng)中,由于各個系統(tǒng)服務(wù)之間的獨立性和網(wǎng)絡(luò)通信的不確定性,要確保跨系統(tǒng)的事務(wù)操作的最終一致性是一項重大的挑戰(zhàn)。今天給大家推薦一個.NET開源的處理分布式事務(wù)的解決方案基于 .NET Standard 的 C# 庫:CAP。
CAP項目介紹
CAP 是一個基于 .NET Standard 的 C# 庫,它是一種處理分布式事務(wù)的解決方案,同樣具有 EventBus 的功能,它具有輕量級、易使用、高性能等特點。CAP 是一個EventBus,同時也是一個在微服務(wù)或者SOA系統(tǒng)中解決分布式事務(wù)問題的一個框架。它有助于創(chuàng)建可擴(kuò)展,可靠并且易于更改的微服務(wù)系統(tǒng)。
什么是 EventBus?
事件總線是一種機(jī)制,它允許不同的組件彼此通信而不彼此了解。組件可以將事件發(fā)送到Eventbus,而無需知道是誰來接聽或有多少其他人來接聽。組件也可以偵聽Eventbus上的事件,而無需知道誰發(fā)送了事件。這樣,組件可以相互通信而無需相互依賴。同樣,很容易替換一個組件。只要新組件了解正在發(fā)送和接收的事件,其他組件就永遠(yuǎn)不會知道。
CAP架構(gòu)預(yù)覽
CAP支持的存儲
SQL Server、MySQL、PostgreSql、MongoDB、In-Memory Storage。
CAP 支持以下幾種運輸方式
RabbitMQ、Kafka、Azure Service Bus、Amazon SQS、NATS、In-Memory Queue、redis Streams、Apache Pulsar。
怎么選擇運輸器
項目源碼
快速開始
安裝DotNetCore.CAP Nuget包
CAP 支持主流的消息隊列作為傳輸器:
- 我本地安裝的是DotNetCore.CAP.RabbitMQ。
//你可以按需選擇下面的包進(jìn)行安裝:
PM> Install-Package DotNetCore.CAP.Kafka
PM> Install-Package DotNetCore.CAP.RabbitMQ
PM> Install-Package DotNetCore.CAP.AzureServiceBus
PM> Install-Package DotNetCore.CAP.AmazonSQS
PM> Install-Package DotNetCore.CAP.NATS
PM> Install-Package DotNetCore.CAP.RedisStreams
PM> Install-Package DotNetCore.CAP.Pulsar
圖片
CAP 提供了主流數(shù)據(jù)庫作為存儲:
- 我本地安裝的是DotNetCore.CAP.MongoDB。
// 按需選擇安裝你正在使用的數(shù)據(jù)庫:
PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql
PM> Install-Package DotNetCore.CAP.MongoDB
配置CAP到 Program.cs 文件中,如下:
builder.Services.AddCap(x =>
{
//如果你使用的 EF 進(jìn)行數(shù)據(jù)操作,你需要添加如下配置:
//配置數(shù)據(jù)庫上下文
x.UseEntityFramework<AppDbContext>();
//如果你使用的 MongoDB,你可以添加如下配置:
x.UseMongoDB("ConnectionStrings"); //注意,僅支持MongoDB 4.0+集群
//CAP RabbitMQ 配置
x.UseRabbitMQ(rab => {
rab.HostName = "192.0.1.1";
rab.Password = "123456";
rab.Port = 5672;
rab.UserName = "123456";
});
});
發(fā)布
在 Controller 中注入 ICapPublisher 然后使用 ICapPublisher 進(jìn)行消息發(fā)送。
public class PublishController : Controller
{
private readonly ICapPublisher _capBus;
public PublishController(ICapPublisher capPublisher)
{
_capBus = capPublisher;
}
//不使用事務(wù)
[Route("~/without/transaction")]
public IActionResult WithoutTransaction()
{
_capBus.Publish("xxx.services.show.time", DateTime.Now);
// Publish delay message
_capBus.PublishDelayAsync(TimeSpan.FromSeconds(delaySeconds), "xxx.services.show.time", DateTime.Now);
return Ok();
}
//Ado.Net 中使用事務(wù),自動提交
[Route("~/adonet/transaction")]
public IActionResult AdonetWithTransaction()
{
using (var connection = new MySqlConnection(ConnectionString))
{
using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true))
{
//業(yè)務(wù)代碼
_capBus.Publish("xxx.services.show.time", DateTime.Now);
}
}
return Ok();
}
//EntityFramework 中使用事務(wù),自動提交
[Route("~/ef/transaction")]
public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext)
{
using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true))
{
//業(yè)務(wù)代碼
_capBus.Publish("xxx.services.show.time", DateTime.Now);
}
return Ok();
}
}
訂閱
Action Method
在 Action 上添加 CapSubscribeAttribute 來訂閱相關(guān)消息。
public class PublishController : Controller
{
[CapSubscribe("xxx.services.show.time")]
public void CheckReceivedMessage(DateTime datetime)
{
Console.WriteLine(datetime);
}
}
Service Method
如果你的訂閱方法沒有位于 Controller 中,則你訂閱的類需要繼承 ICapSubscribe:
namespace xxx.Service
{
public interface ISubscriberService
{
void CheckReceivedMessage(DateTime datetime);
}
public class SubscriberService: ISubscriberService, ICapSubscribe
{
[CapSubscribe("xxx.services.show.time")]
public void CheckReceivedMessage(DateTime datetime)
{
}
}
}
項目源碼地址
更多項目實用功能和特性歡迎前往項目開源地址查看