|
9 | 9 |
|
10 | 10 | import concurrent.futures |
11 | 11 | import math |
| 12 | +import threading |
12 | 13 | import time |
13 | 14 | from collections import Counter |
14 | 15 | from typing import Any, Callable |
|
34 | 35 | from rptest.services.redpanda import ( |
35 | 36 | RESTART_LOG_ALLOW_LIST, |
36 | 37 | LoggingConfig, |
| 38 | + MetricsEndpoint, |
37 | 39 | SISettings, |
38 | 40 | ) |
39 | 41 | from rptest.services.rpk_consumer import RpkConsumer |
@@ -514,6 +516,70 @@ def _tiered_storage_warmup(self, scale: ScaleParameters, topic_name: str): |
514 | 516 | str(scale.local_retention_after_warmup), |
515 | 517 | ) |
516 | 518 |
|
| 519 | + def _run_cloud_io_lane_sampler( |
| 520 | + self, stop_event: threading.Event, interval_sec: int = 20 |
| 521 | + ): |
| 522 | + """Periodically log per-lane cloud_io scheduler occupancy during a |
| 523 | + consume (CORE-15812). A cloud-topic fetch read runs in the |
| 524 | + `consumer_fetch` lane; when it is starved of the shared client pool its |
| 525 | + `waiters` pile up while `in_flight` sits at the reserved floor -- the |
| 526 | + read-side stall behind the ManyPartitions cloud_topics consumer hang. |
| 527 | + All lanes are logged so contention is attributable to producer_upload / |
| 528 | + default_group. Reads the public metrics endpoint (internal metrics are |
| 529 | + disabled at high partition counts, so the internal endpoint is empty) in |
| 530 | + a single scrape pass per tick; self-disables when the scheduler metrics |
| 531 | + are absent (e.g. non cloud-topic runs).""" |
| 532 | + lanes = ("consumer_fetch", "producer_upload", "default_group") |
| 533 | + # in_flight + waiters are the public per-lane gauges; the admit counters |
| 534 | + # are internal-only (and internal metrics are off at scale). |
| 535 | + per_lane_fields = ("in_flight", "waiters") |
| 536 | + agg_fields = ("available_slots", "total_capacity") |
| 537 | + patterns = [f"cloud_io_scheduler_{f}" for f in (*per_lane_fields, *agg_fields)] |
| 538 | + announced = False |
| 539 | + while not stop_event.is_set(): |
| 540 | + try: |
| 541 | + result = self.redpanda.metrics_samples( |
| 542 | + sample_patterns=patterns, |
| 543 | + metrics_endpoint=MetricsEndpoint.PUBLIC_METRICS, |
| 544 | + ) |
| 545 | + if not result: |
| 546 | + if not announced: |
| 547 | + self.logger.info( |
| 548 | + "cloud_io_scheduler metrics absent; lane sampler disabled" |
| 549 | + ) |
| 550 | + return |
| 551 | + |
| 552 | + def lane_sum(field: str, lane: str) -> int: |
| 553 | + ms = result.get(f"cloud_io_scheduler_{field}") |
| 554 | + if ms is None: |
| 555 | + return 0 |
| 556 | + return int( |
| 557 | + sum( |
| 558 | + s.value |
| 559 | + for s in ms.samples |
| 560 | + if s.labels.get("group_id") == lane |
| 561 | + ) |
| 562 | + ) |
| 563 | + |
| 564 | + def total(field: str) -> int: |
| 565 | + ms = result.get(f"cloud_io_scheduler_{field}") |
| 566 | + return int(sum(s.value for s in ms.samples)) if ms else 0 |
| 567 | + |
| 568 | + parts = [ |
| 569 | + f"{lane}[inflight={lane_sum('in_flight', lane)} " |
| 570 | + f"waiters={lane_sum('waiters', lane)}]" |
| 571 | + for lane in lanes |
| 572 | + ] |
| 573 | + self.logger.info( |
| 574 | + f"cloud_io lanes (summed/shards): {' '.join(parts)} " |
| 575 | + f"available_slots={total('available_slots')} " |
| 576 | + f"total_capacity={total('total_capacity')}" |
| 577 | + ) |
| 578 | + announced = True |
| 579 | + except Exception as e: |
| 580 | + self.logger.warning(f"cloud_io lane sampler error: {e}") |
| 581 | + stop_event.wait(interval_sec) |
| 582 | + |
517 | 583 | def _write_and_random_read( |
518 | 584 | self, |
519 | 585 | scale: ScaleParameters, |
@@ -700,7 +766,21 @@ def _write_and_random_read( |
700 | 766 | ) |
701 | 767 | verifier.start(clean=False) |
702 | 768 |
|
703 | | - verifier.wait(timeout_sec=expect_transmit_time) |
| 769 | + # CORE-15812: sample per-lane cloud_io scheduler occupancy during the |
| 770 | + # consume so consumer_fetch read-lane starvation is visible in the |
| 771 | + # artifacts. Self-disables when the scheduler metrics are absent. |
| 772 | + lane_sampler_stop = threading.Event() |
| 773 | + lane_sampler = threading.Thread( |
| 774 | + target=self._run_cloud_io_lane_sampler, |
| 775 | + args=(lane_sampler_stop,), |
| 776 | + daemon=True, |
| 777 | + ) |
| 778 | + lane_sampler.start() |
| 779 | + try: |
| 780 | + verifier.wait(timeout_sec=expect_transmit_time) |
| 781 | + finally: |
| 782 | + lane_sampler_stop.set() |
| 783 | + lane_sampler.join(timeout=30) |
704 | 784 | for i, v in enumerate(verifier.consumers): |
705 | 785 | assert v.consumer_status.validator.invalid_reads == 0 |
706 | 786 | if not scale.tiered_storage_enabled: |
@@ -975,6 +1055,22 @@ def _test_many_partitions( |
975 | 1055 | ) |
976 | 1056 | self.redpanda.set_si_settings(cloud_si_settings) |
977 | 1057 |
|
| 1058 | + if cloud_topics_enabled: |
| 1059 | + # CORE-15812 experiment: cloud-topic leveling and compaction run in |
| 1060 | + # the cloud_io default_group lane and contend with consumer_fetch |
| 1061 | + # reads for the shared client pool. In build 85771 leveling did |
| 1062 | + # ~17.6k range merges during the consume (the heavy drain); |
| 1063 | + # compaction was enabled but near-idle on these delete-policy |
| 1064 | + # topics. Disable both to test whether freeing default_group I/O |
| 1065 | + # lets the consumer drain. Reconciliation stays on -- it is the |
| 1066 | + # L0->L1 data path. |
| 1067 | + self.redpanda.add_extra_rp_conf( |
| 1068 | + { |
| 1069 | + "cloud_topics_leveling_disabled": True, |
| 1070 | + "cloud_topics_compaction_disabled": True, |
| 1071 | + } |
| 1072 | + ) |
| 1073 | + |
978 | 1074 | # By default run with one huge topic for maximum metadata stress. It is |
979 | 1075 | # more stressful for redpanda when clients request the metadata for |
980 | 1076 | # many partitions at once, and the simplest way to get traffic |
|
0 commit comments