Skip to content

Commit dd01034

Browse files
lukebakken authored and the-mikedavis committed
amqp_client: Per-channel queue_types_published tracking for direct connections
Previously, queue_types_published in amqp_gen_connection was a flat set with no per-channel cleanup. Once a connection had published to a queue type, it was permanently considered to be publishing to that type, which caused over-blocking after channels closed.

The fix threads the server channel pid through the channel startup path as a tagged tuple, {direct, ServerPid} | {network, WriterPid}:

- amqp_channel_sup: start_writer/6 returns a tagged writer tuple; start_link/6 passes it through to the caller.
- amqp_channel: set_writer/2 unwraps the tagged tuple.
- amqp_channels_manager: stores a ClientPid -> TaggedWriter mapping and sends channel_closed/2 with the tagged writer on channel exit.
- amqp_gen_connection: tracks queue_types_published per server channel pid (keyed by {direct, ServerPid}) and removes the entry on channel_closed.

Network connections are unaffected: rabbit_reader handles blocking independently, and amqp_gen_connection ignores channel_closed for {network, _}.

Test: amqp_client_SUITE:per_queue_type_disk_alarm_direct_connection verifies that a direct connection is unblocked when the publishing channel closes, even while the alarm remains active.
1 parent 51b0310 commit dd01034

5 files changed

Lines changed: 71 additions & 32 deletions

File tree

deps/amqp_client/src/amqp_channel.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,11 @@ start_link(Driver, Connection, ChannelNumber, Consumer, Identity) ->
352352
gen_server:start_link(
353353
?MODULE, [Driver, Connection, ChannelNumber, Consumer, Identity], []).
354354

355-
set_writer(Pid, Writer) ->
355+
set_writer(Pid, {network, Writer}) ->
356+
set_writer(Pid, Writer);
357+
set_writer(Pid, {direct, Writer}) ->
358+
set_writer(Pid, Writer);
359+
set_writer(Pid, Writer) when is_pid(Pid) andalso is_pid(Writer) ->
356360
gen_server:cast(Pid, {set_writer, Writer}).
357361

358362
enable_delivery_flow_control(Pid) ->

deps/amqp_client/src/amqp_channel_sup.erl

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ start_link(Type, Connection, ConnName, InfraArgs, ChNumber,
3434
modules => [amqp_channel]},
3535
{ok, ChPid} = supervisor:start_child(Sup, ChildSpec),
3636
case start_writer(Sup, Type, InfraArgs, ConnName, ChNumber, ChPid) of
37-
{ok, Writer} ->
38-
amqp_channel:set_writer(ChPid, Writer),
37+
{ok, TaggedWriter} ->
38+
amqp_channel:set_writer(ChPid, TaggedWriter),
3939
{ok, AState} = init_command_assembler(Type),
40-
{ok, Sup, {ChPid, AState}};
40+
{ok, Sup, {ChPid, TaggedWriter, AState}};
4141
{error, _}=Error ->
4242
rabbit_misc:shutdown_supervisor(Sup),
4343
Error
@@ -59,12 +59,11 @@ start_writer(_Sup, direct, [ConnPid, Node, User, VHost, Collector, AmqpParams],
5959
case rpc:call(Node, rabbit_direct, start_channel,
6060
[ChNumber, ChPid, ConnPid, ConnName, ?PROTOCOL, User,
6161
VHost, ?CLIENT_CAPABILITIES, Collector, AmqpParams], ?DIRECT_OPERATION_TIMEOUT) of
62-
{ok, _Writer} = Reply ->
63-
Reply;
62+
{ok, Writer} ->
63+
{ok, {direct, Writer}};
6464
{badrpc, Reason} ->
6565
{error, {Reason, Node}};
66-
Error ->
67-
Error
66+
Error -> Error
6867
end;
6968
start_writer(Sup, network, [Sock, FrameMax], ConnName, ChNumber, ChPid) ->
7069
GCThreshold = application:get_env(amqp_client, writer_gc_threshold, ?DEFAULT_GC_THRESHOLD),
@@ -77,7 +76,11 @@ start_writer(Sup, network, [Sock, FrameMax], ConnName, ChNumber, ChPid) ->
7776
shutdown => ?WORKER_WAIT,
7877
type => worker,
7978
modules => [rabbit_writer]},
80-
supervisor:start_child(Sup, ChildSpec).
79+
case supervisor:start_child(Sup, ChildSpec) of
80+
{ok, Writer} ->
81+
{ok, {network, Writer}};
82+
Error -> Error
83+
end.
8184

8285
init_command_assembler(direct) -> {ok, none};
8386
init_command_assembler(network) -> rabbit_command_assembler:init().

deps/amqp_client/src/amqp_channels_manager.erl

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
channel_sup_sup,
2424
map_num_pa = gb_trees:empty(), %% Number -> {Pid, AState}
2525
map_pid_num = #{}, %% Pid -> Number
26+
map_pid_writer = #{}, %% Pid -> {direct|network, WriterPid}
2627
channel_max = ?MAX_CHANNEL_NUMBER,
2728
closing = false}).
2829

@@ -107,8 +108,8 @@ handle_open_channel(ProposedNumber, Consumer, InfraArgs,
107108
{ok, Number} ->
108109
case amqp_channel_sup_sup:start_channel_sup(ChSupSup, InfraArgs,
109110
Number, Consumer) of
110-
{ok, _ChSup, {Ch, AState}} ->
111-
NewState = internal_register(Number, Ch, AState, State),
111+
{ok, _ChSup, {Ch, TaggedWriter, AState}} ->
112+
NewState = internal_register(Number, Ch, TaggedWriter, AState, State),
112113
erlang:monitor(process, Ch),
113114
{reply, {ok, Ch}, NewState};
114115
{error, _} = Error ->
@@ -158,12 +159,15 @@ handle_down(Pid, Reason, State) ->
158159
Number -> handle_channel_down(Pid, Number, Reason, State)
159160
end.
160161

161-
handle_channel_down(Pid, Number, Reason, State) ->
162+
handle_channel_down(Pid, Number, Reason, State = #state{connection = Connection,
163+
map_pid_writer = MapPW}) ->
162164
maybe_report_down(Pid, case Reason of {shutdown, R} -> R;
163165
_ -> Reason
164166
end,
165167
State),
168+
TaggedWriter = maps:get(Pid, MapPW),
166169
NewState = internal_unregister(Number, Pid, State),
170+
amqp_gen_connection:channel_closed(Connection, TaggedWriter),
167171
check_all_channels_terminated(NewState),
168172
{noreply, NewState}.
169173

@@ -214,19 +218,25 @@ internal_pass_frame(Number, Frame, State) ->
214218
internal_update_npa(Number, ChPid, NewAState, State)
215219
end.
216220

217-
internal_register(Number, Pid, AState,
218-
State = #state{map_num_pa = MapNPA, map_pid_num = MapPN}) ->
221+
internal_register(Number, Pid, TaggedWriter, AState,
222+
State = #state{map_num_pa = MapNPA, map_pid_num = MapPN,
223+
map_pid_writer = MapPW}) ->
219224
MapNPA1 = gb_trees:enter(Number, {Pid, AState}, MapNPA),
220225
MapPN1 = maps:put(Pid, Number, MapPN),
221-
State#state{map_num_pa = MapNPA1,
222-
map_pid_num = MapPN1}.
226+
MapPW1 = maps:put(Pid, TaggedWriter, MapPW),
227+
State#state{map_num_pa = MapNPA1,
228+
map_pid_num = MapPN1,
229+
map_pid_writer = MapPW1}.
223230

224231
internal_unregister(Number, Pid,
225-
State = #state{map_num_pa = MapNPA, map_pid_num = MapPN}) ->
232+
State = #state{map_num_pa = MapNPA, map_pid_num = MapPN,
233+
map_pid_writer = MapPW}) ->
226234
MapNPA1 = gb_trees:delete(Number, MapNPA),
227235
MapPN1 = maps:remove(Pid, MapPN),
228-
State#state{map_num_pa = MapNPA1,
229-
map_pid_num = MapPN1}.
236+
MapPW1 = maps:remove(Pid, MapPW),
237+
State#state{map_num_pa = MapNPA1,
238+
map_pid_num = MapPN1,
239+
map_pid_writer = MapPW1}.
230240

231241
internal_is_empty(#state{map_num_pa = MapNPA}) ->
232242
gb_trees:is_empty(MapNPA).

deps/amqp_client/src/amqp_gen_connection.erl

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
-behaviour(gen_server).
1515

1616
-export([start_link/2, connect/1, open_channel/3, hard_error_in_channel/3,
17-
channel_internal_error/3, server_misbehaved/2, channels_terminated/1,
18-
close/3, server_close/2, info/2, info_keys/0, info_keys/1,
19-
register_blocked_handler/2, update_secret/2]).
17+
channel_internal_error/3, channel_closed/2, server_misbehaved/2,
18+
channels_terminated/1, close/3, server_close/2, info/2, info_keys/0,
19+
info_keys/1, register_blocked_handler/2, update_secret/2]).
2020
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
2121
handle_info/2]).
2222

@@ -32,7 +32,7 @@
3232
%% connection.block, connection.unblock handler
3333
block_handler,
3434
blocked_by = sets:new([{version, 2}]),
35-
queue_types_published = sets:new([{version, 2}]),
35+
queue_types_published = #{},
3636
closing = false %% #closing{} | false
3737
}).
3838

@@ -97,6 +97,9 @@ hard_error_in_channel(Pid, ChannelPid, Reason) ->
9797
channel_internal_error(Pid, ChannelPid, Reason) ->
9898
gen_server:cast(Pid, {channel_internal_error, ChannelPid, Reason}).
9999

100+
channel_closed(Pid, TaggedWriter) ->
101+
gen_server:cast(Pid, {channel_closed, TaggedWriter}).
102+
100103
server_misbehaved(Pid, AmqpError) ->
101104
gen_server:cast(Pid, {server_misbehaved, AmqpError}).
102105

@@ -223,10 +226,23 @@ handle_cast({conserve_resources, Source, Conserve},
223226
end,
224227
State1 = State#state{blocked_by = BlockedBy1},
225228
maybe_block(State, State1);
226-
handle_cast({channel_published_to_queue_type, _ChPid, QT},
229+
handle_cast({channel_published_to_queue_type, ServerChPid, QT},
230+
#state{queue_types_published = QTs} = State) ->
231+
%% ServerChPid is the server-side channel pid. We key by {direct, ServerChPid}
232+
%% so channel_closed can remove the entry using the tagged writer.
233+
Key = {direct, ServerChPid},
234+
ChQTs = maps:get(Key, QTs, sets:new([{version, 2}])),
235+
State1 = State#state{queue_types_published =
236+
QTs#{Key => sets:add_element(QT, ChQTs)}},
237+
maybe_block(State, State1);
238+
handle_cast({channel_closed, {direct, ServerChPid}},
227239
#state{queue_types_published = QTs} = State) ->
228-
State1 = State#state{queue_types_published = sets:add_element(QT, QTs)},
229-
maybe_block(State, State1).
240+
State1 = State#state{queue_types_published =
241+
maps:remove({direct, ServerChPid}, QTs)},
242+
maybe_block(State, State1);
243+
handle_cast({channel_closed, {network, _WriterPid}}, State) ->
244+
%% Network connections are handled by rabbit_reader, not here.
245+
{noreply, State}.
230246

231247
%% @private
232248
handle_info({'DOWN', _, process, BlockHandler, Reason},
@@ -284,7 +300,9 @@ maybe_block(State0, State1) ->
284300

285301
should_block(#state{blocked_by = BlockedBy, queue_types_published = QTs}) ->
286302
lists:any(fun ({disk, QT}) ->
287-
sets:is_element(QT, QTs);
303+
maps:fold(fun(_, ChQTs, Acc) ->
304+
Acc orelse sets:is_element(QT, ChQTs)
305+
end, false, QTs);
288306
(_Resource) ->
289307
true
290308
end, sets:to_list(BlockedBy)).

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3824,7 +3824,8 @@ per_queue_type_disk_alarm(Config) ->
38243824
per_queue_type_disk_alarm_direct_connection(Config) ->
38253825
%% Test that a direct connection (used by plugins such as STOMP) is
38263826
%% blocked when a per-queue-type disk alarm fires and the connection
3827-
%% has published to that queue type, and unblocked when the alarm clears.
3827+
%% has published to that queue type, and unblocked when the publishing
3828+
%% channel closes - even while the alarm remains active.
38283829
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
38293830
Resource = {disk, rabbit_classic_queue},
38303831
QName = atom_to_binary(?FUNCTION_NAME),
@@ -3851,15 +3852,18 @@ per_queue_type_disk_alarm_direct_connection(Config) ->
38513852
after 5000 -> exit(connection_was_not_blocked)
38523853
end,
38533854

3854-
%% Clear the alarm. The connection should unblock.
3855-
ok = rpc:call(Node, rabbit_alarm, clear_alarm, [{resource_limit, Resource, Node}]),
3855+
%% Close the publishing channel. The connection should unblock since no
3856+
%% active channel has published to the alarmed queue type.
3857+
ok = amqp_channel:close(Ch),
38563858
receive
38573859
#'connection.unblocked'{} -> ok
38583860
after 5000 -> exit(connection_was_not_unblocked)
38593861
end,
38603862

3861-
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
3862-
ok = amqp_channel:close(Ch),
3863+
ok = rpc:call(Node, rabbit_alarm, clear_alarm, [{resource_limit, Resource, Node}]),
3864+
{ok, Ch2} = amqp_connection:open_channel(Conn),
3865+
#'queue.delete_ok'{} = amqp_channel:call(Ch2, #'queue.delete'{queue = QName}),
3866+
ok = amqp_channel:close(Ch2),
38633867
ok = amqp_connection:close(Conn).
38643868

38653869
max_message_size_client_to_server(Config) ->

0 commit comments

Comments
 (0)