KAFKA-20169: Support static membership for Kafka Streams with the streams rebalance protocol at Client Side. #22559
Pull request overview
This PR completes client-side support for static membership (group.instance.id) when Kafka Streams uses the Streams rebalance protocol (group.protocol=streams, per KIP-1071). It removes prior config validation that blocked static membership under the Streams protocol, adds explicit fatal handling for UNRELEASED_INSTANCE_ID in the Streams heartbeat manager, and extends unit tests to cover the expected close/leave-epoch behavior for static members.
Changes:
- Allow group.instance.id when group.protocol=streams (including unprefixed, consumer., and main.consumer. configs).
- Treat UNRELEASED_INSTANCE_ID as a known fatal error in StreamsGroupHeartbeatRequestManager.
- Add/extend unit tests for static-member close semantics and poll-on-close request contents.
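To illustrate the configuration this change unblocks, the sketch below builds a plain `Properties` object that combines `group.protocol=streams` with `group.instance.id` under one of the three accepted key forms. It uses literal string keys rather than `StreamsConfig` constants so it compiles without Kafka on the classpath. The class name, application id, and bootstrap address are placeholders chosen for this example, not values from the PR.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Kafka.
final class StaticMembershipProps {

    // prefix is expected to be "", "consumer.", or "main.consumer."
    static Properties build(String prefix, String instanceId) {
        Properties props = new Properties();
        props.put("application.id", "example-app");        // placeholder value
        props.put("bootstrap.servers", "localhost:9092");  // placeholder value
        props.put("group.protocol", "streams");            // Streams rebalance protocol (KIP-1071)
        props.put(prefix + "group.instance.id", instanceId); // static membership
        return props;
    }
}
```

The resulting `Properties` would then be handed to the `KafkaStreams` constructor as usual; before this PR, the same combination was rejected during config validation.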
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java | Replaces the previous “should throw” assertion with a parameterized test validating static membership is accepted under Streams protocol and propagated to main consumer configs. |
| streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java | Removes the Streams-protocol compatibility check that rejected group.instance.id, enabling static membership configuration. |
| clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java | Adds unit tests ensuring static members use the correct leave epoch on close for DEFAULT/REMAIN_IN_GROUP vs LEAVE_GROUP. |
| clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java | Extends fatal-error coverage to include UNRELEASED_INSTANCE_ID and verifies poll-on-close includes static leave epoch + instance id for static members. |
| clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java | Adds explicit fatal handling for UNRELEASED_INSTANCE_ID in error response processing. |
lucasbru left a comment
Thanks for the PR @chickenchickenlove !
    @Test
    public void testStaticMemberRemainInGroupUsesStaticLeaveEpochOnClose() {
        CloseOptions.GroupMembershipOperation operation = CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP;
        MemberState expectedState = MemberState.LEAVING;
        int expectedEpoch = StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
        verifyStaticMemberLeaveOnClose(operation, expectedState, expectedEpoch);
    }

    @Test
    public void testStaticMemberDefaultUsesLeaveGroupStaticMemberEpochOnClose() {
        CloseOptions.GroupMembershipOperation operation = CloseOptions.GroupMembershipOperation.DEFAULT;
        MemberState expectedState = MemberState.LEAVING;
        int expectedEpoch = StreamsGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
        verifyStaticMemberLeaveOnClose(operation, expectedState, expectedEpoch);
    }

    @Test
    public void testStaticMemberLeaveGroupUsesLeaveGroupEpochOnClose() {
        CloseOptions.GroupMembershipOperation operation = CloseOptions.GroupMembershipOperation.LEAVE_GROUP;
        MemberState expectedState = MemberState.LEAVING;
        int expectedEpoch = StreamsGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
        verifyStaticMemberLeaveOnClose(operation, expectedState, expectedEpoch);
    }
These three only differ by the operation and expected epoch, could they be a single @ParameterizedTest like testPollOnCloseWhenStaticMemberIsLeaving above? Also the names are inconsistent for the same constant: UsesStaticLeaveEpoch vs UsesLeaveGroupStaticMemberEpoch.
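The consolidation suggested here amounts to a table from close operation to expected leave epoch. A minimal sketch of that table follows, using local stand-ins rather than the real Kafka types: the `Operation` enum mirrors `CloseOptions.GroupMembershipOperation`, and the numeric values -1 and -2 are assumed values for `LEAVE_GROUP_MEMBER_EPOCH` and `LEAVE_GROUP_STATIC_MEMBER_EPOCH`, not taken from this PR.

```java
import java.util.EnumMap;
import java.util.Map;

// Illustrative stand-in types; the real test would use the Kafka classes.
final class StaticLeaveEpochTable {

    enum Operation { DEFAULT, REMAIN_IN_GROUP, LEAVE_GROUP }

    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;        // assumed value
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2; // assumed value

    // One row per case that the three separate tests currently cover.
    static Map<Operation, Integer> expectedEpochs() {
        Map<Operation, Integer> rows = new EnumMap<>(Operation.class);
        rows.put(Operation.DEFAULT, LEAVE_GROUP_STATIC_MEMBER_EPOCH);
        rows.put(Operation.REMAIN_IN_GROUP, LEAVE_GROUP_STATIC_MEMBER_EPOCH);
        rows.put(Operation.LEAVE_GROUP, LEAVE_GROUP_MEMBER_EPOCH);
        return rows;
    }
}
```

In a JUnit 5 test, each row would become one `Arguments.of(operation, expectedEpoch)` entry of a `@MethodSource`, so a single test method and a single naming scheme cover all three operations.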
    final Metrics localMetrics = new Metrics(time);
    StreamsMembershipManager membershipManagerWithStaticMember = new StreamsMembershipManager(
        GROUP_ID,
        Optional.of("instance-1"),
Minor: this hardcodes "instance-1" while StreamsGroupHeartbeatRequestManagerTest uses an INSTANCE_ID constant. Worth a constant here too for consistency.
    @@ -1600,12 +1600,6 @@ protected StreamsConfig(final Map<?, ?> props,

    private void verifyStreamsProtocolCompatibility(final boolean doLog) {
        if (doLog && isStreamsProtocolEnabled()) {
Now that group.instance.id is accepted with the streams protocol, are there docs that still state static membership isn't supported here? Worth checking the Streams upgrade/config docs so they don't contradict this.
You're right.
There is indeed an issue there as well. I'll address that part too and include it in this PR.
    @ParameterizedTest
    @EnumSource(value = CloseOptions.GroupMembershipOperation.class, names = {"DEFAULT", "REMAIN_IN_GROUP"})
    public void testPollOnCloseWhenStaticMemberIsLeaving(final CloseOptions.GroupMembershipOperation operation) {
Coverage here is unit-level only. Is there an integration/system test exercising static-member close under group.protocol=streams, or is that expected to come separately?
I was planning to cover this with a Ducktape test.
I'll take a look at the existing integration tests and check whether this can be covered by adding a new test case.
…eams rebalance protocol at Client Side.
5d2ace3 to 44d3eb3
@lucasbru
lucasbru left a comment
Hey. Another round of review, still with substantive comments. I'm a bit worried about 4.4 at the moment, as I think we are also lacking some testing in the current PRs.
For example, the consumer has test_fencing_static_consumer, but we don't cover this in the current PRs yet.
There is also testAsyncStaticMemberCloseWithLeaveGroupTriggersRebalance, which we'd need to replicate for streams groups.
        heartbeatRequestState.reset();
        break;

    case UNRELEASED_INSTANCE_ID:
Nit: the body here is byte-for-byte identical to the INVALID_REQUEST / GROUP_MAX_SIZE_REACHED / STREAMS_INVALID_TOPOLOGY / STREAMS_INVALID_TOPOLOGY_EPOCH / STREAMS_TOPOLOGY_FENCED fall-through group (same logger.error call + handleFatalFailure(error.exception(errorMessage))). UNSUPPORTED_VERSION is standalone because it substitutes UNSUPPORTED_VERSION_ERROR_MESSAGE. Could UNRELEASED_INSTANCE_ID just be added as another label in the fall-through group?
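The shape being proposed can be sketched as a switch in which `UNRELEASED_INSTANCE_ID` is one more label on the shared fatal group, while `UNSUPPORTED_VERSION` keeps its own branch because it substitutes a different message. The enum, the returned strings, and the message constant's text below are stand-ins for illustration; the real handler logs and calls `handleFatalFailure` instead of returning a string.

```java
// Illustrative stand-in for the heartbeat error switch; not the Kafka class.
final class HeartbeatErrorSketch {

    enum Err {
        INVALID_REQUEST, GROUP_MAX_SIZE_REACHED, STREAMS_INVALID_TOPOLOGY,
        STREAMS_INVALID_TOPOLOGY_EPOCH, STREAMS_TOPOLOGY_FENCED,
        UNRELEASED_INSTANCE_ID, UNSUPPORTED_VERSION, OTHER
    }

    // Placeholder text; the real constant's wording is not shown in this PR.
    static final String UNSUPPORTED_VERSION_ERROR_MESSAGE = "placeholder unsupported-version message";

    static String handle(Err error, String errorMessage) {
        switch (error) {
            case INVALID_REQUEST:
            case GROUP_MAX_SIZE_REACHED:
            case STREAMS_INVALID_TOPOLOGY:
            case STREAMS_INVALID_TOPOLOGY_EPOCH:
            case STREAMS_TOPOLOGY_FENCED:
            case UNRELEASED_INSTANCE_ID: // added as a label, no separate body
                return "fatal: " + errorMessage;
            case UNSUPPORTED_VERSION:    // standalone: uses its own message
                return "fatal: " + UNSUPPORTED_VERSION_ERROR_MESSAGE;
            default:
                return "unexpected: " + errorMessage;
        }
    }
}
```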
        handleFatalFailure(error.exception(errorMessage));
        break;

    case UNSUPPORTED_VERSION:
Now that static membership is enabled, FENCED_INSTANCE_ID becomes reachable — GroupMetadataManager has a throwIfInstanceIdIsFenced overload for Streams. It currently falls to the default branch with a generic 'unexpected error FENCED_INSTANCE_ID' log. ConsumerHeartbeatRequestManager has an explicit case with the message 'This is expected in the case that the member was removed from the group by an admin client, and another member joined using the same group instance id.' Worth adding the same here so operators get an actionable message.
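A rough sketch of the difference an explicit case makes to the log output, under the assumption that the Streams handler would reuse the consumer's wording quoted above. The enum, method name, and message prefix are hypothetical; only the explanatory sentence comes from this comment.

```java
// Illustrative only: contrasts an explicit FENCED_INSTANCE_ID message with the generic default.
final class FencedInstanceIdSketch {

    enum Err { FENCED_INSTANCE_ID, OTHER }

    static final String FENCED_HINT =
        "This is expected in the case that the member was removed from the group "
            + "by an admin client, and another member joined using the same group instance id.";

    static String logMessage(Err error, String instanceId) {
        switch (error) {
            case FENCED_INSTANCE_ID:
                // Prefix wording is an assumption made for this sketch.
                return "Instance id " + instanceId + " was fenced. " + FENCED_HINT;
            default:
                return "Unexpected error " + error;
        }
    }
}
```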
    streams.close(CloseOptions.groupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP)
        .withTimeout(Duration.ofSeconds(30)));

    waitForEmptyStreamGroup(
Passing timeoutMs=0 to waitForEmptyStreamGroup means a single-shot check with no retry window: TestUtils.retryOnExceptionWithTimeout sets expectedEnd=now, calls the condition once, and if not met throws immediately. streams.close() returning doesn't guarantee the coordinator has processed the leave heartbeat yet. Under load this will be flaky. Same issue exists in testCloseOptionsLeaveGroupStreamsProtocol and testCloseOptionsDefaultStreamsProtocol in this class.
Should pass DEFAULT_MAX_WAIT_MS or a reasonable explicit timeout here.
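The single-shot behavior described above can be reproduced with a small retry loop. This is a simplified stand-in written for this comment, not the actual `TestUtils.retryOnExceptionWithTimeout` implementation; the class name, method name, and polling interval are assumptions.

```java
import java.util.function.BooleanSupplier;

// Simplified stand-in for a retry-with-timeout helper.
final class RetrySketch {

    // Returns the number of attempts needed, or throws once the deadline has passed.
    static int retryUntil(long timeoutMs, long pollMs, BooleanSupplier condition) {
        long expectedEnd = System.currentTimeMillis() + timeoutMs;
        int attempts = 0;
        while (true) {
            attempts++;
            if (condition.getAsBoolean()) {
                return attempts;
            }
            // With timeoutMs = 0 the deadline is already reached after the first check.
            if (System.currentTimeMillis() >= expectedEnd) {
                throw new AssertionError("Condition not met within " + timeoutMs
                    + " ms after " + attempts + " attempt(s)");
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            }
        }
    }
}
```

With `timeoutMs` set to 0, a condition that only becomes true on a later check fails after exactly one attempt, which is the flakiness risk when the coordinator has not yet processed the leave heartbeat. Any positive timeout gives the loop room to poll again.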
    @ParameterizedTest
    @MethodSource("staticMemberLeaveOnCloseOperations")
    public void testStaticMemberUsesExpectedLeaveEpochOnClose(
        final CloseOptions.GroupMembershipOperation operation,
localMetrics is never closed. Metrics implements Closeable. The test could close it in a try-finally or just inline the construction into the StreamsMembershipManager constructor call like the other static-member tests in this class do.
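One way to guarantee the close is try-with-resources, which is equivalent to the try-finally mentioned above. The sketch uses a hypothetical `FakeMetrics` class in place of Kafka's `Metrics` so that it runs standalone; the only property relied on is that the resource implements `Closeable`.

```java
import java.io.Closeable;

// Illustrative stand-in showing a Closeable being released at the end of a test body.
final class CloseInTestSketch {

    // Hypothetical replacement for Metrics; records whether close() was called.
    static final class FakeMetrics implements Closeable {
        boolean closed = false;

        @Override
        public void close() {
            closed = true;
        }
    }

    static FakeMetrics runTestBody() {
        FakeMetrics metrics = new FakeMetrics();
        try (FakeMetrics m = metrics) {
            // The test would construct the membership manager with m
            // and run its assertions here.
        }
        // close() has run by this point, even if the body had thrown.
        return metrics;
    }
}
```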
Summary
This PR adds the remaining client-side support for static membership when Kafka Streams uses the streams rebalance protocol (group.protocol=streams) introduced by KIP-1071.
The change allows group.instance.id to be used with the streams protocol, sends the proper static-member leave epoch when a static Streams member closes with DEFAULT or REMAIN_IN_GROUP, and treats UNRELEASED_INSTANCE_ID as a known fatal heartbeat error.
Some close-related changes that were part of my previous full PR #21603 are intentionally not included here because they are already covered by #21579, which introduced CloseOptions.DEFAULT for Kafka Streams.
Changes
- […] group.protocol=streams.
- […] UNRELEASED_INSTANCE_ID explicitly in StreamsGroupHeartbeatRequestManager.
- […] DEFAULT and REMAIN_IN_GROUP use LEAVE_GROUP_STATIC_MEMBER_EPOCH.
- […] LEAVE_GROUP uses LEAVE_GROUP_MEMBER_EPOCH.
- […] pollOnClose sends the static leave epoch and instance id.
- […] group.instance.id is allowed for unprefixed, consumer., and main.consumer. configurations.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>