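Background
RuoYi is a rapid-development framework built on Spring Boot, Spring Security, and MyBatis, and it is widely used for building back-office admin systems. It currently lacks support for full-duplex communication over the WebSocket protocol, so its handling of long-lived connections is weak. As a result, features such as real-time notifications (message push and asynchronous notifications from the backend to the frontend) and real-time data visualization (the backend fetches data and pushes it to the frontend) are hard to implement. This article shows how to implement them by integrating EMQX, a third-party message broker.
Preparation
Install EMQX with Docker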
docker pull emqx/emqx:5.0.21
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.21
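EMQX access configuration
Log in to the EMQX dashboard and open Access Control -> Authentication in the menu.
Add a new authenticator and choose JWT. RuoYi issues JWTs, so this makes EMQX's authentication method match RuoYi's; the token returned by RuoYi can then be used directly to call EMQX's RESTful API and publish messages.
Adding the authentication configuration
Log in to the EMQX dashboard and configure authentication
For JWT From, select username. Set the algorithm to match the one your RuoYi setup uses to sign tokens; here it is hmac. For other options, see https://www.emqx.io/docs/en/v5.0/access-control/authn/jwt.html#configure-with-dashboard.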
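For the RESTful publish call mentioned above, a minimal Java sketch follows. It only builds the HTTP request and does not send it. The endpoint path /api/v5/publish and the JSON fields reflect the EMQX 5 REST API as I understand it; the host, the topic notice/admin, and the API key and secret values are hypothetical placeholders. The Authorization header value is passed in as a parameter, because which credentials the REST API accepts depends on how EMQX is set up; the HTTP Basic form used in main is an assumption.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch only: builds (but does not send) a publish request for the EMQX REST API.
class EmqxPublishSketch {

    // Wraps a value as a JSON string, escaping backslashes and double quotes.
    // Control characters are not handled; use a JSON library in real code.
    static String jsonString(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // HTTP Basic credentials built from an API key and secret (assumed auth scheme).
    static String basicAuth(String apiKey, String apiSecret) {
        String pair = apiKey + ":" + apiSecret;
        return "Basic " + Base64.getEncoder().encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }

    // JSON body carrying the topic, the payload, and QoS 0.
    static String publishBody(String topic, String payload) {
        return "{\"topic\":" + jsonString(topic)
                + ",\"payload\":" + jsonString(payload)
                + ",\"qos\":0}";
    }

    // Assembles the POST request; baseUrl is the dashboard/API address, e.g. http://localhost:18083.
    static HttpRequest publishRequest(String baseUrl, String authorization, String topic, String payload) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/v5/publish"))
                .header("Content-Type", "application/json")
                .header("Authorization", authorization)
                .POST(HttpRequest.BodyPublishers.ofString(publishBody(topic, payload)))
                .build();
    }

    public static void main(String[] args) {
        // Hypothetical credentials and topic, for illustration only.
        HttpRequest request = publishRequest("http://localhost:18083",
                basicAuth("my-api-key", "my-api-secret"), "notice/admin", "hello");
        System.out.println(request.method() + " " + request.uri());
        // To send it: HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    }
}
```

Keeping request construction separate from sending makes the call easy to inspect or unit test before pointing it at a real broker.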
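Configuring token generation in RuoYi
We use JwtAccessTokenConverter to generate the JWT. By default its signer is MacSigner, whose default algorithm is HMACSHA256. The signing key configured below must be identical to the Secret entered in the EMQX JWT authentication settings above, so that EMQX can verify that a token is genuine.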
@Bean
public JwtAccessTokenConverter jwtAccessTokenConverter() {
    JwtAccessTokenConverter converter = new JwtAccessTokenConverter();
    // Token signing key; must match the Secret configured in EMQX
    converter.setSigningKey("mytokenkey");
    // Custom converter that adds extra claims to the token
    converter.setAccessTokenConverter(new NewTokenConverter());
    return converter;
}
The signer used by JwtAccessTokenConverter
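To see why the signing key and the EMQX Secret have to match, here is a minimal, JDK-only sketch of HS256 signing and verification. It is not the Spring or EMQX implementation; the class and method names are made up for illustration, and it assumes the secret is used as plain UTF-8 bytes (not base64-encoded). A verifier recomputes the HMAC over the first two token segments with its own secret and accepts the token only if the result equals the third segment.

```java
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.util.Base64;

// Illustrative only: the HS256 check that any JWT verifier performs.
class Hs256Demo {

    // base64url without padding, as used for every JWT segment.
    static String b64(byte[] bytes) {
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
    }

    // HMAC-SHA256 over "header.payload", base64url-encoded.
    static String signature(String signingInput, String secret) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
            return b64(mac.doFinal(signingInput.getBytes(StandardCharsets.UTF_8)));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Builds a compact JWT: header.payload.signature
    static String sign(String payloadJson, String secret) {
        String header = b64("{\"alg\":\"HS256\",\"typ\":\"JWT\"}".getBytes(StandardCharsets.UTF_8));
        String input = header + "." + b64(payloadJson.getBytes(StandardCharsets.UTF_8));
        return input + "." + signature(input, secret);
    }

    // Recomputes the signature with the verifier's secret and compares in constant time.
    static boolean verify(String token, String secret) {
        int lastDot = token.lastIndexOf('.');
        if (lastDot < 0) {
            return false;
        }
        String expected = signature(token.substring(0, lastDot), secret);
        String actual = token.substring(lastDot + 1);
        return MessageDigest.isEqual(expected.getBytes(StandardCharsets.UTF_8),
                actual.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        String token = sign("{\"identity\":\"admin\"}", "mytokenkey");
        System.out.println(verify(token, "mytokenkey"));     // prints true: secrets match
        System.out.println(verify(token, "another-secret")); // prints false: secrets differ
    }
}
```

If the two sides disagree on the secret, or on whether it is base64-encoded, verification fails in the same way as the second call.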
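To add extra information to the token, we can write our own implementation of AccessTokenConverter. In this converter we add an extra attribute, identity, so that the token carries this useful information.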
package com.ruoyi.framework.auth.config.tokenconvert;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.authority.AuthorityUtils;
import org.springframework.security.oauth2.common.DefaultOAuth2AccessToken;
import org.springframework.security.oauth2.common.OAuth2AccessToken;
import org.springframework.security.oauth2.provider.OAuth2Authentication;
import org.springframework.security.oauth2.provider.OAuth2Request;
import org.springframework.security.oauth2.provider.token.AccessTokenConverter;
import org.springframework.security.oauth2.provider.token.DefaultUserAuthenticationConverter;
import org.springframework.security.oauth2.provider.token.UserAuthenticationConverter;
import java.util.*;
public class NewTokenConverter implements AccessTokenConverter {
    private UserAuthenticationConverter userTokenConverter = new DefaultUserAuthenticationConverter();
    private boolean includeGrantType;
    private String scopeAttribute = "scope";
    private String clientIdAttribute = "client_id";

    public NewTokenConverter() {
    }

    public void setUserTokenConverter(UserAuthenticationConverter userTokenConverter) {
        this.userTokenConverter = userTokenConverter;
    }

    public void setIncludeGrantType(boolean includeGrantType) {
        this.includeGrantType = includeGrantType;
    }

    public void setScopeAttribute(String scopeAttribute) {
        this.scopeAttribute = scopeAttribute;
    }

    public void setClientIdAttribute(String clientIdAttribute) {
        this.clientIdAttribute = clientIdAttribute;
    }
    // Builds the claim map that ends up in the JWT payload.
    public Map<String, ?> convertAccessToken(OAuth2AccessToken token, OAuth2Authentication authentication) {
        Map<String, Object> response = new HashMap<>();
        OAuth2Request clientToken = authentication.getOAuth2Request();
        if (!authentication.isClientOnly()) {
            response.putAll(this.userTokenConverter.convertUserAuthentication(authentication.getUserAuthentication()));
        } else if (clientToken.getAuthorities() != null && !clientToken.getAuthorities().isEmpty()) {
            response.put("authorities", AuthorityUtils.authorityListToSet(clientToken.getAuthorities()));
        }
        if (token.getScope() != null) {
            response.put(this.scopeAttribute, token.getScope());
        }
        if (token.getAdditionalInformation().containsKey("jti")) {
            response.put("jti", token.getAdditionalInformation().get("jti"));
        }
        if (token.getExpiration() != null) {
            // JWT "exp" is expressed in seconds, not milliseconds
            response.put("exp", token.getExpiration().getTime() / 1000L);
        }
        // Extra claim: the authenticated principal's name
        if (authentication.getName() != null) {
            response.put("identity", authentication.getName());
        }
        if (this.includeGrantType && authentication.getOAuth2Request().getGrantType() != null) {
            response.put("grant_type", authentication.getOAuth2Request().getGrantType());
        }
        response.putAll(token.getAdditionalInformation());
        response.put(this.clientIdAttribute, clientToken.getClientId());
        if (clientToken.getResourceIds() != null && !clientToken.getResourceIds().isEmpty()) {
            response.put("aud", clientToken.getResourceIds());
        }
        return response;
    }
    public OAuth2AccessToken extractAccessToken(String value, Map<String, ?> map) {
        DefaultOAuth2AccessToken token = new DefaultOAuth2AccessToken(value);
        Map<String, Object> info = new HashMap<String, Object>(map);
        info.remove("exp");
        info.remove("aud");
        info.remove(this.clientIdAttribute);
        info.remove(this.scopeAttribute);
        if (map.containsKey("exp")) {
            token.setExpiration(new Date((Long) map.get("exp") * 1000L));
        }
        if (map.containsKey("jti")) {
            info.put("jti", map.get("jti"));
        }
        token.setScope(this.extractScope(map));
        token.setAdditionalInformation(info);
        return token;
    }
    public OAuth2Authentication extractAuthentication(Map<String, ?> map) {
        Map<String, String> parameters = new HashMap<>();
        Set<String> scope = this.extractScope(map);
        Authentication user = this.userTokenConverter.extractAuthentication(map);
        String clientId = (String) map.get(this.clientIdAttribute);
        parameters.put(this.clientIdAttribute, clientId);
        if (this.includeGrantType && map.containsKey("grant_type")) {
            parameters.put("grant_type", (String) map.get("grant_type"));
        }
        Set<String> resourceIds = new LinkedHashSet<String>(map.containsKey("aud") ? this.getAudience(map) : Collections.<String>emptySet());
        Collection<? extends GrantedAuthority> authorities = null;
        if (user == null && map.containsKey("authorities")) {
            String[] roles = (String[]) ((Collection) map.get("authorities")).toArray(new String[0]);
            authorities = AuthorityUtils.createAuthorityList(roles);
        }
        OAuth2Request request = new OAuth2Request(parameters, clientId, authorities, true, scope, resourceIds, null, null, null);
        return new OAuth2Authentication(request, user);
    }
    private Collection<String> getAudience(Map<String, ?> map) {
        Object auds = map.get("aud");
        if (auds instanceof Collection) {
            @SuppressWarnings("unchecked")
            Collection<String> result = (Collection<String>) auds;
            return result;
        } else {
            return Collections.singleton((String) auds);
        }
    }

    private Set<String> extractScope(Map<String, ?> map) {
        Set<String> scope = Collections.emptySet();
        if (map.containsKey(this.scopeAttribute)) {
            Object scopeObj = map.get(this.scopeAttribute);
            if (String.class.isInstance(scopeObj)) {
                // Space-delimited scope string
                scope = new LinkedHashSet<String>(Arrays.asList(String.class.cast(scopeObj).split(" ")));
            } else if (Collection.class.isAssignableFrom(scopeObj.getClass())) {
                @SuppressWarnings("unchecked")
                Collection<String> scopeColl = (Collection<String>) scopeObj;
                scope = new LinkedHashSet<String>(scopeColl);
            }
        }
        return scope;
    }
}
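One quick way to confirm that the identity claim actually made it into an issued token is to decode the token's middle segment, which is just base64url-encoded JSON. The sketch below uses only the JDK; the class name and the sample token are hypothetical. It does not check the signature, so it is a debugging aid rather than a validation step.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Debugging aid: prints the JSON payload of a compact JWT without verifying it.
class JwtPayloadPeek {

    // Returns the decoded payload (second segment) of header.payload.signature.
    static String payloadJson(String jwt) {
        String[] parts = jwt.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected a compact JWT with three segments");
        }
        byte[] decoded = Base64.getUrlDecoder().decode(parts[1]);
        return new String(decoded, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hand-built sample token; the header "e30" is "{}" and the signature is a dummy value.
        String payload = Base64.getUrlEncoder().withoutPadding()
                .encodeToString("{\"identity\":\"admin\"}".getBytes(StandardCharsets.UTF_8));
        String sampleToken = "e30." + payload + ".dummy-signature";
        System.out.println(payloadJson(sampleToken)); // prints {"identity":"admin"}
    }
}
```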
Client connection
npm install --save mqtt
import mqtt from 'mqtt'
// getToken() and store are assumed to be imported from the RuoYi front-end project (not shown here).
// The method below belongs inside the component's methods: { ... } block.
createWebSocket() {
  const clientId = 'mqttjs_' + Math.random().toString(16).slice(2, 10)
  // Broker WebSocket URL; Vue CLI only exposes env variables prefixed with VUE_APP_
  const host = process.env.VUE_APP_WEB_SOCKET
  const options = {
    keepalive: 60,
    clientId: clientId,
    // EMQX is configured to read the JWT from the username field
    username: getToken(),
    protocolId: 'MQTT',
    protocolVersion: 4,
    clean: true,
    reconnectPeriod: 1000,
    connectTimeout: 30 * 1000,
    will: {
      topic: 'WillMsg',
      payload: 'Connection Closed abnormally..!',
      qos: 0,
      retain: false
    },
  }
  console.log('Connecting mqtt client')
  this.ws = mqtt.connect(host, options)
  this.ws.on('error', (err) => {
    console.log('Connection error: ', err)
    this.ws.end()
  })
  this.ws.on('reconnect', () => {
    console.log('Reconnecting...')
  })
  this.ws.on('connect', () => {
    console.log('Client connected:' + clientId)
    // Subscribe
    this.ws.subscribe(store.getters.topic, { qos: 0 })
  })
  // Received
  this.ws.on('message', (topic, message, packet) => {
    console.log('Received Message: ' + message.toString() + '\nOn topic: ' + topic)
  })
},
Testing
In our test the message notification arrived, and the browser console shows that the connection succeeded.
Message received
Connection log
References
https://www.emqx.com/en/blog/connect-to-mqtt-broker-with-websocket