在日常數據存儲和查詢時,很多小伙伴都喜歡用ES做索引,很多還把ES當成數據庫來用。誠然ES的讀寫性能非常優秀,但是大家有沒有遇到過ES丟數據的問題?也就是說數據庫和ES的數據不一致。今天老李正好看在公眾號銘毅天下Elasticsearch上看到一篇介紹這個問題的文章,里面的內容寫的非常的清楚,把對數據的方法和思路全都理了出來。下面把文章分享給大家,希望能夠使大家在日常工作中少踩一點坑。當然了,能用來填坑就更好了。
1、實戰線上問題
- Q1:Logstash 同步 postgreSQL 到 Elasticsearch 數據不一致。
在使用 Logstash 從 pg 庫中將一張表導入到 ES 中時,發現 ES 中的數據量和 PG 庫中的這張表的數據量存在較大差距。如何快速比對哪些數據沒有插入?導入過程中,Logstash 日志沒有異常。PG 中這張表有 7600W。
- Q2:mq 異步雙寫數據庫、es 的方案中,如何保證數據庫數據和 es 數據的一致性?
2、推薦解決方案之一——ID 比較法
如下示例,僅拿問題1舉例驗證,問題2原理一致。
2.1 方案探討
要找出哪些數據沒有插入到 Elasticsearch 中,可以采用以下方法:
- 確保 Logstash 配置文件中的 input 插件的 JDBC 驅動程序正確配置,以便從 PostgreSQL 數據庫中提取所有數據。注意 statement 參數,確保它選擇了所有需要的數據。
- 檢查 Logstash 配置文件的 output 插件,確保正確配置了 Elasticsearch 的連接參數。同時,檢查是否有過濾器在導入過程中過濾掉了部分數據。
- 在 Logstash 配置文件中添加一個 stdout 插件,將從 PostgreSQL 數據庫中讀取的數據記錄到文件中。
例如,可以添加以下內容:
output {
elasticsearch {
...Elasticsearch 配置...
}
stdout {
codec => json_lines
path => "/path/to/logstash_output.log"
}
}
將 Logstash 輸出文件與 PostgreSQL 數據庫中的原始數據進行比較,以找出未導入的數據。可以使用 Python/ target=_blank class=infotextkey>Python、Shell 腳本或其他編程語言編寫一個簡單的腳本來執行此操作。
如果 Logstash 輸出文件中的記錄數與 PostgreSQL 數據庫中的記錄數一致,但 Elasticsearch 中的記錄數不一致,請檢查 Elasticsearch 集群的健康狀況和日志。確認集群是否在接收和索引數據時遇到問題。
如果問題仍然存在,嘗試將批量操作的大小減小,以減輕 Elasticsearch 和 Logstash 的負擔。可以通過在 Logstash 配置文件的 output 插件中設置 flush_size 和 idle_flush_time 參數來實現。
處理大量數據時,可能需要調整 Logstash 和 Elasticsearch 的性能和資源配置。根據硬件和網絡條件,可能需要優化批量操作、JVM 設置、線程池大小等方面的設置。
2.2 比較腳本的實現
以下是一個簡單的 Shell 腳本示例,用于比較 Logstash 輸出文件(JSON 格式)和 PostgreSQL 數據庫中的數據。該腳本將比較特定字段(如 id)以確定哪些數據可能未導入到 Elasticsearch。
首先,從 PostgreSQL 數據庫中導出數據,將其保存為 CSV 文件:
COPY (SELECT id FROM your_table) TO '/path/to/postgres_data.csv' WITH
接下來,創建一個名為 compare.sh 的 Shell 腳本:
#!/bin/bash
# 將 JSON 文件中的 ID 提取到一個文件中
jq '.id' /path/to/logstash_output.log > logstash_ids.txt
# 刪除 JSON 中的雙引號
sed -i 's/"//g' logstash_ids.txt
# 對 Logstash 和 PostgreSQL 的 ID 文件進行排序
sort -n logstash_ids.txt > logstash_ids_sorted.txt
sort -n /path/to/postgres_data.csv > postgres_ids_sorted.txt
# 使用 comm 比較兩個已排序的 ID 文件
comm -23 postgres_ids_sorted.txt logstash_ids_sorted.txt > missing_ids.txt
# 輸出結果
echo "以下 ID 在 Logstash 輸出文件中未找到:"
cat missing_ids.txt
為腳本添加可執行權限并運行:
chmod +x compare.sh
./compare.sh
此腳本會比較 logstash_output.log 和 postgres_data.csv 文件中的 ID。如果發現缺失的 ID,它們將被保存在 missing_ids.txt 文件中,并輸出到控制臺。請注意,該腳本假設已經安裝了 jq(一個命令行 JSON 處理器)。如果沒有,請先安裝 jq。
3、推薦方案二——redis 加速對比
在這種情況下,可以使用 Redis 的集合數據類型來存儲 PostgreSQL 數據庫和 Logstash 輸出文件中的 ID。接下來,可以使用 Redis 提供的集合操作來找到缺失的 ID。
以下是一個使用 Redis 實現加速比對的示例:
首先,從 PostgreSQL 數據庫中導出數據,將其保存為 CSV 文件:
COPY (SELECT id FROM your_table) TO '/path/to/postgres_data.csv' WITH CSV HEADER;
安裝并啟動 Redis。
使用 Python 腳本將 ID 數據加載到 Redis:
import redis
import csv
# 連接到 Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)
# 從 PostgreSQL 導出的 CSV 文件中加載數據
with open('/path/to/postgres_data.csv', newline='') as csvfile:
csv_reader = csv.reader(csvfile)
next(csv_reader) # 跳過表頭
for row in csv_reader:
r.sadd('postgres_ids', row[0])
# 從 Logstash 輸出文件中加載數據
with open('/path/to/logstash_output.log', newline='') as logstash_file:
for line in logstash_file:
id = line.split('"id":')[1].split(',')[0].strip()
r.sadd('logstash_ids', id)
# 計算差集
missing_ids = r.sdiff('postgres_ids', 'logstash_ids')
# 輸出缺失的 ID
print("以下 ID 在 Logstash 輸出文件中未找到:")
for missing_id in missing_ids:
print(missing_id)
這個 Python 腳本使用 Redis 集合數據類型存儲 ID,然后計算它們之間的差集以找到缺失的 ID。需要先安裝 Python 的 Redis 庫。可以使用以下命令安裝:
pip install redis
這個腳本是一個基本示例,可以根據需要修改和擴展它。使用 Redis 的優點是它能在內存中快速處理大量數據,而不需要在磁盤上讀取和寫入臨時文件。
4、小結
方案一:使用 Shell 腳本和 grep 命令
- 優點:
(1)簡單,易于實現。
(2)不需要額外的庫或工具。
- 缺點:
(1)速度較慢,因為它需要在磁盤上讀寫臨時文件。
(2)對于大數據量的情況,可能會導致較高的磁盤 I/O 和內存消耗。
方案二:使用 Redis 實現加速比對
- 優點:
(1)速度更快,因為 Redis 是基于內存的數據結構存儲。
(2)可擴展性較好,可以處理大量數據。
- 缺點:
(1)實現相對復雜,需要編寫額外的腳本。
(2)需要安裝和運行 Redis 服務器。
根據需求和數據量,可以選擇合適的方案。如果處理的數據量較小,且對速度要求不高,可以選擇方案一,使用 Shell 腳本和 grep 命令。這種方法簡單易用,但可能在大數據量下表現不佳。
如果需要處理大量數據,建議選擇方案二,使用 Redis 實現加速比對。這種方法速度更快,能夠有效地處理大數據量。然而,這種方法需要額外的設置和配置,例如安裝 Redis 服務器和編寫 Python 腳本。
在實際應用中,可能需要根據具體需求進行權衡,以選擇最適合的解決方案。