Mastering Delayed, Priority, and Retry Tasks with River – A Go Queue Deep Dive
This article explains how River, a Go job‑queue library, implements delayed execution, priority handling, exponential‑backoff retries, batch inserts, monitoring, and best‑practice patterns, and compares it with other queue solutions to help developers build reliable, high‑performance background processing pipelines.
Delayed Tasks: The Art of Timed Execution
River allows scheduling jobs for future execution by setting the scheduled_at column to a future timestamp. For example, inserting a job that checks an order's payment status 30 minutes later ensures unpaid orders are cancelled automatically, without a separate polling cron job.
// Execute order timeout check after 30 minutes
_, err = riverClient.Insert(ctx, CheckOrderTimeoutArgs{OrderID: "order_12345"}, &river.InsertOpts{ScheduledAt: time.Now().Add(30 * time.Minute)})
The implementation works in three steps: (1) store the future scheduled_at value, (2) workers query with WHERE scheduled_at <= NOW(), and (3) jobs whose time has not arrived are automatically filtered out.
Priority Queues: Doing Important Work First
River supports per‑job priority via the priority field; lower numbers mean higher priority. Example code shows a VIP order (priority 1) and a normal order (priority 3). Workers consume jobs ordered by ORDER BY priority ASC, guaranteeing that critical tasks run before less important ones.
// High priority: VIP order
_, err = riverClient.Insert(ctx, ProcessOrderArgs{OrderID: "vip_order_001"}, &river.InsertOpts{Priority: 1, Queue: "orders"})
// Low priority: normal order
_, err = riverClient.Insert(ctx, ProcessOrderArgs{OrderID: "normal_order_002"}, &river.InsertOpts{Priority: 3, Queue: "orders"})
Retry Mechanism: Making Failures Tolerable
River's retry behavior is configurable: the maximum attempt count is set per job via river.InsertOpts{MaxAttempts: ...} (or as a client-wide default), and a worker can customize the retry schedule by implementing NextRetry(job). The example uses exponential back-off, doubling the delay as the attempt number grows (1 min, 2 min, 4 min, …). Once the attempt count reaches the maximum, the job's state is set to discarded.
type SendEmailWorker struct { river.WorkerDefaults[SendEmailArgs] }
// NextRetry overrides River's default retry schedule with exponential back-off.
func (w *SendEmailWorker) NextRetry(job *river.Job[SendEmailArgs]) time.Time {
seconds := int(math.Pow(2, float64(job.Attempt))) * 30
return time.Now().Add(time.Duration(seconds) * time.Second)
}
func (w *SendEmailWorker) Work(ctx context.Context, job *river.Job[SendEmailArgs]) error {
err := sendEmail(job.Args.Email, job.Args.Subject, job.Args.Body, job.Args.Attachments)
if err != nil {
return fmt.Errorf("send failed, retry after %s: %w", time.Until(w.NextRetry(job)), err)
}
return nil
}
Batch Processing: Performance Booster
Inserting many jobs at once using PostgreSQL's COPY FROM can increase throughput by roughly an order of magnitude over row-by-row inserts. The sample builds a slice of InsertManyParams for 1,000 email notifications and inserts them in a single call.
// Batch enqueue 1000 email jobs
jobs := make([]river.InsertManyParams, 0, 1000)
for i := 0; i < 1000; i++ {
jobs = append(jobs, river.InsertManyParams{Args: SendEmailArgs{Email: fmt.Sprintf("user%d@example.com", i), Subject: "Bulk notification", Body: "System upgrade notice"}}) // placeholder domain
}
_, err = riverClient.InsertMany(ctx, jobs)
Monitoring & Operations: Observability
River exposes queue statistics (pending, running, completed, failed). These metrics can be logged or exported to Prometheus/Grafana for real‑time health checks. Example code prints stats and raises an alert when pending jobs exceed a threshold.
stats, err := riverClient.QueueStats(ctx, "default")
if err != nil { log.Fatal(err) }
fmt.Printf("Queue: %s\nPending: %d\nRunning: %d\nCompleted: %d\nFailed: %d\n", stats.Queue, stats.Pending, stats.Running, stats.Completed, stats.Failed)
if stats.Pending > 10000 { log.Println("Alert: queue backlog, consider increasing MaxWorkers") }
Real‑World E‑commerce Example
A payment callback updates order status, decrements stock, and enqueues asynchronous tasks (SMS, email, points) within a single database transaction. Priorities ensure the SMS is sent immediately, while points are processed later. This guarantees data consistency and fast API response.
func handlePaymentCallback(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
orderID := r.FormValue("order_id")
tx, err := db.Begin()
if err != nil { http.Error(w, "system error", 500); return }
defer tx.Rollback()
// Update order status
_, err = tx.Exec(`UPDATE orders SET status='paid', paid_at=NOW() WHERE id=$1 AND status='pending'`, orderID)
if err != nil { http.Error(w, "order update failed", 500); return }
// Decrement stock
_, err = tx.Exec(`UPDATE products p SET stock = stock - oi.quantity FROM order_items oi WHERE oi.order_id=$1 AND p.id = oi.product_id`, orderID)
if err != nil { http.Error(w, "stock decrement failed", 500); return }
// Enqueue SMS (high priority)
_, err = riverClient.InsertTx(ctx, tx, SendSMSArgs{OrderID: orderID}, &river.InsertOpts{Priority: 1})
if err != nil { http.Error(w, "enqueue failed", 500); return }
// Enqueue email (normal priority)
_, err = riverClient.InsertTx(ctx, tx, SendEmailArgs{OrderID: orderID}, &river.InsertOpts{Priority: 2})
if err != nil { http.Error(w, "enqueue failed", 500); return }
// Enqueue points (low priority)
_, err = riverClient.InsertTx(ctx, tx, AddPointsArgs{OrderID: orderID}, &river.InsertOpts{Priority: 3})
if err != nil { http.Error(w, "enqueue failed", 500); return }
if err := tx.Commit(); err != nil { http.Error(w, "system error", 500); return }
w.Write([]byte("SUCCESS"))
}
Horizontal Comparison with Other Queue Solutions
Key differences lie in transaction support (River enqueues jobs inside a native database transaction, while other brokers need two-phase commit or an outbox pattern), concurrency model (lightweight goroutines vs. multiple processes), deployment complexity (a single PostgreSQL instance vs. Redis, RabbitMQ, or Kafka clusters), performance, persistence, ordering guarantees, operational cost, and learning curve. River excels for medium-scale systems that require strong consistency with low operational overhead.
Best Practices & Caveats
Ensure all job logic is idempotent so repeated execution does not cause side effects.
Set sensible timeouts for long‑running or external‑service‑dependent jobs to avoid blocking the queue.
Continuously monitor backlog and failure rates; scale workers proactively.
Configure the DB connection pool to match the number of River workers and overall load.
Keep job payloads small (e.g., pass object IDs instead of large blobs).
Isolate queues by task type (critical, reporting, notifications) to improve robustness and scaling granularity.
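The idempotency advice can be sketched by recording which business keys have already been processed, so a redelivered job becomes a no-op. The dedupeStore below is a hypothetical in-memory stand-in; in production this check usually lives in the database, e.g. as a unique constraint on a processed-events table:

```go
package main

import (
	"fmt"
	"sync"
)

// dedupeStore remembers which business keys have been processed,
// so running the same job twice has no extra side effects.
type dedupeStore struct {
	mu   sync.Mutex
	seen map[string]bool
}

func newDedupeStore() *dedupeStore {
	return &dedupeStore{seen: make(map[string]bool)}
}

// markProcessed reports true the first time a key is seen and
// false on every repeat, making the surrounding job idempotent.
func (d *dedupeStore) markProcessed(key string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.seen[key] {
		return false
	}
	d.seen[key] = true
	return true
}

func main() {
	store := newDedupeStore()
	for i := 0; i < 2; i++ { // simulate a retry delivering the same job twice
		if store.markProcessed("order_12345:send_sms") {
			fmt.Println("sending SMS for order_12345")
		} else {
			fmt.Println("duplicate delivery, skipping")
		}
	}
}
```

The key combines the object ID and the action, so retries of one job never suppress different work on the same order.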
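The timeout advice can likewise be sketched with the standard library's context.WithTimeout, so a hung external call fails fast instead of occupying a worker slot indefinitely (callExternalService is an illustrative stand-in for any slow dependency, not a River API):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// callExternalService simulates a dependency that takes too long.
func callExternalService(ctx context.Context) error {
	select {
	case <-time.After(5 * time.Second): // pretend the call is slow
		return nil
	case <-ctx.Done():
		return ctx.Err() // give up as soon as the deadline passes
	}
}

// workWithTimeout bounds a single job's runtime so one stuck
// call cannot block a worker forever.
func workWithTimeout(parent context.Context, limit time.Duration) error {
	ctx, cancel := context.WithTimeout(parent, limit)
	defer cancel()
	return callExternalService(ctx)
}

func main() {
	err := workWithTimeout(context.Background(), 50*time.Millisecond)
	if errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("job timed out, will be retried")
	}
}
```

Returning the deadline error from Work lets the retry mechanism described earlier reschedule the job with back-off.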
