Elasticsearch設計了Ingest特性,在數據寫入前做一些前置的過濾、轉換等簡單的數據處理邏輯,能支持Logstash的大多數常用場景,在5.0版本中發布。為數據的采集和處理提供了一種新的方式,可以在許多場景下不使用Logstash,簡化了系統架構。
Ingest Node
完成預處理的有個獨立的Node角色 Ingest Node,默認集群每個節點都具有ingest的作用,也可以獨立節點。其開啟方式為:在 elasticsearch.yml 中定義(默認開啟):
node.ingest:false
Ingest Node作為ES索引文檔的一部分,只處理單個數據,局限在上下文中,不能與外部打交道。
Ingest 節點接收到數據之后,根據請求參數中指定的管道流 id,找到對應的已注冊管道流,對數據進行處理,然后將處理過后的數據,然后將文檔傳回 index(索引)或者 bulk APIs。
Ingest Pipeline
確定ingest節點后,要定義pipeline,pipeline中指定具體的邏輯。
PUT _ingest/pipeline/log_pipeline
{
"description":"log pipeline",
"processors":[
{
"grok":{
"field":"message",
"patterns":[
"%{TIMESTAMP_ISO8601:timestamp} \[%{DATA:project}\] \[%{DATA:thread}\] %{GREEDYDATA:message}"
]
}
},
{
"date": {
"field": "timestamp",
"formats": [
"yyyy-MM-dd HH:mm:ss,SSS"
],
"timezone": "Asia/Shanghai"
}
}
]
}
Ingest API共分為4種操作,分別對應:
- PUT(新增)。
- GET(獲取)。
- DELETE(刪除)。
- Simulate (仿真模擬)。
Ingest Pipeline的使用:
- Beats中指定Pipeline參數,在output.elasticsearch參數加,pipeline: my-pipeline 。
- 在index API中使用,POST my-index/_doc?pipeline=my-pipeline { ... } 。
- reindex時使用,POST _reindex{"source": {...},"dest": {...,"pipeline": "my-pipeline"} } 。
- Index settings參數中指定,index.default_pipeline 與 index.final_pipeline。
Processor
Ingest Pipeline的Processor,相當于 Logstash 的 filter 插件。事實上其主要處理器來自 Logstash 的 filter 代碼。目前最重要的幾個處理器分別是:
- convert,類型轉換等。
- grok,將輸入字段提取到多個字段。
- gsub,替換輸入字段的內容,可選輸出到另外字段。
- date,時間格式化等。
還有其他很多處理器,Append,rename,remove,attachement,geoip 和 user-agent,有的還需要單獨安裝 。
注:
- grok規則參考:github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
- 測試grok:使用ES插件:POST _ ingest/pipeline/_simulate