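In this post, php editor Xiaoxin introduces left joins with Apache Beam in Go. Apache Beam is a distributed data processing framework. It provides a unified programming model for running batch and streaming jobs on different distributed processing engines. A left join is a common data processing operation that matches two datasets on a key. It returns every record from the left dataset, together with the matching records from the right dataset. Left records that have no match are still kept, with the right-hand fields left empty. This article explains how to perform a left join with Apache Beam in Go.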
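The following minimal plain-Go sketch makes the left-join semantics concrete before Beam comes into play. It uses no Beam code and needs Go 1.18+ for generics. `LeftJoin` and `Pair` are illustrative names, not part of any library. A nil `Right` plays the role of SQL NULL.

```go
package main

import "fmt"

type customer struct {
	CustID int
	FName  string
}

type order struct {
	OrderID int
	Amount  int
	Cust_ID int
}

// Pair is one output row of a left join. Right is nil when the left record
// has no match on the right-hand side (the SQL NULL case).
type Pair[L, R any] struct {
	Left  L
	Right *R
}

// LeftJoin pairs every element of left with each element of right that has
// the same key. A left element with no match appears once, with a nil Right.
// Right pointers refer into the caller's right slice.
func LeftJoin[L, R any, K comparable](left []L, right []R, leftKey func(L) K, rightKey func(R) K) []Pair[L, R] {
	var out []Pair[L, R]
	for _, l := range left {
		matched := false
		for i := range right {
			if leftKey(l) == rightKey(right[i]) {
				out = append(out, Pair[L, R]{Left: l, Right: &right[i]})
				matched = true
			}
		}
		if !matched {
			out = append(out, Pair[L, R]{Left: l})
		}
	}
	return out
}

func main() {
	custs := []customer{{1, "Bob"}, {2, "Adam"}, {3, "John"}}
	orders := []order{{123, 100, 1}, {125, 30, 3}}
	rows := LeftJoin(custs, orders,
		func(c customer) int { return c.CustID },
		func(o order) int { return o.Cust_ID })
	for _, r := range rows {
		if r.Right == nil {
			fmt.Println(r.Left.CustID, r.Left.FName, "(no order)") // 2 Adam (no order)
		} else {
			fmt.Println(r.Left.CustID, r.Left.FName, r.Right.OrderID, r.Right.Amount)
		}
	}
}
```

This nested loop is the textbook definition and costs O(len(left) × len(right)). The Beam approaches below differ mainly in how they bring matching records together.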
Question
Is there a simple way to do a left join of two PCollections in Go?
As far as I can tell, SQL joins are only available in Java.
```go
package main

import (
	"context"
	"flag"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

type customer struct {
	CustID int
	FName  string
}

type order struct {
	OrderID int
	Amount  int
	Cust_ID int
}

func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()

	p := beam.NewPipeline()
	s := p.Root()

	var custList = []customer{
		{1, "Bob"},
		{2, "Adam"},
		{3, "John"},
		{4, "Ben"},
		{5, "Jose"},
		{6, "Bryan"},
		{7, "Kim"},
		{8, "Tim"},
	}

	var orderList = []order{
		{123, 100, 1},
		{125, 30, 3},
		{128, 50, 7},
	}

	custPCol := beam.CreateList(s, custList)
	orderPCol := beam.CreateList(s, orderList)

	// Left join custPCol with orderPCol.
	// Expected result:
	// CustID | FName | OrderID | Amount
	// 1      | Bob   | 123     | 100
	// 2      | Adam  |         |
	// 3      | John  | 125     | 30
	// 4      | Ben   |         |
	// 5      | Jose  |         |
	// 6      | Bryan |         |
	// 7      | Kim   | 128     | 50
	// 8      | Tim   |         |

	if err := beamx.Run(ctx, p); err != nil {
		log.Exitf(ctx, "Failed to execute job: %v", err)
	}
}
```
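I want to join these two PCollections and then run further operations on the result. I read the documentation on CoGroupByKey, but I couldn't work out how to turn its output into the rows an ordinary SQL join would produce.
Any suggestions?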
Solution
Try something like this, passing orderPCol to the DoFn as a side input:
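```go
type resultType struct {
	CustID  int
	FName   string
	OrderID int
	Amount  int
}

// For each customer, scan all orders from the side input and return the first
// one whose Cust_ID matches. Only that first match is kept, so a customer with
// several orders yields a single row. A customer without an order gets a row
// with zero-valued OrderID and Amount.
result := beam.ParDo(s, func(c customer, iterOrder func(*order) bool) resultType {
	var o order
	for iterOrder(&o) {
		if c.CustID == o.Cust_ID {
			return resultType{
				CustID:  c.CustID,
				FName:   c.FName,
				OrderID: o.OrderID,
				Amount:  o.Amount,
			}
		}
	}
	return resultType{
		CustID: c.CustID,
		FName:  c.FName,
	}
}, custPCol, beam.SideInput{Input: orderPCol})
```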
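The side-input version returns at most one row per customer, so a customer with several orders loses all but the first. The minimal sketch below emits one row per matching order instead. It assumes Beam Go's support for emitter parameters (`emit func(resultType)`) in a DoFn. `leftJoinFn` and `sliceIter` are hypothetical helpers. `sliceIter` only mimics the side-input iterator so the logic can run without a runner. Order 130 is made-up data, added to show the one-to-many case.

```go
package main

import "fmt"

type customer struct {
	CustID int
	FName  string
}

type order struct {
	OrderID int
	Amount  int
	Cust_ID int
}

type resultType struct {
	CustID  int
	FName   string
	OrderID int
	Amount  int
}

// leftJoinFn has the shape of a DoFn with an iterable side input and an
// emitter: one row per matching order, or a single customer-only row.
func leftJoinFn(c customer, iterOrder func(*order) bool, emit func(resultType)) {
	matched := false
	var o order
	for iterOrder(&o) {
		if o.Cust_ID == c.CustID {
			emit(resultType{CustID: c.CustID, FName: c.FName, OrderID: o.OrderID, Amount: o.Amount})
			matched = true
		}
	}
	if !matched {
		// Left-join case: keep the customer, leave order fields at zero.
		emit(resultType{CustID: c.CustID, FName: c.FName})
	}
}

// sliceIter imitates a side-input iterator over an in-memory slice
// (hypothetical test helper, not a Beam API).
func sliceIter(orders []order) func(*order) bool {
	i := 0
	return func(o *order) bool {
		if i >= len(orders) {
			return false
		}
		*o = orders[i]
		i++
		return true
	}
}

func main() {
	// Order 130 is hypothetical, added so Bob has two orders.
	orders := []order{{123, 100, 1}, {125, 30, 3}, {128, 50, 7}, {130, 20, 1}}
	for _, c := range []customer{{1, "Bob"}, {2, "Adam"}} {
		leftJoinFn(c, sliceIter(orders), func(r resultType) { fmt.Printf("%+v\n", r) })
	}
}
```

In a pipeline, this function would presumably be wired up the same way as the closure in the answer, e.g. `beam.ParDo(s, leftJoinFn, custPCol, beam.SideInput{Input: orderPCol})`. It still scans the whole side input for every customer. That is acceptable when the order side is small, but not when it is large.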
Or, if you want to use CoGroupByKey:
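```go
// Key both collections by customer ID.
custWithKeyPCol := beam.ParDo(s, func(c customer) (int, customer) {
	return c.CustID, c
}, custPCol)
orderWithKeyPCol := beam.ParDo(s, func(o order) (int, order) {
	return o.Cust_ID, o
}, orderPCol)

// For each key, CoGroupByKey supplies one iterator per input.
resultPCol := beam.CoGroupByKey(s, custWithKeyPCol, orderWithKeyPCol)

// Needs "fmt" in the import list.
beam.ParDo0(s, func(custID int, custIter func(*customer) bool, orderIter func(*order) bool) {
	// Buffer this key's orders so each one can be paired with every customer.
	var orders []order
	var o order
	for orderIter(&o) {
		orders = append(orders, o)
	}
	var c customer
	for custIter(&c) {
		if len(orders) == 0 {
			// No matching order: keep the customer with empty order columns.
			fmt.Println(custID, c.FName)
			continue
		}
		for _, ord := range orders {
			fmt.Println(custID, c.FName, ord.OrderID, ord.Amount)
		}
	}
}, resultPCol)
```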
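The following plain-Go simulation shows why the keyed CoGroupByKey route yields left-join rows. It splits the work into two steps: first group both inputs by customer ID, then expand each group. `coGroupByKey` and `leftJoinRows` are hypothetical in-memory stand-ins, not Beam APIs. In a real pipeline, the grouping happens on the runner, and each group arrives as the two iterators the DoFn receives.

```go
package main

import (
	"fmt"
	"sort"
)

type customer struct {
	CustID int
	FName  string
}

type order struct {
	OrderID int
	Amount  int
	Cust_ID int
}

// group is what the DoFn sees for one key: all customers and all orders with it.
type group struct {
	custs  []customer
	orders []order
}

// coGroupByKey is an in-memory stand-in for grouping two keyed inputs.
func coGroupByKey(custs []customer, orders []order) map[int]*group {
	groups := make(map[int]*group)
	get := func(k int) *group {
		if groups[k] == nil {
			groups[k] = &group{}
		}
		return groups[k]
	}
	for _, c := range custs {
		g := get(c.CustID)
		g.custs = append(g.custs, c)
	}
	for _, o := range orders {
		g := get(o.Cust_ID)
		g.orders = append(g.orders, o)
	}
	return groups
}

// leftJoinRows expands each group into left-join rows. Keys with orders but
// no customer produce nothing, which is what makes this a left join.
func leftJoinRows(groups map[int]*group) []string {
	keys := make([]int, 0, len(groups))
	for k := range groups {
		keys = append(keys, k)
	}
	sort.Ints(keys) // only for deterministic output
	var rows []string
	for _, k := range keys {
		g := groups[k]
		for _, c := range g.custs {
			if len(g.orders) == 0 {
				rows = append(rows, fmt.Sprintf("%d | %s | |", k, c.FName))
				continue
			}
			for _, o := range g.orders {
				rows = append(rows, fmt.Sprintf("%d | %s | %d | %d", k, c.FName, o.OrderID, o.Amount))
			}
		}
	}
	return rows
}

func main() {
	custs := []customer{{1, "Bob"}, {2, "Adam"}, {3, "John"}, {4, "Ben"},
		{5, "Jose"}, {6, "Bryan"}, {7, "Kim"}, {8, "Tim"}}
	orders := []order{{123, 100, 1}, {125, 30, 3}, {128, 50, 7}}
	for _, row := range leftJoinRows(coGroupByKey(custs, orders)) {
		fmt.Println(row)
	}
}
```

Emitting the order-only keys as well would turn this into a full outer join. The sort exists only to make the printed table stable, because grouping gives no ordering guarantee across keys.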