Analyzing Spark's Iceberg Data Reading Process and Small‑File Merging
This article explains how Spark reads data from Apache Iceberg tables by parsing snapshots and manifest files into DataFile objects, creates Batch and InputPartition objects, uses readers to materialize InternalRows, and then demonstrates how Iceberg's RewriteDataFilesAction can merge tiny Parquet files into larger ones through Spark‑driven tasks.
Part 1: How Spark reads an Iceberg table
When Spark reads an Iceberg table, it first walks the snapshot being read and its manifest files to collect DataFile objects. It then reads each data file and converts every record into an InternalRow for the engine layer.
The read process consists of two main steps:
Parse Iceberg metadata (snapshot, manifest) to obtain DataFile objects.
Read the physical files and wrap each record as a Spark InternalRow.
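To make the two steps concrete, here is a minimal in-memory sketch. Snapshot, Manifest, ManifestEntry, and DataFile below are hypothetical, simplified stand-ins, not the real org.apache.iceberg classes, and readRows takes a hypothetical openFile function in place of a Parquet or ORC reader. The one behavior borrowed from Iceberg is that manifest entries carry an ADDED, EXISTING, or DELETED status and a scan keeps only the entries that are not DELETED.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of Iceberg metadata; not the real org.apache.iceberg API.
final class ScanPlanningSketch {
  enum Status { EXISTING, ADDED, DELETED }

  static final class DataFile {
    final String path;
    final long sizeInBytes;
    DataFile(String path, long sizeInBytes) {
      this.path = path;
      this.sizeInBytes = sizeInBytes;
    }
  }

  static final class ManifestEntry {
    final Status status;
    final DataFile file;
    ManifestEntry(Status status, DataFile file) {
      this.status = status;
      this.file = file;
    }
  }

  static final class Manifest {
    final List<ManifestEntry> entries;
    Manifest(List<ManifestEntry> entries) {
      this.entries = entries;
    }
  }

  static final class Snapshot {
    final long snapshotId;
    final List<Manifest> manifests; // stands in for the snapshot's manifest list
    Snapshot(long snapshotId, List<Manifest> manifests) {
      this.snapshotId = snapshotId;
      this.manifests = manifests;
    }
  }

  // Step 1: snapshot -> manifests -> entries, keeping only live (non-deleted) data files.
  static List<DataFile> planFiles(Snapshot snapshot) {
    List<DataFile> live = new ArrayList<>();
    for (Manifest manifest : snapshot.manifests) {
      for (ManifestEntry entry : manifest.entries) {
        if (entry.status != Status.DELETED) {
          live.add(entry.file);
        }
      }
    }
    return live;
  }

  // Step 2: open each file and collect its records as rows (Object[] stands in for InternalRow).
  static List<Object[]> readRows(List<DataFile> files, Function<DataFile, List<Object[]>> openFile) {
    List<Object[]> rows = new ArrayList<>();
    for (DataFile file : files) {
      rows.addAll(openFile.apply(file));
    }
    return rows;
  }
}
```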
Spark engine integration
Iceberg's Spark module implements Spark's DataSource V2 read interfaces, including org.apache.spark.sql.connector.read.Batch. The call chain is:
SparkTable implements SupportsRead and creates a SparkScanBuilder via newScanBuilder.
SparkScanBuilder builds a SparkBatchQueryScan.
SparkBatchQueryScan returns a Batch object from toBatch.
The Batch generates input partitions with planInputPartitions.
The partitions are turned into Reader instances that actually read the files.
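The chain can be exercised end to end with toy stand-ins. In this sketch the interfaces are simplified copies of Spark's DataSource V2 names (not the real org.apache.spark.sql.connector API, whose methods carry more parameters and checked exceptions), and ToyTable, ToyScan, and FileGroup are hypothetical. readAll plays the engine's role and runs every partition sequentially, whereas Spark would ship each InputPartition to its own executor task.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins named after Spark's DataSource V2 read interfaces; NOT the real API.
final class DsV2ChainSketch {
  interface InputPartition {}

  interface PartitionReader<T> {
    boolean next();
    T get();
  }

  interface PartitionReaderFactory {
    PartitionReader<String> createReader(InputPartition partition);
  }

  interface Batch {
    InputPartition[] planInputPartitions();
    PartitionReaderFactory createReaderFactory();
  }

  interface Scan { Batch toBatch(); }

  interface ScanBuilder { Scan build(); }

  interface SupportsRead { ScanBuilder newScanBuilder(); }

  // Hypothetical partition: a group of file paths (plays the role of a ReadTask).
  static final class FileGroup implements InputPartition {
    final List<String> paths;
    FileGroup(List<String> paths) { this.paths = paths; }
  }

  // Hypothetical scan that is also its own Batch; each "row" is simply a file path.
  static final class ToyScan implements Scan, Batch {
    private final List<String> files;
    private final int filesPerPartition;
    ToyScan(List<String> files, int filesPerPartition) {
      this.files = files;
      this.filesPerPartition = filesPerPartition;
    }
    @Override public Batch toBatch() { return this; }
    @Override public InputPartition[] planInputPartitions() {
      List<InputPartition> parts = new ArrayList<>();
      for (int i = 0; i < files.size(); i += filesPerPartition) {
        parts.add(new FileGroup(files.subList(i, Math.min(i + filesPerPartition, files.size()))));
      }
      return parts.toArray(new InputPartition[0]);
    }
    @Override public PartitionReaderFactory createReaderFactory() {
      return partition -> {
        List<String> rows = ((FileGroup) partition).paths;
        return new PartitionReader<String>() {
          private int pos = -1;
          @Override public boolean next() { return ++pos < rows.size(); }
          @Override public String get() { return rows.get(pos); }
        };
      };
    }
  }

  // Hypothetical table: newScanBuilder() -> build() -> ToyScan.
  static final class ToyTable implements SupportsRead {
    private final List<String> files;
    private final int filesPerPartition;
    ToyTable(List<String> files, int filesPerPartition) {
      if (filesPerPartition <= 0) throw new IllegalArgumentException("filesPerPartition must be positive");
      this.files = files;
      this.filesPerPartition = filesPerPartition;
    }
    @Override public ScanBuilder newScanBuilder() {
      return () -> new ToyScan(files, filesPerPartition);
    }
  }

  // Engine side, run sequentially here; Spark runs one task per InputPartition.
  static List<String> readAll(SupportsRead table) {
    Batch batch = table.newScanBuilder().build().toBatch();
    PartitionReaderFactory factory = batch.createReaderFactory();
    List<String> rows = new ArrayList<>();
    for (InputPartition partition : batch.planInputPartitions()) {
      PartitionReader<String> reader = factory.createReader(partition);
      while (reader.next()) {
        rows.add(reader.get());
      }
    }
    return rows;
  }
}
```

Letting ToyScan implement both Scan and Batch is only a shortcut to keep the sketch short.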
The Batch interface is defined as:
// org.apache.spark.sql.connector.read.Batch (abridged)
public interface Batch {
  // Plans the input splits; each InputPartition is processed by one Spark task
  InputPartition[] planInputPartitions();
  // Returns the factory that creates a PartitionReader for each split
  PartitionReaderFactory createReaderFactory();
}
Each InputPartition is a ReadTask that wraps a CombinedScanTask: a group of one or more file scan tasks, each pointing at an Iceberg DataFile. Spark then uses a ReaderFactory to create either a row-oriented RowReader or a columnar BatchReader:
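static class ReaderFactory implements PartitionReaderFactory {
  private final int batchSize;
  ReaderFactory(int batchSize) {
    this.batchSize = batchSize;
  }
  @Override // row-based reading: one InternalRow at a time
  public PartitionReader<InternalRow> createReader(InputPartition partition) {
    if (partition instanceof ReadTask) {
      return new RowReader((ReadTask) partition);
    }
    throw new UnsupportedOperationException("Incorrect input partition type: " + partition);
  }
  @Override // vectorized reading support: one ColumnarBatch at a time
  public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
    if (partition instanceof ReadTask) {
      return new BatchReader((ReadTask) partition, batchSize);
    }
    throw new UnsupportedOperationException("Incorrect input partition type: " + partition);
  }
  @Override // Spark only calls createColumnarReader when this returns true
  public boolean supportColumnarReads(InputPartition partition) {
    return batchSize > 1;
  }
}
The row reader iterates over the file scan tasks inside its CombinedScanTask, opens each file (Parquet, ORC, etc.) with the matching format reader, and returns rows through next() and get().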
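Stepping through several files as one stream of rows can be sketched with a chained iterator. This is a simplified, hypothetical reader that loosely follows the next()/get() contract of Spark's PartitionReader; the FileIterator type, the openFile function, and the ofList helper are assumptions standing in for Iceberg's per-format readers. Each file is opened lazily and closed as soon as it is exhausted, so at most one file is open at a time.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical, simplified reader over one combined task; not Iceberg's RowReader.
final class CombinedTaskReader<T> implements AutoCloseable {
  // Assumed per-file record iterator that must be closed to release the file.
  interface FileIterator<R> extends Iterator<R>, AutoCloseable {
    @Override
    void close();
  }

  private final Iterator<String> remainingFiles;
  private final Function<String, FileIterator<T>> openFile; // stand-in for a Parquet/ORC reader
  private FileIterator<T> current;
  private T currentRow;
  private boolean positioned;

  CombinedTaskReader(List<String> filePaths, Function<String, FileIterator<T>> openFile) {
    this.remainingFiles = filePaths.iterator();
    this.openFile = openFile;
  }

  // Mirrors PartitionReader.next(): advance to the next row, crossing file boundaries.
  boolean next() {
    while (true) {
      if (current != null && current.hasNext()) {
        currentRow = current.next();
        positioned = true;
        return true;
      }
      if (current != null) {
        current.close(); // finished with this file
        current = null;
      }
      if (!remainingFiles.hasNext()) {
        positioned = false;
        return false;
      }
      current = openFile.apply(remainingFiles.next()); // open the next file lazily
    }
  }

  // Mirrors PartitionReader.get(): the row selected by the last successful next().
  T get() {
    if (!positioned) {
      throw new NoSuchElementException("next() has not returned true");
    }
    return currentRow;
  }

  @Override
  public void close() {
    if (current != null) {
      current.close();
      current = null;
    }
  }

  // Demo helper: wrap an in-memory list as a FileIterator that runs onClose when closed.
  static <R> FileIterator<R> ofList(List<R> rows, Runnable onClose) {
    Iterator<R> it = rows.iterator();
    return new FileIterator<R>() {
      @Override public boolean hasNext() { return it.hasNext(); }
      @Override public R next() { return it.next(); }
      @Override public void close() { onClose.run(); }
    };
  }
}
```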
Part 2: Iceberg small‑file merging overview
Iceberg provides the RewriteDataFilesAction API to compact many tiny files into larger ones. The following Spark code creates a session, loads a table, and rewrites files to a target size of 10 KB:
// Hive Metastore connection used to look up the table
Configuration conf = new Configuration();
conf.set(METASTOREURIS.varname, "thrift://localhost:9083");
// Identify the table as "database.table"; findTable(...) is the original example's lookup helper (not shown)
Map<String, String> maps = Maps.newHashMap();
maps.put("path", "default.iteblog");
DataSourceOptions options = new DataSourceOptions(maps);
Table table = findTable(options, conf);
// Create (or reuse) the SparkSession that runs the rewrite tasks
SparkSession.builder()
    .master("local[2]")
    .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
    .config("spark.hadoop." + METASTOREURIS.varname, "thrift://localhost:9083")
    .config("spark.executor.heartbeatInterval", "100000")
    .config("spark.network.timeoutInterval", "100000")
    .enableHiveSupport()
    .getOrCreate();
// Actions.forTable(table) uses the active SparkSession created above
Actions.forTable(table).rewriteDataFiles()
    .targetSizeInBytes(10 * 1024) // 10 KB
    .execute();
After execution, Iceberg groups the files by partition, splits files that exceed the target size, combines small ones, and produces a new list of CombinedScanTasks. These tasks are parallelized as an RDD; each task reads its original files and writes merged output files:
// Distribute the combined tasks as an RDD with one Spark partition per task
JavaRDD<CombinedScanTask> taskRDD = sparkContext.parallelize(combinedScanTasks, combinedScanTasks.size());
// Rewrite each task on the executors, then gather the new DataFiles on the driver
public List<DataFile> rewriteDataForTasks(JavaRDD<CombinedScanTask> taskRDD) {
  JavaRDD<TaskResult> taskCommitRDD = taskRDD.map(this::rewriteDataForTask);
  return taskCommitRDD.collect().stream()
      .flatMap(taskCommit -> Arrays.stream(taskCommit.files()))
      .collect(Collectors.toList());
}
Each rewriteDataForTask call creates a RowDataReader, an appropriate AppenderFactory, and a BaseWriter (partitioned or unpartitioned) to write the merged Parquet files. The new files are then committed in a new snapshot, and the old small files are marked as deleted.
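The map, collect, and commit steps can be mimicked in memory. In this hypothetical sketch a list of strings stands in for a file's rows, rolling to a new output file after a fixed row count stands in for size-based rolling, and commit simply swaps file paths in a set; none of it is Iceberg's RowDataRewriter or RewriteFiles code, and the output path naming is invented for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, in-memory sketch of the rewrite phase; not Iceberg's RowDataRewriter.
final class RewriteCommitSketch {
  // An output file: a generated path plus the rows written to it.
  static final class OutFile {
    final String path;
    final List<String> rows;
    OutFile(String path, List<String> rows) {
      this.path = path;
      this.rows = rows;
    }
  }

  // Analogue of rewriteDataForTask: read every input file of one task in order and
  // roll to a new output file after rowsPerFile rows (a stand-in for size-based rolling).
  static List<OutFile> rewriteTask(int taskId, List<List<String>> inputFiles, int rowsPerFile) {
    if (rowsPerFile <= 0) throw new IllegalArgumentException("rowsPerFile must be positive");
    List<OutFile> outputs = new ArrayList<>();
    List<String> buffer = new ArrayList<>();
    for (List<String> file : inputFiles) {
      for (String row : file) {
        buffer.add(row);
        if (buffer.size() == rowsPerFile) {
          outputs.add(new OutFile("task-" + taskId + "-" + outputs.size() + ".parquet", buffer));
          buffer = new ArrayList<>();
        }
      }
    }
    if (!buffer.isEmpty()) {
      outputs.add(new OutFile("task-" + taskId + "-" + outputs.size() + ".parquet", buffer));
    }
    return outputs;
  }

  // Analogue of rewriteDataForTasks: run each task (Spark would do this on executors),
  // then flatten the per-task results into one list on the driver.
  static List<OutFile> rewriteAll(List<List<List<String>>> tasks, int rowsPerFile) {
    List<List<OutFile>> perTask = new ArrayList<>();
    for (int i = 0; i < tasks.size(); i++) {
      perTask.add(rewriteTask(i, tasks.get(i), rowsPerFile));
    }
    return perTask.stream().flatMap(List::stream).collect(Collectors.toList());
  }

  // Commit: build the next snapshot's live file set by swapping the rewritten files for
  // the new ones in one step; refuse if a file being replaced is no longer live.
  static Set<String> commit(Set<String> live, Set<String> rewritten, List<OutFile> added) {
    if (!live.containsAll(rewritten)) {
      throw new IllegalStateException("a file being replaced is no longer live");
    }
    Set<String> next = new LinkedHashSet<>(live);
    next.removeAll(rewritten);
    for (OutFile f : added) {
      next.add(f.path);
    }
    return next;
  }
}
```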
In practice, the merge does not always produce files at or below the target size, and partitions that contain only one file are left untouched.
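The planning step described in Part 2 (group by partition, split large files, combine small ones, skip a partition whose only file would be copied unchanged) can be approximated with a simple packer. This is a hypothetical simplification: it uses next-fit packing with an assumed per-file open cost, while Iceberg's own planner is more elaborate (its bin-packing uses a lookback window, and its filtering rules have changed across versions). The sketch keeps every slice of a split file so that a file is never marked deleted after only part of it was rewritten.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified rewrite planner; not Iceberg's TableScanUtil/BinPacking code.
final class RewritePlanSketch {
  static final class InputFile {
    final String path;
    final String partition;
    final long size;
    InputFile(String path, String partition, long size) {
      this.path = path;
      this.partition = partition;
      this.size = size;
    }
  }

  // A byte range of one input file; a file no larger than the target is a single slice.
  static final class Slice {
    final InputFile file;
    final long start;
    final long length;
    Slice(InputFile file, long start, long length) {
      this.file = file;
      this.start = start;
      this.length = length;
    }
    boolean isPartial() {
      return start > 0 || length < file.size;
    }
  }

  static List<List<Slice>> plan(List<InputFile> files, long targetSize, long openFileCost) {
    if (targetSize <= 0) throw new IllegalArgumentException("targetSize must be positive");
    // 1. Group files by partition, keeping the input order.
    Map<String, List<InputFile>> byPartition = new LinkedHashMap<>();
    for (InputFile f : files) {
      byPartition.computeIfAbsent(f.partition, k -> new ArrayList<>()).add(f);
    }
    List<List<Slice>> tasks = new ArrayList<>();
    for (List<InputFile> group : byPartition.values()) {
      // 2. Split each file into slices of at most targetSize bytes.
      List<Slice> slices = new ArrayList<>();
      for (InputFile f : group) {
        for (long start = 0; start < f.size; start += targetSize) {
          slices.add(new Slice(f, start, Math.min(targetSize, f.size - start)));
        }
      }
      // 3. Next-fit packing; every slice costs at least openFileCost.
      List<Slice> bin = new ArrayList<>();
      long binWeight = 0;
      for (Slice s : slices) {
        long weight = Math.max(s.length, openFileCost);
        if (!bin.isEmpty() && binWeight + weight > targetSize) {
          tasks.add(bin);
          bin = new ArrayList<>();
          binWeight = 0;
        }
        bin.add(s);
        binWeight += weight;
      }
      if (!bin.isEmpty()) {
        tasks.add(bin);
      }
    }
    // 4. Drop tasks that would copy one whole file unchanged; keep partial slices so
    //    every piece of a split file is rewritten before the file is replaced.
    tasks.removeIf(t -> t.size() == 1 && !t.get(0).isPartial());
    return tasks;
  }
}
```

With a 10 KB target and a 1 KB open cost, 3 KB and 4 KB files in one partition end up in a single task, a lone 5 KB file in another partition is skipped, and a 25 KB file is cut into three slices that are each rewritten.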
Key takeaways
Spark reads Iceberg tables by converting metadata into ReadTask objects and using the DataSource V2 read APIs.
Iceberg’s RewriteDataFilesAction leverages Spark parallelism to compact small Parquet files, improving query performance and storage efficiency.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
