Phoenix Channels: Tracking presence
Systems that require human intervention, especially in response to emergencies, depend on having accurate information about who is currently available. Phoenix Presence
leverages Conflict-free Replicated Data Types (CRDTs) and the BEAM to provide a reliable and scalable presence tracking solution.
Dispatching Superheroes
A system designed to dispatch superheroes must accurately know which superheroes are available.
Superhero Presence
Create a presence module using Phoenix and add it to the application’s supervisor:
defmodule GameAppWeb.SuperheroPresence do
use Phoenix.Presence,
otp_app: :game_app,
pubsub_server: GameApp.PubSub
end
application.ex
children = [
GameAppWeb.SuperheroPresence,
]
Notifications Channel
Create a channel dedicated to notifying the superheroes:
defmodule GameAppWeb.NotificationsChannel do
use GameAppWeb, :channel
alias GameAppWeb.SuperheroPresence
alias Faker.Superhero
alias Phoenix.PubSub
def get_topic(recipient) do
"notifications:#{recipient}"
end
@impl true
def join("notifications:lobby", _payload, socket) do
user_name = Superhero.name()
socket = assign(socket, :user_name, user_name)
SuperheroPresence.track(socket, user_name, %{
online_at: inspect(System.system_time(:second)),
user_name: user_name
})
send(self(), :after_join)
{:ok, socket}
end
@impl true
def handle_info(:after_join, socket) do
push(socket, "user_info", %{user_name: socket.assigns.user_name})
presence_state = SuperheroPresence.list(socket)
push(socket, "presence_state", presence_state)
{:noreply, socket}
end
end
When a superhero connects via WebSocket, a new process is spawned to handle the connection. This process assigns a name, tracks their presence, and sends a message with the superhero’s name and returns a list of superheroes currently present in the system.
A channel can broadcast to all other present superheroes as well as all present superheroes:
@impl true
def handle_in("notify_others", %{"body" => body}, socket) do
timestamp = System.system_time(:second)
broadcast_from(socket, "notification", %{
"body" => body,
"timestamp" => timestamp
})
{:noreply, socket}
end
@impl true
def handle_in("notify_all", %{"body" => body}, socket) do
timestamp = System.system_time(:second)
broadcast(socket, "notification", %{
"body" => body,
"timestamp" => timestamp
})
{:noreply, socket}
end
Notification Component
Include a useEffect
hook to manage the connection to the notification channel and handle messages:
useEffect(() => {
const channel = socket.channel("notifications:lobby");
channelRef.current = channel;
const presence = new Presence(channel);
presence.onSync(() => {
setPresences(presence.list());
});
channel.join()
.receive("ok", () => {
console.log("Joined notifications successfully");
})
.receive("error", (resp) => {
console.error("Unable to join Notifications channel", resp);
});
channel.on("user_info", (payload) => {
setUserName(payload.user_name);
});
channel on("notification", (payload) => {
setNotifications((notifications) => [...notifications, payload].slice(-5));
});
return () => {
channel.leave();
};
}, []);
Open a tab, and you’ll see your superhero name, the presence of other superheroes, and incoming notifications. Open a second tab, and you’ll see a new superhero, and its presence will be updated on the first tab in real-time.
Notifying Between Superheroes
A channel can broadcast messages to all processes within its room but lacks information on the exact locations (PIDs) of these processes. Although presence
tracks which superheroes are available, it does not specify their locations. Therefore, if a superhero wants to send a message directly to another, a different strategy is required.
defmodule GameAppWeb.NotificationsChannel do
use GameAppWeb, :channel
alias GameAppWeb.SuperheroPresence
alias Faker.Superhero
alias Phoenix.PubSub
def topic(recipient) do
"notifications:#{recipient}"
end
@impl true
def join("notifications:lobby", _payload, socket) do
user_name = Superhero.name()
socket = assign(socket, :user_name, user_name)
PubSub.subscribe(GameApp.PubSub, NotificationsService.topic(user_name))
SuperheroPresence.track(socket, user_name, %{
online_at: inspect(System.system_time(:second)),
user_name: user_name
})
send(self(), :after_join)
{:ok, socket}
end
@impl true
def handle_info(%{event: "notification", payload: payload}, socket) do
push(socket, "notification", payload)
{:noreply, socket}
end
@impl true
def handle_in("notify_direct", %{"body" => body, "recipient" => recipient}, socket) do
timestamp = System.system_time(:second)
PubSub.broadcast(GameApp.PubSub, topic(recipient), %{
event: "notification",
payload: %{
"body" => body,
"timestamp" => timestamp
}
})
{:noreply, socket}
end
end
Upon joining, a superhero subscribes to a personalized topic dedicated to their notifications. This setup includes handling messages from that topic via handle_info
and processing incoming messages from the superhero’s WebSocket connection through handle_in
. Superheroes can now directly message other superheroes.
System Notifications
To facilitate our superhero dispatch system, we need a mechanism to generate and send targeted messages to individual superheroes. Below is a simple GenServer
:
defmodule GameApp.Services.NotificationsService do
use GenServer
alias GameAppWeb.SuperheroPresence
alias Phoenix.PubSub
require Logger
def topic(recipient) do
"notifications:#{recipient}"
end
def start_link(_) do
GenServer.start_link(__MODULE__, %{})
end
@impl true
def init(state) do
schedule_send_notification()
{:ok, state}
end
@impl true
def handle_info(:send_notification, state) do
send_random_notification()
schedule_send_notification()
{:noreply, state}
end
defp schedule_send_notification do
Process.send_after(self(), :send_notification, @interval)
end
defp send_random_notification do
presences are listed from "notifications:lobby", and usernames are extracted from the Map keys. If usernames are available, a random recipient is chosen:
usernames = Map.keys(presences)
timestamp = System.system_time(:second)
if length(usernames) > 0 do
recipient = Enum.random(usernames)
Logger.info("Sending notification to #{recipient}")
PubSub.broadcast(GameApp.PubSub, topic(recipient), %{
event: "notification",
payload: %{
"body" => "Message to #{recipient} from internal service",
"timestamp" => timestamp
}
})
end
end
end
This GenServer
needs to be added to the application’s supervisor application.ex
.
children = [
GameApp.Services.NotificationsService
]
Now, every 5 seconds, the notification service sends a message to itself, triggering send_random_notification
, which randomly selects an available superhero and sends them a message.
Conclusion
In a distributed system, it is critical to acknowledge any process might unexpectedly fail at any time. As a result, the system should minimize reliance on the precise location of a process. In the superhero dispatch system, a Channel
handles only its associated WebSocket connection, and Presence
tracks the active participants but remains agnostic about their locations. The task of tracking location is handled solely by PubSub
. This decouples process functionality from physical locations, allowing for dynamic responses and enhanced fault tolerance.