Tap tap tap-a-roo

I spent some time last week tightening up the Either DSL, but I also wanted to fix something that’s been bothering me.

A Dishonest Return

It’s my BroadcastUpdate. I should name it something like BestEffortBroadcastUpdate, or maybe TapBroadcastUpdate.

defmodule MissionControl.Superhero.Notifiers.BroadcastUpdate do
  @behaviour Funx.Monad.Either.Dsl.Behaviour

  @impl true
  def run(superhero, _opts, _env) do
    notification =
      Ash.Notifier.Notification.new(
        MissionControl.Superhero,
        data: superhero,
        action: %{type: :update}
      )

    Phoenix.PubSub.broadcast(
      MissionControl.PubSub,
      "superhero:#{superhero.id}",
      %{
        topic: "superhero:#{superhero.id}",
        payload: notification
      }
    )

    superhero
  end
end

By ignoring the return of the broadcast and forwarding the input, I am asserting for the caller that the result of the effect does not matter.

But the broadcast can fail. I’m lying about the category it lives in.

Make the Contract Honest

defmodule MissionControl.Superhero.Notifiers.BroadcastUpdate do
  @behaviour Funx.Monad.Either.Dsl.Behaviour

  @impl true
  def run(superhero, _opts, _env) do
    notification =
      Ash.Notifier.Notification.new(
        MissionControl.Superhero,
        data: superhero,
        action: %{type: :update}
      )

    case Phoenix.PubSub.broadcast(
           MissionControl.PubSub,
           "superhero:#{superhero.id}",
           %{
             topic: "superhero:#{superhero.id}",
             payload: notification
           }
         ) do
      :ok ->
        {:ok, superhero}

      {:error, reason} ->
        {:error, reason}
    end
  end
end

It’s now a Kleisli function. Rather than Superhero to Superhero, it’s Superhero to Either(Superhero). I’m being honest that broadcast can fail.
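To make the shape concrete, here is a minimal sketch using plain tagged tuples in place of Either. The module KleisliSketch and its functions are hypothetical, and the hand-rolled bind/2 is only a stand-in for illustration, not how Funx implements it.

```elixir
defmodule KleisliSketch do
  # Plain function: value in, value out. It has no way to report failure.
  def shout(name), do: String.upcase(name)

  # Kleisli-shaped function: value in, tagged result out.
  def parse_age(text) do
    case Integer.parse(text) do
      {age, ""} -> {:ok, age}
      _ -> {:error, "not an integer: #{inspect(text)}"}
    end
  end

  # Another Kleisli-shaped function, so there is something to chain.
  def check_adult(age) when age >= 18, do: {:ok, age}
  def check_adult(age), do: {:error, "too young: #{age}"}

  # Hand-rolled bind for tagged tuples (an assumption for this sketch):
  # run the next step on success, pass the error through untouched.
  def bind({:ok, value}, fun), do: fun.(value)
  def bind({:error, _} = error, _fun), do: error
end

"42"
|> KleisliSketch.parse_age()
|> KleisliSketch.bind(&KleisliSketch.check_adult/1)
|> IO.inspect()
# prints {:ok, 42}
```

Because each step returns a wrapped result, chaining needs bind rather than ordinary function composition.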

Ironically, I am fudging a bit here for the sake of the story. In practice broadcast is best-effort and always returns :ok. That makes this :error branch a dead path, which means I’m lying about lying.

Our logic still works:

  @impl true
  def change(changeset, _opts, _context) do
    Ash.Changeset.after_action(changeset, fn _changeset, assignment ->
      either assignment.superhero_id, as: :tuple do
        bind get_superhero(assignment)
        bind MissionControl.dispatch_superhero()
        map BroadcastUpdate
        map_left handle_dispatch_error(assignment)
        map fn _ -> assignment end
      end
    end)
  end

But that’s only because we happen to discard whatever comes out of the success path and map back to assignment at the end. (Remember, returning the assignment is a requirement for an Ash change within the Assignment resource.)

Let’s take a closer look:

either assignment.superhero_id, as: :tuple do
  bind get_superhero(assignment)
  bind MissionControl.dispatch_superhero()
  map BroadcastUpdate
end

Here, the result is Right(Right(superhero)): a nested Either, which isn’t what we want.

Instead, we need bind:

either assignment.superhero_id, as: :tuple do
  bind get_superhero(assignment)
  bind MissionControl.dispatch_superhero()
  bind BroadcastUpdate
end

Bind flattens, giving us Right(superhero).
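The difference between the two is easy to see with plain tagged tuples standing in for Right and Left. This is a rough sketch under that assumption: NestingSketch, its map/2 and bind/2, and the always-succeeding broadcast/1 are hypothetical stand-ins, not the Funx implementation.

```elixir
defmodule NestingSketch do
  # map: apply the function to the success value, then re-wrap the result.
  def map({:ok, value}, fun), do: {:ok, fun.(value)}
  def map({:error, _} = error, _fun), do: error

  # bind: apply the function and return its result as-is, no re-wrapping.
  def bind({:ok, value}, fun), do: fun.(value)
  def bind({:error, _} = error, _fun), do: error

  # Stand-in for a Kleisli-shaped effect that reports success or failure.
  def broadcast(hero), do: {:ok, hero}
end

hero = %{id: 1}

# map wraps an already-wrapped result, so we get nesting.
IO.inspect(NestingSketch.map({:ok, hero}, &NestingSketch.broadcast/1))
# prints {:ok, {:ok, %{id: 1}}}

# bind hands back the function's own result, so there is one layer.
IO.inspect(NestingSketch.bind({:ok, hero}, &NestingSketch.broadcast/1))
# prints {:ok, %{id: 1}}
```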

This is why bind often goes by the name flat_map: it maps, then flattens one level. Enum.flat_map/2 does the same thing for lists.
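The list version shows the same pattern with the standard library alone. In this small example, the duplicate function is just an illustration of a function that returns a list for each element.

```elixir
# A function that returns a list for every element it is given.
duplicate = fn x -> [x, x] end

# Enum.map keeps each returned list, so the result is nested.
IO.inspect(Enum.map([1, 2], duplicate))
# prints [[1, 1], [2, 2]]

# Enum.flat_map maps and then flattens one level.
IO.inspect(Enum.flat_map([1, 2], duplicate))
# prints [1, 1, 2, 2]
```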

Ignoring a Result

Elixir has Kernel.tap/2, which takes a value, passes it to a function for its side effect, and returns the original value.
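As a quick illustration in an ordinary pipeline (the list and the label are arbitrary), the inspected value passes through unchanged no matter what the tapped function returns:

```elixir
result =
  [3, 1, 2]
  |> Enum.sort()
  # Side effect only: whatever the function returns is thrown away.
  |> tap(fn sorted -> IO.inspect(sorted, label: "sorted") end)
  |> Enum.sum()

IO.inspect(result)
# prints "sorted: [1, 2, 3]" and then 6
```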

Kernel.tap/2 works in simple pipelines, but for ours we’ll need a context-aware tap:

either assignment.superhero_id, as: :tuple do
  bind get_superhero(assignment)
  bind MissionControl.dispatch_superhero()
  tap BroadcastUpdate
end

Here, when the pipeline is in a success state (Right), it pauses to execute the given function and then deliberately ignores its result. If the pipeline is in an error state (Left), it skips the tap and nothing happens.

We are saying, “If the pipeline has succeeded so far, call this function to broadcast an update, but ignore the result and proceed with the value you currently have.”
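For intuition, a context-aware tap can be sketched by hand over plain tagged tuples. This is only an approximation of the behavior described above, not the DSL’s actual implementation; TapSketch and tap_ok/2 are hypothetical names.

```elixir
defmodule TapSketch do
  # Success: run the effect, discard whatever it returns, keep the result.
  def tap_ok({:ok, value} = result, effect) do
    _ignored = effect.(value)
    result
  end

  # Failure: skip the effect entirely and pass the error through.
  def tap_ok({:error, _} = error, _effect), do: error
end

# The effect "fails", but its return value is ignored.
{:ok, %{id: 1}}
|> TapSketch.tap_ok(fn hero ->
  IO.puts("broadcasting #{hero.id}")
  {:error, :broadcast_failed}
end)
|> IO.inspect()
# prints "broadcasting 1" and then {:ok, %{id: 1}}

# On the error path nothing is broadcast.
{:error, :not_found}
|> TapSketch.tap_ok(fn _hero -> IO.puts("never printed") end)
|> IO.inspect()
# prints {:error, :not_found}
```

The choice between bind and tap is then a statement of intent: bind says the effect’s failure should stop the pipeline, while tap says it should not.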

Where We’ve Traveled

Here is our current DispatchSuperhero:

defmodule MissionControl.Assignment.Changes.DispatchSuperhero do
  use Ash.Resource.Change
  use Funx.Monad.Either

  import Funx.Foldable

  alias MissionControl.Superhero.Notifiers.BroadcastUpdate

  @impl true
  def change(changeset, _opts, _context) do
    Ash.Changeset.after_action(changeset, fn _changeset, assignment ->
      either assignment.superhero_id, as: :tuple do
        bind get_superhero(assignment)
        bind MissionControl.dispatch_superhero()
        tap BroadcastUpdate
        map_left handle_dispatch_error(assignment)
        map fn _ -> assignment end
      end
    end)
  end

  defp get_superhero(superhero_id, assignment) do
    either superhero_id do
      bind MissionControl.get_superhero()
      map_left format_superhero_error(assignment)
    end
  end

  defp handle_dispatch_error(error, assignment) do
    either assignment do
      bind MissionControl.update_assignment(%{status: :open})
    end
    |> fold_l(
      fn rollback_error -> aggregate_errors(error, rollback_error) end,
      fn _ -> error end
    )
  end

  defp format_superhero_error(%Ash.Error.Query.NotFound{}, assignment) do
    Ash.Error.Changes.InvalidChanges.exception(
      fields: [:superhero_id],
      message:
        "Cannot dispatch assignment: superhero with ID #{assignment.superhero_id} not found"
    )
  end

  defp format_superhero_error(error, _assignment), do: error

  defp aggregate_errors(%Ash.Error.Invalid{errors: e1}, %Ash.Error.Invalid{errors: e2}),
    do: %Ash.Error.Invalid{errors: e1 ++ e2}

  defp aggregate_errors(e1, e2),
    do: Ash.Error.to_error_class([e1, e2])
end

And here is where we started:

defmodule MissionControl.Assignment.Changes.DispatchSuperhero do
  use Ash.Resource.Change

  @impl true
  def change(changeset, _opts, _context) do
    Ash.Changeset.after_action(changeset, fn _changeset, assignment ->
      case MissionControl.get_superhero(assignment.superhero_id) do
        {:ok, %MissionControl.Superhero{} = superhero} ->
          case MissionControl.dispatch_superhero(superhero) do
            {:ok, updated_superhero} ->
              broadcast_update(MissionControl.Superhero, updated_superhero)
              {:ok, assignment}

            {:error, dispatch_error} ->
              case rollback_assignment(assignment) do
                {:ok, _} ->
                  {:error, dispatch_error}

                {:error, rollback_error} ->
                  {:error, aggregate_errors(dispatch_error, rollback_error)}
              end
          end

        {:error, %Ash.Error.Query.NotFound{}} ->
          error =
            Ash.Error.Changes.InvalidChanges.exception(
              fields: [:superhero_id],
              message:
                "Cannot dispatch assignment: superhero with ID #{assignment.superhero_id} not found"
            )

          case rollback_assignment(assignment) do
            {:ok, _} ->
              {:error, error}

            {:error, rollback_error} ->
              {:error, aggregate_errors(error, rollback_error)}
          end

        {:error, other_error} ->
          case rollback_assignment(assignment) do
            {:ok, _} ->
              {:error, other_error}

            {:error, rollback_error} ->
              {:error, aggregate_errors(other_error, rollback_error)}
          end
      end
    end)
  end

  defp rollback_assignment(assignment) do
    assignment
    |> Ash.Changeset.for_action(:update, %{status: :open})
    |> Ash.update()
  end

  defp aggregate_errors(%Ash.Error.Invalid{errors: e1}, %Ash.Error.Invalid{errors: e2}),
    do: %Ash.Error.Invalid{errors: e1 ++ e2}

  defp aggregate_errors(e1, e2),
    do: Ash.Error.to_error_class([e1, e2])

  defp broadcast_update(resource, record) do
    topic = "#{resource_prefix(resource)}:#{record.id}"

    notification =
      Ash.Notifier.Notification.new(resource,
        data: record,
        action: %{type: :update}
      )

    Phoenix.PubSub.broadcast(
      MissionControl.PubSub,
      topic,
      %{topic: topic, payload: notification}
    )
  end

  defp resource_prefix(MissionControl.Superhero), do: "superhero"
end

If we are working in a feature factory, we optimize for the fastest path to writing. Where we started is easy to write because it uses familiar syntax and localizes behavior.

But when we need to express complex and continually changing business logic, we’ll take the additional time to refactor, prioritizing readability. We need to be able to drop into the code and immediately understand the intended logic.

It’s not right versus wrong. It’s competing priorities.
