文章总结: 文档详细介绍了在Go语言中使用RedisZSET实现延时任务队列的技术方案。核心方案是将任务执行时间戳作为score存入有序集合,通过消费者轮询获取到期任务。关键点包括使用Lua脚本实现原子化任务获取与删除、业务层幂等性校验、退避重试机制以及死信队列处理。文章强调需结合数据库状态补偿机制确保可靠性,适用于订单超时等中等精度延时场景。 综合评分: 85 文章分类: 安全开发,解决方案,技术标准
go语言里用Redis 实现延时任务队列,如何延时?
原创
go go
Go语言教程
2026年6月22日 13:14 陕西
在小说阅读器读本章
去阅读
订单创建成功了,30 分钟后没支付要自动关单。
这事如果直接起一个 time.AfterFunc,我第一眼就不太信。服务一重启,内存里的定时器全没了;实例一多,同一个订单可能被关两次;任务量一上来,堆里挂一堆 timer,排查问题也不好看。
这种延时任务,我一般会先拿 Redis 的 ZSET 顶一下。
不是因为它多高级,而是够直接。
延时靠什么?
靠 score。
把任务丢进 Redis 的有序集合里,score 放任务应该执行的时间戳。消费者不停捞 score <= 当前时间 的任务,捞到就执行。
大概长这样:
ZADD delay:order 1735219800000 close_order:10086
这个任务的意思就是:到 1735219800000 这个毫秒时间后,订单 10086 可以处理了。
真正写 Go 的时候,我不会只塞一个订单号进去,至少要带任务 ID、业务类型、重试次数。线上排障时,日志里只有一个订单号,后面会骂自己。
生产任务大概这样写:
type DelayJob struct {
ID string `json:"id"`
Topic string `json:"topic"`
BizID string `json:"biz_id"`
Retry int `json:"retry"`
CreatedAt int64 `json:"created_at"`
}
func PushCloseOrderJob(ctx context.Context, rdb *redis.Client, orderID string, delay time.Duration) error {
now := time.Now()
job := DelayJob{
ID: "close_order:" + orderID,
Topic: "order.close.timeout",
BizID: orderID,
Retry: 0,
CreatedAt: now.UnixMilli(),
}
body, err := json.Marshal(job)
if err != nil {
return err
}
runAt := now.Add(delay).UnixMilli()
return rdb.ZAdd(ctx, "dq:order", redis.Z{
Score: float64(runAt),
Member: string(body),
}).Err()
}
这里有个小问题:Member 用 JSON,后面删除的时候必须原样删除。JSON 字段顺序别乱搞,所以任务入队之后别重新 Marshal 再删。
有人会写两步:
ZRANGEBYSCORE dq:order 0 now LIMIT 0 1
ZREM dq:order job
这个写法在单消费者时没啥事,一上多实例就容易出妖怪。两个消费者同时拿到同一个任务,然后都去执行业务,谁先删掉不一定。
关单这种业务还好,最终查一下订单状态能兜住。要是发券、退款、扣库存,这种重复消费你试试,账对不上时人会很安静。
我一般用 Lua 把“取出来”和“删掉”放到一个原子操作里。
var popDueJob = redis.NewScript(`
local key = KEYS[1]
local now = ARGV[1]
local jobs = redis.call("ZRANGEBYSCORE", key, "-inf", now, "LIMIT", 0, 1)
if #jobs == 0 then
return nil
end
local removed = redis.call("ZREM", key, jobs[1])
if removed == 1 then
return jobs[1]
end
return nil
`)
消费者代码也别写得太“优雅”,延时队列这种东西,最怕日志不够。你要知道它这轮有没有捞到任务,任务执行失败后有没有重新投递,下一次什么时候再跑。
func StartDelayWorker(ctx context.Context, rdb *redis.Client) {
for {
select {
case <-ctx.Done():
return
default:
}
raw, err := popDueJob.Run(ctx, rdb, []string{"dq:order"}, time.Now().UnixMilli()).Text()
if err == redis.Nil {
time.Sleep(200 * time.Millisecond)
continue
}
if err != nil {
log.Printf("delay_queue pop error: %v", err)
time.Sleep(time.Second)
continue
}
var job DelayJob
if err := json.Unmarshal([]byte(raw), &job); err != nil {
log.Printf("delay_queue bad job raw=%s err=%v", raw, err)
continue
}
if err := handleJob(ctx, rdb, job); err != nil {
log.Printf("delay_queue handle fail job_id=%s biz_id=%s retry=%d err=%v",
job.ID, job.BizID, job.Retry, err)
_ = retryLater(ctx, rdb, job, err)
continue
}
log.Printf("delay_queue done job_id=%s topic=%s biz_id=%s",
job.ID, job.Topic, job.BizID)
}
}
业务处理这里,不要上来就关单。
先查状态。
func handleJob(ctx context.Context, rdb *redis.Client, job DelayJob) error {
switch job.Topic {
case "order.close.timeout":
return closeOrderIfUnpaid(ctx, job.BizID)
default:
return fmt.Errorf("unknown topic: %s", job.Topic)
}
}
func closeOrderIfUnpaid(ctx context.Context, orderID string) error {
order, err := loadOrder(ctx, orderID)
if err != nil {
return err
}
if order.Status != "WAIT_PAY" {
log.Printf("close_order skip order_id=%s status=%s", orderID, order.Status)
return nil
}
return markOrderClosed(ctx, orderID)
}
这句判断很土,但是救命。
延时队列只能保证“到点后把任务拿出来执行”,不能保证业务一定只执行一次。真正的一次性,要靠业务幂等。比如关单时带状态条件:
UPDATE t_order
SET status = 'CLOSED', closed_at = NOW()
WHERE order_id = ?
AND status = 'WAIT_PAY';
影响行数是 1,说明这次真关了。
影响行数是 0,要么已经支付,要么已经被别的消费者关了。别硬报错。
失败重试也别立刻塞回去,否则 Redis 和业务系统都被你自己打爆。简单点,退避重试:
func retryLater(ctx context.Context, rdb *redis.Client, job DelayJob, cause error) error {
if job.Retry >= 5 {
log.Printf("delay_queue dead job_id=%s biz_id=%s err=%v", job.ID, job.BizID, cause)
return rdb.LPush(ctx, "dq:order:dead", mustJSON(job)).Err()
}
job.Retry++
wait := time.Duration(job.Retry*job.Retry) * time.Second
nextAt := time.Now().Add(wait).UnixMilli()
return rdb.ZAdd(ctx, "dq:order", redis.Z{
Score: float64(nextAt),
Member: mustJSON(job),
}).Err()
}
func mustJSON(job DelayJob) string {
b, _ := json.Marshal(job)
return string(b)
}
这里有个取舍。
这种 Redis 延时队列,适合订单超时关闭、短信补偿、状态轮询、轻量级回调重试。任务量不是特别夸张,延时精度要求在几百毫秒到几秒,完全够用。
但它不是银弹。
消费者是轮询的,间隔太小,Redis QPS 会上去;间隔太大,任务执行就会有抖动。ZSET 里任务特别多时,也要拆 key,比如按业务拆、按时间片拆,不然一个大 key 后面迁移和排查都难受。
还有一点,Redis 挂了怎么办?
别装没看见。重要任务最好在业务库里也留一份状态。Redis 更像调度器,不要让它变成唯一账本。订单这种场景,库里本来就有订单状态,延时任务丢了,还可以靠定时补偿扫一遍:
SELECT order_id
FROM t_order
WHERE status = 'WAIT_PAY'
AND created_at < NOW() - INTERVAL '30 minutes'
LIMIT 200;
这条补偿 SQL 不一定天天跑出数据,但线上留着心里踏实。
最后再把这个方案压扁看一眼:
任务入队时,ZADD,score 写执行时间。
消费者执行时,Lua 脚本原子取到期任务并删除。
业务处理时,先查状态,再用条件更新兜幂等。
失败时,按退避时间重新 ZADD。
处理不了的,丢死信队列,别无限重试。
延时任务难的不是“怎么延时”,Redis 的 ZSET 已经把这事做得很直白了。难的是重复消费、任务丢失、服务重启、失败重试这些边角。代码写到这些地方,才算真能上线。
免责声明:
本文所载程序、技术方法仅面向合法合规的安全研究与教学场景,旨在提升网络安全防护能力,具有明确的技术研究属性。
任何单位或个人未经授权,将本文内容用于攻击、破坏等非法用途的,由此引发的全部法律责任、民事赔偿及连带责任,均由行为人独立承担,本站不承担任何连带责任。
本站内容均为技术交流与知识分享目的发布,若存在版权侵权或其他异议,请通过邮件联系处理,具体联系方式可点击页面上方的联系我。
本文转载自:Go语言教程 go go《go语言里用Redis 实现延时任务队列,如何延时?》
版权声明
本站仅做备份收录,仅供研究与教学参考之用。
读者将信息用于其他用途的,全部法律及连带责任由读者自行承担,本站不承担任何责任。









评论