Elixir: Resilient distributed systems
Elixir Cluster Series
Resilient distributed systems are engineered to function continuously, even amidst component failures or unexpected disruptions. One effective approach involves the leaderless deployment of independent, self-sufficient processes. In this framework, each process operates as a standalone unit, fully capable of managing its own state and executing its responsibilities autonomously.
In this series, each city is modeled as such a process. A City is designed to be fault tolerant: isolation contains failures, autonomy simplifies recovery, and minimal coordination between nodes avoids deadlock and reduces contention.
City GenServer
defmodule Location.City do
  use GenServer
  require Logger

  @patrol_interval 5_000
  @fight_interval 5_000

  def start_link(city_name) do
    GenServer.start_link(
      __MODULE__,
      %{city_name: city_name, data: %{superheroes: %{}, tombstones: %{}}},
      name: {:global, String.to_atom(city_name)}
    )
  end
end
In this module, Location.City, each city is represented as a GenServer process. The start_link/1 function starts the process and registers it under a name derived from city_name. Because the name is {:global, String.to_atom(city_name)}, each city’s GenServer is registered globally, so any process on any connected node can reach it by that name. The process starts with a default state containing the city name and a data map holding two empty maps, superheroes and tombstones. A production system would likely use a more sophisticated strategy for discovering processes, but global registration is reasonable for this context.
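To see what global registration provides, here is a minimal sketch (not part of the original code) that registers a stripped-down GenServer the same way Location.City does and then looks it up. The module name Sketch.GlobalCity and the :city_name call are hypothetical; :global.whereis_name/1 and GenServer.call/2 with a {:global, name} tuple are standard OTP and Elixir. It runs on a single, non-distributed node; on a cluster the same lookups would work from any connected node.

```elixir
defmodule Sketch.GlobalCity do
  use GenServer

  # Same naming scheme as Location.City: {:global, atom_from_city_name}
  def start_link(city_name) do
    GenServer.start_link(__MODULE__, %{city_name: city_name},
      name: {:global, String.to_atom(city_name)}
    )
  end

  @impl true
  def init(state), do: {:ok, state}

  # Hypothetical call, only here to prove we reached the right process
  @impl true
  def handle_call(:city_name, _from, state), do: {:reply, state.city_name, state}
end

{:ok, pid} = Sketch.GlobalCity.start_link("gotham")

# Resolve the global name to a pid (returns :undefined if nobody holds it)...
found = :global.whereis_name(:gotham)

# ...or address the process directly through the {:global, name} tuple.
reply = GenServer.call({:global, :gotham}, :city_name)

# A second registration under the same name is rejected.
duplicate = Sketch.GlobalCity.start_link("gotham")
```

The last call shows a property this design leans on: :global allows only one registered process per name among connected nodes, so a second Gotham cannot start while the first is alive; start_link/1 returns {:error, {:already_started, pid}} instead.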
Supervision
defmodule Location.Application do
  use Application

  @impl true
  def start(_type, _args) do
    city_name = System.get_env("CITY_NAME")

    children = [
      {Location.City, city_name}
    ]

    opts = [
      strategy: :one_for_one,
      max_restarts: 3,
      max_seconds: 5,
      name: Location.Supervisor
    ]

    Supervisor.start_link(children, opts)
  end
end
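- Strategy (:one_for_one): when a child process crashes, the supervisor restarts only that child, without affecting others.
- Restart intensity (max_restarts: 3, max_seconds: 5): if more than three restarts occur within a five-second window, the supervisor stops restarting, terminates its remaining children, and exits itself. Because this is the application's top-level supervisor, the Location application then stops. This prevents a process stuck in a crash loop from destabilizing the system.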
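The restart intensity can be observed with a small experiment. The sketch below is a stand-in for the real application, under stated assumptions: Sketch.Worker and Sketch.Chaos are hypothetical modules, and the workers are registered locally rather than through :global. It starts two children with the same supervisor options as Location.Supervisor, kills one of them repeatedly, and records what the supervisor does.

```elixir
defmodule Sketch.Worker do
  use GenServer
  def start_link(name), do: GenServer.start_link(__MODULE__, name, name: name)

  @impl true
  def init(name), do: {:ok, name}
end

defmodule Sketch.Chaos do
  # Kill the process registered as `name`, then poll until the supervisor restarts it.
  def kill_and_wait(name) do
    old = Process.whereis(name)
    ref = Process.monitor(old)
    Process.exit(old, :kill)

    receive do
      {:DOWN, ^ref, :process, ^old, _reason} -> :ok
    end

    wait_for_restart(name, old, 50)
  end

  # Give up after ~500 ms without a new pid under `name`.
  defp wait_for_restart(_name, _old, 0), do: :not_restarted

  defp wait_for_restart(name, old, attempts) do
    case Process.whereis(name) do
      pid when is_pid(pid) and pid != old ->
        {:restarted, pid}

      _ ->
        Process.sleep(10)
        wait_for_restart(name, old, attempts - 1)
    end
  end
end

# Trap exits so this script survives when the supervisor gives up.
Process.flag(:trap_exit, true)

children = [
  Supervisor.child_spec({Sketch.Worker, :gotham}, id: :gotham),
  Supervisor.child_spec({Sketch.Worker, :metropolis}, id: :metropolis)
]

{:ok, sup} =
  Supervisor.start_link(children, strategy: :one_for_one, max_restarts: 3, max_seconds: 5)

metropolis_before = Process.whereis(:metropolis)

# Crash 1: only :gotham is restarted; :metropolis keeps its pid.
first = Sketch.Chaos.kill_and_wait(:gotham)
metropolis_unchanged = Process.whereis(:metropolis) == metropolis_before

# Crashes 2 to 4: the fourth restart within five seconds would exceed max_restarts: 3.
rest = for _ <- 1..3, do: Sketch.Chaos.kill_and_wait(:gotham)

sup_exit =
  receive do
    {:EXIT, ^sup, reason} -> reason
  after
    1_000 -> :still_running
  end
```

Killing :gotham once leaves :metropolis untouched, which is the :one_for_one isolation. The fourth crash inside the window exceeds the intensity limit, so instead of restarting :gotham the supervisor terminates :metropolis and exits with reason :shutdown.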
Send and Receive Superhero State
  def handle_call(:get_superheroes, _from, state) do
    {:reply, state.data, state}
  end

  def handle_cast({:update_superheroes, updated_data}, state) do
    Logger.info("Updating superheroes with new data for #{state.city_name}")
    combined_data = converge_superhero(state.data, updated_data)
    {:noreply, update_state(state, combined_data.superheroes, combined_data.tombstones)}
  end
- :get_superheroes is a callback that returns the city's current superhero data (superheroes and tombstones) to the caller.
- :update_superheroes accepts incoming state and merges it with the city's current state using the converge_superhero CRDT function. Because a state-based CRDT merge is designed to be commutative, associative, and idempotent, cities that receive the same updates converge to the same state regardless of the order in which those updates arrive.
See the Leveraging CRDTs for eventual consistency blog post for more information.
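The converge_superhero/2 function itself is covered in that post and is not reproduced here. As a rough idea of its shape only, the following hypothetical merge assumes each hero map carries a :last_updated timestamp (as the handlers in this post set) and that tombstones map IDs to true. It unions tombstones, resolves conflicting hero entries by last-writer-wins, and drops any hero that has a tombstone. Treat it as a stand-in, not the series' implementation.

```elixir
defmodule Sketch.Converge do
  # Hypothetical merge in the spirit of converge_superhero/2; not the series' code.
  # Both arguments look like %{superheroes: %{id => hero}, tombstones: %{id => true}}.
  def converge_superhero(local, incoming) do
    # Tombstones only ever grow: a hero marked dead anywhere stays dead.
    tombstones = Map.merge(local.tombstones, incoming.tombstones)

    superheroes =
      local.superheroes
      |> Map.merge(incoming.superheroes, fn _id, a, b -> newest(a, b) end)
      |> Map.drop(Map.keys(tombstones))

    %{superheroes: superheroes, tombstones: tombstones}
  end

  # Last-writer-wins on :last_updated. Ties fall back to Erlang term order via
  # Kernel.max/2, so the result does not depend on which side is "local".
  defp newest(a, b) do
    cond do
      a.last_updated > b.last_updated -> a
      b.last_updated > a.last_updated -> b
      true -> max(a, b)
    end
  end
end

gotham = %{
  superheroes: %{1 => %{name: "Batman", health: 50, last_updated: 100}},
  tombstones: %{}
}

metropolis = %{
  superheroes: %{
    1 => %{name: "Batman", health: 70, last_updated: 200},
    2 => %{name: "Superman", health: 10, last_updated: 150}
  },
  tombstones: %{2 => true}
}

merged = Sketch.Converge.converge_superhero(gotham, metropolis)
```

Here merged keeps the newer Batman entry (health 70) and drops Superman because of the tombstone. Breaking timestamp ties with Kernel.max/2 keeps the merge commutative even when two updates carry the same second-resolution timestamp.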
Assign a Superhero
  def handle_call({:assign_superhero, incoming_data, superhero_id}, _from, state) do
    timestamp = System.system_time(:second)
    combined_data = converge_superhero(state.data, incoming_data)

    if Map.has_key?(combined_data.tombstones, superhero_id) do
      Logger.warning("Attempt to assign dead superhero with ID #{superhero_id}")
      # A handle_call must reply; returning {:noreply, state} here would leave
      # the caller blocked until its call times out.
      {:reply, state.data, state}
    else
      Logger.info("Assigning superhero with ID #{superhero_id} to #{state.city_name}")
      updated_hero = Map.get(combined_data.superheroes, superhero_id, %{})
      # Map.get/3 avoids a KeyError when the hero is not known yet; health is capped at 100
      new_health = min(Map.get(updated_hero, :health, 0) + 20, 100)

      superheroes_updated =
        Map.put(
          combined_data.superheroes,
          superhero_id,
          Map.merge(updated_hero, %{
            location: state.city_name,
            last_updated: timestamp,
            is_patrolling: false,
            health: new_health
          })
        )

      new_state = update_state(state, superheroes_updated, combined_data.tombstones)
      {:reply, new_state.data, new_state}
    end
  end
:assign_superhero: This callback assigns a superhero to the city. It takes incoming state data and a superhero_id, and first merges the incoming state with the city’s current state. If the superhero is marked as dead in tombstones, it logs a warning and leaves the state unchanged. Otherwise, it assigns the superhero to the city, marks them as not patrolling, and grants a 20-point health boost, capped at 100.
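The decision logic in this callback does not depend on the GenServer itself, so one option is to pull it into a pure function that is easy to test. The sketch below is hypothetical (Sketch.Assign.assign/4 is not from the post): it applies the same tombstone check and the 20-point boost capped at 100, assumes a hero that is not yet known starts from 0 health, and returns {:error, :superhero_dead} for a tombstoned hero instead of logging.

```elixir
defmodule Sketch.Assign do
  @boost 20
  @max_health 100

  # Hypothetical pure version of the :assign_superhero decision.
  # Returns {:ok, data} with the hero moved to city_name, or {:error, :superhero_dead}.
  def assign(data, superhero_id, city_name, timestamp) do
    if Map.has_key?(data.tombstones, superhero_id) do
      {:error, :superhero_dead}
    else
      hero = Map.get(data.superheroes, superhero_id, %{})
      # Assumption: a hero we have never seen starts from 0 health.
      health = min(Map.get(hero, :health, 0) + @boost, @max_health)

      updated =
        Map.merge(hero, %{
          location: city_name,
          last_updated: timestamp,
          is_patrolling: false,
          health: health
        })

      {:ok, %{data | superheroes: Map.put(data.superheroes, superhero_id, updated)}}
    end
  end
end

data = %{
  superheroes: %{
    1 => %{name: "Batman", health: 90, location: "metropolis", last_updated: 0, is_patrolling: true}
  },
  tombstones: %{2 => true}
}

{:ok, assigned} = Sketch.Assign.assign(data, 1, "gotham", 10)
dead = Sketch.Assign.assign(data, 2, "gotham", 10)
{:ok, fresh} = Sketch.Assign.assign(data, 3, "gotham", 10)
```

With the logic isolated like this, the callback would only need to merge, call assign/4, and turn the result into a reply.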
Sending on Patrol
GenServers behave like actors, and the City actor actively organizes its own patrols by sending itself timed messages.
  @patrol_interval 5_000

  @impl true
  def init(state) do
    schedule_patrol()
    {:ok, state}
  end

  @impl true
  def handle_info(:set_patrol, state) do
    schedule_patrol()
    {:noreply, state |> set_superhero_patrolling()}
  end

  defp schedule_patrol do
    Process.send_after(self(), :set_patrol, @patrol_interval)
  end

  defp set_superhero_patrolling(state) do
    city_superheroes = get_superheroes_in_city(state)

    if Enum.empty?(city_superheroes) do
      Logger.info("No superheroes available to patrol in #{state.city_name}")
      state
    else
      random_id = Enum.random(city_superheroes)
      updated_heroes = set_hero_patrolling(state, random_id)
      superhero_name = updated_heroes[random_id].name
      Logger.info("Superhero #{superhero_name} is now patrolling in #{state.city_name}")
      update_state(state, updated_heroes, state.data.tombstones)
    end
  end

  defp get_superheroes_in_city(state) do
    state.data.superheroes
    |> Enum.filter(fn {_id, hero} -> hero.location == state.city_name end)
    |> Enum.map(fn {id, _hero} -> id end)
  end

  defp set_hero_patrolling(state, superhero_id) do
    timestamp = System.system_time(:second)

    Map.update!(state.data.superheroes, superhero_id, fn hero ->
      %{hero | is_patrolling: true, last_updated: timestamp}
    end)
  end
- schedule_patrol: Uses Process.send_after/3 to deliver a single :set_patrol message to the process itself after @patrol_interval (five seconds).
- handle_info(:set_patrol, state): Schedules the next :set_patrol message, then calls set_superhero_patrolling to assign a patrol. Because the handler re-arms the timer each time, a patrol is assigned roughly every five seconds.
- set_superhero_patrolling: Retrieves all superheroes located in the city, randomly selects one to patrol, and logs the action. Marking a hero as patrolling is effectively idempotent: selecting a hero who is already patrolling only refreshes last_updated.
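The timer pattern behind schedule_patrol is generic. The minimal sketch below isolates it with a hypothetical Sketch.Ticker that counts ticks instead of assigning patrols, using a 10 ms interval so the effect is visible quickly (the post uses 5_000 ms).

```elixir
defmodule Sketch.Ticker do
  use GenServer

  def start_link(interval_ms), do: GenServer.start_link(__MODULE__, interval_ms)
  def ticks(pid), do: GenServer.call(pid, :ticks)

  @impl true
  def init(interval_ms) do
    # Arm the first timer, as init/1 does with schedule_patrol()
    schedule(interval_ms)
    {:ok, %{interval: interval_ms, ticks: 0}}
  end

  @impl true
  def handle_info(:tick, state) do
    # Re-arm first, then do the work (same order as the :set_patrol handler)
    schedule(state.interval)
    {:noreply, %{state | ticks: state.ticks + 1}}
  end

  @impl true
  def handle_call(:ticks, _from, state), do: {:reply, state.ticks, state}

  defp schedule(interval_ms), do: Process.send_after(self(), :tick, interval_ms)
end

{:ok, ticker} = Sketch.Ticker.start_link(10)
Process.sleep(200)
ticks = Sketch.Ticker.ticks(ticker)
```

Because each message is scheduled from inside the handler, the period is measured from when the handler runs rather than from a fixed clock, so ticks drift slightly over time. For a patrol every few seconds this is harmless; :timer.send_interval/2 is an alternative when a fixed cadence matters.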
Fighting Villains
The City actor also pits its patrolling superheroes against villains.
  @fight_interval 5_000

  # A GenServer has a single init/1, so it starts both the patrol and the fight
  # timers (this replaces the patrol-only init/1 shown in the previous section).
  @impl true
  def init(state) do
    schedule_patrol()
    schedule_fight()
    {:ok, state}
  end

  @impl true
  def handle_info(:fight_villain, state) do
    state = fight_villain(state)
    schedule_fight()
    {:noreply, state}
  end

  defp schedule_fight do
    Process.send_after(self(), :fight_villain, @fight_interval)
  end

  defp fight_villain(state) do
    patrolling_superheroes = get_patrolling_superheroes(state)

    if Enum.empty?(patrolling_superheroes) do
      Logger.info("No patrolling superheroes available to fight in #{state.city_name}")
      state
    else
      random_id = Enum.random(patrolling_superheroes)
      updated_heroes = update_superhero_health(state, random_id)

      if superhero_defeated?(updated_heroes[random_id]) do
        remove_defeated_superhero(state, updated_heroes, random_id)
      else
        update_state(state, updated_heroes, state.data.tombstones)
      end
    end
  end

  defp get_patrolling_superheroes(state) do
    state.data.superheroes
    |> Enum.filter(fn {_id, hero} ->
      hero.location == state.city_name and hero.is_patrolling
    end)
    |> Enum.map(fn {id, _hero} -> id end)
  end

  defp update_superhero_health(state, superhero_id) do
    # :rand.uniform(11) returns 1..11, so each fight costs 5..15 health points
    health_reduction = :rand.uniform(11) + 4
    timestamp = System.system_time(:second)

    Map.update!(state.data.superheroes, superhero_id, fn hero ->
      new_health = hero.health - health_reduction
      log_fight_result(hero.name, health_reduction, new_health, state.city_name)
      %{hero | health: max(new_health, 0), last_updated: timestamp}
    end)
  end

  defp superhero_defeated?(hero) do
    hero.health <= 0
  end

  defp remove_defeated_superhero(state, updated_heroes, superhero_id) do
    updated_heroes = Map.delete(updated_heroes, superhero_id)
    updated_tombstones = Map.put(state.data.tombstones, superhero_id, true)
    update_state(state, updated_heroes, updated_tombstones)
  end

  defp update_state(state, superheroes, tombstones) do
    %{state | data: %{superheroes: superheroes, tombstones: tombstones}}
  end

  defp log_fight_result(superhero_name, health_reduction, new_health, city_name) do
    if new_health <= 0 do
      Logger.info("Superhero #{superhero_name} lost the fight and was removed from #{city_name}")
    else
      Logger.info(
        "Superhero #{superhero_name} fought a villain and lost #{health_reduction} health points. Remaining health: #{new_health}"
      )
    end
  end
end
- schedule_fight: Schedules a single :fight_villain message to the process itself after @fight_interval (five seconds).
- handle_info(:fight_villain, state): Runs one fight via fight_villain, then schedules the next :fight_villain message, so a fight happens roughly every five seconds.
- fight_villain: Retrieves the city’s patrolling superheroes and picks one at random to fight. The hero loses between 5 and 15 health points; if their health drops to zero, they are removed from superheroes and recorded in tombstones, which marks them as dead (:assign_superhero refuses to assign a hero found in tombstones).
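The fight arithmetic is easier to check outside the timer loop. In this hypothetical sketch (Sketch.Fight is not part of the post), the damage roll matches the post's :rand.uniform(11) + 4, and apply_damage/3 takes the damage as an argument so its outcome is deterministic. A hero whose health reaches zero is moved from superheroes to tombstones, as remove_defeated_superhero/3 does.

```elixir
defmodule Sketch.Fight do
  # Same roll as the post: :rand.uniform(11) is 1..11, so damage is 5..15.
  def roll_damage, do: :rand.uniform(11) + 4

  # Hypothetical helper: apply `damage` to one hero. A hero at 0 health is
  # removed from superheroes and recorded in tombstones.
  def apply_damage(data, superhero_id, damage) do
    hero = Map.fetch!(data.superheroes, superhero_id)
    new_health = max(hero.health - damage, 0)

    if new_health == 0 do
      %{
        superheroes: Map.delete(data.superheroes, superhero_id),
        tombstones: Map.put(data.tombstones, superhero_id, true)
      }
    else
      %{data | superheroes: Map.put(data.superheroes, superhero_id, %{hero | health: new_health})}
    end
  end
end

rolls = for _ <- 1..2_000, do: Sketch.Fight.roll_damage()

data = %{superheroes: %{1 => %{name: "Batman", health: 12}}, tombstones: %{}}
after_first = Sketch.Fight.apply_damage(data, 1, 5)
after_second = Sketch.Fight.apply_damage(after_first, 1, 15)
```

Passing the damage in, rather than calling :rand inside, is a design choice made here for testability; the GenServer can still call roll_damage/0 and hand the result to apply_damage/3.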
Bombing the City
Another process can bomb the city by casting a :bomb_city message to it. The handler calls exit(:boom), which crashes the process immediately; the supervisor then restarts it from scratch with a clean initial state. As a result, all data held by the city is lost.
  # handle_cast/2 receives only the message and the state (there is no `from` argument)
  def handle_cast(:bomb_city, state) do
    Logger.info("#{state.city_name} is being bombed!")
    exit(:boom)
  end
Note that a restarted process does not resume from its last known good state: the supervisor simply calls start_link/1 again, which builds the default state. If recovering earlier state is needed, it must be handled outside the supervision and restart strategy.
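The crash-and-reset behavior can be reproduced with a small supervised GenServer. In this sketch, Sketch.BombableCity and Sketch.Wait are hypothetical and the state is reduced to a hero counter; the :bomb_city handler uses the same exit(:boom) as the post.

```elixir
defmodule Sketch.BombableCity do
  use GenServer

  def start_link(_arg), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  # Every (re)start begins from this default state.
  @impl true
  def init(:ok), do: {:ok, %{heroes: 0}}

  @impl true
  def handle_cast(:add_hero, state), do: {:noreply, %{state | heroes: state.heroes + 1}}
  # Crash on purpose, exactly like the post's handler.
  def handle_cast(:bomb_city, _state), do: exit(:boom)

  @impl true
  def handle_call(:heroes, _from, state), do: {:reply, state.heroes, state}
end

defmodule Sketch.Wait do
  # Poll until a new process is registered under `name` (nil if it never appears).
  def for_new_pid(name, old, attempts \\ 50)
  def for_new_pid(_name, _old, 0), do: nil

  def for_new_pid(name, old, attempts) do
    case Process.whereis(name) do
      pid when is_pid(pid) and pid != old ->
        pid

      _ ->
        Process.sleep(10)
        for_new_pid(name, old, attempts - 1)
    end
  end
end

{:ok, _sup} = Supervisor.start_link([Sketch.BombableCity], strategy: :one_for_one)

GenServer.cast(Sketch.BombableCity, :add_hero)
GenServer.cast(Sketch.BombableCity, :add_hero)
heroes_before = GenServer.call(Sketch.BombableCity, :heroes)

old_pid = Process.whereis(Sketch.BombableCity)
ref = Process.monitor(old_pid)
GenServer.cast(Sketch.BombableCity, :bomb_city)

receive do
  {:DOWN, ^ref, :process, ^old_pid, _reason} -> :ok
end

new_pid = Sketch.Wait.for_new_pid(Sketch.BombableCity, old_pid)
heroes_after = GenServer.call(Sketch.BombableCity, :heroes)
```

heroes_before is 2 and heroes_after is 0: the restarted process has a new pid and its state comes from init/1, not from the process that crashed.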
Keeping Data Up to Date
A City process doesn’t hold only its own heroes: it holds the full superhero dataset, including heroes located in other cities, and extracts what it needs by filtering on location (as get_superheroes_in_city and get_patrolling_superheroes do). Its state is updated from external sources through :assign_superhero and :update_superheroes. In this way, the City process operates autonomously, indifferent to events in, or even the existence of, other cities.
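One way an external process could keep cities in sync using only these messages is sketched below. Everything here is hypothetical: Sketch.MiniCity is a cut-down city that replaces converge_superhero with a simplified merge (tombstones are unioned, incoming heroes overwrite local ones, tombstoned heroes are dropped), and Sketch.Gossip.push/2 is an illustrative helper, not part of the series.

```elixir
defmodule Sketch.MiniCity do
  use GenServer

  def start_link({city_name, data}) do
    GenServer.start_link(__MODULE__, %{city_name: city_name, data: data},
      name: {:global, String.to_atom(city_name)}
    )
  end

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call(:get_superheroes, _from, state), do: {:reply, state.data, state}

  @impl true
  def handle_cast({:update_superheroes, incoming}, state) do
    # Simplified stand-in for converge_superhero/2 (no timestamps, incoming wins).
    tombstones = Map.merge(state.data.tombstones, incoming.tombstones)

    heroes =
      state.data.superheroes
      |> Map.merge(incoming.superheroes)
      |> Map.drop(Map.keys(tombstones))

    {:noreply, %{state | data: %{superheroes: heroes, tombstones: tombstones}}}
  end
end

defmodule Sketch.Gossip do
  # Hypothetical helper: read one city's full dataset and push it into another.
  def push(from_city, to_city) do
    data = GenServer.call({:global, from_city}, :get_superheroes)
    GenServer.cast({:global, to_city}, {:update_superheroes, data})
  end
end

empty = %{superheroes: %{}, tombstones: %{}}
{:ok, _} = Sketch.MiniCity.start_link({"gotham", empty})

{:ok, _} =
  Sketch.MiniCity.start_link(
    {"metropolis", %{superheroes: %{1 => %{name: "Superman", location: "metropolis"}}, tombstones: %{}}}
  )

:ok = Sketch.Gossip.push(:metropolis, :gotham)
gotham_data = GenServer.call({:global, :gotham}, :get_superheroes)
gotham_local = for {id, hero} <- gotham_data.superheroes, hero.location == "gotham", do: id
```

After the push, Gotham holds Metropolis's hero in its data but still finds no local heroes when it filters by location, which is the "hold everything, extract what you need" approach described above. Neither city had to know about the other; only the external pusher did.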
Conclusion
In the world of distributed systems, each City process stands as an island. Cities are self-sufficient, managing their state and duties without entanglement. When a city falls, it is a local event, not a system-wide catastrophe. On unexpected termination, the city’s supervisor resurrects it with minimal disruption. These cities don’t need to know about each other to function; CRDT merges reconcile their state whenever updates arrive. The system doesn’t just survive failures; it expects and embraces them.