Software designed to solve real-life problems must effectively address the issue of concurrency, where the order of operations cannot be trusted due to the unpredictability of timing, distance between interactions, and the potential for missed or delayed messages. To prevent corruption of data or application state, it is crucial to manage concurrency in a way that does not assume a particular order of operations.

LiveView keeps the client and server synchronized over a persistent WebSocket connection, sending diffs so that only the parts of the page that changed are updated. It acknowledges messages to support reliable delivery and reconciles state when the client reconnects. In addition, LiveView monitors the connection status and queues changes made on the client side while offline, which helps keep state consistent under unstable network conditions.

Handling Multiple LiveViews

A webpage can host multiple LiveViews. Each one runs over its own channel, and Phoenix Channels multiplex those channels over the connection so that data and messages are routed to the right LiveView. This isolates the LiveViews from one another, which improves resilience.

Process Efficiency

Programming environments like Java manage WebSocket connections using threads, often leading to high resource usage, scalability challenges, and complex concurrency management. In contrast, LiveView leverages Erlang’s lightweight process model, which is inherently designed for minimal resource consumption and efficient scalability. Erlang’s architecture effectively manages concurrency with isolated processes, thus enhancing system reliability and performance. Unlike Java, where reducing WebSocket connections often requires intricate client-side logic, Elixir’s streamlined approach simplifies client application development, reducing complexity and improving maintainability.
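
To give a rough feel for how cheap isolated processes are, here is a minimal sketch that spawns ten thousand of them, sends each one a message, and collects the replies. The ProcessDemo module and its run/1 function are hypothetical names used only for illustration; they are not part of the GameApp or of LiveView.

```elixir
defmodule ProcessDemo do
  # Spawns `count` isolated processes, asks each one for its number,
  # and returns the sum of the replies.
  def run(count) do
    parent = self()

    pids =
      for n <- 1..count do
        spawn(fn ->
          # Each process waits for exactly one request, replies, then exits.
          receive do
            {:report, ref} -> send(parent, {ref, n})
          end
        end)
      end

    ref = make_ref()
    Enum.each(pids, &send(&1, {:report, ref}))

    # Collect one reply per process; arrival order does not matter for a sum.
    Enum.reduce(1..count, 0, fn _, acc ->
      receive do
        {^ref, n} -> acc + n
      end
    end)
  end
end

IO.inspect(ProcessDemo.run(10_000), label: "sum of replies")
```

Each process shares nothing with the others and communicates only by message passing, which is the same model a LiveView process relies on.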

Passing Data Between Views

In the GameApp, each browser tab that a user opens to view the list of players is backed by its own LiveView process, which behaves like a gen_server. Each tab also renders a form that reports saves to its parent list via message passing. If the form is a LiveComponent, as the FormComponent name used later suggests, it shares its parent's process and sends the message to that same process; a nested LiveView would instead run in a process of its own. Either way, a form only ever notifies the list in its own tab, and there is no direct inter-tab communication. To ensure the lists across different tabs remain synchronized, an independent process is required to relay changes between them.

PubSub

Conceptually, a PubSub can be understood as a gen_server, which ensures atomic and consistent updates to its internal state. (Phoenix.PubSub itself tracks local subscribers in a Registry rather than in a single process's state, but the gen_server model is a useful way to reason about it.) This state contains an entry for each topic, including the list of subscribers associated with it. When a process subscribes to a topic, the PubSub updates its state to record the new subscription. This update is handled by the handle_call callback, which processes requests synchronously, so the subscription is recorded before the caller continues.

For broadcasting messages, the PubSub uses the handle_cast callback. This function is triggered when a new message is published to a topic. It retrieves the list of subscribers from its state and sends out the message in a non-blocking, asynchronous manner. Subscribers receive these messages through the handle_info callback, which they use to update their internal states based on the message content.

If the process fails, the gen_server's place in a supervision tree comes into play. Its supervisor can restart it, which minimizes downtime and quickly restores message distribution. A restarted process begins from its initial state, so subscriptions have to be re-established rather than recovered automatically.
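
The model described above can be sketched as a small gen_server. This is a toy illustration under stated assumptions, not Phoenix.PubSub's implementation: MiniPubSub and its subscribe/2 and broadcast/3 functions are hypothetical names, and the sketch ignores concerns such as dead subscribers and multi-node delivery.

```elixir
defmodule MiniPubSub do
  use GenServer

  # Client API (hypothetical names, not Phoenix.PubSub's API).
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, %{}, opts)

  # Synchronous: returns only after the subscription is recorded.
  def subscribe(server, topic), do: GenServer.call(server, {:subscribe, topic, self()})

  # Asynchronous: returns immediately; delivery happens inside the server.
  def broadcast(server, topic, message), do: GenServer.cast(server, {:broadcast, topic, message})

  # Server callbacks. State is a map of topic => MapSet of subscriber pids.
  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:subscribe, topic, pid}, _from, state) do
    subscribers = state |> Map.get(topic, MapSet.new()) |> MapSet.put(pid)
    {:reply, :ok, Map.put(state, topic, subscribers)}
  end

  @impl true
  def handle_cast({:broadcast, topic, message}, state) do
    # Send the message to every process subscribed to the topic.
    state
    |> Map.get(topic, MapSet.new())
    |> Enum.each(&send(&1, message))

    {:noreply, state}
  end
end

{:ok, pubsub} = MiniPubSub.start_link()
:ok = MiniPubSub.subscribe(pubsub, "update_player")
:ok = MiniPubSub.broadcast(pubsub, "update_player", {:delete_player, %{id: 1}})

receive do
  message -> IO.inspect(message, label: "received")
after
  1_000 -> IO.puts("no message received")
end
```

In a LiveView, the receive at the end is replaced by the handle_info callback, which is invoked for each message that arrives in the process's mailbox.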

Managing the update_player Topic

Subscribing to the update_player topic

Upon mounting, the GameAppWeb.PlayerLive.Index LiveView subscribes to the update_player topic. This subscription is handled through the Phoenix PubSub system, which automatically creates the topic if it does not already exist.

defmodule GameAppWeb.PlayerLive.Index do
  use GameAppWeb, :live_view

  alias GameApp.Accounts
  alias Phoenix.PubSub

  @update_player_pub_topic "update_player"

  def mount(_params, _session, socket) do
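    # Subscribe only once the WebSocket is connected; mount/3 also runs for the initial static render.
    if connected?(socket), do: PubSub.subscribe(GameApp.PubSub, @update_player_pub_topic)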
    {:ok, stream(socket, :players, Accounts.list_players())}
  end
end

Broadcasting to the update_player topic

To update other processes about changes, the broadcast_update/2 function sends a message to the update_player topic. This message includes a tuple describing the action taken on a player.

defp broadcast_update(action, player) do
  Phoenix.PubSub.broadcast(
    GameApp.PubSub,
    @update_player_pub_topic,
    {action, player}
  )
end

Handling Delete Events

A delete event in the handle_event/3 function processes three distinct scenarios: successful deletion of the player, failure to delete the player, and inability to find the player.

  1. Success: When the player is successfully deleted, a message is broadcast to the update_player topic. This message notifies all subscribed processes that a player has been removed, prompting them to update their internal states accordingly.

  2. Failure: Currently, if the deletion attempt fails, the function alerts the user with a flash message, indicating that the deletion could not be processed correctly. This method makes the user aware of the failure but does not resolve the underlying issue. Future refactors should look to identify and resolve the error, or “Let it crash”, halting the process and letting the supervisor recover.

  3. Player Not Found: The third code path addresses situations where the player cannot be found, likely because they have already been removed by another process. In the GameApp, deletion events are intentionally designed to be idempotent, which simplifies the inherent concurrency problem by allowing them to be replayed multiple times without adverse effects. If the player is not found, the process deliberately forwards the deletion message to the PubSub, ensuring consistent application behavior and state across all processes.

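def handle_event("delete", %{"id" => id}, socket) do
  case Accounts.get_player(id) do
    {:ok, player} ->
      case Accounts.delete_player(player) do
        {:ok, _} ->
          # Success: tell every subscriber that the player is gone.
          broadcast_update(:delete_player, player)
          {:noreply, socket}

        {:error, reason} ->
          # Failure: inspect/1 is used because the reason (for example, a changeset)
          # may not be a string and cannot be interpolated directly.
          {:noreply, put_flash(socket, :error, "Could not delete player: #{inspect(reason)}")}
      end

    {:none} ->
      # Not found: the delete is idempotent, so rebroadcast it anyway.
      # Assumes the Player schema struct is aliased in this module.
      broadcast_update(:delete_player, %Player{id: id})
      {:noreply, socket}
  end
end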

Subscribing to Delete Events

The process is subscribed to the update_player topic and must handle incoming messages efficiently. Messages originating from other processes are managed through the handle_info/2 function, which is tailored to process specific actions.

def handle_info({:delete_player, player}, socket) do
  {:noreply, stream_delete(socket, :players, player)}
end

This function handles the :delete_player action and updates the process’s internal state by removing the specified player from the list view. Because the delete_player action is idempotent, applying it more than once leaves the state the same as applying it once, which prevents inconsistencies even if the delete message is delivered multiple times.

Handling Update Events

When the form component successfully saves a player, it sends a message to its parent list process indicating the save success with the updated player. First, broadcast the update through the update_player topic:

def handle_info({GameAppWeb.PlayerLive.FormComponent, {:saved, player}}, socket) do
  broadcast_update(:update_player, player)
  {:noreply, socket}
end

Receiving and Handling Broadcasts

Because the list process subscribed to the update_player topic in the mount function, it already receives incoming update messages. We just need to match on the {:update_player, player} tuple and update the internal state with the updated player information.

def handle_info({:update_player, player}, socket) do
  {:noreply, stream_insert(socket, :players, player)}
end

Now, every time a player is updated in one list, all subscribed lists receive the change in near real time. By reducing the delay in these updates, we decrease the likelihood of conflicting changes made by different users to the same data.

Did We Solve the Concurrency Problem?

While synchronizing updates across all user interfaces reduces the likelihood of conflicting changes, it depends on the order of message delivery and does not resolve the concurrency problem. To address this, we need to introspect the current state of each player and update only if the incoming message represents a more recent state.
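
To make the ordering dependence concrete, here is a minimal sketch of two tabs that receive the same two updates in different orders. The naive_apply function and the plain maps standing in for players are hypothetical; they only mimic a handler that always keeps whichever message arrived last.

```elixir
# Naive handler: whatever message arrives last wins.
naive_apply = fn players, {:update_player, player} ->
  Map.put(players, player.id, player)
end

older = %{id: 1, name: "Ann", updated_at: ~N[2024-01-01 10:00:00]}
newer = %{id: 1, name: "Anne", updated_at: ~N[2024-01-01 10:05:00]}

# Tab A receives the updates in the order they were made.
tab_a =
  Enum.reduce([{:update_player, older}, {:update_player, newer}], %{}, &naive_apply.(&2, &1))

# Tab B receives the same updates in the opposite order.
tab_b =
  Enum.reduce([{:update_player, newer}, {:update_player, older}], %{}, &naive_apply.(&2, &1))

IO.inspect(tab_a[1].name, label: "tab A shows")
IO.inspect(tab_b[1].name, label: "tab B shows")
```

Both tabs saw every message, yet tab B ends up displaying the stale name. Comparing the updated_at values instead of trusting arrival order is what removes this dependence.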

First, we have a challenge with LiveStream, the LiveView mechanism that manages collections by sending only DOM diffs to the client. Stream contents are not kept in the process and cannot be inspected on the server, so if we need the current data, we have to save it in the process’s state ourselves.

Updating the mount Function

First, update the mount function to include a map of current player values:

def mount(_params, _session, socket) do
  if connected?(socket), do: PubSub.subscribe(GameApp.PubSub, @update_player_pub_topic)
  players = Accounts.list_players()
  players_map = Map.new(players, fn player -> {player.id, player} end)

  new_socket =
    socket
    |> assign(:players, players_map)
    |> stream(:players, players)

  {:ok, new_socket}
end

Add get_latest/2 logic

Next, create a helper function that returns the most recent update between two states.

  defp get_latest(existing_player, new_player) do
    if NaiveDateTime.compare(existing_player.updated_at, new_player.updated_at) == :gt do
      existing_player
    else
      new_player
    end
  end

Now, leverage get_latest/2 to ignore updates that are older than the current player data.

  def handle_info({:update_player, incoming_player}, socket) do
    players = socket.assigns.players

    case Map.get(players, incoming_player.id) do
      nil ->
        # Unknown player: take the incoming version as-is.
        updated_players = Map.put(players, incoming_player.id, incoming_player)

        new_socket =
          socket
          |> assign(:players, updated_players)
          |> stream_insert(:players, incoming_player)

        {:noreply, new_socket}

      existing_player ->
        # Known player: store the winner of the comparison, not necessarily the incoming one.
        updated_player = get_latest(existing_player, incoming_player)
        updated_players = Map.put(players, incoming_player.id, updated_player)

        new_socket =
          socket
          |> assign(:players, updated_players)
          |> stream_insert(:players, updated_player)

        {:noreply, new_socket}
    end
  end

When no player exists in the current state, the handler adds the incoming player. However, this poses a challenge: how do we distinguish between an update for a newly added player and a late update for a player that has already been deleted?

Conflict-Free Replicated Data Types (CRDTs)

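Conflict-Free Replicated Data Types (CRDTs) are data structures designed so that replicas can apply updates independently, and in any order, and still converge to the same state. That property makes them a helpful strategy for managing concurrency problems like this one.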

Delete-Free Set (DF-Set)

A Delete-Free Set (DF-Set) tracks hard-deleted players: once a player is removed, their identifier is added to the set and never taken out, so any later operation on that player can be recognized and ignored. Because the set only ever grows, it behaves like what the CRDT literature calls a grow-only set (G-Set).
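
As a minimal sketch of why a set that only supports additions is safe to replicate, the example below merges two copies with a set union. The Tombstones module and its functions are hypothetical names introduced for illustration; the GameApp code that follows uses a plain MapSet directly.

```elixir
defmodule Tombstones do
  # A set of deleted IDs whose only operations are "add" and "merge".
  def new, do: MapSet.new()
  def add(set, id), do: MapSet.put(set, id)

  # Merging is a set union, which is commutative, associative, and idempotent.
  def merge(a, b), do: MapSet.union(a, b)

  def deleted?(set, id), do: MapSet.member?(set, id)
end

# Two tabs have each observed a different subset of deletions.
tab_a = Tombstones.new() |> Tombstones.add(1) |> Tombstones.add(2)
tab_b = Tombstones.new() |> Tombstones.add(2) |> Tombstones.add(3)

merged = Tombstones.merge(tab_a, tab_b)

IO.inspect(Tombstones.merge(tab_b, tab_a) == merged, label: "order-independent")
IO.inspect(Tombstones.merge(merged, tab_a) == merged, label: "idempotent")
```

Because nothing is ever removed, two copies can never disagree about whether an ID they both know about is deleted; they can only differ in how many deletions each has seen so far.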

Storing Deleted Player IDs

First, update the mount function to include a set specifically dedicated to holding deleted player IDs:

def mount(_params, _session, socket) do
  if connected?(socket), do: PubSub.subscribe(GameApp.PubSub, @update_player_pub_topic)
  players = Accounts.list_players()
  players_map = Map.new(players, fn player -> {player.id, player} end)

  new_socket =
    socket
    |> assign(:deleted_player_ids, MapSet.new())
    |> assign(:players, players_map)
    |> stream(:players, players)

  {:ok, new_socket}
end

Handling Deletions

Next, update the deletion process so a deleted player is removed from the current player state, and their ID is added to the deleted_player_ids.

def handle_info({:delete_player, player}, socket) do
  updated_players = Map.delete(socket.assigns.players, player.id)
  deleted_players = MapSet.put(socket.assigns.deleted_player_ids, player.id)

  new_socket =
    socket
    |> assign(:players, updated_players)
    |> assign(:deleted_player_ids, deleted_players)
    |> stream_delete(:players, player)

  {:noreply, new_socket}
end

Guarding Against Updates for Deleted Players

Finally, add a guard condition to the update_player message handler so that updates for players known to be deleted are ignored.

def handle_info({:update_player, new_player}, socket) do
  players = socket.assigns.players
  deleted_player_ids = socket.assigns.deleted_player_ids

  if MapSet.member?(deleted_player_ids, new_player.id) do
    {:noreply, socket}
  else
    case Map.get(players, new_player.id) do
      nil ->
        updated_players = Map.put(players, new_player.id, new_player)
        new_socket =
          socket
          |> assign(:players, updated_players)
          |> stream_insert(:players, new_player)

        {:noreply, new_socket}

      existing_player ->
        updated_player = get_latest(existing_player, new_player)
        updated_players = Map.put(players, new_player.id, updated_player)
        new_socket =
          socket
          |> assign(:players, updated_players)
          |> stream_insert(:players, updated_player)

        {:noreply, new_socket}
    end
  end
end
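
The combined behavior of the two handlers can be checked outside of LiveView with a pure function over plain data. The sketch below is a simplified model, not the GameApp code: PlayerState and apply_message/2 are hypothetical names, and plain maps stand in for the socket assigns and the player structs.

```elixir
defmodule PlayerState do
  # Hypothetical pure model of the assigns:
  # %{players: %{id => player}, deleted: MapSet of deleted ids}
  def new, do: %{players: %{}, deleted: MapSet.new()}

  def apply_message(state, {:delete_player, %{id: id}}) do
    %{state | players: Map.delete(state.players, id), deleted: MapSet.put(state.deleted, id)}
  end

  def apply_message(state, {:update_player, player}) do
    if MapSet.member?(state.deleted, player.id) do
      # Updates for deleted players are ignored.
      state
    else
      latest =
        case Map.get(state.players, player.id) do
          nil -> player
          existing -> get_latest(existing, player)
        end

      %{state | players: Map.put(state.players, player.id, latest)}
    end
  end

  defp get_latest(existing, incoming) do
    if NaiveDateTime.compare(existing.updated_at, incoming.updated_at) == :gt,
      do: existing,
      else: incoming
  end
end

v1 = %{id: 1, name: "Ann", updated_at: ~N[2024-01-01 10:00:00]}
v2 = %{id: 1, name: "Anne", updated_at: ~N[2024-01-01 10:05:00]}
other = %{id: 2, name: "Bob", updated_at: ~N[2024-01-01 09:00:00]}

messages = [
  {:update_player, v1},
  {:update_player, v2},
  {:update_player, other},
  {:delete_player, %{id: 2}}
]

in_order = Enum.reduce(messages, PlayerState.new(), &PlayerState.apply_message(&2, &1))

reversed =
  Enum.reduce(Enum.reverse(messages), PlayerState.new(), &PlayerState.apply_message(&2, &1))

IO.inspect(in_order == reversed, label: "same state regardless of order")
```

One caveat of this sketch: two different versions with an identical updated_at would still be resolved by arrival order, so a real implementation would need a deterministic tiebreaker.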

Have We Finally Resolved Concurrency?

Although our approach mitigates several concurrency issues, it still relies on each process holding an accurate and complete set of deleted players. In the code above, every LiveView starts with an empty set at mount and records only the deletions it receives afterward, and the set is never pruned. To fully leverage the benefits of a DF-Set, we must ensure that this set is consistently synchronized across all instances and sessions, which requires careful consideration of performance impacts and synchronization strategies.