作者介紹
小火牛,項目管理高級工程師,具有多年大數據平臺運維管理及開發優化經驗。管理過多個上千節點集群,擅長對外多租戶平臺的維護開發。信科院大數據性能測試、功能測試主力,大廠PK獲得雙項第一。
前言
某業務導致NameNode RPC通信頻繁,后來觀察監控發現,是由于該業務獲取HDFS列表文件的頻率過于頻繁。檢查代碼后,優化由20s獲取一次目錄列表改為5分鐘獲取一次,獲取列表的RPC操作次數下降了約1.5倍,平均每秒減少了2~3w次的RPC操作。
還有很多業務場景,通過分析觀察RPC畫像,都發現了其不合理性,這里就不一一列舉了。本文主要記錄如何通過ELK快速分析NameNode RPC操作并對接Grafana展示。
通過ELK快速分析NameNode RPC操作
ELK是當前比較主流的分布式日志收集處理工具。這里采用Filebeat→Kafka集群→Logstash→ES→Kibana。
采用原因:
1)Filebeat是基于原先logstash-forwarder的源碼改造出來的。換句話說:Filebeat就是新版的logstash-forwarder,也會是Elastic Stack在shipper端的第一選擇。
2)小貼士:雖然LogStash::Inputs::TCP用Ruby的Socket和OpenSSL庫實現了高級的SSL功能,但Logstash本身只能在SizedQueue中緩存20個事件。這就是我們建議在生產環境中換用其他消息隊列的原因。
而redis服務器是Logstash官方推薦的Broker選擇,Broker角色也就意味著會同時存在輸入和輸出兩個插件。
Kafka是一個高吞吐量的分布式發布訂閱日志服務,具有高可用、高性能、分布式、高擴展、持久性等特性。目前已經在各大公司中廣泛使用。和之前采用Redis做輕量級消息隊列不同,Kafka利用磁盤作隊列,所以也就無所謂消息緩沖時的磁盤問題。此外,如果公司內部已有Kafka服務在運行,Logstash也可以快速接入,免去重復建設的麻煩。
3)目前Logstash1.5版本已自帶支持Kafka插件,所以只需要學會如何書寫Logstash規則,并且Kafka消費使用high-level消費。
4)Filebeat部署在應用服務器上(只負責Logstash的讀取和轉發,降低CPU負載消耗,確保不會搶占應用資源),Logstash、ES、Kibana在一臺服務器上(此處的Logstash負責日志的過濾,會消耗一定的CPU負載,可以考慮如何優化過濾的語法步驟來達到降低負載)。
具體搭建步驟:Filebeat安裝使用(思考后決定Filebeat使用Zip安裝或者tar.gz方便修改配置打包分發。)→Logstash插件配置。
以下是架構圖:
1、Filebeat采集hdfs-audit.log日志傳輸給Kafka或者Logstash
[hadoop@lf319-m3-002 filebeat]$ vi dynamically.config/audit-logstash.yml
filebeat.prospectors:
- input_type: log
paths:
- "/var/log/hadoop-hdfs/hdfs-audit.log"
harvester_buffer_size: 32768
scan_frequency: 1s
backoff: 10ms
#backoff <= max_backoff <= scan_frequency
processors:
- drop_fields:
fields: ["beat", "beat.name", "beat.hostname","beat.version","input_type","offset","@timestamp","type","source"]
output.logstash:
hosts: ["logstash-host:5044"," logstash-host:5045"]
loadbalance: true
worker: 4
bulk_max_size: 4096
#output.console:
# pretty: true
xpack.monitoring:
enabled: true
elasticsearch:
hosts: ["https://es-host1:9200", "https:// es-host2:9200"]
username: beats_system
password: beat@123
2、Logstash進一步分解日志,格式化日志數據
這里需要我們先查看下日志的格式,然后選擇方便的日志格式化方式來解析日志。
日志格式案例:
2019-08-25 13:11:58,630 INFO FSNamesystem.audit: allowed=trueugi=lf_zh_pro (auth:SIMPLE)ip=/dn-ipcmd=getfileinfosrc=/user/lf_zh_pro/test/CommonFilter/sync/biz_id=B43/day_id=20190825/prov_id=089/part-00019-1566675749932.gzdst=perm=proto=rpc
2019-08-25 13:11:58,630 INFO FSNamesystem.audit: allowed=trueugi=lf_xl_bp (auth:SIMPLE)ip=/dn-ipcmd=createsrc=/user/lf_xl_bp/lf_xl_src.db/src_d_trip_all/date_id=20190825/hour_id=13/minute_id=00/.hive-staging_hive_2019-08-25_13-10-18_301_9180087219965934496-1/_task_tmp.-ext-10002/prov_id=031/_tmp.000238_0dst=perm=lf_xl_bp:lf_xl_bp:rw-rw-r--proto=rpc
2019-08-25 13:11:58,630 INFO FSNamesystem.audit: allowed=trueugi=ubd_obx_test (auth:SIMPLE)ip=/ dn-ipcmd=rename
通過觀察可以發現上面的每條日志格式都是一致的,都由時間戳、日志級別、是否開啟審計、用戶、來源IP、命令類型這幾個字段組成。那么相較于grok來說dissect更加簡明。
Dissect的使用規則:https://www.elastic.co/guide/en/logstash/current/plugins-filters-dissect.html
Logstash配置如下:
input {
beats {
port => "5045"
}
}
filter {
if "/user/if_ia_pro/output/test" in [message] {
dissect {
mApping => { "message" => "%{logd} %{drop} %{level} %{log-type}: %{?allowed}=%{&allowed}%{?ugi}=%{&ugi} (%{?authtype})%{?ip}=/%{&ip}%{?cmd}=%{&cmd}%{}=/user/if_ia_pro/output/test/%{src2}/%{src3}/%{}%{?dst}=%{&dst}%{?perm}=%{&perm}%{?proto}=%{&proto}" }
add_field => {
"srctable" => "/user/if_ia_pro/output/test/%{src2}/%{src3}"
"logdate" => "%{logd} %{drop}"
}
remove_field => ['message','src2','src3','logd','drop']
}
}
else if "/user/lf_zh_pro/lf_safedata_pro/output/" in [message] {
dissect {
mapping => { "message" => "%{logd} %{drop} %{level} %{log-type}: %{?allowed}=%{&allowed}%{?ugi}=%{&ugi} (%{?authtype})%{?ip}=/%{&ip}%{?cmd}=%{&cmd}%{}=/user/lf_zh_pro/lf_safedata_pro/output/%{src2}/%{}%{?dst}=%{&dst}%{?perm}=%{&perm}%{?proto}=%{&proto}" }
add_field => {
"srctable" => "/user/lf_zh_pro/lf_safedata_pro/output/%{src2}"
"logdate" => "%{logd} %{drop}"
}
remove_field => ['message','src2','drop']
}
}
else if "/files/" in [message] {
dissect {
mapping => { "message" => "%{logd} %{drop} %{level} %{log-type}: %{?allowed}=%{&allowed}%{?ugi}=%{&ugi} (%{?authtype})%{?ip}=/%{&ip}%{?cmd}=%{&cmd}%{}=/files/%{src2}/%{}%{?dst}=%{&dst}%{?perm}=%{&perm}%{?proto}=%{&proto}" }
add_field => {
"srctable" => "/files/%{src2}"
"logdate" => "%{logd} %{drop}"
}
remove_field => ['message','src2','drop']
}
}
else {
dissect {
mapping => { "message" => "%{logd} %{drop} %{level} %{log-type}: %{?allowed}=%{&allowed}%{?ugi}=%{&ugi} (%{?authtype})%{?ip}=/%{&ip}%{?cmd}=%{&cmd}%{}=/%{src}/%{src1}/%{src2}/%{src3}/%{}%{?dst}=%{&dst}%{?perm}=%{&perm}%{?proto}=%{&proto}" }
add_field => {
"srctable" => "/%{src}/%{src1}/%{src2}/%{src3}"
"logdate" => "%{logd} %{drop}"
}
remove_field => ['message','src','src1','src2','src3','logd','drop']
}
}
date {
match => [ "logdate","ISO8601" ]
target => "@times"
remove_field => ['logdate']
}
}
output {
elasticsearch {
hosts => ["es-host:9200"]
index => "logstash-hdfs-auit-%{+YYYY.MM.dd}"
user => "elastic"
password => "password"
}
stdout { }
}
3、ES上觀察數據
Filebeat和Logstash配置好采集分析hdfs-audit.log之后啟動進程,到ES上觀察會發現創建有logstash-hdfs-auit- YYYY.MM.dd的index。
具體查看數據,可以看到已經具備多個需要使用到的字段。
Grafana配置NameNode RPC操作
最后一步就需要在Grafana上配置連接ES數據庫。
然后創建Dashboard依次配置以下幾種查詢展示:
1)集群整體RPC每分鐘連接次數
2)HDFS路徑All下All類型每分鐘操作計數
3)All類型操作計數最多的hdfs路徑
4)路徑All下操作計數排行前五的類型 和All操作類型下操作計數前五的路徑
總結
那么現在對于企業來說,不管是在物理機上還是云上,玩自己的大數據平臺跑生產任務,就不可避免會有不夠合理不夠優化的任務,比如最簡單的集群對拷任務出現異常中斷時,我們通常會掛定時任務并對hadoop distcp添加-update參數,進行對比更新覆蓋,這時當定時吊起的過于頻繁,就會發現當對拷目錄下文件數越來越多,NameNode對該目錄的listStatus類型的RPC連接會激增,這時我們就需要優化對拷任務。
RPC的監控只是監控大數據平臺的一個指標,這里通過這篇文章,帶大家了解下如何快速地采集分析平臺日志,并進行展示監控。