Elixir and Task: Managing asynchronous execution
“The real world is concurrent… When you send an email, you have no idea what’s going on at the other end. You don’t share your variables with them or your data tables. They just take the message and do something. That’s how the real world works.” – Joe Armstrong
In Erlang, asynchronous execution is not merely a feature; it is foundational to the language. Erlang employs the spawn
function to initiate an isolated process that executes a task, utilizes message passing to send the result back to the parent, and then terminates. The parent can monitor the lifecycle of its spawned process, enabling it to respond effectively to expected and unexpected termination events.
Much of Elixir’s value lies in making Erlang’s powerful but esoteric features accessible to a broader audience. The Task
module in Elixir is an example, wrapping Erlang’s spawn
with a more developer-friendly interface. When a Task
is initiated, it creates a new process that executes a given function and, upon completion, sends the result back to the parent process. The Task.await
function handles both the returned message as well as the monitoring logic, allowing developers to use typical patterns such as async/await
and try/rescue
.
Increment Worker
defmodule GameApp.Workers.IncrementWorker do
def sync(integer) when is_integer(integer) do
integer + 1
end
def async(integer) when is_integer(integer) do
Task.async(fn ->
Process.sleep(3_000)
sync(integer)
end)
end
end
The sync
function increments its input and directly returns the result. It’s an example of a pure function in Elixir, taking an input and producing an output without side effects.
The async
function is a Task
that spawns a new process to run sync
. The process waits three seconds, returns the results, and then terminates. It’s important to note that a process can fail for known or unknown reasons.
Using sync
in a LiveView
def handle_event("increment_click", _params, socket) do
new_value =
socket.assigns.increment_value
|> GameApp.Workers.IncrementWorker.sync()
new_socket =
socket
|> assign(:increment_value, new_value)
{:noreply, new_socket}
end
Here, the client-generated event increment_click
triggers the LiveView to employ the IncrementWorker
to update the :increment_value
. After obtaining the incremented value from IncrementWorker.sync()
, it updates the LiveView
state, a change reflected in real-time on the client.
Using async
in a LiveView
def handle_event("async_increment_click", _params, socket) do
new_value =
socket.assigns.increment_value
|> IncrementWorker.async()
|> Task.await()
new_socket =
socket
|> assign(:increment_value, new_value)
{:noreply, new_socket}
end
Here, we call IncrementWorker.async
and pipe Task.await
, which waits for the asynchronous operation to complete and then updates the :increment_value
.
Is it actually Asynchronous?
In JavaScript, asynchronous operations are executed by the event loop, which manages tasks in a queue, and processes them once the main thread is free. This structure allows the program to remain responsive by handling other tasks while waiting for asynchronous operations to complete.
In contrast, LiveView manages operations within a GenServer
process, which processes events sequentially, ensuring consistent and orderly execution. While this approach guarantees that each event is handled in order, it also means that long-running operations block the queue, affecting responsiveness.
JavaScript’s event loop can introduce subtle bugs through race conditions when the application’s state depends on the timing of these operations. Conversely, LiveView
eliminates race condition bugs, but can lead to an unresponsive client.
Real async
in a LiveView
def handle_event("real_async_increment_click", _params, socket) do
socket.assigns.increment_value
|> IncrementWorker.async()
{:noreply, socket}
end
Here, the client-generated event real_async_increment_click
triggers handle_event
, which is implemented via handle_call
, the GenServer
’s synchronous blocking call. Instead of waiting for IncrementWorker.async
to complete, it immediately returns the socket state, allowing the application to continue.
Task
implements monitor
behind the scenes, which means the Task
will send a message to this LiveView
when it terminates. If the LiveView
process does not handle the message, it will crash.
def handle_info({:DOWN, _ref, :process, _pid, :normal}, socket) do
{:noreply, socket}
end
This handle_info
listens for :normal
terminations from any of its monitored processes and returns the socket, effectively ignoring them. Unexpected shutdowns will still fall through to be handled by the supervisor.
def handle_info({_pid, message}, socket) do
new_socket =
socket
|> assign(:increment_value, message)
|> put_flash(:info, "Increment success.")
{:noreply, new_socket}
end
This pattern matches to handle any message that includes a two-element tuple. It ignores the first value, which identifies the process that sent the message, and assigns the second value to :increment_value
. Additionally, it flashes an informational message to the user indicating that the process has completed.
Now that we have non-blocking async in our LiveView, we cannot depend on order, which means we are susceptible to race conditions.