Design and Implementation of a Live Streaming Danmaku System Using Redis, Kafka, and WebSocket
This article explains how to design a live‑streaming danmaku (bullet‑screen) feature by using Redis Zset as the underlying data structure, limiting the list to the latest ten messages, and handling retrieval via polling APIs or WebSocket, with Kafka for message buffering and a distributed lock to avoid ordering issues.
The author introduces a live‑streaming danmaku system, describing the need to store and display scrolling comments (danmaku) efficiently during a video broadcast.
1. Underlying data structure – Redis is chosen, specifically the sorted‑set (Zset) type, where each comment is stored with a timestamp score so that entries are automatically ordered by time. Only the most recent ten comments are retained by trimming the Zset.
// Create Zset and limit to 10 entries
public class DanmakuService {
private Jedis jedis = new Jedis("localhost");
public void addDanmaku(String roomId, String danmaku, long timestamp) {
String key = "room:" + roomId + ":danmaku";
jedis.zadd(key, timestamp, danmaku);
// Keep only the latest 10 comments
jedis.zremrangeByRank(key, 0, -(11));
}
}2. Danmaku retrieval – Two approaches are presented:
Polling API: the client periodically (e.g., every 3 seconds) requests new comments after the last known timestamp.
WebSocket: a persistent connection pushes new comments instantly, eliminating the 3‑second delay.
// Polling API example
public class DanmakuService {
private Jedis jedis = new Jedis("localhost");
public Set
getRecentDanmaku(String roomId, long lastTimestamp) {
String key = "room:" + roomId + ":danmaku";
return jedis.zrangeByScore(key, lastTimestamp + 1, Long.MAX_VALUE);
}
}3. Message flow – User‑generated comments are sent to a Kafka topic, which smooths traffic spikes. A consumer (DanmakuListener) reads from Kafka and writes each comment into the Redis Zset, again trimming to the latest ten entries.
public class DanmakuListener {
private Jedis jedis = new Jedis("localhost");
@KafkaListener(topics = "danmaku", groupId = "group_id")
public void listen(ConsumerRecord
record) {
String roomId = record.key();
String message = record.value();
long timestamp = System.currentTimeMillis();
String key = "room:" + roomId + ":danmaku";
jedis.zadd(key, timestamp, message);
jedis.zremrangeByRank(key, 0, -11);
}
}4. Handling ordering conflicts – To avoid a later comment overwriting an earlier one due to race conditions, a distributed lock is applied when writing to Redis.
// Add comment with distributed lock
public class DanmakuService {
private RedisLockUtil redisLock = new RedisLockUtil();
private Jedis jedis = new Jedis("localhost");
public void addDanmakuWithLock(String roomId, String danmaku, long timestamp) {
String lockKey = "lock:" + roomId;
try {
if (redisLock.acquireLock(lockKey)) {
String key = "room:" + roomId + ":danmaku";
jedis.zadd(key, timestamp, danmaku);
jedis.zremrangeByRank(key, 0, -11);
}
} finally {
redisLock.releaseLock(lockKey);
}
}
}
public class RedisLockUtil {
private Jedis jedis = new Jedis("localhost");
private static final int EXPIRE_TIME = 5000; // 5 seconds
public boolean acquireLock(String lockKey) {
long currentTime = System.currentTimeMillis();
String result = jedis.set(lockKey, String.valueOf(currentTime), "NX", "PX", EXPIRE_TIME);
return "OK".equals(result);
}
public void releaseLock(String lockKey) {
jedis.del(lockKey);
}
}Overall, the article provides a step‑by‑step guide for building a scalable, real‑time danmaku feature using Java, Redis, Kafka, and optional WebSocket upgrades.
Rare Earth Juejin Tech Community
Juejin, a tech community that helps developers grow.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.