Skip to content

Commit 74e045b

Browse files
committed
change approach to make it reliant on the channel instead of waiting for pubsub
1 parent 6aa89b8 commit 74e045b

3 files changed

Lines changed: 56 additions & 39 deletions

File tree

config/config.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ config :realtime,
1212
ecto_repos: [Realtime.Repo],
1313
version: Mix.Project.config()[:version],
1414
replication_watchdog_interval: :timer.minutes(5),
15-
replication_watchdog_timeout: :timer.minutes(1)
15+
replication_watchdog_timeout: :timer.minutes(1),
16+
replication_ready_timeout: :timer.minutes(1)
1617

1718
# Configures the endpoint
1819
config :realtime, RealtimeWeb.Endpoint,

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ defmodule RealtimeWeb.RealtimeChannel do
3030
alias RealtimeWeb.RealtimeChannel.Tracker
3131

3232
@confirm_token_ms_interval :timer.minutes(5)
33+
@replication_ready_check_interval 10
3334
@fullsweep_after Application.compile_env!(:realtime, :websocket_fullsweep_after)
3435

3536
@impl true
@@ -104,7 +105,6 @@ defmodule RealtimeWeb.RealtimeChannel do
104105

105106
RealtimeWeb.Endpoint.subscribe(tenant_topic, metadata: metadata)
106107
RealtimeWeb.Endpoint.subscribe("realtime:operations:" <> tenant_id, metadata: metadata)
107-
RealtimeWeb.Endpoint.subscribe(Connect.syn_topic(tenant_id))
108108
send(self(), :notify_replication_ready)
109109

110110
is_new_api = new_api?(params)
@@ -136,7 +136,8 @@ defmodule RealtimeWeb.RealtimeChannel do
136136
tenant_topic: tenant_topic,
137137
channel_name: sub_topic,
138138
presence_enabled?: presence_enabled?,
139-
replication_ready_notified?: false
139+
replication_ready_notified?: false,
140+
replication_ready_deadline: System.monotonic_time(:millisecond) + replication_ready_timeout()
140141
}
141142

142143
socket =
@@ -293,19 +294,26 @@ defmodule RealtimeWeb.RealtimeChannel do
293294
{:noreply, assign(socket, %{pg_sub_ref: pg_sub_ref})}
294295
end
295296

296-
def handle_info(:notify_replication_ready, %{assigns: %{tenant: tenant_id}} = socket) do
297-
case Connect.replication_status(tenant_id) do
298-
{:ok, _replication_conn} -> {:noreply, push_replication_ready(socket)}
299-
{:error, :not_connected} -> {:noreply, socket}
300-
end
297+
def handle_info(:notify_replication_ready, %{assigns: %{replication_ready_notified?: true}} = socket) do
298+
{:noreply, socket}
301299
end
302300

303-
def handle_info(%{event: "ready", payload: %{replication_conn: pid}}, socket)
304-
when is_pid(pid) do
305-
{:noreply, push_replication_ready(socket)}
306-
end
301+
def handle_info(:notify_replication_ready, socket) do
302+
%{assigns: %{tenant: tenant_id, channel_name: channel_name, replication_ready_deadline: deadline}} = socket
307303

308-
def handle_info(%{topic: "connect:" <> _}, socket), do: {:noreply, socket}
304+
cond do
305+
match?({:ok, _replication_conn}, Connect.replication_status(tenant_id)) ->
306+
push_system_message("broadcast", socket, "ok", "Replication connection established", channel_name)
307+
{:noreply, assign(socket, :replication_ready_notified?, true)}
308+
309+
System.monotonic_time(:millisecond) >= deadline ->
310+
shutdown_response(socket, "Replication connection was not established in time")
311+
312+
true ->
313+
Process.send_after(self(), :notify_replication_ready, @replication_ready_check_interval)
314+
{:noreply, socket}
315+
end
316+
end
309317

310318
def handle_info(_msg, %{assigns: %{policies: %Policies{broadcast: %BroadcastPolicies{read: false}}}} = socket) do
311319
Logger.warning("Broadcast message ignored")
@@ -740,12 +748,8 @@ defmodule RealtimeWeb.RealtimeChannel do
740748
{:stop, :normal, socket}
741749
end
742750

743-
defp push_replication_ready(%{assigns: %{replication_ready_notified?: true}} = socket), do: socket
744-
745-
defp push_replication_ready(%{assigns: %{tenant: tenant_id, channel_name: channel_name}} = socket) do
746-
RealtimeWeb.Endpoint.unsubscribe(Connect.syn_topic(tenant_id))
747-
push_system_message("broadcast", socket, "ok", "Replication connection established", channel_name)
748-
assign(socket, :replication_ready_notified?, true)
751+
defp replication_ready_timeout do
752+
Application.fetch_env!(:realtime, :replication_ready_timeout)
749753
end
750754

751755
defp push_system_message(extension, socket, status, error, channel_name)

test/realtime_web/channels/realtime_channel_replication_ready_test.exs

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,57 +23,69 @@ defmodule RealtimeWeb.RealtimeChannelReplicationReadyTest do
2323
assert_receive %Socket.Message{event: "system", payload: %{message: "Replication connection established"}}, 500
2424
end
2525

26-
test "pushes the system message when the syn ready broadcast arrives after join", %{tenant: tenant} do
27-
stub(Connect, :lookup_or_start_connection, fn _ -> {:ok, self()} end)
28-
stub(Connect, :replication_status, fn _ -> {:error, :not_connected} end)
26+
test "pushes the system message once replication becomes ready while polling", %{tenant: tenant} do
27+
{:ok, counter} = Agent.start_link(fn -> 0 end)
2928

30-
assert {:ok, _, _} = join(tenant)
29+
stub(Connect, :lookup_or_start_connection, fn _ -> {:ok, self()} end)
3130

32-
refute_receive %Socket.Message{event: "system", payload: %{message: "Replication connection established"}}, 200
31+
stub(Connect, :replication_status, fn _ ->
32+
case Agent.get_and_update(counter, fn n -> {n, n + 1} end) do
33+
n when n < 3 -> {:error, :not_connected}
34+
_ -> {:ok, self()}
35+
end
36+
end)
3337

34-
signal_ready(tenant, self())
38+
assert {:ok, _, _} = join(tenant)
3539

3640
assert_receive %Socket.Message{event: "system", payload: %{message: "Replication connection established"}}, 500
3741
end
3842

39-
test "ignores syn ready broadcasts without a replication connection", %{tenant: tenant} do
43+
test "does not push while replication is unavailable", %{tenant: tenant} do
4044
stub(Connect, :lookup_or_start_connection, fn _ -> {:ok, self()} end)
4145
stub(Connect, :replication_status, fn _ -> {:error, :not_connected} end)
4246

4347
assert {:ok, _, _} = join(tenant)
4448

45-
signal_ready(tenant, nil)
46-
4749
refute_receive %Socket.Message{event: "system", payload: %{message: "Replication connection established"}}, 300
4850
end
4951

50-
test "notifies at most once and stops listening after the first signal", %{tenant: tenant} do
52+
test "notifies at most once", %{tenant: tenant} do
5153
stub(Connect, :lookup_or_start_connection, fn _ -> {:ok, self()} end)
5254
stub(Connect, :replication_status, fn _ -> {:ok, self()} end)
5355

5456
assert {:ok, _, _} = join(tenant)
5557

5658
assert_receive %Socket.Message{event: "system", payload: %{message: "Replication connection established"}}, 500
5759

58-
signal_ready(tenant, self())
59-
6060
refute_receive %Socket.Message{event: "system", payload: %{message: "Replication connection established"}}, 300
6161
end
6262

63+
test "stops the channel when replication is not established before the timeout", %{tenant: tenant} do
64+
previous = Application.get_env(:realtime, :replication_ready_timeout)
65+
Application.put_env(:realtime, :replication_ready_timeout, 50)
66+
on_exit(fn -> Application.put_env(:realtime, :replication_ready_timeout, previous) end)
67+
68+
stub(Connect, :lookup_or_start_connection, fn _ -> {:ok, self()} end)
69+
stub(Connect, :replication_status, fn _ -> {:error, :not_connected} end)
70+
71+
assert {:ok, _, socket} = join(tenant)
72+
ref = Process.monitor(socket.channel_pid)
73+
74+
assert_receive %Socket.Message{
75+
event: "system",
76+
payload: %{status: "error", message: "Replication connection was not established in time"}
77+
},
78+
500
79+
80+
assert_receive {:DOWN, ^ref, :process, _, _}, 500
81+
end
82+
6383
defp join(tenant) do
6484
jwt = generate_jwt_token(tenant)
6585
{:ok, socket} = connect(UserSocket, %{}, conn_opts(tenant, jwt))
6686
subscribe_and_join(socket, "realtime:test", %{"config" => %{}})
6787
end
6888

69-
defp signal_ready(tenant, replication_conn) do
70-
RealtimeWeb.Endpoint.local_broadcast(
71-
Connect.syn_topic(tenant.external_id),
72-
"ready",
73-
%{pid: self(), conn: self(), replication_conn: replication_conn}
74-
)
75-
end
76-
7789
defp conn_opts(tenant, token) do
7890
[
7991
connect_info: %{

0 commit comments

Comments
 (0)