Using Ecto.Multi and Phoenix.PubSub to update Phoenix Liveview
Eric Sullivan
Tagged as ecto, pubsub, liveview, phoenix
The initial MVP of Severus is as platform to keep contact information up to date. If one user updates their phone number, their connections should see that update. In an ideal world, that update would be instantaneous, and not require a browser refresh. As Severus is a phoenix application, Livewiew made this requirement easy, but hooking it into Ecto took a little work.
Setup
I’ll create a new application and document the steps in this tutorial as distinct commits. The repo can be found on github
Note: These instructions use phoenix v1.6.6 and liveview v0.17.5
mix phx.new liveview_pubsub_demo
I also added support for authentication using mix phx.gen.auth
:
mix phx.gen.auth Accounts User users
mix deps.get
mix ecto.migrate
At this point, you should be able to start the app and Log in.
mix phx.server
A User Phone Number
For this demonstration, we’ll create a user_phone_number
, which is just a phone_number associated with a user. We can use the generators to get running quickly, but we’ll need to make a couple changes to ensure the user is being set correctly.
mix phx.gen.live ContactInfo UserPhoneNumber user_phone_number phone_number:string user_id:references:users
(Note: the table should have been called user_phone_numbers, but I didn’t notice this typo until several commits later)
You can see the migration correctly references the users table:
add :user_id, references(:users, on_delete: :nothing)
But we need to change the schema:
schema "user_phone_number" do
field :phone_number, :string
- field :user_id, :id
+ belongs_to :user, Spidersilk.Accounts.User
timestamps()
end
@doc false
def changeset(user_phone_number, attrs) do
user_phone_number
|> cast(attrs, [:phone_number])
- |> validate_required([:phone_number])
+ |> validate_required([:phone_number, :user_id])
end
Note that we don’t pass the user_id
in as attrs
. As that variable contains user-supplied data, we can’t trust it to enforce security. That means we’re responsible for setting the user before we get to the changeset, which requires a few more changes to the context and the associated tests, namely converting the arity of create_user_phone_number/1
to create_user_phone_number/2
:
- def create_user_phone_number(attrs \\ %{}) do
- %UserPhoneNumber{}
+ def create_user_phone_number(%UserPhoneNumber{} = user_phone_number, attrs \\ %{}) do
+ user_phone_number
|> UserPhoneNumber.changeset(attrs)
|> Repo.insert()
end
At this point we have one failure, and that’s because liveview isn’t quite hooked up to the current_user.
Adding the current_user to Liveview
Let’s adjust the router a little to require a user before accessing the user_phone_numbers
scope "/", LiveviewPubsubDemoWeb do
- pipe_through :browser
+ pipe_through [:browser, :require_authenticated_user]
live "/user_phone_number", UserPhoneNumberLive.Index, :index
live "/user_phone_number/new", UserPhoneNumberLive.Index, :new
live "/user_phone_number/:id/edit", UserPhoneNumberLive.Index, :edit
live "/user_phone_number/:id", UserPhoneNumberLive.Show, :show
live "/user_phone_number/:id/show/edit", UserPhoneNumberLive.Show, :edit
end
If you try to access the page you’ll now get a helpful error message:
Once you register and log in you should be able to see all the phone_numbers, but what we did is not sufficient for production usage. If that statement surprises you, please read Security considerations of the LiveView model.
The new on_mount/1
hook, as well as live_session/3
makes this easier. Lets adjust the router a bit more:
+ live_session :default, on_mount: LiveviewPubsubDemoWeb.UserAuthLive do
scope "/", LiveviewPubsubDemoWeb do
pipe_through [:browser, :require_authenticated_user]
live "/user_phone_number", UserPhoneNumberLive.Index, :index
live "/user_phone_number/new", UserPhoneNumberLive.Index, :new
live "/user_phone_number/:id/edit", UserPhoneNumberLive.Index, :edit
live "/user_phone_number/:id", UserPhoneNumberLive.Show, :show
live "/user_phone_number/:id/show/edit", UserPhoneNumberLive.Show, :edit
end
+ end
defmodule LiveviewPubsubDemoWeb.UserAuthLive do
import Phoenix.LiveView
alias LiveviewPubsubDemo.Accounts
alias LiveviewPubsubDemoWeb.Router.Helpers, as: Routes
def on_mount(:default, _params, %{"user_token" => user_token} = _session, socket) do
socket = assign_new(socket, :current_user, fn ->
user_token && Accounts.get_user_by_session_token(user_token)
end)
if socket.assigns.current_user do
{:cont, socket}
else
{:halt, redirect(socket, to: Routes.user_session_path(socket, :new))}
end
end
end
Now we can safely live_redirect, ensuring that the current_user is always verified. With this update we can fix the failing test:
Add this to the top of the UserPhoneNumberLiveTest
setup :register_and_log_in_user
In the UserPhoneNumberLive.Index
set the user in the new action:
defp apply_action(socket, :new, _params) do
socket
|> assign(:page_title, "New User phone number")
- |> assign(:user_phone_number, %UserPhoneNumber{})
+ |> assign(:user_phone_number, %UserPhoneNumber{user_id: socket.assigns.current_user.id})
end
And update the form_component:
defp save_user_phone_number(socket, :new, user_phone_number_params) do
- case ContactInfo.create_user_phone_number(user_phone_number_params) do
+ case ContactInfo.create_user_phone_number(socket.assigns.user_phone_number, user_phone_number_params) do
{:ok, _user_phone_number} ->
{:noreply,
socket
|> put_flash(:info, "User phone number created successfully")
|> push_redirect(to: socket.assigns.return_to)}
{:error, %Ecto.Changeset{} = changeset} ->
{:noreply, assign(socket, changeset: changeset)}
end
end
The tests should all work now, and our setup is complete. I am not scoping the records to the user or doing authorization, but we don’t need it to demonstrate how to integrate Ecto with Liveview. In practice you’ll need to limit the phone numbers to the current_user, and you’ll need to authorize each event. One approach I’ve used is to call bodyguard on each handle_event/3
callback.
The Ecto Publisher
UTC Timestamps
I switched to using UTC timestamps, so if you’re running this locally run mix ecto.reset
in config/config.exs
config :liveview_pubsub_demo, LiveviewPubsubDemo.Repo,
migration_timestamps: [type: :utc_datetime]
in lib/liveview_pubsub_demo/contact_info/user_phone_number.ex
+ @timestamps_opts type: :utc_datetime
schema "user_phone_number" do
Adding Ecto.Multi
I changed the update_user_phone_number/2
function from:
def update_user_phone_number(%UserPhoneNumber{} = user_phone_number, attrs) do
user_phone_number
|> UserPhoneNumber.changeset(attrs)
|> Repo.update()
end
to:
def update_user_phone_number(%UserPhoneNumber{} = user_phone_number, attrs) do
result =
Multi.new()
|> multi_update_user_phone_number(:user_phone_number, user_phone_number, attrs)
|> Repo.transaction()
case result do
{:ok, %{user_phone_number: user_phone_number}} ->
{:ok, user_phone_number}
{:error, :user_phone_number, changeset, _changes} ->
{:error, changeset}
end
end
def multi_update_user_phone_number(
multi,
id,
%UserPhoneNumber{} = user_phone_number,
attrs \\ %{}
) do
changeset = UserPhoneNumber.changeset(user_phone_number, attrs)
multi
|> Multi.update(id, changeset)
|> Multi.merge(fn changes ->
user_phone_number = changes[id]
Events.multi_create_user_event(
Multi.new(),
id,
%{
type: "user_phone_number.updated",
user_id: user_phone_number.user_id,
event_at: user_phone_number.updated_at,
data: Map.take(user_phone_number, [:id, :user_id, :phone_number])
},
[
"user-phone-number.updated.*",
"user-phone-number.updated.id:#{user_phone_number.id}",
"user-phone-number.updated.user-id:#{user_phone_number.user_id}",
"user-phone-number.*.id:#{user_phone_number.id}",
"user-phone-number.*.user-id:#{user_phone_number.user_id}",
"user-phone-number.*.*"
]
)
end)
end
The Event Context
That also required an Events context and schema:
defmodule LiveviewPubsubDemo.Events do
import Ecto.Changeset
alias Ecto.Multi
alias LiveviewPubsubDemo.Events.UserEvent
def multi_create_user_event(
multi,
id,
attrs,
topics \\ []
) do
changeset = %UserEvent{}
|> UserEvent.changeset(attrs)
multi
|> Multi.run({:user_event, id}, fn _repo, _changes ->
user_event = apply_action!(changeset, :create)
Enum.map(topics, fn topic ->
:ok = Phoenix.PubSub.broadcast(LiveviewPubsubDemo.PubSub, topic, user_event)
end)
{:ok, user_event}
end)
end
end
defmodule LiveviewPubsubDemo.Events.UserEvent do
use Ecto.Schema
import Ecto.Changeset
@primary_key false
embedded_schema do
field :type, :string
field :data, :map
field :event_at, :utc_datetime
field :user_id, :integer
end
@doc false
def changeset(user_event, attrs) do
user_event
|> cast(attrs, [:type, :data, :event_at, :user_id])
|> validate_required([:type, :data, :event_at, :user_id])
end
end
And finally a test:
test "update_user_phone_number/2 with valid data publishes events" do
Phoenix.PubSub.subscribe(LiveviewPubsubDemo.PubSub, "user-phone-number.*.*")
user_phone_number = user_phone_number_fixture()
update_attrs = %{phone_number: "some updated phone_number"}
assert {:ok, %UserPhoneNumber{} = user_phone_number} = ContactInfo.update_user_phone_number(user_phone_number, update_attrs)
user_event = %LiveviewPubsubDemo.Events.UserEvent{
data: %{
id: user_phone_number.id,
phone_number: "some updated phone_number",
user_id: user_phone_number.user_id
},
event_at: user_phone_number.updated_at,
type: "user_phone_number.updated",
user_id: user_phone_number.user_id
}
assert_receive ^user_event
end
There are quite a few changes here so I added a mini FAQ. Feel free to reach out if there are other decisions that are not intuitive.
Why UTC Timestamps?
When testing that the event was being published, the assert_receive was failing with a naive datetime for event_at
. I prefer UTC datetimes so I updated the timestamps to always be UTC. That required a change to how migrations are run, as well as a notation in the schema. In a typical app I’d created a custom schema and use that instead of use Ecto.Schema
Why Ecto.Multi?
I based this on the Transactional Outbox Pattern. In fact, I typically write the UserEvent
to the database and publish it, so transactions are ideal. I’ve also used the same approach to push events to Kafka or another message broker. I’ve also found that I re-use the functions in different contexts (delete the phone number and the user). So being able to pass a multi into the function makes these composable. The other oddity is the id
variable. In Ecto.Multi
, all operations take a name as the first parameter. Sometimes I have a need to operate on several records of the same kind (update multiple phone numbers), in that case I can pass in a tuple as the name:
|> multi_update_user_phone_number(:user_phone_number, user_phone_number, attrs)
vs:
|> multi_update_user_phone_number({:user_phone_number, user_phone_number.id}, user_phone_number, attrs)
Are the topic names special?
Sort of… Phoenix.PubSub doesn’t care about the *
, but that doesn’t mean we can’t still pattern match on it. Generally the topics are model.action.some-id:###
. I also replace the actions and ids with a *
so you can subscribe to the wildcard version, which we’ll see in the next section.
Why did you only test user-phone-number.*.*
To make this blog post shorter :)
In my production codebase I don’t actually like hooking into pubsub, as it couples the code in a way that’s harder to test. If you get a chance to see that you’ll notice that I have a PubSubMock (using mox). I check that the mock broadcast/4
method is called from the context. That also makes testing the liveviews easier, as you can turn off PubSub as desired.
embedded_schema?
This is a personal preference. I really like that I can use it in place of a struct, but it’s not necessary.
This won’t work
Did you skip ahead? Ok, we’ll revisit this, but I’m surprised you noticed. Liveview is a little too fast for this setup…
The Liveview Subscriber
I’ll only cover updating the index here, but the same principle applies to the show and edit actions (you can subscribe to the specific topic, like user-phone-number.*.id:#{socket.assigns.user_phone_number.id}
).
def mount(_params, _session, socket) do
+ if connected?(socket) do
+ [
+ "user-phone-number.*.user-id:#{socket.assigns.current_user.id}",
+ ]
+ |> Enum.each(&Phoenix.PubSub.subscribe(LiveviewPubsubDemo.PubSub, &1))
+ end
{:ok, assign(socket, :user_phone_number_collection, list_user_phone_number())}
end
@impl true
def handle_info(
%LiveviewPubsubDemo.Events.UserEvent{type: "user_phone_number.updated"} =
_user_event,
socket
) do
socket = socket
|> assign(:user_phone_number_collection, list_user_phone_number())
{:noreply, socket}
end
This approach doesn’t do anything special to update the list, it just reloads the data when it changes. The default generator uses modals for the new
and edit
actions, which means you must handle the update event even if the action isn’t index
.
The test is pretty simple:
test "handles 'user_phone_number.updated' event", %{conn: conn, user_phone_number: user_phone_number} do
{:ok, index_live, _html} = live(conn, Routes.user_phone_number_index_path(conn, :index))
refute has_element?(index_live, "td", "NEW_NUMBER")
assert {:ok, %LiveviewPubsubDemo.ContactInfo.UserPhoneNumber{} = _user_phone_number} =
LiveviewPubsubDemo.ContactInfo.update_user_phone_number(
user_phone_number,
%{phone_number: "NEW_NUMBER"}
)
assert has_element?(index_live, "td", "NEW_NUMBER")
end
I packaged these as a single commit. The test works, but you’ll notice some odd behaviour if you open up two browsers to the index and edit a record. Depending on your computer, it might work. Or, you’ll have to update it a second time, and you’ll see the second browser show the data from the first update, as captured below.
Remember when I mentioned Liveview was too fast? The problem is we’re sending the pubsub event inside a database transaction. The process that is running Ecto.Multi
(the window on the left) hands the event to the PubSub process, and attempts to commit the transaction. At the same time, the second Liveview process (the window on the right) receives the event and tries to retrieve the results from the database. Sometime it’s there, and sometimes the original transaction hasn’t been committed. In that case, the Liveview doesn’t see any changes and our browser doesn’t update. But, when we update the record a second time, the Liveview can always retrieve the data from the initial transaction. We don’t notice this in out test suite due to the database sandbox running in a single transaction.
The (delayed) Ecto Publisher
The solution I use is to gather all the events, and then send them after the transaction has been successfully committed. We can pattern match on the Ecto.Multi
results {:user_event, _}
and create a custom transaction method.
Change the Repo to add the following methods:
defmodule LiveviewPubsubDemo.Repo do
use Ecto.Repo,
otp_app: :liveview_pubsub_demo,
adapter: Ecto.Adapters.Postgres
def broadcast_transaction(multi) do
multi
|> transaction()
|> broadcast_user_events()
end
defp broadcast_user_events({:error, _error, _changeset, _changes} = result), do: result
defp broadcast_user_events({:ok, data} = result) do
data
|> Map.to_list()
|> Enum.map(fn
{{:user_event, _}, user_event_tuple} ->
user_event_tuple
_ ->
nil
end)
|> Enum.filter(& &1)
|> Enum.each(fn user_event_tuple ->
broadcast_user_event(user_event_tuple)
end)
result
end
defp broadcast_user_event({user_event, topics}) do
Enum.each(topics, fn topic ->
Phoenix.PubSub.broadcast(LiveviewPubsubDemo.PubSub, topic, user_event)
end)
end
end
Update our event model to store the event instead of publishing it:
def multi_create_user_event(
multi,
id,
attrs,
topics \\ []
) do
changeset = %UserEvent{}
|> UserEvent.changeset(attrs)
multi
|> Multi.run({:user_event, id}, fn _repo, _changes ->
user_event = apply_action!(changeset, :create)
- Enum.map(topics, fn topic ->
- :ok = Phoenix.PubSub.broadcast(LiveviewPubsubDemo.PubSub, topic, user_event)
- end)
-
- {:ok, user_event}
+ {:ok, {user_event, topics}}
end)
end
Finally, change the context to call the broadcast_transaction/1
def update_user_phone_number(%UserPhoneNumber{} = user_phone_number, attrs) do
result =
Multi.new()
|> multi_update_user_phone_number(:user_phone_number, user_phone_number, attrs)
- |> Repo.transaction()
+ |> Repo.broadcast_transaction()
case result do
{:ok, %{user_phone_number: user_phone_number}} ->
{:ok, user_phone_number}
{:error, :user_phone_number, changeset, _changes} ->
{:error, changeset}
end
end
Your tests should continue to work as before, but now the browser will update when the context is used to modify the records.
I haven’t added comments to this blog, so feel free to open a github issue with any questions. Thank you for reading