1

Add secret management script

This commit is contained in:
Evan Pratten 2024-02-29 12:59:47 -05:00
parent 56e43e1e16
commit 5302c41608
2 changed files with 183 additions and 1 deletions

182
scripts/ewp-secrets Executable file
View File

@ -0,0 +1,182 @@
#! /usr/bin/env python3
"""Evan's Secrets tool
This aims to wrap the different secret management tools used on systems I work with.
For now, this only targets `secret-tool`, but I plan to add more in the future.
"""
import argparse
import sys
import logging
import shutil
import subprocess
import sqlite3
from pathlib import Path
from typing import Optional
from abc import ABC, abstractmethod
logger = logging.getLogger(__name__)
__all__ = ["EwpSecrets"]
class __SecretManager(ABC):
@abstractmethod
def runs_on_this_system(self) -> bool: ...
@abstractmethod
def store(self, namespace: str, key: str, secret: str): ...
@abstractmethod
def load(self, namespace: str, key: str) -> Optional[str]: ...
class GnomeKeyringSM(__SecretManager):
def runs_on_this_system(self) -> bool:
return shutil.which("secret-tool") is not None
def store(self, namespace: str, key: str, secret: str):
process = subprocess.Popen(
[
"secret-tool",
"store",
"--label",
"Secret stored by ewp-secrets",
namespace,
key,
],
stdin=subprocess.PIPE,
)
process.communicate(input=secret.encode())
process.wait()
def load(self, namespace: str, key: str) -> Optional[str]:
try:
process = subprocess.run(
["secret-tool", "lookup", namespace, key],
check=True,
capture_output=True,
)
return process.stdout.decode()
except subprocess.CalledProcessError:
return None
class FilesystemSM(__SecretManager):
def __init__(
self,
storage_path: Path = Path("~/.config/ewp-secrets/storage.sqlite3").expanduser(),
):
# If the file doesn't exist, create it and restrict access
if not storage_path.exists():
storage_path.parent.mkdir(parents=True, exist_ok=True)
storage_path.touch()
storage_path.chmod(0o600)
self.conn = sqlite3.connect(storage_path)
self.conn.execute(
"""CREATE TABLE IF NOT EXISTS secrets (
namespace TEXT,
key TEXT,
secret TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
accessed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (namespace, key)
)
"""
)
def runs_on_this_system(self) -> bool:
return True
def store(self, namespace: str, key: str, secret: str):
self.conn.execute(
"""INSERT INTO secrets (namespace, key, secret)
VALUES (?, ?, ?)
ON CONFLICT (namespace, key) DO UPDATE SET
(secret, updated_at) = (?, CURRENT_TIMESTAMP)
""",
(namespace, key, secret, secret),
)
def load(self, namespace: str, key: str) -> Optional[str]:
cursor = self.conn.execute(
"""SELECT secret FROM secrets WHERE namespace = ? AND key = ?""",
(namespace, key),
)
result = cursor.fetchone()
if result:
self.conn.execute(
"""UPDATE secrets SET accessed_at = CURRENT_TIMESTAMP
WHERE namespace = ? AND key = ?""",
(namespace, key),
)
return result[0]
return None
class EwpSecrets:
def __init__(self):
all_secret_managers = [GnomeKeyringSM(), FilesystemSM()]
self.secret_managers = [
sm for sm in all_secret_managers if sm.runs_on_this_system()
]
assert self.secret_managers, "No secret managers available on this system"
def store(self, namespace: str, key: str, secret: str):
# Only write to the first (best) secret manager
self.secret_managers[0].store(namespace, key, secret)
def load(self, namespace: str, key: str) -> Optional[str]:
# Try to read from each secret manager until we find the secret
for sm in self.secret_managers:
secret = sm.load(namespace, key)
if secret:
return secret
return None
def main() -> int:
# Handle program arguments
ap = argparse.ArgumentParser(
prog="ewp-secrets", description="Store and load secrets"
)
ap.add_argument("action", help="Action to perform", choices=["store", "load"])
ap.add_argument(
"-n", "--namespace", help="Namespace to store secrets in", required=True
)
ap.add_argument("-k", "--key", help="Key to store secret under")
ap.add_argument(
"-v", "--verbose", help="Enable verbose logging", action="store_true"
)
args = ap.parse_args()
# Configure logging
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(levelname)s: %(message)s",
)
# Access the secret manager
secrets = EwpSecrets()
# Perform the requested action
if args.action == "store":
secret = input("Enter the secret: ")
secrets.store(args.namespace, args.key, secret)
return 0
elif args.action == "load":
secret = secrets.load(args.namespace, args.key)
if secret:
print(secret)
return 0
else:
print("No secret found", file=sys.stderr)
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@ -27,7 +27,7 @@ def main() -> int:
)
ap.add_argument(
"--password",
help="Password to use for sending the email. Overrides the password in ~/.netrc",
help="Password to use for sending the email.",
)
args = ap.parse_args()