Elixir Cluster Series

  1. Leveraging CRDTs for eventual consistency
  2. Elixir: Resilient distributed systems
  3. Elixir: Running a cluster of nodes
  4. Elixir: Running a cluster of dynamic nodes
  5. Elixir and Mnesia: Running a cluster with state
  6. Elixir and Raft: Running a cluster with state

The Superhero Cluster comprises three location nodes: gotham, metropolis, and capitol. Each node functions as an independent actor, autonomously assigning superheroes to patrol duties. The cluster also includes a single dispatch node, which provides independent real-time web views for users to create and manage superheroes across the system.

This cluster operates on a leaderless architecture. While the data held by each actor may vary at any given moment, the cluster ensures eventual consistency, meaning that over time, all nodes will converge to a consistent state.

The Superhero Cluster follows a “last in wins” strategy: whichever update carries the latest timestamp survives a merge. For example, if a dispatcher reassigns a superhero to a new location, but the superhero’s current location assigns them to patrol duties before the reassignment arrives, the later patrol assignment wins and the superhero remains at their original location.
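To make that concrete, here is a minimal sketch of a last-in-wins merge of two conflicting copies of the same superhero, using the same :last_updated comparison that the converge_superhero/2 function later in this post performs (the sample values are hypothetical):

# Two replicas hold conflicting copies of the same superhero.
dispatch_copy = %{location: :metropolis, is_patrolling: false, last_updated: 100}
gotham_copy = %{location: :gotham, is_patrolling: true, last_updated: 105}

# The copy with the newer :last_updated timestamp survives the merge.
winner =
  if gotham_copy.last_updated > dispatch_copy.last_updated,
    do: gotham_copy,
    else: dispatch_copy

# winner.location => :gotham (the later patrol assignment wins)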

See the code

Starting the Cluster

Install Dependencies

Install dependencies for both the /location and /superhero subdirectories. If you have make installed, simply run:

make setup

Run the Task

This project includes a tasks.json for Visual Studio Code.

  1. Open the Command Palette by pressing Ctrl+Shift+P (or Cmd+Shift+P on macOS).
  2. Type “Run Task” and select it from the list.
  3. Choose “Run Superhero Cluster.”

This task will execute a series of scripts to start the nodes in the cluster, ensuring each component of the system is initiated correctly.

The superhero dispatcher runs on http://localhost:4900

A closer look at the Gotham node script

MIX_ENV=dev CITY_NAME=gotham iex --name gotham@127.0.0.1 --cookie secret_superhero_cookie -S mix
  • MIX_ENV=dev: Sets the Mix environment to development.

  • CITY_NAME=gotham: Specifies the city name for the location node (see the sketch after this list).

  • --name gotham@127.0.0.1: Sets the node name to gotham and binds it to the localhost IP address (127.0.0.1).

  • --cookie secret_superhero_cookie: Erlang/Elixir uses a shared cookie for authentication between nodes in a cluster.

  • -S mix: Runs the mix script, starting the application inside the interactive IEx shell.
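The location application presumably reads CITY_NAME at startup to decide which city it represents; a minimal sketch of that idea, assuming straightforward environment-variable handling rather than quoting the project’s actual config code:

# Hypothetical sketch: read the city name from the environment and turn it
# into the atom the node registers under globally (e.g. {:global, :gotham}).
city =
  "CITY_NAME"
  |> System.get_env("gotham")
  |> String.to_atom()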

Location Nodes

Learn more about the location nodes in the Resilient distributed systems blog entry.

Dispatch Node

This node contains logic to synchronize data and provide real-time views for users to dispatch superheroes.

Application Structure

defmodule Dispatch.Application do
  require Logger
  use Application
  @nodes ["[email protected]", "[email protected]", "[email protected]"]

  def start(_type, _args) do
    connect_to_nodes(@nodes)

    children = [
      DispatchWeb.Telemetry,
      {Phoenix.PubSub, name: Dispatch.PubSub},
      Dispatch.GossipServer,
      Dispatch.PollServer,
      DispatchWeb.Endpoint
    ]

    opts = [
      strategy: :one_for_one,
      max_restarts: 3,
      max_seconds: 1,
      name: Dispatch.Supervisor
    ]

    Supervisor.start_link(children, opts)
  end

  def config_change(changed, _new, removed) do
    DispatchWeb.Endpoint.config_change(changed, removed)
    :ok
  end

  defp connect_to_nodes(nodes) do
    Enum.each(nodes, fn node ->
      case Node.connect(String.to_atom(node)) do
        true -> Logger.info("Connected to #{node} successfully.")
        false -> Logger.error("Failed to connect to #{node}.")
        :ignored -> Logger.warning("Connection to #{node} ignored; local node is not alive.")
      end
    end)
  end
end

Startup Process

Upon initialization, the application connects to the gotham, metropolis, and capitol nodes, then starts a supervisor that manages the Gossip and Poll GenServers.
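You can verify the connections from the dispatch node’s IEx shell; Node.list/0 returns the nodes currently connected to this one:

# Run in the dispatch node's IEx shell after startup.
Node.list()
# => [:"gotham@127.0.0.1", :"metropolis@127.0.0.1", :"capitol@127.0.0.1"]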

Location Module

Erlang has a concept of “location transparency,” where processes interact with remote nodes as if they were local.

By convention, GenServer communications are wrapped in an API, ensuring that interactions remain consistent and manageable.

defmodule Dispatch.Location do
  @locations [:gotham, :metropolis, :capitol]

  def get_locations do
    @locations
  end

  def get_data(location) when location in @locations do
    GenServer.call({:global, location}, :get_superheroes)
  end

  def get_data_from_random_location do
    location = Enum.random(@locations)
    get_data(location)
  end

  def update_data(data, location) when location in @locations do
    GenServer.cast({:global, location}, {:update_superheroes, data})
  end

  def update_data_to_random_location(data) do
    location = Enum.random(@locations)
    update_data(data, location)
  end

  def assign_to_location(data, superhero_id, location) when location in @locations do
    GenServer.call({:global, location}, {:assign_superhero, data, superhero_id})
  end

  def bomb_random_city do
    location = Enum.random(@locations)
    GenServer.cast({:global, location}, :bomb_city)
    {:ok, location}
  end
end

The Location module encapsulates behaviors such as retrieving and updating data, assigning superheroes, and intentionally causing disruptions.
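As a usage sketch, a round trip through this API from the dispatch node might look like the following (assuming, as the {:global, location} tuples above imply, that each location’s GenServer is globally registered under its city name):

# Fetch gotham's current view of the data and push it to metropolis.
data = Dispatch.Location.get_data(:gotham)
:ok = Dispatch.Location.update_data(data, :metropolis)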

GossipServer

The Superhero cluster uses a gossip (epidemic) protocol, where information spreads through the network like rumors in social networks. Gossip yields eventual consistency: each node maintains its own copy of the data, and all nodes converge to the same state over time.

defmodule Dispatch.GossipServer do
  use GenServer
  alias Dispatch.Location

  @gossip_interval 1_000

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def start_gossip do
    GenServer.cast(__MODULE__, :start_gossip)
  end

  def init(state) do
    schedule_gossip()
    {:ok, state}
  end

  def handle_cast(:start_gossip, state) do
    gossip()
    schedule_gossip()
    {:noreply, state}
  end

  def handle_info(:gossip, state) do
    gossip()
    schedule_gossip()
    {:noreply, state}
  end

  defp schedule_gossip do
    Process.send_after(self(), :gossip, @gossip_interval)
  end

  defp gossip do
    locations = Location.get_locations()

    source = Enum.random(locations)
    destination = Enum.random(locations -- [source])

    source_data = Location.get_data(source)
    Location.update_data(source_data, destination)
  end
end

The GossipServer gossips every second, randomly selecting a node, requesting its current data, and passing that information to another randomly selected node. Shorter intervals result in quicker system convergence. There is no error handling logic here; if the GossipServer contacts a terminated location, it will crash and be restarted by the supervisor.
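For example, if the :gotham process is no longer globally registered, a sketch of the failure path:

# GenServer.call to an unregistered {:global, :gotham} name exits with
# :noproc, crashing the GossipServer; the one_for_one supervisor in
# Dispatch.Application then restarts it with a fresh, known good state.
Dispatch.Location.get_data(:gotham)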

Poll Server

The dispatch node keeps the LiveViews updated by asking a random location server for its data and broadcasting it to the LiveView subscribers.

defmodule Dispatch.PollServer do
  use GenServer
  alias Dispatch.Location
  alias Phoenix.PubSub

  @poll_interval 2_000
  @poll_topic "data_topic"

  def topic do
    @poll_topic
  end

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def start_poll do
    GenServer.cast(__MODULE__, :start_poll)
  end

  def init(state) do
    schedule_poll()
    {:ok, state}
  end

  def handle_cast(:start_poll, state) do
    poll()
    schedule_poll()
    {:noreply, state}
  end

  def handle_info(:poll, state) do
    poll()
    schedule_poll()
    {:noreply, state}
  end

  defp schedule_poll do
    Process.send_after(self(), :poll, @poll_interval)
  end

  defp poll do
    PubSub.broadcast(
      Dispatch.PubSub,
      @poll_topic,
      {:update_data, Location.get_data_from_random_location()}
    )
  end
end

Like the GossipServer, the PollServer will crash and be restarted if it contacts a terminated location.

When something goes wrong, these GenServers simply crash, and the supervisor restarts them into a known good state. This “Let it crash” approach eliminates the need to build error handling for every possible transient reason a location might be unavailable.

Superhero Module

The Superhero module implements the data struct and the Conflict-free Replicated Data Type (CRDT) used to maintain data consistency across the cluster.

See the Leveraging CRDTs for eventual consistency blog post for more information.

The create_superhero helper builds a new superhero with default values.

defmodule Dispatch.Superhero do
  defstruct superheroes: %{},
            tombstones: %{}

  def create_superhero do
    %{
      id: UUID.uuid4(),
      name: Faker.Superhero.name(),
      location: :home,
      is_patrolling: false,
      last_updated: System.system_time(:second),
      health: 100
    }
  end

  def converge_superhero(data_a, data_b) do
    combined_superheroes =
      Map.merge(data_a.superheroes, data_b.superheroes, fn _id, sh1, sh2 ->
        if sh1.last_updated > sh2.last_updated, do: sh1, else: sh2
      end)

    combined_tombstones =
      Map.merge(data_a.tombstones, data_b.tombstones, fn _id, d1, d2 ->
        d1 or d2
      end)

    filtered_superheroes =
      Map.filter(combined_superheroes, fn {id, _} ->
        not Map.has_key?(combined_tombstones, id)
      end)

    %{superheroes: filtered_superheroes, tombstones: combined_tombstones}
  end
end
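A worked example of the merge (using plain maps with the same keys as the struct): a tombstone from one replica removes a superhero the other replica still holds, regardless of timestamps:

a = %{superheroes: %{"id1" => %{name: "Batman", last_updated: 10}}, tombstones: %{}}
b = %{superheroes: %{}, tombstones: %{"id1" => true}}

Dispatch.Superhero.converge_superhero(a, b)
# => %{superheroes: %{}, tombstones: %{"id1" => true}}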

Index Module

The Index module generates a real-time web interface for managing superheroes using LiveView.

defmodule DispatchWeb.DispatchLive.Index do
  use DispatchWeb, :live_view
  alias Dispatch.{Location, Superhero, PollServer}
  alias Phoenix.PubSub

  require Logger

  def mount(_params, _session, socket) do
    PubSub.subscribe(Dispatch.PubSub, PollServer.topic())

    new_socket =
      socket
      |> assign(:data, %Superhero{})
      |> assign(:location_options, [:home | Location.get_locations()])

    {:ok, new_socket}
  end

  def handle_event("create", _params, socket) do
    superhero = Superhero.create_superhero()
    data = socket.assigns.data

    updated_data = %{
      superheroes: Map.put_new(data.superheroes, superhero.id, superhero),
      tombstones: data.tombstones
    }

    new_data = Superhero.converge_superhero(data, updated_data)
    Location.update_data_to_random_location(new_data)

    new_socket =
      socket
      |> assign(:data, new_data)

    {:noreply, new_socket}
  end

  def handle_event("delete", %{"id" => id}, socket) do
    data = socket.assigns.data

    updated_data = %{
      superheroes: Map.delete(data.superheroes, id),
      tombstones: Map.put(data.tombstones, id, true)
    }

    new_data = Superhero.converge_superhero(data, updated_data)
    Location.update_data_to_random_location(new_data)

    {:noreply, assign(socket, :data, new_data)}
  end

  def handle_event("select_changed", %{"value" => location, "id" => id}, socket) do
    new_socket =
      case location do
        "gotham" -> update_location(socket, id, :gotham)
        "metropolis" -> update_location(socket, id, :metropolis)
        "capitol" -> update_location(socket, id, :capitol)
        _ -> update_home(socket, id)
      end

    {:noreply, new_socket}
  end

  def handle_event("bomb", _params, socket) do
    {:ok, bombed_location} = Location.bomb_random_city()

    new_socket =
      socket |> put_flash(:info, "#{bombed_location} has been destroyed")

    {:noreply, new_socket}
  end

  def handle_info({:update_data, data}, socket) do
    local_data = socket.assigns.data

    {:noreply, socket |> assign(:data, Superhero.converge_superhero(local_data, data))}
  end

  defp update_home(socket, superhero_id) do
    data = socket.assigns.data
    new_data = assign_to_home(data, superhero_id)
    Location.update_data_to_random_location(new_data)
    socket |> assign(:data, new_data)
  end

  defp update_location(socket, superhero_id, location) do
    new_data =
      socket.assigns.data
      |> Location.assign_to_location(superhero_id, location)

    socket
    |> assign(:data, new_data)
  end

  defp assign_to_home(data, superhero_id) do
    timestamp = System.system_time(:second)

    if Map.has_key?(data.tombstones, superhero_id) do
      Logger.warning("Attempt to assign dead superhero with ID #{superhero_id}")
      data
    else
      Logger.info("Assigning superhero with ID #{superhero_id} to home")

      # Fall back to defaults so an unknown ID does not raise a KeyError.
      updated_hero = Map.get(data.superheroes, superhero_id, %{})
      new_health = min(Map.get(updated_hero, :health, 100) + 20, 100)

      superheroes_updated =
        Map.put(
          data.superheroes,
          superhero_id,
          Map.merge(updated_hero, %{
            location: :home,
            is_patrolling: false,
            last_updated: timestamp,
            health: new_health
          })
        )

      %{data | superheroes: superheroes_updated}
    end
  end
end
  • create: A button click creates a new superhero, updating the local data and sending it to a random location in the cluster, from which the gossip protocol spreads it.

  • delete: A button click terminates a superhero, updating the local data and propagating it to a random location in the cluster.

  • select_changed: A selection change in the drop-down informs the selected location of the intention to assign a superhero. The location updates the data, merges the updated data with its local state, and returns a copy that LiveView uses to update its local state.

  • bomb: A button click sends a message to a random location, crashing that city’s process and leaving the cluster to manage the recovery.

  • update_data: The LiveView subscribes to the polling topic in mount/3; the PollServer broadcasts :update_data messages carrying data from a random location, which the LiveView merges into its local state.

Conclusion

The Superhero cluster operates on a leaderless architecture, allowing it to function even in a degraded state. When a dispatcher bombs a city, the affected node crashes and immediately restarts in an empty state, relying on eventual consistency to synchronize its data over time. A location node can be terminated either by overwhelming its supervisor with frequent bombings or by closing its terminal. Despite these disruptions, the Superhero cluster continues to run, with processes crashing and restarting as necessary.

For example, if gotham is terminated and a dispatcher attempts to assign a superhero to gotham, the LiveView will restart, and eventual consistency will synchronize the data to the last known good state. This resilient design ensures the system remains operational, even under disruptive conditions.

Eventual consistency prioritizes speed, isolation, and resilience by allowing nodes in a distributed system to operate independently and asynchronously, enabling the cluster to continue functioning even if parts of it are temporarily unavailable or experiencing high latency. The tradeoff, however, is the potential for inconsistencies, where conflicting changes might lead to data being overwritten or lost.