Attempt to prohibit mutating a Context after its in use (#1416)
diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 903bf38..c670a2d 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst
@@ -4,6 +4,23 @@ Versions are year-based with a strict backward-compatibility policy. The third digit is only for regressions. +UNRELEASED +---------- + +Backward-incompatible changes: +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Deprecations: +^^^^^^^^^^^^^ + +- Attempting using any methods that mutate an ``OpenSSL.SSL.Context`` after it + has been used to create an ``OpenSSL.SSL.Connection`` will emit a warning. In + a future release, this will raise an exception. + +Changes: +^^^^^^^^ + + 25.0.0 (2025-01-12) -------------------
diff --git a/src/OpenSSL/SSL.py b/src/OpenSSL/SSL.py index 0cde0b2..ca8913c 100644 --- a/src/OpenSSL/SSL.py +++ b/src/OpenSSL/SSL.py
@@ -827,6 +827,26 @@ _session: Any +F = TypeVar("F", bound=Callable[..., Any]) + + +def _require_not_used(f: F) -> F: + @wraps(f) + def inner(self: Context, *args: Any, **kwargs: Any) -> Any: + if self._used: + warnings.warn( + ( + "Attempting to mutate a Context after a Connection was " + "created. In the future, this will raise an exception" + ), + DeprecationWarning, + stacklevel=2, + ) + return f(self, *args, **kwargs) + + return typing.cast(F, inner) + + class Context: """ :class:`OpenSSL.SSL.Context` instances define the parameters for setting @@ -870,6 +890,7 @@ context = _ffi.gc(context, _lib.SSL_CTX_free) self._context = context + self._used = False self._passphrase_helper: _PassphraseHelper | None = None self._passphrase_callback: _PassphraseCallback[Any] | None = None self._passphrase_userdata: Any | None = None @@ -898,6 +919,7 @@ self.set_min_proto_version(version) self.set_max_proto_version(version) + @_require_not_used def set_min_proto_version(self, version: int) -> None: """ Set the minimum supported protocol version. Setting the minimum @@ -911,6 +933,7 @@ _lib.SSL_CTX_set_min_proto_version(self._context, version) == 1 ) + @_require_not_used def set_max_proto_version(self, version: int) -> None: """ Set the maximum supported protocol version. Setting the maximum @@ -924,6 +947,7 @@ _lib.SSL_CTX_set_max_proto_version(self._context, version) == 1 ) + @_require_not_used def load_verify_locations( self, cafile: _StrOrBytesPath | None, @@ -971,6 +995,7 @@ FILETYPE_PEM, wrapper, more_args=True, truncate=True ) + @_require_not_used def set_passwd_cb( self, callback: _PassphraseCallback[_T], @@ -1004,6 +1029,7 @@ ) self._passphrase_userdata = userdata + @_require_not_used def set_default_verify_paths(self) -> None: """ Specify that the platform provided CA certificates are to be used for @@ -1079,6 +1105,7 @@ self.load_verify_locations(None, capath) break + @_require_not_used def use_certificate_chain_file(self, certfile: _StrOrBytesPath) -> None: """ Load a certificate chain from a file. @@ -1096,6 +1123,7 @@ if not result: _raise_current_error() + @_require_not_used def use_certificate_file( self, certfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM ) -> None: @@ -1120,6 +1148,7 @@ if not use_result: _raise_current_error() + @_require_not_used def use_certificate(self, cert: X509 | x509.Certificate) -> None: """ Load a certificate from a X509 object @@ -1144,6 +1173,7 @@ if not use_result: _raise_current_error() + @_require_not_used def add_extra_chain_cert(self, certobj: X509 | x509.Certificate) -> None: """ Add certificate to chain @@ -1176,6 +1206,7 @@ _raise_current_error() + @_require_not_used def use_privatekey_file( self, keyfile: _StrOrBytesPath, filetype: int = FILETYPE_PEM ) -> None: @@ -1200,6 +1231,7 @@ if not use_result: self._raise_passphrase_exception() + @_require_not_used def use_privatekey(self, pkey: _PrivateKey | PKey) -> None: """ Load a private key from a PKey object @@ -1234,6 +1266,7 @@ if not _lib.SSL_CTX_check_private_key(self._context): _raise_current_error() + @_require_not_used def load_client_ca(self, cafile: bytes) -> None: """ Load the trusted certificates that will be sent to the client. Does @@ -1249,6 +1282,7 @@ _openssl_assert(ca_list != _ffi.NULL) _lib.SSL_CTX_set_client_CA_list(self._context, ca_list) + @_require_not_used def set_session_id(self, buf: bytes) -> None: """ Set the session id to *buf* within which a session can be reused for @@ -1266,6 +1300,7 @@ == 1 ) + @_require_not_used def set_session_cache_mode(self, mode: int) -> int: """ Set the behavior of the session cache used by all connections using @@ -1293,6 +1328,7 @@ """ return _lib.SSL_CTX_get_session_cache_mode(self._context) + @_require_not_used def set_verify( self, mode: int, callback: _VerifyCallback | None = None ) -> None: @@ -1330,6 +1366,7 @@ self._verify_callback = self._verify_helper.callback _lib.SSL_CTX_set_verify(self._context, mode, self._verify_callback) + @_require_not_used def set_verify_depth(self, depth: int) -> None: """ Set the maximum depth for the certificate chain verification that shall @@ -1361,6 +1398,7 @@ """ return _lib.SSL_CTX_get_verify_depth(self._context) + @_require_not_used def load_tmp_dh(self, dhfile: _StrOrBytesPath) -> None: """ Load parameters for Ephemeral Diffie-Hellman @@ -1382,6 +1420,7 @@ res = _lib.SSL_CTX_set_tmp_dh(self._context, dh) _openssl_assert(res == 1) + @_require_not_used def set_tmp_ecdh(self, curve: _EllipticCurve | ec.EllipticCurve) -> None: """ Select a curve to use for ECDHE key exchange. @@ -1421,6 +1460,7 @@ ec = _ffi.gc(ec, _lib.EC_KEY_free) _lib.SSL_CTX_set_tmp_ecdh(self._context, ec) + @_require_not_used def set_cipher_list(self, cipher_list: bytes) -> None: """ Set the list of ciphers to be used in this context. @@ -1460,6 +1500,7 @@ ], ) + @_require_not_used def set_client_ca_list( self, certificate_authorities: Sequence[X509Name] ) -> None: @@ -1497,6 +1538,7 @@ _lib.SSL_CTX_set_client_CA_list(self._context, name_stack) + @_require_not_used def add_client_ca( self, certificate_authority: X509 | x509.Certificate ) -> None: @@ -1531,6 +1573,7 @@ ) _openssl_assert(add_result == 1) + @_require_not_used def set_timeout(self, timeout: int) -> None: """ Set the timeout for newly created sessions for this Context object to @@ -1554,6 +1597,7 @@ """ return _lib.SSL_CTX_get_timeout(self._context) + @_require_not_used def set_info_callback( self, callback: Callable[[Connection, int, int], None] ) -> None: @@ -1579,6 +1623,7 @@ _lib.SSL_CTX_set_info_callback(self._context, self._info_callback) @_requires_keylog + @_require_not_used def set_keylog_callback( self, callback: Callable[[Connection, bytes], None] ) -> None: @@ -1613,6 +1658,7 @@ """ return self._app_data + @_require_not_used def set_app_data(self, data: Any) -> None: """ Set the application data (will be returned from get_app_data()) @@ -1639,6 +1685,7 @@ pystore._store = store return pystore + @_require_not_used def set_options(self, options: int) -> int: """ Add options. Options set before are not cleared! @@ -1652,6 +1699,7 @@ return _lib.SSL_CTX_set_options(self._context, options) + @_require_not_used def set_mode(self, mode: int) -> int: """ Add modes via bitmask. Modes set before are not cleared! This method @@ -1665,6 +1713,7 @@ return _lib.SSL_CTX_set_mode(self._context, mode) + @_require_not_used def set_tlsext_servername_callback( self, callback: Callable[[Connection], None] ) -> None: @@ -1690,6 +1739,7 @@ self._context, self._tlsext_servername_callback ) + @_require_not_used def set_tlsext_use_srtp(self, profiles: bytes) -> None: """ Enable support for negotiating SRTP keying material. @@ -1705,6 +1755,7 @@ _lib.SSL_CTX_set_tlsext_use_srtp(self._context, profiles) == 0 ) + @_require_not_used def set_alpn_protos(self, protos: list[bytes]) -> None: """ Specify the protocols that the client is prepared to speak after the @@ -1742,6 +1793,7 @@ == 0 ) + @_require_not_used def set_alpn_select_callback(self, callback: _ALPNSelectCallback) -> None: """ Specify a callback function that will be called on the server when a @@ -1786,6 +1838,7 @@ rc = _lib.SSL_CTX_set_tlsext_status_arg(self._context, self._ocsp_data) _openssl_assert(rc == 1) + @_require_not_used def set_ocsp_server_callback( self, callback: _OCSPServerCallback[_T], @@ -1808,6 +1861,7 @@ helper = _OCSPServerCallbackHelper(callback) self._set_ocsp_callback(helper, data) + @_require_not_used def set_ocsp_client_callback( self, callback: _OCSPClientCallback[_T], @@ -1832,6 +1886,7 @@ helper = _OCSPClientCallbackHelper(callback) self._set_ocsp_callback(helper, data) + @_require_not_used def set_cookie_generate_callback( self, callback: _CookieGenerateCallback ) -> None: @@ -1841,6 +1896,7 @@ self._cookie_generate_helper.callback, ) + @_require_not_used def set_cookie_verify_callback( self, callback: _CookieVerifyCallback ) -> None: @@ -1869,6 +1925,8 @@ if not isinstance(context, Context): raise TypeError("context must be a Context instance") + context._used = True + ssl = _lib.SSL_new(context._context) self._ssl = _ffi.gc(ssl, _lib.SSL_free) # We set SSL_MODE_AUTO_RETRY to handle situations where OpenSSL returns @@ -2000,6 +2058,7 @@ _lib.SSL_set_SSL_CTX(self._ssl, context._context) self._context = context + self._context._used = True def get_servername(self) -> bytes | None: """
diff --git a/tests/test_ssl.py b/tests/test_ssl.py index 12ca4af..bcad6d9 100644 --- a/tests/test_ssl.py +++ b/tests/test_ssl.py
@@ -1819,7 +1819,7 @@ It does not return anything. """ context = Context(SSLv23_METHOD) - assert context.set_tlsext_use_srtp(b"SRTP_AES128_CM_SHA1_80") is None # type: ignore[func-returns-value] + assert context.set_tlsext_use_srtp(b"SRTP_AES128_CM_SHA1_80") is None class TestServerNameCallback: