218 lines
7.2 KiB
Python
Executable File
218 lines
7.2 KiB
Python
Executable File
#! /usr/bin/env python3
|
|
import argparse
import logging
import socket
import sys
import urllib.parse
from pathlib import Path
from typing import Optional

import requests
|
|
|
|
# Module-wide logger; level and format are set by logging.basicConfig() in main().
logger = logging.getLogger(__name__)

# OAuth client ID, sent as "client_id" in both the login URL and the token requests.
G_CLIENT_ID = "107923498573-ruh1uhkfe1t5f18vam6sckq7pqer1vmg.apps.googleusercontent.com"

# OAuth scopes requested when the user logs in.
G_SCOPES = ["https://www.googleapis.com/auth/photoslibrary.appendonly"]

# Where Google sends the browser after login; the login flow listens locally
# on this host and port to receive the redirect.
G_REDIRECT_URI = "http://localhost:7842"
|
|
|
|
|
|
def _build_auth_url() -> str:
    """Return the Google consent-page URL with every query parameter URL-encoded."""
    query = urllib.parse.urlencode(
        {
            "client_id": G_CLIENT_ID,
            "response_type": "code",
            # Multiple scopes are separated by a space, which urlencode renders as "+"
            "scope": " ".join(G_SCOPES),
            "redirect_uri": G_REDIRECT_URI,
            "access_type": "offline",
            "prompt": "consent",
        }
    )
    return f"https://accounts.google.com/o/oauth2/v2/auth?{query}"


def _read_request_head(conn: socket.socket, limit: int = 65536) -> str:
    """Read from *conn* until the end of the HTTP headers, EOF, an error, or *limit* bytes."""
    received = bytearray()
    try:
        while b"\r\n\r\n" not in received and len(received) < limit:
            chunk = conn.recv(4096)
            if not chunk:
                break
            received += chunk
    except OSError:
        # Includes socket.timeout: an idle or reset connection is simply
        # treated as "whatever arrived so far" and rejected by the parser.
        logger.debug("Connection ended before a full request was read", exc_info=True)
    return received.decode("utf-8", errors="replace")


def _parse_callback_params(request_head: str) -> Optional[dict]:
    """Return the query parameters of an OAuth callback request, or None for any other request."""
    request_line = request_head.partition("\n")[0]
    parts = request_line.split()
    if len(parts) < 2 or parts[0] != "GET":
        return None

    query = urllib.parse.urlsplit(parts[1]).query
    parsed = urllib.parse.parse_qs(query, keep_blank_values=True)
    # A repeated key keeps its last value
    params = {key: values[-1] for key, values in parsed.items()}

    # Browsers also hit the listener for things like /favicon.ico; only a
    # request carrying "code" or "error" is the redirect we are waiting for.
    if "code" in params or "error" in params:
        return params
    return None


def _send_html(conn: socket.socket, status: str, heading: str) -> None:
    """Send a minimal, well-formed HTTP response whose page shows *heading*."""
    body = f"<html><body><h1>{heading}</h1></body></html>".encode("utf-8")
    head = (
        f"HTTP/1.1 {status}\r\n"
        "Content-Type: text/html; charset=utf-8\r\n"
        f"Content-Length: {len(body)}\r\n"
        "Connection: close\r\n"
        "\r\n"
    ).encode("ascii")
    try:
        conn.sendall(head + body)
    except OSError:
        # Best effort: the login result does not depend on the page being delivered.
        logger.debug("Could not deliver the response page", exc_info=True)


def _authorize_in_browser() -> dict:
    """Listen on the redirect URI, show the login URL, and return the callback's query parameters."""
    redirect = urllib.parse.urlsplit(G_REDIRECT_URI)

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        # Allow an immediate re-run while the previous socket is still in TIME_WAIT
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Host and port come from the redirect URI so the two can never disagree
        server.bind((redirect.hostname, redirect.port or 80))
        server.listen()

        # Only advertise the URL once the listener is ready to receive the redirect
        logger.info("Please visit the following URL to log in:")
        logger.info(_build_auth_url())

        while True:
            conn, _ = server.accept()
            with conn:
                # Don't let a silent speculative connection block the real callback
                conn.settimeout(5)
                params = _parse_callback_params(_read_request_head(conn))

                if params is None:
                    _send_html(conn, "404 Not Found", "Not found")
                    continue

                if "error" in params:
                    _send_html(conn, "500 Internal Server Error", "Failed to log in")
                else:
                    _send_html(conn, "200 OK", "Success!")
                return params


def _request_token(payload: dict) -> Optional[dict]:
    """POST *payload* to Google's token endpoint; return the decoded JSON, or None on failure."""
    try:
        response = requests.post(
            "https://oauth2.googleapis.com/token",
            data=payload,
            # requests waits forever unless a timeout is given
            timeout=30,
        )
    except requests.RequestException as exc:
        logger.error("Failed to get OAuth token")
        logger.error(exc)
        return None

    if response.status_code != 200:
        logger.error("Failed to get OAuth token")
        logger.error(response.text)
        return None

    # The successful body is deliberately not logged: it contains the tokens.
    tokens = response.json()
    if "access_token" not in tokens:
        logger.error("Failed to get OAuth token")
        logger.error("Token response did not contain an access token")
        return None
    return tokens


def get_google_oauth_token() -> Optional[str]:
    """Return a Google OAuth access token, or None if one could not be obtained.

    The client secret is read from ``~/.config/gp-upload/client-secret``.
    If ``~/.config/gp-upload/refresh-token`` holds a refresh token, it is
    exchanged for an access token. Otherwise the user is asked to log in via
    the browser, the redirect is received on the local redirect URI, the
    authorization code is exchanged for tokens, and the refresh token is
    saved (owner-readable only) for later runs.

    Returns:
        The access token, or None after logging the reason for the failure.
    """
    config_dir = Path("~/.config/gp-upload").expanduser()
    refresh_token_path = config_dir / "refresh-token"
    client_secret_path = config_dir / "client-secret"

    # Read the client secret
    try:
        client_secret = client_secret_path.read_text().strip()
    except FileNotFoundError:
        logger.error("Client secret not found: %s", client_secret_path)
        return None

    # Check if we have a refresh token (an empty file counts as "no token")
    try:
        refresh_token = refresh_token_path.read_text().strip()
    except FileNotFoundError:
        refresh_token = ""

    if refresh_token:
        logger.info("Using stored refresh token")
        tokens = _request_token(
            {
                "client_id": G_CLIENT_ID,
                "grant_type": "refresh_token",
                "refresh_token": refresh_token,
                "client_secret": client_secret,
            }
        )
        if tokens is None:
            return None
        return tokens["access_token"]

    # Otherwise, log the user in
    logger.info("Logging user in")
    params = _authorize_in_browser()
    if "error" in params:
        logger.error("Failed to log in: %s", params["error"])
        return None

    tokens = _request_token(
        {
            "client_id": G_CLIENT_ID,
            "code": params["code"],
            "grant_type": "authorization_code",
            "redirect_uri": G_REDIRECT_URI,
            "client_secret": client_secret,
        }
    )
    if tokens is None:
        return None

    # Save the refresh token, readable by the owner only
    new_refresh_token = tokens.get("refresh_token")
    if new_refresh_token:
        refresh_token_path.parent.mkdir(parents=True, exist_ok=True)
        # Create the file with restrictive permissions before the secret is written
        refresh_token_path.touch(mode=0o600, exist_ok=True)
        refresh_token_path.chmod(0o600)
        refresh_token_path.write_text(new_refresh_token)
    else:
        logger.warning("No refresh token was returned; the next run will ask you to log in again")

    return tokens["access_token"]
|
|
|
|
|
|
|
|
def upload_file(file: Path, oauth_token: str) -> bool:
    """Upload *file* to Google Photos and create a media item from it.

    The file's bytes are first sent to the ``uploads`` endpoint, which answers
    with an upload token; that token is then passed to
    ``mediaItems:batchCreate`` to create the media item.

    Args:
        file: Path of the file to upload.
        oauth_token: OAuth access token used as the Bearer credential.

    Returns:
        True if the media item was created, False (after logging the reason)
        if reading the file, either request, or the item creation failed.
    """
    # (connect, read) seconds; requests waits forever unless a timeout is given
    timeout = (10, 120)

    # Make the upload request
    logger.info("Creating new upload")
    try:
        # Passing the open file lets requests stream it instead of
        # holding the whole file in memory.
        with file.open("rb") as stream:
            response = requests.post(
                "https://photoslibrary.googleapis.com/v1/uploads",
                headers={
                    "Authorization": f"Bearer {oauth_token}",
                    "Content-type": "application/octet-stream",
                    "X-Goog-Upload-File-Name": file.name,
                    "X-Goog-Upload-Protocol": "raw",
                },
                data=stream,
                timeout=timeout,
            )
    except (OSError, requests.RequestException) as exc:
        logger.error(f"Failed to upload: {file}")
        logger.error(exc)
        return False

    # Check for errors
    if response.status_code != 200:
        logger.error(f"Failed to upload: {file}")
        logger.error(response.text)
        return False
    logger.info(f"Uploaded {file.stat().st_size} bytes")

    # Get the upload token (the response body is the token itself)
    upload_token = response.text
    # Debug only: the token is a credential for creating the media item
    logger.debug(f"Upload token: {upload_token}")

    # Create the media item
    logger.info("Creating new media item")
    try:
        response = requests.post(
            "https://photoslibrary.googleapis.com/v1/mediaItems:batchCreate",
            headers={
                "Authorization": f"Bearer {oauth_token}",
                "Content-type": "application/json",
            },
            json={
                "newMediaItems": [
                    {
                        "description": "",
                        "simpleMediaItem": {
                            "fileName": file.name,
                            "uploadToken": upload_token,
                        },
                    }
                ]
            },
            timeout=timeout,
        )
    except requests.RequestException as exc:
        logger.error(f"Failed to create media item: {file}")
        logger.error(exc)
        return False

    # Check for errors
    if response.status_code != 200:
        logger.error(f"Failed to create media item: {file}")
        logger.error(response.text)
        return False

    # Log some info about the action; a result without "mediaItem" was not created
    results = response.json().get("newMediaItemResults", [])
    succeeded = bool(results)
    for new_item in results:
        if "mediaItem" in new_item:
            logger.info(f"Created media item: {new_item['mediaItem']['filename']}")
            logger.info(f"URL: {new_item['mediaItem']['productUrl']}")
        else:
            succeeded = False
            logger.error(f"Failed to create media item: {file}")
            logger.error(new_item.get("status"))

    return succeeded
|
|
|
|
|
|
def main() -> int:
    """Parse the command line, authenticate, and upload the given file.

    Returns:
        The process exit status: 0 on success, 1 if the path is not a file,
        authentication fails, or the upload reports failure.
    """
    # Handle program arguments
    ap = argparse.ArgumentParser(
        prog="gp-upload", description="Upload a file to Google Photos"
    )
    ap.add_argument("file", help="File to upload")
    ap.add_argument(
        "-v", "--verbose", help="Enable verbose logging", action="store_true"
    )
    args = ap.parse_args()

    # Configure logging
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(levelname)s: %(message)s",
    )

    # Reject a bad path before sending the user through the login flow
    target = Path(args.file)
    if not target.is_file():
        logger.error("Not a file: %s", target)
        return 1

    # Authenticate
    oauth_token = get_google_oauth_token()
    if oauth_token is None:
        return 1

    # Upload. Only an explicit False counts as failure, so an upload_file
    # that reports nothing (returns None) is still treated as success.
    if upload_file(target, oauth_token) is False:
        return 1

    return 0
|
|
|
|
|
|
# Run only when executed as a script; main()'s return value becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|