
Understanding and Solving the Small File Problem in Big Data Systems

This article examines the pervasive small‑file issue in big‑data environments, explains its impact on storage and processing performance, and presents a comprehensive set of solutions—including file merging, Hadoop archives, SequenceFiles, HBase, CombineFileInputFormat, and Spark/Flink strategies—to mitigate metadata overhead and improve I/O efficiency.

Big Data Technology & Architecture

The massive small‑file problem is recognized in both industry and academia as a critical bottleneck in big‑data storage and processing, often ranked second only to data skew. Large numbers of small files raise IOPS demand, cut effective throughput, and threaten overall cluster stability, because traditional disk file systems and the VFS layer are optimized for large sequential I/O.

Small‑File Problem Overview

Key issues include inefficient metadata management (each file needs separate disk accesses for its dentry, inode, and data), poor data layout that causes fragmentation, and long I/O paths through the VFS that amplify the cost of random access.

In distributed file systems, metadata and data are kept on separate nodes, so the small‑file overhead shows up on both sides: every file adds to NameNode memory, and every access adds RPCs and network traffic.

Benefits of Small‑File Merging

Combining many logical files into a single physical file reduces metadata volume, improves data locality, lowers disk fragmentation, and shortens the I/O path (open the container once, then seek to each file's offset, instead of resolving a full path per file), effectively creating a micro‑filesystem optimized for write‑once‑read‑many workloads.
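To make the open → seek idea concrete, here is a minimal, stdlib-only Java sketch (not taken from any of the systems discussed) that packs several logical files back-to-back into one physical file and reads one back by offset. The class name PackedContainer and its methods are hypothetical, and the index is held only in memory; real formats persist it, for example as keys inside a SequenceFile or as index files inside a HAR.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: many logical files packed into one physical file plus an index.
public class PackedContainer {
  // Location of one logical file inside the container.
  static final class Entry {
    final long offset;
    final int length;

    Entry(long offset, int length) {
      this.offset = offset;
      this.length = length;
    }
  }

  private final Path container;
  // Kept in memory here; a real format would persist it (inline keys or a separate index file).
  private final Map<String, Entry> index = new LinkedHashMap<>();

  PackedContainer(Path container) {
    this.container = container;
  }

  // Write every logical file back-to-back into the container, recording (offset, length).
  void pack(Map<String, byte[]> files) throws IOException {
    index.clear();
    try (RandomAccessFile out = new RandomAccessFile(container.toFile(), "rw")) {
      out.setLength(0);
      for (Map.Entry<String, byte[]> f : files.entrySet()) {
        index.put(f.getKey(), new Entry(out.getFilePointer(), f.getValue().length));
        out.write(f.getValue());
      }
    }
  }

  // Read one logical file: index lookup, then open -> seek -> read (no per-file path resolution).
  byte[] read(String name) throws IOException {
    Entry e = index.get(name);
    if (e == null) {
      throw new IllegalArgumentException("no such logical file: " + name);
    }
    byte[] buf = new byte[e.length];
    try (RandomAccessFile in = new RandomAccessFile(container.toFile(), "r")) {
      in.seek(e.offset);
      in.readFully(buf);
    }
    return buf;
  }

  int fileCount() {
    return index.size();
  }

  public static void main(String[] args) throws IOException {
    Path tmp = Files.createTempFile("container", ".bin");
    PackedContainer c = new PackedContainer(tmp);
    Map<String, byte[]> files = new LinkedHashMap<>();
    files.put("logs/a.txt", "hello".getBytes(StandardCharsets.UTF_8));
    files.put("logs/b.txt", "small files".getBytes(StandardCharsets.UTF_8));
    c.pack(files);
    System.out.println(c.fileCount() + " logical files, 1 physical file");
    System.out.println(new String(c.read("logs/b.txt"), StandardCharsets.UTF_8));
    Files.delete(tmp);
  }
}
```

Only one inode (or one NameNode entry) exists for the container no matter how many logical files it holds; the trade-off is that logical files cannot be modified in place, which is why the technique suits write-once-read-many data.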

Hadoop‑Specific Strategies

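Hadoop has no formal definition, but a common rule of thumb treats a file as small when it is under 75% of the HDFS block size (commonly 128 MB). Every file, directory, and block is held as an object in NameNode memory at roughly 150 bytes each, so a single-block file costs about 300 bytes of heap and millions of small files can exhaust the NameNode. Small files also inflate MapReduce task counts, because the default input formats create at least one split (and therefore one map task) per file, leading to long startup times and network overhead.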
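The arithmetic behind these two costs can be sketched as follows. The hypothetical NameNodeFootprint class uses the ~150-bytes-per-object rule of thumb, a 128 MB block size, split size equal to block size, and the 75% threshold quoted above; actual heap usage depends on the Hadoop version and JVM, and directories are ignored.

```java
// Back-of-the-envelope NameNode heap and map-task estimate (rules of thumb, not exact figures).
public class NameNodeFootprint {
  static final long BYTES_PER_OBJECT = 150;          // approx. per file, directory, or block object
  static final long BLOCK_SIZE = 128L * 1024 * 1024; // common HDFS block size (128 MB)

  // The 75% threshold is the rule of thumb from the text, not a Hadoop configuration key.
  static boolean isSmall(long fileBytes) {
    return fileBytes < BLOCK_SIZE * 3 / 4;
  }

  // Blocks a file occupies; an empty file has none.
  static long blocksPerFile(long fileBytes) {
    return (fileBytes + BLOCK_SIZE - 1) / BLOCK_SIZE;
  }

  // One inode object per file plus one object per block (directories ignored).
  static long heapBytes(long fileCount, long fileBytes) {
    return fileCount * (1 + blocksPerFile(fileBytes)) * BYTES_PER_OBJECT;
  }

  // Approximates default FileInputFormat behavior: at least one split (one map task) per file.
  static long mapTasks(long fileCount, long fileBytes) {
    return fileCount * Math.max(1, blocksPerFile(fileBytes));
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    // The same 1 TiB of data stored as 1 MB files versus 128 MB files.
    System.out.printf("1 MB files:   %d MB heap, %d map tasks%n",
        heapBytes(1_048_576, mb) / mb, mapTasks(1_048_576, mb));
    System.out.printf("128 MB files: %d KB heap, %d map tasks%n",
        heapBytes(8_192, 128 * mb) / 1024, mapTasks(8_192, 128 * mb));
  }
}
```

Running main reports 300 MB of heap and 1,048,576 map tasks for 1 TiB stored as 1 MB files, against 2,400 KB and 8,192 map tasks for the same data in 128 MB files.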

Common solutions:

Batch file merging via periodic MapReduce jobs.

SequenceFiles (key = filename, value = content) to pack many small files into one splittable file.

HBase column‑family storage for random‑access workloads.

Hadoop Archive (HAR) files to bundle files without changing downstream tools.

NameNode federation to distribute metadata across multiple NameNodes.

CombineFileInputFormat to merge files at read time without persisting a new file.

Hive‑level merging (set hive.merge.* parameters) after INSERT/CTAS operations.

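HDFS append, which lets an existing file grow incrementally but is rarely usable in practice because many writers and formats (Parquet, for example) cannot append to a closed file.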
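CombineFileInputFormat's core idea, one map task reading many small files, can be approximated by a greedy packer. The sketch below is a simplified, hypothetical model: it only caps each split's total size and ignores the node and rack locality grouping the real class performs. In an actual MapReduce job one would instead use CombineTextInputFormat as the input format and cap the split size with mapreduce.input.fileinputformat.split.maxsize.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified model of CombineFileInputFormat: group small files into splits up to a size cap.
// The real input format also groups by node and rack locality and works at block granularity.
public class CombineSplitPlanner {
  static List<List<Long>> plan(List<Long> fileSizes, long maxSplitBytes) {
    List<List<Long>> splits = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long currentBytes = 0;
    for (long size : fileSizes) {
      // Close the current split if adding this file would exceed the cap.
      if (!current.isEmpty() && currentBytes + size > maxSplitBytes) {
        splits.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(size);
      currentBytes += size;
    }
    if (!current.isEmpty()) {
      splits.add(current);
    }
    return splits;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    List<Long> files = new ArrayList<>(Collections.nCopies(1000, mb)); // 1,000 files of 1 MB
    List<List<Long>> splits = plan(files, 128 * mb);
    System.out.println(files.size() + " files -> " + splits.size() + " map tasks"); // 1000 files -> 8 map tasks
  }
}
```

A file larger than the cap ends up alone in one oversized split in this sketch, whereas Hadoop would divide it along block boundaries.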

Spark Streaming Small‑File Mitigation

Streaming creates a file per partition per micro‑batch, quickly exploding file counts. Mitigation techniques include:

Increasing the batch interval to reduce the number of writes.

Using rdd.coalesce() or rdd.repartition() to lower partition count before output.

External periodic merge jobs that consolidate streaming output.

Custom foreachRDD logic that appends to existing files (where the file system and format support it).
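A quick estimate shows why the batch interval and the output partition count both matter. The sketch below assumes one file per output partition per micro-batch and treats every partition as non-empty, so it gives an upper bound; the 10 s/200-partition and 60 s/4-partition settings are hypothetical. In a DStream job the reduction would typically come from calling rdd.coalesce(n) inside foreachRDD before saving.

```java
// Upper-bound estimate of files written by a Spark Streaming job: one file per
// output partition per micro-batch, assuming every partition is non-empty.
public class StreamingFileCount {
  static long filesPerDay(long batchIntervalSeconds, int outputPartitions) {
    if (batchIntervalSeconds <= 0 || outputPartitions <= 0) {
      throw new IllegalArgumentException("interval and partitions must be positive");
    }
    long batchesPerDay = 86_400 / batchIntervalSeconds;
    return batchesPerDay * outputPartitions;
  }

  public static void main(String[] args) {
    // 10 s batches written with 200 partitions versus 60 s batches coalesced to 4 partitions.
    System.out.println(filesPerDay(10, 200)); // 1728000
    System.out.println(filesPerDay(60, 4));   // 5760
  }
}
```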

Spark SQL Small‑File Solutions

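Every write task produces its own output file(s), so a query that ends with many partitions (for example, the default 200 set by spark.sql.shuffle.partitions) produces many small files. Solutions: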

Control the number of partitions of the final Dataset with repartition() or coalesce() before writing.

Use Spark SQL partitioning hints (Spark 2.4+): INSERT … SELECT /*+ COALESCE(numPartitions) */ … or /*+ REPARTITION(numPartitions) */.

Schedule regular merges of Hive partition directories.
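One way to choose numPartitions for these hints is to divide the expected output size by a target file size. The sketch below is a hypothetical helper (class name, table names, and the 256 MB target are assumptions) that computes the count with ceiling division and renders an INSERT with a COALESCE hint.

```java
// Sketch: size the final partition count from an estimated output size, then
// emit a Spark SQL COALESCE hint. Table names and sizes are hypothetical.
public class CoalesceHintPlanner {
  // Ceiling division keeps the average file at or below the target; never fewer than one partition.
  static int partitionsFor(long estimatedOutputBytes, long targetFileBytes) {
    long n = (estimatedOutputBytes + targetFileBytes - 1) / targetFileBytes;
    return (int) Math.max(1, n);
  }

  static String insertWithHint(String target, String source, int numPartitions) {
    return "INSERT OVERWRITE TABLE " + target
        + " SELECT /*+ COALESCE(" + numPartitions + ") */ * FROM " + source;
  }

  public static void main(String[] args) {
    long gib = 1L << 30;
    long mib = 1L << 20;
    int n = partitionsFor(10 * gib, 256 * mib); // 40
    System.out.println(insertWithHint("dw.events_clean", "ods.events_raw", n));
  }
}
```

COALESCE merges partitions without a shuffle but also lowers the parallelism of the stage that produces the data; REPARTITION adds a shuffle and yields evenly sized files, which is often worth it when the upstream stage is heavy or skewed.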

Flink Small‑File Compaction

Flink's filesystem connector rolls Parquet part files on every checkpoint and can be configured to auto‑compact small files. Alternatively, a custom PartitionCommitPolicy can merge a partition's Parquet part files when that partition is committed, as in the following policy:

package me.lmagics.flinkexp.hiveintegration.util;

import org.apache.flink.hive.shaded.parquet.example.data.Group;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetFileReader;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetReader;
import org.apache.flink.hive.shaded.parquet.hadoop.ParquetWriter;
import org.apache.flink.hive.shaded.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.flink.hive.shaded.parquet.hadoop.example.GroupReadSupport;
import org.apache.flink.hive.shaded.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.flink.hive.shaded.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.flink.hive.shaded.parquet.hadoop.util.HadoopInputFile;
import org.apache.flink.hive.shaded.parquet.schema.MessageType;
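// Flink 1.11-era location; newer releases move this interface (e.g. to org.apache.flink.connector.file.table).
import org.apache.flink.table.filesystem.PartitionCommitPolicy;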
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ParquetFileMergingCommitPolicy implements PartitionCommitPolicy {
  private static final Logger LOGGER = LoggerFactory.getLogger(ParquetFileMergingCommitPolicy.class);

  @Override
  public void commit(Context context) throws Exception {
    Configuration conf = new Configuration();
    // Resolve the file system from the partition URI (scheme + authority) rather than
    // assuming the partition lives on the default file system.
    Path partitionDir = new Path(context.partitionPath().toUri());
    FileSystem fs = partitionDir.getFileSystem(conf);

    // Finished part files start with "part-"; in-progress files are dot-prefixed and skipped.
    List<Path> files = listAllFiles(fs, partitionDir, "part-");
    LOGGER.info("{} files in path {}", files.size(), partitionDir);
    if (files.size() < 2) {
      return; // nothing worth merging
    }

    MessageType schema = getParquetSchema(files, conf);
    LOGGER.info("Fetched parquet schema: {}", schema);

    Path result = merge(partitionDir, schema, files, fs);
    LOGGER.info("Files merged into {}", result);
  }

  private List<Path> listAllFiles(FileSystem fs, Path dir, String prefix) throws IOException {
    List<Path> result = new ArrayList<>();
    // Non-recursive listing of the partition directory.
    RemoteIterator<LocatedFileStatus> dirIterator = fs.listFiles(dir, false);
    while (dirIterator.hasNext()) {
      LocatedFileStatus fileStatus = dirIterator.next();
      Path filePath = fileStatus.getPath();
      if (fileStatus.isFile() && filePath.getName().startsWith(prefix)) {
        result.add(filePath);
      }
    }
    return result;
  }

  // Assumes every part file in the partition shares the schema of the first one.
  private MessageType getParquetSchema(List<Path> files, Configuration conf) throws IOException {
    HadoopInputFile inputFile = HadoopInputFile.fromPath(files.get(0), conf);
    try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
      ParquetMetadata metadata = reader.getFooter();
      return metadata.getFileMetaData().getSchema();
    }
  }

  private Path merge(Path partitionDir, MessageType schema, List<Path> files, FileSystem fs) throws IOException {
    String name = "result-" + System.currentTimeMillis() + ".parquet";
    // Write to a hidden (dot-prefixed) temp file first so readers never see a half-written result.
    Path tmpDest = new Path(partitionDir, "." + name + ".inprogress");
    Path mergeDest = new Path(partitionDir, name);

    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(tmpDest)
        .withType(schema)
        .withConf(fs.getConf())
        .withWriteMode(Mode.CREATE)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .build()) {
      // Row-by-row copy through the example Group API: simple, but CPU-heavy for large partitions.
      for (Path file : files) {
        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
            .withConf(fs.getConf())
            .build()) {
          Group data;
          while ((data = reader.read()) != null) {
            writer.write(data);
          }
        }
      }
    }

    if (!fs.rename(tmpDest, mergeDest)) {
      throw new IOException("Failed to rename " + tmpDest + " to " + mergeDest);
    }
    // Delete sources only after the merged file is complete and visible. Readers listing the
    // directory between the rename and the deletes may briefly see rows twice.
    for (Path file : files) {
      fs.delete(file, false);
    }
    return mergeDest;
  }
}

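Alternatively, setting 'auto-compaction' = 'true' and, optionally, 'compaction.file-size' (which defaults to the rolling file size) lets Flink merge small files automatically without a custom policy. Compaction only combines files written within the same checkpoint, so each partition still ends up with at least one file per checkpoint.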
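Both approaches are configured through the table's WITH options. The sketch below only builds the DDL string (in a real job it would be passed to TableEnvironment.executeSql); the class name, table name, columns, and HDFS path are hypothetical, and the option keys 'auto-compaction', 'compaction.file-size', 'sink.partition-commit.policy.kind', and 'sink.partition-commit.policy.class' are those of the Flink filesystem connector, which may differ across Flink versions.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper that renders Flink SQL DDL for a partitioned Parquet filesystem sink.
public class FsSinkDdl {
  static String buildSinkDdl(String table, String path, boolean customMergePolicy) {
    Map<String, String> options = new LinkedHashMap<>(); // keeps option order stable
    options.put("connector", "filesystem");
    options.put("path", path);
    options.put("format", "parquet");
    if (customMergePolicy) {
      // Run the merging policy class from the article whenever a partition is committed.
      options.put("sink.partition-commit.policy.kind", "success-file,custom");
      options.put("sink.partition-commit.policy.class",
          "me.lmagics.flinkexp.hiveintegration.util.ParquetFileMergingCommitPolicy");
    } else {
      // Built-in compaction: merge files from the same checkpoint up to the target size.
      options.put("auto-compaction", "true");
      options.put("compaction.file-size", "128MB");
    }
    String withClause = options.entrySet().stream()
        .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
        .collect(Collectors.joining(",\n"));
    // Columns are placeholders; dt is the partition column that commit policies act on.
    return "CREATE TABLE " + table + " (\n"
        + "  user_id STRING,\n"
        + "  amount DOUBLE,\n"
        + "  dt STRING\n"
        + ") PARTITIONED BY (dt) WITH (\n"
        + withClause + "\n"
        + ")";
  }

  public static void main(String[] args) {
    System.out.println(buildSinkDdl("fs_sink", "hdfs:///warehouse/fs_sink", false));
  }
}
```

Using 'success-file,custom' keeps the standard _SUCCESS marker alongside the merge step; the policy class must be on the job's classpath and have a no-argument constructor.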

Conclusion

Addressing the small‑file problem requires a mix of architectural choices (e.g., HAR, HBase, federation), data‑pipeline adjustments (batching, coalescing, repartitioning), and tool‑specific features (CombineFileInputFormat, Hive merge settings, Flink compaction). Selecting the right combination based on workload characteristics can dramatically improve storage efficiency and processing performance.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
