前置掌握:SpringBoot基礎使用、RocketMQ和SpringBoot的整合使用
源碼地址:gitee.com/tianxincode… 文章只會說明核心代碼,其他的基礎整合配置和多環境自動隔離參考源碼即可
一、為什么要二次封裝
為了不產生歧義,文章中提到的二次封裝均是基于原始使用方式的封裝,而非源碼級別的二次封裝
換句話說:如果都需要對源碼進行封裝了,那么說明公司業務規模都到一定程度了,二次封裝這種東西已經不需要討論了,封裝已經是一個共識
- 首先明確一點:不進行二次封裝完全不影響RocketMQ的使用,可以選擇二次封裝和不選擇二次封裝 二次封裝可以提供更多的功能和更簡潔的使用方式 如果一個封裝搞得比原始使用方式更復雜,那么就失去了二次封裝的意義 Q1:二次封裝可不可以不要?完全可以,完全不影響正常使用 Q2:二次封裝有沒有必要?仁者見仁智者見智的問題,如果覺得沒有必要那么這篇文章可以跳過~
- ORM框架中典型的一個二次封裝框架就是MyBatisPlus(簡稱MP),后者是對MyBatis原生使用的增加,不使用MP直接使用MyBatis可不可以?完全可以,那為什么要用二次封裝后的MP? 場景:大部分的數據庫操作,無外乎CRUD,那么最常用的比如(根據名稱就可以知道這個方法做什么用,就沒有必要再二次說明了):updateById、batchUpdate、deleteById、saveOrUpdate、batchInsert。對于上面這5個操作,變化的只是表字段和表名,剩下的語法都是一樣 不封裝:直接使用MyBatis完全可以自己實現上面方法的功能,但是每個表都需要寫一遍自己上面的方法,假設有100張表,那么就會多出495個(下面說明)重復功能代碼,而且所有代碼都是冗余的 封裝后:由封裝者提供上面5個方法的公共實現,然后所有需要使用上面功能的Service只需要繼承封裝好的類就自然的擁有了上面的5大功能,那么代碼的冗余量就從100張表*5個方法==500,去掉封裝的5個,節省了495的代碼冗余量
- 所以二次封裝是為了更方便用、更簡潔、更加適用于系統,量身打造可以大大提升開發效率。就如上面的5個方法,完全重復性的東西為什么要浪費開發時間來做這些冗余的事情呢?
1.1 二次封裝不同觀點
讓我們以一個生活中的蛋炒飯開個頭
原始框架好比提供了原材料:廚具、雞蛋,米飯等食材、菜譜
- 對于框架的使用通常有以下兩種方式
- 第一種:根據菜譜來進行做飯(使用原生的方法調用),洗菜、做飯、刷碗、打掃不管啥入參自己管
- 第二種:找一個人來學會這道菜(負責二次封裝的人)的多種做法(封裝大部分業務場景)并做成一種點餐式的服務,誰想吃哪種類型的蛋炒飯直接點餐(調用封裝好的方法)就可以吃上香噴噴的蛋炒飯
問題:哪種方案更好? 答案:兩種各有各的優勢(在說廢話,哈哈~)
- 第一種:原生方式 優點:可以按照各種方式靈活調用,比如每個人都使用RocketMQTemplate原生send發送消息,想要發送什么類型的消息就發送什么類型的消息,比較隨意 缺點:代碼大量冗余,從構建參數對象、發送消息、消費消息、異常處理、日志記錄、異常重試啥啥的都是自己搞,每個消費隊列就會出現上面所有的步驟。比如現在有一個訂單處理中心A接收來自各種類型訂單,此時如B、C兩個原始訂單來源想讓A處理訂單,那么B和C都需要按照A的要求進行調用,代碼會冗余
- 第二種:點餐式服務 優點:封裝了大部分統一的方法調用,比如 發送消息、異常處理、日志記錄等等都是重復的,封裝后點餐的人不需要再關系這一部分要怎么處理,只需要告訴點餐服務要不要進行異常、要不異常重試等等,那么此時對于點餐人來說只需要 付錢(調用服務)、吃飯(消費消息),除此之外啥也不用管,全部由點餐服務提供者完成所有上述兩個步驟外的其它操作 缺點:無法滿足所有點餐人的要求,有的人喜歡味道重一點,有的喜歡味道淡一點。但是這個缺點完全可以處理,比如點餐服務提供了自定義廚房(返回原始發送對象),此時調用者可以按照第一種方式進行使用
- 選哪種? 個人而言:業務系統復雜的優先選擇第二種 ,簡單業務的選擇第一種(盡量采用封裝,后續維護方便)。對于一個復雜的系統,本身業務級的代碼就已經很多了,結果還要每個人處理全部一樣的東西,消費者越多代碼冗余越多。如果一個系統只是為了使用MQ來進行業務分離,消費者也不多,那么可以選擇最快的方式,但是最終會選擇第二種,如果業務隨著時間增長越復雜,越晚改成第二種花費的代價越大! 第一種就好比此時我們要直接操作內存,原生操作就好比C++或C,可以直接操作內存,但是同時用完后還要自己寫各種異常處理和釋放內存;代碼封裝就好比JAVA,我們只需要告訴Java我們要使用內存,然后用完就不用管
- 企業中,業務功能產出是一級優先級,在此之上才能有更高級的東西。技術服務于業務,而不是業務服務于技術!比如現在30個人的系統,我們要使用緩存加速訪問,那么我們是選擇 內部緩存(直接用集合或者map存起來)還是用redis? 內部緩存和Redis能不能達到目的?能 哪個更方便更快?內部緩存!內部對象就很快實現 如果業務發展遲早會轉為Redis這種專業的緩存中間件,就好比業務發展前第一種,業務發展后選擇第二種,但是對于大部分業務系統來說功能增加是很快的,特別是產品同事上一分鐘提需求下一分鐘就要上線這種(開個玩笑~),所以我們在引用一個技術需不需要進行二次封裝時需要技術負責人對業務增長有一個預判。建議是都進行封裝一下
1.2 封裝的抽離點
- 對于二次封裝,其中最主要的就是找出該框架在日常使用中所出現的大部分涉及到的操作,然后找出變化操作和不變化操作
- RocketMQ日常使用主要場景為例: 發送消息階段:準備需要發送的消息、發送消息、記錄原始消息日志、發送失敗處理、可靠性處理 消費消息階段:記錄接收消息日志、業務處理、業務日志記錄、異常處理、異常重試、異常通知、死信處理
- 提取變化點和不變化點(可以抽取為公共處理的場景) 發送消息階段: 變化點:準備需要發送的消息 不變化點:發送消息、記錄原始消息日志、發送失敗處理、可靠性處理 消費消息階段 變化點:業務處理、業務日志記錄 不變化點:記錄接收消息日志、異常處理、異常重試、異常通知、死信處理
- 從上可以看到,對于RocketMQ的使用,大部分場景都是可以抽離成一個公共的方法處理,只有業務級的需要自己處理,所以如果我們把不變化場景抽取后,每個同事只需要寫自己業務相關部分即可
- 抽取后的復雜度:對于新加一個消費者,只需要處理業務相關三個場景(準備需要發送的消息、業務處理、業務日志記錄),剩下的九個場景,只需要封裝一次就可以。需要現在就幾十個消費者,可以想想一些減少了多少代碼冗余
1.3 設計模式的應用
- 要封裝出一個好的抽象層,【設計模式】建議好好體會和學習一下
- 設計模式對于用不到的人來說比較虛幻,對于用的到的人來說,這個真牛X
二、二次封裝核心要點
2.1 二次封裝核心點
2.1.1 封裝主要討論點
- 對于RocketMQ或者說對于整個MQ體系來說(不管是RabbitMQ、RocketMQ、Kafka)等封裝的核心主要有兩個:發送消息、消費消息者兩個場景
- 對于RocketMQ我們主要討論三個地方:RocketMQTemplate封裝、RocketMQListener封裝和廣播消息的封裝
- 廣播消息是分布式系統中同時讓所有節點都干一件事情的一個好的方式,如果用不到忽略廣播消息即可
2.1.2 發送/消費的幾種消息實體
- RocketMQ發送消息對于不同的使用來說,大部分選擇下面的幾種發送消息類型 A、發送Json對象,比如Fastjson的JSONObject B、直接發送轉Json后的String對象 C、根據業務封裝對應實體類 D、直接使用原生MessageExt接收
- 怎么選擇?怎么選擇才是最優? 上面哪一種都可以達到目的,如果要統一封裝就必須要有一個標準 怎么選擇只需要回答這個問題:在不看消息發送者的情況下,消費者怎么知道發送者發送的消息含義? 比如現在有一個訂單消息,如果我們不看消息發送者,怎么知道發送者給消費者發送哪些字段 A、B、D可以嗎?一定不可以!JSON對象和String對象,如果我們不看消息發送者不可能知道到底發送了啥,這點我相信沒有可以討論的地方,因為類型決定了這個操作不可能 C可以嗎?可以!此時不需要看消息發送者,只需要看消費者的實體類點進去,有哪些業務字段一清二楚 可能有杠要抬了,有看實體類的功夫,我看消息發送者都看完了 靈魂拷問1:如果消息發送者和消費者不在一個系統怎么看?邪魅一笑,不同業務線可能沒代碼權限吧?分布式系統完全獨立可能吧? 靈魂拷問2:如果現在需要一個功能,如果某些必須要的字段消息發送者如果沒有給的話需要校驗,普通String和JSONObject怎么實現?換成實體類呢?
- 基于上述討論點,封裝建議基于實體類來,實體類不管是排查問題、新人熟悉系統代碼、信息校驗等String和JSONObject無法像實體類一樣輕松勝任
2.2 RocketMQTemplate封裝
2.2.1 封裝基礎實體類
- 基礎消息實體類封裝了除了業務消息外所有其他公共字段,主要看下面代碼中的字段和注釋
- 基礎抽象消息實體,包含基礎的消息、根據自己的業務消息設置更多的字段 其中也可以包含所有消費者可能用得到的方法等,比如做些數據的加解密
package com.codecoord.rocketmq.domain;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.UUID;
/**
* 基礎消息實體,包含基礎的消息
* 根據自己的業務消息設置更多的字段
*
* @author tianxincoord@163.com
* @since 2022/6/16
*/
@Data
public abstract class BaseMqMessage {
/**
* 業務鍵,用于RocketMQ控制臺查看消費情況
*/
protected String key;
/**
* 發送消息來源,用于排查問題
*/
protected String source = "";
/**
* 發送時間
*/
protected LocalDateTime sendTime = LocalDateTime.now();
/**
* 跟蹤id,用于slf4j等日志記錄跟蹤id,方便查詢業務鏈
*/
protected String traceId = UUID.randomUUID().toString();
/**
* 重試次數,用于判斷重試次數,超過重試次數發送異常警告
*/
protected Integer retryTimes = 0;
}
- 有了此基礎抽象實體類,那么剩下的所有業務消息實體只需要繼承此基類,然后在自己業務類中包含自己需要的字段即可,因為這些公共字段不管是向上轉型還是向下轉型,子類和父類都可以看得到
2.2.2 RocketMQTemplate
- RocketMQTemplate發送消息的代碼如果不封裝,我們發送消息需要這樣 String destination = topic + ":" + tag; template.syncSend(destination, message);
- 每個人發送消息都要自己處理這個冒號,直接傳入topic和tag不香嗎?按照抽離變化點中的變化點,只有消息是變化的,除此之外的其他規則交給封裝類
- RocketMQTemplate主要封裝發送消息的日志、異常的處理、消息key設置、等等其他配置
- 封裝代碼類如下,下面包含了主要發送方式,更多自己添加即可 這里就是消息發送的點餐機器,同時也提供了封裝方法也提供原始RocketMQTemplate供使用 此處只是提供一種方式,生產中按照項目組商量決定
package com.codecoord.rocketmq.template;
import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqSysConstant;
import com.codecoord.rocketmq.domain.BaseMqMessage;
import com.codecoord.rocketmq.util.JsonUtil;
import org.Apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* RocketMQ模板類
*
* @author tianxincoord@163.com
* @since 2022/4/15
*/
@Component
public class RocketMqTemplate {
private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqTemplate.class);
@Resource(name = "rocketMQTemplate")
private RocketMQTemplate template;
/**
* 獲取模板,如果封裝的方法不夠提供原生的使用方式
*/
public RocketMQTemplate getTemplate() {
return template;
}
/**
* 構建目的地
*/
public String buildDestination(String topic, String tag) {
return topic + RocketMqSysConstant.DELIMITER + tag;
}
/**
* 發送同步消息
*/
public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message) {
// 注意分隔符
return send(topic + RocketMqSysConstant.DELIMITER + tag, message);
}
public <T extends BaseMqMessage> SendResult send(String destination, T message) {
// 設置業務鍵,此處根據公共的參數進行處理
// 更多的其它基礎業務處理...
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
SendResult sendResult = template.syncSend(destination, sendMessage);
// 此處為了方便查看給日志轉了json,根據選擇選擇日志記錄方式,例如ELK采集
LOGGER.info("[{}]同步消息[{}]發送結果[{}]", destination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult));
return sendResult;
}
/**
* 發送延遲消息
*/
public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
return send(topic + RocketMqSysConstant.DELIMITER + tag, message, delayLevel);
}
public <T extends BaseMqMessage> SendResult send(String destination, T message, int delayLevel) {
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
LOGGER.info("[{}]延遲等級[{}]消息[{}]發送結果[{}]", destination, delayLevel, JsonUtil.toJson(message), JsonUtil.toJson(sendResult));
return sendResult;
}
}
- 這個類是最基礎的原始封裝類,相當于餐館提供的點餐服務。上面提供無業務特性的發送,比如想要發送日志消息或者動態發送消息目的場景
3.2.3 增強RocketMQTemplate
- 以訂單處理中心來說,變化點僅僅只是單號等業務數據不一樣,發往訂單處理中心的消息不管是topic還是tag等等其實完全都一樣,那么此時可以根據業務來增加封裝
- 增強原始功能需要注意下面兩個點 所有父類能出現的地方,子類都能出現:也就是子類擁有功能 >= 父類 ,比如Java的List,只要入參是List的地方,傳ArrayList和LinkedList完全可以 增強功能不能改變原始功能的行為:比如父類有一個方法say是說話,結果子類覆寫了say改成了行為是吃飯,然后當調用者調用say的時候得到了一個完全預期外的結果
- 就以訂單中心消息發送為例,封裝OrderMessageTemplate繼承自RocketMqTemplate,此時前者就擁有了封裝父類的所有基礎方法,擁有了所有父類的功能。然后可以在前者增加自身業務特性的發送方法,比如發送訂單處理消息
package com.codecoord.rocketmq.template;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.validation.constraints.NotNull;
import java.time.LocalDate;
import java.time.LocalDateTime;
/**
* 訂單類發送消息模板工具類
*
* @author tianxincode@163.com
* @since 2022/6/16
*/
@Component
public class OrderMessageTemplate extends RocketMqTemplate {
/// 如果不采用繼承也可以直接注入使用
/* @Resource
private RocketMqTemplate rocketMqTemplate; */
/**
* 入參只需要傳入是哪個訂單號和業務體消息即可,其他操作根據需要處理
* 這樣對于調用者而言,可以更加簡化調用
*/
public SendResult sendOrderPaid(@NotNull String orderId, String body) {
RocketMqEntityMessage message = new RocketMqEntityMessage();
message.setKey(orderId);
message.setSource("訂單支付");
message.setMessage(body);
// 這兩個字段只是為了測試
message.setBirthday(LocalDate.now());
message.setTradeTime(LocalDateTime.now());
return send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.ORDER_PAID_TAG, message);
}
}
- 此時對于調用者只需要 orderMessageTemplate.sendOrderPaid("O001", "xxx");就可以把消息發送到訂單處理中心
- 封裝后的好處,假如現在有10個訂單來源,現在需要調整消息發送格式,如果不進行封裝那么10個來源發送的地方都需要改;如果進行了二次封裝,只需要改sendOrderPaid方法即可,而且還不會出錯,此時優勢就體現出來了
2.3 RocketMQListener封裝
- RocketMQListener是消費消息的核心,同時也涉及到更多的操作,比如:基礎日志記錄、異常處理、消息重試、警告通知等等等
- 按照抽離變化點,RocketMQListener只應該處理與自身業務相關的,除此之外的其它應該交給父類,子類只需要告訴父類要不要異常處理、要不要重試等等,點餐式服務
- 封裝消息消費的抽象類 注意泛型限定為標準基礎消息類,這樣能到消費者的一定有統一的標準類BaseMqMessage 下面簡單封裝示例
package com.codecoord.rocketmq.listener;
import com.codecoord.rocketmq.constant.RocketMqSysConstant;
import com.codecoord.rocketmq.domain.BaseMqMessage;
import com.codecoord.rocketmq.template.RocketMqTemplate;
import com.codecoord.rocketmq.util.JsonUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import javax.annotation.Resource;
import java.time.Instant;
import java.util.Objects;
/**
* 抽象消息監聽器,封裝了所有公共處理業務,如
* 1、基礎日志記錄
* 2、異常處理
* 3、消息重試
* 4、警告通知
* 5、....
*
* @author tianxincoord@163.com
* @since 2022/4/17
*/
public abstract class BaseMqMessageListener<T extends BaseMqMessage> {
/**
* 這里的日志記錄器是哪個子類的就會被哪個子類的類進行初始化
*/
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@Resource
private RocketMqTemplate rocketMqTemplate;
/**
* 消息者名稱
*
* @return 消費者名稱
*/
protected abstract String consumerName();
/**
* 消息處理
*
* @param message 待處理消息
* @throws Exception 消費異常
*/
protected abstract void handleMessage(T message) throws Exception;
/**
* 超過重試次數消息,需要啟用isRetry
*
* @param message 待處理消息
*/
protected abstract void overMaxRetryTimesMessage(T message);
/**
* 是否過濾消息,例如某些
*
* @param message 待處理消息
* @return true: 本次消息被過濾,false:不過濾
*/
protected boolean isFilter(T message) {
return false;
}
/**
* 是否異常時重復發送
*
* @return true: 消息重試,false:不重試
*/
protected abstract boolean isRetry();
/**
* 消費異常時是否拋出異常
*
* @return true: 拋出異常,false:消費異常(如果沒有開啟重試則消息會被自動ack)
*/
protected abstract boolean isThrowException();
/**
* 最大重試次數
*
* @return 最大重試次數,默認10次
*/
protected int maxRetryTimes() {
return 10;
}
/**
* isRetry開啟時,重新入隊延遲時間
*
* @return -1:立即入隊重試
*/
protected int retryDelayLevel() {
return -1;
}
/**
* 由父類來完成基礎的日志和調配,下面的只是提供一個思路
*/
public void dispatchMessage(T message) {
MDC.put(RocketMqSysConstant.TRACE_ID, message.getTraceId());
// 基礎日志記錄被父類處理了
logger.info("[{}]消費者收到消息[{}]", consumerName(), JsonUtil.toJson(message));
if (isFilter(message)) {
logger.info("消息不滿足消費條件,已過濾");
return;
}
// 超過最大重試次數時調用子類方法處理
if (message.getRetryTimes() > maxRetryTimes()) {
overMaxRetryTimesMessage(message);
return;
}
try {
long start = Instant.now().toEpochMilli();
handleMessage(message);
long end = Instant.now().toEpochMilli();
logger.info("消息消費成功,耗時[{}ms]", (end - start));
} catch (Exception e) {
logger.error("消息消費異常", e);
// 是捕獲異常還是拋出,由子類決定
if (isThrowException()) {
throw new RuntimeException(e);
}
if (isRetry()) {
// 獲取子類RocketMQMessageListener注解拿到topic和tag
RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
if (Objects.nonNull(annotation)) {
message.setSource(message.getSource() + "消息重試");
message.setRetryTimes(message.getRetryTimes() + 1);
SendResult sendResult;
try {
// 如果消息發送不成功,則再次重新發送,如果發送異常則拋出由MQ再次處理(異常時不走延遲消息)
// 此處捕獲之后,相當于此條消息被消息完成然后重新發送新的消息
sendResult = rocketMqTemplate.send(annotation.topic(), annotation.selectorExpression(), message, retryDelayLevel());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
// 發送失敗的處理就是不進行ACK,由RocketMQ重試
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
throw new RuntimeException("重試消息發送失敗");
}
}
}
}
}
}
4.封裝消費最終類 注意:收到的消息是先委派給父類,父類進行調度管理
package com.codecoord.rocketmq.listener;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 實體類消費監聽器,在實現RocketMQListener中間還加了一層BaseMqMessageListener來處理基礎業務消息
*
* @author tianxincoord@163.com
* @since 2022/5/12
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = RocketMqBizConstant.SOURCE_TOPIC,
consumerGroup = RocketMqBizConstant.SOURCE_GROUP,
selectorExpression = RocketMqBizConstant.SOURCE_TAG,
// 指定消費者線程數,默認64,生產中請注意配置,避免過大或者過小
consumeThreadMax = 5
)
public class RocketEntityMessageListener extends BaseMqMessageListener<RocketMqEntityMessage>
implements RocketMQListener<RocketMqEntityMessage> {
/**
* 此處只是說明封裝的思想,更多還是要根據業務操作決定
* 內功心法有了,無論什么招式都可以發揮最大威力
*/
@Override
protected String consumerName() {
return "RocketMQ二次封裝消息消費者";
}
@Override
public void onMessage(RocketMqEntityMessage message) {
// 注意,此時這里沒有直接處理業務,而是先委派給父類做基礎操作,然后父類做完基礎操作后會調用子類的實際處理類型
super.dispatchMessage(message);
}
@Override
protected void handleMessage(RocketMqEntityMessage message) throws Exception {
// 此時這里才是最終的業務處理,代碼只需要處理資源類關閉異常,其他的可以交給父類重試
System.out.println("業務消息處理");
}
@Override
protected void overMaxRetryTimesMessage(RocketMqEntityMessage message) {
// 當超過指定重試次數消息時此處方法會被調用
// 生產中可以進行回退或其他業務操作
}
@Override
protected boolean isRetry() {
return false;
}
@Override
protected int maxRetryTimes() {
// 指定需要的重試次數,超過重試次數overMaxRetryTimesMessage會被調用
return 5;
}
@Override
protected boolean isThrowException() {
// 是否拋出異常,到消費異常時是被父類攔截處理還是直接拋出異常
return false;
}
}
5.封裝后對于子類來說,只需要告訴父類要不要做就擁有了最開始說的所有功能,簡化了使用,此時子類消費者只需要專注于自己的業務核心處理就可以了
2.4 廣播消息的應用場景
- 應用場景:多租戶或者服務有內部緩存需要刷新情況下如果需要刷新租戶信息或者緩存信息
- 也就是需要所有服務節點都需要同事做某一件事情的時候,此時可以借助廣播消息發送消息到所有節點刷新,無需一個節點一個節點的處理
- 特別說明:廣播消息默認會在家目錄下創建消費進度文件,會以www.tianxincoord.com:9876@www.tianxincoord.com:9876這種地址形式生成文件路徑,但是由于帶有:符號,windows下是不允許此符號作為文件夾名稱的,所以如果rocketMQ的鏈接地址不是連接串(不帶有端口)可以取消下面的messageModel注釋,否則啟動的時候就會提示目標卷或者路徑不存在,其實是因為這個問題
package com.codecoord.rocketmq.listener;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 廣播消息
* 應用場景:多租戶或者服務有內部緩存需要刷新情況下如果需要刷新租戶信息或者緩存信息
* 也就是需要所有服務節點都需要同事做某一件事情的時候
* 此時可以借助廣播消息發送消息到所有節點刷新,無需一個節點一個節點的處理
*
* 特別說明:廣播消息默認會在家目錄下創建消費進度文件,會以www.tianxincoord.com:9876@www.tianxincoord.com:9876
* 這種地址形式生成文件路徑,但是由于帶有:符號,windows下是不允許此符號作為文件夾名稱的
* 所以如果rocketMQ的鏈接地址不是連接串(不帶有端口)可以取消下面的messageModel注釋
* 否則啟動的時候就會提示目標卷或者路徑不存在,其實是因為這個問題
*
* @author tianxincoord@163.com
* @since 2022/5/12
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = RocketMqBizConstant.SOURCE_TOPIC,
consumerGroup = RocketMqBizConstant.SOURCE_BROADCASTING_GROUP,
selectorExpression = RocketMqBizConstant.SOURCE_BROADCASTING_TAG
// messageModel = MessageModel.BROADCASTING
)
public class RocketBroadcastingListener implements RocketMQListener<MessageExt> {
/**
* MessageExt:內置的消息實體,生產中根據需要自己封裝實體
*/
@Override
public void onMessage(MessageExt message) {
log.info("收到廣播消息【{}】", new String(message.getBody()));
}
}
2.3 代碼封裝完結測試
封裝測試大家可以直接參考RocketMqController即可
package com.codecoord.rocketmq.controller;
import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import com.codecoord.rocketmq.template.OrderMessageTemplate;
import com.codecoord.rocketmq.template.RocketMqTemplate;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.RequestMApping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.UUID;
/**
* 消息發送
*
* @author tianxin01@huice.com
* @since 2022/6/16
*/
@RestController
@RequestMapping("/rocketmq")
@Slf4j
public class RocketMqController {
/**
* 注意此處注入的是封裝的RocketMqTemplate
*/
@Resource
private RocketMqTemplate rocketMqTemplate;
/**
* 注入對應業務的模板類
*/
@Resource
private OrderMessageTemplate orderMessageTemplate;
/**
* 通過實體類發送消息,發送注意事項請參考實體類
* 說明:也可以在RocketMqTemplate按照業務封裝發送方法,這樣只需要調用方法指定基礎業務消息接口
*/
@RequestMapping("/entity/message")
public Object sendMessage() {
RocketMqEntityMessage message = new RocketMqEntityMessage();
// 設置業務key
message.setKey(UUID.randomUUID().toString());
// 設置消息來源,便于查詢we年
message.setSource("封裝測試");
// 業務消息內容
message.setMessage("當前消息發送時間為:" + LocalDateTime.now());
// Java時間字段需要單獨處理,否則會序列化失敗
message.setBirthday(LocalDate.now());
message.setTradeTime(LocalDateTime.now());
return rocketMqTemplate.send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_TAG, message);
}
/**
* 此時對于調用者而且,無需創建任何類
* 如果某天需要調整消息發送來源,如果不封裝,所有原來產生message的地方全部改
* 如果封裝了,只需要改sendOrderPaid就可以切換
*/
@RequestMapping("/order/paid")
public Object sendOrderPaidMessage() {
return orderMessageTemplate.sendOrderPaid(UUID.randomUUID().toString(), "客戶下單了...,快快備貨");
}
/**
* 直接將對象進行傳輸,也可以自己進行json轉化后傳輸
*/
@RequestMapping("/messageExt/message")
public SendResult convertAndSend() {
// 生產中不推薦使用jsonObject傳遞,不看發送者無法知道傳遞的消息包含什么信息
JSONObject jsonObject = new JSONObject();
jsonObject.put("type", "messageExt");
String destination = rocketMqTemplate.buildDestination(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_BROADCASTING_TAG);
// 如果要走內部方法發送則必須要按照標準來,否則就使用原生的消息發送
return rocketMqTemplate.getTemplate().syncSend(destination, jsonObject);
}
}
>作者:TianXinCoord
鏈接:
https://juejin.cn/post/7110782118547947550
來源:稀土掘金