本文介紹了將可迭代轉換為RDD的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!
問題描述
我想從火花流到幾個彈性搜索索引。
我創建了成對的<key(index), value>
,當我執行groupByKey時,結果是<key(index), Iterable<value>>
的元組,但為了使用ElasticSearch-Spark插件保存到ElasticSearch,我需要JavaRDD<value>
的值。
我知道有一個可以從List創建Java RDD的SparkConext.parallize(List)選項,但該選項只能在驅動程序上執行。
是否有其他選項可以創建可以在Executor上執行的JavaRDD?或者我可以實現Tuple2<key(index), JavaRDD<value>>
在Executor上工作的另一種方式?
如果沒有,我如何才能在驅動程序上僅將Iterator切換到JavaRDD,并在Executor處將插件寫入ElasticSearch?
謝謝,
Daniela
推薦答案
我想說的是,必須有如下所示的smth
JavaPairRDD<Key, Iterable<Value>> pair = ...;
JavaRDD<Iterable<Value>> values = pair.map(t2 -> t2._2());
JavaRDD<Value> onlyValues = values.flatMap(it -> it);
替代方法是
JavaPairRDD<Key, Iterable<Value>> pair = ...;
JavaRDD<Key, Value> keyValues = pair.flatMapValues(v1 -> v1);
JavaRDD<Value> values = keyValues.map(t2 -> t2._2());
這篇關于將可迭代轉換為RDD的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,