Elasticsearch Leader Election in 7.x and Later: Raft Algorithm Implementation and Source Code Analysis
This article explains why Elasticsearch switched to the Raft consensus algorithm for master election from version 7.0, outlines the Raft basics, and provides a detailed walkthrough of the relevant Java source code that implements the new election process.
In the previous article the author described Elasticsearch's pre‑7.x master election based on the Bully algorithm; starting with version 7.0 the system adopts the Raft algorithm for leader election.
Raft is used because the old Zen discovery required the discovery.zen.minimum_master_nodes setting, which could cause temporary unavailability if mis‑configured, and because the previous three‑round ping election was too slow.
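To make the misconfiguration risk concrete, the following is a minimal sketch of the majority arithmetic behind discovery.zen.minimum_master_nodes. The class and method names are hypothetical and are not part of Elasticsearch; the only fact taken as given is that pre-7.x clusters were expected to set the value to a strict majority of master-eligible nodes, (N / 2) + 1.

```java
// Hypothetical helper for illustration; not part of Elasticsearch.
final class ZenQuorumSketch {

    // Strict majority of the master-eligible nodes: (N / 2) + 1.
    static int recommendedMinimumMasterNodes(int masterEligibleNodes) {
        if (masterEligibleNodes < 1) {
            throw new IllegalArgumentException("need at least one master-eligible node");
        }
        return masterEligibleNodes / 2 + 1;
    }

    // Two disjoint groups can each satisfy the setting only if 2 * setting <= N,
    // which is the condition under which a split brain becomes possible.
    static boolean splitBrainPossible(int masterEligibleNodes, int minimumMasterNodes) {
        return 2 * minimumMasterNodes <= masterEligibleNodes;
    }

    // A setting larger than the number of master-eligible nodes can never be met,
    // so no master can be elected at all.
    static boolean electionImpossible(int masterEligibleNodes, int minimumMasterNodes) {
        return minimumMasterNodes > masterEligibleNodes;
    }

    public static void main(String[] args) {
        System.out.println(recommendedMinimumMasterNodes(3)); // prints 2
        System.out.println(splitBrainPossible(3, 1));         // prints true
        System.out.println(splitBrainPossible(3, 2));         // prints false
        System.out.println(electionImpossible(3, 4));         // prints true
    }
}
```

A value that is too low allows two masters to be elected in separate partitions, while a value that is too high blocks elections altogether. Both outcomes depend on an operator keeping the setting in step with the cluster size by hand, which is the burden the 7.x design removes.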
Raft defines three node states—Follower, Candidate, Leader. A node becomes a Candidate when it does not receive heartbeats from a Leader, starts an election, and becomes Leader after receiving votes from a majority of nodes.
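The state transitions described above can be sketched as a small state machine. This is a simplified illustration of textbook Raft, not Elasticsearch code: the class name, method names, and the use of plain string node ids are all assumptions, and log replication, timers, and networking are left out.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of a Raft node; not Elasticsearch code.
final class RaftNodeSketch {

    enum State { FOLLOWER, CANDIDATE, LEADER }

    private final String id;
    private final int clusterSize;
    private final Set<String> votes = new HashSet<>();
    private State state = State.FOLLOWER;
    private long term = 0;

    RaftNodeSketch(String id, int clusterSize) {
        this.id = id;
        this.clusterSize = clusterSize;
    }

    // No heartbeat arrived in time: start an election in a new term and vote for ourselves.
    void onHeartbeatTimeout() {
        if (state == State.LEADER) {
            return;
        }
        state = State.CANDIDATE;
        term++;
        votes.clear();
        votes.add(id);
        maybeBecomeLeader();
    }

    // A vote only counts if we are still a candidate in the term it was cast for.
    void onVoteGranted(String voterId, long voteTerm) {
        if (state != State.CANDIDATE || voteTerm != term) {
            return;
        }
        votes.add(voterId);
        maybeBecomeLeader();
    }

    // A heartbeat from a leader whose term is at least ours makes us a follower.
    void onHeartbeat(long leaderTerm) {
        if (leaderTerm >= term) {
            term = leaderTerm;
            state = State.FOLLOWER;
            votes.clear();
        }
    }

    // Leadership requires votes from a strict majority of the cluster.
    private void maybeBecomeLeader() {
        if (votes.size() > clusterSize / 2) {
            state = State.LEADER;
        }
    }

    State state() {
        return state;
    }

    long term() {
        return term;
    }
}
```

In a three-node cluster, a node that times out becomes a Candidate in term 1 with its own vote, and a single additional vote for term 1 gives it the majority it needs to become Leader.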
The high-level election flow in Elasticsearch is illustrated by the diagram (omitted) and is driven by the Discovery interface, with the new Raft-based implementation class named Coordinator.
When a node starts, it calls startInitialJoin():
@Override
public void startInitialJoin() {
    synchronized (mutex) {
        becomeCandidate("startInitialJoin");
    }
    clusterBootstrapService.scheduleUnconfiguredBootstrap();
}
The scheduleUnconfiguredBootstrap() method returns early unless an unconfigured-bootstrap timeout is set and the local node is master-eligible. Otherwise it schedules a best-effort bootstrap to run after the timeout (3 seconds by default):
void scheduleUnconfiguredBootstrap() {
    // Best-effort bootstrapping is disabled when no timeout is configured.
    if (unconfiguredBootstrapTimeout == null) {
        return;
    }
    // Only master-eligible nodes may bootstrap a cluster.
    if (!transportService.getLocalNode().isMasterNode()) {
        return;
    }
    transportService.getThreadPool().scheduleUnlessShuttingDown(
        unconfiguredBootstrapTimeout, Names.GENERIC, new Runnable() {
            @Override
            public void run() {
                Set<DiscoveryNode> discoveredNodes = getDiscoveredNodes();
                List<DiscoveryNode> zen1Nodes = discoveredNodes.stream()
                    .filter(Coordinator::isZen1Node).collect(Collectors.toList());
                if (zen1Nodes.isEmpty()) {
                    startBootstrap(discoveredNodes, emptyList());
                } else {
                    logger.info("avoiding best-effort cluster bootstrapping due to discovery of pre-7.0 nodes {}", zen1Nodes);
                }
            }
        });
}
The startBootstrap() method asserts that every discovered node is master-eligible and that none is a pre-7.0 (Zen1) node. If bootstrapping has not already been attempted, it builds a VotingConfiguration from the node IDs and calls doBootstrap():
private void startBootstrap(Set<DiscoveryNode> discoveryNodes, List<String> unsatisfiedRequirements) {
    assert discoveryNodes.stream().allMatch(DiscoveryNode::isMasterNode);
    assert discoveryNodes.stream().noneMatch(Coordinator::isZen1Node);
    // compareAndSet guarantees that bootstrapping runs at most once.
    if (bootstrappingPermitted.compareAndSet(true, false)) {
        doBootstrap(new VotingConfiguration(Stream.concat(
            discoveryNodes.stream().map(DiscoveryNode::getId),
            unsatisfiedRequirements.stream().map(s -> BOOTSTRAP_PLACEHOLDER_PREFIX + s))
            .collect(Collectors.toSet())));
    }
}
The doBootstrap() method passes the voting configuration to votingConfigurationConsumer and, if that call throws, schedules a retry after 10 seconds:
private void doBootstrap(VotingConfiguration votingConfiguration) {
    assert transportService.getLocalNode().isMasterNode();
    try {
        votingConfigurationConsumer.accept(votingConfiguration);
    } catch (Exception e) {
        // Setting the initial configuration failed; try again in 10 seconds.
        transportService.getThreadPool().scheduleUnlessShuttingDown(
            TimeValue.timeValueSeconds(10), Names.GENERIC, new Runnable() {
                @Override
                public void run() {
                    doBootstrap(votingConfiguration);
                }
            });
    }
}
In Coordinator, the votingConfigurationConsumer is bound to setInitialConfiguration(). That method checks that the known nodes form a quorum of the proposed configuration, writes the configuration into the cluster metadata as both the last-accepted and last-committed configuration, and starts the election scheduler:
public boolean setInitialConfiguration(final VotingConfiguration votingConfiguration) {
    synchronized (mutex) {
        ClusterState currentState = getStateForMasterService();
        // The local node plus every peer found so far.
        List<DiscoveryNode> knownNodes = new ArrayList<>();
        knownNodes.add(getLocalNode());
        peerFinder.getFoundPeers().forEach(knownNodes::add);
        if (!votingConfiguration.hasQuorum(knownNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toList()))) {
            throw new CoordinationStateRejectedException("not enough nodes discovered to form a quorum in the initial configuration " +
                "[knownNodes=" + knownNodes + ", " + votingConfiguration + "]");
        }
        logger.info("setting initial configuration to {}", votingConfiguration);
        CoordinationMetaData coordinationMetaData = CoordinationMetaData.builder(currentState.coordinationMetaData())
            .lastAcceptedConfiguration(votingConfiguration)
            .lastCommittedConfiguration(votingConfiguration)
            .build();
        MetaData.Builder metaDataBuilder = MetaData.builder(currentState.metaData());
        metaDataBuilder.generateClusterUuidIfNeeded();
        metaDataBuilder.coordinationMetaData(coordinationMetaData);
        coordinationState.get().setInitialState(ClusterState.builder(currentState).metaData(metaDataBuilder).build());
        preVoteCollector.update(getPreVoteResponse(), null);
        startElectionScheduler();
        return true;
    }
}
While the node remains a candidate, the election scheduler repeatedly invokes preVoteCollector.start() to send pre-vote requests to all discovered nodes except pre-7.0 (Zen1) nodes:
private void startElectionScheduler() {
    electionScheduler = electionSchedulerFactory.startElectionScheduler(gracePeriod, new Runnable() {
        @Override
        public void run() {
            synchronized (mutex) {
                if (mode == Mode.CANDIDATE) {
                    final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
                    // Skip this round if the local node cannot win with its current state.
                    if (!localNodeMayWinElection(lastAcceptedState)) {
                        return;
                    }
                    List<DiscoveryNode> discoveredNodes = getDiscoveredNodes().stream()
                        .filter(n -> !isZen1Node(n)).collect(Collectors.toList());
                    prevotingRound = preVoteCollector.start(lastAcceptedState, discoveredNodes);
                }
            }
        }
    });
}
When a node receives a pre-vote request, handlePreVoteRequest() updates the maximum term seen. It returns its own PreVoteResponse if it knows of no leader or if the requester is the leader it already follows; otherwise it rejects the request:
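private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) {
    updateMaxTermSeen.accept(request.getCurrentTerm());
    // state is a tuple: v1() is the known leader (or null), v2() is the local pre-vote response.
    DiscoveryNode leader = state.v1();
    if (leader == null || leader.equals(request.getSourceNode())) {
        return state.v2();
    }
    throw new CoordinationStateRejectedException("rejecting " + request + " as there is already a leader");
}
The updateMaxTermSeen() method records the highest term observed. If the local node is the leader and sees a term higher than its own, it bumps its term and starts a new election, unless a publication is in progress, in which case the bump is deferred: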
private void updateMaxTermSeen(final long term) {
    synchronized (mutex) {
        maxTermSeen = Math.max(maxTermSeen, term);
        long currentTerm = getCurrentTerm();
        if (mode == Mode.LEADER && maxTermSeen > currentTerm) {
            if (publicationInProgress()) {
                // The term bump happens once the in-flight publication completes.
                logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, enqueueing term bump", maxTermSeen, currentTerm);
            } else {
                try {
                    logger.debug("updateMaxTermSeen: maxTermSeen = {} > currentTerm = {}, bumping term", maxTermSeen, currentTerm);
                    ensureTermAtLeast(getLocalNode(), maxTermSeen);
                    startElection();
                } catch (Exception e) {
                    logger.warn(new ParameterizedMessage("failed to bump term to {}", maxTermSeen), e);
                    becomeCandidate("updateMaxTermSeen");
                }
            }
        }
    }
}
On the requesting side, handlePreVoteResponse() discards any response whose sender has a newer accepted state than the local node (a higher last-accepted term, or the same term with a higher version). It records the remaining pre-votes, builds a temporary VoteCollection from them and, once they form a quorum, calls startElection():
private void handlePreVoteResponse(final PreVoteResponse response, final DiscoveryNode sender) {
    updateMaxTermSeen.accept(response.getCurrentTerm());
    // The sender is ahead of the local node, so its response cannot count as a pre-vote.
    if (response.getLastAcceptedTerm() > clusterState.term() ||
        (response.getLastAcceptedTerm() == clusterState.term() &&
            response.getLastAcceptedVersion() > clusterState.getVersionOrMetaDataVersion())) {
        return;
    }
    preVotesReceived.put(sender, response);
    VoteCollection voteCollection = new VoteCollection();
    // add local and remote pre-votes to voteCollection ...
    if (!electionStrategy.isElectionQuorum(..., voteCollection)) {
        return;
    }
    startElection.run();
}
The startElection() method creates a StartJoinRequest whose term is one higher than the largest term seen so far and sends it to every discovered node that is not a pre-7.0 (Zen1) node:
private void startElection() {
    synchronized (mutex) {
        if (mode == Mode.CANDIDATE) {
            if (!localNodeMayWinElection(getLastAcceptedState())) {
                return;
            }
            // Propose a term strictly greater than any term seen so far.
            StartJoinRequest startJoinRequest = new StartJoinRequest(getLocalNode(), Math.max(getCurrentTerm(), maxTermSeen) + 1);
            getDiscoveredNodes().forEach(node -> {
                if (!isZen1Node(node)) {
                    joinHelper.sendStartJoinRequest(startJoinRequest, node);
                }
            });
        }
    }
}
When a node receives a StartJoinRequest, the handler registered for START_JOIN_ACTION_NAME creates a Join via joinLeaderInTerm() and sends a JoinRequest back to the node that started the election:
transportService.registerRequestHandler(START_JOIN_ACTION_NAME, Names.GENERIC, false, false,
    StartJoinRequest::new,
    (request, channel, task) -> {
        DiscoveryNode destination = request.getSourceNode();
        sendJoinRequest(destination, Optional.of(joinLeaderInTerm.apply(request)));
        channel.sendResponse(Empty.INSTANCE);
    });
The joinLeaderInTerm() method updates the term, may transition the node to Candidate, and returns the generated Join object:
private Join joinLeaderInTerm(StartJoinRequest startJoinRequest) {
    synchronized (mutex) {
        Join join = coordinationState.get().handleStartJoin(startJoinRequest);
        lastJoin = Optional.of(join);
        peerFinder.setCurrentTerm(getCurrentTerm());
        if (mode != Mode.CANDIDATE) {
            becomeCandidate("joinLeaderInTerm");
        } else {
            followersChecker.updateFastResponseState(getCurrentTerm(), mode);
            preVoteCollector.update(getPreVoteResponse(), null);
        }
        return join;
    }
}
Upon receiving a JoinRequest, the node validates the request (checking cluster UUID, version compatibility, etc.) and then processes the join. If the node wins the election during this processing, it transitions to Leader via becomeLeader().
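As a rough illustration of the validation step, the sketch below rejects a join when the cluster UUIDs differ or the versions are incompatible. Everything in it is hypothetical: the NodeInfo type, the method names, and especially the version rule (same major version only) are placeholders chosen for the example and do not reproduce the checks Elasticsearch actually performs.

```java
import java.util.Objects;

// Hypothetical types for illustration; these are not Elasticsearch classes.
final class JoinValidationSketch {

    static final class NodeInfo {
        final String nodeId;
        final String clusterUuid;   // assumption: null models a node that has not joined any cluster yet
        final int majorVersion;

        NodeInfo(String nodeId, String clusterUuid, int majorVersion) {
            this.nodeId = nodeId;
            this.clusterUuid = clusterUuid;
            this.majorVersion = majorVersion;
        }
    }

    // Throws if the joining node must not be admitted.
    static void validateJoin(NodeInfo local, NodeInfo joining) {
        // A node that already belongs to a different cluster must not join this one.
        if (joining.clusterUuid != null && !Objects.equals(joining.clusterUuid, local.clusterUuid)) {
            throw new IllegalStateException("node " + joining.nodeId + " belongs to cluster "
                + joining.clusterUuid + ", expected " + local.clusterUuid);
        }
        // Placeholder compatibility rule (assumption): require the same major version.
        if (joining.majorVersion != local.majorVersion) {
            throw new IllegalStateException("node " + joining.nodeId + " runs incompatible major version "
                + joining.majorVersion);
        }
    }

    // Convenience wrapper that turns the exception into a boolean.
    static boolean canJoin(NodeInfo local, NodeInfo joining) {
        try {
            validateJoin(local, joining);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }
}
```

Validating before counting the vote means a node from another cluster, or one running incompatible software, can never contribute to an election result.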
Finally, coordinationState.handleJoin() adds the join vote to the vote set and determines whether a majority has been reached, marking the election as won when appropriate.
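The vote-counting step can be sketched as follows. This is a simplified model under stated assumptions, not the Elasticsearch implementation: the class name, the string node ids, and the method signatures are hypothetical, and term and version checks on the incoming join are omitted. It shows only the idea that a join is recorded and the election is won once votes from a strict majority of the voting configuration have arrived.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of join-vote counting; not Elasticsearch code.
final class JoinVotesSketch {

    private final Set<String> votingConfiguration; // ids of the nodes whose votes count
    private final Set<String> joinVotes = new HashSet<>();
    private boolean electionWon = false;

    JoinVotesSketch(Set<String> votingConfiguration) {
        this.votingConfiguration = new HashSet<>(votingConfiguration);
    }

    // Records a join vote; returns true only for the vote that tips the election.
    boolean handleJoin(String voterId) {
        boolean wonBefore = electionWon;
        joinVotes.add(voterId);
        electionWon = hasQuorum();
        return electionWon && !wonBefore;
    }

    // Only votes from nodes in the voting configuration count towards the majority.
    private boolean hasQuorum() {
        long votesInConfiguration = joinVotes.stream().filter(votingConfiguration::contains).count();
        return votesInConfiguration * 2 > votingConfiguration.size();
    }

    boolean electionWon() {
        return electionWon;
    }
}
```

With a voting configuration of three nodes, the second join from a configured node wins the election, while joins from nodes outside the configuration are recorded but never count towards the majority.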
ZCY Technology (政采云技术)