blob: 0cacff067f003ecd86386e602011d0099cce9840 [file] [log] [blame] [edit]
// This file is dual licensed under the terms of the Apache License, Version
// 2.0, and the BSD License. See the LICENSE file in the root of this repository
// for complete details.
use std::borrow::Cow;
use std::collections::HashMap;
use std::ops::Deref;
use cryptography_x509::common::{AlgorithmIdentifier, AlgorithmParameters};
use cryptography_x509::csr::Attribute;
use cryptography_x509::pkcs7::PKCS7_DATA_OID;
use cryptography_x509::{common, oid, pkcs7};
use once_cell::sync::Lazy;
#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
use openssl::pkcs7::Pkcs7;
use pyo3::types::{PyAnyMethods, PyBytesMethods, PyListMethods};
use crate::asn1::encode_der_data;
use crate::backend::ciphers;
use crate::buf::CffiBuf;
use crate::error::{CryptographyError, CryptographyResult};
use crate::padding::PKCS7UnpaddingContext;
use crate::pkcs12::symmetric_encrypt;
#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
use crate::x509::certificate::load_der_x509_certificate;
use crate::{exceptions, types, x509};
// Object identifiers under the 1.2.840.113549.1.9 arc. `sign_and_serialize` uses them as the
// `type_id` of the authenticated attributes it attaches to every SignerInfo:
// .3 tags the content-type attribute, .4 the message digest, .5 the signing time and
// .15 the S/MIME capabilities list.
const PKCS7_CONTENT_TYPE_OID: asn1::ObjectIdentifier = asn1::oid!(1, 2, 840, 113549, 1, 9, 3);
const PKCS7_MESSAGE_DIGEST_OID: asn1::ObjectIdentifier = asn1::oid!(1, 2, 840, 113549, 1, 9, 4);
const PKCS7_SIGNING_TIME_OID: asn1::ObjectIdentifier = asn1::oid!(1, 2, 840, 113549, 1, 9, 5);
const PKCS7_SMIME_CAP_OID: asn1::ObjectIdentifier = asn1::oid!(1, 2, 840, 113549, 1, 9, 15);
// Lookup table from a digest algorithm OID to the short name that `sign_and_serialize`
// joins into the `mic_algs` string it hands to the S/MIME encoder.
static OIDS_TO_MIC_NAME: Lazy<HashMap<&asn1::ObjectIdentifier, &str>> = Lazy::new(|| {
    HashMap::from([
        (&oid::SHA224_OID, "sha-224"),
        (&oid::SHA256_OID, "sha-256"),
        (&oid::SHA384_OID, "sha-384"),
        (&oid::SHA512_OID, "sha-512"),
    ])
});
#[pyo3::pyfunction]
fn serialize_certificates<'p>(
py: pyo3::Python<'p>,
py_certs: Vec<pyo3::PyRef<'p, x509::certificate::Certificate>>,
encoding: &pyo3::Bound<'p, pyo3::PyAny>,
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> {
if py_certs.is_empty() {
return Err(pyo3::exceptions::PyTypeError::new_err(
"certs must be a list of certs with length >= 1",
)
.into());
}
let raw_certs = py_certs
.iter()
.map(|c| c.raw.borrow_dependent().clone())
.collect::<Vec<_>>();
let signed_data = pkcs7::SignedData {
version: 1,
digest_algorithms: common::Asn1ReadableOrWritable::new_write(asn1::SetOfWriter::new(&[])),
content_info: pkcs7::ContentInfo {
_content_type: asn1::DefinedByMarker::marker(),
content: pkcs7::Content::Data(None),
},
certificates: Some(common::Asn1ReadableOrWritable::new_write(
asn1::SetOfWriter::new(&raw_certs),
)),
crls: None,
signer_infos: common::Asn1ReadableOrWritable::new_write(asn1::SetOfWriter::new(&[])),
};
let content_info = pkcs7::ContentInfo {
_content_type: asn1::DefinedByMarker::marker(),
content: pkcs7::Content::SignedData(asn1::Explicit::new(Box::new(signed_data))),
};
let content_info_bytes = asn1::write_single(&content_info)?;
encode_der_data(py, "PKCS7".to_string(), content_info_bytes, encoding)
}
// Builds a PKCS #7 EnvelopedData structure from the builder's `_data` and `_recipients`
// attributes and serializes it with the requested encoding.
//
// The payload is encrypted once with a freshly generated AES-128-CBC key; that key is then
// encrypted separately for every recipient certificate with the certificate's public key and
// PKCS #1 v1.5 padding.
#[pyo3::pyfunction]
fn encrypt_and_serialize<'p>(
py: pyo3::Python<'p>,
builder: &pyo3::Bound<'p, pyo3::PyAny>,
encoding: &pyo3::Bound<'p, pyo3::PyAny>,
options: &pyo3::Bound<'p, pyo3::types::PyList>,
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> {
let raw_data: CffiBuf<'p> = builder.getattr(pyo3::intern!(py, "_data"))?.extract()?;
let text_mode = options.contains(types::PKCS7_TEXT.get(py)?)?;
// With the Binary option the payload is used untouched. Otherwise it goes through
// `smime_canonicalize` and only the first element (the variant carrying the text header
// when `text_mode` is set) is kept.
let data_with_header = if options.contains(types::PKCS7_BINARY.get(py)?)? {
Cow::Borrowed(raw_data.as_bytes())
} else {
smime_canonicalize(raw_data.as_bytes(), text_mode).0
};
// The message is encrypted with AES-128-CBC, which the S/MIME v3.2 RFC
// specifies as MUST support (https://datatracker.ietf.org/doc/html/rfc5751#section-2.7)
// Both the key and the IV are 16 bytes requested from `types::OS_URANDOM`.
let key = types::OS_URANDOM.get(py)?.call1((16,))?;
let aes128_algorithm = types::AES128.get(py)?.call1((&key,))?;
let iv = types::OS_URANDOM.get(py)?.call1((16,))?;
let cbc_mode = types::CBC.get(py)?.call1((&iv,))?;
let encrypted_content = symmetric_encrypt(py, aes128_algorithm, cbc_mode, &data_with_header)?;
let py_recipients: Vec<pyo3::Bound<'p, x509::certificate::Certificate>> = builder
.getattr(pyo3::intern!(py, "_recipients"))?
.extract()?;
let mut recipient_infos = vec![];
let padding = types::PKCS1V15.get(py)?.call0()?;
// NOTE(review): `ka_bytes.add` appears to take ownership of each encrypted key and hand back
// a reference that stays valid while `recipient_infos` is in use; confirm against the
// `cryptography_keepalive` crate.
let ka_bytes = cryptography_keepalive::KeepAlive::new();
for cert in py_recipients.iter() {
// Currently, keys are encrypted with RSA (PKCS #1 v1.5), which the S/MIME v3.2 RFC
// specifies as MUST support (https://datatracker.ietf.org/doc/html/rfc5751#section-2.3)
let encrypted_key = cert
.call_method0(pyo3::intern!(py, "public_key"))?
.call_method1(pyo3::intern!(py, "encrypt"), (&key, &padding))?
.extract::<pyo3::pybacked::PyBackedBytes>()?;
// The recipient is identified by the issuer name and serial number of its certificate.
recipient_infos.push(pkcs7::RecipientInfo {
version: 0,
issuer_and_serial_number: pkcs7::IssuerAndSerialNumber {
issuer: cert.get().raw.borrow_dependent().tbs_cert.issuer.clone(),
serial_number: cert.get().raw.borrow_dependent().tbs_cert.serial,
},
key_encryption_algorithm: AlgorithmIdentifier {
oid: asn1::DefinedByMarker::marker(),
params: AlgorithmParameters::Rsa(Some(())),
},
encrypted_key: ka_bytes.add(encrypted_key),
});
}
let enveloped_data = pkcs7::EnvelopedData {
version: 0,
recipient_infos: common::Asn1ReadableOrWritable::new_write(asn1::SetOfWriter::new(
&recipient_infos,
)),
encrypted_content_info: pkcs7::EncryptedContentInfo {
content_type: PKCS7_DATA_OID,
content_encryption_algorithm: AlgorithmIdentifier {
oid: asn1::DefinedByMarker::marker(),
// The IV generated above is stored as the algorithm parameter.
params: AlgorithmParameters::Aes128Cbc(iv.extract()?),
},
encrypted_content: Some(&encrypted_content),
},
};
let content_info = pkcs7::ContentInfo {
_content_type: asn1::DefinedByMarker::marker(),
content: pkcs7::Content::EnvelopedData(asn1::Explicit::new(Box::new(enveloped_data))),
};
let ci_bytes = asn1::write_single(&content_info)?;
// The S/MIME encoding is detected by object identity and delegated to
// `types::SMIME_ENVELOPED_ENCODE`; everything else goes through `encode_der_data`.
if encoding.is(&types::ENCODING_SMIME.get(py)?) {
Ok(types::SMIME_ENVELOPED_ENCODE
.get(py)?
.call1((&*ci_bytes,))?
.extract()?)
} else {
// Handles the DER, PEM, and error cases
encode_der_data(py, "PKCS7".to_string(), ci_bytes, encoding)
}
}
// Decrypts an S/MIME enveloped message: the input is first passed through
// `types::SMIME_ENVELOPED_DECODE`, and the bytes it returns are handed to `decrypt_der`
// together with the remaining arguments.
#[pyo3::pyfunction]
fn decrypt_smime<'p>(
    py: pyo3::Python<'p>,
    data: CffiBuf<'p>,
    certificate: pyo3::Bound<'p, x509::certificate::Certificate>,
    private_key: pyo3::Bound<'p, pyo3::types::PyAny>,
    options: &pyo3::Bound<'p, pyo3::types::PyList>,
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> {
    let decoder = types::SMIME_ENVELOPED_DECODE.get(py)?;
    let decoded = decoder.call1((data.as_bytes(),))?;
    let der_bytes: &[u8] = decoded.extract()?;
    decrypt_der(py, der_bytes, certificate, private_key, options)
}
// Decrypts a PEM-encoded PKCS #7 message: the PEM armor is parsed, its tag is required to be
// "PKCS7", and the decoded contents are handed to `decrypt_der`.
//
// Returns a `ValueError` when the input is not valid UTF-8, cannot be parsed as PEM, or
// carries a different tag.
#[pyo3::pyfunction]
fn decrypt_pem<'p>(
    py: pyo3::Python<'p>,
    data: &[u8],
    certificate: pyo3::Bound<'p, x509::certificate::Certificate>,
    private_key: pyo3::Bound<'p, pyo3::types::PyAny>,
    options: &pyo3::Bound<'p, pyo3::types::PyList>,
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> {
    let text = match std::str::from_utf8(data) {
        Ok(text) => text,
        Err(_) => {
            return Err(CryptographyError::from(
                pyo3::exceptions::PyValueError::new_err("Invalid PEM data"),
            ));
        }
    };
    let parsed = match pem::parse(text) {
        Ok(parsed) => parsed,
        Err(_) => {
            return Err(CryptographyError::from(
                pyo3::exceptions::PyValueError::new_err("Failed to parse PEM data"),
            ));
        }
    };

    // Only blocks labelled PKCS7 are accepted.
    if parsed.tag() == "PKCS7" {
        let der = parsed.into_contents();
        decrypt_der(py, &der, certificate, private_key, options)
    } else {
        Err(CryptographyError::from(
            pyo3::exceptions::PyValueError::new_err(
                "The provided PEM data does not have the PKCS7 tag.",
            ),
        ))
    }
}
// Decrypts a DER-encoded PKCS #7 EnvelopedData structure and returns the plaintext.
//
// The arguments are validated by `check_decrypt_parameters` first. The RecipientInfo whose
// issuer and serial number match `certificate` is then located, its encrypted key is
// decrypted by calling `private_key.decrypt` with PKCS #1 v1.5 padding, and that key is used
// to decrypt the content. Only RSA key encryption and AES-128-CBC content encryption are
// accepted; anything else yields `UnsupportedAlgorithm`. When the Text option is present the
// result is passed through `types::SMIME_REMOVE_TEXT_HEADERS` before being returned.
#[pyo3::pyfunction]
fn decrypt_der<'p>(
py: pyo3::Python<'p>,
data: &[u8],
certificate: pyo3::Bound<'p, x509::certificate::Certificate>,
private_key: pyo3::Bound<'p, pyo3::types::PyAny>,
options: &pyo3::Bound<'p, pyo3::types::PyList>,
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> {
// Check the decrypt parameters
check_decrypt_parameters(py, &certificate, &private_key, options)?;
// Decrypt the data
let content_info = asn1::parse_single::<pkcs7::ContentInfo<'_>>(data)?;
let plain_content = match content_info.content {
pkcs7::Content::EnvelopedData(data) => {
// Extract enveloped data
let enveloped_data = data.into_inner();
// Get recipients, and the one matching with the given certificate (if any)
let mut recipient_infos = enveloped_data.recipient_infos.unwrap_read().clone();
let recipient_certificate = certificate.get().raw.borrow_dependent();
let recipient_serial_number = recipient_certificate.tbs_cert.serial;
let recipient_issuer = recipient_certificate.tbs_cert.issuer.clone();
// `find` stops at the first RecipientInfo for which both serial number and issuer match.
let found_recipient_info = recipient_infos.find(|info| {
info.issuer_and_serial_number.serial_number == recipient_serial_number
&& info.issuer_and_serial_number.issuer == recipient_issuer
});
// Raise error when no recipient is found
let recipient_info = match found_recipient_info {
Some(info) => info,
None => {
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err(
"No recipient found that matches the given certificate.",
),
));
}
};
// Raise error when the key encryption algorithm is not RSA
let key = match recipient_info.key_encryption_algorithm.oid() {
&oid::RSA_OID => {
let padding = types::PKCS1V15.get(py)?.call0()?;
private_key
.call_method1(
pyo3::intern!(py, "decrypt"),
(recipient_info.encrypted_key, &padding),
)?
.extract::<pyo3::pybacked::PyBackedBytes>()?
}
_ => {
return Err(CryptographyError::from(
exceptions::UnsupportedAlgorithm::new_err((
"Only RSA with PKCS #1 v1.5 padding is currently supported for key decryption.",
exceptions::Reasons::UNSUPPORTED_SERIALIZATION,
)),
));
}
};
// Get algorithm
// TODO: implement all the possible algorithms
let algorithm_identifier = enveloped_data
.encrypted_content_info
.content_encryption_algorithm;
// The IV is read from the algorithm parameters; the decrypted key builds the cipher object.
let (algorithm, mode) = match algorithm_identifier.params {
AlgorithmParameters::Aes128Cbc(iv) => (
types::AES128.get(py)?.call1((key,))?,
types::CBC
.get(py)?
.call1((pyo3::types::PyBytes::new(py, &iv),))?,
),
_ => {
return Err(CryptographyError::from(
exceptions::UnsupportedAlgorithm::new_err((
"Only AES-128-CBC is currently supported for content decryption.",
exceptions::Reasons::UNSUPPORTED_SERIALIZATION,
)),
));
}
};
// Decrypt the content using the key and proper algorithm
let encrypted_content = match enveloped_data.encrypted_content_info.encrypted_content {
Some(content) => content,
None => {
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err(
"The EnvelopedData structure does not contain encrypted content.",
),
));
}
};
let decrypted_content = symmetric_decrypt(py, algorithm, mode, encrypted_content)?;
pyo3::types::PyBytes::new(py, decrypted_content.as_slice())
}
// Any other PKCS #7 content type is rejected.
_ => {
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err(
"The PKCS7 data is not an EnvelopedData structure.",
),
));
}
};
// If text_mode, remove the headers after checking the content type
// NOTE(review): the content-type check itself is not visible here; presumably it happens
// inside `types::SMIME_REMOVE_TEXT_HEADERS` — confirm.
let plain_data = if options.contains(types::PKCS7_TEXT.get(py)?)? {
let stripped_data = types::SMIME_REMOVE_TEXT_HEADERS
.get(py)?
.call1((plain_content.as_bytes(),))?;
pyo3::types::PyBytes::new(py, stripped_data.extract()?)
} else {
pyo3::types::PyBytes::new(py, plain_content.as_bytes())
};
Ok(plain_data)
}
// Validates the arguments of a decryption request before any parsing takes place.
//
// The checks run in this order, each returning its own error on failure:
//   1. FIPS mode must be disabled (`UnsupportedAlgorithm`).
//   2. Every option must be an instance of the PKCS7Options enum (`ValueError`).
//   3. Every option must be equal to the Text option (`ValueError`).
//   4. The certificate's public key must be an RSA public key (`TypeError`).
//   5. The private key must be an RSA private key (`TypeError`).
fn check_decrypt_parameters<'p>(
    py: pyo3::Python<'p>,
    certificate: &pyo3::Bound<'p, x509::certificate::Certificate>,
    private_key: &pyo3::Bound<'p, pyo3::PyAny>,
    options: &pyo3::Bound<'p, pyo3::types::PyList>,
) -> Result<(), CryptographyError> {
    // Decryption relies on RSA with PKCS1 v1.5 padding, which is refused when FIPS is enabled.
    if cryptography_openssl::fips::is_enabled() {
        return Err(exceptions::UnsupportedAlgorithm::new_err((
            "RSA with PKCS1 v1.5 padding is not supported by this version of OpenSSL.",
            exceptions::Reasons::UNSUPPORTED_PADDING,
        ))
        .into());
    }

    // First pass: reject anything that is not a PKCS7Options member at all.
    let options_enum = types::PKCS7_OPTIONS.get(py)?;
    for candidate in options.iter() {
        if candidate.is_instance(&options_enum)? {
            continue;
        }
        return Err(
            pyo3::exceptions::PyValueError::new_err("options must be from the PKCS7Options enum")
                .into(),
        );
    }

    // Second pass: of the valid enum members, only Text is accepted for decryption.
    let text_member = types::PKCS7_TEXT.get(py)?;
    for candidate in options.iter() {
        if candidate.eq(text_member.clone())? {
            continue;
        }
        return Err(pyo3::exceptions::PyValueError::new_err(
            "Only the following options are supported for decryption: Text",
        )
        .into());
    }

    // The certificate must carry an RSA public key.
    let rsa_public_key_type = types::RSA_PUBLIC_KEY.get(py)?;
    let certificate_key = certificate.call_method0(pyo3::intern!(py, "public_key"))?;
    if !certificate_key.is_instance(&rsa_public_key_type)? {
        return Err(pyo3::exceptions::PyTypeError::new_err(
            "Only certificate with RSA public keys are supported at this time.",
        )
        .into());
    }

    // The private key must be an RSA private key.
    let rsa_private_key_type = types::RSA_PRIVATE_KEY.get(py)?;
    if private_key.is_instance(&rsa_private_key_type)? {
        Ok(())
    } else {
        Err(pyo3::exceptions::PyTypeError::new_err(
            "Only RSA private keys are supported at this time.",
        )
        .into())
    }
}
// Decrypts `data` with the given cipher `algorithm` and `mode`, then strips PKCS #7 padding
// from the result and returns the remaining plaintext bytes.
//
// The algorithm's `block_size` attribute is divided by 8 to size the output buffer, and the
// undivided value is what gets passed to `PKCS7UnpaddingContext::new`.
pub(crate) fn symmetric_decrypt(
    py: pyo3::Python<'_>,
    algorithm: pyo3::Bound<'_, pyo3::PyAny>,
    mode: pyo3::Bound<'_, pyo3::PyAny>,
    data: &[u8],
) -> CryptographyResult<Vec<u8>> {
    let block_size = algorithm
        .getattr(pyo3::intern!(py, "block_size"))?
        .extract()?;
    let mut ctx = ciphers::CipherContext::new(py, algorithm, mode, openssl::symm::Mode::Decrypt)?;

    // Leave room for one extra block beyond the input length.
    let mut padded_plaintext = vec![0; data.len() + (block_size / 8)];
    let written = ctx.update_into(py, data, &mut padded_plaintext)?;
    let trailing = ctx.finalize(py)?;
    // The finalize step is expected to yield no additional bytes.
    assert!(trailing.as_bytes().is_empty());
    padded_plaintext.truncate(written);

    // Remove the padding; the unpadder returns its output in two pieces.
    let mut unpad_ctx = PKCS7UnpaddingContext::new(block_size);
    let head = unpad_ctx.update(py, CffiBuf::from_bytes(py, &padded_plaintext))?;
    let tail = unpad_ctx.finalize(py)?;

    let head_bytes = head.as_bytes();
    let tail_bytes = tail.as_bytes();
    let mut plaintext = Vec::with_capacity(head_bytes.len() + tail_bytes.len());
    plaintext.extend_from_slice(head_bytes);
    plaintext.extend_from_slice(tail_bytes);
    Ok(plaintext)
}
// Builds a PKCS #7 SignedData structure for the builder's `_data`, producing one SignerInfo
// per entry of the builder's `_signers`, and serializes it with the requested encoding.
//
// `options` controls: Text / Binary (how the payload is canonicalized), NoAttributes (sign the
// payload directly instead of a set of authenticated attributes), NoCapabilities (omit the
// S/MIME capabilities attribute), DetachedSignature (omit the content from the structure) and
// NoCerts (omit the certificate set).
#[pyo3::pyfunction]
fn sign_and_serialize<'p>(
py: pyo3::Python<'p>,
builder: &pyo3::Bound<'p, pyo3::PyAny>,
encoding: &pyo3::Bound<'p, pyo3::PyAny>,
options: &pyo3::Bound<'p, pyo3::types::PyList>,
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> {
let raw_data: CffiBuf<'p> = builder.getattr(pyo3::intern!(py, "_data"))?.extract()?;
let text_mode = options.contains(types::PKCS7_TEXT.get(py)?)?;
// With the Binary option both variants are the raw payload; otherwise `smime_canonicalize`
// produces the payload with and without the text header.
let (data_with_header, data_without_header) =
if options.contains(types::PKCS7_BINARY.get(py)?)? {
(
Cow::Borrowed(raw_data.as_bytes()),
Cow::Borrowed(raw_data.as_bytes()),
)
} else {
smime_canonicalize(raw_data.as_bytes(), text_mode)
};
// Attribute values that are the same for every signer are DER-encoded once, up front.
let content_type_bytes = asn1::write_single(&pkcs7::PKCS7_DATA_OID)?;
let now = x509::common::datetime_now(py)?;
let signing_time_bytes = asn1::write_single(&x509::certificate::time_from_datetime(now)?)?;
let smime_cap_bytes = asn1::write_single(&asn1::SequenceOfWriter::new([
// Subset of values OpenSSL provides:
// https://github.com/openssl/openssl/blob/667a8501f0b6e5705fd611d5bb3ca24848b07154/crypto/pkcs7/pk7_smime.c#L150
// removing all the ones that are bad cryptography
asn1::SequenceOfWriter::new([oid::AES_256_CBC_OID]),
asn1::SequenceOfWriter::new([oid::AES_192_CBC_OID]),
asn1::SequenceOfWriter::new([oid::AES_128_CBC_OID]),
]))?;
// Each signer is a (certificate, private key, hash algorithm, RSA padding) tuple.
#[allow(clippy::type_complexity)]
let py_signers: Vec<(
pyo3::PyRef<'p, x509::certificate::Certificate>,
pyo3::Bound<'_, pyo3::PyAny>,
pyo3::Bound<'_, pyo3::PyAny>,
pyo3::Bound<'_, pyo3::PyAny>,
)> = builder.getattr(pyo3::intern!(py, "_signers"))?.extract()?;
let py_certs: Vec<pyo3::PyRef<'p, x509::certificate::Certificate>> = builder
.getattr(pyo3::intern!(py, "_additional_certs"))?
.extract()?;
let mut signer_infos = vec![];
let mut digest_algs = vec![];
// The certificate set starts with the additional certificates; signer certificates are
// appended inside the loop below.
let mut certs = py_certs
.iter()
.map(|p| p.raw.borrow_dependent().clone())
.collect::<Vec<_>>();
// NOTE(review): the KeepAlive containers appear to own the per-signer buffers so that the
// values pushed into `signer_infos` can borrow from them; confirm against the
// `cryptography_keepalive` crate.
let ka_vec = cryptography_keepalive::KeepAlive::new();
let ka_bytes = cryptography_keepalive::KeepAlive::new();
for (cert, py_private_key, py_hash_alg, rsa_padding) in py_signers.iter() {
let (authenticated_attrs, signature) =
if options.contains(&types::PKCS7_NO_ATTRIBUTES.get(py)?)? {
// No authenticated attributes: the signature is computed directly over the payload.
(
None,
x509::sign::sign_data(
py,
py_private_key.clone(),
py_hash_alg.clone(),
rsa_padding.clone(),
&data_with_header,
)?,
)
} else {
// Content type and signing time are always present.
let mut authenticated_attrs = vec![
Attribute {
type_id: PKCS7_CONTENT_TYPE_OID,
values: common::Asn1ReadableOrWritable::new_write(asn1::SetOfWriter::new(
[asn1::parse_single(&content_type_bytes).unwrap()],
)),
},
Attribute {
type_id: PKCS7_SIGNING_TIME_OID,
values: common::Asn1ReadableOrWritable::new_write(asn1::SetOfWriter::new(
[asn1::parse_single(&signing_time_bytes).unwrap()],
)),
},
];
// The message digest is computed over the payload with this signer's hash algorithm.
let digest = x509::ocsp::hash_data(py, py_hash_alg, &data_with_header)?;
let digest_wrapped = ka_vec.add(asn1::write_single(&digest.as_bytes())?);
authenticated_attrs.push(Attribute {
type_id: PKCS7_MESSAGE_DIGEST_OID,
values: common::Asn1ReadableOrWritable::new_write(asn1::SetOfWriter::new([
asn1::parse_single(digest_wrapped).unwrap(),
])),
});
if !options.contains(types::PKCS7_NO_CAPABILITIES.get(py)?)? {
authenticated_attrs.push(Attribute {
type_id: PKCS7_SMIME_CAP_OID,
values: common::Asn1ReadableOrWritable::new_write(asn1::SetOfWriter::new(
[asn1::parse_single(&smime_cap_bytes).unwrap()],
)),
});
}
// The signature covers the DER encoding of the attributes written as a SET.
let signed_data =
asn1::write_single(&asn1::SetOfWriter::new(authenticated_attrs.as_slice()))?;
(
Some(common::Asn1ReadableOrWritable::new_write(
asn1::SetOfWriter::new(authenticated_attrs),
)),
x509::sign::sign_data(
py,
py_private_key.clone(),
py_hash_alg.clone(),
rsa_padding.clone(),
&signed_data,
)?,
)
};
// NOTE(review): indexing panics if the hash name is absent from the table; presumably the
// caller restricts the allowed hash algorithms — confirm.
let digest_alg = x509::ocsp::HASH_NAME_TO_ALGORITHM_IDENTIFIERS[&*py_hash_alg
.getattr(pyo3::intern!(py, "name"))?
.extract::<pyo3::pybacked::PyBackedStr>()?]
.clone();
// Technically O(n^2), but no one will have that many signers.
if !digest_algs.contains(&digest_alg) {
digest_algs.push(digest_alg.clone());
}
certs.push(cert.raw.borrow_dependent().clone());
signer_infos.push(pkcs7::SignerInfo {
version: 1,
issuer_and_serial_number: pkcs7::IssuerAndSerialNumber {
issuer: cert.raw.borrow_dependent().tbs_cert.issuer.clone(),
serial_number: cert.raw.borrow_dependent().tbs_cert.serial,
},
digest_algorithm: digest_alg,
authenticated_attributes: authenticated_attrs,
digest_encryption_algorithm: compute_pkcs7_signature_algorithm(
py,
py_private_key.clone(),
py_hash_alg.clone(),
rsa_padding.clone(),
)?,
encrypted_digest: ka_bytes.add(signature),
unauthenticated_attributes: None,
});
}
// Declared outside the `if` so the value parsed from it can outlive the branch.
let data_tlv_bytes;
let content = if options.contains(types::PKCS7_DETACHED_SIGNATURE.get(py)?)? {
None
} else {
data_tlv_bytes = asn1::write_single(&data_with_header.deref())?;
Some(asn1::parse_single(&data_tlv_bytes).unwrap())
};
let signed_data = pkcs7::SignedData {
version: 1,
digest_algorithms: common::Asn1ReadableOrWritable::new_write(asn1::SetOfWriter::new(
&digest_algs,
)),
content_info: pkcs7::ContentInfo {
_content_type: asn1::DefinedByMarker::marker(),
content: pkcs7::Content::Data(content.map(asn1::Explicit::new)),
},
certificates: if options.contains(types::PKCS7_NO_CERTS.get(py)?)? {
None
} else {
Some(common::Asn1ReadableOrWritable::new_write(
asn1::SetOfWriter::new(&certs),
))
},
crls: None,
signer_infos: common::Asn1ReadableOrWritable::new_write(asn1::SetOfWriter::new(
&signer_infos,
)),
};
let content_info = pkcs7::ContentInfo {
_content_type: asn1::DefinedByMarker::marker(),
content: pkcs7::Content::SignedData(asn1::Explicit::new(Box::new(signed_data))),
};
let ci_bytes = asn1::write_single(&content_info)?;
if encoding.is(&types::ENCODING_SMIME.get(py)?) {
// The S/MIME encoder receives the header-less payload, the DER structure, a
// comma-separated list of digest names and the text-mode flag.
// NOTE(review): the `OIDS_TO_MIC_NAME` lookup panics for a digest OID outside the table.
let mic_algs = digest_algs
.iter()
.map(|d| OIDS_TO_MIC_NAME[&d.oid()])
.collect::<Vec<_>>()
.join(",");
Ok(types::SMIME_SIGNED_ENCODE
.get(py)?
.call1((&*data_without_header, &*ci_bytes, mic_algs, text_mode))?
.extract()?)
} else {
// Handles the DER, PEM, and error cases
encode_der_data(py, "PKCS7".to_string(), ci_bytes, encoding)
}
}
// Chooses the algorithm identifier recorded as a SignerInfo's digest encryption algorithm.
//
// An RSA key whose padding is not PSS always gets the plain RSA identifier; every other
// combination is delegated to `x509::sign::compute_signature_algorithm`.
fn compute_pkcs7_signature_algorithm<'p>(
    py: pyo3::Python<'p>,
    private_key: pyo3::Bound<'p, pyo3::PyAny>,
    hash_algorithm: pyo3::Bound<'p, pyo3::PyAny>,
    rsa_padding: pyo3::Bound<'p, pyo3::PyAny>,
) -> pyo3::PyResult<common::AlgorithmIdentifier<'static>> {
    let is_rsa_key =
        x509::sign::identify_key_type(py, private_key.clone())? == x509::sign::KeyType::Rsa;
    let uses_pss = rsa_padding.is_instance(&types::PSS.get(py)?)?;

    if !is_rsa_key || uses_pss {
        return x509::sign::compute_signature_algorithm(
            py,
            private_key,
            hash_algorithm,
            rsa_padding,
        );
    }

    // For RSA signatures (with no PSS padding), the OID is always the same no matter the
    // digest algorithm. See RFC 3370 (section 3.2).
    Ok(common::AlgorithmIdentifier {
        oid: asn1::DefinedByMarker::marker(),
        params: common::AlgorithmParameters::Rsa(Some(())),
    })
}
// Converts every bare LF in `data` (an LF that is not preceded by CR) into CRLF.
//
// Returns two versions of the result: the first is prefixed with a
// "Content-Type: text/plain" header block when `text_mode` is set, the second never carries
// that header. When nothing had to be written (no header and no bare LF) both elements borrow
// `data`; otherwise both are owned buffers.
fn smime_canonicalize(data: &[u8], text_mode: bool) -> (Cow<'_, [u8]>, Cow<'_, [u8]>) {
    let mut with_header: Vec<u8> = Vec::new();
    let mut without_header: Vec<u8> = Vec::new();
    if text_mode {
        with_header.extend_from_slice(b"Content-Type: text/plain\r\n\r\n");
    }

    // Start of the input range that has not been copied to the outputs yet.
    let mut pending_start = 0;
    let mut previous: Option<u8> = None;
    for (pos, &byte) in data.iter().enumerate() {
        if byte == b'\n' && previous != Some(b'\r') {
            let segment = &data[pending_start..pos];
            with_header.extend_from_slice(segment);
            with_header.extend_from_slice(b"\r\n");
            without_header.extend_from_slice(segment);
            without_header.extend_from_slice(b"\r\n");
            pending_start = pos + 1;
        }
        previous = Some(byte);
    }

    // An empty header buffer means neither a header nor a line ending was written, so the
    // input can be handed back untouched.
    if with_header.is_empty() {
        return (Cow::Borrowed(data), Cow::Borrowed(data));
    }

    // Something was rewritten: append whatever follows the last converted line ending.
    let remainder = &data[pending_start..];
    with_header.extend_from_slice(remainder);
    without_header.extend_from_slice(remainder);
    (Cow::Owned(with_header), Cow::Owned(without_header))
}
// Extracts the certificates embedded in a parsed PKCS #7 structure and returns them as a
// Python list of certificate objects.
//
// Structures whose type is not PKCS7_SIGNED are rejected with `UnsupportedAlgorithm`; a signed
// structure without any certificate data is rejected with `ValueError`.
#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
fn load_pkcs7_certificates(
    py: pyo3::Python<'_>,
    pkcs7: Pkcs7,
) -> CryptographyResult<pyo3::Bound<'_, pyo3::types::PyList>> {
    let nid = pkcs7.type_().map(|t| t.nid());
    if nid != Some(openssl::nid::Nid::PKCS7_SIGNED) {
        // Report the raw NID, or "empty" when the structure has no type at all.
        let nid_string = match nid {
            Some(n) => n.as_raw().to_string(),
            None => "empty".to_string(),
        };
        return Err(CryptographyError::from(
            exceptions::UnsupportedAlgorithm::new_err((
                format!("Only basic signed structures are currently supported. NID for this data was {nid_string}"),
                exceptions::Reasons::UNSUPPORTED_SERIALIZATION,
            )),
        ));
    }

    let certificates = match pkcs7.signed().and_then(|signed| signed.certificates()) {
        Some(certificates) => certificates,
        None => {
            return Err(CryptographyError::from(
                pyo3::exceptions::PyValueError::new_err(
                    "The provided PKCS7 has no certificate data, but a cert loading method was called.",
                ),
            ));
        }
    };

    // Each certificate is re-encoded to DER and loaded through the regular DER loader.
    let loaded = pyo3::types::PyList::empty(py);
    for certificate in certificates {
        let der = pyo3::types::PyBytes::new(py, certificate.to_der()?.as_slice()).unbind();
        loaded.append(load_der_x509_certificate(py, der, None)?)?;
    }
    Ok(loaded)
}
// Parses PEM-encoded PKCS #7 data and returns the certificates it contains.
//
// When built against BoringSSL this always fails with `UnsupportedAlgorithm`; otherwise a
// parse failure is reported as `ValueError` and the parsed structure is handed to
// `load_pkcs7_certificates`.
#[pyo3::pyfunction]
fn load_pem_pkcs7_certificates<'p>(
    py: pyo3::Python<'p>,
    data: &[u8],
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyList>> {
    cfg_if::cfg_if! {
        if #[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))] {
            let parsed = match openssl::pkcs7::Pkcs7::from_pem(data) {
                Ok(parsed) => parsed,
                Err(_) => {
                    return Err(CryptographyError::from(
                        pyo3::exceptions::PyValueError::new_err("Unable to parse PKCS7 data"),
                    ));
                }
            };
            load_pkcs7_certificates(py, parsed)
        } else {
            // Arguments are unused on this backend.
            let _ = (py, data);
            Err(CryptographyError::from(
                exceptions::UnsupportedAlgorithm::new_err((
                    "PKCS#7 is not supported by this backend.",
                    exceptions::Reasons::UNSUPPORTED_SERIALIZATION,
                )),
            ))
        }
    }
}
// Parses DER-encoded PKCS #7 data and returns the certificates it contains.
//
// When built against BoringSSL this always fails with `UnsupportedAlgorithm`; otherwise a
// parse failure is reported as `ValueError` and the parsed structure is handed to
// `load_pkcs7_certificates`.
#[pyo3::pyfunction]
fn load_der_pkcs7_certificates<'p>(
    py: pyo3::Python<'p>,
    data: &[u8],
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyList>> {
    cfg_if::cfg_if! {
        if #[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))] {
            let parsed = match openssl::pkcs7::Pkcs7::from_der(data) {
                Ok(parsed) => parsed,
                Err(_) => {
                    return Err(CryptographyError::from(
                        pyo3::exceptions::PyValueError::new_err("Unable to parse PKCS7 data"),
                    ));
                }
            };
            load_pkcs7_certificates(py, parsed)
        } else {
            // Arguments are unused on this backend.
            let _ = (py, data);
            Err(CryptographyError::from(
                exceptions::UnsupportedAlgorithm::new_err((
                    "PKCS#7 is not supported by this backend.",
                    exceptions::Reasons::UNSUPPORTED_SERIALIZATION,
                )),
            ))
        }
    }
}
// Python submodule named "pkcs7". It re-exports the `#[pyfunction]`s defined in this file
// (serialization, signing, encryption, decryption and certificate loading) via
// `#[pymodule_export]`.
#[pyo3::pymodule]
#[pyo3(name = "pkcs7")]
pub(crate) mod pkcs7_mod {
#[pymodule_export]
use super::{
decrypt_der, decrypt_pem, decrypt_smime, encrypt_and_serialize,
load_der_pkcs7_certificates, load_pem_pkcs7_certificates, serialize_certificates,
sign_and_serialize,
};
}
#[cfg(test)]
mod tests {
    use std::borrow::Cow;
    use std::ops::Deref;

    use super::smime_canonicalize;

    // Runs `smime_canonicalize` on one input and checks both returned buffers as well as
    // whether each of them borrows the input or owns a fresh allocation.
    fn check_case(
        input: &[u8],
        text_mode: bool,
        expected_with_header: &[u8],
        expected_without_header: &[u8],
        expected_is_borrowed: bool,
    ) {
        let (with_header, without_header) = smime_canonicalize(input, text_mode);
        assert_eq!(with_header.deref(), expected_with_header);
        assert_eq!(without_header.deref(), expected_without_header);
        assert_eq!(
            matches!(with_header, Cow::Borrowed(_)),
            expected_is_borrowed
        );
        assert_eq!(
            matches!(without_header, Cow::Borrowed(_)),
            expected_is_borrowed
        );
    }

    #[test]
    fn test_smime_canonicalize() {
        // Values with text_mode=false
        check_case(b"", false, b"", b"", true);
        check_case(b"\n", false, b"\r\n", b"\r\n", false);
        check_case(b"abc", false, b"abc", b"abc", true);
        check_case(
            b"abc\r\ndef\n",
            false,
            b"abc\r\ndef\r\n",
            b"abc\r\ndef\r\n",
            false,
        );
        check_case(b"abc\r\n", false, b"abc\r\n", b"abc\r\n", true);
        check_case(
            b"abc\ndef\n",
            false,
            b"abc\r\ndef\r\n",
            b"abc\r\ndef\r\n",
            false,
        );

        // Values with text_mode=true
        check_case(b"", true, b"Content-Type: text/plain\r\n\r\n", b"", false);
        check_case(
            b"abc",
            true,
            b"Content-Type: text/plain\r\n\r\nabc",
            b"abc",
            false,
        );
        check_case(
            b"abc\n",
            true,
            b"Content-Type: text/plain\r\n\r\nabc\r\n",
            b"abc\r\n",
            false,
        );
    }
}