KAFKA-19465: feat(connect): add value.type config to InsertHeader SMT#22560
Open
wilmerdooley wants to merge 1 commit into
Open
KAFKA-19465: feat(connect): add value.type config to InsertHeader SMT#22560wilmerdooley wants to merge 1 commit into
wilmerdooley wants to merge 1 commit into
Conversation
Signed-off-by: wilmerdooley <wilmerdooley1@gmail.com>
Member
|
Thanks for the PR. This is adding new configurations. These are considered part of the public API so it requires a Kafka Improvement Proposal (KIP). You can see the process on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals |
Author
|
Thanks for taking a look, and for the pointer to the process. That makes sense, the new |
|
A label of 'needs-attention' was automatically added to this PR in order to raise the |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This PR adds a new
value.typeconfiguration option to theInsertHeaderSMT inconnect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java. The option lets users specify the ConnectSchematype for the literal value being inserted as a header, supportingint8,int16,int32,int64,float32,float64,boolean,string, andbytes. Whenvalue.typeis omitted, the existing behavior of usingValues.parseStringis preserved, so the change is backward compatible.This addresses the underlying need behind KAFKA-10428 by allowing
InsertHeaderto produce schemas other thanSchema.STRING_SCHEMA(most notablySchema.BYTES_SCHEMA), which is required for aByteArrayheader.converter. Invalidvalue.typevalues are rejected at configure time with aConfigException.Testing strategy
Unit tests in
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.javacover the new behavior.insertionWithExplicitTypesexercises each supported type end to end throughapply, asserting that the produced header carries the expectedSchemaand the expected value (withassertArrayEqualsfor thebytescase). A separateconfigRejectsInvalidValueTypetest verifies that an unknown type is rejected withConfigException. The existing tests continue to pass because the default path (novalue.typeset) is unchanged.JIRA: https://issues.apache.org/jira/browse/KAFKA-19465