This article introduces Flink's streaming API on the Source side, covering the following topics:
1. Reading data from a collection
2. Reading data from a file
3. Reading data from Kafka
4. Custom Source
A data-processing job can roughly be broken into three stages: where the data comes from, what business logic is applied to it, and where the results are written.
In Flink these three parts are called Source, Transform, and Sink.
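As a rough orientation, here is a minimal sketch of those three stages in a single job; the object name, the element values, and the map step are illustrative assumptions, not part of the examples that follow.
import org.apache.flink.streaming.api.scala._

object SourceTransformSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements("sensor_1,35.8", "sensor_6,15.4") // Source: where the data comes from
      .map(line => line.split(",")(0))                 // Transform: the business logic
      .print()                                         // Sink: where the results go
    env.execute("source-transform-sink-sketch")
  }
}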
Versions:
Scala: 2.11.12
Kafka: 0.8.2.2
Flink: 1.7.2
The dependency section of pom.xml is shown below. Be sure to include the logging dependencies, otherwise Flink fails with "Failed to instantiate SLF4J LoggerFactory Reported exception" when reading from Kafka 0.8; a minimal log4j.properties is sketched right after the dependency block.
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.7.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.8_2.11 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.22</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.22</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
</dependencies>
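Because slf4j-log4j12 is the logging binding used here, a log4j.properties file must be on the classpath (for example under src/main/resources). The minimal configuration below is an assumed example, not taken from the original project; adjust the log level and pattern as needed.
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c - %m%n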
1. Reading data from a collection
package xxx
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

// Case class: sensor ID, timestamp, temperature (used as the data type throughout the rest of this article)
case class SensorReading(id: String, timestamp: Long, temperature: Double) {
  override def toString: String = {
    id + ":" + timestamp.toString + "," + temperature
  }
}

/**
 * Reading data from a collection
 */
object Sensor {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val stream1: DataStream[SensorReading] = environment.fromCollection(List(
      SensorReading("sensor_1", 1547718199, 35.80018327300259),
      SensorReading("sensor_6", 1547718201, 15.402984393403084),
      SensorReading("sensor_7", 1547718202, 6.720945201171228),
      SensorReading("sensor_10", 1547718205, 38.101067604893444)
    ))
    stream1.print("Stream1:").setParallelism(1)
    environment.execute()
  }
}
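As a side note on the API (not part of the original example), a handful of ad-hoc values can also be passed directly with environment.fromElements(...), e.g. environment.fromElements(SensorReading("sensor_1", 1547718199, 35.8)), which saves building a List first.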
2. Reading data from a file
package xxx
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

// Case class: sensor ID, timestamp, temperature
case class SensorReading(id: String, timestamp: Long, temperature: Double) {
  override def toString: String = {
    id + ":" + timestamp.toString + "," + temperature
  }
}

/**
 * Reading data from a file
 */
object Sensor {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val stream2: DataStream[String] = environment.readTextFile(
      "D:\\Scala\\Code\\FlinkTest\\src\\main\\resources\\sensor.txt")
    stream2.print("Stream2:").setParallelism(1)
    environment.execute()
  }
}
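readTextFile yields plain strings. Assuming sensor.txt contains comma-separated lines such as sensor_1,1547718199,35.8 (an assumption about the file format, which the original does not show), they can be parsed into SensorReading with a map placed right after the source inside main:
    import org.apache.flink.api.scala._ // implicit TypeInformation for SensorReading

    val parsedStream: DataStream[SensorReading] = stream2.map { line =>
      val fields = line.split(",").map(_.trim)
      SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
    }
    parsedStream.print("Parsed:").setParallelism(1)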
3. Reading data from Kafka
Kafka broker list: slave1:9092,slave2:9092,slave3:9092
ZooKeeper quorum: slave2:2181,slave3:2181,slave4:2181
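If the sensor topic used below does not exist yet, it can be created with Kafka's command-line tool, for example: kafka-topics.sh --create --zookeeper slave2:2181,slave3:2181,slave4:2181 --replication-factor 1 --partitions 1 --topic sensor (the partition and replication counts here are illustrative).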
package xxx
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08

/**
 * Reading data from Kafka
 */
object ReadDataFromKafka {
  def main(args: Array[String]): Unit = {
    // Kafka consumer properties
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "slave1:9092,slave2:9092,slave3:9092")
    properties.setProperty("group.id", "flink_group1")
    properties.setProperty("zookeeper.connect", "slave2:2181,slave3:2181,slave4:2181")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // key deserializer
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") // value deserializer
    properties.setProperty("auto.offset.reset", "latest") // where to start when there is no committed offset
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Connect to Kafka and read from the "sensor" topic
    val kafkaStream: DataStream[String] = environment.addSource(new FlinkKafkaConsumer08[String]("sensor",
      new SimpleStringSchema(), properties))
    kafkaStream.print().setParallelism(1)
    environment.execute("readDataFromKafka")
  }
}
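To smoke-test the job, messages can be pushed into the topic from a shell with Kafka's console producer, e.g. kafka-console-producer.sh --broker-list slave1:9092,slave2:9092,slave3:9092 --topic sensor; whatever lines you type should then appear in the Flink console output.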
4. Custom Source
package xxx
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import scala.util.Random

/**
 * Custom Source
 */
object ReadDataFromMySource {
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream: DataStream[String] = environment.addSource(new MySource())
    dataStream.print().setParallelism(1)
    environment.execute("MySource")
  }
}

class MySource extends SourceFunction[String] {
  // Flag indicating whether the source should keep running
  var running: Boolean = true

  // Generate data as long as the source is running
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    val random = new Random()
    // Initial readings for ten sensors
    var temp = 1.to(10).map(
      i => (i, 100 + random.nextGaussian() * 100)
    )
    while (running) {
      // Update the readings with a small random walk
      temp = temp.map(
        t => (t._1, t._2 + random.nextGaussian())
      )
      // Current timestamp
      val curTime = System.currentTimeMillis()
      temp.foreach(t => {
        sourceContext.collect(curTime + ": " + t._1 + "--> " + t._2)
      })
      Thread.sleep(500)
    }
  }

  // Stop generating data when the job is cancelled
  override def cancel(): Unit = {
    running = false
  }
}
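The source above emits plain strings. A variant that emits the SensorReading case class from the earlier examples could look like the sketch below; the class name MySensorSource and the value ranges are illustrative assumptions, and the same imports as the file above are assumed. Note that a plain SourceFunction always runs with parallelism 1; for a parallel source you would implement ParallelSourceFunction or RichParallelSourceFunction instead.
// Sketch: a custom source emitting SensorReading (assumes the SensorReading case class defined earlier)
class MySensorSource extends SourceFunction[SensorReading] {
  var running: Boolean = true

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val random = new Random()
    // Ten sensors with illustrative starting temperatures
    var temps = 1.to(10).map(i => ("sensor_" + i, 60 + random.nextGaussian() * 20))
    while (running) {
      temps = temps.map(t => (t._1, t._2 + random.nextGaussian()))
      val curTime = System.currentTimeMillis()
      temps.foreach(t => ctx.collect(SensorReading(t._1, curTime, t._2)))
      Thread.sleep(500)
    }
  }

  override def cancel(): Unit = running = false
}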