Phoenix Channels: Inverting responsibility
WebSockets facilitate real-time, bi-directional communication with low latency, but they introduce inherent challenges with concurrency. They offer no application-level delivery guarantee. Frames travel over TCP, but a message in flight when a connection drops can be lost without the sender ever knowing, and across reconnects the client can miss updates or see them out of order. The result can be inaccurate data on the client side. That is tolerable in non-critical contexts but unacceptable in safety-sensitive environments where errors pose significant risks.
Adding acknowledgments in Phoenix Channels shifts the burden of maintaining data accuracy from the client to the server.
Acknowledgments
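In Phoenix, every client that joins a channel gets its own Channel process. Per-client state, such as the set of messages still awaiting acknowledgment, therefore lives in isolation, which makes client acknowledgments simple to implement.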
Updates to the Superhero Channel
First, when a client joins, add an empty map, :awaiting_ack, to the Channel’s state to track unacknowledged messages.
def join("superheroes:lobby", _payload, socket) do
  PubSub.subscribe(GameApp.PubSub, SuperheroController.topic())
  # Push the initial state once the join has completed
  send(self(), :send_initial_superheroes)

  new_socket =
    socket
    |> assign(:superheroes, Superheroes.list_superheroes())
    # message_id => true for each push still awaiting an acknowledgment
    |> assign(:awaiting_ack, %{})

  {:ok, new_socket}
end
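Because join/3 runs in a separate Channel process for every client, each client also gets its own :awaiting_ack map, so one client’s acknowledgments can never clear another client’s pending entries. The JavaScript sketch below models that isolation outside of Phoenix. `joinChannel`, the object shape it returns, and the placeholder hero data are hypothetical stand-ins for join/3 and the socket assigns, not part of any library.

```javascript
// Hypothetical model of the state join/3 builds for one client.
// In Phoenix, each joining client runs in its own Channel process,
// so each call below stands in for a separate process.
function joinChannel(loadSuperheroes) {
  return {
    superheroes: loadSuperheroes(), // like assign(:superheroes, ...)
    awaitingAck: new Map(),         // like assign(:awaiting_ack, %{})
  };
}

// Placeholder data, not from the original application.
const listSuperheroes = () => [{ name: "Hero A" }, { name: "Hero B" }];

const clientA = joinChannel(listSuperheroes);
const clientB = joinChannel(listSuperheroes);

// The same ID is used for both clients to show the maps are independent.
clientA.awaitingAck.set("msg-1", true);
clientB.awaitingAck.set("msg-1", true);

// Only client A acknowledges it.
clientA.awaitingAck.delete("msg-1");

console.log(clientA.awaitingAck.has("msg-1")); // false
console.log(clientB.awaitingAck.has("msg-1")); // true
```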
Next, generate a unique message ID, add it to the :awaiting_ack map in the process’s state, and include that ID in the message to the client. Also, schedule a message to the process itself after a timeout to check whether the message has been acknowledged.
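# Assumes the module defines a timeout attribute, e.g. @ack_timeout 5_000
def handle_info(:send_initial_superheroes, socket) do
  superheroes = socket.assigns.superheroes
  message_id = Ecto.UUID.generate()

  push(socket, "hydrate", %{
    superheroes: format_superheroes(superheroes),
    message_id: message_id
  })

  # Check for an acknowledgment once the timeout has elapsed
  Process.send_after(self(), {:check_ack, message_id}, @ack_timeout)

  # Add the ID to the existing map instead of replacing the whole map
  awaiting_ack = Map.put(socket.assigns.awaiting_ack, message_id, true)
  new_socket = assign(socket, :awaiting_ack, awaiting_ack)

  {:noreply, new_socket}
end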
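The detail that matters on the send side is that each new ID is added to the existing :awaiting_ack map rather than replacing it; otherwise a second push would silently forget the first. This JavaScript sketch models the send side under stated assumptions: `sendTracked` is a hypothetical name, and the counter-based ID is a stand-in for Ecto.UUID.generate/0.

```javascript
// Hypothetical model of the send side of the acknowledgment protocol.
let counter = 0;
// Stand-in for Ecto.UUID.generate(); real IDs would be UUIDs.
const nextMessageId = () => `msg-${++counter}`;

// Records the push as pending and returns the payload to send.
function sendTracked(awaitingAck, superheroes) {
  const messageId = nextMessageId();
  // Add to the existing map, like Map.put/3; never replace the map.
  awaitingAck.set(messageId, true);
  return { event: "hydrate", payload: { superheroes, message_id: messageId } };
}

const awaitingAck = new Map();
const first = sendTracked(awaitingAck, []);
const second = sendTracked(awaitingAck, []);

console.log(first.payload.message_id, second.payload.message_id); // msg-1 msg-2
console.log(awaitingAck.size); // 2: both pushes are still awaiting an ack
```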
Next, handle acknowledgment messages from the client by deleting the message_id from the :awaiting_ack map in the process’s state.
def handle_in("message_ack", %{"message_id" => message_id, "status" => _status}, socket) do
  current_acks = socket.assigns[:awaiting_ack] || %{}
  # Map.delete/2 is a no-op for unknown or already-acknowledged IDs
  new_acks = Map.delete(current_acks, message_id)
  new_socket = assign(socket, :awaiting_ack, new_acks)
  {:noreply, new_socket}
end
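Deleting the ID is naturally idempotent: a duplicate or late acknowledgment for an ID that is no longer pending changes nothing, which is the behavior Map.delete/2 gives the Elixir handler. The sketch below mirrors that handler in JavaScript. `handleMessageAck` is a hypothetical name, and returning whether an entry was actually removed is an addition the Elixir code does not have.

```javascript
// Hypothetical mirror of handle_in("message_ack", ...).
// Returns true only if the ID was still pending; the status field is ignored.
function handleMessageAck(awaitingAck, { message_id }) {
  return awaitingAck.delete(message_id);
}

const awaitingAck = new Map([["msg-1", true], ["msg-2", true]]);

const results = [
  handleMessageAck(awaitingAck, { message_id: "msg-1", status: "Received" }), // first ack
  handleMessageAck(awaitingAck, { message_id: "msg-1", status: "Received" }), // duplicate
  handleMessageAck(awaitingAck, { message_id: "msg-9", status: "Received" }), // unknown ID
];

console.log(results.join(" ")); // true false false
console.log([...awaitingAck.keys()].join(", ")); // msg-2
```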
Finally, handle the :check_ack message and check whether the message has been acknowledged. If it has not, take action.
# Requires `require Logger` in the module
def handle_info({:check_ack, message_id}, socket) do
  if Map.has_key?(socket.assigns.awaiting_ack, message_id) do
    Logger.error("No acknowledgment received for message #{message_id}")
    # Stop this Channel process with an abnormal exit reason
    {:stop, :no_ack_received, socket}
  else
    Logger.debug("Acknowledgment received for message #{message_id}")
    {:noreply, socket}
  end
end
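Whether the channel stops depends only on which message reaches the process first: the client’s acknowledgment or the :check_ack timer. The sketch below replays both orderings on a virtual clock so the outcome is deterministic. `runChannel` and the mailbox format are hypothetical, and the 5000 ms value for @ack_timeout is an assumption.

```javascript
const ACK_TIMEOUT_MS = 5000; // assumed value of @ack_timeout

// Hypothetical replay of one Channel process's mailbox on a virtual clock.
// `acks` lists when the client's acknowledgments arrive, in ms after the push.
function runChannel(messageId, acks) {
  const awaitingAck = new Set([messageId]);
  const mailbox = [
    ...acks.map(({ at, id }) => ({ at, type: "message_ack", id })),
    // Process.send_after(self(), {:check_ack, message_id}, @ack_timeout)
    { at: ACK_TIMEOUT_MS, type: "check_ack", id: messageId },
  ].sort((a, b) => a.at - b.at); // deliver in arrival order

  for (const msg of mailbox) {
    if (msg.type === "message_ack") {
      awaitingAck.delete(msg.id);
    } else if (awaitingAck.has(msg.id)) {
      // The timer fired before the ack: mirrors {:stop, :no_ack_received, socket}.
      return "stop: no_ack_received";
    }
  }
  return "noreply";
}

console.log(runChannel("msg-1", [{ at: 120, id: "msg-1" }]));  // noreply
console.log(runChannel("msg-1", [{ at: 7000, id: "msg-1" }])); // stop: no_ack_received
console.log(runChannel("msg-1", []));                          // stop: no_ack_received
```

A late acknowledgment is treated exactly like a missing one, because the process has already stopped by the time it would arrive.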
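In this implementation, if a client fails to acknowledge a message, we don’t try to determine the cause of the failure; we simply recognize that it failed and stop the Channel process. This is a “Let it crash” strategy: the Phoenix JavaScript client sees the channel error and rejoins, which runs join/3 in a fresh Channel process and re-sends the initial superheroes, resolving the transient error. Stopping the Channel process ends that channel only; the underlying WebSocket connection, which can carry other channels, stays open.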
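When the channel stops, the Phoenix JavaScript client schedules a rejoin on a timer, and each rejoin runs join/3 in a new Channel process. The client’s `Socket` constructor accepts a `rejoinAfterMs` option, a function from the attempt number to a delay in milliseconds. The capped exponential schedule below is a hypothetical example of such a function, not the library’s default.

```javascript
// Hypothetical rejoin schedule: doubles from 1 s, capped at 30 s.
// A function with this shape could be passed as the `rejoinAfterMs`
// option when constructing a phoenix Socket.
function rejoinAfterMs(tries) {
  const baseMs = 1000;
  const capMs = 30000;
  return Math.min(capMs, baseMs * 2 ** (tries - 1));
}

console.log([1, 2, 3, 4, 5, 6, 7].map(rejoinAfterMs).join(", "));
// 1000, 2000, 4000, 8000, 16000, 30000, 30000
```

The cap keeps a client that repeatedly fails to acknowledge from hammering the server, while still retrying indefinitely.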
Updates to the React Superhero Component
Upon receiving a message, the component immediately pushes a message_ack back over the WebSocket connection to confirm receipt.
const handleAck = (channel, message_id) => {
  channel.push("message_ack", {
    message_id,
    status: "Received"
  });
};

useEffect(() => {
  const channel = socket.channel("superheroes:lobby", {});

  channel.join()
    .receive("ok", resp => { console.log("Joined successfully", resp); })
    .receive("error", resp => { console.log("Unable to join", resp); });

  // Apply the pushed state, then acknowledge it before the server's timeout
  channel.on("hydrate", payload => {
    const { superheroes, message_id } = payload;
    setSuperheroes(superheroes);
    handleAck(channel, message_id);
  });

  // Leave the channel when the component unmounts
  return () => {
    channel.leave();
  };
}, []);
In this setup, the handleAck function pushes a message_ack message containing the message_id and a status indicating that the message was received. It is called from the hydrate handler registered inside the useEffect hook, which also manages the channel’s lifecycle: joining on mount and leaving on cleanup.
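Because the server treats a missing acknowledgment as a failure, the client can also withhold the ack deliberately when a payload cannot be applied, letting the server’s timeout take over. The sketch below factors the hydrate handler out of the component so it can be tested without a server. `makeHydrateHandler`, `FakeChannel`, and the validation rule are hypothetical; `FakeChannel` implements only the `on` and `push` calls the component uses, not the real Phoenix `Channel`.

```javascript
// Minimal stand-in for the two Channel methods the component uses.
class FakeChannel {
  constructor() {
    this.handlers = new Map();
    this.pushed = [];
  }
  on(event, callback) {
    this.handlers.set(event, callback);
  }
  push(event, payload) {
    this.pushed.push({ event, payload });
  }
  // Test helper: simulate the server pushing an event to the client.
  simulateServerPush(event, payload) {
    this.handlers.get(event)?.(payload);
  }
}

// Applies the state first and acknowledges only if the payload is usable.
function makeHydrateHandler(channel, setSuperheroes) {
  return ({ superheroes, message_id }) => {
    if (!Array.isArray(superheroes) || typeof message_id !== "string") {
      return; // no ack: the server's :check_ack timeout will fire
    }
    setSuperheroes(superheroes);
    channel.push("message_ack", { message_id, status: "Received" });
  };
}

let rendered = null;
const channel = new FakeChannel();
channel.on("hydrate", makeHydrateHandler(channel, (heroes) => { rendered = heroes; }));

channel.simulateServerPush("hydrate", { superheroes: [{ name: "Hero A" }], message_id: "msg-1" });
channel.simulateServerPush("hydrate", { superheroes: null, message_id: "msg-2" });

console.log(channel.pushed.length); // 1: only msg-1 was acknowledged
```

Withholding the ack turns a malformed payload into a channel restart, so if the server keeps producing the same bad payload, the client will keep rejoining; logging on that path helps diagnose such a loop.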
Conclusion
WebSocket connections can drop or stall unpredictably, and without acknowledgments the server has no way to know whether a client actually holds current data. By inverting responsibility, the server becomes a proactive actor within the system, able to monitor and, if necessary, terminate client channels that threaten the integrity of operations. Such control is crucial for maintaining high standards in safety-critical environments, helping keep the system robust and reliable.