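I. Preface
RocketMQ consumers can filter messages by Tag, and custom property filtering is also supported. Filtering is currently done on the Broker side. The advantage is that messages a Consumer does not need are never sent over the network; the drawback is that it adds load to the Broker and is relatively complex to implement.
RocketMQ currently supports two filter modes: one based on TAG and one based on SQL92. TAG mode is fairly simple, while SQL92 mode is considerably more complex and is implemented somewhat like Spring expressions. A configuration item decides whether SQL92 is enabled:
- enablePropertyFilter: whether filtering by property is supported. The default is false; to filter messages with expression-based SQL92 mode, this parameter must be set to true.
There is also a class filter mode, but it is about to be deprecated and is not officially recommended, so it is not covered here.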
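II. Source Code Guide
1. Source analysis of the consumer filter management component ConsumerFilterManager: what the Bloom filter data structure looks like and how registration works;
2. Filtering principle: how it works, explained by reading the ExpressionMessageFilter class;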
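III. Filter Data Management
BrokerController holds a ConsumerFilterManager, which manages the consumer filter data;
- Constructor;
- Structure of the data objects;
- Batch registration of filter data for a consumer group;
- Unregistration by consumer group;
- Checking whether data is dead;
Only these core methods are analyzed here; the remaining methods are much the same;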
1. Constructor
public class ConsumerFilterManager extends ConfigManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);

    // 24 hours, in milliseconds
    private static final long MS_24_HOUR = 24 * 3600 * 1000;

    // topic -> filters
    private ConcurrentMap<String, FilterDataMapByTopic> filterDataByTopic =
        new ConcurrentHashMap<String, FilterDataMapByTopic>(256);

    private transient BrokerController brokerController;

    // Bloom filter. In short, a Bloom filter is a bit array: when data is written,
    // it is hashed and the corresponding bits are set to 1.
    // On a query, the queried data is hashed and those positions are checked:
    // if any bit is 0, the data has definitely never been seen; if they are 1, it may have been seen.
    // It quickly tells you whether data is definitely absent or possibly present.
    private transient BloomFilter bloomFilter;

    public ConsumerFilterManager() {
        // just for test
        this.bloomFilter = BloomFilter.createByFn(20, 64);
    }

    public ConsumerFilterManager(BrokerController brokerController) {
        this.brokerController = brokerController;
        // create the Bloom filter from the broker configuration
        this.bloomFilter = BloomFilter.createByFn(
            brokerController.getBrokerConfig().getMaxErrorRateOfBloomFilter(),
            brokerController.getBrokerConfig().getExpectConsumerNumUseFilter()
        );
        // then set bit map length of store config.
        brokerController.getMessageStoreConfig().setBitMapLengthConsumeQueueExt(
            this.bloomFilter.getM()
        );
    }
    // ...
}
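To make the comment about bit arrays concrete, here is a minimal Bloom filter sketch. It is not RocketMQ's BloomFilter class: the class name SimpleBloomFilter, the double-hashing scheme, and the sizes used in main are all assumptions chosen for illustration.

```java
import java.util.BitSet;

/** Minimal Bloom filter sketch (hypothetical; not RocketMQ's implementation). */
final class SimpleBloomFilter {
    private final BitSet bits;
    private final int m; // number of bits in the array
    private final int k; // number of positions derived per value

    SimpleBloomFilter(int m, int k) {
        if (m <= 0 || k <= 0) {
            throw new IllegalArgumentException("m and k must be positive");
        }
        this.m = m;
        this.k = k;
        this.bits = new BitSet(m);
    }

    /** Derives the i-th position from two base hashes (an assumed scheme). */
    private int position(String value, int i) {
        int h1 = value.hashCode();
        int h2 = (h1 >>> 16) | 1; // forced odd, hence never zero
        return Math.floorMod(h1 + i * h2, m);
    }

    /** Write path: set every derived bit to 1. */
    void add(String value) {
        for (int i = 0; i < k; i++) {
            bits.set(position(value, i));
        }
    }

    /** Query path: any 0 bit means "definitely absent"; all 1 means "possibly present". */
    boolean mightContain(String value) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(position(value, i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        SimpleBloomFilter filter = new SimpleBloomFilter(216, 3);
        filter.add("groupA#TopicTest");
        // An added value always matches; other values usually do not, but may (false positive).
        System.out.println(filter.mightContain("groupA#TopicTest"));
        System.out.println(filter.mightContain("groupB#TopicTest"));
    }
}
```

The property that matters for filtering is the absence of false negatives: a value that was added is never reported as absent.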
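2. Structure of the data objects
ConsumerFilterManager extends ConfigManager. Readers of my earlier articles will know that ConfigManager has an abstract method, configFilePath, which specifies the path of the persisted file;
Let's go straight to the subclass's implementation: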
@Override
public String configFilePath() {
    if (this.brokerController != null) {
        return BrokerPathConfigHelper.getConsumerFilterPath(
            this.brokerController.getMessageStoreConfig().getStorePathRootDir()
        );
    }
    return BrokerPathConfigHelper.getConsumerFilterPath("./unit_test");
}

public static String getConsumerFilterPath(final String rootDir) {
    return rootDir + File.separator + "config" + File.separator + "consumerFilter.json";
}
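Next, locate the corresponding file; in my environment the file was empty.
That is fine, though: I found a description of the file's data structure online. The file is stored in the following format: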
"filterDataByTopic":{"Topic":{"topic": String,"groupFilterData": {"consumerGroup":{"consumerGroup" : String,"topic": String,"expression": String,"expressionType": String,"bornTime": long,"deadTime": long,"bloomFilterData":{"bitPos": int[],"bitNum": int},"clientVersion": long},},
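The purpose of most fields can be guessed from their names, except for the BloomFilter-related ones;
The bloomFilter field of ConsumerFilterManager can be thought of as a utility, while the bloomFilterData field of ConsumerFilterData holds the array of bit positions assigned to this consumer group, which is used to judge whether a message may satisfy the filter condition;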
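The relationship between a group's bit positions and a message's bitmap can be sketched as follows. This is an illustration under stated assumptions, not the RocketMQ code: GroupBits is a hypothetical stand-in for BloomFilterData (bitPos, bitNum), and the hashing scheme in positionsFor is invented for the example; only the "consumerGroup#topic" key comes from the source.

```java
import java.util.BitSet;

/** Sketch of per-group bit positions checked against a per-message bitmap. */
final class GroupBitsSketch {
    /** Hypothetical stand-in for BloomFilterData: k positions inside a bitmap of bitNum bits. */
    static final class GroupBits {
        final int[] bitPos;
        final int bitNum;

        GroupBits(int[] bitPos, int bitNum) {
            this.bitPos = bitPos;
            this.bitNum = bitNum;
        }
    }

    /** Derives k positions from "consumerGroup#topic"; the hash scheme is an assumption. */
    static GroupBits positionsFor(String group, String topic, int k, int bitNum) {
        String key = group + "#" + topic;
        int h1 = key.hashCode();
        int h2 = (h1 >>> 16) | 1;
        int[] pos = new int[k];
        for (int i = 0; i < k; i++) {
            pos[i] = Math.floorMod(h1 + i * h2, bitNum);
        }
        return new GroupBits(pos, bitNum);
    }

    /** Write side: record in the message bitmap that the message matches this group. */
    static void mark(GroupBits group, BitSet messageBitmap) {
        for (int p : group.bitPos) {
            messageBitmap.set(p);
        }
    }

    /** Read side: the message can match only if every position of the group is set. */
    static boolean isHit(GroupBits group, BitSet messageBitmap) {
        for (int p : group.bitPos) {
            if (!messageBitmap.get(p)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        GroupBits groupA = positionsFor("groupA", "TopicTest", 3, 216);
        BitSet bitmap = new BitSet(groupA.bitNum);
        mark(groupA, bitmap);
        System.out.println(isHit(groupA, bitmap));
    }
}
```

The point of the split is that the positions are computed once per consumer group and topic, while the bitmap travels with each message.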
// Filter data of every consumer group for one topic
public static class FilterDataMapByTopic {
    // For one topic, maps each subscribing consumer group to its filter data
    private ConcurrentMap<String, ConsumerFilterData> groupFilterData =
        new ConcurrentHashMap<String, ConsumerFilterData>();
    private String topic;
    // ...
}

public class ConsumerFilterData {
    private String consumerGroup;
    private String topic;
    private String expression;
    private String expressionType;
    private transient Expression compiledExpression;
    private long bornTime;
    private long deadTime = 0;
    private BloomFilterData bloomFilterData;
    private long clientVersion;
    // ...
}
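3. Batch registration of filter data for a consumer group
Scenario: once a consumer client in a consumer group has connected to the broker and the consumer has been registered, its Bloom filter data must be registered in batch. The handle method of the DefaultConsumerIdsChangeListener listener calls the register method of ConsumerFilterManager to do this, as covered earlier in the analysis of the ConsumerManager component;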
// Register the latest set of subscriptions and filters of a consumer group
public void register(final String consumerGroup, final Collection<SubscriptionData> subList) {
    // iterate over the subscription data
    for (SubscriptionData subscriptionData : subList) {
        // register each subscription
        register(
            subscriptionData.getTopic(),
            consumerGroup,
            subscriptionData.getSubString(),
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion()
        );
    }

    // make illegal topic dead.
    // fetch this consumer group's filter data for every topic
    Collection<ConsumerFilterData> groupFilterData = getByGroup(consumerGroup);
    Iterator<ConsumerFilterData> iterator = groupFilterData.iterator();
    while (iterator.hasNext()) {
        ConsumerFilterData filterData = iterator.next();
        boolean exist = false;
        for (SubscriptionData subscriptionData : subList) {
            // check whether the topic is still subscribed
            if (subscriptionData.getTopic().equals(filterData.getTopic())) {
                exist = true;
                break;
            }
        }
        if (!exist && !filterData.isDead()) {
            filterData.setDeadTime(System.currentTimeMillis());
            log.info("Consumer filter changed: {}, make illegal topic dead:{}", consumerGroup, filterData);
        }
    }
}
public boolean register(
    final String topic,          // topic
    final String consumerGroup,  // consumer group
    final String expression,     // filter expression
    final String type,           // filter type
    final long clientVersion     // client version
) {
    if (ExpressionType.isTagType(type)) {
        return false;
    }
    if (expression == null || expression.length() == 0) {
        return false;
    }

    // look up the filter data of every consumer group for this topic
    FilterDataMapByTopic filterDataMapByTopic = this.filterDataByTopic.get(topic);
    if (filterDataMapByTopic == null) {
        FilterDataMapByTopic temp = new FilterDataMapByTopic(topic);
        FilterDataMapByTopic prev = this.filterDataByTopic.putIfAbsent(topic, temp);
        filterDataMapByTopic = prev != null ? prev : temp;
    }

    // generate the Bloom filter data of this consumer group for this topic
    BloomFilterData bloomFilterData = bloomFilter.generate(consumerGroup + "#" + topic);

    return filterDataMapByTopic.register(consumerGroup, expression, type, bloomFilterData, clientVersion);
}
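The get / putIfAbsent / "prev != null ? prev : temp" sequence above is a common get-or-create idiom for concurrent maps. A minimal sketch of it in isolation follows; the class TopicRegistrySketch and its simplified value types (a plain group-to-expression map) are hypothetical and only stand in for FilterDataMapByTopic.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Sketch of the thread-safe get-or-create idiom; value types are simplified placeholders. */
final class TopicRegistrySketch {
    // topic -> (consumer group -> expression)
    private final ConcurrentMap<String, ConcurrentMap<String, String>> byTopic =
        new ConcurrentHashMap<>();

    /** Returns the per-topic map, creating it at most once even under concurrent calls. */
    ConcurrentMap<String, String> groupsOf(String topic) {
        ConcurrentMap<String, String> existing = byTopic.get(topic);
        if (existing == null) {
            ConcurrentMap<String, String> fresh = new ConcurrentHashMap<>();
            ConcurrentMap<String, String> prev = byTopic.putIfAbsent(topic, fresh);
            // if another thread inserted first, use its map and discard ours
            existing = prev != null ? prev : fresh;
        }
        return existing;
    }

    public static void main(String[] args) {
        TopicRegistrySketch registry = new TopicRegistrySketch();
        registry.groupsOf("TopicTest").put("groupA", "a > 5");
        System.out.println(registry.groupsOf("TopicTest"));
    }
}
```

On Java 8 and later, ConcurrentHashMap.computeIfAbsent expresses the same thing in a single call.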
This in turn calls the register method of FilterDataMapByTopic:
public boolean register(
    String consumerGroup,             // consumer group
    String expression,                // filter expression
    String type,                      // filter type
    BloomFilterData bloomFilterData,  // Bloom filter data
    long clientVersion                // client version
) {
    // look up the existing filter data of this consumer group
    ConsumerFilterData old = this.groupFilterData.get(consumerGroup);

    if (old == null) {
        ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);
        if (consumerFilterData == null) {
            return false;
        }
        // attach the Bloom filter data to the new filter data
        consumerFilterData.setBloomFilterData(bloomFilterData);

        old = this.groupFilterData.putIfAbsent(consumerGroup, consumerFilterData);
        if (old == null) {
            log.info("New consumer filter registered: {}", consumerFilterData);
            return true;
        } else {
            // an entry was inserted concurrently: compare the versions
            if (clientVersion <= old.getClientVersion()) {
                if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {
                    log.warn("Ignore consumer({} : {}) filter(concurrent), because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",
                        consumerGroup, topic,
                        clientVersion, old.getClientVersion(),
                        old.getExpressionType(), old.getExpression(),
                        type, expression);
                }
                // if the data already exists and is dead, reactivate it
                if (clientVersion == old.getClientVersion() && old.isDead()) {
                    reAlive(old);
                    return true;
                }
                return false;
            } else {
                this.groupFilterData.put(consumerGroup, consumerFilterData);
                log.info("New consumer filter registered(concurrent): {}, old: {}", consumerFilterData, old);
                return true;
            }
        }
    } else {
        if (clientVersion <= old.getClientVersion()) {
            if (!type.equals(old.getExpressionType()) || !expression.equals(old.getExpression())) {
                log.info("Ignore consumer({}:{}) filter, because of version {} <= {}, but maybe info changed!old={}:{}, ignored={}:{}",
                    consumerGroup, topic,
                    clientVersion, old.getClientVersion(),
                    old.getExpressionType(), old.getExpression(),
                    type, expression);
            }
            if (clientVersion == old.getClientVersion() && old.isDead()) {
                reAlive(old);
                return true;
            }
            return false;
        }

        boolean change = !old.getExpression().equals(expression) || !old.getExpressionType().equals(type);
        if (old.getBloomFilterData() == null && bloomFilterData != null) {
            change = true;
        }
        if (old.getBloomFilterData() != null && !old.getBloomFilterData().equals(bloomFilterData)) {
            change = true;
        }

        // if subscribe data is changed, or consumer is died too long.
        if (change) {
            ConsumerFilterData consumerFilterData = build(topic, consumerGroup, expression, type, clientVersion);
            if (consumerFilterData == null) {
                // new expression compile error, remove old, let client report error.
                this.groupFilterData.remove(consumerGroup);
                return false;
            }
            consumerFilterData.setBloomFilterData(bloomFilterData);
            this.groupFilterData.put(consumerGroup, consumerFilterData);
            log.info("Consumer filter info change, old: {}, new: {}, change: {}",
                old, consumerFilterData, change);
            return true;
        } else {
            old.setClientVersion(clientVersion);
            if (old.isDead()) {
                reAlive(old);
            }
            return true;
        }
    }
}
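The branch taken when an entry already exists is easier to follow as a small decision function. The sketch below is my own condensation of that branch, not code from RocketMQ: the names RegisterDecisionSketch, Outcome, and decide are hypothetical, and the case where the new expression fails to compile (the old entry is removed and false is returned) is deliberately left out.

```java
/** Condensed sketch of the "entry already exists" branch of register (hypothetical names). */
final class RegisterDecisionSketch {
    enum Outcome {
        IGNORED,   // stale or duplicate report, nothing changes (register returns false)
        REVIVED,   // same version and the entry was dead: it is reactivated
        REPLACED,  // newer version with changed expression, type or bloom data: entry is rebuilt
        REFRESHED  // newer version, nothing changed: version is updated, dead entry is reactivated
    }

    static Outcome decide(long newVersion, long oldVersion, boolean oldIsDead, boolean changed) {
        if (newVersion <= oldVersion) {
            // only an equal version may bring a dead entry back
            return (newVersion == oldVersion && oldIsDead) ? Outcome.REVIVED : Outcome.IGNORED;
        }
        return changed ? Outcome.REPLACED : Outcome.REFRESHED;
    }

    public static void main(String[] args) {
        System.out.println(decide(1, 2, true, true));
        System.out.println(decide(2, 2, true, false));
        System.out.println(decide(3, 2, false, true));
        System.out.println(decide(3, 2, true, false));
    }
}
```

Read this way, the client version acts as a guard: an older report can never overwrite newer filter data, whatever its content.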
public static ConsumerFilterData build(
    final String topic,          // topic
    final String consumerGroup,  // consumer group
    final String expression,     // filter expression
    final String type,           // filter type
    final long clientVersion     // client version
) {
    if (ExpressionType.isTagType(type)) {
        return null;
    }

    ConsumerFilterData consumerFilterData = new ConsumerFilterData();
    consumerFilterData.setTopic(topic);
    consumerFilterData.setConsumerGroup(consumerGroup);
    consumerFilterData.setBornTime(System.currentTimeMillis());
    consumerFilterData.setDeadTime(0);
    consumerFilterData.setExpression(expression);
    consumerFilterData.setExpressionType(type);
    consumerFilterData.setClientVersion(clientVersion);
    try {
        consumerFilterData.setCompiledExpression(
            FilterFactory.INSTANCE.get(type).compile(expression)
        );
    } catch (Throwable e) {
        log.error("parse error: expr={}, topic={}, group={}, error={}", expression, topic, consumerGroup, e.getMessage());
        return null;
    }

    return consumerFilterData;
}
Reactivation
Reactivating simply sets the dead time to 0. Data counts as dead when its deadTime is greater than or equal to its bornTime;
protected void reAlive(ConsumerFilterData filterData) {
    long oldDeadTime = filterData.getDeadTime();
    filterData.setDeadTime(0);
    log.info("Re alive consumer filter: {}, oldDeadTime: {}", filterData, oldDeadTime);
}
4. Unregistration by consumer group
public void unRegister(final String consumerGroup) {
    for (Entry<String, FilterDataMapByTopic> entry : filterDataByTopic.entrySet()) {
        entry.getValue().unRegister(consumerGroup);
    }
}
org.apache.rocketmq.broker.filter.ConsumerFilterManager.FilterDataMapByTopic#unRegister
public void unRegister(String consumerGroup) {
    if (!this.groupFilterData.containsKey(consumerGroup)) {
        return;
    }

    // fetch the filter data of the consumer group
    ConsumerFilterData data = this.groupFilterData.get(consumerGroup);
    // return immediately if it is missing or already dead (no longer usable)
    if (data == null || data.isDead()) {
        return;
    }

    long now = System.currentTimeMillis();
    log.info("Unregister consumer filter: {}, deadTime: {}", data, now);
    // mark the data as dead as of now
    data.setDeadTime(now);
}
5. Checking whether data is dead
org.apache.rocketmq.broker.filter.ConsumerFilterData#isDead
public boolean isDead() {
    return this.deadTime >= this.bornTime;
}
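Unregistering, reactivating, and the isDead check together form a small lifecycle. The sketch below models only the two timestamps; FilterLifecycleSketch and markDead are hypothetical names, and the birth time is passed in explicitly so the example stays deterministic.

```java
/** Sketch of the dead/alive lifecycle driven by bornTime and deadTime (hypothetical class). */
final class FilterLifecycleSketch {
    private final long bornTime;
    private long deadTime = 0; // 0 means the data has not been unregistered

    FilterLifecycleSketch(long bornTime) {
        this.bornTime = bornTime;
    }

    /** Mirrors unregistering: the data is kept but stamped with a dead time. */
    void markDead(long now) {
        this.deadTime = now;
    }

    /** Mirrors reAlive: resetting the dead time to 0 makes the data usable again. */
    void reAlive() {
        this.deadTime = 0;
    }

    /** Same comparison as in the source: dead once deadTime >= bornTime. */
    boolean isDead() {
        return this.deadTime >= this.bornTime;
    }

    public static void main(String[] args) {
        FilterLifecycleSketch data = new FilterLifecycleSketch(1_000L);
        System.out.println(data.isDead());
        data.markDead(2_000L);
        System.out.println(data.isDead());
        data.reAlive();
        System.out.println(data.isDead());
    }
}
```

Note that unregistering does not delete anything: the entry is only stamped, which is what makes reactivation with the same client version possible.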
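IV. Filtering Principle
When a client pulls messages from the Broker, the Broker reads data from the commitlog and consumequeue files and then filters it, checking whether each message meets the specified conditions. The filtering therefore happens in the getMessage method of DefaultMessageStore, which takes a MessageFilter parameter; ExpressionMessageFilter implements that interface;
Let's first look at the two methods the interface exposes: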
public interface MessageFilter {
    boolean isMatchedByConsumeQueue(final Long tagsCode,
        final ConsumeQueueExt.CqExtUnit cqExtUnit);

    boolean isMatchedByCommitLog(final ByteBuffer msgBuffer,
        final Map<String, String> properties);
}
From these two methods it is easy to guess that they filter against the consumequeue and the commitlog respectively;
1. TAG filtering
Tag filtering applies only to the consumequeue, so in tag mode the isMatchedByCommitLog method of the MessageFilter interface simply returns true;
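In tag mode the consumequeue check reduces to a membership test on tag hash codes. Here is a minimal sketch of that test; TagFilterSketch is a hypothetical class, and building the code set by splitting the subscription string on "||" is my assumption about how the set is populated.

```java
import java.util.HashSet;
import java.util.Set;

/** Sketch of tag-mode matching by hash code (hypothetical helper, not RocketMQ code). */
final class TagFilterSketch {
    /** Builds the set of tag hash codes from an expression such as "TagA || TagB". */
    static Set<Integer> codeSet(String subString) {
        Set<Integer> codes = new HashSet<>();
        for (String tag : subString.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                codes.add(trimmed.hashCode());
            }
        }
        return codes;
    }

    /** Follows the three tag-mode checks: no tag code, subscribe-all, then set membership. */
    static boolean matches(String subString, Long tagsCode) {
        if (tagsCode == null) {
            return true; // the consumequeue entry carries no tag
        }
        if ("*".equals(subString)) {
            return true; // the group subscribes to everything
        }
        return codeSet(subString).contains(tagsCode.intValue());
    }

    public static void main(String[] args) {
        long tagB = "TagB".hashCode();
        System.out.println(matches("TagA || TagB", tagB));
        System.out.println(matches("TagA", tagB));
    }
}
```

Because only hash codes are compared, two different tags that happen to share a hash code would both pass this check.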
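2. SQL92
isMatchedByConsumeQueue does not evaluate the SQL92 expression; it filters with the BloomFilter instead, which can be thought of as a cached pre-filter for SQL92. Messages that cannot match are discarded first at the consumequeue stage, and isMatchedByCommitLog then applies the exact filter;
However, the Bloom filter only takes effect when the relevant RocketMQ configuration items are turned on: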
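- enableConsumeQueueExt: whether the ConsumeQueue extension is enabled. The default is false, in which case isMatchedByConsumeQueue always returns true for SQL92 subscriptions;
- enableCalcFilterBitMap: must be set to true, otherwise the Bloom filter never produces a hit. Only when it is true does CommitLogDispatcherCalcBitMap set to 1 the positions that correspond to the bloomFilterData array of each matching ConsumerFilterData; if it is set to false, those bits always remain 0;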
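enableConsumeQueueExt must be set to true to turn on the extension; only then can the BloomFilter be used for filtering;
Enough talk, here is the code. Pay particular attention to the tag-mode filtering in isMatchedByConsumeQueue;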
public class ExpressionMessageFilter implements MessageFilter {
    protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.FILTER_LOGGER_NAME);

    protected final SubscriptionData subscriptionData;
    protected final ConsumerFilterData consumerFilterData;
    protected final ConsumerFilterManager consumerFilterManager;
    protected final boolean bloomDataValid;

    public ExpressionMessageFilter(SubscriptionData subscriptionData, ConsumerFilterData consumerFilterData,
        ConsumerFilterManager consumerFilterManager) {
        this.subscriptionData = subscriptionData;
        this.consumerFilterData = consumerFilterData;
        this.consumerFilterManager = consumerFilterManager;
        if (consumerFilterData == null) {
            bloomDataValid = false;
            return;
        }
        BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
        if (bloomFilter != null && bloomFilter.isValid(consumerFilterData.getBloomFilterData())) {
            bloomDataValid = true;
        } else {
            bloomDataValid = false;
        }
    }

    @Override
    public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
        if (null == subscriptionData) {
            return true;
        }
        if (subscriptionData.isClassFilterMode()) {
            return true;
        }

        // by tags code.
        if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
            // no tag in the consumequeue entry: return true, the message can be consumed
            if (tagsCode == null) {
                return true;
            }
            // subString equal to * means the group needs no filtering: return true
            if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
                return true;
            }
            // true if the tag code from the consumequeue is in the group's codeSet, false otherwise
            return subscriptionData.getCodeSet().contains(tagsCode.intValue());
        } else {
            // no expression or no bloom
            if (consumerFilterData == null || consumerFilterData.getExpression() == null
                || consumerFilterData.getCompiledExpression() == null || consumerFilterData.getBloomFilterData() == null) {
                return true;
            }

            // message is before consumer
            if (cqExtUnit == null || !consumerFilterData.isMsgInLive(cqExtUnit.getMsgStoreTime())) {
                log.debug("Pull matched because not in live: {}, {}", consumerFilterData, cqExtUnit);
                return true;
            }

            byte[] filterBitMap = cqExtUnit.getFilterBitMap();
            BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
            if (filterBitMap == null || !this.bloomDataValid
                || filterBitMap.length * Byte.SIZE != consumerFilterData.getBloomFilterData().getBitNum()) {
                return true;
            }

            BitsArray bitsArray = null;
            try {
                bitsArray = BitsArray.create(filterBitMap);
                boolean ret = bloomFilter.isHit(consumerFilterData.getBloomFilterData(), bitsArray);
                log.debug("Pull {} by bit map:{}, {}, {}", ret, consumerFilterData, bitsArray, cqExtUnit);
                return ret;
            } catch (Throwable e) {
                log.error("bloom filter error, sub=" + subscriptionData
                    + ", filter=" + consumerFilterData + ", bitMap=" + bitsArray, e);
            }
        }

        return true;
    }

    @Override
    public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
        if (subscriptionData == null) {
            return true;
        }
        if (subscriptionData.isClassFilterMode()) {
            return true;
        }
        if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
            return true;
        }

        ConsumerFilterData realFilterData = this.consumerFilterData;
        Map<String, String> tempProperties = properties;

        // no expression
        if (realFilterData == null || realFilterData.getExpression() == null
            || realFilterData.getCompiledExpression() == null) {
            return true;
        }

        if (tempProperties == null && msgBuffer != null) {
            tempProperties = MessageDecoder.decodeProperties(msgBuffer);
        }

        Object ret = null;
        try {
            MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);
            ret = realFilterData.getCompiledExpression().evaluate(context);
        } catch (Throwable e) {
            log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
        }

        log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);

        if (ret == null || !(ret instanceof Boolean)) {
            return false;
        }

        return (Boolean) ret;
    }
}
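The last few lines of isMatchedByCommitLog have a detail worth isolating: a failed evaluation or a non-Boolean result filters the message out, whereas a missing expression lets it through. The sketch below reproduces only that rule. ExactMatchSketch is a hypothetical class, a plain Function stands in for the compiled SQL92 expression, and it catches RuntimeException where the original catches Throwable.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Sketch of the exact-match stage; the Function parameter is a stand-in for a compiled expression. */
final class ExactMatchSketch {
    static boolean matches(Function<Map<String, String>, Object> compiled, Map<String, String> properties) {
        if (compiled == null) {
            return true; // no expression: nothing to filter on
        }
        Object ret = null;
        try {
            ret = compiled.apply(properties);
        } catch (RuntimeException e) {
            // evaluation failed: ret stays null, so the message is rejected below
        }
        // only an actual Boolean true lets the message through
        return ret instanceof Boolean && (Boolean) ret;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("a", "7");
        // hypothetical equivalent of the SQL92 expression "a > 5"
        System.out.println(matches(p -> Boolean.valueOf(Integer.parseInt(p.get("a")) > 5), props));
    }
}
```

The asymmetry is deliberate in the sketch: missing filter data is treated as "no filter", while a broken evaluation is treated as "no match".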
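V. Summary
- ConsumerFilterManager manages the filter data of every consumer group for each topic, and provides operations such as registering and unregistering filter data objects;
- ExpressionMessageFilter filters by calling isHit on the BloomFilter component held by ConsumerFilterManager;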