Skip to content

Commit 3e84184

Browse files
committed
fix: add system message to signal replication ready
1 parent a73cb13 commit 3e84184

10 files changed

Lines changed: 371 additions & 96 deletions

File tree

ERROR_CODES.md

Lines changed: 85 additions & 85 deletions
Large diffs are not rendered by default.

lib/realtime/syn_handler.ex

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,12 @@ defmodule Realtime.SynHandler do
1212
@postgres_cdc_scope_prefix PostgresCdc.syn_topic_prefix()
1313

1414
@impl true
15-
def on_registry_process_updated(Connect, tenant_id, pid, %{conn: conn}, :normal) when is_pid(conn) do
16-
# Update that a database connection is ready
17-
Endpoint.local_broadcast(Connect.syn_topic(tenant_id), "ready", %{pid: pid, conn: conn})
15+
# Announces on the tenant's Connect syn topic that a database connection is
# ready. The payload also carries `replication_conn` taken from the registry
# metadata; it is `nil` when the metadata has no such key.
def on_registry_process_updated(Connect, tenant_id, pid, %{conn: conn} = meta, :normal) when is_pid(conn) do
  payload = %{
    pid: pid,
    conn: conn,
    replication_conn: Map.get(meta, :replication_conn)
  }

  tenant_id
  |> Connect.syn_topic()
  |> Endpoint.local_broadcast("ready", payload)
end
1922

2023
def on_registry_process_updated(scope, tenant_id, _pid, meta, _reason) do

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ defmodule RealtimeWeb.RealtimeChannel do
104104

105105
RealtimeWeb.Endpoint.subscribe(tenant_topic, metadata: metadata)
106106
RealtimeWeb.Endpoint.subscribe("realtime:operations:" <> tenant_id, metadata: metadata)
107+
RealtimeWeb.Endpoint.subscribe(Connect.syn_topic(tenant_id))
108+
send(self(), :notify_replication_ready)
107109

108110
is_new_api = new_api?(params)
109111
presence_enabled? = socket.assigns.presence_enabled?
@@ -133,7 +135,8 @@ defmodule RealtimeWeb.RealtimeChannel do
133135
self_broadcast: !!params["config"]["broadcast"]["self"],
134136
tenant_topic: tenant_topic,
135137
channel_name: sub_topic,
136-
presence_enabled?: presence_enabled?
138+
presence_enabled?: presence_enabled?,
139+
replication_ready_notified?: false
137140
}
138141

139142
socket =
@@ -290,6 +293,20 @@ defmodule RealtimeWeb.RealtimeChannel do
290293
{:noreply, assign(socket, %{pg_sub_ref: pg_sub_ref})}
291294
end
292295

296+
# Handles the `:notify_replication_ready` message the channel sends to itself.
# If the tenant's replication connection is already up, the client is notified
# right away; otherwise the socket is returned untouched.
def handle_info(:notify_replication_ready, %{assigns: %{tenant: tenant_id}} = socket) do
  socket =
    case Connect.replication_status(tenant_id) do
      {:ok, _replication_conn} -> push_replication_ready(socket)
      {:error, :not_connected} -> socket
    end

  {:noreply, socket}
end
302+
303+
# A "ready" event whose payload carries a live replication connection pid:
# notify the client that replication is established.
def handle_info(%{event: "ready", payload: %{replication_conn: replication_conn}}, socket)
    when is_pid(replication_conn) do
  socket = push_replication_ready(socket)
  {:noreply, socket}
end
307+
308+
# Any other message arriving on a "connect:"-prefixed topic is ignored.
def handle_info(%{topic: "connect:" <> _rest}, socket) do
  {:noreply, socket}
end
309+
293310
def handle_info(_msg, %{assigns: %{policies: %Policies{broadcast: %BroadcastPolicies{read: false}}}} = socket) do
294311
Logger.warning("Broadcast message ignored")
295312
{:noreply, socket}
@@ -723,6 +740,14 @@ defmodule RealtimeWeb.RealtimeChannel do
723740
{:stop, :normal, socket}
724741
end
725742

743+
# Pushes the "Replication connection established" system message at most once
# per channel.

# Already notified: nothing to do.
defp push_replication_ready(%{assigns: %{replication_ready_notified?: true}} = socket) do
  socket
end

# First notification: stop listening on the tenant's Connect syn topic, push
# the system message, and record that the client has been told.
defp push_replication_ready(%{assigns: %{tenant: tenant_id, channel_name: channel_name}} = socket) do
  tenant_id
  |> Connect.syn_topic()
  |> RealtimeWeb.Endpoint.unsubscribe()

  push_system_message("broadcast", socket, "ok", "Replication connection established", channel_name)

  assign(socket, :replication_ready_notified?, true)
end
750+
726751
defp push_system_message(extension, socket, status, error, channel_name)
727752
when is_map(error) and is_map_key(error, :error_code) and is_map_key(error, :error_message) do
728753
push(socket, "system", %{

mise.toml

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,59 @@ run = """
7474
--db-url "postgresql://postgres:postgres@127.0.0.1:54322/postgres" \
7575
"$@"
7676
"""
77+
78+
[tasks.e2e-local]
description = "Run e2e tests against a locally-built realtime image (carries unreleased migrations). Builds the repo, swaps it in for supabase's stock realtime, then runs the suite. e.g. mise run e2e-local -- --test postgres-changes-filters"
dir = "test/e2e"
run = """
set -e
REF=$(grep -E '^[[:space:]]*project_id' supabase/config.toml | sed -E 's/.*"(.*)".*/\\1/')
IMAGE=realtime:e2e-local
RT="supabase_realtime_${REF}"

# 1. Build the repo into a local image (Docker layer cache keeps rebuilds cheap).
docker build -t "$IMAGE" ../..

# 2. Bring up the full supabase stack. We use its realtime container only as the source of
#    truth for env vars (JWT secret, DB creds, encryption key, etc.). A prior run may have left
#    realtime swapped out / removed; `supabase start` won't recreate an externally-removed
#    container, so if it's missing we recycle the stack to get a clean one back.
supabase start
if ! docker inspect "$RT" >/dev/null 2>&1; then
  echo "realtime container missing (left over from a prior swap); recreating supabase stack..."
  supabase stop >/dev/null 2>&1 || true
  supabase start
fi

# 3. Swap realtime: copy supabase's exact env (via `env` inside their container, so we avoid
#    Go-template braces that collide with mise's templating), drop their container, and run ours
#    under the same name / network / aliases / healthcheck so Kong routes to it unchanged.
ENVFILE=$(mktemp)
# The env file holds secrets copied from the stock container. The trap removes it even when a
# later command fails under `set -e`; the explicit `rm -f` below still removes it on success.
trap 'rm -f "$ENVFILE"' EXIT
docker exec "$RT" env > "$ENVFILE"
docker rm -f "$RT" >/dev/null
docker run -d --name "$RT" \
  --network "supabase_network_${REF}" \
  --network-alias realtime --network-alias realtime-dev \
  --env-file "$ENVFILE" \
  --health-cmd 'curl -sSfL --head -o /dev/null -H "Host:realtime-dev" http://127.0.0.1:4000/api/ping' \
  --health-interval 10s --health-timeout 2s --health-retries 3 \
  "$IMAGE" >/dev/null
rm -f "$ENVFILE"

# 4. Wait for our realtime to report healthy (boot + tenant migrations). `docker ps` with a
#    health filter prints the container id only once it is healthy — no Go-template needed.
#    If it never becomes healthy, fail here instead of running the suite against a broken stack.
echo "Waiting for local realtime ($IMAGE) to become healthy..."
HEALTHY=""
for _ in $(seq 1 30); do
  if [ -n "$(docker ps -q --filter "name=$RT" --filter health=healthy)" ]; then HEALTHY=1; break; fi
  sleep 2
done
if [ -z "$HEALTHY" ]; then
  echo "local realtime ($IMAGE) did not become healthy after 30 checks; recent container logs:" >&2
  docker logs --tail 50 "$RT" >&2 || true
  exit 1
fi

# 5. Run the suite against the stack (Kong -> our realtime). Do NOT call `supabase start` again
#    here, or it would recreate realtime from the stock image.
eval "$(supabase status --output env)"
bun run realtime-check.ts --env local \
  --publishable-key "$PUBLISHABLE_KEY" \
  --secret-key "$SECRET_KEY" \
  --db-url "postgresql://postgres:postgres@127.0.0.1:54322/postgres" \
  "$@"
"""

test/e2e/realtime-check.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -274,16 +274,24 @@ async function waitForSubscribed(channel: ReturnType<SupabaseClient["channel"]>)
274274
// Subscribes a channel and waits until it is fully joined.
275275
// All data operations must happen after this returns to avoid delivery races.
276276
async function openChannel(channel: ReturnType<SupabaseClient["channel"]>): Promise<number> {
  let sawReplicationReady = false;

  // Register the listener before subscribing so the system message cannot be missed.
  channel.on("system", "*", (payload: { message?: string }) => {
    if (payload.message === "Replication connection established") {
      sawReplicationReady = true;
    }
  });

  channel.subscribe();

  // Wait for the join first, then for the server's replication-ready system message.
  const subscribeMs = await waitForSubscribed(channel);
  await waitFor(() => (sawReplicationReady ? true : null), "replication established");

  return subscribeMs;
}
280286

281287
// Subscribes a postgres_changes channel and waits for both the join and the
282288
// system:ok confirmation that the server-side WAL subscription is active.
283289
async function openPostgresChannel(channel: ReturnType<SupabaseClient["channel"]>): Promise<{ subscribeMs: number; systemMs: number }> {
284290
const start = performance.now();
285291
let systemOk = false;
286-
channel.on("system", "*", ({ status }: { status: string }) => { if (status === "ok") systemOk = true; });
292+
channel.on("system", "*", ({ status, extension }: { status: string; extension?: string }) => {
293+
if (status === "ok" && extension === "postgres_changes") systemOk = true;
294+
});
287295
const subscribeMs = await openChannel(channel);
288296
const { latencyMs: systemMs } = await waitFor(() => systemOk ? true : null, "system ok");
289297
return { subscribeMs, systemMs: performance.now() - start };
@@ -1001,7 +1009,6 @@ async function runBroadcastChangesTests(_testUser: { email: string; password: st
10011009
.on("broadcast", { event: "INSERT" }, (res) => (result = res));
10021010

10031011
const subscribeMs = await openChannel(channel);
1004-
await sleep(500);
10051012
await supabase.from("broadcast_changes").insert({ value, id, topic: testTopic });
10061013
const { latencyMs: eventMs } = await waitFor(() => result, "INSERT event");
10071014

@@ -1031,7 +1038,6 @@ async function runBroadcastChangesTests(_testUser: { email: string; password: st
10311038
.on("broadcast", { event: "UPDATE" }, (res) => (result = res));
10321039

10331040
const subscribeMs = await openChannel(channel);
1034-
await sleep(100);
10351041
await supabase.from("broadcast_changes").insert({ value: originalValue, id, topic: testTopic });
10361042
await supabase.from("broadcast_changes").update({ value: updatedValue }).eq("id", id);
10371043
const { latencyMs: eventMs } = await waitFor(() => result, "UPDATE event");
@@ -1062,7 +1068,6 @@ async function runBroadcastChangesTests(_testUser: { email: string; password: st
10621068
.on("broadcast", { event: "DELETE" }, (res) => (result = res));
10631069

10641070
const subscribeMs = await openChannel(channel);
1065-
await sleep(100);
10661071
await supabase.from("broadcast_changes").insert({ value, id, topic: testTopic });
10671072
await supabase.from("broadcast_changes").delete().eq("id", id);
10681073
const { latencyMs: eventMs } = await waitFor(() => result, "DELETE event");
@@ -1439,7 +1444,6 @@ async function runBroadcastBinaryTests(supabase: SupabaseClient) {
14391444
.on("broadcast", { event }, (msg) => (result = msg.payload));
14401445

14411446
const subscribeMs = await openChannel(channel);
1442-
await sleep(100);
14431447

14441448
await sql`SELECT realtime.send_binary(${binary}::bytea, ${event}::text, ${topic}::text, true)`;
14451449

test/integration/rt_channel/billable_events_test.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ defmodule Realtime.Integration.RtChannel.BillableEventsTest do
110110

111111
# Join events
112112
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 300
113+
113114
# Broadcast event
114115
payload = %{"event" => "TEST", "payload" => %{"msg" => 1}, "type" => "broadcast"}
115116

@@ -120,7 +121,7 @@ defmodule Realtime.Integration.RtChannel.BillableEventsTest do
120121
assert_receive %Message{topic: ^topic, event: "broadcast", payload: ^payload}
121122
end
122123

123-
refute_receive _any
124+
refute_receive %Message{topic: ^topic, event: "broadcast"}
124125

125126
# Wait for RateCounter to run
126127
RateCounterHelper.tick_tenant_rate_counters!(tenant.external_id)

test/integration/rt_channel/connection_lifecycle_test.exs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ defmodule Realtime.Integration.RtChannel.ConnectionLifecycleTest do
1212
alias Phoenix.Socket.Message
1313
alias Realtime.Integration.WebsocketClient
1414
alias Realtime.Tenants
15+
alias Realtime.Tenants.Connect
1516
alias RealtimeWeb.UserSocket
1617

1718
@moduletag :capture_log
@@ -53,6 +54,60 @@ defmodule Realtime.Integration.RtChannel.ConnectionLifecycleTest do
5354
end
5455
end
5556

57+
describe "replication connection establishment" do
  test "Connect signals replication readiness on its syn topic with the replication_conn pid" do
    tenant = Containers.checkout_tenant(run_migrations: true)
    syn_topic = Connect.syn_topic(tenant.external_id)

    # Subscribe before starting the connection so the "ready" broadcast is not missed.
    Phoenix.PubSub.subscribe(Realtime.PubSub, syn_topic)
    {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)

    assert_receive %Phoenix.Socket.Broadcast{event: "ready", payload: %{replication_conn: replication_conn}}
                   when is_pid(replication_conn),
                   5000

    assert {:ok, ^replication_conn} = Connect.replication_status(tenant.external_id)
  end

  test "every joining client receives the replication established system message, even after streaming has started",
       %{serializer: serializer} do
    tenant = Containers.checkout_tenant(run_migrations: true)
    syn_topic = Connect.syn_topic(tenant.external_id)

    Phoenix.PubSub.subscribe(Realtime.PubSub, syn_topic)
    {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)

    # Replication is confirmed up before any client joins.
    assert_receive %Phoenix.Socket.Broadcast{event: "ready", payload: %{replication_conn: replication_conn}}
                   when is_pid(replication_conn),
                   5000

    config = %{broadcast: %{self: true}, private: false}

    # Two independent clients, each on its own random topic.
    clients =
      for _ <- 1..2 do
        {socket, _} = get_connection(tenant, serializer, role: "authenticated")
        {socket, "realtime:#{random_string()}"}
      end

    Enum.each(clients, fn {socket, topic} ->
      WebsocketClient.join(socket, topic, %{config: config})
    end)

    for {_socket, topic} <- clients do
      assert_receive %Message{event: "phx_reply", topic: ^topic, payload: %{"status" => "ok"}}, 500
    end

    for {_socket, topic} <- clients do
      assert_receive %Message{
                       event: "system",
                       topic: ^topic,
                       payload: %{"message" => "Replication connection established"}
                     },
                     2000
    end
  end
end
110+
56111
describe "socket disconnect - tenant suspension" do
57112
setup [:rls_context]
58113

@@ -191,6 +246,14 @@ defmodule Realtime.Integration.RtChannel.ConnectionLifecycleTest do
191246
WebsocketClient.join(socket, topic, %{config: config})
192247

193248
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 500
249+
250+
assert_receive %Message{
251+
event: "system",
252+
topic: ^topic,
253+
payload: %{"message" => "Replication connection established"}
254+
},
255+
2000
256+
194257
topic
195258
end
196259

test/integration/rt_channel/token_handling_test.exs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,13 @@ defmodule Realtime.Integration.RtChannel.TokenHandlingTest do
193193

194194
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}}, 500
195195

196+
assert_receive %Message{
197+
event: "system",
198+
topic: ^realtime_topic,
199+
payload: %{"message" => "Replication connection established"}
200+
},
201+
2000
202+
196203
WebsocketClient.send_event(socket, realtime_topic, "access_token", %{"access_token" => new_token})
197204

198205
refute_receive %Message{}
@@ -312,6 +319,13 @@ defmodule Realtime.Integration.RtChannel.TokenHandlingTest do
312319

313320
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}}, 500
314321

322+
assert_receive %Message{
323+
event: "system",
324+
topic: ^realtime_topic,
325+
payload: %{"message" => "Replication connection established"}
326+
},
327+
2000
328+
315329
WebsocketClient.send_event(socket, realtime_topic, "access_token", %{
316330
"access_token" => "sb_publishable_-fake_key"
317331
})

0 commit comments

Comments
 (0)