Skip to content

Commit a2370b5

Browse files
authored
SOLR-18077: Fix incorrect request reuse, fix unit tests. (#4396)
1 parent 60cab92 commit a2370b5

4 files changed

Lines changed: 188 additions & 18 deletions

File tree

solr/cross-dc-manager/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ dependencies {
3535
implementation libs.opentelemetry.sdk.metrics
3636
implementation libs.eclipse.jetty.server
3737
implementation libs.eclipse.jetty.ee10.servlet
38+
implementation libs.google.guava
3839
implementation libs.jakarta.servlet.api
3940
implementation libs.slf4j.api
4041
runtimeOnly libs.google.protobuf.javautils

solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@
7575
public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
7676
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
7777

78+
public static final String PROP_TOPIC_DEBUG = "solr.crossdc.consumer.topic.debug";
79+
7880
private final KafkaConsumer<String, MirroredSolrRequest<?>> kafkaConsumer;
7981
private final AdminClient adminClient;
8082
private final CountDownLatch startLatch;
@@ -101,6 +103,8 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
101103

102104
private volatile boolean running = false;
103105

106+
private boolean topicDebug = Boolean.parseBoolean(System.getProperty(PROP_TOPIC_DEBUG, "false"));
107+
104108
/**
105109
* Supplier for creating and managing a working CloudSolrClient instance. This class ensures that
106110
* the CloudSolrClient instance doesn't try to use its {@link
@@ -175,6 +179,7 @@ public KafkaCrossDcConsumer(
175179
conf.get(CrossDcConf.COLLAPSE_UPDATES), CrossDcConf.CollapseUpdates.PARTIAL);
176180
this.maxCollapseRecords = conf.getInt(KafkaCrossDcConf.MAX_COLLAPSE_RECORDS);
177181
this.startLatch = startLatch;
182+
178183
final Properties kafkaConsumerProps = new Properties();
179184

180185
kafkaConsumerProps.put(
@@ -375,6 +380,9 @@ boolean pollAndProcessRequests() {
375380
ConsumerRecord<String, MirroredSolrRequest<?>> lastRecord = null;
376381

377382
for (TopicPartition partition : records.partitions()) {
383+
if (log.isTraceEnabled()) {
384+
log.trace("Checking partition {}", partition.partition());
385+
}
378386
List<ConsumerRecord<String, MirroredSolrRequest<?>>> partitionRecords =
379387
records.records(partition);
380388

@@ -396,19 +404,31 @@ boolean pollAndProcessRequests() {
396404

397405
metrics.incrementInputMsgCounter();
398406
lastRecord = requestRecord;
399-
MirroredSolrRequest<?> req = requestRecord.value();
400-
SolrRequest<?> solrReq = req.getSolrRequest();
401-
MirroredSolrRequest.Type type = req.getType();
407+
final MirroredSolrRequest<?> req = requestRecord.value();
408+
final SolrRequest<?> solrReq = req.getSolrRequest();
409+
final MirroredSolrRequest.Type type = req.getType();
402410

403411
if (type != MirroredSolrRequest.Type.UPDATE) {
404412
String action = solrReq.getParams().get("action", "unknown");
405413
metrics.incrementInputReqCounter(type.name(), action);
406414
}
407415

408-
ModifiableSolrParams params = new ModifiableSolrParams(solrReq.getParams());
416+
final ModifiableSolrParams params = new ModifiableSolrParams(solrReq.getParams());
409417
if (log.isTraceEnabled()) {
410418
log.trace("-- picked type={}, params={}", req.getType(), params);
411419
}
420+
if (topicDebug) {
421+
solrReq.addHeader("topic.debug", "true");
422+
solrReq.addHeader("record.topic", requestRecord.topic());
423+
solrReq.addHeader("record.partition", String.valueOf(requestRecord.partition()));
424+
solrReq.addHeader("record.offset", String.valueOf(requestRecord.offset()));
425+
solrReq.addHeader("record.timestamp", String.valueOf(requestRecord.timestamp()));
426+
solrReq.addHeader("record.key", requestRecord.key());
427+
solrReq.addHeader("workUnit.nextOffset", String.valueOf(workUnit.nextOffset));
428+
solrReq.addHeader("workUnit.partition", String.valueOf(workUnit.partition));
429+
solrReq.addHeader("workUnit.topic", workUnit.topic);
430+
solrReq.addHeader("workUnit.items", String.valueOf(workUnit.workItems.size()));
431+
}
412432

413433
// determine if it's an UPDATE with deletes, or if the existing batch has deletes
414434
boolean hasDeletes = false;
@@ -450,6 +470,7 @@ boolean pollAndProcessRequests() {
450470
if (updateReqBatch == null) {
451471
// just initialize
452472
updateReqBatch = new UpdateRequest();
473+
updateReqBatch.addHeaders(solrReq.getHeaders());
453474
} else {
454475
if (collapseUpdates == CrossDcConf.CollapseUpdates.NONE) {
455476
throw new RuntimeException("Can't collapse requests.");
@@ -490,6 +511,7 @@ boolean pollAndProcessRequests() {
490511

491512
if (updateReqBatch != null) {
492513
sendBatch(updateReqBatch, MirroredSolrRequest.Type.UPDATE, lastRecord, workUnit);
514+
updateReqBatch = null;
493515
}
494516
try {
495517
partitionManager.checkForOffsetUpdates(partition);

solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/PartitionManager.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.solr.crossdc.manager.consumer;
1818

19+
import com.google.common.annotations.VisibleForTesting;
1920
import java.lang.invoke.MethodHandles;
2021
import java.util.ArrayDeque;
2122
import java.util.HashSet;
@@ -44,13 +45,16 @@ static class PartitionWork {
4445
final Queue<WorkUnit> partitionQueue = new ArrayDeque<>();
4546
}
4647

47-
static class WorkUnit {
48-
final TopicPartition partition;
49-
Set<Future<?>> workItems = new HashSet<>();
48+
@VisibleForTesting
49+
public static class WorkUnit {
50+
final int partition;
51+
final String topic;
52+
final Set<Future<?>> workItems = new HashSet<>();
5053
long nextOffset;
5154

52-
public WorkUnit(TopicPartition partition) {
53-
this.partition = partition;
55+
WorkUnit(TopicPartition partition) {
56+
this.partition = partition.partition();
57+
this.topic = partition.topic();
5458
}
5559
}
5660

solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java

Lines changed: 152 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@
1616
*/
1717
package org.apache.solr.crossdc.manager;
1818

19+
import static org.apache.solr.crossdc.common.CrossDcConf.COLLAPSE_UPDATES;
20+
import static org.apache.solr.crossdc.common.KafkaCrossDcConf.BATCH_SIZE_BYTES;
21+
import static org.apache.solr.crossdc.common.KafkaCrossDcConf.BOOTSTRAP_SERVERS;
1922
import static org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
2023
import static org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE_DOCS;
24+
import static org.apache.solr.crossdc.common.KafkaCrossDcConf.TOPIC_NAME;
2125

2226
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
2327
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
@@ -28,14 +32,18 @@
2832
import java.util.Collections;
2933
import java.util.Date;
3034
import java.util.HashMap;
35+
import java.util.HashSet;
3136
import java.util.List;
3237
import java.util.Locale;
3338
import java.util.Map;
3439
import java.util.Properties;
40+
import java.util.Set;
41+
import java.util.concurrent.CountDownLatch;
3542
import java.util.concurrent.ExecutorService;
3643
import java.util.concurrent.Future;
3744
import java.util.stream.IntStream;
3845
import org.apache.commons.io.IOUtils;
46+
import org.apache.kafka.clients.consumer.ConsumerRecord;
3947
import org.apache.kafka.clients.producer.KafkaProducer;
4048
import org.apache.kafka.clients.producer.Producer;
4149
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -45,6 +53,7 @@
4553
import org.apache.solr.SolrIgnoredThreadsFilter;
4654
import org.apache.solr.client.solrj.SolrClient;
4755
import org.apache.solr.client.solrj.SolrRequest;
56+
import org.apache.solr.client.solrj.SolrResponse;
4857
import org.apache.solr.client.solrj.impl.CloudSolrClient;
4958
import org.apache.solr.client.solrj.jetty.HttpJettySolrClient;
5059
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -56,14 +65,19 @@
5665
import org.apache.solr.cloud.MiniSolrCloudCluster;
5766
import org.apache.solr.cloud.SolrCloudTestCase;
5867
import org.apache.solr.common.SolrInputDocument;
68+
import org.apache.solr.common.params.CommonParams;
5969
import org.apache.solr.common.util.ExecutorUtil;
6070
import org.apache.solr.common.util.NamedList;
6171
import org.apache.solr.common.util.ObjectReleaseTracker;
6272
import org.apache.solr.common.util.SolrNamedThreadFactory;
73+
import org.apache.solr.common.util.Utils;
6374
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
6475
import org.apache.solr.crossdc.common.MirroredSolrRequest;
6576
import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
6677
import org.apache.solr.crossdc.manager.consumer.Consumer;
78+
import org.apache.solr.crossdc.manager.consumer.ConsumerMetrics;
79+
import org.apache.solr.crossdc.manager.consumer.KafkaCrossDcConsumer;
80+
import org.apache.solr.crossdc.manager.consumer.PartitionManager;
6781
import org.apache.solr.util.SolrKafkaTestsIgnoredThreadsFilter;
6882
import org.junit.After;
6983
import org.junit.Before;
@@ -90,11 +104,64 @@ public class SolrAndKafkaIntegrationTest extends SolrCloudTestCase {
90104
private static final int NUM_BROKERS = 1;
91105
public EmbeddedKafkaCluster kafkaCluster;
92106

107+
private static class ConsumerBatch {
108+
final String kafkaTopic;
109+
final int partitionId;
110+
final MirroredSolrRequest.Type type;
111+
final String collection;
112+
final Map<String, String> headers;
113+
final Set<String> addIds = new HashSet<>();
114+
final String json;
115+
116+
public ConsumerBatch(final MirroredSolrRequest.Type type, final SolrRequest<?> solrRequest) {
117+
this.kafkaTopic = solrRequest.getHeaders().get("record.topic");
118+
this.partitionId = Integer.parseInt(solrRequest.getHeaders().get("record.partition"));
119+
this.type = type;
120+
this.collection = solrRequest.getCollection();
121+
this.headers = solrRequest.getHeaders();
122+
if (solrRequest instanceof UpdateRequest) {
123+
UpdateRequest updateReq = (UpdateRequest) solrRequest;
124+
json =
125+
Utils.toJSONString(
126+
Map.of("params", updateReq.getParams(), "add", updateReq.getDocuments()));
127+
updateReq.getDocuments().forEach(doc -> addIds.add(doc.getFieldValue("id").toString()));
128+
} else {
129+
json =
130+
Utils.toJSONString(
131+
Map.of("params", solrRequest.getParams(), "class", solrRequest.getClass()));
132+
}
133+
}
134+
135+
@Override
136+
public String toString() {
137+
return "ConsumerBatch{"
138+
+ "kafkaTopic='"
139+
+ kafkaTopic
140+
+ '\''
141+
+ ", partitionId="
142+
+ partitionId
143+
+ ", type="
144+
+ type
145+
+ ", collection='"
146+
+ collection
147+
+ '\''
148+
+ ", headers="
149+
+ headers
150+
+ '\''
151+
+ ", json='"
152+
+ json
153+
+ '\''
154+
+ '}';
155+
}
156+
}
157+
93158
protected volatile MiniSolrCloudCluster solrCluster1;
94159
protected volatile MiniSolrCloudCluster solrCluster2;
95160

96161
protected volatile Consumer consumer;
97162

163+
private List<ConsumerBatch> consumerBatches;
164+
98165
private static final String TOPIC = "topic1";
99166

100167
private static final String COLLECTION = "collection1";
@@ -112,7 +179,28 @@ public void beforeSolrAndKafkaIntegrationTest() throws Exception {
112179
Thread.setDefaultUncaughtExceptionHandler(
113180
(t, e) -> log.error("Uncaught exception in thread {}", t, e));
114181
System.setProperty("otel.metrics.exporter", "prometheus");
115-
consumer = new Consumer();
182+
System.setProperty(KafkaCrossDcConsumer.PROP_TOPIC_DEBUG, "true");
183+
consumerBatches = new ArrayList<>();
184+
consumer =
185+
new Consumer() {
186+
@Override
187+
protected CrossDcConsumer getCrossDcConsumer(
188+
final KafkaCrossDcConf conf,
189+
final ConsumerMetrics metrics,
190+
final CountDownLatch startLatch) {
191+
return new KafkaCrossDcConsumer(conf, metrics, startLatch) {
192+
@Override
193+
public void sendBatch(
194+
final SolrRequest<? extends SolrResponse> solrReqBatch,
195+
final MirroredSolrRequest.Type type,
196+
final ConsumerRecord<String, MirroredSolrRequest<?>> lastRecord,
197+
final PartitionManager.WorkUnit workUnit) {
198+
consumerBatches.add(new ConsumerBatch(type, solrReqBatch));
199+
super.sendBatch(solrReqBatch, type, lastRecord, workUnit);
200+
}
201+
};
202+
}
203+
};
116204
Properties config = new Properties();
117205

118206
kafkaCluster =
@@ -124,13 +212,15 @@ public String bootstrapServers() {
124212
};
125213
kafkaCluster.start();
126214

127-
kafkaCluster.createTopic(TOPIC, 10, 1);
215+
// create many partitions to test for re-ordered reads
216+
kafkaCluster.createTopic(TOPIC, 3, 1);
128217

129218
// ensure small batches to test multi-partition ordering
130-
System.setProperty("batchSizeBytes", "128");
131-
System.setProperty("solr.crossdc.topicName", TOPIC);
132-
System.setProperty("solr.crossdc.bootstrapServers", kafkaCluster.bootstrapServers());
219+
System.setProperty(BATCH_SIZE_BYTES, "100");
220+
System.setProperty(TOPIC_NAME, TOPIC);
221+
System.setProperty(BOOTSTRAP_SERVERS, kafkaCluster.bootstrapServers());
133222
System.setProperty(INDEX_UNMIRRORABLE_DOCS, "false");
223+
System.setProperty(COLLAPSE_UPDATES, "none");
134224

135225
solrCluster1 =
136226
configureCluster(1).addConfig("conf", getFile("configs/cloud-minimal/conf")).configure();
@@ -238,10 +328,62 @@ public void testProducerToCloud() throws Exception {
238328
"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.";
239329

240330
@Test
241-
@Ignore("SOLR-18077")
331+
public void testPartitioning() throws Exception {
332+
CollectionAdminRequest.Create create =
333+
CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1, 1);
334+
create.process(solrCluster1.getSolrClient());
335+
create.process(solrCluster2.getSolrClient());
336+
solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1);
337+
solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
338+
339+
CloudSolrClient client = solrCluster1.getSolrClient();
340+
int NUM_DOCS = 200;
341+
for (int i = 0; i < NUM_DOCS; i++) {
342+
SolrInputDocument doc = new SolrInputDocument();
343+
doc.addField("id", "id-" + i);
344+
doc.addField("id_i", i);
345+
doc.addField("text", "some test with a relatively long field. " + LOREM_IPSUM);
346+
doc.addField("collection_t", COLLECTION);
347+
348+
client.add(COLLECTION, doc);
349+
350+
doc = new SolrInputDocument();
351+
doc.addField("id", "id-" + i);
352+
doc.addField("id_i", i);
353+
doc.addField("text", "some test with a relatively long field. " + LOREM_IPSUM);
354+
doc.addField("collection_t", ALT_COLLECTION);
355+
356+
client.add(ALT_COLLECTION, doc);
357+
}
358+
client.commit(COLLECTION);
359+
client.commit(ALT_COLLECTION);
360+
// check that updates to different collections were always sent to the same partition
361+
Map<Integer, String> partitionsPerCol = new HashMap<>();
362+
Map<String, Set<String>> docsPerCol = new HashMap<>();
363+
for (ConsumerBatch batch : consumerBatches) {
364+
String collection =
365+
partitionsPerCol.computeIfAbsent(batch.partitionId, k -> batch.collection);
366+
docsPerCol.computeIfAbsent(collection, col -> new HashSet<>()).addAll(batch.addIds);
367+
assertEquals(
368+
"request in partition "
369+
+ batch.partitionId
370+
+ " has wrong collection "
371+
+ batch.collection
372+
+ ": "
373+
+ batch
374+
+ "\npartitions: "
375+
+ partitionsPerCol,
376+
collection,
377+
batch.collection);
378+
}
379+
docsPerCol.forEach(
380+
(col, ids) -> assertEquals("incorrect count in collection " + col, NUM_DOCS, ids.size()));
381+
}
382+
383+
@Test
242384
public void testStrictOrdering() throws Exception {
243385
CloudSolrClient client = solrCluster1.getSolrClient();
244-
int NUM_DOCS = 5000;
386+
int NUM_DOCS = 1000;
245387
// delay deletes by this many docs
246388
int DELTA = 100;
247389
for (int i = 0; i < NUM_DOCS; i++) {
@@ -454,11 +596,12 @@ private void assertClusterEventuallyHasDocs(
454596
boolean foundUpdates = false;
455597
for (int i = 0; i < 100; i++) {
456598
client.commit(collection);
457-
results = client.query(collection, new SolrQuery(query));
599+
results =
600+
client.query(collection, new SolrQuery(CommonParams.Q, query, CommonParams.FL, "*"));
458601
if (results.getResults().getNumFound() == expectedNumDocs) {
459602
foundUpdates = true;
460603
} else {
461-
Thread.sleep(200);
604+
Thread.sleep(300);
462605
}
463606
}
464607

0 commit comments

Comments
 (0)