日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

跟物聯網設備常用的通信協議有TCP,MQTT.今天我們介紹的是TCP連接,TCP連接程序的組件有Supersocket,do.NETty.Supersocket相信搞過.net的朋友應該都知道,dotnetty是有微軟Azure從JAVA平臺下移植過來的一個高性能、異步事件驅動的 NIO 框架,Kafka和RocketMQ等消息中間件、ElasticSearch開源搜索引擎、大數據處理Hadoop的RPC框架Avro、分布式通信框架Dubbo,都使用了Netty,Netty的資料很多,有興趣的可以搜一下,dotnetty和Netty語法基本一致,所以資料可以互相參考,今天我們介紹的是.net下supersocket的使用以及源碼分析

SuperSocket 可以和 ASP.NET Core 網站一起同時運行。你需要做的是將 SuperSocket 注冊到 ASP.NET Core 網站的host builder中去, 同時將服務器的選項放到配置文件中或者通過代碼定義。

  //don't forget the usings
    using SuperSocket;
    using SuperSocket.ProtoBase;

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureWebHostDefaults(webBuilder =>
            {
                webBuilder.UseStartup<Startup>();
            })
            .AsSuperSocketHostBuilder<TextPackageInfo, LinePipelineFilter>()
            .UsePackageHandler(async (s, p) =>
            {
                // echo message back to client
                await s.SendAsync(Encoding.UTF8.GetBytes(p.Text + "rn"));
            });

同時將服務器的配置選項放到配置文件 "Appsettings.json" 中去:

    {
        "Logging": {
            "LogLevel": {
            "Default": "Information",
            "Microsoft": "Warning",
            "Microsoft.Hosting.Lifetime": "Information"
            }
        },
        "serverOptions": {
            "name": "TestServer",
            "listeners": [
                {
                    "ip": "Any",
                    "port": 4040
                }
            ]
        },
        "AllowedHosts": "*"
    }

上面代碼就可以實現supersocket和asp.net core (.net core3,.net5,.net6)的集成,可以支持最新的.NET6,當然DotNetty也支持.Net 6

今天我們分析源碼的版本是supersocket V2.0的版本,跟之前.net freamwork下有很大的區別

SuperSocket 請求處理模型示意圖

 

SuperSocket 請求處理模型示意圖

 

源碼主線流程分析

 

主流程圖

1分析的入口就從SuperSocketService這個類開始,他集成自IHostedService, IServer,因此這個類的StartAsync就是執行的入口

 async Task<bool> IServer.StartAsync()
        {
            await StartAsync(CancellationToken.None);
            return true;
        }

 

2.通過channelCreatorFactory創建監聽器,監聽代碼如下

 

  public bool Start()
        {
            var options = Options;

            try
            {
                if (options.Security != SslProtocols.None && options.CertificateOptions != null)
                {
                    options.CertificateOptions.EnsureCertificate();
                }

                var listenEndpoint = options.GetListenEndPoint();
                var listenSocket = _listenSocket = new Socket(listenEndpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                
                listenSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
                listenSocket.LingerState = new LingerOption(false, 0);

                if (options.NoDelay)
                    listenSocket.NoDelay = true;
                
                listenSocket.Bind(listenEndpoint);
                listenSocket.Listen(options.BackLog);

                IsRunning = true;

                _cancellationTokenSource = new CancellationTokenSource();

                KeepAccept(listenSocket).DoNotAwait();
                return true;
            }
            catch (Exception e)
            {
                _logger.LogError(e, $"The listener[{this.ToString()}] failed to start.");
                return false;
            }
        }

3.開啟異步接收accept線程

 

        private async Task KeepAccept(Socket listenSocket)
        {
            while (!_cancellationTokenSource.IsCancellationRequested)
            {
                try
                {
                    var client = await listenSocket.AcceptAsync().ConfigureAwait(false);
                    OnNewClientAccept(client);
                }
                catch (Exception e)
                {
                    if (e is ObjectDisposedException || e is NullReferenceException)
                        break;
                    
                    if (e is SocketException se)
                    {
                        var errorCode = se.ErrorCode;

                        //The listen socket was closed
                        if (errorCode == 125 || errorCode == 89 || errorCode == 995 || errorCode == 10004 || errorCode == 10038)
                        {
                            break;
                        }
                    }
                    
                    _logger.LogError(e, $"Listener[{this.ToString()}] failed to do AcceptAsync");
                    continue;
                }
            }

            _stopTaskCompletionSource.TrySetResult(true);
        }

4 當有連接進來后,執行執行注冊的事件

listener.NewClientAccepted += OnNewClientAccept;

 

5.accepted后就創建新的Channel,session

 

     private void AcceptNewChannel(IChannel channel)
        {
            var session = _sessionFactory.Create() as AppSession;
            HandleSession(session, channel).DoNotAwait();
        }

6 channel.Start(),

  public override void Start()
        {
            _readsTask = ProcessReads();
            _sendsTask = ProcessSends();
            WaitHandleClosing();
        }

 

7.將從內核里讀的socket數據異步寫到 Pipe中

           protected virtual async Task ProcessReads()
        {
            var pipe = In;

            Task writing = FillPipeAsync(pipe.Writer);
            Task reading = ReadPipeAsync(pipe.Reader);

            await Task.WhenAll(reading, writing);
        }

8當Pipe有數據寫入后,通知Pipe讀線程去解析數據,這里通知用的方法是
ManualResetValueTaskSourceCore,寫線程寫入數據后執行 _taskSourceCore.SetResult(target);

就會觸發讀線程去讀,讀的時候會根據你設置的協議模版去解析,這個過程會去處理粘包和拆包的過程,

因為Pipe是可以定向的從流中取部分數據的

內置的協議模板如下;

TerminatorReceiveFilter (SuperSocket.SocketBase.Protocol.TerminatorReceiveFilter, SuperSocket.SocketBase)

CountSpliterReceiveFilter (SuperSocket.Facility.Protocol.CountSpliterReceiveFilter, SuperSocket.Facility)

FixedSizeReceiveFilter (SuperSocket.Facility.Protocol.FixedSizeReceiveFilter, SuperSocket.Facility)

BeginEndMarkReceiveFilter (SuperSocket.Facility.Protocol.BeginEndMarkReceiveFilter, SuperSocket.Facility)

FixedHeaderReceiveFilter (SuperSocket.Facility.Protocol.FixedHeaderReceiveFilter, SuperSocket.Facility)

 

9 讀取到的數據解析成packageInfo 后繼續往下執行

      await foreach (var p in packageChannel.RunAsync())
                {
                    if(_packageHandlingContextAccessor!=null)
                    {
                        _packageHandlingContextAccessor.PackageHandlingContext = new PackageHandlingContext<IAppSession, TReceivePackageInfo>(session, p);
                    }
                    await packageHandlingScheduler.HandlePackage(session, p);
                }

10再執行到我們定義的command即可

        ValueTask IPackageHandler<TNetPackageInfo>.Handle(IAppSession session, TNetPackageInfo package)
        {
            return HandlePackage(session, PackageMapper.Map(package));
        }

11 執行我們預制的command代碼

 

    [Command("add")]
    public class ADD : IAsyncCommand<StringPackageInfo>
    {
        public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package)
        {
            var result = package.Parameters
                .Select(p => int.Parse(p))
                .Sum();

            await session.SendAsync(Encoding.UTF8.GetBytes(result.ToString() + "rn"));
        }
    }

分享到:
標簽:asp net
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定