Consuming Kerberos‑Protected Kafka Data with Spark Streaming and Storing into Kudu
This guide shows how to configure a Spark Streaming application that runs on YARN in cluster mode to consume Kerberos-protected Kafka topics and write the processed data into Kudu tables. It covers the Java code, keytab and JAAS setup, Kafka client configuration, and the spark-submit command.
Key assumptions are that both Kafka and Kudu are integrated with Kerberos, a non‑root user (e.g., wms) runs the job, and Spark is launched in yarn‑cluster mode.
Main Java application (simplified):
package deng.yb.sparkStreaming;
import java.net.URLDecoder;
import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kudu.spark.kudu.KuduContext;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.*;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.alibaba.fastjson.*;
import deng.yb.sparkStreaming.kafka.KafkaTools;
import deng.yb.sparkStreaming.utils.*;
public class App {
private static final Logger logger = Logger.getLogger(App.class);
private static final String BEAN_CONF = "classpath:spring/spring-bean.xml";
private static Map<String, String> conf = new HashMap<>();
private static final String EPP_REQUEST = "POST /api/sky_server_data_app/track/user_time HTTP/1.1";
private static final String APP_REQUEST = "POST /api/sky_server_data_app/code/app_code HTTP/1.1";
private static final String CONFIG = "commonConfig";
private static final String MASTER = "master";
private static final String APP_NAME = "appName";
private static final String COLUMNS = "columns";
private static final String TOPIC = "topic";
private static final String TABLE = "tables";
static {
SpringContextUtil.setApplicationContext(new ClassPathXmlApplicationContext(new String[]{BEAN_CONF}));
conf = (Map<String, String>) SpringContextUtil.getBean(CONFIG);
}
public static void main(String[] args) {
try {
SparkSession spark = SparkSession.builder()
.appName(conf.get(APP_NAME)).master(conf.get(MASTER)).getOrCreate();
Map<String, Object> confMap = KafkaTools.kafkaConf(conf);
String[] topicArr = conf.get(TOPIC).split(",");
Collection<String> topics = Arrays.asList(topicArr);
StreamingContext sc = new StreamingContext(spark.sparkContext(), Durations.milliseconds(5000));
JavaStreamingContext jssc = new JavaStreamingContext(sc);
// ... (stream processing, transformation, Kudu write) ...
jssc.start();
jssc.awaitTermination();
} catch (Exception e) {
logger.error("Error processing messages!", e);
}
}
}
KafkaTools utility (simplified):
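package deng.yb.sparkStreaming.kafka;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
public class KafkaTools {
// Builds the consumer parameters for the Kafka direct stream from the commonConfig map.
public static Map<String, Object> kafkaConf(Map<String, String> conf) {
if (conf == null) return null;
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get("bootStrapServers"));
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// Auto-commit can record offsets before a batch has reached Kudu; Spark's kafka-0-10 guide
// disables it and commits with commitAsync once the output has completed.
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
kafkaParams.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
// Kerberos (GSSAPI, the default SASL mechanism) without TLS; the login comes from the KafkaClient JAAS entry.
kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
kafkaParams.put("sasl.kerberos.service.name", "kafka");
return kafkaParams;
}
}
Kerberos JAAS configuration and keytab preparation: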
Create kafka_client_jaas.conf with the following KafkaClient entry (replace <REALM> with the Kerberos realm of the wms principal):
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=false
serviceName="kafka"
keyTab="./wms.keytab"
principal="wms@<REALM>";
};
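A typo in the JAAS file only surfaces once the first Kafka consumer starts inside a YARN container. The JDK can parse the same file locally, so a quick check before submitting is cheap. The sketch below is a hypothetical helper (`JaasCheck`, `load`, `usesKeytabOnly`, and `writeTemp` are illustrative names, and `EXAMPLE.COM` is a placeholder realm); it relies only on `javax.security.auth.login.Configuration.getInstance` with the standard `"JavaLoginConfig"` type and a `java.security.URIParameter` pointing at the file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Hypothetical helper: parses a JAAS file with the JDK's built-in parser and
// checks that the KafkaClient entry performs a keytab-based Kerberos login.
class JaasCheck {

    // Sample mirroring kafka_client_jaas.conf; EXAMPLE.COM is a placeholder realm.
    static final String SAMPLE = String.join("\n",
        "KafkaClient {",
        "  com.sun.security.auth.module.Krb5LoginModule required",
        "  useKeyTab=true",
        "  storeKey=true",
        "  useTicketCache=false",
        "  serviceName=\"kafka\"",
        "  keyTab=\"./wms.keytab\"",
        "  principal=\"wms@EXAMPLE.COM\";",
        "};",
        "");

    // Parses the file (a malformed file makes getInstance throw) and returns the named entry's modules.
    static AppConfigurationEntry[] load(Path jaasFile, String entryName) {
        Configuration config;
        try {
            config = Configuration.getInstance("JavaLoginConfig", new URIParameter(jaasFile.toUri()));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("Could not load JAAS file " + jaasFile, e);
        }
        AppConfigurationEntry[] entries = config.getAppConfigurationEntry(entryName);
        if (entries == null) {
            throw new IllegalStateException("No JAAS entry named " + entryName);
        }
        return entries;
    }

    // True if the module is a required Krb5LoginModule that reads a keytab and skips the ticket cache.
    static boolean usesKeytabOnly(AppConfigurationEntry entry) {
        Map<String, ?> opts = entry.getOptions();
        return "com.sun.security.auth.module.Krb5LoginModule".equals(entry.getLoginModuleName())
            && entry.getControlFlag() == AppConfigurationEntry.LoginModuleControlFlag.REQUIRED
            && "true".equals(opts.get("useKeyTab"))
            && "false".equals(opts.get("useTicketCache"))
            && opts.get("keyTab") != null
            && opts.get("principal") != null;
    }

    // Writes text to a temporary file that is deleted when the JVM exits.
    static Path writeTemp(String text) {
        try {
            Path tmp = Files.createTempFile("kafka_client_jaas", ".conf");
            Files.write(tmp, text.getBytes(StandardCharsets.UTF_8));
            tmp.toFile().deleteOnExit();
            return tmp;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Usage: java JaasCheck [path/to/kafka_client_jaas.conf]; without an argument the sample is checked.
    public static void main(String[] args) {
        Path file = args.length > 0 ? Paths.get(args[0]) : writeTemp(SAMPLE);
        AppConfigurationEntry[] entries = load(file, "KafkaClient");
        System.out.println("modules: " + entries.length + ", keytab login: " + usesKeytabOnly(entries[0]));
    }
}
```

Running it against the real kafka_client_jaas.conf on the submitting host catches parse errors and a wrong entry name before the job reaches YARN; it does not contact the KDC, so it cannot tell whether the keytab itself is valid.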
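Place wms.keytab in the same directory as kafka_client_jaas.conf, i.e. the directory spark2-submit is run from, because --files uses relative paths. YARN copies both files into each container's working directory, which is where the relative keyTab="./wms.keytab" path resolves: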
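-rwxr-xr-x 1 root root 352 Jul 16 09:48 wms.keytab
Before the job starts, the wms user must also hold a valid Kerberos ticket. The Kafka clients in the driver and executors log in directly from the keytab (useKeyTab=true, useTicketCache=false), while spark2-submit and the other Hadoop clients on the host use the ticket cache. A cron job renews that ticket every five minutes: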
# <REALM> is the Kerberos realm of the wms principal
*/5 * * * * ./doCommand.sh "su wms -c 'kinit -kt /usr/wms/sparkstreaming/wms.keytab wms@<REALM>'" > /usr/wms/sparkstreaming/lastupdate
Submitting the Spark job:
spark2-submit \
--driver-java-options=-Djava.security.auth.login.config=/etc/wonhighconf/bi/bi-sparkstreaming/kafka_client_jaas.conf \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
--files kafka_client_jaas.conf,wms.keytab \
--master yarn \
--deploy-mode cluster \
--class deng.yb.sparkStreaming.App \
/usr/wms/sparkstreaming/sparkStreaming-0.0.1-SNAPSHOT.jar
After submission, monitor the job through the YARN UI and the Spark logs to confirm that messages are consumed, transformed, and inserted into the Kudu tables.
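In cluster mode the driver runs inside a YARN container, so a missing key in the commonConfig bean, or a topic list such as "a, b" whose second entry keeps its leading space after split(","), only shows up in those container logs after submission. A minimal, hypothetical fail-fast helper could run at the top of App.main before the SparkSession is built. `ConfigCheck`, `missingKeys`, and `topics` are illustrative names; the key names are the ones App (appName, master, topic) and KafkaTools (bootStrapServers) read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical fail-fast validation of the commonConfig map before Spark starts.
class ConfigCheck {

    // Keys read by App (appName, master, topic) and KafkaTools (bootStrapServers).
    static final List<String> REQUIRED = Arrays.asList("appName", "master", "topic", "bootStrapServers");

    // Returns the required keys that are absent or blank, in REQUIRED order.
    static List<String> missingKeys(Map<String, String> conf) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = conf.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    // Splits the comma-separated topic list, trimming whitespace and dropping empty entries,
    // so "a, b," yields [a, b] instead of [a, " b"].
    static List<String> topics(Map<String, String> conf) {
        return Arrays.stream(conf.get("topic").split(","))
                .map(String::trim)
                .filter(t -> !t.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("appName", "kafka-to-kudu"); // illustrative values only
        conf.put("master", "yarn");
        conf.put("topic", "topicA, topicB,");
        List<String> missing = missingKeys(conf);
        if (!missing.isEmpty()) {
            // In App.main you would throw here instead, so the driver stops before creating any consumer.
            System.err.println("Missing config keys: " + missing);
        }
        System.out.println("Topics: " + topics(conf));
    }
}
```

Using topics(conf) in place of the bare split(",") keeps stray whitespace out of the subscribed topic names.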
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Big Data Technology & Architecture
Wang Zhiwu, a big data expert, dedicated to sharing big data technology.
