"""Meta-Cognition Preflight Guard — Ticket #000010 Phase 1.
Runtime epistemic control layer that classifies a question's shape
BEFORE generation, so the model never answers from the surface form
alone when the question is ill-posed (false-premise, contradictory,
under-specified, broad-quantifier, time-sensitive, out-of-corpus,
reference-frame ambiguous).
Pure and deterministic. No I/O, no model call, no retrieval call.
Reuses ``arborist.qa.quantifier.classify_question_quantifier`` for
the broad-quantifier rung; adds four new lightweight detectors:
- temporal sensitivity (current/latest/today/CEO/etc.)
- contradiction (lexical) (unmarried+spouse, never+always)
- false-premise (lite) (presupposition patterns)
- out-of-corpus (my-uploaded-X / my-file shapes)
Reference-frame detection lives in ``arborist.qa.query._detect_frame``
(ticket #000002) and is called from the surrounding runtime, not
from this module — keeps detection pure-on-question (no corpus
lookup needed here).
The output is a ``QuestionState`` dataclass that becomes a CTI root
node. First pass surfaces it on the ``query()`` result dict only;
run-DAG node binding deferred to the same Phase 5 work tracked in
ticket #000009 (both nodes can land together).
Hard rule (D1): No LLM in this hard path. Model-assisted preflight,
if added later, labels itself ``SOFT_PREFLIGHT_HINT`` and never
produces a ``PREFLIGHT_OK`` / ``PREFLIGHT_BLOCKED`` without
deterministic support.
"""
from __future__ import annotations
import hashlib
import re
from dataclasses import asdict, dataclass, field
from typing import Any, Literal
# Classifier version tag. It is the default value of
# ``QuestionState.classifier_version`` and is passed as ``version`` into
# ``_preflight_policy_hash``, so changing it changes the policy hash.
PREFLIGHT_VERSION = "metacognition-v0.1"
# ---------------------------------------------------------------- types
# Labels that may appear in ``QuestionState.logical_statuses``.
# ``preflight_question`` emits ``well_formed`` only when no other label
# was added.
LogicalStatus = Literal[
"well_formed",
"under_specified",
"false_premise_suspected",
"contradictory_question",
"out_of_corpus_risk",
"stale_risk",
"reference_frame_ambiguous",
"broad_quantifier_unbounded",
]
# Overall verdict stored in ``QuestionState.preflight_result``.
PreflightResult = Literal[
"PREFLIGHT_OK",
"PREFLIGHT_PARTIAL",
"PREFLIGHT_BLOCKED",
]
# Temporal-sensitivity levels. ``detect_temporal_sensitivity`` currently
# returns only "high" or "low"; "medium" is reserved.
TemporalSensitivity = Literal["high", "medium", "low"]
[docs]
@dataclass(frozen=True)
class QuestionState:
    """Frozen record of the preflight classification of one question.

    Instances are produced by ``preflight_question`` from the question
    text and the per-call inputs; no randomness and no model output
    goes into them. ``preflight_policy_hash`` is a hex digest that
    identifies the policy/classifier-version combination behind the
    record.

    NOTE(review): ``frozen=True`` makes the dataclass generate
    ``__hash__`` from its fields, but ``answer_constraints`` is a dict
    and ``false_premise_hints`` holds dicts, so ``hash(state)`` raises
    ``TypeError``. Confirm that no caller uses instances as dict keys
    or set members.
    """

    # --- identity -----------------------------------------------------
    raw_question: str
    question_hash: str
    # --- classification -----------------------------------------------
    logical_statuses: tuple[LogicalStatus, ...]
    question_shape: str
    # --- quantifier classifier output -----------------------------------
    quantifier_intensity: str | None
    quantifier_matched_token: str | None
    scope_bound_hint: str
    # --- caller-supplied frame routing --------------------------------
    reference_frames: tuple[str, ...]
    # --- detector outputs ---------------------------------------------
    temporal_sensitivity: TemporalSensitivity
    temporal_matched_tokens: tuple[str, ...]
    contradiction_pairs: tuple[tuple[str, str], ...]
    false_premise_hints: tuple[dict, ...]
    # --- derived guidance ---------------------------------------------
    corpus_requirement: str
    known_boundaries: tuple[str, ...]
    answer_constraints: dict
    # --- verdict + provenance -----------------------------------------
    preflight_result: PreflightResult
    preflight_policy_hash: str
    classifier_version: str = PREFLIGHT_VERSION

    def to_dict(self) -> dict:
        """Return the record as a plain ``dict`` via ``dataclasses.asdict``.

        ``asdict`` recurses into nested containers and rebuilds each one
        with its original type, so tuple-valued fields are still tuples
        in the result. ``json.dumps`` serializes tuples as JSON arrays,
        so the dict can be dumped directly.
        """
        return asdict(self)
# ---------------------------------------------------------------- temporal
# Explicit temporal anchors, matched case-insensitively as whole
# words/phrases anywhere in the question: current(ly), latest, today,
# right now, as of, this year/month/week, recent(ly).
_TEMPORAL_HIGH_PATTERNS = [
    re.compile(r"\bcurrent(?:ly)?\b", re.IGNORECASE),
    re.compile(r"\blatest\b", re.IGNORECASE),
    re.compile(r"\btoday\b", re.IGNORECASE),
    re.compile(r"\bright now\b", re.IGNORECASE),
    re.compile(r"\bas of\b", re.IGNORECASE),
    re.compile(r"\bthis year\b", re.IGNORECASE),
    re.compile(r"\bthis month\b", re.IGNORECASE),
    re.compile(r"\bthis week\b", re.IGNORECASE),
    # Was r"\brecently?\b", which only made the trailing "y" optional
    # and therefore matched "recentl"/"recently" but never "recent".
    re.compile(r"\brecent(?:ly)?\b", re.IGNORECASE),
]
# Role-shape patterns: questions about who currently holds a role.
# Conservative — only positions with rapid turnover. ``CEO`` is
# deliberately case-sensitive (no IGNORECASE flag).
_TEMPORAL_ROLE_PATTERNS = [
    re.compile(r"\bCEO\b"),
    re.compile(r"\bpresident of\b", re.IGNORECASE),
    re.compile(r"\bprime minister\b", re.IGNORECASE),
    re.compile(r"\bcurrent (?:champion|holder|price|stock)\b", re.IGNORECASE),
]


def detect_temporal_sensitivity(
    question: str,
) -> tuple[TemporalSensitivity, tuple[str, ...]]:
    """Return ``(sensitivity, matched_tokens)`` for ``question``.

    ``"high"`` is returned when at least one explicit temporal anchor
    or rapid-turnover role pattern matches; otherwise ``"low"`` with an
    empty tuple. ``"medium"`` is reserved and never produced here.

    ``matched_tokens`` holds the first match of every pattern that
    fired, as it appears in the question, ordered anchor patterns first
    and role patterns second. Overlapping patterns each contribute
    their own entry (e.g. "current" and "current price").
    """
    matched = [
        hit.group(0)
        for pattern in (*_TEMPORAL_HIGH_PATTERNS, *_TEMPORAL_ROLE_PATTERNS)
        if (hit := pattern.search(question)) is not None
    ]
    if matched:
        return "high", tuple(matched)
    return "low", ()
# ---------------------------------------------------------------- contradiction
# Conservative lexical-contradiction pairs. Only fires when BOTH
# tokens appear in the question. False positives are operator-
# hostile so we keep the list short and obvious.
_CONTRADICTION_PAIRS: tuple[tuple[str, str], ...] = (
    ("unmarried", "spouse"),
    ("unmarried", "married"),
    ("never", "always"),
    ("alive", "dead"),
    ("nonexistent", "existing"),
    ("only", "also"),
)
# Whole-word patterns for each pair, compiled once at import time
# instead of on every call. Derived from ``_CONTRADICTION_PAIRS``, so
# edits to that table must happen before this line runs.
_CONTRADICTION_PATTERNS = tuple(
    (
        a,
        b,
        re.compile(rf"\b{re.escape(a)}\b"),
        re.compile(rf"\b{re.escape(b)}\b"),
    )
    for a, b in _CONTRADICTION_PAIRS
)


def detect_contradiction(
    question: str,
) -> tuple[tuple[str, str], ...]:
    """Return the ``(token_a, token_b)`` pairs whose members both occur
    in ``question``.

    Matching is whole-word and case-insensitive: the question is
    lower-cased and each side of a pair is searched independently with
    word boundaries, so "unmarried" does not count as "married".
    Pairs are returned in ``_CONTRADICTION_PAIRS`` order; the result is
    an empty tuple when nothing matches.

    This function only reports matches. Whether a match leads to a
    block is decided by the caller.
    """
    q_lower = question.lower()
    return tuple(
        (a, b)
        for a, b, a_re, b_re in _CONTRADICTION_PATTERNS
        if a_re.search(q_lower) and b_re.search(q_lower)
    )
# ---------------------------------------------------------------- false premise
# Presupposition patterns. Each pattern extracts an implied relation
# from the question shape. The verifier uses `false_premise_hints`
# as a `required_evidence` hint — the question is NOT blocked, but
# the audit-line tail surfaces "false premise suspected" so the
# operator knows the system didn't blindly accept the premise.
# Subject and predicate character classes deliberately allow periods
# ("Mr.", "U.S."), apostrophes ("Homer's"), and hyphens
# ("by-law"). End-marker is `?` only (declarative variants of these
# question-shapes are not the target).
_FP_SUBJ = r"[\w\s\.\-']+?"
_FP_PRED = r"[\w\s\.\-']+?"
# Each entry is (compiled pattern, kind label, presupposition template).
# In a template the standalone tokens ``X`` and ``Y`` stand for the
# captured subject and predicate.
_FALSE_PREMISE_PATTERNS = [
    # "when did X stop Y?" → presupposes X did Y
    (
        re.compile(
            rf"\bwhen did\s+(?P<subject>{_FP_SUBJ})\s+stop\s+(?P<predicate>{_FP_PRED})\s*\?",
            re.IGNORECASE,
        ),
        "stopped_doing",
        "X did Y at some prior time",
    ),
    # "why did X cause Y?" → presupposes X caused Y
    (
        re.compile(
            rf"\bwhy did\s+(?P<subject>{_FP_SUBJ})\s+cause\s+(?P<predicate>{_FP_PRED})\s*\?",
            re.IGNORECASE,
        ),
        "caused",
        "X caused Y",
    ),
    # "how did X become Y?" → presupposes X became Y
    (
        re.compile(
            rf"\bhow did\s+(?P<subject>{_FP_SUBJ})\s+become\s+(?P<predicate>{_FP_PRED})\s*\?",
            re.IGNORECASE,
        ),
        "became",
        "X became Y",
    ),
    # "when did X become Y?" → presupposes X became Y
    (
        re.compile(
            rf"\bwhen did\s+(?P<subject>{_FP_SUBJ})\s+become\s+(?P<predicate>{_FP_PRED})\s*\?",
            re.IGNORECASE,
        ),
        "became",
        "X became Y",
    ),
]
# Matches the standalone ``X`` / ``Y`` placeholders inside a template.
_FP_PLACEHOLDER_RE = re.compile(r"\b[XY]\b")


def detect_false_premise(question: str) -> tuple[dict, ...]:
    """Return one hint dict per presupposition pattern that fires.

    Each dict carries::

        kind            -- pattern label (stopped_doing, caused, became)
        presupposition  -- the template with subject/predicate filled in
        subject         -- captured subject span, stripped
        predicate       -- captured predicate span, stripped

    A ``?`` is appended when the question ends with neither ``?`` nor
    ``.`` (trailing whitespace ignored), because every pattern requires
    a literal ``?`` after the predicate. A question ending in ``.`` is
    left untouched, so declarative phrasings generally do not match.

    Returns an empty tuple for an empty question or when no pattern
    matches.
    """
    if not question:
        return ()
    test_q = question if question.rstrip().endswith(("?", ".")) else question + "?"
    hints: list[dict] = []
    for pat, kind, presup_template in _FALSE_PREMISE_PATTERNS:
        m = pat.search(test_q)
        if m is None:
            continue
        subject = m.group("subject").strip()
        predicate = m.group("predicate").strip()
        fill = {"X": subject, "Y": predicate}
        # Substitute both placeholders in a single pass over the
        # template. Chained str.replace calls would rewrite any capital
        # "Y" inside the already-inserted subject (e.g. "Yahoo").
        presupposition = _FP_PLACEHOLDER_RE.sub(
            lambda ph, fill=fill: fill[ph.group(0)], presup_template
        )
        hints.append({
            "kind": kind,
            "presupposition": presupposition,
            "subject": subject,
            "predicate": predicate,
        })
    return tuple(hints)
# ---------------------------------------------------------------- out-of-corpus
# Patterns that signal the operator is asking about a private /
# uploaded / non-corpus document. Conservative — defaults to
# "likely_in_corpus" for typical encyclopedic questions.
_OUT_OF_CORPUS_PATTERNS = [
    # "my uploaded/unpublished/attached/private <something>"
    re.compile(r"\bmy (?:uploaded|unpublished|attached|private) [\w\s]+\b", re.IGNORECASE),
    # "the file I sent / uploaded / attached"
    re.compile(r"\bthe file (?:i sent|i uploaded|i attached)\b", re.IGNORECASE),
    # "the document I sent / uploaded / attached"
    re.compile(r"\bthe document (?:i sent|i uploaded|i attached)\b", re.IGNORECASE),
    # "in my contract / email / notes / spreadsheet / inbox"
    re.compile(r"\bin my (?:contract|email|notes|spreadsheet|inbox)\b", re.IGNORECASE),
    # "what does my <something> say"
    re.compile(r"\bwhat does my [\w\s]+ say\b", re.IGNORECASE),
]


def detect_out_of_corpus(question: str) -> bool:
    """Report whether ``question`` refers to a private or uploaded
    document.

    Returns ``True`` as soon as any pattern in
    ``_OUT_OF_CORPUS_PATTERNS`` matches anywhere in the question
    (case-insensitive), ``False`` otherwise.
    """
    return any(
        pattern.search(question) is not None
        for pattern in _OUT_OF_CORPUS_PATTERNS
    )
# ---------------------------------------------------------------- preflight
def _question_hash(question: str) -> str:
    """Return the SHA-256 hex digest of a canonical form of ``question``.

    Canonical form: leading/trailing whitespace stripped, lower-cased,
    and every run of whitespace collapsed to a single space, so
    questions that differ only in case or spacing share a digest.
    """
    folded = question.strip().lower()
    collapsed = re.sub(r"\s+", " ", folded)
    digest = hashlib.sha256(collapsed.encode("utf-8"))
    return digest.hexdigest()
def _preflight_policy_hash(*, policy: dict | None, version: str) -> str:
    """Return a SHA-256 hex digest of the preflight-relevant policy
    flags plus the classifier ``version``.

    Only the six ``metacognition_*`` flags listed below take part; any
    other key in ``policy`` is ignored, and a missing flag uses the
    default shown. The digest is taken over ``key=value`` pairs joined
    with ``|`` in sorted key order.

    Every flag is coerced with ``bool()`` before hashing, which is how
    ``preflight_question`` reads the same keys. Policies that behave
    identically (``1`` vs ``True``) therefore hash identically. The
    digest for policies that already use real booleans is unchanged.
    """
    settings = policy or {}
    flag_defaults = (
        ("metacognition_enabled", True),
        ("metacognition_temporal_check", True),
        ("metacognition_contradiction_check", True),
        ("metacognition_false_premise_check", True),
        ("metacognition_out_of_corpus_check", True),
        ("metacognition_block_on_contradiction", False),
    )
    relevant: dict[str, object] = {
        name: bool(settings.get(name, default))
        for name, default in flag_defaults
    }
    relevant["version"] = version
    payload = "|".join(f"{k}={relevant[k]}" for k in sorted(relevant))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()
def _classify_question_shape(
    quantifier_intensity: str | None,
    temporal: TemporalSensitivity,
    has_contradiction: bool,
    has_false_premise: bool,
    has_out_of_corpus: bool,
) -> str:
    """Collapse the detector outputs into one coarse shape label.

    Precedence, highest first: out-of-corpus, contradiction, false
    premise, high temporal sensitivity, then the quantifier intensity.
    An intensity that is ``None`` or not listed below yields
    ``"single_fact"``.
    """
    # Detector-driven shapes win over quantifier-driven ones; the first
    # flag that is set decides.
    detector_shapes = (
        (has_out_of_corpus, "out_of_corpus"),
        (has_contradiction, "contradictory"),
        (has_false_premise, "presupposing"),
        (temporal == "high", "time_sensitive"),
    )
    for fired, shape in detector_shapes:
        if fired:
            return shape

    quantifier_shapes = {
        "ALL": "broad_request",
        "COMPREHENSIVE": "broad_request",
        "OPEN_REQUEST": "broad_request",
        "SMALL_NUM_EXPLICIT": "bounded_count",
        "COMPARATIVE_BOUND": "bounded_count",
        "ABSENT": "negation",
        "PROPORTIONAL": "proportional",
    }
    return quantifier_shapes.get(quantifier_intensity, "single_fact")
[docs]
def preflight_question(
    question: str,
    *,
    model_profile_id: str | None = None,
    corpus_profile: dict | None = None,
    reference_frames: tuple[str, ...] = (),
    policy: dict | None = None,
) -> QuestionState:
    """Build the deterministic ``QuestionState`` for ``question``.

    Combines ``classify_question_quantifier`` with the four local
    detectors (temporal, contradiction, false-premise, out-of-corpus).

    Parameters:
        question: the raw question text. An empty or whitespace-only
            question short-circuits to a ``PREFLIGHT_BLOCKED`` stub.
        model_profile_id: accepted for interface stability; this
            implementation does not read it.
        corpus_profile: accepted for interface stability; this
            implementation does not read it (``corpus_requirement`` is
            derived from the detector outputs only).
        reference_frames: frames detected by the caller. More than one
            frame adds ``reference_frame_ambiguous``.
        policy: optional overrides. Each ``metacognition_*`` flag is
            read with ``bool()``; all checks default to on, and
            ``metacognition_block_on_contradiction`` defaults to off.

    Verdict rules:
        * ``PREFLIGHT_OK`` — the only status is ``well_formed``, or the
          guard is disabled via ``metacognition_enabled``.
        * ``PREFLIGHT_BLOCKED`` — empty question; or
          ``out_of_corpus_risk`` is present (regardless of policy); or a
          contradiction was found and
          ``metacognition_block_on_contradiction`` is set.
        * ``PREFLIGHT_PARTIAL`` — any other combination.
    """
    from arborist.qa.quantifier import classify_question_quantifier

    policy = policy or {}

    def _flag(name: str, default: bool) -> bool:
        # Any truthy policy value enables the flag.
        return bool(policy.get(name, default))

    enabled = _flag("metacognition_enabled", True)
    temporal_on = _flag("metacognition_temporal_check", True)
    contradiction_on = _flag("metacognition_contradiction_check", True)
    false_premise_on = _flag("metacognition_false_premise_check", True)
    out_of_corpus_on = _flag("metacognition_out_of_corpus_check", True)
    block_on_contradiction = _flag("metacognition_block_on_contradiction", False)

    # The policy hash depends only on policy + version, so it is the
    # same for every return path below.
    policy_hash = _preflight_policy_hash(policy=policy, version=PREFLIGHT_VERSION)

    def _stub(
        raw: str,
        *,
        shape: str,
        corpus_requirement: str,
        boundaries: tuple[str, ...],
        result: PreflightResult,
    ) -> QuestionState:
        # Detector-free state shared by the two short-circuit paths;
        # keeps the result schema identical to a full classification.
        return QuestionState(
            raw_question=raw,
            question_hash=_question_hash(raw),
            logical_statuses=(),
            question_shape=shape,
            quantifier_intensity=None,
            quantifier_matched_token=None,
            scope_bound_hint="unknown",
            reference_frames=(),
            temporal_sensitivity="low",
            temporal_matched_tokens=(),
            contradiction_pairs=(),
            false_premise_hints=(),
            corpus_requirement=corpus_requirement,
            known_boundaries=boundaries,
            answer_constraints={},
            preflight_result=result,
            preflight_policy_hash=policy_hash,
        )

    # Short-circuit 1: nothing to classify. Checked before the master
    # switch, so an empty question is blocked even when the guard is off.
    if not question or not question.strip():
        return _stub(
            question or "",
            shape="empty",
            corpus_requirement="not_applicable",
            boundaries=("empty question",),
            result="PREFLIGHT_BLOCKED",
        )

    # Short-circuit 2: master kill switch. The stub is recognisable by
    # its empty ``logical_statuses`` and the ``metacognition_disabled``
    # question shape.
    if not enabled:
        return _stub(
            question,
            shape="metacognition_disabled",
            corpus_requirement="not_evaluated",
            boundaries=(),
            result="PREFLIGHT_OK",
        )

    # Quantifier classification; the result is read as a mapping.
    quant = classify_question_quantifier(question)
    is_broad = quant.get("is_broad")

    # The four local detectors, each individually gateable by policy.
    temporal, temporal_matched = (
        detect_temporal_sensitivity(question) if temporal_on else ("low", ())
    )
    contradictions = detect_contradiction(question) if contradiction_on else ()
    false_premise = detect_false_premise(question) if false_premise_on else ()
    out_of_corpus = detect_out_of_corpus(question) if out_of_corpus_on else False

    # Logical statuses, in a fixed order.
    statuses: list[LogicalStatus] = []
    if is_broad:
        statuses.append(
            "broad_quantifier_unbounded"
            if quant.get("scope_bound_hint") == "unbounded"
            else "under_specified"
        )
    if temporal == "high":
        statuses.append("stale_risk")
    if contradictions:
        statuses.append("contradictory_question")
    if false_premise:
        statuses.append("false_premise_suspected")
    if out_of_corpus:
        statuses.append("out_of_corpus_risk")
    if reference_frames and len(reference_frames) > 1:
        statuses.append("reference_frame_ambiguous")
    if not statuses:
        statuses.append("well_formed")

    # Constraints handed to the answer stage, keyed off the statuses.
    answer_constraints: dict[str, Any] = {}
    if "broad_quantifier_unbounded" in statuses or "under_specified" in statuses:
        answer_constraints["bounded_or_reject"] = True
        # Fall back to 8 claims when the question names no count.
        answer_constraints["max_claims_hint"] = quant.get("explicit_count") or 8
    if "stale_risk" in statuses:
        answer_constraints["requires_current_source"] = True
    if "false_premise_suspected" in statuses:
        answer_constraints["require_premise_evidence"] = [
            hint["presupposition"] for hint in false_premise
        ]
    if "out_of_corpus_risk" in statuses:
        answer_constraints["expected_corpus_status"] = "out_of_corpus"

    # Human-readable boundary notes for the audit line / bench operator.
    boundaries: list[str] = []
    if temporal_matched:
        boundaries.append(f"temporal markers: {', '.join(temporal_matched)}")
    if contradictions:
        pair_text = ", ".join(f"{a}/{b}" for a, b in contradictions)
        boundaries.append("lexical contradiction: " + pair_text)
    if false_premise:
        presup_text = "; ".join(hint["presupposition"] for hint in false_premise)
        boundaries.append("presuppositions: " + presup_text)
    if out_of_corpus:
        boundaries.append("references private/uploaded document")
    if is_broad:
        boundaries.append(
            f"broad quantifier ({quant.get('intensity')}, "
            f"scope={quant.get('scope_bound_hint')})"
        )

    # Verdict. Out-of-corpus always blocks; a contradiction blocks only
    # when policy asks for it; any other non-well-formed mix is partial.
    preflight_result: PreflightResult
    if statuses == ["well_formed"]:
        preflight_result = "PREFLIGHT_OK"
    elif "out_of_corpus_risk" in statuses or (
        block_on_contradiction and "contradictory_question" in statuses
    ):
        preflight_result = "PREFLIGHT_BLOCKED"
    else:
        preflight_result = "PREFLIGHT_PARTIAL"

    # Temporal sensitivity takes precedence over the out-of-corpus flag.
    if temporal == "high":
        corpus_requirement = "needs_current_source"
    elif out_of_corpus:
        corpus_requirement = "out_of_corpus_likely"
    else:
        corpus_requirement = "encyclopedic"

    intensity = quant.get("intensity")
    shape = _classify_question_shape(
        quantifier_intensity=intensity,
        temporal=temporal,
        has_contradiction=bool(contradictions),
        has_false_premise=bool(false_premise),
        has_out_of_corpus=out_of_corpus,
    )
    return QuestionState(
        raw_question=question,
        question_hash=_question_hash(question),
        logical_statuses=tuple(statuses),
        question_shape=shape,
        quantifier_intensity=intensity,
        quantifier_matched_token=quant.get("matched_token"),
        scope_bound_hint=quant.get("scope_bound_hint", "unknown"),
        reference_frames=tuple(reference_frames),
        temporal_sensitivity=temporal,
        temporal_matched_tokens=tuple(temporal_matched),
        contradiction_pairs=tuple(contradictions),
        false_premise_hints=tuple(false_premise),
        corpus_requirement=corpus_requirement,
        known_boundaries=tuple(boundaries),
        answer_constraints=answer_constraints,
        preflight_result=preflight_result,
        preflight_policy_hash=policy_hash,
    )