From 397ac96781bc8549f8ef577147959bedd8e76622 Mon Sep 17 00:00:00 2001 From: Evan Pratten Date: Tue, 23 Apr 2024 10:56:16 -0400 Subject: [PATCH] add 1p bridge script --- scripts/1p-bridge | 126 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100755 scripts/1p-bridge diff --git a/scripts/1p-bridge b/scripts/1p-bridge new file mode 100755 index 0000000..5ba223d --- /dev/null +++ b/scripts/1p-bridge @@ -0,0 +1,126 @@ +#! /usr/bin/env python3 +"""1password bridge + +This script can expose the `op` cli utility as a web service. +Callers can then pass a 1password service token through an HTTP request, +and are able to read secrets from 1password. + +HTTP routes: + /whoami + GET: Returns the current user's information + /read/ + GET: Reads the item at the specified URI +""" + +import argparse +import sys +import os +import logging +import shutil +import subprocess +import json +from flask import Flask, request, Response +from typing import Optional, List, Tuple + +app = Flask(__name__) +logger = logging.getLogger(__name__) + + +def __parse_service_token() -> Optional[str]: + bearer_token = request.headers.get("Authorization") + if bearer_token is None or not bearer_token.startswith("Bearer "): + return None + return bearer_token[7:] + +def __op_execute(op_cmd: List[str], bearer_token: str) -> Tuple[str, int]: + + # Edit the environment to include the bearer token + current_environment = os.environ.copy() + current_environment["OP_SERVICE_ACCOUNT_TOKEN"] = bearer_token + + # Build the full command + cmd = ["op"] + cmd.extend(op_cmd) + cmd.append("--format=json") + + # Spawn the onepassword CLI to run the command + op_proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=current_environment, + ) + + # If stderr contains errors, return them + if op_proc.stderr: + error_lines = op_proc.stderr.read().decode("utf-8").splitlines() + logger.error("1P CLI returned errors. Passing through to caller.") + return json.dumps({"error": error_lines}), 500 + + # Return the result + return op_proc.stdout.read().decode("utf-8"), 200 + + +@app.route("/whoami", methods=["GET"]) +def who_am_i(): + # Read the bearer token from the Authorization header + token = __parse_service_token() + if not token: + return jsonify({"error": "No bearer token provided"}), 401 + + # Spawn the onepassword CLI to query the current user; + raw_json, http_code = __op_execute(["user", "get", "--me"], token) + + # Build the response + response = Response(raw_json, content_type="application/json", status=http_code) + return response + +@app.route("/read/", methods=["GET"]) +def read_item(uri: str): + # Read the bearer token from the Authorization header + token = __parse_service_token() + if not token: + return jsonify({"error": "No bearer token provided"}), 401 + + # Reconstruct the URI into 1p format + uri = "op://" + uri + + # Spawn the onepassword CLI to query the item + raw_json, http_code = __op_execute(["read", uri], token) + + # Build the response + response = Response(raw_json, content_type="application/json", status=http_code) + return response + + +def main() -> int: + # Handle program arguments + ap = argparse.ArgumentParser(prog="", description="") + ap.add_argument( + "--bind", help="Address to bind to", default="unix:///tmp/1password-bridge.sock" + ) + ap.add_argument("--port", help="Port to bind to", default=80, type=int) + ap.add_argument( + "-v", "--verbose", help="Enable verbose logging", action="store_true" + ) + args = ap.parse_args() + + # Configure logging + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(levelname)s: %(message)s", + ) + + # If we can't access the `op` commandline utility, we can't do anything + if shutil.which("op") is None: + logger.error("1Password CLI not found. Please install it and try again.") + return 1 + + # Start up the server + app.run(host=args.bind, port=args.port) + + return 0 + + +if __name__ == "__main__": + sys.exit(main())