Elixir and Mnesia: Running a cluster with state
Elixir Cluster Series
In a previous blog post, we explored the lifecycle of superheroes within the cluster, where each superhero ran on a GenServer distributed across three dispatch center nodes. When a node crashes, its superhero processes migrate to other known good nodes.
Now, it’s time to tackle the challenge of maintaining state. A superhero must save its internal state so that it can be restored on restart.
Let’s revisit Joe Armstrong’s insight from this discussion thread:
“That’s it and all about it - it’s not related to supervisors or other system components. This is standard practice in a distributed system. Real fault tolerance requires more than one machine to ensure coverage in the event of a complete machine failure. This is precisely why we replicate everything. If an entire machine fails and we only have a pointer to vital data on that machine, then we’re in serious trouble.”
State management must be independent of the lifecycle, and data must be replicated across multiple locations to ensure reliability.
See this post for information on starting the superhero cluster
Erlang Term Storage
Erlang Term Storage (ETS) is Erlang's built-in in-memory key-value store. It is a good solution if the problem is keeping a copy of a process's internal state to restore after an unexpected lifecycle restart. This works only if the table is owned by a process that outlives the one being restarted, because an ETS table is deleted when its owner exits. If the data must survive node restarts, it is easy to move to Disk Erlang Term Storage (DETS), which offers a similar API and persists the data on disk so it can be recovered when the system restarts.
However, ETS is local to a single node, so it is not a strategy for maintaining data across a distributed cluster.
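A minimal sketch of the ETS approach, assuming a hypothetical `:superhero_state` table keyed by superhero id. In a real application, a long-lived process (not the GenServer being protected) would create and own the table so that it survives the GenServer's restart.

```elixir
# Hypothetical table name. The process that runs :ets.new/2 owns the table, and the
# table is deleted when that process exits.
:ets.new(:superhero_state, [:set, :public, :named_table])

# Save a snapshot of a process's state under its id...
true = :ets.insert(:superhero_state, {"hero-1", %{health: 80, fights_won: 3}})

# ...and read it back after the process restarts.
restored =
  case :ets.lookup(:superhero_state, "hero-1") do
    [{_id, state}] -> state
    [] -> nil
  end
```

The data lives only on the node where the table was created, which is the limitation described above.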
Choices
According to Brewer’s Theorem, a distributed data system cannot simultaneously achieve Consistency, Availability, and Partition Tolerance (CAP).
- Consistency ensures all nodes see the same data at the same time.
- Availability guarantees every request receives a response.
- Partition Tolerance means the system functions when network failures split it into disconnected parts.
A distributed system can guarantee at most two of these three properties at once, which forces trade-offs based on requirements.
In the Superhero Dispatch Cluster, the choice is whether to ensure that each dispatch center can continue to operate even when communication is disrupted, or whether it’s more important that all operational centers maintain a consistent view of the data.
Conflict-Free Replicated Data Types (CRDTs)
An earlier solution used a CRDT. It maintained a complete copy of the data on each node and shared updates with a gossip (epidemic) protocol. This approach prioritizes availability and partition tolerance (AP). Because the nodes operate independently, they continue to function even when isolated. The cost is strong consistency (C). The replicas are only eventually consistent, so nodes do not necessarily see the same data at the same time.
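The earlier post's CRDT code is not reproduced here. To show why CRDT replicas converge without coordination, here is a minimal grow-only counter (G-Counter), one of the simplest CRDTs. The `GCounter` module is hypothetical and is not the earlier solution's implementation. Each replica increments only its own slot, and merging takes the per-replica maximum, so merges can happen in any order and converge to the same result.

```elixir
defmodule GCounter do
  # Hypothetical grow-only counter: a map of replica id => count.
  def new, do: %{}

  # A replica only ever increments its own entry.
  def increment(counter, replica), do: Map.update(counter, replica, 1, &(&1 + 1))

  # The counter's value is the sum of all replicas' entries.
  def value(counter), do: counter |> Map.values() |> Enum.sum()

  # Merge is commutative, associative, and idempotent: keep the larger count per replica.
  def merge(a, b), do: Map.merge(a, b, fn _replica, x, y -> max(x, y) end)
end

# Two isolated replicas keep accepting updates during a partition...
a = GCounter.new() |> GCounter.increment(:a) |> GCounter.increment(:a)
b = GCounter.new() |> GCounter.increment(:b)

# ...and converge to the same state once they exchange data, whichever side merges first.
merged_ab = GCounter.merge(a, b)
merged_ba = GCounter.merge(b, a)
```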
Mnesia
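Mnesia is Erlang's built-in distributed database management system. Like the CRDT solution, it replicates data across nodes, which supports high availability (A). It uses transactions to write to all active replicas atomically, which guarantees consistent data (C). However, Mnesia does not resolve network partitions (P) on its own. By default, each side of a split keeps running with its own copy. When connectivity is restored, Mnesia reports an inconsistent_database event that the application must handle. The exception is tables created with the :majority option, where updates on the minority side abort instead.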
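When partitioned nodes reconnect, Mnesia reports the split as a system event, `{:mnesia_system_event, {:inconsistent_database, context, node}}`. Below is a minimal sketch of a process that listens for it. The `PartitionWatcher` name is hypothetical. Deciding what to do after a split is detected, for example restarting one side, is application-specific and not shown.

```elixir
defmodule PartitionWatcher do
  # Hypothetical module: logs Mnesia's partition-detection event. Mnesia must be running.
  use GenServer
  require Logger

  def start_link(_opts), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

  @impl true
  def init(_) do
    # Ask Mnesia to send its system events (including inconsistent_database) to this process.
    {:ok, _node} = :mnesia.subscribe(:system)
    {:ok, nil}
  end

  @impl true
  def handle_info({:mnesia_system_event, {:inconsistent_database, context, node}}, state) do
    Logger.warning("Mnesia partition detected with #{inspect(node)} (#{inspect(context)})")
    {:noreply, state}
  end

  # Other system events are ignored in this sketch.
  def handle_info(_other, state), do: {:noreply, state}
end
```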
Set Up
Mnesia needs to be aware of the nodes that are part of its cluster. The timing of when these nodes join, or whether they join at all, doesn’t matter as long as they are recognized as part of the cluster.
The order of operations in Mnesia setup is crucial. This setup is implemented in a GenServer, which makes it easy to start and supervise within the application. A common GenServer pattern is to return from init quickly and then have the process send itself a message to perform setup tasks asynchronously. However, Mnesia setup needs to be sequential, and running it inside init means the supervisor does not start the next child until setup has finished. So all the logic is performed within the init function.
```elixir
defmodule Dispatch.Store.Setup do
  use GenServer
  require Logger

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    # Run every setup step synchronously, in order, before init/1 returns.
    Keyword.get(opts, :nodes, [])
    |> setup_mnesia()
    |> create_superhero_table()

    # Block for up to 5 seconds until the superhero table is available locally.
    :mnesia.wait_for_tables([:superhero], 5000)
    {:ok, %{}}
  end

  defp setup_mnesia(nodes) do
    # Return values are ignored, so an already existing schema does not stop setup.
    :mnesia.create_schema([Node.self()])
    :mnesia.start()
    # Tell the local Mnesia instance about the other nodes in the cluster.
    :mnesia.change_config(:extra_db_nodes, nodes)
    Logger.info("Mnesia configured with nodes: #{inspect(nodes)}")
    nodes
  end

  defp create_superhero_table(nodes) do
    Logger.info("Creating superhero table on #{Node.self()}.")

    case :mnesia.create_table(:superhero, superhero_table_opts(nodes)) do
      {:atomic, :ok} ->
        Logger.info("Superhero table created successfully.")

      {:aborted, {:already_exists, table}} ->
        Logger.info("#{table} table already exists.")
    end

    nodes
  end

  defp superhero_table_opts(nodes) do
    [
      # Names for the record positions; the first attribute (:id) is the key.
      {:attributes,
       [:id, :name, :node, :is_patrolling, :last_updated, :fights_won, :fights_lost, :health]},
      # Keep an in-memory copy on every listed node.
      {:ram_copies, nodes},
      # A set holds at most one record per key.
      {:type, :set}
    ]
  end
end
```
- setup_mnesia
  - :mnesia.create_schema: Mnesia requires a schema to be set up first. The call returns :ok when it creates the schema. On a node where a schema already exists, it returns an {:error, reason} tuple rather than :aborted. The return value is ignored, so setup continues either way.
  - :mnesia.start: Each node must start its own local instance of Mnesia.
  - :mnesia.change_config: Each node sets :extra_db_nodes in its local configuration so that it recognizes the nodes that make up the cluster.
- create_superhero_table: The first node needs to set up the table that other nodes will join. Mnesia's create_table attempts to create the table and aborts with {:already_exists, :superhero} if the table already exists on another known node. The superhero table includes a list of attributes and keeps a RAM copy on each node in nodes. It is of type set, meaning each id (the key) maps to only one record. Note that the attributes are simply names. Mnesia saves records as tuples, and these names document the intended meaning of each position in the tuple.
- :mnesia.wait_for_tables: The setup waits up to 5 seconds (5000 ms) for a local copy of the table to become available.
Mnesia setup is idempotent. Each operation either succeeds or aborts because its work has already been done. Those "already exists" aborts are safe to ignore.
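The idempotent behaviour can be observed on a single node. This is a minimal sketch that assumes a local, non-distributed node with no schema on disk, so :mnesia.start/0 runs with a RAM-only schema. It uses a hypothetical :demo_superhero table instead of the real :superhero table.

```elixir
# With no schema on disk, Mnesia starts with a RAM-only schema on this node.
:ok = :mnesia.start()

opts = [attributes: [:id, :name], type: :set]

# The first attempt creates the table...
first = :mnesia.create_table(:demo_superhero, opts)

# ...and a repeat attempt aborts harmlessly because the table already exists.
second = :mnesia.create_table(:demo_superhero, opts)

# wait_for_tables/2 returns :ok, {:timeout, tables}, or {:error, reason}.
wait_result = :mnesia.wait_for_tables([:demo_superhero], 5000)
```

The Setup module above ignores the return value of wait_for_tables. Matching on it, as here, is one way to notice when a table is not ready in time.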
Create, Read, Update, and Delete (CRUD)
The SuperheroStore wraps Erlang's Mnesia functions in more Elixir-friendly code and provides an API for basic create, read, update, and delete functionality.
```elixir
defmodule Dispatch.Store.SuperheroStore do
  alias Dispatch.Superhero
  require Logger

  def upsert_superhero(%{id: id} = superhero) do
    # Merge into a default struct so every field exists, then stamp the update time.
    updated_superhero =
      %Superhero{id: id}
      |> Map.merge(superhero)
      |> Map.put(:last_updated, System.system_time(:second))

    # Mnesia records are tuples whose first element is the table (record) name.
    superhero_tuple =
      updated_superhero
      |> convert_to_tuple()
      |> Tuple.insert_at(0, :superhero)

    transaction_result =
      :mnesia.transaction(fn ->
        :mnesia.write(superhero_tuple)
        :ok
      end)

    case transaction_result do
      {:atomic, :ok} ->
        # Return what was stored, including the new last_updated value.
        {:ok, updated_superhero}

      {:aborted, reason} ->
        Logger.error("Failed to upsert superhero #{id} to Mnesia: #{inspect(reason)}")
        {:error, reason}
    end
  end

  def get_superhero(id) do
    transaction_result =
      :mnesia.transaction(fn ->
        :mnesia.read({:superhero, id})
      end)

    case transaction_result do
      {:atomic, [head]} ->
        superhero =
          head
          |> Tuple.delete_at(0)
          |> convert_to_struct()

        {:ok, superhero}

      {:atomic, []} ->
        {:error, :not_found}

      {:aborted, reason} ->
        Logger.error("Failed to get superhero #{id} from Mnesia: #{inspect(reason)}")
        {:error, reason}
    end
  end

  def get_all_superheroes do
    result =
      :mnesia.transaction(fn ->
        # One wildcard per attribute (8) after the table name.
        :mnesia.match_object({:superhero, :_, :_, :_, :_, :_, :_, :_, :_})
      end)

    case result do
      # An empty table yields {:ok, []}.
      {:atomic, data} ->
        superheroes =
          Enum.map(data, fn tuple ->
            tuple
            |> Tuple.delete_at(0)
            |> convert_to_struct()
          end)

        {:ok, superheroes}

      {:aborted, reason} ->
        Logger.error("Failed to get superheroes from Mnesia: #{inspect(reason)}")
        {:error, reason}
    end
  end

  def delete_superhero(id) do
    result =
      :mnesia.transaction(fn ->
        :mnesia.delete({:superhero, id})
      end)

    case result do
      {:atomic, :ok} ->
        {:ok, :deleted}

      {:aborted, reason} ->
        Logger.error("Failed to delete superhero #{id} from Mnesia: #{inspect(reason)}")
        {:error, reason}
    end
  end

  # Field order must match the :attributes list in Dispatch.Store.Setup.
  def convert_to_tuple(%Superhero{
        id: id,
        name: name,
        node: node,
        is_patrolling: is_patrolling,
        last_updated: last_updated,
        fights_won: fights_won,
        fights_lost: fights_lost,
        health: health
      }) do
    {id, name, node, is_patrolling, last_updated, fights_won, fights_lost, health}
  end

  def convert_to_struct(
        {id, name, node, is_patrolling, last_updated, fights_won, fights_lost, health}
      ) do
    %Superhero{
      id: id,
      name: name,
      node: node,
      is_patrolling: is_patrolling,
      last_updated: last_updated,
      fights_won: fights_won,
      fights_lost: fights_lost,
      health: health
    }
  end
end
```
The application passes a superhero as a struct, but Mnesia saves records as tuples. The convert_to_tuple and convert_to_struct functions switch between these representations. The first position in a Mnesia record holds the record name, which here is the table atom :superhero. The second position holds the key, which serves as the unique identifier.
- upsert_superhero: Since the :superhero table is configured as a set, writing a record with an existing id replaces that record. The :mnesia.write function does not differentiate between creating and updating records.
- get_superhero: The :mnesia.read function takes a tuple containing the table name and id and returns a list of matching records. Because the table is a set, that list contains at most one record, or is empty if the id is unknown.
- get_all_superheroes: The :mnesia.match_object function scans the entire table and returns every record that matches the wildcard pattern. Although not efficient, this approach is sufficient for this example.
- delete_superhero: The :mnesia.delete function takes a tuple containing the table name and the key and deletes the corresponding entry.
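The return shapes described above can be checked on a single node with raw Mnesia calls. This minimal sketch assumes a local node and a hypothetical three-attribute :demo_hero table rather than the full :superhero schema.

```elixir
:ok = :mnesia.start()
{:atomic, :ok} = :mnesia.create_table(:demo_hero, attributes: [:id, :name, :health], type: :set)

write = fn record -> :mnesia.transaction(fn -> :mnesia.write(record) end) end

# Create, then "update": a second write with the same key replaces the record in a set.
{:atomic, :ok} = write.({:demo_hero, 1, "Nova", 100})
{:atomic, :ok} = write.({:demo_hero, 1, "Nova", 80})

# read/1 returns a list: one record when the key exists, [] when it does not.
{:atomic, found} = :mnesia.transaction(fn -> :mnesia.read({:demo_hero, 1}) end)
{:atomic, missing} = :mnesia.transaction(fn -> :mnesia.read({:demo_hero, 2}) end)

# match_object/1 with wildcards scans the whole table.
{:atomic, all} = :mnesia.transaction(fn -> :mnesia.match_object({:demo_hero, :_, :_, :_}) end)

# delete/1 takes {table, key}.
{:atomic, :ok} = :mnesia.transaction(fn -> :mnesia.delete({:demo_hero, 1}) end)
{:atomic, after_delete} = :mnesia.transaction(fn -> :mnesia.read({:demo_hero, 1}) end)
```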
The safest way to implement Mnesia operations is by using atomic transactions. Transactions use locks to ensure data consistency and integrity, but they can be slower than non-transactional operations. While Mnesia is generally performant, applications with very large databases might consider performing certain actions outside of a transaction (using “dirty” operations) to improve speed. However, this approach sacrifices the guarantees provided by atomic transactions, so it should be used judiciously.
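For comparison, here is a sketch of a dirty write and read next to the transactional read of the same record, using a hypothetical :dirty_demo table. :mnesia.dirty_write/1 and :mnesia.dirty_read/1 are standard Mnesia functions. Whether they suit a particular workload depends on how much consistency it can give up.

```elixir
:ok = :mnesia.start()
{:atomic, :ok} = :mnesia.create_table(:dirty_demo, attributes: [:id, :health])

# Dirty operations bypass the transaction manager and locks: faster, but with no
# atomicity or isolation guarantees. They return plain values, not {:atomic, _} tuples.
:ok = :mnesia.dirty_write({:dirty_demo, 1, 100})
dirty = :mnesia.dirty_read({:dirty_demo, 1})

# The transactional equivalent wraps its result in {:atomic, result} or {:aborted, reason}.
{:atomic, txn} = :mnesia.transaction(fn -> :mnesia.read({:dirty_demo, 1}) end)
```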
Superhero GenServer
The SuperheroServer has been refactored to integrate with the new database. While the superhero continues to fight crime, it now records its internal state in Mnesia whenever it takes an action.
```elixir
defmodule Dispatch.SuperheroServer do
  use GenServer
  require Logger

  alias Dispatch.{SuperheroRegistry, Superhero, SuperheroApi, Store}
  alias Horde.Registry
  alias Store.SuperheroStore

  @polling_interval 4000
  @max_health 100
  @default_fights 0

  def start_link(id) do
    GenServer.start_link(__MODULE__, id, name: via_tuple(id))
  end

  @impl true
  def init(id) do
    Process.flag(:trap_exit, true)
    Logger.info("Initializing superhero server for #{id}")
    # Load (or create) the persisted state after init/1 returns.
    send(self(), :init_superhero)
    schedule_next_action()
    {:ok, %Superhero{id: id}}
  end

  @impl true
  def handle_info(:init_superhero, superhero) do
    superhero =
      case SuperheroStore.get_superhero(superhero.id) do
        {:ok, existing_superhero} ->
          Logger.info(
            "Superhero #{superhero.id} already exists in Mnesia. Using existing superhero."
          )

          # Record the node this superhero is now running on.
          Map.put(existing_superhero, :node, node())

        {:error, :not_found} ->
          generate_new_superhero(superhero)

        {:error, reason} ->
          Logger.error(
            "Failed to retrieve superhero #{superhero.id} from Mnesia: #{inspect(reason)}"
          )

          superhero
      end

    # Persist the restored or newly generated state.
    send(self(), {:update_superhero, superhero})
    {:noreply, superhero}
  end

  @impl true
  def handle_info(:decide_action, superhero) do
    superhero =
      case random_action() do
        :fighting ->
          GenServer.cast(self(), :fighting_crime)
          Map.put(superhero, :is_patrolling, true)

        :resting ->
          GenServer.cast(self(), :resting)
          Map.put(superhero, :is_patrolling, false)
      end

    schedule_next_action()
    {:noreply, superhero}
  end

  @impl true
  def handle_info({:update_superhero, superhero}, _state) do
    case SuperheroStore.upsert_superhero(superhero) do
      {:ok, saved_superhero} ->
        Logger.info("Superhero #{saved_superhero.id} added to Mnesia.")
        {:noreply, saved_superhero}

      {:error, reason} ->
        Logger.error("Failed to add superhero #{superhero.id} to Mnesia: #{inspect(reason)}")
        {:noreply, superhero}
    end
  end

  @impl true
  def handle_cast(:resting, superhero) do
    health_gain = :rand.uniform(40)
    new_health = min(superhero.health + health_gain, @max_health)
    updated_superhero = Map.put(superhero, :health, new_health)

    Logger.info(
      "#{superhero.name} is resting and has regained #{health_gain} health points, new health: #{new_health}."
    )

    send(self(), {:update_superhero, updated_superhero})
    {:noreply, updated_superhero}
  end

  @impl true
  def handle_cast(:fighting_crime, superhero) do
    case :rand.uniform(2) do
      1 -> {:noreply, handle_win(superhero)}
      2 -> {:noreply, handle_loss(superhero)}
    end
  end

  @impl true
  def terminate(reason, superhero) do
    Logger.info(
      "Terminating superhero server for #{superhero.name} with reason: #{inspect(reason)}"
    )

    :ok
  end

  # Builds a new superhero; :init_superhero persists it, so no extra message is sent here.
  defp generate_new_superhero(superhero) do
    superhero
    |> Map.put(:node, node())
    |> Map.put(:name, "#{Faker.Superhero.prefix()} #{Faker.Superhero.name()}")
  end

  defp handle_win(superhero) do
    updated_superhero = Map.update(superhero, :fights_won, @default_fights, &(&1 + 1))
    Logger.info("#{superhero.name} won a fight, total wins: #{updated_superhero.fights_won}")
    send(self(), {:update_superhero, updated_superhero})
    updated_superhero
  end

  defp handle_loss(superhero) do
    health_loss = :rand.uniform(40)
    updated_superhero = update_superhero_losses(superhero, health_loss)

    Logger.info(
      "#{superhero.name} lost a fight, lost #{health_loss} health, remaining health: #{updated_superhero.health}"
    )

    if updated_superhero.health <= 0 do
      handle_critical_health(updated_superhero)
    else
      send(self(), {:update_superhero, updated_superhero})
      updated_superhero
    end
  end

  defp handle_critical_health(superhero) do
    Logger.warning("#{superhero.name} has health <= 0, terminating.")
    SuperheroStore.delete_superhero(superhero.id)
    SuperheroApi.stop(superhero.id)
    # Return the bare state: handle_cast/2 already wraps it in {:noreply, ...}.
    superhero
  end

  defp update_superhero_losses(superhero, health_loss) do
    superhero
    |> Map.update(:health, @max_health, &(&1 - health_loss))
    |> Map.update(:fights_lost, @default_fights, &(&1 + 1))
  end

  defp random_action do
    if :rand.uniform(4) == 1, do: :resting, else: :fighting
  end

  defp schedule_next_action do
    Process.send_after(self(), :decide_action, @polling_interval)
  end

  def via_tuple(id), do: {:via, Registry, {SuperheroRegistry, id}}
end
```
The previous version of the superhero dispatch application used the superhero’s name as the unique identifier. Now that we have storage set up, it is time to refactor the app to use a unique ID.
- :init_superhero is the asynchronous callback triggered after a new superhero process initializes. It checks Mnesia for an existing record of the superhero's state. If one is found, it uses that record; if not, it generates a new superhero. It then records the current node location and sends itself a message to update Mnesia.
- :update_superhero is an asynchronous message that writes the current internal state to Mnesia. For extremely time-sensitive operations, it might make sense to write to the database synchronously instead, in which case Mnesia can be called directly.
- handle_critical_health now deletes the superhero's record from Mnesia and terminates the GenServer.
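The core idea, state that outlives the process, can be seen in isolation with a much smaller GenServer. This is a hypothetical single-node sketch (the `PersistentCounter` module and `:counter_state` table are assumptions). The server restores its value from Mnesia when it starts and writes every change, so killing and restarting the process loses nothing. For brevity it reads in init/1, whereas SuperheroServer defers the read to :init_superhero.

```elixir
defmodule PersistentCounter do
  # Hypothetical example: the count lives in Mnesia, not only in the process.
  use GenServer

  @table :counter_state

  def start(id), do: GenServer.start(__MODULE__, id)
  def increment(pid), do: GenServer.call(pid, :increment)

  @impl true
  def init(id) do
    # Restore the last saved value, or start at 0 if nothing was saved.
    count =
      case :mnesia.transaction(fn -> :mnesia.read({@table, id}) end) do
        {:atomic, [{@table, ^id, saved}]} -> saved
        {:atomic, []} -> 0
      end

    {:ok, {id, count}}
  end

  @impl true
  def handle_call(:increment, _from, {id, count}) do
    new_count = count + 1
    # Persist before replying so the new value survives a crash.
    {:atomic, :ok} = :mnesia.transaction(fn -> :mnesia.write({@table, id, new_count}) end)
    {:reply, new_count, {id, new_count}}
  end
end

:ok = :mnesia.start()
{:atomic, :ok} = :mnesia.create_table(:counter_state, attributes: [:id, :count])

{:ok, pid} = PersistentCounter.start(:hero_1)
PersistentCounter.increment(pid)
PersistentCounter.increment(pid)

# Simulate a crash: the process dies, but its state survives in Mnesia.
Process.exit(pid, :kill)

{:ok, pid2} = PersistentCounter.start(:hero_1)
restored = PersistentCounter.increment(pid2)
```

The restarted process continues from the saved count of 2, so `restored` is 3.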
PubSub
Previously, the application polled the SuperheroRegistry for a list of superhero PIDs, queried each process for its internal state, and then checked the registry again for the node where each PID was running. This approach was not very efficient and would quickly break down as the number of active superheroes grew.
Now that we have a database containing the current state of the superheroes, we could simply poll the database. However, a better approach is to use Mnesia’s event subscription feature and broadcast the updates.
```elixir
defmodule Dispatch.Store.PubSub do
  use GenServer
  require Logger

  alias Dispatch.Superhero
  alias Dispatch.Store.SuperheroStore
  alias Phoenix.PubSub

  # One topic per node: each node's PubSub process broadcasts on its own topic.
  def topic do
    "data_on_#{Node.self()}"
  end

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @impl true
  def init(state) do
    # Deliver {:mnesia_table_event, ...} messages for the local superhero table here.
    :mnesia.subscribe({:table, :superhero, :detailed})
    Logger.info("Subscribed to changes in the superhero table.")
    {:ok, state}
  end

  # Detailed write event: {:write, table, new_record, old_records, activity_id}.
  @impl true
  def handle_info(
        {:mnesia_table_event, {:write, :superhero, superhero_details, _old_records, _activity_id}},
        state
      ) do
    Logger.info("Superhero updated or added: #{inspect(superhero_details)}")

    superhero =
      superhero_details
      |> Tuple.delete_at(0)
      |> SuperheroStore.convert_to_struct()

    PubSub.broadcast(Dispatch.PubSub, topic(), {:update_superhero, superhero})
    {:noreply, state}
  end

  # Detailed delete event; after :mnesia.delete/1 the third element is {table, key}.
  @impl true
  def handle_info(
        {:mnesia_table_event, {:delete, :superhero, superhero_details, _old_records, _activity_id}},
        state
      ) do
    Logger.info("Superhero deleted: #{inspect(superhero_details)}")
    {:superhero, id} = superhero_details

    PubSub.broadcast(Dispatch.PubSub, topic(), {:delete_superhero, %Superhero{id: id}})
    {:noreply, state}
  end

  # Ignore any other message instead of crashing on an unmatched clause.
  @impl true
  def handle_info(_message, state), do: {:noreply, state}
end
```
This module subscribes to Mnesia and monitors the superhero table. When a superhero record is written or deleted, the module broadcasts the change to subscribers: the updated superhero for a write, or the id of the deleted superhero for a delete.
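This single-node sketch shows the message shapes that the two handle_info/2 clauses match on. It subscribes the current process to a hypothetical :event_demo table and receives one write event and one delete event. In detailed mode, the fourth element of each event is the list of records as they were before the operation.

```elixir
:ok = :mnesia.start()
{:atomic, :ok} = :mnesia.create_table(:event_demo, attributes: [:id, :name])

# Subscribe the current process to detailed events for this table.
{:ok, _node} = :mnesia.subscribe({:table, :event_demo, :detailed})

{:atomic, :ok} = :mnesia.transaction(fn -> :mnesia.write({:event_demo, 1, "Nova"}) end)
{:atomic, :ok} = :mnesia.transaction(fn -> :mnesia.delete({:event_demo, 1}) end)

# Detailed events have the shape {op, table, what, old_records, activity_id}.
write_event =
  receive do
    {:mnesia_table_event, {:write, :event_demo, record, old, _activity}} -> {record, old}
  after
    1000 -> :timeout
  end

delete_event =
  receive do
    {:mnesia_table_event, {:delete, :event_demo, what, old, _activity}} -> {what, old}
  after
    1000 -> :timeout
  end
```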
Note that because updates now arrive one record at a time, we can use LiveView streams (stream_insert and stream_delete, rendered through @streams) and broadcast individual changes rather than the entire list.
Conclusion
Mnesia, first introduced in 1998, can be viewed as either “niche and outdated” or “well-established and battle-tested”. Despite its longevity, Mnesia hasn’t seen widespread adoption in the Elixir community, partly due to the lack of a modern, user-friendly wrapper built into Elixir. While the community has the Amnesia wrapper, it does not have many downloads.
Mnesia can be queried with QLC (Query List Comprehensions), a declarative way to compose complex queries. However, QLC relies on an Erlang compile-time parse transform and doesn't integrate well with Elixir, so it is often more practical to write complex queries in Erlang.
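The friction is easy to see. Because QLC's comprehension syntax is compiled by an Erlang parse transform, one workaround in Elixir is to supply the query as an Erlang source string with :qlc.string_to_handle/1 and evaluate it with :qlc.e/1 inside a transaction. This minimal sketch assumes a hypothetical :qlc_hero table.

```elixir
:ok = :mnesia.start()
{:atomic, :ok} = :mnesia.create_table(:qlc_hero, attributes: [:id, :name, :health])

for {id, name, health} <- [{1, "Nova", 90}, {2, "Bolt", 30}, {3, "Quill", 75}] do
  {:atomic, :ok} = :mnesia.transaction(fn -> :mnesia.write({:qlc_hero, id, name, health}) end)
end

# The query is Erlang source in a charlist (note the trailing period): names of heroes
# whose health is above 50.
handle =
  :qlc.string_to_handle(~c"[N || {qlc_hero, _, N, H} <- mnesia:table(qlc_hero), H > 50].")

# QLC handles over Mnesia tables are evaluated inside a transaction.
{:atomic, names} = :mnesia.transaction(fn -> :qlc.e(handle) end)
```

Result order is not guaranteed, so sort `names` before comparing.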
Mnesia deserves recognition for solving complex problems in distributed, dynamic, and concurrent systems long before these issues became mainstream concerns in software engineering. It remains a robust solution for ensuring consistency and availability across nodes in a cluster. Before opting for a more modern tool, it’s worth considering what Mnesia can offer and whether it is the right solution for your specific problem.