go语言里用Redis实现延时任务队列,如何延时?

admin 2026-06-24 05:05:28 网络安全文章 来源:ZONE.CI 全球网 0 阅读模式

文章总结: 文档详细介绍了在Go语言中使用RedisZSET实现延时任务队列的技术方案。核心方案是将任务执行时间戳作为score存入有序集合,通过消费者轮询获取到期任务。关键点包括使用Lua脚本实现原子化任务获取与删除、业务层幂等性校验、退避重试机制以及死信队列处理。文章强调需结合数据库状态补偿机制确保可靠性,适用于订单超时等中等精度延时场景。 综合评分: 85 文章分类: 安全开发,解决方案,技术标准


cover_image

go语言里用Redis 实现延时任务队列,如何延时?

原创

go go

Go语言教程

2026年6月22日 13:14 陕西

在小说阅读器读本章

去阅读

订单创建成功了,30 分钟后没支付要自动关单。

这事如果直接起一个 time.AfterFunc,我第一眼就不太信。服务一重启,内存里的定时器全没了;实例一多,同一个订单可能被关两次;任务量一上来,堆里挂一堆 timer,排查问题也不好看。

这种延时任务,我一般会先拿 Redis 的 ZSET 顶一下。

不是因为它多高级,而是够直接。

延时靠什么?

靠 score

把任务丢进 Redis 的有序集合里,score 放任务应该执行的时间戳。消费者不停捞 score <= 当前时间 的任务,捞到就执行。

大概长这样:

ZADD delay:order 1735219800000 close_order:10086

这个任务的意思就是:到 1735219800000 这个毫秒时间后,订单 10086 可以处理了。

真正写 Go 的时候,我不会只塞一个订单号进去,至少要带任务 ID、业务类型、重试次数。线上排障时,日志里只有一个订单号,后面会骂自己。

生产任务大概这样写:

type&nbsp;DelayJob&nbsp;struct&nbsp;{
&nbsp;ID &nbsp; &nbsp; &nbsp; &nbsp;string&nbsp;`json:"id"`
&nbsp;Topic &nbsp; &nbsp;&nbsp;string&nbsp;`json:"topic"`
&nbsp;BizID &nbsp; &nbsp;&nbsp;string&nbsp;`json:"biz_id"`
&nbsp;Retry &nbsp; &nbsp;&nbsp;int&nbsp; &nbsp;&nbsp;`json:"retry"`
&nbsp;CreatedAt&nbsp;int64&nbsp;&nbsp;`json:"created_at"`
}

func&nbsp;PushCloseOrderJob(ctx context.Context, rdb *redis.Client, orderID&nbsp;string, delay time.Duration)&nbsp;error&nbsp;{
&nbsp;now := time.Now()
&nbsp;job := DelayJob{
&nbsp; ID: &nbsp; &nbsp; &nbsp; &nbsp;"close_order:"&nbsp;+ orderID,
&nbsp; Topic: &nbsp; &nbsp;&nbsp;"order.close.timeout",
&nbsp; BizID: &nbsp; &nbsp; orderID,
&nbsp; Retry: &nbsp; &nbsp;&nbsp;0,
&nbsp; CreatedAt: now.UnixMilli(),
&nbsp;}

&nbsp;body, err := json.Marshal(job)
&nbsp;if&nbsp;err !=&nbsp;nil&nbsp;{
&nbsp;&nbsp;return&nbsp;err
&nbsp;}

&nbsp;runAt := now.Add(delay).UnixMilli()

&nbsp;return&nbsp;rdb.ZAdd(ctx,&nbsp;"dq:order", redis.Z{
&nbsp; Score: &nbsp;float64(runAt),
&nbsp; Member:&nbsp;string(body),
&nbsp;}).Err()
}

这里有个小问题:Member 用 JSON,后面删除的时候必须原样删除。JSON 字段顺序别乱搞,所以任务入队之后别重新 Marshal 再删。

有人会写两步:

ZRANGEBYSCORE dq:order&nbsp;0&nbsp;now LIMIT&nbsp;0&nbsp;1
ZREM dq:order job

这个写法在单消费者时没啥事,一上多实例就容易出妖怪。两个消费者同时拿到同一个任务,然后都去执行业务,谁先删掉不一定。

关单这种业务还好,最终查一下订单状态能兜住。要是发券、退款、扣库存,这种重复消费你试试,账对不上时人会很安静。

我一般用 Lua 把“取出来”和“删掉”放到一个原子操作里。

var&nbsp;popDueJob = redis.NewScript(`
local key = KEYS[1]
local now = ARGV[1]

local jobs = redis.call("ZRANGEBYSCORE", key, "-inf", now, "LIMIT", 0, 1)
if&nbsp;#jobs&nbsp;== 0 then
&nbsp; &nbsp; return nil
end

local removed = redis.call("ZREM", key, jobs[1])
if removed == 1 then
&nbsp; &nbsp; return jobs[1]
end

return nil
`)

消费者代码也别写得太“优雅”,延时队列这种东西,最怕日志不够。你要知道它这轮有没有捞到任务,任务执行失败后有没有重新投递,下一次什么时候再跑。

func&nbsp;StartDelayWorker(ctx context.Context, rdb *redis.Client)&nbsp;{
&nbsp;for&nbsp;{
&nbsp;&nbsp;select&nbsp;{
&nbsp;&nbsp;case&nbsp;<-ctx.Done():
&nbsp; &nbsp;return
&nbsp;&nbsp;default:
&nbsp; }

&nbsp; raw, err := popDueJob.Run(ctx, rdb, []string{"dq:order"}, time.Now().UnixMilli()).Text()
&nbsp;&nbsp;if&nbsp;err == redis.Nil {
&nbsp; &nbsp;time.Sleep(200&nbsp;* time.Millisecond)
&nbsp; &nbsp;continue
&nbsp; }
&nbsp;&nbsp;if&nbsp;err !=&nbsp;nil&nbsp;{
&nbsp; &nbsp;log.Printf("delay_queue pop error: %v", err)
&nbsp; &nbsp;time.Sleep(time.Second)
&nbsp; &nbsp;continue
&nbsp; }

&nbsp;&nbsp;var&nbsp;job DelayJob
&nbsp;&nbsp;if&nbsp;err := json.Unmarshal([]byte(raw), &job); err !=&nbsp;nil&nbsp;{
&nbsp; &nbsp;log.Printf("delay_queue bad job raw=%s err=%v", raw, err)
&nbsp; &nbsp;continue
&nbsp; }

&nbsp;&nbsp;if&nbsp;err := handleJob(ctx, rdb, job); err !=&nbsp;nil&nbsp;{
&nbsp; &nbsp;log.Printf("delay_queue handle fail job_id=%s biz_id=%s retry=%d err=%v",
&nbsp; &nbsp; job.ID, job.BizID, job.Retry, err)

&nbsp; &nbsp;_ = retryLater(ctx, rdb, job, err)
&nbsp; &nbsp;continue
&nbsp; }

&nbsp; log.Printf("delay_queue done job_id=%s topic=%s biz_id=%s",
&nbsp; &nbsp;job.ID, job.Topic, job.BizID)
&nbsp;}
}

业务处理这里,不要上来就关单。

先查状态。

func&nbsp;handleJob(ctx context.Context, rdb *redis.Client, job DelayJob)&nbsp;error&nbsp;{
&nbsp;switch&nbsp;job.Topic {
&nbsp;case&nbsp;"order.close.timeout":
&nbsp;&nbsp;return&nbsp;closeOrderIfUnpaid(ctx, job.BizID)
&nbsp;default:
&nbsp;&nbsp;return&nbsp;fmt.Errorf("unknown topic: %s", job.Topic)
&nbsp;}
}

func&nbsp;closeOrderIfUnpaid(ctx context.Context, orderID&nbsp;string)&nbsp;error&nbsp;{
&nbsp;order, err := loadOrder(ctx, orderID)
&nbsp;if&nbsp;err !=&nbsp;nil&nbsp;{
&nbsp;&nbsp;return&nbsp;err
&nbsp;}

&nbsp;if&nbsp;order.Status !=&nbsp;"WAIT_PAY"&nbsp;{
&nbsp; log.Printf("close_order skip order_id=%s status=%s", orderID, order.Status)
&nbsp;&nbsp;return&nbsp;nil
&nbsp;}

&nbsp;return&nbsp;markOrderClosed(ctx, orderID)
}

这句判断很土,但是救命。

延时队列只能保证“到点后把任务拿出来执行”,不能保证业务一定只执行一次。真正的一次性,要靠业务幂等。比如关单时带状态条件:

UPDATE&nbsp;t_order
SET&nbsp;status&nbsp;=&nbsp;'CLOSED', closed_at =&nbsp;NOW()
WHERE&nbsp;order_id = ?
&nbsp;&nbsp;AND&nbsp;status&nbsp;=&nbsp;'WAIT_PAY';

影响行数是 1,说明这次真关了。

影响行数是 0,要么已经支付,要么已经被别的消费者关了。别硬报错。

失败重试也别立刻塞回去,否则 Redis 和业务系统都被你自己打爆。简单点,退避重试:

func&nbsp;retryLater(ctx context.Context, rdb *redis.Client, job DelayJob, cause error)&nbsp;error&nbsp;{
&nbsp;if&nbsp;job.Retry >=&nbsp;5&nbsp;{
&nbsp; log.Printf("delay_queue dead job_id=%s biz_id=%s err=%v", job.ID, job.BizID, cause)
&nbsp;&nbsp;return&nbsp;rdb.LPush(ctx,&nbsp;"dq:order:dead", mustJSON(job)).Err()
&nbsp;}

&nbsp;job.Retry++

&nbsp;wait := time.Duration(job.Retry*job.Retry) * time.Second
&nbsp;nextAt := time.Now().Add(wait).UnixMilli()

&nbsp;return&nbsp;rdb.ZAdd(ctx,&nbsp;"dq:order", redis.Z{
&nbsp; Score: &nbsp;float64(nextAt),
&nbsp; Member: mustJSON(job),
&nbsp;}).Err()
}

func&nbsp;mustJSON(job DelayJob)&nbsp;string&nbsp;{
&nbsp;b, _ := json.Marshal(job)
&nbsp;return&nbsp;string(b)
}

这里有个取舍。

这种 Redis 延时队列,适合订单超时关闭、短信补偿、状态轮询、轻量级回调重试。任务量不是特别夸张,延时精度要求在几百毫秒到几秒,完全够用。

但它不是银弹。

消费者是轮询的,间隔太小,Redis QPS 会上去;间隔太大,任务执行就会有抖动。ZSET 里任务特别多时,也要拆 key,比如按业务拆、按时间片拆,不然一个大 key 后面迁移和排查都难受。

还有一点,Redis 挂了怎么办?

别装没看见。重要任务最好在业务库里也留一份状态。Redis 更像调度器,不要让它变成唯一账本。订单这种场景,库里本来就有订单状态,延时任务丢了,还可以靠定时补偿扫一遍:

SELECT&nbsp;order_id
FROM&nbsp;t_order
WHERE&nbsp;status&nbsp;=&nbsp;'WAIT_PAY'
&nbsp;&nbsp;AND&nbsp;created_at <&nbsp;NOW() -&nbsp;INTERVAL&nbsp;'30 minutes'
LIMIT&nbsp;200;

这条补偿 SQL 不一定天天跑出数据,但线上留着心里踏实。

最后再把这个方案压扁看一眼:

任务入队时,ZADD,score 写执行时间。

消费者执行时,Lua 脚本原子取到期任务并删除。

业务处理时,先查状态,再用条件更新兜幂等。

失败时,按退避时间重新 ZADD

处理不了的,丢死信队列,别无限重试。

延时任务难的不是“怎么延时”,Redis 的 ZSET 已经把这事做得很直白了。难的是重复消费、任务丢失、服务重启、失败重试这些边角。代码写到这些地方,才算真能上线。


免责声明:

本文所载程序、技术方法仅面向合法合规的安全研究与教学场景,旨在提升网络安全防护能力,具有明确的技术研究属性。

任何单位或个人未经授权,将本文内容用于攻击、破坏等非法用途的,由此引发的全部法律责任、民事赔偿及连带责任,均由行为人独立承担,本站不承担任何连带责任。

本站内容均为技术交流与知识分享目的发布,若存在版权侵权或其他异议,请通过邮件联系处理,具体联系方式可点击页面上方的联系我

本文转载自:Go语言教程 go go《go语言里用Redis 实现延时任务队列,如何延时?》

评论:0   参与:  0