用Redis实现延迟队列

代码地址

思路

生产者发送一个定时任务到sorted set队列,score为未来的执行时间;这样队列第一个数据就是最近要执行的(消费)的任务

消费者定时(一个很短的时间间隔)去队列取出第一个任务,看是否到执行时间(score), 如果到了就执行任务,并将这个任务从队列删掉。

实现

任务

1
2
3
4
5
6
7
// DTask the delay task
type DTask struct {
ID string `json:"id"`
Payload string `json:"payload"`
CreateTime int64 `json:"create_time"`
ExecuteTime int64 `json:"execute_time"`
}

生产者

1
2
3
4
5
// Producer is delay task producer
type Producer interface {
// ProduceTask enqueue a delay task
ProduceTask(*redis.Client, *DTask) error
}

消费者

1
2
3
4
5
// Consumer is the consumer interface
type Consumer interface {
// Consume handle task from redis sorted set
Consume(*redis.Client)
}

发送任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// ProduceTask create a delay task(redis sorted set)
func (p *DefaultProducer) ProduceTask(client *redis.Client, task *DTask) error {
pipe := client.Pipeline()
defer pipe.Close()

key := fmt.Sprintf(DelayTaskKey, task.ID)
if err := pipe.HMSet(key, structs.Map(task)).Err(); err != nil {
return err
}

// 创建任务到sorted set队列,分数为执行时间
z := redis.Z{Member: task.ID, Score: float64(task.ExecuteTime)}
if err := pipe.ZAdd(DelayTaskListKey, z).Err(); err != nil {
return err
}

if _, err := pipe.Exec(); err != nil {
return pipe.Discard()
}

return nil
}

消费任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Consume default consume the delay task
func (c *DefaultConsumer) Consume(client *redis.Client) {
ticker := time.NewTicker(c.interval)
done := make(chan struct{})
quit := make(chan os.Signal)
signal.Notify(quit, os.Interrupt)

go func() {
for {
select {
case <-ticker.C:
if err := c.handleTask(client); err != nil {
log.Println("DefaultConsumer Consumer handleTask error: ", err)
}
case <-quit:
ticker.Stop()
done <- struct{}{}
return
}
}
}()

<-done
}