Skip to content

Commit 850bc8b

Browse files
committed
change the approach to make opt in
1 parent 4bc694c commit 850bc8b

9 files changed

Lines changed: 130 additions & 182 deletions

File tree

ERROR_CODES.md

Lines changed: 85 additions & 85 deletions
Large diffs are not rendered by default.

lib/realtime_web/channels/payloads/config.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ defmodule RealtimeWeb.Channels.Payloads.Config do
1515
embeds_one :presence, Presence
1616
embeds_many :postgres_changes, PostgresChange
1717
field :private, FlexibleBoolean, default: false
18+
field :replication_ready, FlexibleBoolean, default: false
1819
end
1920

2021
def changeset(config, attrs) do
@@ -28,7 +29,7 @@ defmodule RealtimeWeb.Channels.Payloads.Config do
2829
|> Map.new()
2930

3031
config
31-
|> cast(attrs, [:private], message: &Join.error_message/2)
32+
|> cast(attrs, [:private, :replication_ready], message: &Join.error_message/2)
3233
|> cast_embed(:broadcast, invalid_message: "unable to parse, expected a map")
3334
|> cast_embed(:presence, invalid_message: "unable to parse, expected a map")
3435
|> cast_embed(:postgres_changes, invalid_message: "unable to parse, expected an array of maps")

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ defmodule RealtimeWeb.RealtimeChannel do
116116

117117
RealtimeWeb.Endpoint.subscribe(tenant_topic, metadata: metadata)
118118
RealtimeWeb.Endpoint.subscribe("realtime:operations:" <> tenant_id, metadata: metadata)
119-
send(self(), :notify_replication_ready)
119+
120+
replication_ready_opt_in? = !!params["config"]["replication_ready"]
120121

121122
is_new_api = new_api?(params)
122123
presence_enabled? = socket.assigns.presence_enabled?
@@ -146,11 +147,21 @@ defmodule RealtimeWeb.RealtimeChannel do
146147
self_broadcast: Join.self_broadcast?(join),
147148
tenant_topic: tenant_topic,
148149
channel_name: sub_topic,
149-
presence_enabled?: presence_enabled?,
150-
replication_ready_notified?: false,
151-
replication_ready_deadline: System.monotonic_time(:millisecond) + replication_ready_timeout()
150+
presence_enabled?: presence_enabled?
152151
}
153152

153+
assigns =
154+
if replication_ready_opt_in? do
155+
Process.send_after(self(), :notify_replication_ready, @replication_ready_check_interval)
156+
157+
Map.merge(assigns, %{
158+
replication_ready_notified?: false,
159+
replication_ready_deadline: System.monotonic_time(:millisecond) + replication_ready_timeout()
160+
})
161+
else
162+
assigns
163+
end
164+
154165
socket =
155166
socket
156167
|> assign_counter(tenant)
@@ -314,7 +325,7 @@ defmodule RealtimeWeb.RealtimeChannel do
314325

315326
cond do
316327
match?({:ok, _replication_conn}, Connect.replication_status(tenant_id)) ->
317-
push_system_message("broadcast", socket, "ok", "Replication connection established", channel_name)
328+
push_system_message("system", socket, "ok", "Replication connection established", channel_name)
318329
{:noreply, assign(socket, :replication_ready_notified?, true)}
319330

320331
System.monotonic_time(:millisecond) >= deadline ->

test/e2e/realtime-check.ts

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -274,24 +274,16 @@ async function waitForSubscribed(channel: ReturnType<SupabaseClient["channel"]>)
274274
// Subscribes a channel and waits until it is fully joined.
275275
// All data operations must happen after this returns to avoid delivery races.
276276
async function openChannel(channel: ReturnType<SupabaseClient["channel"]>): Promise<number> {
277-
let replicationReady = false;
278-
channel.on("system", "*", ({ message }: { message?: string }) => {
279-
if (message === "Replication connection established") replicationReady = true;
280-
});
281277
channel.subscribe();
282-
const subscribeMs = await waitForSubscribed(channel);
283-
await waitFor(() => replicationReady ? true : null, "replication established");
284-
return subscribeMs;
278+
return waitForSubscribed(channel);
285279
}
286280

287281
// Subscribes a postgres_changes channel and waits for both the join and the
288282
// system:ok confirmation that the server-side WAL subscription is active.
289283
async function openPostgresChannel(channel: ReturnType<SupabaseClient["channel"]>): Promise<{ subscribeMs: number; systemMs: number }> {
290284
const start = performance.now();
291285
let systemOk = false;
292-
channel.on("system", "*", ({ status, extension }: { status: string; extension?: string }) => {
293-
if (status === "ok" && extension === "postgres_changes") systemOk = true;
294-
});
286+
channel.on("system", "*", ({ status }: { status: string }) => { if (status === "ok") systemOk = true; });
295287
const subscribeMs = await openChannel(channel);
296288
const { latencyMs: systemMs } = await waitFor(() => systemOk ? true : null, "system ok");
297289
return { subscribeMs, systemMs: performance.now() - start };
@@ -1009,6 +1001,7 @@ async function runBroadcastChangesTests(_testUser: { email: string; password: st
10091001
.on("broadcast", { event: "INSERT" }, (res) => (result = res));
10101002

10111003
const subscribeMs = await openChannel(channel);
1004+
await sleep(500);
10121005
await supabase.from("broadcast_changes").insert({ value, id, topic: testTopic });
10131006
const { latencyMs: eventMs } = await waitFor(() => result, "INSERT event");
10141007

@@ -1038,6 +1031,7 @@ async function runBroadcastChangesTests(_testUser: { email: string; password: st
10381031
.on("broadcast", { event: "UPDATE" }, (res) => (result = res));
10391032

10401033
const subscribeMs = await openChannel(channel);
1034+
await sleep(100);
10411035
await supabase.from("broadcast_changes").insert({ value: originalValue, id, topic: testTopic });
10421036
await supabase.from("broadcast_changes").update({ value: updatedValue }).eq("id", id);
10431037
const { latencyMs: eventMs } = await waitFor(() => result, "UPDATE event");
@@ -1068,6 +1062,7 @@ async function runBroadcastChangesTests(_testUser: { email: string; password: st
10681062
.on("broadcast", { event: "DELETE" }, (res) => (result = res));
10691063

10701064
const subscribeMs = await openChannel(channel);
1065+
await sleep(100);
10711066
await supabase.from("broadcast_changes").insert({ value, id, topic: testTopic });
10721067
await supabase.from("broadcast_changes").delete().eq("id", id);
10731068
const { latencyMs: eventMs } = await waitFor(() => result, "DELETE event");
@@ -1776,6 +1771,7 @@ async function runBroadcastBinaryTests(supabase: SupabaseClient) {
17761771
.on("broadcast", { event }, (msg) => (result = msg.payload));
17771772

17781773
const subscribeMs = await openChannel(channel);
1774+
await sleep(100);
17791775

17801776
await sql`SELECT realtime.send_binary(${binary}::bytea, ${event}::text, ${topic}::text, true)`;
17811777

test/integration/rt_channel/billable_events_test.exs

Lines changed: 6 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,7 @@ defmodule Realtime.Integration.RtChannel.BillableEventsTest do
7676

7777
# Join events
7878
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300
79-
80-
assert_receive %Message{
81-
topic: ^topic,
82-
event: "system",
83-
payload: %{"extension" => "postgres_changes", "status" => "ok"}
84-
},
85-
50000
79+
assert_receive %Message{topic: ^topic, event: "system"}, 5000
8680

8781
# Wait for RateCounter to run
8882
RateCounterHelper.tick_tenant_rate_counters!(tenant.external_id)
@@ -116,7 +110,6 @@ defmodule Realtime.Integration.RtChannel.BillableEventsTest do
116110

117111
# Join events
118112
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300
119-
120113
# Broadcast event
121114
payload = %{"event" => "TEST", "payload" => %{"msg" => 1}, "type" => "broadcast"}
122115

@@ -127,7 +120,7 @@ defmodule Realtime.Integration.RtChannel.BillableEventsTest do
127120
assert_receive %Message{topic: ^topic, event: "broadcast", payload: ^payload}
128121
end
129122

130-
refute_receive %Message{topic: ^topic, event: "broadcast"}
123+
refute_receive _any
131124

132125
# Wait for RateCounter to run
133126
RateCounterHelper.tick_tenant_rate_counters!(tenant.external_id)
@@ -207,25 +200,13 @@ defmodule Realtime.Integration.RtChannel.BillableEventsTest do
207200

208201
# Join events
209202
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300
210-
211-
assert_receive %Message{
212-
topic: ^topic,
213-
event: "system",
214-
payload: %{"extension" => "postgres_changes", "status" => "ok"}
215-
},
216-
5000
203+
assert_receive %Message{topic: ^topic, event: "system"}, 5000
217204

218205
# Add second user to test the "multiplication" of billable events
219206
{socket, _} = get_connection(tenant, serializer)
220207
WebsocketClient.join(socket, topic, %{config: config})
221208
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300
222-
223-
assert_receive %Message{
224-
topic: ^topic,
225-
event: "system",
226-
payload: %{"extension" => "postgres_changes", "status" => "ok"}
227-
},
228-
5000
209+
assert_receive %Message{topic: ^topic, event: "system"}, 5000
229210

230211
tenant = Tenants.get_tenant_by_external_id(tenant.external_id)
231212
{:ok, conn} = Database.connect(tenant, "realtime_test", :stop)
@@ -239,7 +220,7 @@ defmodule Realtime.Integration.RtChannel.BillableEventsTest do
239220
event: "postgres_changes",
240221
payload: %{"data" => %{"schema" => "public", "table" => "test", "type" => "INSERT"}}
241222
},
242-
500
223+
5000
243224
end
244225

245226
# Wait for RateCounter to run
@@ -267,13 +248,7 @@ defmodule Realtime.Integration.RtChannel.BillableEventsTest do
267248

268249
# Join events
269250
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300
270-
271-
assert_receive %Message{
272-
topic: ^topic,
273-
event: "system",
274-
payload: %{"extension" => "postgres_changes", "status" => "error"}
275-
},
276-
5000
251+
assert_receive %Message{topic: ^topic, event: "system"}, 5000
277252

278253
# Wait for RateCounter to run
279254
RateCounterHelper.tick_tenant_rate_counters!(tenant.external_id)

test/integration/rt_channel/connection_lifecycle_test.exs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ defmodule Realtime.Integration.RtChannel.ConnectionLifecycleTest do
6868
assert {:ok, ^replication_conn} = Connect.replication_status(tenant.external_id)
6969
end
7070

71-
test "every joining client receives the replication established system message, even after streaming has started",
71+
test "clients that opt in receive the replication established system message, even after streaming has started",
7272
%{serializer: serializer} do
7373
tenant = Containers.checkout_tenant(run_migrations: true)
7474

@@ -82,7 +82,7 @@ defmodule Realtime.Integration.RtChannel.ConnectionLifecycleTest do
8282
{socket1, _} = get_connection(tenant, serializer, role: "authenticated")
8383
{socket2, _} = get_connection(tenant, serializer, role: "authenticated")
8484

85-
config = %{broadcast: %{self: true}, private: false}
85+
config = %{broadcast: %{self: true}, private: false, replication_ready: true}
8686
topic1 = "realtime:#{random_string()}"
8787
topic2 = "realtime:#{random_string()}"
8888

@@ -246,14 +246,6 @@ defmodule Realtime.Integration.RtChannel.ConnectionLifecycleTest do
246246
WebsocketClient.join(socket, topic, %{config: config})
247247

248248
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 500
249-
250-
assert_receive %Message{
251-
event: "system",
252-
topic: ^topic,
253-
payload: %{"message" => "Replication connection established"}
254-
},
255-
2000
256-
257249
topic
258250
end
259251

test/integration/rt_channel/token_handling_test.exs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -193,13 +193,6 @@ defmodule Realtime.Integration.RtChannel.TokenHandlingTest do
193193

194194
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}}, 500
195195

196-
assert_receive %Message{
197-
event: "system",
198-
topic: ^realtime_topic,
199-
payload: %{"message" => "Replication connection established"}
200-
},
201-
2000
202-
203196
WebsocketClient.send_event(socket, realtime_topic, "access_token", %{"access_token" => new_token})
204197

205198
refute_receive %Message{}
@@ -319,13 +312,6 @@ defmodule Realtime.Integration.RtChannel.TokenHandlingTest do
319312

320313
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}}, 500
321314

322-
assert_receive %Message{
323-
event: "system",
324-
topic: ^realtime_topic,
325-
payload: %{"message" => "Replication connection established"}
326-
},
327-
2000
328-
329315
WebsocketClient.send_event(socket, realtime_topic, "access_token", %{
330316
"access_token" => "sb_publishable_-fake_key"
331317
})

test/realtime_web/channels/realtime_channel_replication_ready_test.exs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ defmodule RealtimeWeb.RealtimeChannelReplicationReadyTest do
6060
refute_receive %Socket.Message{event: "system", payload: %{message: "Replication connection established"}}, 300
6161
end
6262

63-
test "stops the channel when replication is not established before the timeout", %{tenant: tenant} do
63+
test "shuts down the channel when replication is not established before the timeout", %{tenant: tenant} do
6464
previous = Application.get_env(:realtime, :replication_ready_timeout)
6565
Application.put_env(:realtime, :replication_ready_timeout, 50)
6666
on_exit(fn -> Application.put_env(:realtime, :replication_ready_timeout, previous) end)
@@ -80,10 +80,21 @@ defmodule RealtimeWeb.RealtimeChannelReplicationReadyTest do
8080
assert_receive {:DOWN, ^ref, :process, _, _}, 500
8181
end
8282

83+
test "does not arm replication readiness notifications unless opted in", %{tenant: tenant} do
84+
stub(Connect, :lookup_or_start_connection, fn _ -> {:ok, self()} end)
85+
stub(Connect, :replication_status, fn _ -> {:ok, self()} end)
86+
87+
jwt = generate_jwt_token(tenant)
88+
{:ok, socket} = connect(UserSocket, %{}, conn_opts(tenant, jwt))
89+
assert {:ok, _, _} = subscribe_and_join(socket, "realtime:test", %{"config" => %{}})
90+
91+
refute_receive %Socket.Message{event: "system", payload: %{message: "Replication connection established"}}, 300
92+
end
93+
8394
defp join(tenant) do
8495
jwt = generate_jwt_token(tenant)
8596
{:ok, socket} = connect(UserSocket, %{}, conn_opts(tenant, jwt))
86-
subscribe_and_join(socket, "realtime:test", %{"config" => %{}})
97+
subscribe_and_join(socket, "realtime:test", %{"config" => %{"replication_ready" => true}})
8798
end
8899

89100
defp conn_opts(tenant, token) do

test/realtime_web/channels/realtime_channel_test.exs

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,6 @@ defmodule RealtimeWeb.RealtimeChannelTest do
105105
"commit_timestamp" => _
106106
} = Jason.encode!(data) |> Jason.decode!()
107107

108-
assert_receive %Socket.Message{
109-
event: "system",
110-
payload: %{message: "Replication connection established"}
111-
},
112-
500
113-
114108
refute_receive %Socket.Message{}
115109
refute_receive %Socket.Reply{}
116110
end
@@ -185,12 +179,6 @@ defmodule RealtimeWeb.RealtimeChannelTest do
185179
"commit_timestamp" => _
186180
} = Jason.encode!(data) |> Jason.decode!()
187181

188-
assert_receive %Socket.Message{
189-
event: "system",
190-
payload: %{message: "Replication connection established"}
191-
},
192-
500
193-
194182
refute_receive _any
195183
end
196184

@@ -515,12 +503,6 @@ defmodule RealtimeWeb.RealtimeChannelTest do
515503
}
516504
}
517505

518-
assert_receive %Socket.Message{
519-
event: "system",
520-
payload: %{message: "Replication connection established"}
521-
},
522-
500
523-
524506
refute_receive %Socket.Message{}
525507
end
526508
end
@@ -705,12 +687,6 @@ defmodule RealtimeWeb.RealtimeChannelTest do
705687
assert_receive %Socket.Reply{payload: %{}, topic: "realtime:test", status: :ok}, 500
706688
# no presence_diff this time
707689

708-
assert_receive %Socket.Message{
709-
event: "system",
710-
payload: %{message: "Replication connection established"}
711-
},
712-
500
713-
714690
refute_receive %Socket.Message{}
715691
refute_receive %Socket.Reply{}
716692
end

0 commit comments

Comments
 (0)