Source code for arborist.qa.client

"""Chat-completion clients.

ChatClient is a Protocol — any object with a `chat_completion` method
plugs in. We ship two concrete clients:

- OpenAICompatibleClient — talks to any OpenAI-compatible /v1/chat/completions
  endpoint (vllm, llama.cpp server, ollama, TGI, hosted services).
- StubClient — offline canned responses for tests and dry-runs. No
  network. Operation Voyeur safe.
"""

from __future__ import annotations

from typing import Protocol


[docs] class ChatClient(Protocol):
[docs] def chat_completion( self, messages: list[dict], *, model: str, temperature: float = 0.1, max_tokens: int = 512, top_p: float = 1.0, extra_body: dict | None = None, ) -> str: """Return the assistant's text response. ``extra_body`` is forwarded as additional fields in the JSON request payload — used for vLLM-specific knobs like ``guided_json`` (constrain output to a JSON Schema at sampling time, eliminating SCHEMA_INVALID failures from prompt drift). Endpoints that don't recognize the field ignore it; the client passes it through opaque-ly. """ ...
class StubClient:
    """Offline client for tests / --dry-run.

    ``answer`` is either a fixed string returned verbatim, or a callable
    invoked as ``answer(messages, **kw) -> str`` for dynamic stubbing.
    Every call is recorded in ``self.calls`` as
    ``{"messages": ..., "kwargs": ...}`` so tests can inspect what was
    passed (including any ``extra_body``).
    """

    def __init__(self, answer="[STUB] dry-run answer; no LLM was called."):
        self.calls: list[dict] = []
        self._answer = answer

    def chat_completion(self, messages, **kwargs) -> str:
        """Record the call, then return the canned or computed answer."""
        record = {"messages": messages, "kwargs": kwargs}
        self.calls.append(record)

        responder = self._answer
        if not callable(responder):
            # Fixed canned reply.
            return responder
        # Dynamic stub: the callable sees exactly what the caller passed.
        return responder(messages, **kwargs)
# StubClient ignores extra_body — the offline path doesn't go through # any inference engine that would honor grammar guidance. Tests that # want to assert extra_body was passed should inspect `self.calls`. def _qwen3_no_think_default(model: str, extra_body: dict | None) -> dict | None: """Default Qwen3 to enable_thinking=False unless the caller set it. Qwen3 thinking models default to enable_thinking=True. On a large RAG context that burns the entire token budget on hidden <think> reasoning and returns EMPTY ``message.content`` (measured 2026-05-21: 768/768 completion tokens, content '') — every arborist answer comes back UNGROUNDED. arborist's QA path wants the answer, not the trace, so we force enable_thinking=False for Qwen3 by default. The reasoning-variant path passes enable_thinking=True explicitly and is preserved untouched. """ if not model or "qwen3" not in model.lower(): return extra_body eb = dict(extra_body or {}) ctk = dict(eb.get("chat_template_kwargs") or {}) if "enable_thinking" not in ctk: ctk["enable_thinking"] = False eb["chat_template_kwargs"] = ctk return eb
[docs] class OpenAICompatibleClient: """OpenAI-compatible chat completion over HTTP. Default endpoint is configurable via env. Pass api_key only if the target requires it; uncloseai's free endpoint does not. Retries on transient upstream failures (HTTP 502/503/504) with exponential backoff. The 2026-04-30 QA-modes bench saw 19 of 66 JSON-mode runs error out with 502 from vLLM — clustered, plausibly correlated with `guided_json` stressing the grammar engine. Retry smooths over the cluster without changing semantics: a 502 still fails the bench cell if all attempts exhaust, but transient bursts no longer dominate the error column. """ # HTTP status codes worth retrying — transient gateway/server # failures from a flaky upstream. 4xx codes are client errors and # never retried. _RETRY_STATUS = (502, 503, 504) def __init__( self, base_url: str, api_key: str | None = None, timeout: float = 60.0, max_retries: int = 3, retry_backoff_base_s: float = 0.5, ): self.base_url = base_url.rstrip("/") self.api_key = api_key self.timeout = timeout self.max_retries = max(1, max_retries) self.retry_backoff_base_s = retry_backoff_base_s # Persistent httpx.Client for HTTP/1.1 keep-alive + connection # reuse. Constructing a fresh Client per chat_completion paid a # TLS handshake every call (~100-300ms vs free for keep-alive), # which adds up fast under bench --concurrency. httpx.Client is # thread-safe for sequential or concurrent use; its internal # connection pool serializes pool access. Closed via close(). import httpx self._http = httpx.Client(timeout=self.timeout) # Token usage from the most recent chat_completion (prompt / # completion / total tokens), or None. Lets energy benches cost # per ACTUAL tokens — incl. the prompt/context the substrate # prefills — instead of a char//4 estimate. See bench/watt_bench. self.last_usage: dict | None = None
[docs] def close(self) -> None: """Release the underlying connection pool.""" try: self._http.close() except Exception: pass
def __enter__(self): return self def __exit__(self, exc_type, exc, tb): self.close() return False @staticmethod def _scrub_surrogates(s: str) -> str: """Replace lone UTF-16 surrogates with U+FFFD. Wikipedia chunks (and other ingested text) occasionally contain unpaired surrogates from the ingest of invalid-UTF-8 source. httpx's json= path does ``.encode('utf-8')`` on the serialized request body, which raises UnicodeEncodeError on any lone surrogate. Sanitize incoming message content here so the outbound HTTP request always serializes cleanly. We round-trip through WTF-8 (surrogatepass) bytes, then decode as standard UTF-8 with replacement — invalid sequences become U+FFFD (REPLACEMENT CHARACTER). """ return s.encode("utf-8", errors="surrogatepass").decode("utf-8", errors="replace")
[docs] def chat_completion( self, messages: list[dict], *, model: str, temperature: float = 0.1, max_tokens: int = 512, top_p: float = 1.0, extra_body: dict | None = None, stop: list[str] | None = None, ) -> str: import httpx import time as _time client = self._http # persistent connection pool from __init__ headers = {"Content-Type": "application/json"} # Sanitize message content for httpx's json-encode path. messages = [ { **m, "content": ( self._scrub_surrogates(m["content"]) if isinstance(m.get("content"), str) else m.get("content") ), } for m in messages ] if self.api_key: headers["Authorization"] = f"Bearer {self.api_key}" payload = { "model": model, "messages": messages, "temperature": temperature, "max_tokens": max_tokens, "top_p": top_p, } if stop: payload["stop"] = list(stop) # extra_body merges into the payload root — vLLM accepts knobs # like {"guided_json": {...schema...}} or {"guided_grammar": "..."}. # Endpoints that don't recognize a key silently drop it. extra_body = _qwen3_no_think_default(model, extra_body) if extra_body: for k, v in extra_body.items(): payload[k] = v url = f"{self.base_url}/chat/completions" last_exc: Exception | None = None for attempt in range(self.max_retries): try: resp = client.post(url, headers=headers, json=payload) if resp.status_code in self._RETRY_STATUS and attempt < self.max_retries - 1: # Exponential backoff: 0.5s, 1.0s, 2.0s with the # default base. Last attempt raises through. sleep_s = self.retry_backoff_base_s * (2 ** attempt) _time.sleep(sleep_s) continue resp.raise_for_status() data = resp.json() self.last_usage = data.get("usage") return data["choices"][0]["message"]["content"] except httpx.HTTPStatusError as e: # Retry only the configured transient codes; raise others. 
if e.response.status_code in self._RETRY_STATUS and attempt < self.max_retries - 1: sleep_s = self.retry_backoff_base_s * (2 ** attempt) _time.sleep(sleep_s) last_exc = e continue raise except (httpx.ConnectError, httpx.ReadTimeout, httpx.RemoteProtocolError) as e: # Network-layer transient errors get the same retry. if attempt < self.max_retries - 1: sleep_s = self.retry_backoff_base_s * (2 ** attempt) _time.sleep(sleep_s) last_exc = e continue raise # Defensive — loop should have returned or raised. if last_exc is not None: raise last_exc raise RuntimeError("OpenAICompatibleClient.chat_completion: retry loop exhausted without raising")