本文要點
- 在構建應用程序和系統時,我們一直面臨的一個挑戰是如何有效地在它們之間交換信息,同時保持接口修改的靈活性,而不會對其他地方產生不恰當的影響。
- 事件提供了一種“金發姑娘式”的方法,實時API可以作為應用程序的基礎,既靈活又高性能,既松耦合又高效。在你所工作的業務領域,你可能會想到很多事件示例。它們可以是人類之間產生的互動,也可以是機器之間產生的交互。
- Apache Kafka提供了一個可伸縮的事件流平臺,你可以用它來構建強大的基于事件的應用程序。Kafka通過Kafka Streams API提供流式處理能力。
- ksqlDB是一個專門為流式處理應用程序而構建的事件流數據庫。它提供了一個基于SQL的API來查詢和處理Kafka中的數據。
- ksqlDB的特性包括過濾、轉換和連接來自流和表的數據,通過聚合事件創建物化視圖,等等。
在構建應用程序和系統時,我們一直面臨的一個挑戰是如何有效地在它們之間交換信息,同時保持接口修改的靈活性,而不會對其他地方產生不恰當的影響。接口越是具體和簡單化,在做出變更時就越有可能需要進行徹底的重寫。反過來也是成立的,通用的集成模式可以適用,并得到廣泛支持,但這是以性能作為代價。
事件提供了一種“金發姑娘式”的方法,實時 API 可以作為應用程序的基礎,既靈活又高性能,既松耦合又高效。
事件可以被視為其他大多數數據結構的構建塊。一般來說,它們記錄某件事情發生的事實和發生的時間點。一個事件可以捕獲不同級別的信息:從一個簡單的通知到一個可以描述所發生事情的完整狀態的事件。
我們可以通過聚合事件來創建狀態。除了作為狀態的基礎,事件還可以用于在發生事件時異步觸發其他地方的動作——這是事件驅動架構的基礎。通過這種方式,我們可以構建事件消費者來滿足我們的需求——包括無狀態的和有狀態的。事件生產者可以選擇維護狀態,但沒有必要這樣做,因為事件消費者可以從接收到的事件中重新構建狀態。
在你所工作的業務領域,你可能會想到很多事件示例。它們可以是人類之間產生的互動,也可以是機器之間產生的交互。它們可能包含一個豐富的有效負載,或者它們本質上只是一個通知。例如:
- 事件:userLogin
- 有效載荷:zbeeblebrox在2020-08-17 16:26:39 BST時登錄。
- 事件:CarParked
- 有效載荷:車輛A42 XYZ在2020-08-17 16:36:27時停在了X42位置上。
- 事件:orderPlaced
- 有效載荷:Robin在2020-08-17 16:35:41 BST時買了4罐總價為2.25英鎊的烘豆。
這些事件可以用來直接觸發其他地方的動作(如處理訂單的服務),也可以通過聚合來提供信息(如當前停車場已經被占用的數量,就可以知道還有多少空位)。
所以,如果事件是我們構建應用程序和服務的基石,那么我們需要一種技術來支持我們——這就是 Apache Kafka 的切入點。Kafka 是一個可伸縮的事件流平臺,它提供了:
- Pub/Sub
- 發布(寫)和訂閱(讀)事件流,包括從其他系統持續導入/導出數據。
- 有狀態的流式處理
- 按照你的需要持久和可靠地存儲事件流。
- 存儲
- 按事件發生的順序或追溯的方式處理事件流。
Kafka 采用了分布式日志的概念。通過這個簡單但功能強大的分布式、不可變、僅追加的日志的概念,我們可以以一種可伸縮和高效的方式捕獲和存儲業務和系統產生的事件。這些事件可以供多個用戶使用,也可以進行進一步的處理和聚合,既可以直接使用,也可以存儲在 RDBMS、數據湖和 NoSQL 等存儲系統中。
在本文的其余部分中,我將探索 Apache Kafka 提供的 API,并演示如何在應用程序中使用它們。
生產者和消費者 API
像 Kafka 這樣的系統,它的偉大之處在于生產者和消費者是解耦的,這意味著我們可以在不需要消費者的情況下生產數據(由于是解耦的,我們可以大規模地這樣做)。一個事件發生了,我們把它發送到 kafka,就這么簡單。我們所需要知道的就是 Kafka 集群的細節,以及我們想要發送事件到的主題(Kafka 組織數據的一種方式,有點像 RDBMS 中的表)。
Kafka 有很多不同語言的客戶端。這里有一個使用 Go 產生事件到 Kafka 的例子:
package mainimport ( "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka")func main() { topic := "test_topic" p, _ := kafka.NewProducer(&kafka.ConfigMap{ "bootstrap.servers": "localhost:9092"}) defer p.Close() p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 0}, Value: []byte("Hello world")}, nil)}
因為 Kafka 是持久地存儲事件的,所以當我們想要使用事件時,它們是可用的,直到我們使用完后才過期(這個可以根據主題來配置)。
事件被寫入 Kafka 主題后,就可供一個或多個消費者讀取。消費者可以采用傳統的發布/訂閱方式,并在新事件到達時接收它們,也可以根據應用程序的需要選擇重新消費之前某個時間點的事件。Kafka 的這種回放功能要歸功于它的持久和可伸縮的存儲層,這給很多重要的實際應用場景提供了巨大優勢,如機器學習和 A/B 測試,這些場景同時需要歷史數據和實時數據。在受監管的行業中,數據必須保留多年才符合法律規定。傳統的消息傳遞系統如 RabbitMQ、ActiveMQ 無法支持這樣的要求。
package mainimport ( "fmt" "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka")func main() { topic := "test_topic" cm := kafka.ConfigMap{ "bootstrap.servers": "localhost:9092", "go.events.channel.enable": true, "group.id": "rmoff_01"} c, _ := kafka.NewConsumer(&cm) defer c.Close() c.Subscribe(topic, nil) for { select { case ev := <-c.Events(): switch ev.(type) { case *kafka.Message: km := ev.(*kafka.Message) fmt.Printf("? Message '%v' received from topic '%v'n", string(km.Value), string(*km.TopicPartition.Topic)) } } }}
當一個消費者連接到 Kafka 時,它會提供一個消費者群組標識符。消費者群組支持兩種功能。首先,Kafka 用它跟蹤消費者讀取主題的偏移量,當消費者重新連接時,可以從之前的位置繼續讀取。第二,消費者應用程序可能希望使用多個消費者讀取數據,形成一個消費者群組,從而并行處理數據。Kafka 將事件分配給群組內的每一個消費者,如果隨后有成員離開或加入(例如當一個消費者實例發生崩潰時),會主動管理好群組。
這意味著多個服務可以使用相同的數據,而它們之間沒有任何相互依賴關系。同樣的數據也可以使用 Kafka Connect API 路由到其他數據存儲中。
Kafka 提供了 JAVA、C/C++、Go、Python 和 Node.js 等語言的生產者和消費者 API。不過,如果你的應用程序想要使用 HTTP 而不是原生的 Kafka 協議呢?這個時候可以使用 REST 代理。
在 Kafka 中使用 REST API
假設我們正在為智能停車場的設備開發一個應用程序。記錄汽車停車位的事件的有效載荷可能看起來像這樣:
{ "name": "NCP Sheffield", "space": "A42", "occupied": true}
我們可以把這個事件發送到 Kafka 主題上,它也會將記錄事件的時間作為事件元數據的一部分。使用Confluent REST Proxy向 Kafka 生成數據只需要進行一個簡單的 REST 調用:
curl -X POST -H "Content-Type: Application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"value":{ "name": "NCP Sheffield", "space": "A42", "occupied": true }}]}' "http://localhost:8082/topics/carpark"
任何一個應用程序都可以使用之前介紹的原生消費者 API 或使用 REST API 來消費這個主題。與原生消費者 API 一樣,使用 REST API 的消費者也是消費者群組的成員,這個時候叫作訂閱。因此,對于 REST API,必須首先聲明消費者和訂閱:
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"name": "rmoff_consumer", "format": "json", "auto.offset.reset": "earliest"}' http://localhost:8082/consumers/rmoff_consumercurl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["carpark"]}' http://localhost:8082/consumers/rmoff_consumer/instances/rmoff_consumer/subscription
完成這些之后,就可以讀取事件了:
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" http://localhost:8082/consumers/rmoff_consumer/instances/rmoff_consumer/records[ { "topic": "carpark", "key": null, "value": { "name": "Sheffield NCP", "space": "A42", "occupied": true }, "partition": 0, "offset": 0 }]
如果有多個事件要接收,可以通過批次獲取。如果想要檢查新事件,需要再次進行 REST 調用。
我們已經介紹了如何向 Kafka 寫入數據和從 Kafka 主題獲取數據。但是,很多時候,我們想做的不只是簡單的 Pub/Sub。我們想基于事件流看到更大的圖景——所有汽車來來去去的情況、現在有多少空車位或者某個特定停車場的更新流。
條件通知、流式處理和物化視圖
如果你認為 Kafka 只是一個提供 Pub/Sub 功能的系統,就跟認為 iphone 只是一個用來撥打和接收電話的設備一樣。我的意思是,如果把 Pub/Sub 看成 Kafka 提供的眾多能力當中的一個,這是沒有錯的……它的作用確實遠遠不止于此。Kafka 通過 Kafka Streams API 提供了流式處理能力。這是一個功能豐富的 Java 客戶端庫,用于在 Kafka 中大規模和跨多臺機器對數據進行有狀態的流式處理。Kafka Streams 被沃爾瑪、Ticketmaster 和 Bloomberg 等公司廣泛應用,它還是 ksqlDB 的基礎。
ksqlDB是一個專門為流式處理應用程序構建的事件流數據庫。它提供了一個基于 SQL 的 API 來查詢和處理 Kafka 中的數據。ksqlDB 的特性包括過濾、轉換和連接來自流和表的數據,通過聚合事件創建物化視圖,等等。
要使用 ksqlDB 中的數據,我們首先需要聲明一個 schema:
CREATE STREAM CARPARK_EVENTS (NAME VARCHAR, SPACE VARCHAR, OCCUPIED BOOLEAN) WITH (KAFKA_TOPIC='carpark', VALUE_FORMAT='JSON');
ksqlDB 是一個集群應用程序,這個初始聲明工作可以在啟動時完成,也可以根據需要由客戶端來完成。完成這些之后,客戶端就可以訂閱來自原始主題的變更流(帶有過濾器)。例如,想要獲得一個停車場有空位的通知,可以運行以下命令:
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss') AS EVENT_TS, SPACE FROM CARPARK_EVENTS WHERE NAME='Sheffield NCP' AND OCCUPIED=false EMIT CHANGES;
與 SQL 查詢不同,這個查詢是一種持續查詢(使用 EMIT CHANGES 子句指定)。持續查詢,即推送(Push)查詢,將在事件發生時(現在和將來)持續地返回新的匹配項,直到事件終止為止。ksqlDB 還支拉取(Pull)查詢(我們將在下面探討),這些查詢的行為與常規 RDBMS 的查詢差不多,返回某個時間點的值。因此,ksqlDB 既支持流也支持靜態狀態,在實際當中,大多數應用程序需要根據正在執行的操作來選擇這兩種方式。
ksqlDB 提供了一個全面的 REST API,通過 curl 進行上面的 SQL 調用看起來像這樣:
curl --http2 'http://localhost:8088/query-stream' --data-raw '{"sql":"SELECT TIMESTAMPTOSTRING(ROWTIME,'''yyyy-MM-dd HH:mm:ss''') AS EVENT_TS, SPACE FROM CARPARK_EVENTS WHERE NAME='''Sheffield NCP''' and OCCUPIED=false EMIT CHANGES;"}'
這個調用產生一個來自服務器端的響應流(帶有頭部信息),然后源主題有匹配的事件時,這些事件被發送到客戶端:
{"queryId":"383894a7-05ee-4ec8-bb3b-c5ad39811539","columnNames":["EVENT_TS","SPACE"],"columnTypes":["STRING","STRING"]}…["2020-08-05 16:02:33","A42"]………["2020-08-05 16:07:31","D72"]…
我們也可以使用 ksqlDB 來定義和填充新的數據流。在 SELECT 語句前加上 CREATE STREAM streamname AS,就可以將持續查詢的輸出路由到 Kafka 主題。因此,我們可以使用 ksqlDB 轉換、連接、過濾發送給 Kafka 的事件。ksqlDB 支持將表作為一等對象類型,我們可以用它來增強接收到的有關停車場信息的事件:
CREATE STREAM CARPARKS AS SELECT E.NAME AS NAME, E.SPACE, R.LOCATION, R.CAPACITY, E.OCCUPIED, CASE WHEN OCCUPIED=TRUE THEN 1 ELSE -1 END AS OCCUPIED_IND FROM CARPARK_EVENTS E INNER JOIN CARPARK_REFERENCE R ON E.NAME = R.NAME;
我們仍然使用 CASE 語句來創建可用車位的計數。上面的 CREATE STREAM 填充了一個 Kafka 主題,看起來像這樣:
最后,讓我們看看如何在 ksqlDB 中創建有狀態聚合并在客戶端查詢。要創建物化視圖,需要運行包含聚合函數的 SQL:
CREATE TABLE CARPARK_SPACES AS SELECT NAME, SUM(OCCUPIED_IND) AS OCCUPIED_SPACES FROM CARPARKS GROUP BY NAME;
這個狀態是在分布式 ksqlDB 節點中維護的,可以使用 REST API 查詢:
curl --http2 'http://localhost:8088/query-stream' --data-raw '{"sql":"SELECT OCCUPIED_SPACES FROM CARPARK_SPACES WHERE NAME='''Birmingham NCP''';"}'
與上面看到的響應流不同,針對狀態的查詢(稱為“拉取查詢”,而不是“推送查詢”)是立即返回的,然后退出:
{"queryId":null,"columnNames":["OCCUPIED_SPACES"],"columnTypes":["INTEGER"]}[30]
如果應用程序希望獲得最新的數字,它們可以重新發出查詢,結果可能會發生變化,也可能不會:
curl --http2 'http://localhost:8088/query-stream' --data-raw '{"sql":"SELECT OCCUPIED_SPACES FROM CARPARK_SPACES WHERE NAME='''Birmingham NCP''';"}'{"queryId":null,"columnNames":["OCCUPIED_SPACES"],"columnTypes":["INTEGER"]}[29]
ksqlDB 官方提供了一個Java客戶端,社區提供了Python和Go客戶端。
與其他系統集成
將 Kafka 作為異步消息傳遞的高可伸縮性和持久性代理的一個好處是在應用程序之間交換的數據也可以用于驅動流式處理(如上所述),或直接送入依賴的系統。
繼續以停車場應用程序為例,我們很可能想要在其他地方使用這些停車或離場事件,例如:
- 分析,以便知道停車的行為和趨勢;
- 機器學習預測容量需求;
- 向第三方供應商提供數據。
你可以使用 Apache Kafka 的 Connect API 來定義 Kafka 內外系統的流式集成。例如,從 Kafka 實時流數據到 S3,你可以這樣:
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/sink-s3/config -d ' { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "topics": "carpark", "s3.bucket.name": "rmoff-carparks", "s3.region": "us-west-2", "flush.size": "1024", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "format.class": "io.confluent.connect.s3.format.json.JsonFormat" }'
現在,用于驅動應用程序通知和構建可以直接查詢狀態的應用程序的數據也流到了 S3。這些應用場景之間都是解耦的。如果我們隨后想要將數據流到另一個地方,比如 Snowflake,只需要添加另一個 Kafka 連接配置,其他消費者完全不受影響。Kafka Connect 也可以將數據流到 Kafka。例如,我們可以使用變更數據捕獲(CDC)對 ksqlDB 中的 CARPARK_REFERENCE 表進行流化。
結論
Kafka 提供了一個可伸縮的事件流平臺,你可以用它來構建強大的基于事件的應用程序。將事件作為連接應用程序和服務的基礎,你可以從多方面受益,包括松散耦合、服務自治、彈性、靈活演化和彈性。
你可以使用 Kafka API 及其周邊生態系統(包括 ksqlDB)來進行基于訂閱的消費和查詢物化視圖,而不需要額外的數據存儲。在 API 方面,既有原生客戶端 API,也有 REST API。
要了解更多關于 Kafka 的信息,請訪問 developer.confluent.io。Confluent Platform 是 Apache Kafka 的一個發行版,包含了本文討論的所有組件。它可以在本地使用,也可以作為托管服務使用(Confluent Cloud)。你可以在GitHub上找到本文的代碼示例和用于運行示例的 Docker Compose 文件。如果你想了解更多有關如何使用 Kafka 構建事件驅動系統的知識,那么一定要閱讀 Ben Stopford 的優秀著作《設計事件驅動系統》。
作者簡介
Robin Moffatt 是 Confluent 的高級開發者布道師。Confluent 是由 Apache Kafka 原作者(同時也是 Oracle ACE 董事)創立的。自 2009 年以來,他一直在各種技術大會(包括 QCon、Devoxx、Strata、Kafka 峰會和Øredev)上演講。讀者可以在網上找到他的演講,訂閱他的YouTube頻道,閱讀他的博文。工作之余,Robin 喜歡跑步,喝啤酒,吃油炸早餐。
原文鏈接:
Real Time APIs in the Context of Apache Kafka