
How to Spot Missing Records When Syncing PostgreSQL to Elasticsearch with Logstash

This guide explains why Logstash may import fewer rows than exist in a PostgreSQL table, and provides two practical methods—shell‑script ID comparison and Redis‑accelerated set operations—to reliably detect and troubleshoot missing records in Elasticsearch.

ITPUB

1. Real‑world Problem

Q1: Logstash imports a PostgreSQL table (≈76 million rows) into Elasticsearch, but the document count in ES is far lower; no errors appear in Logstash logs.

Q2: In an asynchronous dual‑write architecture (database + ES), how can we guarantee data consistency?

2. Recommended Solution 1 – ID Comparison Method

To identify which rows were not indexed, follow these steps:

Verify that the Logstash JDBC input plugin is configured correctly and that its statement query selects all required rows.

Check the Elasticsearch output plugin configuration, and make sure no filter (for example a drop filter inside a conditional) discards events unintentionally.

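Add a file output plugin next to the Elasticsearch output so that every record fetched from PostgreSQL is also written to a local file. (The stdout plugin only prints to the console and has no path option.)

Example Logstash snippet:

output {
  elasticsearch {
    # ...Elasticsearch configuration...
  }
  # Write each event as one JSON document per line
  file {
    path => "/path/to/logstash_output.log"
    codec => json_lines
  }
}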

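Export the table's IDs from PostgreSQL to a CSV file (for example with psql's \copy command) and compare them with the Logstash output file using a simple script (Python, Shell, etc.). If the record counts match but ES still differs, inspect the ES cluster health and logs, and consider reducing the bulk size. In old Logstash versions this was done with the Elasticsearch output's flush_size and idle_flush_time options; since Logstash 6.0 those options are obsolete, and the bulk size follows the pipeline settings pipeline.batch.size and pipeline.batch.delay instead.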
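For moderate data volumes the whole comparison can also be done in plain Python. The sketch below is only an illustration: the function name compare_ids is hypothetical, and it assumes (as the Redis example later does) a PostgreSQL export CSV with a header row and the ID in its first column, plus a json_lines file whose lines each carry an "id" field. Besides the missing IDs it reports IDs that Logstash emitted more than once.

```python
import csv
import json
from collections import Counter


def compare_ids(postgres_lines, logstash_lines):
    """Compare IDs from a PostgreSQL CSV export with IDs emitted by Logstash.

    Assumes the CSV has a header row with the ID in the first column, and that
    every non-empty Logstash line is a JSON object with an "id" field.
    """
    reader = csv.reader(postgres_lines)
    next(reader)  # skip header
    pg_ids = {row[0] for row in reader if row}

    # Count how often each ID was emitted; str() makes numeric JSON ids
    # comparable with the string values read from the CSV
    emitted = Counter(
        str(json.loads(line)["id"]) for line in logstash_lines if line.strip()
    )

    return {
        "postgres_ids": len(pg_ids),  # distinct IDs in the export
        "logstash_events": sum(emitted.values()),
        "logstash_unique_ids": len(emitted),
        "missing": sorted(pg_ids.difference(emitted)),
        "duplicated": sorted(i for i, n in emitted.items() if n > 1),
    }
```

With real files it would be called as compare_ids(open('/path/to/postgres_data.csv', newline=''), open('/path/to/logstash_output.log')). If logstash_events is larger than logstash_unique_ids, some IDs were emitted more than once; assuming the Elasticsearch output sets document_id from the id column, those events overwrite the same document, which is one way the ES count can fall short even when the file counts match. Both ID sets live in Python memory, so this suits moderate volumes; for tens of millions of rows the sort-based and Redis-based approaches are more practical.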

2.1 Shell Script Implementation

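The following Bash script extracts IDs from the Logstash JSON log, sorts them together with the IDs exported from PostgreSQL, and uses comm to list the missing IDs. Like the Python example below, it assumes the PostgreSQL export is a CSV file with a header row and the ID in the first column. Because comm expects both inputs sorted in the same lexicographic order, the script uses plain sort under LC_ALL=C rather than sort -n.

#!/bin/bash
set -euo pipefail
# comm and sort must use the same collation; C gives plain byte order
export LC_ALL=C
# Extract IDs from the Logstash JSON output (-r prints raw values without quotes)
jq -r '.id' /path/to/logstash_output.log | sort -u > logstash_ids_sorted.txt
# Take the first CSV column of the PostgreSQL export, skipping the header row
tail -n +2 /path/to/postgres_data.csv | cut -d, -f1 | sort -u > postgres_ids_sorted.txt
# Find IDs present in PostgreSQL but absent in Logstash
comm -23 postgres_ids_sorted.txt logstash_ids_sorted.txt > missing_ids.txt

echo "Missing IDs in Logstash output:"
cat missing_ids.txt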

Save the script as compare.sh, make it executable, and run it:

chmod +x compare.sh
./compare.sh

The script requires jq for JSON parsing; install it if necessary.
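comm -23 works by reading both files once, in lockstep, which is why its inputs must share one sort order. The minimal Python sketch below reproduces that merge to show why the order matters; the function name only_in_first is hypothetical and not part of any library.

```python
def only_in_first(first, second):
    """Yield lines of sorted iterable `first` that are absent from sorted `second`.

    Mirrors `comm -23`: both inputs must be sorted in the same order, otherwise
    the single lockstep pass can step over a match and report a false miss.
    """
    second = iter(second)
    done = object()  # sentinel marking the end of `second`
    current = next(second, done)
    for item in first:
        # Advance `second` until it is no longer behind `item`
        while current is not done and current < item:
            current = next(second, done)
        if current is not done and current == item:
            current = next(second, done)  # consume the match, as comm does
        else:
            yield item
```

Python compares strings by code point, which for ASCII IDs matches sort under LC_ALL=C. Called as list(only_in_first(sorted(["1", "2", "10"]), sorted(["1", "10"]))), it correctly returns ['2']. Given the same IDs in numeric (sort -n) order, list(only_in_first(["1", "2", "10"], ["1", "10"])) wrongly reports '10' as missing too, which is the failure mode to watch for when pairing sort -n with comm.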

2.2 Redis‑Accelerated Comparison

For larger datasets, loading IDs into Redis sets enables fast set‑difference operations.

Python example (requires the redis package):

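import csv
import json

import redis

# Connect to Redis; decode_responses=True returns str instead of bytes
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
r.delete('postgres_ids', 'logstash_ids', 'missing_ids')  # start from empty sets

BATCH_SIZE = 10_000  # SADD commands sent per pipeline round trip

# Load PostgreSQL IDs (first CSV column) into a Redis set
with open('/path/to/postgres_data.csv', newline='') as csvfile:
    csv_reader = csv.reader(csvfile)
    next(csv_reader)  # skip header
    pipe = r.pipeline(transaction=False)
    for i, row in enumerate(csv_reader, 1):
        pipe.sadd('postgres_ids', row[0])
        if i % BATCH_SIZE == 0:
            pipe.execute()
    pipe.execute()  # flush the remainder

# Load Logstash IDs into another Redis set (one JSON document per line)
with open('/path/to/logstash_output.log') as logstash_file:
    pipe = r.pipeline(transaction=False)
    for i, line in enumerate(logstash_file, 1):
        # Parse the JSON instead of splitting strings; str() matches the CSV values
        pipe.sadd('logstash_ids', str(json.loads(line)['id']))
        if i % BATCH_SIZE == 0:
            pipe.execute()
    pipe.execute()  # flush the remainder

# Compute the difference on the server, then iterate it incrementally
r.sdiffstore('missing_ids', ['postgres_ids', 'logstash_ids'])

print("Missing IDs in Logstash output:")
for missing_id in r.sscan_iter('missing_ids'):
    print(missing_id)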

Install the Redis client library with:

pip install redis

3. Summary of Approaches

Approach 1 – Shell Script (jq, sort, comm)

Advantages: Simple; needs only standard Unix tools plus jq.

Disadvantages: Slower, because sorting tens of millions of IDs spills to temporary files on disk; CPU and memory usage are high on very large tables.

Approach 2 – Redis‑Based Acceleration

Advantages: Fast in‑memory set operations; scales to large data volumes as long as the ID sets fit in the Redis server's memory.

Disadvantages: Requires a running Redis server and additional scripting.

Choose the shell‑script method for small‑to‑moderate data sets where speed is not critical, and opt for the Redis solution when handling tens of millions of rows or when performance is a priority. In practice, you may need to combine both techniques and fine‑tune Logstash/Elasticsearch bulk settings to achieve reliable, high‑throughput synchronization.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.
