"""HTML page source.
Fetches URLs, honors robots.txt automatically, strips noise elements
(script/style/noscript/nav/header/footer/aside), and extracts the main body
text plus outbound `<a href>` links as edges.
Optional dependency. Install with `pip install arborist[html]`.
"""
from __future__ import annotations
import re
import urllib.parse
import urllib.robotparser
from pathlib import Path
from typing import TYPE_CHECKING, Iterable, Iterator
try:
import httpx
from selectolax.parser import HTMLParser
except ImportError as e: # pragma: no cover
raise ImportError(
"HTML source requires extras: pip install 'arborist[html]'"
) from e
from arborist.document import Document, Edge
from arborist.source import Source
if TYPE_CHECKING:
from arborist.sources.loss_report import LossCollector
# Sent as the User-Agent header on every request and used as the agent name
# when evaluating robots.txt rules.
USER_AGENT = "arborist/0.0.1 (+https://unturf.com)"

# Elements removed from the tree before body text and links are extracted.
NOISE_SELECTORS = (
    "script",
    "style",
    "noscript",
    "nav",
    "header",
    "footer",
    "aside",
)

# Version tag recorded with every loss event produced by this adapter's
# normalization.
NORMALIZE_VERSION = "html-normalize-v1"
ADAPTER_NAME = "HtmlPageSource"

# Maps each noise selector to its loss kind (taxonomy per ticket #000022
# §2.1.1). Kinds are free strings in v0 and get promoted to an enum once
# three or more adapters exist. ``html_chrome`` is heuristic: selectolax may
# leave residual nav text on pages that don't use semantic elements, so
# auditors should treat the kind as advisory, not authoritative.
_NOISE_LOSS_KINDS = {
    "script": "script_block",
    "style": "style_block",
    **dict.fromkeys(
        ("noscript", "nav", "header", "footer", "aside"),
        "html_chrome",
    ),
}
def _normalize_text(
    text: str,
    *,
    loss_collector: "LossCollector | None" = None,
) -> str:
    """Collapse whitespace runs in ``text`` and trim both ends.

    Runs of spaces/tabs become a single space, runs of three or more
    newlines become exactly two, and leading/trailing whitespace is
    stripped. When ``loss_collector`` is given, the UTF-8 byte difference
    between input and output is reported to it as a ``whitespace_run``
    delta; the returned string is the same either way.
    """

    def _byte_len(value: str) -> int:
        # surrogatepass keeps lone surrogates from raising during the
        # purely-for-accounting encode.
        return len(value.encode("utf-8", errors="surrogatepass"))

    tracking = loss_collector is not None
    bytes_before = _byte_len(text) if tracking else 0

    single_spaced = re.sub(r"[ \t]+", " ", text)
    normalized = re.sub(r"\n{3,}", "\n\n", single_spaced).strip()

    if not tracking:
        return normalized

    bytes_after = _byte_len(normalized)
    loss_collector.record_delta(
        stage="html_normalize",
        canonicalization_version=NORMALIZE_VERSION,
        loss_kind="whitespace_run",
        loss_mode="normalize",
        bytes_delta=bytes_before - bytes_after,
    )
    return normalized
[docs]
def parse_html(
    url: str,
    html: str,
    source_type: str = "html",
    *,
    loss_collector: "LossCollector | None" = None,
) -> Document | None:
    """Pure parse function. Separated so tests can run without network.

    Removes every element matching ``NOISE_SELECTORS``, normalizes the
    remaining body text, reads the ``<title>``, and collects de-duplicated
    outbound ``http``/``https`` links as ``hyperlink`` edges. The URL
    fragment is split off each link and stored as the edge's ``anchor``.
    Links are collected after noise removal, so ``<a>`` elements inside
    removed nodes do not produce edges.

    When ``loss_collector`` is provided, drops from noise-selector
    decomposition (``<script>``, ``<style>``, ``<nav>``, etc.) and
    whitespace normalization are recorded against the collector, and any
    collected events are attached to the document as
    ``extra["loss_events"]``. Output bytes remain bit-identical to the
    no-collector path. Default ``None`` keeps callers stable.

    ``url`` is used as the document URI and as the base for resolving
    relative links; ``source_type`` is stored on the document unchanged.

    Returns ``None`` when the tree has no body/root node or when the
    normalized text is empty.
    """
    tree = HTMLParser(html)
    for sel in NOISE_SELECTORS:
        for node in tree.css(sel):
            if loss_collector is not None:
                kind = _NOISE_LOSS_KINDS.get(sel, "html_chrome")
                dropped_html = node.html or ""
                if dropped_html:
                    loss_collector.add(
                        stage="html_normalize",
                        canonicalization_version=NORMALIZE_VERSION,
                        loss_kind=kind,
                        loss_mode="pure_drop",
                        dropped=dropped_html,
                    )
            node.decompose()

    body = tree.css_first("body") or tree.root
    if body is None:
        return None
    text = _normalize_text(
        body.text(separator="\n", strip=True),
        loss_collector=loss_collector,
    )
    if not text:
        return None

    title_node = tree.css_first("title")
    title = title_node.text(strip=True) if title_node is not None else None

    edges: list[Edge] = []
    seen: set[tuple[str, str]] = set()
    for a in tree.css("a[href]"):
        href = (a.attributes.get("href") or "").strip()
        if not href or href.startswith(("javascript:", "mailto:", "tel:", "#")):
            continue
        try:
            absolute = urllib.parse.urljoin(url, href)
            split = urllib.parse.urlsplit(absolute)
        except ValueError:
            # urllib rejects malformed authorities (e.g. an unmatched "[" in
            # the host). One bad link must not abort parsing of the page.
            continue
        if split.scheme not in ("http", "https"):
            continue
        anchor = split.fragment or ""
        dst_uri = urllib.parse.urlunsplit(split._replace(fragment=""))
        # Same target with a different fragment is a distinct edge.
        key = (dst_uri, anchor)
        if key in seen:
            continue
        seen.add(key)
        edges.append(Edge(edge_type="hyperlink", dst_uri=dst_uri, anchor=anchor or None))

    extra: dict = {}
    if loss_collector is not None:
        events = loss_collector.events()
        if events:
            # Document-scope losses anchor to the doc's first chunk at
            # ingest time. We stash them in extra; ingest reads and
            # persists. See ticket #000022 §3.4.
            extra["loss_events"] = events

    return Document(
        uri=url,
        content=text,
        source_type=source_type,
        title=title,
        edges=edges,
        extra=extra,
    )
[docs]
class HtmlPageSource(Source):
    """Iterates a list of URLs, fetching and parsing each as HTML.

    URLs are skipped silently when robots.txt disallows them (with
    ``respect_robots`` set), when the fetch fails, when the response is not
    HTML/XML, or when parsing yields no document.
    """

    source_type = "html"

    def __init__(
        self,
        urls: Iterable[str],
        *,
        respect_robots: bool = True,
        timeout: float = 30.0,
        loss_report_enabled: bool = True,
        loss_report_excerpts: bool = True,
        loss_report_max_excerpt_bytes: int = 200,
        default_author: str | None = None,
    ):
        """Configure the source.

        ``urls`` is materialized into a list. ``respect_robots`` gates each
        fetch on the origin's robots.txt. ``timeout`` is passed to the
        ``httpx.Client``. The ``loss_report_*`` options control whether a
        ``LossCollector`` is created per page and how it is configured.

        ``default_author`` (Phase 1 follow-up of #000031) — when the
        ingested HTML's ``<title>`` doesn't already include the author
        surname, this string is appended (``"<title>, by <author>"``).
        Surfaces the author signal into the warrant resolver's
        ``_shard_matches_citation`` haystack without a per-shard SQL
        UPDATE workaround. Wikisource and Project Gutenberg HTML pages
        rarely include author in ``<title>`` — the manifest entry
        carries it instead. Skipped when None or empty.
        """
        self.urls = list(urls)
        self.respect_robots = respect_robots
        self.timeout = timeout
        self.loss_report_enabled = loss_report_enabled
        self.loss_report_excerpts = loss_report_excerpts
        self.loss_report_max_excerpt_bytes = loss_report_max_excerpt_bytes
        self.default_author = (default_author or "").strip()
        # One parsed robots.txt per origin ("scheme://netloc").
        self._robots_cache: dict[str, urllib.robotparser.RobotFileParser] = {}

    @classmethod
    def from_file(cls, path: str | Path, **kwargs) -> HtmlPageSource:
        """Build a source from a UTF-8 text file with one URL per line.

        Blank lines and lines whose first non-blank character is ``#`` are
        ignored. ``kwargs`` are forwarded to the constructor.
        """
        urls = [
            line.strip()
            for line in Path(path).read_text(encoding="utf-8").splitlines()
            if line.strip() and not line.lstrip().startswith("#")
        ]
        return cls(urls, **kwargs)

    def iter_documents(self) -> Iterator[Document]:
        """Fetch each configured URL and yield the parsed ``Document``.

        Redirects are followed; the document URI is the final response URL.
        A URL that cannot be fetched (transport error, error status, or a
        malformed URL) is skipped rather than aborting the iteration.
        """
        with httpx.Client(
            headers={"User-Agent": USER_AGENT},
            timeout=self.timeout,
            follow_redirects=True,
        ) as client:
            for url in self.urls:
                if self.respect_robots and not self._allowed(client, url):
                    continue
                try:
                    resp = client.get(url)
                    resp.raise_for_status()
                except (httpx.HTTPError, httpx.InvalidURL):
                    # InvalidURL is not an HTTPError subclass; without it a
                    # single malformed entry would end the whole run.
                    continue
                ctype = resp.headers.get("content-type", "").lower()
                if "html" not in ctype and "xml" not in ctype:
                    continue
                collector = None
                if self.loss_report_enabled:
                    # Imported lazily so the loss-report module is only
                    # loaded when reporting is switched on.
                    from arborist.sources.loss_report import LossCollector

                    collector = LossCollector(
                        excerpts_enabled=self.loss_report_excerpts,
                        max_excerpt_bytes=self.loss_report_max_excerpt_bytes,
                        adapter_name=ADAPTER_NAME,
                        adapter_version=NORMALIZE_VERSION,
                    )
                doc = parse_html(
                    str(resp.url),
                    resp.text,
                    self.source_type,
                    loss_collector=collector,
                )
                if doc is not None:
                    yield self._with_author_appended(doc)

    def _allowed(self, client: "httpx.Client", url: str) -> bool:
        """Return whether robots.txt for ``url``'s origin permits fetching it.

        The robots.txt result is fetched once per origin and cached for the
        lifetime of this source. A URL that cannot be parsed is reported as
        not allowed.
        """
        try:
            parsed = urllib.parse.urlparse(url)
        except ValueError:
            # Malformed authority (e.g. unmatched "[" in the host): the URL
            # cannot be fetched anyway, so skip it.
            return False
        origin = f"{parsed.scheme}://{parsed.netloc}"
        rp = self._robots_cache.get(origin)
        if rp is None:
            rp = urllib.robotparser.RobotFileParser()
            try:
                resp = client.get(f"{origin}/robots.txt")
            except (httpx.HTTPError, httpx.InvalidURL):
                resp = None
            if resp is not None and resp.status_code == 200:
                rp.parse(resp.text.splitlines())
            elif resp is None or 500 <= resp.status_code < 600:
                # Unreachable robots.txt (network or server error) means
                # complete disallow per RFC 9309.
                rp.disallow_all = True
            else:
                # Missing robots.txt = no rules per RFC 9309.
                rp.allow_all = True
            self._robots_cache[origin] = rp
        return rp.can_fetch(USER_AGENT, url)

    def _with_author_appended(self, doc: Document) -> Document:
        """Phase 1 follow-up of #000031 — when the source ingest
        produces a Document whose title doesn't carry the author
        surname, append ``", by <default_author>"`` so the resolver's
        author-surname haystack check fires.

        Suppress the append when ``default_author`` is empty OR when
        the existing title already contains a surname token of length
        >= 4 from the configured author (case-insensitive substring).
        Idempotent — re-ingesting the same URL with the same config
        produces the same Document content_root because content
        doesn't change, only the title text appended.
        """
        if not self.default_author or not doc:
            return doc
        title = (doc.title or "").strip()
        # If any surname-shaped token from default_author already
        # appears in title (case-insensitive), don't append.
        title_l = title.lower()
        for tok in self.default_author.split():
            tok_l = tok.lower()
            if len(tok_l) >= 4 and tok_l in title_l:
                return doc
        new_title = (
            f"{title}, by {self.default_author}" if title
            else f"by {self.default_author}"
        )
        # Document is a frozen-ish dataclass; replace title.
        from dataclasses import replace

        return replace(doc, title=new_title)