We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent 5cbb6be commit 065c5bfCopy full SHA for 065c5bf
1 file changed
netpalm/backend/core/executor/executor.py
@@ -85,14 +85,8 @@ async def run(self) -> None:
85
event_topics = self._event_registry.get_topics()
86
all_topics = list(set(job_topics + event_topics))
87
88
- # Discover existing pinned topics from Kafka metadata
89
- await self._consumer.start()
90
- cluster_topics = self._consumer.topics()
91
- pinned_prefix = self._settings.kafka_pinned_topic_prefix + "."
92
- pinned_topics = [t for t in cluster_topics if t.startswith(pinned_prefix)]
93
- all_topics = list(set(all_topics + pinned_topics))
94
-
95
self._consumer.subscribe(all_topics)
+ await self._consumer.start()
96
await self._producer.start()
97
log.info(f"NetpalmExecutor: subscribed to {all_topics}")
98
0 commit comments