Elixir: Running a cluster of nodes
Elixir Cluster Series
The Superhero Cluster comprises location nodes: gotham, metropolis, and capitol. Each node functions as an independent actor, autonomously assigning superheroes to patrol duties. The cluster also includes a unique dispatch node, which provides independent real-time web views for users to create and manage superheroes across the system.
This cluster operates on a leaderless architecture. While the data held by each actor may vary at any given moment, the cluster ensures eventual consistency, meaning that over time, all nodes will converge to a consistent state.
The Superhero Cluster follows a “last in wins” strategy. For example, if a dispatcher reassigns a superhero to a new location, but the superhero’s current location assigns them to patrol duties before this update is received, the system will prioritize the latest action, with the superhero reverting to their original location.
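The scenario above can be sketched in a few lines. This is a minimal illustration rather than the cluster's own code: the LastInWins module, its pick/2 function, the hero id, and the integer timestamps are all hypothetical, standing in for the last_updated field that appears later in the article.

```elixir
defmodule LastInWins do
  # Hypothetical helper: given two versions of the same superhero,
  # keep the one with the larger last_updated timestamp.
  # On a tie, the second argument is kept.
  def pick(a, b) do
    if a.last_updated > b.last_updated, do: a, else: b
  end
end

# The dispatcher reassigns the hero to :metropolis at t = 100 ...
dispatched = %{id: "hero-1", location: :metropolis, is_patrolling: false, last_updated: 100}

# ... but gotham, not yet aware of that change, sends the hero on patrol at t = 101.
patrolling = %{id: "hero-1", location: :gotham, is_patrolling: true, last_updated: 101}

winner = LastInWins.pick(dispatched, patrolling)
IO.inspect(winner.location, label: "location after merge")
```

Because the patrol update carries the later timestamp, it wins whichever order the two versions are compared in, and the dispatcher's reassignment is discarded.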
Starting the Cluster
Install Dependencies
Install dependencies for both the /location and /superhero subdirectories. If you have Make installed, simply run:
make setup
Run the Task
This project includes a tasks.json for Visual Studio Code.
- Open the Command Palette by pressing Ctrl+Shift+P (or Cmd+Shift+P on macOS).
- Type “Run Task” and select it from the list.
- Choose “Run Superhero Cluster.”
This task will execute a series of scripts to start the nodes in the cluster, ensuring each component of the system is initiated correctly.
The superhero dispatcher runs on http://localhost:4900
A closer look at the Gotham node script
MIX_ENV=dev CITY_NAME=gotham iex --name gotham@127.0.0.1 --cookie secret_superhero_cookie -S mix
- MIX_ENV=dev: Sets the Mix environment to development.
- CITY_NAME=gotham: Specifies the city name for the location node.
- -S mix: Launches the Elixir application within an interactive shell.
- --name gotham@127.0.0.1: Sets the node name to gotham and binds it to the localhost IP address (127.0.0.1).
- --cookie secret_superhero_cookie: Erlang/Elixir uses a shared cookie for authentication between nodes in a cluster.
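Once a node is running, the effect of these flags can be checked from inside iex. The sketch below uses only functions from Elixir's Node module; the ClusterInfo module and its summary/0 function are hypothetical names introduced for illustration.

```elixir
defmodule ClusterInfo do
  # Hypothetical helper that gathers facts about the current node.
  def summary do
    %{
      # The full node name, set by --name (or --sname)
      node: Node.self(),
      # false when the VM was started without --name or --sname
      alive?: Node.alive?(),
      # Must be identical on every node that wants to join the cluster
      cookie: Node.get_cookie(),
      # Nodes this node is currently connected to
      peers: Node.list()
    }
  end
end

IO.inspect(ClusterInfo.summary())
```

On a node started with the command above, the cookie entry should be :secret_superhero_cookie; the peers list stays empty until the node connects to, or is contacted by, another node.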
Location Nodes
Learn more about the location nodes in the Resilient distributed systems blog entry.
Dispatch Node
This node contains logic to synchronize data and provide real-time views for users to dispatch superheroes.
Application Structure
defmodule Dispatch.Application do
  require Logger
  use Application

  @nodes ["gotham@127.0.0.1", "metropolis@127.0.0.1", "capitol@127.0.0.1"]

  def start(_type, _args) do
    connect_to_nodes(@nodes)

    children = [
      DispatchWeb.Telemetry,
      {Phoenix.PubSub, name: Dispatch.PubSub},
      Dispatch.GossipServer,
      Dispatch.PollServer,
      DispatchWeb.Endpoint
    ]

    opts = [
      strategy: :one_for_one,
      max_restarts: 3,
      max_seconds: 1,
      name: Dispatch.Supervisor
    ]

    Supervisor.start_link(children, opts)
  end

  def config_change(changed, _new, removed) do
    DispatchWeb.Endpoint.config_change(changed, removed)
    :ok
  end

  defp connect_to_nodes(nodes) do
    Enum.each(nodes, fn node ->
      case Node.connect(String.to_atom(node)) do
        true -> Logger.info("Connected to #{node} successfully.")
        false -> Logger.error("Failed to connect to #{node}.")
        # :ignored means the local node is not alive (started without --name or --sname)
        :ignored -> Logger.warning("Local node is not alive; cannot connect to #{node}.")
      end
    end)
  end
end
Startup Process
Upon initialization, the application establishes connections with the nodes gotham, metropolis, and capitol, and then starts a supervisor that manages the Gossip and Polling GenServers.
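The connection step depends on the three possible results of Node.connect/1. The following sketch makes them explicit as tagged tuples; the ConnectSketch module and the error atoms are hypothetical, and the node names assume every location runs on 127.0.0.1 as in the Gotham script.

```elixir
defmodule ConnectSketch do
  # Hypothetical wrapper that turns Node.connect/1's results into tagged tuples.
  def connect(node) when is_atom(node) do
    case Node.connect(node) do
      # Connected now, or a connection already existed
      true -> {:ok, node}
      # The target node could not be reached
      false -> {:error, :unreachable}
      # This VM is not a distributed node (no --name or --sname)
      :ignored -> {:error, :local_node_not_alive}
    end
  end

  # Try every node and return a map of node => result.
  def connect_all(nodes) do
    Map.new(nodes, fn node -> {node, connect(node)} end)
  end
end

[:"gotham@127.0.0.1", :"metropolis@127.0.0.1", :"capitol@127.0.0.1"]
|> ConnectSketch.connect_all()
|> IO.inspect()
```

Run from a plain iex session without --name, every entry comes back as {:error, :local_node_not_alive}, which is a quick way to spot a dispatch node that was started without distribution enabled.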
Location Module
Erlang has a concept of “location transparency,” where a process sends messages to processes on remote nodes in the same way it does to local ones.
By convention, GenServer communications are wrapped in an API, ensuring that interactions remain consistent and manageable.
defmodule Dispatch.Location do
  @locations [:gotham, :metropolis, :capitol]

  def get_locations do
    @locations
  end

  def get_data(location) when location in @locations do
    GenServer.call({:global, location}, :get_superheroes)
  end

  def get_data_from_random_location do
    location = Enum.random(@locations)
    get_data(location)
  end

  def update_data(data, location) when location in @locations do
    GenServer.cast({:global, location}, {:update_superheroes, data})
  end

  def update_data_to_random_location(data) do
    location = Enum.random(@locations)
    update_data(data, location)
  end

  def assign_to_location(data, superhero_id, location) when location in @locations do
    GenServer.call({:global, location}, {:assign_superhero, data, superhero_id})
  end

  def bomb_random_city do
    location = Enum.random(@locations)
    GenServer.cast({:global, location}, :bomb_city)
    {:ok, location}
  end
end
The Location module encapsulates behaviors such as retrieving and updating data, assigning superheroes, and intentionally causing disruptions.
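The same pattern, a GenServer registered under a {:global, name} tuple and reached only through wrapper functions, can be tried on a single node. The sketch below is a stand-in, not the real location server (which is covered in the other blog entry): DemoLocation and :demo_city are hypothetical names, and this server simply replaces its state on update, which the real locations may not do.

```elixir
defmodule DemoLocation do
  use GenServer

  # Client API: callers never build GenServer messages themselves.
  def start_link(city) do
    GenServer.start_link(__MODULE__, %{}, name: {:global, city})
  end

  def get_data(city) do
    GenServer.call({:global, city}, :get_superheroes)
  end

  def update_data(city, data) do
    GenServer.cast({:global, city}, {:update_superheroes, data})
  end

  # Server callbacks
  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call(:get_superheroes, _from, state), do: {:reply, state, state}

  @impl true
  def handle_cast({:update_superheroes, data}, _state), do: {:noreply, data}
end

{:ok, _pid} = DemoLocation.start_link(:demo_city)
:ok = DemoLocation.update_data(:demo_city, %{"hero-1" => %{name: "Demo Hero"}})
IO.inspect(DemoLocation.get_data(:demo_city))
```

The caller code is identical whether :demo_city lives on the local node or on another connected node; the :global registry resolves the name to a pid either way.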
GossipServer
The Superhero cluster uses a gossip (epidemic) protocol, where information spreads through the network like rumors in social networks. Gossip is one way to achieve eventual consistency: each node maintains its own copy of the data, and all nodes converge to the same state over time.
defmodule Dispatch.GossipServer do
  use GenServer
  alias Dispatch.Location

  @gossip_interval 1_000

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def start_gossip do
    GenServer.cast(__MODULE__, :start_gossip)
  end

  def init(state) do
    schedule_gossip()
    {:ok, state}
  end

  def handle_cast(:start_gossip, state) do
    gossip()
    schedule_gossip()
    {:noreply, state}
  end

  def handle_info(:gossip, state) do
    gossip()
    schedule_gossip()
    {:noreply, state}
  end

  defp schedule_gossip do
    Process.send_after(self(), :gossip, @gossip_interval)
  end

  defp gossip do
    locations = Location.get_locations()
    source = Enum.random(locations)
    destination = Enum.random(locations -- [source])
    source_data = Location.get_data(source)
    Location.update_data(source_data, destination)
  end
end
The GossipServer gossips every second, randomly selecting a node, requesting its current data, and passing that information to another randomly selected node. Shorter intervals result in quicker system convergence. There is no error handling logic here; if the GossipServer contacts a terminated location, it will crash and be restarted by the supervisor.
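To get a feel for how random pairwise gossip converges, the rounds can be simulated in memory. This is a rough sketch under stated assumptions: GossipSim is a hypothetical module, each replica is reduced to a map of hero id to last_updated timestamp, and the destination is assumed to merge incoming data by keeping the newer timestamp.

```elixir
defmodule GossipSim do
  # Merge two replicas, keeping the newer timestamp for each hero id.
  def merge(a, b), do: Map.merge(a, b, fn _id, t1, t2 -> max(t1, t2) end)

  # One gossip round: a random source sends its data to a different random destination.
  def step(replicas) do
    names = Map.keys(replicas)
    source = Enum.random(names)
    destination = Enum.random(names -- [source])
    Map.update!(replicas, destination, &merge(&1, replicas[source]))
  end

  # The cluster has converged when every replica holds the same data.
  def converged?(replicas) do
    replicas |> Map.values() |> Enum.uniq() |> length() == 1
  end

  # Run rounds until convergence; return the number of rounds and the final state.
  def run(replicas, rounds \\ 0) do
    if converged?(replicas) do
      {rounds, replicas}
    else
      run(step(replicas), rounds + 1)
    end
  end
end

replicas = %{
  gotham: %{"hero-1" => 10},
  metropolis: %{"hero-2" => 20},
  capitol: %{"hero-1" => 30}
}

{rounds, final} = GossipSim.run(replicas)
IO.puts("converged after #{rounds} rounds")
IO.inspect(final.gotham)
```

The number of rounds varies from run to run because the pairs are chosen at random, but the final state is always the same. With one round per gossip interval, halving the interval roughly halves the wall-clock time to convergence.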
Poll Server
The dispatch node keeps the LiveViews updated by asking a random location server for its data every two seconds and broadcasting the result to the LiveView subscribers.
defmodule Dispatch.PollServer do
  use GenServer
  alias Dispatch.Location
  alias Phoenix.PubSub

  @poll_interval 2_000
  @poll_topic "data_topic"

  def topic do
    @poll_topic
  end

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def start_poll do
    GenServer.cast(__MODULE__, :start_poll)
  end

  def init(state) do
    schedule_poll()
    {:ok, state}
  end

  def handle_cast(:start_poll, state) do
    poll()
    schedule_poll()
    {:noreply, state}
  end

  def handle_info(:poll, state) do
    poll()
    schedule_poll()
    {:noreply, state}
  end

  defp schedule_poll do
    Process.send_after(self(), :poll, @poll_interval)
  end

  defp poll do
    PubSub.broadcast(
      Dispatch.PubSub,
      @poll_topic,
      {:update_data, Location.get_data_from_random_location()}
    )
  end
end
Like the GossipServer, the PollServer will crash and be restarted if it contacts a terminated location.
When something goes wrong, these GenServers simply crash, and the supervisor restarts them into a known good state. This “Let it crash” approach eliminates the need to build error handling for every possible transient reason a location might be unavailable.
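The restart behaviour can be observed with a small standalone supervisor. This is a minimal sketch, not part of the Dispatch application: DemoCity is a hypothetical GenServer whose known good state is an empty list, and the process is killed by hand to simulate a crash.

```elixir
defmodule DemoCity do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
  def add(hero), do: GenServer.cast(__MODULE__, {:add, hero})
  def heroes, do: GenServer.call(__MODULE__, :heroes)

  # The known good state: an empty city.
  @impl true
  def init(_), do: {:ok, []}

  @impl true
  def handle_cast({:add, hero}, heroes), do: {:noreply, [hero | heroes]}

  @impl true
  def handle_call(:heroes, _from, heroes), do: {:reply, heroes, heroes}
end

{:ok, _sup} = Supervisor.start_link([DemoCity], strategy: :one_for_one)

DemoCity.add("Demo Hero")
IO.inspect(DemoCity.heroes(), label: "before crash")

# Simulate a crash and wait until the process is really gone.
old_pid = Process.whereis(DemoCity)
ref = Process.monitor(old_pid)
Process.exit(old_pid, :kill)

receive do
  {:DOWN, ^ref, :process, ^old_pid, _reason} -> :ok
end

# Poll until the supervisor has registered a replacement process.
wait_for_restart = fn retry ->
  case Process.whereis(DemoCity) do
    pid when is_pid(pid) and pid != old_pid ->
      pid

    _ ->
      Process.sleep(10)
      retry.(retry)
  end
end

new_pid = wait_for_restart.(wait_for_restart)
IO.inspect(DemoCity.heroes(), label: "after restart")
```

The replacement process has a new pid and starts from the empty list returned by init/1; no recovery code was written anywhere. In the Dispatch supervisor, max_restarts: 3 with max_seconds: 1 means that more than three restarts within one second cause the supervisor itself to give up and exit.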
Superhero Module
The Superhero module defines the struct that holds the cluster’s data, a create_superhero helper function, and the Conflict-free Replicated Data Type (CRDT) merge used to maintain data consistency across the cluster.
See the Leveraging CRDTs for eventual consistency blog post for more information.
defmodule Dispatch.Superhero do
  defstruct superheroes: %{},
            tombstones: %{}

  def create_superhero do
    %{
      id: UUID.uuid4(),
      name: Faker.Superhero.name(),
      location: :home,
      is_patrolling: false,
      last_updated: System.system_time(:second),
      health: 100
    }
  end

  def converge_superhero(data_a, data_b) do
    combined_superheroes =
      Map.merge(data_a.superheroes, data_b.superheroes, fn _id, sh1, sh2 ->
        if sh1.last_updated > sh2.last_updated, do: sh1, else: sh2
      end)

    combined_tombstones =
      Map.merge(data_a.tombstones, data_b.tombstones, fn _id, d1, d2 ->
        d1 or d2
      end)

    filtered_superheroes =
      Map.filter(combined_superheroes, fn {id, _} ->
        not Map.has_key?(combined_tombstones, id)
      end)

    %{superheroes: filtered_superheroes, tombstones: combined_tombstones}
  end
end
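The role of the tombstones is easiest to see with two replicas that disagree about a deletion. The sketch below mirrors the merge rules of converge_superhero/2 on plain maps so that it runs without the project's dependencies; ConvergeSketch, the hero id, and the timestamps are hypothetical.

```elixir
defmodule ConvergeSketch do
  # Same rules as the article's merge: the newest version of each hero wins,
  # tombstones are unioned, and tombstoned heroes are removed from the result.
  def converge(a, b) do
    heroes =
      Map.merge(a.superheroes, b.superheroes, fn _id, h1, h2 ->
        if h1.last_updated > h2.last_updated, do: h1, else: h2
      end)

    tombstones = Map.merge(a.tombstones, b.tombstones)

    %{
      superheroes: Map.drop(heroes, Map.keys(tombstones)),
      tombstones: tombstones
    }
  end
end

# Replica A has deleted the hero; replica B still holds a newer copy of it.
replica_a = %{superheroes: %{}, tombstones: %{"hero-1" => true}}

replica_b = %{
  superheroes: %{"hero-1" => %{name: "Demo Hero", location: :gotham, last_updated: 200}},
  tombstones: %{}
}

ab = ConvergeSketch.converge(replica_a, replica_b)
ba = ConvergeSketch.converge(replica_b, replica_a)

IO.inspect(ab, label: "merged")
IO.inspect(ab == ba, label: "same result in either order")
IO.inspect(ConvergeSketch.converge(ab, ab) == ab, label: "merging twice changes nothing")
```

Without the tombstone, replica B's copy would bring the deleted hero back on the next gossip round. One caveat of the comparison used here: when two different versions of a hero carry the same last_updated value, the second argument wins, so the result can depend on merge order; with timestamps in whole seconds, two updates within the same second can tie.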
Index Module
The Index module generates a real-time web interface for managing superheroes using LiveView.
defmodule DispatchWeb.DispatchLive.Index do
  use DispatchWeb, :live_view
  alias Dispatch.{Location, Superhero, PollServer}
  alias Phoenix.PubSub
  require Logger

  def mount(_params, _session, socket) do
    PubSub.subscribe(Dispatch.PubSub, PollServer.topic())

    new_socket =
      socket
      |> assign(:data, %Superhero{})
      |> assign(:location_options, [:home | Location.get_locations()])

    {:ok, new_socket}
  end

  def handle_event("create", _params, socket) do
    superhero = Superhero.create_superhero()
    data = socket.assigns.data

    updated_data = %{
      superheroes: Map.put_new(data.superheroes, superhero.id, superhero),
      tombstones: data.tombstones
    }

    new_data = Superhero.converge_superhero(data, updated_data)
    Location.update_data_to_random_location(new_data)

    new_socket =
      socket
      |> assign(:data, new_data)

    {:noreply, new_socket}
  end

  def handle_event("delete", %{"id" => id}, socket) do
    data = socket.assigns.data

    updated_data = %{
      superheroes: Map.delete(data.superheroes, id),
      tombstones: Map.put(data.tombstones, id, true)
    }

    new_data = Superhero.converge_superhero(data, updated_data)
    Location.update_data_to_random_location(new_data)
    {:noreply, assign(socket, :data, new_data)}
  end

  def handle_event("select_changed", %{"value" => location, "id" => id}, socket) do
    new_socket =
      case location do
        "gotham" -> update_location(socket, id, :gotham)
        "metropolis" -> update_location(socket, id, :metropolis)
        "capitol" -> update_location(socket, id, :capitol)
        _ -> update_home(socket, id)
      end

    {:noreply, new_socket}
  end

  def handle_event("bomb", _params, socket) do
    {:ok, bombed_location} = Location.bomb_random_city()

    new_socket =
      socket |> put_flash(:info, "#{bombed_location} has been destroyed")

    {:noreply, new_socket}
  end

  def handle_info({:update_data, data}, socket) do
    local_data = socket.assigns.data
    {:noreply, socket |> assign(:data, Superhero.converge_superhero(local_data, data))}
  end

  defp update_home(socket, superhero_id) do
    data = socket.assigns.data
    new_data = assign_to_home(data, superhero_id)
    Location.update_data_to_random_location(new_data)
    socket |> assign(:data, new_data)
  end

  defp update_location(socket, superhero_id, location) do
    new_data =
      socket.assigns.data
      |> Location.assign_to_location(superhero_id, location)

    socket
    |> assign(:data, new_data)
  end

  defp assign_to_home(data, superhero_id) do
    timestamp = System.system_time(:second)

    if Map.has_key?(data.tombstones, superhero_id) do
      Logger.warning("Attempt to assign dead superhero with ID #{superhero_id}")
      data
    else
      Logger.info("Assigning superhero with ID #{superhero_id} to home")
      updated_hero = Map.get(data.superheroes, superhero_id, %{})
      new_health = min(updated_hero.health + 20, 100)

      superheroes_updated =
        Map.put(
          data.superheroes,
          superhero_id,
          Map.merge(updated_hero, %{
            location: :home,
            is_patrolling: false,
            last_updated: timestamp,
            health: new_health
          })
        )

      %{data | superheroes: superheroes_updated}
    end
  end
end
- create: A button click creates a new superhero, updating the local data and sending it to a random location in the cluster; gossip then spreads it to the other locations.
- delete: A button click terminates a superhero, updating the local data and propagating it to a random location in the cluster.
- select_changed: A selection change in the drop-down informs the selected location of the intention to assign a superhero. The location updates the data, merges the updated data with its local state, and returns a copy that LiveView uses to update its local state.
- bomb: A button click sends a message to a random location to terminate the city, leaving the cluster to manage the recovery.
- update_data: The LiveView subscribes to the polling topic, which delivers an :update_data message along with data from a random location; the handler merges that data into the local state.
Conclusion
The Superhero cluster operates on a leaderless architecture, allowing it to function even in a degraded state. When a dispatcher bombs a city, the affected node crashes and immediately restarts in an empty state, relying on eventual consistency to synchronize its data over time. A location node can be terminated either by overwhelming its supervisor with frequent bombings or by closing its terminal. Despite these disruptions, the Superhero cluster continues to run, with processes crashing and restarting as necessary.
For example, if gotham is terminated and a dispatcher attempts to assign a superhero to gotham, the LiveView will restart, and eventual consistency will synchronize the data to the last known good state. This resilient design ensures the system remains operational, even under disruptive conditions.
Eventual consistency prioritizes speed, isolation, and resilience by allowing nodes in a distributed system to operate independently and asynchronously, enabling the cluster to continue functioning even if parts of it are temporarily unavailable or experiencing high latency. The tradeoff, however, is the potential for inconsistencies, where conflicting changes might lead to data being overwritten or lost.