
Analyzing Spark's Iceberg Data Reading Process and Small‑File Merging

This article explains how Spark reads data from Apache Iceberg tables by parsing snapshots and manifest files into DataFile objects, creates Batch and InputPartition objects, uses readers to materialize InternalRows, and then demonstrates how Iceberg's RewriteDataFilesAction can merge tiny Parquet files into larger ones through Spark‑driven tasks.

Big Data Technology & Architecture

Part 1: Spark reading Iceberg workflow

When Spark reads an Iceberg table, it first resolves the table’s snapshot and manifest files into DataFile objects, then reads those files and hands each record to the engine layer as an InternalRow.

The read process consists of two main steps:

Parse Iceberg metadata (snapshot, manifest) to obtain DataFile objects.

Read the physical files and wrap each record as a Spark InternalRow.
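The first step can be pictured as a walk from a snapshot through its manifests down to the data file entries. The following is a minimal sketch under stated assumptions: the classes DataFileRef, Manifest, and Snapshot and the method planFiles are hypothetical, in-memory stand-ins, not Iceberg's real metadata classes, and they ignore filtering, delete files, and partition pruning.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins for Iceberg metadata; not the real classes.
class MetadataPlanningSketch {

    static final class DataFileRef {
        final String path;
        final long sizeInBytes;

        DataFileRef(String path, long sizeInBytes) {
            this.path = path;
            this.sizeInBytes = sizeInBytes;
        }
    }

    static final class Manifest {
        final List<DataFileRef> entries;

        Manifest(DataFileRef... entries) {
            this.entries = Arrays.asList(entries);
        }
    }

    static final class Snapshot {
        final List<Manifest> manifests;

        Snapshot(Manifest... manifests) {
            this.manifests = Arrays.asList(manifests);
        }
    }

    // Walk snapshot -> manifests -> data file entries and collect every entry.
    static List<DataFileRef> planFiles(Snapshot snapshot) {
        List<DataFileRef> files = new ArrayList<>();
        for (Manifest manifest : snapshot.manifests) {
            files.addAll(manifest.entries);
        }
        return files;
    }

    public static void main(String[] args) {
        Snapshot snapshot = new Snapshot(
            new Manifest(new DataFileRef("data/a.parquet", 4096),
                         new DataFileRef("data/b.parquet", 2048)),
            new Manifest(new DataFileRef("data/c.parquet", 1024)));
        for (DataFileRef file : planFiles(snapshot)) {
            System.out.println(file.path + " (" + file.sizeInBytes + " bytes)");
        }
    }
}
```

The second step, reading each file and producing rows, is what the reader classes discussed in the rest of this part are responsible for.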

Spark engine integration

Iceberg plugs into Spark by implementing Spark's DataSource V2 read interfaces, including org.apache.spark.sql.connector.read.Batch. The workflow is:

SparkTable implements SupportsRead and creates a SparkScanBuilder via newScanBuilder.

SparkScanBuilder builds a SparkBatchQueryScan.

SparkBatchQueryScan creates a Batch object via toBatch.

Batch generates input partitions with planInputPartitions.

Each input partition is then handed to a reader instance that actually reads the files.
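To make the call chain concrete, here is a small self-contained sketch. The nested interfaces only mirror the names of the Spark interfaces mentioned above; they are simplified, hypothetical copies (rows are plain strings, and the toy table's "files" are in-memory lists), so this illustrates the order of calls rather than the real Spark or Iceberg classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified, hypothetical mirrors of the DataSource V2 read interfaces.
class ReadChainSketch {
    interface InputPartition {}
    interface PartitionReader<T> { boolean next(); T get(); }
    interface PartitionReaderFactory { PartitionReader<String> createReader(InputPartition p); }
    interface Batch {
        InputPartition[] planInputPartitions();
        PartitionReaderFactory createReaderFactory();
    }
    interface Scan { Batch toBatch(); }
    interface ScanBuilder { Scan build(); }
    interface SupportsRead { ScanBuilder newScanBuilder(); }

    static final class ListPartition implements InputPartition {
        final List<String> rows;
        ListPartition(List<String> rows) { this.rows = rows; }
    }

    // A toy table: every in-memory "file" becomes one input partition.
    static SupportsRead toyTable(List<List<String>> files) {
        Batch batch = new Batch() {
            public InputPartition[] planInputPartitions() {
                InputPartition[] parts = new InputPartition[files.size()];
                for (int i = 0; i < parts.length; i++) {
                    parts[i] = new ListPartition(files.get(i));
                }
                return parts;
            }

            public PartitionReaderFactory createReaderFactory() {
                return partition -> {
                    Iterator<String> it = ((ListPartition) partition).rows.iterator();
                    return new PartitionReader<String>() {
                        private String current;
                        public boolean next() {
                            if (!it.hasNext()) {
                                return false;
                            }
                            current = it.next();
                            return true;
                        }
                        public String get() { return current; }
                    };
                };
            }
        };
        Scan scan = () -> batch;
        ScanBuilder builder = () -> scan;
        return () -> builder;
    }

    // What the engine does: table -> scan builder -> scan -> batch -> partitions -> readers.
    static List<String> readAll(SupportsRead table) {
        Batch batch = table.newScanBuilder().build().toBatch();
        PartitionReaderFactory factory = batch.createReaderFactory();
        List<String> rows = new ArrayList<>();
        for (InputPartition partition : batch.planInputPartitions()) {
            PartitionReader<String> reader = factory.createReader(partition);
            while (reader.next()) {
                rows.add(reader.get());
            }
        }
        return rows;
    }
}
```

In real Spark the partitions are planned on the driver and the readers are created on executors; the single loop in readAll collapses that distribution for the sake of illustration.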

The Batch interface is defined as:

// org.apache.spark.sql.connector.read.Batch
public interface Batch {

  // Returns the input partitions (splits); each one is processed by one Spark task
  InputPartition[] planInputPartitions();

  // Returns the factory that creates a PartitionReader for each input partition
  PartitionReaderFactory createReaderFactory();
}

Each InputPartition is a ReadTask that wraps a combined scan task, that is, one or more Iceberg DataFiles (or slices of them) to be read together. Spark then uses a ReaderFactory to create either a row‑oriented RowReader or a columnar BatchReader:

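static class ReaderFactory implements PartitionReaderFactory {
    // Number of rows per columnar batch, used by the vectorized reader
    private final int batchSize;

    ReaderFactory(int batchSize) {
      this.batchSize = batchSize;
    }

    @Override
    public PartitionReader<InternalRow> createReader(InputPartition partition) {
      if (partition instanceof ReadTask) {
        return new RowReader((ReadTask) partition);
      }
      // Every code path must return or throw, otherwise the method does not compile
      throw new UnsupportedOperationException("Incorrect input partition type: " + partition);
    }

    @Override // vectorized reading support
    public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
      if (partition instanceof ReadTask) {
        return new BatchReader((ReadTask) partition, batchSize);
      }
      throw new UnsupportedOperationException("Incorrect input partition type: " + partition);
    }
  }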

The core row reader iterates over the combined scan tasks, opens each file (Parquet, ORC, etc.), and returns rows via next() and get() methods.
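The next()/get() pattern over several files can be sketched as follows. This is an illustrative stand-in, not Iceberg's reader: the class CombinedTaskReaderSketch is hypothetical, each "file" is an in-memory list of strings, and "opening" a file just means taking its iterator.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: a reader that walks all files of one combined task.
class CombinedTaskReaderSketch {
    private final Iterator<List<String>> files;                  // files not yet opened
    private Iterator<String> currentFile = Collections.emptyIterator();
    private String current;                                      // record returned by get()

    CombinedTaskReaderSketch(List<List<String>> filesInTask) {
        this.files = filesInTask.iterator();
    }

    // Advance to the next record; open the next file when the current one is exhausted.
    boolean next() {
        while (true) {
            if (currentFile.hasNext()) {
                current = currentFile.next();
                return true;
            }
            if (!files.hasNext()) {
                return false;                                    // no files left in the task
            }
            currentFile = files.next().iterator();               // "open" the next file
        }
    }

    // Return the record that the last successful next() call moved to.
    String get() {
        return current;
    }
}
```

The loop inside next() matters because a file may be empty: the reader keeps opening files until it finds a record or runs out of files.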

Part 2: Iceberg small‑file merging overview

Iceberg provides the RewriteDataFilesAction API to compact many tiny files into larger ones. The following Spark code creates a session, loads a table, and rewrites files to a target size of 10 KB:

// Assumes: import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
Configuration conf = new Configuration();
conf.set(METASTOREURIS.varname, "thrift://localhost:9083");

// Resolve the Iceberg table default.iteblog (findTable is a helper that is not shown here)
Map<String, String> maps = Maps.newHashMap();
maps.put("path", "default.iteblog");
DataSourceOptions options = new DataSourceOptions(maps);
Table table = findTable(options, conf);

// Start a local Spark session; the action below runs on the active session
SparkSession.builder()
        .master("local[2]")
        .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
        .config("spark.hadoop." + METASTOREURIS.varname, "thrift://localhost:9083")
        .config("spark.executor.heartbeatInterval", "100000")
        .config("spark.network.timeoutInterval", "100000")
        .enableHiveSupport()
        .getOrCreate();

// Rewrite the table's data files toward the target size
Actions.forTable(table).rewriteDataFiles()
        .targetSizeInBytes(10 * 1024) // 10KB
        .execute();

When execute() runs, Iceberg groups the table's files by partition, splits large files, combines small ones, and produces a list of CombinedScanTask objects. These tasks are parallelized as an RDD with one Spark partition per task; each task reads its original files and writes merged output files:

JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());

public List<DataFile> rewriteDataForTasks(JavaRDD<CombinedScanTask> taskRDD) {
    JavaRDD<TaskResult> taskCommitRDD = taskRDD.map(this::rewriteDataForTask);
    return taskCommitRDD.collect().stream()
        .flatMap(taskCommit -> Arrays.stream(taskCommit.files()))
        .collect(Collectors.toList());
}
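The planning that happens before the RDD is created (group by partition, pack small files up to the target size, skip groups with nothing to merge) can be approximated with a simple greedy packer. This is a sketch under stated assumptions, not Iceberg's planner: FileRef and plan are hypothetical names, large files are not split, and the packing strategy is a plain first-come greedy one.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of compaction planning; not Iceberg's actual planner.
class CompactionPlanSketch {

    static final class FileRef {
        final String partition;
        final String path;
        final long size;

        FileRef(String partition, String path, long size) {
            this.partition = partition;
            this.path = path;
            this.size = size;
        }
    }

    // Within each partition, add files to the current group until the next file
    // would push the group past targetSize. Groups with a single file are dropped
    // because rewriting them would not merge anything.
    static List<List<FileRef>> plan(List<FileRef> files, long targetSize) {
        Map<String, List<FileRef>> byPartition = new LinkedHashMap<>();
        for (FileRef f : files) {
            byPartition.computeIfAbsent(f.partition, k -> new ArrayList<>()).add(f);
        }

        List<List<FileRef>> tasks = new ArrayList<>();
        for (List<FileRef> partitionFiles : byPartition.values()) {
            List<FileRef> current = new ArrayList<>();
            long currentSize = 0;
            for (FileRef f : partitionFiles) {
                if (!current.isEmpty() && currentSize + f.size > targetSize) {
                    if (current.size() > 1) {
                        tasks.add(current);
                    }
                    current = new ArrayList<>();
                    currentSize = 0;
                }
                current.add(f);
                currentSize += f.size;
            }
            if (current.size() > 1) {
                tasks.add(current);
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        List<FileRef> files = new ArrayList<>();
        files.add(new FileRef("dt=1", "a.parquet", 4096));
        files.add(new FileRef("dt=1", "b.parquet", 4096));
        files.add(new FileRef("dt=1", "c.parquet", 4096));
        files.add(new FileRef("dt=2", "d.parquet", 1024));
        for (List<FileRef> task : plan(files, 10 * 1024)) {
            System.out.println("task with " + task.size() + " files");
        }
    }
}
```

Each inner list plays the role of one combined task: it is the unit that a single Spark task would read and rewrite into a merged file.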

Each rewriteDataForTask call creates a RowDataReader, an appropriate AppenderFactory, and a BaseWriter (partitioned or unpartitioned) to write the merged Parquet files. Once the tasks finish, the new files are committed in a fresh snapshot, and the original small files are marked as deleted in that snapshot.
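The effect of that final commit on the table's set of live files can be illustrated with a few lines of plain Java. The class RewriteCommitSketch and its rewrite method are hypothetical and model files as path strings; the point is only that the old snapshot's file set is left unchanged while the new snapshot swaps the small files for the merged ones.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of a rewrite commit; files are modeled as path strings.
class RewriteCommitSketch {

    // Returns the live file set of the new snapshot. The input set, which
    // represents the previous snapshot, is not modified.
    static Set<String> rewrite(Set<String> liveFiles, Set<String> toDelete, Set<String> toAdd) {
        if (!liveFiles.containsAll(toDelete)) {
            throw new IllegalStateException("Cannot delete files that are not in the current snapshot");
        }
        Set<String> next = new LinkedHashSet<>(liveFiles);
        next.removeAll(toDelete);   // small files are no longer live in the new snapshot
        next.addAll(toAdd);         // merged files become live
        return Collections.unmodifiableSet(next);
    }
}
```

Because the previous snapshot still references the small files, readers pinned to it keep working; the files only become removable once that snapshot is expired.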

In practice, the merged files are not guaranteed to land exactly at the target size, and partitions that contain only a single file are left untouched.

Key takeaways

Spark reads Iceberg tables by converting metadata into ReadTask objects and using the DataSource V2 read APIs.

Iceberg’s RewriteDataFilesAction leverages Spark parallelism to compact small Parquet files, improving query performance and storage efficiency.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
