Skip to content

Commit dbfe3f2

Browse files
committed
change approach to make it reliant on the channel instead of waiting for pubsub
1 parent 8249ca4 commit dbfe3f2

4 files changed

Lines changed: 85 additions & 44 deletions

File tree

config/config.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ config :realtime,
1212
ecto_repos: [Realtime.Repo],
1313
version: Mix.Project.config()[:version],
1414
replication_watchdog_interval: :timer.minutes(5),
15-
replication_watchdog_timeout: :timer.minutes(1)
15+
replication_watchdog_timeout: :timer.minutes(1),
16+
replication_ready_timeout: :timer.minutes(1)
1617

1718
# Configures the endpoint
1819
config :realtime, RealtimeWeb.Endpoint,

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ defmodule RealtimeWeb.RealtimeChannel do
3030
alias RealtimeWeb.RealtimeChannel.Tracker
3131

3232
@confirm_token_ms_interval :timer.minutes(5)
33+
@replication_ready_check_interval 10
3334
@fullsweep_after Application.compile_env!(:realtime, :websocket_fullsweep_after)
3435

3536
@impl true
@@ -104,7 +105,6 @@ defmodule RealtimeWeb.RealtimeChannel do
104105

105106
RealtimeWeb.Endpoint.subscribe(tenant_topic, metadata: metadata)
106107
RealtimeWeb.Endpoint.subscribe("realtime:operations:" <> tenant_id, metadata: metadata)
107-
RealtimeWeb.Endpoint.subscribe(Connect.syn_topic(tenant_id))
108108
send(self(), :notify_replication_ready)
109109

110110
is_new_api = new_api?(params)
@@ -136,7 +136,8 @@ defmodule RealtimeWeb.RealtimeChannel do
136136
tenant_topic: tenant_topic,
137137
channel_name: sub_topic,
138138
presence_enabled?: presence_enabled?,
139-
replication_ready_notified?: false
139+
replication_ready_notified?: false,
140+
replication_ready_deadline: System.monotonic_time(:millisecond) + replication_ready_timeout()
140141
}
141142

142143
socket =
@@ -293,19 +294,26 @@ defmodule RealtimeWeb.RealtimeChannel do
293294
{:noreply, assign(socket, %{pg_sub_ref: pg_sub_ref})}
294295
end
295296

296-
def handle_info(:notify_replication_ready, %{assigns: %{tenant: tenant_id}} = socket) do
297-
case Connect.replication_status(tenant_id) do
298-
{:ok, _replication_conn} -> {:noreply, push_replication_ready(socket)}
299-
{:error, :not_connected} -> {:noreply, socket}
300-
end
297+
def handle_info(:notify_replication_ready, %{assigns: %{replication_ready_notified?: true}} = socket) do
298+
{:noreply, socket}
301299
end
302300

303-
def handle_info(%{event: "ready", payload: %{replication_conn: pid}}, socket)
304-
when is_pid(pid) do
305-
{:noreply, push_replication_ready(socket)}
306-
end
301+
def handle_info(:notify_replication_ready, socket) do
302+
%{assigns: %{tenant: tenant_id, channel_name: channel_name, replication_ready_deadline: deadline}} = socket
307303

308-
def handle_info(%{topic: "connect:" <> _}, socket), do: {:noreply, socket}
304+
cond do
305+
match?({:ok, _replication_conn}, Connect.replication_status(tenant_id)) ->
306+
push_system_message("broadcast", socket, "ok", "Replication connection established", channel_name)
307+
{:noreply, assign(socket, :replication_ready_notified?, true)}
308+
309+
System.monotonic_time(:millisecond) >= deadline ->
310+
shutdown_response(socket, "Replication connection was not established in time")
311+
312+
true ->
313+
Process.send_after(self(), :notify_replication_ready, @replication_ready_check_interval)
314+
{:noreply, socket}
315+
end
316+
end
309317

310318
def handle_info(_msg, %{assigns: %{policies: %Policies{broadcast: %BroadcastPolicies{read: false}}}} = socket) do
311319
Logger.warning("Broadcast message ignored")
@@ -740,12 +748,8 @@ defmodule RealtimeWeb.RealtimeChannel do
740748
{:stop, :normal, socket}
741749
end
742750

743-
defp push_replication_ready(%{assigns: %{replication_ready_notified?: true}} = socket), do: socket
744-
745-
defp push_replication_ready(%{assigns: %{tenant: tenant_id, channel_name: channel_name}} = socket) do
746-
RealtimeWeb.Endpoint.unsubscribe(Connect.syn_topic(tenant_id))
747-
push_system_message("broadcast", socket, "ok", "Replication connection established", channel_name)
748-
assign(socket, :replication_ready_notified?, true)
751+
defp replication_ready_timeout do
752+
Application.fetch_env!(:realtime, :replication_ready_timeout)
749753
end
750754

751755
defp push_system_message(extension, socket, status, error, channel_name)

test/integration/rt_channel/billable_events_test.exs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,13 @@ defmodule Realtime.Integration.RtChannel.BillableEventsTest do
7676

7777
# Join events
7878
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300
79-
assert_receive %Message{topic: ^topic, event: "system"}, 5000
79+
80+
assert_receive %Message{
81+
topic: ^topic,
82+
event: "system",
83+
payload: %{"extension" => "postgres_changes", "status" => "ok"}
84+
},
85+
50000
8086

8187
# Wait for RateCounter to run
8288
RateCounterHelper.tick_tenant_rate_counters!(tenant.external_id)
@@ -201,13 +207,25 @@ defmodule Realtime.Integration.RtChannel.BillableEventsTest do
201207

202208
# Join events
203209
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300
204-
assert_receive %Message{topic: ^topic, event: "system"}, 5000
210+
211+
assert_receive %Message{
212+
topic: ^topic,
213+
event: "system",
214+
payload: %{"extension" => "postgres_changes", "status" => "ok"}
215+
},
216+
5000
205217

206218
# Add second user to test the "multiplication" of billable events
207219
{socket, _} = get_connection(tenant, serializer)
208220
WebsocketClient.join(socket, topic, %{config: config})
209221
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300
210-
assert_receive %Message{topic: ^topic, event: "system"}, 5000
222+
223+
assert_receive %Message{
224+
topic: ^topic,
225+
event: "system",
226+
payload: %{"extension" => "postgres_changes", "status" => "ok"}
227+
},
228+
5000
211229

212230
tenant = Tenants.get_tenant_by_external_id(tenant.external_id)
213231
{:ok, conn} = Database.connect(tenant, "realtime_test", :stop)
@@ -221,7 +239,7 @@ defmodule Realtime.Integration.RtChannel.BillableEventsTest do
221239
event: "postgres_changes",
222240
payload: %{"data" => %{"schema" => "public", "table" => "test", "type" => "INSERT"}}
223241
},
224-
5000
242+
500
225243
end
226244

227245
# Wait for RateCounter to run
@@ -249,7 +267,13 @@ defmodule Realtime.Integration.RtChannel.BillableEventsTest do
249267

250268
# Join events
251269
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300
252-
assert_receive %Message{topic: ^topic, event: "system"}, 5000
270+
271+
assert_receive %Message{
272+
topic: ^topic,
273+
event: "system",
274+
payload: %{"extension" => "postgres_changes", "status" => "error"}
275+
},
276+
5000
253277

254278
# Wait for RateCounter to run
255279
RateCounterHelper.tick_tenant_rate_counters!(tenant.external_id)

test/realtime_web/channels/realtime_channel_replication_ready_test.exs

Lines changed: 32 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,57 +23,69 @@ defmodule RealtimeWeb.RealtimeChannelReplicationReadyTest do
2323
assert_receive %Socket.Message{event: "system", payload: %{message: "Replication connection established"}}, 500
2424
end
2525

26-
test "pushes the system message when the syn ready broadcast arrives after join", %{tenant: tenant} do
27-
stub(Connect, :lookup_or_start_connection, fn _ -> {:ok, self()} end)
28-
stub(Connect, :replication_status, fn _ -> {:error, :not_connected} end)
26+
test "pushes the system message once replication becomes ready while polling", %{tenant: tenant} do
27+
{:ok, counter} = Agent.start_link(fn -> 0 end)
2928

30-
assert {:ok, _, _} = join(tenant)
29+
stub(Connect, :lookup_or_start_connection, fn _ -> {:ok, self()} end)
3130

32-
refute_receive %Socket.Message{event: "system", payload: %{message: "Replication connection established"}}, 200
31+
stub(Connect, :replication_status, fn _ ->
32+
case Agent.get_and_update(counter, fn n -> {n, n + 1} end) do
33+
n when n < 3 -> {:error, :not_connected}
34+
_ -> {:ok, self()}
35+
end
36+
end)
3337

34-
signal_ready(tenant, self())
38+
assert {:ok, _, _} = join(tenant)
3539

3640
assert_receive %Socket.Message{event: "system", payload: %{message: "Replication connection established"}}, 500
3741
end
3842

39-
test "ignores syn ready broadcasts without a replication connection", %{tenant: tenant} do
43+
test "does not push while replication is unavailable", %{tenant: tenant} do
4044
stub(Connect, :lookup_or_start_connection, fn _ -> {:ok, self()} end)
4145
stub(Connect, :replication_status, fn _ -> {:error, :not_connected} end)
4246

4347
assert {:ok, _, _} = join(tenant)
4448

45-
signal_ready(tenant, nil)
46-
4749
refute_receive %Socket.Message{event: "system", payload: %{message: "Replication connection established"}}, 300
4850
end
4951

50-
test "notifies at most once and stops listening after the first signal", %{tenant: tenant} do
52+
test "notifies at most once", %{tenant: tenant} do
5153
stub(Connect, :lookup_or_start_connection, fn _ -> {:ok, self()} end)
5254
stub(Connect, :replication_status, fn _ -> {:ok, self()} end)
5355

5456
assert {:ok, _, _} = join(tenant)
5557

5658
assert_receive %Socket.Message{event: "system", payload: %{message: "Replication connection established"}}, 500
5759

58-
signal_ready(tenant, self())
59-
6060
refute_receive %Socket.Message{event: "system", payload: %{message: "Replication connection established"}}, 300
6161
end
6262

63+
test "stops the channel when replication is not established before the timeout", %{tenant: tenant} do
64+
previous = Application.get_env(:realtime, :replication_ready_timeout)
65+
Application.put_env(:realtime, :replication_ready_timeout, 50)
66+
on_exit(fn -> Application.put_env(:realtime, :replication_ready_timeout, previous) end)
67+
68+
stub(Connect, :lookup_or_start_connection, fn _ -> {:ok, self()} end)
69+
stub(Connect, :replication_status, fn _ -> {:error, :not_connected} end)
70+
71+
assert {:ok, _, socket} = join(tenant)
72+
ref = Process.monitor(socket.channel_pid)
73+
74+
assert_receive %Socket.Message{
75+
event: "system",
76+
payload: %{status: "error", message: "Replication connection was not established in time"}
77+
},
78+
500
79+
80+
assert_receive {:DOWN, ^ref, :process, _, _}, 500
81+
end
82+
6383
defp join(tenant) do
6484
jwt = generate_jwt_token(tenant)
6585
{:ok, socket} = connect(UserSocket, %{}, conn_opts(tenant, jwt))
6686
subscribe_and_join(socket, "realtime:test", %{"config" => %{}})
6787
end
6888

69-
defp signal_ready(tenant, replication_conn) do
70-
RealtimeWeb.Endpoint.local_broadcast(
71-
Connect.syn_topic(tenant.external_id),
72-
"ready",
73-
%{pid: self(), conn: self(), replication_conn: replication_conn}
74-
)
75-
end
76-
7789
defp conn_opts(tenant, token) do
7890
[
7991
connect_info: %{

0 commit comments

Comments
 (0)