WebSockets enable low-latency, bidirectional, real-time communication, but the protocol gives the application no confirmation that a message was received or processed. If a connection drops or a client stalls, messages can be lost, and after a reconnect the client may be working from stale or out-of-sequence data. The result can be inaccurate data on the client side, which is tolerable in non-critical contexts but unacceptable in safety-sensitive environments where errors pose significant risks.

Adding acknowledgments in Phoenix Channels shifts the burden of maintaining data accuracy from the client to the server.

Acknowledgments

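Phoenix runs each channel a client joins in its own process, so the pending acknowledgments for that client can be kept in the process's own state (the socket assigns). This simplifies tracking client acknowledgments.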

Updates to the Superhero Channel

First, create an empty Map and add it to the Channel’s state to track pending acknowledgments.

def join("superheroes:lobby", _payload, socket) do
    PubSub.subscribe(GameApp.PubSub, SuperheroController.topic())
    send(self(), :send_initial_superheroes)

    new_socket =
      socket
      |> assign(:superheroes, Superheroes.list_superheroes())
      |> assign(:awaiting_ack, %{})

    {:ok, new_socket}
end

Next, generate a unique message ID, add it to the :awaiting_ack Map in the process’s state, and include that ID in the message to the client. Also, have the process send itself a message after a timeout so it can check whether the message has been acknowledged.

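def handle_info(:send_initial_superheroes, socket) do
    superheroes = socket.assigns[:superheroes]
    message_id = Ecto.UUID.generate()

    # Add to the existing Map so any other pending acknowledgments are kept,
    # rather than replacing the whole Map with a single entry.
    awaiting_ack = Map.put(socket.assigns.awaiting_ack, message_id, true)
    new_socket = assign(socket, :awaiting_ack, awaiting_ack)

    push(socket, "hydrate", %{
      superheroes: format_superheroes(superheroes),
      message_id: message_id
    })

    # @ack_timeout is a module attribute; Process.send_after/3 expects milliseconds.
    Process.send_after(self(), {:check_ack, message_id}, @ack_timeout)

    {:noreply, new_socket}
end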

Next, handle acknowledgment messages from the client by deleting the message_id from the :awaiting_ack Map in the process’s state.

def handle_in("message_ack", %{"message_id" => message_id, "status" => _status}, socket) do
    current_acks = socket.assigns[:awaiting_ack] || %{}
    new_acks = Map.delete(current_acks, message_id)

    new_socket = assign(socket, :awaiting_ack, new_acks)

    {:noreply, new_socket}
end

Finally, handle the :check_ack message and check whether the message it refers to is still awaiting acknowledgment. If it is, take action.

def handle_info({:check_ack, message_id}, socket) do
    if Map.has_key?(socket.assigns.awaiting_ack, message_id) do
      Logger.error("No acknowledgment received for message #{message_id}")
      {:stop, :no_ack_received, socket}
    else
      Logger.debug("Acknowledgment received for message #{message_id}")
      {:noreply, socket}
    end
end

In this implementation, if a client fails to acknowledge a message, we don’t determine the cause of the failure; we simply recognize that it failed and stop the channel process, which ends that client’s channel. This is a “Let it crash” strategy. Channel processes are not restarted in place by a supervisor, so the recovery comes from the client: it is notified of the error, rejoins, and gets a new process with clean state, which resolves a transient error.
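The bookkeeping described above (track a message, forget it on acknowledgment, check it after a timeout) can be modeled outside Phoenix to make the lifecycle easier to follow. The following is a minimal sketch in JavaScript, not the Channel code itself: `createAckTracker`, `schedule`, and `onTimeout` are hypothetical names introduced for illustration, and the injectable scheduler exists only so the timeout checks can be fired on demand.

```javascript
// Illustrative model of the acknowledgment bookkeeping; not Phoenix code.
// createAckTracker and its options are hypothetical names.
function createAckTracker({ timeoutMs, onTimeout, schedule = setTimeout }) {
  const awaitingAck = new Map(); // messageId -> true while unacknowledged

  return {
    // Record the message as pending and schedule a later check,
    // mirroring Process.send_after(self(), {:check_ack, id}, timeout).
    track(messageId) {
      awaitingAck.set(messageId, true);
      schedule(() => {
        // Mirrors the :check_ack clause: still pending means no ack arrived.
        if (awaitingAck.has(messageId)) onTimeout(messageId);
      }, timeoutMs);
    },
    // Mirrors the "message_ack" clause: forget the message.
    ack(messageId) {
      awaitingAck.delete(messageId);
    },
    pendingCount() {
      return awaitingAck.size;
    },
  };
}

// Usage with a manual scheduler so the checks can be triggered explicitly.
const checks = [];
const timedOut = [];
const tracker = createAckTracker({
  timeoutMs: 5000,
  onTimeout: (id) => timedOut.push(id),
  schedule: (fn) => checks.push(fn),
});

tracker.track("msg-1");
tracker.track("msg-2");
tracker.ack("msg-1");          // the client confirmed msg-1 in time
checks.forEach((fn) => fn());  // simulate both timeouts elapsing
console.log(timedOut);         // only msg-2 is reported as unacknowledged
```

In the Channel, the `onTimeout` step corresponds to logging the error and stopping the process; the model only records which message IDs would trigger it.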

Updates to the React Superhero Component

Upon receiving a message, the component immediately sends back a message_ack to confirm receipt via the WebSocket connection.

const handleAck = (channel, message_id) => {
  channel.push("message_ack", {
    message_id,
    status: "Received"
  });
};

useEffect(() => {
  const channel = socket.channel("superheroes:lobby", {});

  channel.join()
    .receive("ok", resp => { console.log("Joined successfully", resp); })
    .receive("error", resp => { console.log("Unable to join", resp); });

  channel.on("hydrate", payload => {
    const { superheroes, message_id } = payload;

    setSuperheroes(superheroes);
    handleAck(channel, message_id);
  });

  return () => {
    channel.leave();
  };
}, []);

In this setup, the handleAck function pushes a message_ack message containing the message_id and a status indicating that the message was received. The acknowledgment is sent from the "hydrate" handler, after the component state has been updated. That handler is registered inside the useEffect hook, which also manages the channel’s lifecycle: it joins on mount and leaves on cleanup.
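If more events than "hydrate" carry a message_id, the acknowledgment step could be factored into a small wrapper instead of being repeated in every handler. The sketch below is one possible shape, under stated assumptions: `onWithAck` and `createFakeChannel` are hypothetical helpers, and the fake channel is only a stand-in that implements the `on` and `push` methods used in the component above, so the example runs without a Phoenix server.

```javascript
// Hypothetical helper: register a handler and acknowledge each message
// only after the handler has run without throwing.
function onWithAck(channel, event, handler) {
  channel.on(event, (payload) => {
    const { message_id, ...data } = payload;
    handler(data);
    // Messages without an ID are not tracked by the server, so skip the ack.
    if (message_id !== undefined) {
      channel.push("message_ack", { message_id, status: "Received" });
    }
  });
}

// Minimal stand-in for a channel, used only to exercise the helper.
function createFakeChannel() {
  const handlers = {};
  const pushed = [];
  return {
    pushed,
    on(event, callback) { handlers[event] = callback; },
    push(event, payload) { pushed.push({ event, payload }); },
    // Simulate the server delivering a message to the client.
    deliver(event, payload) { handlers[event](payload); },
  };
}

const fakeChannel = createFakeChannel();
let received = null;

onWithAck(fakeChannel, "hydrate", ({ superheroes }) => {
  received = superheroes;
});

fakeChannel.deliver("hydrate", {
  superheroes: [{ name: "Storm" }],
  message_id: "abc-123",
});

console.log(fakeChannel.pushed);
```

Because the acknowledgment is pushed only after the handler returns, a handler that throws leaves the message unacknowledged, and the server-side timeout then treats it the same way as a message that never arrived.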

Conclusion

WebSocket connections are long-lived and can fail in ways the server cannot observe directly, such as a client that silently stops processing messages. By inverting responsibility and requiring acknowledgments, the server becomes a proactive actor within the system: it can detect clients that fail to confirm receipt and, if necessary, terminate connections that threaten the integrity of operations. Such control is crucial in safety-critical environments, where the system must remain robust and reliable.