一個網站就是一個應用,當系統壓力較大時,只能橫向擴展,增加多個服務器或者多個容器去做負載均衡,避免單點故障而影響到整個系統。
集中式最明顯的優點就是開發,測試,運維會比較方便,不用考慮復雜的分布式環境。
弊端也很明顯,系統大而復雜、不易擴展、難于維護,每次更新都必須更新所有的應用。
集中式系統拓撲圖
鑒于集中式系統的種種弊端,促成了分布式系統的形成,分布式系統背后是由一系列的計算機組成,但用戶感知不到背后的邏輯,就像訪問單個計算機一樣,天然的避免了單機故障的問題。
應用可以按業務類型拆分成多個應用或服務,再按結構分成接口層、服務層。
我們也可以按訪問入口分,如移動端、PC 端等定義不同的接口應用。數據庫可以按業務類型拆分成多個實例,還可以對單表進行分庫分表。同時增加分布式緩存、消息隊列、非關系型數據庫、搜索等中間件。
分布式系統雖好,但是增加了系統的復雜性,如分布式事務、分布式鎖、分布式 Session、數據一致性等都是現在分布式系統中需要解決的難題。
分布式系統也增加了開發測試運維的成本,工作量增加,其管理不好反而會變成一種負擔。
分布式系統拓撲圖
分布式系統最為核心的要屬分布式服務框架,有了分布式服務框架,我們只需關注各自的業務,而無需去關注那些復雜的服務之間調用的過程。
分布式服務框架
目前業界比較流行的分布式服務框架有:阿里的 Dubbo、Spring Cloud。
這里不對這些分布式服務框架做對比,簡單的說說他們都做了些什么,能使我們用遠程服務就像調用本地服務那么簡單高效。
服務
服務是對使用用戶有功能輸出的模塊,以技術框架作為基礎,能實現用戶的需求。
比如日志記錄服務、權限管理服務、后臺服務、配置服務、緩存服務、存儲服務、消息服務等,這些服務可以靈活的組合在一起,也可以獨立運行。
服務需要有接口,與系統進行對接。面向服務的開發,應該是把服務拆分開發,把服務組合運行。
更加直接的例子如:歷史詳情、留言板、評論、評級服務等。他們之間能獨立運行,也要能組合在一起作為一個整體。
注冊中心
注冊中心對整個分布式系統起著最為核心的整合作用,支持對等集群,需要提供 CRUD 接口,支持訂閱發布機制且可靠性要求非常之高,一般拿 Zookeeper 集群來做為注冊中心。
分布式環境中服務提供方的服務會在多臺服務器上部署,每臺服務器會向注冊中心提供服務方標識、服務列表、地址、對應端口、序列化協議等信息。
注冊中心記錄下服務和服務地址的映射關系,一般一個服務會對應多個地址,這個過程我們稱之為服務發布或服務注冊。
服務調用方會根據服務方標識、服務列表從注冊中心獲取所需服務的信息(地址端口信息、序列化協議等),這些信息會緩存至本地。
當服務需要調用其他服務時,直接在這里找到服務的地址,進行調用,這個過程我們稱之為服務發現。
注冊中心
下面是以 Zookeeper 作為注冊中心的簡單實現:
/***創建node節點*@paramnode*@paramdata*/publicbooleancreateNode(Stringnode,Stringdata){try{byte[]bytes=data.getBytes();//同步創建臨時順序節點Stringpath=zk.create(ZkConstant.ZK_RPC_DATA_PATH+"/"+node+"-",bytes,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);log.info("createzookeepernode({}=>{})",path,data);}catch(KeeperExceptione){log.error("",e);returnfalse;}catch(InterruptedExceptionex){log.error("",ex);returnfalse;}returntrue;}
子節點 1
子節點 2
如下面 Zookeeper 中寫入的臨時順序節點信息:
- com.black.blackrpc.test.Helloword:發布服務時對外的名稱。
- 00000000010,00000000011:ZK 順序節點 ID。
- 127.0.0.1:8888,127.0.0.1:8889:服務地址端口。
- Protostuff:序列化方式。
- 1.0:權值,負載均衡策略使用。
這里使用的是 Zookeeper 的臨時順序節點,為什么使用臨時順序節點,主要是考慮以下兩點:
- 當服務提供者異常下線時,與 Zookeeper 的連接會中斷,Zookeeper 服務器會主動刪除臨時節點,同步給服務消費者。這樣就能避免服務消費者去請求異常的服務器。校稿注: 一般消費方也會在實際發起請求前,對當前獲取到的服務提供方節點進行心跳,避免請求連接有問題的節點。
- Zookeeper 下面是不允許創建 2 個名稱相同的 ZK 子節點的,通過順序節點就能避免創建相同的名稱。當然也可以不用順序節點的方式,直接以 com.black.blackrpc.test.HelloWord 創建節點,在該節點下創建數據節點。
下面是 ZK 的數據同步過程:
/***同步節點(通知模式)*syncNodes會通過級聯方式,在每次watcher被觸發后,就會再掛上新的watcher。完成了類似鏈式觸發的功能*/publicbooleansyncNodes(){try{List<String>nodeList=zk.getChildren(ZkConstant.ZK_RPC_DATA_PATH,newWatcher(){@Overridepublicvoidprocess(WatchedEventevent){if(event.getType()==Event.EventType.NodeChildrenChanged){syncNodes();}}});Map<String,List<String>>map=newHashMap<String,List<String>>();for(Stringnode:nodeList){byte[]bytes=zk.getData(ZkConstant.ZK_RPC_DATA_PATH+"/"+node,false,null);Stringkey=node.substring(0,node.lastIndexOf(ZkConstant.DELIMITED_MARKER));Stringvalue=newString(bytes);Objectobject=map.get(key);if(object!=null){((List<String>)object).add(value);}else{List<String>dataList=newArrayList<String>();dataList.add(value);map.put(key,dataList);}log.info("node:[{}]data:[{}]",node,newString(bytes));}/**修改連接的地址緩存*/if(MapUtil.isNotEmpty(map)){log.debug("invokingservicecacheupdateing....");InvokingServiceCache.updataInvokingServiceMap(map);}returntrue;}catch(KeeperException|InterruptedExceptione){log.error(e.toString());returnfalse;}}
當數據同步到本地時,一般會寫入到本地文件中,防止因 Zookeeper 集群異常下線而無法獲取服務提供者信息。
通訊與協議
服務消費者無論是與注冊中心還是與服務提供者,都需要存在網絡連接傳輸數據,而這就涉及到通訊。
筆者之前也做過這方面的工作,當時使用的是 JAVA BIO 簡單的寫了一個通訊包,使用場景沒有多大的并發,阻塞式的 BIO 也未暴露太多問題。
java BIO 因其建立連接之后會阻塞線程等待數據,這種方式必須以一連接一線程的方式,即客戶端有連接請求時服務器端就需要啟動一個線程進行處理。當連接數過大時,會建立相當多的線程,性能直線下降。
Java NIO:同步非阻塞,服務器實現模式為一個請求一個線程,即客戶端發送的連接請求都會注冊到多路復用器上,多路復用器輪詢到連接有 I/O 請求時才啟動一個線程進行處理。
Java AIO:異步非阻塞,服務器實現模式為一個有效請求一個線程,客戶端的 I/O 請求都是由 OS 先完成了再通知服務器應用去啟動線程進行處理。
BIO、NIO、AIO 適用場景分析:
- BIO:用于連接數目比較小且固定的架構,這種方式對服務器資源要求比較高,并發局限于應用中,但程序直觀簡單易理解。
- NIO:適用于連接數目多且連接比較短(輕操作)的架構,比如聊天服務器,并發局限于應用中,編程比較復雜。目前主流的通訊框架 Netty、Apache Mina、Grizzl、NIO Framework 都是基于其實現的。
- AIO:用于連接數目多且連接比較長(重操作)的架構,比如圖片服務器,文件傳輸等,充分調用 OS 參與并發操作,編程比較復雜。
作為基石的通訊,其實要考慮很多東西。如:丟包粘包的情況,心跳機制,斷連重連,消息緩存重發,資源的優雅釋放,長連接還是短連接等。
下面是 Netty 建立服務端,客戶端的簡單實現:
importio.netty.bootstrap.ServerBootstrap;importio.netty.channel.ChannelInitializer;importio.netty.channel.ChannelPipeline;importio.netty.channel.EventLoopGroup;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.SocketChannel;importio.netty.channel.socket.nio.NIOServerSocketChannel;importio.netty.handler.codec.LengthFieldBasedFrameDecoder;importio.netty.handler.codec.LengthFieldPrepender;importio.netty.handler.codec.bytes.ByteArrayDecoder;importio.netty.handler.codec.bytes.ByteArrayEncoder;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;/****nettytcp服務端*@authorv_wangshiyu**/publicclassNettyTcpService{privatestaticfinalLoggerlog=LoggerFactory.getLogger(NettyTcpService.class);privateStringhost;privateintport;publicNettyTcpService(Stringaddress)throwsException{Stringstr[]=address.split(":");this.host=str[0];this.port=Integer.valueOf(str[1]);}publicNettyTcpService(Stringhost,intport)throwsException{this.host=host;this.port=port;}/**用于分配處理業務線程的線程組個數*/privatestaticfinalintBIZGROUPSIZE=Runtime.getRuntime().availableProcessors()*2;//默認/**業務出現線程大小*/privatestaticfinalintBIZTHREADSIZE=4;/**NioEventLoopGroup實際上就是個線程,*NioEventLoopGroup在后臺啟動了n個NioEventLoop來處理Channel事件,*每一個NioEventLoop負責處理m個Channel,*NioEventLoopGroup從NioEventLoop數組里挨個取出NioEventLoop來處理Channel*/privatestaticfinalEventLoopGroupbossGroup=newNioEventLoopGroup(BIZGROUPSIZE);privatestaticfinalEventLoopGroupworkerGroup=newNioEventLoopGroup(BIZTHREADSIZE);publicvoidstart()throwsException{log.info("NettyTcpServiceRun...");ServerBootstrapb=newServerBootstrap();b.group(bossGroup,workerGroup);b.channel(NioServerSocketChannel.class);b.childHandler(newChannelInitializer<SocketChannel>(){@OverridepublicvoidinitChannel(SocketChannelch)throwsException{ChannelPipelinepipeline=ch.pipeline();pipeline.addLast("frameDecoder",newLengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));pipeline.addLast("frameEncoder",newLengthFieldPrepender(4));pipeline.addLast("decoder",newByteArrayDecoder());pipeline.addLast("encoder",newByteArrayEncoder());//pipeline.addLast(newEncoder());//pipeline.addLast(newDecoder());pipeline.addLast(newTcpServerHandler());}});b.bind(host,port).sync();log.info("NettyTcpServiceSuccess!");}/***停止服務并釋放資源*/publicvoidshutdown(){workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.SimpleChannelInboundHandler;/***服務端處理器*/publicclassTcpServerHandlerextendsSimpleChannelInboundHandler<Object>{privatestaticfinalLoggerlog=LoggerFactory.getLogger(TcpServerHandler.class);@OverrideprotectedvoidchannelRead0(ChannelHandlerContextctx,Objectmsg)throwsException{byte[]data=(byte[])msg;}}
importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importio.netty.bootstrap.Bootstrap;importio.netty.channel.Channel;importio.netty.channel.ChannelInitializer;importio.netty.channel.ChannelOption;importio.netty.channel.ChannelPipeline;importio.netty.channel.EventLoopGroup;importio.netty.channel.nio.NioEventLoopGroup;importio.netty.channel.socket.nio.NioSocketChannel;importio.netty.handler.codec.LengthFieldBasedFrameDecoder;importio.netty.handler.codec.LengthFieldPrepender;importio.netty.handler.codec.bytes.ByteArrayDecoder;importio.netty.handler.codec.bytes.ByteArrayEncoder;importio.netty.util.concurrent.Future;/***nettytcp客戶端*@authorv_wangshiyu**/publicclassNettyTcpClient{privatestaticfinalLoggerlog=LoggerFactory.getLogger(NettyTcpClient.class);privateStringhost;privateintport;privateBootstrapbootstrap;privateChannelchannel;privateEventLoopGroupgroup;publicNettyTcpClient(Stringhost,intport){bootstrap=getBootstrap();channel=getChannel(host,port);this.host=host;this.port=port;}publicStringgetHost(){returnhost;}publicintgetPort(){returnport;}/***初始化Bootstrap*@return*/publicfinalBootstrapgetBootstrap(){group=newNioEventLoopGroup();Bootstrapb=newBootstrap();b.group(group).channel(NioSocketChannel.class);b.handler(newChannelInitializer<Channel>(){@OverrideprotectedvoidinitChannel(Channelch)throwsException{ChannelPipelinepipeline=ch.pipeline();//pipeline.addLast(newEncoder());//pipeline.addLast(newDecoder());pipeline.addLast("frameDecoder",newLengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));pipeline.addLast("frameEncoder",newLengthFieldPrepender(4));pipeline.addLast("decoder",newByteArrayDecoder());pipeline.addLast("encoder",newByteArrayEncoder());pipeline.addLast("handler",newTcpClientHandler());}});b.option(ChannelOption.SO_KEEPALIVE,true);returnb;}/***連接,獲取Channel*@paramhost*@paramport*@return*/publicfinalChannelgetChannel(Stringhost,intport){Channelchannel=null;try{channel=bootstrap.connect(host,port).sync().channel();returnchannel;}catch(Exceptione){log.info(String.format("connectServer(IP[%s],PORT[%s])fail!",host,port));returnnull;}}/***發送消息*@parammsg*@throwsException*/publicbooleansendMsg(Objectmsg)throwsException{if(channel!=null){channel.writeAndFlush(msg).sync();log.debug("msgflushsuccess");returntrue;}else{log.debug("msgflushfail,connectisnull");returnfalse;}}/***連接斷開*并且釋放資源*@return*/publicbooleandisconnectConnect(){//channel.close().awaitUninterruptibly();Future<?>future=group.shutdownGracefully();//shutdownGracefully釋放所有資源,并且關閉所有當前正在使用的channelfuture.syncUninterruptibly();returntrue;}}importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.SimpleChannelInboundHandler;/***客戶端處理器*/publicclassTcpClientHandlerextendsSimpleChannelInboundHandler<Object>{privatestaticfinalLoggerlog=LoggerFactory.getLogger(TcpClientHandler.class);@OverrideprotectedvoidchannelRead0(ChannelHandlerContextctx,Objectmsg)throwsException{byte[]data=(byte[])msg;}}
說到通訊就不能不說協議,通信時所遵守的規則,訪問什么,傳輸的格式等都屬于協議。
作為一個開發人員,應該都了解 TCP/IP 協議,它是一個網絡通信模型,以及一整套網絡傳輸協議家族,是互聯網的基礎通信架構。
也都應該用過 Http(超文本傳輸協議),Web 服務器傳輸超文本到本地瀏覽器的傳送協議,該協議建立在 TCP/IP 協議之上。分布式服務框架服務間的調用也會規定協議。
為了支持不同場景,分布式服務框架會存在多種協議,如 Dubbo 就支持 7 種協議:Dubbo 協議(默認),RMI 協議,Hessian協議,HTTP 協議,WebService 協議,Thrift 協議,Memcached 協議,redis 協議每種協議應對的場景不盡相同,具體場景具體對待。
服務路由
分布式服務上線時都是集群組網部署,集群中會存在某個服務的多實例,消費者如何從服務列表中選擇合適的服務提供者進行調用,這就涉及到服務路由。分布式服務框架需要能夠滿足用戶靈活的路由需求。
透明化路由
很多開源的 RPC 框架調用者需要配置服務提供者的地址信息,盡管可以通過讀取數據庫的服務地址列表等方式避免硬編碼地址信息,但是消費者依然要感知服務提供者的地址信息,這違反了透明化路由原則。
基于服務注冊中心的服務訂閱發布,消費者通過主動查詢和被動通知的方式獲取服務提供者的地址信息,而不再需要通過硬編碼方式得到提供者的地址信息。
只需要知道當前系統發布了那些服務,而不需要知道服務具體存在于什么位置,這就是透明化路由。
負載均衡
負載均衡策略是服務的重要屬性,分布式服務框架通常會提供多種負載均衡策略,同時支持用戶擴展負載均衡策略。
隨機
通常在對等集群組網中,采用隨機算法進行負載均衡,隨機路由算法消息分發還是比較均勻的,采用 JDK 提供的 java.util.Random 或者 java.security.SecureRandom 在指定服務提供者列表中生成隨機地址。
消費者基于隨機生成的服務提供者地址進行遠程調用:
/***隨機*/publicclassRandomStrategyimplementsClusterStrategy{@OverridepublicRemoteServiceBaseselect(List<RemoteServiceBase>list){intMAX_LEN=list.size();intindex=RandomUtil.nextInt(MAX_LEN);returnlist.get(index);}}
隨機還是存在缺點的,可能出現部分節點的碰撞的概率較高,另外硬件配置差異較大時,會導致各節點負載不均勻。
為避免這些問題,需要對服務列表加權,性能好的機器接收的請求的概率應該高于一般機器:
/***加權隨機*/publicclassWeightingRandomStrategyimplementsClusterStrategy{@OverridepublicRemoteServiceBaseselect(List<RemoteServiceBase>list){//存放加權后的服務提供者列表List<RemoteServiceBase>weightingList=newArrayList<RemoteServiceBase>();for(RemoteServiceBaseremoteServiceBase:list){//擴大10倍intweight=(int)(remoteServiceBase.getWeight()*10);for(inti=0;i<weight;i++){weightingList.add(remoteServiceBase);}}intMAX_LEN=weightingList.size();intindex=RandomUtil.nextInt(MAX_LEN);returnweightingList.get(index);}}
輪詢
逐個請求服務地址,到達邊界之后,繼續繞接。主要缺點:慢的提供者會累積請求。
例如第二臺機器很慢,但沒掛。當請求第二臺機器時被卡在那。久而久之,所有請求都卡在第二臺機器上。
輪詢策略實現非常簡單,順序循環遍歷服務提供者列表,達到邊界之后重新歸零開始,繼續順序循環:
/***輪詢*/publicclassPollingStrategyimplementsClusterStrategy{//計數器privateintindex=0;privateLocklock=newReentrantLock();@OverridepublicRemoteServiceBaseselect(List<RemoteServiceBase>list){RemoteServiceBaseservice=null;try{lock.tryLock(10,TimeUnit.MILLISECONDS);//若計數大于服務提供者個數,將計數器歸0if(index>=list.size()){index=0;}service=list.get(index);index++;}catch(InterruptedExceptione){e.printStackTrace();}finally{lock.unlock();}//兜底,保證程序健壯性,若未取到服務,則直接取第一個if(service==null){service=list.get(0);}returnservice;}}
加權輪詢的話,需要給服務地址添加權重:
/***加權輪詢*/publicclassWeightingPollingStrategyimplementsClusterStrategy{//計數器privateintindex=0;//計數器鎖privateLocklock=newReentrantLock();@OverridepublicRemoteServiceBaseselect(List<RemoteServiceBase>list){RemoteServiceBaseservice=null;try{lock.tryLock(10,TimeUnit.MILLISECONDS);//存放加權后的服務提供者列表List<RemoteServiceBase>weightingList=newArrayList<RemoteServiceBase>();for(RemoteServiceBaseremoteServiceBase:list){//擴大10倍intweight=(int)(remoteServiceBase.getWeight()*10);for(inti=0;i<weight;i++){weightingList.add(remoteServiceBase);}}//若計數大于服務提供者個數,將計數器歸0if(index>=weightingList.size()){index=0;}service=weightingList.get(index);index++;returnservice;}catch(InterruptedExceptione){e.printStackTrace();}finally{lock.unlock();}//兜底,保證程序健壯性,若未取到服務,則直接取第一個returnlist.get(0);}}
服務調用時延
消費者緩存所有服務提供者的調用時延,周期性的計算服務調用平均時延。
然后計算每個服務提供者服務調用時延與平均時延的差值,根據差值大小動態調整權重,保證服務時延大的服務提供者接收更少的消息,防止消息堆積。
該策略的特點:保證處理能力強的服務接受更多的消息,通過動態的權重分配消除服務調用時延的震蕩范圍,使所有服務的調用時延接近平均值,實現負載均衡。
一致性哈希
相同參數的請求總是發送到統一服務提供者,當某一臺服務提供者宕機時,原本發往根提供者的請求,基于虛擬節點,平攤到其他提供者,不會引起劇烈變動,平臺提供默認的虛擬節點數,可以通過配置文件修改虛擬節點個數。
一致性 Hash 環工作原理如下圖所示:
一致性哈希
路由規則
負載均衡只能保證服務提供者壓力的平衡,但是在一些業務場景中需要設置一些過濾規則,比較常用的是基本表達式的條件路由。
通過 IP 條件表達式配置黑白名單訪問控制:consumerIP != 192.168.1.1。
只暴露部分服務提供者,防止這個集群服務都被沖垮,導致其他服務也不可用。
例如providerIP=192.168.3*。讀寫分離:method=find*,list*,get*,query*=>providerIP=192.168.1.。前后臺分離:App=web=>providerIP=192.168.1.,app=java=>providerIP=192.168.2.。灰度升級:將WEB前臺應用理由到新的服務版本上:app=web=>provicerIP=192.168.1.*。
序列化與反序列化
把對象轉換為字節序列的過程稱為序列化,把字節序列恢復為對象的過程稱為反序列化。
運程調用的時候,我們需要先將 Java 對象進行序列化,然后通過網絡,IO 進行傳輸,當到達目的地之后,再進行反序列化獲取到我們想要的結果對象。
分布式系統中,傳輸的對象會很多,這就要求序列化速度快,產生字節序列小的序列化技術。
序列化技術:Serializable,XML,Jackson,MessagePack,FastJson,Protocol Buffer,Thrift,Gson,Avro,Hessian 等。
Serializable 是 Java 自帶的序列化技術,無法跨平臺,序列化和反序列化的速度相對較慢。
XML 技術多平臺支持好,常用于與銀行交互的報文,但是其字節序列產生較大,不太適合用作分布式通訊框架。
FastJson 是 Java 語言編寫的高性能的 JSON 處理器,由阿里巴巴公司開發,字節序列為 json 串,可讀性好,序列化也速度非常的快。
Protocol Buffer 序列化速度非常快,字節序列較小,但是可讀性較差。
一般分布式服務框架會內置多種序列化協議可供選擇,如 Dubbo 支持的 7 種協議用到的序列化技術就不完全相同。
服務調用
本地環境下,使用某個接口很簡單,直接調用就行。分布式環境下就不是那么簡單了,消費者方只會存在接口的定義,沒有具體的實現。
想要像本地環境下直接調用遠程接口那就得耗費一些功夫了,需要用到遠程代理。
下面是我盜的圖:
遠程代理
通信時序如下:
通信時序
消費者端沒有具體的實現,需要調用接口時動態的去創建一個代理類。與 Spirng 集成的情況,那直接在 Bean 構建的時候注入代理類。
下面是構建代理類:
importjava.lang.reflect.Proxy;publicclassJdkProxy{publicstaticObjectgetInstance(Class<?>cls){JdkMethodProxyinvocationHandler=newJdkMethodProxy();ObjectnewProxyInstance=Proxy.newProxyInstance(cls.getClassLoader(),newClass[]{cls},invocationHandler);return(Object)newProxyInstance;}}
importjava.lang.reflect.InvocationHandler;importjava.lang.reflect.Method;publicclassJdkMethodProxyimplementsInvocationHandler{@OverridepublicObjectinvoke(Objectproxy,Methodmethod,Object[]parameters)throwsThrowable{//如果傳進來是一個已實現的具體類if(Object.class.equals(method.getDeclaringClass())){try{returnmethod.invoke(this,parameters);}catch(Throwablet){t.printStackTrace();}//如果傳進來的是一個接口}else{//實現接口的核心方法//returnRemoteInvoking.invoking(serviceName,serializationType,//timeOut,loadBalanceStrategy,method,parameters);}returnnull;}}
代理會做很多事情,對請求服務的名稱及參數信息的序列化、通過路由選擇最為合適服務提供者、建立通訊連接發送請求信息(或者直接發起 Http 請求)、最后返回獲取到的結果。
當然這里面需要考慮很多問題,如調用超時,請求異常,通訊連接的緩存,同步服務調用還是異步服務調用等等。
同步服務調用:客戶端發起遠程服務調用請求,用戶線程完成消息序列化之后,將消息投遞到通信框架,然后同步阻塞,等待通信線程發送請求并接收到應答之后,喚醒同步等待的用戶線程,用戶線程獲取到應答之后返回。
異步服務調用:基于 Java 的 Future 機制,客戶端發起遠程服務調用請求,該請求會被標上 RequestId,同時建立一個與 RequestId 對應的 Future,客戶端通過 Future 的 Get 方法獲取結果時會被阻塞。
服務端收到請求應達會回傳 RequestId,通過 RequestId 去解除對應 Future 的阻塞,同時 Set 對應結果,最后客戶端獲取到結果。
構建 Future,以 RequestId 為 Key,Put 到線程安全的 Map 中。Get 結果時需要寫入 Time Out 超時時間,防止由于結果的未返回而導致的長時間的阻塞。
SyncFuture<RpcResponse>syncFuture=newSyncFuture<RpcResponse>();SyncFutureCatch.syncFutureMap.put(rpcRequest.getRequestId(),syncFuture);try{RpcResponserpcResponse=syncFuture.get(timeOut,TimeUnit.MILLISECONDS);returnrpcResponse.getResult();}catch(Exceptione){throwe;}finally{SyncFutureCatch.syncFutureMap.remove(rpcRequest.getRequestId());}
結果返回時通過回傳的 RequestId 獲取對應 Future 寫入 Response,Future 線程解除阻塞:
log.debug("TcpClientreceivehead:"+headAnalysis+"TcpClientreceivedata:"+rpcResponse);SyncFuture<RpcResponse>syncFuture=SyncFutureCatch.syncFutureMap.get(rpcResponse.getRequestId());if(syncFuture!=null){syncFuture.setResponse(rpcResponse);}
importjava.util.concurrent.CountDownLatch;importjava.util.concurrent.Future;importjava.util.concurrent.TimeUnit;publicclassSyncFuture<T>implementsFuture<T>{//因為請求和響應是一一對應的,因此初始化CountDownLatch值為1。privateCountDownLatchlatch=newCountDownLatch(1);//需要響應線程設置的響應結果privateTresponse;//Futrue的請求時間,用于計算Future是否超時privatelongbeginTime=System.currentTimeMillis();publicSyncFuture(){}@Overridepublicbooleancancel(booleanmayInterruptIfRunning){returnfalse;}@OverridepublicbooleanisCancelled(){returnfalse;}@OverridepublicbooleanisDone(){if(response!=null){returntrue;}returnfalse;}//獲取響應結果,直到有結果才返回。@OverridepublicTget()throwsInterruptedException{latch.await();returnthis.response;}//獲取響應結果,直到有結果或者超過指定時間就返回。@OverridepublicTget(longtimeOut,TimeUnitunit)throwsInterruptedException{if(latch.await(timeOut,unit)){returnthis.response;}returnnull;}//用于設置響應結果,并且做countDown操作,通知請求線程publicvoidsetResponse(Tresponse){this.response=response;latch.countDown();}publiclonggetBeginTime(){returnbeginTime;}}
SyncFuture<RpcResponse>syncFuture=newSyncFuture<RpcResponse>();SyncFutureCatch.syncFutureMap.put(rpcRequest.getRequestId(),syncFuture);RpcResponserpcResponse=syncFuture.get(timeOut,TimeUnit.MILLISECONDS);SyncFutureCatch.syncFutureMap.remove(rpcRequest.getRequestId());
除了同步服務調用,異步服務調用,還有并行服務調用,泛化調用等調用形式。
高可用
簡單的介紹了下分布式服務框架,下面來說下分布式系統的高可用。一個系統設計開發出來,三天兩晚就出個大問題,導致無法使用,那這個系統也不是什么好系統。
業界流傳一句話:"我們系統支持 X 個 9 的可靠性"。這個 X 是代表一個數字,X 個 9 表示在系統 1 年時間的使用過程中,系統可以正常使用時間與總時間(1 年)之比。
3 個 9:(1-99.9%)*365*24=8.76 小時,表示該系統在連續運行 1 年時間里最多可能的業務中斷時間是 8.76 小時,4 個 9 即 52.6 分鐘,5 個 9 即 5.26 分鐘。要做到如此高的可靠性,是非常大的挑戰。
一個大型分布式項目可能是由幾十上百個項目構成,涉及到的服務成千上萬,主鏈上的一個流程就需要流轉多個團隊維護的項目。
拿 4 個 9 的可靠性來說,平攤到每個團隊的時間可能不到 10 分鐘。這 10 分鐘內需要頂住壓力,以最快的時間找到并解決問題,恢復系統的可用。
下面說說為了提高系統的可靠性都有哪些方案:
服務檢測:某臺服務器與注冊中心的連接中斷,其提供的服務也無響應時,系統應該能主動去重啟該服務,使其能正常對外提供。
故障隔離:集群環境下,某臺服務器能對外提供服務,但是因為其他原因,請求結果始終異常。
這時就需要主動將該節點從集群環境中剔除,避免繼續對后面的請求造成影響,非高峰時期再嘗試修復該問題。至于機房故障的情況,只能去屏蔽整個機房了。
目前餓了么做的是異地多活,即便單邊機房掛了,流量也可以全量切換至另外一邊機房,保證系統的可用。
監控:包含業務監控、服務異常監控、DB 中間件性能的監控等,系統出現異常的時候能及時的通知到開發人員。等到線下報上來的時候,可能影響已經很大了。
壓測:產線主鏈路的壓測是必不可少的,單靠集成測試,有些高并發的場景是無法覆蓋到的,壓測能暴露平常情況無法出現的問題,也能直觀的提現系統的吞吐能力。當業務激增時,可以考慮直接做系統擴容。
SOP 方案與演練:產線上隨時都可能會發生問題,抱著出現問題時再想辦法解決的態度是肯定不行的,時間根本來不及。
提前做好對應問題的 SOP 方案,能節省大量時間,盡快的恢復系統的正常。當然平常的演練也是不可少的,一旦產線故障可以做到從容不迫的去應對和處理。
除了上述方案外,還可以考慮服務策略的使用:
降級策略:業務高峰期,為了保證核心服務,需要停掉一些不太重要的業務。
如雙十一期間不允許發起退款、只允許查看 3 個月之內的歷史訂單等業務的降級,調用服務接口時,直接返回的空結果或異常等服務的降級,都屬于分布式系統的降級策略。
服務降級是可逆操作,當系統壓力恢復到一定值不需要降級服務時,需要去除降級,將服務狀態恢復正常。
服務降級主要包括屏蔽降級和容錯降級:
- 屏蔽降級:分布式服務框架直接屏蔽對遠程接口的請求,不發起對遠程服務的調用,直接返回空結果、拋出指定異常、執行本地模擬接口實現等方式。
- 容錯降級:非核心服務不可調用時,可以對故障服務做業務放通,保證主流程不受影響。如請求超時、消息解碼異常、系統擁塞保護異常, 服務提供方系統異常等情況。
筆者之前就碰到過因雙方沒有做容錯降級導致的系統故障的情況。午高峰時期,對方調用我們的一個非核心查詢接口,我們系統因為 Bug 問題一直異常,導致對方調用這個接口的頁面異常而無法跳轉到主流程頁面,影響了產線的生產。當時對方緊急發版才使系統恢復正常。
限流策略:說到限流,最先想到的就是秒殺活動了,一場秒殺活動的流量可能是正常流量的幾百至幾千倍,如此高的流量系統根本無法處理,只能通過限流來避免系統的崩潰。
服務的限流本質和秒殺活動的限流是一樣的,都是限制請求的流入,防止服務提供方因大量的請求而崩潰。
限流算法:令牌桶、漏桶、計數器算法。上述算法適合單機的限流,但涉及到整個集群的限流時,得考慮使用緩存中間件了。
例如:某個服務 1 分鐘內只允許請求 2 次,或者一天只允許使用 1000 次。
由于負載均衡存在,可能集群內每臺機器都會收到請求,這種時候就需要緩存來記錄調用方某段時間內的請求次數,再做限流處理。Redis 就很適合做此事。
熔斷策略:熔斷本質上是一種過載保護機制,這一概念來源于電子工程中的斷路器,當電流過大時,保險絲會熔斷,從而保護整個電路。
同樣在分布式系統中,當被調用的遠程服務無法使用時,如果沒有過載保護,就會導致請求的資源阻塞在遠程服務器上耗盡資源。
很多時候,剛開始可能只是出現了局部小規模的故障,然而由于種種原因,故障影響范圍越來越大,最終導致全局性的后果。
當下游服務因訪問壓力過大而響應變慢或失敗,上游服務為了保護自己以及系統整體的可用性,可以暫時切斷對下游服務的調用。
熔斷器的設計思路:
- Closed:初始狀態,熔斷器關閉,正常提供服務。
- Open: 失敗次數,失敗百分比達到一定的閾值之后,熔斷器打開,停止訪問服務。
- Half-Open:熔斷一定時間之后,小流量嘗試調用服務,如果成功則恢復,熔斷器變為 Closed 狀態。
數據一致性
一個系統設計開發出來,必須保證其運行的數據準確和一致性。拿支付系統來說:用戶銀行卡已經扣款成功,系統里卻顯示失敗,沒有給用戶的虛擬帳戶充值上,這會引起客訴。
說的再嚴重點,用戶發起提現,資金已經轉到其銀行賬戶,系統卻沒扣除對應虛擬帳號的余額,直接導致資金損失了。如果這時候用戶一直發起提現,那就酸爽了。
CAP 原則
說到數據一致性,就不得不說到 CAP 原則。CAP 原則中指出任何一個分布式系統中,Consistency(一致性 C)、 Availability(可用性 A)、Partition tolerance(分區容錯性 P),三者不可兼得。
傳統單機數據庫基于 ACID 特性(原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)、持久性(Durability)) ,放棄了分區容錯性,能做到可用性和一致性。
對于一個分布式系統而言,分區容錯性是一個最基本的要求。既然是一個分布式系統,那么分布式系統中的組件必然需要被部署到不同的節點,會出現節點與節點之間的網絡通訊。
而網絡問題又是一定會出現的異常情況,分區容錯性也就成為了一個分布式系統必然需要面對和解決的問題。
系統架構師往往需要把精力花在如何根據業務特點在一致性和可用性之間尋求平衡。
集中式系統,通過數據庫事務的控制,能做到數據的強一致性。但是分布式系統中,涉及多服務間的調用,通過分布式事務的方案:
- 兩階段提交(2PC)
- 三階段提交(3PC)
- 補償事務(TCC)
- ...
雖然能實現數據的強一致,但是都是通過犧牲可用性來實現。
BASE 理論
BASE 理論是對 CAP 原則中一致性和可用性權衡的結果:Basically Available(基本可用)、Soft state(軟狀態)和 Eventually consistent(最終一致性)。
BASE 理論,其來源于對大規模互聯網系統分布式實踐的總結,是基于 CAP 原則逐步演化而來的。
其最核心思想是:即使無法做到強一致性,但每個應用都可以根據自身業務特點,采用適當的方式來使系統達到最終一致性。
基本可用:是指分布式系統在出現不可預知故障的時候,允許損失部分可用性,這不等價于系統不可用。
軟狀態:指允許系統中的數據存在中間狀態,并認為該中間狀態的存在不會影響系統的整體可用性,即允許系統在不同節點的數據副本之間進行數據同步的過程存在延時。
最終一致性:強調的是所有的數據副本,在經過一段時間的同步之后,最終都能夠達到一致的狀態。
因此,最終一致性的本質是需要系統保證最終數據能夠達到一致,而不需要實時保證系統數據的強一致性。
總的來說,BASE 理論面向的是大型高可用可擴展的分布式系統,和傳統的事物 ACID 特性是相反的。
它完全不同于 ACID 的強一致性模型,而是通過犧牲強一致性來獲得可用性,并允許數據在一段時間內是不一致的,但最終達到一致狀態。
同時,在實際的分布式場景中,不同業務單元和組件對數據一致性的要求是不同的,因此在具體的分布式系統架構設計過程中,ACID 特性和 BASE 理論往往又會結合在一起。
結語
分布式系統涉及到的東西還有很多,如:分布式鎖、定時調度、數據分片、性能問題、各種中間件的使用等,筆者分享只是了解到的那一小部分的知識而已。
之前本著學習的目的也寫過一個非常簡單的分布式服務框架 blackRpc,通過它了解了分布式服務框架內部的一些活動。
本文中所有代碼都能在該項目中找到,有興趣讀者可以看看:
https://github.com/wangshiyu/blackRpc