#! /usr/bin/env python3 import argparse import sys import logging import smtplib import json import requests import feedparser import referencing import ast import re from datetime import datetime from pathlib import Path from time import mktime logger = logging.getLogger(__name__) FILTER_FN_RE = re.compile(r"^([^(]+)(?:(\([^\)]+\)))?") def filter_lwn_hide_paid_articles(article: dict) -> bool: return "[$]" not in article["entry"]["title"] def filter_discord_category(article: dict, category: str) -> bool: # return category in article["metadata"]["categories"] return True def main() -> int: # Handle program arguments ap = argparse.ArgumentParser( prog="ewp-send-article-digest", description="Generates and emails a digest of new articles", ) ap.add_argument("targets", help="Email addresses to send the digest to", nargs="+") ap.add_argument( "--since", help="Only fetch articles since this time", ) ap.add_argument("--fetch-first-n", help="Fetch the first N articles", type=int) ap.add_argument( "-v", "--verbose", help="Enable verbose logging", action="store_true" ) args = ap.parse_args() # Configure logging logging.basicConfig( level=logging.DEBUG if args.verbose else logging.INFO, format="%(levelname)s: %(message)s", ) # Read the subscriptions list subscriptions_file = json.loads( Path("~/.config/ewp-rss-feeds/subscriptions.json").expanduser().read_text() ) subscriptions = subscriptions_file["subscriptions"] logger.info(f"Found {len(subscriptions)} subscriptions") # Fetch each feed feeds = [] for subscription in subscriptions: # If we have a limit, stop fetching feeds if args.fetch_first_n and len(feeds) >= args.fetch_first_n: logger.info(f"Reached fetch limit of {args.fetch_first_n}") break # Make a request to get the feed logger.info(f"Fetching feed for: {subscription['name']}") try: response = requests.get(subscription["url"], timeout=3) except requests.exceptions.ConnectTimeout: logger.warning(f"Timed out fetching feed for: {subscription['name']}") continue # If the response fails, we can warn and skip if not response.ok: logger.warning(f"Failed to fetch feed for: {subscription['name']}") continue # Parse the feed feed = feedparser.parse(response.text) feeds.append({"metadata": subscription, "feed": feed}) logger.info(f"Fetched {len(feeds)}/{len(subscriptions)} feeds") # Figure out the actual time to filter articles by since = 0 if not args.since else datetime.fromisoformat(args.since).timestamp() # Filter articles by date logger.info(f"Filtering articles since: {since}") articles = [] for feed in feeds: for entry in feed["feed"]["entries"]: if entry["published_parsed"]: if mktime(entry["published_parsed"]) > since: articles.append({"metadata": feed["metadata"], "entry": entry}) else: logger.warning(f"Entry has no published date: {entry['title']}") logger.info(f"Found {len(articles)} articles") # Handle special filters logger.info("Applying special filters") for article in articles: article["send"] = True filters = article["metadata"].get("filters", []) for filter_info in filters: # Parse the filter into a name and possible literal arguments match = FILTER_FN_RE.match(filter_info) if not match: logger.error(f"Failed to parse filter: {filter_info}") sys.exit(1) filter_name, filter_args = match.groups() filter_name = filter_name.lower().replace("-", "_").replace("::", "_") if filter_args: filter_args = ast.literal_eval(filter_args) else: filter_args = () # Get the filter function filter_fn = getattr(sys.modules[__name__], f"filter_{filter_name}") if not filter_fn: logger.error(f"Failed to find filter: {filter_name}") sys.exit(1) # Apply the filter article["send"] = filter_fn(article, *filter_args) # Prune articles = [article for article in articles if article["send"]] logger.info(f"Filtered to {len(articles)} articles") return 0 if __name__ == "__main__": sys.exit(main())