Systems that require human intervention, especially in response to emergencies, depend on having accurate information about who is currently available. Phoenix Presence leverages Conflict-free Replicated Data Types (CRDTs) and the BEAM to provide a reliable and scalable presence tracking solution.

Dispatching Superheroes

A system designed to dispatch superheroes must accurately know which superheroes are available.

Superhero Presence

Create a presence module using Phoenix and add it to the application’s supervisor:

defmodule GameAppWeb.SuperheroPresence do
  use Phoenix.Presence,
    otp_app: :game_app,
    pubsub_server: GameApp.PubSub
end

application.ex

children = [
  # ...other children, including GameApp.PubSub, which must start before Presence
  GameAppWeb.SuperheroPresence
]

Notifications Channel

Create a channel dedicated to notifying the superheroes:

defmodule GameAppWeb.NotificationsChannel do
  use GameAppWeb, :channel
  alias GameAppWeb.SuperheroPresence
  alias Faker.Superhero
  alias Phoenix.PubSub

  def topic(recipient) do
    "notifications:#{recipient}"
  end

  @impl true
  def join("notifications:lobby", _payload, socket) do
    user_name = Superhero.name()
    socket = assign(socket, :user_name, user_name)

    SuperheroPresence.track(socket, user_name, %{
      online_at: inspect(System.system_time(:second)),
      user_name: user_name
    })

    send(self(), :after_join)
    {:ok, socket}
  end

  @impl true
  def handle_info(:after_join, socket) do
    push(socket, "user_info", %{user_name: socket.assigns.user_name})
    presence_state = SuperheroPresence.list(socket)
    push(socket, "presence_state", presence_state)

    {:noreply, socket}
  end
end

When a superhero connects via WebSocket, a new process is spawned to handle the connection. On join, this process assigns the superhero a random name and tracks their presence. It then pushes two messages to the client: the superhero’s name and the list of superheroes currently present in the system.
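To make the shape of that presence list concrete, here is a minimal sketch in plain Elixir. It assumes the structure that Phoenix.Presence documents for list/1: a map keyed by the tracked key, where each value holds a metas list with one entry per tracked connection. The PresenceShape module, the hero names, and the phx_ref values are hypothetical sample data, not output captured from the application.

```elixir
defmodule PresenceShape do
  # Hypothetical helper: returns a sorted list of {user_name, connection_count}
  # tuples from a presence map shaped like the result of SuperheroPresence.list/1.
  def connection_counts(presences) do
    presences
    |> Enum.map(fn {user_name, %{metas: metas}} -> {user_name, length(metas)} end)
    |> Enum.sort()
  end
end

# Made-up sample data; real phx_ref values are generated by Phoenix.
sample = %{
  "Captain Ember" => %{
    metas: [
      %{phx_ref: "ref-a", online_at: "1718000000", user_name: "Captain Ember"},
      %{phx_ref: "ref-b", online_at: "1718000042", user_name: "Captain Ember"}
    ]
  },
  "Doctor Quantum" => %{
    metas: [%{phx_ref: "ref-c", online_at: "1718000007", user_name: "Doctor Quantum"}]
  }
}

IO.inspect(PresenceShape.connection_counts(sample))
# prints [{"Captain Ember", 2}, {"Doctor Quantum", 1}]
```

Because names are generated randomly, two connections could in principle receive the same name, in which case they would share a key and show up as two entries under metas.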

A channel can broadcast to every other superhero on the topic with broadcast_from, which excludes the sender, or to every superhero including the sender with broadcast:

  @impl true
  def handle_in("notify_others", %{"body" => body}, socket) do
    timestamp = System.system_time(:second)

    broadcast_from(socket, "notification", %{
      "body" => body,
      "timestamp" => timestamp
    })

    {:noreply, socket}
  end

  @impl true
  def handle_in("notify_all", %{"body" => body}, socket) do
    timestamp = System.system_time(:second)

    broadcast(socket, "notification", %{
      "body" => body,
      "timestamp" => timestamp
    })

    {:noreply, socket}
  end

Notification Component

Include a useEffect hook to manage the connection to the notification channel and handle messages:

  useEffect(() => {
    const channel = socket.channel("notifications:lobby");
    channelRef.current = channel;
    const presence = new Presence(channel);

    presence.onSync(() => {
      setPresences(presence.list());
    });

    channel.join()
      .receive("ok", () => {
        console.log("Joined notifications successfully");
      })
      .receive("error", (resp) => {
        console.error("Unable to join Notifications channel", resp);
      });

    channel.on("user_info", (payload) => {
      setUserName(payload.user_name);
    });

    channel.on("notification", (payload) => {
      setNotifications((notifications) => [...notifications, payload].slice(-5));
    });

    return () => {
      channel.leave();
    };
  }, []);

Open a tab, and you’ll see your superhero name, the presence of other superheroes, and incoming notifications. Open a second tab, and you’ll see a new superhero whose presence appears on the first tab in real time.

Notifying Between Superheroes

A channel can broadcast messages to all processes within its room but lacks information on the exact locations (PIDs) of these processes. Although presence tracks which superheroes are available, it does not specify their locations. Therefore, if a superhero wants to send a message directly to another, a different strategy is required.

defmodule GameAppWeb.NotificationsChannel do
  use GameAppWeb, :channel
  alias GameAppWeb.SuperheroPresence
  alias Faker.Superhero
  alias Phoenix.PubSub

  def topic(recipient) do
    "notifications:#{recipient}"
  end

  @impl true
  def join("notifications:lobby", _payload, socket) do
    user_name = Superhero.name()
    socket = assign(socket, :user_name, user_name)

    # Subscribe this channel process to the superhero's personal topic.
    PubSub.subscribe(GameApp.PubSub, topic(user_name))

    SuperheroPresence.track(socket, user_name, %{
      online_at: inspect(System.system_time(:second)),
      user_name: user_name
    })

    send(self(), :after_join)
    {:ok, socket}
  end

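  # The handle_info(:after_join, socket) clause from the first version is still
  # required; it is omitted here for brevity.
  @impl true
  def handle_info(%{event: "notification", payload: payload}, socket) do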
    push(socket, "notification", payload)
    {:noreply, socket}
  end

  @impl true
  def handle_in("notify_direct", %{"body" => body, "recipient" => recipient}, socket) do
    timestamp = System.system_time(:second)

    PubSub.broadcast(GameApp.PubSub, topic(recipient), %{
      event: "notification",
      payload: %{
        "body" => body,
        "timestamp" => timestamp
      }
    })

    {:noreply, socket}
  end
end

Upon joining, a superhero subscribes to a personalized topic dedicated to their notifications. This setup includes handling messages from that topic via handle_info and processing incoming messages from the superhero’s WebSocket connection through handle_in. Superheroes can now directly message other superheroes.
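The idea of a personal topic can be tried outside Phoenix. The sketch below uses Elixir's built-in Registry as a stand-in for Phoenix.PubSub, so it illustrates the pattern rather than how GameApp.PubSub works internally. The DirectTopics module and the hero names are hypothetical.

```elixir
defmodule DirectTopics do
  # Stand-in registry name; the real system uses GameApp.PubSub instead.
  @registry DirectTopics.Registry

  def topic(recipient), do: "notifications:#{recipient}"

  def start_link do
    # :duplicate keys let several processes register under the same topic.
    Registry.start_link(keys: :duplicate, name: @registry)
  end

  # Subscribes the calling process to the recipient's personal topic.
  def subscribe(user_name) do
    {:ok, _owner} = Registry.register(@registry, topic(user_name), nil)
    :ok
  end

  # The caller only names a topic; the registry resolves it to subscriber PIDs.
  def notify(recipient, body) do
    message = %{event: "notification", payload: %{"body" => body}}

    Registry.dispatch(@registry, topic(recipient), fn entries ->
      for {pid, _value} <- entries, do: send(pid, message)
    end)
  end
end

{:ok, _registry} = DirectTopics.start_link()
:ok = DirectTopics.subscribe("Doctor Quantum")

DirectTopics.notify("Doctor Quantum", "Meet at the lighthouse")
# Nobody is subscribed to this topic, so the message is silently dropped.
DirectTopics.notify("Captain Ember", "Is anyone there?")

receive do
  %{event: "notification", payload: payload} -> IO.inspect(payload)
after
  100 -> IO.puts("no notification received")
end
```

The sender addresses a name, not a process. If the recipient's process restarts and subscribes again, senders do not need to change anything.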

System Notifications

To facilitate our superhero dispatch system, we need a mechanism to generate and send targeted messages to individual superheroes. Below is a simple GenServer:

defmodule GameApp.Services.NotificationsService do
  use GenServer
  alias GameAppWeb.SuperheroPresence
  alias Phoenix.PubSub
  require Logger

  # Delay between notifications, in milliseconds (5 seconds).
  @interval 5_000

  def topic(recipient) do
    "notifications:#{recipient}"
  end

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{})
  end

  @impl true
  def init(state) do
    schedule_send_notification()
    {:ok, state}
  end

  @impl true
  def handle_info(:send_notification, state) do
    send_random_notification()
    schedule_send_notification()
    {:noreply, state}
  end

  defp schedule_send_notification do
    Process.send_after(self(), :send_notification, @interval)
  end

  defp send_random_notification do
    # List the presences on "notifications:lobby"; the map keys are the usernames.
    presences = SuperheroPresence.list("notifications:lobby")

    usernames = Map.keys(presences)
    timestamp = System.system_time(:second)

    # Only pick a random recipient when at least one superhero is present.
    if usernames != [] do
      recipient = Enum.random(usernames)

      Logger.info("Sending notification to #{recipient}")

      PubSub.broadcast(GameApp.PubSub, topic(recipient), %{
        event: "notification",
        payload: %{
          "body" => "Message to #{recipient} from internal service",
          "timestamp" => timestamp
        }
      })
    end
  end
end

This GenServer needs to be added to the application’s supervisor in application.ex:

children = [
  GameApp.Services.NotificationsService
]

Now, every 5 seconds, the notification service sends a message to itself, triggering send_random_notification, which randomly selects an available superhero and sends them a message.
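The self-rescheduling loop can be seen in isolation with a much smaller GenServer. This is a sketch under stated assumptions: the TickDemo module is hypothetical, the interval is shortened to 50 ms so the script finishes quickly, and the tick is reported back to the caller instead of being sent to a superhero.

```elixir
defmodule TickDemo do
  use GenServer

  # Shortened interval (milliseconds) so the demo finishes quickly.
  @interval 50

  def start_link(report_to), do: GenServer.start_link(__MODULE__, report_to)

  @impl true
  def init(report_to) do
    schedule_tick()
    {:ok, %{report_to: report_to, count: 0}}
  end

  @impl true
  def handle_info(:tick, state) do
    count = state.count + 1
    send(state.report_to, {:tick, count})
    # Scheduling the next tick here is what keeps the loop going.
    schedule_tick()
    {:noreply, %{state | count: count}}
  end

  defp schedule_tick, do: Process.send_after(self(), :tick, @interval)
end

{:ok, _pid} = TickDemo.start_link(self())

# Wait for the first three ticks, then stop listening.
for expected <- 1..3 do
  receive do
    {:tick, ^expected} -> IO.puts("tick #{expected}")
  after
    1_000 -> raise "no tick received"
  end
end
```

Each tick is scheduled only after the previous one has been handled, so a slow handler delays the next tick instead of letting messages pile up in the mailbox.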

Conclusion

In a distributed system, it is critical to acknowledge that any process might fail unexpectedly at any time. As a result, the system should minimize reliance on the precise location of a process. In the superhero dispatch system, a Channel handles only its associated WebSocket connection, and Presence tracks the active participants but remains agnostic about their locations. The task of tracking location is handled solely by PubSub. This decouples process functionality from physical location, allowing for dynamic responses and enhanced fault tolerance.