# Source listing metadata (file-browser chrome, not part of the script):
# NASOpenClawRunTime/skills/feed-to-md/scripts/feed_to_md.py
# 291 lines, 9.3 KiB, Python
#!/usr/bin/env python3
"""Convert RSS/Atom feeds to Markdown with safe URL/path handling."""
from __future__ import annotations
import argparse
import html
import ipaddress
import pathlib
import re
import socket
import sys
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
# Matches any HTML/XML tag so markup can be stripped from feed text.
TAG_RE = re.compile(r"<[^>]+>")


def normalize_text(value: str) -> str:
    """Return *value* with entities decoded, tags stripped, whitespace collapsed."""
    unescaped = html.unescape(value or "")
    without_tags = TAG_RE.sub("", unescaped)
    return " ".join(without_tags.split()).strip()
def validate_public_hostname(hostname: str, label: str) -> None:
    """Raise ValueError unless *hostname* resolves only to public IP addresses.

    SSRF guard: rejects well-known localhost names outright, then resolves
    the host and rejects it if any resulting address is private, loopback,
    link-local, multicast, reserved, or unspecified.
    """
    if hostname in {"localhost", "localhost.localdomain"}:
        raise ValueError(f"{label} uses localhost, which is not allowed")
    try:
        resolved = socket.getaddrinfo(hostname, None)
    except socket.gaierror as exc:
        raise ValueError(f"Unable to resolve host: {hostname}") from exc
    for info in resolved:
        # getaddrinfo tuples carry the sockaddr at index 4; its first
        # element is the textual IP for both IPv4 and IPv6.
        ip = ipaddress.ip_address(info[4][0])
        blocked = (
            ip.is_private
            or ip.is_loopback
            or ip.is_link_local
            or ip.is_multicast
            or ip.is_reserved
            or ip.is_unspecified
        )
        if blocked:
            raise ValueError(f"{label} resolves to a non-public IP address")
def validate_feed_url(raw_url: str, label: str = "Feed URL") -> str:
    """Validate that *raw_url* is an http(s) URL on a public host.

    Returns the URL re-serialized by urlparse. Raises ValueError for a
    non-http(s) scheme, a missing hostname, or a host that fails the
    public-address policy (see validate_public_hostname).
    """
    parsed = urllib.parse.urlparse(raw_url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"{label} must use http or https")
    host = parsed.hostname
    if not host:
        raise ValueError(f"{label} must include a hostname")
    validate_public_hostname(host.strip().lower(), f"{label} host")
    return parsed.geturl()
def validate_output_path(raw_path: str) -> pathlib.Path:
    """Resolve *raw_path* to an absolute path confined to the workspace.

    Rejects absolute paths, any '..' component, and non-.md suffixes, then
    verifies the fully resolved path (symlinks included) still lives under
    the current working directory. Raises ValueError on any violation.
    """
    candidate = pathlib.Path(raw_path)
    if candidate.is_absolute():
        raise ValueError("Output path must be relative to the current workspace")
    if ".." in candidate.parts:
        raise ValueError("Output path must not contain '..'")
    if candidate.suffix.lower() != ".md":
        raise ValueError("Output path must end with .md")
    workspace = pathlib.Path.cwd().resolve()
    resolved = (workspace / candidate).resolve()
    try:
        resolved.relative_to(workspace)
    except ValueError as exc:
        raise ValueError("Output path escapes the current workspace") from exc
    return resolved
class PublicOnlyRedirectHandler(urllib.request.HTTPRedirectHandler):
    """Redirect handler that re-validates every redirect target.

    Stops a public feed URL from bouncing the client to an internal
    (private/loopback) address via an HTTP redirect.
    """

    def redirect_request(self, req, fp, code, msg, headers, newurl):  # noqa: D401
        # Redirect Location headers may be relative; resolve against the
        # originating request before validating.
        absolute = urllib.parse.urljoin(req.full_url, newurl)
        validate_feed_url(absolute, "Redirect URL")
        return super().redirect_request(req, fp, code, msg, headers, newurl)
def fetch_xml(url: str, timeout: int = 15) -> bytes:
    """Download *url* and return the raw response body as bytes.

    Sends browser-like headers (some feed hosts block generic clients),
    validates every redirect hop via PublicOnlyRedirectHandler, re-checks
    the final landing URL, and applies *timeout* seconds to socket ops.
    """
    request_headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        "Accept": "application/rss+xml, application/atom+xml, application/xml, text/xml, */*",
        "Accept-Language": "en-US,en;q=0.9",
    }
    request = urllib.request.Request(url, headers=request_headers)
    opener = urllib.request.build_opener(PublicOnlyRedirectHandler())
    with opener.open(request, timeout=timeout) as response:
        # Belt-and-braces: redirects were validated hop-by-hop, but verify
        # the final URL once more before trusting the body.
        validate_feed_url(response.geturl(), "Final URL")
        # NOTE(review): response.read() is unbounded — a hostile feed could
        # be arbitrarily large; consider capping the read size.
        return response.read()
def namespace(tag: str) -> str | None:
    """Return the namespace URI from a Clark-notation tag, or None.

    ElementTree spells namespaced tags as ``{uri}local``; this extracts
    ``uri`` and returns None for tags without a complete ``{...}`` prefix.
    """
    if not tag.startswith("{"):
        return None
    uri, closed, _local = tag[1:].partition("}")
    return uri if closed else None
def find_text(elem: ET.Element, path: str, ns: dict[str, str] | None = None) -> str:
    """Return the normalized text of the first child matching *path*, or ''."""
    match = elem.find(path, ns or {})
    if match is not None and match.text is not None:
        return normalize_text(match.text)
    return ""
def parse_rss(root: ET.Element) -> tuple[str, list[dict[str, str]]]:
    """Parse an RSS 2.0 document into (feed title, entry dicts).

    Each entry dict carries "title", "link", "summary", "published".
    The summary prefers ``content:encoded`` over ``description``.
    Raises ValueError if the <channel> element is absent.
    """
    content_ns = {"content": "http://purl.org/rss/1.0/modules/content/"}
    channel = root.find("channel")
    if channel is None:
        raise ValueError("Invalid RSS feed: missing channel")
    feed_title = find_text(channel, "title") or "Feed"
    entries = [
        {
            "title": find_text(item, "title") or "Untitled",
            "link": find_text(item, "link"),
            "summary": find_text(item, "content:encoded", content_ns)
            or find_text(item, "description"),
            "published": find_text(item, "pubDate"),
        }
        for item in channel.findall("item")
    ]
    return feed_title, entries
def parse_atom(root: ET.Element, atom_ns: str) -> tuple[str, list[dict[str, str]]]:
    """Parse an Atom document into (feed title, entry dicts).

    Link selection prefers the first rel="alternate" link (rel defaults to
    "alternate" when absent); otherwise the first non-empty href is kept
    as a fallback. Summary prefers <summary> over <content>; the timestamp
    prefers <updated> over <published>.
    """
    ns = {"a": atom_ns}
    feed_title = find_text(root, "a:title", ns) or "Feed"
    entries: list[dict[str, str]] = []
    for entry in root.findall("a:entry", ns):
        link = ""
        for link_elem in entry.findall("a:link", ns):
            href = (link_elem.attrib.get("href") or "").strip()
            if not href:
                continue
            rel = (link_elem.attrib.get("rel") or "alternate").strip()
            if rel == "alternate":
                link = href
                break
            if not link:
                # Remember the first usable href in case no alternate exists.
                link = href
        entries.append(
            {
                "title": find_text(entry, "a:title", ns) or "Untitled",
                "link": link,
                "summary": find_text(entry, "a:summary", ns)
                or find_text(entry, "a:content", ns),
                "published": find_text(entry, "a:updated", ns)
                or find_text(entry, "a:published", ns),
            }
        )
    return feed_title, entries
def parse_feed(xml_bytes: bytes) -> tuple[str, list[dict[str, str]]]:
    """Dispatch raw feed XML to the Atom or RSS parser by root-tag namespace."""
    root = ET.fromstring(xml_bytes)
    ns_uri = namespace(root.tag)
    # Atom feeds are identified by their root namespace; everything else is
    # handed to the RSS parser, which validates the structure itself.
    if ns_uri == "http://www.w3.org/2005/Atom":
        return parse_atom(root, ns_uri)
    return parse_rss(root)
def truncate(value: str, max_len: int) -> str:
    """Return *value* clipped to at most *max_len* characters.

    A non-positive *max_len* disables truncation entirely. When clipping
    occurs, one character of the budget is spent on a trailing ellipsis so
    the reader can see the text was cut and the result still fits.
    """
    if max_len <= 0 or len(value) <= max_len:
        return value
    # Bug fix: the original reserved a slot for the ellipsis (max_len - 1)
    # and wrapped the result in a no-op f-string, but never appended the
    # ellipsis — text was dropped with no visual cue. Append it.
    clipped = value[: max_len - 1].rstrip()
    return f"{clipped}…"
def render_markdown(
    feed_title: str,
    entries: list[dict[str, str]],
    template: str,
    include_summary: bool,
    summary_max_len: int,
) -> str:
    """Render feed entries as a Markdown document.

    ``template="short"`` emits one bullet per entry; any other value uses
    the "full" layout (heading, publish date, optional truncated summary).
    The returned text ends with exactly one trailing newline.
    """
    lines: list[str] = [f"# {feed_title}", ""]
    if not entries:
        lines.extend(["No feed items found.", ""])
        return "\n".join(lines).rstrip() + "\n"
    if template == "short":
        for entry in entries:
            bullet = (
                f"- [{entry['title']}]({entry['link']})"
                if entry["link"]
                else f"- {entry['title']}"
            )
            if entry["published"]:
                bullet += f" ({entry['published']})"
            lines.append(bullet)
        lines.append("")
        return "\n".join(lines)
    for entry in entries:
        heading = (
            f"## [{entry['title']}]({entry['link']})"
            if entry["link"]
            else f"## {entry['title']}"
        )
        lines.append(heading)
        if entry["published"]:
            lines.append(f"- Published: {entry['published']}")
        summary = truncate(entry["summary"], summary_max_len)
        if include_summary and summary:
            lines.append("")
            lines.append(summary)
        # Blank line separates consecutive entries.
        lines.append("")
    return "\n".join(lines).rstrip() + "\n"
def build_arg_parser() -> argparse.ArgumentParser:
    """Construct the CLI parser for converting a feed URL to Markdown."""
    p = argparse.ArgumentParser(description="Convert RSS/Atom feed URL to Markdown")
    p.add_argument("url", help="RSS/Atom feed URL")
    p.add_argument("-o", "--output", help="Write Markdown output to a .md file")
    p.add_argument("--limit", type=int, default=0, help="Max number of feed items")
    p.add_argument("--no-summary", action="store_true", help="Exclude summaries")
    p.add_argument("--summary-max-length", type=int, default=280,
                   help="Max summary length before truncation")
    p.add_argument("--template", choices=("short", "full"), default="short",
                   help="Output template style")
    return p
def main() -> int:
    """CLI entry point: fetch, parse, render, and emit a feed as Markdown.

    Returns 0 on success. Any failure — bad arguments, network errors,
    malformed feeds — is reported on stderr and yields exit code 1.
    """
    args = build_arg_parser().parse_args()
    try:
        feed_url = validate_feed_url(args.url)
        target = validate_output_path(args.output) if args.output else None
        if args.limit < 0:
            raise ValueError("--limit must be >= 0")
        if args.summary_max_length < 0:
            raise ValueError("--summary-max-length must be >= 0")
        feed_title, entries = parse_feed(fetch_xml(feed_url))
        # limit == 0 means "no limit"; only a positive value slices.
        if args.limit:
            entries = entries[: args.limit]
        # Summaries are only rendered by the "full" template.
        with_summary = (not args.no_summary) and args.template == "full"
        markdown = render_markdown(
            feed_title=feed_title,
            entries=entries,
            template=args.template,
            include_summary=with_summary,
            summary_max_len=args.summary_max_length,
        )
        if target is None:
            sys.stdout.write(markdown)
        else:
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_text(markdown, encoding="utf-8")
        return 0
    except Exception as exc:  # noqa: BLE001
        sys.stderr.write(f"error: {exc}\n")
        return 1
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code.
    sys.exit(main())