作者:LX
一、 環(huán)境搭建
1、 啟動Spark集群服務
1)啟動Spark集群
2)子節(jié)點加入集群
3)查看是否加入成功
2、啟動Elasticsearch數(shù)據(jù)庫
1)可以啟動自己安裝的Elasticsearch數(shù)據(jù)庫
2)也可以啟動iServerDataStore,默認會啟動一個Elasticsearch數(shù)據(jù)庫
二、 創(chuàng)建流處理模型
流模型的創(chuàng)建有兩種方式:
(一) 流處理模型編輯器
1、使用瀏覽器訪問http://localhost:8090/iserver/manager,登陸SuperMapiServer管理頁面,點擊【服務】-> 【概述】 ->【配置流數(shù)據(jù)服務】。
2、添加接收器
1)接收器介紹
接收器類型(9種接收器):
-
-
SocketReceiver:Socket客戶端接收器,接收Socket消息的節(jié)點。
-
MultiSocketReceiver:Socket多客戶端接收器,同時接收多個Socket消息的節(jié)點,接收的消息內容必須是相同的。
-
SocketServerReceiver:Socket服務接收器,Socket服務端接收節(jié)點,用于作為服務端接收其他Socket客戶的發(fā)送的消息。
-
WebSocketReceiver:WebSocket接收器,接收WebSocket消息的節(jié)點。
-
TextFileReceiver:文本文件接收器,監(jiān)控指定目錄,讀取新增文件的內容。
-
SingleTextFileReceiver:單文本文件接收器,根據(jù)設置讀取監(jiān)控文件的內容,支持讀取Json、GeoJSON和CSV格式的文件。
-
KafkaReceiver:Kafka接收器,接收kafka消息的節(jié)點。
-
HttpReceiver:Http接收器,接收HTTP 的消息節(jié)點,目前只支持HTTP的Get方法。
-
JMSReceiver:JMS接收器,接收JMS標準協(xié)議消息的節(jié)點,用于接收ActiveMQ、RabbitMQ等消息中間件的消息
2)添加接收器
本文以讀取iServer示范數(shù)據(jù)flights.csv文件為例,所以需要將“接收器”中“單文本接收器”,用鼠標拖到“節(jié)點編輯器”中。如下圖所示:
3)編輯節(jié)點信息
PS:
i.文件路徑必須寫絕對路徑,不能寫相對路徑。
ii.metadata ,是接收消息的元數(shù)據(jù),用于描述消息的格式定義。需指定以下信息:
title:元數(shù)據(jù)的名稱,用于區(qū)分其他元數(shù)據(jù)。String類型
featureType:FeatureType類。接收消息轉換的地理要素類型,如點POINT、線LINE、面REGION等。
epsg:int類型。元數(shù)據(jù)地理要素的投影EPSG編碼。
fieldInfos:接收消息轉換后的字段信息。需指定:
name:String類型。字段名稱,為字段的唯一標識
source:String類型。字段在原始信息中的位置,決定了從原始信息中的什么位置去解析成為本字段的值。成為本字段的值。如果原始信息為CSV 格式,source值為 CSV 中的字段序號,如"source": "4" 代表了CSV 數(shù)據(jù)中的第5 個字段;如果原始數(shù)據(jù)為json 格式的,那么source 值為 json中鍵值對的鍵。
nType:FieldType類型。字段的類型,如:字符型TEXT、雙精度浮點型DOUBLE、整型 INT等
本文元數(shù)據(jù)的配置如下:
根據(jù)CSV中字段內容:
T0000,121.465069,29.824944,1.49370399857E12,Lishe,Jiangbei,2017-09-1302:53:47
從“FieldInfo-0”到“FieldInfo-6”依次填寫以下內容:
將鼠標放到“元數(shù)據(jù)”的“StreamingMetadata”標簽上,可以看到上一步的詳細配置信息,確認信息無誤后,點擊“檢查并返回”按鈕。
-
-
添加發(fā)送器
1)添加發(fā)送器
將“接收器”中的“Elasticsearch添加發(fā)送器”用鼠標拖到“節(jié)點編輯器”中,如下圖所示:
2)編輯發(fā)送器節(jié)點信息
鼠標單擊“節(jié)點編輯器”中的“Elasticsearch添加發(fā)送器”,添加如下信息:
4、連接發(fā)送器和接收器
拖拽“節(jié)點編輯器”中的“單文本文件接收器”右側的綠色方塊,將拖出的箭頭指向“Elasticsearch添加發(fā)送器”(如下圖),命名為“ESstreaming”,點擊“發(fā)布”即可發(fā)布流處理模型。
(二)手動編寫streaming文件
手動編寫streaming文件相關的參數(shù)配置按照iServer幫助文檔的“流處理模型配置參數(shù)說明”來進行編寫,如下圖:http://support.supermap.com.cn/DataWarehouse/WebDocHelp/iServer/index.htm
編寫好的streaming文件如下:
{
"sparkParameter": {
"checkPointDir": "tmp",
"interval": 5000
},
"stream": {
"nodeDic": {
"CSVFileReader": {
"metadata": {
"epsg": 4326,
"fieldInfos": [
{
"name": "id",
"source": "0",
"nType": "TEXT"
},
{
"name": "x",
"source": "1",
"nType": "DOUBLE"
},
{
"name": "y",
"source": "2",
"nType": "DOUBLE"
},
{
"name": "z",
"source": "3",
"nType": "DOUBLE"
},
{
"name": "fromStation",
"source": "4",
"nType": "TEXT"
},
{
"name": "toStation",
"source": "5",
"nType": "TEXT"
},
{
"name": "datetime",
"source": "6",
"nType": "TEXT"
}
],
"dateTimeFormat": "yyyy-MM-ddHH:mm:ss",
"timeFieldName": "datetime",
"featureType": "POINT",
"title": "",
"idFieldName": "id"
},
"readInterva": 5000,
"rowsOneTime": 50,
"nextNodes": [
"EsAppendSender"
],
"reader": {
"className":"com.supermap.bdt.streaming.formatter.CSVFormatter",
"separator": ","
},
"filePath":"E:/supermap/iserver/912/supermap-iserver-9.1.2-win64-zip/samples/streamingmodels/readcsv/flights.csv",
"name": "CSVFileReader",
"prevNodes": [],
"className":"com.supermap.bdt.streaming.receiver.SingleTextFileReceiver",
"caption": "單文本文件接收器",
"description": null
},
"EsAppendSender": {
"className":"com.supermap.bdt.streaming.sender.EsAppendSender",
"caption": "Elasticsearch添加發(fā)送器",
"name": "EsAppendSender",
"nextNodes": [],
"prevNodes": [
"CSVFileReader"
],
"description": null,
"formatter": {
"className":"com.supermap.bdt.streaming.formatter.GeoJsonFormatter"
},
"url": "192.168.15.200:9200",
"queueName": "streamingdata",
"directoryPath": "streaming"
}
}
},
"version": 9000
}
三、發(fā)布流處理模型
對于用模型編輯器編輯的直接點擊模型編輯器左上角的【發(fā)布】即可。
對于自己編寫的streaming文件,在服務管理“首頁”點擊快速發(fā)布一個或一組服務,選擇數(shù)據(jù)來源為"流處理模型",點擊“下一步”;
指定服務名;選擇配置信息來源,iServer提供了“配置信息”和“配置文件”兩種方式。
配置信息:輸入流處理模型中的全部內容
配置文件:輸入后綴為.streaming 的流處理模型文件
是否啟用Token,輸入用于驗證用戶身份的Token(令牌)
點擊“完成”按鈕完成發(fā)布流程:
四、查看存儲到ES的數(shù)據(jù)
發(fā)布流模型之后,ES數(shù)據(jù)庫中能看到,創(chuàng)建索引成功
訪問
http://localhost:9200/streamingdata/_search能看到,存儲到ES中的數(shù)據(jù)