Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.internals.AppInfoParser;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Values;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
Expand All @@ -35,6 +37,21 @@ public class InsertHeader<R extends ConnectRecord<R>> implements Transformation<

public static final String HEADER_FIELD = "header";
public static final String VALUE_LITERAL_FIELD = "value.literal";
public static final String VALUE_TYPE_FIELD = "value.type";

private static final String TYPE_INT8 = "int8";
private static final String TYPE_INT16 = "int16";
private static final String TYPE_INT32 = "int32";
private static final String TYPE_INT64 = "int64";
private static final String TYPE_FLOAT32 = "float32";
private static final String TYPE_FLOAT64 = "float64";
private static final String TYPE_BOOLEAN = "boolean";
private static final String TYPE_STRING = "string";
private static final String TYPE_BYTES = "bytes";

private static final ConfigDef.ValidString VALID_TYPES = ConfigDef.ValidString.in(
TYPE_INT8, TYPE_INT16, TYPE_INT32, TYPE_INT64, TYPE_FLOAT32, TYPE_FLOAT64,
TYPE_BOOLEAN, TYPE_STRING, TYPE_BYTES);

public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(HEADER_FIELD, ConfigDef.Type.STRING,
Expand All @@ -44,7 +61,18 @@ public class InsertHeader<R extends ConnectRecord<R>> implements Transformation<
.define(VALUE_LITERAL_FIELD, ConfigDef.Type.STRING,
NO_DEFAULT_VALUE, new ConfigDef.NonNullValidator(),
ConfigDef.Importance.HIGH,
"The literal value that is to be set as the header value on all records.");
"The literal value that is to be set as the header value on all records.")
.define(VALUE_TYPE_FIELD, ConfigDef.Type.STRING,
null, ConfigDef.LambdaValidator.with(
(name, value) -> {
if (value != null) {
VALID_TYPES.ensureValid(name, value);
}
},
VALID_TYPES::toString),
ConfigDef.Importance.MEDIUM,
"The schema type for the header value. Valid types are int8, int16, int32, int64, float32, "
+ "float64, boolean, string, and bytes.");

private String header;

Expand Down Expand Up @@ -78,6 +106,40 @@ public void close() {
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
header = config.getString(HEADER_FIELD);
literalValue = Values.parseString(config.getString(VALUE_LITERAL_FIELD));
String valueLiteral = config.getString(VALUE_LITERAL_FIELD);
String valueType = config.getString(VALUE_TYPE_FIELD);
literalValue = valueType == null ? Values.parseString(valueLiteral) : typeAndValue(valueType, valueLiteral);
}

private static SchemaAndValue typeAndValue(String valueType, String valueLiteral) {
Schema schema = typeToSchema(valueType);
Object value = switch (schema.type()) {
case INT8 -> Values.convertToByte(null, valueLiteral);
case INT16 -> Values.convertToShort(null, valueLiteral);
case INT32 -> Values.convertToInteger(null, valueLiteral);
case INT64 -> Values.convertToLong(null, valueLiteral);
case FLOAT32 -> Values.convertToFloat(null, valueLiteral);
case FLOAT64 -> Values.convertToDouble(null, valueLiteral);
case BOOLEAN -> Values.convertToBoolean(null, valueLiteral);
case STRING -> Values.convertToString(null, valueLiteral);
case BYTES -> valueLiteral.getBytes(StandardCharsets.UTF_8);
default -> throw new IllegalArgumentException("Unsupported header value type: " + valueType);
};
return new SchemaAndValue(schema, value);
}

private static Schema typeToSchema(String valueType) {
return switch (valueType) {
case TYPE_INT8 -> Schema.INT8_SCHEMA;
case TYPE_INT16 -> Schema.INT16_SCHEMA;
case TYPE_INT32 -> Schema.INT32_SCHEMA;
case TYPE_INT64 -> Schema.INT64_SCHEMA;
case TYPE_FLOAT32 -> Schema.FLOAT32_SCHEMA;
case TYPE_FLOAT64 -> Schema.FLOAT64_SCHEMA;
case TYPE_BOOLEAN -> Schema.BOOLEAN_SCHEMA;
case TYPE_STRING -> Schema.STRING_SCHEMA;
case TYPE_BYTES -> Schema.BYTES_SCHEMA;
default -> throw new IllegalArgumentException("Unsupported header value type: " + valueType);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
import org.apache.kafka.common.utils.internals.AppInfoParser;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;

import org.junit.jupiter.api.Test;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

Expand All @@ -36,9 +39,16 @@ public class InsertHeaderTest {
private final InsertHeader<SourceRecord> xform = new InsertHeader<>();

private Map<String, ?> config(String header, String valueLiteral) {
return config(header, valueLiteral, null);
}

private Map<String, ?> config(String header, String valueLiteral, String valueType) {
Map<String, String> result = new HashMap<>();
result.put(InsertHeader.HEADER_FIELD, header);
result.put(InsertHeader.VALUE_LITERAL_FIELD, valueLiteral);
if (valueType != null) {
result.put(InsertHeader.VALUE_TYPE_FIELD, valueType);
}
return result;
}

Expand Down Expand Up @@ -81,6 +91,20 @@ public void insertionWithByteHeader() {
assertEquals(expect, xformed.headers());
}

@Test
public void insertionWithExplicitTypes() {
assertInsertionWithExplicitType("int8", "1", Schema.INT8_SCHEMA, (byte) 1);
assertInsertionWithExplicitType("int16", "2", Schema.INT16_SCHEMA, (short) 2);
assertInsertionWithExplicitType("int32", "3", Schema.INT32_SCHEMA, 3);
assertInsertionWithExplicitType("int64", "4", Schema.INT64_SCHEMA, 4L);
assertInsertionWithExplicitType("float32", "1.5", Schema.FLOAT32_SCHEMA, 1.5f);
assertInsertionWithExplicitType("float64", "2.5", Schema.FLOAT64_SCHEMA, 2.5d);
assertInsertionWithExplicitType("boolean", "true", Schema.BOOLEAN_SCHEMA, true);
assertInsertionWithExplicitType("string", "1", Schema.STRING_SCHEMA, "1");
assertInsertionWithExplicitType("bytes", "bytes-value", Schema.BYTES_SCHEMA,
"bytes-value".getBytes(StandardCharsets.UTF_8));
}

@Test
public void configRejectsNullHeaderKey() {
assertThrows(ConfigException.class, () -> xform.configure(config(null, "1")));
Expand All @@ -91,6 +115,31 @@ public void configRejectsNullHeaderValue() {
assertThrows(ConfigException.class, () -> xform.configure(config("inserted", null)));
}

@Test
public void configRejectsInvalidValueType() {
assertThrows(ConfigException.class, () -> xform.configure(config("inserted", "1", "invalid")));
}

private void assertInsertionWithExplicitType(String valueType, String valueLiteral, Schema schema,
Object expectedValue) {
xform.configure(config("inserted", valueLiteral, valueType));
ConnectHeaders headers = new ConnectHeaders();
headers.addString("existing", "existing-value");

SourceRecord original = sourceRecord(headers);
SourceRecord xformed = xform.apply(original);
assertNonHeaders(original, xformed);
assertEquals("existing-value", xformed.headers().lastWithName("existing").value());

Header inserted = xformed.headers().lastWithName("inserted");
assertEquals(schema, inserted.schema());
if (expectedValue instanceof byte[] expectedBytes) {
assertArrayEquals(expectedBytes, (byte[]) inserted.value());
} else {
assertEquals(expectedValue, inserted.value());
}
}

private void assertNonHeaders(SourceRecord original, SourceRecord xformed) {
assertEquals(original.sourcePartition(), xformed.sourcePartition());
assertEquals(original.sourceOffset(), xformed.sourceOffset());
Expand Down