[py] Use generated Bidi files instead of hand curated ones (#17266) --------- Co-authored-by: Corey Goldberg <[email protected]> NOKEYCHECK=True GitOrigin-RevId: a75eb73e6118e63c200c4d74b2df92c816580668
diff --git a/BUILD.bazel b/BUILD.bazel index 1863245..4fcfddb 100644 --- a/BUILD.bazel +++ b/BUILD.bazel
@@ -262,8 +262,10 @@ # BiDi protocol support py_library( name = "bidi", - srcs = glob(["selenium/webdriver/common/bidi/**/*.py"]), - data = [":mutation-listener"], + srcs = [":create-bidi-src"], + data = [ + ":mutation-listener", + ], imports = ["."], visibility = ["//visibility:public"], deps = [ @@ -617,7 +619,7 @@ browser_versions = BROWSER_VERSIONS, ) -# Pilot BiDi code generation from CDDL specification +# Generate BiDi source files from CDDL specification generate_bidi( name = "create-bidi-src", cddl_file = "//common/bidi/spec:all.cddl",
diff --git a/generate_bidi.py b/generate_bidi.py index 5b301d3..8d5b75a 100755 --- a/generate_bidi.py +++ b/generate_bidi.py
@@ -1699,9 +1699,12 @@ logger.info(f"Parsed {len(modules)} modules") - # Clean up existing generated files + # Clean up existing generated files. + # Keep static helper modules that are staged by Bazel (for example cdp.py) + # as part of create-bidi-src.extra_srcs. + preserved_python_files = {"py.typed", "cdp.py"} for file_path in output_path.glob("*.py"): - if file_path.name != "py.typed" and not file_path.name.startswith("_"): + if file_path.name not in preserved_python_files and not file_path.name.startswith("_"): file_path.unlink() logger.debug(f"Removed: {file_path}")
diff --git a/private/bidi_enhancements_manifest.py b/private/bidi_enhancements_manifest.py index 8cec1f9..f8f033b 100644 --- a/private/bidi_enhancements_manifest.py +++ b/private/bidi_enhancements_manifest.py
@@ -1087,24 +1087,37 @@ self._conn.execute(_cb("network.removeIntercept", {"intercept": intercept_id})) if intercept_id in self.intercepts: self.intercepts.remove(intercept_id)''', + ''' def _canonical_request_handler_event(self, event): + """Map public request-handler aliases to supported event keys.""" + event_aliases = { + "auth_required": "auth_required", + "before_request": "before_request", + "before_request_sent": "before_request", + } + canonical_event = event_aliases.get(event) + if canonical_event is None: + available_events = ", ".join(sorted(event_aliases)) + raise ValueError( + f"Unsupported request handler event '{event}'. Available events: {available_events}" + ) + return canonical_event''', ''' def add_request_handler(self, event, callback, url_patterns=None): """Add a handler for network requests at the specified phase. Args: - event: Event name, e.g. ``"before_request"``. + event: Event name, e.g. ``"before_request"`` or ``"before_request_sent"``. callback: Callable receiving a :class:`Request` instance. url_patterns: optional list of URL pattern dicts to filter. Returns: callback_id int for later removal via remove_request_handler. """ + canonical_event = self._canonical_request_handler_event(event) phase_map = { "before_request": "beforeRequestSent", - "before_request_sent": "beforeRequestSent", - "response_started": "responseStarted", "auth_required": "authRequired", } - phase = phase_map.get(event, "beforeRequestSent") + phase = phase_map[canonical_event] intercept_result = self._add_intercept(phases=[phase], url_patterns=url_patterns) intercept_id = intercept_result.get("intercept") if intercept_result else None @@ -1117,7 +1130,7 @@ request = Request(self._conn, raw) callback(request) - callback_id = self.add_event_handler(event, _request_callback) + callback_id = self.add_event_handler(canonical_event, _request_callback) if intercept_id: self._handler_intercepts[callback_id] = intercept_id return callback_id''', @@ -1128,7 +1141,8 @@ event: The event name used when adding the handler. callback_id: The int returned by add_request_handler. """ - self.remove_event_handler(event, callback_id) + canonical_event = self._canonical_request_handler_event(event) + self.remove_event_handler(canonical_event, callback_id) intercept_id = self._handler_intercepts.pop(callback_id, None) if intercept_id: self._remove_intercept(intercept_id)''',
diff --git a/private/cdp.py b/private/cdp.py index d94f0da..86341b3 100644 --- a/private/cdp.py +++ b/private/cdp.py
@@ -451,7 +451,13 @@ try: data = json.loads(message) except json.JSONDecodeError: - raise BrowserError({"code": -32700, "message": "Client received invalid JSON", "data": message}) + raise BrowserError( + { + "code": -32700, + "message": "Client received invalid JSON", + "data": message, + } + ) logger.debug("Received message %r", data) if "sessionId" in data: session_id = devtools.target.SessionID(data["sessionId"])
diff --git a/selenium/webdriver/common/bidi/__init__.py b/selenium/webdriver/common/bidi/__init__.py deleted file mode 100644 index b37319d..0000000 --- a/selenium/webdriver/common/bidi/__init__.py +++ /dev/null
@@ -1,43 +0,0 @@ -# Licensed to the Software Freedom Conservancy (SFC) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The SFC licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -from __future__ import annotations - -from selenium.webdriver.common.bidi.browser import Browser -from selenium.webdriver.common.bidi.browsing_context import BrowsingContext -from selenium.webdriver.common.bidi.emulation import Emulation -from selenium.webdriver.common.bidi.input import Input -from selenium.webdriver.common.bidi.log import Log -from selenium.webdriver.common.bidi.network import Network -from selenium.webdriver.common.bidi.script import Script -from selenium.webdriver.common.bidi.session import Session -from selenium.webdriver.common.bidi.storage import Storage -from selenium.webdriver.common.bidi.webextension import WebExtension - -__all__ = [ - "Browser", - "BrowsingContext", - "Emulation", - "Input", - "Log", - "Network", - "Script", - "Session", - "Storage", - "WebExtension", -]
diff --git a/selenium/webdriver/common/bidi/_event_manager.py b/selenium/webdriver/common/bidi/_event_manager.py deleted file mode 100644 index 1dcc828..0000000 --- a/selenium/webdriver/common/bidi/_event_manager.py +++ /dev/null
@@ -1,180 +0,0 @@ -# Licensed to the Software Freedom Conservancy (SFC) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The SFC licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -"""Shared event management helpers for generated WebDriver BiDi modules. - -``EventConfig``, ``_EventWrapper``, and ``_EventManager`` are emitted -identically into every generated module that exposes events. Rather than -duplicating this logic across those modules, they are defined once here and -copied into generated outputs by Bazel. -""" - -from __future__ import annotations - -import threading -from collections.abc import Callable -from dataclasses import dataclass -from typing import Any - -from selenium.webdriver.common.bidi.session import Session - - -@dataclass -class EventConfig: - """Configuration for a BiDi event.""" - - event_key: str - bidi_event: str - event_class: type - - -class _EventWrapper: - """Wrapper to provide event_class attribute for WebSocketConnection callbacks.""" - - def __init__(self, bidi_event: str, event_class: type): - self.event_class = bidi_event # WebSocket expects the BiDi event name as event_class - self._python_class = event_class # Keep reference to Python dataclass for deserialization - - def from_json(self, params: dict) -> Any: - """Deserialize event params into the wrapped Python dataclass. - - Args: - params: Raw BiDi event params with camelCase keys. - - Returns: - An instance of the dataclass, or the raw dict on failure. - """ - if self._python_class is None or self._python_class is dict: - return params - try: - # Delegate to a classmethod from_json if the class defines one - if hasattr(self._python_class, "from_json") and callable(self._python_class.from_json): - return self._python_class.from_json(params) - import dataclasses as dc - - snake_params = {self._camel_to_snake(k): v for k, v in params.items()} - if dc.is_dataclass(self._python_class): - valid_fields = {f.name for f in dc.fields(self._python_class)} - filtered = {k: v for k, v in snake_params.items() if k in valid_fields} - return self._python_class(**filtered) - return self._python_class(**snake_params) - except Exception: - return params - - @staticmethod - def _camel_to_snake(name: str) -> str: - result = [name[0].lower()] - for char in name[1:]: - if char.isupper(): - result.extend(["_", char.lower()]) - else: - result.append(char) - return "".join(result) - - -class _EventManager: - """Manages event subscriptions and callbacks.""" - - def __init__(self, conn, event_configs: dict[str, EventConfig]): - self.conn = conn - self.event_configs = event_configs - self.subscriptions: dict = {} - self._event_wrappers = {} # Cache of _EventWrapper objects - self._bidi_to_class = {config.bidi_event: config.event_class for config in event_configs.values()} - self._available_events = ", ".join(sorted(event_configs.keys())) - self._subscription_lock = threading.Lock() - - # Create event wrappers for each event - for config in event_configs.values(): - wrapper = _EventWrapper(config.bidi_event, config.event_class) - self._event_wrappers[config.bidi_event] = wrapper - - def validate_event(self, event: str) -> EventConfig: - event_config = self.event_configs.get(event) - if not event_config: - raise ValueError(f"Event '{event}' not found. Available events: {self._available_events}") - return event_config - - def subscribe_to_event(self, bidi_event: str, contexts: list[str] | None = None) -> None: - """Subscribe to a BiDi event if not already subscribed.""" - with self._subscription_lock: - if bidi_event not in self.subscriptions: - session = Session(self.conn) - result = session.subscribe([bidi_event], contexts=contexts) - sub_id = result.get("subscription") if isinstance(result, dict) else None - self.subscriptions[bidi_event] = { - "callbacks": [], - "subscription_id": sub_id, - } - - def unsubscribe_from_event(self, bidi_event: str) -> None: - """Unsubscribe from a BiDi event if no more callbacks exist.""" - with self._subscription_lock: - entry = self.subscriptions.get(bidi_event) - if entry is not None and not entry["callbacks"]: - session = Session(self.conn) - sub_id = entry.get("subscription_id") - if sub_id: - session.unsubscribe(subscriptions=[sub_id]) - else: - session.unsubscribe(events=[bidi_event]) - del self.subscriptions[bidi_event] - - def add_callback_to_tracking(self, bidi_event: str, callback_id: int) -> None: - with self._subscription_lock: - self.subscriptions[bidi_event]["callbacks"].append(callback_id) - - def remove_callback_from_tracking(self, bidi_event: str, callback_id: int) -> None: - with self._subscription_lock: - entry = self.subscriptions.get(bidi_event) - if entry and callback_id in entry["callbacks"]: - entry["callbacks"].remove(callback_id) - - def add_event_handler(self, event: str, callback: Callable, contexts: list[str] | None = None) -> int: - event_config = self.validate_event(event) - # Use the event wrapper for add_callback - event_wrapper = self._event_wrappers.get(event_config.bidi_event) - callback_id = self.conn.add_callback(event_wrapper, callback) - self.subscribe_to_event(event_config.bidi_event, contexts) - self.add_callback_to_tracking(event_config.bidi_event, callback_id) - return callback_id - - def remove_event_handler(self, event: str, callback_id: int) -> None: - event_config = self.validate_event(event) - event_wrapper = self._event_wrappers.get(event_config.bidi_event) - self.conn.remove_callback(event_wrapper, callback_id) - self.remove_callback_from_tracking(event_config.bidi_event, callback_id) - self.unsubscribe_from_event(event_config.bidi_event) - - def clear_event_handlers(self) -> None: - """Clear all event handlers.""" - with self._subscription_lock: - if not self.subscriptions: - return - session = Session(self.conn) - for bidi_event, entry in list(self.subscriptions.items()): - event_wrapper = self._event_wrappers.get(bidi_event) - callbacks = entry["callbacks"] if isinstance(entry, dict) else entry - if event_wrapper: - for callback_id in callbacks: - self.conn.remove_callback(event_wrapper, callback_id) - sub_id = entry.get("subscription_id") if isinstance(entry, dict) else None - if sub_id: - session.unsubscribe(subscriptions=[sub_id]) - else: - session.unsubscribe(events=[bidi_event]) - self.subscriptions.clear()
diff --git a/selenium/webdriver/common/bidi/browser.py b/selenium/webdriver/common/bidi/browser.py deleted file mode 100644 index 440f13e..0000000 --- a/selenium/webdriver/common/bidi/browser.py +++ /dev/null
@@ -1,363 +0,0 @@ -# Licensed to the Software Freedom Conservancy (SFC) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The SFC licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -from __future__ import annotations - -from dataclasses import dataclass, field -from typing import Any - -from selenium.webdriver.common.bidi.common import command_builder - - -def transform_download_params( - allowed: bool | None, - destination_folder: str | None, -) -> dict[str, Any] | None: - """Transform download parameters into download_behavior object. - - Args: - allowed: Whether downloads are allowed - destination_folder: Destination folder for downloads (accepts str or - pathlib.Path; will be coerced to str) - - Returns: - Dictionary representing the download_behavior object, or None if allowed is None - """ - if allowed is True: - return { - "type": "allowed", - # Coerce pathlib.Path (or any path-like) to str so the BiDi - # protocol always receives a plain JSON string. - "destinationFolder": str(destination_folder) if destination_folder is not None else None, - } - elif allowed is False: - return {"type": "denied"} - else: # None — reset to browser default (sent as JSON null) - return None - - -def validate_download_behavior( - allowed: bool | None, - destination_folder: str | None, - user_contexts: Any | None = None, -) -> None: - """Validate download behavior parameters. - - Args: - allowed: Whether downloads are allowed - destination_folder: Destination folder for downloads - user_contexts: Optional list of user contexts - - Raises: - ValueError: If parameters are invalid - """ - if allowed is True and not destination_folder: - raise ValueError("destination_folder is required when allowed=True") - if allowed is False and destination_folder: - raise ValueError("destination_folder should not be provided when allowed=False") - - -@dataclass -class ClientWindowInfo: - """ClientWindowInfo.""" - - active: bool | None = None - client_window: Any | None = None - height: Any | None = None - state: Any | None = None - width: Any | None = None - x: Any | None = None - y: Any | None = None - - def get_client_window(self): - """Get the client window ID.""" - return self.client_window - - def get_state(self): - """Get the client window state.""" - return self.state - - def get_width(self): - """Get the client window width.""" - return self.width - - def get_height(self): - """Get the client window height.""" - return self.height - - def is_active(self): - """Check if the client window is active.""" - return self.active - - def get_x(self): - """Get the client window X position.""" - return self.x - - def get_y(self): - """Get the client window Y position.""" - return self.y - - -@dataclass -class UserContextInfo: - """UserContextInfo.""" - - user_context: Any | None = None - - -@dataclass -class CreateUserContextParameters: - """CreateUserContextParameters.""" - - accept_insecure_certs: bool | None = None - proxy: Any | None = None - unhandled_prompt_behavior: Any | None = None - - -@dataclass -class GetClientWindowsResult: - """GetClientWindowsResult.""" - - client_windows: list[Any] = field(default_factory=list) - - -@dataclass -class GetUserContextsResult: - """GetUserContextsResult.""" - - user_contexts: list[Any] = field(default_factory=list) - - -@dataclass -class RemoveUserContextParameters: - """RemoveUserContextParameters.""" - - user_context: Any | None = None - - -@dataclass -class ClientWindowRectState: - """ClientWindowRectState.""" - - state: str = field(default="normal", init=False) - width: Any | None = None - height: Any | None = None - x: Any | None = None - y: Any | None = None - - -@dataclass -class SetDownloadBehaviorParameters: - """SetDownloadBehaviorParameters.""" - - download_behavior: Any | None = None - user_contexts: list[Any] = field(default_factory=list) - - -@dataclass -class DownloadBehaviorAllowed: - """DownloadBehaviorAllowed.""" - - type: str = field(default="allowed", init=False) - destination_folder: str | None = None - - -@dataclass -class DownloadBehaviorDenied: - """DownloadBehaviorDenied.""" - - type: str = field(default="denied", init=False) - - -class ClientWindowNamedState: - """Named states for a browser client window.""" - - FULLSCREEN = "fullscreen" - MAXIMIZED = "maximized" - MINIMIZED = "minimized" - NORMAL = "normal" - - -@dataclass -class SetClientWindowStateParameters: - """SetClientWindowStateParameters. - - The ``state`` field is required and must be either a named-state string - (e.g. ``ClientWindowNamedState.MAXIMIZED``) or a - :class:`ClientWindowRectState` instance. ``client_window`` is the ID of - the window to affect. - """ - - client_window: Any | None = None - state: Any | None = None - - -class Browser: - """WebDriver BiDi browser module.""" - - def __init__(self, conn) -> None: - self._conn = conn - - def close(self): - """Execute browser.close.""" - params = {} - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("browser.close", params) - result = self._conn.execute(cmd) - return result - - def create_user_context( - self, - accept_insecure_certs: bool | None = None, - proxy: Any | None = None, - unhandled_prompt_behavior: Any | None = None, - ): - """Execute browser.createUserContext.""" - if proxy and hasattr(proxy, "to_bidi_dict"): - proxy = proxy.to_bidi_dict() - - if unhandled_prompt_behavior and hasattr(unhandled_prompt_behavior, "to_bidi_dict"): - unhandled_prompt_behavior = unhandled_prompt_behavior.to_bidi_dict() - - params = { - "acceptInsecureCerts": accept_insecure_certs, - "proxy": proxy, - "unhandledPromptBehavior": unhandled_prompt_behavior, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("browser.createUserContext", params) - result = self._conn.execute(cmd) - if result and "userContext" in result: - extracted = result.get("userContext") - return extracted - return result - - def get_client_windows(self): - """Execute browser.getClientWindows.""" - params = {} - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("browser.getClientWindows", params) - result = self._conn.execute(cmd) - if result and "clientWindows" in result: - items = result.get("clientWindows", []) - return [ - ClientWindowInfo( - active=item.get("active"), - client_window=item.get("clientWindow"), - height=item.get("height"), - state=item.get("state"), - width=item.get("width"), - x=item.get("x"), - y=item.get("y"), - ) - for item in items - if isinstance(item, dict) - ] - return [] - - def get_user_contexts(self): - """Execute browser.getUserContexts.""" - params = {} - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("browser.getUserContexts", params) - result = self._conn.execute(cmd) - if result and "userContexts" in result: - items = result.get("userContexts", []) - return [item.get("userContext") for item in items if isinstance(item, dict)] - return [] - - def remove_user_context(self, user_context: Any | None = None): - """Execute browser.removeUserContext.""" - if user_context is None: - raise TypeError("remove_user_context() missing required argument: 'user_context'") - - params = { - "userContext": user_context, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("browser.removeUserContext", params) - result = self._conn.execute(cmd) - return result - - def set_download_behavior( - self, - allowed: bool | None = None, - destination_folder: str | None = None, - user_contexts: list[Any] | None = None, - ): - """Set the download behavior for the browser. - - Args: - allowed: ``True`` to allow downloads, ``False`` to deny, or ``None`` - to reset to browser default (sends ``null`` to the protocol). - destination_folder: Destination folder for downloads. Required when - ``allowed=True``. Accepts a string or :class:`pathlib.Path`. - user_contexts: Optional list of user context IDs. - - Raises: - ValueError: If *allowed* is ``True`` and *destination_folder* is - omitted, or ``False`` and *destination_folder* is provided. - """ - validate_download_behavior( - allowed=allowed, - destination_folder=destination_folder, - user_contexts=user_contexts, - ) - download_behavior = transform_download_params(allowed, destination_folder) - # downloadBehavior is a REQUIRED field in the BiDi spec (can be null but - # must be present). Do NOT use a generic None-filter on it. - params: dict = {"downloadBehavior": download_behavior} - if user_contexts is not None: - params["userContexts"] = user_contexts - cmd = command_builder("browser.setDownloadBehavior", params) - return self._conn.execute(cmd) - - def set_client_window_state( - self, - client_window: Any | None = None, - state: Any | None = None, - ): - """Set the client window state. - - Args: - client_window: The client window ID to apply the state to. - state: The window state to set. Can be one of: - - A string: "fullscreen", "maximized", "minimized", "normal" - - A ClientWindowRectState object with width, height, x, y - - A dict representing the state - - Raises: - ValueError: If client_window is not provided or state is invalid. - """ - if client_window is None: - raise ValueError("client_window is required") - if state is None: - raise ValueError("state is required") - - # Serialize ClientWindowRectState if needed - state_param = state - if hasattr(state, "__dataclass_fields__"): - # It's a dataclass, convert to dict - state_param = {k: v for k, v in state.__dict__.items() if v is not None} - - params = { - "clientWindow": client_window, - "state": state_param, - } - cmd = command_builder("browser.setClientWindowState", params) - return self._conn.execute(cmd)
diff --git a/selenium/webdriver/common/bidi/browsing_context.py b/selenium/webdriver/common/bidi/browsing_context.py deleted file mode 100644 index b5e14f1..0000000 --- a/selenium/webdriver/common/bidi/browsing_context.py +++ /dev/null
@@ -1,891 +0,0 @@ -# Licensed to the Software Freedom Conservancy (SFC) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The SFC licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -from __future__ import annotations - -from collections.abc import Callable -from dataclasses import dataclass, field -from typing import Any - -from selenium.webdriver.common.bidi._event_manager import EventConfig, _EventManager -from selenium.webdriver.common.bidi.common import command_builder - - -class ReadinessState: - """ReadinessState.""" - - NONE = "none" - INTERACTIVE = "interactive" - COMPLETE = "complete" - - -class UserPromptType: - """UserPromptType.""" - - ALERT = "alert" - BEFOREUNLOAD = "beforeunload" - CONFIRM = "confirm" - PROMPT = "prompt" - - -class CreateType: - """CreateType.""" - - TAB = "tab" - WINDOW = "window" - - -class DownloadCompleteParams: - """DownloadCompleteParams.""" - - COMPLETE = "complete" - - -@dataclass -class Info: - """Info.""" - - children: Any | None = None - client_window: Any | None = None - context: Any | None = None - original_opener: Any | None = None - url: str | None = None - user_context: Any | None = None - parent: Any | None = None - - -@dataclass -class AccessibilityLocator: - """AccessibilityLocator.""" - - type: str = field(default="accessibility", init=False) - name: str | None = None - role: str | None = None - - -@dataclass -class CssLocator: - """CssLocator.""" - - type: str = field(default="css", init=False) - value: str | None = None - - -@dataclass -class ContextLocator: - """ContextLocator.""" - - type: str = field(default="context", init=False) - context: Any | None = None - - -@dataclass -class InnerTextLocator: - """InnerTextLocator.""" - - type: str = field(default="innerText", init=False) - value: str | None = None - ignore_case: bool | None = None - match_type: Any | None = None - max_depth: Any | None = None - - -@dataclass -class XPathLocator: - """XPathLocator.""" - - type: str = field(default="xpath", init=False) - value: str | None = None - - -@dataclass -class BaseNavigationInfo: - """BaseNavigationInfo.""" - - context: Any | None = None - navigation: Any | None = None - timestamp: Any | None = None - url: str | None = None - user_context: Any | None = None - - -@dataclass -class ActivateParameters: - """ActivateParameters.""" - - context: Any | None = None - - -@dataclass -class CaptureScreenshotParameters: - """CaptureScreenshotParameters.""" - - context: Any | None = None - format: Any | None = None - clip: Any | None = None - - -@dataclass -class ImageFormat: - """ImageFormat.""" - - type: str | None = None - quality: Any | None = None - - -@dataclass -class ElementClipRectangle: - """ElementClipRectangle.""" - - type: str = field(default="element", init=False) - element: Any | None = None - - -@dataclass -class BoxClipRectangle: - """BoxClipRectangle.""" - - type: str = field(default="box", init=False) - x: Any | None = None - y: Any | None = None - width: Any | None = None - height: Any | None = None - - -@dataclass -class CaptureScreenshotResult: - """CaptureScreenshotResult.""" - - data: str | None = None - - -@dataclass -class CloseParameters: - """CloseParameters.""" - - context: Any | None = None - prompt_unload: bool | None = None - - -@dataclass -class CreateParameters: - """CreateParameters.""" - - type: Any | None = None - reference_context: Any | None = None - background: bool | None = None - user_context: Any | None = None - - -@dataclass -class CreateResult: - """CreateResult.""" - - context: Any | None = None - user_context: Any | None = None - - -@dataclass -class GetTreeParameters: - """GetTreeParameters.""" - - max_depth: Any | None = None - root: Any | None = None - - -@dataclass -class GetTreeResult: - """GetTreeResult.""" - - contexts: Any | None = None - - -@dataclass -class HandleUserPromptParameters: - """HandleUserPromptParameters.""" - - context: Any | None = None - accept: bool | None = None - user_text: str | None = None - - -@dataclass -class LocateNodesParameters: - """LocateNodesParameters.""" - - context: Any | None = None - locator: Any | None = None - serialization_options: Any | None = None - start_nodes: list[Any] = field(default_factory=list) - - -@dataclass -class LocateNodesResult: - """LocateNodesResult.""" - - nodes: list[Any] = field(default_factory=list) - - -@dataclass -class NavigateParameters: - """NavigateParameters.""" - - context: Any | None = None - url: str | None = None - wait: Any | None = None - - -@dataclass -class NavigateResult: - """NavigateResult.""" - - navigation: Any | None = None - url: str | None = None - - -@dataclass -class PrintParameters: - """PrintParameters.""" - - context: Any | None = None - background: bool | None = None - margin: Any | None = None - page: Any | None = None - scale: Any | None = None - shrink_to_fit: bool | None = None - - -@dataclass -class PrintMarginParameters: - """PrintMarginParameters.""" - - bottom: Any | None = None - left: Any | None = None - right: Any | None = None - top: Any | None = None - - -@dataclass -class PrintPageParameters: - """PrintPageParameters.""" - - height: Any | None = None - width: Any | None = None - - -@dataclass -class PrintResult: - """PrintResult.""" - - data: str | None = None - - -@dataclass -class ReloadParameters: - """ReloadParameters.""" - - context: Any | None = None - ignore_cache: bool | None = None - wait: Any | None = None - - -@dataclass -class SetBypassCSPParameters: - """SetBypassCSPParameters.""" - - bypass: Any | None = None - contexts: list[Any] = field(default_factory=list) - user_contexts: list[Any] = field(default_factory=list) - - -@dataclass -class SetViewportParameters: - """SetViewportParameters.""" - - context: Any | None = None - viewport: Any | None = None - device_pixel_ratio: Any | None = None - user_contexts: list[Any] = field(default_factory=list) - - -@dataclass -class Viewport: - """Viewport.""" - - width: Any | None = None - height: Any | None = None - - -@dataclass -class TraverseHistoryParameters: - """TraverseHistoryParameters.""" - - context: Any | None = None - delta: Any | None = None - - -@dataclass -class HistoryUpdatedParameters: - """HistoryUpdatedParameters.""" - - context: Any | None = None - timestamp: Any | None = None - url: str | None = None - user_context: Any | None = None - - -@dataclass -class UserPromptClosedParameters: - """UserPromptClosedParameters.""" - - context: Any | None = None - accepted: bool | None = None - type: Any | None = None - user_context: Any | None = None - user_text: str | None = None - - -@dataclass -class UserPromptOpenedParameters: - """UserPromptOpenedParameters.""" - - context: Any | None = None - handler: Any | None = None - message: str | None = None - type: Any | None = None - user_context: Any | None = None - default_value: str | None = None - - -@dataclass -class DownloadWillBeginParams: - """DownloadWillBeginParams.""" - - suggested_filename: str | None = None - - -@dataclass -class DownloadCanceledParams: - """DownloadCanceledParams.""" - - status: Any | None = None - - -@dataclass -class DownloadParams: - """DownloadParams - fields shared by all download end event variants.""" - - status: str | None = None - context: Any | None = None - navigation: Any | None = None - timestamp: Any | None = None - url: str | None = None - filepath: str | None = None - - -@dataclass -class DownloadEndParams: - """DownloadEndParams - params for browsingContext.downloadEnd event.""" - - download_params: DownloadParams | None = None - - @classmethod - def from_json(cls, params: dict) -> DownloadEndParams: - """Deserialize from BiDi wire-level params dict.""" - dp = DownloadParams( - status=params.get("status"), - context=params.get("context"), - navigation=params.get("navigation"), - timestamp=params.get("timestamp"), - url=params.get("url"), - filepath=params.get("filepath"), - ) - return cls(download_params=dp) - - -# BiDi Event Name to Parameter Type Mapping -EVENT_NAME_MAPPING = { - "context_created": "browsingContext.contextCreated", - "context_destroyed": "browsingContext.contextDestroyed", - "navigation_started": "browsingContext.navigationStarted", - "fragment_navigated": "browsingContext.fragmentNavigated", - "history_updated": "browsingContext.historyUpdated", - "dom_content_loaded": "browsingContext.domContentLoaded", - "load": "browsingContext.load", - "download_will_begin": "browsingContext.downloadWillBegin", - "download_end": "browsingContext.downloadEnd", - "navigation_aborted": "browsingContext.navigationAborted", - "navigation_committed": "browsingContext.navigationCommitted", - "navigation_failed": "browsingContext.navigationFailed", - "user_prompt_closed": "browsingContext.userPromptClosed", - "user_prompt_opened": "browsingContext.userPromptOpened", -} - - -def _deserialize_info_list(items: list) -> list | None: - """Recursively deserialize a list of dicts to Info objects. - - Args: - items: List of dicts from the API response - - Returns: - List of Info objects with properly nested children, or None if empty - """ - if not items or not isinstance(items, list): - return None - - result = [] - for item in items: - if isinstance(item, dict): - # Recursively deserialize children only if the key exists in response - children_list = None - if "children" in item: - children_list = _deserialize_info_list(item.get("children", [])) - info = Info( - children=children_list, - client_window=item.get("clientWindow"), - context=item.get("context"), - original_opener=item.get("originalOpener"), - url=item.get("url"), - user_context=item.get("userContext"), - parent=item.get("parent"), - ) - result.append(info) - return result if result else None - - -class BrowsingContext: - """WebDriver BiDi browsingContext module.""" - - EVENT_CONFIGS: dict[str, EventConfig] = {} - - def __init__(self, conn) -> None: - self._conn = conn - self._event_manager = _EventManager(conn, self.EVENT_CONFIGS) - - def activate(self, context: Any | None = None): - """Execute browsingContext.activate.""" - if context is None: - raise TypeError("activate() missing required argument: 'context'") - - params = { - "context": context, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("browsingContext.activate", params) - result = self._conn.execute(cmd) - return result - - def capture_screenshot( - self, - context: str | None = None, - format: Any | None = None, - clip: Any | None = None, - origin: str | None = None, - ): - """Execute browsingContext.captureScreenshot.""" - if context is None: - raise TypeError("capture_screenshot() missing required argument: 'context'") - - params = { - "context": context, - "format": format, - "clip": clip, - "origin": origin, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("browsingContext.captureScreenshot", params) - result = self._conn.execute(cmd) - if result and "data" in result: - extracted = result.get("data") - return extracted - return result - - def close(self, context: Any | None = None, prompt_unload: bool | None = None): - """Execute browsingContext.close.""" - if context is None: - raise TypeError("close() missing required argument: 'context'") - - params = { - "context": context, - "promptUnload": prompt_unload, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("browsingContext.close", params) - result = self._conn.execute(cmd) - return result - - def create( - self, - type: Any | None = None, - reference_context: Any | None = None, - background: bool | None = None, - user_context: Any | None = None, - ): - """Execute browsingContext.create.""" - if type is None: - raise TypeError("create() missing required argument: 'type'") - - params = { - "type": type, - "referenceContext": reference_context, - "background": background, - "userContext": user_context, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("browsingContext.create", params) - result = self._conn.execute(cmd) - if result and "context" in result: - extracted = result.get("context") - return extracted - return result - - def get_tree(self, max_depth: Any | None = None, root: Any | None = None): - """Execute browsingContext.getTree.""" - params = { - "maxDepth": max_depth, - "root": root, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("browsingContext.getTree", params) - result = self._conn.execute(cmd) - if result and "contexts" in result: - items = result.get("contexts", []) - return [ - Info( - children=_deserialize_info_list(item.get("children", [])), - client_window=item.get("clientWindow"), - context=item.get("context"), - original_opener=item.get("originalOpener"), - url=item.get("url"), - user_context=item.get("userContext"), - parent=item.get("parent"), - ) - for item in items - if isinstance(item, dict) - ] - return [] - - def handle_user_prompt(self, context: Any | None = None, accept: bool | None = None, user_text: Any | None = None): - """Execute browsingContext.handleUserPrompt.""" - if context is None: - raise TypeError("handle_user_prompt() missing required argument: 'context'") - - params = { - "context": context, - "accept": accept, - "userText": user_text, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("browsingContext.handleUserPrompt", params) - result = self._conn.execute(cmd) - return result - - def locate_nodes( - self, - context: str | None = None, - locator: Any | None = None, - serialization_options: Any | None = None, - start_nodes: Any | None = None, - max_node_count: int | None = None, - ): - """Execute browsingContext.locateNodes.""" - if context is None: - raise TypeError("locate_nodes() missing required argument: 'context'") - if locator is None: - raise TypeError("locate_nodes() missing required argument: 'locator'") - - params = { - "context": context, - "locator": locator, - "serializationOptions": serialization_options, - "startNodes": start_nodes, - "maxNodeCount": max_node_count, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("browsingContext.locateNodes", params) - result = self._conn.execute(cmd) - if result and "nodes" in result: - extracted = result.get("nodes") - return extracted - return result - - def navigate(self, context: Any | None = None, url: Any | None = None, wait: Any | None = None): - """Execute browsingContext.navigate.""" - if context is None: - raise TypeError("navigate() missing required argument: 'context'") - if url is None: - raise TypeError("navigate() missing required argument: 'url'") - - params = { - "context": context, - "url": url, - "wait": wait, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("browsingContext.navigate", params) - result = self._conn.execute(cmd) - return result - - def print( - self, - context: Any | None = None, - background: bool | None = None, - margin: Any | None = None, - page: Any | None = None, - scale: Any | None = None, - shrink_to_fit: bool | None = None, - ): - """Execute browsingContext.print.""" - if context is None: - raise TypeError("print() missing required argument: 'context'") - - params = { - "context": context, - "background": background, - "margin": margin, - "page": page, - "scale": scale, - "shrinkToFit": shrink_to_fit, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("browsingContext.print", params) - result = self._conn.execute(cmd) - if result and "data" in result: - extracted = result.get("data") - return extracted - return result - - def reload(self, context: Any | None = None, ignore_cache: bool | None = None, wait: Any | None = None): - """Execute browsingContext.reload.""" - if context is None: - raise TypeError("reload() missing required argument: 'context'") - - params = { - "context": context, - "ignoreCache": ignore_cache, - "wait": wait, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("browsingContext.reload", params) - result = self._conn.execute(cmd) - return result - - def set_bypass_csp( - self, - bypass: Any | None = None, - contexts: list[Any] | None = None, - user_contexts: list[Any] | None = None, - ): - """Execute browsingContext.setBypassCSP.""" - if bypass is None: - raise TypeError("set_bypass_csp() missing required argument: 'bypass'") - - params = { - "bypass": bypass, - "contexts": contexts, - "userContexts": user_contexts, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("browsingContext.setBypassCSP", params) - result = self._conn.execute(cmd) - return result - - def traverse_history(self, context: Any | None = None, delta: Any | None = None): - """Execute browsingContext.traverseHistory.""" - if context is None: - raise TypeError("traverse_history() missing required argument: 'context'") - if delta is None: - raise TypeError("traverse_history() missing required argument: 'delta'") - - params = { - "context": context, - "delta": delta, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("browsingContext.traverseHistory", params) - result = self._conn.execute(cmd) - return result - - def set_viewport( - self, - context: str | None = None, - viewport: Any = ..., - user_contexts: Any | None = None, - device_pixel_ratio: Any = ..., - ): - """Execute browsingContext.setViewport. - - Uses sentinel defaults so explicit None is serialized for viewport/devicePixelRatio, - while omitted arguments are not sent. - """ - params = {} - if context is not None: - params["context"] = context - if user_contexts is not None: - params["userContexts"] = user_contexts - if viewport is not ...: - params["viewport"] = viewport - if device_pixel_ratio is not ...: - params["devicePixelRatio"] = device_pixel_ratio - - cmd = command_builder("browsingContext.setViewport", params) - result = self._conn.execute(cmd) - return result - - def add_event_handler(self, event: str, callback: Callable, contexts: list[str] | None = None) -> int: - """Add an event handler. - - Args: - event: The event to subscribe to. - callback: The callback function to execute on event. - contexts: The context IDs to subscribe to (optional). - - Returns: - The callback ID. - """ - return self._event_manager.add_event_handler(event, callback, contexts) - - def remove_event_handler(self, event: str, callback_id: int) -> None: - """Remove an event handler. - - Args: - event: The event to unsubscribe from. - callback_id: The callback ID. - """ - return self._event_manager.remove_event_handler(event, callback_id) - - def clear_event_handlers(self) -> None: - """Clear all event handlers.""" - return self._event_manager.clear_event_handlers() - - -# Event Info Type Aliases -# Event: browsingContext.contextCreated -ContextCreated = globals().get("Info", dict) # Fallback to dict if type not defined - -# Event: browsingContext.contextDestroyed -ContextDestroyed = globals().get("Info", dict) # Fallback to dict if type not defined - -# Event: browsingContext.navigationStarted -NavigationStarted = globals().get("BaseNavigationInfo", dict) # Fallback to dict if type not defined - -# Event: browsingContext.fragmentNavigated -FragmentNavigated = globals().get("BaseNavigationInfo", dict) # Fallback to dict if type not defined - -# Event: browsingContext.historyUpdated -HistoryUpdated = globals().get("HistoryUpdatedParameters", dict) # Fallback to dict if type not defined - -# Event: browsingContext.domContentLoaded -DomContentLoaded = globals().get("BaseNavigationInfo", dict) # Fallback to dict if type not defined - -# Event: browsingContext.load -Load = globals().get("BaseNavigationInfo", dict) # Fallback to dict if type not defined - -# Event: browsingContext.downloadWillBegin -DownloadWillBegin = globals().get("DownloadWillBeginParams", dict) # Fallback to dict if type not defined - -# Event: browsingContext.downloadEnd -DownloadEnd = globals().get("DownloadEndParams", dict) # Fallback to dict if type not defined - -# Event: browsingContext.navigationAborted -NavigationAborted = globals().get("BaseNavigationInfo", dict) # Fallback to dict if type not defined - -# Event: browsingContext.navigationCommitted -NavigationCommitted = globals().get("BaseNavigationInfo", dict) # Fallback to dict if type not defined - -# Event: browsingContext.navigationFailed -NavigationFailed = globals().get("BaseNavigationInfo", dict) # Fallback to dict if type not defined - -# Event: browsingContext.userPromptClosed -UserPromptClosed = globals().get("UserPromptClosedParameters", dict) # Fallback to dict if type not defined - -# Event: browsingContext.userPromptOpened -UserPromptOpened = globals().get("UserPromptOpenedParameters", dict) # Fallback to dict if type not defined - - -# Populate EVENT_CONFIGS with event configuration mappings -_globals = globals() -BrowsingContext.EVENT_CONFIGS = { - "context_created": EventConfig( - "context_created", - "browsingContext.contextCreated", - _globals.get("ContextCreated", dict) if _globals.get("ContextCreated") else dict, - ), - "context_destroyed": EventConfig( - "context_destroyed", - "browsingContext.contextDestroyed", - _globals.get("ContextDestroyed", dict) if _globals.get("ContextDestroyed") else dict, - ), - "navigation_started": EventConfig( - "navigation_started", - "browsingContext.navigationStarted", - _globals.get("NavigationStarted", dict) if _globals.get("NavigationStarted") else dict, - ), - "fragment_navigated": EventConfig( - "fragment_navigated", - "browsingContext.fragmentNavigated", - _globals.get("FragmentNavigated", dict) if _globals.get("FragmentNavigated") else dict, - ), - "history_updated": EventConfig( - "history_updated", - "browsingContext.historyUpdated", - _globals.get("HistoryUpdated", dict) if _globals.get("HistoryUpdated") else dict, - ), - "dom_content_loaded": EventConfig( - "dom_content_loaded", - "browsingContext.domContentLoaded", - _globals.get("DomContentLoaded", dict) if _globals.get("DomContentLoaded") else dict, - ), - "load": EventConfig("load", "browsingContext.load", _globals.get("Load", dict) if _globals.get("Load") else dict), - "download_will_begin": EventConfig( - "download_will_begin", - "browsingContext.downloadWillBegin", - _globals.get("DownloadWillBegin", dict) if _globals.get("DownloadWillBegin") else dict, - ), - "download_end": EventConfig( - "download_end", - "browsingContext.downloadEnd", - _globals.get("DownloadEnd", dict) if _globals.get("DownloadEnd") else dict, - ), - "navigation_aborted": EventConfig( - "navigation_aborted", - "browsingContext.navigationAborted", - _globals.get("NavigationAborted", dict) if _globals.get("NavigationAborted") else dict, - ), - "navigation_committed": EventConfig( - "navigation_committed", - "browsingContext.navigationCommitted", - _globals.get("NavigationCommitted", dict) if _globals.get("NavigationCommitted") else dict, - ), - "navigation_failed": EventConfig( - "navigation_failed", - "browsingContext.navigationFailed", - _globals.get("NavigationFailed", dict) if _globals.get("NavigationFailed") else dict, - ), - "user_prompt_closed": EventConfig( - "user_prompt_closed", - "browsingContext.userPromptClosed", - _globals.get("UserPromptClosed", dict) if _globals.get("UserPromptClosed") else dict, - ), - "user_prompt_opened": EventConfig( - "user_prompt_opened", - "browsingContext.userPromptOpened", - _globals.get("UserPromptOpened", dict) if _globals.get("UserPromptOpened") else dict, - ), -}
diff --git a/selenium/webdriver/common/bidi/cdp.py b/selenium/webdriver/common/bidi/cdp.py deleted file mode 100644 index bac0076..0000000 --- a/selenium/webdriver/common/bidi/cdp.py +++ /dev/null
@@ -1,519 +0,0 @@ -# The MIT License(MIT) -# -# Copyright(c) 2018 Hyperion Gray -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files(the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -# -# This code comes from https://github.com/HyperionGray/trio-chrome-devtools-protocol/tree/master/trio_cdp - -import contextvars -import importlib -import itertools -import json -import logging -import pathlib -from collections import defaultdict -from collections.abc import AsyncGenerator, AsyncIterator, Generator -from contextlib import asynccontextmanager, contextmanager -from dataclasses import dataclass -from typing import Any, TypeVar - -import trio -from trio_websocket import ConnectionClosed as WsConnectionClosed -from trio_websocket import connect_websocket_url - -logger = logging.getLogger("trio_cdp") -T = TypeVar("T") -MAX_WS_MESSAGE_SIZE = 2**24 - -devtools = None -version = None - - -def import_devtools(ver): - """Attempt to load the current latest available devtools into the module cache for use later.""" - global devtools - global version - version = ver - base = "selenium.webdriver.common.devtools.v" - try: - devtools = importlib.import_module(f"{base}{ver}") - return devtools - except ModuleNotFoundError: - # Attempt to parse and load the 'most recent' devtools module. This is likely - # because cdp has been updated but selenium python has not been released yet. - devtools_path = pathlib.Path(__file__).parents[1].joinpath("devtools") - versions = tuple(f.name for f in devtools_path.iterdir() if f.is_dir()) - available_versions = tuple(x for x in versions if x == "latest" or (x.startswith("v") and x[1:].isdigit())) - numeric_versions = tuple(x[1:] for x in available_versions if x.startswith("v")) - if not numeric_versions: - raise - latest = max(numeric_versions, key=int) - selenium_logger = logging.getLogger(__name__) - selenium_logger.debug("Falling back to loading `devtools`: v%s", latest) - devtools = importlib.import_module(f"{base}{latest}") - return devtools - - -_connection_context: contextvars.ContextVar = contextvars.ContextVar("connection_context") -_session_context: contextvars.ContextVar = contextvars.ContextVar("session_context") - - -def get_connection_context(fn_name): - """Look up the current connection. - - If there is no current connection, raise a ``RuntimeError`` with a - helpful message. - """ - try: - return _connection_context.get() - except LookupError: - raise RuntimeError(f"{fn_name}() must be called in a connection context.") - - -def get_session_context(fn_name): - """Look up the current session. - - If there is no current session, raise a ``RuntimeError`` with a - helpful message. - """ - try: - return _session_context.get() - except LookupError: - raise RuntimeError(f"{fn_name}() must be called in a session context.") - - -@contextmanager -def connection_context(connection): - """Context manager installs ``connection`` as the session context for the current Trio task.""" - token = _connection_context.set(connection) - try: - yield - finally: - _connection_context.reset(token) - - -@contextmanager -def session_context(session): - """Context manager installs ``session`` as the session context for the current Trio task.""" - token = _session_context.set(session) - try: - yield - finally: - _session_context.reset(token) - - -def set_global_connection(connection): - """Install ``connection`` in the root context so that it will become the default connection for all tasks. - - This is generally not recommended, except it may be necessary in - certain use cases such as running inside Jupyter notebook. - """ - global _connection_context - _connection_context = contextvars.ContextVar("_connection_context", default=connection) - - -def set_global_session(session): - """Install ``session`` in the root context so that it will become the default session for all tasks. - - This is generally not recommended, except it may be necessary in - certain use cases such as running inside Jupyter notebook. - """ - global _session_context - _session_context = contextvars.ContextVar("_session_context", default=session) - - -class BrowserError(Exception): - """This exception is raised when the browser's response to a command indicates that an error occurred.""" - - def __init__(self, obj): - self.code = obj.get("code") - self.message = obj.get("message") - self.detail = obj.get("data") - - def __str__(self): - return f"BrowserError<code={self.code} message={self.message}> {self.detail}" - - -class CdpConnectionClosed(WsConnectionClosed): - """Raised when a public method is called on a closed CDP connection.""" - - def __init__(self, reason): - """Constructor. - - Args: - reason: wsproto.frame_protocol.CloseReason - """ - self.reason = reason - - def __repr__(self): - """Return representation.""" - return f"{self.__class__.__name__}<{self.reason}>" - - -class InternalError(Exception): - """This exception is only raised when there is faulty logic in TrioCDP or the integration with PyCDP.""" - - pass - - -@dataclass -class CmEventProxy: - """A proxy object returned by :meth:`CdpBase.wait_for()``. - - After the context manager executes, this proxy object will have a - value set that contains the returned event. - """ - - value: Any = None - - -class CdpBase: - def __init__(self, ws, session_id, target_id): - self.ws = ws - self.session_id = session_id - self.target_id = target_id - self.channels = defaultdict(set) - self.id_iter = itertools.count() - self.inflight_cmd = {} - self.inflight_result = {} - - async def execute(self, cmd: Generator[dict, T, Any]) -> T: - """Execute a command on the server and wait for the result. - - Args: - cmd: any CDP command - - Returns: - a CDP result - """ - cmd_id = next(self.id_iter) - cmd_event = trio.Event() - self.inflight_cmd[cmd_id] = cmd, cmd_event - request = next(cmd) - request["id"] = cmd_id - if self.session_id: - request["sessionId"] = self.session_id - request_str = json.dumps(request) - if logger.isEnabledFor(logging.DEBUG): - logger.debug(f"Sending CDP message: {cmd_id} {cmd_event}: {request_str}") - try: - await self.ws.send_message(request_str) - except WsConnectionClosed as wcc: - raise CdpConnectionClosed(wcc.reason) from None - await cmd_event.wait() - response = self.inflight_result.pop(cmd_id) - if logger.isEnabledFor(logging.DEBUG): - logger.debug(f"Received CDP message: {response}") - if isinstance(response, Exception): - if logger.isEnabledFor(logging.DEBUG): - logger.debug(f"Exception raised by {cmd_event} message: {type(response).__name__}") - raise response - return response - - def listen(self, *event_types, buffer_size=10): - """Listen for events. - - Returns: - An async iterator that iterates over events matching the indicated types. - """ - sender, receiver = trio.open_memory_channel(buffer_size) - for event_type in event_types: - self.channels[event_type].add(sender) - return receiver - - @asynccontextmanager - async def wait_for(self, event_type: type[T], buffer_size=10) -> AsyncGenerator[CmEventProxy, None]: - """Wait for an event of the given type and return it. - - This is an async context manager, so you should open it inside - an async with block. The block will not exit until the indicated - event is received. - """ - sender: trio.MemorySendChannel - receiver: trio.MemoryReceiveChannel - sender, receiver = trio.open_memory_channel(buffer_size) - self.channels[event_type].add(sender) - proxy = CmEventProxy() - yield proxy - async with receiver: - event = await receiver.receive() - proxy.value = event - - def _handle_data(self, data): - """Handle incoming WebSocket data. - - Args: - data: a JSON dictionary - """ - if "id" in data: - self._handle_cmd_response(data) - else: - self._handle_event(data) - - def _handle_cmd_response(self, data: dict): - """Handle a response to a command. - - This will set an event flag that will return control to the - task that called the command. - - Args: - data: response as a JSON dictionary - """ - cmd_id = data["id"] - try: - cmd, event = self.inflight_cmd.pop(cmd_id) - except KeyError: - logger.warning("Got a message with a command ID that does not exist: %s", data) - return - if "error" in data: - # If the server reported an error, convert it to an exception and do - # not process the response any further. - self.inflight_result[cmd_id] = BrowserError(data["error"]) - else: - # Otherwise, continue the generator to parse the JSON result - # into a CDP object. - try: - _ = cmd.send(data["result"]) - raise InternalError("The command's generator function did not exit when expected!") - except StopIteration as exit: - return_ = exit.value - self.inflight_result[cmd_id] = return_ - event.set() - - def _handle_event(self, data: dict): - """Handle an event. - - Args: - data: event as a JSON dictionary - """ - global devtools - if devtools is None: - raise RuntimeError("CDP devtools module not loaded. Call import_devtools() first.") - event = devtools.util.parse_json_event(data) - logger.debug("Received event: %s", event) - to_remove = set() - for sender in self.channels[type(event)]: - try: - sender.send_nowait(event) - except trio.WouldBlock: - logger.error('Unable to send event "%r" due to full channel %s', event, sender) - except trio.BrokenResourceError: - to_remove.add(sender) - if to_remove: - self.channels[type(event)] -= to_remove - - -class CdpSession(CdpBase): - """Contains the state for a CDP session. - - Generally you should not instantiate this object yourself; you should call - :meth:`CdpConnection.open_session`. - """ - - def __init__(self, ws, session_id, target_id): - """Constructor. - - Args: - ws: trio_websocket.WebSocketConnection - session_id: devtools.target.SessionID - target_id: devtools.target.TargetID - """ - super().__init__(ws, session_id, target_id) - - self._dom_enable_count = 0 - self._dom_enable_lock = trio.Lock() - self._page_enable_count = 0 - self._page_enable_lock = trio.Lock() - - @asynccontextmanager - async def dom_enable(self): - """Context manager that executes ``dom.enable()`` when it enters and then calls ``dom.disable()``. - - This keeps track of concurrent callers and only disables DOM - events when all callers have exited. - """ - global devtools - async with self._dom_enable_lock: - self._dom_enable_count += 1 - if self._dom_enable_count == 1: - await self.execute(devtools.dom.enable()) - - yield - - async with self._dom_enable_lock: - self._dom_enable_count -= 1 - if self._dom_enable_count == 0: - await self.execute(devtools.dom.disable()) - - @asynccontextmanager - async def page_enable(self): - """Context manager executes ``page.enable()`` when it enters and then calls ``page.disable()`` when it exits. - - This keeps track of concurrent callers and only disables page - events when all callers have exited. - """ - global devtools - async with self._page_enable_lock: - self._page_enable_count += 1 - if self._page_enable_count == 1: - await self.execute(devtools.page.enable()) - - yield - - async with self._page_enable_lock: - self._page_enable_count -= 1 - if self._page_enable_count == 0: - await self.execute(devtools.page.disable()) - - -class CdpConnection(CdpBase, trio.abc.AsyncResource): - """Contains the connection state for a Chrome DevTools Protocol server. - - CDP can multiplex multiple "sessions" over a single connection. This - class corresponds to the "root" session, i.e. the implicitly created - session that has no session ID. This class is responsible for - reading incoming WebSocket messages and forwarding them to the - corresponding session, as well as handling messages targeted at the - root session itself. You should generally call the - :func:`open_cdp()` instead of instantiating this class directly. - """ - - def __init__(self, ws): - """Constructor. - - Args: - ws: trio_websocket.WebSocketConnection - """ - super().__init__(ws, session_id=None, target_id=None) - self.sessions = {} - - async def aclose(self): - """Close the underlying WebSocket connection. - - This will cause the reader task to gracefully exit when it tries - to read the next message from the WebSocket. All of the public - APIs (``execute()``, ``listen()``, etc.) will raise - ``CdpConnectionClosed`` after the CDP connection is closed. It - is safe to call this multiple times. - """ - await self.ws.aclose() - - @asynccontextmanager - async def open_session(self, target_id) -> AsyncIterator[CdpSession]: - """Context manager opens a session and enables the "simple" style of calling CDP APIs. - - For example, inside a session context, you can call ``await - dom.get_document()`` and it will execute on the current session - automatically. - """ - session = await self.connect_session(target_id) - with session_context(session): - yield session - - async def connect_session(self, target_id) -> "CdpSession": - """Returns a new :class:`CdpSession` connected to the specified target.""" - global devtools - if devtools is None: - raise RuntimeError("CDP devtools module not loaded. Call import_devtools() first.") - session_id = await self.execute(devtools.target.attach_to_target(target_id, True)) - session = CdpSession(self.ws, session_id, target_id) - self.sessions[session_id] = session - return session - - async def _reader_task(self): - """Runs in the background and handles incoming messages. - - Dispatches responses to commands and events to listeners. - """ - global devtools - if devtools is None: - raise RuntimeError("CDP devtools module not loaded. Call import_devtools() first.") - while True: - try: - message = await self.ws.get_message() - except WsConnectionClosed: - # If the WebSocket is closed, we don't want to throw an - # exception from the reader task. Instead we will throw - # exceptions from the public API methods, and we can quietly - # exit the reader task here. - break - try: - data = json.loads(message) - except json.JSONDecodeError: - raise BrowserError({"code": -32700, "message": "Client received invalid JSON", "data": message}) - logger.debug("Received message %r", data) - if "sessionId" in data: - session_id = devtools.target.SessionID(data["sessionId"]) - try: - session = self.sessions[session_id] - except KeyError: - raise BrowserError( - { - "code": -32700, - "message": "Browser sent a message for an invalid session", - "data": f"{session_id!r}", - } - ) - session._handle_data(data) - else: - self._handle_data(data) - - for _, session in self.sessions.items(): - for _, senders in session.channels.items(): - for sender in senders: - sender.close() - - -@asynccontextmanager -async def open_cdp(url) -> AsyncIterator[CdpConnection]: - """Async context manager opens a connection to the browser then closes the connection when the block exits. - - The context manager also sets the connection as the default - connection for the current task, so that commands like ``await - target.get_targets()`` will run on this connection automatically. If - you want to use multiple connections concurrently, it is recommended - to open each on in a separate task. - """ - async with trio.open_nursery() as nursery: - conn = await connect_cdp(nursery, url) - try: - with connection_context(conn): - yield conn - finally: - await conn.aclose() - - -async def connect_cdp(nursery, url) -> CdpConnection: - """Connect to the browser specified by ``url`` and spawn a background task in the specified nursery. - - The ``open_cdp()`` context manager is preferred in most situations. - You should only use this function if you need to specify a custom - nursery. This connection is not automatically closed! You can either - use the connection object as a context manager (``async with - conn:``) or else call ``await conn.aclose()`` on it when you are - done with it. If ``set_context`` is True, then the returned - connection will be installed as the default connection for the - current task. This argument is for unusual use cases, such as - running inside of a notebook. - """ - ws = await connect_websocket_url(nursery, url, max_message_size=MAX_WS_MESSAGE_SIZE) - cdp_conn = CdpConnection(ws) - nursery.start_soon(cdp_conn._reader_task) - return cdp_conn
diff --git a/selenium/webdriver/common/bidi/common.py b/selenium/webdriver/common/bidi/common.py deleted file mode 100644 index ff67b56..0000000 --- a/selenium/webdriver/common/bidi/common.py +++ /dev/null
@@ -1,43 +0,0 @@ -# Licensed to the Software Freedom Conservancy (SFC) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The SFC licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -"""Common utilities for BiDi command construction.""" - -from __future__ import annotations - -from collections.abc import Generator -from typing import Any - - -def command_builder(method: str, params: dict[str, Any] | None = None) -> Generator[dict[str, Any], Any, Any]: - """Build a BiDi command generator. - - Args: - method: The BiDi method name (e.g., "session.status", "browser.close") - params: The parameters for the command. If omitted, an empty - dictionary is sent. - - Yields: - A dictionary representing the BiDi command - - Returns: - The result from the BiDi command execution - """ - if params is None: - params = {} - result = yield {"method": method, "params": params} - return result
diff --git a/selenium/webdriver/common/bidi/console.py b/selenium/webdriver/common/bidi/console.py deleted file mode 100644 index 93fe7d8..0000000 --- a/selenium/webdriver/common/bidi/console.py +++ /dev/null
@@ -1,24 +0,0 @@ -# Licensed to the Software Freedom Conservancy (SFC) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The SFC licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from enum import Enum - - -class Console(Enum): - ALL = "all" - LOG = "log" - ERROR = "error"
diff --git a/selenium/webdriver/common/bidi/emulation.py b/selenium/webdriver/common/bidi/emulation.py deleted file mode 100644 index a3e6b4b..0000000 --- a/selenium/webdriver/common/bidi/emulation.py +++ /dev/null
@@ -1,500 +0,0 @@ -# Licensed to the Software Freedom Conservancy (SFC) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The SFC licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -from __future__ import annotations - -from dataclasses import dataclass, field -from typing import Any - -from selenium.webdriver.common.bidi.common import command_builder - - -class ForcedColorsModeTheme: - """ForcedColorsModeTheme.""" - - LIGHT = "light" - DARK = "dark" - - -class ScreenOrientationNatural: - """ScreenOrientationNatural.""" - - PORTRAIT = "portrait" - LANDSCAPE = "landscape" - - -class ScreenOrientationType: - """ScreenOrientationType.""" - - PORTRAIT_PRIMARY = "portrait-primary" - PORTRAIT_SECONDARY = "portrait-secondary" - LANDSCAPE_PRIMARY = "landscape-primary" - LANDSCAPE_SECONDARY = "landscape-secondary" - - -@dataclass -class SetForcedColorsModeThemeOverrideParameters: - """SetForcedColorsModeThemeOverrideParameters.""" - - theme: Any | None = None - contexts: list[Any] = field(default_factory=list) - user_contexts: list[Any] = field(default_factory=list) - - -@dataclass -class SetGeolocationOverrideParameters: - """SetGeolocationOverrideParameters.""" - - contexts: list[Any] = field(default_factory=list) - user_contexts: list[Any] = field(default_factory=list) - - -@dataclass -class GeolocationCoordinates: - """GeolocationCoordinates.""" - - latitude: Any | None = None - longitude: Any | None = None - accuracy: Any | None = None - altitude: Any | None = None - altitude_accuracy: Any | None = None - heading: Any | None = None - speed: Any | None = None - - -@dataclass -class GeolocationPositionError: - """GeolocationPositionError.""" - - type: str = field(default="positionUnavailable", init=False) - - -@dataclass -class SetLocaleOverrideParameters: - """SetLocaleOverrideParameters.""" - - locale: Any | None = None - contexts: list[Any] = field(default_factory=list) - user_contexts: list[Any] = field(default_factory=list) - - -@dataclass -class NetworkConditionsOffline: - """NetworkConditionsOffline.""" - - type: str = field(default="offline", init=False) - - -@dataclass -class ScreenArea: - """ScreenArea.""" - - width: Any | None = None - height: Any | None = None - - -@dataclass -class SetScreenSettingsOverrideParameters: - """SetScreenSettingsOverrideParameters.""" - - screen_area: Any | None = None - contexts: list[Any] = field(default_factory=list) - user_contexts: list[Any] = field(default_factory=list) - - -@dataclass -class ScreenOrientation: - """ScreenOrientation.""" - - natural: Any | None = None - type: Any | None = None - - -@dataclass -class SetScreenOrientationOverrideParameters: - """SetScreenOrientationOverrideParameters.""" - - screen_orientation: Any | None = None - contexts: list[Any] = field(default_factory=list) - user_contexts: list[Any] = field(default_factory=list) - - -@dataclass -class SetUserAgentOverrideParameters: - """SetUserAgentOverrideParameters.""" - - user_agent: Any | None = None - contexts: list[Any] = field(default_factory=list) - user_contexts: list[Any] = field(default_factory=list) - - -@dataclass -class SetScriptingEnabledParameters: - """SetScriptingEnabledParameters.""" - - enabled: Any | None = None - contexts: list[Any] = field(default_factory=list) - user_contexts: list[Any] = field(default_factory=list) - - -@dataclass -class SetScrollbarTypeOverrideParameters: - """SetScrollbarTypeOverrideParameters.""" - - scrollbar_type: Any | None = None - contexts: list[Any] = field(default_factory=list) - user_contexts: list[Any] = field(default_factory=list) - - -@dataclass -class SetTimezoneOverrideParameters: - """SetTimezoneOverrideParameters.""" - - timezone: Any | None = None - contexts: list[Any] = field(default_factory=list) - user_contexts: list[Any] = field(default_factory=list) - - -@dataclass -class SetTouchOverrideParameters: - """SetTouchOverrideParameters.""" - - contexts: list[Any] = field(default_factory=list) - user_contexts: list[Any] = field(default_factory=list) - - -@dataclass -class SetNetworkConditionsParameters: - """SetNetworkConditionsParameters.""" - - network_conditions: Any | None = None - contexts: list[Any] = field(default_factory=list) - user_contexts: list[Any] = field(default_factory=list) - - -# Backward-compatible alias for existing imports -setNetworkConditionsParameters = SetNetworkConditionsParameters - - -class Emulation: - """WebDriver BiDi emulation module.""" - - def __init__(self, conn) -> None: - self._conn = conn - - def set_forced_colors_mode_theme_override( - self, - theme: Any | None = None, - contexts: list[Any] | None = None, - user_contexts: list[Any] | None = None, - ): - """Execute emulation.setForcedColorsModeThemeOverride.""" - if theme is None: - raise TypeError("set_forced_colors_mode_theme_override() missing required argument: 'theme'") - - params = { - "theme": theme, - "contexts": contexts, - "userContexts": user_contexts, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("emulation.setForcedColorsModeThemeOverride", params) - result = self._conn.execute(cmd) - return result - - def set_locale_override( - self, - locale: Any | None = None, - contexts: list[Any] | None = None, - user_contexts: list[Any] | None = None, - ): - """Execute emulation.setLocaleOverride.""" - if locale is None: - raise TypeError("set_locale_override() missing required argument: 'locale'") - - params = { - "locale": locale, - "contexts": contexts, - "userContexts": user_contexts, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("emulation.setLocaleOverride", params) - result = self._conn.execute(cmd) - return result - - def set_scrollbar_type_override( - self, - scrollbar_type: Any | None = None, - contexts: list[Any] | None = None, - user_contexts: list[Any] | None = None, - ): - """Execute emulation.setScrollbarTypeOverride.""" - if scrollbar_type is None: - raise TypeError("set_scrollbar_type_override() missing required argument: 'scrollbar_type'") - - params = { - "scrollbarType": scrollbar_type, - "contexts": contexts, - "userContexts": user_contexts, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("emulation.setScrollbarTypeOverride", params) - result = self._conn.execute(cmd) - return result - - def set_touch_override(self, contexts: list[Any] | None = None, user_contexts: list[Any] | None = None): - """Execute emulation.setTouchOverride.""" - params = { - "contexts": contexts, - "userContexts": user_contexts, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("emulation.setTouchOverride", params) - result = self._conn.execute(cmd) - return result - - def set_geolocation_override( - self, - coordinates=None, - error=None, - contexts: list[Any] | None = None, - user_contexts: list[Any] | None = None, - ): - """Execute emulation.setGeolocationOverride. - - Sets or clears the geolocation override for specified browsing or user contexts. - - Args: - coordinates: A GeolocationCoordinates instance (or dict) to override the - position, or ``None`` to clear a previously-set override. - error: A GeolocationPositionError instance (or dict) to simulate a - position-unavailable error. Mutually exclusive with *coordinates*. - contexts: List of browsing context IDs to target. - user_contexts: List of user context IDs to target. - """ - params: dict[str, Any] = {} - if coordinates is not None: - if isinstance(coordinates, dict): - coords_dict = coordinates - else: - coords_dict = {} - if coordinates.latitude is not None: - coords_dict["latitude"] = coordinates.latitude - if coordinates.longitude is not None: - coords_dict["longitude"] = coordinates.longitude - if coordinates.accuracy is not None: - coords_dict["accuracy"] = coordinates.accuracy - if coordinates.altitude is not None: - coords_dict["altitude"] = coordinates.altitude - if coordinates.altitude_accuracy is not None: - coords_dict["altitudeAccuracy"] = coordinates.altitude_accuracy - if coordinates.heading is not None: - coords_dict["heading"] = coordinates.heading - if coordinates.speed is not None: - coords_dict["speed"] = coordinates.speed - params["coordinates"] = coords_dict - if error is not None: - if isinstance(error, dict): - params["error"] = error - else: - params["error"] = {"type": error.type if error.type is not None else "positionUnavailable"} - if contexts is not None: - params["contexts"] = contexts - if user_contexts is not None: - params["userContexts"] = user_contexts - cmd = command_builder("emulation.setGeolocationOverride", params) - result = self._conn.execute(cmd) - return result - - def set_timezone_override( - self, - timezone=None, - contexts: list[Any] | None = None, - user_contexts: list[Any] | None = None, - ): - """Execute emulation.setTimezoneOverride. - - Sets or clears the timezone override for specified browsing or user contexts. - Pass ``timezone=None`` (or omit it) to clear a previously-set override. - - Args: - timezone: IANA timezone string (e.g. ``"America/New_York"``) or ``None`` - to clear the override. - contexts: List of browsing context IDs to target. - user_contexts: List of user context IDs to target. - """ - params: dict[str, Any] = {"timezone": timezone} - if contexts is not None: - params["contexts"] = contexts - if user_contexts is not None: - params["userContexts"] = user_contexts - cmd = command_builder("emulation.setTimezoneOverride", params) - return self._conn.execute(cmd) - - def set_scripting_enabled( - self, - enabled=None, - contexts: list[Any] | None = None, - user_contexts: list[Any] | None = None, - ): - """Execute emulation.setScriptingEnabled. - - Enables or disables scripting for specified browsing or user contexts. - Pass ``enabled=None`` to restore the default behaviour. - - Args: - enabled: ``True`` to enable scripting, ``False`` to disable it, or - ``None`` to clear the override. - contexts: List of browsing context IDs to target. - user_contexts: List of user context IDs to target. - """ - params: dict[str, Any] = {"enabled": enabled} - if contexts is not None: - params["contexts"] = contexts - if user_contexts is not None: - params["userContexts"] = user_contexts - cmd = command_builder("emulation.setScriptingEnabled", params) - return self._conn.execute(cmd) - - def set_user_agent_override( - self, - user_agent=None, - contexts: list[Any] | None = None, - user_contexts: list[Any] | None = None, - ): - """Execute emulation.setUserAgentOverride. - - Overrides the User-Agent string for specified browsing or user contexts. - Pass ``user_agent=None`` to clear a previously-set override. - - Args: - user_agent: Custom User-Agent string, or ``None`` to clear the override. - contexts: List of browsing context IDs to target. - user_contexts: List of user context IDs to target. - """ - params: dict[str, Any] = {"userAgent": user_agent} - if contexts is not None: - params["contexts"] = contexts - if user_contexts is not None: - params["userContexts"] = user_contexts - cmd = command_builder("emulation.setUserAgentOverride", params) - return self._conn.execute(cmd) - - def set_screen_orientation_override( - self, - screen_orientation=None, - contexts: list[Any] | None = None, - user_contexts: list[Any] | None = None, - ): - """Execute emulation.setScreenOrientationOverride. - - Sets or clears the screen orientation override for specified browsing or - user contexts. - - Args: - screen_orientation: A :class:`ScreenOrientation` instance (or dict with - ``natural`` and ``type`` keys) to lock the orientation, or ``None`` - to clear a previously-set override. - contexts: List of browsing context IDs to target. - user_contexts: List of user context IDs to target. - """ - if screen_orientation is None: - so_value = None - elif isinstance(screen_orientation, dict): - so_value = screen_orientation - else: - natural = screen_orientation.natural - orientation_type = screen_orientation.type - so_value = { - "natural": natural.lower() if isinstance(natural, str) else natural, - "type": orientation_type.lower() if isinstance(orientation_type, str) else orientation_type, - } - params: dict[str, Any] = {"screenOrientation": so_value} - if contexts is not None: - params["contexts"] = contexts - if user_contexts is not None: - params["userContexts"] = user_contexts - cmd = command_builder("emulation.setScreenOrientationOverride", params) - return self._conn.execute(cmd) - - def set_network_conditions( - self, - network_conditions=None, - offline: bool | None = None, - contexts: list[Any] | None = None, - user_contexts: list[Any] | None = None, - ): - """Execute emulation.setNetworkConditions. - - Sets or clears network condition emulation for specified browsing or user - contexts. - - Args: - network_conditions: A dict with the raw ``networkConditions`` value - (e.g. ``{"type": "offline"}``), or ``None`` to clear the override. - Mutually exclusive with *offline*. - offline: Convenience bool — ``True`` sets offline conditions, - ``False`` clears them (sends ``null``). When provided, this takes - precedence over *network_conditions*. - contexts: List of browsing context IDs to target. - user_contexts: List of user context IDs to target. - """ - if offline is not None: - nc_value = {"type": "offline"} if offline else None - else: - nc_value = network_conditions - params: dict[str, Any] = {"networkConditions": nc_value} - if contexts is not None: - params["contexts"] = contexts - if user_contexts is not None: - params["userContexts"] = user_contexts - cmd = command_builder("emulation.setNetworkConditions", params) - return self._conn.execute(cmd) - - def set_screen_settings_override( - self, - width: int | None = None, - height: int | None = None, - contexts: list[Any] | None = None, - user_contexts: list[Any] | None = None, - ): - """Execute emulation.setScreenSettingsOverride. - - Sets or clears the screen settings override for specified browsing or user - contexts. - - Args: - width: The screen width in pixels, or ``None`` to clear the override. - height: The screen height in pixels, or ``None`` to clear the override. - contexts: List of browsing context IDs to target. - user_contexts: List of user context IDs to target. - """ - screen_area = None - if width is not None or height is not None: - screen_area = {} - if width is not None: - screen_area["width"] = width - if height is not None: - screen_area["height"] = height - params: dict[str, Any] = {"screenArea": screen_area} - if contexts is not None: - params["contexts"] = contexts - if user_contexts is not None: - params["userContexts"] = user_contexts - cmd = command_builder("emulation.setScreenSettingsOverride", params) - return self._conn.execute(cmd)
diff --git a/selenium/webdriver/common/bidi/input.py b/selenium/webdriver/common/bidi/input.py deleted file mode 100644 index 6c06fc4..0000000 --- a/selenium/webdriver/common/bidi/input.py +++ /dev/null
@@ -1,339 +0,0 @@ -# Licensed to the Software Freedom Conservancy (SFC) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The SFC licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -from __future__ import annotations - -from collections.abc import Callable -from dataclasses import dataclass, field -from typing import Any - -from selenium.webdriver.common.bidi._event_manager import EventConfig, _EventManager -from selenium.webdriver.common.bidi.common import command_builder - - -class PointerType: - """PointerType.""" - - MOUSE = "mouse" - PEN = "pen" - TOUCH = "touch" - - -class Origin: - """Origin.""" - - VIEWPORT = "viewport" - POINTER = "pointer" - - -@dataclass -class ElementOrigin: - """ElementOrigin.""" - - type: str = field(default="element", init=False) - element: Any | None = None - - -@dataclass -class PerformActionsParameters: - """PerformActionsParameters.""" - - context: Any | None = None - actions: list[Any] = field(default_factory=list) - - -@dataclass -class NoneSourceActions: - """NoneSourceActions.""" - - type: str = field(default="none", init=False) - id: str | None = None - actions: list[Any] = field(default_factory=list) - - -@dataclass -class KeySourceActions: - """KeySourceActions.""" - - type: str = field(default="key", init=False) - id: str | None = None - actions: list[Any] = field(default_factory=list) - - -@dataclass -class PointerSourceActions: - """PointerSourceActions.""" - - type: str = field(default="pointer", init=False) - id: str | None = None - parameters: Any | None = None - actions: list[Any] = field(default_factory=list) - - -@dataclass -class PointerParameters: - """PointerParameters.""" - - pointer_type: Any | None = None - - -@dataclass -class WheelSourceActions: - """WheelSourceActions.""" - - type: str = field(default="wheel", init=False) - id: str | None = None - actions: list[Any] = field(default_factory=list) - - -@dataclass -class PauseAction: - """PauseAction.""" - - type: str = field(default="pause", init=False) - duration: Any | None = None - - -@dataclass -class KeyDownAction: - """KeyDownAction.""" - - type: str = field(default="keyDown", init=False) - value: str | None = None - - -@dataclass -class KeyUpAction: - """KeyUpAction.""" - - type: str = field(default="keyUp", init=False) - value: str | None = None - - -@dataclass -class PointerUpAction: - """PointerUpAction.""" - - type: str = field(default="pointerUp", init=False) - button: Any | None = None - - -@dataclass -class WheelScrollAction: - """WheelScrollAction.""" - - type: str = field(default="scroll", init=False) - x: Any | None = None - y: Any | None = None - delta_x: Any | None = None - delta_y: Any | None = None - duration: Any | None = None - origin: Any | None = None - - -@dataclass -class PointerCommonProperties: - """PointerCommonProperties.""" - - width: Any | None = None - height: Any | None = None - pressure: Any | None = None - tangential_pressure: Any | None = None - twist: Any | None = None - altitude_angle: Any | None = None - azimuth_angle: Any | None = None - - -@dataclass -class ReleaseActionsParameters: - """ReleaseActionsParameters.""" - - context: Any | None = None - - -@dataclass -class SetFilesParameters: - """SetFilesParameters.""" - - context: Any | None = None - element: Any | None = None - files: list[Any] = field(default_factory=list) - - -@dataclass -class FileDialogInfo: - """FileDialogInfo - parameters for the input.fileDialogOpened event.""" - - context: Any | None = None - element: Any | None = None - multiple: bool | None = None - - @classmethod - def from_json(cls, params: dict) -> FileDialogInfo: - """Deserialize event params into FileDialogInfo.""" - return cls( - context=params.get("context"), - element=params.get("element"), - multiple=params.get("multiple"), - ) - - -@dataclass -class PointerMoveAction: - """PointerMoveAction.""" - - type: str = field(default="pointerMove", init=False) - x: Any | None = None - y: Any | None = None - duration: Any | None = None - origin: Any | None = None - properties: Any | None = None - - -@dataclass -class PointerDownAction: - """PointerDownAction.""" - - type: str = field(default="pointerDown", init=False) - button: Any | None = None - properties: Any | None = None - - -# BiDi Event Name to Parameter Type Mapping -EVENT_NAME_MAPPING = { - "file_dialog_opened": "input.fileDialogOpened", -} - - -class Input: - """WebDriver BiDi input module.""" - - EVENT_CONFIGS: dict[str, EventConfig] = {} - - def __init__(self, conn) -> None: - self._conn = conn - self._event_manager = _EventManager(conn, self.EVENT_CONFIGS) - - def perform_actions(self, context: Any | None = None, actions: list[Any] | None = None): - """Execute input.performActions.""" - if context is None: - raise TypeError("perform_actions() missing required argument: 'context'") - if actions is None: - raise TypeError("perform_actions() missing required argument: 'actions'") - - params = { - "context": context, - "actions": actions, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("input.performActions", params) - result = self._conn.execute(cmd) - return result - - def release_actions(self, context: Any | None = None): - """Execute input.releaseActions.""" - if context is None: - raise TypeError("release_actions() missing required argument: 'context'") - - params = { - "context": context, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("input.releaseActions", params) - result = self._conn.execute(cmd) - return result - - def set_files(self, context: Any | None = None, element: Any | None = None, files: list[Any] | None = None): - """Execute input.setFiles.""" - if context is None: - raise TypeError("set_files() missing required argument: 'context'") - if element is None: - raise TypeError("set_files() missing required argument: 'element'") - if files is None: - raise TypeError("set_files() missing required argument: 'files'") - - params = { - "context": context, - "element": element, - "files": files, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("input.setFiles", params) - result = self._conn.execute(cmd) - return result - - def add_file_dialog_handler(self, callback) -> int: - """Subscribe to the input.fileDialogOpened event. - - Args: - callback: Callable invoked with a FileDialogInfo when a file dialog opens. - - Returns: - A handler ID that can be passed to remove_file_dialog_handler. - """ - return self._event_manager.add_event_handler("file_dialog_opened", callback) - - def remove_file_dialog_handler(self, handler_id: int) -> None: - """Unsubscribe a previously registered file dialog event handler. - - Args: - handler_id: The handler ID returned by add_file_dialog_handler. - """ - return self._event_manager.remove_event_handler("file_dialog_opened", handler_id) - - def add_event_handler(self, event: str, callback: Callable, contexts: list[str] | None = None) -> int: - """Add an event handler. - - Args: - event: The event to subscribe to. - callback: The callback function to execute on event. - contexts: The context IDs to subscribe to (optional). - - Returns: - The callback ID. - """ - return self._event_manager.add_event_handler(event, callback, contexts) - - def remove_event_handler(self, event: str, callback_id: int) -> None: - """Remove an event handler. - - Args: - event: The event to unsubscribe from. - callback_id: The callback ID. - """ - return self._event_manager.remove_event_handler(event, callback_id) - - def clear_event_handlers(self) -> None: - """Clear all event handlers.""" - return self._event_manager.clear_event_handlers() - - -# Event Info Type Aliases -# Event: input.fileDialogOpened -FileDialogOpened = globals().get("FileDialogInfo", dict) # Fallback to dict if type not defined - - -# Populate EVENT_CONFIGS with event configuration mappings -_globals = globals() -Input.EVENT_CONFIGS = { - "file_dialog_opened": EventConfig( - "file_dialog_opened", - "input.fileDialogOpened", - _globals.get("FileDialogOpened", dict) if _globals.get("FileDialogOpened") else dict, - ), -}
diff --git a/selenium/webdriver/common/bidi/log.py b/selenium/webdriver/common/bidi/log.py deleted file mode 100644 index 5979364..0000000 --- a/selenium/webdriver/common/bidi/log.py +++ /dev/null
@@ -1,167 +0,0 @@ -# Licensed to the Software Freedom Conservancy (SFC) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The SFC licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -from __future__ import annotations - -from collections.abc import Callable -from dataclasses import dataclass -from typing import Any - -from selenium.webdriver.common.bidi._event_manager import EventConfig, _EventManager - - -class Level: - """Level.""" - - DEBUG = "debug" - INFO = "info" - WARN = "warn" - ERROR = "error" - - -LogLevel = Level - - -@dataclass -class BaseLogEntry: - """BaseLogEntry.""" - - level: Any | None = None - source: Any | None = None - text: Any | None = None - timestamp: Any | None = None - stack_trace: Any | None = None - - -@dataclass -class GenericLogEntry: - """GenericLogEntry.""" - - type: str | None = None - - -@dataclass -class ConsoleLogEntry: - """ConsoleLogEntry - a console log entry from the browser.""" - - type_: str | None = None - method: str | None = None - args: list | None = None - level: Any | None = None - text: Any | None = None - source: Any | None = None - timestamp: Any | None = None - stack_trace: Any | None = None - - @classmethod - def from_json(cls, params: dict) -> ConsoleLogEntry: - """Deserialize from BiDi params dict.""" - return cls( - type_=params.get("type"), - method=params.get("method"), - args=params.get("args"), - level=params.get("level"), - text=params.get("text"), - source=params.get("source"), - timestamp=params.get("timestamp"), - stack_trace=params.get("stackTrace"), - ) - - -@dataclass -class JavascriptLogEntry: - """JavascriptLogEntry - a JavaScript error log entry from the browser.""" - - type_: str | None = None - level: Any | None = None - text: Any | None = None - source: Any | None = None - timestamp: Any | None = None - stacktrace: Any | None = None - - @classmethod - def from_json(cls, params: dict) -> JavascriptLogEntry: - """Deserialize from BiDi params dict.""" - return cls( - type_=params.get("type"), - level=params.get("level"), - text=params.get("text"), - source=params.get("source"), - timestamp=params.get("timestamp"), - stacktrace=params.get("stackTrace"), - ) - - -Entry = GenericLogEntry | ConsoleLogEntry | JavascriptLogEntry - -# BiDi Event Name to Parameter Type Mapping -EVENT_NAME_MAPPING = { - "entry_added": "log.entryAdded", -} - - -class Log: - """WebDriver BiDi log module.""" - - EVENT_CONFIGS: dict[str, EventConfig] = {} - - def __init__(self, conn) -> None: - self._conn = conn - self._event_manager = _EventManager(conn, self.EVENT_CONFIGS) - - def add_event_handler(self, event: str, callback: Callable, contexts: list[str] | None = None) -> int: - """Add an event handler. - - Args: - event: The event to subscribe to. - callback: The callback function to execute on event. - contexts: The context IDs to subscribe to (optional). - - Returns: - The callback ID. - """ - return self._event_manager.add_event_handler(event, callback, contexts) - - def remove_event_handler(self, event: str, callback_id: int) -> None: - """Remove an event handler. - - Args: - event: The event to unsubscribe from. - callback_id: The callback ID. - """ - return self._event_manager.remove_event_handler(event, callback_id) - - def clear_event_handlers(self) -> None: - """Clear all event handlers.""" - return self._event_manager.clear_event_handlers() - - -# Event Info Type Aliases -# Event: log.entryAdded -EntryAdded = Entry - - -# Populate EVENT_CONFIGS with event configuration mappings -_globals = globals() -Log.EVENT_CONFIGS = { - "entry_added": EventConfig( - "entry_added", - "log.entryAdded", - _globals.get("EntryAdded", dict) if _globals.get("EntryAdded") else dict, - ), -}
diff --git a/selenium/webdriver/common/bidi/network.py b/selenium/webdriver/common/bidi/network.py deleted file mode 100644 index 6c24e39..0000000 --- a/selenium/webdriver/common/bidi/network.py +++ /dev/null
@@ -1,925 +0,0 @@ -# Licensed to the Software Freedom Conservancy (SFC) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The SFC licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -from __future__ import annotations - -from collections.abc import Callable -from dataclasses import dataclass, field -from typing import Any - -from selenium.webdriver.common.bidi._event_manager import EventConfig, _EventManager -from selenium.webdriver.common.bidi.common import command_builder - - -class SameSite: - """SameSite.""" - - STRICT = "strict" - LAX = "lax" - NONE = "none" - DEFAULT = "default" - - -class DataType: - """DataType.""" - - REQUEST = "request" - RESPONSE = "response" - - -class InterceptPhase: - """InterceptPhase.""" - - BEFOREREQUESTSENT = "beforeRequestSent" - RESPONSESTARTED = "responseStarted" - AUTHREQUIRED = "authRequired" - - -class ContinueWithAuthNoCredentials: - """ContinueWithAuthNoCredentials.""" - - DEFAULT = "default" - CANCEL = "cancel" - - -@dataclass -class AuthChallenge: - """AuthChallenge.""" - - scheme: str | None = None - realm: str | None = None - - -@dataclass -class AuthCredentials: - """AuthCredentials.""" - - type: str = field(default="password", init=False) - username: str | None = None - password: str | None = None - - -@dataclass -class BaseParameters: - """BaseParameters.""" - - context: Any | None = None - is_blocked: bool | None = None - navigation: Any | None = None - redirect_count: Any | None = None - request: Any | None = None - timestamp: Any | None = None - user_context: Any | None = None - intercepts: list[Any] = field(default_factory=list) - - -@dataclass -class StringValue: - """StringValue.""" - - type: str = field(default="string", init=False) - value: str | None = None - - -@dataclass -class Base64Value: - """Base64Value.""" - - type: str = field(default="base64", init=False) - value: str | None = None - - -@dataclass -class Cookie: - """Cookie.""" - - name: str | None = None - value: Any | None = None - domain: str | None = None - path: str | None = None - size: Any | None = None - http_only: bool | None = None - secure: bool | None = None - same_site: Any | None = None - expiry: Any | None = None - - -@dataclass -class CookieHeader: - """CookieHeader.""" - - name: str | None = None - value: Any | None = None - - -@dataclass -class FetchTimingInfo: - """FetchTimingInfo.""" - - time_origin: Any | None = None - request_time: Any | None = None - redirect_start: Any | None = None - redirect_end: Any | None = None - fetch_start: Any | None = None - dns_start: Any | None = None - dns_end: Any | None = None - connect_start: Any | None = None - connect_end: Any | None = None - tls_start: Any | None = None - request_start: Any | None = None - response_start: Any | None = None - response_end: Any | None = None - - -@dataclass -class Header: - """Header.""" - - name: str | None = None - value: Any | None = None - - -@dataclass -class Initiator: - """Initiator.""" - - column_number: Any | None = None - line_number: Any | None = None - request: Any | None = None - stack_trace: Any | None = None - type: Any | None = None - - -@dataclass -class ResponseContent: - """ResponseContent.""" - - size: Any | None = None - - -@dataclass -class ResponseData: - """ResponseData.""" - - url: str | None = None - protocol: str | None = None - status: Any | None = None - status_text: str | None = None - from_cache: bool | None = None - headers: list[Any] = field(default_factory=list) - mime_type: str | None = None - bytes_received: Any | None = None - headers_size: Any | None = None - body_size: Any | None = None - content: Any | None = None - auth_challenges: list[Any] = field(default_factory=list) - - -@dataclass -class SetCookieHeader: - """SetCookieHeader.""" - - name: str | None = None - value: Any | None = None - domain: str | None = None - http_only: bool | None = None - expiry: str | None = None - max_age: Any | None = None - path: str | None = None - same_site: Any | None = None - secure: bool | None = None - - -@dataclass -class UrlPatternPattern: - """UrlPatternPattern.""" - - type: str = field(default="pattern", init=False) - protocol: str | None = None - hostname: str | None = None - port: str | None = None - pathname: str | None = None - search: str | None = None - - -@dataclass -class UrlPatternString: - """UrlPatternString.""" - - type: str = field(default="string", init=False) - pattern: str | None = None - - -@dataclass -class AddDataCollectorParameters: - """AddDataCollectorParameters.""" - - data_types: list[Any] = field(default_factory=list) - max_encoded_data_size: Any | None = None - collector_type: Any | None = None - contexts: list[Any] = field(default_factory=list) - user_contexts: list[Any] = field(default_factory=list) - - -@dataclass -class AddDataCollectorResult: - """AddDataCollectorResult.""" - - collector: Any | None = None - - -@dataclass -class AddInterceptParameters: - """AddInterceptParameters.""" - - phases: list[Any] = field(default_factory=list) - contexts: list[Any] = field(default_factory=list) - url_patterns: list[Any] = field(default_factory=list) - - -@dataclass -class AddInterceptResult: - """AddInterceptResult.""" - - intercept: Any | None = None - - -@dataclass -class ContinueResponseParameters: - """ContinueResponseParameters.""" - - request: Any | None = None - cookies: list[Any] = field(default_factory=list) - credentials: Any | None = None - headers: list[Any] = field(default_factory=list) - reason_phrase: str | None = None - status_code: Any | None = None - - -@dataclass -class ContinueWithAuthParameters: - """ContinueWithAuthParameters.""" - - request: Any | None = None - - -@dataclass -class ContinueWithAuthCredentials: - """ContinueWithAuthCredentials.""" - - action: str = field(default="provideCredentials", init=False) - credentials: Any | None = None - - -@dataclass -class FailRequestParameters: - """FailRequestParameters.""" - - request: Any | None = None - - -@dataclass -class GetDataParameters: - """GetDataParameters.""" - - data_type: Any | None = None - collector: Any | None = None - disown: bool | None = None - request: Any | None = None - - -@dataclass -class GetDataResult: - """GetDataResult.""" - - bytes: Any | None = None - - -@dataclass -class ProvideResponseParameters: - """ProvideResponseParameters.""" - - request: Any | None = None - body: Any | None = None - cookies: list[Any] = field(default_factory=list) - headers: list[Any] = field(default_factory=list) - reason_phrase: str | None = None - status_code: Any | None = None - - -@dataclass -class RemoveDataCollectorParameters: - """RemoveDataCollectorParameters.""" - - collector: Any | None = None - - -@dataclass -class RemoveInterceptParameters: - """RemoveInterceptParameters.""" - - intercept: Any | None = None - - -@dataclass -class SetCacheBehaviorParameters: - """SetCacheBehaviorParameters.""" - - cache_behavior: Any | None = None - contexts: list[Any] = field(default_factory=list) - - -@dataclass -class SetExtraHeadersParameters: - """SetExtraHeadersParameters.""" - - headers: list[Any] = field(default_factory=list) - contexts: list[Any] = field(default_factory=list) - user_contexts: list[Any] = field(default_factory=list) - - -@dataclass -class ResponseStartedParameters: - """ResponseStartedParameters.""" - - response: Any | None = None - - -@dataclass -class DisownDataParameters: - """DisownDataParameters.""" - - data_type: Any | None = None - collector: Any | None = None - request: Any | None = None - - -# Backward-compatible alias for existing imports -disownDataParameters = DisownDataParameters - - -class BytesValue: - """A string or base64-encoded bytes value used in cookie operations. - - This corresponds to network.BytesValue in the WebDriver BiDi specification, - wrapping either a plain string or a base64-encoded binary value. - """ - - TYPE_STRING = "string" - TYPE_BASE64 = "base64" - - def __init__(self, type: Any | None, value: Any | None) -> None: - self.type = type - self.value = value - - def to_bidi_dict(self) -> dict: - return {"type": self.type, "value": self.value} - - -class Request: - """Wraps a BiDi network request event params and provides request action methods.""" - - def __init__(self, conn, params): - self._conn = conn - self._params = params if isinstance(params, dict) else {} - req = self._params.get("request", {}) or {} - self.url = req.get("url", "") - self._request_id = req.get("request") - - def continue_request(self, **kwargs): - """Continue the intercepted request.""" - from selenium.webdriver.common.bidi.common import command_builder as _cb - - params = {"request": self._request_id} - params.update(kwargs) - self._conn.execute(_cb("network.continueRequest", params)) - - -# BiDi Event Name to Parameter Type Mapping -EVENT_NAME_MAPPING = { - "auth_required": "network.authRequired", - "before_request": "network.beforeRequestSent", -} - - -class Network: - """WebDriver BiDi network module.""" - - EVENT_CONFIGS: dict[str, EventConfig] = {} - - def __init__(self, conn) -> None: - self._conn = conn - self._event_manager = _EventManager(conn, self.EVENT_CONFIGS) - self.intercepts: list[Any] = [] - self._handler_intercepts: dict[str, Any] = {} - - def add_data_collector( - self, - data_types: list[Any] | None = None, - max_encoded_data_size: Any | None = None, - collector_type: Any | None = None, - contexts: list[Any] | None = None, - user_contexts: list[Any] | None = None, - ): - """Execute network.addDataCollector.""" - if data_types is None: - raise TypeError("add_data_collector() missing required argument: 'data_types'") - if max_encoded_data_size is None: - raise TypeError("add_data_collector() missing required argument: 'max_encoded_data_size'") - - params = { - "dataTypes": data_types, - "maxEncodedDataSize": max_encoded_data_size, - "collectorType": collector_type, - "contexts": contexts, - "userContexts": user_contexts, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("network.addDataCollector", params) - result = self._conn.execute(cmd) - return result - - def add_intercept( - self, - phases: list[Any] | None = None, - contexts: list[Any] | None = None, - url_patterns: list[Any] | None = None, - ): - """Execute network.addIntercept.""" - if phases is None: - raise TypeError("add_intercept() missing required argument: 'phases'") - - params = { - "phases": phases, - "contexts": contexts, - "urlPatterns": url_patterns, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("network.addIntercept", params) - result = self._conn.execute(cmd) - return result - - def continue_request( - self, - request: Any | None = None, - body: Any | None = None, - cookies: list[Any] | None = None, - headers: list[Any] | None = None, - method: Any | None = None, - url: Any | None = None, - ): - """Execute network.continueRequest.""" - if request is None: - raise TypeError("continue_request() missing required argument: 'request'") - - params = { - "request": request, - "body": body, - "cookies": cookies, - "headers": headers, - "method": method, - "url": url, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("network.continueRequest", params) - result = self._conn.execute(cmd) - return result - - def continue_response( - self, - request: Any | None = None, - cookies: list[Any] | None = None, - credentials: Any | None = None, - headers: list[Any] | None = None, - reason_phrase: Any | None = None, - status_code: Any | None = None, - ): - """Execute network.continueResponse.""" - if request is None: - raise TypeError("continue_response() missing required argument: 'request'") - - params = { - "request": request, - "cookies": cookies, - "credentials": credentials, - "headers": headers, - "reasonPhrase": reason_phrase, - "statusCode": status_code, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("network.continueResponse", params) - result = self._conn.execute(cmd) - return result - - def continue_with_auth(self, request: Any | None = None): - """Execute network.continueWithAuth.""" - if request is None: - raise TypeError("continue_with_auth() missing required argument: 'request'") - - params = { - "request": request, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("network.continueWithAuth", params) - result = self._conn.execute(cmd) - return result - - def disown_data(self, data_type: Any | None = None, collector: Any | None = None, request: Any | None = None): - """Execute network.disownData.""" - if data_type is None: - raise TypeError("disown_data() missing required argument: 'data_type'") - if collector is None: - raise TypeError("disown_data() missing required argument: 'collector'") - if request is None: - raise TypeError("disown_data() missing required argument: 'request'") - - params = { - "dataType": data_type, - "collector": collector, - "request": request, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("network.disownData", params) - result = self._conn.execute(cmd) - return result - - def fail_request(self, request: Any | None = None): - """Execute network.failRequest.""" - if request is None: - raise TypeError("fail_request() missing required argument: 'request'") - - params = { - "request": request, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("network.failRequest", params) - result = self._conn.execute(cmd) - return result - - def get_data( - self, - data_type: Any | None = None, - collector: Any | None = None, - disown: bool | None = None, - request: Any | None = None, - ): - """Execute network.getData.""" - if data_type is None: - raise TypeError("get_data() missing required argument: 'data_type'") - if request is None: - raise TypeError("get_data() missing required argument: 'request'") - - params = { - "dataType": data_type, - "collector": collector, - "disown": disown, - "request": request, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("network.getData", params) - result = self._conn.execute(cmd) - return result - - def provide_response( - self, - request: Any | None = None, - body: Any | None = None, - cookies: list[Any] | None = None, - headers: list[Any] | None = None, - reason_phrase: Any | None = None, - status_code: Any | None = None, - ): - """Execute network.provideResponse.""" - if request is None: - raise TypeError("provide_response() missing required argument: 'request'") - - params = { - "request": request, - "body": body, - "cookies": cookies, - "headers": headers, - "reasonPhrase": reason_phrase, - "statusCode": status_code, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("network.provideResponse", params) - result = self._conn.execute(cmd) - return result - - def remove_data_collector(self, collector: Any | None = None): - """Execute network.removeDataCollector.""" - if collector is None: - raise TypeError("remove_data_collector() missing required argument: 'collector'") - - params = { - "collector": collector, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("network.removeDataCollector", params) - result = self._conn.execute(cmd) - return result - - def remove_intercept(self, intercept: Any | None = None): - """Execute network.removeIntercept.""" - if intercept is None: - raise TypeError("remove_intercept() missing required argument: 'intercept'") - - params = { - "intercept": intercept, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("network.removeIntercept", params) - result = self._conn.execute(cmd) - return result - - def set_cache_behavior(self, cache_behavior: Any | None = None, contexts: list[Any] | None = None): - """Execute network.setCacheBehavior.""" - if cache_behavior is None: - raise TypeError("set_cache_behavior() missing required argument: 'cache_behavior'") - - params = { - "cacheBehavior": cache_behavior, - "contexts": contexts, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("network.setCacheBehavior", params) - result = self._conn.execute(cmd) - return result - - def set_extra_headers( - self, - headers: list[Any] | None = None, - contexts: list[Any] | None = None, - user_contexts: list[Any] | None = None, - ): - """Execute network.setExtraHeaders.""" - if headers is None: - raise TypeError("set_extra_headers() missing required argument: 'headers'") - - params = { - "headers": headers, - "contexts": contexts, - "userContexts": user_contexts, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("network.setExtraHeaders", params) - result = self._conn.execute(cmd) - return result - - def before_request_sent(self, initiator: Any | None = None, method: Any | None = None, params: Any | None = None): - """Execute network.beforeRequestSent.""" - if method is None: - raise TypeError("before_request_sent() missing required argument: 'method'") - if params is None: - raise TypeError("before_request_sent() missing required argument: 'params'") - - params = { - "initiator": initiator, - "method": method, - "params": params, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("network.beforeRequestSent", params) - result = self._conn.execute(cmd) - return result - - def fetch_error(self, error_text: Any | None = None, method: Any | None = None, params: Any | None = None): - """Execute network.fetchError.""" - if error_text is None: - raise TypeError("fetch_error() missing required argument: 'error_text'") - if method is None: - raise TypeError("fetch_error() missing required argument: 'method'") - if params is None: - raise TypeError("fetch_error() missing required argument: 'params'") - - params = { - "errorText": error_text, - "method": method, - "params": params, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("network.fetchError", params) - result = self._conn.execute(cmd) - return result - - def response_completed(self, response: Any | None = None, method: Any | None = None, params: Any | None = None): - """Execute network.responseCompleted.""" - if response is None: - raise TypeError("response_completed() missing required argument: 'response'") - if method is None: - raise TypeError("response_completed() missing required argument: 'method'") - if params is None: - raise TypeError("response_completed() missing required argument: 'params'") - - params = { - "response": response, - "method": method, - "params": params, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("network.responseCompleted", params) - result = self._conn.execute(cmd) - return result - - def response_started(self, response: Any | None = None): - """Execute network.responseStarted.""" - if response is None: - raise TypeError("response_started() missing required argument: 'response'") - - params = { - "response": response, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("network.responseStarted", params) - result = self._conn.execute(cmd) - return result - - def _add_intercept(self, phases=None, url_patterns=None): - """Add a low-level network intercept. - - Args: - phases: list of intercept phases (default: ["beforeRequestSent"]) - url_patterns: optional URL patterns to filter - - Returns: - dict with "intercept" key containing the intercept ID - """ - from selenium.webdriver.common.bidi.common import command_builder as _cb - - if phases is None: - phases = ["beforeRequestSent"] - params = {"phases": phases} - if url_patterns: - params["urlPatterns"] = url_patterns - result = self._conn.execute(_cb("network.addIntercept", params)) - if result: - intercept_id = result.get("intercept") - if intercept_id and intercept_id not in self.intercepts: - self.intercepts.append(intercept_id) - return result - - def _remove_intercept(self, intercept_id): - """Remove a low-level network intercept.""" - from selenium.webdriver.common.bidi.common import command_builder as _cb - - self._conn.execute(_cb("network.removeIntercept", {"intercept": intercept_id})) - if intercept_id in self.intercepts: - self.intercepts.remove(intercept_id) - - def add_request_handler(self, event, callback, url_patterns=None): - """Add a handler for network requests at the specified phase. - - Args: - event: Event name, e.g. ``"before_request"``. - callback: Callable receiving a :class:`Request` instance. - url_patterns: optional list of URL pattern dicts to filter. - - Returns: - callback_id int for later removal via remove_request_handler. - """ - phase_map = { - "before_request": "beforeRequestSent", - "before_request_sent": "beforeRequestSent", - "response_started": "responseStarted", - "auth_required": "authRequired", - } - phase = phase_map.get(event, "beforeRequestSent") - intercept_result = self._add_intercept(phases=[phase], url_patterns=url_patterns) - intercept_id = intercept_result.get("intercept") if intercept_result else None - - def _request_callback(params): - raw = params if isinstance(params, dict) else (params.__dict__ if hasattr(params, "__dict__") else {}) - request = Request(self._conn, raw) - callback(request) - - callback_id = self.add_event_handler(event, _request_callback) - if intercept_id: - self._handler_intercepts[callback_id] = intercept_id - return callback_id - - def remove_request_handler(self, event, callback_id): - """Remove a network request handler and its associated network intercept. - - Args: - event: The event name used when adding the handler. - callback_id: The int returned by add_request_handler. - """ - self.remove_event_handler(event, callback_id) - intercept_id = self._handler_intercepts.pop(callback_id, None) - if intercept_id: - self._remove_intercept(intercept_id) - - def clear_request_handlers(self): - """Clear all request handlers and remove all tracked intercepts.""" - self.clear_event_handlers() - for intercept_id in list(self.intercepts): - self._remove_intercept(intercept_id) - - def add_auth_handler(self, username, password): - """Add an auth handler that automatically provides credentials. - - Args: - username: The username for basic authentication. - password: The password for basic authentication. - - Returns: - callback_id int for later removal via remove_auth_handler. - """ - from selenium.webdriver.common.bidi.common import command_builder as _cb - - # Set up network intercept for authRequired phase - intercept_result = self._add_intercept(phases=["authRequired"]) - intercept_id = intercept_result.get("intercept") if intercept_result else None - - def _auth_callback(params): - raw = params if isinstance(params, dict) else (params.__dict__ if hasattr(params, "__dict__") else {}) - request_id = raw.get("request", {}).get("request") if isinstance(raw, dict) else None - if request_id: - self._conn.execute( - _cb( - "network.continueWithAuth", - { - "request": request_id, - "action": "provideCredentials", - "credentials": { - "type": "password", - "username": username, - "password": password, - }, - }, - ) - ) - - callback_id = self.add_event_handler("auth_required", _auth_callback) - if intercept_id: - self._handler_intercepts[callback_id] = intercept_id - return callback_id - - def remove_auth_handler(self, callback_id): - """Remove an auth handler by callback ID and its associated network intercept. - - Args: - callback_id: The handler ID returned by add_auth_handler. - """ - self.remove_event_handler("auth_required", callback_id) - intercept_id = self._handler_intercepts.pop(callback_id, None) - if intercept_id: - self._remove_intercept(intercept_id) - - def add_event_handler(self, event: str, callback: Callable, contexts: list[str] | None = None) -> int: - """Add an event handler. - - Args: - event: The event to subscribe to. - callback: The callback function to execute on event. - contexts: The context IDs to subscribe to (optional). - - Returns: - The callback ID. - """ - return self._event_manager.add_event_handler(event, callback, contexts) - - def remove_event_handler(self, event: str, callback_id: int) -> None: - """Remove an event handler. - - Args: - event: The event to unsubscribe from. - callback_id: The callback ID. - """ - return self._event_manager.remove_event_handler(event, callback_id) - - def clear_event_handlers(self) -> None: - """Clear all event handlers.""" - return self._event_manager.clear_event_handlers() - - -# Event Info Type Aliases -# Event: network.authRequired -AuthRequired = globals().get("AuthRequiredParameters", dict) # Fallback to dict if type not defined - - -# Populate EVENT_CONFIGS with event configuration mappings -_globals = globals() -Network.EVENT_CONFIGS = { - "auth_required": EventConfig( - "auth_required", - "network.authRequired", - _globals.get("AuthRequired", dict) if _globals.get("AuthRequired") else dict, - ), - "before_request": EventConfig("before_request", "network.beforeRequestSent", _globals.get("dict", dict)), -}
diff --git a/selenium/webdriver/common/bidi/permissions.py b/selenium/webdriver/common/bidi/permissions.py deleted file mode 100644 index 98e25a1..0000000 --- a/selenium/webdriver/common/bidi/permissions.py +++ /dev/null
@@ -1,103 +0,0 @@ -# Licensed to the Software Freedom Conservancy (SFC) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The SFC licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -"""WebDriver BiDi Permissions module.""" - -from __future__ import annotations - -from enum import Enum -from typing import Any - -from selenium.webdriver.common.bidi.common import command_builder - -_VALID_PERMISSION_STATES = {"granted", "denied", "prompt"} - - -class PermissionState(str, Enum): - """Permission state enumeration.""" - - GRANTED = "granted" - DENIED = "denied" - PROMPT = "prompt" - - -class PermissionDescriptor: - """Descriptor for a permission.""" - - def __init__(self, name: str) -> None: - """Initialize a PermissionDescriptor. - - Args: - name: The name of the permission (e.g., 'geolocation', 'microphone', 'camera') - """ - self.name = name - - def __repr__(self) -> str: - return f"PermissionDescriptor('{self.name}')" - - -class Permissions: - """WebDriver BiDi Permissions module.""" - - def __init__(self, websocket_connection: Any) -> None: - """Initialize the Permissions module. - - Args: - websocket_connection: The WebSocket connection for sending BiDi commands - """ - self._conn = websocket_connection - - def set_permission( - self, - descriptor: PermissionDescriptor | str, - state: PermissionState | str, - origin: str | None = None, - user_context: str | None = None, - ) -> None: - """Set a permission for a given origin. - - Args: - descriptor: The permission descriptor or permission name as a string - state: The desired permission state - origin: The origin for which to set the permission - user_context: Optional user context ID to scope the permission - - Raises: - ValueError: If the state is not a valid permission state - """ - state_value = state.value if isinstance(state, PermissionState) else state - if state_value not in _VALID_PERMISSION_STATES: - raise ValueError( - f"Invalid permission state: {state_value!r}. Must be one of {sorted(_VALID_PERMISSION_STATES)}" - ) - - if isinstance(descriptor, str): - descriptor_dict = {"name": descriptor} - else: - descriptor_dict = {"name": descriptor.name} - - params: dict[str, Any] = { - "descriptor": descriptor_dict, - "state": state_value, - } - if origin is not None: - params["origin"] = origin - if user_context is not None: - params["userContext"] = user_context - - cmd = command_builder("permissions.setPermission", params) - self._conn.execute(cmd)
diff --git a/selenium/webdriver/common/bidi/py.typed b/selenium/webdriver/common/bidi/py.typed deleted file mode 100644 index e69de29..0000000 --- a/selenium/webdriver/common/bidi/py.typed +++ /dev/null
diff --git a/selenium/webdriver/common/bidi/script.py b/selenium/webdriver/common/bidi/script.py deleted file mode 100644 index ee6eb4f..0000000 --- a/selenium/webdriver/common/bidi/script.py +++ /dev/null
@@ -1,1230 +0,0 @@ -# Licensed to the Software Freedom Conservancy (SFC) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The SFC licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -from __future__ import annotations - -from collections.abc import Callable -from dataclasses import dataclass, field -from typing import Any - -from selenium.webdriver.common.bidi._event_manager import EventConfig, _EventManager -from selenium.webdriver.common.bidi.common import command_builder - - -class SpecialNumber: - """SpecialNumber.""" - - NAN = "NaN" - _0 = "-0" - INFINITY = "Infinity" - _INFINITY = "-Infinity" - - -class RealmType: - """RealmType.""" - - WINDOW = "window" - DEDICATED_WORKER = "dedicated-worker" - SHARED_WORKER = "shared-worker" - SERVICE_WORKER = "service-worker" - WORKER = "worker" - PAINT_WORKLET = "paint-worklet" - AUDIO_WORKLET = "audio-worklet" - WORKLET = "worklet" - - -class ResultOwnership: - """ResultOwnership.""" - - ROOT = "root" - NONE = "none" - - -@dataclass -class ChannelValue: - """ChannelValue.""" - - type: str = field(default="channel", init=False) - value: Any | None = None - - -@dataclass -class ChannelProperties: - """ChannelProperties.""" - - channel: Any | None = None - serialization_options: Any | None = None - ownership: Any | None = None - - -@dataclass -class EvaluateResultSuccess: - """EvaluateResultSuccess.""" - - type: str = field(default="success", init=False) - result: Any | None = None - realm: Any | None = None - - -@dataclass -class EvaluateResultException: - """EvaluateResultException.""" - - type: str = field(default="exception", init=False) - exception_details: Any | None = None - realm: Any | None = None - - -@dataclass -class ExceptionDetails: - """ExceptionDetails.""" - - column_number: Any | None = None - exception: Any | None = None - line_number: Any | None = None - stack_trace: Any | None = None - text: str | None = None - - -@dataclass -class ArrayLocalValue: - """ArrayLocalValue.""" - - type: str = field(default="array", init=False) - value: Any | None = None - - -@dataclass -class DateLocalValue: - """DateLocalValue.""" - - type: str = field(default="date", init=False) - value: str | None = None - - -@dataclass -class MapLocalValue: - """MapLocalValue.""" - - type: str = field(default="map", init=False) - value: Any | None = None - - -@dataclass -class ObjectLocalValue: - """ObjectLocalValue.""" - - type: str = field(default="object", init=False) - value: Any | None = None - - -@dataclass -class RegExpValue: - """RegExpValue.""" - - pattern: str | None = None - flags: str | None = None - - -@dataclass -class RegExpLocalValue: - """RegExpLocalValue.""" - - type: str = field(default="regexp", init=False) - value: Any | None = None - - -@dataclass -class SetLocalValue: - """SetLocalValue.""" - - type: str = field(default="set", init=False) - value: Any | None = None - - -@dataclass -class UndefinedValue: - """UndefinedValue.""" - - type: str = field(default="undefined", init=False) - - -@dataclass -class NullValue: - """NullValue.""" - - type: str = field(default="null", init=False) - - -@dataclass -class StringValue: - """StringValue.""" - - type: str = field(default="string", init=False) - value: str | None = None - - -@dataclass -class NumberValue: - """NumberValue.""" - - type: str = field(default="number", init=False) - value: Any | None = None - - -@dataclass -class BooleanValue: - """BooleanValue.""" - - type: str = field(default="boolean", init=False) - value: bool | None = None - - -@dataclass -class BigIntValue: - """BigIntValue.""" - - type: str = field(default="bigint", init=False) - value: str | None = None - - -@dataclass -class BaseRealmInfo: - """BaseRealmInfo.""" - - realm: Any | None = None - origin: str | None = None - - -@dataclass -class WindowRealmInfo: - """WindowRealmInfo.""" - - type: str = field(default="window", init=False) - context: Any | None = None - user_context: Any | None = None - sandbox: str | None = None - - -@dataclass -class DedicatedWorkerRealmInfo: - """DedicatedWorkerRealmInfo.""" - - type: str = field(default="dedicated-worker", init=False) - owners: list[Any] = field(default_factory=list) - - -@dataclass -class SharedWorkerRealmInfo: - """SharedWorkerRealmInfo.""" - - type: str = field(default="shared-worker", init=False) - - -@dataclass -class ServiceWorkerRealmInfo: - """ServiceWorkerRealmInfo.""" - - type: str = field(default="service-worker", init=False) - - -@dataclass -class WorkerRealmInfo: - """WorkerRealmInfo.""" - - type: str = field(default="worker", init=False) - - -@dataclass -class PaintWorkletRealmInfo: - """PaintWorkletRealmInfo.""" - - type: str = field(default="paint-worklet", init=False) - - -@dataclass -class AudioWorkletRealmInfo: - """AudioWorkletRealmInfo.""" - - type: str = field(default="audio-worklet", init=False) - - -@dataclass -class WorkletRealmInfo: - """WorkletRealmInfo.""" - - type: str = field(default="worklet", init=False) - - -@dataclass -class SharedReference: - """SharedReference.""" - - shared_id: Any | None = None - handle: Any | None = None - - -@dataclass -class RemoteObjectReference: - """RemoteObjectReference.""" - - handle: Any | None = None - shared_id: Any | None = None - - -@dataclass -class SymbolRemoteValue: - """SymbolRemoteValue.""" - - type: str = field(default="symbol", init=False) - handle: Any | None = None - internal_id: Any | None = None - - -@dataclass -class ArrayRemoteValue: - """ArrayRemoteValue.""" - - type: str = field(default="array", init=False) - handle: Any | None = None - internal_id: Any | None = None - value: Any | None = None - - -@dataclass -class ObjectRemoteValue: - """ObjectRemoteValue.""" - - type: str = field(default="object", init=False) - handle: Any | None = None - internal_id: Any | None = None - value: Any | None = None - - -@dataclass -class FunctionRemoteValue: - """FunctionRemoteValue.""" - - type: str = field(default="function", init=False) - handle: Any | None = None - internal_id: Any | None = None - - -@dataclass -class RegExpRemoteValue: - """RegExpRemoteValue.""" - - handle: Any | None = None - internal_id: Any | None = None - - -@dataclass -class DateRemoteValue: - """DateRemoteValue.""" - - handle: Any | None = None - internal_id: Any | None = None - - -@dataclass -class MapRemoteValue: - """MapRemoteValue.""" - - type: str = field(default="map", init=False) - handle: Any | None = None - internal_id: Any | None = None - value: Any | None = None - - -@dataclass -class SetRemoteValue: - """SetRemoteValue.""" - - type: str = field(default="set", init=False) - handle: Any | None = None - internal_id: Any | None = None - value: Any | None = None - - -@dataclass -class WeakMapRemoteValue: - """WeakMapRemoteValue.""" - - type: str = field(default="weakmap", init=False) - handle: Any | None = None - internal_id: Any | None = None - - -@dataclass -class WeakSetRemoteValue: - """WeakSetRemoteValue.""" - - type: str = field(default="weakset", init=False) - handle: Any | None = None - internal_id: Any | None = None - - -@dataclass -class GeneratorRemoteValue: - """GeneratorRemoteValue.""" - - type: str = field(default="generator", init=False) - handle: Any | None = None - internal_id: Any | None = None - - -@dataclass -class ErrorRemoteValue: - """ErrorRemoteValue.""" - - type: str = field(default="error", init=False) - handle: Any | None = None - internal_id: Any | None = None - - -@dataclass -class ProxyRemoteValue: - """ProxyRemoteValue.""" - - type: str = field(default="proxy", init=False) - handle: Any | None = None - internal_id: Any | None = None - - -@dataclass -class PromiseRemoteValue: - """PromiseRemoteValue.""" - - type: str = field(default="promise", init=False) - handle: Any | None = None - internal_id: Any | None = None - - -@dataclass -class TypedArrayRemoteValue: - """TypedArrayRemoteValue.""" - - type: str = field(default="typedarray", init=False) - handle: Any | None = None - internal_id: Any | None = None - - -@dataclass -class ArrayBufferRemoteValue: - """ArrayBufferRemoteValue.""" - - type: str = field(default="arraybuffer", init=False) - handle: Any | None = None - internal_id: Any | None = None - - -@dataclass -class NodeListRemoteValue: - """NodeListRemoteValue.""" - - type: str = field(default="nodelist", init=False) - handle: Any | None = None - internal_id: Any | None = None - value: Any | None = None - - -@dataclass -class HTMLCollectionRemoteValue: - """HTMLCollectionRemoteValue.""" - - type: str = field(default="htmlcollection", init=False) - handle: Any | None = None - internal_id: Any | None = None - value: Any | None = None - - -@dataclass -class NodeRemoteValue: - """NodeRemoteValue.""" - - type: str = field(default="node", init=False) - shared_id: Any | None = None - handle: Any | None = None - internal_id: Any | None = None - value: Any | None = None - - -@dataclass -class NodeProperties: - """NodeProperties.""" - - node_type: Any | None = None - child_node_count: Any | None = None - children: list[Any] = field(default_factory=list) - local_name: str | None = None - mode: Any | None = None - namespace_uri: str | None = None - node_value: str | None = None - shadow_root: Any | None = None - - -@dataclass -class WindowProxyRemoteValue: - """WindowProxyRemoteValue.""" - - type: str = field(default="window", init=False) - value: Any | None = None - handle: Any | None = None - internal_id: Any | None = None - - -@dataclass -class WindowProxyProperties: - """WindowProxyProperties.""" - - context: Any | None = None - - -@dataclass -class StackFrame: - """StackFrame.""" - - column_number: Any | None = None - function_name: str | None = None - line_number: Any | None = None - url: str | None = None - - -@dataclass -class StackTrace: - """StackTrace.""" - - call_frames: list[Any] = field(default_factory=list) - - -@dataclass -class Source: - """Source.""" - - realm: Any | None = None - context: Any | None = None - user_context: Any | None = None - - -@dataclass -class RealmTarget: - """RealmTarget.""" - - realm: Any | None = None - - -@dataclass -class ContextTarget: - """ContextTarget.""" - - context: Any | None = None - sandbox: str | None = None - - -@dataclass -class AddPreloadScriptParameters: - """AddPreloadScriptParameters.""" - - function_declaration: str | None = None - arguments: list[Any] = field(default_factory=list) - contexts: list[Any] = field(default_factory=list) - user_contexts: list[Any] = field(default_factory=list) - sandbox: str | None = None - - -@dataclass -class AddPreloadScriptResult: - """AddPreloadScriptResult.""" - - script: Any | None = None - - -@dataclass -class DisownParameters: - """DisownParameters.""" - - handles: list[Any] = field(default_factory=list) - target: Any | None = None - - -@dataclass -class CallFunctionParameters: - """CallFunctionParameters.""" - - function_declaration: str | None = None - await_promise: bool | None = None - target: Any | None = None - arguments: list[Any] = field(default_factory=list) - result_ownership: Any | None = None - serialization_options: Any | None = None - this: Any | None = None - user_activation: bool | None = None - - -@dataclass -class EvaluateParameters: - """EvaluateParameters.""" - - expression: str | None = None - target: Any | None = None - await_promise: bool | None = None - result_ownership: Any | None = None - serialization_options: Any | None = None - user_activation: bool | None = None - - -@dataclass -class GetRealmsParameters: - """GetRealmsParameters.""" - - context: Any | None = None - type: Any | None = None - - -@dataclass -class GetRealmsResult: - """GetRealmsResult.""" - - realms: list[Any] = field(default_factory=list) - - -@dataclass -class RemovePreloadScriptParameters: - """RemovePreloadScriptParameters.""" - - script: Any | None = None - - -@dataclass -class MessageParameters: - """MessageParameters.""" - - channel: Any | None = None - data: Any | None = None - source: Any | None = None - - -@dataclass -class RealmDestroyedParameters: - """RealmDestroyedParameters.""" - - realm: Any | None = None - - -# BiDi Event Name to Parameter Type Mapping -EVENT_NAME_MAPPING = { - "realm_created": "script.realmCreated", - "realm_destroyed": "script.realmDestroyed", -} - - -class Script: - """WebDriver BiDi script module.""" - - EVENT_CONFIGS: dict[str, EventConfig] = {} - - def __init__(self, conn, driver=None) -> None: - self._conn = conn - self._driver = driver - self._event_manager = _EventManager(conn, self.EVENT_CONFIGS) - - def add_preload_script( - self, - function_declaration: Any | None = None, - arguments: list[Any] | None = None, - contexts: list[Any] | None = None, - user_contexts: list[Any] | None = None, - sandbox: Any | None = None, - ): - """Execute script.addPreloadScript.""" - if function_declaration is None: - raise TypeError("add_preload_script() missing required argument: 'function_declaration'") - - params = { - "functionDeclaration": function_declaration, - "arguments": arguments, - "contexts": contexts, - "userContexts": user_contexts, - "sandbox": sandbox, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("script.addPreloadScript", params) - result = self._conn.execute(cmd) - return result - - def disown(self, handles: list[Any] | None = None, target: Any | None = None): - """Execute script.disown.""" - if handles is None: - raise TypeError("disown() missing required argument: 'handles'") - if target is None: - raise TypeError("disown() missing required argument: 'target'") - - params = { - "handles": handles, - "target": target, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("script.disown", params) - result = self._conn.execute(cmd) - return result - - def call_function( - self, - function_declaration: Any | None = None, - await_promise: bool | None = None, - target: Any | None = None, - arguments: list[Any] | None = None, - result_ownership: Any | None = None, - serialization_options: Any | None = None, - this: Any | None = None, - user_activation: bool | None = None, - ): - """Execute script.callFunction.""" - if function_declaration is None: - raise TypeError("call_function() missing required argument: 'function_declaration'") - if await_promise is None: - raise TypeError("call_function() missing required argument: 'await_promise'") - if target is None: - raise TypeError("call_function() missing required argument: 'target'") - - params = { - "functionDeclaration": function_declaration, - "awaitPromise": await_promise, - "target": target, - "arguments": arguments, - "resultOwnership": result_ownership, - "serializationOptions": serialization_options, - "this": this, - "userActivation": user_activation, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("script.callFunction", params) - result = self._conn.execute(cmd) - return result - - def evaluate( - self, - expression: Any | None = None, - target: Any | None = None, - await_promise: bool | None = None, - result_ownership: Any | None = None, - serialization_options: Any | None = None, - user_activation: bool | None = None, - ): - """Execute script.evaluate.""" - if expression is None: - raise TypeError("evaluate() missing required argument: 'expression'") - if target is None: - raise TypeError("evaluate() missing required argument: 'target'") - if await_promise is None: - raise TypeError("evaluate() missing required argument: 'await_promise'") - - params = { - "expression": expression, - "target": target, - "awaitPromise": await_promise, - "resultOwnership": result_ownership, - "serializationOptions": serialization_options, - "userActivation": user_activation, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("script.evaluate", params) - result = self._conn.execute(cmd) - return result - - def get_realms(self, context: Any | None = None, type: Any | None = None): - """Execute script.getRealms.""" - params = { - "context": context, - "type": type, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("script.getRealms", params) - result = self._conn.execute(cmd) - return result - - def remove_preload_script(self, script: Any | None = None): - """Execute script.removePreloadScript.""" - if script is None: - raise TypeError("remove_preload_script() missing required argument: 'script'") - - params = { - "script": script, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("script.removePreloadScript", params) - result = self._conn.execute(cmd) - return result - - def message(self, channel: Any | None = None, data: Any | None = None, source: Any | None = None): - """Execute script.message.""" - if channel is None: - raise TypeError("message() missing required argument: 'channel'") - if data is None: - raise TypeError("message() missing required argument: 'data'") - if source is None: - raise TypeError("message() missing required argument: 'source'") - - params = { - "channel": channel, - "data": data, - "source": source, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("script.message", params) - result = self._conn.execute(cmd) - return result - - def execute(self, function_declaration: str, *args, context_id: str | None = None) -> Any: - """Execute a function declaration in the browser context. - - Args: - function_declaration: The function as a string, e.g. ``"() => document.title"``. - *args: Optional Python values to pass as arguments to the function. - Each value is serialised to a BiDi ``LocalValue`` automatically. - Supported types: ``None``, ``bool``, ``int``, ``float`` - (including ``NaN`` and ``Infinity``), ``str``, ``list``, - ``dict``, and ``datetime.datetime``. - context_id: The browsing context ID to run in. Defaults to the - driver's current window handle when a driver was provided. - - Returns: - The inner RemoteValue result dict, or raises WebDriverException on exception. - """ - import datetime as _datetime - import math as _math - - from selenium.common.exceptions import WebDriverException as _WebDriverException - - def _serialize_arg(value): - """Serialise a Python value to a BiDi LocalValue dict.""" - if value is None: - return {"type": "null"} - if isinstance(value, bool): - return {"type": "boolean", "value": value} - if isinstance(value, _datetime.datetime): - return {"type": "date", "value": value.isoformat()} - if isinstance(value, float): - if _math.isnan(value): - return {"type": "number", "value": "NaN"} - if _math.isinf(value): - return {"type": "number", "value": "Infinity" if value > 0 else "-Infinity"} - return {"type": "number", "value": value} - if isinstance(value, int): - _MAX_SAFE_INT = 9007199254740991 - if abs(value) > _MAX_SAFE_INT: - return {"type": "bigint", "value": str(value)} - return {"type": "number", "value": value} - if isinstance(value, str): - return {"type": "string", "value": value} - if isinstance(value, list): - return {"type": "array", "value": [_serialize_arg(v) for v in value]} - if isinstance(value, dict): - return {"type": "object", "value": [[str(k), _serialize_arg(v)] for k, v in value.items()]} - return value - - if context_id is None and self._driver is not None: - try: - context_id = self._driver.current_window_handle - except Exception: - pass - target = {"context": context_id} if context_id else {} - serialized_args = [_serialize_arg(a) for a in args] if args else None - raw = self.call_function( - function_declaration=function_declaration, - await_promise=True, - target=target, - arguments=serialized_args, - ) - if isinstance(raw, dict): - if raw.get("type") == "exception": - exc = raw.get("exceptionDetails", {}) - msg = exc.get("text", str(exc)) if isinstance(exc, dict) else str(exc) - raise _WebDriverException(msg) - if raw.get("type") == "success": - return raw.get("result") - return raw - - def _add_preload_script( - self, - function_declaration, - arguments=None, - contexts=None, - user_contexts=None, - sandbox=None, - ): - """Add a preload script with validation. - - Args: - function_declaration: The JS function to run on page load. - arguments: Optional list of BiDi arguments. - contexts: Optional list of browsing context IDs. - user_contexts: Optional list of user context IDs. - sandbox: Optional sandbox name. - - Returns: - script_id: The ID of the added preload script (str). - - Raises: - ValueError: If both contexts and user_contexts are specified. - """ - if contexts is not None and user_contexts is not None: - raise ValueError("Cannot specify both contexts and user_contexts") - result = self.add_preload_script( - function_declaration=function_declaration, - arguments=arguments, - contexts=contexts, - user_contexts=user_contexts, - sandbox=sandbox, - ) - if isinstance(result, dict): - return result.get("script") - return result - - def _remove_preload_script(self, script_id): - """Remove a preload script by ID. - - Args: - script_id: The ID of the preload script to remove. - """ - return self.remove_preload_script(script=script_id) - - def pin(self, function_declaration): - """Pin (add) a preload script that runs on every page load. - - Args: - function_declaration: The JS function to execute on page load. - - Returns: - script_id: The ID of the pinned script (str). - """ - return self._add_preload_script(function_declaration) - - def unpin(self, script_id): - """Unpin (remove) a previously pinned preload script. - - Args: - script_id: The ID returned by pin(). - """ - return self._remove_preload_script(script_id=script_id) - - def _evaluate( - self, - expression, - target, - await_promise, - result_ownership=None, - serialization_options=None, - user_activation=None, - ): - """Evaluate a script expression and return a structured result. - - Args: - expression: The JavaScript expression to evaluate. - target: A dict like {"context": <id>} or {"realm": <id>}. - await_promise: Whether to await a returned promise. - result_ownership: Optional result ownership setting. - serialization_options: Optional serialization options dict. - user_activation: Optional user activation flag. - - Returns: - An object with .realm, .result (dict or None), and .exception_details (or None). - """ - - class _EvalResult: - def __init__(self2, realm, result, exception_details): - self2.realm = realm - self2.result = result - self2.exception_details = exception_details - - raw = self.evaluate( - expression=expression, - target=target, - await_promise=await_promise, - result_ownership=result_ownership, - serialization_options=serialization_options, - user_activation=user_activation, - ) - if isinstance(raw, dict): - realm = raw.get("realm") - if raw.get("type") == "exception": - exc = raw.get("exceptionDetails") - return _EvalResult(realm=realm, result=None, exception_details=exc) - return _EvalResult(realm=realm, result=raw.get("result"), exception_details=None) - return _EvalResult(realm=None, result=raw, exception_details=None) - - def _call_function( - self, - function_declaration, - await_promise, - target, - arguments=None, - result_ownership=None, - this=None, - user_activation=None, - serialization_options=None, - ): - """Call a function and return a structured result. - - Args: - function_declaration: The JS function string. - await_promise: Whether to await the return value. - target: A dict like {"context": <id>}. - arguments: Optional list of BiDi arguments. - result_ownership: Optional result ownership. - this: Optional 'this' binding. - user_activation: Optional user activation flag. - serialization_options: Optional serialization options dict. - - Returns: - An object with .result (dict or None) and .exception_details (or None). - """ - - class _CallResult: - def __init__(self2, result, exception_details): - self2.result = result - self2.exception_details = exception_details - - raw = self.call_function( - function_declaration=function_declaration, - await_promise=await_promise, - target=target, - arguments=arguments, - result_ownership=result_ownership, - this=this, - user_activation=user_activation, - serialization_options=serialization_options, - ) - if isinstance(raw, dict): - if raw.get("type") == "exception": - exc = raw.get("exceptionDetails") - return _CallResult(result=None, exception_details=exc) - if raw.get("type") == "success": - return _CallResult(result=raw.get("result"), exception_details=None) - return _CallResult(result=raw, exception_details=None) - - def _get_realms(self, context=None, type=None): - """Get all realms, optionally filtered by context and type. - - Args: - context: Optional browsing context ID to filter by. - type: Optional realm type string to filter by (e.g. RealmType.WINDOW). - - Returns: - List of realm info objects with .realm, .origin, .type, .context attributes. - """ - - class _RealmInfo: - def __init__(self2, realm, origin, type_, context): - self2.realm = realm - self2.origin = origin - self2.type = type_ - self2.context = context - - raw = self.get_realms(context=context, type=type) - realms_list = raw.get("realms", []) if isinstance(raw, dict) else [] - result = [] - for r in realms_list: - if isinstance(r, dict): - result.append( - _RealmInfo( - realm=r.get("realm"), - origin=r.get("origin"), - type_=r.get("type"), - context=r.get("context"), - ) - ) - return result - - def _disown(self, handles, target): - """Disown handles in a browsing context. - - Args: - handles: List of handle strings to disown. - target: A dict like {"context": <id>}. - """ - return self.disown(handles=handles, target=target) - - def _subscribe_log_entry(self, callback, entry_type_filter=None): - """Subscribe to log.entryAdded BiDi events with optional type filtering.""" - import threading as _threading - - from selenium.webdriver.common.bidi import log as _log_mod - from selenium.webdriver.common.bidi.session import Session as _Session - - bidi_event = "log.entryAdded" - - if not hasattr(self, "_log_subscriptions"): - self._log_subscriptions = {} - self._log_lock = _threading.Lock() - - def _deserialize(params): - t = params.get("type") if isinstance(params, dict) else None - if t == "console": - cls = getattr(_log_mod, "ConsoleLogEntry", None) - if cls is not None and hasattr(cls, "from_json"): - try: - return cls.from_json(params) - except Exception: - pass - elif t == "javascript": - cls = getattr(_log_mod, "JavascriptLogEntry", None) - if cls is not None and hasattr(cls, "from_json"): - try: - return cls.from_json(params) - except Exception: - pass - return params - - def _wrapped(raw): - entry = _deserialize(raw) - if entry_type_filter is None: - callback(entry) - else: - t = getattr(entry, "type_", None) or (entry.get("type") if isinstance(entry, dict) else None) - if t == entry_type_filter: - callback(entry) - - class _BidiRef: - event_class = bidi_event - - def from_json(self2, p): - return p - - _wrapper = _BidiRef() - callback_id = self._conn.add_callback(_wrapper, _wrapped) - with self._log_lock: - if bidi_event not in self._log_subscriptions: - session = _Session(self._conn) - result = session.subscribe([bidi_event]) - sub_id = result.get("subscription") if isinstance(result, dict) else None - self._log_subscriptions[bidi_event] = { - "callbacks": [], - "subscription_id": sub_id, - } - self._log_subscriptions[bidi_event]["callbacks"].append(callback_id) - return callback_id - - def _unsubscribe_log_entry(self, callback_id): - """Unsubscribe a log entry callback by ID.""" - from selenium.webdriver.common.bidi.session import Session as _Session - - bidi_event = "log.entryAdded" - if not hasattr(self, "_log_subscriptions"): - return - - class _BidiRef: - event_class = bidi_event - - def from_json(self2, p): - return p - - _wrapper = _BidiRef() - self._conn.remove_callback(_wrapper, callback_id) - with self._log_lock: - entry = self._log_subscriptions.get(bidi_event) - if entry and callback_id in entry["callbacks"]: - entry["callbacks"].remove(callback_id) - if entry is not None and not entry["callbacks"]: - session = _Session(self._conn) - sub_id = entry.get("subscription_id") - if sub_id: - session.unsubscribe(subscriptions=[sub_id]) - else: - session.unsubscribe(events=[bidi_event]) - del self._log_subscriptions[bidi_event] - - def add_console_message_handler(self, callback: Callable) -> int: - """Add a handler for console log messages (log.entryAdded type=console). - - Args: - callback: Function called with a ConsoleLogEntry on each console message. - - Returns: - callback_id for use with remove_console_message_handler. - """ - return self._subscribe_log_entry(callback, entry_type_filter="console") - - def remove_console_message_handler(self, callback_id: int) -> None: - """Remove a console message handler by callback ID.""" - self._unsubscribe_log_entry(callback_id) - - def add_javascript_error_handler(self, callback: Callable) -> int: - """Add a handler for JavaScript error log messages (log.entryAdded type=javascript). - - Args: - callback: Function called with a JavascriptLogEntry on each JS error. - - Returns: - callback_id for use with remove_javascript_error_handler. - """ - return self._subscribe_log_entry(callback, entry_type_filter="javascript") - - def remove_javascript_error_handler(self, callback_id: int) -> None: - """Remove a JavaScript error handler by callback ID.""" - self._unsubscribe_log_entry(callback_id) - - def add_event_handler(self, event: str, callback: Callable, contexts: list[str] | None = None) -> int: - """Add an event handler. - - Args: - event: The event to subscribe to. - callback: The callback function to execute on event. - contexts: The context IDs to subscribe to (optional). - - Returns: - The callback ID. - """ - return self._event_manager.add_event_handler(event, callback, contexts) - - def remove_event_handler(self, event: str, callback_id: int) -> None: - """Remove an event handler. - - Args: - event: The event to unsubscribe from. - callback_id: The callback ID. - """ - return self._event_manager.remove_event_handler(event, callback_id) - - def clear_event_handlers(self) -> None: - """Clear all event handlers.""" - return self._event_manager.clear_event_handlers() - - -# Event Info Type Aliases -# Event: script.realmCreated -RealmCreated = globals().get("RealmInfo", dict) # Fallback to dict if type not defined - -# Event: script.realmDestroyed -RealmDestroyed = globals().get("RealmDestroyedParameters", dict) # Fallback to dict if type not defined - - -# Populate EVENT_CONFIGS with event configuration mappings -_globals = globals() -Script.EVENT_CONFIGS = { - "realm_created": EventConfig( - "realm_created", - "script.realmCreated", - _globals.get("RealmCreated", dict) if _globals.get("RealmCreated") else dict, - ), - "realm_destroyed": EventConfig( - "realm_destroyed", - "script.realmDestroyed", - _globals.get("RealmDestroyed", dict) if _globals.get("RealmDestroyed") else dict, - ), -}
diff --git a/selenium/webdriver/common/bidi/session.py b/selenium/webdriver/common/bidi/session.py deleted file mode 100644 index b00544d..0000000 --- a/selenium/webdriver/common/bidi/session.py +++ /dev/null
@@ -1,260 +0,0 @@ -# Licensed to the Software Freedom Conservancy (SFC) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The SFC licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -from __future__ import annotations - -from dataclasses import dataclass, field -from typing import Any - -from selenium.webdriver.common.bidi.common import command_builder - - -class UserPromptHandlerType: - """UserPromptHandlerType.""" - - ACCEPT = "accept" - DISMISS = "dismiss" - IGNORE = "ignore" - - -@dataclass -class CapabilitiesRequest: - """CapabilitiesRequest.""" - - always_match: Any | None = None - first_match: list[Any] = field(default_factory=list) - - -@dataclass -class CapabilityRequest: - """CapabilityRequest.""" - - accept_insecure_certs: bool | None = None - browser_name: str | None = None - browser_version: str | None = None - platform_name: str | None = None - proxy: Any | None = None - unhandled_prompt_behavior: Any | None = None - - -@dataclass -class AutodetectProxyConfiguration: - """AutodetectProxyConfiguration.""" - - proxy_type: str = field(default="autodetect", init=False) - - -@dataclass -class DirectProxyConfiguration: - """DirectProxyConfiguration.""" - - proxy_type: str = field(default="direct", init=False) - - -@dataclass -class ManualProxyConfiguration: - """ManualProxyConfiguration.""" - - proxy_type: str = field(default="manual", init=False) - http_proxy: str | None = None - ssl_proxy: str | None = None - no_proxy: list[Any] = field(default_factory=list) - - -@dataclass -class SocksProxyConfiguration: - """SocksProxyConfiguration.""" - - socks_proxy: str | None = None - socks_version: Any | None = None - - -@dataclass -class PacProxyConfiguration: - """PacProxyConfiguration.""" - - proxy_type: str = field(default="pac", init=False) - proxy_autoconfig_url: str | None = None - - -@dataclass -class SystemProxyConfiguration: - """SystemProxyConfiguration.""" - - proxy_type: str = field(default="system", init=False) - - -@dataclass -class SubscribeParameters: - """SubscribeParameters.""" - - events: list[str] = field(default_factory=list) - contexts: list[Any] = field(default_factory=list) - user_contexts: list[Any] = field(default_factory=list) - - -@dataclass -class UnsubscribeByIDRequest: - """UnsubscribeByIDRequest.""" - - subscriptions: list[Any] = field(default_factory=list) - - -@dataclass -class UnsubscribeByAttributesRequest: - """UnsubscribeByAttributesRequest.""" - - events: list[str] = field(default_factory=list) - - -@dataclass -class StatusResult: - """StatusResult.""" - - ready: bool | None = None - message: str | None = None - - -@dataclass -class NewParameters: - """NewParameters.""" - - capabilities: Any | None = None - - -@dataclass -class NewResult: - """NewResult.""" - - session_id: str | None = None - accept_insecure_certs: bool | None = None - browser_name: str | None = None - browser_version: str | None = None - platform_name: str | None = None - set_window_rect: bool | None = None - user_agent: str | None = None - proxy: Any | None = None - unhandled_prompt_behavior: Any | None = None - web_socket_url: str | None = None - - -@dataclass -class SubscribeResult: - """SubscribeResult.""" - - subscription: Any | None = None - - -@dataclass -class UserPromptHandler: - """UserPromptHandler.""" - - alert: Any | None = None - before_unload: Any | None = None - confirm: Any | None = None - default: Any | None = None - file: Any | None = None - prompt: Any | None = None - - def to_bidi_dict(self) -> dict: - """Convert to BiDi protocol dict with camelCase keys.""" - result = {} - if self.alert is not None: - result["alert"] = self.alert - if self.before_unload is not None: - result["beforeUnload"] = self.before_unload - if self.confirm is not None: - result["confirm"] = self.confirm - if self.default is not None: - result["default"] = self.default - if self.file is not None: - result["file"] = self.file - if self.prompt is not None: - result["prompt"] = self.prompt - return result - - def to_dict(self) -> dict: - """Backward-compatible alias for to_bidi_dict().""" - return self.to_bidi_dict() - - -class Session: - """WebDriver BiDi session module.""" - - def __init__(self, conn) -> None: - self._conn = conn - - def status(self): - """Execute session.status.""" - params = {} - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("session.status", params) - result = self._conn.execute(cmd) - return result - - def new(self, capabilities: Any | None = None): - """Execute session.new.""" - if capabilities is None: - raise TypeError("new() missing required argument: 'capabilities'") - - params = { - "capabilities": capabilities, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("session.new", params) - result = self._conn.execute(cmd) - return result - - def end(self): - """Execute session.end.""" - params = {} - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("session.end", params) - result = self._conn.execute(cmd) - return result - - def subscribe( - self, - events: list[Any] | None = None, - contexts: list[Any] | None = None, - user_contexts: list[Any] | None = None, - ): - """Execute session.subscribe.""" - if events is None: - raise TypeError("subscribe() missing required argument: 'events'") - - params = { - "events": events, - "contexts": contexts, - "userContexts": user_contexts, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("session.subscribe", params) - result = self._conn.execute(cmd) - return result - - def unsubscribe(self, events: list[Any] | None = None, subscriptions: list[Any] | None = None): - """Execute session.unsubscribe.""" - params = { - "events": events, - "subscriptions": subscriptions, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("session.unsubscribe", params) - result = self._conn.execute(cmd) - return result
diff --git a/selenium/webdriver/common/bidi/storage.py b/selenium/webdriver/common/bidi/storage.py deleted file mode 100644 index 90e65ac..0000000 --- a/selenium/webdriver/common/bidi/storage.py +++ /dev/null
@@ -1,353 +0,0 @@ -# Licensed to the Software Freedom Conservancy (SFC) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The SFC licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -from __future__ import annotations - -from dataclasses import dataclass, field -from typing import Any - -from selenium.webdriver.common.bidi.common import command_builder - - -@dataclass -class PartitionKey: - """PartitionKey.""" - - user_context: str | None = None - source_origin: str | None = None - - -@dataclass -class GetCookiesParameters: - """GetCookiesParameters.""" - - filter: Any | None = None - partition: Any | None = None - - -@dataclass -class GetCookiesResult: - """GetCookiesResult.""" - - cookies: list[Any] = field(default_factory=list) - partition_key: Any | None = None - - -@dataclass -class SetCookieParameters: - """SetCookieParameters.""" - - cookie: Any | None = None - partition: Any | None = None - - -@dataclass -class SetCookieResult: - """SetCookieResult.""" - - partition_key: Any | None = None - - -@dataclass -class DeleteCookiesParameters: - """DeleteCookiesParameters.""" - - filter: Any | None = None - partition: Any | None = None - - -@dataclass -class DeleteCookiesResult: - """DeleteCookiesResult.""" - - partition_key: Any | None = None - - -class BytesValue: - """A string or base64-encoded bytes value used in cookie operations. - - This corresponds to network.BytesValue in the WebDriver BiDi specification, - wrapping either a plain string or a base64-encoded binary value. - """ - - TYPE_STRING = "string" - TYPE_BASE64 = "base64" - - def __init__(self, type: Any | None, value: Any | None) -> None: - self.type = type - self.value = value - - def to_bidi_dict(self) -> dict: - return {"type": self.type, "value": self.value} - - def to_dict(self) -> dict: - """Backward-compatible alias for to_bidi_dict().""" - return self.to_bidi_dict() - - -class SameSite: - """SameSite cookie attribute values.""" - - STRICT = "strict" - LAX = "lax" - NONE = "none" - DEFAULT = "default" - - -@dataclass -class StorageCookie: - """A cookie object returned by storage.getCookies.""" - - name: str | None = None - value: Any | None = None - domain: str | None = None - path: str | None = None - size: Any | None = None - http_only: bool | None = None - secure: bool | None = None - same_site: Any | None = None - expiry: Any | None = None - - @classmethod - def from_bidi_dict(cls, raw: dict) -> StorageCookie: - """Deserialize a wire-level cookie dict to a StorageCookie.""" - value_raw = raw.get("value") - if isinstance(value_raw, dict): - value: Any = BytesValue(value_raw.get("type"), value_raw.get("value")) - else: - value = value_raw - return cls( - name=raw.get("name"), - value=value, - domain=raw.get("domain"), - path=raw.get("path"), - size=raw.get("size"), - http_only=raw.get("httpOnly"), - secure=raw.get("secure"), - same_site=raw.get("sameSite"), - expiry=raw.get("expiry"), - ) - - -@dataclass -class CookieFilter: - """CookieFilter.""" - - name: str | None = None - value: Any | None = None - domain: str | None = None - path: str | None = None - size: Any | None = None - http_only: bool | None = None - secure: bool | None = None - same_site: Any | None = None - expiry: Any | None = None - - def to_bidi_dict(self) -> dict: - """Serialize to the BiDi wire-protocol dict.""" - result: dict = {} - if self.name is not None: - result["name"] = self.name - if self.value is not None: - result["value"] = self.value.to_bidi_dict() if hasattr(self.value, "to_bidi_dict") else self.value - if self.domain is not None: - result["domain"] = self.domain - if self.path is not None: - result["path"] = self.path - if self.size is not None: - result["size"] = self.size - if self.http_only is not None: - result["httpOnly"] = self.http_only - if self.secure is not None: - result["secure"] = self.secure - if self.same_site is not None: - result["sameSite"] = self.same_site - if self.expiry is not None: - result["expiry"] = self.expiry - return result - - def to_dict(self) -> dict: - """Backward-compatible alias for to_bidi_dict().""" - return self.to_bidi_dict() - - -@dataclass -class PartialCookie: - """PartialCookie.""" - - name: str | None = None - value: Any | None = None - domain: str | None = None - path: str | None = None - http_only: bool | None = None - secure: bool | None = None - same_site: Any | None = None - expiry: Any | None = None - - def to_bidi_dict(self) -> dict: - """Serialize to the BiDi wire-protocol dict.""" - result: dict = {} - if self.name is not None: - result["name"] = self.name - if self.value is not None: - result["value"] = self.value.to_bidi_dict() if hasattr(self.value, "to_bidi_dict") else self.value - if self.domain is not None: - result["domain"] = self.domain - if self.path is not None: - result["path"] = self.path - if self.http_only is not None: - result["httpOnly"] = self.http_only - if self.secure is not None: - result["secure"] = self.secure - if self.same_site is not None: - result["sameSite"] = self.same_site - if self.expiry is not None: - result["expiry"] = self.expiry - return result - - def to_dict(self) -> dict: - """Backward-compatible alias for to_bidi_dict().""" - return self.to_bidi_dict() - - -class BrowsingContextPartitionDescriptor: - """BrowsingContextPartitionDescriptor. - - The first positional argument is *context* (a browsing-context ID / window - handle), mirroring how the class is used throughout the test suite: - ``BrowsingContextPartitionDescriptor(driver.current_window_handle)``. - """ - - def __init__(self, context: Any = None, type: str = "context") -> None: - self.context = context - self.type = type - - def to_bidi_dict(self) -> dict: - return {"type": "context", "context": self.context} - - def to_dict(self) -> dict: - """Backward-compatible alias for to_bidi_dict().""" - return self.to_bidi_dict() - - -@dataclass -class StorageKeyPartitionDescriptor: - """StorageKeyPartitionDescriptor.""" - - type: Any | None = "storageKey" - user_context: str | None = None - source_origin: str | None = None - - def to_bidi_dict(self) -> dict: - """Serialize to the BiDi wire-protocol dict.""" - result: dict = {"type": "storageKey"} - if self.user_context is not None: - result["userContext"] = self.user_context - if self.source_origin is not None: - result["sourceOrigin"] = self.source_origin - return result - - def to_dict(self) -> dict: - """Backward-compatible alias for to_bidi_dict().""" - return self.to_bidi_dict() - - -class Storage: - """WebDriver BiDi storage module.""" - - def __init__(self, conn) -> None: - self._conn = conn - - def get_cookies(self, filter=None, partition=None): - """Execute storage.getCookies and return a GetCookiesResult.""" - if filter and hasattr(filter, "to_bidi_dict"): - filter = filter.to_bidi_dict() - if partition and hasattr(partition, "to_bidi_dict"): - partition = partition.to_bidi_dict() - params = { - "filter": filter, - "partition": partition, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("storage.getCookies", params) - result = self._conn.execute(cmd) - if result and "cookies" in result: - cookies = [StorageCookie.from_bidi_dict(c) for c in result.get("cookies", []) if isinstance(c, dict)] - pk_raw = result.get("partitionKey") - pk = ( - PartitionKey( - user_context=pk_raw.get("userContext"), - source_origin=pk_raw.get("sourceOrigin"), - ) - if isinstance(pk_raw, dict) - else None - ) - return GetCookiesResult(cookies=cookies, partition_key=pk) - return GetCookiesResult(cookies=[], partition_key=None) - - def set_cookie(self, cookie=None, partition=None): - """Execute storage.setCookie.""" - if cookie and hasattr(cookie, "to_bidi_dict"): - cookie = cookie.to_bidi_dict() - if partition and hasattr(partition, "to_bidi_dict"): - partition = partition.to_bidi_dict() - params = { - "cookie": cookie, - "partition": partition, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("storage.setCookie", params) - result = self._conn.execute(cmd) - if isinstance(result, dict): - pk_raw = result.get("partitionKey") - pk = ( - PartitionKey( - user_context=pk_raw.get("userContext"), - source_origin=pk_raw.get("sourceOrigin"), - ) - if isinstance(pk_raw, dict) - else None - ) - return SetCookieResult(partition_key=pk) - return result - - def delete_cookies(self, filter=None, partition=None): - """Execute storage.deleteCookies.""" - if filter and hasattr(filter, "to_bidi_dict"): - filter = filter.to_bidi_dict() - if partition and hasattr(partition, "to_bidi_dict"): - partition = partition.to_bidi_dict() - params = { - "filter": filter, - "partition": partition, - } - params = {k: v for k, v in params.items() if v is not None} - cmd = command_builder("storage.deleteCookies", params) - result = self._conn.execute(cmd) - if isinstance(result, dict): - pk_raw = result.get("partitionKey") - pk = ( - PartitionKey( - user_context=pk_raw.get("userContext"), - source_origin=pk_raw.get("sourceOrigin"), - ) - if isinstance(pk_raw, dict) - else None - ) - return DeleteCookiesResult(partition_key=pk) - return result
diff --git a/selenium/webdriver/common/bidi/webextension.py b/selenium/webdriver/common/bidi/webextension.py deleted file mode 100644 index 62f2dec..0000000 --- a/selenium/webdriver/common/bidi/webextension.py +++ /dev/null
@@ -1,154 +0,0 @@ -# Licensed to the Software Freedom Conservancy (SFC) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The SFC licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -from __future__ import annotations - -from dataclasses import dataclass, field -from typing import Any - -from selenium.webdriver.common.bidi.common import command_builder - - -@dataclass -class InstallParameters: - """InstallParameters.""" - - extension_data: Any | None = None - - -@dataclass -class ExtensionPath: - """ExtensionPath.""" - - type: str = field(default="path", init=False) - path: str | None = None - - -@dataclass -class ExtensionArchivePath: - """ExtensionArchivePath.""" - - type: str = field(default="archivePath", init=False) - path: str | None = None - - -@dataclass -class ExtensionBase64Encoded: - """ExtensionBase64Encoded.""" - - type: str = field(default="base64", init=False) - value: str | None = None - - -@dataclass -class InstallResult: - """InstallResult.""" - - extension: Any | None = None - - -@dataclass -class UninstallParameters: - """UninstallParameters.""" - - extension: Any | None = None - - -class WebExtension: - """WebDriver BiDi webExtension module.""" - - def __init__(self, conn) -> None: - self._conn = conn - - def install( - self, - path: str | None = None, - archive_path: str | None = None, - base64_value: str | None = None, - ): - """Install a web extension. - - Exactly one of the three keyword arguments must be provided. - - Args: - path: Directory path to an unpacked extension (also accepted for - signed ``.xpi`` / ``.crx`` archive files on Firefox). - archive_path: File-system path to a packed extension archive. - base64_value: Base64-encoded extension archive string. - - Returns: - The raw result dict from the BiDi ``webExtension.install`` command - (contains at least an ``"extension"`` key with the extension ID). - - Raises: - ValueError: If more than one, or none, of the arguments is provided. - """ - provided = [ - k - for k, v in { - "path": path, - "archive_path": archive_path, - "base64_value": base64_value, - }.items() - if v is not None - ] - if len(provided) != 1: - raise ValueError(f"Exactly one of path, archive_path, or base64_value must be provided; got: {provided}") - if path is not None: - extension_data = {"type": "path", "path": path} - elif archive_path is not None: - extension_data = {"type": "archivePath", "path": archive_path} - else: - assert base64_value is not None - extension_data = {"type": "base64", "value": base64_value} - params = {"extensionData": extension_data} - cmd = command_builder("webExtension.install", params) - try: - return self._conn.execute(cmd) - except Exception as e: - if "Method not available" in str(e): - raise RuntimeError( - "webExtension.install failed with 'Method not available'. " - "This likely means that web extension support is disabled. " - "Enable unsafe extension debugging and/or set options.enable_webextensions " - "in your WebDriver configuration." - ) from e - raise - - def uninstall(self, extension: str | dict): - """Uninstall a web extension. - - Args: - extension: Either the extension ID string returned by ``install``, - or the full result dict returned by ``install`` (the - ``"extension"`` value is extracted automatically). - - Raises: - ValueError: If extension is not provided or is None. - """ - if isinstance(extension, dict): - extension_id: Any = extension.get("extension") - else: - extension_id = extension - - if extension_id is None: - raise ValueError("extension parameter is required") - - params = {"extension": extension_id} - cmd = command_builder("webExtension.uninstall", params) - return self._conn.execute(cmd)
diff --git a/test/unit/selenium/webdriver/common/bidi_network_tests.py b/test/unit/selenium/webdriver/common/bidi_network_tests.py new file mode 100644 index 0000000..cd7c463 --- /dev/null +++ b/test/unit/selenium/webdriver/common/bidi_network_tests.py
@@ -0,0 +1,80 @@ +# Licensed to the Software Freedom Conservancy (SFC) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The SFC licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest + +from selenium.webdriver.common.bidi.network import Network + + +class FakeConnection: + def __init__(self): + self.commands = [] + self.added_callbacks = [] + self.removed_callbacks = [] + self._next_callback_id = 1 + + def add_callback(self, event_wrapper, callback): + callback_id = self._next_callback_id + self._next_callback_id += 1 + self.added_callbacks.append((callback_id, event_wrapper.event_class, callback)) + return callback_id + + def remove_callback(self, event_wrapper, callback_id): + self.removed_callbacks.append((callback_id, event_wrapper.event_class)) + + def execute(self, cmd): + payload = next(cmd) + self.commands.append(payload) + + if payload["method"] == "network.addIntercept": + response = {"intercept": "intercept-1"} + elif payload["method"] == "session.subscribe": + response = {"subscription": "subscription-1"} + else: + response = {} + + try: + cmd.send(response) + except StopIteration as exc: + return exc.value + + raise AssertionError("BiDi command generator did not finish") + + +def test_add_request_handler_accepts_before_request_sent_alias(): + conn = FakeConnection() + network = Network(conn) + + callback_id = network.add_request_handler("before_request_sent", lambda request: None) + network.remove_request_handler("before_request_sent", callback_id) + + assert callback_id == 1 + assert conn.added_callbacks[0][1] == "network.beforeRequestSent" + assert conn.removed_callbacks[0] == (1, "network.beforeRequestSent") + assert conn.commands == [ + {"method": "network.addIntercept", "params": {"phases": ["beforeRequestSent"]}}, + {"method": "session.subscribe", "params": {"events": ["network.beforeRequestSent"]}}, + {"method": "session.unsubscribe", "params": {"subscriptions": ["subscription-1"]}}, + {"method": "network.removeIntercept", "params": {"intercept": "intercept-1"}}, + ] + + +def test_add_request_handler_rejects_unsupported_alias(): + network = Network(FakeConnection()) + + with pytest.raises(ValueError, match="Unsupported request handler event 'response_started'"): + network.add_request_handler("response_started", lambda request: None)