
Building a Real‑Time Data Processing Pipeline with Apache Kafka, Spark Streaming, and Cassandra

This tutorial explains how to build a highly scalable, fault-tolerant real-time data processing platform: create a Kafka topic and a Cassandra keyspace, add the Spark and connector dependencies, develop a Java-based Spark Streaming pipeline, enable checkpoints, and deploy the application with spark-submit.

Big Data Technology & Architecture

Apache Kafka is a scalable, high-performance, low-latency platform that lets applications publish and consume streams of records, much like a messaging system. Spark Streaming is a scalable, high-throughput, fault-tolerant stream processing engine; it is written in Scala but also offers a Java API. Apache Cassandra is a distributed NoSQL database. This article shows how to combine the three into a highly scalable, fault-tolerant real-time data processing platform.

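Preparation : Create a Kafka topic named messages, then a Cassandra keyspace named vocabulary with a table named words. Run the first command in a Windows command prompt and the CQL statements in cqlsh. The --zookeeper flag matches older Kafka releases; from Kafka 2.2 the tool also accepts --bootstrap-server, and Kafka 3.0 removed --zookeeper.

%KAFKA_HOME%\bin\windows\kafka-topics.bat --create ^
  --zookeeper localhost:2181 ^
  --replication-factor 1 --partitions 1 ^
  --topic messages

CREATE KEYSPACE vocabulary WITH REPLICATION = {
    'class' : 'SimpleStrategy',
    'replication_factor' : 1
};
USE vocabulary;
CREATE TABLE words (word text PRIMARY KEY, count int);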

Dependencies : Use Maven to add Spark core, Spark SQL, Spark Streaming, Spark‑Kafka connector, and Cassandra connector dependencies.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.3.0</version>
    <scope>provided</scope>
</dependency>
... (other dependencies omitted for brevity) ...

Data pipeline development :

Obtain a JavaStreamingContext with a SparkConf that sets the application name and Cassandra host.

Read data from the Kafka topic using KafkaUtils.createDirectStream with appropriate deserializers.

Process the DStream to split lines into words, map each word to a count of 1, and reduce by key to obtain per‑batch word counts.

Write the word‑count results to the Cassandra words table using the Spark‑Cassandra connector.
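The write step relies on a Word class that the article uses but never shows. A minimal sketch of such a bean follows, assuming the connector's mapToRow matches bean properties to table columns by name; the field names, constructors, and accessors here are hypothetical and were chosen to mirror the words table.

```java
import java.io.Serializable;

// Hypothetical bean for the vocabulary.words table. The property names mirror
// the table's columns (word, count). It is Serializable because Spark ships
// instances between the driver and the executors.
// Declared without "public" so the sketch compiles in any file; in a real
// project it would normally be a public class in its own Word.java.
class Word implements Serializable {
    private static final long serialVersionUID = 1L;

    private String word;
    private int count;

    // No-argument constructor, as JavaBean conventions expect.
    public Word() {
    }

    public Word(String word, int count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }
}
```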

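// Imports needed by the pipeline code below.
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

// Streaming context with a one-second batch interval and a local Cassandra node.
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("WordCountingApp");
sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));

// Kafka consumer settings: string keys and values, start from the latest offsets.
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("messages");
JavaInputDStream<ConsumerRecord<String, String>> messages =
    KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

// Per-batch word count: take each record's value, split it into words, sum per word.
JavaPairDStream<String, Integer> wordCounts = messages
    .map(record -> record.value())
    .flatMap(line -> Arrays.asList(line.split("\\s+")).iterator())
    .mapToPair(word -> new Tuple2<>(word, 1))
    .reduceByKey((i1, i2) -> i1 + i2);

// Write each batch to Cassandra from the executors instead of collecting it on the driver.
// Word is a serializable bean with word and count properties.
wordCounts.foreachRDD(javaRdd ->
    javaFunctions(javaRdd.map(tuple -> new Word(tuple._1(), tuple._2())))
        .writerBuilder("vocabulary", "words", mapToRow(Word.class))
        .saveToCassandra());

// awaitTermination() can throw InterruptedException, so the enclosing method must declare it.
streamingContext.start();
streamingContext.awaitTermination();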
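The per-batch transformation can be tried without a cluster. The sketch below reproduces the split, pair-with-one, and sum-by-key steps with plain Java streams; the class and method names are hypothetical, and the empty-token filter is an addition that the pipeline above does not apply.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java stand-in for one micro-batch; not Spark code.
class BatchWordCount {

    // Splits every line on whitespace, pairs each word with 1, and sums the
    // ones per word (the flatMap, mapToPair, and reduceByKey steps).
    static Map<String, Integer> countWords(List<String> lines) {
        return lines.stream()
            .flatMap(line -> Arrays.stream(line.split("\\s+")))
            // Added here: a line with leading whitespace yields an empty first token.
            .filter(word -> !word.isEmpty())
            // TreeMap keeps the keys sorted so the printed result is stable.
            .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("to be or not to be", " kafka spark");
        // prints {be=2, kafka=1, not=1, or=1, spark=1, to=2}
        System.out.println(countWords(batch));
    }
}
```

In the real pipeline the same logic runs once per one-second batch, so each result covers only the messages that arrived in that interval.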

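Checkpoints : The pipeline so far stores only the counts from the most recent batch, because word is the table's primary key and each write overwrites the row for that word. To compute cumulative word frequencies, enable checkpoints so Spark can preserve state across batches. A local directory is enough for experiments; a production job would normally checkpoint to fault-tolerant storage such as HDFS.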

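import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;

// Set the checkpoint directory before calling streamingContext.start().
streamingContext.checkpoint("./.checkpoint");
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts
    .mapWithState(StateSpec.function((word, one, state) -> {
        // Add this batch's count to the total stored for the word so far.
        int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
        Tuple2<String, Integer> output = new Tuple2<>(word, sum);
        state.update(sum);
        return output;
    }));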
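To make the state update concrete, here is a minimal plain-Java sketch of what the mapWithState function computes: a running total per word that survives from one batch to the next. The class name and the in-memory map are hypothetical stand-ins for Spark's checkpointed state, not the Spark API.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for the per-key state that Spark keeps between batches.
class CumulativeCounts {
    // Running total per word; plays the role of State<Integer> for each key.
    private final Map<String, Integer> state = new HashMap<>();

    // Folds one batch of per-batch counts into the running totals and returns
    // the updated total for every word that appeared in this batch.
    Map<String, Integer> update(Map<String, Integer> batchCounts) {
        Map<String, Integer> output = new HashMap<>();
        for (Map.Entry<String, Integer> entry : batchCounts.entrySet()) {
            // merge() stores the batch count if the word is new, else adds to the old total.
            int sum = state.merge(entry.getKey(), entry.getValue(), Integer::sum);
            output.put(entry.getKey(), sum);
        }
        return output;
    }

    public static void main(String[] args) {
        CumulativeCounts counts = new CumulativeCounts();

        Map<String, Integer> firstBatch = new HashMap<>();
        firstBatch.put("kafka", 2);
        Map<String, Integer> secondBatch = new HashMap<>();
        secondBatch.put("kafka", 3);
        secondBatch.put("spark", 1);

        System.out.println(counts.update(firstBatch).get("kafka"));  // prints 2
        System.out.println(counts.update(secondBatch).get("kafka")); // prints 5
    }
}
```

Only words present in the current batch appear in the output, which is also how the mapWithState version behaves: it emits an updated pair for each key it processes in that batch.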

Deployment : Package the application and submit it with spark-submit. Once the job is running and messages arrive on the messages topic, the word counts appear in the Cassandra words table. The full source code is available at https://github.com/eugenp/tutorials/tree/master/apache-spark

Written by

Big Data Technology & Architecture

Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
