[grid] External datastore Redis-backed for Session Queue Signed-off-by: Viet Nguyen Duc <nguyenducviet4496@gmail.com>
diff --git a/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java b/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java index 835150e..7986427 100644 --- a/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java +++ b/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java
@@ -268,9 +268,7 @@ public static Distributor create(Config config) { SessionMap sessions = new SessionMapOptions(config).getSessionMap(); SecretOptions secretOptions = new SecretOptions(config); NewSessionQueueOptions newSessionQueueOptions = new NewSessionQueueOptions(config); - NewSessionQueue sessionQueue = - newSessionQueueOptions.getSessionQueue( - "org.openqa.selenium.grid.sessionqueue.remote.RemoteNewSessionQueue"); + NewSessionQueue sessionQueue = newSessionQueueOptions.getSessionQueue(); return new LocalDistributor( tracer, bus,
diff --git a/java/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueOptions.java b/java/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueOptions.java index ef6e74e..3cc64c4 100644 --- a/java/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueOptions.java +++ b/java/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueOptions.java
@@ -35,6 +35,8 @@ public class NewSessionQueueOptions { static final String SESSION_QUEUE_SECTION = "sessionqueue"; + static final String DEFAULT_SESSION_QUEUE = + "org.openqa.selenium.grid.sessionqueue.remote.RemoteNewSessionQueue"; static final int DEFAULT_MAXIMUM_RESPONSE_DELAY = 8; static final int DEFAULT_REQUEST_TIMEOUT = 300; static final int DEFAULT_REQUEST_TIMEOUT_PERIOD = 10; @@ -50,6 +52,12 @@ public NewSessionQueueOptions(Config config) { public URI getSessionQueueUri() { + BaseServerOptions serverOptions = new BaseServerOptions(config); + String scheme = + config + .get(SESSION_QUEUE_SECTION, "scheme") + .orElse((serverOptions.isSecure() || serverOptions.isSelfSigned()) ? "https" : "http"); + Optional<URI> host = config .get(SESSION_QUEUE_SECTION, "host") @@ -72,8 +80,6 @@ public URI getSessionQueueUri() { return host.get(); } - BaseServerOptions serverOptions = new BaseServerOptions(config); - String schema = (serverOptions.isSecure() || serverOptions.isSelfSigned()) ? "https" : "http"; Optional<Integer> port = config.getInt(SESSION_QUEUE_SECTION, "port"); Optional<String> hostname = config.get(SESSION_QUEUE_SECTION, "hostname"); @@ -82,7 +88,7 @@ public URI getSessionQueueUri() { } try { - return new URI(schema, null, hostname.get(), port.get(), "", null, null); + return new URI(scheme, null, hostname.get(), port.get(), "", null, null); } catch (URISyntaxException e) { throw new ConfigException( "Session queue server uri configured through host (%s) and port (%d) is not a valid URI", @@ -90,6 +96,87 @@ public URI getSessionQueueUri() { } } + /** + * Gets the Redis URI for Redis-backed session queue configuration. + * + * @return Redis URI constructed from hostname and port configuration + */ + public URI getRedisUri() { + Optional<Integer> port = config.getInt(SESSION_QUEUE_SECTION, "port"); + Optional<String> hostname = config.get(SESSION_QUEUE_SECTION, "hostname"); + + if (!(port.isPresent() && hostname.isPresent())) { + throw new ConfigException( + "Unable to determine Redis hostname and port for the session queue"); + } + + try { + return new URI("redis", null, hostname.get(), port.get(), "", null, null); + } catch (URISyntaxException e) { + throw new ConfigException( + "Redis session queue uri configured through hostname (%s) and port (%d) is not a valid" + + " URI", + hostname.get(), port.get()); + } + } + + /** + * Gets the session queue scheme (e.g., "redis", "local", "remote"). + * + * @return the session queue scheme + */ + public Optional<String> getSessionQueueScheme() { + return config.get(SESSION_QUEUE_SECTION, "scheme"); + } + + /** + * Gets the session queue implementation class name. + * + * @return the implementation class name + */ + public Optional<String> getSessionQueueImplementation() { + return config.get(SESSION_QUEUE_SECTION, "implementation"); + } + + /** + * Gets the session queue hostname. + * + * @return the hostname + */ + public Optional<String> getSessionQueueHostname() { + return config.get(SESSION_QUEUE_SECTION, "hostname"); + } + + /** + * Gets the session queue port. + * + * @return the port number + */ + public Optional<Integer> getSessionQueuePort() { + return config.getInt(SESSION_QUEUE_SECTION, "port"); + } + + @ManagedAttribute(name = "SessionQueueScheme") + public String getSessionQueueSchemeAttribute() { + return getSessionQueueScheme().orElse("http"); + } + + @ManagedAttribute(name = "SessionQueueImplementation") + public String getSessionQueueImplementationAttribute() { + return getSessionQueueImplementation() + .orElse("org.openqa.selenium.grid.sessionqueue.local.LocalNewSessionQueue"); + } + + @ManagedAttribute(name = "SessionQueueHostname") + public String getSessionQueueHostnameAttribute() { + return getSessionQueueHostname().orElse("localhost"); + } + + @ManagedAttribute(name = "SessionQueuePort") + public int getSessionQueuePortAttribute() { + return getSessionQueuePort().orElse(-1); + } + public Duration getMaximumResponseDelay() { int timeout = config @@ -156,8 +243,8 @@ public long getRetryIntervalMilliseconds() { return getSessionRequestRetryInterval().toMillis(); } - public NewSessionQueue getSessionQueue(String implementation) { + public NewSessionQueue getSessionQueue() { return config.getClass( - SESSION_QUEUE_SECTION, "implementation", NewSessionQueue.class, implementation); + SESSION_QUEUE_SECTION, "implementation", NewSessionQueue.class, DEFAULT_SESSION_QUEUE); } }
diff --git a/java/src/org/openqa/selenium/grid/sessionqueue/httpd/NewSessionQueueServer.java b/java/src/org/openqa/selenium/grid/sessionqueue/httpd/NewSessionQueueServer.java index e02eeeb..9493b70 100644 --- a/java/src/org/openqa/selenium/grid/sessionqueue/httpd/NewSessionQueueServer.java +++ b/java/src/org/openqa/selenium/grid/sessionqueue/httpd/NewSessionQueueServer.java
@@ -50,8 +50,6 @@ public class NewSessionQueueServer extends TemplateGridServerCommand { private static final Logger LOG = Logger.getLogger(NewSessionQueueServer.class.getName()); - private static final String LOCAL_NEW_SESSION_QUEUE = - "org.openqa.selenium.grid.sessionqueue.local.LocalNewSessionQueue"; @Override public String getName() { @@ -87,7 +85,7 @@ protected Config getDefaultConfig() { protected Handlers createHandlers(Config config) { NewSessionQueueOptions queueOptions = new NewSessionQueueOptions(config); - NewSessionQueue sessionQueue = queueOptions.getSessionQueue(LOCAL_NEW_SESSION_QUEUE); + NewSessionQueue sessionQueue = queueOptions.getSessionQueue(); return new Handlers( Route.combine(
diff --git a/java/src/org/openqa/selenium/grid/sessionqueue/redis/BUILD.bazel b/java/src/org/openqa/selenium/grid/sessionqueue/redis/BUILD.bazel new file mode 100644 index 0000000..d87d585 --- /dev/null +++ b/java/src/org/openqa/selenium/grid/sessionqueue/redis/BUILD.bazel
@@ -0,0 +1,30 @@ +load("@rules_jvm_external//:defs.bzl", "artifact") +load("//java:defs.bzl", "java_export") +load("//java:version.bzl", "SE_VERSION") + +java_export( + name = "redis", + srcs = glob(["*.java"]), + maven_coordinates = "org.seleniumhq.selenium:selenium-session-queue-redis:%s" % SE_VERSION, + pom_template = "//java/src/org/openqa/selenium:template-pom", + tags = [ + "release-artifact", + ], + visibility = [ + "//visibility:public", + ], + exports = [ + "//java/src/org/openqa/selenium/grid", + ], + deps = [ + "//java:auto-service", + "//java/src/org/openqa/selenium/grid", + "//java/src/org/openqa/selenium/json", + "//java/src/org/openqa/selenium/redis", + "//java/src/org/openqa/selenium/remote", + artifact("com.beust:jcommander"), + artifact("com.google.guava:guava"), + artifact("io.lettuce:lettuce-core"), + artifact("org.redisson:redisson"), + ], +)
diff --git a/java/src/org/openqa/selenium/grid/sessionqueue/redis/RedisBackedSessionQueue.java b/java/src/org/openqa/selenium/grid/sessionqueue/redis/RedisBackedSessionQueue.java new file mode 100644 index 0000000..2be0fa8 --- /dev/null +++ b/java/src/org/openqa/selenium/grid/sessionqueue/redis/RedisBackedSessionQueue.java
@@ -0,0 +1,537 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.sessionqueue.redis; + +import static org.openqa.selenium.remote.tracing.Tags.EXCEPTION; + +import com.google.common.collect.ImmutableMap; +import io.lettuce.core.KeyValue; +import java.io.Closeable; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.openqa.selenium.Capabilities; +import org.openqa.selenium.SessionNotCreatedException; +import org.openqa.selenium.grid.config.Config; +import org.openqa.selenium.grid.data.CreateSessionResponse; +import org.openqa.selenium.grid.data.RequestId; +import org.openqa.selenium.grid.data.SessionRequest; +import org.openqa.selenium.grid.data.SessionRequestCapability; +import org.openqa.selenium.grid.jmx.JMXHelper; +import org.openqa.selenium.grid.jmx.ManagedAttribute; +import org.openqa.selenium.grid.jmx.ManagedService; +import org.openqa.selenium.grid.log.LoggingOptions; +import org.openqa.selenium.grid.security.Secret; +import org.openqa.selenium.grid.security.SecretOptions; +import org.openqa.selenium.grid.sessionqueue.NewSessionQueue; +import org.openqa.selenium.grid.sessionqueue.config.NewSessionQueueOptions; +import org.openqa.selenium.internal.Either; +import org.openqa.selenium.internal.Require; +import org.openqa.selenium.json.Json; +import org.openqa.selenium.redis.GridRedisClient; +import org.openqa.selenium.remote.http.HttpResponse; +import org.openqa.selenium.remote.tracing.AttributeKey; +import org.openqa.selenium.remote.tracing.AttributeMap; +import org.openqa.selenium.remote.tracing.Span; +import org.openqa.selenium.remote.tracing.Status; +import org.openqa.selenium.remote.tracing.Tracer; + +/** + * A Redis-backed implementation of {@link NewSessionQueue} that stores session requests in Redis + * for distributed session queue management across multiple Grid instances. + */ +@ManagedService( + objectName = "org.seleniumhq.grid:type=SessionQueue,name=RedisBackedSessionQueue", + description = "Redis backed session queue") +public class RedisBackedSessionQueue extends NewSessionQueue implements Closeable { + + private static final Logger LOG = Logger.getLogger(RedisBackedSessionQueue.class.getName()); + private static final Json JSON = new Json(); + private static final String QUEUE_KEY = "session:queue"; + private static final String REQUEST_KEY_PREFIX = "session:request:"; + private static final String ENQUEUE_TIME_KEY_PREFIX = "session:enqueue_time:"; + + // Redis operation attribute keys + private static final String REDIS_OPERATION = "redis.operation"; + private static final String REDIS_KEY = "redis.key"; + private static final String REDIS_VALUE = "redis.value"; + private static final String REDIS_URI = "redis.uri"; + + private final GridRedisClient connection; + private final URI redisUri; + + public RedisBackedSessionQueue(Tracer tracer, Secret registrationSecret, URI redisUri) { + super(tracer, registrationSecret); + this.redisUri = Require.nonNull("Redis URI", redisUri); + this.connection = createRedisClient(redisUri); + + new JMXHelper().register(this); + } + + /** + * Protected constructor for testing that allows dependency injection. + * + * @param tracer the tracer for observability + * @param registrationSecret the registration secret + * @param redisUri the Redis URI + * @param redisClient the Redis client to use (for testing) + */ + protected RedisBackedSessionQueue( + Tracer tracer, Secret registrationSecret, URI redisUri, GridRedisClient redisClient) { + super(tracer, registrationSecret); + this.redisUri = Require.nonNull("Redis URI", redisUri); + this.connection = redisClient; + + new JMXHelper().register(this); + } + + /** + * Protected constructor for testing that allows dependency injection and JMX registration + * control. + * + * @param tracer the tracer for observability + * @param registrationSecret the registration secret + * @param redisUri the Redis URI + * @param redisClient the Redis client to use (for testing) + * @param skipJmxRegistration whether to skip JMX registration (for testing) + */ + protected RedisBackedSessionQueue( + Tracer tracer, + Secret registrationSecret, + URI redisUri, + GridRedisClient redisClient, + boolean skipJmxRegistration) { + super(tracer, registrationSecret); + this.redisUri = Require.nonNull("Redis URI", redisUri); + this.connection = redisClient; + + if (!skipJmxRegistration) { + new JMXHelper().register(this); + } + } + + /** + * Creates a new GridRedisClient. This method can be overridden in tests to provide a mock Redis + * client. + * + * @param redisUri the Redis URI + * @return the GridRedisClient instance + */ + protected GridRedisClient createRedisClient(URI redisUri) { + return new GridRedisClient(redisUri); + } + + /** + * Creates a new RedisBackedSessionQueue from configuration. This is the factory method used by + * Selenium Grid's configuration system. + * + * @param config the configuration object + * @return a new RedisBackedSessionQueue instance + */ + public static NewSessionQueue create(Config config) { + Tracer tracer = new LoggingOptions(config).getTracer(); + Secret secret = new SecretOptions(config).getRegistrationSecret(); + NewSessionQueueOptions queueOptions = new NewSessionQueueOptions(config); + + // For RedisBackedSessionQueue, always construct a Redis URI from hostname/port + // This ensures we get redis:// scheme instead of http:// scheme + URI redisUri = queueOptions.getRedisUri(); + + return new RedisBackedSessionQueue(tracer, secret, redisUri); + } + + @Override + public boolean isReady() { + return getRedisClient().isOpen(); + } + + /** + * Gets the Redis client connection. This method can be overridden in tests to provide a mock + * Redis client. + * + * @return the GridRedisClient instance + */ + protected GridRedisClient getRedisClient() { + return connection; + } + + @Override + public boolean peekEmpty() { + try (Span span = tracer.getCurrentContext().createSpan("LLEN " + QUEUE_KEY)) { + AttributeMap attributeMap = tracer.createAttributeMap(); + setCommonSpanAttributes(span); + setCommonEventAttributes(attributeMap); + + span.setAttribute(REDIS_OPERATION, "LLEN"); + span.setAttribute(REDIS_KEY, QUEUE_KEY); + attributeMap.put(REDIS_OPERATION, "LLEN"); + attributeMap.put(REDIS_KEY, QUEUE_KEY); + + try { + Long queueLength = getRedisClient().getConnection().sync().llen(QUEUE_KEY); + boolean isEmpty = queueLength == 0; + + attributeMap.put("queue.empty", isEmpty); + attributeMap.put("queue.length", queueLength); + span.addEvent("Checked queue emptiness", attributeMap); + + return isEmpty; + } catch (Exception e) { + span.setAttribute("error", true); + span.setStatus(Status.CANCELLED); + EXCEPTION.accept(attributeMap, e); + attributeMap.put( + AttributeKey.EXCEPTION_MESSAGE.getKey(), + "Unable to check if queue is empty: " + e.getMessage()); + span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributeMap); + LOG.log(Level.SEVERE, "Failed to check if queue is empty", e); + return false; + } + } + } + + @Override + public HttpResponse addToQueue(SessionRequest request) { + Require.nonNull("SessionRequest to add", request); + + try (Span span = + tracer.getCurrentContext().createSpan("LPUSH " + QUEUE_KEY + " and MSET request data")) { + AttributeMap attributeMap = tracer.createAttributeMap(); + setCommonSpanAttributes(span); + setCommonEventAttributes(attributeMap); + + String requestId = request.getRequestId().toString(); + String requestKey = REQUEST_KEY_PREFIX + requestId; + String enqueueTimeKey = ENQUEUE_TIME_KEY_PREFIX + requestId; + String requestJson = JSON.toJson(request); + String enqueueTime = request.getEnqueued().toString(); + + span.setAttribute(REDIS_OPERATION, "LPUSH+MSET"); + span.setAttribute(REDIS_KEY, QUEUE_KEY); + span.setAttribute(REDIS_VALUE, requestId); + attributeMap.put(REDIS_OPERATION, "LPUSH+MSET"); + attributeMap.put(REDIS_KEY, QUEUE_KEY); + attributeMap.put(REDIS_VALUE, requestId); + attributeMap.put("request.id", requestId); + + try { + // Store request data and enqueue time + getRedisClient() + .mset( + ImmutableMap.of( + requestKey, requestJson, + enqueueTimeKey, enqueueTime)); + + // Add request ID to the queue + Long queueLength = getRedisClient().getConnection().sync().lpush(QUEUE_KEY, requestId); + + attributeMap.put("queue.length", queueLength); + span.addEvent("Added request to queue", attributeMap); + + HttpResponse resp = new HttpResponse(); + resp.setStatus(200); + return resp; + } catch (Exception e) { + span.setAttribute("error", true); + span.setStatus(Status.CANCELLED); + EXCEPTION.accept(attributeMap, e); + attributeMap.put( + AttributeKey.EXCEPTION_MESSAGE.getKey(), + "Unable to add session request to Redis: " + e.getMessage()); + span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributeMap); + LOG.log(Level.SEVERE, "Failed to add session request to queue", e); + + HttpResponse resp = new HttpResponse(); + resp.setStatus(500); + return resp; + } + } + } + + @Override + public boolean retryAddToQueue(SessionRequest request) { + HttpResponse response = addToQueue(request); + return response.getStatus() == 200; + } + + @Override + public Optional<SessionRequest> remove(RequestId requestId) { + Require.nonNull("RequestId to remove", requestId); + + try (Span span = tracer.getCurrentContext().createSpan("LREM and GET request data")) { + AttributeMap attributeMap = tracer.createAttributeMap(); + setCommonSpanAttributes(span); + setCommonEventAttributes(attributeMap); + + String requestIdStr = requestId.toString(); + String requestKey = REQUEST_KEY_PREFIX + requestIdStr; + String enqueueTimeKey = ENQUEUE_TIME_KEY_PREFIX + requestIdStr; + + span.setAttribute(REDIS_OPERATION, "GET+LREM+DEL"); + span.setAttribute(REDIS_KEY, requestKey); + attributeMap.put(REDIS_OPERATION, "GET+LREM+DEL"); + attributeMap.put(REDIS_KEY, requestKey); + attributeMap.put("request.id", requestIdStr); + + try { + // Get the request data first + String requestJson = getRedisClient().get(requestKey); + + if (requestJson != null) { + // Remove from queue and delete associated data + Long removedCount = + getRedisClient().getConnection().sync().lrem(QUEUE_KEY, 1, requestIdStr); + getRedisClient().del(requestKey, enqueueTimeKey); + + attributeMap.put("removed.count", removedCount); + span.addEvent("Removed request from queue", attributeMap); + + return Optional.of(JSON.toType(requestJson, SessionRequest.class)); + } else { + attributeMap.put("request.found", false); + span.addEvent("Session request not found in queue", attributeMap); + } + } catch (Exception e) { + span.setAttribute("error", true); + span.setStatus(Status.CANCELLED); + EXCEPTION.accept(attributeMap, e); + attributeMap.put( + AttributeKey.EXCEPTION_MESSAGE.getKey(), + "Unable to remove session request from queue: " + e.getMessage()); + span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributeMap); + LOG.log(Level.SEVERE, "Failed to remove session request from queue", e); + } + } + return Optional.empty(); + } + + @Override + public List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes) { + try (Span span = tracer.getCurrentContext().createSpan("RPOP and GET next available request")) { + AttributeMap attributeMap = tracer.createAttributeMap(); + setCommonSpanAttributes(span); + setCommonEventAttributes(attributeMap); + + span.setAttribute(REDIS_OPERATION, "RPOP+GET"); + span.setAttribute(REDIS_KEY, QUEUE_KEY); + attributeMap.put(REDIS_OPERATION, "RPOP+GET"); + attributeMap.put(REDIS_KEY, QUEUE_KEY); + + try { + // Get the next request ID from the queue (FIFO - right pop from left push) + String requestIdStr = getRedisClient().getConnection().sync().rpop(QUEUE_KEY); + + if (requestIdStr != null) { + String requestKey = REQUEST_KEY_PREFIX + requestIdStr; + String requestJson = getRedisClient().get(requestKey); + + if (requestJson != null) { + SessionRequest request = JSON.toType(requestJson, SessionRequest.class); + attributeMap.put("requests.found", 1); + attributeMap.put("request.id", requestIdStr); + span.addEvent("Retrieved next available session request", attributeMap); + return List.of(request); + } else { + // Request data is missing, log warning but continue + LOG.log(Level.WARNING, "Request data missing for ID: " + requestIdStr); + } + } + + attributeMap.put("requests.found", 0); + span.addEvent("No session requests available", attributeMap); + } catch (Exception e) { + span.setAttribute("error", true); + span.setStatus(Status.CANCELLED); + EXCEPTION.accept(attributeMap, e); + attributeMap.put( + AttributeKey.EXCEPTION_MESSAGE.getKey(), + "Unable to get next available session request: " + e.getMessage()); + span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributeMap); + LOG.log(Level.SEVERE, "Failed to get next available session request", e); + } + } + return List.of(); + } + + @Override + public boolean complete( + RequestId reqId, Either<SessionNotCreatedException, CreateSessionResponse> result) { + // For Redis implementation, we just need to remove the request from storage + // The request was already removed from the queue in getNextAvailable() + String requestIdStr = reqId.toString(); + String requestKey = REQUEST_KEY_PREFIX + requestIdStr; + String enqueueTimeKey = ENQUEUE_TIME_KEY_PREFIX + requestIdStr; + + try { + getRedisClient().del(requestKey, enqueueTimeKey); + return true; + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to clean up completed request: " + requestIdStr, e); + return false; + } + } + + @Override + public int clearQueue() { + try (Span span = + tracer.getCurrentContext().createSpan("Clear all session requests from queue")) { + AttributeMap attributeMap = tracer.createAttributeMap(); + setCommonSpanAttributes(span); + setCommonEventAttributes(attributeMap); + + span.setAttribute(REDIS_OPERATION, "LRANGE+DEL"); + span.setAttribute(REDIS_KEY, QUEUE_KEY); + attributeMap.put(REDIS_OPERATION, "LRANGE+DEL"); + attributeMap.put(REDIS_KEY, QUEUE_KEY); + + try { + // Get all request IDs from the queue + List<String> requestIds = getRedisClient().getConnection().sync().lrange(QUEUE_KEY, 0, -1); + int requestCount = requestIds.size(); + + if (requestCount > 0) { + // Delete all request data + List<String> keysToDelete = new ArrayList<>(); + for (String requestId : requestIds) { + keysToDelete.add(REQUEST_KEY_PREFIX + requestId); + keysToDelete.add(ENQUEUE_TIME_KEY_PREFIX + requestId); + } + + // Delete the queue and all request data + keysToDelete.add(QUEUE_KEY); + getRedisClient().del(keysToDelete.toArray(new String[0])); + } + + attributeMap.put("requests.cleared", requestCount); + span.addEvent("Cleared all session requests from queue", attributeMap); + return requestCount; + } catch (Exception e) { + span.setAttribute("error", true); + span.setStatus(Status.CANCELLED); + EXCEPTION.accept(attributeMap, e); + attributeMap.put( + AttributeKey.EXCEPTION_MESSAGE.getKey(), + "Unable to clear session queue: " + e.getMessage()); + span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributeMap); + LOG.log(Level.SEVERE, "Failed to clear session queue", e); + return 0; + } + } + } + + @Override + public List<SessionRequestCapability> getQueueContents() { + try (Span span = tracer.getCurrentContext().createSpan("Get all session requests from queue")) { + AttributeMap attributeMap = tracer.createAttributeMap(); + setCommonSpanAttributes(span); + setCommonEventAttributes(attributeMap); + + span.setAttribute(REDIS_OPERATION, "LRANGE+MGET"); + span.setAttribute(REDIS_KEY, QUEUE_KEY); + attributeMap.put(REDIS_OPERATION, "LRANGE+MGET"); + attributeMap.put(REDIS_KEY, QUEUE_KEY); + + try { + // Get all request IDs from the queue (in order) + List<String> requestIds = getRedisClient().getConnection().sync().lrange(QUEUE_KEY, 0, -1); + List<SessionRequestCapability> contents = new ArrayList<>(); + + if (!requestIds.isEmpty()) { + // Get all request data in batch + String[] requestKeys = + requestIds.stream().map(id -> REQUEST_KEY_PREFIX + id).toArray(String[]::new); + + List<KeyValue<String, String>> requestData = getRedisClient().mget(requestKeys); + + for (int i = 0; i < requestIds.size(); i++) { + String requestIdStr = requestIds.get(i); + KeyValue<String, String> keyValue = requestData.get(i); + + if (keyValue != null && keyValue.hasValue()) { + try { + RequestId requestId = new RequestId(UUID.fromString(requestIdStr)); + SessionRequest request = JSON.toType(keyValue.getValue(), SessionRequest.class); + + SessionRequestCapability capability = + new SessionRequestCapability(requestId, request.getDesiredCapabilities()); + contents.add(capability); + } catch (Exception e) { + LOG.log( + Level.WARNING, + "Failed to parse session request from queue: " + requestIdStr, + e); + } + } + } + } + + attributeMap.put("queue.contents.size", contents.size()); + span.addEvent("Retrieved queue contents", attributeMap); + return contents; + } catch (Exception e) { + span.setAttribute("error", true); + span.setStatus(Status.CANCELLED); + EXCEPTION.accept(attributeMap, e); + attributeMap.put( + AttributeKey.EXCEPTION_MESSAGE.getKey(), + "Unable to get queue contents: " + e.getMessage()); + span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributeMap); + LOG.log(Level.SEVERE, "Failed to get queue contents", e); + } + } + return List.of(); + } + + @Override + public void close() { + try { + getRedisClient().close(); + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to close Redis connection for SessionQueue", e); + } + } + + @ManagedAttribute(name = "RedisUri") + public String getRedisUri() { + return redisUri.toString(); + } + + @ManagedAttribute(name = "QueueSize") + public long getQueueSize() { + try { + return getRedisClient().getConnection().sync().llen(QUEUE_KEY); + } catch (Exception e) { + LOG.log(Level.WARNING, "Failed to get queue size", e); + return -1; + } + } + + private void setCommonSpanAttributes(Span span) { + span.setAttribute("span.kind", Span.Kind.CLIENT.toString()); + span.setAttribute(REDIS_URI, redisUri.toString()); + } + + private void setCommonEventAttributes(AttributeMap attributeMap) { + attributeMap.put(REDIS_URI, redisUri.toString()); + } +}
diff --git a/java/test/org/openqa/selenium/grid/sessionqueue/redis/BUILD.bazel b/java/test/org/openqa/selenium/grid/sessionqueue/redis/BUILD.bazel new file mode 100644 index 0000000..10adbef --- /dev/null +++ b/java/test/org/openqa/selenium/grid/sessionqueue/redis/BUILD.bazel
@@ -0,0 +1,22 @@ +load("@rules_jvm_external//:defs.bzl", "artifact") +load("//java:defs.bzl", "JUNIT5_DEPS", "java_test_suite") + +java_test_suite( + name = "MediumTests", + size = "medium", + srcs = glob(["*Test.java"]), + deps = [ + "//java/src/org/openqa/selenium/events/local", + "//java/src/org/openqa/selenium/grid/sessionqueue/redis", + "//java/src/org/openqa/selenium/json", + "//java/src/org/openqa/selenium/redis", + "//java/src/org/openqa/selenium/remote", + "//java/test/org/openqa/selenium/remote/tracing:tracing-support", + "//java/test/org/openqa/selenium/testing:test-base", + artifact("io.lettuce:lettuce-core"), + artifact("io.opentelemetry:opentelemetry-api"), + artifact("org.junit.jupiter:junit-jupiter-api"), + artifact("org.assertj:assertj-core"), + artifact("org.mockito:mockito-core"), + ] + JUNIT5_DEPS, +)
diff --git a/java/test/org/openqa/selenium/grid/sessionqueue/redis/RedisBackedSessionQueueTest.java b/java/test/org/openqa/selenium/grid/sessionqueue/redis/RedisBackedSessionQueueTest.java new file mode 100644 index 0000000..97abe20 --- /dev/null +++ b/java/test/org/openqa/selenium/grid/sessionqueue/redis/RedisBackedSessionQueueTest.java
@@ -0,0 +1,494 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.sessionqueue.redis; + +import static org.assertj.core.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; +import static org.openqa.selenium.remote.http.HttpMethod.POST; + +import io.lettuce.core.KeyValue; +import io.lettuce.core.RedisCommandExecutionException; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.sync.RedisCommands; +import java.net.URI; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.openqa.selenium.Capabilities; +import org.openqa.selenium.SessionNotCreatedException; +import org.openqa.selenium.grid.data.CreateSessionResponse; +import org.openqa.selenium.grid.data.RequestId; +import org.openqa.selenium.grid.data.SessionRequest; +import org.openqa.selenium.grid.data.SessionRequestCapability; +import org.openqa.selenium.grid.jmx.ManagedService; +import org.openqa.selenium.grid.security.Secret; +import org.openqa.selenium.internal.Either; +import org.openqa.selenium.json.Json; +import org.openqa.selenium.redis.GridRedisClient; +import org.openqa.selenium.remote.http.Contents; +import org.openqa.selenium.remote.http.HttpRequest; +import org.openqa.selenium.remote.http.HttpResponse; +import org.openqa.selenium.remote.tracing.DefaultTestTracer; +import org.openqa.selenium.remote.tracing.Tracer; + +class RedisBackedSessionQueueTest { + + private static final Tracer tracer = DefaultTestTracer.createTracer(); + private static final Secret secret = new Secret("test-secret"); + private static final URI redisUri = URI.create("redis://localhost:6379"); + private static final Json JSON = new Json(); + + @Mock private GridRedisClient mockRedisClient; + @Mock private StatefulRedisConnection<String, String> mockConnection; + @Mock private RedisCommands<String, String> mockCommands; + + private TestableRedisBackedSessionQueue queue; + + @BeforeEach + void setUp() { + MockitoAnnotations.openMocks(this); + when(mockRedisClient.getConnection()).thenReturn(mockConnection); + when(mockConnection.sync()).thenReturn(mockCommands); + when(mockRedisClient.isOpen()).thenReturn(true); + + // Create queue with mocked Redis client and skip JMX registration + queue = new TestableRedisBackedSessionQueue(tracer, secret, redisUri, mockRedisClient, true); + } + + // Test-specific subclass that accepts a mock Redis client + @ManagedService + private static class TestableRedisBackedSessionQueue extends RedisBackedSessionQueue { + public TestableRedisBackedSessionQueue( + Tracer tracer, Secret registrationSecret, URI redisUri, GridRedisClient redisClient) { + super(tracer, registrationSecret, redisUri, redisClient); + } + + public TestableRedisBackedSessionQueue( + Tracer tracer, + Secret registrationSecret, + URI redisUri, + GridRedisClient redisClient, + boolean skipJmxRegistration) { + super(tracer, registrationSecret, redisUri, redisClient, skipJmxRegistration); + } + } + + @Test + void shouldThrowIllegalArgumentExceptionIfRedisUriIsNull() { + assertThatThrownBy(() -> new RedisBackedSessionQueue(tracer, secret, null)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void shouldThrowIllegalArgumentExceptionIfTracerIsNull() { + assertThatThrownBy(() -> new RedisBackedSessionQueue(null, secret, redisUri)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void shouldThrowIllegalArgumentExceptionIfSecretIsNull() { + assertThatThrownBy(() -> new RedisBackedSessionQueue(tracer, null, redisUri)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void isReadyShouldReturnTrueWhenRedisConnectionIsOpen() { + when(mockRedisClient.isOpen()).thenReturn(true); + assertThat(queue.isReady()).isTrue(); + } + + @Test + void isReadyShouldReturnFalseWhenRedisConnectionIsClosed() { + when(mockRedisClient.isOpen()).thenReturn(false); + assertThat(queue.isReady()).isFalse(); + } + + @Test + void peekEmptyShouldReturnTrueWhenQueueIsEmpty() { + when(mockCommands.llen("session:queue")).thenReturn(0L); + assertThat(queue.peekEmpty()).isTrue(); + } + + @Test + void peekEmptyShouldReturnFalseWhenQueueHasRequests() { + when(mockCommands.llen("session:queue")).thenReturn(2L); + assertThat(queue.peekEmpty()).isFalse(); + } + + @Test + void peekEmptyShouldReturnFalseOnRedisException() { + when(mockCommands.llen("session:queue")) + .thenThrow(new RedisCommandExecutionException("Redis error")); + assertThat(queue.peekEmpty()).isFalse(); + } + + @Test + void canAddSessionRequestToQueue() { + RequestId requestId = new RequestId(UUID.randomUUID()); + SessionRequest request = createSessionRequest(requestId); + + when(mockCommands.lpush("session:queue", requestId.toString())).thenReturn(1L); + + HttpResponse response = queue.addToQueue(request); + + assertThat(response.getStatus()).isEqualTo(200); + + // Verify Redis operations + verify(mockRedisClient) + .mset( + argThat( + map -> + map.containsKey("session:request:" + requestId.toString()) + && map.containsKey("session:enqueue_time:" + requestId.toString()))); + verify(mockCommands).lpush("session:queue", requestId.toString()); + } + + @Test + void addToQueueShouldReturn500OnRedisException() { + RequestId requestId = new RequestId(UUID.randomUUID()); + SessionRequest request = createSessionRequest(requestId); + + doThrow(new RedisCommandExecutionException("Redis error")).when(mockRedisClient).mset(any()); + + HttpResponse response = queue.addToQueue(request); + + assertThat(response.getStatus()).isEqualTo(500); + } + + @Test + void retryAddToQueueShouldReturnTrueOnSuccess() { + RequestId requestId = new RequestId(UUID.randomUUID()); + SessionRequest request = createSessionRequest(requestId); + + when(mockCommands.lpush("session:queue", requestId.toString())).thenReturn(1L); + + boolean result = queue.retryAddToQueue(request); + + assertThat(result).isTrue(); + } + + @Test + void retryAddToQueueShouldReturnFalseOnFailure() { + RequestId requestId = new RequestId(UUID.randomUUID()); + SessionRequest request = createSessionRequest(requestId); + + doThrow(new RedisCommandExecutionException("Redis error")).when(mockRedisClient).mset(any()); + + boolean result = queue.retryAddToQueue(request); + + assertThat(result).isFalse(); + } + + @Test + void canRemoveSessionRequestFromQueue() { + RequestId requestId = new RequestId(UUID.randomUUID()); + SessionRequest originalRequest = createSessionRequest(requestId); + String requestJson = JSON.toJson(originalRequest); + + when(mockRedisClient.get("session:request:" + requestId.toString())).thenReturn(requestJson); + when(mockCommands.lrem("session:queue", 1, requestId.toString())).thenReturn(1L); + + Optional<SessionRequest> removed = queue.remove(requestId); + + assertThat(removed).isPresent(); + assertThat(removed.get().getRequestId()).isEqualTo(requestId); + + // Verify Redis operations + verify(mockRedisClient).get("session:request:" + requestId.toString()); + verify(mockCommands).lrem("session:queue", 1, requestId.toString()); + verify(mockRedisClient) + .del( + "session:request:" + requestId.toString(), + "session:enqueue_time:" + requestId.toString()); + } + + @Test + void removeShouldReturnEmptyWhenRequestNotFound() { + RequestId requestId = new RequestId(UUID.randomUUID()); + + when(mockRedisClient.get("session:request:" + requestId.toString())).thenReturn(null); + + Optional<SessionRequest> removed = queue.remove(requestId); + + assertThat(removed).isEmpty(); + } + + @Test + void removeShouldReturnEmptyOnRedisException() { + RequestId requestId = new RequestId(UUID.randomUUID()); + + when(mockRedisClient.get("session:request:" + requestId.toString())) + .thenThrow(new RedisCommandExecutionException("Redis error")); + + Optional<SessionRequest> removed = queue.remove(requestId); + + assertThat(removed).isEmpty(); + } + + @Test + void getNextAvailableShouldReturnOldestRequest() { + RequestId requestId = new RequestId(UUID.randomUUID()); + SessionRequest originalRequest = createSessionRequest(requestId); + String requestJson = JSON.toJson(originalRequest); + + when(mockCommands.rpop("session:queue")).thenReturn(requestId.toString()); + when(mockRedisClient.get("session:request:" + requestId.toString())).thenReturn(requestJson); + + List<SessionRequest> next = queue.getNextAvailable(Map.of()); + + assertThat(next).hasSize(1); + assertThat(next.get(0).getRequestId()).isEqualTo(requestId); + + // Verify Redis operations + verify(mockCommands).rpop("session:queue"); + verify(mockRedisClient).get("session:request:" + requestId.toString()); + } + + @Test + void getNextAvailableShouldReturnEmptyWhenQueueIsEmpty() { + when(mockCommands.rpop("session:queue")).thenReturn(null); + + List<SessionRequest> next = queue.getNextAvailable(Map.of()); + + assertThat(next).isEmpty(); + } + + @Test + void getNextAvailableShouldReturnEmptyWhenRequestDataIsMissing() { + RequestId requestId = new RequestId(UUID.randomUUID()); + + when(mockCommands.rpop("session:queue")).thenReturn(requestId.toString()); + when(mockRedisClient.get("session:request:" + requestId.toString())).thenReturn(null); + + List<SessionRequest> next = queue.getNextAvailable(Map.of()); + + assertThat(next).isEmpty(); + } + + @Test + void getNextAvailableShouldReturnEmptyOnRedisException() { + when(mockCommands.rpop("session:queue")) + .thenThrow(new RedisCommandExecutionException("Redis error")); + + List<SessionRequest> next = queue.getNextAvailable(Map.of()); + + assertThat(next).isEmpty(); + } + + @Test + void completeShouldReturnTrueAndCleanupRequestData() { + RequestId requestId = new RequestId(UUID.randomUUID()); + CreateSessionResponse response = mock(CreateSessionResponse.class); + Either<SessionNotCreatedException, CreateSessionResponse> result = Either.right(response); + + boolean completed = queue.complete(requestId, result); + + assertThat(completed).isTrue(); + + // Verify cleanup operations + verify(mockRedisClient) + .del( + "session:request:" + requestId.toString(), + "session:enqueue_time:" + requestId.toString()); + } + + @Test + void completeShouldReturnFalseOnRedisException() { + RequestId requestId = new RequestId(UUID.randomUUID()); + CreateSessionResponse response = mock(CreateSessionResponse.class); + Either<SessionNotCreatedException, CreateSessionResponse> result = Either.right(response); + + doThrow(new RedisCommandExecutionException("Redis error")) + .when(mockRedisClient) + .del(anyString(), anyString()); + + boolean completed = queue.complete(requestId, result); + + assertThat(completed).isFalse(); + } + + @Test + void clearQueueShouldRemoveAllRequests() { + RequestId requestId1 = new RequestId(UUID.randomUUID()); + RequestId requestId2 = new RequestId(UUID.randomUUID()); + + when(mockCommands.lrange("session:queue", 0, -1)) + .thenReturn(List.of(requestId1.toString(), requestId2.toString())); + + int cleared = queue.clearQueue(); + + assertThat(cleared).isEqualTo(2); + + // Verify Redis operations + verify(mockCommands).lrange("session:queue", 0, -1); + verify(mockRedisClient) + .del( + "session:request:" + requestId1.toString(), + "session:enqueue_time:" + requestId1.toString(), + "session:request:" + requestId2.toString(), + "session:enqueue_time:" + requestId2.toString(), + "session:queue"); + } + + @Test + void clearQueueShouldReturn0WhenQueueIsEmpty() { + when(mockCommands.lrange("session:queue", 0, -1)).thenReturn(List.of()); + + int cleared = queue.clearQueue(); + + assertThat(cleared).isEqualTo(0); + } + + @Test + void clearQueueShouldReturn0OnRedisException() { + when(mockCommands.lrange("session:queue", 0, -1)) + .thenThrow(new RedisCommandExecutionException("Redis error")); + + int cleared = queue.clearQueue(); + + assertThat(cleared).isEqualTo(0); + } + + @Test + void getQueueContentsShouldReturnAllRequests() { + RequestId requestId1 = new RequestId(UUID.randomUUID()); + RequestId requestId2 = new RequestId(UUID.randomUUID()); + SessionRequest request1 = createSessionRequest(requestId1); + SessionRequest request2 = createSessionRequest(requestId2); + + when(mockCommands.lrange("session:queue", 0, -1)) + .thenReturn(List.of(requestId1.toString(), requestId2.toString())); + + when(mockRedisClient.mget( + "session:request:" + requestId1.toString(), "session:request:" + requestId2.toString())) + .thenReturn( + List.of( + KeyValue.just("session:request:" + requestId1.toString(), JSON.toJson(request1)), + KeyValue.just("session:request:" + requestId2.toString(), JSON.toJson(request2)))); + + List<SessionRequestCapability> contents = queue.getQueueContents(); + + assertThat(contents).hasSize(2); + assertThat(contents.get(0).getRequestId()).isEqualTo(requestId1); + assertThat(contents.get(1).getRequestId()).isEqualTo(requestId2); + } + + @Test + void getQueueContentsShouldReturnEmptyWhenQueueIsEmpty() { + when(mockCommands.lrange("session:queue", 0, -1)).thenReturn(List.of()); + + List<SessionRequestCapability> contents = queue.getQueueContents(); + + assertThat(contents).isEmpty(); + } + + @Test + void getQueueContentsShouldReturnEmptyOnRedisException() { + when(mockCommands.lrange("session:queue", 0, -1)) + .thenThrow(new RedisCommandExecutionException("Redis error")); + + List<SessionRequestCapability> contents = queue.getQueueContents(); + + assertThat(contents).isEmpty(); + } + + @Test + void getQueueContentsShouldHandleMissingRequestData() { + RequestId requestId1 = new RequestId(UUID.randomUUID()); + RequestId requestId2 = new RequestId(UUID.randomUUID()); + SessionRequest request1 = createSessionRequest(requestId1); + + when(mockCommands.lrange("session:queue", 0, -1)) + .thenReturn(List.of(requestId1.toString(), requestId2.toString())); + + when(mockRedisClient.mget( + "session:request:" + requestId1.toString(), "session:request:" + requestId2.toString())) + .thenReturn( + List.of( + KeyValue.just("session:request:" + requestId1.toString(), JSON.toJson(request1)), + KeyValue.empty("session:request:" + requestId2.toString()) // Missing data + )); + + List<SessionRequestCapability> contents = queue.getQueueContents(); + + assertThat(contents).hasSize(1); + assertThat(contents.get(0).getRequestId()).isEqualTo(requestId1); + } + + @Test + void closeShouldCloseRedisConnection() { + queue.close(); + + verify(mockRedisClient).close(); + } + + @Test + void closeShouldHandleRedisException() { + doThrow(new RuntimeException("Close error")).when(mockRedisClient).close(); + + // Should not throw exception + assertThatCode(() -> queue.close()).doesNotThrowAnyException(); + } + + @Test + void getQueueSizeShouldReturnCorrectSize() { + when(mockCommands.llen("session:queue")).thenReturn(5L); + + long size = queue.getQueueSize(); + + assertThat(size).isEqualTo(5L); + } + + @Test + void getQueueSizeShouldReturnNegativeOneOnException() { + when(mockCommands.llen("session:queue")) + .thenThrow(new RedisCommandExecutionException("Redis error")); + + long size = queue.getQueueSize(); + + assertThat(size).isEqualTo(-1L); + } + + @Test + void getRedisUriShouldReturnConfiguredUri() { + String uri = queue.getRedisUri(); + + assertThat(uri).isEqualTo(redisUri.toString()); + } + + private SessionRequest createSessionRequest(RequestId requestId) { + HttpRequest httpRequest = new HttpRequest(POST, "/session"); + httpRequest.setContent(Contents.utf8String("{\"capabilities\":{\"browserName\":\"chrome\"}}")); + return new SessionRequest(requestId, httpRequest, Instant.now()); + } + + private SessionRequest createSessionRequestWithCapabilities( + RequestId requestId, Capabilities capabilities) { + HttpRequest httpRequest = new HttpRequest(POST, "/session"); + httpRequest.setContent( + Contents.utf8String("{\"capabilities\":" + JSON.toJson(capabilities) + "}")); + return new SessionRequest(requestId, httpRequest, Instant.now()); + } +}