Understanding and Solving Small File Problems in Hive and Spark
This article explains what makes a file "small" in HDFS, why small files hurt NameNode memory, compute efficiency, and overall cluster load, surveys their common sources (upstream data sources, streaming pipelines, and dynamic partitioning), and walks through Hive and Spark remedies, including CombineHiveInputFormat, the merge parameters, DISTRIBUTE BY, and custom Spark extensions, for merging small files and improving job performance.
Background: In big‑data storage, a "small file" is one far smaller than the HDFS block size; thresholds vary in practice, from under 1 MB up to 32 MB. Small files inflate NameNode metadata, waste memory, and reduce parallelism.
Impact: Small files inflate NameNode memory usage, spawn many short‑lived Map tasks that waste CPU on startup and teardown, and drive high request rates against the NameNode.
Causes: They arise from upstream data sources that already contain many tiny files, from streaming pipelines (Kafka, Spark Streaming, Flink) that write in short windows, and from over‑partitioning when using dynamic partitioning in Hive.
Hive Solution – Three "Swords": 1. Input merging via CombineHiveInputFormat, which computes logical splits and merges them in three stages (node‑internal, rack‑internal, inter‑rack). 2. Output merging via the merge parameters, which trigger a map‑only job to combine small output files after the main job finishes. 3. Output merging via DISTRIBUTE BY, which forces rows with the same key (e.g., the partition column) to the same reducer, reducing the number of output files.
CombineHiveInputFormat Details: The class delegates to Hadoop's CombineFileInputFormat for split calculation, stores node and rack metadata in hash maps, and merges blocks according to configurable parameters such as mapred.max.split.size, mapred.min.split.size.per.node, and mapred.min.split.size.per.rack. Merging proceeds in three phases, node‑internal, rack‑internal, and inter‑rack, producing balanced splits that improve Map‑task parallelism.
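A typical Hive session configuration for these split parameters might look like the following sketch; the byte values are illustrative, not recommendations, and should be tuned to your block size and workload:

```sql
-- Use CombineHiveInputFormat (the default input format in modern Hive)
SET hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

-- Upper bound for a combined split: blocks are merged until this size is reached
SET mapred.max.split.size=256000000;           -- ~256 MB

-- Minimum bytes to combine on one node before leftovers spill to the rack phase
SET mapred.min.split.size.per.node=128000000;  -- ~128 MB

-- Minimum bytes to combine within one rack before the final inter-rack phase
SET mapred.min.split.size.per.rack=128000000;  -- ~128 MB
```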
Output Merge via the merge Parameters: Hive checks the average size of files in the output directory against hive.merge.smallfiles.avgsize. If the average is below the threshold, it launches a map‑only job that reuses CombineFileInputFormat after resetting the split‑size parameters. A known bug in Hive 2.1.1 prevents the parameter reset for RCFile/ORC formats; it is fixed in Hive 2.2.0 (HIVE‑15178).
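The merge behavior described above is controlled by a small family of settings; a minimal sketch, with illustrative threshold values:

```sql
-- Merge small files left behind by map-only jobs
SET hive.merge.mapfiles=true;
-- Merge small files left behind by map-reduce jobs
SET hive.merge.mapredfiles=true;
-- If the average output file size falls below this, launch the extra merge job
SET hive.merge.smallfiles.avgsize=16000000;   -- ~16 MB
-- Target size of each file produced by the merge job
SET hive.merge.size.per.task=256000000;       -- ~256 MB
```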
Using DISTRIBUTE BY: Adding DISTRIBUTE BY dt to an INSERT statement sends rows that share the same partition value to the same reducer, turning many tiny files per partition into a single file per partition and avoiding the file‑count limit (hive.exec.max.created.files).
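A minimal sketch of this pattern for a dynamic-partition insert; the table and column names (target, source, col1, col2, dt) are placeholders:

```sql
-- Without DISTRIBUTE BY, every reducer may write a file into every dt partition.
-- With DISTRIBUTE BY dt, each partition's rows land on a single reducer,
-- so each partition gets one output file.
INSERT OVERWRITE TABLE target PARTITION (dt)
SELECT col1, col2, dt
FROM source
DISTRIBUTE BY dt;
```

Note that routing an entire partition to one reducer trades file count for potential skew if some partitions are much larger than others.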
Additional Hive Optimizations: Raise hive.exec.max.created.files when a job legitimately needs more output files, tune the merge map‑only jobs, increase the number of map tasks for better parallelism, and set hive.optimize.sort.dynamic.partition=false to prevent the extra sort/shuffle stage that dynamic‑partition inserts otherwise add.
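These knobs can be set per session; a sketch with illustrative values (the default file-count cap is 100000):

```sql
-- Raise the cap on files a dynamic-partition job may create
SET hive.exec.max.created.files=200000;
-- Disable the extra sort/shuffle Hive adds before dynamic-partition writes
SET hive.optimize.sort.dynamic.partition=false;
```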
Spark Solutions: 1. Spark SQL extensions (e.g., Kyuubi's org.apache.kyuubi.sql.KyuubiSparkSQLExtension) inject an extra shuffle stage before the write to merge small files. 2. A custom spark.sql.sources.commitProtocolClass implementation can replace the default SQLHadoopMapReduceCommitProtocol and perform file merging during the commitTask and commitJob phases, moving the merged files into the final output directory.
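A sketch of the extension approach; the Kyuubi repartition-before-write config key shown in the comments is an assumption from Kyuubi's extension, and the table/column names are placeholders. The REPARTITION hint variant works on stock Spark 3.x without any extension:

```sql
-- Enable Kyuubi's extension at session launch (spark-defaults.conf or --conf):
--   spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension
--   spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true  -- assumed key

-- Without an extension, a REPARTITION hint forces a similar extra shuffle,
-- clustering rows by partition column before the write:
INSERT OVERWRITE TABLE target PARTITION (dt)
SELECT /*+ REPARTITION(dt) */ col1, col2, dt
FROM source;
```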
References: Links to Spark small‑file merging on AWS S3, auxiliary optimization rules, and Hive JIRA issue HIVE‑15178.
Appendix: Animated illustration of node‑internal merging and additional diagrams.
Qunar Tech Salon
Qunar Tech Salon is a learning and exchange platform for Qunar engineers and industry peers, sharing cutting-edge technology trends and topics and offering mid-to-senior technical professionals a free forum for discussion and learning.