Elixir Cluster Series

  1. Leveraging CRDTs for eventual consistency
  2. Elixir: Resilient distributed systems
  3. Elixir: Running a cluster of nodes
  4. Elixir: Running a cluster of dynamic nodes
  5. Elixir and Mnesia: Running a cluster with state
  6. Elixir and Raft: Running a cluster with state

The previous post described a cluster with one dispatch node linked to three independent location nodes. Using CRDTs and supervisors, this cluster demonstrates resilience, continuing to function despite the loss of one or two location nodes. However, because the dispatch node assumes leadership responsibilities, generating and gossiping state between the locations, the system is not leaderless. A more effective strategy would use a uniform node type, making every node interchangeable rather than tied to a specific role like dispatch or location.

Also, while the cluster can operate with missing nodes, it lacks a mechanism for recovery. To address this, it needs the ability to dynamically start and stop nodes.

See the code

Starting the Cluster

Install Dependencies

To install dependencies, if you have make installed, simply run:

make setup

Run the Task

This project includes a tasks.json for Visual Studio Code.

  1. Open the Command Palette by pressing Ctrl+Shift+P (or Cmd+Shift+P on macOS).
  2. Type “Run Task” and select it from the list.
  3. Choose “Run Dispatch Cluster”.

This task will execute a series of scripts to start the nodes in the cluster, ensuring each component of the system is initiated correctly.

Visit any of the locations and create some superheroes. Each superhero starts within milliseconds, but it takes a second or two for the polling service to catch up and update the LiveView. Notice the superheroes will start in any active dispatch center and every dispatch center view will display all superheroes. These superheroes will continue fighting crime until they either run out of health points or are terminated from the view. If you shut down a dispatch center—either gracefully from the view or by forcefully closing its terminal—the superheroes migrate to another operational dispatch center. On rerunning the “Run Dispatch Cluster” task, any closed centers will come back online, ready to accept superheroes again.

You might notice that when superheroes migrate between nodes, they revert to their initial state. This isn’t a bug; it’s a fundamental aspect of how Erlang handles process state. In Erlang, persisting and retrieving state is a separate challenge from managing process life cycles.

As Joe Armstrong puts it in a discussion thread:

“That’s it and all about it - it’s not related to supervisors or other system components. This is standard practice in a distributed system. Real fault tolerance requires more than one machine to ensure coverage in the event of a complete machine failure. This is precisely why we replicate everything. If an entire machine fails and we only have a pointer to vital data on that machine, then we’re in serious trouble.”

Managing Nodes

In this cluster, nodes frequently come and go, accommodating scaling, handling code updates, and recovering from unexpected failures.

Application

This application.ex uses the libcluster library to manage a cluster of dynamic nodes. Its Cluster.Supervisor runs the Gossip strategy, multicasting heartbeats on port 45892 so that the leaderless nodes can discover and connect to one another.

def start(_type, _args) do
  topologies = [
    dispatch_gossip_cluster: [
      strategy: Cluster.Strategy.Gossip,
      config: [
        port: 45892
      ]
    ]
  ]

  children = [
    {Cluster.Supervisor, [topologies, [name: Dispatch.ClusterSupervisor]]}
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
end
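
For completeness, libcluster is pulled in as an ordinary Hex dependency in mix.exs; a minimal sketch (the version constraints are assumptions, not necessarily what the project pins):

defp deps do
  [
    # Cluster formation via the Gossip strategy.
    {:libcluster, "~> 3.3"},
    # Distributed registry and supervisor, used later in this post.
    {:horde, "~> 0.8"}
  ]
end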

The application.ex also includes a shutdown function to gracefully stop the :dispatch application and shut down the node.

def shutdown do
  Logger.info("Shutting down application")
  Application.stop(:dispatch)
  :init.stop()
end

LiveView

The LiveView displays the connected nodes and monitors node connections and disconnections.

@impl true
def mount(_params, _session, socket) do
  :net_kernel.monitor_nodes(true, [])

  new_socket =
    socket
    |> assign(:node_list, Node.list())

  {:ok, new_socket}
end

@impl true
def handle_info({:nodeup, _node}, socket) do
  new_socket =
    socket
    |> assign(:node_list, Node.list())

  {:noreply, new_socket}
end

@impl true
def handle_info({:nodedown, _node}, socket) do
  new_socket =
    socket
    |> assign(:node_list, Node.list())

  {:noreply, new_socket}
end
  • Node.list retrieves the current list of connected nodes.
  • monitor_nodes is a feature of Erlang’s :net_kernel, which broadcasts messages when nodes connect and disconnect.
  • :nodeup and :nodedown are messages from monitor_nodes that notify when a node connects or disconnects. Here, LiveView responds by calling Node.list to update its internal state.

The LiveView can also shut down another connected node.

 def handle_event("stop_node", %{"node" => node_name}, socket) do
    find_node = Enum.find(Node.list(), fn node -> Atom.to_string(node) == node_name end)

    case find_node do
      nil ->
        Logger.error("Node not found: #{node_name}")

      node ->
        case :rpc.call(node, Dispatch.Application, :shutdown, []) do
          {:badrpc, reason} ->
            Logger.error("RPC failed: #{inspect(reason)}")

          result ->
            Logger.info("RPC succeeded: #{inspect(result)}")
        end
    end

    {:noreply, socket}
  end
  • :rpc.call: Erlang’s :rpc module enables calling functions on remote nodes. In this case, it invokes the shutdown function in the remote node’s Dispatch.Application, stopping the application and terminating the node.

Notice that instead of converting node_name directly into an atom, this approach converts the nodes’ atoms into strings to find the matching node atom. Because Erlang does not garbage collect atoms, it is best practice to avoid converting dynamic user input into a potentially unlimited number of atoms.
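
A minimal sketch of the two lookup styles (the function names are hypothetical, shown only for contrast):

# Unsafe: every distinct client-supplied string becomes a new atom,
# and the Erlang VM never garbage collects atoms.
def unsafe_find(node_name), do: String.to_atom(node_name)

# Safe: compare against node atoms that already exist, as handle_event does above.
def safe_find(node_name) do
  Enum.find(Node.list(), fn node -> Atom.to_string(node) == node_name end)
end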

The Superhero

In the previous configuration, the cluster treated superheroes as data, passing that data between nodes and leveraging a Conflict-Free Replicated Data Type (CRDT) for eventual consistency. However, there’s an alternative approach: superheroes can be modeled as processes, each process representing an individual superhero. Instead of transferring data about superheroes, Erlang allows the superheroes themselves to move between nodes.

In the Dispatch.SuperheroServer GenServer, a superhero is an actor, actively patrolling and combating crime.

defmodule Dispatch.SuperheroServer do
  use GenServer
  require Logger

  alias Dispatch.{SuperheroRegistry, Superhero, SuperheroApi}
  alias Horde.Registry

  @polling_interval 4000

  def start_link(name) do
    GenServer.start_link(__MODULE__, name, name: via_tuple(name))
  end

  def init(name) do
    Process.flag(:trap_exit, true)
    Logger.info("Initializing superhero server for #{name}")

    send(self(), :init_superhero)
    Process.send_after(self(), :decide_action, @polling_interval)

    {:ok, %Superhero{name: name}}
  end

  def handle_call(:get_details, _from, superhero) do
    {:reply, superhero, superhero}
  end

  def handle_cast(:resting, superhero) do
    health_gain = :rand.uniform(40)
    new_health = min(superhero.health + health_gain, 100)
    new_superhero = Map.put(superhero, :health, new_health)

    Logger.info(
      "#{superhero.name} is resting and has regained #{health_gain} health points, new health: #{new_health}."
    )

    {:noreply, new_superhero}
  end

  def handle_cast(:fighting_crime, superhero) do
    if :rand.uniform(2) == 1 do
      new_superhero = Map.update!(superhero, :fights_won, &(&1 + 1))
      Logger.info("#{superhero.name} won a fight, total wins: #{new_superhero.fights_won}")
      {:noreply, new_superhero}
    else
      health_loss = :rand.uniform(40)
      new_health = superhero.health - health_loss
      new_superhero = Map.update!(superhero, :health, &(&1 - health_loss))
      new_superhero = Map.update!(new_superhero, :fights_lost, &(&1 + 1))

      Logger.info(
        "#{superhero.name} lost a fight, lost #{health_loss} health, remaining health: #{new_health}"
      )

      if new_health <= 0 do
        Logger.warning("#{superhero.name} has health <= 0, terminating.")
        SuperheroApi.stop(superhero.name)
      end

      {:noreply, new_superhero}
    end
  end

  def handle_info(:init_superhero, superhero) do
    updated_superhero = Superhero.get_state(superhero.name)
    {:noreply, updated_superhero}
  end

  def handle_info(:decide_action, superhero) do
    new_superhero =
      case random_action() do
        :fighting ->
          GenServer.cast(self(), :fighting_crime)
          Map.put(superhero, :is_patrolling, true)

        :resting ->
          GenServer.cast(self(), :resting)
          Map.put(superhero, :is_patrolling, false)
      end

    Process.send_after(self(), :decide_action, @polling_interval)
    {:noreply, new_superhero}
  end

  def terminate(reason, superhero) do
    Logger.info(
      "Terminating superhero server for #{superhero.name} with reason: #{inspect(reason)}"
    )

    :ok
  end

  defp random_action do
    if :rand.uniform(4) == 1 do
      :resting
    else
      :fighting
    end
  end

  def via_tuple(id), do: {:via, Registry, {SuperheroRegistry, id}}
end

Callback

  • :get_details synchronously returns the state of the superhero to the requester.

Casts

  • :resting gains between 1 and 40 health points.
  • :fighting_crime either results in a win or a loss. If lost, the superhero loses between 1 and 40 health points.

Internal Messages

  • :init_superhero initializes the superhero’s state via Superhero.get_state/1.
  • :decide_action randomly chooses between fighting crime and resting, then schedules itself again with a 4-second delay.

This GenServer manages the dynamic life of a superhero, making decisions that affect their ability to continue safeguarding the city.

Superhero Registry

While creating a named GenServer with a superhero’s name is feasible, utilizing a Registry is more common in applications managing dynamic GenServers. In this configuration, instead of referencing superheroes directly by name, each process registers itself with the Registry upon creation, establishing a centralized location to retrieve each superhero’s process identifier (pid).
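
Concretely, registration and lookup happen through :via tuples; a minimal sketch using this project’s names (“Batman” is just an example, and Registry in the server’s via_tuple is an alias for Horde.Registry, introduced below):

# Registration happens when the GenServer starts under its via tuple...
GenServer.start_link(Dispatch.SuperheroServer, "Batman",
  name: {:via, Horde.Registry, {Dispatch.SuperheroRegistry, "Batman"}}
)

# ...and callers address it by the same tuple instead of a raw pid.
GenServer.call(
  {:via, Horde.Registry, {Dispatch.SuperheroRegistry, "Batman"}},
  :get_details
)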

Horde

A common pain point in any cluster is managing a global registry. This project uses the Horde library, which relies on a Conflict-Free Replicated Data Type (CRDT) to keep each node’s registry in sync. The cluster balances superheroes with Horde’s UniformDistribution strategy, which places each superhero on one of the healthy nodes. If a node shuts down or crashes, Horde restarts its superheroes on the remaining healthy nodes.

Horde Supervisor

children = [
  {
    Horde.DynamicSupervisor,
    child_spec: {Dispatch.SuperheroServer, restart: :transient},
    name: Dispatch.SuperheroSupervisor,
    strategy: :one_for_one,
    members: :auto,
    distribution_strategy: Horde.UniformDistribution
  }
]
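
The SuperheroRegistry used by via_tuple also has to be started somewhere in the supervision tree; a minimal sketch of its child spec, assuming Horde.Registry’s standard options:

# Assumed placement: alongside the Horde.DynamicSupervisor in application.ex.
{Horde.Registry, [name: Dispatch.SuperheroRegistry, keys: :unique, members: :auto]}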

Superhero Registry Handler

This module contains functions for managing a dynamic registry of superheroes.

defmodule Dispatch.SuperheroRegistryHandler do
  require Logger
  alias Dispatch.{SuperheroRegistry}
  alias Horde.{Registry}

  def get_all_superheroes do
    Registry.select(SuperheroRegistry, [{{:"$1", :_, :_}, [], [:"$1"]}])
  end

  def get_pid_for_superhero(name) do
    case Registry.lookup(SuperheroRegistry, name) do
      [{pid, _}] ->
        pid

      _ ->
        nil
    end
  end

  def get_node_for_superhero(name) do
    case Registry.lookup(SuperheroRegistry, name) do
      [{pid, _}] ->
        node(pid)

      _ ->
        :unknown
    end
  end
end
  • get_all_superheroes retrieves the names of all active superhero GenServers.
  • get_pid_for_superhero retrieves a superhero’s pid when given their name.
  • get_node_for_superhero determines the node location of a superhero’s process.
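
A quick iex-style sketch of these helpers in action (the superhero and node names are illustrative):

Dispatch.SuperheroRegistryHandler.get_all_superheroes()
# => ["Batman", "Storm"]

Dispatch.SuperheroRegistryHandler.get_node_for_superhero("Batman")
# => :"dispatch_a@localhost"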

Superhero API

To improve code organization and modularity, the Superhero GenServer API is encapsulated in Dispatch.SuperheroApi.

defmodule Dispatch.SuperheroApi do
  require Logger

  alias Dispatch.{
    SuperheroRegistry,
    SuperheroServer,
    SuperheroSupervisor,
    SuperheroRegistryHandler
  }
  alias Horde.{DynamicSupervisor, Registry}

  def start(name) do
    child_spec = %{
      id: name,
      start: {SuperheroServer, :start_link, [name]}
    }

    case DynamicSupervisor.start_child(SuperheroSupervisor, child_spec) do
      {:ok, pid} ->
        Logger.info("Superhero created successfully: #{name} with PID #{inspect(pid)}")
        {:ok, pid}

      {:error, reason} ->
        Logger.error("Failed to create superhero: #{name} due to #{inspect(reason)}")
        {:error, reason}
    end
  end

  def stop(name) do
    case SuperheroRegistryHandler.get_pid_for_superhero(name) do
      nil ->
        {:error, :not_found}

      pid ->
        :ok = DynamicSupervisor.terminate_child(SuperheroSupervisor, pid)
        :ok = Registry.unregister(SuperheroRegistry, name)

        {:ok, :terminated}
    end
  end

  def get_details(name) do
    GenServer.call(SuperheroServer.via_tuple(name), :get_details)
  end

  def get_all_superheroes_with_details do
    SuperheroRegistryHandler.get_all_superheroes()
    |> Enum.map(fn name ->
      get_details(name)
      |> Map.put(:node, SuperheroRegistryHandler.get_node_for_superhero(name))
    end)
  end
end
  • start: Initiates the GenServer’s start_link function, registering the superhero process within the SuperheroSupervisor and Horde’s dynamic registry. This ensures each superhero is actively managed and tracked from creation.
  • stop: Handles the deregistration of the superhero from Horde’s registry and terminates the process using the SuperheroSupervisor, ensuring a clean shutdown of superhero processes.
  • get_details: Retrieves a superhero’s current state by making a synchronous call to the GenServer managing that superhero, utilizing the PID retrieved from Horde’s registry.
  • get_all_superheroes_with_details: Collects details from all registered superheroes. This function maps over the names of all superheroes, fetching their detailed states. While this approach is simplistic and not optimized for production environments, it works for this demonstration.
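
A short usage sketch of this API (the superhero name and the commented results are illustrative):

{:ok, _pid} = Dispatch.SuperheroApi.start("Nightwing")

Dispatch.SuperheroApi.get_details("Nightwing")
# => %Dispatch.Superhero{name: "Nightwing", ...}

Dispatch.SuperheroApi.stop("Nightwing")
# => {:ok, :terminated}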

Polling

All the LiveViews need to stay up to date; the easiest way is to set up a GenServer that polls the superhero servers and broadcasts the results over the node’s PubSub, as sketched below.
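
A minimal sketch of such a poller, assuming a Phoenix.PubSub instance named Dispatch.PubSub and a "superheroes" topic (both names are assumptions, not necessarily the project’s):

defmodule Dispatch.SuperheroPoller do
  use GenServer

  @polling_interval 2_000
  @topic "superheroes"

  def start_link(_opts), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

  @impl true
  def init(state) do
    schedule_poll()
    {:ok, state}
  end

  @impl true
  def handle_info(:poll, state) do
    # Gather every superhero's details and push them to all subscribed LiveViews.
    superheroes = Dispatch.SuperheroApi.get_all_superheroes_with_details()
    Phoenix.PubSub.broadcast(Dispatch.PubSub, @topic, {:superheroes_updated, superheroes})

    schedule_poll()
    {:noreply, state}
  end

  defp schedule_poll, do: Process.send_after(self(), :poll, @polling_interval)
end

Each LiveView would then subscribe with Phoenix.PubSub.subscribe(Dispatch.PubSub, "superheroes") in mount/3 and update its assigns when the {:superheroes_updated, superheroes} message arrives.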

Conclusion

When discussing the robustness of production services, we often refer to uptime in terms of “nines.” For example, a dispatch center node with an uptime of 99.9%, known as “three nines,” allows roughly 8.76 hours of downtime per year (0.001 x 8,760 hours). By simply moving to three isolated nodes that fail independently, the probability of all three being down at the same time drops to 0.001^3 = 10^-9, which works out to about 0.03 seconds of downtime per year, an uptime of 99.9999999%, or “nine nines.”

Erlang’s inherent integration of fault tolerance, dynamic scalability, and seamless node connectivity into its runtime system simplifies complex cluster management tasks significantly compared to other technologies. Its native ability to gracefully handle failures, through automatic process restarts and efficient load distribution across nodes, ensures high availability and robust performance.

A common challenge in Erlang, particularly within large-scale distributed systems, is effectively tracking the location of processes across multiple nodes. In our scenario, managing a moderate number of superheroes and dispatch centers, the system remains manageable. However, as the system scales—especially when each superhero requires backup instances across additional dispatch centers—the complexity and overhead of maintaining a global registry can become a performance bottleneck.

Horde leverages Conflict-Free Replicated Data Types (CRDTs) to manage a global registry using eventual consistency. Yet, in scenarios where immediate consistency is crucial, a consensus-based strategy might be necessary, introducing additional latency due to the overhead of achieving agreement across nodes.