Elixir: Running a cluster of dynamic nodes
Elixir Cluster Series
The previous post described a cluster with one dispatch node linked to three independent location nodes. Using CRDTs and supervisors, that cluster was resilient: it kept functioning despite the loss of one or two location nodes. However, because the dispatch node took on leadership responsibilities, handling generation and gossip between the locations, the system was not leaderless. A better strategy is to use a single, uniform node type, so that every node is interchangeable and none has a special role such as dispatch or location.
Also, while the cluster can operate with missing nodes, it has no mechanism for recovering them. To address this, it needs the ability to start and stop nodes dynamically.
Starting the Cluster
Install Dependencies
Install the dependencies. If you have Make installed, simply run:
make setup
Run the Task
This project includes a `tasks.json` for Visual Studio Code.
- Open the Command Palette by pressing Ctrl+Shift+P (or Cmd+Shift+P on macOS).
- Type “Run Task” and select it from the list.
- Choose “Run Dispatch Cluster”.
This task will execute a series of scripts to start the nodes in the cluster, ensuring each component of the system is initiated correctly.
- Gotham runs on http://localhost:4900
- Metropolis runs on http://localhost:4901
- Capitol runs on http://localhost:4902
Visit any of the locations and create some superheroes. Each superhero starts within milliseconds, but it takes a second or two for the polling service to catch up and update the LiveView. Notice that superheroes can start on any active dispatch center, and every dispatch center's view displays all superheroes. The superheroes continue fighting crime until they either run out of health points or are terminated from the view. If you shut down a dispatch center, either gracefully from the view or by forcefully closing its terminal, its superheroes migrate to another operational dispatch center. Rerunning the “Run Dispatch Cluster” task brings any closed centers back online, ready to accept superheroes again.
You might notice that when superheroes migrate between nodes, they revert to their initial state. This isn't a bug; it follows from how Erlang handles process state. Horde restarts a migrated superhero on another node from its child spec, and a restarted process begins with fresh state. In Erlang, persisting and retrieving state is a separate concern from managing process life cycles.
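A minimal sketch of why the state resets, using only Elixir's standard library: a supervisor restarts a child by calling its start function again with the original arguments, so anything the process accumulated is gone. The `Counter` and `RestartDemo` modules are hypothetical stand-ins for a superhero and its supervisor.

```elixir
defmodule Counter do
  # Hypothetical stand-in for a superhero: an Agent holding a number.
  use Agent

  def start_link(initial), do: Agent.start_link(fn -> initial end, name: __MODULE__)
  def increment, do: Agent.update(__MODULE__, &(&1 + 1))
  def value, do: Agent.get(__MODULE__, & &1)
end

defmodule RestartDemo do
  # Returns {value_before_crash, value_after_restart}
  def run do
    {:ok, sup} = Supervisor.start_link([{Counter, 0}], strategy: :one_for_one)

    Counter.increment()
    Counter.increment()
    before = Counter.value()

    # Simulate a crash; the supervisor restarts Counter from its child spec {Counter, 0}
    old_pid = Process.whereis(Counter)
    ref = Process.monitor(old_pid)
    Process.exit(old_pid, :kill)

    receive do
      {:DOWN, ^ref, :process, ^old_pid, _reason} -> :ok
    end

    after_restart = wait_for_restart(old_pid)
    Supervisor.stop(sup)
    {before, after_restart}
  end

  # Poll until the supervisor has registered a new Counter process
  defp wait_for_restart(old_pid) do
    case Process.whereis(Counter) do
      pid when is_pid(pid) and pid != old_pid ->
        Counter.value()

      _ ->
        Process.sleep(10)
        wait_for_restart(old_pid)
    end
  end
end
```

`RestartDemo.run()` returns `{2, 0}`: the restarted process starts over from `0`. Keeping the count across restarts would require storing it somewhere outside the process.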
As Joe Armstrong puts it in a discussion thread:
“That’s it and all about it - it’s not related to supervisors or other system components. This is standard practice in a distributed system. Real fault tolerance requires more than one machine to ensure coverage in the event of a complete machine failure. This is precisely why we replicate everything. If an entire machine fails and we only have a pointer to vital data on that machine, then we’re in serious trouble.”
Managing Nodes
In this cluster, nodes frequently come and go, accommodating scaling, handling code updates, and recovering from unexpected failures.
Application
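The `application.ex` uses the `libcluster` library to form a cluster from dynamic nodes. Its `Cluster.Supervisor` runs the `Cluster.Strategy.Gossip` strategy, which discovers peers by broadcasting UDP multicast heartbeats on port `45892` and connects to them, so no node needs to act as a leader. The Conflict-Free Replicated Data Types (CRDTs) that keep shared state in sync come from Horde, described below.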
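```elixir
@impl true
def start(_type, _args) do
  # libcluster topology: nodes find each other with the Gossip strategy
  topologies = [
    dispatch_gossip_cluster: [
      strategy: Cluster.Strategy.Gossip,
      config: [
        port: 45892
      ]
    ]
  ]

  children = [
    # Connects (and reconnects) nodes according to the topologies above
    {Cluster.Supervisor, [topologies, [name: Dispatch.ClusterSupervisor]]}
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
end
```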
The `application.ex` also includes a `shutdown` function that gracefully stops the `:dispatch` application and shuts down the node.
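```elixir
# Requires `require Logger` at the top of Dispatch.Application
def shutdown do
  Logger.info("Shutting down application")
  # Stop the :dispatch application and its supervision tree first...
  Application.stop(:dispatch)
  # ...then ask the runtime to shut down the whole node (asynchronous)
  :init.stop()
end
```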
LiveView
The LiveView displays the connected nodes and listens for nodes connecting and disconnecting.
```elixir
@impl true
def mount(_params, _session, socket) do
  # mount/3 runs for the static render and again once the socket connects;
  # only the connected LiveView process needs node events
  if connected?(socket), do: :net_kernel.monitor_nodes(true, [])

  {:ok, assign(socket, :node_list, Node.list())}
end

@impl true
def handle_info({:nodeup, _node}, socket) do
  {:noreply, assign(socket, :node_list, Node.list())}
end

def handle_info({:nodedown, _node}, socket) do
  {:noreply, assign(socket, :node_list, Node.list())}
end
```
- `Node.list` returns the current list of connected nodes.
- `monitor_nodes` is a feature of Erlang's `:net_kernel` that sends messages to the subscribing process when nodes connect and disconnect.
- `:nodeup` and `:nodedown` are the messages from `monitor_nodes` that signal a node connecting or disconnecting. Here, the LiveView responds by calling `Node.list` to update its assigns.
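The same bookkeeping works outside LiveView. A minimal sketch, assuming a plain GenServer (the `NodeTracker` module and its `monitor:` option are hypothetical): it subscribes with `:net_kernel.monitor_nodes/1` and re-reads `Node.list/0` on every `:nodeup` or `:nodedown` message. The `monitor: false` option exists only so the sketch can be exercised by sending the messages by hand.

```elixir
defmodule NodeTracker do
  # Hypothetical GenServer mirroring the LiveView's node bookkeeping.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

  def state(pid), do: GenServer.call(pid, :state)

  @impl true
  def init(opts) do
    # Subscribe this process to {:nodeup, node} / {:nodedown, node} messages
    if Keyword.get(opts, :monitor, true) do
      :ok = :net_kernel.monitor_nodes(true)
    end

    {:ok, %{nodes: Node.list(), events: []}}
  end

  @impl true
  def handle_call(:state, _from, state), do: {:reply, state, state}

  @impl true
  def handle_info({event, node}, state) when event in [:nodeup, :nodedown] do
    # Re-read the authoritative list rather than patching it by hand
    {:noreply, %{state | nodes: Node.list(), events: [{event, node} | state.events]}}
  end
end
```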
The LiveView can also shut down another connected node.
```elixir
@impl true
def handle_event("stop_node", %{"node" => node_name}, socket) do
  # Match against existing node atoms instead of converting user input to an atom
  find_node = Enum.find(Node.list(), fn node -> Atom.to_string(node) == node_name end)

  case find_node do
    nil ->
      Logger.error("Node not found: #{node_name}")

    node ->
      case :rpc.call(node, Dispatch.Application, :shutdown, []) do
        {:badrpc, reason} ->
          Logger.error("RPC failed: #{inspect(reason)}")

        result ->
          Logger.info("RPC succeeded: #{inspect(result)}")
      end
  end

  {:noreply, socket}
end
```
`:rpc.call` is Erlang's mechanism for calling a function on a remote node. Here, it invokes the `shutdown` function of `Dispatch.Application` on the remote node, which stops the application and terminates the node.
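`:rpc.call/4` waits indefinitely by default, which can stall the calling LiveView if the remote node hangs. A minimal sketch of a wrapper, assuming a hypothetical `SafeRpc` module, that uses `:rpc.call/5` with an explicit timeout and turns `{:badrpc, reason}` into a tagged error.

```elixir
defmodule SafeRpc do
  # Hypothetical helper around :rpc.call/5 with an explicit timeout in ms.
  def call(node, module, fun, args, timeout \\ 5_000) do
    case :rpc.call(node, module, fun, args, timeout) do
      {:badrpc, reason} -> {:error, reason}
      result -> {:ok, result}
    end
  end
end
```

For example, `SafeRpc.call(node, Dispatch.Application, :shutdown, [])` returns `{:error, :timeout}` instead of blocking forever when the remote node does not answer in time.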
Notice that instead of converting `node_name` directly into an atom, this approach converts the nodes' atoms into strings to find the matching node. The Erlang VM does not garbage-collect atoms, and the atom table has a fixed size (1,048,576 entries by default) whose exhaustion crashes the VM, so it is best practice to avoid converting dynamic user input into a potentially unlimited number of atoms.
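A minimal sketch of two atom-safe lookups (the `NodeLookup` module is hypothetical, and the node names in the usage are illustrative). `find/2` mirrors the string comparison in `handle_event/3`; `find_existing/2` uses `String.to_existing_atom/1`, which raises `ArgumentError` for a name that is not already an atom instead of allocating a new one.

```elixir
defmodule NodeLookup do
  # Compare as strings: no atoms are created from user input.
  def find(name, nodes \\ Node.list()) when is_binary(name) do
    Enum.find(nodes, fn node -> Atom.to_string(node) == name end)
  end

  # Alternative: only succeeds if the atom already exists in the atom table.
  def find_existing(name, nodes \\ Node.list()) when is_binary(name) do
    atom = String.to_existing_atom(name)
    if atom in nodes, do: atom, else: nil
  rescue
    ArgumentError -> nil
  end
end
```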
The Superhero
In the previous configuration, the cluster treated superheroes as data, passing that data between nodes and relying on a Conflict-Free Replicated Data Type (CRDT) for eventual consistency. There is an alternative: model each superhero as a process. Instead of transferring data about superheroes, the cluster moves the superheroes themselves, restarting each superhero's process on another node when its node goes away.
In the `Dispatch.SuperheroServer` GenServer, a superhero is an actor that actively patrols and fights crime.
```elixir
defmodule Dispatch.SuperheroServer do
  use GenServer
  require Logger

  alias Dispatch.{SuperheroRegistry, Superhero, SuperheroApi}
  alias Horde.Registry

  # How often (ms) the superhero decides what to do next
  @polling_interval 4000

  def start_link(name) do
    GenServer.start_link(__MODULE__, name, name: via_tuple(name))
  end

  @impl true
  def init(name) do
    # Trap exits so terminate/2 runs when the supervisor shuts this process down
    Process.flag(:trap_exit, true)
    Logger.info("Initializing superhero server for #{name}")
    Process.send_after(self(), :decide_action, @polling_interval)
    # Load the full state right after init/1 returns, without blocking the supervisor
    {:ok, %Superhero{name: name}, {:continue, :init_superhero}}
  end

  @impl true
  def handle_continue(:init_superhero, superhero) do
    {:noreply, Superhero.get_state(superhero.name)}
  end

  @impl true
  def handle_call(:get_details, _from, superhero) do
    {:reply, superhero, superhero}
  end

  @impl true
  def handle_cast(:resting, superhero) do
    health_gain = :rand.uniform(40)
    new_health = min(superhero.health + health_gain, 100)

    Logger.info(
      "#{superhero.name} is resting and has regained #{health_gain} health points, new health: #{new_health}."
    )

    {:noreply, %{superhero | health: new_health}}
  end

  def handle_cast(:fighting_crime, superhero) do
    if :rand.uniform(2) == 1 do
      new_superhero = %{superhero | fights_won: superhero.fights_won + 1}
      Logger.info("#{superhero.name} won a fight, total wins: #{new_superhero.fights_won}")
      {:noreply, new_superhero}
    else
      health_loss = :rand.uniform(40)
      new_health = superhero.health - health_loss
      new_superhero = %{superhero | health: new_health, fights_lost: superhero.fights_lost + 1}

      Logger.info(
        "#{superhero.name} lost a fight, lost #{health_loss} health, remaining health: #{new_health}"
      )

      if new_health <= 0 do
        Logger.warning("#{superhero.name} has health <= 0, terminating.")
        # terminate_child waits for this very process to exit, so calling it
        # from inside this process would block; stop from a separate process
        Task.start(fn -> SuperheroApi.stop(superhero.name) end)
      end

      {:noreply, new_superhero}
    end
  end

  @impl true
  def handle_info(:decide_action, superhero) do
    new_superhero =
      case random_action() do
        :fighting ->
          GenServer.cast(self(), :fighting_crime)
          Map.put(superhero, :is_patrolling, true)

        :resting ->
          GenServer.cast(self(), :resting)
          Map.put(superhero, :is_patrolling, false)
      end

    # Schedule the next decision
    Process.send_after(self(), :decide_action, @polling_interval)
    {:noreply, new_superhero}
  end

  @impl true
  def terminate(reason, superhero) do
    Logger.info(
      "Terminating superhero server for #{superhero.name} with reason: #{inspect(reason)}"
    )

    :ok
  end

  # Rest one time in four, fight crime otherwise
  defp random_action do
    if :rand.uniform(4) == 1, do: :resting, else: :fighting
  end

  def via_tuple(id), do: {:via, Registry, {SuperheroRegistry, id}}
end
```
Calls
- `:get_details` synchronously returns the superhero's state to the caller.

Casts
- `:resting` restores between 1 and 40 health points, capped at 100.
- `:fighting_crime` results in either a win or a loss. On a loss, the superhero loses between 1 and 40 health points; once health drops to zero or below, the superhero is stopped.

Internal Messages
- `:decide_action` randomly chooses between fighting crime (three times in four) and resting, then schedules itself again after a 4-second delay.
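The health rules above are easier to test when they are separated from randomness and logging. A minimal sketch, assuming a plain map with the same `health`, `fights_won` and `fights_lost` fields; the `SuperheroRules` module is hypothetical, and the dice rolls are passed in so results are deterministic.

```elixir
defmodule SuperheroRules do
  # Hypothetical pure versions of the :resting and :fighting_crime casts.
  @max_health 100

  # Resting: gain `gain` health (1..40 in the server), never above 100
  def rest(hero, gain), do: %{hero | health: min(hero.health + gain, @max_health)}

  # Fighting: a roll of 1 (out of 2) is a win; otherwise lose `loss` health
  def fight(hero, 1, _loss), do: %{hero | fights_won: hero.fights_won + 1}

  def fight(hero, _roll, loss) do
    %{hero | health: hero.health - loss, fights_lost: hero.fights_lost + 1}
  end

  # The server stops a superhero once health drops to zero or below
  def defeated?(hero), do: hero.health <= 0
end
```

In the server these would be called as `SuperheroRules.fight(hero, :rand.uniform(2), :rand.uniform(40))`, keeping the GenServer responsible only for scheduling and side effects.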
This GenServer manages the dynamic life of a superhero, making decisions that affect their ability to continue safeguarding the city.
Superhero Registry
While naming each GenServer after its superhero is feasible, applications that manage dynamic GenServers more commonly use a Registry. In this configuration, instead of referencing superheroes directly by name, each process registers itself with the Registry on creation, giving a central place to look up each superhero's process identifier (`pid`). Registering under atom names would also mean creating atoms from user input.
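The via-tuple pattern works the same way with Elixir's built-in, node-local `Registry`; Horde's registry applies it across the cluster, as `via_tuple/1` above shows. A minimal sketch (the `RegistryDemo` module and registry name are hypothetical) where names stay strings and are never converted to atoms.

```elixir
defmodule RegistryDemo do
  # Hypothetical GenServer registered by name through Elixir's local Registry.
  use GenServer

  def start_link(name), do: GenServer.start_link(__MODULE__, name, name: via(name))

  # {:via, Registry, {registry_name, key}}: the key can be any term, e.g. a string
  def via(name), do: {:via, Registry, {RegistryDemo.Registry, name}}

  @impl true
  def init(name), do: {:ok, %{name: name}}

  @impl true
  def handle_call(:name, _from, state), do: {:reply, state.name, state}
end
```

The registry itself must be started first, for example with `Registry.start_link(keys: :unique, name: RegistryDemo.Registry)` or as a child in the supervision tree. Starting a second process under the same name returns `{:error, {:already_started, pid}}`.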
Horde
A common pain point in any cluster is managing a global registry. This cluster uses the `Horde` library, which uses a Conflict-Free Replicated Data Type (CRDT) to keep each node's registry in sync. `Horde` also balances superheroes across the cluster, placing each one on a live node according to its distribution strategy (here `Horde.UniformDistribution`). If a node shuts down or crashes, `Horde` restarts that node's superheroes on the remaining nodes.
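To illustrate how a distribution strategy can spread processes evenly, here is a simplified hash-based sketch using `:erlang.phash2/2`. It is not Horde's implementation; the `PlacementSketch` module and the node names are hypothetical.

```elixir
defmodule PlacementSketch do
  # No live members: nowhere to place the child
  def choose_node(_id, []), do: {:error, :no_alive_nodes}

  # Sort so every node computes the same answer for the same member list,
  # then hash the child id into the range 0..length(nodes) - 1
  def choose_node(id, nodes) do
    sorted = Enum.sort(nodes)
    {:ok, Enum.at(sorted, :erlang.phash2(id, length(sorted)))}
  end
end
```

Because the choice depends only on the id and the member list, any node can compute where a superhero belongs without asking a leader; when the member list changes, new children may land on different nodes.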
Horde Supervisor
```elixir
children = [
  {
    Horde.DynamicSupervisor,
    child_spec: {Dispatch.SuperheroServer, restart: :transient},
    name: Dispatch.SuperheroSupervisor,
    strategy: :one_for_one,
    members: :auto,
    distribution_strategy: Horde.UniformDistribution
  }
]
```
Superhero Registry Handler
This module contains functions for querying the dynamic registry of superheroes.
```elixir
defmodule Dispatch.SuperheroRegistryHandler do
  require Logger

  alias Dispatch.SuperheroRegistry
  alias Horde.Registry

  # Registry entries are {key, pid, value}; this match spec returns every key (superhero name)
  def get_all_superheroes do
    Registry.select(SuperheroRegistry, [{{:"$1", :_, :_}, [], [:"$1"]}])
  end

  def get_pid_for_superhero(name) do
    case Registry.lookup(SuperheroRegistry, name) do
      [{pid, _}] -> pid
      _ -> nil
    end
  end

  def get_node_for_superhero(name) do
    case Registry.lookup(SuperheroRegistry, name) do
      [{pid, _}] -> node(pid)
      _ -> :unknown
    end
  end
end
```
- `get_all_superheroes` retrieves the names of all active superhero GenServers.
- `get_pid_for_superhero` retrieves a superhero's `pid` given their name.
- `get_node_for_superhero` determines which node a superhero's process is running on.
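A variant of the match spec can return each name together with its pid in a single select, avoiding one extra lookup per superhero. A minimal sketch using Elixir's local `Registry`, which accepts the same `{key, pid, value}` match-spec shape as the `Horde.Registry.select/2` call above; the `SelectDemo` names are hypothetical.

```elixir
defmodule SelectDemo do
  # Return {name, node} pairs for every registered process in one select.
  def names_with_nodes(registry) do
    registry
    # {{:"$1", :"$2"}} in the body builds a {key, pid} tuple for each entry
    |> Registry.select([{{:"$1", :"$2", :_}, [], [{{:"$1", :"$2"}}]}])
    |> Enum.map(fn {name, pid} -> {name, node(pid)} end)
  end
end
```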
Superhero API
To improve code organization and modularity, the superhero GenServer API is encapsulated in `Dispatch.SuperheroApi`.
```elixir
defmodule Dispatch.SuperheroApi do
  require Logger

  alias Dispatch.{
    SuperheroRegistry,
    SuperheroServer,
    SuperheroSupervisor,
    SuperheroRegistryHandler
  }

  alias Horde.{DynamicSupervisor, Registry}

  def start(name) do
    child_spec = %{
      id: name,
      start: {SuperheroServer, :start_link, [name]}
    }

    case DynamicSupervisor.start_child(SuperheroSupervisor, child_spec) do
      {:ok, pid} ->
        Logger.info("Superhero created successfully: #{name} with PID #{inspect(pid)}")
        {:ok, pid}

      {:error, reason} ->
        Logger.error("Failed to create superhero: #{name} due to #{inspect(reason)}")
        {:error, reason}
    end
  end

  def stop(name) do
    case SuperheroRegistryHandler.get_pid_for_superhero(name) do
      nil ->
        {:error, :not_found}

      pid ->
        # The process may exit on its own between lookup and termination,
        # so return the error instead of crashing on a failed match
        case DynamicSupervisor.terminate_child(SuperheroSupervisor, pid) do
          :ok ->
            :ok = Registry.unregister(SuperheroRegistry, name)
            {:ok, :terminated}

          {:error, reason} ->
            {:error, reason}
        end
    end
  end

  def get_details(name) do
    GenServer.call(SuperheroServer.via_tuple(name), :get_details)
  end

  def get_all_superheroes_with_details do
    SuperheroRegistryHandler.get_all_superheroes()
    |> Enum.map(fn name ->
      get_details(name)
      |> Map.put(:node, SuperheroRegistryHandler.get_node_for_superhero(name))
    end)
  end
end
```
- `start`: Starts the superhero under the `SuperheroSupervisor` through the GenServer's `start_link` function, which registers the process in Horde's dynamic registry. This ensures each superhero is actively managed and tracked from creation.
- `stop`: Terminates the superhero process through the `SuperheroSupervisor` and deregisters it from Horde's registry, ensuring a clean shutdown.
- `get_details`: Retrieves a superhero's current state with a synchronous call to the GenServer managing that superhero, resolving the process through Horde's registry.
- `get_all_superheroes_with_details`: Collects details from all registered superheroes by mapping over their names, fetching each superhero's state and the node it runs on. This approach is simplistic and not optimized for production environments, but it works for this demonstration.
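One weakness of `get_all_superheroes_with_details`: a superhero can exit between the registry select and the `GenServer.call`, and the call then exits the caller (for example the poller). A minimal sketch that skips such entries by catching the exit; the `DetailsCollector` module is hypothetical, and the fetch function is passed in so the sketch runs standalone.

```elixir
defmodule DetailsCollector do
  # Call `fetch` for each name, dropping names whose process has gone away.
  def collect(names, fetch) do
    Enum.flat_map(names, fn name ->
      try do
        [fetch.(name)]
      catch
        # GenServer.call exits (rather than raises) when the target is gone
        :exit, _reason -> []
      end
    end)
  end
end
```

In the application this might be used as `DetailsCollector.collect(SuperheroRegistryHandler.get_all_superheroes(), &SuperheroApi.get_details/1)`.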
Polling
All the LiveViews need to be kept up to date. The easiest way is to set up a GenServer that polls the superhero servers and broadcasts the results on the node's PubSub.
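A minimal sketch of such a poller. The text broadcasts on the node's Phoenix PubSub; to keep this runnable without Phoenix, the sketch swaps in Elixir's `Registry` with duplicate keys as a local pub/sub, a pattern the `Registry` documentation describes. The `Poller` module, the `:superheroes` topic and the `fetch` option are hypothetical; in the application `fetch` would be something like `&SuperheroApi.get_all_superheroes_with_details/0`.

```elixir
defmodule Poller do
  # Every `interval` ms, call `fetch` and send {:superheroes, result}
  # to every process subscribed under `topic` in a :duplicate Registry.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  # Subscribers (e.g. LiveViews) register themselves under the topic
  def subscribe(registry, topic), do: Registry.register(registry, topic, nil)

  @impl true
  def init(opts) do
    # opts: registry:, topic:, fetch:, interval:
    send(self(), :poll)
    {:ok, Map.new(opts)}
  end

  @impl true
  def handle_info(:poll, state) do
    result = state.fetch.()

    Registry.dispatch(state.registry, state.topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, {:superheroes, result})
    end)

    Process.send_after(self(), :poll, state.interval)
    {:noreply, state}
  end
end
```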
Conclusion
When discussing the robustness of production services, we often refer to uptime in terms of “nines.” For example, a dispatch center node with an uptime of 99.9%, known as “three nines,” is down for about 8.76 hours per year. Moving to three isolated nodes, and assuming their failures are independent, the probability that all three are down at once drops to 0.001 × 0.001 × 0.001 = 0.000000001, or about 0.03 seconds per year. That is an uptime of 99.9999999%, or “nine nines.”
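The availability arithmetic, as a small sketch assuming independent node failures and a 365-day year (the `Nines` module is hypothetical). Correlated failures, such as a shared network or a bad deploy, would make the real figure worse.

```elixir
defmodule Nines do
  # Seconds in a 365-day year
  @seconds_per_year 365 * 24 * 60 * 60

  # Probability that all `n` independent nodes are down at the same time
  def all_down(availability, n), do: :math.pow(1 - availability, n)

  # Expected downtime in seconds per year for a given unavailability
  def downtime_seconds(unavailability), do: unavailability * @seconds_per_year
end
```

`Nines.downtime_seconds(1 - 0.999) / 3600` is about 8.76 hours for one node, and `Nines.downtime_seconds(Nines.all_down(0.999, 3))` is about 0.0315 seconds for three.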
Erlang builds fault tolerance, dynamic scaling, and node connectivity directly into its runtime, which makes complex cluster management tasks considerably simpler than in many other technologies. Supervisors restart failed processes automatically, and libraries such as Horde build on the runtime's distribution to spread processes across nodes, supporting high availability and robust performance.
A common challenge in Erlang, particularly within large-scale distributed systems, is effectively tracking the location of processes across multiple nodes. In our scenario, with a moderate number of superheroes and dispatch centers, the system remains manageable. However, as the system scales, especially when each superhero requires backup instances across additional dispatch centers, the complexity and overhead of maintaining a global registry can become a performance bottleneck.
Horde leverages Conflict-Free Replicated Data Types (CRDTs) to manage a global registry using eventual consistency. Yet, in scenarios where immediate consistency is crucial, a consensus-based strategy might be necessary, introducing additional latency due to the overhead of achieving agreement across nodes.
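To make eventual consistency concrete, here is a toy last-writer-wins map. It is a simplified illustration of the kind of merge a CRDT performs, not Horde's implementation (Horde builds on the more involved delta_crdt library); the `LwwMap` module, timestamps and node names are hypothetical.

```elixir
defmodule LwwMap do
  # Each entry is key => {timestamp, node, value}.
  def put(map, key, value, timestamp, node) do
    Map.put(map, key, {timestamp, node, value})
  end

  # Keep the entry with the larger {timestamp, node}; the node breaks ties.
  # This merge is commutative, associative and idempotent, so replicas that
  # exchange state in any order converge to the same map.
  def merge(a, b) do
    Map.merge(a, b, fn _key, {ts_a, n_a, _} = left, {ts_b, n_b, _} = right ->
      if {ts_a, n_a} >= {ts_b, n_b}, do: left, else: right
    end)
  end

  def value(map, key) do
    case Map.fetch(map, key) do
      {:ok, {_ts, _node, value}} -> value
      :error -> nil
    end
  end
end
```

No coordination is needed to merge, which is why reads can briefly disagree between nodes; a consensus protocol avoids that disagreement at the cost of extra round trips.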