本文介紹了為什么當(dāng)我發(fā)送兩個輸入流時,Spark Streaming停止工作?的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧!
問題描述
我正在開發(fā)一個Spark流應(yīng)用程序,其中我需要使用來自兩個服務(wù)器的輸入流,每個服務(wù)器每秒向Spark上下文發(fā)送一條JSON消息。
我的問題是,如果我只在一個流上執(zhí)行操作,一切都運行得很好。但如果我有來自不同服務(wù)器的兩個流,那么Spark在可以打印任何東西之前凍結(jié),并且只有在兩個服務(wù)器都發(fā)送了它們必須發(fā)送的所有JSON消息時(當(dāng)它檢測到socketTextStream
沒有接收數(shù)據(jù)時)才開始重新工作。
以下是我的代碼:
JavaReceiverInputDStream<String> streamData1 = ssc.socketTextStream("localhost",996,
StorageLevels.MEMORY_AND_DISK_SER);
JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream("localhost", 9995,StorageLevels.MEMORY_AND_DISK_SER);
JavaPairDStream<Integer, String> dataStream1= streamData1.mapToPair(new PairFunction<String, Integer, String>() {
public Tuple2<Integer, String> call(String stream) throws Exception {
Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(1, stream);
return streamPair;
}
});
JavaPairDStream<Integer, String> dataStream2= streamData2.mapToPair(new PairFunction<String, Integer, String>() {
public Tuple2<Integer, String> call(String stream) throws Exception {
Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(2, stream);
return streamPair;
}
});
dataStream2.print(); //for example
請注意,沒有錯誤消息,Spark Simple在啟動上下文后凍結(jié),雖然我從端口收到JSON消息,但它沒有顯示任何內(nèi)容。
非常感謝。
推薦答案
查看Spark Streaming documentation中的這些警告,并查看它們是否適用:
要記住的要點
在本地運行Spark Streaming程序時,請勿使用local或local1為主URL。這兩種情況都意味著只有一個線程將用于本地運行任務(wù)。如果您使用的是基于接收器的輸入DStream(例如Sockets、Kafka、Flume等),則將使用單個線程來運行接收器,而不會留下處理接收到的數(shù)據(jù)的線程。因此,在本地運行時,請始終使用”local[n]”作為主URL,其中n>要運行的接收器的數(shù)量(有關(guān)如何設(shè)置主URL的信息,請參閱Spark屬性)。
將邏輯擴展到在集群上運行,分配給Spark Streaming應(yīng)用程序的核心數(shù)必須多于接收器數(shù)。否則,系統(tǒng)將接收數(shù)據(jù),但無法處理數(shù)據(jù)。
這篇關(guān)于為什么當(dāng)我發(fā)送兩個輸入流時,Spark Streaming停止工作?的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,