How We Cut a 15‑Billion‑Image Migration from 120 Days to 40 Days

Facing the challenge of moving 15 billion image files from Cassandra to S3, we iteratively redesigned the migration pipeline—from a single‑process approach to multi‑process, queue‑driven, and multi‑cluster deployments—reducing the projected 120‑day effort to just 40 days while ensuring reliability and performance.

360 Zhihui Cloud Developer

In an era of explosive internet content growth, images have become a major data carrier, and storing and managing billions of them presents unprecedented challenges. This article shares the complete journey of migrating a 15‑billion‑image repository from Cassandra to S3 object storage, shrinking an estimated 120‑day migration to 40 days through successive optimizations.

First Stage: Single‑Process Migration Challenges

Initially we used the simplest single‑process migration scheme:

func singleProcessMigration(filePath string) error {
    file, err := os.Open(filePath)
    if err != nil {
        return err
    }
    defer file.Close()
    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        key := scanner.Text()
        data := fetchFromCassandra(key) // read the image blob by key
        uploadToS3(key, data)           // write it to object storage
    }
    return scanner.Err()
}

This approach had obvious problems:

Performance bottleneck: serial processing made migration extremely slow.

Untrackable progress: no way to record migration progress, making recovery after an interruption difficult.

Resource waste: low CPU and network utilization on a single machine.

Measurements showed a throughput of only 10‑20 files per second; at that rate, migrating 15 billion files would take decades.

Second Stage: Multi‑File Multi‑Process Improvement

To solve the single‑process issues we switched to a multi‑process solution:

Split large files into many small files.

Each process handles an independent file.

Each process maintains its own progress record.

func multiProcessMigration(fileList []string) {
    var wg sync.WaitGroup
    for _, file := range fileList {
        wg.Add(1)
        go func(f string) {
            defer wg.Done()
            // read last progress
            lastPos := readProgress(f)
            // continue migration from breakpoint
            migrateFromPosition(f, lastPos)
        }(file)
    }
    wg.Wait()
}

After the improvement:

Migration speed increased to 1,000‑2,000 files per second.

Support for breakpoint continuation.

New problems emerged:

Progress management complexity: each process records progress independently, making unified management hard.

Error handling difficulty: retry mechanisms were insufficient.

Uneven resource allocation: some processes handled slower files.

Third Stage: Introducing a Message‑Queue Breakthrough

To address the above issues we introduced Redis as a message queue:

// Queue core code (app/task/queue/queue.go)
func Push(key string, verify string) {
    key = strings.TrimSpace(key)
    if key == "" { return }
    queue := queueName(verify)
    size, err := app.RedisW().LPush(queue, key).Result()
    if err != nil {
        app.Log("push_err").Errorf("Failed to push key: %s to queue: %s, error: %v", key, queue, err)
        return
    }
    app.Log().Infof("Added key: %s to queue: %s, new length: %d", key, queue, size)
}
func Pop(verify string) string {
    queue := queueName(verify)
    size, err := app.RedisW().LLen(queue).Result()
    if err != nil { log.Fatalf("failed to get queue length: %s, error: %v", queue, err) }
    if size > 0 {
        value, err := app.RedisW().RPop(queue).Result()
        if err != nil && err != redis.Nil {
            app.Log().Errorf("Failed to pop from queue: %s, error: %v", queue, err)
            log.Fatalf("queue pop error: %v", err)
        }
        return value
    }
    fmt.Printf("Queue %s is empty. Retrying...\n", queue)
    time.Sleep(time.Second)
    return ""
}

Architecture optimization:

Producer: imports file content into the Redis queue.

Consumer: multiple processes consume tasks from the queue.

Monitoring system: real‑time monitoring of queue length and consumption speed.

Advantages:

Decoupled production and consumption: producers and consumers can be scaled independently.

Unified progress management: queue length provides an estimate of remaining work.

Flexible retry: failed tasks can be re‑queued.
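The flexible‑retry point can be sketched end to end. Here the queue is hidden behind a small interface so the Redis `Push`/`Pop` pair above could back it in production; the interface shape, the in‑memory stand‑in, and the retry limit are all assumptions for illustration:

```go
package main

import "errors"

// Queue is the minimal surface the workers need; in production it
// would be backed by the Redis Push/Pop pair shown above.
type Queue interface {
	Push(key string)
	Pop() (string, bool)
}

// memQueue is an in-memory stand-in used here for illustration.
type memQueue struct{ items []string }

func (q *memQueue) Push(key string) { q.items = append(q.items, key) }

func (q *memQueue) Pop() (string, bool) {
	if len(q.items) == 0 {
		return "", false
	}
	k := q.items[0]
	q.items = q.items[1:]
	return k, true
}

var errTransient = errors.New("transient failure")

// drainWithRetry pops tasks and re-queues failures up to maxAttempts
// times, so transient errors do not silently lose keys; keys that
// exhaust their attempts are returned for a later manual pass.
func drainWithRetry(q Queue, migrate func(string) error, maxAttempts int) []string {
	attempts := map[string]int{}
	var failed []string
	for {
		key, ok := q.Pop()
		if !ok {
			return failed
		}
		if err := migrate(key); err != nil {
			attempts[key]++
			if attempts[key] < maxAttempts {
				q.Push(key) // flexible retry: put the task back on the queue
			} else {
				failed = append(failed, key)
			}
		}
	}
}
```

Capping attempts per key is what keeps a permanently bad key (corrupt record, malformed name) from cycling through the queue forever.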

Fourth Stage: Multi‑Cluster Deployment Performance Boost

To further improve performance we deployed multiple clusters:

nohup ./migrateimgbed -cluster wenku -go 100 >> migrateimgbed.log 2>&1 &

Core migration logic with concurrency control:

func Run() {
    data := make(chan string)
    goNum := app.GoNum
    verify := ""
    fmt.Println("redis info dump:", app.RedisW())
    var wg sync.WaitGroup
    for i := 0; i < goNum; i++ {
        wg.Add(1)
        go consume(data, &wg)
    }
    fmt.Println("Starting task run...")
    go func() {
        defer close(data)
        for {
            key := queue.Pop(verify)
            if key == "" {
                fmt.Println("No data in queue, exiting producer.")
                break
            }
            data <- key
        }
    }()
    wg.Wait()
}

Detailed migration function handling key parsing, Cassandra fetching, and S3 uploading with retries:

func do(key string) error {
    key = strings.TrimSpace(key)
    if strings.HasPrefix(key, "/d/") { key = ParseCustomKey(key) }
    // TrimSuffix, not TrimRight: TrimRight("x_1", "_1") would strip all
    // trailing '_' and '1' characters, not just the "_1" suffix
    if strings.HasSuffix(key, "_1") && len(key) == 21 { key = strings.TrimSuffix(key, "_1") }
    arr := strings.SplitAfter(key, "t01")
    if len(arr) < 2 || arr[0] != "t01" { return fmt.Errorf("invalid key format: %s", key) }
    s3Key, valid := ParseKey(key)
    if !valid { return fmt.Errorf("invalid parsed key format: %s", key) }
    start := time.Now()
    s3client := s3.NewClient(app.Cluster)
    var notFound bool
    reader, err := cass.GetFromApi(key, "shyc2")
    if err != nil || reader == nil {
        reader, err = cass.GetFromApi(key, "shbt")
        if err != nil || reader == nil { notFound = true }
    }
    if notFound {
        elapsed := time.Since(start)
        app.Log("not_found_key").Errorf("upload_failed %s %s", key, elapsed)
        return fmt.Errorf("data not found for key: %s", key)
    }
    if !uploadToS3(s3client, s3Key, reader, 3) {
        elapsed := time.Since(start)
        app.Log("found_upload_failed").Errorf("upload_failed %s %s", key, elapsed)
        return fmt.Errorf("failed to upload data to S3 for key: %s", key)
    }
    elapsed := time.Since(start)
    app.Log("ok").Infof("upload_ok %s %s %s", key, s3Key, elapsed)
    return nil
}

Performance indicators:

Single machine with 100 goroutines: 3,000‑5,000 files/second (concurrency could be raised further).

Six machines in parallel: ~10,000 QPS overall (some file loss; retries add overhead).

Daily migration volume: 300‑500 million files (depending on batch size).

Fifth Stage: Verification and Diff Handling

After migration we built a comprehensive verification mechanism:

func Verify() {
    data := make(chan string)
    var wg sync.WaitGroup
    fmt.Println(app.RedisW())
    for i := 0; i < app.GoNum; i++ {
        wg.Add(1)
        go consumeHead(data, &wg)
    }
    go func() {
        defer close(data)
        for {
            key := queue.Pop("verify")
            if key == "" { fmt.Println("No data in queue, exiting producer."); break }
            data <- key
        }
    }()
    wg.Wait()
}
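The `consumeHead` workers are likewise not shown; presumably each one issues a HEAD‑style existence check against S3 and flags anything missing for another pass. A sketch with the check behind a function variable (`headObject` and `verifyKeys` are assumed names, not the original API):

```go
package main

import "fmt"

// headObject stands in for an S3 HEAD request on one key; a real
// client would be swapped in here.
var headObject = func(key string) (bool, error) { return false, nil }

// verifyKeys reports which keys are missing on the S3 side so they can
// be re-queued for another migration pass.
func verifyKeys(keys []string) ([]string, error) {
	var missing []string
	for _, k := range keys {
		ok, err := headObject(k)
		if err != nil {
			return nil, fmt.Errorf("head %s: %w", k, err)
		}
		if !ok {
			missing = append(missing, k)
		}
	}
	return missing, nil
}
```

Separating "object absent" from "check failed" matters at this scale: a transient network error must not be recorded as a lost image.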

For diff comparison at the scale of billions of keys we applied an external merge sort:

func mergeSortedFiles(tempFiles []string, outputFile string) error {
    pq := &MinHeap{}
    heap.Init(pq)
    readers := make([]*bufio.Scanner, len(tempFiles))
    files := make([]*os.File, len(tempFiles))
    for i, fileName := range tempFiles {
        file, err := os.Open(fileName)
        if err != nil { return fmt.Errorf("error opening temp file %s: %w", fileName, err) }
        files[i] = file
        readers[i] = bufio.NewScanner(file)
    }
    for i, r := range readers {
        if r.Scan() { heap.Push(pq, FileLine{line: r.Text(), index: i}) }
    }
    output, err := os.Create(outputFile)
    if err != nil { return fmt.Errorf("error creating output file: %w", err) }
    defer output.Close()
    writer := bufio.NewWriter(output)
    var prevLine string
    for pq.Len() > 0 {
        min := heap.Pop(pq).(FileLine)
        if min.line != prevLine { writer.WriteString(min.line + "\n"); prevLine = min.line }
        if readers[min.index].Scan() { heap.Push(pq, FileLine{line: readers[min.index].Text(), index: min.index}) }
    }
    writer.Flush()
    for _, f := range files { f.Close() }
    for _, fn := range tempFiles { os.Remove(fn) }
    return nil
}
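The `MinHeap` and `FileLine` types referenced above are not included in the excerpt; a minimal implementation of `container/heap`'s interface, ordered by line content, would be:

```go
package main

import "container/heap"

// FileLine pairs a line with the index of the temp file it came from,
// so the merge knows which reader to advance next.
type FileLine struct {
	line  string
	index int
}

// MinHeap orders FileLines by line content, smallest first.
type MinHeap []FileLine

func (h MinHeap) Len() int           { return len(h) }
func (h MinHeap) Less(i, j int) bool { return h[i].line < h[j].line }
func (h MinHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *MinHeap) Push(x interface{}) { *h = append(*h, x.(FileLine)) }

func (h *MinHeap) Pop() interface{} {
	old := *h
	n := len(old)
	item := old[n-1]
	*h = old[:n-1]
	return item
}

// popAll drains the heap in globally sorted order (for demonstration).
func popAll(pq *MinHeap) []string {
	heap.Init(pq)
	var out []string
	for pq.Len() > 0 {
		out = append(out, heap.Pop(pq).(FileLine).line)
	}
	return out
}
```

With k temp files the heap never holds more than k entries, which is what keeps the merge's memory footprint constant regardless of total line count.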

Tool characteristics:

Chunk processing: splits large files into many small files.

Parallel sorting: multiple goroutines handle the chunks concurrently.

Multi‑way merge: uses a min‑heap to efficiently merge sorted files.

Memory optimization: dynamically adjusts chunk size to avoid overflow.

Sixth Stage: Performance Optimization Key Points

Connection‑pool optimization:

Reuse S3 and Redis connections.

Set appropriate timeouts and retry parameters.

Error handling mechanism (retryable upload):

func uploadToS3(client *s3.Client, key string, reader io.Reader, retries int) bool {
    // Note: for retries to be safe, reader must be rewindable (e.g. a
    // *bytes.Reader seeked back to 0), otherwise a second attempt would
    // upload a partially consumed stream.
    for i := 0; i < retries; i++ {
        _, err := client.UploadFileByte(context.TODO(), key, reader)
        if err == nil { return true }
        backoff := time.Duration(1<<i) * time.Second // exponential backoff: 1s, 2s, 4s...
        app.Log("s3").Errorf("up_s3_err key %s, attempt %d, err %s, retrying in %v", key, i+1, err.Error(), backoff)
        time.Sleep(backoff)
    }
    return false
}

Resource monitoring:

Real‑time monitoring of queue backlog.

Dynamic adjustment of goroutine count.

Comprehensive logging of key metrics.

Summary and Outlook

Through these successive stages of optimization we ultimately achieved:

Efficiency boost: reduced the estimated 4‑5‑month migration to 40 days.

Reliability guarantee: robust error handling and retry mechanisms.

Resource optimization: balanced migration speed against source‑side load by using cluster resources effectively.

This 15‑billion‑image migration not only met an immediate business need but also built up valuable experience for handling ultra‑large‑scale data migrations. The evolution of technology never stops, and we will continue to explore more efficient and reliable migration solutions.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Tags: data migration, performance optimization, Go, S3, Cassandra
Written by

360 Zhihui Cloud Developer

360 Zhihui Cloud is an enterprise open service platform that aims to "aggregate data value and empower an intelligent future," leveraging 360's extensive product and technology resources to deliver platform services to customers.
