Setting Up Apache Spark Standalone with Docker and Using Apache Zeppelin for Data Processing
This guide demonstrates how to build a Docker‑based Spark standalone environment, configure Apache Zeppelin to connect to it, and perform data analysis on local CSV files, HDFS, and streaming sources such as Twitter and Kafka, with complete code examples.
In a previous article we introduced Apache Zeppelin’s main features; this lesson uses two simple examples to show how Zeppelin and Spark work together.
Apache Spark supports three cluster managers (Standalone, Apache Mesos, Hadoop YARN). Here we build a Spark standalone mode environment using Docker scripts.
Spark Standalone Mode Environment Setup
Spark Standalone is a lightweight cluster manager that can be set up easily.
Note: Zeppelin and the Spark master both default to port 8080 for their web UIs, so you may need to change zeppelin.server.port in conf/zeppelin-site.xml to avoid a conflict.
1. Build Docker Image
cd $ZEPPELIN_HOME/scripts/docker/spark-cluster-managers/spark_standalone
docker build -t "spark_standalone" .
2. Run Docker Container
docker run -it \
-p 8080:8080 \
-p 7077:7077 \
-p 8888:8888 \
-p 8081:8081 \
-h sparkmaster \
--name spark_standalone \
spark_standalone bash;
Note that the sparkmaster hostname used here should be defined in your /etc/hosts so that Zeppelin can resolve the master URL.
3. Configure Spark Interpreter in Zeppelin
Set Spark master to spark://<hostname>:7077 on the interpreter settings page.
4. Run Zeppelin with Spark Interpreter
After running a paragraph with the Spark interpreter, browse to http://<hostname>:8080 and check that the Spark cluster is running.
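One way to exercise the interpreter before opening the web UI is a short paragraph like the sketch below. It assumes the `sc` SparkContext that Zeppelin provides in Spark paragraphs; the small job is only an illustration and is not part of the original setup steps.

```scala
// Zeppelin %spark paragraph sketch; `sc` is assumed to be the SparkContext
// that Zeppelin injects into Spark paragraphs.
println(s"Spark version: ${sc.version}")
println(s"Master URL:    ${sc.master}") // should match the spark://<hostname>:7077 setting

// A tiny job that forces some work onto the cluster.
val total = sc.parallelize(1 to 100).sum()
println(s"Sum of 1..100: $total")
```

Once this paragraph has run, the application should be listed on the Spark master page.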
You can also check that Spark is running inside the container:
ps -ef | grep spark
Spark on Zeppelin Reading Local Files
Assume a local bank.csv file with sample data:
age: Integer, job: String, marital: String, education: String, balance: Integer
20;teacher;single;bachelor;20000
25;plumber;single;bachelor;10000
...
Convert the CSV to an RDD of Bank objects and register it as a temporary table:
val bankText = sc.textFile("yourPath/bank/bank-full.csv")
case class Bank(age: Integer, job: String, marital: String, education: String, balance: Integer)
// Split each line on ";", drop the header row, and strip the quotes around string fields.
// In bank-full.csv the balance is the sixth field (index 5); the five-column sample above would use index 4.
val bank = bankText.map(s => s.split(";")).filter(s => s(0) != "\"age\"").map(
  s => Bank(s(0).toInt,
    s(1).replaceAll("\"", ""),
    s(2).replaceAll("\"", ""),
    s(3).replaceAll("\"", ""),
    s(5).replaceAll("\"", "").toInt)
)
bank.toDF().registerTempTable("bank")
Example SQL to visualize the age distribution:
%sql select age, count(1) from bank where age < 30 group by age order by age
The threshold can be made dynamic with a Zeppelin dynamic form, which renders a text input named maxAge with a default value of 30:
%sql select age, count(1) from bank where age < ${maxAge=30} group by age order by age
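The parsing and aggregation above can be tried without a cluster. The following is a minimal plain-Scala sketch, not the article's Spark code: `BankRow`, `unquote`, `parseBankLines`, and the sample lines are hypothetical names and made-up data that follow the bank-full.csv column layout, and `maxAge` stands in for the dynamic form value.

```scala
// Plain-Scala sketch of the parsing step and the age query; no Spark required.
// The sample rows are invented and follow the bank-full.csv layout,
// where index 4 is "default" and index 5 is "balance".
case class BankRow(age: Int, job: String, marital: String, education: String, balance: Int)

// Remove the double quotes that wrap string fields in the CSV.
def unquote(field: String): String = field.replaceAll("\"", "")

def parseBankLines(lines: Seq[String]): Seq[BankRow] =
  lines
    .map(_.split(";"))
    .filter(cols => cols(0) != "\"age\"") // skip the header row
    .map(cols =>
      BankRow(cols(0).toInt, unquote(cols(1)), unquote(cols(2)), unquote(cols(3)), unquote(cols(5)).toInt))

val sampleLines = Seq(
  "\"age\";\"job\";\"marital\";\"education\";\"default\";\"balance\"",
  "58;\"management\";\"married\";\"tertiary\";\"no\";2143",
  "25;\"technician\";\"single\";\"secondary\";\"no\";29"
)

val parsed = parseBankLines(sampleLines)

// Equivalent of: select age, count(1) from bank where age < maxAge group by age order by age
val maxAge = 30
val ageCounts = parsed
  .filter(_.age < maxAge)
  .groupBy(_.age)
  .map { case (age, rows) => age -> rows.size }
  .toSeq
  .sortBy(_._1)

ageCounts.foreach(println)
```

In the Spark version the same transformations run on an RDD and the grouping is left to the SQL engine, so only the data source and execution model differ.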
Spark on Zeppelin Reading HDFS Files
Enable the HDFS interpreter in Zeppelin and read a JSON file from HDFS, then save the first column as Parquet.
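The original gives no code for this step, so the following is only a sketch under stated assumptions: a Spark 2.x interpreter where Zeppelin predefines `spark` (the SparkSession), and two placeholder HDFS paths that are not from the article. Spark's JSON schema inference generally orders fields by name, so the "first column" here is whichever column comes first in the inferred schema.

```scala
// Zeppelin %spark paragraph sketch (assumes Spark 2.x with a predefined `spark` session).
// Both paths are hypothetical placeholders; replace them with real HDFS locations.
val inputPath  = "hdfs:///tmp/example/input.json"
val outputPath = "hdfs:///tmp/example/first_column.parquet"

// Read line-delimited JSON and let Spark infer the schema.
val df = spark.read.json(inputPath)
df.printSchema()

// Keep only the first column of the inferred schema and write it as Parquet.
val firstColumn = df.columns.head
df.select(firstColumn).write.mode("overwrite").parquet(outputPath)
```

Selecting an explicit column name instead of `df.columns.head` is safer when the field order of the inferred schema is not what you expect.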
Spark on Zeppelin Reading Streaming Data
Example of reading a real-time Twitter stream:
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
/** Configures the OAuth credentials for accessing Twitter */
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String): Unit = {
  val configs = new HashMap[String, String] ++= Seq(
    "apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret)
  println("Configuring Twitter OAuth")
  configs.foreach { case (key, value) =>
    if (value.trim.isEmpty) {
      throw new Exception("Error setting authentication - value for " + key + " not set")
    }
    // twitter4j expects "consumerKey"/"consumerSecret" rather than "apiKey"/"apiSecret"
    val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
    System.setProperty(fullKey, value.trim)
    println("\tProperty " + fullKey + " set as [" + value.trim + "]")
  }
  println()
}
// Configure Twitter credentials
val apiKey = "xxxxxxxxxxxxxxxxxxxxxxxxx"
val apiSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
val accessToken = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
val accessTokenSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)
val ssc = new StreamingContext(sc, Seconds(2))
val tweets = TwitterUtils.createStream(ssc, None)
// Keep a sliding 60-second window of tweets
val twt = tweets.window(Seconds(60))
case class Tweet(createdAt: Long, text: String)
// Re-register each windowed batch as the "tweets" temporary table
twt.map(status =>
  Tweet(status.getCreatedAt().getTime() / 1000, status.getText())
).foreachRDD(rdd =>
  rdd.toDF().registerTempTable("tweets")
)
twt.print()
ssc.start()
Similarly, Zeppelin can read a Kafka stream and run a word count:
%spark
import _root_.kafka.serializer.DefaultDecoder
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
sc.setLogLevel("INFO")
val ssc = new StreamingContext(sc, Seconds(5))
val kafkaConf = Map(
  "metadata.broker.list" -> "localhost:9092",
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "kafka-streaming-example",
  "zookeeper.connection.timeout.ms" -> "1000"
)
val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
  ssc,
  kafkaConf,
  Map("test" -> 1), // topic name -> number of consumer threads
  StorageLevel.MEMORY_ONLY
)
// Each record is a (key, value) pair; split the value into words
val words = lines.flatMap { case (x, y) => y.split(" ") }
import spark.implicits._
val w = words.map(x => (x, 1L)).reduceByKey(_ + _)
// Replace the "counts" temporary table with the latest micro-batch
w.foreachRDD(rdd => rdd.toDF.registerTempTable("counts"))
ssc.start()
Query the top 10 words per micro-batch:
%sql select * from counts order by _2 desc limit 10
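To see what a single micro-batch of this word count produces, the same steps can be run on an in-memory collection. This is a plain-Scala sketch with invented input lines, not the streaming job itself; the alphabetical tie-break in the sort is an addition for deterministic output, since the SQL query orders by count only.

```scala
// Plain-Scala sketch of one micro-batch of the word count; no Spark or Kafka required.
// The input lines are invented; in the streaming job they arrive from the Kafka topic.
val batch = Seq("spark on zeppelin", "zeppelin reads kafka", "spark streaming")

// flatMap + map + per-key sum mirrors flatMap / map / reduceByKey on the DStream.
val counts: Map[String, Long] =
  batch
    .flatMap(_.split(" "))
    .map(word => (word, 1L))
    .groupBy(_._1)
    .map { case (word, pairs) => word -> pairs.map(_._2).sum }

// Equivalent of: select * from counts order by _2 desc limit 10
// (ties are broken alphabetically here so the result is deterministic).
val top10 = counts.toSeq.sortBy { case (word, n) => (-n, word) }.take(10)

top10.foreach(println)
```

In the streaming version each micro-batch overwrites the counts table, so the query always reflects only the most recent five-second batch.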
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.