
Implementing Raft Command Handling and Log Replication in Go

This article explains how to extend a Go implementation of the Raft consensus algorithm to handle client commands, replicate logs across the cluster, manage commit pipelines, and ensure election safety, while providing detailed code examples and discussion of underlying concepts.

360 Tech Engineering

In this third part of the Raft series, we introduce client interaction: a client submits a command to the leader, the leader replicates the command to its followers, and the command is considered committed once a majority of nodes acknowledge it.

1. Client Interaction

The client sends a command to the leader, which then replicates it to its followers. Once a majority of nodes have the command in their logs, the command is committed and the client is notified.

2. Implementation: Commit Pipeline

The consensus module delivers committed entries to the client over commitChan, a channel of type chan<- CommitEntry. Each CommitEntry carries the command, its log index, and the term in which it was committed.

type CommitEntry struct {
    // Command is the client command being committed.
    Command interface{}

    // Index is the log index at which the client command is committed.
    Index int

    // Term is the Raft term at which the client command is committed.
    Term int
}
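On the consuming side, the client (or a state-machine layer above the consensus module) simply drains the channel. Here is a minimal sketch; applyCommits and the apply callback are hypothetical names standing in for whatever the application does with a committed command:

```go
package main

import "fmt"

// CommitEntry mirrors the struct defined above.
type CommitEntry struct {
	Command interface{}
	Index   int
	Term    int
}

// applyCommits drains committed entries from commitChan and hands each one
// to the application's apply callback, in log order.
func applyCommits(commitChan <-chan CommitEntry, apply func(CommitEntry)) {
	for entry := range commitChan {
		apply(entry)
	}
}

func main() {
	ch := make(chan CommitEntry, 2)
	ch <- CommitEntry{Command: "set x=1", Index: 0, Term: 1}
	ch <- CommitEntry{Command: "set y=2", Index: 1, Term: 1}
	close(ch)

	var applied []int
	applyCommits(ch, func(e CommitEntry) { applied = append(applied, e.Index) })
	fmt.Println(applied) // [0 1]
}
```

Because the channel preserves send order and the consensus module sends entries as they commit, the application observes commands in log order.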

Submit appends the command to the log and returns true when called on the leader; on any other node it returns false, and the client must retry against a different node.

func (cm *ConsensusModule) Submit(command interface{}) bool {
    cm.mu.Lock()
    defer cm.mu.Unlock()

    cm.dlog("Submit received by %v: %v", cm.state, command)
    if cm.state == Leader {
        cm.log = append(cm.log, LogEntry{Command: command, Term: cm.currentTerm})
        cm.dlog("... log=%v", cm.log)
        return true
    }
    return false
}

3. Raft Log

Log entries are defined as:

type LogEntry struct {
    Command interface{}
    Term    int
}

Each node maintains a log []LogEntry. The leader appends new entries and replicates them to followers via AppendEntries RPCs.
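For reference, here is one plausible shape for the AppendEntries RPC types whose fields appear in the leader code below. The field names follow the Raft paper; the exact layout in the real implementation may differ:

```go
package main

import "fmt"

type LogEntry struct {
	Command interface{}
	Term    int
}

// AppendEntriesArgs is the leader-to-follower replication request.
type AppendEntriesArgs struct {
	Term         int        // leader's term
	LeaderId     int        // so followers can redirect clients
	PrevLogIndex int        // index of the entry immediately preceding Entries
	PrevLogTerm  int        // term of the entry at PrevLogIndex
	Entries      []LogEntry // entries to store (empty for pure heartbeats)
	LeaderCommit int        // leader's commitIndex
}

// AppendEntriesReply is the follower's response.
type AppendEntriesReply struct {
	Term    int  // follower's current term, for the leader to update itself
	Success bool // true if the follower's log matched PrevLogIndex/PrevLogTerm
}

func main() {
	log := []LogEntry{{Command: "a", Term: 1}, {Command: "b", Term: 2}}
	ni := 1 // nextIndex for some follower
	args := AppendEntriesArgs{
		Term:         2,
		PrevLogIndex: ni - 1,
		PrevLogTerm:  log[ni-1].Term,
		Entries:      log[ni:],
	}
	fmt.Println(args.PrevLogTerm, len(args.Entries)) // 1 1
}
```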

4. Submitting New Commands

The Submit method adds a command to the leader’s log and triggers replication.
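Since Submit returns false on non-leaders, a client typically tries nodes until one accepts the command. This is a hypothetical client-side helper, not part of the implementation above; the node interface and submitToCluster name are assumptions for illustration:

```go
package main

import "fmt"

// node abstracts the Submit entry point; in the real system this would be
// an RPC to each server's ConsensusModule.Submit.
type node interface {
	Submit(command interface{}) bool
}

type fakeNode struct{ leader bool }

func (n fakeNode) Submit(cmd interface{}) bool { return n.leader }

// submitToCluster tries each node in turn and returns the index of the
// node that accepted the command as leader, or -1 if none did.
func submitToCluster(nodes []node, cmd interface{}) int {
	for i, n := range nodes {
		if n.Submit(cmd) {
			return i
		}
	}
	return -1
}

func main() {
	nodes := []node{fakeNode{}, fakeNode{}, fakeNode{leader: true}}
	fmt.Println(submitToCluster(nodes, "set x=1")) // 2
}
```

A production client would also handle the window where no node believes it is leader (e.g. during an election) by retrying with backoff.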

5. Replicating Log Entries

The leader sends heartbeats (AppendEntries RPCs) to followers, including any new entries. Followers validate PrevLogIndex and PrevLogTerm, insert any missing entries, and advance their commitIndex when the leader's commit index advances.

func (cm *ConsensusModule) leaderSendHeartbeats() {
    cm.mu.Lock()
    savedCurrentTerm := cm.currentTerm
    cm.mu.Unlock()

    for _, peerId := range cm.peerIds {
        go func(peerId int) {
            cm.mu.Lock()
            ni := cm.nextIndex[peerId]
            prevLogIndex := ni - 1
            prevLogTerm := -1
            if prevLogIndex >= 0 {
                prevLogTerm = cm.log[prevLogIndex].Term
            }
            entries := cm.log[ni:]
            args := AppendEntriesArgs{
                Term:         savedCurrentTerm,
                LeaderId:     cm.id,
                PrevLogIndex: prevLogIndex,
                PrevLogTerm:  prevLogTerm,
                Entries:      entries,
                LeaderCommit: cm.commitIndex,
            }
            cm.mu.Unlock()
            var reply AppendEntriesReply
            if err := cm.server.Call(peerId, "ConsensusModule.AppendEntries", args, &reply); err == nil {
                cm.mu.Lock()
                defer cm.mu.Unlock()
                // handle reply, update nextIndex, matchIndex, commitIndex, etc.
            }
        }(peerId)
    }
}
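The reply handling elided in the comment above is where the leader advances its commitIndex: after a successful AppendEntries reply it updates matchIndex for that peer, then looks for the highest index replicated on a majority whose entry belongs to the current term. A sketch of that second step, with advanceCommitIndex as a hypothetical helper name:

```go
package main

import "fmt"

type LogEntry struct {
	Command interface{}
	Term    int
}

// advanceCommitIndex returns the new commit index: the highest log index
// that is replicated on a majority of the cluster and whose entry is from
// the current term. Raft only commits current-term entries directly;
// earlier entries are committed implicitly along with them.
func advanceCommitIndex(log []LogEntry, matchIndex map[int]int, commitIndex, currentTerm, clusterSize int) int {
	for i := commitIndex + 1; i < len(log); i++ {
		if log[i].Term != currentTerm {
			continue
		}
		count := 1 // the leader itself always has the entry
		for _, m := range matchIndex {
			if m >= i {
				count++
			}
		}
		if count*2 > clusterSize {
			commitIndex = i
		}
	}
	return commitIndex
}

func main() {
	log := []LogEntry{{Term: 1}, {Term: 1}, {Term: 2}}
	match := map[int]int{1: 2, 2: 1} // peer 1 holds up to index 2, peer 2 up to index 1
	fmt.Println(advanceCommitIndex(log, match, -1, 2, 3)) // 2
}
```

Whenever commitIndex advances, the leader sends the newly committed entries on commitChan and piggybacks the new value on the next heartbeat via LeaderCommit.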

6. Updating Followers' Logs

Followers process AppendEntries RPCs, checking term consistency, appending new entries, and advancing commitIndex when appropriate.

func (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
    cm.mu.Lock()
    defer cm.mu.Unlock()
    if cm.state == Dead { return nil }
    // term checks, log consistency checks, entry insertion, commit index update
    // send reply with success flag and current term
    return nil
}
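The core of the follower-side log consistency check hinted at in the comments can be isolated into a small predicate. This is a sketch, with logConsistent as a hypothetical helper name; -1 is used for PrevLogIndex when the leader sends from the start of the log, matching the convention in leaderSendHeartbeats above:

```go
package main

import "fmt"

type LogEntry struct {
	Command interface{}
	Term    int
}

// logConsistent reports whether a follower's log contains an entry at
// prevLogIndex whose term matches prevLogTerm. If it does not, the
// follower rejects the AppendEntries request and the leader decrements
// nextIndex and retries.
func logConsistent(log []LogEntry, prevLogIndex, prevLogTerm int) bool {
	if prevLogIndex == -1 {
		return true // leader is sending from the very start of the log
	}
	return prevLogIndex < len(log) && log[prevLogIndex].Term == prevLogTerm
}

func main() {
	log := []LogEntry{{Term: 1}, {Term: 2}}
	fmt.Println(logConsistent(log, 1, 2)) // true
	fmt.Println(logConsistent(log, 1, 1)) // false: term mismatch
	fmt.Println(logConsistent(log, 5, 2)) // false: follower's log is too short
}
```

On a match, the follower truncates any conflicting suffix, appends the new entries, and sets commitIndex to min(LeaderCommit, index of last new entry).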

7. Election Safety

RequestVote RPCs include lastLogIndex and lastLogTerm so each voter can check that the candidate's log is at least as up-to-date as its own. Since a candidate needs votes from a majority, this guarantees that an elected leader's log contains every committed entry.

func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
    cm.mu.Lock()
    defer cm.mu.Unlock()
    // term and log checks, grant vote if appropriate
    return nil
}
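The "at least as up-to-date" comparison elided in the stub above can be written as a small pure function. A sketch, with candidateUpToDate as a hypothetical helper name: a log is more up-to-date if its last entry has a higher term, or, when last terms are equal, if it is at least as long:

```go
package main

import "fmt"

// candidateUpToDate reports whether a candidate's log (described by the
// term and index of its last entry) is at least as up-to-date as the
// voter's log, per the Raft election-safety rule.
func candidateUpToDate(candLastTerm, candLastIndex, myLastTerm, myLastIndex int) bool {
	if candLastTerm != myLastTerm {
		return candLastTerm > myLastTerm // higher last term wins
	}
	return candLastIndex >= myLastIndex // same term: longer (or equal) log wins
}

func main() {
	fmt.Println(candidateUpToDate(3, 5, 2, 9)) // true: higher last term beats a longer log
	fmt.Println(candidateUpToDate(2, 4, 2, 9)) // false: same term, shorter log
}
```

A voter grants its vote only when this check passes and it has not already voted for someone else in the current term.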

8. Next Steps

The current implementation lacks persistence; future work will add durable storage to prevent data loss on server restarts.

References: Raft paper (https://raft.github.io/raft.pdf) and source code (https://github.com/eliben/raft).

Tags: backend, distributed systems, Go, Raft, consensus, log replication
Written by 360 Tech Engineering, the official tech channel of 360, building a professional technology aggregation platform for the brand.