1
ewconfig/scripts/cfddns

413 lines
15 KiB
Python
Executable File

#! /usr/bin/env python3
"""A Dynamic DNS script that can create and modify Cloudflare DNS records
To run this, you'll need an API key from Cloudflare.
Store this key either in $CFDDNS_API_TOKEN or in ewp-secrets as cfddns.api-token.
You can grab a copy of this script from here:
https://github.com/ewpratten/ewconfig/raw/master/scripts/cfddns
---
usage: cfddns [-h] [-i INTERFACE] [--no-ipv4-subzone] [--no-ipv6-subzone] [--dry-run] [-v] zone_id base_zone
A Cloudflare Dynamic DNS script
positional arguments:
zone_id Zone ID to update
base_zone DNS record to update
options:
-h, --help show this help message and exit
-i INTERFACE, --interface INTERFACE
If set, bind to this specific interface
--no-ipv4-subzone If set, don't create/update an IPv4 subzone
--no-ipv6-subzone If set, don't create/update an IPv6 subzone
--dry-run Print actions instead of performing them
-v, --verbose Enable verbose logging
"""
import argparse
import sys
import os
import logging
import requests
import socket
import ipaddress
import subprocess
import shutil
import requests.adapters
from datetime import datetime
from typing import Optional, Union
from urllib3.poolmanager import PoolManager
CDN_CGI_URL = "https://www.cloudflare.com/cdn-cgi/trace"
"""This can be the path to *any* cloudflare-proxied website. They all return the same thing."""
logger = logging.getLogger(__name__)
class InterfaceAdapter(requests.adapters.HTTPAdapter):
"""A custom HTTP adapter that can bind to a specific network interface"""
def __init__(self, interface: Optional[str] = None, **kwargs):
self.interface = interface
super().__init__(**kwargs)
def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs):
self.poolmanager = PoolManager(
num_pools=connections,
maxsize=maxsize,
block=block,
socket_options=(
[(socket.SOL_SOCKET, socket.SO_BINDTODEVICE, self.interface.encode())]
if self.interface
else []
),
)
def get_requesting_ip(
session: requests.Session, force_ipv4: bool = False
) -> Optional[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]:
# If we need to force an IPv4 connection
if force_ipv4:
logger.info("Forcing an IPv4 connection")
requests.packages.urllib3.util.connection.HAS_IPV6 = False
# Make a request
try:
response = session.get(CDN_CGI_URL)
response.raise_for_status()
except requests.RequestException as e:
logger.error(f"Failed to get the requesting IP: {e}")
return None
# Parse the response
for line in response.text.split("\n"):
if line.startswith("ip="):
return ipaddress.ip_address(line.split("=")[1])
def read_secret(secret_id: str) -> Optional[str]:
# Attempt to read from the environment
env_var = f"CFDDNS_{secret_id.replace('-', '_').upper()}"
if env_var in os.environ:
return os.environ[env_var]
# Otherwise, try to read from ewp-secrets
if shutil.which("ewp-secrets"):
secrets_proc = subprocess.Popen(
["ewp-secrets", "load", "-n", "cfddns", "-k", secret_id],
stdout=subprocess.PIPE,
)
if secrets_proc.wait() == 0:
return secrets_proc.stdout.read().decode().strip()
# If we can't find the secret, return None
return None
def main() -> int:
# Handle program arguments
ap = argparse.ArgumentParser(
prog="cfddns", description="A Cloudflare Dynamic DNS script"
)
ap.add_argument("zone_id", help="Zone ID to update")
ap.add_argument("base_zone", help="DNS record to update")
ap.add_argument("-i", "--interface", help="If set, bind to this specific interface")
ap.add_argument(
"--no-ipv4-subzone",
help="If set, don't create/update an IPv4 subzone",
action="store_true",
)
ap.add_argument(
"--no-ipv6-subzone",
help="If set, don't create/update an IPv6 subzone",
action="store_true",
)
ap.add_argument(
"--dry-run",
help="Print actions instead of performing them",
action="store_true",
)
ap.add_argument("--api-token", help="Cloudflare API token")
ap.add_argument(
"-v", "--verbose", help="Enable verbose logging", action="store_true"
)
args = ap.parse_args()
# Configure logging
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(levelname)s: %(message)s",
)
# Read env vars needed for the Cloudflare API
cf_api_token = args.api_token or read_secret("api-token")
if not cf_api_token:
logger.error(
"Failed to read the Cloudflare API token. Either set $CFDDNS_API_TOKEN or use ewp-secrets"
)
return 1
# Create an HTTP session
session = requests.Session()
# If needed, use a custom network interface
if args.interface:
logger.info(f"Binding to network interface {args.interface}")
session.mount("http://", InterfaceAdapter(interface=args.interface))
session.mount("https://", InterfaceAdapter(interface=args.interface))
# Figure out our IPs
ipv4 = get_requesting_ip(session, force_ipv4=True)
ipv6 = get_requesting_ip(session, force_ipv4=False)
# If the ipv6 request returns an IPv4 address, we can't use it
if ipv6 and ipv6.version == 4:
logger.debug("IPv6 request returned an IPv4 address, ignoring it")
ipv6 = None
# Print the IPs we found
logger.info(f"Our IPv4 address is {ipv4}")
logger.info(f"Our IPv6 address is {ipv6}")
# Look up the contents of the zone
zone_contents_response = requests.get(
f"https://api.cloudflare.com/client/v4/zones/{args.zone_id}/dns_records",
headers={
"Authorization": f"Bearer {cf_api_token}",
"Content-Type": "application/json",
},
)
try:
zone_contents_response.raise_for_status()
except Exception as e:
logger.error(zone_contents_response.json())
raise e
zone_contents = zone_contents_response.json()
if "result" not in zone_contents:
logger.error("Failed to get the zone contents")
print(zone_contents)
return 1
# Clear all non-matching A and AAAA records on the base zone
has_correct_ipv4_record = False
has_correct_ipv6_record = False
apex_name = None
for record in zone_contents["result"]:
if record["name"] == args.base_zone and record["type"] in ["A", "AAAA"]:
logger.info(
f"Found existing {record['type']} record for {record['name']}. Value: {record['content']}"
)
# If this doesn't match the corresponding IP, delete it
if (record["type"] == "A" and record["content"] != str(ipv4)) or (
record["type"] == "AAAA" and record["content"] != str(ipv6)
):
logger.info(
f"Deleting stale record {record['id']} ({record['content']})"
)
if not args.dry_run:
delete_response = requests.delete(
f"https://api.cloudflare.com/client/v4/zones/{args.zone_id}/dns_records/{record['id']}",
headers={
"Authorization": f"Bearer {cf_api_token}",
"Content-Type": "application/json",
},
)
try:
delete_response.raise_for_status()
except Exception as e:
logger.error(delete_response.json())
raise e
# Mark the record as OK if it is ok
else:
if record["type"] == "A":
has_correct_ipv4_record = True
elif record["type"] == "AAAA":
has_correct_ipv6_record = True
# Keep track of the apex
if not apex_name:
apex_name = record["zone_name"]
# Figure out *when* we are
now = datetime.now().isoformat()
# If the base name is the apex, switch it to @
base_name = args.base_zone if args.base_zone != apex_name else "@"
# Write new A and AAAA records (if needed) to the base zone
if ipv4 and not has_correct_ipv4_record:
logger.info(f"Creating new A record for {base_name} with value {ipv4}")
if not args.dry_run:
create_response = requests.post(
f"https://api.cloudflare.com/client/v4/zones/{args.zone_id}/dns_records",
headers={
"Authorization": f"Bearer {cf_api_token}",
"Content-Type": "application/json",
},
json={
"type": "A",
"name": base_name,
"content": str(ipv4),
"ttl": 60,
"proxied": False,
"comment": f"Auto-generated by cfddns on {now}",
},
)
try:
create_response.raise_for_status()
except Exception as e:
logger.error(create_response.json())
raise e
if ipv6 and not has_correct_ipv6_record:
logger.info(f"Creating new AAAA record for {base_name} with value {ipv6}")
if not args.dry_run:
create_response = requests.post(
f"https://api.cloudflare.com/client/v4/zones/{args.zone_id}/dns_records",
headers={
"Authorization": f"Bearer {cf_api_token}",
"Content-Type": "application/json",
},
json={
"type": "AAAA",
"name": base_name,
"content": str(ipv6),
"ttl": 60,
"proxied": False,
"comment": f"Auto-generated by cfddns on {now}",
},
)
try:
create_response.raise_for_status()
except Exception as e:
logger.error(create_response.json())
raise e
# If we should be creating subdomains, do so
if not args.no_ipv4_subzone and ipv4:
# Look for an existing record
already_exists = False
for record in zone_contents["result"]:
if record["name"] == f"ipv4.{args.base_zone}" and record["type"] == "A":
logger.info(
f"Found existing A record for {record['name']}. Value: {record['content']}"
)
# If the record matches, we're done
if record["content"] == str(ipv4):
already_exists = True
break
# Otherwise, delete it
logger.info(
f"Deleting stale record {record['id']} ({record['content']})"
)
if not args.dry_run:
delete_response = requests.delete(
f"https://api.cloudflare.com/client/v4/zones/{args.zone_id}/dns_records/{record['id']}",
headers={
"Authorization": f"Bearer {cf_api_token}",
"Content-Type": "application/json",
},
)
try:
delete_response.raise_for_status()
except Exception as e:
logger.error(delete_response.json())
raise e
# If the record doesn't exist, create it
if not already_exists:
logger.info(
f"Creating new A record for ipv4.{args.base_zone} with value {ipv4}"
)
if not args.dry_run:
create_response = requests.post(
f"https://api.cloudflare.com/client/v4/zones/{args.zone_id}/dns_records",
headers={
"Authorization": f"Bearer {cf_api_token}",
"Content-Type": "application/json",
},
json={
"type": "A",
"name": f"ipv4.{args.base_zone}",
"content": str(ipv4),
"ttl": 60,
"proxied": False,
"comment": f"Auto-generated by cfddns on {now}",
},
)
try:
create_response.raise_for_status()
except Exception as e:
logger.error(create_response.json())
raise e
if not args.no_ipv6_subzone and ipv6:
# Look for an existing record
already_exists = False
for record in zone_contents["result"]:
if record["name"] == f"ipv6.{args.base_zone}" and record["type"] == "AAAA":
logger.info(
f"Found existing AAAA record for {record['name']}. Value: {record['content']}"
)
# If the record matches, we're done
if record["content"] == str(ipv6):
already_exists = True
break
# Otherwise, delete it
logger.info(
f"Deleting stale record {record['id']} ({record['content']})"
)
if not args.dry_run:
delete_response = requests.delete(
f"https://api.cloudflare.com/client/v4/zones/{args.zone_id}/dns_records/{record['id']}",
headers={
"Authorization": f"Bearer {cf_api_token}",
"Content-Type": "application/json",
},
)
try:
delete_response.raise_for_status()
except Exception as e:
logger.error(delete_response.json())
raise e
# If the record doesn't exist, create it
if not already_exists:
logger.info(
f"Creating new AAAA record for ipv6.{args.base_zone} with value {ipv6}"
)
if not args.dry_run:
create_response = requests.post(
f"https://api.cloudflare.com/client/v4/zones/{args.zone_id}/dns_records",
headers={
"Authorization": f"Bearer {cf_api_token}",
"Content-Type": "application/json",
},
json={
"type": "AAAA",
"name": f"ipv6.{args.base_zone}",
"content": str(ipv6),
"ttl": 60,
"proxied": False,
"comment": f"Auto-generated by cfddns on {now}",
},
)
try:
create_response.raise_for_status()
except Exception as e:
logger.error(create_response.json())
raise e
return 0
if __name__ == "__main__":
sys.exit(main())