跟物聯網設備常用的通信協議有TCP,MQTT.今天我們介紹的是TCP連接,TCP連接程序的組件有Supersocket,do.NETty.Supersocket相信搞過.net的朋友應該都知道,dotnetty是有微軟Azure從JAVA平臺下移植過來的一個高性能、異步事件驅動的 NIO 框架,Kafka和RocketMQ等消息中間件、ElasticSearch開源搜索引擎、大數據處理Hadoop的RPC框架Avro、分布式通信框架Dubbo,都使用了Netty,Netty的資料很多,有興趣的可以搜一下,dotnetty和Netty語法基本一致,所以資料可以互相參考,今天我們介紹的是.net下supersocket的使用以及源碼分析
SuperSocket 可以和 ASP.NET Core 網站一起同時運行。你需要做的是將 SuperSocket 注冊到 ASP.NET Core 網站的host builder中去, 同時將服務器的選項放到配置文件中或者通過代碼定義。
//don't forget the usings
using SuperSocket;
using SuperSocket.ProtoBase;
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
})
.AsSuperSocketHostBuilder<TextPackageInfo, LinePipelineFilter>()
.UsePackageHandler(async (s, p) =>
{
// echo message back to client
await s.SendAsync(Encoding.UTF8.GetBytes(p.Text + "rn"));
});
同時將服務器的配置選項放到配置文件 "Appsettings.json" 中去:
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"serverOptions": {
"name": "TestServer",
"listeners": [
{
"ip": "Any",
"port": 4040
}
]
},
"AllowedHosts": "*"
}
上面代碼就可以實現supersocket和asp.net core (.net core3,.net5,.net6)的集成,可以支持最新的.NET6,當然DotNetty也支持.Net 6
今天我們分析源碼的版本是supersocket V2.0的版本,跟之前.net freamwork下有很大的區別
SuperSocket 請求處理模型示意圖
SuperSocket 請求處理模型示意圖
源碼主線流程分析
主流程圖
1分析的入口就從SuperSocketService這個類開始,他集成自IHostedService, IServer,因此這個類的StartAsync就是執行的入口
async Task<bool> IServer.StartAsync()
{
await StartAsync(CancellationToken.None);
return true;
}
2.通過channelCreatorFactory創建監聽器,監聽代碼如下
public bool Start()
{
var options = Options;
try
{
if (options.Security != SslProtocols.None && options.CertificateOptions != null)
{
options.CertificateOptions.EnsureCertificate();
}
var listenEndpoint = options.GetListenEndPoint();
var listenSocket = _listenSocket = new Socket(listenEndpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
listenSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
listenSocket.LingerState = new LingerOption(false, 0);
if (options.NoDelay)
listenSocket.NoDelay = true;
listenSocket.Bind(listenEndpoint);
listenSocket.Listen(options.BackLog);
IsRunning = true;
_cancellationTokenSource = new CancellationTokenSource();
KeepAccept(listenSocket).DoNotAwait();
return true;
}
catch (Exception e)
{
_logger.LogError(e, $"The listener[{this.ToString()}] failed to start.");
return false;
}
}
3.開啟異步接收accept線程
private async Task KeepAccept(Socket listenSocket)
{
while (!_cancellationTokenSource.IsCancellationRequested)
{
try
{
var client = await listenSocket.AcceptAsync().ConfigureAwait(false);
OnNewClientAccept(client);
}
catch (Exception e)
{
if (e is ObjectDisposedException || e is NullReferenceException)
break;
if (e is SocketException se)
{
var errorCode = se.ErrorCode;
//The listen socket was closed
if (errorCode == 125 || errorCode == 89 || errorCode == 995 || errorCode == 10004 || errorCode == 10038)
{
break;
}
}
_logger.LogError(e, $"Listener[{this.ToString()}] failed to do AcceptAsync");
continue;
}
}
_stopTaskCompletionSource.TrySetResult(true);
}
4 當有連接進來后,執行執行注冊的事件
listener.NewClientAccepted += OnNewClientAccept;
5.accepted后就創建新的Channel,session
private void AcceptNewChannel(IChannel channel)
{
var session = _sessionFactory.Create() as AppSession;
HandleSession(session, channel).DoNotAwait();
}
6 channel.Start(),
public override void Start()
{
_readsTask = ProcessReads();
_sendsTask = ProcessSends();
WaitHandleClosing();
}
7.將從內核里讀的socket數據異步寫到 Pipe中
protected virtual async Task ProcessReads()
{
var pipe = In;
Task writing = FillPipeAsync(pipe.Writer);
Task reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(reading, writing);
}
8當Pipe有數據寫入后,通知Pipe讀線程去解析數據,這里通知用的方法是
ManualResetValueTaskSourceCore,寫線程寫入數據后執行 _taskSourceCore.SetResult(target);
就會觸發讀線程去讀,讀的時候會根據你設置的協議模版去解析,這個過程會去處理粘包和拆包的過程,
因為Pipe是可以定向的從流中取部分數據的
內置的協議模板如下;
TerminatorReceiveFilter (SuperSocket.SocketBase.Protocol.TerminatorReceiveFilter, SuperSocket.SocketBase)
CountSpliterReceiveFilter (SuperSocket.Facility.Protocol.CountSpliterReceiveFilter, SuperSocket.Facility)
FixedSizeReceiveFilter (SuperSocket.Facility.Protocol.FixedSizeReceiveFilter, SuperSocket.Facility)
BeginEndMarkReceiveFilter (SuperSocket.Facility.Protocol.BeginEndMarkReceiveFilter, SuperSocket.Facility)
FixedHeaderReceiveFilter (SuperSocket.Facility.Protocol.FixedHeaderReceiveFilter, SuperSocket.Facility)
9 讀取到的數據解析成packageInfo 后繼續往下執行
await foreach (var p in packageChannel.RunAsync())
{
if(_packageHandlingContextAccessor!=null)
{
_packageHandlingContextAccessor.PackageHandlingContext = new PackageHandlingContext<IAppSession, TReceivePackageInfo>(session, p);
}
await packageHandlingScheduler.HandlePackage(session, p);
}
10再執行到我們定義的command即可
ValueTask IPackageHandler<TNetPackageInfo>.Handle(IAppSession session, TNetPackageInfo package)
{
return HandlePackage(session, PackageMapper.Map(package));
}
11 執行我們預制的command代碼
[Command("add")]
public class ADD : IAsyncCommand<StringPackageInfo>
{
public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package)
{
var result = package.Parameters
.Select(p => int.Parse(p))
.Sum();
await session.SendAsync(Encoding.UTF8.GetBytes(result.ToString() + "rn"));
}
}