How etcd Implements Raft and Persistent Storage: A Deep Dive into Its Core Modules
This article explains the inner workings of etcd: its Raft consensus implementation, node election and state management, and the persistent storage layer built on BoltDB, with code excerpts illustrating each component.
Introduction
etcd is positioned as a reliable distributed key‑value store that holds critical data for a cluster and helps keep the system running smoothly.
etcd is often compared with ZooKeeper: etcd is a general‑purpose distributed key‑value store, while ZooKeeper is a centralized service for configuration management and distributed synchronization.
etcd exposes a gRPC interface, allowing clients to interact via Protobuf or the etcdctl command‑line tool.
Raft Consensus Algorithm
etcd uses the Raft algorithm to achieve distributed consistency among its nodes. A typical etcd cluster consists of 3‑5 nodes for high availability.
Each Raft node can be in one of three states: Leader, Follower, or Candidate. Only one Leader exists at a time, and Followers are passive, responding only to requests from the Leader or Candidate.
Time in Raft is divided into terms; each term begins with a Leader election. If the Leader fails, a new election starts.
Node Election
When a Follower does not receive heartbeats within the election timeout, it triggers a MsgHup message to start a pre‑election or election phase.
```go
func (r *raft) Step(m pb.Message) error {
	switch m.Type {
	case pb.MsgHup:
		if r.state != StateLeader {
			if r.preVote {
				r.campaign(campaignPreElection)
			} else {
				r.campaign(campaignElection)
			}
		}
	}
	return nil
}
```

The election process involves the candidate voting for itself, sending MsgVote to the other nodes, and collecting MsgVoteResp replies. If a majority of the cluster grants its vote, the candidate becomes the Leader.
Timers and Heartbeats
Followers reset their election timer upon receiving a MsgHeartbeat from the Leader. The Leader periodically sends heartbeats using tickHeartbeat, which ultimately generates MsgBeat messages that are translated to MsgHeartbeat for Followers.
```go
func (r *raft) tickHeartbeat() {
	r.heartbeatElapsed++
	r.electionElapsed++
	if r.heartbeatElapsed >= r.heartbeatTimeout {
		r.heartbeatElapsed = 0
		r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
	}
}
```

Node States
The Raft implementation defines four state‑transition functions:

- becomeFollower(term, lead) – resets the node, sets the step function to stepFollower, and restarts the election timer.
- becomeCandidate() – increments the term, votes for itself, and sets the step function to stepCandidate.
- becomePreCandidate() – enters the pre‑vote phase without incrementing the term, probing whether the node could win an election (used when preVote is enabled).
- becomeLeader() – sets the step function to stepLeader, switches the timer to tickHeartbeat, and appends a no‑op entry.
Each state has its own message‑handling logic ( stepLeader, stepCandidate, stepFollower) that processes votes, heartbeats, and other Raft messages.
Storage Layer
etcd supports two major versions: V2 (in‑memory only) and V3 (persistent). V3 uses a backend abstraction that hides the underlying storage engine, which is BoltDB (or its maintained fork bbolt).
```go
type Backend interface {
	ReadTx() ReadTx
	BatchTx() BatchTx
	Snapshot() Snapshot
	Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
	Size() int64
	SizeInUse() int64
	Defrag() error
	ForceCommit()
	Close() error
}
```

The backend creates a readTx for read‑only operations and a batchTx for read‑write batches. A background goroutine periodically commits pending write batches.
```go
func (b *backend) run() {
	t := time.NewTimer(b.batchInterval)
	for {
		select {
		case <-t.C:
			if b.batchTx.safePending() != 0 {
				b.batchTx.Commit()
			}
			t.Reset(b.batchInterval)
		case <-b.stopc:
			b.batchTx.CommitAndStop()
			return
		}
	}
}
```

Read‑Only Transactions
readTx provides two main methods: UnsafeRange for key‑range queries and UnsafeForEach for iterating over a bucket. It serves reads from an in‑memory buffer first, falling back to BoltDB when the buffer cannot satisfy the request.
```go
func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
	if endKey == nil {
		// no end key means a single-key lookup
		limit = 1
	}
	keys, vals := rt.buf.Range(bucketName, key, endKey, limit)
	if int64(len(keys)) == limit {
		return keys, vals
	}
	// fall back to BoltDB when the buffer is insufficient
	bucket := rt.tx.Bucket(bucketName)
	if bucket == nil {
		return keys, vals
	}
	c := bucket.Cursor()
	k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
	return append(k2, keys...), append(v2, vals...)
}
```

Read‑Write Transactions
batchTx implements UnsafePut, UnsafeDelete, and Commit. Writes are staged in a BoltDB transaction and committed either explicitly or by the periodic background runner.
```go
func (t *batchTx) UnsafePut(bucketName, key, value []byte) {
	t.unsafePut(bucketName, key, value, false)
}

func (t *batchTx) unsafePut(bucketName, key, value []byte, seq bool) {
	bucket := t.tx.Bucket(bucketName)
	if bucket == nil {
		plog.Fatalf("bucket %s does not exist", bucketName)
	}
	if seq {
		// for sequential (append-only) workloads, a higher fill percent
		// delays page splits and reduces space usage
		bucket.FillPercent = 0.9
	}
	if err := bucket.Put(key, value); err != nil {
		plog.Fatalf("cannot put key into bucket (%v)", err)
	}
	t.pending++
}

func (t *batchTx) Commit() {
	t.Lock()
	t.commit(false)
	t.Unlock()
}
```

Metrics on commit latency, rebalance time, and write time are exported to Prometheus for monitoring.
To be continued.
Architecture Talk
Rooted in the "Dao" of architecture, we provide pragmatic, implementation‑focused architecture content.
