Why River Is the Go‑Friendly Queue That Guarantees Transactional Consistency

This article explains how the River library leverages PostgreSQL to provide a Go‑native job queue with true transactional guarantees, high concurrency via goroutines, and efficient scheduling using SKIP LOCKED, while offering step‑by‑step setup and code examples for rapid adoption.

FunTester

What Is River?

River is a Go‑specific job‑queue library that tightly integrates with PostgreSQL instead of using a separate message broker. By storing jobs in the same database as application data, River enables true transactional enqueueing: a database update and a job insertion can occur in a single atomic transaction.

Why Choose River?

Transactional guarantee – The library lets you wrap business logic and job insertion in one transaction, eliminating the classic distributed‑transaction problem where a database update succeeds but the corresponding queue operation fails.

// In a single pgx transaction, update the order and enqueue the SMS job.
// dbPool is a *pgxpool.Pool; SendSMSArgs is a job-args type with a Kind() method.
tx, err := dbPool.Begin(ctx)
if err != nil { return err }
defer tx.Rollback(ctx) // no-op once Commit has succeeded
_, err = tx.Exec(ctx, "UPDATE orders SET status = 'paid' WHERE id = $1", orderID)
if err != nil { return err }
_, err = riverClient.InsertTx(ctx, tx, SendSMSArgs{Phone: user.Phone, Content: "Payment successful"}, nil)
if err != nil { return err }
return tx.Commit(ctx)

This pattern is especially valuable for finance or e‑commerce systems that require strict data consistency.
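To make the all-or-nothing behaviour concrete, here is a minimal in-memory sketch with no database and no River involved. The `store`, `tx`, and `payOrder` names are hypothetical and exist only for this illustration; the point is that the staged order update and the staged job become visible together on commit, or not at all.

```go
package main

import (
	"errors"
	"fmt"
)

// store is a toy stand-in for a database that holds both orders and jobs.
type store struct {
	orderStatus map[int]string
	jobs        []string
}

// tx stages changes and applies them to the store only on commit.
type tx struct {
	s        *store
	statuses map[int]string
	jobs     []string
}

func (s *store) begin() *tx { return &tx{s: s, statuses: map[int]string{}} }

func (t *tx) setStatus(orderID int, status string) { t.statuses[orderID] = status }
func (t *tx) insertJob(kind string)                { t.jobs = append(t.jobs, kind) }

// commit publishes every staged change at once.
func (t *tx) commit() {
	for id, st := range t.statuses {
		t.s.orderStatus[id] = st
	}
	t.s.jobs = append(t.s.jobs, t.jobs...)
}

// payOrder marks an order paid and enqueues an SMS job in one transaction.
// If enqueueing fails, the transaction is dropped (rolled back), so neither
// the status change nor the job becomes visible.
func payOrder(s *store, orderID int, failEnqueue bool) error {
	t := s.begin()
	t.setStatus(orderID, "paid")
	if failEnqueue {
		return errors.New("enqueue failed")
	}
	t.insertJob("send_sms")
	t.commit()
	return nil
}

func main() {
	s := &store{orderStatus: map[int]string{1: "pending", 2: "pending"}}
	_ = payOrder(s, 1, false) // succeeds: status and job commit together
	_ = payOrder(s, 2, true)  // fails: order 2 stays pending, no job appears
	fmt.Println(s.orderStatus[1], s.orderStatus[2], len(s.jobs)) // paid pending 1
}
```

With a separate broker, the failure case would instead leave order 2 marked as paid with no SMS job behind it, which is the inconsistency the shared transaction rules out.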

High‑Concurrency Processing

River exploits Go's lightweight goroutine model. Instead of a process‑per‑worker model (as in Ruby's Resque, for example), River can run hundreds of goroutine workers within a single process.

workers := river.NewWorkers()
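// Workers are registered with the generic river.AddWorker helper.
river.AddWorker(workers, &SendEmailWorker{})
river.AddWorker(workers, &ResizeImageWorker{})
river.AddWorker(workers, &GenerateReportWorker{})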
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
    Queues: map[string]river.QueueConfig{
        river.QueueDefault: {MaxWorkers: 100},
        "emails":  {MaxWorkers: 50},
        "reports": {MaxWorkers: 20},
    },
    Workers: workers,
})

Benefits:

Resource efficiency: a goroutine needs far less memory than a full process.

Scalability: raise MaxWorkers to get more out of one machine before adding machines, within the limits of CPU and the database connection pool.
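The effect of a MaxWorkers‑style limit can be sketched with the standard library alone. This is not River's scheduler; `runBounded` is a hypothetical helper that uses a buffered channel as a counting semaphore so that at most `maxWorkers` jobs run at once, and it reports the peak concurrency it observed.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runBounded runs every job in its own goroutine but lets at most maxWorkers
// of them run at the same time. It returns the highest concurrency observed.
func runBounded(maxWorkers int, jobs []func()) int {
	sem := make(chan struct{}, maxWorkers) // counting semaphore
	var wg sync.WaitGroup
	var running, peak int64

	for _, job := range jobs {
		sem <- struct{}{} // blocks while maxWorkers jobs are in flight
		wg.Add(1)
		go func(job func()) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when finished

			n := atomic.AddInt64(&running, 1)
			for { // record a new peak if this is one
				p := atomic.LoadInt64(&peak)
				if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
					break
				}
			}
			job()
			atomic.AddInt64(&running, -1)
		}(job)
	}
	wg.Wait()
	return int(atomic.LoadInt64(&peak))
}

func main() {
	var done int64
	jobs := make([]func(), 1000)
	for i := range jobs {
		jobs[i] = func() { atomic.AddInt64(&done, 1) }
	}
	peak := runBounded(50, jobs)
	fmt.Println(done, peak <= 50) // 1000 true
}
```

A thousand jobs complete inside one process while never exceeding the configured limit, which is the trade the Queues map above expresses per queue.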

PostgreSQL Advantages

Modern PostgreSQL (9.5+) offers the SKIP LOCKED clause, which lets multiple workers fetch jobs without blocking each other. This turns the database into an efficient queue engine capable of handling thousands of tasks per second while preserving ACID guarantees.

SELECT * FROM river_job
WHERE state = 'available' AND queue = 'default'
ORDER BY priority ASC, scheduled_at
LIMIT 100
FOR UPDATE SKIP LOCKED;

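As a rough indication, a MacBook Air with SSD storage is reported to process about 10,000 simple jobs per second with River; actual throughput depends on the job payload, the hardware, and the PostgreSQL configuration.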
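The skip-instead-of-wait behaviour of SKIP LOCKED can be imitated in memory. In the sketch below a `sync.Mutex` stands in for a PostgreSQL row lock and `TryLock` (Go 1.18+) stands in for the skip; `row`, `lockAvailable`, and `commit` are hypothetical names, and lower priority numbers are assumed to be fetched first.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

type row struct {
	mu       sync.Mutex // stands in for a PostgreSQL row lock
	id       int
	priority int // lower number is fetched first
	state    string
}

// lockAvailable mimics SELECT ... FOR UPDATE SKIP LOCKED: it walks rows in
// priority order, skips any row that is already locked, and returns up to
// limit available rows that the caller now holds locked.
func lockAvailable(table []*row, limit int) []*row {
	ordered := append([]*row(nil), table...)
	sort.SliceStable(ordered, func(i, j int) bool {
		return ordered[i].priority < ordered[j].priority
	})

	var locked []*row
	for _, r := range ordered {
		if len(locked) == limit {
			break
		}
		if !r.mu.TryLock() {
			continue // locked elsewhere: skip instead of waiting
		}
		if r.state != "available" {
			r.mu.Unlock()
			continue
		}
		locked = append(locked, r)
	}
	return locked
}

// commit marks the locked rows as running and releases their locks.
func commit(rows []*row) {
	for _, r := range rows {
		r.state = "running"
		r.mu.Unlock()
	}
}

func ids(rows []*row) []int {
	out := make([]int, 0, len(rows))
	for _, r := range rows {
		out = append(out, r.id)
	}
	return out
}

func main() {
	table := []*row{
		{id: 1, priority: 2, state: "available"},
		{id: 2, priority: 1, state: "available"},
		{id: 3, priority: 1, state: "available"},
		{id: 4, priority: 3, state: "available"},
	}
	a := lockAvailable(table, 2) // worker A's transaction is still open
	b := lockAvailable(table, 2) // worker B skips A's rows and takes the next ones
	commit(a)
	commit(b)
	fmt.Println(ids(a), ids(b)) // [2 3] [1 4]
}
```

Worker B never waits for worker A and never receives the same rows, which is why many fetchers can share one table.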

Underlying Implementation

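River's migrations create a river_job table that stores all jobs. Key columns include id, state, queue, kind, args (JSONB), priority, timestamps, and attempt counters. The definition below is a simplified view of that table for explanation, not the exact migration that River ships.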

CREATE TABLE river.river_job (
    id BIGSERIAL PRIMARY KEY,
    state TEXT NOT NULL,
    queue TEXT NOT NULL,
    kind TEXT NOT NULL,
    args JSONB NOT NULL,
    priority SMALLINT NOT NULL DEFAULT 1, -- lower number = higher priority
    scheduled_at TIMESTAMPTZ NOT NULL,
    attempted_at TIMESTAMPTZ,
    attempt SMALLINT NOT NULL DEFAULT 0,
    max_attempts SMALLINT NOT NULL DEFAULT 25,
    errors JSONB,
    created_at TIMESTAMPTZ NOT NULL,
    finalized_at TIMESTAMPTZ
);
CREATE INDEX idx_river_job_fetch ON river.river_job(queue, state, scheduled_at, priority) WHERE state = 'available';

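Job fetching runs inside a transaction that atomically updates the state to running and returns a batch of jobs. The function below is a simplified illustration of that idea rather than River's actual source:

func (c *Client) fetchJobs(ctx context.Context, queue string, limit int) ([]*Job, error) {
    tx, err := c.db.Begin(ctx)
    if err != nil { return nil, err }
    defer tx.Rollback(ctx) // no-op once Commit has succeeded

    // Lock up to `limit` due jobs, skipping rows other fetchers hold,
    // and flip them to running in the same statement.
    rows, err := tx.Query(ctx, `
        UPDATE river_job SET state='running', attempted_at=NOW(), attempt=attempt+1
        WHERE id IN (
            SELECT id FROM river_job
            WHERE state='available' AND queue=$1 AND scheduled_at<=NOW()
            ORDER BY priority ASC, scheduled_at ASC
            LIMIT $2
            FOR UPDATE SKIP LOCKED
        )
        RETURNING *
    `, queue, limit)
    if err != nil { return nil, err }

    // parseJobs is an illustrative helper: it scans every row and closes rows.
    jobs, err := parseJobs(rows)
    if err != nil { return nil, err }
    if err := tx.Commit(ctx); err != nil { return nil, err }
    return jobs, nil
}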

Key points:

Batch fetching reduces round‑trips.

Atomic state change prevents duplicate consumption.

SKIP LOCKED avoids contention among workers.

Concurrency Scheduling Model

A single producer goroutine pulls jobs in batches and pushes them onto a buffered channel. Multiple worker goroutines consume from this channel, achieving load balancing and reducing database pressure.

┌─────────────────────────────────┐
│          River Client           │
│     ┌────────────────────┐      │
│     │ Producer Goroutine │      │
│     └─────────┬──────────┘      │
│               ▼                 │
│     ┌────────────────────┐      │
│     │    Job Channel     │      │
│     └─────────┬──────────┘      │
│        ┌──────┴──────┐          │
│        ▼             ▼          │
│  ┌──────────┐   ┌──────────┐    │
│  │ Worker 1 │ … │ Worker N │    │
│  └──────────┘   └──────────┘    │
└─────────────────────────────────┘

Advantages include reduced DB load, automatic load balancing, and easy horizontal scaling of workers.
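The producer, channel, and worker arrangement described above can be sketched with the standard library. This is an illustration of the pattern, not River's internals: `fetchBatch` is a hypothetical stand-in for one database fetch, and `process` counts how many fetches were needed so the reduced database pressure is visible.

```go
package main

import (
	"fmt"
	"sync"
)

// fetchBatch is a hypothetical stand-in for one database fetch: it removes
// and returns up to n job IDs from the front of pending.
func fetchBatch(pending *[]int, n int) []int {
	if n > len(*pending) {
		n = len(*pending)
	}
	batch := (*pending)[:n]
	*pending = (*pending)[n:]
	return batch
}

// process runs one producer goroutine and numWorkers worker goroutines. It
// returns the number of fetches performed and the set of handled job IDs.
func process(pending []int, batchSize, numWorkers int) (int, map[int]bool) {
	jobs := make(chan int, batchSize) // buffered channel between producer and workers
	handled := make(map[int]bool)
	fetches := 0
	var mu sync.Mutex
	var wg sync.WaitGroup

	// Producer: the only goroutine that touches the "database".
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(jobs) // lets the workers' range loops end
		for len(pending) > 0 {
			fetches++
			for _, id := range fetchBatch(&pending, batchSize) {
				jobs <- id
			}
		}
	}()

	// Workers: whichever one is free takes the next job from the channel.
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range jobs {
				mu.Lock()
				handled[id] = true
				mu.Unlock()
			}
		}()
	}

	wg.Wait()
	return fetches, handled
}

func main() {
	pending := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	fetches, handled := process(pending, 4, 3)
	fmt.Println(fetches, len(handled)) // 3 10
}
```

Ten jobs cost three fetches rather than ten, and the channel hands each job to whichever worker is idle, so no explicit balancing logic is needed.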

Quick Start: From Zero to One

Installation & Configuration

go get github.com/riverqueue/river
go get github.com/riverqueue/river/riverdriver/riverpgxv5
go get github.com/jackc/pgx/v5/pgxpool

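Create the required schema. River does not create its tables when the client starts; they come from River's own migrations, which are normally applied with the river command‑line tool (river migrate-up) or the rivermigrate package. The statements below are only a simplified illustration of the core table and are not a substitute for those migrations:

CREATE SCHEMA river;
CREATE TABLE river.river_job (
    id bigserial PRIMARY KEY,
    state text NOT NULL,
    queue text NOT NULL,
    kind text NOT NULL,
    args jsonb NOT NULL
    -- other columns omitted for brevity
);
CREATE INDEX idx_river_job_state_queue ON river.river_job(state, queue);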

Define a Type‑Safe Task

// Task arguments struct (JSONB storage, but Go‑typed)
type SendEmailArgs struct {
    Email       string   `json:"email"`
    Subject     string   `json:"subject"`
    Body        string   `json:"body"`
    Attachments []string `json:"attachments,omitempty"`
}
func (SendEmailArgs) Kind() string { return "send_email" }

type SendEmailWorker struct { river.WorkerDefaults[SendEmailArgs] }

func (w *SendEmailWorker) Work(ctx context.Context, job *river.Job[SendEmailArgs]) error {
    args := job.Args
    fmt.Printf("Sending email to %s\n", args.Email)
    fmt.Printf("Subject: %s\n", args.Subject)
    if err := sendEmail(args.Email, args.Subject, args.Body, args.Attachments); err != nil {
        return fmt.Errorf("send email failed: %w", err)
    }
    fmt.Println("Email sent successfully!")
    return nil
}
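The "JSONB storage, but Go‑typed" idea can be tried without a database. The sketch below round‑trips typed arguments through JSON and checks the kind before decoding. The local `JobArgs` interface, `storedJob`, `encode`, and `decode` are hypothetical names for this illustration and are not River's API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// JobArgs is a local interface for this sketch: arguments that name their kind.
type JobArgs interface{ Kind() string }

type SendEmailArgs struct {
	Email   string `json:"email"`
	Subject string `json:"subject"`
}

func (SendEmailArgs) Kind() string { return "send_email" }

// storedJob is a reduced, hypothetical job row: a kind plus JSON-encoded args.
type storedJob struct {
	Kind string
	Args []byte
}

// encode turns typed args into the form that would be written to the table.
func encode[T JobArgs](args T) (storedJob, error) {
	b, err := json.Marshal(args)
	if err != nil {
		return storedJob{}, err
	}
	return storedJob{Kind: args.Kind(), Args: b}, nil
}

// decode restores typed args and rejects rows whose kind does not match T.
func decode[T JobArgs](j storedJob) (T, error) {
	var args T
	if j.Kind != args.Kind() {
		return args, fmt.Errorf("job kind %q does not match %q", j.Kind, args.Kind())
	}
	err := json.Unmarshal(j.Args, &args)
	return args, err
}

func main() {
	job, err := encode(SendEmailArgs{Email: "a@example.com", Subject: "Hi"})
	if err != nil {
		panic(err)
	}
	got, err := decode[SendEmailArgs](job)
	if err != nil {
		panic(err)
	}
	fmt.Println(job.Kind, got.Email) // send_email a@example.com
}
```

Because the kind travels with the payload, a worker only ever sees arguments of the struct type it was written for, and a mismatch surfaces as an error instead of a silently half‑filled struct.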

Start the Client and Enqueue a Job

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/riverqueue/river"
    "github.com/riverqueue/river/riverdriver/riverpgxv5"
)

func main() {
    ctx := context.Background()
    dbPool, err := pgxpool.New(ctx, "postgres://user:pass@localhost/mydb")
    if err != nil { log.Fatal(err) }
    defer dbPool.Close()

    workers := river.NewWorkers()
    river.AddWorker(workers, &SendEmailWorker{})

    riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
        Queues:  map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 100}},
        Workers: workers,
    })
    if err != nil { log.Fatal(err) }

    if err := riverClient.Start(ctx); err != nil { log.Fatal(err) }

    // "user@example.com" is a placeholder address.
    _, err = riverClient.Insert(ctx, SendEmailArgs{Email: "user@example.com", Subject: "Welcome to River", Body: "This is a test email"}, nil)
    if err != nil { log.Fatal(err) }
    fmt.Println("Job enqueued, awaiting processing…")

    // Block until Ctrl+C or SIGTERM, then give in-flight jobs up to
    // 10 seconds to finish before the process exits.
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
    <-sigCh

    stopCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()
    if err := riverClient.Stop(stopCtx); err != nil { log.Printf("shutdown: %v", err) }
}

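With the worker from the previous section in the same package and a sendEmail function of your own, running the program starts the worker pool, picks up the queued email job, and prints a success message.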

Conclusion

River provides a Go‑native, PostgreSQL‑backed job queue that eliminates distributed‑transaction headaches, leverages Go’s concurrency model for massive parallelism, and benefits from modern PostgreSQL features like SKIP LOCKED. By consolidating the queue and data store, it reduces operational complexity and cost while delivering ACID‑safe, high‑throughput background processing.
