LiveView: Navigating concurrency in real-time applications
Software designed to solve real-life problems must effectively address the issue of concurrency, where the order of operations cannot be trusted due to the unpredictability of timing, distance between interactions, and the potential for missed or delayed messages. To prevent corruption of data or application state, it is crucial to manage concurrency in a way that does not assume a particular order of operations.
LiveView ensures synchronization between the client and server using a persistent WebSocket connection, which transmits diff packages to update changes and minimize discrepancies. It guarantees reliable message delivery through an acknowledgment system and implements state reconciliation to synchronize the current state upon reconnection. Also, LiveView continuously monitors the connection status and queues any changes made on the client side when offline, maintaining consistent state under unstable conditions.
Handling Multiple LiveViews
A webpage can host multiple LiveViews, each employing a unique channel within Phoenix’s Channels to multiplex and ensure data and messages are appropriately routed. This provides isolation between components, ensuring resilience.
Process Efficiency
Programming environments like Java manage WebSocket connections using threads, often leading to high resource usage, scalability challenges, and complex concurrency management. In contrast, LiveView leverages Erlang’s lightweight process model, which is inherently designed for minimal resource consumption and efficient scalability. Erlang’s architecture effectively manages concurrency with isolated processes, thus enhancing system reliability and performance. Unlike Java, where reducing WebSocket connections often requires intricate client-side logic, Elixir’s streamlined approach simplifies client application development, reducing complexity and improving maintainability.
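The scale difference is easy to demonstrate (a standalone snippet, unrelated to GameApp's code): spawning a hundred thousand BEAM processes completes in well under a second on commodity hardware, something no thread-per-connection model can match.

```elixir
# Spawn 100_000 lightweight processes; each sends a message back to the
# parent, which counts the replies. BEAM processes cost a few hundred
# words of memory each, so this is cheap.
parent = self()

for i <- 1..100_000 do
  spawn(fn -> send(parent, {:done, i}) end)
end

count =
  Enum.reduce(1..100_000, 0, fn _, acc ->
    receive do
      {:done, _} -> acc + 1
    end
  end)

IO.inspect(count)  # 100_000
```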
Passing Data Between Views
In the GameApp, each browser tab that a user opens to view a list of players is managed by a gen_server process. Additionally, each tab includes a child process that manages a form and communicates updates to its parent via message passing. Opening two browser tabs therefore results in four processes, and three tabs in six. These child processes communicate updates exclusively to their respective parent lists; there is no direct inter-tab communication. To keep the lists across different tabs synchronized, an independent process is required to manage a consistent state.
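This parent/child message flow can be sketched with plain processes. The module and message names below are illustrative, not GameApp's actual API; it is a deliberate simplification of the LiveView/LiveComponent relationship.

```elixir
# Minimal sketch of one tab: a parent "list" process and a child "form"
# process that reports updates to its parent via send/2.
defmodule TabSketch do
  # The parent loops forever, accumulating player names it receives.
  def parent_loop(players) do
    receive do
      {:saved, player} -> parent_loop([player | players])
      {:report, to} ->
        send(to, {:players, players})
        parent_loop(players)
    end
  end

  # The child only knows its parent's pid and forwards the saved value.
  def child_save(parent, player), do: send(parent, {:saved, player})
end

parent = spawn(fn -> TabSketch.parent_loop([]) end)
TabSketch.child_save(parent, "alice")
send(parent, {:report, self()})

receive do
  {:players, players} -> IO.inspect(players)  # ["alice"]
end
```

Note that nothing here links two tabs together: a second spawned parent would never hear about "alice", which is exactly the synchronization gap PubSub fills below.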
PubSub
Elixir’s PubSub extends gen_server behaviors, ensuring atomic and consistent updates to its internal state. This state contains an entry for each topic, including the list of subscribers associated with it. When a process subscribes to a topic, the PubSub updates its state to record the new subscription. This update is handled via the handle_call callback, a mechanism that processes requests synchronously, ensuring that state changes are reliable and immediate.
For broadcasting messages, the PubSub uses the handle_cast callback. This function is triggered when a new message is published to a topic: it retrieves the list of subscribers from its state and sends out the message in a non-blocking, asynchronous manner. Subscribers receive these messages through the handle_info callback, which they use to update their internal state based on the message content.
In case of a process failure, the gen_server’s built-in fault tolerance comes into play. The process can be restarted by its supervisor, which minimizes downtime and ensures a quick restoration of the PubSub to its prior state, maintaining the integrity and continuity of message distribution.
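The three callbacks described above can be seen together in a toy PubSub. This is a deliberately simplified sketch, not Phoenix.PubSub's real implementation (which uses process groups rather than a single map-holding server):

```elixir
# A toy PubSub mirroring the callbacks described above: synchronous
# subscription (handle_call), asynchronous broadcast (handle_cast),
# delivery to subscribers' mailboxes (received via handle_info in a
# GenServer subscriber, or plain receive here).
defmodule ToyPubSub do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, %{}, name: __MODULE__)

  # The caller blocks until the subscription is recorded.
  def subscribe(topic), do: GenServer.call(__MODULE__, {:subscribe, topic, self()})

  # The publisher never blocks.
  def broadcast(topic, message), do: GenServer.cast(__MODULE__, {:broadcast, topic, message})

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:subscribe, topic, pid}, _from, state) do
    {:reply, :ok, Map.update(state, topic, [pid], &[pid | &1])}
  end

  @impl true
  def handle_cast({:broadcast, topic, message}, state) do
    for pid <- Map.get(state, topic, []), do: send(pid, message)
    {:noreply, state}
  end
end

{:ok, _} = ToyPubSub.start_link([])
:ok = ToyPubSub.subscribe("update_player")
ToyPubSub.broadcast("update_player", {:update_player, %{id: 1}})

receive do
  {:update_player, player} -> IO.inspect(player)  # %{id: 1}
end
```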
Managing the update_player Topic
Subscribing to the update_player topic
Upon mounting, the GameAppWeb.PlayerLive.Index LiveView subscribes to the update_player topic. This subscription is handled through the Phoenix PubSub system, which automatically creates the topic if it does not already exist.
defmodule GameAppWeb.PlayerLive.Index do
  use GameAppWeb, :live_view

  alias GameApp.Accounts
  alias Phoenix.PubSub

  @update_player_pub_topic "update_player"

  def mount(_params, _session, socket) do
    PubSub.subscribe(GameApp.PubSub, @update_player_pub_topic)
    {:ok, stream(socket, :players, Accounts.list_players())}
  end
end
Broadcasting to the update_player topic
To update other processes about changes, the broadcast_update/2 function sends a message to the update_player topic. This message includes a tuple describing the action taken on a player.
defp broadcast_update(action, player) do
  Phoenix.PubSub.broadcast(
    GameApp.PubSub,
    @update_player_pub_topic,
    {action, player}
  )
end
Handling Delete Events
A delete event in the handle_event/3 function processes three distinct scenarios: successful deletion of the player, failure to delete the player, and inability to find the player.
- Success: When the player is successfully deleted, a message is broadcast to the update_player topic. This message notifies all subscribed processes that a player has been removed, prompting them to update their internal states accordingly.
- Failure: Currently, if the deletion attempt fails, the function alerts the user with a flash message indicating that the deletion could not be processed. This makes the user aware of the failure but does not resolve the underlying issue. Future refactors should look to identify and resolve the error, or “let it crash”, halting the process and letting the supervisor recover.
- Player Not Found: The third code path addresses situations where the player cannot be found, likely because they have already been removed by another process. In the GameApp, deletion events are intentionally designed to be idempotent, which simplifies the inherent concurrency problem by allowing them to be replayed multiple times without adverse effects. If the player is not found, the process deliberately forwards the deletion message to the PubSub, ensuring consistent application behavior and state across all processes.
def handle_event("delete", %{"id" => id}, socket) do
  # Assumes Accounts.get_player/1 returns {:ok, player} | {:error, :not_found}.
  case Accounts.get_player(id) do
    {:ok, player} ->
      case Accounts.delete_player(player) do
        {:ok, _} ->
          broadcast_update(:delete_player, player)
          {:noreply, socket}

        {:error, reason} ->
          {:noreply, put_flash(socket, :error, "Could not delete player: #{inspect(reason)}")}
      end

    {:error, :not_found} ->
      # Already deleted elsewhere; re-broadcast so every list converges.
      # Requires alias GameApp.Accounts.Player.
      broadcast_update(:delete_player, %Player{id: id})
      {:noreply, socket}
  end
end
Subscribing to Delete Events
The process is subscribed to the update_player topic and must handle incoming messages efficiently. Messages originating from other processes are managed through the handle_info/2 function, which is tailored to process specific actions.
def handle_info({:delete_player, player}, socket) do
  {:noreply, stream_delete(socket, :players, player)}
end
This function matches on the :delete_player action and updates the process’s internal state by removing the specified player from the list view. The idempotent nature of the delete_player action ensures that repeated commands do not alter the application’s state, thereby preventing inconsistencies even if the delete command is issued multiple times.
Handling Update Events
When the form component successfully saves a player, it sends a message to its parent list process indicating that the save succeeded, along with the updated player. First, broadcast the update through the update_player topic:
def handle_info({GameAppWeb.PlayerLive.FormComponent, {:saved, player}}, socket) do
  broadcast_update(:update_player, player)
  {:noreply, socket}
end
Receiving and Handling Broadcasts
Because the list process subscribed to the update_player topic in the mount function, it is already listening for incoming update messages; we just need to match on the {:update_player, player} tuple and update the internal state with the updated player information.
def handle_info({:update_player, player}, socket) do
  {:noreply, stream_insert(socket, :players, player)}
end
Now, every time a list is updated, all lists are updated simultaneously. By reducing the delay in these updates, we decrease the likelihood of conflicting changes made by different users to the same data.
Did We Solve the Concurrency Problem?
While synchronizing updates across all user interfaces reduces the likelihood of conflicting changes, it depends on the order of message delivery and does not resolve the concurrency problem. To address this, we need to introspect the current state of each player and update only if the incoming message represents a more recent state.
First, we face a challenge with LiveStream, LiveView’s mechanism for efficient DOM updates that sends diffs to the client without retaining the collection in server memory. LiveStream is not introspectable, so if we need the current data, we must also save it in the process’s state.
Updating the mount Function
First, update the mount function to include a map of current player values:
def mount(_params, _session, socket) do
  PubSub.subscribe(GameApp.PubSub, @update_player_pub_topic)

  players = Accounts.list_players()
  players_map = Map.new(players, fn player -> {player.id, player} end)

  new_socket =
    socket
    |> assign(:players, players_map)
    |> stream(:players, players)

  {:ok, new_socket}
end
Add get_latest/2 logic
Next, create a helper function that returns the more recent of two player states.
defp get_latest(existing_player, new_player) do
  if NaiveDateTime.compare(existing_player.updated_at, new_player.updated_at) == :gt do
    existing_player
  else
    new_player
  end
end
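This is a last-write-wins rule: whichever state carries the newer updated_at survives, regardless of arrival order. A quick standalone check (the helper is redefined as a public function purely for illustration):

```elixir
# Standalone check of the last-write-wins comparison used by get_latest/2.
defmodule LatestCheck do
  def get_latest(existing, new) do
    if NaiveDateTime.compare(existing.updated_at, new.updated_at) == :gt do
      existing
    else
      new
    end
  end
end

older = %{name: "alice", updated_at: ~N[2024-01-01 12:00:00]}
newer = %{name: "alice (renamed)", updated_at: ~N[2024-01-01 12:00:05]}

# Order of arrival doesn't matter: the newer write wins either way.
IO.inspect(LatestCheck.get_latest(older, newer).name)  # "alice (renamed)"
IO.inspect(LatestCheck.get_latest(newer, older).name)  # "alice (renamed)"
```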
Now, leverage get_latest/2 to ignore incoming updates that are older than the current player data.
def handle_info({:update_player, incoming_player}, socket) do
  players = socket.assigns.players

  case Map.get(players, incoming_player.id) do
    nil ->
      updated_players = Map.put(players, incoming_player.id, incoming_player)

      new_socket =
        socket
        |> assign(:players, updated_players)
        |> stream_insert(:players, incoming_player)

      {:noreply, new_socket}

    existing_player ->
      updated_player = get_latest(existing_player, incoming_player)
      updated_players = Map.put(players, incoming_player.id, updated_player)

      new_socket =
        socket
        |> assign(:players, updated_players)
        |> stream_insert(:players, updated_player)

      {:noreply, new_socket}
  end
end
When no player exists in the current state, it incorporates the incoming player. However, this poses a challenge: how do we distinguish between an update involving a newly added player and one where a player has been deleted?
Conflict-Free Replicated Data Types (CRDTs)
Conflict-Free Replicated Data Types (CRDTs) are data structures that can be updated concurrently on independent replicas and merged deterministically, making them a helpful strategy for managing concurrency problems.
Delete-Free Set (DF-Set)
A Delete-Free Set (DF-Set) is utilized to track hard-deleted players, ensuring that once a player is removed, their identifier remains in the set. This effectively prevents any future operations on that deleted player.
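The idea can be sketched as a small, pure data structure. The module and function names here are illustrative; the LiveView code below folds the same logic directly into socket assigns.

```elixir
# Sketch of the DF-Set idea: once an ID lands in the deleted set, no
# later add or update can resurrect it. Merging two replicas is a union
# of both sets, so replicas converge regardless of message order.
# (Conflicting player values could be resolved with a get_latest-style
# rule; omitted here for brevity.)
defmodule DFSet do
  defstruct players: %{}, deleted: MapSet.new()

  def put(%__MODULE__{} = set, %{id: id} = player) do
    if MapSet.member?(set.deleted, id) do
      set  # deletes win forever
    else
      %{set | players: Map.put(set.players, id, player)}
    end
  end

  def delete(%__MODULE__{} = set, id) do
    %{set | players: Map.delete(set.players, id), deleted: MapSet.put(set.deleted, id)}
  end

  def merge(%__MODULE__{} = a, %__MODULE__{} = b) do
    deleted = MapSet.union(a.deleted, b.deleted)

    players =
      Map.merge(a.players, b.players)
      |> Map.reject(fn {id, _} -> MapSet.member?(deleted, id) end)

    %__MODULE__{players: players, deleted: deleted}
  end
end

set =
  %DFSet{}
  |> DFSet.put(%{id: 1, name: "alice"})
  |> DFSet.delete(1)
  |> DFSet.put(%{id: 1, name: "alice"})  # replayed update is ignored

IO.inspect(Map.keys(set.players))  # []
```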
Storing Deleted Player IDs
First, update the mount function to include a set specifically dedicated to holding deleted player IDs:
def mount(_params, _session, socket) do
  PubSub.subscribe(GameApp.PubSub, @update_player_pub_topic)

  players = Accounts.list_players()
  players_map = Map.new(players, fn player -> {player.id, player} end)

  new_socket =
    socket
    |> assign(:deleted_player_ids, MapSet.new())
    |> assign(:players, players_map)
    |> stream(:players, players)

  {:ok, new_socket}
end
Handling Deletions
Next, update the deletion handler so a deleted player is removed from the players map and their ID is added to deleted_player_ids.
def handle_info({:delete_player, player}, socket) do
  updated_players = Map.delete(socket.assigns.players, player.id)
  deleted_players = MapSet.put(socket.assigns.deleted_player_ids, player.id)

  new_socket =
    socket
    |> assign(:players, updated_players)
    |> assign(:deleted_player_ids, deleted_players)
    |> stream_delete(:players, player)

  {:noreply, new_socket}
end
Guarding Against Updates for Deleted Players
Finally, to prevent updates to players who have been previously deleted, add a guard condition to the update_player message handling that drops updates for any ID found in deleted_player_ids.
def handle_info({:update_player, new_player}, socket) do
  players = socket.assigns.players
  deleted_player_ids = socket.assigns.deleted_player_ids

  if MapSet.member?(deleted_player_ids, new_player.id) do
    {:noreply, socket}
  else
    case Map.get(players, new_player.id) do
      nil ->
        updated_players = Map.put(players, new_player.id, new_player)

        new_socket =
          socket
          |> assign(:players, updated_players)
          |> stream_insert(:players, new_player)

        {:noreply, new_socket}

      existing_player ->
        updated_player = get_latest(existing_player, new_player)
        updated_players = Map.put(players, new_player.id, updated_player)

        new_socket =
          socket
          |> assign(:players, updated_players)
          |> stream_insert(:players, updated_player)

        {:noreply, new_socket}
    end
  end
end
Have We Finally Resolved Concurrency?
Although our approach mitigates several concurrency issues, it still relies on maintaining an accurate and comprehensive list of deleted players. To fully leverage the benefits of a DF-Set, we must ensure that this list is consistently synchronized across all instances and sessions, which requires careful consideration of performance impacts and synchronization strategies.
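One direction for that synchronization: because deleted_player_ids only grows, two sessions can reconcile their views of it with a plain set union, which is commutative and idempotent. This is a sketch of the merge step only; the exchange mechanism (a PubSub broadcast, a handshake on mount, etc.) is left open.

```elixir
# Reconciling two replicas' deleted sets is a union: apply in any order,
# any number of times, and every replica converges on the same set.
merge_deleted = fn a, b -> MapSet.union(a, b) end

replica_a = MapSet.new([1, 2])
replica_b = MapSet.new([2, 3])

merged_ab = merge_deleted.(replica_a, replica_b)
merged_ba = merge_deleted.(replica_b, replica_a)

IO.inspect(MapSet.equal?(merged_ab, merged_ba))  # true (commutative)
IO.inspect(MapSet.equal?(merged_ab, merge_deleted.(merged_ab, replica_a)))  # true (idempotent)
```

The trade-off is that the set grows without bound, so a production design would also need a compaction or tombstone-expiry strategy.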