Script to upload files to google photos
This commit is contained in:
parent
0dfe5f3ab6
commit
c36a358c12
217
scripts/gp-upload
Executable file
217
scripts/gp-upload
Executable file
@ -0,0 +1,217 @@
|
||||
#! /usr/bin/env python3
|
||||
import argparse
|
||||
import sys
|
||||
import logging
|
||||
import requests
|
||||
import socket
|
||||
import urllib.parse
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
G_CLIENT_ID = "107923498573-ruh1uhkfe1t5f18vam6sckq7pqer1vmg.apps.googleusercontent.com"
|
||||
G_SCOPES = ["https://www.googleapis.com/auth/photoslibrary.appendonly"]
|
||||
G_REDIRECT_URI = "http://localhost:7842"
|
||||
|
||||
|
||||
def get_google_oauth_token() -> str:
|
||||
"""Either log the user in, or used a stored refresh token to get an OAuth token"""
|
||||
refresh_token_path = Path("~/.config/gp-upload/refresh-token").expanduser()
|
||||
client_secret_path = Path("~/.config/gp-upload/client-secret").expanduser()
|
||||
|
||||
# Read the client secret
|
||||
with client_secret_path.open("r") as f:
|
||||
client_secret = f.read().strip()
|
||||
|
||||
# Check if we have a refresh token
|
||||
if refresh_token_path.exists():
|
||||
logger.info("Using stored refresh token")
|
||||
|
||||
# Read the refresh token
|
||||
with refresh_token_path.open("r") as f:
|
||||
refresh_token = f.read().strip()
|
||||
|
||||
# Make the request
|
||||
response = requests.post(
|
||||
"https://oauth2.googleapis.com/token",
|
||||
data={
|
||||
"client_id": G_CLIENT_ID,
|
||||
"grant_type": "refresh_token",
|
||||
"refresh_token": refresh_token,
|
||||
"client_secret": client_secret,
|
||||
},
|
||||
)
|
||||
|
||||
# Check for errors
|
||||
if response.status_code != 200:
|
||||
logger.error("Failed to get OAuth token")
|
||||
logger.error(response.text)
|
||||
return None
|
||||
|
||||
# Return the OAuth token
|
||||
return response.json()["access_token"]
|
||||
|
||||
# Otherwise, log the user in
|
||||
else:
|
||||
logger.info("Logging user in")
|
||||
|
||||
# Direct the user to Google's login page
|
||||
logger.info("Please visit the following URL to log in:")
|
||||
logger.info(
|
||||
f"https://accounts.google.com/o/oauth2/v2/auth?client_id={G_CLIENT_ID}&response_type=code&scope={'+'.join(G_SCOPES)}&redirect_uri={G_REDIRECT_URI}&access_type=offline&prompt=consent"
|
||||
)
|
||||
|
||||
# Open a TCP server to listen for the redirect
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
s.bind(("localhost", 7842))
|
||||
s.listen()
|
||||
|
||||
# Wait for the redirect
|
||||
conn, addr = s.accept()
|
||||
with conn:
|
||||
# Read the request
|
||||
request = conn.recv(1024).decode("utf-8")
|
||||
|
||||
# Parse the request
|
||||
request = request.splitlines()
|
||||
request = [line for line in request if line.startswith("GET")]
|
||||
request = request[0].split(" ")[1]
|
||||
request = request.split("?")[1]
|
||||
request = request.split("&")
|
||||
request = {key: urllib.parse.unquote(value) for key, value in [pair.split("=") for pair in request]}
|
||||
|
||||
# Check for errors
|
||||
if "error" in request:
|
||||
logger.error(f"Failed to log in: {request['error']}")
|
||||
conn.sendall(b"HTTP/1.1 500 Internal Server Error\n\n<html><body><h1>Failed to log in</h1></body></html>")
|
||||
conn.close()
|
||||
return None
|
||||
|
||||
# Return a message to the user and close the socket
|
||||
conn.sendall(b"HTTP/1.1 200 OK\n\n<html><body><h1>Success!</h1></body></html>")
|
||||
conn.close()
|
||||
|
||||
# Make the request
|
||||
response = requests.post(
|
||||
"https://oauth2.googleapis.com/token",
|
||||
data={
|
||||
"client_id": G_CLIENT_ID,
|
||||
"code": request["code"],
|
||||
"grant_type": "authorization_code",
|
||||
"redirect_uri": G_REDIRECT_URI,
|
||||
"client_secret": client_secret,
|
||||
},
|
||||
)
|
||||
logger.info(f"Response: {response.text}")
|
||||
|
||||
# Check for errors
|
||||
if response.status_code != 200:
|
||||
logger.error("Failed to get OAuth token")
|
||||
logger.error(response.text)
|
||||
return None
|
||||
access_token = response.json()["access_token"]
|
||||
refresh_token = response.json()["refresh_token"]
|
||||
|
||||
# Save the refresh token
|
||||
refresh_token_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with refresh_token_path.open("w") as f:
|
||||
f.write(refresh_token)
|
||||
|
||||
# Return the OAuth token
|
||||
return access_token
|
||||
|
||||
|
||||
|
||||
def upload_file(file: Path, oauth_token: str):
|
||||
# Read the file
|
||||
with file.open("rb") as f:
|
||||
file_data = f.read()
|
||||
|
||||
# Make the upload request
|
||||
logger.info("Creating new upload")
|
||||
response = requests.post(
|
||||
"https://photoslibrary.googleapis.com/v1/uploads",
|
||||
headers={
|
||||
"Authorization": f"Bearer {oauth_token}",
|
||||
"Content-type": "application/octet-stream",
|
||||
"X-Goog-Upload-File-Name": file.name,
|
||||
"X-Goog-Upload-Protocol": "raw",
|
||||
},
|
||||
data=file_data,
|
||||
)
|
||||
logger.info(f"Uploaded {file.stat().st_size} bytes")
|
||||
|
||||
# Check for errors
|
||||
if response.status_code != 200:
|
||||
logger.error(f"Failed to upload: {file}")
|
||||
logger.error(response.text)
|
||||
return None
|
||||
|
||||
# Get the upload token
|
||||
upload_token = response.text
|
||||
logger.info(f"Upload token: {upload_token}")
|
||||
|
||||
# Create the media item
|
||||
logger.info("Creating new media item")
|
||||
response = requests.post(
|
||||
"https://photoslibrary.googleapis.com/v1/mediaItems:batchCreate",
|
||||
headers={
|
||||
"Authorization": f"Bearer {oauth_token}",
|
||||
"Content-type": "application/json",
|
||||
},
|
||||
json={
|
||||
"newMediaItems": [
|
||||
{
|
||||
"description": "",
|
||||
"simpleMediaItem": {
|
||||
"fileName": file.name,
|
||||
"uploadToken": upload_token,
|
||||
},
|
||||
}
|
||||
]
|
||||
},
|
||||
)
|
||||
|
||||
# Check for errors
|
||||
if response.status_code != 200:
|
||||
logger.error(f"Failed to create media item: {file}")
|
||||
logger.error(response.text)
|
||||
return None
|
||||
|
||||
# Log some info about the action
|
||||
for new_item in response.json()["newMediaItemResults"]:
|
||||
if "mediaItem" in new_item:
|
||||
logger.info(f"Created media item: {new_item['mediaItem']['filename']}")
|
||||
logger.info(f"URL: {new_item['mediaItem']['productUrl']}")
|
||||
|
||||
|
||||
def main() -> int:
|
||||
# Handle program arguments
|
||||
ap = argparse.ArgumentParser(
|
||||
prog="gp-upload", description="Upload a file to Google Photos"
|
||||
)
|
||||
ap.add_argument("file", help="File to upload")
|
||||
ap.add_argument(
|
||||
"-v", "--verbose", help="Enable verbose logging", action="store_true"
|
||||
)
|
||||
args = ap.parse_args()
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG if args.verbose else logging.INFO,
|
||||
format="%(levelname)s: %(message)s",
|
||||
)
|
||||
|
||||
# Authenticate
|
||||
oauth_token = get_google_oauth_token()
|
||||
if oauth_token is None:
|
||||
return 1
|
||||
|
||||
# Upload
|
||||
upload_file(Path(args.file), oauth_token)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
Loading…
x
Reference in New Issue
Block a user