日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

最近看了下關于分布式限流的部分,看到Sentinel的分布式限流,也就是集群限流的部分,想搭個環境看看,結果發現網上關于這方面的內容基本可以說沒有,你甚至很難跑起來他的demo,就算能跑起來,估計也得自己研究半天,麻煩的要死。

我猜測很重要的原因可能就是Sentinel關于這塊做的并不完善,而且從官方的Issue中能看出來,其實官方對于這塊后續并沒有計劃去做的更好。

那么廢話不多說,在此之前,肯定要先說下關于Sentinel集群限流方面的原理,沒有原理一切都是空中樓閣。

集群限流原理

原理這方面比較好解釋,就是在原本的限流規則中加了一個clusterMode參數,如果是true的話,那么會走集群限流的模式,反之就是單機限流。

如果是集群限流,判斷身份是限流客戶端還是限流服務端,客戶端則和服務端建立通信,所有的限流都通過和服務端的交互來達到效果。

對于Sentinel集群限流,包含兩種模式,內嵌式和獨立式。

內嵌式

什么是內嵌式呢,簡單來說,要限流那么必然要有個服務端去處理多個客戶端的限流請求,對于內嵌式來說呢,就是整個微服務集群內部選擇一臺機器節點作為限流服務端(Sentinel把這個叫做token-server),其他的微服務機器節點作為限流的客戶端(token-client),這樣的做法有缺點也有優點。

3分鐘徹底弄懂Sentinel集群限流探索

 

限流-嵌入式

首先說優點:這種方式部署不需要獨立部署限流服務端節省獨立部署服務端產生的額外服務器開支降低部署和維護復雜度

再說缺點,缺點的話也可以說是整個Sentinel在集群限流這方面做得不夠好的問題。

先說第一個缺點:無自動故障轉移機制

無論是內嵌式還是獨立式的部署方案,都無法做到自動的故障轉移。

所有的server和client都需要事先知道IP的請求下做出配置,如果server掛了,需要手動的修改配置,否則集群限流會退化成單機限流。

比如你的交易服務有3臺機器ABC,其中A被手動設置為server,BC則是作為client,當A服務器宕機之后,需要手動修改BC中一臺作為server,否則整個集群的機器都將退化回單機限流的模式。

但是,如果client掛了,則是不會影響到整個集群限流的,比如B掛了,那么A和C將會繼續組成集群限流。

如果B再次重啟成功,那么又會重新加入到整個集群限流當中來,因為會有一個自動重連的機制,默認的時間是N*2秒,逐漸遞增的一個時間。

這是想用Sentinel做集群限流并且使用內嵌式需要考慮的問題,要自己去實現自動故障轉移的機制,當然,server節點選舉也要自己實現了。

對于這個問題,官方提供了可以修改server/client的API接口,另外一個就是可以基于動態的數據源配置方式,這個我們后面再談。

第二個缺點:適用于單微服務集群內部限流

這個其實也是顯而易見的道理,都內部選舉一臺作為server去限流了,如果還跨多個微服務的話,顯然是不太合理的行為,現實中這種情況肯定也是非常少見的了,當然你非要想跨多個微服務集群也不是不可以,只要你開心就好。

第三個缺點:server節點的機器性能會受到一定程度的影響

這個肯定也比較好理解的,作為server去限流,那么其他的客戶端肯定要和server去通信才能做到集群限流啊,對不對,所以一定程度上肯定會影響到server節點本身服務的性能,但是我覺得問題不大,就當server節點多了一個流量比較大的接口好了。

具體上會有多大的影響,我沒有實際對這塊做出實際的測試,如果真的流量非常大,需要實際測試一下這方面的問題。

我認為影響還是可控的,本身server和client基于netty通信,通信的內容其實也非常的小。

獨立式

說完內嵌式的這些點,然后再說獨立式,也非常好理解,就是單獨部署一臺機器作為限流服務端server,就不在本身微服務集群內部選一臺作為server了。

3分鐘徹底弄懂Sentinel集群限流探索

 

限流-獨立式

很明顯,優點就是解決了上面的缺點。

  1. 不會和內嵌式一樣,影響到server節點的本身性能
  2. 可以適用于跨多個微服務之間的集群限流

優點可以說就是解決了內嵌式的兩個缺點,那么缺點也來了,這同樣也是Sentinel本身并沒有幫助我們去解決的問題。

缺點一:需要獨立部署,會產生額外的資源(錢)和運維復雜度

缺點二:server默認是單機,需要自己實現高可用方案

缺點二很致命啊,官方的server實現默認就是單機的,單點問題大家懂的都懂,自己實現高可用,我真的是有點服了。

這么說Sentinel這個集群限流就是簡單的實現了一下,真正復雜的部分他都沒管,你可以這么理解。

run起來

那基本原理大概了解之后,還是要真正跑起來看看效果的,畢竟開頭我就說了,網上這方面真的是感覺啥也搜不到,下面以嵌入式集群的方式舉例。

無論集群限流還是單機限流的方式,官方都支持寫死配置和動態數據源的配置方式,寫的話下面的代碼中也都有,被我注釋掉了,至于動態數據源的配置,會基于Apollo來實現。

理解一下動態數據源的配置方式,基于這個我們可以實現限流規則的動態刷新,還有重點的一點可以做到基于修改配置方式的半自動故障轉移。

動態數據源支持推和拉兩種方式,比如文件系統和Eureka就是拉取的方式,定時讀取文件內容的變更,Eureka則是建立HTTP連接,定時獲取元數據的變更。

推送的方式主要是基于事件監聽機制,比如Apollo和Nacos,redis官方則是基于Pub/Sub來實現,默認的實現方式是基于Lettuce,如果想用其他的客戶端要自己實現。

3分鐘徹底弄懂Sentinel集群限流探索

 

限流-集群工作模式

首先,該引入的包還是引入。

<dependency>
  <groupId>com.alibaba.csp</groupId>
  <artifactId>sentinel-annotation-aspectj</artifactId>
  <version>1.8.4</version>
</dependency>

<dependency>
  <groupId>com.alibaba.csp</groupId>
  <artifactId>sentinel-transport-simple-http</artifactId>
  <version>1.8.4</version>
</dependency>

<dependency>
  <groupId>com.alibaba.csp</groupId>
  <artifactId>sentinel-cluster-client-default</artifactId>
  <version>1.8.4</version>
</dependency>
<dependency>
  <groupId>com.alibaba.csp</groupId>
  <artifactId>sentinel-cluster-server-default</artifactId>
  <version>1.8.4</version>
</dependency>
<dependency>
  <groupId>com.alibaba.csp</groupId>
  <artifactId>sentinel-datasource-apollo</artifactId>
  <version>1.8.4</version>
</dependency>

實現SPI,在resources目錄的META-INF/services下新增名為com.alibaba.csp.sentinel.init.InitFunc的文件,內容寫上我們自己實現的類名,比如我的com.irving.demo.init.DemoClusterInitFunc。

3分鐘徹底弄懂Sentinel集群限流探索

 

實現InitFunc接口,重寫init方法,代碼直接貼出來,這里整體依賴的是Apollo的配置方式,注釋的部分是我在測試的時候寫死代碼的配置方式,也是可以用的。

public class DemoClusterInitFunc implements InitFunc {
    private final String namespace = "Application";
    private final String ruleKey = "demo_sentinel";
    private final String ruleServerKey = "demo_cluster";
    private final String defaultRuleValue = "[]";

    @Override
    public void init() throws Exception {
        // 初始化 限流規則
        initDynamicRuleProperty();
        //初始化 客戶端配置
        initClientConfigProperty();
        // 初始化 服務端配置信息
        initClientServerAssignProperty();
        registerClusterRuleSupplier();
        // token-server的傳輸規則
        initServerTransportConfigProperty();
        // 初始化 客戶端和服務端狀態
        initStateProperty();
    }

    /**
     * 限流規則和熱點限流規則配置
     */
    private void initDynamicRuleProperty() {
        ReadableDataSource<String, List<FlowRule>> ruleSource = new ApolloDataSource<>(namespace, ruleKey,
                defaultRuleValue, source -> JSON.parseobject(source, new TypeReference<List<FlowRule>>() {
        }));
        FlowRuleManager.register2Property(ruleSource.getProperty());

        ReadableDataSource<String, List<ParamFlowRule>> paramRuleSource = new ApolloDataSource<>(namespace, ruleKey,
                defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {
        }));
        ParamFlowRuleManager.register2Property(paramRuleSource.getProperty());
    }

    /**
     * 客戶端配置,注釋的部分是通過Apollo配置,只有一個配置我就省略了
     */
    private void initClientConfigProperty() {
//        ReadableDataSource<String, ClusterClientConfig> clientConfigDs = new ApolloDataSource<>(namespace, ruleKey,
//                defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<ClusterClientConfig>() {
//        }));
//        ClusterClientConfigManager.registerClientConfigProperty(clientConfigDs.getProperty());

        ClusterClientConfig clientConfig = new ClusterClientConfig();
        clientConfig.setRequestTimeout(1000);
        ClusterClientConfigManager.applyNewConfig(clientConfig);
    }

    /**
     * client->server 傳輸配置,設置端口號,注釋的部分是寫死的配置方式
     */
    private void initServerTransportConfigProperty() {
        ReadableDataSource<String, ServerTransportConfig> serverTransportDs = new ApolloDataSource<>(namespace, ruleServerKey,
                defaultRuleValue, source -> {
            List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {
            });
            ServerTransportConfig serverTransportConfig = Optional.ofNullable(groupList)
                    .flatMap(this::extractServerTransportConfig)
                    .orElse(null);
            return serverTransportConfig;
        });
        ClusterServerConfigManager.registerServerTransportProperty(serverTransportDs.getProperty());
//        ClusterServerConfigManager.loadGlobalTransportConfig(new ServerTransportConfig().setIdleSeconds(600).setPort(transPort));
    }

    private void registerClusterRuleSupplier() {
        ClusterFlowRuleManager.setPropertySupplier(namespace -> {
            ReadableDataSource<String, List<FlowRule>> ds = new ApolloDataSource<>(this.namespace, ruleKey,
                    defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<List<FlowRule>>() {
            }));
            return ds.getProperty();
        });
        ClusterParamFlowRuleManager.setPropertySupplier(namespace -> {
            ReadableDataSource<String, List<ParamFlowRule>> ds = new ApolloDataSource<>(this.namespace, ruleKey,
                    defaultRuleValue, source -> JSON.parseObject(source, new TypeReference<List<ParamFlowRule>>() {
            }));
            return ds.getProperty();
        });
    }

    /**
     * 服務端配置,設置server端口和IP,注釋的配置是寫死的方式,這個在服務端是不用配置的,只有客戶端需要配置用來連接服務端
     */
    private void initClientServerAssignProperty() {
        ReadableDataSource<String, ClusterClientAssignConfig> clientAssignDs = new ApolloDataSource<>(namespace, ruleServerKey,
                defaultRuleValue, source -> {
            List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {
            });

            ClusterClientAssignConfig clusterClientAssignConfig = Optional.ofNullable(groupList)
                    .flatMap(this::extractClientAssignment)
                    .orElse(null);
            return clusterClientAssignConfig;
        });
        ClusterClientConfigManager.registerServerAssignProperty(clientAssignDs.getProperty());

//        ClusterClientAssignConfig serverConfig = new ClusterClientAssignConfig();
//        serverConfig.setServerHost("127.0.0.1");
//        serverConfig.setServerPort(transPort);
//        ConfigSupplierRegistry.setNamespaceSupplier(() -> "trade-center");
//        ClusterClientConfigManager.applyNewAssignConfig(serverConfig);
    }

    private Optional<ClusterClientAssignConfig> extractClientAssignment(List<ClusterGroupEntity> groupList) {
        ClusterGroupEntity tokenServer = groupList.stream().filter(x -> x.getState().equals(ClusterStateManager.CLUSTER_SERVER)).findFirst().get();
        Integer currentmachineState = Optional.ofNullable(groupList).map(s -> groupList.stream().filter(this::machineEqual).findFirst().get().getState()).orElse(ClusterStateManager.CLUSTER_NOT_STARTED);
        if (currentMachineState.equals(ClusterStateManager.CLUSTER_CLIENT)) {
            String ip = tokenServer.getIp();
            Integer port = tokenServer.getPort();
            return Optional.of(new ClusterClientAssignConfig(ip, port));
        }
        return Optional.empty();
    }

    /**
     * 初始化客戶端和服務端狀態,注釋的也是寫死的配置方式
     */
    private void initStateProperty() {
        ReadableDataSource<String, Integer> clusterModeDs = new ApolloDataSource<>(namespace, ruleServerKey,
                defaultRuleValue, source -> {
            List<ClusterGroupEntity> groupList = JSON.parseObject(source, new TypeReference<List<ClusterGroupEntity>>() {
            });
            Integer state = Optional.ofNullable(groupList).map(s -> groupList.stream().filter(this::machineEqual).findFirst().get().getState()).orElse(ClusterStateManager.CLUSTER_NOT_STARTED);
            return state;
        });
        ClusterStateManager.registerProperty(clusterModeDs.getProperty());

//            ClusterStateManager.applyState(ClusterStateManager.CLUSTER_SERVER);

    }

    private Optional<ServerTransportConfig> extractServerTransportConfig(List<ClusterGroupEntity> groupList) {
        return groupList.stream()
                .filter(x -> x.getMachineId().equalsIgnoreCase(getCurrentMachineId()) && x.getState().equals(ClusterStateManager.CLUSTER_SERVER))
                .findAny()
                .map(e -> new ServerTransportConfig().setPort(e.getPort()).setIdleSeconds(600));
    }

    private boolean machineEqual(/*@Valid*/ ClusterGroupEntity group) {
        return getCurrentMachineId().equals(group.getMachineId());
    }

    private String getCurrentMachineId() {
        // 通過-Dcsp.sentinel.api.port=8719 配置, 默認8719,隨后遞增
        return HostNameUtil.getIp() + SEPARATOR + TransportConfig.getPort();
    }
  
    private static final String SEPARATOR = "@";
}

基礎類,定義配置的基礎信息。

@Data
public class ClusterGroupEntity {
    private String machineId;
    private String ip;
    private Integer port;
    private Integer state;
}

然后是Apollo中的限流規則的配置和server/client集群關系的配置。

需要說明一下的就是flowId,這個是區分限流規則的全局唯一ID,必須要有,否則集群限流會有問題。

thresholdType代表限流模式,默認是0,代表單機均攤,比如這里count限流QPS=20,有3臺機器,那么集群限流閾值就是60,如果是1代表全局閾值,也就是count配置的值就是集群限流的上限。

demo_sentinel=[
    {
        "resource": "test_res", //限流資源名
        "count": 20, //集群限流QPS
        "clusterMode": true, //true為集群限流模式
        "clusterConfig": {
            "flowId": 111, //這個必須得有,否則會有問題
            "thresholdType": 1 //限流模式,默認為0單機均攤,1是全局閾值
        }
    }
]
demo_cluster=[
    {
        "ip": "192.168.3.20",
        "machineId": "192.168.3.20@8720",
        "port": 9999, //server和client通信接口
        "state": 1 //指定為server
    },
    {
        "ip": "192.168.3.20",
        "machineId": "192.168.3.20@8721",
        "state": 0
    },
    {
        "ip": "192.168.3.20",
        "machineId": "192.168.3.20@8722",
        "state": 0
    }
]

OK,到這里代碼和配置都已經OK,還需要跑起來Sentinel控制臺,這個不用教,還有啟動參數。

本地可以直接跑多個客戶端,注意修改端口號:-Dserver.port=9100 -Dcsp.sentinel.api.port=8720這兩個一塊改,至于怎么連Apollo這塊我就省略了,自己整吧,公司應該都有,不行的話用代碼里的寫死的方式也可以用。

-Dserver.port=9100 -Dcsp.sentinel.api.port=8720 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dcsp.sentinel.log.use.pid=true 

因為有流量之后控制臺才能看到限流的情況,所以用官方給的限流測試代碼修改一下,放到Springboot啟動類中,觸發限流規則的初始化。

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
        new FlowQpsDemo();
    }
}

測試限流代碼:

public class FlowQpsDemo {

    private static final String KEY = "test_res";

    private static AtomicInteger pass = new AtomicInteger();
    private static AtomicInteger block = new AtomicInteger();
    private static AtomicInteger total = new AtomicInteger();

    private static volatile boolean stop = false;

    private static final int threadCount = 32;

    private static int seconds = 60 + 40;

    public FlowQpsDemo() {
        tick();
        simulateTraffic();
    }

    private static void simulateTraffic() {
        for (int i = 0; i < threadCount; i++) {
            Thread t = new Thread(new RunTask());
            t.setName("simulate-traffic-Task");
            t.start();
        }
    }

    private static void tick() {
        Thread timer = new Thread(new TimerTask());
        timer.setName("sentinel-timer-task");
        timer.start();
    }

    static class TimerTask implements Runnable {

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            System.out.println("begin to statistic!!!");

            long oldTotal = 0;
            long oldPass = 0;
            long oldBlock = 0;
            while (!stop) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                }
                long globalTotal = total.get();
                long oneSecondTotal = globalTotal - oldTotal;
                oldTotal = globalTotal;

                long globalPass = pass.get();
                long oneSecondPass = globalPass - oldPass;
                oldPass = globalPass;

                long globalBlock = block.get();
                long oneSecondBlock = globalBlock - oldBlock;
                oldBlock = globalBlock;

                System.out.println(seconds + " send qps is: " + oneSecondTotal);
                System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal
                        + ", pass:" + oneSecondPass
                        + ", block:" + oneSecondBlock);

                if (seconds-- <= 0) {
//                    stop = true;
                }
            }

            long cost = System.currentTimeMillis() - start;
            System.out.println("time cost: " + cost + " ms");
            System.out.println("total:" + total.get() + ", pass:" + pass.get()
                    + ", block:" + block.get());
            System.exit(0);
        }
    }

    static class RunTask implements Runnable {
        @Override
        public void run() {
            while (!stop) {
                Entry entry = null;

                try {
                    entry = SphU.entry(KEY);
                    // token acquired, means pass
                    pass.addAndGet(1);
                } catch (BlockException e1) {
                    block.incrementAndGet();
                } catch (Exception e2) {
                    // biz exception
                } finally {
                    total.incrementAndGet();
                    if (entry != null) {
                        entry.exit();
                    }
                }

                Random random2 = new Random();
                try {
                    TimeUnit.MILLISECONDS.sleep(random2.nextInt(50));
                } catch (InterruptedException e) {
                    // ignore
                }
            }
        }
    }
}

啟動之后查看控制臺,可以看到嵌入式的集群服務端已經啟動好。

3分鐘徹底弄懂Sentinel集群限流探索

 

查看限流的情況:

3分鐘徹底弄懂Sentinel集群限流探索

 

最后為了測試效果,再啟動一個客戶端,修改端口號為9200和8721,可以看到新的客戶端已經連接到了服務端,不過這里顯示的總QPS 30000和我們配置的不符,這個不用管他。

3分鐘徹底弄懂Sentinel集群限流探索

 


3分鐘徹底弄懂Sentinel集群限流探索

 

好了,這個就是集群限流原理和使用配置方式,當然了,你可以啟動多臺服務,然后手動修改Apollo中的state參數修改服務端,驗證修改配置的方式是否能實現故障轉移機制,另外就是關閉client或者server驗證是否回退到單機限流的情況,這里就不一一測試了,因為我已經測試過了呀。

對于獨立式的部署方式基本也是一樣的,只是單獨啟動一個服務端的服務,需要手動配置server,而嵌入式的則不需要,loadServerNamespaceSet配置為自己的服務名稱即可。

ClusterTokenServer tokenServer = new SentinelDefaultTokenServer();

ClusterServerConfigManager.loadGlobalTransportConfig(new ServerTransportConfig()
.setIdleSeconds(600)
.setPort(11111));
ClusterServerConfigManager.loadServerNamespaceSet(Collections.singleton(DemoConstants.APP_NAME));

tokenServer.start();

 

來源:公眾號——艾小仙

分享到:
標簽:Sentinel
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定