#! /usr/bin/env python3 import argparse import sys import logging import requests import socket import urllib.parse from pathlib import Path logger = logging.getLogger(__name__) G_CLIENT_ID = "107923498573-ruh1uhkfe1t5f18vam6sckq7pqer1vmg.apps.googleusercontent.com" G_SCOPES = ["https://www.googleapis.com/auth/photoslibrary.appendonly"] G_REDIRECT_URI = "http://localhost:7842" def get_google_oauth_token() -> str: """Either log the user in, or used a stored refresh token to get an OAuth token""" refresh_token_path = Path("~/.config/gp-upload/refresh-token").expanduser() client_secret_path = Path("~/.config/gp-upload/client-secret").expanduser() # Read the client secret with client_secret_path.open("r") as f: client_secret = f.read().strip() # Check if we have a refresh token if refresh_token_path.exists(): logger.info("Using stored refresh token") # Read the refresh token with refresh_token_path.open("r") as f: refresh_token = f.read().strip() # Make the request response = requests.post( "https://oauth2.googleapis.com/token", data={ "client_id": G_CLIENT_ID, "grant_type": "refresh_token", "refresh_token": refresh_token, "client_secret": client_secret, }, ) # Check for errors if response.status_code != 200: logger.error("Failed to get OAuth token") logger.error(response.text) return None # Return the OAuth token return response.json()["access_token"] # Otherwise, log the user in else: logger.info("Logging user in") # Direct the user to Google's login page logger.info("Please visit the following URL to log in:") logger.info( f"https://accounts.google.com/o/oauth2/v2/auth?client_id={G_CLIENT_ID}&response_type=code&scope={'+'.join(G_SCOPES)}&redirect_uri={G_REDIRECT_URI}&access_type=offline&prompt=consent" ) # Open a TCP server to listen for the redirect with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("localhost", 7842)) s.listen() # Wait for the redirect conn, addr = s.accept() with conn: # Read the request request = conn.recv(1024).decode("utf-8") # Parse the request request = request.splitlines() request = [line for line in request if line.startswith("GET")] request = request[0].split(" ")[1] request = request.split("?")[1] request = request.split("&") request = {key: urllib.parse.unquote(value) for key, value in [pair.split("=") for pair in request]} # Check for errors if "error" in request: logger.error(f"Failed to log in: {request['error']}") conn.sendall(b"HTTP/1.1 500 Internal Server Error\n\n

Failed to log in

") conn.close() return None # Return a message to the user and close the socket conn.sendall(b"HTTP/1.1 200 OK\n\n

Success!

") conn.close() # Make the request response = requests.post( "https://oauth2.googleapis.com/token", data={ "client_id": G_CLIENT_ID, "code": request["code"], "grant_type": "authorization_code", "redirect_uri": G_REDIRECT_URI, "client_secret": client_secret, }, ) logger.info(f"Response: {response.text}") # Check for errors if response.status_code != 200: logger.error("Failed to get OAuth token") logger.error(response.text) return None access_token = response.json()["access_token"] refresh_token = response.json()["refresh_token"] # Save the refresh token refresh_token_path.parent.mkdir(parents=True, exist_ok=True) with refresh_token_path.open("w") as f: f.write(refresh_token) # Return the OAuth token return access_token def upload_file(file: Path, oauth_token: str): # Read the file with file.open("rb") as f: file_data = f.read() # Make the upload request logger.info("Creating new upload") response = requests.post( "https://photoslibrary.googleapis.com/v1/uploads", headers={ "Authorization": f"Bearer {oauth_token}", "Content-type": "application/octet-stream", "X-Goog-Upload-File-Name": file.name, "X-Goog-Upload-Protocol": "raw", }, data=file_data, ) logger.info(f"Uploaded {file.stat().st_size} bytes") # Check for errors if response.status_code != 200: logger.error(f"Failed to upload: {file}") logger.error(response.text) return None # Get the upload token upload_token = response.text logger.info(f"Upload token: {upload_token}") # Create the media item logger.info("Creating new media item") response = requests.post( "https://photoslibrary.googleapis.com/v1/mediaItems:batchCreate", headers={ "Authorization": f"Bearer {oauth_token}", "Content-type": "application/json", }, json={ "newMediaItems": [ { "description": "", "simpleMediaItem": { "fileName": file.name, "uploadToken": upload_token, }, } ] }, ) # Check for errors if response.status_code != 200: logger.error(f"Failed to create media item: {file}") logger.error(response.text) return None # Log some info about the action for new_item in response.json()["newMediaItemResults"]: if "mediaItem" in new_item: logger.info(f"Created media item: {new_item['mediaItem']['filename']}") logger.info(f"URL: {new_item['mediaItem']['productUrl']}") def main() -> int: # Handle program arguments ap = argparse.ArgumentParser( prog="gp-upload", description="Upload a file to Google Photos" ) ap.add_argument("file", help="File to upload") ap.add_argument( "-v", "--verbose", help="Enable verbose logging", action="store_true" ) args = ap.parse_args() # Configure logging logging.basicConfig( level=logging.DEBUG if args.verbose else logging.INFO, format="%(levelname)s: %(message)s", ) # Authenticate oauth_token = get_google_oauth_token() if oauth_token is None: return 1 # Upload upload_file(Path(args.file), oauth_token) return 0 if __name__ == "__main__": sys.exit(main())