Elixir Cluster Series

  1. Leveraging CRDTs for eventual consistency
  2. Elixir: Resilient distributed systems
  3. Elixir: Running a cluster of nodes
  4. Elixir: Running a cluster of dynamic nodes
  5. Elixir and Mnesia: Running a cluster with state
  6. Elixir and Raft: Running a cluster with state

In a previous blog post, we explored the lifecycle of superheroes within the cluster, where each superhero ran as a GenServer distributed across three dispatch center nodes. When a node crashes, its superhero processes migrate to the remaining healthy nodes.

Now, it’s time to tackle the challenge of maintaining state. A superhero must save its internal state so that it can be restored on restart.

Let’s revisit Joe Armstrong’s insight from this discussion thread:

“That’s it and all about it - it’s not related to supervisors or other system components. This is standard practice in a distributed system. Real fault tolerance requires more than one machine to ensure coverage in the event of a complete machine failure. This is precisely why we replicate everything. If an entire machine fails and we only have a pointer to vital data on that machine, then we’re in serious trouble.”

State management must be independent of the lifecycle, and data must be replicated across multiple locations to ensure reliability.

See the code

See this post for information on starting the superhero cluster

Erlang Term Storage

Erlang Term Storage (ETS) is Erlang’s built-in in-memory key-value store. It is a good solution when the problem is keeping a copy of a process’s internal state so that the process can restore it after an unexpected restart, provided the table is owned by a longer-lived process, because an ETS table is deleted when its owner exits. If the data must also survive node restarts, it is easy to switch to DETS (Disk Erlang Term Storage), which offers a similar API and persists the data to a file so it can be recovered after a system restart.
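A minimal sketch of that pattern follows. StateCache and the :superhero_state table are hypothetical names, and the script process stands in for the long-lived owner that would normally be a supervised process: a short-lived worker saves its state and exits, and the state can still be read back afterwards.

```elixir
defmodule StateCache do
  # Hypothetical helper: a named, public ETS table. The table lives as long as
  # the process that called init/0 (its owner), not the processes writing to it.
  @table :superhero_state

  def init, do: :ets.new(@table, [:set, :public, :named_table])

  def save(id, state), do: :ets.insert(@table, {id, state})

  def restore(id) do
    case :ets.lookup(@table, id) do
      [{^id, state}] -> {:ok, state}
      [] -> :error
    end
  end
end

# The script process owns the table, standing in for a long-lived owner
StateCache.init()

# A short-lived worker saves its state and then exits
worker = spawn(fn -> StateCache.save("hero-1", %{health: 80}) end)
ref = Process.monitor(worker)

receive do
  {:DOWN, ^ref, :process, ^worker, _reason} -> :ok
end

# The saved state is still available to a replacement process
restored = StateCache.restore("hero-1")
```

Because the table belongs to a different process than the worker, a crash of the worker does not take the saved state with it.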

However, ETS is local to a single node; it is not a strategy for maintaining data across a distributed cluster.

Choices

According to Brewer’s Theorem, a distributed data system cannot simultaneously achieve Consistency, Availability, and Partition Tolerance (CAP).

  • Consistency ensures all nodes see the same data at the same time.
  • Availability guarantees every request receives a response.
  • Partition Tolerance means the system functions when network failures split it into disconnected parts.

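A distributed system can provide at most two of these three guarantees at once, forcing trade-offs based on requirements. Because network partitions cannot be ruled out in practice, the real decision is usually what to give up while a partition lasts: consistency or availability.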

In the Superhero Dispatch Cluster, the choice is whether to ensure that each dispatch center can continue to operate even when communication is disrupted, or whether it’s more important that all operational centers maintain a consistent view of the data.

Conflict-Free Replicated Data Types (CRDTs)

An earlier solution used a CRDT, which kept a complete copy of the data on each node and synchronized the copies with a gossip (epidemic) protocol. This approach prioritizes availability and partition tolerance (AP). Because the nodes operate independently, they continue to function even when isolated. The cost is consistency (C): the data is only eventually consistent, so nodes may not see the same data at the same time.
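The convergence idea behind that approach can be illustrated with a grow-only counter (G-Counter), one of the simplest CRDTs. This is a generic sketch with a hypothetical GCounter module, not the code from the earlier post: each node increments only its own entry, and merging takes the per-node maximum, so replicas that exchange state in any order end up identical.

```elixir
defmodule GCounter do
  # A grow-only counter: a map of node id => count, where each node
  # only ever increments its own entry.
  def new, do: %{}

  def increment(counter, node_id), do: Map.update(counter, node_id, 1, &(&1 + 1))

  # The counter's value is the sum of every node's entry
  def value(counter), do: counter |> Map.values() |> Enum.sum()

  # Merging keeps the per-node maximum. This is commutative, associative and
  # idempotent, so replicas converge regardless of gossip order or repeats.
  def merge(a, b), do: Map.merge(a, b, fn _node, x, y -> max(x, y) end)
end

# Two dispatch centers update independently while partitioned...
a = GCounter.new() |> GCounter.increment(:dispatch_a) |> GCounter.increment(:dispatch_a)
b = GCounter.new() |> GCounter.increment(:dispatch_b)

# ...then exchange state in either order and agree on the same result
merged_ab = GCounter.merge(a, b)
merged_ba = GCounter.merge(b, a)
```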

Mnesia

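Mnesia is Erlang’s built-in distributed database management system. Like the CRDT solution, it replicates data across nodes, providing high availability (A). Writes made inside a transaction are committed on every active replica or on none of them, so connected nodes see consistent data (C). However, Mnesia has no automatic strategy for network partitions (P). If communication between nodes is lost, each side keeps running on its own, and when the nodes reconnect Mnesia reports an inconsistent_database event and leaves reconciliation to the application. Tables created with the majority option instead reject writes on the minority side of a partition, trading availability for consistency.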
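Mnesia exposes two hooks for partition handling. The sketch below, run on a single node for illustration, creates a table with the majority option and subscribes the current process to system events, where an {:inconsistent_database, context, node} event signals that previously partitioned nodes have reconnected. The two-attribute table and the handle_event function are hypothetical; deciding how to reconcile the data is left to the application.

```elixir
:mnesia.start()

{:atomic, :ok} =
  :mnesia.create_table(:superhero,
    attributes: [:id, :name],
    # On a real cluster this would list every dispatch center node
    ram_copies: [node()],
    # Reject writes unless a majority of the table's replicas are reachable
    majority: true
  )

# Deliver Mnesia system events (including inconsistent_database) to this process
{:ok, _node} = :mnesia.subscribe(:system)

majority_enabled = :mnesia.table_info(:superhero, :majority)

# Hypothetical handler for messages received after subscribing
handle_event = fn
  {:mnesia_system_event, {:inconsistent_database, context, node}} ->
    {:partition_detected, context, node}

  _other ->
    :ignored
end
```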

Set Up

Mnesia needs to be aware of the nodes that are part of its cluster. The timing of when these nodes join, or whether they join at all, doesn’t matter as long as they are recognized as part of the cluster.

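The order of operations in Mnesia setup is crucial. This setup is implemented in a GenServer, making it easy to start and supervise within the application. The typical pattern in a GenServer is to return from init quickly and then send itself a message to perform setup tasks asynchronously. However, Mnesia setup needs to be sequential, so all the logic is performed within the init function. Because a supervisor starts its children in order and waits for each init/1 to return, this also ensures the superhero table is ready before any later child that depends on it starts.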

defmodule Dispatch.Store.Setup do
  use GenServer
  require Logger

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    Keyword.get(opts, :nodes, [])
    |> setup_mnesia()
    |> create_superhero_table()

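    # Block until a local copy of the table is loaded (or 5 seconds pass)
    case :mnesia.wait_for_tables([:superhero], 5000) do
      :ok -> Logger.info("Superhero table is ready.")
      {:timeout, tables} -> Logger.warning("Timed out waiting for tables: #{inspect(tables)}")
      {:error, reason} -> Logger.error("Failed to load tables: #{inspect(reason)}")
    end

    {:ok, %{}}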
  end

  defp setup_mnesia(nodes) do
    :mnesia.create_schema([Node.self()])
    :mnesia.start()
    :mnesia.change_config(:extra_db_nodes, nodes)
    Logger.info("Mnesia configured with nodes: #{inspect(nodes)}")
    nodes
  end

  defp create_superhero_table(nodes) do
    Logger.info("Creating superhero table on #{Node.self()}.")

    case :mnesia.create_table(:superhero, superhero_table_opts(nodes)) do
      {:atomic, :ok} ->
        Logger.info("Superhero table created successfully.")

      {:aborted, {:already_exists, table}} ->
        Logger.info("#{table} table already exists.")
    end

    nodes
  end

  defp superhero_table_opts(nodes) do
    [
      {:attributes,
       [:id, :name, :node, :is_patrolling, :last_updated, :fights_won, :fights_lost, :health]},
      {:ram_copies, nodes},
      {:type, :set}
    ]
  end
end
  1. setup_mnesia
    1. create_schema: Mnesia requires a schema before it starts. Each node creates a schema for itself; if one already exists (for example, from a previous run), create_schema returns an {:error, ...} tuple indicating that the schema has already been created, which the setup ignores.
    2. :mnesia.start: Each node must start its own local instance of Mnesia.
    3. :mnesia.change_config: Each node needs to update its local configuration to recognize the nodes that make up the cluster.
  2. create_superhero_table: The first node needs to set up the table that other nodes will join. Mnesia’s create_table will attempt to create the table and will abort if it already exists on another known node. The superhero table includes a list of attributes, maintains a RAM copy on each node, and is of type set, meaning each id can have only one corresponding value. Note that the attributes are simply names: Mnesia saves records as tuples, and these names document the intended meaning of each position in the tuple. The first attribute, :id, is the table’s key.

  3. :mnesia.wait_for_tables: The setup waits up to 5 seconds for a local copy of the table to become available.

Mnesia setup follows an idempotent strategy: each step attempts an operation, succeeds if it hasn’t been done already, and reports that the resource already exists if it has. It is safe to ignore these already-exists results.
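The idempotent behaviour can be observed on a single node. This sketch assumes a node without a disc schema, so Mnesia starts with a RAM-only schema, and reuses the superhero table options from the setup module; the second create_table call is aborted with already_exists instead of failing the setup.

```elixir
:mnesia.start()

table_opts = [
  attributes: [:id, :name, :node, :is_patrolling, :last_updated, :fights_won, :fights_lost, :health],
  ram_copies: [node()],
  type: :set
]

# The first call creates the table...
first = :mnesia.create_table(:superhero, table_opts)

# ...and repeating it is reported as an abort that setup code can safely ignore
second = :mnesia.create_table(:superhero, table_opts)

# Returns :ok once a local copy of the table is available
ready = :mnesia.wait_for_tables([:superhero], 5000)
```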

Create, Read, Update, and Delete (CRUD)

The SuperheroStore wraps Erlang’s Mnesia functions into more Elixir-friendly code and provides an API for basic create, read, update, and delete functionality.

defmodule Dispatch.Store.SuperheroStore do
  alias Dispatch.Superhero
  require Logger

  def upsert_superhero(%{id: id} = superhero) do
    # Fill in struct defaults and stamp the time of this write
    updated_superhero =
      %Superhero{id: id}
      |> Map.merge(superhero)
      |> Map.put(:last_updated, System.system_time(:second))

    # Mnesia records are tuples whose first element is the table name
    superhero_tuple =
      updated_superhero
      |> convert_to_tuple()
      |> Tuple.insert_at(0, :superhero)

    transaction_result =
      :mnesia.transaction(fn ->
        :mnesia.write(superhero_tuple)
      end)

    case transaction_result do
      {:atomic, :ok} ->
        # Return the record as stored, including last_updated
        {:ok, updated_superhero}

      {:aborted, reason} ->
        Logger.error("Failed to upsert superhero #{id} in Mnesia: #{inspect(reason)}")
        {:error, reason}
    end
  end

  def get_superhero(id) do
    transaction_result =
      :mnesia.transaction(fn ->
        :mnesia.read({:superhero, id})
      end)

    case transaction_result do
      {:atomic, [head]} ->
        superhero =
          head
          |> Tuple.delete_at(0)
          |> convert_to_struct()

        {:ok, superhero}

      {:atomic, []} ->
        {:error, :not_found}

      {:aborted, reason} ->
        Logger.error("Failed to get superhero #{id} from Mnesia: #{inspect(reason)}")
        {:error, reason}
    end
  end

  def get_all_superheroes do
    result =
      :mnesia.transaction(fn ->
        :mnesia.match_object({:superhero, :_, :_, :_, :_, :_, :_, :_, :_})
      end)

    case result do
      {:atomic, data} when data != [] ->
        superheroes =
          Enum.map(data, fn tuple ->
            tuple
            |> Tuple.delete_at(0)
            |> convert_to_struct()
          end)

        {:ok, superheroes}

      {:atomic, []} ->
        {:ok, []}

      {:aborted, reason} ->
        Logger.error("Failed to get superheroes from Mnesia: #{inspect(reason)}")
        {:error, reason}
    end
  end

  def delete_superhero(id) do
    result =
      :mnesia.transaction(fn ->
        :mnesia.delete({:superhero, id})
      end)

    case result do
      {:atomic, :ok} ->
        {:ok, :deleted}

      {:aborted, reason} ->
        Logger.error("Failed to delete superhero #{id} from Mnesia: #{inspect(reason)}")
        {:error, reason}
    end
  end

  def convert_to_tuple(%Superhero{
        id: id,
        name: name,
        node: node,
        is_patrolling: is_patrolling,
        last_updated: last_updated,
        fights_won: fights_won,
        fights_lost: fights_lost,
        health: health
      }) do
    {id, name, node, is_patrolling, last_updated, fights_won, fights_lost, health}
  end

  def convert_to_struct(
        {id, name, node, is_patrolling, last_updated, fights_won, fights_lost, health}
      ) do
    %Superhero{
      id: id,
      name: name,
      node: node,
      is_patrolling: is_patrolling,
      last_updated: last_updated,
      fights_won: fights_won,
      fights_lost: fights_lost,
      health: health
    }
  end
end

The application passes a superhero as a struct, but Mnesia saves records as tuples. The convert_to_tuple and convert_to_struct functions switch between these representations. The first position in Mnesia’s tuple is reserved for the table atom, and the second serves as the unique identifier.
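One possible refinement, sketched here under the assumption that the struct fields and the Mnesia attribute list share the same order, is to derive both conversions from a single field list, so adding an attribute means editing one place. The Superhero struct and RecordConversion module below are hypothetical stand-ins for Dispatch.Superhero and the store’s helpers.

```elixir
defmodule Superhero do
  # Hypothetical stand-in for Dispatch.Superhero
  defstruct [:id, :name, :node, :is_patrolling, :last_updated, :fights_won, :fights_lost, :health]
end

defmodule RecordConversion do
  # Single source of truth for the position of each field in the record
  @fields [:id, :name, :node, :is_patrolling, :last_updated, :fights_won, :fights_lost, :health]

  # Struct -> {:superhero, id, name, ...}, with the table name first
  def to_record(%Superhero{} = hero) do
    List.to_tuple([:superhero | Enum.map(@fields, &Map.fetch!(hero, &1))])
  end

  # {:superhero, id, name, ...} -> struct, pairing positions with field names
  def from_record(record) when is_tuple(record) do
    [:superhero | values] = Tuple.to_list(record)
    struct(Superhero, Enum.zip(@fields, values))
  end
end

hero = %Superhero{
  id: 7,
  name: "Gamma",
  node: node(),
  is_patrolling: true,
  last_updated: 0,
  fights_won: 2,
  fights_lost: 1,
  health: 75
}

record = RecordConversion.to_record(hero)
```

At runtime, :mnesia.table_info(:superhero, :attributes) returns the table’s attribute list, which is another possible source for the field order.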

  • upsert_superhero: Since the :superhero table is configured as a set, any duplicate id will replace the existing record. The :mnesia.write function does not differentiate between creating and updating records.
  • get_superhero: The :mnesia.read function takes a tuple containing the table name and id and returns a list of matching records. Because the table is a set, the list holds at most one record: [record] when the id exists, or [] when it does not.
  • get_all_superheroes: The :mnesia.match_object function scans the entire table and returns all matching results. Although not efficient, this approach is sufficient for this example.
  • delete_superhero: The :mnesia.delete function takes a tuple containing the table name and the key and deletes the corresponding entry.
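When only part of the table is needed, :mnesia.select/2 with a match specification lets Mnesia filter records instead of returning everything through match_object. The sketch below runs on a single node with hypothetical sample data in the same eight-attribute layout and returns the names of patrolling superheroes with at least two fights won. In the match specification, :"$1" and :"$2" bind record positions, the guard list filters on them, and the result list chooses what to return.

```elixir
:mnesia.start()

{:atomic, :ok} =
  :mnesia.create_table(:superhero,
    attributes: [:id, :name, :node, :is_patrolling, :last_updated, :fights_won, :fights_lost, :health],
    ram_copies: [node()]
  )

# Hypothetical sample records: {table, id, name, node, is_patrolling,
# last_updated, fights_won, fights_lost, health}
heroes = [
  {:superhero, 1, "Alpha", node(), true, 0, 5, 1, 90},
  {:superhero, 2, "Beta", node(), false, 0, 1, 3, 40},
  {:superhero, 3, "Gamma", node(), true, 0, 2, 0, 100}
]

{:atomic, :ok} = :mnesia.transaction(fn -> Enum.each(heroes, &:mnesia.write/1) end)

# Head: only patrolling records, binding name to $1 and fights_won to $2.
# Guard: fights_won >= 2. Result: the name.
match_spec = [
  {{:superhero, :_, :"$1", :_, true, :_, :"$2", :_, :_}, [{:>=, :"$2", 2}], [:"$1"]}
]

{:atomic, names} = :mnesia.transaction(fn -> :mnesia.select(:superhero, match_spec) end)
```

The order of results from a set table is not guaranteed, so callers that need an order should sort.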

The safest way to implement Mnesia operations is by using atomic transactions. Transactions use locks to ensure data consistency and integrity, but they can be slower than non-transactional operations. While Mnesia is generally performant, applications with high write volumes or tight latency requirements might consider performing certain actions outside of a transaction (using “dirty” operations) to improve speed. However, dirty operations skip locking and give up the all-or-nothing guarantee of a transaction, so they should be used judiciously.
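The difference in calling style is small, as this single-node sketch with a hypothetical three-attribute table shows: transactional calls return {:atomic, result} or {:aborted, reason}, while dirty calls return their result directly and take no locks.

```elixir
:mnesia.start()

{:atomic, :ok} =
  :mnesia.create_table(:superhero, attributes: [:id, :name, :health], ram_copies: [node()])

# Transactional write: takes locks and commits on all replicas or none
{:atomic, :ok} = :mnesia.transaction(fn -> :mnesia.write({:superhero, 1, "Alpha", 90}) end)

# Dirty write: no transaction and no locks, so concurrent writers can interleave
:ok = :mnesia.dirty_write({:superhero, 2, "Beta", 40})

# Dirty read returns the list of records directly...
dirty = :mnesia.dirty_read({:superhero, 2})

# ...while a transactional read wraps it in {:atomic, records}
{:atomic, transactional} = :mnesia.transaction(fn -> :mnesia.read({:superhero, 1}) end)
```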

Superhero GenServer

The SuperheroServer has been refactored to integrate with the new database. While the superhero continues to fight crime, it now records its internal state in Mnesia whenever it takes an action.

defmodule Dispatch.SuperheroServer do
  use GenServer
  require Logger

  alias Dispatch.{SuperheroRegistry, Superhero, SuperheroApi, Store}
  alias Horde.Registry
  alias Store.SuperheroStore

  @polling_interval 4000
  @max_health 100
  @default_fights 0

  def start_link(id) do
    GenServer.start_link(__MODULE__, id, name: via_tuple(id))
  end

  @impl true
  def init(id) do
    Process.flag(:trap_exit, true)

    Logger.info("Initializing superhero server for #{id}")
    send(self(), :init_superhero)
    schedule_next_action()

    {:ok, %Superhero{id: id}}
  end

  @impl true
  def handle_info(:init_superhero, superhero) do
    superhero =
      case SuperheroStore.get_superhero(superhero.id) do
        {:ok, existing_superhero} ->
          Logger.info(
            "Superhero #{superhero.id} already exists in Mnesia. Using existing superhero."
          )

          existing_superhero |> Map.put(:node, node())

        {:error, :not_found} ->
          generate_new_superhero(superhero)

        {:error, reason} ->
          Logger.error(
            "Failed to retrieve superhero #{superhero.id} from Mnesia: #{inspect(reason)}"
          )

          superhero
      end

    send(self(), {:update_superhero, superhero})
    {:noreply, superhero}
  end

  @impl true
  def handle_info(:decide_action, superhero) do
    superhero =
      case random_action() do
        :fighting ->
          GenServer.cast(self(), :fighting_crime)
          Map.put(superhero, :is_patrolling, true)

        :resting ->
          GenServer.cast(self(), :resting)
          Map.put(superhero, :is_patrolling, false)
      end

    schedule_next_action()
    {:noreply, superhero}
  end

  @impl true
  def handle_info({:update_superhero, superhero}, _state) do
    case SuperheroStore.upsert_superhero(superhero) do
      {:ok, saved_superhero} ->
        Logger.info("Superhero #{saved_superhero.id} added to Mnesia.")
        {:noreply, saved_superhero}

      {:error, reason} ->
        Logger.error("Failed to add superhero #{superhero.id} to Mnesia: #{inspect(reason)}")
        {:noreply, superhero}
    end
  end

  @impl true
  def handle_cast(:resting, superhero) do
    health_gain = :rand.uniform(40)
    new_health = min(superhero.health + health_gain, @max_health)
    updated_superhero = Map.put(superhero, :health, new_health)

    Logger.info(
      "#{superhero.name} is resting and has regained #{health_gain} health points, new health: #{new_health}."
    )

    send(self(), {:update_superhero, updated_superhero})
    {:noreply, updated_superhero}
  end

  @impl true
  def handle_cast(:fighting_crime, superhero) do
    case :rand.uniform(2) do
      1 -> {:noreply, handle_win(superhero)}
      2 -> {:noreply, handle_loss(superhero)}
    end
  end

  @impl true
  def terminate(reason, superhero) do
    Logger.info(
      "Terminating superhero server for #{superhero.name} with reason: #{inspect(reason)}"
    )

    :ok
  end

  defp generate_new_superhero(superhero) do
    # :init_superhero persists the result, so no extra :update_superhero
    # message is sent here (that would write the same record twice)
    superhero
    |> Map.put(:node, node())
    |> Map.put(:name, "#{Faker.Superhero.prefix()} #{Faker.Superhero.name()}")
  end

  defp handle_win(superhero) do
    updated_superhero = Map.update(superhero, :fights_won, @default_fights, &(&1 + 1))
    Logger.info("#{superhero.name} won a fight, total wins: #{updated_superhero.fights_won}")
    send(self(), {:update_superhero, updated_superhero})
    updated_superhero
  end

  defp handle_loss(superhero) do
    health_loss = :rand.uniform(40)
    updated_superhero = update_superhero_losses(superhero, health_loss)

    Logger.info(
      "#{superhero.name} lost a fight, lost #{health_loss} health, remaining health: #{updated_superhero.health}"
    )

    if updated_superhero.health <= 0 do
      handle_critical_health(updated_superhero)
    else
      send(self(), {:update_superhero, updated_superhero})
      updated_superhero
    end
  end

  defp handle_critical_health(superhero) do
    Logger.warning("#{superhero.name} has health <= 0, terminating.")
    SuperheroStore.delete_superhero(superhero.id)
    SuperheroApi.stop(superhero.id)
    # Return the bare state; handle_cast/2 already wraps it in {:noreply, ...}
    superhero
  end

  defp update_superhero_losses(superhero, health_loss) do
    superhero
    |> Map.update(:health, @max_health, &(&1 - health_loss))
    |> Map.update(:fights_lost, @default_fights, &(&1 + 1))
  end

  defp random_action do
    if :rand.uniform(4) == 1, do: :resting, else: :fighting
  end

  defp schedule_next_action do
    Process.send_after(self(), :decide_action, @polling_interval)
  end

  def via_tuple(id), do: {:via, Registry, {SuperheroRegistry, id}}
end

The previous version of the superhero dispatch application used the superhero’s name as the unique identifier. Now that we have storage set up, it is time to refactor the app to use a unique ID.

  • :init_superhero is the asynchronous callback triggered after initializing a new superhero. It checks Mnesia for an existing record of the superhero’s state, uses the existing record if found, or creates a new one if not. It then adds the current node location and sends a message to itself to update Mnesia.
  • :update_superhero is a message the server sends to itself, so the write to Mnesia happens in its own step after the current callback returns. When a change must be persisted before the callback returns, the callback can call SuperheroStore.upsert_superhero/1 directly.
  • handle_critical_health now terminates the GenServer and deletes the record from Mnesia.
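The core idea of this refactor, state that outlives the process holding it, can be seen in isolation in the minimal single-node sketch below. CounterServer and the :counter table are hypothetical; unlike SuperheroServer, it restores its state in handle_continue/2 and writes synchronously on every change. Killing the process and starting a new one with the same id picks up where the old one left off.

```elixir
defmodule CounterServer do
  # Hypothetical, simplified stand-in for SuperheroServer: its state is a
  # single integer stored under `id` in a RAM-only Mnesia table.
  use GenServer

  def start(id), do: GenServer.start(__MODULE__, id)
  def increment(pid), do: GenServer.call(pid, :increment)

  @impl true
  def init(id), do: {:ok, {id, 0}, {:continue, :restore}}

  # Runs before any other message: load the last saved value, if any
  @impl true
  def handle_continue(:restore, {id, _count} = state) do
    case :mnesia.transaction(fn -> :mnesia.read({:counter, id}) end) do
      {:atomic, [{:counter, ^id, count}]} -> {:noreply, {id, count}}
      {:atomic, []} -> {:noreply, state}
    end
  end

  # Persist every change before replying
  @impl true
  def handle_call(:increment, _from, {id, count}) do
    new_count = count + 1
    {:atomic, :ok} = :mnesia.transaction(fn -> :mnesia.write({:counter, id, new_count}) end)
    {:reply, new_count, {id, new_count}}
  end
end

:mnesia.start()
{:atomic, :ok} = :mnesia.create_table(:counter, attributes: [:id, :count], ram_copies: [node()])

{:ok, pid} = CounterServer.start("hero-1")
CounterServer.increment(pid)
CounterServer.increment(pid)

# Simulate a crash; the process is not linked to the caller
Process.exit(pid, :kill)

# A new process with the same id continues from the stored value
{:ok, restarted} = CounterServer.start("hero-1")
after_restart = CounterServer.increment(restarted)
```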

PubSub

Previously, the application polled the SuperheroRegistry for a list of superhero PIDs, queried each process for its internal state, and then checked the registry again for the current node where the PID existed. This approach was not very efficient and would quickly break down as the number of active superheroes grew.

Now that we have a database containing the current state of the superheroes, we could simply poll the database. However, a better approach is to use Mnesia’s event subscription feature and broadcast the updates.

defmodule Dispatch.Store.PubSub do
  use GenServer
  alias Dispatch.Superhero
  alias Phoenix.PubSub
  alias Dispatch.Store.SuperheroStore

  require Logger

  def topic do
    "data_on_#{Node.self()}"
  end

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @impl true
  def init(state) do
    :mnesia.subscribe({:table, :superhero, :detailed})
    Logger.info("Subscribed to changes in the superhero table.")
    {:ok, state}
  end

  @impl true
  def handle_info(
        {:mnesia_table_event, {:write, :superhero, superhero_details, _op_extra, _tid_info}},
        state
      ) do
    Logger.info("Superhero updated or added: #{inspect(superhero_details)}")

    superhero =
      superhero_details
      |> Tuple.delete_at(0)
      |> SuperheroStore.convert_to_struct()

    PubSub.broadcast(
      Dispatch.PubSub,
      topic(),
      {:update_superhero, superhero}
    )

    {:noreply, state}
  end

  @impl true
  def handle_info(
        {:mnesia_table_event, {:delete, :superhero, superhero_details, _op_extra, _tid_info}},
        state
      ) do
    Logger.info("Superhero deleted: #{inspect(superhero_details)}")

    {:superhero, id} = superhero_details

    PubSub.broadcast(
      Dispatch.PubSub,
      topic(),
      {:delete_superhero, %Superhero{id: id}}
    )

    {:noreply, state}
  end
end

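This module subscribes to Mnesia and monitors the superhero table. When a superhero record is written or deleted, the module broadcasts the change to subscribers. The topic includes the node name because every node that holds a replica receives the same table event and Phoenix.PubSub broadcasts across the cluster; a per-node topic keeps subscribers from receiving each change once per node.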
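The shape of the :detailed events that the module pattern-matches on can be checked on a single node. In this sketch, with a hypothetical two-attribute table, the current process subscribes, writes a record, and deletes it; the write event carries the new record, and the delete event carries the {table, key} pair passed to :mnesia.delete/1.

```elixir
:mnesia.start()

{:atomic, :ok} =
  :mnesia.create_table(:superhero, attributes: [:id, :name], ram_copies: [node()])

# Detailed events also include the previous record(s), not just the new one
{:ok, _node} = :mnesia.subscribe({:table, :superhero, :detailed})

{:atomic, :ok} = :mnesia.transaction(fn -> :mnesia.write({:superhero, 1, "Alpha"}) end)
{:atomic, :ok} = :mnesia.transaction(fn -> :mnesia.delete({:superhero, 1}) end)

# Take the next table event delivered to this process
next_event = fn ->
  receive do
    {:mnesia_table_event, event} -> event
  after
    1_000 -> :timeout
  end
end

write_event = next_event.()
delete_event = next_event.()
```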

Because updates and deletions now arrive one record at a time, the LiveView can use streams (stream_insert/4 and stream_delete/3) to apply each change, and only the changes need to be broadcast rather than the entire list.

Conclusion

Mnesia, first introduced in 1998, can be viewed as either “niche and outdated” or “well-established and battle-tested”. Despite its longevity, Mnesia hasn’t seen widespread adoption in the Elixir community, partly due to the lack of a modern, user-friendly wrapper built into Elixir. The community-maintained Amnesia wrapper exists, but it is not widely used.

Mnesia supports Query List Comprehensions (QLC), a declarative query syntax based on Erlang list comprehensions, for composing complex queries. However, QLC is implemented as an Erlang parse transform and doesn’t integrate well with Elixir, making it more practical to write complex queries in Erlang.
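One way to use QLC from Elixir without the parse transform is :qlc.string_to_handle/1, which parses a query written as Erlang source at runtime. The sketch below assumes a single node and a hypothetical three-attribute superhero table; the query string must be a charlist ending in a period, and the handle is evaluated with :qlc.e/1 inside a transaction.

```elixir
:mnesia.start()

{:atomic, :ok} =
  :mnesia.create_table(:superhero, attributes: [:id, :name, :is_patrolling], ram_copies: [node()])

{:atomic, :ok} =
  :mnesia.transaction(fn ->
    :mnesia.write({:superhero, 1, "Alpha", true})
    :mnesia.write({:superhero, 2, "Beta", false})
  end)

# The query is Erlang source text: names of records whose is_patrolling is true
query = String.to_charlist("[Name || {superhero, _Id, Name, true} <- mnesia:table(superhero)].")

{:atomic, patrolling} =
  :mnesia.transaction(fn ->
    query |> :qlc.string_to_handle() |> :qlc.e()
  end)
```

Writing queries as strings gives up compile-time checking, which is part of why QLC feels awkward from Elixir.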

Mnesia deserves recognition for solving complex problems in distributed, dynamic, and concurrent systems long before these issues became mainstream concerns in software engineering. It remains a robust solution for ensuring consistency and availability across nodes in a cluster. Before opting for a more modern tool, it’s worth considering what Mnesia can offer and whether it is the right solution for your specific problem.