

This article introduces the Source part of Flink's stream-processing API, covering the following topics:

1. Reading Data from a Collection

2. Reading Data from a File

3. Reading Data from Kafka

4. Custom Sources

A data-processing job can essentially be broken into three stages: where the data comes from, what business logic is applied to it, and where the results land.

In Flink these three stages are called Source, Transform, and Sink.
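As a minimal sketch of how the three stages line up in code (the element values and job name here are invented for illustration):

import org.apache.flink.streaming.api.scala._

object SourceTransformSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env
      .fromElements("sensor_1,35.8", "sensor_6,15.4") // Source: where the data comes from
      .map { line =>                                  // Transform: the business logic
        val parts = line.split(",")
        (parts(0), parts(1).toDouble)
      }
      .print()                                        // Sink: where the results land

    env.execute("source-transform-sink-sketch")
  }
}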

Versions:

Scala: 2.11.12

Kafka: 0.8.2.2

Flink: 1.7.2

The dependencies section of pom.xml (be sure to include the logging dependencies; otherwise, when Flink reads from Kafka 0.8, it fails with "Failed to instantiate SLF4J LoggerFactory" and a "Reported exception"):

    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.2</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.8_2.11 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>


        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.22</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.22</version>
        </dependency>

        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>


    </dependencies>
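Since slf4j-log4j12 expects a Log4j configuration on the classpath, a minimal src/main/resources/log4j.properties along these lines (the log level and pattern are just one reasonable choice for local testing) keeps the logger factory happy:

log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c{1} - %m%n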

1. Reading Data from a Collection

package xxx

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

// Case class: sensor ID, timestamp, temperature (used as the data type throughout this article)
case class SensorReading(id: String, timestamp: Long, temperature: Double){
  override def toString: String = {
    id + ":" + timestamp.toString + "," + temperature
  }
}

/**
 * Reads data from a collection
 */
object Sensor {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
    import org.apache.flink.api.scala._
    val stream1: DataStream[SensorReading] = environment.fromCollection(List(
      SensorReading("sensor_1", 1547718199, 35.80018327300259),
      SensorReading("sensor_6", 1547718201, 15.402984393403084),
      SensorReading("sensor_7", 1547718202, 6.720945201171228),
      SensorReading("sensor_10", 1547718205, 38.101067604893444)
    ))

    stream1.print("Stream1:").setParallelism(1)
    environment.execute()
  }
}

2. Reading Data from a File


package xxx

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

// Case class: sensor ID, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double){
  override def toString: String = {
    id + ":" + timestamp.toString + "," + temperature
  }
}
/**
 * Reads data from a file
 */
object Sensor {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val stream2: DataStream[String] = environment.readTextFile(
      "D:/Scala/Code/FlinkTest/src/main/resources/sensor.txt")
    stream2.print("Stream2:").setParallelism(1)
    environment.execute()
  }
}
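readTextFile yields raw text lines, so in practice they are parsed into SensorReading before further processing. A minimal sketch to place inside main after stream2, assuming each line of sensor.txt is comma-separated as id,timestamp,temperature (the file format is an assumption here); note the extra import needed for the implicit TypeInformation:

    import org.apache.flink.api.scala._

    // Parse each comma-separated line into a SensorReading
    val sensorStream: DataStream[SensorReading] = stream2.map(line => {
      val fields = line.split(",")
      SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
    })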

3. Reading Data from Kafka

Kafka brokerList: slave1:9092,slave2:9092,slave3:9092

ZooKeeper cluster: slave2:2181,slave3:2181,slave4:2181

package xxx

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08

/**
 * Reads data from Kafka
 */
object ReadDataFromKafka {
  def main(args: Array[String]): Unit = {

    // Kafka consumer configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "slave1:9092,slave2:9092,slave3:9092")
    properties.setProperty("group.id", "flink_group1")
    properties.setProperty("zookeeper.connect", "slave2:2181,slave3:2181,slave4:2181")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // key deserializer
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // value deserializer
    properties.setProperty("auto.offset.reset", "latest") // start from the latest offset when no committed offset exists

    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Connect to Kafka and read from the "sensor" topic
    val kafkaStream: DataStream[String] = environment.addSource(new FlinkKafkaConsumer08[String]("sensor",
      new SimpleStringSchema(), properties))

    kafkaStream.print().setParallelism(1)
    environment.execute("readDataFromKafka")

  }

}
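To feed the job some test data, the console producer that ships with Kafka 0.8 can write to the sensor topic (the broker list matches the cluster above; the sample message is arbitrary):

bin/kafka-console-producer.sh --broker-list slave1:9092,slave2:9092,slave3:9092 --topic sensor
sensor_1,1547718199,35.8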

4. Custom Sources

package xxx

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

import scala.util.Random

/**
 * A custom source
 */
object ReadDataFromMySource {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = environment.addSource(new MySource())

    dataStream.print().setParallelism(1)

    environment.execute("MySource")
    
  }

}


class MySource extends SourceFunction[String]{
  // Flag indicating whether the source is still running;
  // volatile because cancel() may be called from another thread
  @volatile var running: Boolean = true

  // Generate data while the source is running
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    val random = new Random()

    // Ten sensors, each seeded with a random baseline around 100
    var temp = 1.to(10).map(
      i => (i, 100 + random.nextGaussian() * 100)
    )
    
    while (running){
      // Nudge each temperature by a small Gaussian step
      temp = temp.map(
        t => (t._1, t._2 + random.nextGaussian())
      )

      // Current timestamp in milliseconds
      val curTime = System.currentTimeMillis()

      temp.foreach(t => {
        sourceContext.collect(curTime + ": " + t._1 + "--> " + t._2)
      })

      Thread.sleep(500)
    }

  }

  // Called on job cancellation; stops the generation loop
  override def cancel(): Unit = {
    running = false
  }
}
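The same pattern carries over to the SensorReading case class used in the earlier sections. A minimal sketch of such a variant (the sensor IDs and baseline temperatures are invented for illustration):

class SensorSource extends SourceFunction[SensorReading] {
  @volatile var running: Boolean = true

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val random = new Random()
    // Ten sensors, each starting from a random baseline temperature
    var readings = 1.to(10).map(i => ("sensor_" + i, 60 + random.nextGaussian() * 20))

    while (running) {
      // Drift each temperature slightly and emit a timestamped reading
      readings = readings.map(r => (r._1, r._2 + random.nextGaussian()))
      val ts = System.currentTimeMillis()
      readings.foreach(r => ctx.collect(SensorReading(r._1, ts, r._2)))
      Thread.sleep(500)
    }
  }

  override def cancel(): Unit = running = false
}

It is used exactly like MySource: environment.addSource(new SensorSource()) yields a DataStream[SensorReading].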
