#! /usr/bin/env python3
"""Generate a plain-text digest of new articles from RSS/Atom subscriptions.

Reads a JSON subscriptions file, downloads each subscription's feeds,
filters out articles already seen (tracked in a local sqlite cache), and
prints a digest grouped by subscription name.
"""
import argparse
import sys
import logging
import requests
import feedparser
import json
import sqlite3
import subprocess  # NOTE(review): appears unused in this file — confirm before removing
import smtplib  # NOTE(review): appears unused in this file — confirm before removing
from datetime import datetime  # NOTE(review): appears unused (SQL datetime('now') is used instead)
from pathlib import Path

logger = logging.getLogger(__name__)


def get_all_articles_for_subscription(subscription: dict) -> list:
    """Download and parse every feed belonging to *subscription*.

    Args:
        subscription: Mapping with at least a ``name`` key and an optional
            ``feeds`` list of feed URLs.

    Returns:
        All feedparser entries collected across the subscription's feeds.
        Feeds that fail to download are logged and skipped.
    """
    # Find all the feeds
    feeds = subscription.get("feeds", [])
    # BUG FIX: original wrote `if feeds != 1:`, comparing the *list* itself
    # to the integer 1 (always true). The intent was to report the count
    # when it isn't the common single-feed case.
    if len(feeds) != 1:
        logger.info(f"Found {len(feeds)} feeds for subscription {subscription['name']}")

    # Download all the feeds
    articles = []
    for feed_url in feeds:
        logger.info(f"Downloading feed {feed_url}")

        # Make a request. Careful to handle failures
        try:
            response = requests.get(feed_url, timeout=3.0)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to download feed {feed_url}: {e}")
            continue

        # Parse the feed. (Original rebound the loop variable `feed` here;
        # a distinct name avoids the shadowing.)
        parsed = feedparser.parse(response.text)
        # Use .get() so a feed without a <title> doesn't raise AttributeError.
        logger.info(
            f"Found {len(parsed.entries)} articles in feed "
            f"{subscription['name']} ({parsed.feed.get('title', 'untitled')})"
        )

        # Add the articles to the list
        articles.extend(parsed.entries)

    return articles


def main() -> int:
    """CLI entry point.

    Returns:
        Process exit code: 0 on success, 1 if the subscriptions file is
        missing.
    """
    # Handle program arguments
    ap = argparse.ArgumentParser(
        prog="ewp-generate-article-digest",
        description="Generates a digest of new articles",
    )
    ap.add_argument(
        "--subscriptions",
        help="Path to the subscriptions file",
        type=Path,
        default=Path(
            "~/.config/ewconfig/configs/ewp-rss-feeds/subscriptions.json"
        ).expanduser(),
    )
    ap.add_argument(
        "--cache-file",
        help="Path to the cache file",
        type=Path,
        default=Path("~/.cache/ewp-rss-feeds.sqlite3").expanduser(),
    )
    ap.add_argument(
        "-v", "--verbose", help="Enable verbose logging", action="store_true"
    )
    args = ap.parse_args()

    # Configure logging
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(levelname)s: %(message)s",
    )

    # Load the subscriptions file
    if not args.subscriptions.exists():
        logger.error(f"Subscriptions file {args.subscriptions} does not exist")
        return 1
    subscriptions = json.loads(args.subscriptions.read_text())
    logger.info(f"Found {len(subscriptions)} subscriptions")

    # Set up the cache
    args.cache_file.parent.mkdir(parents=True, exist_ok=True)
    logger.info(f"Using cache file {args.cache_file}")
    cache_db = sqlite3.connect(args.cache_file)
    cache_db.execute(
        """
        CREATE TABLE IF NOT EXISTS articles (
            id INTEGER PRIMARY KEY,
            url TEXT,
            date_fetched TEXT
        )
        """
    )
    # Every article does a lookup by url; index it so the cache check stays
    # fast as the table grows. (Non-unique: safe for pre-existing caches.)
    cache_db.execute("CREATE INDEX IF NOT EXISTS idx_articles_url ON articles (url)")

    # Create an output buffer
    output = {}

    # Handle each subscription
    for subscription in subscriptions:
        logger.info(f"Processing subscription {subscription['name']}")
        articles = get_all_articles_for_subscription(subscription)

        # Process each article
        for article in articles:
            # If we need special pre-processing
            if "lwn::hide-paid-articles" in subscription.get("filters", []):
                if article.get("title", "").startswith("[$]"):
                    # .get() here (not .title) so a missing title can't raise.
                    logger.info(f"Skipping paid article {article.get('title')}")
                    continue

            # Determine the article URL
            url = article.get("link") or article.get("guid") or None
            if url is None:
                logger.warning(f"Skipping article with no URL: {article.get('title')}")
                continue

            # Check if the article is already in the cache
            cursor = cache_db.execute(
                "SELECT id FROM articles WHERE url = ?", (url,)
            )
            if cursor.fetchone() is not None:
                logger.debug(f"Skipping article {article.get('title')} (already in cache)")
                continue

            # Add the article to the output and cache it
            if subscription['name'] not in output:
                output[subscription['name']] = []
            output[subscription['name']].append({
                "title": article.get("title"),
                "link": url,
            })
            cache_db.execute(
                "INSERT INTO articles (url, date_fetched) VALUES (?, datetime('now'))",
                (url,),
            )

    # Sort the output by subscription name alphabetically (A first)
    output = dict(sorted(output.items(), key=lambda x: x[0].lower()))

    # Build the output. Collect parts and join once instead of repeated
    # string concatenation (quadratic in the original).
    parts = []
    for subscription, articles in output.items():
        logger.debug(f"Building output for {subscription} ({len(articles)} articles)")
        parts.append(f">> {subscription}\n")
        for article in articles:
            parts.append(f" - {article['title']}\n")
            parts.append(f" URL: {article['link']}\n")
        parts.append("\n")
    output_str = "".join(parts)

    # Print the output
    print(output_str)

    # Clean up
    cache_db.commit()
    cache_db.close()

    return 0


if __name__ == "__main__":
    sys.exit(main())