流處理(Stream processing)是一種計算機(jī)編程范式,其允許給定一個數(shù)據(jù)序列(流處理數(shù)據(jù)源),一系列數(shù)據(jù)操作(函數(shù))被應(yīng)用到流中的每個元素。同時流處理工具可以顯著提高程序員的開發(fā)效率,允許他們編寫有效、干凈和簡潔的代碼。
流數(shù)據(jù)處理在我們的日常工作中非常常見,舉個例子,我們在業(yè)務(wù)開發(fā)中往往會記錄許多業(yè)務(wù)日志,這些日志一般是先發(fā)送到Kafka,然后再由Job消費(fèi)Kafaka寫到elasticsearch,在進(jìn)行日志流處理的過程中,往往還會對日志做一些處理,比如過濾無效的日志,做一些計算以及重新組合日志等等,示意圖如下:
流處理工具fx
gozero是一個功能完備的微服務(wù)框架,框架中內(nèi)置了很多非常實用的工具,其中就包含流數(shù)據(jù)處理工具fx,下面我們通過一個簡單的例子來認(rèn)識下該工具:
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/tal-tech/go-zero/core/fx"
)
func main() {
ch := make(chan int)
go inputStream(ch)
go outputStream(ch)
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGTERM, syscall.SIGINT)
<-c
}
func inputStream(ch chan int) {
count := 0
for {
ch <- count
time.Sleep(time.Millisecond * 500)
count++
}
}
func outputStream(ch chan int) {
fx.From(func(source chan<- interface{}) {
for c := range ch {
source <- c
}
}).Walk(func(item interface{}, pipe chan<- interface{}) {
count := item.(int)
pipe <- count
}).Filter(func(item interface{}) bool {
itemInt := item.(int)
if itemInt%2 == 0 {
return true
}
return false
}).ForEach(func(item interface{}) {
fmt.Println(item)
})
}
inputStream函數(shù)模擬了流數(shù)據(jù)的產(chǎn)生,outputStream函數(shù)模擬了流數(shù)據(jù)的處理過程,其中From函數(shù)為流的輸入,Walk函數(shù)并發(fā)的作用在每一個item上,F(xiàn)ilter函數(shù)對item進(jìn)行過濾為true保留為false不保留,F(xiàn)orEach函數(shù)遍歷輸出每一個item元素。
流數(shù)據(jù)處理中間操作
一個流的數(shù)據(jù)處理可能存在許多的中間操作,每個中間操作都可以作用在流上。就像流水線上的工人一樣,每個工人操作完零件后都會返回處理完成的新零件,同理流處理中間操作完成后也會返回一個新的流。
fx的流處理中間操作:
操作函數(shù)功能輸入Distinct去除重復(fù)的itemKeyFunc,返回需要去重的keyFilter過濾不滿足條件的itemFilterFunc,Option控制并發(fā)量Group對item進(jìn)行分組KeyFunc,以key進(jìn)行分組Head取出前n個item,返回新streamint64保留數(shù)量Map對象轉(zhuǎn)換MapFunc,Option控制并發(fā)量Merge合并item到slice并生成新streamReverse反轉(zhuǎn)itemSort對item進(jìn)行排序LessFunc實現(xiàn)排序算法Tail與Head功能類似,取出后n個item組成新streamint64保留數(shù)量Walk作用在每個item上WalkFunc,Option控制并發(fā)量
下圖展示了每個步驟和每個步驟的結(jié)果:
用法與原理分析
From
通過From函數(shù)構(gòu)建流并返回Stream,流數(shù)據(jù)通過channel進(jìn)行存儲:
// 例子
s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
fx.From(func(source chan<- interface{}) {
for _, v := range s {
source <- v
}
})
// 源碼
func From(generate GenerateFunc) Stream {
source := make(chan interface{})
go func() {
defer close(source)
// 構(gòu)造流數(shù)據(jù)寫入channel
generate(source)
}()
return Range(source)
}
Filter
Filter函數(shù)提供過濾item的功能,F(xiàn)ilterFunc定義過濾邏輯true保留item,false則不保留:
// 例子 保留偶數(shù)
s := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
fx.From(func(source chan<- interface{}) {
for _, v := range s {
source <- v
}
}).Filter(func(item interface{}) bool {
if item.(int)%2 == 0 {
return true
}
return false
})
// 源碼
func (p Stream) Filter(fn FilterFunc, opts ...Option) Stream {
return p.Walk(func(item interface{}, pipe chan<- interface{}) {
// 執(zhí)行過濾函數(shù)true保留,false丟棄
if fn(item) {
pipe <- item
}
}, opts...)
}
Group
Group對流數(shù)據(jù)進(jìn)行分組,需定義分組的key,數(shù)據(jù)分組后以slice存入channel:
// 例子 按照首字符"g"或者"p"分組,沒有則分到另一組
ss := []string{"golang", "google", "php", "Python", "JAVA", "c++"}
fx.From(func(source chan<- interface{}) {
for _, s := range ss {
source <- s
}
}).Group(func(item interface{}) interface{} {
if strings.HasPrefix(item.(string), "g") {
return "g"
} else if strings.HasPrefix(item.(string), "p") {
return "p"
}
return ""
}).ForEach(func(item interface{}) {
fmt.Println(item)
})
}
// 源碼
func (p Stream) Group(fn KeyFunc) Stream {
// 定義分組存儲map
groups := make(map[interface{}][]interface{})
for item := range p.source {
// 用戶自定義分組key
key := fn(item)
// key相同分到一組
groups[key] = Append(groups[key], item)
}
source := make(chan interface{})
go func() {
for _, group := range groups {
// 相同key的一組數(shù)據(jù)寫入到channel
source <- group
}
close(source)
}()
return Range(source)
}
Reverse
reverse可以對流中元素進(jìn)行反轉(zhuǎn)處理:
// 例子
fx.Just(1, 2, 3, 4, 5).Reverse().ForEach(func(item interface{}) {
fmt.Println(item)
})
// 源碼
func (p Stream) Reverse() Stream {
var items []interface{}
// 獲取流中數(shù)據(jù)
for item := range p.source {
items = append(items, item)
}
// 反轉(zhuǎn)算法
for i := len(items)/2 - 1; i >= 0; i-- {
opp := len(items) - 1 - i
items[i], items[opp] = items[opp], items[i]
}
// 寫入流
return Just(items...)
}
Distinct
distinct對流中元素進(jìn)行去重,去重在業(yè)務(wù)開發(fā)中比較常用,經(jīng)常需要對用戶id等做去重操作:
// 例子
fx.Just(1, 2, 2, 2, 3, 3, 4, 5, 6).Distinct(func(item interface{}) interface{} {
return item
}).ForEach(func(item interface{}) {
fmt.Println(item)
})
// 結(jié)果為 1,2,3,4,5,6
// 源碼
func (p Stream) Distinct(fn KeyFunc) Stream {
source := make(chan interface{})
threading.GoSafe(func() {
defer close(source)
// 通過key進(jìn)行去重,相同key只保留一個
keys := make(map[interface{}]lang.PlaceholderType)
for item := range p.source {
key := fn(item)
// key存在則不保留
if _, ok := keys[key]; !ok {
source <- item
keys[key] = lang.Placeholder
}
}
})
return Range(source)
}
Walk
Walk函數(shù)并發(fā)的作用在流中每一個item上,可以通過WithWorkers設(shè)置并發(fā)數(shù),默認(rèn)并發(fā)數(shù)為16,最小并發(fā)數(shù)為1,如設(shè)置unlimitedWorkers為true則并發(fā)數(shù)無限制,但并發(fā)寫入流中的數(shù)據(jù)由defaultWorkers限制,WalkFunc中用戶可以自定義后續(xù)寫入流中的元素,可以不寫入也可以寫入多個元素:
// 例子
fx.Just("aaa", "bbb", "ccc").Walk(func(item interface{}, pipe chan<- interface{}) {
newItem := strings.ToUpper(item.(string))
pipe <- newItem
}).ForEach(func(item interface{}) {
fmt.Println(item)
})
// 源碼
func (p Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
pipe := make(chan interface{}, option.workers)
go func() {
var wg sync.WaitGroup
pool := make(chan lang.PlaceholderType, option.workers)
for {
// 控制并發(fā)數(shù)量
pool <- lang.Placeholder
item, ok := <-p.source
if !ok {
<-pool
break
}
wg.Add(1)
go func() {
defer func() {
wg.Done()
<-pool
}()
// 作用在每個元素上
fn(item, pipe)
}()
}
// 等待處理完成
wg.Wait()
close(pipe)
}()
return Range(pipe)
}
并發(fā)處理
fx工具除了進(jìn)行流數(shù)據(jù)處理以外還提供了函數(shù)并發(fā)功能,在微服務(wù)中實現(xiàn)某個功能往往需要依賴多個服務(wù),并發(fā)的處理依賴可以有效的降低依賴耗時,提升服務(wù)的性能。
fx.Parallel(func() {
userRPC() // 依賴1
}, func() {
accountRPC() // 依賴2
}, func() {
orderRPC() // 依賴3
})
注意fx.Parallel進(jìn)行依賴并行處理的時候不會有error返回,如需有error返回或者有一個依賴報錯需要立馬結(jié)束依賴請求請使用MapReduce工具進(jìn)行處理。
總結(jié)
本篇文章介紹了流處理的基本概念和gozero中的流處理工具fx,在實際的生產(chǎn)中流處理場景應(yīng)用也非常多,希望本篇文章能給大家?guī)硪欢ǖ膯l(fā),更好的應(yīng)對工作中的流處理場景。