sequoia-sq/tests/sq-key-subkey.rs
Neal H. Walfield 1817c305ae
Make helper function more generic.
- Change `compare_notations` from taking a slice containing two
    elements to taking a slice taking any number of elements.
2024-06-12 16:49:26 +02:00

644 lines
22 KiB
Rust

use assert_cmd::Command;
use tempfile::TempDir;
use chrono::Duration;
use sequoia_openpgp as openpgp;
use openpgp::packet::Key;
use openpgp::parse::Parse;
use openpgp::types::KeyFlags;
use openpgp::types::ReasonForRevocation;
use openpgp::types::RevocationStatus;
use openpgp::types::SignatureType;
use openpgp::Cert;
use openpgp::Result;
mod common;
use common::compare_notations;
use common::sq_key_generate;
use common::STANDARD_POLICY;
#[test]
fn sq_key_subkey() -> Result<()> {
let (tmpdir, cert_path, _) = sq_key_generate(None).unwrap();
let modified_cert_path = cert_path.parent().unwrap().join("new_key.pgp");
let cert = Cert::from_file(&cert_path)?;
for (arg, expected_key_flags, expected_count) in [
("--can-authenticate", KeyFlags::empty().set_authentication(), 2),
("--can-encrypt=universal", KeyFlags::empty().set_transport_encryption(), 2),
("--can-encrypt=universal", KeyFlags::empty().set_storage_encryption(), 2),
("--can-sign", KeyFlags::empty().set_signing(), 2),
] {
for keystore in [false, true] {
let home = TempDir::new().unwrap();
let home = home.path().display().to_string();
if keystore {
// When using the keystore, we need to import the key.
let mut cmd = Command::cargo_bin("sq")?;
cmd.args([
"--home", &home,
"key",
"import",
&cert_path.display().to_string(),
]);
let output = cmd.output()?;
if !output.status.success() {
panic!(
"sq exited with non-zero status code: {}",
String::from_utf8(output.stderr)?
);
}
}
// Add the subkey.
let mut cmd = Command::cargo_bin("sq")?;
cmd.args([
"--home", &home,
"key",
"subkey",
"add",
arg,
]);
if keystore {
cmd.args([
"--cert", &cert.fingerprint().to_string(),
]);
} else {
cmd.args([
"--force",
"--output",
&modified_cert_path.to_string_lossy(),
"--cert-file", &cert_path.to_string_lossy(),
]);
}
cmd.assert().success();
if keystore {
// When using the keystore, we need to export the
// modified certificate.
let mut cmd = Command::cargo_bin("sq")?;
cmd.args([
"--home", &home,
"cert",
"export",
"--cert", &cert.fingerprint().to_string(),
]);
let output = cmd.output()?;
if !output.status.success() {
panic!(
"sq exited with non-zero status code: {}",
String::from_utf8(output.stderr)?
);
}
std::fs::write(&modified_cert_path, &output.stdout)
.expect(&format!("Writing {}", &modified_cert_path.display()));
}
let cert = Cert::from_file(&modified_cert_path)?;
let valid_cert = cert.with_policy(STANDARD_POLICY, None)?;
assert_eq!(
valid_cert.keys().key_flags(&expected_key_flags).count(),
expected_count
);
}
}
tmpdir.close()?;
Ok(())
}
#[test]
fn sq_key_subkey_revoke() -> Result<()> {
let (tmpdir, cert_path, time) = sq_key_generate(None)?;
let cert_path = cert_path.display().to_string();
let cert = Cert::from_file(&cert_path)?;
let valid_cert = cert.with_policy(STANDARD_POLICY, Some(time.into()))?;
let fingerprint = valid_cert.clone().fingerprint();
let subkey: Key<_, _> = valid_cert
.with_policy(STANDARD_POLICY, Some(time.into()))
.unwrap()
.keys()
.subkeys()
.nth(0)
.unwrap()
.key()
.clone();
let subkey_fingerprint = subkey.fingerprint();
let message = "message";
// revoke for various reasons, with or without notations added, or with
// a revocation whose reference time is one hour after the creation of the
// certificate
for (reason, reason_str, notations, revocation_time) in [
(
ReasonForRevocation::KeyCompromised,
"compromised",
&[][..],
None,
),
(
ReasonForRevocation::KeyCompromised,
"compromised",
&[][..],
Some(time + Duration::hours(1)),
),
(
ReasonForRevocation::KeyCompromised,
"compromised",
&[("foo", "bar"), ("hallo@sequoia-pgp.org", "VALUE")][..],
None,
),
(ReasonForRevocation::KeyRetired, "retired", &[][..], None),
(
ReasonForRevocation::KeyRetired,
"retired",
&[][..],
Some(time + Duration::hours(1)),
),
(
ReasonForRevocation::KeyRetired,
"retired",
&[("foo", "bar"), ("hallo@sequoia-pgp.org", "VALUE")][..],
None,
),
(ReasonForRevocation::KeySuperseded, "superseded", &[][..], None),
(
ReasonForRevocation::KeySuperseded,
"superseded",
&[][..],
Some(time + Duration::hours(1)),
),
(
ReasonForRevocation::KeySuperseded,
"superseded",
&[("foo", "bar"), ("hallo@sequoia-pgp.org", "VALUE")][..],
None,
),
(ReasonForRevocation::Unspecified, "unspecified", &[][..], None),
(
ReasonForRevocation::Unspecified,
"unspecified",
&[][..],
Some(time + Duration::hours(1)),
),
(
ReasonForRevocation::Unspecified,
"unspecified",
&[("foo", "bar"), ("hallo@sequoia-pgp.org", "VALUE")][..],
None,
),
] {
eprintln!("==========================");
eprintln!("reason: {}, message: {}, notations: {:?}, time: {:?}",
reason, reason_str, notations, revocation_time);
for keystore in [false, true].into_iter() {
eprintln!("--------------------------");
eprintln!("keystore: {}", keystore);
let home = TempDir::new().unwrap();
let home = home.path().display().to_string();
let revocation = &tmpdir.path().join(format!(
"revocation_{}_{}_{}.rev",
reason_str,
if notations.is_empty() {
"no_notations"
} else {
"notations"
},
if revocation_time.is_some() {
"time"
} else {
"no_time"
}
));
if keystore {
// When using the keystore, we need to import the key.
let mut cmd = Command::cargo_bin("sq")?;
cmd.args([
"--home", &home,
"key",
"import",
&cert_path,
]);
let output = cmd.output()?;
if !output.status.success() {
panic!(
"sq exited with non-zero status code: {}",
String::from_utf8(output.stderr)?
);
}
}
let mut cmd = Command::cargo_bin("sq")?;
cmd.args([
"--home", &home,
"key",
"subkey",
"revoke",
&subkey_fingerprint.to_string(),
reason_str,
message,
]);
if keystore {
cmd.args([
"--cert", &cert.fingerprint().to_string(),
]);
} else {
cmd.args([
"--output",
&revocation.to_string_lossy(),
"--cert-file",
&cert_path,
]);
}
for (k, v) in notations {
cmd.args(["--notation", k, v]);
}
if let Some(time) = revocation_time {
cmd.args([
"--time",
&time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
]);
}
let output = cmd.output()?;
if !output.status.success() {
panic!("sq exited with non-zero status code: {:?}", output.stderr);
}
if keystore {
// When using the keystore, we need to export the
// revoked certificate.
let mut cmd = Command::cargo_bin("sq")?;
cmd.args([
"--home", &home,
"cert",
"export",
"--cert", &cert.fingerprint().to_string(),
]);
let output = cmd.output()?;
if !output.status.success() {
panic!(
"sq exited with non-zero status code: {}",
String::from_utf8(output.stderr)?
);
}
std::fs::write(&revocation, &output.stdout)
.expect(&format!("Writing {}", &revocation.display()));
}
// whether we found a revocation signature
let mut found_revoked = false;
// read revocation cert
let rev = Cert::from_file(&revocation)?;
assert!(! rev.is_tsk());
// and merge it into the certificate.
let cert = cert.clone().merge_public(rev)?;
let valid_cert =
cert.with_policy(STANDARD_POLICY, revocation_time.map(Into::into))?;
valid_cert
.with_policy(STANDARD_POLICY, revocation_time.map(Into::into))
.unwrap()
.keys()
.subkeys()
.for_each(|x| {
if x.fingerprint() == subkey_fingerprint {
let status = x.revocation_status(
STANDARD_POLICY,
revocation_time.map(Into::into),
);
// the subkey is revoked
assert!(matches!(status, RevocationStatus::Revoked(_)));
if let RevocationStatus::Revoked(sigs) = status {
// there is only one signature packet
assert_eq!(sigs.len(), 1);
let sig = sigs.into_iter().next().unwrap();
// it is a subkey revocation
assert_eq!(sig.typ(), SignatureType::SubkeyRevocation);
// the issuer is the certificate owner
assert_eq!(
sig.get_issuers().into_iter().next().as_ref(),
Some(&fingerprint.clone().into())
);
// our reason for revocation and message matches
assert_eq!(
sig.reason_for_revocation(),
Some((reason, message.as_bytes()))
);
// the notations of the revocation match the ones
// we passed in
assert!(compare_notations(sig, notations).is_ok());
found_revoked = true;
}
}
});
if !found_revoked {
panic!("the revoked subkey is not found in the revocation cert");
}
}
}
tmpdir.close()?;
Ok(())
}
#[test]
fn sq_key_subkey_revoke_thirdparty() -> Result<()> {
let (tmpdir, cert_path, time) = sq_key_generate(None)?;
let cert_path = cert_path.display().to_string();
let cert = Cert::from_file(&cert_path)?;
let valid_cert = cert.with_policy(STANDARD_POLICY, Some(time.into()))?;
let subkey: Key<_, _> = valid_cert
.with_policy(STANDARD_POLICY, Some(time.into()))
.unwrap()
.keys()
.subkeys()
.nth(0)
.unwrap()
.key()
.clone();
let subkey_fingerprint = subkey.fingerprint();
let (thirdparty_tmpdir, thirdparty_path, thirdparty_time) =
sq_key_generate(Some(&["bob <bob@example.org>"]))?;
let thirdparty_path = thirdparty_path.display().to_string();
let thirdparty_cert = Cert::from_file(&thirdparty_path)?;
let thirdparty_valid_cert = thirdparty_cert
.with_policy(STANDARD_POLICY, Some(thirdparty_time.into()))?;
let thirdparty_fingerprint = thirdparty_valid_cert.clone().fingerprint();
let message = "message";
// revoke for various reasons, with or without notations added, or with
// a revocation whose reference time is one hour after the creation of the
// certificate
for (reason, reason_str, notations, revocation_time) in [
(
ReasonForRevocation::KeyCompromised,
"compromised",
&[][..],
None,
),
(
ReasonForRevocation::KeyCompromised,
"compromised",
&[][..],
Some(thirdparty_time + Duration::hours(1)),
),
(
ReasonForRevocation::KeyCompromised,
"compromised",
&[("foo", "bar"), ("hallo@sequoia-pgp.org", "VALUE")][..],
None,
),
(ReasonForRevocation::KeyRetired, "retired", &[][..], None),
(
ReasonForRevocation::KeyRetired,
"retired",
&[][..],
Some(thirdparty_time + Duration::hours(1)),
),
(
ReasonForRevocation::KeyRetired,
"retired",
&[("foo", "bar"), ("hallo@sequoia-pgp.org", "VALUE")][..],
None,
),
(ReasonForRevocation::KeySuperseded, "superseded", &[][..], None),
(
ReasonForRevocation::KeySuperseded,
"superseded",
&[][..],
Some(thirdparty_time + Duration::hours(1)),
),
(
ReasonForRevocation::KeySuperseded,
"superseded",
&[("foo", "bar"), ("hallo@sequoia-pgp.org", "VALUE")][..],
None,
),
(ReasonForRevocation::Unspecified, "unspecified", &[][..], None),
(
ReasonForRevocation::Unspecified,
"unspecified",
&[][..],
Some(thirdparty_time + Duration::hours(1)),
),
(
ReasonForRevocation::Unspecified,
"unspecified",
&[("foo", "bar"), ("hallo@sequoia-pgp.org", "VALUE")][..],
None,
),
] {
for keystore in [false, true].into_iter() {
let home = TempDir::new().unwrap();
let home = home.path().display().to_string();
let revocation = &tmpdir.path().join(format!(
"revocation_{}_{}_{}.rev",
reason_str,
if ! notations.is_empty() {
"no_notations"
} else {
"notations"
},
if revocation_time.is_some() {
"time"
} else {
"no_time"
}
));
if keystore {
// When using the keystore, we need to import the key.
for path in &[ &cert_path, &thirdparty_path ] {
let mut cmd = Command::cargo_bin("sq")?;
cmd.args([
"--home", &home,
"key",
"import",
&path,
]);
let output = cmd.output()?;
if !output.status.success() {
panic!(
"sq exited with non-zero status code: {}",
String::from_utf8(output.stderr)?
);
}
}
}
let mut cmd = Command::cargo_bin("sq")?;
cmd.args([
"--home", &home,
"key",
"subkey",
"revoke",
&subkey_fingerprint.to_string(),
reason_str,
message,
]);
if keystore {
cmd.args([
"--cert", &cert.fingerprint().to_string(),
"--revoker", &thirdparty_cert.fingerprint().to_string(),
]);
} else {
cmd.args([
"--output",
&revocation.to_string_lossy(),
"--cert-file",
&cert_path,
"--revoker-file",
&thirdparty_path,
]);
}
for (k, v) in notations {
cmd.args(["--notation", k, v]);
}
if let Some(time) = revocation_time {
cmd.args([
"--time",
&time.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
]);
}
let output = cmd.output()?;
if !output.status.success() {
panic!("sq exited with non-zero status code: {}",
String::from_utf8_lossy(&output.stderr));
}
if keystore {
// When using the keystore, we need to export the
// revoked certificate.
let mut cmd = Command::cargo_bin("sq")?;
cmd.args([
"--home", &home,
"cert",
"export",
"--cert", &cert.fingerprint().to_string(),
]);
let output = cmd.output()?;
if !output.status.success() {
panic!(
"sq exited with non-zero status code: {}",
String::from_utf8(output.stderr)?
);
}
std::fs::write(&revocation, &output.stdout)
.expect(&format!("Writing {}", &revocation.display()));
}
// read revocation cert
let rev = Cert::from_file(&revocation)?;
assert!(! rev.is_tsk());
// and merge it into the certificate.
let cert = cert.clone().merge_public(rev)?;
let valid_cert =
cert.with_policy(STANDARD_POLICY, revocation_time.map(Into::into))?;
// whether we found a revocation signature
let mut found_revoked = false;
assert_eq!(valid_cert.userids().count(), 1);
valid_cert
.with_policy(STANDARD_POLICY, revocation_time.map(Into::into))
.unwrap()
.keys()
.subkeys()
.for_each(|x| {
if x.fingerprint() == subkey_fingerprint {
if let RevocationStatus::CouldBe(sigs) = x
.revocation_status(
STANDARD_POLICY,
revocation_time.map(Into::into),
)
{
// there is only one signature packet
assert_eq!(sigs.len(), 1);
let sig = sigs.into_iter().next().unwrap();
// it is a subkey revocation
assert_eq!(sig.typ(), SignatureType::SubkeyRevocation);
// the issuer is a thirdparty revoker
assert_eq!(
sig.get_issuers().into_iter().next().as_ref(),
Some(&thirdparty_fingerprint.clone().into())
);
// the revocation can be verified
if sig
.clone()
.verify_subkey_revocation(
&thirdparty_cert.primary_key(),
&cert.primary_key(),
&subkey,
)
.is_err()
{
panic!("revocation is not valid")
}
// our reason for revocation and message matches
assert_eq!(
sig.reason_for_revocation(),
Some((reason, message.as_bytes()))
);
// the notations of the revocation match the ones
// we passed in
assert!(compare_notations(sig, notations).is_ok());
found_revoked = true;
} else {
panic!("there are no signatures in {:?}", x);
}
}
});
if !found_revoked {
panic!("the revoked subkey is not found in the revocation cert");
}
}
}
tmpdir.close()?;
thirdparty_tmpdir.close()?;
Ok(())
}