How We Cut a 15‑Billion Image Migration from 120 Days to 40 Days
Facing the challenge of moving 15 billion image files from Cassandra to S3, we iteratively redesigned the migration pipeline—from a single‑process approach to multi‑process, queue‑driven, and multi‑cluster deployments—reducing the projected 120‑day effort to just 40 days while ensuring reliability and performance.
In an era of explosive internet content growth, images have become a major data carrier, and storing and managing billions of them presents unprecedented challenges. This article shares the complete journey of migrating a 15‑billion‑image repository from Cassandra to S3 object storage, shrinking an estimated 120‑day migration to 40 days through successive optimizations.
First Stage: Single‑Process Migration Challenges
Initially we used the simplest single‑process migration scheme:
func singleProcessMigration(filePath string) error {
    file, err := os.Open(filePath)
    if err != nil {
        return err
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        key := scanner.Text()
        data := fetchFromCassandra(key) // read the blob from Cassandra
        uploadToS3(key, data)           // write it to S3, one key at a time
    }
    return scanner.Err()
}

This approach had obvious problems:
Performance bottleneck: serial processing made migration extremely slow.
Progress uncontrollable: unable to record migration progress, making recovery after interruption difficult.
Resource waste: low CPU and network utilization on a single machine.
Measurements showed a throughput of only 10‑20 files per second, which put the full migration on a timescale of decades rather than months.
Second Stage: Multi‑File Multi‑Process Improvement
To solve the single‑process issues we switched to a multi‑process solution:
Split large files into many small files.
Each process handles an independent file.
Each process maintains its own progress record.
func multiProcessMigration(fileList []string) {
    var wg sync.WaitGroup
    for _, file := range fileList {
        wg.Add(1)
        go func(f string) {
            defer wg.Done()
            lastPos := readProgress(f)      // read last recorded progress
            migrateFromPosition(f, lastPos) // resume from the breakpoint
        }(file)
    }
    wg.Wait() // without this, the function would return before workers finish
}

After the improvement:
Migration speed increased to 1,000‑2,000 files per second.
Support for breakpoint continuation.
New problems emerged:
Progress management complexity: each process records progress independently, making unified management hard.
Error handling difficulty: retry mechanisms were insufficient.
Uneven resource allocation: some processes handled slower files.
Third Stage: Introducing a Message‑Queue Breakthrough
To address the above issues we introduced Redis as a message queue:
// Queue core code (app/task/queue/queue.go)
func Push(key string, verify string) {
    key = strings.TrimSpace(key)
    if key == "" {
        return
    }
    queue := queueName(verify)
    size, err := app.RedisW().LPush(queue, key).Result()
    if err != nil {
        app.Log("push_err").Errorf("Failed to push key: %s to queue: %s, error: %v", key, queue, err)
        return
    }
    app.Log().Infof("Added key: %s to queue: %s, new length: %d", key, queue, size)
}

func Pop(verify string) string {
    queue := queueName(verify)
    value, err := app.RedisW().RPop(queue).Result()
    if err == redis.Nil {
        // Queue drained; the caller treats "" as "no work right now".
        fmt.Printf("Queue %s is empty. Retrying...\n", queue)
        time.Sleep(time.Second)
        return ""
    }
    if err != nil {
        // Log and back off rather than log.Fatalf: a transient Redis error
        // should not take down a long-running consumer.
        app.Log().Errorf("Failed to pop from queue: %s, error: %v", queue, err)
        time.Sleep(time.Second)
        return ""
    }
    return value
}

Architecture optimization:
Producer: imports file content into the Redis queue.
Consumer: multiple processes consume tasks from the queue.
Monitoring system: real‑time monitoring of queue length and consumption speed.
Advantages:
Decoupled production and consumption: producers and consumers can be scaled independently.
Unified progress management: queue length provides an estimate of remaining work.
Flexible retry: failed tasks can be re‑queued.
Fourth Stage: Multi‑Cluster Deployment Performance Boost
To further improve performance we deployed multiple clusters:
nohup ./migrateimgbed -cluster wenku -go 100 >> migrateimgbed.log 2>&1 &

Core migration logic with concurrency control:
func Run() {
    data := make(chan string)
    goNum := app.GoNum
    verify := ""
    fmt.Println("redis info dump:", app.RedisW())

    var wg sync.WaitGroup
    for i := 0; i < goNum; i++ {
        wg.Add(1)
        go consume(data, &wg)
    }
    fmt.Println("Starting task run...")

    // Producer: feed keys from the Redis queue into the channel until empty.
    go func() {
        defer close(data)
        for {
            key := queue.Pop(verify)
            if key == "" {
                fmt.Println("No data in queue, exiting producer.")
                break
            }
            data <- key
        }
    }()
    wg.Wait()
}

Detailed migration function handling key parsing, Cassandra fetching, and S3 uploading with retries:
func do(key string) error {
    key = strings.TrimSpace(key)
    if strings.HasPrefix(key, "/d/") {
        key = ParseCustomKey(key)
    }
    // TrimSuffix, not TrimRight: TrimRight treats "_1" as a character set
    // and would also strip extra trailing '1's or '_'s from the key.
    if strings.HasSuffix(key, "_1") && len(key) == 21 {
        key = strings.TrimSuffix(key, "_1")
    }
    arr := strings.SplitAfter(key, "t01")
    if len(arr) < 2 || arr[0] != "t01" {
        return fmt.Errorf("invalid key format: %s", key)
    }
    s3Key, valid := ParseKey(key)
    if !valid {
        return fmt.Errorf("invalid parsed key format: %s", key)
    }
    start := time.Now()
    s3client := s3.NewClient(app.Cluster)

    // Try the primary Cassandra cluster first, then fall back to the second.
    var notFound bool
    reader, err := cass.GetFromApi(key, "shyc2")
    if err != nil || reader == nil {
        reader, err = cass.GetFromApi(key, "shbt")
        if err != nil || reader == nil {
            notFound = true
        }
    }
    if notFound {
        elapsed := time.Since(start)
        app.Log("not_found_key").Errorf("upload_failed %s %s", key, elapsed)
        return fmt.Errorf("data not found for key: %s", key)
    }
    if !uploadToS3(s3client, s3Key, reader, 3) {
        elapsed := time.Since(start)
        app.Log("found_upload_failed").Errorf("upload_failed %s %s", key, elapsed)
        return fmt.Errorf("failed to upload data to S3 for key: %s", key)
    }
    elapsed := time.Since(start)
    app.Log("ok").Infof("upload_ok %s %s %s", key, s3Key, elapsed)
    return nil
}

Performance indicators:
Single machine, 100 goroutines: 3,000‑5,000 files/second (concurrency can be raised further).
Six machines running concurrently: ~10,000 QPS overall (some file loss; retries add overhead).
Daily migration volume: 300‑500 million files (depends on batch size).
Fifth Stage: Verification and Diff Handling
After migration we built a comprehensive verification mechanism:
func Verify() {
    data := make(chan string)
    var wg sync.WaitGroup
    fmt.Println(app.RedisW())
    for i := 0; i < app.GoNum; i++ {
        wg.Add(1)
        go consumeHead(data, &wg)
    }
    go func() {
        defer close(data)
        for {
            key := queue.Pop("verify")
            if key == "" {
                fmt.Println("No data in queue, exiting producer.")
                break
            }
            data <- key
        }
    }()
    wg.Wait()
}

For billion‑scale diff comparison we applied a merge‑sort idea:
func mergeSortedFiles(tempFiles []string, outputFile string) error {
    pq := &MinHeap{}
    heap.Init(pq)
    readers := make([]*bufio.Scanner, len(tempFiles))
    files := make([]*os.File, len(tempFiles))
    for i, fileName := range tempFiles {
        file, err := os.Open(fileName)
        if err != nil {
            return fmt.Errorf("error opening temp file %s: %w", fileName, err)
        }
        files[i] = file
        readers[i] = bufio.NewScanner(file)
    }
    // Seed the heap with the first line of every sorted chunk.
    for i, r := range readers {
        if r.Scan() {
            heap.Push(pq, FileLine{line: r.Text(), index: i})
        }
    }
    output, err := os.Create(outputFile)
    if err != nil {
        return fmt.Errorf("error creating output file: %w", err)
    }
    defer output.Close()
    writer := bufio.NewWriter(output)
    var prevLine string
    for pq.Len() > 0 {
        min := heap.Pop(pq).(FileLine)
        if min.line != prevLine { // drop duplicates while merging
            writer.WriteString(min.line + "\n")
            prevLine = min.line
        }
        if readers[min.index].Scan() {
            heap.Push(pq, FileLine{line: readers[min.index].Text(), index: min.index})
        }
    }
    writer.Flush()
    for _, f := range files {
        f.Close()
    }
    for _, fn := range tempFiles {
        os.Remove(fn)
    }
    return nil
}

Tool characteristics:
Chunk processing: splits large files into many small files.
Parallel sorting: multiple goroutines handle each chunk concurrently.
Multi‑way merge: uses a min‑heap to efficiently merge sorted files.
Memory optimization: dynamically adjusts chunk size to avoid overflow.
Sixth Stage: Performance Optimization Key Points
Connection‑pool optimization:
Reuse S3 and Redis connections.
Set appropriate timeouts and retry parameters.
Error handling mechanism (retryable upload):
func uploadToS3(client *s3.Client, key string, reader io.Reader, retries int) bool {
    // Buffer the payload once: a plain io.Reader is consumed by the first
    // attempt and could not be re-read on a retry.
    body, err := io.ReadAll(reader)
    if err != nil {
        app.Log("s3").Errorf("read_err key %s, err %s", key, err.Error())
        return false
    }
    for i := 0; i < retries; i++ {
        _, err := client.UploadFileByte(context.TODO(), key, bytes.NewReader(body))
        if err == nil {
            return true
        }
        backoff := time.Duration(1<<i) * time.Second // exponential backoff: 1s, 2s, 4s...
        app.Log("s3").Errorf("up_s3_err key %s, attempt %d, err %s, retrying in %v", key, i+1, err.Error(), backoff)
        time.Sleep(backoff)
    }
    return false
}

Resource monitoring:
Real‑time monitoring of queue backlog.
Dynamic adjustment of goroutine count.
Comprehensive logging of key metrics.
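Dynamic adjustment of the goroutine count can be driven by the queue backlog that LLen reports. A minimal, self-contained policy function (the thresholds and scaling factors below are our own illustration, not values from the article):

```go
package main

import "fmt"

// decideWorkers picks a goroutine count from the observed queue backlog:
// scale up while the backlog is large, scale down when the queue is nearly
// drained (easing load on the Cassandra source), and always stay within
// [min, max]. A monitoring goroutine would call this periodically with
// the latest LLen result and resize the worker pool accordingly.
func decideWorkers(backlog int64, current, min, max int) int {
	switch {
	case backlog > 1_000_000 && current < max:
		current *= 2 // large backlog: double the workers
	case backlog < 10_000 && current > min:
		current /= 2 // almost drained: shed workers
	}
	if current > max {
		current = max
	}
	if current < min {
		current = min
	}
	return current
}

func main() {
	fmt.Println(decideWorkers(5_000_000, 100, 10, 400)) // → 200
	fmt.Println(decideWorkers(500, 100, 10, 400))       // → 50
	fmt.Println(decideWorkers(5_000_000, 400, 10, 400)) // already at max → 400
}
```

Keeping the policy a pure function like this makes it trivial to unit-test and to tune without touching the consumer loop.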
Summary and Outlook
Through four phases of continuous optimization we finally achieved:
Efficiency boost: reduced the estimated 4‑5‑month migration to 40 days.
Reliability guarantee: robust error handling and retry mechanisms.
Resource optimization: balanced migration speed with source‑site load by leveraging cluster resources effectively.
This 15‑billion‑image migration not only solved immediate business needs but also built up valuable experience for handling ultra‑large‑scale data migrations. The evolution of technology never stops, and we will continue to explore more efficient and reliable migration solutions.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
360 Zhihui Cloud Developer
360 Zhihui Cloud is an enterprise open service platform that aims to "aggregate data value and empower an intelligent future," leveraging 360's extensive product and technology resources to deliver platform services to customers.