When tasks are mission-critical and need persistence, retry logic, or scheduling, when they must run in a specific order, or when they are resource-intensive enough to strain the system, a managed queuing system becomes necessary. Typical examples are batch data processing, scheduled communications, and video encoding. To solve this problem yourself, you’d need to implement:

  • Persistence: Implement a persistence layer to store tasks and metadata.
  • Retry Mechanism: Manage retries, including setting maximum attempts and strategies.
  • Scheduling: Establish a scheduler to handle cron expressions.
  • Monitoring and Management: Create an interface to view and manage the state of tasks.
  • Scalability and Distribution: Ensure it operates across multiple nodes with load balancing to distribute tasks evenly.
  • Event Subscriptions and Notifications: Integrate a PubSub system to communicate task events within the application.
  • Fault Tolerance: Develop mechanisms to address and recover from node failures.

Or, you’d bring a package like Oban.
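To give a sense of the work behind just one item on that list, here is a minimal sketch of a hand-rolled retry mechanism with exponential backoff. RetrySketch, its functions, and the 10 ms base delay are hypothetical names and values chosen for illustration; this is not how Oban implements retries, and it covers none of the persistence or distribution concerns.

```elixir
defmodule RetrySketch do
  @moduledoc "Hypothetical helper for illustration; not part of Oban or GameApp."

  # Calls `fun` until it returns {:ok, result} or `max_attempts` is reached.
  # `fun` is expected to return {:ok, term} or {:error, term}.
  # The third tuple element in the return value is the number of attempts used.
  def run(fun, max_attempts, base_delay_ms \\ 10) when max_attempts > 0 do
    attempt(fun, 1, max_attempts, base_delay_ms)
  end

  # The delay doubles with every attempt: base, 2 * base, 4 * base, ...
  def backoff_ms(attempt, base_delay_ms) do
    base_delay_ms * Integer.pow(2, attempt - 1)
  end

  defp attempt(fun, n, max_attempts, base_delay_ms) do
    case fun.() do
      {:ok, result} ->
        {:ok, result, n}

      # Out of attempts: give up and report the last error.
      {:error, reason} when n >= max_attempts ->
        {:error, reason, n}

      # Attempts remain: wait, then try again.
      {:error, _reason} ->
        Process.sleep(backoff_ms(n, base_delay_ms))
        attempt(fun, n + 1, max_attempts, base_delay_ms)
    end
  end
end
```

Calling RetrySketch.run(fn -> {:error, :down} end, 3) returns {:error, :down, 3} after pauses of 10 ms and 20 ms. Everything here lives in memory, so a node restart loses the job, which is exactly the gap a persistent queue fills.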

Add Dependencies

Start by adding Oban to the list of dependencies in the mix.exs file.

defp deps do
    [
      {:oban, "~> 2.17"}
    ]
end
  • oban: A job processing library that utilizes PostgreSQL.

Then fetch the new dependency by running mix deps.get.

Migrate the Database

Oban uses PostgreSQL to store job information, so a migration is necessary to set up the database structures.

  1. Generate a new migration:

     mix ecto.gen.migration add_oban_jobs_table
    
  2. Define the migration to set up and tear down Oban’s required database tables:

     defmodule GameApp.Repo.Migrations.AddObanJobsTable do
       use Ecto.Migration
    
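       def up, do: Oban.Migration.up(version: 11)

       # Roll back to version 1 so that `down` removes everything Oban created,
       # regardless of which version `up` migrated to.
       def down, do: Oban.Migration.down(version: 1)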
     end
    

    Note: Always include up and down functions in migrations to ensure that changes can be safely reversed.

  3. Run the migrations in the development environment and reset the test database (migrate.dev and reset.test are this project’s own Makefile targets):

     make migrate.dev
    
     make reset.test
    

Configure Oban

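Set up the configuration in config/config.exs by specifying the repository, enabling job pruning, and defining the job queues. The number next to each queue is its concurrency limit, that is, how many jobs from that queue may run at the same time on a node.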

config :game_app, Oban,
  repo: GameApp.Repo,
  plugins: [Oban.Plugins.Pruner],
  queues: [default: 10, long_jobs: 5]
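
The same configuration can be extended to cover the pruning and scheduling needs mentioned at the start. The fragment below is a sketch rather than a recommendation: the seven-day max_age, the hourly cron expression, and the "Hourly job" name are assumed values chosen for illustration, and it is an alternative to the config above, not an addition to it.

```elixir
# config/config.exs (illustrative variant; the values are assumptions)
import Config

config :game_app, Oban,
  repo: GameApp.Repo,
  plugins: [
    # Prune finished jobs older than 7 days (max_age is given in seconds).
    {Oban.Plugins.Pruner, max_age: 60 * 60 * 24 * 7},
    # Enqueue LongJobWorker at the top of every hour.
    {Oban.Plugins.Cron,
     crontab: [
       {"0 * * * *", GameApp.Workers.LongJobWorker, args: %{name: "Hourly job"}}
     ]}
  ],
  # Each number is the queue's concurrency limit per node.
  queues: [default: 10, long_jobs: 5]
```

Without a max_age, the Pruner falls back to its default retention period, so setting it explicitly is mostly useful when you want finished jobs to stay visible for longer.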

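Then configure Oban for tests in config/test.exs. With testing: :inline, jobs execute immediately in the calling process when they are inserted, so tests do not have to wait for a queue to pick them up: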

config :game_app, Oban, testing: :inline

Add Oban to the Application Supervisors

Finally, start Oban with your application by adding it to the supervision tree in application.ex and passing in the config. List it after the Repo, which it depends on.

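  def start(_type, _args) do
    children = [
      GameAppWeb.Telemetry,
      GameApp.Repo,
      # Oban comes after the Repo because it needs the database connection.
      {Oban, Application.fetch_env!(:game_app, Oban)},
      {Phoenix.PubSub, name: GameApp.PubSub},
      {Finch, name: GameApp.Finch},
      GameAppWeb.Endpoint
    ]

    # Start the supervisor with the children defined above.
    opts = [strategy: :one_for_one, name: GameApp.Supervisor]
    Supervisor.start_link(children, opts)
  end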

Create a Worker

Workers are the modules that define how a job runs. Each worker implements perform/1, which Oban calls with the job and its arguments.

defmodule GameApp.Workers.LongJobWorker do
  use Oban.Worker, queue: :long_jobs, max_attempts: 3
  @long_job_topic "long_job"

  @impl true
  def perform(%Oban.Job{args: args}) do
    name = Map.get(args, "name", "Default Name")

    broadcast_update(
      :job_started,
      "#{name} job started"
    )

    random_sleep_ms = :rand.uniform(9_000) + 1_000
    Process.sleep(random_sleep_ms)

    broadcast_update(
      :job_completed,
      "#{name} completed after #{random_sleep_ms} ms"
    )

    :ok
  end

  def topic do
    @long_job_topic
  end

  defp broadcast_update(action, message) do
    Phoenix.PubSub.broadcast(
      GameApp.PubSub,
      @long_job_topic,
      {action, message}
    )
  end
end

Oban.Worker Configuration

  • queue: The worker is assigned to the long_jobs queue.
  • max_attempts: The job is attempted up to 3 times in total before Oban stops retrying it.

Job perform

def perform(%Oban.Job{args: args}) do
  name = Map.get(args, "name", "Default Name")

  broadcast_update(
    :job_started,
    "#{name} job started"
  )

  random_sleep_ms = :rand.uniform(9_000) + 1_000
  Process.sleep(random_sleep_ms)

  broadcast_update(
    :job_completed,
    "#{name} completed after #{random_sleep_ms} ms"
  )

  :ok
end

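The GameApp.Workers.LongJobWorker simulates a long-running task by sleeping for a random duration between roughly 1 and 10 seconds (1,001 to 10,000 ms). It reads the name argument with a string key, because Oban stores job args as JSON, and uses it to build readable status updates, which it broadcasts over Phoenix.PubSub. Returning :ok marks the job as completed.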
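
The string key in Map.get(args, "name", ...) is easy to get wrong, since jobs are usually built with atom keys. The sketch below imitates the effect of the JSON round trip with a hypothetical ArgsSketch module; stringify_keys/1 is a stand-in written for this example, not an Oban function.

```elixir
defmodule ArgsSketch do
  @moduledoc "Hypothetical stand-in for the JSON round trip applied to job args."

  # JSON has no atoms, so atom keys come back as strings once a job is stored.
  def stringify_keys(args) do
    Map.new(args, fn {key, value} -> {to_string(key), value} end)
  end

  # Same lookup the worker performs inside perform/1.
  def name_from(args), do: Map.get(args, "name", "Default Name")
end

enqueued = %{name: "Job 1"}
stored = ArgsSketch.stringify_keys(enqueued)

IO.inspect(ArgsSketch.name_from(stored))
# "Job 1"

IO.inspect(ArgsSketch.name_from(enqueued))
# "Default Name" (an atom key never matches the string lookup)
```

Matching on string keys in perform/1 keeps the worker correct no matter how the job was enqueued.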

Triggering a Job with LiveView

defmodule GameAppWeb.LongJobLive.Index do
  use GameAppWeb, :live_view
  alias GameApp.Workers.LongJobWorker
  alias Phoenix.PubSub

  @impl true
  def mount(_params, _session, socket) do
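    # Subscribe only once the socket is connected, not during the initial static render.
    if connected?(socket) do
      PubSub.subscribe(GameApp.PubSub, LongJobWorker.topic())
    end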

    new_socket =
      socket
      |> assign(:job_results, [])
      |> assign(:job_number, 0)

    {:ok, new_socket}
  end

  @impl true
  def handle_event("trigger_job", _params, socket) do
    job_number = socket.assigns.job_number + 1

    job_name = "Job #{job_number}"

    LongJobWorker.new(%{
      name: job_name
    })
    |> Oban.insert()
    |> case do
      {:ok, _job} ->
        new_socket =
          socket
          |> assign(:job_number, job_number)

        {:noreply, new_socket}

      {:error, _reason} ->
        new_socket =
          socket
          |> put_flash(:error, "Failed to trigger #{job_name}.")

        {:noreply, new_socket}
    end
  end

  @impl true
  def handle_info({:job_started, message}, socket) do
    new_socket =
      socket
      |> put_flash(:info, message)

    {:noreply, new_socket}
  end

  @impl true
  def handle_info({:job_completed, message}, socket) do
    new_socket =
      socket
      |> update(:job_results, fn results ->
        results ++ [message]
      end)
      |> put_flash(:info, message)

    {:noreply, new_socket}
  end
end

The client-side trigger_job event increments the job count, builds a unique job name, and then attempts to insert a new job into the Oban queue:

  • Success: Updates the :job_number.
  • Failure: Notifies the user of the failure.

Upon mounting, the LiveView subscribes to LongJobWorker.topic(), and it handles the :job_started and :job_completed messages from the worker:

  • Job Started: Notifies the user.
  • Job Completed: Notifies the user and appends the message to the :job_results list.
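
To make the message flow concrete without a running Phoenix app, the sketch below reproduces the subscribe and broadcast pattern with Elixir's built-in Registry standing in for Phoenix.PubSub. PubSubSketch and its functions are hypothetical names used only for illustration; the application itself keeps using Phoenix.PubSub as shown in the worker and the LiveView.

```elixir
defmodule PubSubSketch do
  @moduledoc "Hypothetical Registry-based stand-in for Phoenix.PubSub."

  @registry PubSubSketch.Registry

  # A :duplicate registry lets many processes register under the same topic.
  def start, do: Registry.start_link(keys: :duplicate, name: @registry)

  # Registers the calling process as a subscriber of `topic`.
  def subscribe(topic), do: Registry.register(@registry, topic, [])

  # Sends `message` to every process subscribed to `topic`.
  def broadcast(topic, message) do
    Registry.dispatch(@registry, topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, message)
    end)
  end
end

{:ok, _registry} = PubSubSketch.start()
{:ok, _owner} = PubSubSketch.subscribe("long_job")

# The worker side: broadcast an {action, message} tuple on the topic.
PubSubSketch.broadcast("long_job", {:job_completed, "Job 1 completed after 1500 ms"})

# The subscriber side: the tuple arrives as a plain process message,
# which is what handle_info/2 pattern matches on in the LiveView.
received =
  receive do
    {:job_completed, message} -> {:job_completed, message}
  after
    1_000 -> :timeout
  end

IO.inspect(received)
```

Because the message is an ordinary tuple, adding a new event such as a failure notification would only need another broadcast in the worker and a matching handle_info/2 clause in the LiveView.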