作者/蔡錫生
簡(jiǎn)介
gocraft/work是一款使用go開(kāi)發(fā)的任務(wù)處理軟件,通過(guò)redis存儲(chǔ)任務(wù)隊(duì)列,可以使用工作池同時(shí)處理多個(gè)任務(wù)。本文主要介紹任務(wù)注冊(cè)和任務(wù)消費(fèi)的源代碼。
功能特性
• Fast and efficient. Faster than this, this, and this. See below for benchmarks.
• Reliable - don't lose jobs even if your process crashes.
• Middleware on jobs -- good for metrics instrumentation, logging, etc.
• If a job fails, it will be retried a specified number of times.
• Schedule jobs to hAppen in the future.
• Enqueue unique jobs so that only one job with a given name/arguments exists in the queue at once.
• Web UI to manage failed jobs and observe the system.
• Periodically enqueue jobs on a cron-like schedule.
• Pause / unpause jobs and control concurrency within and across processes.
注冊(cè)Job
注冊(cè)Job流程
1. 創(chuàng)建redis client pool。
2. 創(chuàng)建對(duì)象,定義任務(wù)處理函數(shù)。
3. 創(chuàng)建任務(wù)工作池,需要傳入被處理對(duì)象結(jié)構(gòu)體,最大并發(fā)數(shù),命名空間,redis client pool。
4. 創(chuàng)建Job,需要傳入job名稱和job處理函數(shù), job在redis中使用列表存儲(chǔ),key的組成:nameSapce:job:jobName,同一namespace支持多種類型任務(wù)處理。
這里使用任務(wù)名稱作為key存入redis, 任務(wù)處理參數(shù)存放到列表中
func main() {
// Make a new pool. Arguments:
// Context{} is a struct that will be the context for the request.
// 10 is the max concurrency
// "my_app_namespace" is the Redis namespace
// redisPool is a Redis pool
pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool)
// Add middleware that will be executed for each job
pool.Middleware((*Context).Log)
// Map the name of jobs to handler functions
// pool 中的 jobTypes是一個(gè)字典,key 是任務(wù)名稱, value 是 任務(wù)處理函數(shù)
// 當(dāng)有任務(wù)的時(shí)候,會(huì)將任務(wù)需要的參數(shù) 放入到redis key 為jobName的列表中
// 第二個(gè)參數(shù)必須是 工作池對(duì)象的方法
pool.Job("send_email", (*Context).SendEmail)
// Customize options:
pool.JobWithOptions("export", work.JobOptions{Priority: 10, MaxFails: 1}, (*Context).Export)
// Start processing jobs
pool.Start()
...
}
發(fā)送Job
發(fā)送job其實(shí)調(diào)用NewEnqueuer方法向redis的列表中壓入元素(具體的內(nèi)容是任務(wù)參數(shù))。
package main
import (
"github.com/gomodule/redigo/redis"
"github.com/gocraft/work"
)
// Make a redis pool
var redisPool = &redis.Pool{
MaxActive: 5,
MaxIdle: 5,
Wait: true,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", ":6379")
},
}
// Make an enqueuer with a particular namespace
var enqueuer = work.NewEnqueuer("my_app_namespace", redisPool)
func main() {
// Enqueue a job named "send_email" with the specified parameters.
_, err := enqueuer.Enqueue("send_email", work.Q{"address": "test@example.com", "subject": "hello world", "customer_id": 4})
if err != nil {
log.Fatal(err)
}
}
Woker Fetch Job
在New WrokPool的時(shí)候會(huì)根據(jù)并法參數(shù)concurrency,創(chuàng)建同等個(gè)數(shù)的woker。
Worker是一個(gè)job處理者,通過(guò)永久for循環(huán),不間斷的從redis的任務(wù)隊(duì)列中獲取任務(wù),在處理任務(wù)的時(shí)候,協(xié)程阻塞,等待一個(gè)任務(wù)處理完,再繼續(xù)下一個(gè)。
下面的代碼是worker在for循環(huán)中的重要操作(1) fetch job (2) process job
func (w *worker) loop() {
for {
select {
。。。
case <-timer.C:
job, err := w.fetchJob()
w.process(job)
}
}
}
fetchJob本質(zhì)是redis的pop,push操作。首先將redis列表中的任務(wù)移除,然后再放入到處理隊(duì)列中,這個(gè)操作必須是原子操作(原子性是指事務(wù)是一個(gè)不可再分割的工作單元,事務(wù)中的操作要么都發(fā)生,要么都不發(fā)生),作者使用了lua腳本完成。最后返回一個(gè)job對(duì)象,里面有后面任務(wù)處理函數(shù)需要的args,即這里的rawJson。
func (w *worker) fetchJob() (*Job, error) {
scriptArgs = append(scriptArgs, w.poolID) // ARGV[1]
...
values, err := redis.Values(w.redisFetchScript.Do(conn, scriptArgs...))
...
job, err := newJob(rawJSON, dequeuedFrom, inProgQueue)
..
return job, nil
}
Worker handle Job.
Pool.JobWithOptions(InstallMasterJob, work.JobOptions{Priority: 1, MaxFails: 1}, ConsumeJob)
workpool注冊(cè)任務(wù)ConsumeJob后, 該任務(wù)ConsumeJob會(huì)被賦值給worker.jobTypes[job.Name].GenericHandler, 他的反射類型被賦值給了jobType.DynamicHandler。如果該消費(fèi)任務(wù)使用了上下文參數(shù)。
創(chuàng)建消費(fèi)任務(wù)的2種方法
• If you don't need context:
func YourFunctionName(job *work.Job) error.
• If you want your handler to accept a context:
func (c *Context) YourFunctionName(job *work.Job) error // or,
func YourFunctionName(c *Context, job *work.Job) error.
func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, fn interface{}) *WorkerPool {
jobOpts = applyDefaultsAndValidate(jobOpts)
vfn := reflect.ValueOf(fn)
validateHandlerType(wp.contextType, vfn)
jt := &jobType{
Name: name,
//vfn 任務(wù)消費(fèi)方法的反射類型, 如果消費(fèi)方法中有ctx 參數(shù),那么會(huì)調(diào)用反射執(zhí)行
DynamicHandler: vfn,
JobOptions: jobOpts,
}
if gh, ok := fn.(func(*Job) error); ok {
// 用戶的任務(wù)消費(fèi)函數(shù),被賦值給了jobType的GenericHandler, 如果消費(fèi)方法只有一個(gè)job參數(shù),則執(zhí)行GenericHandler
jt.IsGeneric = true
jt.GenericHandler = gh
}
wp.jobTypes[name] = jt
for _, w := range wp.workers {
w.updateMiddlewareAndJobTypes(wp.middleware, wp.jobTypes)
}
return wp
}
執(zhí)行消費(fèi)任務(wù)的真正代碼
worker對(duì)象的processJob(job * Job)方法 調(diào)用了runJob方法執(zhí)行GenericHandler or DynamicHandler.Call。
func runJob(job *Job, ctxType reflect.Type, middleware []*middlewareHandler, jt *jobType) (returnCtx reflect.Value, returnError error) {
。。。。
next = func() error {
。。。。
if jt.IsGeneric {
// 任務(wù)消費(fèi)方法沒(méi)有ctx時(shí)候執(zhí)行
return jt.GenericHandler(job)
}
// 任務(wù)消費(fèi)方法有ctx時(shí)執(zhí)行
res := jt.DynamicHandler.Call([]reflect.Value{returnCtx, reflect.ValueOf(job)})
x := res[0].Interface()
if x == nil {
return nil
}
return x.(error)
}
...
returnError = next()
return
}
LStack產(chǎn)品簡(jiǎn)介
面向行業(yè)應(yīng)用開(kāi)發(fā)商(ISV/SI)提供混合云/邊緣云場(chǎng)景下云原生應(yīng)用開(kāi)發(fā)測(cè)試、交付、運(yùn)維一站式服務(wù),幫助企業(yè)采用云原生敏捷開(kāi)發(fā)交付方法論,從而提高軟件開(kāi)發(fā)人員效率、減少運(yùn)維成本,加快數(shù)字化轉(zhuǎn)型,并最終實(shí)現(xiàn)業(yè)務(wù)創(chuàng)新。