190 lines
5.7 KiB
Python
Executable File
190 lines
5.7 KiB
Python
Executable File
#! /usr/bin/env python3
|
|
"""Evan's Secrets tool
|
|
|
|
This aims to wrap the different secret management tools used on systems I work with.
|
|
For now, this only targets `secret-tool`, but I plan to add more in the future.
|
|
"""
|
|
|
|
import argparse
|
|
import sys
|
|
import logging
|
|
import shutil
|
|
import subprocess
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
from abc import ABC, abstractmethod
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
__all__ = ["EwpSecrets"]
|
|
|
|
|
|
class __SecretManager(ABC):
|
|
@abstractmethod
|
|
def runs_on_this_system(self) -> bool: ...
|
|
|
|
@abstractmethod
|
|
def store(self, namespace: str, key: str, secret: str): ...
|
|
|
|
@abstractmethod
|
|
def load(self, namespace: str, key: str) -> Optional[str]: ...
|
|
|
|
def is_secure(self) -> bool:
|
|
return True
|
|
|
|
|
|
class GnomeKeyringSM(__SecretManager):
|
|
|
|
def runs_on_this_system(self) -> bool:
|
|
return shutil.which("secret-tool") is not None
|
|
|
|
def store(self, namespace: str, key: str, secret: str):
|
|
process = subprocess.Popen(
|
|
[
|
|
"secret-tool",
|
|
"store",
|
|
"--label",
|
|
"Secret stored by ewp-secrets",
|
|
namespace,
|
|
key,
|
|
],
|
|
stdin=subprocess.PIPE,
|
|
)
|
|
process.communicate(input=secret.encode())
|
|
process.wait()
|
|
|
|
def load(self, namespace: str, key: str) -> Optional[str]:
|
|
try:
|
|
process = subprocess.run(
|
|
["secret-tool", "lookup", namespace, key],
|
|
check=True,
|
|
capture_output=True,
|
|
)
|
|
return process.stdout.decode()
|
|
except subprocess.CalledProcessError:
|
|
return None
|
|
|
|
|
|
class FilesystemSM(__SecretManager):
|
|
|
|
def __init__(
|
|
self,
|
|
storage_path: Path = Path("~/.config/ewp-secrets/storage.sqlite3").expanduser(),
|
|
):
|
|
# If the file doesn't exist, create it and restrict access
|
|
if not storage_path.exists():
|
|
storage_path.parent.mkdir(parents=True, exist_ok=True)
|
|
storage_path.touch()
|
|
storage_path.chmod(0o600)
|
|
|
|
self.conn = sqlite3.connect(storage_path)
|
|
self.conn.execute(
|
|
"""CREATE TABLE IF NOT EXISTS secrets (
|
|
namespace TEXT,
|
|
key TEXT,
|
|
secret TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
accessed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
PRIMARY KEY (namespace, key)
|
|
)
|
|
"""
|
|
)
|
|
|
|
def runs_on_this_system(self) -> bool:
|
|
return True
|
|
|
|
def store(self, namespace: str, key: str, secret: str):
|
|
self.conn.execute(
|
|
"""INSERT INTO secrets (namespace, key, secret)
|
|
VALUES (?, ?, ?)
|
|
ON CONFLICT (namespace, key) DO UPDATE SET
|
|
(secret, updated_at) = (?, CURRENT_TIMESTAMP)
|
|
""",
|
|
(namespace, key, secret, secret),
|
|
)
|
|
|
|
def load(self, namespace: str, key: str) -> Optional[str]:
|
|
cursor = self.conn.execute(
|
|
"""SELECT secret FROM secrets WHERE namespace = ? AND key = ?""",
|
|
(namespace, key),
|
|
)
|
|
result = cursor.fetchone()
|
|
if result:
|
|
self.conn.execute(
|
|
"""UPDATE secrets SET accessed_at = CURRENT_TIMESTAMP
|
|
WHERE namespace = ? AND key = ?""",
|
|
(namespace, key),
|
|
)
|
|
return result[0]
|
|
return None
|
|
|
|
def is_secure(self) -> bool:
|
|
return False
|
|
|
|
class EwpSecrets:
|
|
def __init__(self):
|
|
all_secret_managers = [GnomeKeyringSM(), FilesystemSM()]
|
|
self.secret_managers = [
|
|
sm for sm in all_secret_managers if sm.runs_on_this_system()
|
|
]
|
|
assert self.secret_managers, "No secret managers available on this system"
|
|
|
|
def store(self, namespace: str, key: str, secret: str):
|
|
# Only write to the first (best) secret manager
|
|
self.secret_managers[0].store(namespace, key, secret)
|
|
|
|
def load(self, namespace: str, key: str) -> Optional[str]:
|
|
# Try to read from each secret manager until we find the secret
|
|
for sm in self.secret_managers:
|
|
secret = sm.load(namespace, key)
|
|
if secret:
|
|
return secret
|
|
return None
|
|
|
|
|
|
def main() -> int:
|
|
# Handle program arguments
|
|
ap = argparse.ArgumentParser(
|
|
prog="ewp-secrets", description="Store and load secrets"
|
|
)
|
|
ap.add_argument("action", help="Action to perform", choices=["store", "load"])
|
|
ap.add_argument(
|
|
"-n", "--namespace", help="Namespace to store secrets in", required=True
|
|
)
|
|
ap.add_argument("-k", "--key", help="Key to store secret under" , required=True)
|
|
ap.add_argument(
|
|
"-v", "--verbose", help="Enable verbose logging", action="store_true"
|
|
)
|
|
args = ap.parse_args()
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.DEBUG if args.verbose else logging.INFO,
|
|
format="%(levelname)s: %(message)s",
|
|
)
|
|
|
|
# Access the secret manager
|
|
secrets = EwpSecrets()
|
|
|
|
# Perform the requested action
|
|
if args.action == "store":
|
|
if not secrets.secret_managers[0].is_secure():
|
|
print("Warning: This system does not have a secure way to store secrets", file=sys.stderr)
|
|
secret = input("Enter the secret: ")
|
|
secrets.store(args.namespace, args.key, secret)
|
|
return 0
|
|
elif args.action == "load":
|
|
secret = secrets.load(args.namespace, args.key)
|
|
if secret:
|
|
print(secret)
|
|
return 0
|
|
else:
|
|
print("No secret found", file=sys.stderr)
|
|
return 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|