日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

「技術(shù)干貨」gocraft/work工作隊(duì)列源碼簡(jiǎn)介

 

作者/蔡錫生

 

簡(jiǎn)介

 

gocraft/work是一款使用go開(kāi)發(fā)的任務(wù)處理軟件,通過(guò)redis存儲(chǔ)任務(wù)隊(duì)列,可以使用工作池同時(shí)處理多個(gè)任務(wù)。本文主要介紹任務(wù)注冊(cè)和任務(wù)消費(fèi)的源代碼。

 

功能特性

 

• Fast and efficient. Faster than this, this, and this. See below for benchmarks.

 

• Reliable - don't lose jobs even if your process crashes.

 

• Middleware on jobs -- good for metrics instrumentation, logging, etc.

 

• If a job fails, it will be retried a specified number of times.

 

• Schedule jobs to hAppen in the future.

 

• Enqueue unique jobs so that only one job with a given name/arguments exists in the queue at once.

 

• Web UI to manage failed jobs and observe the system.

 

• Periodically enqueue jobs on a cron-like schedule.

 

• Pause / unpause jobs and control concurrency within and across processes.

 

注冊(cè)Job

 

注冊(cè)Job流程

 

1. 創(chuàng)建redis client pool。

 

2. 創(chuàng)建對(duì)象,定義任務(wù)處理函數(shù)。

 

3. 創(chuàng)建任務(wù)工作池,需要傳入被處理對(duì)象結(jié)構(gòu)體,最大并發(fā)數(shù),命名空間,redis client pool。

 

4. 創(chuàng)建Job,需要傳入job名稱和job處理函數(shù), job在redis中使用列表存儲(chǔ),key的組成:nameSapce:job:jobName,同一namespace支持多種類型任務(wù)處理。

 

這里使用任務(wù)名稱作為key存入redis, 任務(wù)處理參數(shù)存放到列表中

 

func main() {
  // Make a new pool. Arguments:
  // Context{} is a struct that will be the context for the request.
  // 10 is the max concurrency
  // "my_app_namespace" is the Redis namespace
  // redisPool is a Redis pool
  pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool)

  // Add middleware that will be executed for each job
  pool.Middleware((*Context).Log)

  // Map the name of jobs to handler functions
  // pool 中的 jobTypes是一個(gè)字典,key 是任務(wù)名稱, value 是 任務(wù)處理函數(shù)
  // 當(dāng)有任務(wù)的時(shí)候,會(huì)將任務(wù)需要的參數(shù) 放入到redis key 為jobName的列表中
  // 第二個(gè)參數(shù)必須是 工作池對(duì)象的方法
  pool.Job("send_email", (*Context).SendEmail)

  // Customize options:
  pool.JobWithOptions("export", work.JobOptions{Priority: 10, MaxFails: 1}, (*Context).Export)

  // Start processing jobs
  pool.Start()
  ...
}

 

發(fā)送Job

 

發(fā)送job其實(shí)調(diào)用NewEnqueuer方法向redis的列表中壓入元素(具體的內(nèi)容是任務(wù)參數(shù))。

 

package main

import (
  "github.com/gomodule/redigo/redis"
  "github.com/gocraft/work"
)

// Make a redis pool
var redisPool = &redis.Pool{
  MaxActive: 5,
  MaxIdle: 5,
  Wait: true,
  Dial: func() (redis.Conn, error) {
    return redis.Dial("tcp", ":6379")
  },
}

// Make an enqueuer with a particular namespace
var enqueuer = work.NewEnqueuer("my_app_namespace", redisPool)

func main() {
  // Enqueue a job named "send_email" with the specified parameters.
  _, err := enqueuer.Enqueue("send_email", work.Q{"address": "test@example.com", "subject": "hello world", "customer_id": 4})
  if err != nil {
    log.Fatal(err)
  }
}

 

Woker Fetch Job

 

在New WrokPool的時(shí)候會(huì)根據(jù)并法參數(shù)concurrency,創(chuàng)建同等個(gè)數(shù)的woker。

 

Worker是一個(gè)job處理者,通過(guò)永久for循環(huán),不間斷的從redis的任務(wù)隊(duì)列中獲取任務(wù),在處理任務(wù)的時(shí)候,協(xié)程阻塞,等待一個(gè)任務(wù)處理完,再繼續(xù)下一個(gè)。

 

下面的代碼是worker在for循環(huán)中的重要操作(1) fetch job (2) process job

 

func (w *worker) loop() {
  for {
    select {
     。。。
    case <-timer.C:
      job, err := w.fetchJob()
      w.process(job)
    }
  }
}

 

fetchJob本質(zhì)是redis的pop,push操作。首先將redis列表中的任務(wù)移除,然后再放入到處理隊(duì)列中,這個(gè)操作必須是原子操作(原子性是指事務(wù)是一個(gè)不可再分割的工作單元,事務(wù)中的操作要么都發(fā)生,要么都不發(fā)生),作者使用了lua腳本完成。最后返回一個(gè)job對(duì)象,里面有后面任務(wù)處理函數(shù)需要的args,即這里的rawJson。

 

func (w *worker) fetchJob() (*Job, error) {

   scriptArgs = append(scriptArgs, w.poolID) // ARGV[1]
  ...
   values, err := redis.Values(w.redisFetchScript.Do(conn, scriptArgs...))
    ...
   job, err := newJob(rawJSON, dequeuedFrom, inProgQueue)
    ..
   return job, nil
}

 

Worker handle Job.

 

Pool.JobWithOptions(InstallMasterJob, work.JobOptions{Priority: 1, MaxFails: 1}, ConsumeJob)

 

workpool注冊(cè)任務(wù)ConsumeJob后, 該任務(wù)ConsumeJob會(huì)被賦值給worker.jobTypes[job.Name].GenericHandler, 他的反射類型被賦值給了jobType.DynamicHandler。如果該消費(fèi)任務(wù)使用了上下文參數(shù)。

 

創(chuàng)建消費(fèi)任務(wù)的2種方法

 

• If you don't need context:

 

func YourFunctionName(job *work.Job) error.

 

• If you want your handler to accept a context:

 

func (c *Context) YourFunctionName(job *work.Job) error // or,

 

func YourFunctionName(c *Context, job *work.Job) error.

 

func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, fn interface{}) *WorkerPool {
  jobOpts = applyDefaultsAndValidate(jobOpts)

  vfn := reflect.ValueOf(fn)
  validateHandlerType(wp.contextType, vfn)
  jt := &jobType{
    Name:           name,
    //vfn 任務(wù)消費(fèi)方法的反射類型, 如果消費(fèi)方法中有ctx 參數(shù),那么會(huì)調(diào)用反射執(zhí)行
    DynamicHandler: vfn, 
    JobOptions:     jobOpts,
  }
  if gh, ok := fn.(func(*Job) error); ok {
    // 用戶的任務(wù)消費(fèi)函數(shù),被賦值給了jobType的GenericHandler, 如果消費(fèi)方法只有一個(gè)job參數(shù),則執(zhí)行GenericHandler
    jt.IsGeneric = true
    jt.GenericHandler = gh
  }

  wp.jobTypes[name] = jt

  for _, w := range wp.workers {
    w.updateMiddlewareAndJobTypes(wp.middleware, wp.jobTypes)
  }

  return wp
}

 

執(zhí)行消費(fèi)任務(wù)的真正代碼

 

worker對(duì)象的processJob(job * Job)方法 調(diào)用了runJob方法執(zhí)行GenericHandler or DynamicHandler.Call。

 

func runJob(job *Job, ctxType reflect.Type, middleware []*middlewareHandler, jt *jobType) (returnCtx reflect.Value, returnError error) {
  。。。。
  next = func() error {
    。。。。
    if jt.IsGeneric {
      // 任務(wù)消費(fèi)方法沒(méi)有ctx時(shí)候執(zhí)行
      return jt.GenericHandler(job)
    }
    
    // 任務(wù)消費(fèi)方法有ctx時(shí)執(zhí)行
    res := jt.DynamicHandler.Call([]reflect.Value{returnCtx, reflect.ValueOf(job)})
    x := res[0].Interface()
    if x == nil {
      return nil
    }
    return x.(error)
  }
  ...
  returnError = next()

  return
}

 

LStack產(chǎn)品簡(jiǎn)介

 

面向行業(yè)應(yīng)用開(kāi)發(fā)商(ISV/SI)提供混合云/邊緣云場(chǎng)景下云原生應(yīng)用開(kāi)發(fā)測(cè)試、交付、運(yùn)維一站式服務(wù),幫助企業(yè)采用云原生敏捷開(kāi)發(fā)交付方法論,從而提高軟件開(kāi)發(fā)人員效率、減少運(yùn)維成本,加快數(shù)字化轉(zhuǎn)型,并最終實(shí)現(xiàn)業(yè)務(wù)創(chuàng)新。

分享到:
標(biāo)簽:gocraft work
用戶無(wú)頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過(guò)答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定