diff --git a/scripts/gp-upload b/scripts/gp-upload new file mode 100755 index 0000000..7cd8e8d --- /dev/null +++ b/scripts/gp-upload @@ -0,0 +1,217 @@ +#! /usr/bin/env python3 +import argparse +import sys +import logging +import requests +import socket +import urllib.parse +from pathlib import Path + +logger = logging.getLogger(__name__) + +G_CLIENT_ID = "107923498573-ruh1uhkfe1t5f18vam6sckq7pqer1vmg.apps.googleusercontent.com" +G_SCOPES = ["https://www.googleapis.com/auth/photoslibrary.appendonly"] +G_REDIRECT_URI = "http://localhost:7842" + + +def get_google_oauth_token() -> str: + """Either log the user in, or used a stored refresh token to get an OAuth token""" + refresh_token_path = Path("~/.config/gp-upload/refresh-token").expanduser() + client_secret_path = Path("~/.config/gp-upload/client-secret").expanduser() + + # Read the client secret + with client_secret_path.open("r") as f: + client_secret = f.read().strip() + + # Check if we have a refresh token + if refresh_token_path.exists(): + logger.info("Using stored refresh token") + + # Read the refresh token + with refresh_token_path.open("r") as f: + refresh_token = f.read().strip() + + # Make the request + response = requests.post( + "https://oauth2.googleapis.com/token", + data={ + "client_id": G_CLIENT_ID, + "grant_type": "refresh_token", + "refresh_token": refresh_token, + "client_secret": client_secret, + }, + ) + + # Check for errors + if response.status_code != 200: + logger.error("Failed to get OAuth token") + logger.error(response.text) + return None + + # Return the OAuth token + return response.json()["access_token"] + + # Otherwise, log the user in + else: + logger.info("Logging user in") + + # Direct the user to Google's login page + logger.info("Please visit the following URL to log in:") + logger.info( + f"https://accounts.google.com/o/oauth2/v2/auth?client_id={G_CLIENT_ID}&response_type=code&scope={'+'.join(G_SCOPES)}&redirect_uri={G_REDIRECT_URI}&access_type=offline&prompt=consent" + ) + + # Open a TCP server to listen for the redirect + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("localhost", 7842)) + s.listen() + + # Wait for the redirect + conn, addr = s.accept() + with conn: + # Read the request + request = conn.recv(1024).decode("utf-8") + + # Parse the request + request = request.splitlines() + request = [line for line in request if line.startswith("GET")] + request = request[0].split(" ")[1] + request = request.split("?")[1] + request = request.split("&") + request = {key: urllib.parse.unquote(value) for key, value in [pair.split("=") for pair in request]} + + # Check for errors + if "error" in request: + logger.error(f"Failed to log in: {request['error']}") + conn.sendall(b"HTTP/1.1 500 Internal Server Error\n\n<html><body><h1>Failed to log in</h1></body></html>") + conn.close() + return None + + # Return a message to the user and close the socket + conn.sendall(b"HTTP/1.1 200 OK\n\n<html><body><h1>Success!</h1></body></html>") + conn.close() + + # Make the request + response = requests.post( + "https://oauth2.googleapis.com/token", + data={ + "client_id": G_CLIENT_ID, + "code": request["code"], + "grant_type": "authorization_code", + "redirect_uri": G_REDIRECT_URI, + "client_secret": client_secret, + }, + ) + logger.info(f"Response: {response.text}") + + # Check for errors + if response.status_code != 200: + logger.error("Failed to get OAuth token") + logger.error(response.text) + return None + access_token = response.json()["access_token"] + refresh_token = response.json()["refresh_token"] + + # Save the refresh token + refresh_token_path.parent.mkdir(parents=True, exist_ok=True) + with refresh_token_path.open("w") as f: + f.write(refresh_token) + + # Return the OAuth token + return access_token + + + +def upload_file(file: Path, oauth_token: str): + # Read the file + with file.open("rb") as f: + file_data = f.read() + + # Make the upload request + logger.info("Creating new upload") + response = requests.post( + "https://photoslibrary.googleapis.com/v1/uploads", + headers={ + "Authorization": f"Bearer {oauth_token}", + "Content-type": "application/octet-stream", + "X-Goog-Upload-File-Name": file.name, + "X-Goog-Upload-Protocol": "raw", + }, + data=file_data, + ) + logger.info(f"Uploaded {file.stat().st_size} bytes") + + # Check for errors + if response.status_code != 200: + logger.error(f"Failed to upload: {file}") + logger.error(response.text) + return None + + # Get the upload token + upload_token = response.text + logger.info(f"Upload token: {upload_token}") + + # Create the media item + logger.info("Creating new media item") + response = requests.post( + "https://photoslibrary.googleapis.com/v1/mediaItems:batchCreate", + headers={ + "Authorization": f"Bearer {oauth_token}", + "Content-type": "application/json", + }, + json={ + "newMediaItems": [ + { + "description": "", + "simpleMediaItem": { + "fileName": file.name, + "uploadToken": upload_token, + }, + } + ] + }, + ) + + # Check for errors + if response.status_code != 200: + logger.error(f"Failed to create media item: {file}") + logger.error(response.text) + return None + + # Log some info about the action + for new_item in response.json()["newMediaItemResults"]: + if "mediaItem" in new_item: + logger.info(f"Created media item: {new_item['mediaItem']['filename']}") + logger.info(f"URL: {new_item['mediaItem']['productUrl']}") + + +def main() -> int: + # Handle program arguments + ap = argparse.ArgumentParser( + prog="gp-upload", description="Upload a file to Google Photos" + ) + ap.add_argument("file", help="File to upload") + ap.add_argument( + "-v", "--verbose", help="Enable verbose logging", action="store_true" + ) + args = ap.parse_args() + + # Configure logging + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(levelname)s: %(message)s", + ) + + # Authenticate + oauth_token = get_google_oauth_token() + if oauth_token is None: + return 1 + + # Upload + upload_file(Path(args.file), oauth_token) + + return 0 + + +if __name__ == "__main__": + sys.exit(main())