Building a Real‑Time Data Processing Pipeline with Apache Kafka, Spark Streaming, and Cassandra
This tutorial explains how to build a highly scalable, fault‑tolerant real‑time data processing platform by creating a Kafka topic and a Cassandra keyspace, adding the Spark and connector dependencies, developing a Java‑based Spark Streaming pipeline, enabling checkpoints, and deploying the application with spark‑submit.
Apache Kafka is a scalable, high‑performance, low‑latency platform that lets applications read and write streams of data much like a messaging system. Spark Streaming is a scalable, high‑throughput, fault‑tolerant real‑time stream processing engine (written in Scala but offering a Java API). Apache Cassandra is a distributed NoSQL database. This article combines the three: Kafka delivers the messages, Spark Streaming processes them, and Cassandra stores the results.
Preparation: Create a Kafka topic named messages, then a Cassandra keyspace named vocabulary that contains a table named words:
$KAFKA_HOME$\bin\windows\kafka-topics.bat --create \
--zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic messages

CREATE KEYSPACE vocabulary WITH REPLICATION = {
'class' : 'SimpleStrategy',
'replication_factor' : 1
};
USE vocabulary;
CREATE TABLE words (word text PRIMARY KEY, count int);

Dependencies: Use Maven to add the Spark Core, Spark SQL, Spark Streaming, Spark‑Kafka connector, and Cassandra connector dependencies. The Spark Core entry looks like this:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
... (other dependencies omitted for brevity) ...

Data pipeline development:
Obtain a JavaStreamingContext with a SparkConf that sets the application name and Cassandra host.
Read data from the Kafka topic using KafkaUtils.createDirectStream with appropriate deserializers.
Process the DStream to split lines into words, map each word to a count of 1, and reduce by key to obtain per‑batch word counts.
Write the word‑count results to the Cassandra words table using the Spark‑Cassandra connector.
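The third and fourth steps can be pictured without Spark. The following is a minimal plain‑Java sketch, not the Spark API: it splits lines on whitespace, counts each word within one batch, and wraps every result in a Word bean. The article does not show the Word class, so its shape here (a serializable bean whose word and count properties match the columns of the words table) is an assumption, and BatchWordCountSketch and countBatch are hypothetical names used only for illustration.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Assumed shape of the Word bean: property names mirror the columns of the words table.
class Word implements Serializable {
    private static final long serialVersionUID = 1L;
    private String word;
    private int count;

    public Word() { }  // bean-style no-argument constructor

    public Word(String word, int count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() { return word; }
    public void setWord(String word) { this.word = word; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }
}

// Hypothetical helper: a plain-Java analogue of flatMap -> mapToPair -> reduceByKey for one batch.
class BatchWordCountSketch {

    static List<Word> countBatch(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();  // sorted only to keep the output order stable
        for (String line : lines) {
            for (String token : line.trim().split("\\s+")) {
                // Unlike the Spark chain in this article, empty tokens are skipped here.
                if (!token.isEmpty()) {
                    counts.merge(token, 1, Integer::sum);  // "map to 1" and "reduce by key" in one step
                }
            }
        }
        List<Word> result = new ArrayList<>();
        counts.forEach((word, count) -> result.add(new Word(word, count)));
        return result;
    }

    public static void main(String[] args) {
        for (Word w : countBatch(Arrays.asList("to be or", "not to be"))) {
            System.out.println(w.getWord() + " -> " + w.getCount());
        }
    }
}
```

The Spark code that follows expresses the same steps on a DStream, so each one‑second batch yields its own set of counts.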
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("WordCountingApp");
sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("messages");
JavaInputDStream<ConsumerRecord<String, String>> messages =
    KafkaUtils.createDirectStream(
        streamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
JavaPairDStream<String, Integer> wordCounts = messages
    .map(record -> record.value())
    .flatMap(line -> Arrays.asList(line.split("\\s+")).iterator())
    .mapToPair(word -> new Tuple2<>(word, 1))
    .reduceByKey((i1, i2) -> i1 + i2);
wordCounts.foreachRDD(javaRdd -> {
    // Convert each (word, count) pair to a Word on the executors and save the batch
    // in one write, so nothing is collected to the driver.
    JavaRDD<Word> words = javaRdd.map(tuple -> new Word(tuple._1(), tuple._2()));
    javaFunctions(words).writerBuilder("vocabulary", "words", mapToRow(Word.class)).saveToCassandra();
});
streamingContext.start();
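streamingContext.awaitTermination();

Checkpoints: Enable checkpoints to preserve state across batches and compute cumulative word frequencies. The checkpoint directory and the stateful transformation must be set up before streamingContext.start() is called, because no new computations can be added to a context once it has started.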
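The running total kept by the stateful step can be pictured in plain Java. This is a sketch of the update rule only, under the assumption that a HashMap can stand in for the per‑key state that Spark manages; CumulativeCountSketch and applyBatch are hypothetical names, not part of Spark.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for per-key state: a HashMap is used here only to
// illustrate the update rule, it is not how Spark stores state.
class CumulativeCountSketch {
    private final Map<String, Integer> state = new HashMap<>();

    // Folds one batch of per-batch counts into the running totals and
    // returns the updated total for every word seen in this batch.
    Map<String, Integer> applyBatch(Map<String, Integer> batchCounts) {
        Map<String, Integer> updated = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> entry : batchCounts.entrySet()) {
            int previous = state.getOrDefault(entry.getKey(), 0);  // 0 when the word has no state yet
            int sum = previous + entry.getValue();
            state.put(entry.getKey(), sum);                         // counterpart of state.update(sum)
            updated.put(entry.getKey(), sum);
        }
        return updated;
    }

    public static void main(String[] args) {
        CumulativeCountSketch totals = new CumulativeCountSketch();
        Map<String, Integer> first = new HashMap<>();
        first.put("spark", 2);
        Map<String, Integer> second = new HashMap<>();
        second.put("spark", 3);
        second.put("kafka", 1);
        System.out.println(totals.applyBatch(first));
        System.out.println(totals.applyBatch(second));
    }
}
```

Spark keeps the equivalent per‑key state itself and periodically saves it to the checkpoint directory, which is why the directory has to be configured for the code that follows.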
streamingContext.checkpoint("./.checkpoint");
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts
    .mapWithState(StateSpec.function((word, one, state) -> {
        int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
        Tuple2<String, Integer> output = new Tuple2<>(word, sum);
        state.update(sum);
        return output;
    }));

Deployment: Package the application and submit it with spark-submit. After the job runs, the word counts appear in the Cassandra words table. The full source code is available at https://github.com/eugenp/tutorials/tree/master/apache-spark.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
