Elixir and Raft: Running a cluster with state
Elixir Cluster Series
Raft is a consensus algorithm that operates with a single leader and multiple followers. Each node runs a state machine that processes an ordered series of log entries to maintain the current state. All interactions, including log updates and read requests, are directed through the leader. When the leader receives a log entry to update the store, it replicates the log to the followers. Once a quorum of followers acknowledges it has written the log entry, the leader commits the change to its state and then instructs followers to do the same.
If the leader cannot achieve a quorum, it will not commit the log entry to its store. While followers may receive the log entry, they will not commit it until instructed by the leader. On future requests, followers will detect any uncommitted log entries and discard them, synchronizing with the new leader’s committed logs to catch up. This two-stage commit process ensures that uncommitted log entries can be safely rolled back, but once a state change is committed, it cannot be undone.
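To make the quorum requirement concrete, here is a quick illustration (not project code) of the majority arithmetic behind these decisions:

cluster_size = 5
quorum = div(cluster_size, 2) + 1
# quorum == 3: a five-node cluster tolerates losing two nodes,
# but with only two nodes remaining no majority exists and writes stall.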
Ra library
Ra is an open-source, production-ready Raft implementation developed in Erlang by Team RabbitMQ. While designed to be general-purpose, it is shaped by RabbitMQ’s specific requirements.
Starting Superhero Dispatch Cluster
Install Dependencies
If you have Make installed, install the dependencies by simply running:
make setup
Run the Task
This project includes a tasks.json for Visual Studio Code.
- Open the Command Palette by pressing Ctrl+Shift+P (or Cmd+Shift+P on macOS).
- Type “Run Task” and select it from the list.
- Choose “Run Dispatch Cluster”.
This task will execute a series of scripts to start the nodes in the cluster.
- Gotham runs on http://localhost:4900
- Metropolis runs on http://localhost:4901
- Capitol runs on http://localhost:4902
- Smallville runs on http://localhost:4903
- Asgard runs on http://localhost:4904
Expected Behavior
Open the dispatch centers and add superheroes. You’ll see their updates in real-time, synced across all nodes as they fight crime. When a dispatch center shuts down, its superheroes migrate to another node and continue from where they left off. If the leader shuts down, a new leader is appointed, and the system resumes normal operations, with the new leader processing log entries and synchronizing followers. Shutting down a second dispatch center has the same effect. However, if a third center shuts down, leaving only two nodes, the system cannot form a quorum, preventing updates. Once a node is restarted, the election process completes, and the superheroes continue fighting crime.
Setup the Raft Cluster
The Superhero cluster operates with a predetermined set of members. The order in which nodes boot is not predetermined, so on startup each node attempts to start the Raft cluster.
defmodule Dispatch.RaftSetup do
  use GenServer
  require Logger

  alias Dispatch.RaftStore

  @cluster_name :dispatch_cluster

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    :ra.start()
    nodes = Keyword.get(opts, :nodes, [])
    machine = {:module, RaftStore, %{}}
    ids = Enum.map(nodes, fn node -> {@cluster_name, node} end)

    case :ra.start_cluster(:default, @cluster_name, machine, ids) do
      {:ok, started, not_started} ->
        Logger.info(
          "Node #{Node.self()} successfully started the Raft cluster. Started: #{inspect(started)}, Not started: #{inspect(not_started)}"
        )

        {:ok, %{nodes: nodes}}

      abort ->
        Logger.error("Failed to start Raft cluster, reason: #{inspect(abort)}")
        {:ok, %{nodes: nodes}}
    end
  end
end
The :ra
library logs error messages in the background as each node boots, often indicating issues such as the inability to reach another node or that the cluster has already been initialized. When calling start_cluster
, it returns {:error, :cluster_not_formed}
during the startup of the first two nodes, followed by {:ok, started, not_started}
on nodes three through five. This behavior is expected, as this Raft cluster requires at least three nodes to establish leader consensus. The code traps and logs the :cluster_not_formed
error as a non-critical abort and allows the process to continue.
Although this approach works, the background logs from the :ra
library indicate it prefers all expected nodes to be active and reachable prior to starting the Raft cluster.
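For completeness, here is a minimal sketch of how Dispatch.RaftSetup might be wired into the application's supervision tree. The Dispatch.Application module name and the :raft_nodes configuration key are assumptions for illustration, not necessarily the project's exact code.

defmodule Dispatch.Application do
  use Application

  @impl true
  def start(_type, _args) do
    # Hypothetical: the full member list would normally come from configuration.
    nodes = Application.get_env(:dispatch, :raft_nodes, [node() | Node.list()])

    children = [
      # Starts :ra and attempts to form the cluster on boot.
      {Dispatch.RaftSetup, nodes: nodes}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: Dispatch.Supervisor)
  end
end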
Revisiting Brewer’s Theorem (CAP)
Brewer’s Theorem posits that a distributed data system cannot simultaneously guarantee all three of Consistency, Availability, and Partition Tolerance (CAP).
- Consistency ensures that all nodes view the same data at the same time.
- Availability guarantees that every request receives a response, regardless of the success of the operation.
- Partition Tolerance means the system continues to function even when network failures split it into disconnected parts.
CRDT and Gossip: Availability and Partition Tolerance (AP)
An earlier solution employed Conflict-Free Replicated Data Types (CRDTs), with each node maintaining a complete copy of the data and communicating changes via a gossip (epidemic) protocol. This design allows each node to operate independently, settling for eventual consistency: nodes are not guaranteed to see the same data simultaneously.
Mnesia: Consistency and Availability (CA)
Another earlier solution utilized Mnesia, which provides a consistent view of data and ensures every request receives a response, but at the expense of Partition Tolerance. When a node becomes isolated while still running, Mnesia blocks until communication is reestablished.
Raft: Consistency and Partition Tolerance (CP)
Raft ensures data consistency across a distributed system while providing some partition tolerance, though this comes at the cost of guaranteed availability. The system becomes unavailable during leader elections or when a quorum cannot be reached.
Erlang Term Storage (ETS)
I mentioned in the Mnesia blog, “ETS is local and is not designed to handle data replication across a distributed cluster.” While this is true, Raft expects each node to maintain its own independent copy of the store, guaranteeing that the copies remain consistent across nodes without requiring rollbacks. In this case, ETS is a good choice for implementing a key-value store.
Managing ETS Tables With a State Machine
Writes
defmodule Dispatch.RaftStore do
  @behaviour :ra_machine

  require Logger

  defstruct ets_table: nil, index: nil, term: nil

  @impl true
  def init(_config) do
    ets_table = :ets.new(:raft_store, [:set, :public, :named_table])

    %__MODULE__{
      ets_table: ets_table
    }
  end

  @impl true
  def apply(
        metadata,
        {:write, key, expected_value, new_value, equality_fn},
        %__MODULE__{ets_table: ets_table} = state
      ) do
    current_value =
      case :ets.lookup(ets_table, key) do
        [] -> :undefined
        [{_key, value}] -> value
      end

    case current_value do
      :undefined ->
        :ets.insert(ets_table, {key, new_value})

        new_state = %__MODULE__{
          state
          | index: metadata.index,
            term: metadata.term
        }

        {new_state, {{:read, new_value}, %{index: metadata.index, term: metadata.term}},
         update_side_effects(key, new_value)}

      _ ->
        case equality_fn.(current_value, expected_value) do
          true ->
            :ets.insert(ets_table, {key, new_value})

            new_state = %__MODULE__{
              state
              | index: metadata.index,
                term: metadata.term
            }

            {new_state, {{:read, new_value}, %{index: metadata.index, term: metadata.term}},
             update_side_effects(key, new_value)}

          false ->
            {state,
             {{:error, :not_match, current_value}, %{index: metadata.index, term: metadata.term}},
             update_failure_side_effects(key, "Value mismatch", current_value)}
        end
    end
  end

  @impl true
  def apply(metadata, {:dirty_write, key, value}, %__MODULE__{ets_table: ets_table} = state) do
    :ets.insert(ets_table, {key, value})

    new_state = %__MODULE__{
      state
      | index: metadata.index,
        term: metadata.term
    }

    {new_state, {metadata.index, metadata.term}, update_side_effects(key, value)}
  end

  @impl true
  def apply(metadata, {:delete, key}, %__MODULE__{ets_table: ets_table} = state) do
    :ets.delete(ets_table, key)

    new_state = %__MODULE__{
      state
      | index: metadata.index,
        term: metadata.term
    }

    {new_state, {metadata.index, metadata.term}, delete_side_effects(key)}
  end

  defp update_side_effects(key, value) do
    [{:send_msg, {:message_server, node()}, {:key_updated, key, value}, [:cast]}]
  end

  defp delete_side_effects(key) do
    [{:send_msg, {:message_server, node()}, {:key_deleted, key}, [:cast]}]
  end

  defp update_failure_side_effects(key, reason, current_value) do
    [{:send_msg, {:message_server, node()}, {:failure, key, reason, current_value}, [:cast]}]
  end
end
Apply
Regardless of which node submits a command, the apply
callback is processed through the leader. The call blocks while the leader is gathering consensus or while a leader election is in progress.
The apply
behavior returns the updated state, the index
and term
used by Raft to maintain order and identify missing updates, along with a list of side effects, where we can cause :ra
to take certain actions. For instance, update_side_effects
directs :ra
to send a cast message to the process named :message_server
, informing it that a key has been updated.
- :dirty_write updates a quorum of followers’ ETS tables, commits the update to the leader’s ETS table, triggers the followers to commit, and, as a side effect, sends a message to the :message_server.
- :write is the same as :dirty_write, but with optimistic concurrency control, which can fail, so it sends either a success or failure message.
- :delete generates an action to remove a record from the ETS table and sends a message on its success.
Optimistic Concurrency Control
In distributed clusters, conflicts can occur when multiple nodes attempt to modify the same data concurrently. Previously, we used Conflict-Free Replicated Data Types (CRDTs) to resolve these conflicts, ensuring eventual consistency across nodes without requiring coordination.
This solution implements optimistic concurrency control with a compare-and-swap (CAS). In a CAS write transaction, the node provides what it believes is the correct current view of the data. If this view matches the actual state, the update is applied. If the view is outdated—indicating another node has modified the data since the last read, the write is rejected. The node resolves the conflict by either adopting the newer data, submitting an updated write request, or merging the conflicting changes before resubmitting.
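To make that flow concrete, here is a minimal sketch of a retry loop a caller might wrap around the RaftStore.write/5 function shown in the API section below. The CasRetryExample module and its merge/2 helper are hypothetical, not part of the project.

defmodule Dispatch.CasRetryExample do
  alias Dispatch.RaftStore

  # Hypothetical helper: retry a compare-and-swap write a few times,
  # merging with the newer value whenever a conflict is reported.
  def write_with_retry(cluster_name, key, expected, new_value, retries \\ 3)

  def write_with_retry(_cluster_name, _key, _expected, _new_value, 0),
    do: {:error, :too_many_conflicts}

  def write_with_retry(cluster_name, key, expected, new_value, retries) do
    case RaftStore.write(cluster_name, key, expected, new_value) do
      {:ok, :written} ->
        {:ok, :written}

      {:error, :not_match, current_value} ->
        # Another node won the race: reconcile, then resubmit using the
        # value we just observed as the new expected value.
        merged = merge(current_value, new_value)
        write_with_retry(cluster_name, key, current_value, merged, retries - 1)

      other ->
        other
    end
  end

  # Placeholder merge strategy: keep our new value.
  defp merge(_current_value, new_value), do: new_value
end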
API
Similar to GenServers, this module benefits from having a public interface to abstract some of the underlying code complexity.
Write with CAS
Write includes a default equality function which can be overridden by the caller to tailor the comparison logic.
def write(cluster_name, key, expected_value, new_value, opts \\ []) do
  defaults = [eq_fun: fn a, b -> a == b end]
  options = Keyword.merge(defaults, opts)
  eq_fun = Keyword.get(options, :eq_fun)

  cmd = {:write, key, expected_value, new_value, eq_fun}

  case :ra.process_command(cluster_name, cmd) do
    {:ok, {{:read, _new_value}, _metadata}, _leader} ->
      {:ok, :written}

    {:ok, {{:error, :not_match, current_value}, _metadata}, _leader} ->
      {:error, :not_match, current_value}

    {:timeout, _} ->
      {:error, :timeout}

    {:error, reason} ->
      {:error, reason}
  end
end
:process_command
is a synchronous function that issues critical updates to the state machine, ensuring they are processed in compliance with the Raft protocol, and providing immediate feedback on the command’s success or failure.
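As a usage sketch, assuming the dispatch cluster is running and a plain value is stored under a hypothetical :greeting key:

# First write: the key does not exist yet, so the expected value is ignored.
{:ok, :written} = Dispatch.RaftStore.write(:dispatch_cluster, :greeting, nil, "hello")

# CAS update: succeeds only if the stored value still equals "hello".
case Dispatch.RaftStore.write(
       :dispatch_cluster,
       :greeting,
       "hello",
       "hello, world",
       eq_fun: fn current, expected -> current == expected end
     ) do
  {:ok, :written} -> :ok
  {:error, :not_match, current} -> {:conflict, current}
  {:error, :timeout} -> {:error, :timeout}
end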
Write Without CAS
def dirty_write(cluster_name, key, value) do
  cmd = {:dirty_write, key, value}

  case :ra.process_command(cluster_name, cmd) do
    {:ok, {_index, _term}, _leader_ra_node_id} ->
      {:ok, :written}

    {:timeout, _} ->
      {:error, :timeout}

    {:error, reason} ->
      {:error, reason}
  end
end
Delete
def delete(cluster_name, key) do
  cmd = {:delete, key}

  case :ra.process_command(cluster_name, cmd) do
    {:ok, {_index, _term}, _leader_ra_node_id} ->
      {:ok, :deleted}

    {:timeout, _} ->
      {:error, :timeout}

    {:error, reason} ->
      {:error, reason}
  end
end
Reads
The :ra
library supports two types of read operations: local_query
and consistent_query
. Local queries fetch data from the node’s local state, which is faster but may not always reflect the most recent write operations from other nodes. Consistent queries, on the other hand, ensure data consistency by reading from the cluster’s leader node, which always has the most up-to-date state.
If a leadership election is underway, a consistent query will block until a new leader is confirmed.
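A local read is not used in this project, but for comparison it could sit alongside read/2 in Dispatch.RaftStore as sketched below. Note that :ra.local_query/2 wraps the query result together with the applied index and term, and the exact reply shape has varied between ra versions, so the sketch returns the raw reply rather than pattern matching it.

def read_local(cluster_name, key) do
  query_fn = fn %__MODULE__{ets_table: ets_table} ->
    case :ets.lookup(ets_table, key) do
      [] -> :undefined
      [{_key, value}] -> value
    end
  end

  # Runs the query against this node's own copy of the state machine:
  # fast and always available locally, but possibly behind the leader.
  :ra.local_query(cluster_name, query_fn)
end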
Single Read Operation
def read(cluster_name, key) do
  query_fn = fn %__MODULE__{ets_table: ets_table} ->
    case :ets.lookup(ets_table, key) do
      [] -> :undefined
      [{_key, value}] -> value
    end
  end

  case :ra.consistent_query(cluster_name, query_fn) do
    {:ok, :undefined, _leader} ->
      {:error, :not_found}

    {:ok, value, _leader} ->
      {:ok, value}

    {:timeout, _} ->
      Logger.info("Read operation timed out.")
      {:error, :timeout}

    {:error, :nodedown} ->
      Logger.info("Read operation failed; node is down.")
      {:error, :nodedown}

    result ->
      Logger.warning("Unexpected result for read operation: #{inspect(result)}")
      {:error, :unexpected_result}
  end
end
Read All Values
def read_all(cluster_name) do
  query_fn = fn %__MODULE__{ets_table: ets_table} ->
    :ets.tab2list(ets_table) |> Enum.map(fn {_key, value} -> value end)
  end

  case :ra.consistent_query(cluster_name, query_fn) do
    {:ok, values, _leader} ->
      {:ok, values}

    {:timeout, _} ->
      {:error, :timeout}

    {:error, :nodedown} ->
      {:error, :nodedown}

    result ->
      Logger.warning("Unexpected result for read_all operation: #{inspect(result)}")
      {:error, :unexpected_result}
  end
end
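A quick usage sketch, where "superhero-1" is a hypothetical key:

# Returns {:ok, value}, {:error, :not_found}, or {:error, :timeout}.
Dispatch.RaftStore.read(:dispatch_cluster, "superhero-1")

# Returns {:ok, values} with every value in the store on success.
Dispatch.RaftStore.read_all(:dispatch_cluster)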
Handling Timeouts and Errors
Raft preserves system integrity by becoming unavailable when necessary; synchronous interactions fail gracefully with a timeout.
Superhero Store
The Superhero Store wraps the generic RaftStore with domain-specific functions:
defmodule Dispatch.Store.SuperheroStore do
  alias Dispatch.{Superhero, RaftStore}
  require Logger

  @cluster_name :dispatch_cluster

  def upsert_superhero(
        %Superhero{id: id} = expected_superhero,
        %Superhero{} = new_superhero
      ) do
    RaftStore.write(@cluster_name, id, expected_superhero, new_superhero)
  end

  def get_superhero(id) do
    RaftStore.read(@cluster_name, id)
  end

  def get_all_superheroes do
    RaftStore.read_all(@cluster_name)
  end

  def delete_superhero(id) do
    RaftStore.delete(@cluster_name, id)
  end
end
upsert_superhero
enforces the Superhero
struct in the store.
The Ra library seems to have a spec type issue related to ra_server_id(), causing these functions, and any code that uses them, to trigger Dialyzer warnings.
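One possible workaround, assuming you choose to suppress rather than live with the warnings (this is not necessarily what the project does), is to add @dialyzer attributes for the affected functions:

# Hypothetical: placed near the top of Dispatch.Store.SuperheroStore.
@dialyzer {:nowarn_function, upsert_superhero: 2}
@dialyzer {:nowarn_function, get_superhero: 1}
@dialyzer {:nowarn_function, get_all_superheroes: 0}
@dialyzer {:nowarn_function, delete_superhero: 1}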
Pubsub
In the Mnesia implementation, local Mnesia change events were broadcast to the web view, ensuring real-time updates. However, because every node broadcast from its local store, this led to duplicate messages across the cluster. To resolve this, the node() name was prepended to the topic, and the web views only listened to messages generated from their own node. With Raft, the :ra apply function includes side effects, but these are only executed on the leader. As a result, there is now a single source of messages, so the node prefix is no longer needed.
defmodule Dispatch.Store.PubSub do
  use GenServer
  require Logger

  alias Dispatch.Superhero
  alias Phoenix.PubSub

  def topic do
    "superhero_data"
  end

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: :message_server)
  end

  @impl true
  def init(arg) do
    {:ok, arg}
  end

  @impl true
  def handle_cast({:key_updated, _key, value}, state) do
    PubSub.broadcast(
      Dispatch.PubSub,
      topic(),
      {:update_superhero, value}
    )

    {:noreply, state}
  end

  @impl true
  def handle_cast({:key_deleted, key}, state) do
    PubSub.broadcast(
      Dispatch.PubSub,
      topic(),
      {:delete_superhero, %Superhero{id: key}}
    )

    {:noreply, state}
  end

  @impl true
  def handle_cast({:failure, key, reason, current_state}, state) do
    Logger.error(
      "Received failure on #{key} with reason: #{reason}, state: #{inspect(current_state)}"
    )

    {:noreply, state}
  end
end
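On the receiving side, a web view only needs to subscribe to the shared topic. A minimal sketch follows; the DispatchWeb.SuperheroLive module and its assigns are hypothetical, and the real views render far more than this.

defmodule DispatchWeb.SuperheroLive do
  use Phoenix.LiveView

  @impl true
  def mount(_params, _session, socket) do
    if connected?(socket) do
      # Every view listens on the same topic; the leader's side effects are
      # the single source of these messages, so no node prefix is required.
      Phoenix.PubSub.subscribe(Dispatch.PubSub, Dispatch.Store.PubSub.topic())
    end

    {:ok, assign(socket, last_update: nil)}
  end

  @impl true
  def handle_info({:update_superhero, superhero}, socket) do
    {:noreply, assign(socket, last_update: superhero)}
  end

  @impl true
  def handle_info({:delete_superhero, superhero}, socket) do
    {:noreply, assign(socket, last_update: {:deleted, superhero.id})}
  end

  @impl true
  def render(assigns) do
    ~H"<div><%= inspect(@last_update) %></div>"
  end
end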
Superhero GenServer
From a Brewer’s Theorem (CAP) perspective, a GenServer aligns well with an AP solution, allowing it to operate independently and rely on eventual consistency to synchronize with other nodes over time. However, with Raft (CP), availability is not guaranteed, meaning the GenServer must now handle the contingency of waiting for data to become available before it can proceed.
Init Superhero
In addition to {:ok, superhero} and {:error, :not_found}, the read operation can now return {:error, :timeout}, which represents Raft being unavailable, such as during an election to establish a new leader.
def handle_info(:init_superhero, %{superhero: superhero} = state) do
  case SuperheroStore.get_superhero(superhero.id) do
    {:ok, existing_superhero} ->
      Logger.info("Using existing superhero #{superhero.id}.")

      new_state = %{
        state
        | superhero: existing_superhero |> Map.put(:node, node()),
          current_superhero: existing_superhero
      }

      send(self(), :update_superhero)
      schedule_next_action()
      {:noreply, new_state}

    {:error, :not_found} ->
      new_superhero = generate_new_superhero(superhero)

      new_state = %{
        state
        | superhero: new_superhero |> Map.put(:node, node()),
          current_superhero: new_superhero
      }

      send(self(), :update_superhero)
      schedule_next_action()
      {:noreply, new_state}

    {:error, :timeout} ->
      Logger.info("Timeout occurred, retrying superhero initialization.")
      Process.send_after(self(), :init_superhero, 2000)
      {:noreply, state}
  end
end
- :ok, existing_superhero retrieves the current representation of the superhero from the leader’s ETS table, updates the node information to the current node, saves the updated data back to the store, and triggers the superhero to resume fighting crime.
- :error, :not_found causes the GenServer to create a new superhero, add the current node, save the new superhero to the database, and begin fighting crime.
- :error, :timeout loops the GenServer back on itself after two seconds, attempting to initialize the superhero again.
Notice that the state now holds both the superhero
and current_superhero
. Since Raft does not have a concept of transactions, the GenServer retains what it believes to be the current representation of the superhero in the database, along with its updated version. If another node has modified the superhero, the GenServer will need to incorporate that change.
Update Superhero
The update now returns {:ok, :written}, {:error, :not_match, current_value}, or {:error, :timeout}, indicating success, an error due to another node modifying the database, or an error caused by Raft being unavailable.
def handle_info(
      :update_superhero,
      %{current_superhero: current_superhero, superhero: updated_superhero} = state
    ) do
  new_state =
    case SuperheroStore.upsert_superhero(current_superhero, updated_superhero) do
      {:ok, _} ->
        Logger.info("Superhero #{updated_superhero.id} successfully updated in store.")
        %{state | current_superhero: updated_superhero, superhero: updated_superhero}

      {:error, :not_match, current_value} ->
        Logger.info(
          "Superhero #{updated_superhero.id} was updated in the store by another node: #{inspect(current_value)}."
        )

        %{state | current_superhero: current_value, superhero: current_value}

      {:error, :timeout} ->
        Logger.info("Update store timeout for superhero #{updated_superhero.id}, reverted.")
        %{state | superhero: current_superhero}
    end

  {:noreply, new_state}
end
- :ok updates both the superhero and current_superhero to reflect what this GenServer believes is the current state, ensuring they are aligned for the next update.
- :error, :not_match acknowledges that another node has modified the superhero and resets the GenServer’s superhero to the known current state. In this case, any changes made will be lost, though another approach might involve merging the changes and resubmitting.
- :error, :timeout indicates that Raft is unavailable and reverts to the last known state, stopping the superhero from progressing while the cluster is down.
State Machine
A Raft state machine must adhere to the following principles:
- Determinism: The operations of the state machine must produce the same result every time given the same sequence of log entries. This means no randomness or reliance on external systems with variable responses. Ensuring determinism guarantees that all nodes, when processing identical logs, will reach the same state. The :ra framework enforces this by isolating side effects, ensuring they don’t interfere with the log’s deterministic processing; a short sketch follows this list.
- Idempotence: The state machine must handle the same log entry being applied multiple times without changing the final state. This ensures that retries or duplicate log entries do not cause inconsistencies, making idempotence critical for maintaining correct state, even in the presence of network issues or replays.
- Serialization: The state machine must support serializing its current state into a snapshot, which can be saved and restored later. This capability is essential for efficient log compaction, allowing the system to discard older log entries that have already been applied. Serialization speeds up the process of bringing new or recovering nodes up to date by allowing them to load the snapshot and process only the log entries created after the snapshot was taken.
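To illustrate the determinism rule: anything non-deterministic should be resolved by the caller and carried inside the command rather than computed in apply/3. The :assign_power command below is a hypothetical example, not part of the project.

# Non-deterministic: each node rolls its own random number when it applies
# this entry, so the replicated ETS tables would diverge.
def apply(meta, {:assign_power, key}, state) do
  power = Enum.random(1..100)
  :ets.insert(state.ets_table, {key, power})
  {%{state | index: meta.index, term: meta.term}, :ok, []}
end

# Deterministic: the caller rolls the value once and ships it in the command,
# so every node applies exactly the same change.
def apply(meta, {:assign_power, key, power}, state) do
  :ets.insert(state.ets_table, {key, power})
  {%{state | index: meta.index, term: meta.term}, :ok, []}
end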
Snapshot
Raft handles log growth over time through a process called log compaction. This is done by creating snapshots of the state machine’s current state, which serve as new starting points for the system. Once a snapshot is taken, all log entries preceding it are discarded, as they are no longer needed to reconstruct the state. When new nodes join the cluster, or existing nodes need to catch up, they first load the latest snapshot and then apply any subsequent log entries to reach the current state. This approach ensures efficient log management and reduces the overhead of storing and replaying long histories of log entries.
Conclusion
Whether to use Raft to solve the Superhero Dispatch problem depends on whether it’s imperative that all dispatch centers share a consistent view of superhero data at the same time, or whether it’s more important that the dispatch centers continue to operate regardless of the condition of other nodes. Brewer’s theorem teaches us there are no free lunches in distributed systems. If partition tolerance and consistent data are critical, Raft is a good solution. If availability and partition tolerance are paramount, consider a CRDT. And if partition tolerance is not needed, Mnesia might be the best choice.