Using Ecto.Multi and Phoenix.PubSub to update Phoenix Liveview

Eric Sullivan

Tagged as ecto, pubsub, liveview, phoenix

The initial MVP of Severus is as platform to keep contact information up to date. If one user updates their phone number, their connections should see that update. In an ideal world, that update would be instantaneous, and not require a browser refresh. As Severus is a phoenix application, Livewiew made this requirement easy, but hooking it into Ecto took a little work.

Setup

I’ll create a new application and document the steps in this tutorial as distinct commits. The repo can be found on github

Note: These instructions use phoenix v1.6.6 and liveview v0.17.5

mix phx.new liveview_pubsub_demo

I also added support for authentication using mix phx.gen.auth:

mix phx.gen.auth Accounts User users
mix deps.get
mix ecto.migrate

commit

At this point, you should be able to start the app and Log in.

mix phx.server

A User Phone Number

For this demonstration, we’ll create a user_phone_number, which is just a phone_number associated with a user. We can use the generators to get running quickly, but we’ll need to make a couple changes to ensure the user is being set correctly.

mix phx.gen.live ContactInfo UserPhoneNumber user_phone_number phone_number:string user_id:references:users

commit

(Note: the table should have been called user_phone_numbers, but I didn’t notice this typo until several commits later)

You can see the migration correctly references the users table:

add :user_id, references(:users, on_delete: :nothing)

But we need to change the schema:

  schema "user_phone_number" do
    field :phone_number, :string
-   field :user_id, :id
+   belongs_to :user, Spidersilk.Accounts.User
    timestamps()
  end

  @doc false
  def changeset(user_phone_number, attrs) do
    user_phone_number
    |> cast(attrs, [:phone_number])
-   |> validate_required([:phone_number])
+   |> validate_required([:phone_number, :user_id])
  end

Note that we don’t pass the user_id in as attrs. As that variable contains user-supplied data, we can’t trust it to enforce security. That means we’re responsible for setting the user before we get to the changeset, which requires a few more changes to the context and the associated tests, namely converting the arity of create_user_phone_number/1 to create_user_phone_number/2:

-  def create_user_phone_number(attrs \\ %{}) do
-    %UserPhoneNumber{}
+  def create_user_phone_number(%UserPhoneNumber{} = user_phone_number, attrs \\ %{}) do
+   user_phone_number
    |> UserPhoneNumber.changeset(attrs)
    |> Repo.insert()
  end

commit

At this point we have one failure, and that’s because liveview isn’t quite hooked up to the current_user.

Adding the current_user to Liveview

Let’s adjust the router a little to require a user before accessing the user_phone_numbers

  scope "/", LiveviewPubsubDemoWeb do
-   pipe_through :browser
+   pipe_through [:browser, :require_authenticated_user]

    live "/user_phone_number", UserPhoneNumberLive.Index, :index
    live "/user_phone_number/new", UserPhoneNumberLive.Index, :new
    live "/user_phone_number/:id/edit", UserPhoneNumberLive.Index, :edit

    live "/user_phone_number/:id", UserPhoneNumberLive.Show, :show
    live "/user_phone_number/:id/show/edit", UserPhoneNumberLive.Show, :edit
  end

If you try to access the page you’ll now get a helpful error message:

Login

Once you register and log in you should be able to see all the phone_numbers, but what we did is not sufficient for production usage. If that statement surprises you, please read Security considerations of the LiveView model.

The new on_mount/1 hook, as well as live_session/3 makes this easier. Lets adjust the router a bit more:

+ live_session :default, on_mount: LiveviewPubsubDemoWeb.UserAuthLive do
    scope "/", LiveviewPubsubDemoWeb do
      pipe_through [:browser, :require_authenticated_user]

      live "/user_phone_number", UserPhoneNumberLive.Index, :index
      live "/user_phone_number/new", UserPhoneNumberLive.Index, :new
      live "/user_phone_number/:id/edit", UserPhoneNumberLive.Index, :edit

      live "/user_phone_number/:id", UserPhoneNumberLive.Show, :show
      live "/user_phone_number/:id/show/edit", UserPhoneNumberLive.Show, :edit
    end
+ end
defmodule LiveviewPubsubDemoWeb.UserAuthLive do
  import Phoenix.LiveView

  alias LiveviewPubsubDemo.Accounts
  alias LiveviewPubsubDemoWeb.Router.Helpers, as: Routes

  def on_mount(:default, _params, %{"user_token" => user_token} = _session, socket) do
    socket = assign_new(socket, :current_user, fn ->
      user_token && Accounts.get_user_by_session_token(user_token)
    end)

    if socket.assigns.current_user do
      {:cont, socket}
    else
      {:halt, redirect(socket, to: Routes.user_session_path(socket, :new))}
    end
  end
end

Now we can safely live_redirect, ensuring that the current_user is always verified. With this update we can fix the failing test:

Add this to the top of the UserPhoneNumberLiveTest

  setup :register_and_log_in_user

In the UserPhoneNumberLive.Index set the user in the new action:

  defp apply_action(socket, :new, _params) do
    socket
    |> assign(:page_title, "New User phone number")
-   |> assign(:user_phone_number, %UserPhoneNumber{})
+   |> assign(:user_phone_number, %UserPhoneNumber{user_id: socket.assigns.current_user.id})
  end

And update the form_component:

  defp save_user_phone_number(socket, :new, user_phone_number_params) do
-    case ContactInfo.create_user_phone_number(user_phone_number_params) do
+    case ContactInfo.create_user_phone_number(socket.assigns.user_phone_number, user_phone_number_params) do
      {:ok, _user_phone_number} ->
        {:noreply,
         socket
         |> put_flash(:info, "User phone number created successfully")
         |> push_redirect(to: socket.assigns.return_to)}

      {:error, %Ecto.Changeset{} = changeset} ->
        {:noreply, assign(socket, changeset: changeset)}
    end
  end

commit

The tests should all work now, and our setup is complete. I am not scoping the records to the user or doing authorization, but we don’t need it to demonstrate how to integrate Ecto with Liveview. In practice you’ll need to limit the phone numbers to the current_user, and you’ll need to authorize each event. One approach I’ve used is to call bodyguard on each handle_event/3 callback.

The Ecto Publisher

UTC Timestamps

I switched to using UTC timestamps, so if you’re running this locally run mix ecto.reset

in config/config.exs

config :liveview_pubsub_demo, LiveviewPubsubDemo.Repo,
  migration_timestamps: [type: :utc_datetime]

in lib/liveview_pubsub_demo/contact_info/user_phone_number.ex

+  @timestamps_opts type: :utc_datetime
   schema "user_phone_number" do

Adding Ecto.Multi

I changed the update_user_phone_number/2 function from:

  def update_user_phone_number(%UserPhoneNumber{} = user_phone_number, attrs) do
    user_phone_number
    |> UserPhoneNumber.changeset(attrs)
    |> Repo.update()
  end

to:

  def update_user_phone_number(%UserPhoneNumber{} = user_phone_number, attrs) do
    result =
      Multi.new()
      |> multi_update_user_phone_number(:user_phone_number, user_phone_number, attrs)
      |> Repo.transaction()

    case result do
      {:ok, %{user_phone_number: user_phone_number}} ->
        {:ok, user_phone_number}

      {:error, :user_phone_number, changeset, _changes} ->
        {:error, changeset}
    end
  end

  def multi_update_user_phone_number(
        multi,
        id,
        %UserPhoneNumber{} = user_phone_number,
        attrs \\ %{}
      ) do
    changeset = UserPhoneNumber.changeset(user_phone_number, attrs)

    multi
    |> Multi.update(id, changeset)
    |> Multi.merge(fn changes ->
      user_phone_number = changes[id]

      Events.multi_create_user_event(
        Multi.new(),
        id,
        %{
          type: "user_phone_number.updated",
          user_id: user_phone_number.user_id,
          event_at: user_phone_number.updated_at,
          data: Map.take(user_phone_number, [:id, :user_id, :phone_number])
        },
        [
          "user-phone-number.updated.*",
          "user-phone-number.updated.id:#{user_phone_number.id}",
          "user-phone-number.updated.user-id:#{user_phone_number.user_id}",
          "user-phone-number.*.id:#{user_phone_number.id}",
          "user-phone-number.*.user-id:#{user_phone_number.user_id}",
          "user-phone-number.*.*"
        ]
      )
    end)
  end

The Event Context

That also required an Events context and schema:

  defmodule LiveviewPubsubDemo.Events do
    import Ecto.Changeset

    alias Ecto.Multi
    alias LiveviewPubsubDemo.Events.UserEvent

    def multi_create_user_event(
          multi,
          id,
          attrs,
          topics \\ []
        ) do
      changeset = %UserEvent{}
      |> UserEvent.changeset(attrs)

      multi
      |> Multi.run({:user_event, id}, fn _repo, _changes ->
        user_event = apply_action!(changeset, :create)

        Enum.map(topics, fn topic ->
          :ok = Phoenix.PubSub.broadcast(LiveviewPubsubDemo.PubSub, topic, user_event)
        end)

        {:ok, user_event}
      end)
    end
  end
  defmodule LiveviewPubsubDemo.Events.UserEvent do
    use Ecto.Schema
    import Ecto.Changeset

    @primary_key false
    embedded_schema do
      field :type, :string
      field :data, :map
      field :event_at, :utc_datetime
      field :user_id, :integer
    end

    @doc false
    def changeset(user_event, attrs) do
      user_event
      |> cast(attrs, [:type, :data, :event_at, :user_id])
      |> validate_required([:type, :data, :event_at, :user_id])
    end
  end

And finally a test:

  test "update_user_phone_number/2 with valid data publishes events" do
    Phoenix.PubSub.subscribe(LiveviewPubsubDemo.PubSub, "user-phone-number.*.*")

    user_phone_number = user_phone_number_fixture()
    update_attrs = %{phone_number: "some updated phone_number"}

    assert {:ok, %UserPhoneNumber{} = user_phone_number} = ContactInfo.update_user_phone_number(user_phone_number, update_attrs)

    user_event = %LiveviewPubsubDemo.Events.UserEvent{
      data: %{
        id: user_phone_number.id,
        phone_number: "some updated phone_number",
        user_id: user_phone_number.user_id
      },
      event_at: user_phone_number.updated_at,
      type: "user_phone_number.updated",
      user_id: user_phone_number.user_id
    }

    assert_receive ^user_event
  end

commit

There are quite a few changes here so I added a mini FAQ. Feel free to reach out if there are other decisions that are not intuitive.

Why UTC Timestamps?

When testing that the event was being published, the assert_receive was failing with a naive datetime for event_at. I prefer UTC datetimes so I updated the timestamps to always be UTC. That required a change to how migrations are run, as well as a notation in the schema. In a typical app I’d created a custom schema and use that instead of use Ecto.Schema

Why Ecto.Multi?

I based this on the Transactional Outbox Pattern. In fact, I typically write the UserEvent to the database and publish it, so transactions are ideal. I’ve also used the same approach to push events to Kafka or another message broker. I’ve also found that I re-use the functions in different contexts (delete the phone number and the user). So being able to pass a multi into the function makes these composable. The other oddity is the id variable. In Ecto.Multi, all operations take a name as the first parameter. Sometimes I have a need to operate on several records of the same kind (update multiple phone numbers), in that case I can pass in a tuple as the name:

  |> multi_update_user_phone_number(:user_phone_number, user_phone_number, attrs)

vs:

  |> multi_update_user_phone_number({:user_phone_number, user_phone_number.id}, user_phone_number, attrs)

Are the topic names special?

Sort of… Phoenix.PubSub doesn’t care about the *, but that doesn’t mean we can’t still pattern match on it. Generally the topics are model.action.some-id:###. I also replace the actions and ids with a * so you can subscribe to the wildcard version, which we’ll see in the next section.

Why did you only test user-phone-number.*.*

To make this blog post shorter :)

In my production codebase I don’t actually like hooking into pubsub, as it couples the code in a way that’s harder to test. If you get a chance to see that you’ll notice that I have a PubSubMock (using mox). I check that the mock broadcast/4 method is called from the context. That also makes testing the liveviews easier, as you can turn off PubSub as desired.

embedded_schema?

This is a personal preference. I really like that I can use it in place of a struct, but it’s not necessary.

This won’t work

Did you skip ahead? Ok, we’ll revisit this, but I’m surprised you noticed. Liveview is a little too fast for this setup…

The Liveview Subscriber

I’ll only cover updating the index here, but the same principle applies to the show and edit actions (you can subscribe to the specific topic, like user-phone-number.*.id:#{socket.assigns.user_phone_number.id}).

  def mount(_params, _session, socket) do
+   if connected?(socket) do
+     [
+       "user-phone-number.*.user-id:#{socket.assigns.current_user.id}",
+     ]
+     |> Enum.each(&Phoenix.PubSub.subscribe(LiveviewPubsubDemo.PubSub, &1))
+   end

    {:ok, assign(socket, :user_phone_number_collection, list_user_phone_number())}
  end
  @impl true
  def handle_info(
        %LiveviewPubsubDemo.Events.UserEvent{type: "user_phone_number.updated"} =
          _user_event,
        socket
      ) do
    socket = socket
      |> assign(:user_phone_number_collection, list_user_phone_number())

    {:noreply, socket}
  end

This approach doesn’t do anything special to update the list, it just reloads the data when it changes. The default generator uses modals for the new and edit actions, which means you must handle the update event even if the action isn’t index.

The test is pretty simple:

    test "handles 'user_phone_number.updated' event", %{conn: conn, user_phone_number: user_phone_number} do
      {:ok, index_live, _html} = live(conn, Routes.user_phone_number_index_path(conn, :index))

      refute has_element?(index_live, "td", "NEW_NUMBER")

      assert {:ok, %LiveviewPubsubDemo.ContactInfo.UserPhoneNumber{} = _user_phone_number} =
               LiveviewPubsubDemo.ContactInfo.update_user_phone_number(
                 user_phone_number,
                 %{phone_number: "NEW_NUMBER"}
               )

      assert has_element?(index_live, "td", "NEW_NUMBER")
    end

I packaged these as a single commit. The test works, but you’ll notice some odd behaviour if you open up two browsers to the index and edit a record. Depending on your computer, it might work. Or, you’ll have to update it a second time, and you’ll see the second browser show the data from the first update, as captured below.

Too Fast

Remember when I mentioned Liveview was too fast? The problem is we’re sending the pubsub event inside a database transaction. The process that is running Ecto.Multi (the window on the left) hands the event to the PubSub process, and attempts to commit the transaction. At the same time, the second Liveview process (the window on the right) receives the event and tries to retrieve the results from the database. Sometime it’s there, and sometimes the original transaction hasn’t been committed. In that case, the Liveview doesn’t see any changes and our browser doesn’t update. But, when we update the record a second time, the Liveview can always retrieve the data from the initial transaction. We don’t notice this in out test suite due to the database sandbox running in a single transaction.

The (delayed) Ecto Publisher

The solution I use is to gather all the events, and then send them after the transaction has been successfully committed. We can pattern match on the Ecto.Multi results {:user_event, _} and create a custom transaction method.

Change the Repo to add the following methods:

defmodule LiveviewPubsubDemo.Repo do
  use Ecto.Repo,
    otp_app: :liveview_pubsub_demo,
    adapter: Ecto.Adapters.Postgres

  def broadcast_transaction(multi) do
    multi
    |> transaction()
    |> broadcast_user_events()
  end

  defp broadcast_user_events({:error, _error, _changeset, _changes} = result), do: result

  defp broadcast_user_events({:ok, data} = result) do
    data
    |> Map.to_list()
    |> Enum.map(fn
      {{:user_event, _}, user_event_tuple} ->
        user_event_tuple

      _ ->
        nil
    end)
    |> Enum.filter(& &1)
    |> Enum.each(fn user_event_tuple ->
      broadcast_user_event(user_event_tuple)
    end)

    result
  end

  defp broadcast_user_event({user_event, topics}) do
    Enum.each(topics, fn topic ->
      Phoenix.PubSub.broadcast(LiveviewPubsubDemo.PubSub, topic, user_event)
    end)
  end
end

Update our event model to store the event instead of publishing it:

def multi_create_user_event(
      multi,
      id,
      attrs,
      topics \\ []
    ) do
  changeset = %UserEvent{}
  |> UserEvent.changeset(attrs)

  multi
  |> Multi.run({:user_event, id}, fn _repo, _changes ->
    user_event = apply_action!(changeset, :create)

-   Enum.map(topics, fn topic ->
-     :ok = Phoenix.PubSub.broadcast(LiveviewPubsubDemo.PubSub, topic, user_event)
-   end)
-
-   {:ok, user_event}
+   {:ok, {user_event, topics}}
  end)
end

Finally, change the context to call the broadcast_transaction/1

  def update_user_phone_number(%UserPhoneNumber{} = user_phone_number, attrs) do
    result =
      Multi.new()
      |> multi_update_user_phone_number(:user_phone_number, user_phone_number, attrs)
-      |> Repo.transaction()
+      |> Repo.broadcast_transaction()

    case result do
      {:ok, %{user_phone_number: user_phone_number}} ->
        {:ok, user_phone_number}

      {:error, :user_phone_number, changeset, _changes} ->
        {:error, changeset}
    end
  end

commit

Your tests should continue to work as before, but now the browser will update when the context is used to modify the records.

Final

I haven’t added comments to this blog, so feel free to open a github issue with any questions. Thank you for reading