Elasticsearch的數據來自MySQL數據庫中,所以當我們的MySQL發生改變時,Elasticsearch也要跟著改變,這時候我們的es的數據就要和mysql同步了
同步實現思路
常見的數據同步方案有三種:
- 同步調用
- 異步通知
- 監聽binlog
方案一:
- hotel-demo對外提供接口,用來修改elasticsearch中的數據
- 酒店管理服務在完成數據庫操作后,直接調用hotel-demo提供的接口,
也就是說MySQL修改完去修改es的數據
- 優點:實現簡單,粗暴
- 缺點:業務耦合度高
方案二
- hotel-admin對mysql數據庫數據完成增、刪、改后,發送MQ消息
- hotel-demo監聽MQ,接收到消息后完成elasticsearch數據修改
- 優點:低耦合,實現難度一般
- 缺點:依賴mq的可靠性
- 這個實現方式也就是使用mq進行操縱,當我們修改MySQL的服務器修改完以后會將信息發送給MQ,然后修改ES的會進行監聽,當監聽到了以后就進行修改es的操作
方式三:
- 給mysql開啟binlog功能
- mysql完成增、刪、改操作都會記錄在binlog中
- hotel-demo基于canal監聽binlog變化,實時更新elasticsearch中的內容
- 也就是監聽mysql,如果MySQL的數據有變化那么就直接去改變es的數據
- 優點:完全解除服務間耦合
- 缺點:開啟binlog增加數據庫負擔、實現復雜度高
在這里使用的是第二種實現方案:使用MQ來寫
同步案例代碼
用來操控ES的代碼(負責監聽MQ隊列)
/**
* 監聽增加和修改的隊列
* 因為我們的ES中可以進行全量修改,當有這個id的數據的時候那么就先刪除再新增,沒有這個數據那么就直接新增
* 所以隊列過來的id不管是新增還是修改es都可以判斷如果有這個數據id那么就先刪除再新增,如果沒有這個數據就直接新增,所以新增和修改他倆用一個方法就行了
*
* @param id 隊列中需要進行操作的id
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = MqConstants.HOTEL_INSERT_QUEUE),
exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT),
key = MqConstants.HOTEL_INSERT_KEY
))
public void insertAndUpdate(Long id) {
if (id == null) {
return;
}
log.info("入參:{}", id);
//監聽到以后拿到id去數據庫查詢整個數據
Hotel hotel = iHotelService.getById(id);
//因為查的mysql數據和es的數據有些不一樣所以需要做轉換
HotelDoc hotelDoc = new HotelDoc(hotel);
//轉換為json
String hotelDocJson = JSON.toJSONString(hotelDoc);
System.out.println("hotelDocJson = " + hotelDocJson);
//發送到ES中,因為我們的ES中可以進行全量修改,當有這個id的數據的時候那么就先刪除再新增,沒有這個數據那么就直接新增
//創建請求語義對象 添加文檔數據
IndexRequest request = new IndexRequest("hotel");
//這個新增就是PUT在es中
request.id(hotel.getId().toString()).source(hotelDocJson, XContentType.JSON);
//發送請求
try {
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
RestStatus status = response.status();
log.info("響應結果為:{}", status);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 監聽刪除隊列
*
* @param id 隊列中需要進行操作的id
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = MqConstants.HOTEL_DELETE_QUEUE),
exchange = @Exchange(name = MqConstants.HOTEL_EXCHANGE, type = ExchangeTypes.DIRECT),
key = MqConstants.HOTEL_DELETE_KEY
))
public void deleteByMqId(Long id) {
if (id == null) {
return;
}
log.info("入參:{}", id);
//先創建語義對象,直接就可以給里面寫id的字段
DeleteRequest request = new DeleteRequest("hotel", id.toString());
//發送請求
try {
DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
RestStatus status = response.status();
log.info("響應結果為:{}", status);
} catch (IOException e) {
e.printStackTrace();
}
用來操作MySQL的代碼:
@RestController
@RequestMApping("hotel")
public class HotelController {
//注入和RabbitMQ鏈接
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private IHotelService hotelService;
@GetMapping("/{id}")
public Hotel queryById(@PathVariable("id") Long id) {
return hotelService.getById(id);
}
@GetMapping("/list")
public PageResult hotelList(
@RequestParam(value = "page", defaultValue = "1") Integer page,
@RequestParam(value = "size", defaultValue = "1") Integer size
) {
Page<Hotel> result = hotelService.page(new Page<>(page, size));
return new PageResult(result.getTotal(), result.getRecords());
}
@PostMapping
public void saveHotel(@RequestBody Hotel hotel) {
hotelService.save(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
}
@PutMapping()
public void updateById(@RequestBody Hotel hotel) {
if (hotel.getId() == null) {
throw new InvalidParameterException("id不能為空");
}
hotelService.updateById(hotel);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_INSERT_KEY, hotel.getId());
}
@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {
hotelService.removeById(id);
rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE, MqConstants.HOTEL_DELETE_KEY, id);
}
}
當然也是可以不使用注解來寫,直接在配置文件中寫隊列綁定的交換機