add 1p bridge script
This commit is contained in:
parent
e2c3a2fcdc
commit
397ac96781
126
scripts/1p-bridge
Executable file
126
scripts/1p-bridge
Executable file
@ -0,0 +1,126 @@
|
|||||||
|
#! /usr/bin/env python3
|
||||||
|
"""1password bridge
|
||||||
|
|
||||||
|
This script can expose the `op` cli utility as a web service.
|
||||||
|
Callers can then pass a 1password service token through an HTTP request,
|
||||||
|
and are able to read secrets from 1password.
|
||||||
|
|
||||||
|
HTTP routes:
|
||||||
|
/whoami
|
||||||
|
GET: Returns the current user's information
|
||||||
|
/read/<path:uri>
|
||||||
|
GET: Reads the item at the specified URI
|
||||||
|
"""
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import logging
|
||||||
|
import shutil
|
||||||
|
import subprocess
|
||||||
|
import json
|
||||||
|
from flask import Flask, request, Response
|
||||||
|
from typing import Optional, List, Tuple
|
||||||
|
|
||||||
|
app = Flask(__name__)
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def __parse_service_token() -> Optional[str]:
|
||||||
|
bearer_token = request.headers.get("Authorization")
|
||||||
|
if bearer_token is None or not bearer_token.startswith("Bearer "):
|
||||||
|
return None
|
||||||
|
return bearer_token[7:]
|
||||||
|
|
||||||
|
def __op_execute(op_cmd: List[str], bearer_token: str) -> Tuple[str, int]:
|
||||||
|
|
||||||
|
# Edit the environment to include the bearer token
|
||||||
|
current_environment = os.environ.copy()
|
||||||
|
current_environment["OP_SERVICE_ACCOUNT_TOKEN"] = bearer_token
|
||||||
|
|
||||||
|
# Build the full command
|
||||||
|
cmd = ["op"]
|
||||||
|
cmd.extend(op_cmd)
|
||||||
|
cmd.append("--format=json")
|
||||||
|
|
||||||
|
# Spawn the onepassword CLI to run the command
|
||||||
|
op_proc = subprocess.Popen(
|
||||||
|
cmd,
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
env=current_environment,
|
||||||
|
)
|
||||||
|
|
||||||
|
# If stderr contains errors, return them
|
||||||
|
if op_proc.stderr:
|
||||||
|
error_lines = op_proc.stderr.read().decode("utf-8").splitlines()
|
||||||
|
logger.error("1P CLI returned errors. Passing through to caller.")
|
||||||
|
return json.dumps({"error": error_lines}), 500
|
||||||
|
|
||||||
|
# Return the result
|
||||||
|
return op_proc.stdout.read().decode("utf-8"), 200
|
||||||
|
|
||||||
|
|
||||||
|
@app.route("/whoami", methods=["GET"])
|
||||||
|
def who_am_i():
|
||||||
|
# Read the bearer token from the Authorization header
|
||||||
|
token = __parse_service_token()
|
||||||
|
if not token:
|
||||||
|
return jsonify({"error": "No bearer token provided"}), 401
|
||||||
|
|
||||||
|
# Spawn the onepassword CLI to query the current user;
|
||||||
|
raw_json, http_code = __op_execute(["user", "get", "--me"], token)
|
||||||
|
|
||||||
|
# Build the response
|
||||||
|
response = Response(raw_json, content_type="application/json", status=http_code)
|
||||||
|
return response
|
||||||
|
|
||||||
|
@app.route("/read/<path:uri>", methods=["GET"])
|
||||||
|
def read_item(uri: str):
|
||||||
|
# Read the bearer token from the Authorization header
|
||||||
|
token = __parse_service_token()
|
||||||
|
if not token:
|
||||||
|
return jsonify({"error": "No bearer token provided"}), 401
|
||||||
|
|
||||||
|
# Reconstruct the URI into 1p format
|
||||||
|
uri = "op://" + uri
|
||||||
|
|
||||||
|
# Spawn the onepassword CLI to query the item
|
||||||
|
raw_json, http_code = __op_execute(["read", uri], token)
|
||||||
|
|
||||||
|
# Build the response
|
||||||
|
response = Response(raw_json, content_type="application/json", status=http_code)
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
|
||||||
|
# Handle program arguments
|
||||||
|
ap = argparse.ArgumentParser(prog="", description="")
|
||||||
|
ap.add_argument(
|
||||||
|
"--bind", help="Address to bind to", default="unix:///tmp/1password-bridge.sock"
|
||||||
|
)
|
||||||
|
ap.add_argument("--port", help="Port to bind to", default=80, type=int)
|
||||||
|
ap.add_argument(
|
||||||
|
"-v", "--verbose", help="Enable verbose logging", action="store_true"
|
||||||
|
)
|
||||||
|
args = ap.parse_args()
|
||||||
|
|
||||||
|
# Configure logging
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.DEBUG if args.verbose else logging.INFO,
|
||||||
|
format="%(levelname)s: %(message)s",
|
||||||
|
)
|
||||||
|
|
||||||
|
# If we can't access the `op` commandline utility, we can't do anything
|
||||||
|
if shutil.which("op") is None:
|
||||||
|
logger.error("1Password CLI not found. Please install it and try again.")
|
||||||
|
return 1
|
||||||
|
|
||||||
|
# Start up the server
|
||||||
|
app.run(host=args.bind, port=args.port)
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
Loading…
x
Reference in New Issue
Block a user