本文介紹了如何修復不兼容的類型:org.apache.beam.sdk.options.ValueProvider<;java.lang.String>;無法轉換為java.lang.String";的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!
問題描述
我按照this link創建了一個模板,該模板構建了一個從KafkaIO讀取的光束管道。但我總是遇到”不兼容的類型:org.apache.beam.sdk.options.ValueProvider無法轉換為java.lang.String”。導致錯誤的是行”.withBootstrapServers(options.getKafkaServer())”。BEAM版本為2.9.0,以下是我的部分代碼。
public interface Options extends PipelineOptions {
@Description("Kafka server")
@Required
ValueProvider<String> getKafkaServer();
void setKafkaServer(ValueProvider<String> value);
@Description("Topic to read from")
@Required
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
@Description("Topic to write to")
@Required
ValueProvider<String> getOutputTopic();
void setOutputTopic(ValueProvider<String> value);
@Description("File path to write to")
@Required
ValueProvider<String> getOutput();
void setOutput(ValueProvider<String> value);
}
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline p = Pipeline.create(options);
PCollection<String> processedData = p.apply(KafkaIO.<Long, String>read()
.withBootstrapServers(options.getKafkaServer())
.withTopic(options.getInputTopic())
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata()
)
以下是我運行代碼的方式:
mvn compile exec:java
-Dexec.mainClass=${MyClass}
-Pdataflow-runner -Dexec.args="
--project=${MyClass}
--stagingLocation=gs://${MyBucket}/staging
--tempLocation=gs://${MyBucket}/temp
--templateLocation=gs://${MyBucket}/templates/${MyClass}
--runner=DataflowRunner"
推薦答案
要通過ValueProvider
訪問值,需要使用get
方法,然后獲取具有其具體類型的值。
例如:
當有選項時:
ValueProvider<String> getKafkaServer();
可以通過以下方式訪問:
getKafkaServer().get()
這將返回您的字符串對象。
KafkaIo Api似乎需要獲取字符串參數,而不是ValueProvider,您必須從ValueProvider包裝中提取該值。
這篇關于如何修復不兼容的類型:org.apache.beam.sdk.options.ValueProvider<;java.lang.String>;無法轉換為java.lang.String";的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,