diff --git a/changelog/unreleased/SOLR-18094-zk-quorum-noderole.yml b/changelog/unreleased/SOLR-18094-zk-quorum-noderole.yml new file mode 100644 index 000000000000..08e8c319f3d6 --- /dev/null +++ b/changelog/unreleased/SOLR-18094-zk-quorum-noderole.yml @@ -0,0 +1,10 @@ +# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc +title: Capability for Solr to run embedded ZooKeeper in a quorum/ensemble mode, allowing multiple Solr nodes to form a distributed ZooKeeper ensemble within their own processes. Controlled by a new solr node-role. +type: added # added, changed, fixed, deprecated, removed, dependency_update, security, other +authors: + - name: Eric Pugh + - name: Jason Gerlowski + - name: Jan Høydahl +links: + - name: SOLR-18094 + url: https://issues.apache.org/jira/browse/SOLR-18094 diff --git a/solr/core/src/java/org/apache/solr/cloud/SolrZkServer.java b/solr/core/src/java/org/apache/solr/cloud/SolrZkServer.java deleted file mode 100644 index cdd3ae90f861..000000000000 --- a/solr/core/src/java/org/apache/solr/cloud/SolrZkServer.java +++ /dev/null @@ -1,312 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.solr.cloud; - -import java.io.IOException; -import java.io.Reader; -import java.lang.invoke.MethodHandles; -import java.net.InetSocketAddress; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.solr.common.SolrException; -import org.apache.solr.common.util.EnvUtils; -import org.apache.solr.servlet.CoreContainerProvider; -import org.apache.solr.util.AddressUtils; -import org.apache.zookeeper.server.ServerConfig; -import org.apache.zookeeper.server.ZooKeeperServerMain; -import org.apache.zookeeper.server.quorum.QuorumPeer; -import org.apache.zookeeper.server.quorum.QuorumPeerConfig; -import org.apache.zookeeper.server.quorum.QuorumPeerMain; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SolrZkServer { - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - public static final String ZK_WHITELIST_PROPERTY = "zookeeper.4lw.commands.whitelist"; - public static final String ZK_MAX_CNXNS_PROPERTY = "zookeeper.maxCnxns"; - // Per ZooKeeper, "0" means no limit for max client connections. 
- public static final String ZK_MAX_CNXNS_DEFAULT = "0"; - - boolean zkRun = false; - String zkHost; - - int solrPort; - Properties props; - SolrZkServerProps zkProps; - - private Thread zkThread; // the thread running a zookeeper server, only if zkRun is true - - private Path dataHome; // o.a.zookeeper.**.QuorumPeerConfig needs a File not a Path - private String confHome; - - public SolrZkServer(boolean zkRun, String zkHost, Path dataHome, String confHome, int solrPort) { - this.zkRun = zkRun; - this.zkHost = zkHost; - this.dataHome = dataHome; - this.confHome = confHome; - this.solrPort = solrPort; - } - - public String getClientString() { - if (zkHost != null) { - return zkHost; - } - - if (zkProps == null) { - return null; - } - - // if the string wasn't passed as zkHost, then use the standalone server we started - if (!zkRun) { - return null; - } - - InetSocketAddress addr = zkProps.getClientPortAddress(); - String hostName; - // We cannot advertise 0.0.0.0, so choose the best host to advertise - // (the same that the Solr Node defaults to) - if (addr.getAddress().isAnyLocalAddress()) { - hostName = AddressUtils.getHostToAdvertise(); - } else { - hostName = addr.getAddress().getHostAddress(); - } - return hostName + ":" + addr.getPort(); - } - - public void parseConfig() { - if (zkProps == null) { - zkProps = new SolrZkServerProps(); - // set default data dir - // TODO: use something based on IP+port??? support ensemble all from same solr home? 
- zkProps.setDataDir(dataHome); - zkProps.zkRun = zkRun; - zkProps.solrPort = Integer.toString(solrPort); - } - - var zooCfgPath = Path.of(confHome).resolve("zoo.cfg"); - if (!Files.exists(zooCfgPath)) { - log.info("Zookeeper configuration not found in {}, using built-in default", confHome); - String solrInstallDir = System.getProperty(CoreContainerProvider.SOLR_INSTALL_DIR); - if (solrInstallDir == null) { - throw new SolrException( - SolrException.ErrorCode.SERVER_ERROR, - "Could not find default zoo.cfg file due to missing " - + CoreContainerProvider.SOLR_INSTALL_DIR); - } - zooCfgPath = Path.of(solrInstallDir).resolve("server").resolve("solr").resolve("zoo.cfg"); - } - - try { - props = SolrZkServerProps.getProperties(zooCfgPath); - SolrZkServerProps.injectServers(props, zkRun, zkHost); - // This is the address that the embedded Zookeeper will bind to. Like Solr, it defaults to - // "127.0.0.1". - props.setProperty( - "clientPortAddress", EnvUtils.getProperty("solr.zookeeper.embedded.host", "127.0.0.1")); - if (props.getProperty("clientPort") == null) { - props.setProperty("clientPort", Integer.toString(solrPort + 1000)); - } - zkProps.parseProperties(props); - } catch (QuorumPeerConfig.ConfigException | IOException e) { - if (zkRun) { - throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); - } - } - } - - public Map getServers() { - return zkProps.getServers(); - } - - public void start() { - if (!zkRun) { - return; - } - - ensureZkMaxCnxnsConfigured(); - if (System.getProperty(ZK_WHITELIST_PROPERTY) == null) { - System.setProperty(ZK_WHITELIST_PROPERTY, "ruok, mntr, conf"); - } - AtomicReference zkException = new AtomicReference<>(); - zkThread = - new Thread( - () -> { - try { - if (zkProps.getServers().size() > 1) { - QuorumPeerMain zkServer = new QuorumPeerMain(); - zkServer.runFromConfig(zkProps); - } else { - ServerConfig sc = new ServerConfig(); - sc.readFrom(zkProps); - ZooKeeperServerMain zkServer = new ZooKeeperServerMain(); - 
zkServer.runFromConfig(sc); - } - log.info("ZooKeeper Server exited."); - } catch (Exception e) { - log.error("ZooKeeper Server ERROR", e); - zkException.set(e); - } - }, - "embeddedZkServer"); - - if (zkProps.getServers().size() > 1) { - if (log.isInfoEnabled()) { - log.info( - "STARTING EMBEDDED ENSEMBLE ZOOKEEPER SERVER at port {}, listening on host {}", - zkProps.getClientPortAddress().getPort(), - zkProps.getClientPortAddress().getAddress().getHostAddress()); - } - } else { - if (log.isInfoEnabled()) { - log.info( - "STARTING EMBEDDED ENSEMBLE ZOOKEEPER SERVER at port {}, listening on host {}", - zkProps.getClientPortAddress().getPort(), - zkProps.getClientPortAddress().getAddress().getHostAddress()); - } - } - - zkThread.setDaemon(true); - zkThread.start(); - try { - // We don't have any way to hook into the ZK server object to check that it is running, so we - // just wait and hope - Thread.sleep(500); // pause for ZooKeeper to start - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new SolrException( - SolrException.ErrorCode.SERVER_ERROR, - "Interrupted while starting embedded zookeeper server", - e); - } - - if (zkException.get() != null) { - log.info("Embedded ZK dataHome={}", dataHome); - throw new SolrException( - SolrException.ErrorCode.SERVER_ERROR, - "Could not start embedded zookeeper server", - zkException.get()); - } - } - - public void stop() { - if (!zkRun) { - return; - } - zkThread.interrupt(); - } - - static void ensureZkMaxCnxnsConfigured() { - System.getProperties().putIfAbsent(ZK_MAX_CNXNS_PROPERTY, ZK_MAX_CNXNS_DEFAULT); - } -} - -// Allows us to set a default for the data dir before parsing -// zoo.cfg (which validates that there is a dataDir) -class SolrZkServerProps extends QuorumPeerConfig { - private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - String solrPort; // port that Solr is listening on - boolean zkRun; - - /** - * Parse a ZooKeeper 
configuration file - * - * @param configPath the path of the configuration file - * @throws IllegalArgumentException if a config file does not exist at the given path - * @throws ConfigException error processing configuration - */ - public static Properties getProperties(Path configPath) throws ConfigException { - log.info("Reading configuration from: {}", configPath); - - if (!Files.exists(configPath)) { - throw new IllegalArgumentException(configPath + " file is missing"); - } - - try (Reader reader = Files.newBufferedReader(configPath)) { - Properties cfg = new Properties(); - cfg.load(reader); - return cfg; - } catch (IOException | IllegalArgumentException e) { - throw new ConfigException("Error processing " + configPath, e); - } - } - - // Adds server.x if they don't exist, based on zkHost if it does exist. - // Given zkHost=localhost:1111,localhost:2222 this will inject - // server.0=localhost:1112:1113 - // server.1=localhost:2223:2224 - public static void injectServers(Properties props, boolean zkRun, String zkHost) { - - // if clientPort not already set, use zkRun - if (zkRun && props.getProperty("clientPort") == null) { - // int portIdx = zkRun.lastIndexOf(':'); - int portIdx = "".lastIndexOf(':'); - if (portIdx > 0) { - // String portStr = zkRun.substring(portIdx + 1); - String portStr = "".substring(portIdx + 1); - props.setProperty("clientPort", portStr); - } - } - - boolean hasServers = hasServers(props); - - if (!hasServers && zkHost != null) { - int alg = Integer.parseInt(props.getProperty("electionAlg", "3").trim()); - String[] hosts = zkHost.split(","); - int serverNum = 0; - for (String hostAndPort : hosts) { - hostAndPort = hostAndPort.trim(); - int portIdx = hostAndPort.lastIndexOf(':'); - String clientPortStr = hostAndPort.substring(portIdx + 1); - int clientPort = Integer.parseInt(clientPortStr); - String host = hostAndPort.substring(0, portIdx); - - String serverStr = host + ':' + (clientPort + 1); - // zk leader election algorithms other 
than 0 need an extra port for leader election. - if (alg != 0) { - serverStr = serverStr + ':' + (clientPort + 2); - } - - props.setProperty("server." + serverNum, serverStr); - serverNum++; - } - } - } - - public static boolean hasServers(Properties props) { - for (Object key : props.keySet()) if (((String) key).startsWith("server.")) return true; - return false; - } - - public void setDataDir(Path dataDir) { - this.dataDir = dataDir.toFile(); - } - - /** - * Parse config from a Properties. - * - * @param zkProp Properties to parse from. - */ - @Override - public void parseProperties(Properties zkProp) throws IOException, ConfigException { - super.parseProperties(zkProp); - } -} diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index a759ce80461a..0a7da7b914a4 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -290,7 +290,7 @@ public JerseyAppHandlerCache getJerseyAppHandlerCache() { private final ObjectCache objectCache = new ObjectCache(); - public final NodeRoles nodeRoles = new NodeRoles(System.getProperty(NodeRoles.NODE_ROLES_PROP)); + public final NodeRoles nodeRoles = new NodeRoles(EnvUtils.getProperty(NodeRoles.NODE_ROLES_PROP)); private final ExecutorService indexSearcherExecutor; diff --git a/solr/core/src/java/org/apache/solr/core/NodeConfig.java b/solr/core/src/java/org/apache/solr/core/NodeConfig.java index 1634c2c29e8c..0073c24789f9 100644 --- a/solr/core/src/java/org/apache/solr/core/NodeConfig.java +++ b/solr/core/src/java/org/apache/solr/core/NodeConfig.java @@ -235,11 +235,16 @@ public static NodeConfig loadNodeConfig(Path solrHome, Properties nodeProperties initModules(loader, null); nodeProperties = SolrXmlConfig.wrapAndSetZkHostFromSysPropIfNeeded(nodeProperties); - // TODO: Only job of this block is to - // delay starting a solr core to satisfy - // ZkFailoverTest test 
case... String zkHost = nodeProperties.getProperty(SolrXmlConfig.ZK_HOST); - if (StrUtils.isNotNullOrEmpty(zkHost)) { + NodeRoles nodeRoles = new NodeRoles(EnvUtils.getProperty(NodeRoles.NODE_ROLES_PROP)); + boolean zookeeperQuorumNode = + NodeRoles.MODE_ON.equals(nodeRoles.getRoleMode(NodeRoles.Role.ZOOKEEPER_QUORUM)); + + // This block demonstrates how we pause and wait for a ZooKeeper to be available before + // continuing. + // See the ZkFailoverTest to see how changing solr.cloud.wait.for.zk.seconds impacts this + // capability. + if (StrUtils.isNotNullOrEmpty(zkHost) && !zookeeperQuorumNode) { int startUpZkTimeOut = 1000 * Integer.getInteger( diff --git a/solr/core/src/java/org/apache/solr/core/NodeRoles.java b/solr/core/src/java/org/apache/solr/core/NodeRoles.java index c38c92297c76..00c5c3d57ef5 100644 --- a/solr/core/src/java/org/apache/solr/core/NodeRoles.java +++ b/solr/core/src/java/org/apache/solr/core/NodeRoles.java @@ -113,6 +113,18 @@ public String modeWhenRoleIsAbsent() { public Set supportedModes() { return Set.of(MODE_ON, MODE_OFF); } + }, + + ZOOKEEPER_QUORUM("zookeeper_quorum") { + @Override + public Set supportedModes() { + return Set.of(MODE_ON, MODE_OFF); + } + + @Override + public String modeWhenRoleIsAbsent() { + return MODE_OFF; + } }; public final String roleName; diff --git a/solr/core/src/java/org/apache/solr/core/ZkContainer.java b/solr/core/src/java/org/apache/solr/core/ZkContainer.java index 810eac98faa0..62424b2912a6 100644 --- a/solr/core/src/java/org/apache/solr/core/ZkContainer.java +++ b/solr/core/src/java/org/apache/solr/core/ZkContainer.java @@ -21,13 +21,17 @@ import io.opentelemetry.api.common.Attributes; import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; import java.lang.invoke.MethodHandles; +import java.nio.file.Files; import java.nio.file.Path; +import java.util.Properties; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import 
java.util.concurrent.TimeoutException; import java.util.function.Predicate; import org.apache.solr.client.solrj.impl.SolrZkClientTimeout; -import org.apache.solr.cloud.SolrZkServer; import org.apache.solr.cloud.ZkController; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterProperties; @@ -43,7 +47,10 @@ import org.apache.solr.metrics.SolrMetricProducer; import org.apache.solr.metrics.SolrMetricsContext; import org.apache.solr.metrics.otel.OtelUnit; +import org.apache.solr.servlet.CoreContainerProvider; +import org.apache.solr.util.AddressUtils; import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.server.embedded.ZooKeeperServerEmbedded; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,8 +66,14 @@ public class ZkContainer { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static final String ZK_WHITELIST_PROPERTY = "zookeeper.4lw.commands.whitelist"; + public static final String ZK_MAX_CNXNS_PROPERTY = "zookeeper.maxCnxns"; + // Per ZooKeeper, "0" means no limit for max client connections. + public static final String ZK_MAX_CNXNS_DEFAULT = "0"; + protected ZkController zkController; - private SolrZkServer zkServer; + + private volatile ZooKeeperServerEmbedded zkServerEmbedded; private ExecutorService coreZkRegister = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("coreZkRegister")); @@ -70,42 +83,178 @@ public class ZkContainer { public ZkContainer() {} public void initZooKeeper(final CoreContainer cc, CloudConfig config) { - boolean zkRun = EnvUtils.getPropertyAsBool("solr.zookeeper.server.enabled", false); + // zkServerEnabled is set whenever in solrCloud mode ('-c') but no explicit zkHost/ZK_HOST is + // provided. 
+ final boolean zkServerEnabled = + EnvUtils.getPropertyAsBool("solr.zookeeper.server.enabled", false); + boolean zkQuorumNode = false; + if (NodeRoles.MODE_ON.equals(cc.nodeRoles.getRoleMode(NodeRoles.Role.ZOOKEEPER_QUORUM))) { + zkQuorumNode = true; + log.info("Starting node in ZooKeeper Quorum role."); + } - if (zkRun && config == null) + if (zkServerEnabled && config == null) { throw new SolrException( SolrException.ErrorCode.SERVER_ERROR, "Cannot start Solr in cloud mode - no cloud config provided"); + } - if (config == null) return; // not in zk mode + if (config == null) { + log.info("Solr is running in standalone mode"); + return; + } + + final boolean runAsQuorum = config.getZkHost() != null && zkQuorumNode; String zookeeperHost = config.getZkHost(); + final var solrHome = cc.getSolrHome(); + // runAsQuorum takes precedence: the zookeeper_quorum node role alone is sufficient to start + // embedded ZK in quorum mode — no need to also set solr.zookeeper.server.enabled=true. + // zkServerEnabled is only used for legacy standalone embedded ZK (bin/solr -c without ZK_HOST). + if (runAsQuorum) { + // ZooKeeperServerEmbedded being used under the covers. 
+ // When running multiple quorum nodes on the same machine sharing a solrHome, + // override the default data and config directories with unique ones + final int zkClientPortOffset = + EnvUtils.getPropertyAsInteger("solr.zookeeper.server.client.port.offset", 1000); + final int zkQuorumPortOffset = + EnvUtils.getPropertyAsInteger("solr.zookeeper.server.quorum.port.offset", 1010); + final int zkElectionPortOffset = + EnvUtils.getPropertyAsInteger("solr.zookeeper.server.election.port.offset", 1020); + final int zkPort = config.getSolrHostPort() + zkClientPortOffset; + final var zkDataDir = + solrHome.resolve(EnvUtils.getProperty("solr.zookeeper.server.datadir", "zoo_data")); + final var zkConfDir = + solrHome.resolve(EnvUtils.getProperty("solr.zookeeper.server.confdir", "")); + + // Populate a zoo.cfg + final String zooCfgTemplate = + """ + tickTime=2000 + initLimit=10 + syncLimit=5 + dataDir=@@DATA_DIR@@ + 4lw.commands.whitelist=mntr,conf,ruok + admin.enableServer=false + clientPort=@@ZK_CLIENT_PORT@@ + """; + String zooCfgContents = + zooCfgTemplate + .replace("@@DATA_DIR@@", zkDataDir.toString()) + .replace("@@ZK_CLIENT_PORT@@", String.valueOf(zkPort)); + final String[] zkHosts = config.getZkHost().split(","); + int myId = -1; + // TODO: myId detection uses exact string matching between config.getHost() and the host + // portion of zkHost entries. This fails when zkHost uses "localhost" but Solr is + // configured with "127.0.0.1" (or vice versa). Consider resolving hostnames/IPs before + // matching, or matching by port alone (which is unique per node in a single-machine setup). 
+ final String targetConnStringSection = config.getHost() + ":" + zkPort; + if (log.isInfoEnabled()) { + log.info( + "Trying to match {} against zkHostString {} to determine myid", + targetConnStringSection, + config.getZkHost()); + } + for (int i = 0; i < zkHosts.length; i++) { + final String host = zkHosts[i]; + if (targetConnStringSection.equals(zkHosts[i])) { + myId = (i + 1); + } + final var hostComponents = host.split(":"); + if (hostComponents.length < 2) { + throw new IllegalStateException( + "Invalid zkHost entry (expected 'host:port'): '" + host + "'"); + } + final var zkServer = hostComponents[0]; + final int zkClientPort; + try { + zkClientPort = Integer.parseInt(hostComponents[1]); + } catch (NumberFormatException e) { + throw new IllegalStateException( + "Invalid port in zkHost entry '" + host + "': " + hostComponents[1], e); + } + // Derive quorum/election ports from the peer's ZK client port using the same offsets. + // All nodes are assumed to use the same offset configuration. + final var zkQuorumPort = zkClientPort - zkClientPortOffset + zkQuorumPortOffset; + final var zkLeaderPort = zkClientPort - zkClientPortOffset + zkElectionPortOffset; + final String configEntry = + "server." 
+ (i + 1) + "=" + zkServer + ":" + zkQuorumPort + ":" + zkLeaderPort + "\n"; + zooCfgContents = zooCfgContents + configEntry; + } + + if (myId == -1) { + throw new IllegalStateException( + "Unable to determine ZK 'myid' for target " + targetConnStringSection); + } + + try { + Files.createDirectories(zkDataDir); + Files.writeString(zkDataDir.resolve("myid"), String.valueOf(myId)); + Properties zkProps = new Properties(); + zkProps.load(new StringReader(zooCfgContents)); + startZooKeeperServerEmbedded(zkPort, zkConfDir, zkProps); + } catch (Exception e) { + throw new ZooKeeperException( + SolrException.ErrorCode.SERVER_ERROR, + "IOException bootstrapping zk quorum instance", + e); + } + } else if (zkServerEnabled) { + try { + final Path zkDataHome = + solrHome.resolve(EnvUtils.getProperty("solr.zookeeper.server.datadir", "zoo_data")); + final Path zkConfHome = + solrHome.resolve( + EnvUtils.getProperty("solr.zookeeper.server.confdir", solrHome.toString())); + + Path zooCfgPath = zkConfHome.resolve("zoo.cfg"); + if (!Files.exists(zooCfgPath)) { + log.info("Zookeeper configuration not found in {}, using built-in default", zkConfHome); + String solrInstallDir = System.getProperty(CoreContainerProvider.SOLR_INSTALL_DIR); + if (solrInstallDir == null) { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "Could not find default zoo.cfg file due to missing " + + CoreContainerProvider.SOLR_INSTALL_DIR); + } + zooCfgPath = Path.of(solrInstallDir).resolve("server").resolve("solr").resolve("zoo.cfg"); + } - // zookeeper in quorum mode currently causes a failure when trying to - // register log4j mbeans. 
See SOLR-2369 - // TODO: remove after updating to an slf4j based zookeeper - System.setProperty("zookeeper.jmx.log4j.disable", "true"); - - Path solrHome = cc.getSolrHome(); - if (zkRun) { - String zkDataHome = - EnvUtils.getProperty( - "solr.zookeeper.server.datadir", solrHome.resolve("zoo_data").toString()); - String zkConfHome = - EnvUtils.getProperty("solr.zookeeper.server.confdir", solrHome.toString()); - zkServer = - new SolrZkServer( - zkRun, - stripChroot(config.getZkHost()), - Path.of(zkDataHome), - zkConfHome, - config.getSolrHostPort()); - zkServer.parseConfig(); - zkServer.start(); - - // set client from server config if not already set - if (zookeeperHost == null) { - zookeeperHost = zkServer.getClientString(); + final Properties zkProps = new Properties(); + try (Reader reader = Files.newBufferedReader(zooCfgPath)) { + zkProps.load(reader); + } + if (zkProps.getProperty("dataDir") == null) { + zkProps.setProperty("dataDir", zkDataHome.toString()); + } + final String clientPortAddress = + EnvUtils.getProperty("solr.zookeeper.embedded.host", "127.0.0.1"); + zkProps.setProperty("clientPortAddress", clientPortAddress); + if (zkProps.getProperty("clientPort") == null) { + zkProps.setProperty("clientPort", Integer.toString(config.getSolrHostPort() + 1000)); + } + if (System.getProperty(ZK_WHITELIST_PROPERTY) == null) { + System.setProperty(ZK_WHITELIST_PROPERTY, "ruok, mntr, conf"); + } + + Files.createDirectories(zkDataHome); + final int zkPort = Integer.parseInt(zkProps.getProperty("clientPort")); + startZooKeeperServerEmbedded(zkPort, zkDataHome, zkProps); + + if (zookeeperHost == null) { + // We cannot advertise 0.0.0.0, so choose the best host to advertise + // (the same that the Solr Node defaults to) + final String hostName = + "0.0.0.0".equals(clientPortAddress) + ? 
AddressUtils.getHostToAdvertise() + : clientPortAddress; + zookeeperHost = hostName + ":" + zkPort; + } + } catch (Exception e) { + throw new ZooKeeperException( + SolrException.ErrorCode.SERVER_ERROR, + "Exception starting embedded zookeeper server", + e); } } @@ -116,9 +265,9 @@ public void initZooKeeper(final CoreContainer cc, CloudConfig config) { // we are ZooKeeper enabled try { // If this is an ensemble, allow for a long connect time for other servers to come up - if (zkRun && zkServer.getServers().size() > 1) { - zkClientConnectTimeout = 24 * 60 * 60 * 1000; // 1 day for embedded ensemble - log.info("Zookeeper client={} Waiting for a quorum.", zookeeperHost); + if (runAsQuorum) { + zkClientConnectTimeout = 24 * 60 * 60 * 1000; // 1 day for embedded quorum + log.info("Zookeeper client={} (quorum mode) Waiting for a quorum.", zookeeperHost); } else { log.info("Zookeeper client={}", zookeeperHost); } @@ -133,7 +282,7 @@ public void initZooKeeper(final CoreContainer cc, CloudConfig config) { ZkController zkController = new ZkController(cc, zookeeperHost, zkClientConnectTimeout, config); - if (zkRun) { + if (zkServerEnabled || runAsQuorum) { if (StrUtils.isNotNullOrEmpty(System.getProperty(HTTPS_PORT_PROP))) { // Embedded ZK and probably running with SSL new ClusterProperties(zkController.getZkClient()) @@ -241,9 +390,17 @@ public SolrMetricsContext getSolrMetricsContext() { } } - private String stripChroot(String zkRun) { - if (zkRun == null || zkRun.trim().isEmpty() || zkRun.lastIndexOf('/') < 0) return zkRun; - return zkRun.substring(0, zkRun.lastIndexOf('/')); + private void startZooKeeperServerEmbedded(int port, Path zkHomeDir, Properties p) + throws Exception { + ensureZkMaxCnxnsConfigured(); + zkServerEmbedded = + ZooKeeperServerEmbedded.builder().baseDir(zkHomeDir).configuration(p).build(); + zkServerEmbedded.start(); + log.info("Started embedded ZooKeeper server on port {}", port); + } + + public static void ensureZkMaxCnxnsConfigured() { + 
System.getProperties().putIfAbsent(ZK_MAX_CNXNS_PROPERTY, ZK_MAX_CNXNS_DEFAULT); } public static volatile Predicate testing_beforeRegisterInZk; @@ -313,8 +470,31 @@ public void close() { zkController.close(); } } finally { - if (zkServer != null) { - zkServer.stop(); + if (zkServerEmbedded != null) { + try { + zkServerEmbedded.close(); + // ZooKeeperServerEmbedded.close() is asynchronous: ZK's QuorumCnxManager + // WorkerSender/WorkerReceiver threads may still be running. Wait up to 5s. + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); + while (System.nanoTime() < deadline) { + boolean quorumThreadsRunning = + Thread.getAllStackTraces().keySet().stream() + .anyMatch( + t -> + t.getName().startsWith("WorkerSender") + || t.getName().startsWith("WorkerReceiver")); + if (!quorumThreadsRunning) break; + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + log.info("Closed embedded ZooKeeper server"); + } catch (Exception e) { + log.error("Error closing embedded ZooKeeper server", e); + } } } IOUtils.closeQuietly(metricProducer); diff --git a/solr/core/src/test/org/apache/solr/cloud/TestEmbeddedZkQuorum.java b/solr/core/src/test/org/apache/solr/cloud/TestEmbeddedZkQuorum.java new file mode 100644 index 000000000000..7257a891c8ff --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/TestEmbeddedZkQuorum.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.cloud; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.nio.file.Path; +import java.util.Locale; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.SolrQuery; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.embedded.JettySolrRunner; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test embedded ZooKeeper running in quorum mode within Solr nodes. + * + *

<p>This test verifies that: + * + *
<ul>
+ *
<li>Multiple Solr nodes can start with embedded ZK in quorum mode + *
<li>The ZK quorum forms correctly + *
<li>Collections can be created and used + *
<li>Documents can be indexed and queried + *
<li>All resources are properly closed on shutdown + *
</ul>
+ */ +@SolrTestCaseJ4.SuppressSSL +public class TestEmbeddedZkQuorum extends SolrCloudTestCase { + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final String COLLECTION_NAME = "test_quorum_collection"; + private static final int NUM_NODES = 3; + + @BeforeClass + public static void setupCluster() throws Exception { + // Disable ZooKeeper JMX to avoid MBean registration conflicts during beasting + System.setProperty("zookeeper.jmx.log4j.disable", "true"); + + // Get path to a test config + Path configPath = TEST_PATH().resolve("collection1").resolve("conf"); + + // Configure cluster with 3 nodes, each running embedded ZK + cluster = + configureCluster(NUM_NODES).addConfig("conf1", configPath).withEmbeddedZkQuorum().build(); + cluster.waitForAllNodes(60); + } + + @Test + public void testBasicQuorumFunctionality() + throws IOException, InterruptedException, TimeoutException { + for (int i = 0; i < NUM_NODES; i++) { + JettySolrRunner node = cluster.getJettySolrRunner(i); + assertTrue("Node " + i + " should be running", node.isRunning()); + assertNotNull("Node " + i + " should have a NodeName", node.getNodeName()); + } + } + + @Test + public void testCollectionIndexing() throws Exception { + try (CloudSolrClient client = cluster.getSolrClient(COLLECTION_NAME)) { + CollectionAdminRequest.Create createCmd = + CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf1", 1, 3); + createCmd.process(client); + cluster.waitForActiveCollection(COLLECTION_NAME, 1, 3); + + // Index some documents + for (int i = 0; i < 10; i++) { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", i); + doc.addField("title_s", "Test Document " + i); + doc.addField("content_t", "This is test content for document " + i); + client.add(doc); + } + client.commit(); + + // Query the documents + SolrQuery query = new SolrQuery("*:*"); + query.setRows(100); + QueryResponse response = client.query(query); + 
SolrDocumentList results = response.getResults(); + + // Verify results + assertEquals("Should have 10 documents", 10, results.getNumFound()); + + CollectionAdminRequest.Delete deleteCmd = + CollectionAdminRequest.deleteCollection(COLLECTION_NAME); + deleteCmd.process(client); + } + } + + /** + * Tests ZK quorum resilience when a single node fails and recovers. + * + *

<p>This test verifies that: + * + *
<ul>
+ *
<li>A 3-node ZK quorum can lose 1 node and maintain quorum (2/3) + *
<li>The cluster continues to accept writes with 2 nodes + *
<li>A failed node can rejoin the quorum using the same ports + *
<li>All data is preserved after node recovery + *
</ul>
+ * + *
<p>

This test creates its own private cluster to avoid interfering with other tests.
+   */
+  @Test
+  public void testQuorumResilienceWithNodeFailure() throws Exception {
+    final String collectionName = "quorum_resilience";
+    final int initialDocs = 5;
+    final int docsWhileDown = 5;
+    final int docsAfterRecovery = 5;
+
+    // Create a private cluster for this test
+    Path configPath = TEST_PATH().resolve("collection1").resolve("conf");
+    MiniSolrCloudCluster privateCluster =
+        configureCluster(NUM_NODES).addConfig("conf1", configPath).withEmbeddedZkQuorum().build();
+
+    try {
+      privateCluster.waitForAllNodes(60);
+
+      // Create collection with replica on each node
+      CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 3)
+          .process(privateCluster.getSolrClient());
+      privateCluster.waitForActiveCollection(collectionName, 1, 3);
+
+      try (CloudSolrClient client = privateCluster.getSolrClient(collectionName)) {
+        // Index initial documents and verify
+        indexDocuments(client, 0, initialDocs, "initial");
+        privateCluster.waitForDocCount(
+            collectionName, initialDocs, "initial documents", 120, TimeUnit.SECONDS);
+
+        // Stop one node (quorum maintained with 2/3 nodes)
+        JettySolrRunner stoppedNode = privateCluster.getJettySolrRunner(2);
+        String stoppedNodeName = stoppedNode.getNodeName();
+        if (log.isInfoEnabled()) {
+          log.info("Stopping node to test quorum resilience: {}", stoppedNodeName);
+        }
+        privateCluster.stopJettySolrRunner(stoppedNode);
+
+        // Wait for ZK to detect node loss and verify cluster still operational
+        privateCluster.waitForLiveNodes(2, 120);
+        indexDocuments(client, initialDocs, docsWhileDown, "during_failure");
+        privateCluster.waitForDocCount(
+            collectionName,
+            initialDocs + docsWhileDown,
+            "documents while node down",
+            120,
+            TimeUnit.SECONDS);
+        if (log.isInfoEnabled()) {
+          log.info("Starting node {} again and testing functionality", stoppedNodeName);
+        }
+
+        privateCluster.startJettySolrRunner(stoppedNode, true);
+        privateCluster.waitForNode(stoppedNode, 120);
+
+        // Wait for cluster to stabilize and verify all nodes running
+        privateCluster.waitForLiveNodes(3, 120);
+
+        // CRITICAL: Wait for collection to become active (replicas up, leader elected)
+        // before attempting to index documents
+        privateCluster.waitForActiveCollection(collectionName, 120, TimeUnit.SECONDS, 1, 3);
+
+        privateCluster.waitForDocCount(
+            collectionName,
+            initialDocs + docsWhileDown,
+            "documents after recovery",
+            120,
+            TimeUnit.SECONDS);
+
+        // Verify full cluster functionality by adding more documents
+        indexDocuments(client, initialDocs + docsWhileDown, docsAfterRecovery, "after_recovery");
+        privateCluster.waitForDocCount(
+            collectionName,
+            initialDocs + docsWhileDown + docsAfterRecovery,
+            "all documents",
+            120,
+            TimeUnit.SECONDS);
+      }
+    } finally {
+      // Deleting the collection is best-effort: if the test already failed the cluster may be
+      // unable to serve the request. Never let that skip shutdown() (which would leak the Jetty
+      // nodes and their embedded ZooKeeper servers) or mask the original test failure.
+      try {
+        CollectionAdminRequest.deleteCollection(collectionName)
+            .process(privateCluster.getSolrClient());
+      } catch (Exception e) {
+        log.warn("Failed to delete collection {} during cleanup", collectionName, e);
+      } finally {
+        privateCluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Tests ZK quorum loss and recovery when majority of nodes fail.
+   *
+   *

This test verifies that: + * + *

    + *
  • A 3-node ZK quorum loses quorum when 2 nodes are down (1/3 remaining) + *
  • The surviving node maintains its replica but cannot process updates without quorum + *
  • Both failed nodes can be restarted to restore quorum + *
  • The cluster becomes operational again (can query and index documents) + *
  • Note: After catastrophic failure, some replicas may need time or manual intervention to + * fully recover + *
+ * + *

This test creates its own private cluster to avoid interfering with other tests. It is
+   * currently disabled via {@code @AwaitsFix} because recovery after a full quorum loss does not
+   * yet pass reliably.
+   */
+  @AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-18094")
+  @Test
+  public void testQuorumLossAndRecovery() throws Exception {
+    final String collectionName = "quorum_loss";
+
+    // Create a private cluster for this test
+    Path configPath = TEST_PATH().resolve("collection1").resolve("conf");
+    MiniSolrCloudCluster privateCluster =
+        configureCluster(NUM_NODES).addConfig("conf1", configPath).withEmbeddedZkQuorum().build();
+
+    try {
+      privateCluster.waitForAllNodes(60);
+
+      // Create collection with 3 replicas (one on each node) to ensure at least
+      // one replica survives when we stop 2 nodes
+      CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 3)
+          .process(privateCluster.getSolrClient());
+      privateCluster.waitForActiveCollection(collectionName, 1, 3);
+
+      try (CloudSolrClient client = privateCluster.getSolrClient(collectionName)) {
+        indexDocuments(client, 0, 1, "before_loss");
+        privateCluster.waitForDocCount(
+            collectionName, 1, "initial document", 120, TimeUnit.SECONDS);
+
+        // Stop 2 out of 3 nodes to lose quorum
+        JettySolrRunner node1 = privateCluster.getJettySolrRunner(1);
+        JettySolrRunner node2 = privateCluster.getJettySolrRunner(2);
+        String node1Name = node1.getNodeName();
+        String node2Name = node2.getNodeName();
+
+        if (log.isInfoEnabled()) {
+          log.info("Stopping 2 nodes to lose quorum: {}, {}", node1Name, node2Name);
+        }
+        privateCluster.stopJettySolrRunner(node1);
+        privateCluster.stopJettySolrRunner(node2);
+
+        // Wait for ZK to detect quorum loss
+        privateCluster.waitForLiveNodes(1, 120);
+
+        // Restart both nodes to restore quorum
+        if (log.isInfoEnabled()) {
+          log.info("Restarting nodes to restore quorum");
+        }
+        privateCluster.startJettySolrRunner(node1, true);
+        privateCluster.startJettySolrRunner(node2, true);
+
+        // Wait for both nodes to register with ZK (they should appear in live_nodes)
+        // but we don't require them to be fully recovered immediately
+        privateCluster.waitForNode(node1, 120);
+        privateCluster.waitForNode(node2, 120);
+        privateCluster.waitForLiveNodes(3, 120);
+
+        // CRITICAL: Wait for collection to become active (replicas up, leader elected)
+        // After catastrophic failure, we need to ensure at least one replica is active
+        // before attempting operations
+        privateCluster.waitForActiveCollection(collectionName, 120, TimeUnit.SECONDS, 1, 1);
+
+        // After catastrophic failure, the cluster should be operational with quorum restored
+        // even if not all replicas are immediately active
+        try {
+          privateCluster.waitForDocCount(
+              collectionName, 1, "document after recovery", 120, TimeUnit.SECONDS);
+
+          // Verify cluster accepts writes
+          indexDocuments(client, 1, 1, "after_recovery");
+          privateCluster.waitForDocCount(
+              collectionName, 2, "all documents after recovery", 120, TimeUnit.SECONDS);
+
+        } catch (Exception e) {
+          if (log.isErrorEnabled()) {
+            // Include the exception so the log shows why the cluster did not recover
+            log.error("Cluster failed to become operational after quorum restoration", e);
+          }
+          throw e;
+        }
+      }
+    } finally {
+      // Clean up collection and cluster. Deleting the collection is best-effort: after a failed
+      // recovery the cluster may be unable to serve the request, and that must neither skip
+      // shutdown() (leaking the Jetty nodes) nor mask the original test failure.
+      try {
+        CollectionAdminRequest.deleteCollection(collectionName)
+            .process(privateCluster.getSolrClient());
+      } catch (Exception e) {
+        log.warn("Failed to delete collection {} during cleanup", collectionName, e);
+      } finally {
+        privateCluster.shutdown();
+      }
+    }
+  }
+
+  // Helper methods for improved test clarity and reusability
+
+  /**
+   * Index a batch of documents with a specific phase tag.
+ * + * @param client the CloudSolrClient to use + * @param startId starting document ID + * @param count number of documents to index + * @param phase phase tag to add to documents + */ + private void indexDocuments(CloudSolrClient client, int startId, int count, String phase) + throws Exception { + for (int i = 0; i < count; i++) { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", startId + i); + doc.addField("phase_s", phase); + doc.addField( + "content_t", String.format(Locale.ROOT, "Document %d in phase %s", startId + i, phase)); + client.add(doc); + } + client.commit(); + } +} diff --git a/solr/solr-ref-guide/modules/configuration-guide/pages/solr-properties.adoc b/solr/solr-ref-guide/modules/configuration-guide/pages/solr-properties.adoc index b9ecaddb1002..5b7099326c39 100644 --- a/solr/solr-ref-guide/modules/configuration-guide/pages/solr-properties.adoc +++ b/solr/solr-ref-guide/modules/configuration-guide/pages/solr-properties.adoc @@ -132,8 +132,14 @@ NOTE: Properties marked with "!" indicate inverted meaning between pre Solr 10 a |solr.zookeeper.server.datadir|zkServerDataDir|Defaults to solr.home/zoo_data dir.|Where to store ZooKeeper data when running embedded ZooKeeper. +|solr.zookeeper.server.client.port.offset||1000|Port offset from the Solr port to the embedded ZooKeeper client port when running in quorum mode. All nodes in the ensemble must use the same offset. + +|solr.zookeeper.server.election.port.offset||1020|Port offset from the Solr port to the embedded ZooKeeper leader election port when running in quorum mode. All nodes in the ensemble must use the same offset. + |solr.zookeeper.server.enabled|zk.run|false|Controls whether embedded ZooKeeper is started. Set by the start script. +|solr.zookeeper.server.quorum.port.offset||1010|Port offset from the Solr port to the embedded ZooKeeper quorum peer port when running in quorum mode. All nodes in the ensemble must use the same offset. 
+ |=== === Jetty related properties diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/node-roles.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/node-roles.adoc index 07dab503bddb..bfcde66b3a3e 100644 --- a/solr/solr-ref-guide/modules/deployment-guide/pages/node-roles.adoc +++ b/solr/solr-ref-guide/modules/deployment-guide/pages/node-roles.adoc @@ -49,7 +49,7 @@ If a node has been started with no `solr.node.roles` parameter, it will be assum ==== .Supported roles -[cols="1,1"] +[cols="1,1"] |=== |Role |Modes @@ -61,6 +61,9 @@ If a node has been started with no `solr.node.roles` parameter, it will be assum |`coordinator` |on, off + +|`zookeeper_quorum` _(experimental)_ +|on, off |=== === `data` role @@ -88,6 +91,15 @@ In such cases, a few dedicated nodes can be started with a *`coordinator`* role +=== `zookeeper_quorum` role + +WARNING: This feature is *experimental / alpha* and should not be used in production. The configuration interface may change without notice. + +A node with this role will start an embedded ZooKeeper server that joins a multi-node ZooKeeper quorum (ensemble) on startup. +This allows running a self-contained SolrCloud cluster without deploying a separate ZooKeeper process. + +See xref:zookeeper-ensemble.adoc#embedded-zookeeper-ensemble[Embedded ZooKeeper Ensemble] for full details on how to configure and start a cluster with this role. + == Example usage Sometimes, when the nodes in a cluster are under heavy querying or indexing load, the overseer leader node might be unable to perform collection management duties efficiently. It might be reasonable to have dedicated nodes to act as the overseer. 
Such an effect can be achieved as follows: @@ -125,6 +137,10 @@ curl http://localhost:8983/api/cluster/node-roles/supported "modes":["disallowed", "allowed", "preferred"] + }, + "zookeeper_quorum":{ + "modes":["off", + "on"] } } } diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/zookeeper-ensemble.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/zookeeper-ensemble.adoc index d422b89572aa..d2d5d0d7729c 100644 --- a/solr/solr-ref-guide/modules/deployment-guide/pages/zookeeper-ensemble.adoc +++ b/solr/solr-ref-guide/modules/deployment-guide/pages/zookeeper-ensemble.adoc @@ -546,6 +546,94 @@ set SOLR_OPTS=%SOLR_OPTS% -Djute.maxbuffer=0x200000 ==== ====== +[[embedded-zookeeper-ensemble]] +== Embedded ZooKeeper Ensemble (Experimental) + +WARNING: This feature is *experimental / alpha* and is not suitable for production use. +The configuration interface may change without notice in future Solr releases. + +Normally, Solr's embedded ZooKeeper runs in "standalone" mode and cannot tolerate node failures. +Starting in Solr 10.1, it is possible to run Solr nodes where each node's embedded ZooKeeper participates in a multi-node quorum (ensemble). +This allows a self-contained SolrCloud cluster without a separate ZooKeeper deployment — useful for development, testing, or small low-stakes clusters. + +=== How It Works + +Each Solr node that has the `zookeeper_quorum` xref:node-roles.adoc[node role] set to `on` will start an embedded `ZooKeeperServerEmbedded` instance on startup. +Solr identifies which host in the ZK connection string it corresponds to (by matching host+port) and assigns it a ZooKeeper `myid`. + +Port scheme used by each node: + +|=== +|Port |Formula |Example (Solr on 8983) + +|Solr HTTP +|`solr_port` +|8983 + +|ZK client +|`solr_port + 1000` +|9983 + +|ZK quorum peer +|`solr_port + 1010` +|9993 + +|ZK leader election +|`solr_port + 1020` +|10003 +|=== + +All three port offsets can be overridden with system properties — see <<embedded-zk-ports>> below. 
+ +=== Starting a 3-Node Embedded ZK Ensemble + +The ZK connection string must list all nodes' ZK client ports (`solr_port + 1000`), and every node must use `zookeeper_quorum:on`. +All three nodes must be started before the quorum can elect a leader and become operational. + +NOTE: On a single machine, each node needs distinct ports. Because each ZK instance also binds a quorum peer port (`solr_port + 1010`) and a leader election port (`solr_port + 1020`), Solr ports must, with the default offsets, be spaced *more than 1020 apart* to avoid collisions. +When nodes share the same working directory, also set `solr.zookeeper.server.datadir` to a distinct path per node to avoid ZooKeeper data directory conflicts. + +[source,bash] +---- +export LH="localhost" +ZK_HOST="$LH:21000,$LH:31000,$LH:41000" +ROLES="data:on,overseer:allowed,zookeeper_quorum:on" + +bin/solr start -p 20000 -z "$ZK_HOST" -Dsolr.node.roles="$ROLES" -Dsolr.zookeeper.server.datadir=zoo_home_1 +bin/solr start -p 30000 -z "$ZK_HOST" -Dsolr.node.roles="$ROLES" -Dsolr.zookeeper.server.datadir=zoo_home_2 +bin/solr start -p 40000 -z "$ZK_HOST" -Dsolr.node.roles="$ROLES" -Dsolr.zookeeper.server.datadir=zoo_home_3 +---- + +[[embedded-zk-ports]] +=== Configuring Embedded ZK Ports + +By default the three ZooKeeper ports are derived from the Solr port. +Each can be overridden with a system property: + +[cols="2,1,2"] +|=== +|System property |Default |Description + +|`solr.zookeeper.server.client.port.offset` +|`1000` +|Offset from Solr port to ZK client port + +|`solr.zookeeper.server.quorum.port.offset` +|`1010` +|Offset from Solr port to ZK quorum peer port + +|`solr.zookeeper.server.election.port.offset` +|`1020` +|Offset from Solr port to ZK leader election port +|=== + +=== Known Limitations + +* Does not support ZooKeeper's dynamic-ensemble reconfiguration — all nodes must be listed in the static ZK connection string at startup. +* Quorum loss and recovery (majority of nodes down simultaneously) is not yet fully reliable. 
+* The hostname in the ZK connection string must exactly match Solr's configured host (`solr.host` / `-Dhost`). Using `localhost` on some nodes and `127.0.0.1` on others will not work. +* Runs best with the Java security manager disabled (`solr.security.manager.enabled=false`). + == Securing the ZooKeeper Connection You may also want to secure the communication between ZooKeeper and Solr. diff --git a/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc b/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc index 375109ddbb70..75a08f8dec57 100644 --- a/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc +++ b/solr/solr-ref-guide/modules/upgrade-notes/pages/major-changes-in-solr-10.adoc @@ -36,6 +36,14 @@ Solr 10.0 requires at least Java 21, while SolrJ 10.0 requires at least Java 17. == Solr 10.1 +=== Embedded ZooKeeper Ensemble (Experimental) + +A new `zookeeper_quorum` xref:deployment-guide:node-roles.adoc[node role] has been added (experimental/alpha). +When enabled, each participating Solr node starts an embedded ZooKeeper that joins a multi-node quorum, allowing a self-contained SolrCloud cluster without a separate ZooKeeper deployment. +See xref:deployment-guide:zookeeper-ensemble.adoc#embedded-zookeeper-ensemble[Embedded ZooKeeper Ensemble] for details. + +=== Configuring SSL + For SSL (https), it's no longer necessary to set the "urlScheme" cluster property since the `SOLR_SSL_ENABLED` env var (or `solr.ssl.enabled` sys-prop) suffices. These are now honored by CloudSolrClient, as well as scheme detection from the connection string / hosts. The "urlScheme" cluster property and httpShardHandlerFactory configuration is likely to be deprecated; feedback welcome. 
diff --git a/solr/test-framework/src/java/org/apache/solr/SolrIgnoredThreadsFilter.java b/solr/test-framework/src/java/org/apache/solr/SolrIgnoredThreadsFilter.java index 298b89d38da7..ba8698ba24e2 100644 --- a/solr/test-framework/src/java/org/apache/solr/SolrIgnoredThreadsFilter.java +++ b/solr/test-framework/src/java/org/apache/solr/SolrIgnoredThreadsFilter.java @@ -88,6 +88,13 @@ public boolean reject(Thread t) { return true; } + // ZK's QuorumCnxManager spawns WorkerSender/WorkerReceiver threads that may briefly + // outlive ZooKeeperServerEmbedded.close() (which is asynchronous). ZkContainer.close() + // waits up to 5s for them, but suppress any stragglers here as a safety net. + if (threadName.startsWith("WorkerSender") || threadName.startsWith("WorkerReceiver")) { + return true; + } + if (threadName.startsWith("closeThreadPool")) { return true; } diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java index 62470e6913d3..d1fd3fd52c80 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/MiniSolrCloudCluster.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.PrintStream; import java.lang.invoke.MethodHandles; +import java.net.ServerSocket; import java.net.URL; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -55,11 +56,14 @@ import org.apache.lucene.tests.util.LuceneTestCase; import org.apache.solr.SolrBackend; import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.apache.CloudLegacySolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.jetty.SSLConfig; import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import 
org.apache.solr.client.solrj.request.SolrQuery; +import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.Aliases; @@ -159,6 +163,7 @@ public class MiniSolrCloudCluster implements SolrBackend { private final JettyConfig jettyConfig; private final String solrXml; private final boolean trackJettyMetrics; + private final String zkHost; // ZK connection string (used in quorum mode when zkServer is null) private final AtomicInteger nodeIds = new AtomicInteger(); private final Map solrClientByCollection = new ConcurrentHashMap<>(); @@ -293,6 +298,7 @@ public MiniSolrCloudCluster( } } this.zkServer = zkTestServer; + this.zkHost = null; // Not used in standard mode try (SolrZkClient zkClient = new SolrZkClient.Builder() @@ -340,6 +346,294 @@ public MiniSolrCloudCluster( } } + /** + * Create a MiniSolrCloudCluster with embedded ZooKeeper quorum mode. Each Solr node runs its own + * embedded ZooKeeper server, and together they form a quorum. 
+   *
+   * @param numServers number of Solr servers (must be at least 3 for quorum)
+   * @param baseDir base directory that the mini cluster should be run from
+   * @param solrXml solr.xml file content; {@code null} selects the default cloud solr.xml
+   * @param jettyConfig Jetty configuration
+   * @param securityJson Optional security.json configuration
+   * @param trackJettyMetrics whether to track Jetty metrics
+   * @param useEmbeddedZkQuorum must be {@code true}; only distinguishes this constructor from the
+   *     standard one
+   * @throws Exception if there was an error starting the cluster
+   */
+  MiniSolrCloudCluster(
+      int numServers,
+      Path baseDir,
+      String solrXml,
+      JettyConfig jettyConfig,
+      Optional<String> securityJson,
+      boolean trackJettyMetrics,
+      boolean useEmbeddedZkQuorum)
+      throws Exception {
+
+    if (!useEmbeddedZkQuorum) {
+      throw new IllegalArgumentException("This constructor is only for embedded ZK quorum mode");
+    }
+    if (numServers < 3) {
+      throw new IllegalArgumentException(
+          "ZooKeeper quorum requires at least 3 nodes, got: " + numServers);
+    }
+
+    Objects.requireNonNull(securityJson);
+    this.baseDir = Objects.requireNonNull(baseDir);
+    this.jettyConfig = Objects.requireNonNull(jettyConfig);
+    this.solrXml = solrXml == null ? DEFAULT_CLOUD_SOLR_XML : solrXml;
+    this.trackJettyMetrics = trackJettyMetrics;
+    this.externalZkServer = true; // No ZkTestServer in quorum mode
+    this.zkServer = null; // No single ZK server
+
+    log.info("Starting cluster of {} servers with embedded ZK quorum in {}", numServers, baseDir);
+    Files.createDirectories(baseDir);
+
+    // Phase 1: Reserve random ports for all nodes
+    int[] ports = reservePortPairs(numServers);
+
+    // Build the zkHost string with all ZK ports (Solr port + 1000)
+    StringBuilder zkHostBuilder = new StringBuilder();
+    for (int i = 0; i < numServers; i++) {
+      if (i > 0) {
+        zkHostBuilder.append(",");
+      }
+      int zkPort = ports[i] + 1000;
+      zkHostBuilder.append("127.0.0.1:").append(zkPort);
+    }
+    this.zkHost = zkHostBuilder.toString(); // Save for later use
+
+    if (log.isInfoEnabled()) {
+      log.info("Reserved ports for {} nodes: {}", numServers, Arrays.toString(ports));
+      log.info("ZK connection string: {}", this.zkHost);
+    }
+
+    // Set system properties for embedded ZK quorum mode
+    // Note: solr.zookeeper.server.enabled is NOT needed here — the zookeeper_quorum node role alone
+    // is sufficient to trigger embedded ZK startup in quorum mode.
+    System.setProperty("solr.security.manager.enabled", "false");
+    System.setProperty("solr.node.roles", "data:on,overseer:allowed,zookeeper_quorum:on");
+    System.setProperty("solr.test.sys.prop1", "propone");
+    System.setProperty("solr.test.sys.prop2", "proptwo");
+    System.setProperty("solr.zookeeper.client.timeout", "300000"); // 5 minutes
+
+    // Phase 2: Start all nodes in parallel
+    List<Callable<JettySolrRunner>> startups = new ArrayList<>(numServers);
+    for (int i = 0; i < numServers; i++) {
+      final int solrPort = ports[i];
+      final String nodeName = newNodeName();
+      startups.add(
+          () -> {
+            Path runnerPath = createInstancePath(nodeName);
+            // Use the field, not the constructor parameter: the parameter may be null, in which
+            // case the field already holds the default solr.xml.
+            Files.write(
+                runnerPath.resolve("solr.xml"), this.solrXml.getBytes(StandardCharsets.UTF_8));
+
+            Properties nodeProps = new Properties();
+            nodeProps.setProperty("zkHost", this.zkHost);
+            nodeProps.setProperty("hostPort", String.valueOf(solrPort));
+
+            JettyConfig newConfig = JettyConfig.builder(jettyConfig).setPort(solrPort).build();
+
+            JettySolrRunner jetty =
+                !trackJettyMetrics
+                    ? new JettySolrRunner(runnerPath.toString(), nodeProps, newConfig)
+                    : new JettySolrRunnerWithMetrics(runnerPath.toString(), nodeProps, newConfig);
+
+            int zkPort = solrPort + 1000;
+            log.info("Starting {} on port {} with ZK on port {}", nodeName, solrPort, zkPort);
+            jetty.start();
+            log.info("Node {} started successfully", nodeName);
+
+            jettys.add(jetty);
+            synchronized (startupWait) {
+              startupWait.notifyAll();
+            }
+            return jetty;
+          });
+    }
+
+    final ExecutorService executorLauncher =
+        ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("jetty-launcher"));
+    Collection<Future<JettySolrRunner>> futures = executorLauncher.invokeAll(startups);
+    ExecutorUtil.shutdownAndAwaitTermination(executorLauncher);
+    Exception startupError =
+        checkForExceptions(
+            "Error starting up MiniSolrCloudCluster with embedded ZK quorum", futures);
+    if (startupError != null) {
+      try {
+        this.shutdown();
+      } catch (Throwable t) {
+        startupError.addSuppressed(t);
+      }
+      throw startupError;
+    }
+
+    log.info("All {} nodes started, waiting for quorum formation...", numServers);
+    // Track success explicitly instead of re-testing the timeout afterwards: a probe that
+    // connects just as the deadline passes must not be reported as a failure.
+    boolean quorumFormed = false;
+    TimeOut zkTimeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+    while (!quorumFormed && !zkTimeout.hasTimedOut()) {
+      try (SolrZkClient zkProbe =
+          new SolrZkClient.Builder()
+              .withUrl(this.zkHost)
+              .withTimeout(3000, TimeUnit.MILLISECONDS)
+              .build()) {
+        quorumFormed = zkProbe.isConnected();
+      } catch (Exception ignored) {
+        // ZK is not accepting connections yet; keep probing until the timeout expires
+      }
+      if (!quorumFormed) {
+        Thread.sleep(500);
+      }
+    }
+    if (!quorumFormed) {
+      TimeoutException quorumError =
+          new TimeoutException("ZK quorum did not form within 30 seconds on " + this.zkHost);
+      // The Jetty nodes are already running; stop them so a failed construction does not leak them
+      try {
+        this.shutdown();
+      } catch (Throwable t) {
+        quorumError.addSuppressed(t);
+      }
+      throw quorumError;
+    }
+
+    // Initialize ZK paths and security (if provided)
+    try (SolrZkClient zkClient =
+        new SolrZkClient.Builder()
+            .withUrl(this.zkHost)
+            .withTimeout(60000, TimeUnit.MILLISECONDS)
+            .build()) {
+      if (!zkClient.exists("/solr")) {
+        zkClient.makePath("/solr", true);
+      }
+
+      if (jettyConfig.sslConfig != null && jettyConfig.sslConfig.isSSLMode()) {
+        zkClient.makePath(
+            "/solr" + ZkStateReader.CLUSTER_PROPS,
+            "{'urlScheme':'https'}".getBytes(StandardCharsets.UTF_8),
+            true);
+      }
+      if (securityJson.isPresent()) {
+        zkClient.makePath(
+            "/solr/security.json", securityJson.get().getBytes(Charset.defaultCharset()), true);
+      }
+    }
+
+    solrClient = buildSolrClientForQuorum(this.zkHost);
+
+    if (numServers > 0) {
+      waitForAllNodes(numServers, 60);
+    }
+
+    log.info("Embedded ZK quorum cluster started successfully with {} nodes", numServers);
+  }
+
+  /**
+   * Reserves port pairs for embedded ZK quorum mode. For each node, we need both a Solr port and a
+   * ZK port (Solr port + 1000). This method ensures both ports in each pair are available before
+   * returning.
+   *
+   *

The method keeps all ServerSockets open during the search to prevent race conditions where + * another process might grab a port between our check and actual usage. + * + * @param numPairs the number of port pairs to reserve + * @return array of Solr ports (ZK ports are Solr port + 1000) + * @throws IOException if unable to find enough available port pairs + */ + private int[] reservePortPairs(int numPairs) throws IOException { + List solrSockets = new ArrayList<>(); + List zkSockets = new ArrayList<>(); + int[] ports = new int[numPairs]; + + try { + int pairsFound = 0; + int maxAttempts = numPairs * 100; // Reasonable limit to avoid infinite loops + int attempts = 0; + + while (pairsFound < numPairs && attempts < maxAttempts) { + attempts++; + ServerSocket solrSocket = null; + ServerSocket zkSocket = null; + ServerSocket zkQuorumSocket = null; + ServerSocket zkLeaderSocket = null; + + try { + // Try to get a random available port for Solr + solrSocket = new ServerSocket(0); + int solrPort = solrSocket.getLocalPort(); + int zkPort = solrPort + 1000; + + // Check if ZK ports would exceed the valid port range (0-65535) + if (zkPort + 2 > 65535) { + solrSocket.close(); + continue; // Skip this port and try again + } + + // Verify the ZK client port and quorum/election ports (+1/+2) are all available + zkSocket = new ServerSocket(zkPort); + zkQuorumSocket = new ServerSocket(zkPort + 1); + zkLeaderSocket = new ServerSocket(zkPort + 2); + + // All ports are available - keep the sockets and record the Solr port + solrSockets.add(solrSocket); + zkSockets.add(zkSocket); + zkSockets.add(zkQuorumSocket); + zkSockets.add(zkLeaderSocket); + ports[pairsFound] = solrPort; + pairsFound++; + + if (log.isDebugEnabled()) { + log.debug( + "Reserved port pair {}/{}: Solr={}, ZK={}", pairsFound, numPairs, solrPort, zkPort); + } + + } catch (IOException | IllegalArgumentException e) { + // A port was not available or invalid, close sockets and try again + for (ServerSocket s : + new 
ServerSocket[] {solrSocket, zkSocket, zkQuorumSocket, zkLeaderSocket}) { + if (s != null) { + try { + s.close(); + } catch (IOException ignored) { + } + } + } + } + } + + if (pairsFound < numPairs) { + throw new IOException( + "Unable to find " + numPairs + " available port pairs after " + attempts + " attempts"); + } + return ports; + + } finally { + // Close all sockets now that we've recorded the ports + // The ports will remain available for immediate reuse + for (ServerSocket socket : solrSockets) { + try { + socket.close(); + } catch (IOException e) { + log.warn("Error closing Solr socket", e); + } + } + for (ServerSocket socket : zkSockets) { + try { + socket.close(); + } catch (IOException e) { + log.warn("Error closing ZK socket", e); + } + } + } + } + + /** + * Get the ZK connection string. Works for both standard mode (using zkServer) and quorum mode + * (using zkHost field). + * + * @return ZK connection string + */ + private String getZkAddress() { + if (zkHost != null) { + return zkHost; // Quorum mode + } + return zkServer.getZkAddress(); // Standard mode + } + + private CloudSolrClient buildSolrClientForQuorum(String zkHost) { + return new CloudLegacySolrClient.Builder(List.of(zkHost), Optional.empty()) + .withSocketTimeout(90000, TimeUnit.MILLISECONDS) + .withConnectionTimeout(15000, TimeUnit.MILLISECONDS) + .build(); + } + private void waitForAllNodes(int numServers, int timeoutSeconds) throws InterruptedException, TimeoutException { log.info("waitForAllNodes: numServers={}", numServers); @@ -389,6 +683,91 @@ public void waitForNode(JettySolrRunner jetty, int timeoutSeconds) timeoutSeconds, TimeUnit.SECONDS, (o, n) -> n != null && n.contains(nodeName)); } + /** + * Wait for the expected number of live nodes in the cluster. 
+   *
+   * <p>The count is always checked at least once, and one more time after the timeout has
+   * expired, so a cluster that reaches the expected size right at the deadline is still accepted.
+   *
+   * @param expectedCount expected number of live nodes
+   * @param timeoutSeconds timeout in seconds
+   * @throws InterruptedException if interrupted while waiting
+   * @throws TimeoutException if the expected count is not reached within the timeout
+   */
+  // TODO: This method counts locally running Jetty processes rather than querying ZooKeeper
+  // live_nodes. In embedded-quorum mode this is approximately correct (ZK and Solr share a
+  // process), but ZK state can lag behind Jetty state. Consider implementing using
+  // getZkStateReader().waitForLiveNodes(...) for a more correct implementation.
+  public void waitForLiveNodes(int expectedCount, int timeoutSeconds)
+      throws InterruptedException, TimeoutException {
+    TimeOut timeout = new TimeOut(timeoutSeconds, TimeUnit.SECONDS, TimeSource.NANO_TIME);
+    while (true) {
+      long runningNodes = jettys.stream().filter(JettySolrRunner::isRunning).count();
+      if (runningNodes == expectedCount) {
+        log.info("Verified {} live nodes", runningNodes);
+        return;
+      }
+      // Test the deadline only after a failed check, so the reported count is the one that
+      // actually mismatched (never "expected 3 but got 3").
+      if (timeout.hasTimedOut()) {
+        throw new TimeoutException(
+            "Live node count mismatch: expected " + expectedCount + " but got " + runningNodes);
+      }
+      Thread.sleep(200);
+    }
+  }
+
+  /**
+   * Wait for the document count in a collection to reach the expected value.
+   *
+   * <p>Polls with a match-all query until the count matches or the timeout expires. Query failures
+   * are tolerated while polling (the cluster may be recovering); if the last attempt before the
+   * deadline failed, that failure is attached as the cause of the {@link TimeoutException}.
+   *
+   * @param collectionName name of the collection to check
+   * @param expectedCount expected number of documents
+   * @param description description for logging
+   * @param timeoutValue timeout value, expressed in {@code timeoutUnit}
+   * @param timeoutUnit timeout unit
+   * @throws InterruptedException if interrupted while waiting
+   * @throws TimeoutException if the expected count is not reached within the timeout
+   */
+  public void waitForDocCount(
+      String collectionName,
+      long expectedCount,
+      String description,
+      int timeoutValue,
+      TimeUnit timeoutUnit)
+      throws InterruptedException, TimeoutException {
+    TimeOut timeout = new TimeOut(timeoutValue, timeoutUnit, TimeSource.NANO_TIME);
+    SolrClient client = getSolrClient(collectionName);
+    long lastCount = -1;
+    Exception lastFailure = null;
+    while (true) {
+      try {
+        QueryResponse response = client.query(new SolrQuery("*:*").setRows(0));
+        long actualCount = response.getResults().getNumFound();
+        if (actualCount == expectedCount) {
+          log.info("Verified {}: {} documents", description, actualCount);
+          return;
+        }
+        lastCount = actualCount;
+        lastFailure = null;
+      } catch (SolrServerException | IOException | RuntimeException e) {
+        // Cluster might be temporarily unavailable during recovery. Only query failures are
+        // caught here, so an InterruptedException from the sleep below still propagates.
+        lastFailure = e;
+      }
+      if (timeout.hasTimedOut()) {
+        break;
+      }
+      // Back off longer after a failed query than after a plain count mismatch
+      Thread.sleep(lastFailure == null ? 100 : 500);
+    }
+
+    if (lastFailure != null) {
+      TimeoutException failure =
+          new TimeoutException(
+              "Document count check failed for: "
+                  + description
+                  + ". Expected "
+                  + expectedCount
+                  + " but query failed: "
+                  + lastFailure.getMessage());
+      failure.initCause(lastFailure);
+      throw failure;
+    }
+    throw new TimeoutException(
+        "Document count mismatch for: "
+            + description
+            + ". Expected "
+            + expectedCount
+            + " but got "
+            + lastCount);
+  }
+
   /**
    * This method wait till all Solr JVMs ( Jettys ) are running . It waits up to the timeout (in
    * seconds) for the JVMs to be up before throwing IllegalStateException. 
This is called @@ -482,7 +861,7 @@ public JettySolrRunner getJettySolrRunner(int index) { public JettySolrRunner startJettySolrRunner(String name, JettyConfig config, String solrXml) throws Exception { final Properties nodeProps = new Properties(); - nodeProps.setProperty("zkHost", zkServer.getZkAddress()); + nodeProps.setProperty("zkHost", getZkAddress()); Path runnerPath = createInstancePath(name); if (solrXml == null) { @@ -671,7 +1050,9 @@ public void shutdown() throws Exception { throw shutdownError; } } finally { - if (!externalZkServer) { + // Only shut down zkServer if it exists (not null) and we created it (!externalZkServer) + // In quorum mode, zkServer is null and each node's embedded ZK is shut down with the node + if (!externalZkServer && zkServer != null) { zkServer.shutdown(); } resetRecordingFlag(); @@ -699,7 +1080,7 @@ public CloudSolrClient getSolrClient(String collectionName) { collectionName, k -> { CloudSolrClient solrClient = - new CloudLegacySolrClient.Builder(List.of(zkServer.getZkAddress()), Optional.empty()) + new CloudLegacySolrClient.Builder(List.of(getZkAddress()), Optional.empty()) .withDefaultCollection(collectionName) .withSocketTimeout(90000) .withConnectionTimeout(15000) @@ -719,8 +1100,7 @@ public CloudSolrClient getSolrClient(String collectionName) { @Override // SolrBackend public CloudSolrClient newSolrClient(String collection) { - return new CloudLegacySolrClient.Builder( - List.of(getZkServer().getZkAddress()), Optional.empty()) + return new CloudLegacySolrClient.Builder(List.of(getZkAddress()), Optional.empty()) .withSocketTimeout(90000, TimeUnit.MILLISECONDS) .withConnectionTimeout(15000, TimeUnit.MILLISECONDS) .withDefaultCollection(collection) @@ -1009,6 +1389,7 @@ public static class Builder { EnvUtils.getPropertyAsBool("solr.cloud.overseer.enabled", true); private boolean formatZkServer = true; private boolean disableTraceIdGeneration = false; + private boolean useEmbeddedZkQuorum = false; /** * Create a builder @@ 
-1127,6 +1508,27 @@ public Builder formatZkServer(boolean formatZkServer) { return this; } + /** + * Configure cluster to use embedded ZooKeeper quorum mode where each Solr node runs its own + * ZooKeeper server. + * + * <p>When enabled, instead of using a separate {@link ZkTestServer}, each Solr node will run an + * embedded ZooKeeper server, and together they form a quorum. This tests the embedded ZK quorum + * functionality. + * + * <p>Requires at least 3 nodes for a valid quorum. + * + * @return this Builder + * @throws IllegalArgumentException if the builder's node count is less than 3 + */ + public Builder withEmbeddedZkQuorum() { + if (nodeCount < 3) { + throw new IllegalArgumentException( + "ZooKeeper quorum requires at least 3 nodes, got: " + nodeCount); + } + this.useEmbeddedZkQuorum = true; + return this; + } + /** * Configure and run the {@link MiniSolrCloudCluster} * @@ -1150,16 +1552,33 @@ public MiniSolrCloudCluster build() throws Exception { } JettyConfig jettyConfig = jettyConfigBuilder.build(); - MiniSolrCloudCluster cluster = - new MiniSolrCloudCluster( - nodeCount, - baseDir, - solrXml, - jettyConfig, - null, - securityJson, - trackJettyMetrics, - formatZkServer); + MiniSolrCloudCluster cluster; + + if (useEmbeddedZkQuorum) { + // Use embedded ZK quorum mode constructor + cluster = + new MiniSolrCloudCluster( + nodeCount, + baseDir, + solrXml, + jettyConfig, + securityJson, + trackJettyMetrics, + true); // useEmbeddedZkQuorum = true + } else { + // Use standard constructor with ZkTestServer + cluster = + new MiniSolrCloudCluster( + nodeCount, + baseDir, + solrXml, + jettyConfig, + null, + securityJson, + trackJettyMetrics, + formatZkServer); + } + for (Config config : configs) { cluster.uploadConfigSet(config.path, config.name); } diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java index 8be5bec00db0..b1fdf316c890 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/ZkTestServer.java @@ -16,7 +16,7 @@ */ package org.apache.solr.cloud; -import static org.apache.solr.cloud.SolrZkServer.ZK_WHITELIST_PROPERTY; +import static org.apache.solr.core.ZkContainer.ZK_WHITELIST_PROPERTY; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -39,6 +39,7 @@ import org.apache.solr.common.util.IOUtils; import org.apache.solr.common.util.ObjectReleaseTracker; import
org.apache.solr.common.util.Utils; +import org.apache.solr.core.ZkContainer; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; @@ -57,6 +58,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Single-node embedded ZooKeeper for tests. Unlike production's {@link + * org.apache.zookeeper.server.embedded.ZooKeeperServerEmbedded}, this wires up {@code + * ZooKeeperServer} directly so it can install a {@code ZKDatabase} that counts watch registrations + * — letting tests detect watch leaks via configurable per-path limits. + */ public class ZkTestServer { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -483,7 +490,7 @@ public void run() throws InterruptedException, IOException { public void run(boolean solrFormat) throws InterruptedException, IOException { log.info("STARTING ZK TEST SERVER"); - SolrZkServer.ensureZkMaxCnxnsConfigured(); + ZkContainer.ensureZkMaxCnxnsConfigured(); ensureStatCommandWhitelisted(); AtomicReference zooError = new AtomicReference<>();