本文介紹了如何使用睡覺(jué)接口在沒(méi)有使用的情況下從Sink(或一般稱為RichFunction)獲取作業(yè)名稱?的處理方法,對(duì)大家解決問(wèn)題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧!
問(wèn)題描述
如標(biāo)題所示。雖然getJobId
在RuntimeContext
中可用,但作業(yè)名稱不可用。
https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html
嘗試從配置中獲取似乎效果不佳:
@Override
public void open(Configuration parameters) throws Exception {
String jobName = parameters.getString(PipelineOptions.NAME); // this is null
}
我們?nèi)绾芜\(yùn)行獨(dú)立示例管道:
public static void main(String... args) {
try {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// some pipeline setup
env.execute("This-is-job-name");
} catch (Exception e) {
// logging
}
推薦答案
假設(shè)您將作業(yè)名稱作為參數(shù)傳遞給作業(yè),您希望將其設(shè)置如下:
public static void main(String... args) {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
然后這應(yīng)該會(huì)起作用
@Override
public void open(Configuration parameters) throws Exception {
ParameterTool params = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
String jobName = params.get(nameOfParameterWithJobName);
}
傳遞給open
的配置始終為空–這是一種不再使用的過(guò)時(shí)機(jī)制。未更改方法簽名以避免破壞公共API。
將此類信息傳遞給RichFunction的另一個(gè)好方法是將其傳遞給構(gòu)造函數(shù)。
這篇關(guān)于如何使用睡覺(jué)接口在沒(méi)有使用的情況下從Sink(或一般稱為RichFunction)獲取作業(yè)名稱?的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,