簡單Go語言延遲隊列思路:
package mAIn
import (
"fmt"
"time"
)
// Message is a structure that holds the contents of the message.
type Message struct {
ID int
Body string
Delay time.Duration
}
// delayMessageQueue holds the queue of messages
type delayMessageQueue struct {
queue []Message
}
func (d *delayMessageQueue) send(msg Message) {
go func() {
timer := time.NewTimer(msg.Delay)
<-timer.C
d.queue = Append(d.queue, msg)
fmt.Printf("Message %d sentn", msg.ID)
}()
}
func (d *delayMessageQueue) receive() {
for {
if len(d.queue) > 0 {
msg := d.queue[0]
fmt.Printf("Message %d received n", msg.ID)
d.queue = d.queue[1:] // Dequeue
}
end
}
func main() {
dmq := &delayMessageQueue{}
dmq.send(Message{ID: 1, Body: "Hello, World", Delay: 2 * time.Second})
dmq.send(Message{ID: 2, Body: "Hello, Go", Delay: 1 * time.Second})
// Keep the main function alive to let goroutines finish
time.Sleep(5 * time.Second)
dmq.receive()
}
我們定義了一個消息結構,包含消息ID、消息內容和延遲。我們也定義了一個延遲消息隊列,它有兩個方法,一個發送消息,一個接收消息。
發送方法將消息放入一個goroutine中,然后用一個定時器等待指定的延遲時間。當定時器到達時,消息會添加到隊列中。
接收方法將持續檢查隊列,一旦發現隊列中有消息,就會打印消息內容并將其從隊列中移除。通過time到達指定的時間然后進行發送。每個goroutine都有自己的定時器,這是非常低效的。實際上,我們應該使用一個最小堆維護所有的定時器,并且只有一個goroutine在阻塞等待最早的定時器。如果有更早的定時器插入,喚醒那個goroutine并阻塞等待新的最早的定時器。
進階版本
其中所有延遲任務都由一個優先隊列(最小堆)進行維護,取出最早到達的任務進行處理。我給出一個簡版的示例,其他更多的細節如并發控制、錯誤處理等,您可以在實際開發中完善。
Go語言標準庫container/heap
提供了堆操作的實現,通過組合使用該包提供的heap.Push
和heap.Pop
,可以實現一個優先隊列。
package main
import (
"container/heap"
"fmt"
"sync"
"time"
)
// 優先隊列內部維護的隊列元素
type Item struct {
value string
priority int64 // 延遲任務到期時間(用時間戳表示)
index int // 隊列元素在堆中的索引
}
// 優先隊列:底層用最小堆實現
type PriorityQueue []*Item
func (pq PriorityQueue) Len() int { return len(pq) }
// 注意這里比較的是任務的優先級,為了讓離當前時間最近的任務在堆頂,我們讓比較結果顛倒
func (pq PriorityQueue) Less(i, j int) bool {
return pq[i].priority < pq[j].priority
}
func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
// 插入元素
func (pq *PriorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*Item)
item.index = n
*pq = append(*pq, item)
}
// 刪除元素
func (pq *PriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}
var mutex sync.Mutex // 并發控制互斥鎖
// 設定好比cmpTime更晚的任務執行時間并加入堆
func addTask(pq *PriorityQueue, cmpTime int64, diff int64) {
mutex.Lock() // 加鎖
taskTime := cmpTime + diff*int64(time.Second)
item := &Item{
value: fmt.Sprintf("任務%d", taskTime),
priority: taskTime,
}
heap.Push(pq, item)
mutex.Unlock() // 解鎖
}
// 從堆上取出時間最早的任務執行
func doTask(pq *PriorityQueue) {
for {
mutex.Lock()
if len(*pq) == 0 {
mutex.Unlock()
continue // 堆空則不處理
}
// 堆非空,取出最早任務項
item := heap.Pop(pq).(*Item)
now := time.Now().Unix()
if item.priority-now > 0 {
// 未到執行時間,任務重新入堆
heap.Push(pq, item)
} else {
// 執行任務
fmt.Printf("%s 執行n", item.value)
}
mutex.Unlock()
// 防止doTask過于緊密,每次循環停頓1秒
time.Sleep(1 * time.Second)
}
}
func main() {
pq := make(PriorityQueue, 0)
heap.Init(&pq)
// 啟動執行任務goroutine
go doTask(&pq)
now := time.Now().Unix()
// 預設3個延遲任務,延遲時間分別為8s, 2s, 5s
addTask(&pq, now, 8)
addTask(&pq, now, 2)
addTask(&pq, now, 5)
// 保持主進程
time.Sleep(15 * time.Second)
}
在這個實現中,我們創建一個優先隊列(Priority Queue),優先級最高的任務(即最早執行的任務)總是位于堆的頂部,這樣我們可以確保總是首先處理最早執行的任務。當新的任務到來或者任務完成時,我們都用heap.Push
和heap.Pop
重新調整堆以保證最早執行的任務總是在堆頂。
關于更詳細的并發控制以及錯誤處理,這需要根據實際的業務需求進行對應的修改和處理。