Elixir and Oban: For mission-critical tasks
When tasks are mission-critical and require features such as persistence, retry logic, and scheduling, or when they need to be executed in a specific order and are resource-intensive enough to potentially strain the system, a managed queuing system becomes necessary. This is especially true for operations such as batch data processing, scheduled communications, or video encoding. To solve this problem yourself, you’d need to implement:
- Persistence: Implement a persistence layer to store tasks and metadata.
- Retry Mechanism: Manage retries, including setting maximum attempts and strategies.
- Scheduling: Establish a scheduler to handle cron expressions.
- Monitoring and Management: Create an interface to view and manage the state of tasks.
- Scalability and Distribution: Ensure it operates across multiple nodes with load balancing to distribute tasks evenly.
- Event Subscriptions and Notifications: Integrate a PubSub system to communicate task events within the application.
- Fault Tolerance: Develop mechanisms to address and recover from node failures.
Or, you’d bring in a package like Oban.
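To give a sense of scale for the hand-rolled route, here is a minimal sketch of only the retry bullet. The module name HandRolledRetry and its run/2 function are hypothetical and not part of Oban. Note what it leaves out: the attempt counter lives in memory, so a crash or deploy loses the job, which is exactly the gap the persistence bullet describes.

```elixir
defmodule HandRolledRetry do
  @moduledoc """
  Hypothetical helper, not part of Oban: retries a zero-arity function
  in the current process until it succeeds or the attempts run out.
  """

  # `fun` must return {:ok, value} or {:error, reason}.
  def run(fun, max_attempts, attempt \\ 1)
      when is_function(fun, 0) and max_attempts >= 1 do
    case fun.() do
      {:ok, value} ->
        {:ok, value}

      # Attempts remain: try again immediately (no backoff in this sketch).
      {:error, _reason} when attempt < max_attempts ->
        run(fun, max_attempts, attempt + 1)

      # Out of attempts: report how many were made and the last error.
      {:error, reason} ->
        {:error, {:retries_exhausted, attempt, reason}}
    end
  end
end
```

Calling `HandRolledRetry.run(fn -> {:error, :boom} end, 3)` invokes the function three times and then gives up with the last error.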
Add Dependencies
Start by adding Oban to the list of dependencies in the mix.exs file.
defp deps do
  [
    {:oban, "~> 2.17"}
  ]
end
- oban: A job processing library that utilizes PostgreSQL.
Then fetch the new dependency with mix deps.get.
Migrate the Database
Oban uses PostgreSQL to store job information, so a migration is necessary to set up the database structures.
- Generate a new migration:
mix ecto.gen.migration add_oban_jobs_table
- Define the migration to set up and tear down Oban’s required database tables:
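defmodule GameApp.Repo.Migrations.AddObanJobsTable do
  use Ecto.Migration

  def up, do: Oban.Migrations.up(version: 11)

  # Rolling back to version 1 reverts every Oban migration, so the
  # tables created by up/0 are actually removed.
  def down, do: Oban.Migrations.down(version: 1)
end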
Note: Always include up and down functions in migrations to ensure that changes can be safely reversed.
- Run the migrations in the development environment and reset the test database:
make migrate.dev
make reset.test
Configure Oban
Set up the configuration in config/config.exs by specifying the repository, enabling job pruning, and defining job queues with their concurrency limits.
config :game_app, Oban,
  repo: GameApp.Repo,
  plugins: [Oban.Plugins.Pruner],
  queues: [default: 10, long_jobs: 5]
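The plugin list accepts options as well. As a hedged sketch, the variant below sets an explicit retention window for the pruner and adds Oban's Cron plugin for recurring jobs. The one-hour max_age and the hourly schedule are illustrative values, not settings from this project.

```elixir
import Config

config :game_app, Oban,
  repo: GameApp.Repo,
  plugins: [
    # Keep finished jobs for one hour before pruning (max_age is in seconds).
    {Oban.Plugins.Pruner, max_age: 60 * 60},
    # Enqueue the worker at the top of every hour (illustrative schedule).
    {Oban.Plugins.Cron, crontab: [{"0 * * * *", GameApp.Workers.LongJobWorker}]}
  ],
  # Each number is the maximum count of jobs a queue runs concurrently on a node.
  queues: [default: 10, long_jobs: 5]
```

A bare module name such as Oban.Plugins.Pruner is shorthand for the plugin with its default options.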
And configure Oban in config/test.exs so that jobs execute immediately, within the calling process, during tests:
config :game_app, Oban, testing: :inline
Add Oban to the Application Supervisors
Finally, ensure Oban starts with your application by adding it to the supervision tree in application.ex and passing it the config. List it after GameApp.Repo, since Oban needs the repo to be running.
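def start(_type, _args) do
  children = [
    GameAppWeb.Telemetry,
    GameApp.Repo,
    # Oban reads its options from the :game_app application config
    {Oban, Application.fetch_env!(:game_app, Oban)},
    {Phoenix.PubSub, name: GameApp.PubSub},
    {Finch, name: GameApp.Finch},
    GameAppWeb.Endpoint
  ]

  # Standard Phoenix boilerplate; the supervisor name is an assumption
  opts = [strategy: :one_for_one, name: GameApp.Supervisor]
  Supervisor.start_link(children, opts)
end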
Create a worker
In an application that uses Oban for background processing, a worker is a module that defines how one kind of job runs: it calls use Oban.Worker and implements a perform function that receives the job.
defmodule GameApp.Workers.LongJobWorker do
  use Oban.Worker, queue: :long_jobs, max_attempts: 3

  @long_job_topic "long_job"

  @impl true
  def perform(%Oban.Job{args: args}) do
    name = Map.get(args, "name", "Default Name")

    broadcast_update(
      :job_started,
      "#{name} job started"
    )

    random_sleep_ms = :rand.uniform(9_000) + 1_000
    Process.sleep(random_sleep_ms)

    broadcast_update(
      :job_completed,
      "#{name} completed after #{random_sleep_ms} ms"
    )

    :ok
  end

  def topic do
    @long_job_topic
  end

  defp broadcast_update(action, message) do
    Phoenix.PubSub.broadcast(
      GameApp.PubSub,
      @long_job_topic,
      {action, message}
    )
  end
end
Oban.Worker Configuration
- queue: The worker is assigned to the long_jobs queue.
- max_attempts: The job is attempted at most 3 times in total, counting the first run, before Oban gives up on it.
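Between attempts, Oban waits for a backoff period, and a worker can override the delay by implementing the optional backoff callback. The sketch below is one way to compute such a delay. GameApp.Backoff, its base of 5 seconds, and its cap of 300 seconds are all hypothetical choices made for illustration.

```elixir
defmodule GameApp.Backoff do
  @moduledoc "Hypothetical helper: exponential retry delays with an upper cap."

  @base_seconds 5
  @max_seconds 300

  # `attempt` is 1-based, matching the attempt field on %Oban.Job{}.
  # The delay doubles with each attempt until it reaches the cap.
  def delay_seconds(attempt) when is_integer(attempt) and attempt >= 1 do
    min(@base_seconds * Integer.pow(2, attempt - 1), @max_seconds)
  end
end

# Wiring it into a worker requires Oban, so it is shown as a comment:
#
#   @impl Oban.Worker
#   def backoff(%Oban.Job{attempt: attempt}) do
#     GameApp.Backoff.delay_seconds(attempt)
#   end
```

With max_attempts: 3 only the first two delays ever apply, since no retry follows the third failure.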
Job perform
def perform(%Oban.Job{args: args}) do
  name = Map.get(args, "name", "Default Name")

  broadcast_update(
    :job_started,
    "#{name} job started"
  )

  random_sleep_ms = :rand.uniform(9_000) + 1_000
  Process.sleep(random_sleep_ms)

  broadcast_update(
    :job_completed,
    "#{name} completed after #{random_sleep_ms} ms"
  )

  :ok
end
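The GameApp.Workers.LongJobWorker simulates a long-running task by sleeping for a random duration between 1 and 10 seconds. It uses the name argument to build readable status messages, which it broadcasts over Phoenix.PubSub before and after the sleep. The argument is read with the string key "name" because job args are stored as JSON, so atom keys do not survive the round trip through the database.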
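Because config/test.exs sets testing: :inline, inserting a job in a test runs perform in the test process before Oban.insert returns, so the broadcasts can be asserted directly. The following is a sketch of such a test. It assumes the Phoenix-generated GameApp.DataCase case template exists in the project, and the test will take between 1 and 10 seconds because the worker really sleeps.

```elixir
defmodule GameApp.Workers.LongJobWorkerTest do
  # GameApp.DataCase is assumed to be the Phoenix-generated case template.
  use GameApp.DataCase

  alias GameApp.Workers.LongJobWorker

  test "broadcasts start and completion messages" do
    # Subscribe the test process to the topic the worker broadcasts on.
    Phoenix.PubSub.subscribe(GameApp.PubSub, LongJobWorker.topic())

    # In inline mode the job executes here, in the calling process.
    assert {:ok, _job} =
             %{"name" => "Test"}
             |> LongJobWorker.new()
             |> Oban.insert()

    # Both messages are already in the mailbox once insert returns.
    assert_received {:job_started, "Test job started"}
    assert_received {:job_completed, "Test completed after " <> _rest}
  end
end
```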
Triggering a Job with LiveView
defmodule GameAppWeb.LongJobLive.Index do
  use GameAppWeb, :live_view

  alias GameApp.Workers.LongJobWorker
  alias Phoenix.PubSub

  @impl true
  def mount(_params, _session, socket) do
    PubSub.subscribe(GameApp.PubSub, LongJobWorker.topic())

    new_socket =
      socket
      |> assign(:job_results, [])
      |> assign(:job_number, 0)

    {:ok, new_socket}
  end

  @impl true
  def handle_event("trigger_job", _params, socket) do
    job_number = socket.assigns.job_number + 1
    job_name = "Job #{job_number}"

    LongJobWorker.new(%{
      name: job_name
    })
    |> Oban.insert()
    |> case do
      {:ok, _job} ->
        new_socket =
          socket
          |> assign(:job_number, job_number)

        {:noreply, new_socket}

      {:error, _reason} ->
        new_socket =
          socket
          |> put_flash(:error, "Failed to trigger #{job_name}.")

        {:noreply, new_socket}
    end
  end

  @impl true
  def handle_info({:job_started, message}, socket) do
    new_socket =
      socket
      |> put_flash(:info, message)

    {:noreply, new_socket}
  end

  @impl true
  def handle_info({:job_completed, message}, socket) do
    new_socket =
      socket
      |> update(:job_results, fn results ->
        results ++ [message]
      end)
      |> put_flash(:info, message)

    {:noreply, new_socket}
  end
end
The client-side trigger_job event computes the next job number, constructs a unique job name, and then attempts to insert a new job into the Oban queue:
- Success: Stores the new :job_number in the socket.
- Failure: Notifies the user of the failure and leaves :job_number unchanged.
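The same insert call also covers the scheduling requirement from the start of this section. As a sketch, the snippet below passes the schedule_in and scheduled_at options to the worker's new function. The job names and delays are illustrative, and the code assumes it runs inside the application with Oban started.

```elixir
alias GameApp.Workers.LongJobWorker

# Run roughly 60 seconds from now instead of as soon as possible.
{:ok, _delayed_job} =
  %{name: "Delayed job"}
  |> LongJobWorker.new(schedule_in: 60)
  |> Oban.insert()

# Run at a specific UTC time, here one hour from now.
run_at = DateTime.add(DateTime.utc_now(), 3600, :second)

{:ok, _timed_job} =
  %{name: "Timed job"}
  |> LongJobWorker.new(scheduled_at: run_at)
  |> Oban.insert()
```

Scheduled jobs are stored in the database like any other job and only become available to the queue once their scheduled time arrives.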
Upon mounting, the LiveView subscribes to LongJobWorker.topic() and handles the :job_started and :job_completed messages from the worker.
- Job Started: Notifies the user.
- Job Completed: Notifies the user and appends the message to the :job_results list.
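The listing above does not include the markup that fires the event. One possible shape for it, offered as an assumption rather than the project's actual template, is a render function inside GameAppWeb.LongJobLive.Index with a button bound to trigger_job and a list of the collected results. Flash messages are assumed to be displayed by the application layout.

```elixir
# Hypothetical render/1 to place inside GameAppWeb.LongJobLive.Index.
@impl true
def render(assigns) do
  ~H"""
  <button phx-click="trigger_job">Trigger job</button>

  <ul>
    <%= for result <- @job_results do %>
      <li><%= result %></li>
    <% end %>
  </ul>
  """
end
```

The phx-click value must match the event name handled in handle_event, and @job_results re-renders each time a :job_completed message appends to the list.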