Elixir Cluster Series

  1. Leveraging CRDTs for eventual consistency
  2. Elixir: Resilient distributed systems
  3. Elixir: Running a cluster of nodes
  4. Elixir: Running a cluster of dynamic nodes
  5. Elixir and Mnesia: Running a cluster with state
  6. Elixir and Raft: Running a cluster with state

Resilient distributed systems are engineered to function continuously, even amidst component failures or unexpected disruptions. One effective approach involves the leaderless deployment of independent, self-sufficient processes. In this framework, each process operates as a standalone unit, fully capable of managing its own state and executing its responsibilities autonomously.

Each City process is designed to be fault tolerant: isolation prevents failures from spreading, autonomy simplifies recovery, and minimal coordination between nodes avoids deadlock and contention.


City GenServer

defmodule Location.City do
  use GenServer
  require Logger

  @patrol_interval 5_000
  @fight_interval 5_000

  def start_link(city_name) do
    GenServer.start_link(
      __MODULE__,
      %{city_name: city_name, data: %{superheroes: %{}, tombstones: %{}}},
      name: {:global, String.to_atom(city_name)}
    )
  end
end

In this module, Location.City, each city is represented as a GenServer process. The start_link/1 function starts the process under a name derived from city_name. Registering it as {:global, String.to_atom(city_name)} makes that name visible across the whole cluster, so any node can reach the city's GenServer by name. The process starts with a default state: the city's name plus two empty maps, superheroes and tombstones. Note that a production system would likely use a more sophisticated strategy for discovering processes, but global registration is a reasonable choice in this context.
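Global registration can be tried out on a single node. The sketch below uses a hypothetical GlobalCitySketch module, a stripped-down stand-in for Location.City that registers itself the same way, and shows two equivalent ways to reach it: resolving the pid with :global.whereis_name/1, or passing the {:global, name} tuple straight to GenServer.call/2. It assumes a single local node; in a connected cluster the same calls work from any node.

```elixir
defmodule GlobalCitySketch do
  # Hypothetical stand-in for Location.City that only stores its name.
  # It is registered exactly like Location.City.start_link/1 does it.
  use GenServer

  def start_link(city_name) do
    GenServer.start_link(__MODULE__, city_name, name: {:global, String.to_atom(city_name)})
  end

  @impl true
  def init(city_name), do: {:ok, city_name}

  @impl true
  def handle_call(:city_name, _from, city_name), do: {:reply, city_name, city_name}
end

{:ok, pid} = GlobalCitySketch.start_link("gotham")

# Resolve the globally registered name to a pid...
^pid = :global.whereis_name(:gotham)

# ...or address the process through the {:global, name} tuple directly.
"gotham" = GenServer.call({:global, :gotham}, :city_name)
```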

Supervision

defmodule Location.Application do
  use Application

  @impl true
  def start(_type, _args) do
    # Fail fast with a clear error if CITY_NAME is not set.
    city_name = System.fetch_env!("CITY_NAME")

    children = [
      {Location.City, city_name}
    ]

    opts = [
      strategy: :one_for_one,
      max_restarts: 3,
      max_seconds: 5,
      name: Location.Supervisor
    ]

    Supervisor.start_link(children, opts)
  end
end
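  • Strategy: With :one_for_one, the supervisor restarts only the child that crashed, without affecting its siblings.

  • Restart Intensity: If more than three restarts occur within a five-second window, the supervisor gives up: it terminates its children and then exits itself, escalating the failure upward (for the top-level Location.Supervisor, this stops the application). This keeps a process stuck in a crash loop from destabilizing the system. These values match Elixir's defaults, so setting them here mainly makes the policy explicit.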
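To see the restart intensity in action, the hypothetical RestartIntensitySketch module below supervises a throwaway Agent with the same limits as Location.Supervisor and kills it repeatedly until the supervisor gives up. Only the standard library is used; the worker name :intensity_worker is made up for the demo.

```elixir
defmodule RestartIntensitySketch do
  # Hypothetical demo: a one_for_one supervisor with the same limits as
  # Location.Supervisor (at most 3 restarts within 5 seconds).
  @worker :intensity_worker

  def run do
    # Trap exits so the supervisor's shutdown arrives as a message
    # instead of taking this process down with it.
    Process.flag(:trap_exit, true)

    children = [
      %{id: @worker, start: {Agent, :start_link, [fn -> :ok end, [name: @worker]]}}
    ]

    {:ok, sup} =
      Supervisor.start_link(children, strategy: :one_for_one, max_restarts: 3, max_seconds: 5)

    crash_until_supervisor_exits(sup, 0)
  end

  # Kill the worker repeatedly, counting kills until the supervisor exits.
  defp crash_until_supervisor_exits(sup, kills) do
    Process.exit(await_worker(), :kill)
    kills = kills + 1

    receive do
      {:EXIT, ^sup, reason} -> {kills, reason}
    after
      100 -> crash_until_supervisor_exits(sup, kills)
    end
  end

  # Poll until the supervisor has (re)started and registered the worker.
  defp await_worker do
    case Process.whereis(@worker) do
      nil ->
        Process.sleep(5)
        await_worker()

      pid ->
        pid
    end
  end
end
```

Calling RestartIntensitySketch.run() should return {4, :shutdown}: the first three crashes are restarted, and the fourth exceeds the limit, so the supervisor exits with reason :shutdown. Expect crash reports in the log along the way.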

Send and Receive Superhero State

  def handle_call(:get_superheroes, _from, state) do
    {:reply, state.data, state}
  end

  def handle_cast({:update_superheroes, updated_data}, state) do
    Logger.info("Updating superheroes with new data for #{state.city_name}")

    combined_data = converge_superhero(state.data, updated_data)

    {:noreply, update_state(state, combined_data.superheroes, combined_data.tombstones)}
  end
  • :get_superheroes: A synchronous call that replies with the city's full CRDT data, both superheroes and tombstones.

  • :update_superheroes: An asynchronous cast that merges incoming data into the city's current state using the converge_superhero CRDT function. Because a CRDT merge is commutative, associative, and idempotent, cities that exchange updates converge on the same data regardless of the order in which, or the number of times, those updates arrive.

See the Leveraging CRDTs for eventual consistency blog post for more information.
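For readers starting here, the sketch below is a hypothetical stand-in for converge_superhero/2, not the implementation from part 1. It assumes each superhero map carries a :last_updated timestamp, resolves conflicts with last-writer-wins, treats tombstones as a grow-only set, and drops any superhero that has a tombstone. Comparing {timestamp, hero} tuples gives a deterministic tie-break, which keeps the merge commutative.

```elixir
defmodule ConvergeSketch do
  # Hypothetical stand-in for converge_superhero/2. Assumed rules:
  # last-writer-wins per superhero on :last_updated, tombstones form a
  # grow-only set, and tombstoned superheroes are removed.
  def converge(local, incoming) do
    tombstones = Map.merge(local.tombstones, incoming.tombstones)

    superheroes =
      local.superheroes
      |> Map.merge(incoming.superheroes, fn _id, a, b ->
        # Compare {timestamp, hero} so ties break the same way on every node.
        {_ts, winner} = max({a.last_updated, a}, {b.last_updated, b})
        winner
      end)
      |> Map.drop(Map.keys(tombstones))

    %{superheroes: superheroes, tombstones: tombstones}
  end
end
```

Merging in either order, or merging a state with itself, yields the same result, which is what lets cities apply updates without coordinating.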

Assign a Superhero

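  def handle_call({:assign_superhero, incoming_data, superhero_id}, _from, state) do
    timestamp = System.system_time(:second)

    # Merge the caller's view of the world into ours before deciding anything.
    combined_data = converge_superhero(state.data, incoming_data)

    cond do
      Map.has_key?(combined_data.tombstones, superhero_id) ->
        Logger.warning("Attempt to assign dead superhero with ID #{superhero_id}")
        # A handle_call must reply; returning :noreply here would leave the
        # caller blocked until GenServer.call/3 times out.
        {:reply, {:error, :superhero_dead}, state}

      not Map.has_key?(combined_data.superheroes, superhero_id) ->
        # Without this check, reading .health on a missing hero would crash.
        Logger.warning("Attempt to assign unknown superhero with ID #{superhero_id}")
        {:reply, {:error, :unknown_superhero}, state}

      true ->
        Logger.info("Assigning superhero with ID #{superhero_id} to #{state.city_name}")

        updated_hero = Map.fetch!(combined_data.superheroes, superhero_id)

        # A 20-point health boost, capped at 100.
        new_health = min(updated_hero.health + 20, 100)

        superheroes_updated =
          Map.put(
            combined_data.superheroes,
            superhero_id,
            Map.merge(updated_hero, %{
              location: state.city_name,
              last_updated: timestamp,
              is_patrolling: false,
              health: new_health
            })
          )

        new_state = update_state(state, superheroes_updated, combined_data.tombstones)

        {:reply, new_state.data, new_state}
    end
  end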
  • :assign_superhero: This handler takes incoming state data and a superhero_id. It first merges the incoming data with the city's current state. If the superhero is not marked as deceased in the tombstones, it assigns the superhero to the city, takes it off patrol (is_patrolling: false), and grants a 20-point health boost, capped at 100. Otherwise, the assignment is refused and a warning is logged.
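The assignment rules are easier to test once they are pulled out of the GenServer. AssignSketch.assign/4 below is a hypothetical pure function, not part of the original code, that applies the same rules to a superhero/tombstone map. It assumes each superhero map has a :health key, and it returns error tuples for dead or unknown superheroes instead of logging.

```elixir
defmodule AssignSketch do
  # Hypothetical pure version of the :assign_superhero rules, separated
  # from the GenServer so it can be tested without starting a process.
  @boost 20
  @max_health 100

  def assign(%{superheroes: heroes, tombstones: tombstones} = data, id, city, now) do
    cond do
      Map.has_key?(tombstones, id) ->
        {:error, :superhero_dead}

      not Map.has_key?(heroes, id) ->
        {:error, :unknown_superhero}

      true ->
        hero = Map.fetch!(heroes, id)

        updated =
          Map.merge(hero, %{
            location: city,
            last_updated: now,
            is_patrolling: false,
            # The boost is capped so health never exceeds 100.
            health: min(hero.health + @boost, @max_health)
          })

        {:ok, %{data | superheroes: Map.put(heroes, id, updated)}}
    end
  end
end
```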

Sending on Patrol

GenServers behave like actors, and the City actor does more than react to messages: it actively organizes its own patrols.

  @patrol_interval 5_000

  @impl true
  def init(state) do
    schedule_patrol()
    {:ok, state}
  end

  @impl true
  def handle_info(:set_patrol, state) do
    schedule_patrol()
    {:noreply, state |> set_superhero_patrolling()}
  end

  defp schedule_patrol do
    Process.send_after(self(), :set_patrol, @patrol_interval)
  end

  defp set_superhero_patrolling(state) do
    city_superheroes = get_superheroes_in_city(state)

    if Enum.empty?(city_superheroes) do
      Logger.info("No superheroes available to patrol in #{state.city_name}")
      state
    else
      random_id = Enum.random(city_superheroes)
      updated_heroes = set_hero_patrolling(state, random_id)

      superhero_name = updated_heroes[random_id].name

      Logger.info("Superhero #{superhero_name} is now patrolling in #{state.city_name}")
      update_state(state, updated_heroes, state.data.tombstones)
    end
  end

  defp get_superheroes_in_city(state) do
    state.data.superheroes
    |> Enum.filter(fn {_id, hero} -> hero.location == state.city_name end)
    |> Enum.map(fn {id, _hero} -> id end)
  end

  defp set_hero_patrolling(state, superhero_id) do
    timestamp = System.system_time(:second)

    Map.update!(state.data.superheroes, superhero_id, fn hero ->
      %{hero | is_patrolling: true, last_updated: timestamp}
    end)
  end
  • schedule_patrol: This function uses Process.send_after/3 to send a single :set_patrol message to the process itself after @patrol_interval (five seconds).

  • handle_info(:set_patrol, state): This callback schedules the next :set_patrol message, which turns the one-shot timer into a recurring five-second loop, and then invokes set_superhero_patrolling to handle patrol assignments.

  • set_superhero_patrolling: This function retrieves all superheroes located in the city, randomly selects one to patrol, and logs the action. Marking a superhero as patrolling is idempotent: selecting a hero who is already patrolling leaves is_patrolling set to true and only refreshes last_updated.
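The scheduling pattern on its own looks like this. TickerSketch is a hypothetical minimal GenServer that re-arms a one-shot Process.send_after/3 timer from its handle_info/2 callback, the same way the City re-arms :set_patrol, and counts how many ticks it has handled.

```elixir
defmodule TickerSketch do
  # Hypothetical minimal version of the schedule_patrol/handle_info loop.
  use GenServer

  def start_link(interval_ms), do: GenServer.start_link(__MODULE__, interval_ms)
  def ticks(pid), do: GenServer.call(pid, :ticks)

  @impl true
  def init(interval_ms) do
    schedule(interval_ms)
    {:ok, %{interval_ms: interval_ms, ticks: 0}}
  end

  @impl true
  def handle_info(:tick, state) do
    # Re-arm the timer, just as handle_info(:set_patrol, ...) does.
    schedule(state.interval_ms)
    {:noreply, %{state | ticks: state.ticks + 1}}
  end

  @impl true
  def handle_call(:ticks, _from, state), do: {:reply, state.ticks, state}

  defp schedule(interval_ms), do: Process.send_after(self(), :tick, interval_ms)
end
```

Because a new timer is armed only when the previous tick is handled, only one timer is outstanding at a time. With :timer.send_interval/2, by contrast, messages keep arriving at a fixed rate, so a handler slower than the interval would let them pile up in the mailbox.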

Fighting Villains

The City actor also pits its patrolling superheroes against villains.

  @fight_interval 5_000

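  @impl true
  def init(state) do
    # A module has a single init/1, so it schedules both patrols and fights.
    schedule_patrol()
    schedule_fight()
    {:ok, state}
  end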

  @impl true
  def handle_info(:fight_villain, state) do
    state = fight_villain(state)
    schedule_fight()
    {:noreply, state}
  end

  defp schedule_fight do
    Process.send_after(self(), :fight_villain, @fight_interval)
  end

  defp fight_villain(state) do
    patrolling_superheroes = get_patrolling_superheroes(state)

    if Enum.empty?(patrolling_superheroes) do
      Logger.info("No patrolling superheroes available to fight in #{state.city_name}")
      state
    else
      random_id = Enum.random(patrolling_superheroes)
      updated_heroes = update_superhero_health(state, random_id)

      if superhero_defeated?(updated_heroes[random_id]) do
        remove_defeated_superhero(state, updated_heroes, random_id)
      else
        update_state(state, updated_heroes, state.data.tombstones)
      end
    end
  end

  defp get_patrolling_superheroes(state) do
    state.data.superheroes
    |> Enum.filter(fn {_id, hero} ->
      hero.location == state.city_name and hero.is_patrolling
    end)
    |> Enum.map(fn {id, _hero} -> id end)
  end

  defp update_superhero_health(state, superhero_id) do
    # :rand.uniform(11) returns 1..11, so each fight costs 5 to 15 health.
    health_reduction = :rand.uniform(11) + 4
    timestamp = System.system_time(:second)

    Map.update!(state.data.superheroes, superhero_id, fn hero ->
      new_health = hero.health - health_reduction
      log_fight_result(hero.name, health_reduction, new_health, state.city_name)
      %{hero | health: max(new_health, 0), last_updated: timestamp}
    end)
  end

  defp superhero_defeated?(hero) do
    hero.health <= 0
  end

  defp remove_defeated_superhero(state, updated_heroes, superhero_id) do
    updated_heroes = Map.delete(updated_heroes, superhero_id)
    updated_tombstones = Map.put(state.data.tombstones, superhero_id, true)

    update_state(state, updated_heroes, updated_tombstones)
  end

  defp update_state(state, superheroes, tombstones) do
    %{state | data: %{superheroes: superheroes, tombstones: tombstones}}
  end

  defp log_fight_result(superhero_name, health_reduction, new_health, city_name) do
    if new_health <= 0 do
      Logger.info("Superhero #{superhero_name} lost the fight and was removed from #{city_name}")
    else
      Logger.info(
        "Superhero #{superhero_name} fought a villain and lost #{health_reduction} health points. Remaining health: #{new_health}"
      )
    end
  end
end
  • schedule_fight: This function schedules a single :fight_villain message after @fight_interval (five seconds); because the handler reschedules it, a fight runs every five seconds.

  • handle_info(:fight_villain, state): This callback runs one round of fighting via fight_villain/1, keeps the resulting state, and schedules the next battle.

  • fight_villain: Selects a random patrolling superhero in the city and deals 5 to 15 points of damage. If the superhero's health drops to zero, it is removed from superheroes and its ID is recorded in tombstones.
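The fight rules can be checked in isolation as well. FightSketch is a hypothetical pure version of one round: roll_damage/0 reproduces the :rand.uniform(11) + 4 expression, and resolve/4 takes the damage as an argument so the outcome is deterministic. The :health and :last_updated keys are assumed from the fields the City code reads.

```elixir
defmodule FightSketch do
  # Hypothetical pure version of one fight, mirroring
  # update_superhero_health/2 and remove_defeated_superhero/3.

  # :rand.uniform(11) returns an integer in 1..11, so this yields 5..15.
  def roll_damage, do: :rand.uniform(11) + 4

  def resolve(%{superheroes: heroes, tombstones: tombstones} = data, id, damage, now) do
    hero = Map.fetch!(heroes, id)
    health = max(hero.health - damage, 0)

    if health == 0 do
      # Defeated: delete the hero and record a tombstone for the CRDT merge.
      %{data | superheroes: Map.delete(heroes, id), tombstones: Map.put(tombstones, id, true)}
    else
      updated = %{hero | health: health, last_updated: now}
      %{data | superheroes: Map.put(heroes, id, updated)}
    end
  end
end
```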

Bombing the City

Another process can bomb the city by sending it a :bomb_city cast. The City process exits immediately, and its supervisor restarts it from scratch with a clean state, so all data held by the city is lost.

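  # handle_cast/2 receives only the message and the state (there is no _from).
  def handle_cast(:bomb_city, state) do
    Logger.info("#{state.city_name} is being bombed!")
    # An abnormal exit crashes the process; the supervisor then restarts it
    # with the initial state built by start_link/1.
    exit(:boom)
  end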

Note that a restarted process does not resume from its last known good state: the supervisor calls start_link/1 again with the original arguments, so the city starts empty. If state must be recovered, that logic has to live outside the supervision and restart strategy.
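The effect of a bombing can be reproduced with a hypothetical BombSketch GenServer that keeps a list of hero names in memory and exits with :boom on a :bomb cast, supervised with :one_for_one like a City. BombDemo.run/0, also hypothetical, adds a hero, bombs the process, waits for the supervisor to start a replacement, and reads the state again.

```elixir
defmodule BombSketch do
  # Hypothetical stand-in for a City: keeps a list of hero names in memory.
  use GenServer

  def start_link(_arg), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)

  @impl true
  def init(heroes), do: {:ok, heroes}

  @impl true
  def handle_call(:heroes, _from, heroes), do: {:reply, heroes, heroes}

  @impl true
  def handle_cast({:add, hero}, heroes), do: {:noreply, [hero | heroes]}
  # Mirrors :bomb_city: an abnormal exit that the supervisor will restart.
  def handle_cast(:bomb, _heroes), do: exit(:boom)
end

defmodule BombDemo do
  def run do
    {:ok, _sup} = Supervisor.start_link([BombSketch], strategy: :one_for_one)

    GenServer.cast(BombSketch, {:add, "Batman"})
    # Messages from one sender arrive in order, so the cast is applied first.
    ["Batman"] = GenServer.call(BombSketch, :heroes)

    old_pid = Process.whereis(BombSketch)
    GenServer.cast(BombSketch, :bomb)
    new_pid = await_restart(old_pid)

    {old_pid != new_pid, GenServer.call(BombSketch, :heroes)}
  end

  # Poll until a new BombSketch process has been registered.
  defp await_restart(old_pid) do
    case Process.whereis(BombSketch) do
      pid when is_pid(pid) and pid != old_pid ->
        pid

      _ ->
        Process.sleep(5)
        await_restart(old_pid)
    end
  end
end
```

BombDemo.run() returns {true, []}: the replacement has a new pid, and the hero added before the bombing is gone.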

Keeping Data Up to Date

The City process does not hold only its own superheroes; it holds the full superhero and tombstone data for the whole system and filters it (for example, by location) as needed. That data is updated from external sources through :assign_superhero and :update_superheroes. In this way, the City process operates autonomously, indifferent to events in other cities, or even to their existence.

Conclusion

In the world of distributed systems, each City process stands as an island: self-sufficient, managing its own state and duties without entanglement. When a city falls, it is a local event, not a system-wide catastrophe. On unexpected termination, the city's supervisor resurrects it with minimal disruption. The cities don't need to know about each other to function; their data converges through CRDT merges. The system doesn't just survive failures; it expects and embraces them.