"""Chat-completion clients.
ChatClient is a Protocol — any object with a `chat_completion` method
plugs in. We ship two concrete clients:
- OpenAICompatibleClient — talks to any OpenAI-compatible /v1/chat/completions
endpoint (vllm, llama.cpp server, ollama, TGI, hosted services).
- StubClient — offline canned responses for tests and dry-runs. No
network. Operation Voyeur safe.
"""
from __future__ import annotations
from typing import Protocol
[docs]
class ChatClient(Protocol):
[docs]
def chat_completion(
self,
messages: list[dict],
*,
model: str,
temperature: float = 0.1,
max_tokens: int = 512,
top_p: float = 1.0,
extra_body: dict | None = None,
) -> str:
"""Return the assistant's text response.
``extra_body`` is forwarded as additional fields in the JSON
request payload — used for vLLM-specific knobs like
``guided_json`` (constrain output to a JSON Schema at sampling
time, eliminating SCHEMA_INVALID failures from prompt drift).
Endpoints that don't recognize the field ignore it; the client
passes it through opaque-ly.
"""
...
class StubClient:
    """Network-free chat client for tests and ``--dry-run``.

    ``answer`` is either a fixed value returned on every call, or a callable
    invoked as ``answer(messages, **kwargs)`` whose result becomes the reply.
    Every call is recorded in ``self.calls``.
    """

    def __init__(self, answer="[STUB] dry-run answer; no LLM was called."):
        self._answer = answer
        # One {"messages": ..., "kwargs": ...} record per chat_completion
        # call, in call order. The messages object is stored as passed
        # (not copied).
        self.calls: list[dict] = []

    def chat_completion(self, messages, **kwargs) -> str:
        """Record the call, then return the canned or computed answer.

        Keyword arguments (``model``, ``extra_body``, ...) are recorded and
        forwarded to a callable ``answer``, but are otherwise unused. There is
        no inference engine behind this client to honor grammar guidance, so
        ``extra_body`` has no effect. Tests that need to assert what was
        passed should inspect ``self.calls``.
        """
        self.calls.append(dict(messages=messages, kwargs=kwargs))
        responder = self._answer
        if not callable(responder):
            return responder
        return responder(messages, **kwargs)
def _qwen3_no_think_default(model: str, extra_body: dict | None) -> dict | None:
"""Default Qwen3 to enable_thinking=False unless the caller set it.
Qwen3 thinking models default to enable_thinking=True. On a large RAG
context that burns the entire token budget on hidden <think> reasoning
and returns EMPTY ``message.content`` (measured 2026-05-21: 768/768
completion tokens, content '') — every arborist answer comes back
UNGROUNDED. arborist's QA path wants the answer, not the trace, so we
force enable_thinking=False for Qwen3 by default. The reasoning-variant
path passes enable_thinking=True explicitly and is preserved untouched.
"""
if not model or "qwen3" not in model.lower():
return extra_body
eb = dict(extra_body or {})
ctk = dict(eb.get("chat_template_kwargs") or {})
if "enable_thinking" not in ctk:
ctk["enable_thinking"] = False
eb["chat_template_kwargs"] = ctk
return eb
[docs]
class OpenAICompatibleClient:
"""OpenAI-compatible chat completion over HTTP.
Default endpoint is configurable via env. Pass api_key only if the
target requires it; uncloseai's free endpoint does not.
Retries on transient upstream failures (HTTP 502/503/504) with
exponential backoff. The 2026-04-30 QA-modes bench saw 19 of 66
JSON-mode runs error out with 502 from vLLM — clustered, plausibly
correlated with `guided_json` stressing the grammar engine. Retry
smooths over the cluster without changing semantics: a 502 still
fails the bench cell if all attempts exhaust, but transient bursts
no longer dominate the error column.
"""
# HTTP status codes worth retrying — transient gateway/server
# failures from a flaky upstream. 4xx codes are client errors and
# never retried.
_RETRY_STATUS = (502, 503, 504)
def __init__(
self,
base_url: str,
api_key: str | None = None,
timeout: float = 60.0,
max_retries: int = 3,
retry_backoff_base_s: float = 0.5,
):
self.base_url = base_url.rstrip("/")
self.api_key = api_key
self.timeout = timeout
self.max_retries = max(1, max_retries)
self.retry_backoff_base_s = retry_backoff_base_s
# Persistent httpx.Client for HTTP/1.1 keep-alive + connection
# reuse. Constructing a fresh Client per chat_completion paid a
# TLS handshake every call (~100-300ms vs free for keep-alive),
# which adds up fast under bench --concurrency. httpx.Client is
# thread-safe for sequential or concurrent use; its internal
# connection pool serializes pool access. Closed via close().
import httpx
self._http = httpx.Client(timeout=self.timeout)
# Token usage from the most recent chat_completion (prompt /
# completion / total tokens), or None. Lets energy benches cost
# per ACTUAL tokens — incl. the prompt/context the substrate
# prefills — instead of a char//4 estimate. See bench/watt_bench.
self.last_usage: dict | None = None
[docs]
def close(self) -> None:
"""Release the underlying connection pool."""
try:
self._http.close()
except Exception:
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
self.close()
return False
@staticmethod
def _scrub_surrogates(s: str) -> str:
"""Replace lone UTF-16 surrogates with U+FFFD.
Wikipedia chunks (and other ingested text) occasionally
contain unpaired surrogates from the ingest of invalid-UTF-8
source. httpx's json= path does ``.encode('utf-8')`` on the
serialized request body, which raises UnicodeEncodeError on
any lone surrogate. Sanitize incoming message content here
so the outbound HTTP request always serializes cleanly. We
round-trip through WTF-8 (surrogatepass) bytes, then decode
as standard UTF-8 with replacement — invalid sequences
become U+FFFD (REPLACEMENT CHARACTER).
"""
return s.encode("utf-8", errors="surrogatepass").decode("utf-8", errors="replace")
[docs]
def chat_completion(
self,
messages: list[dict],
*,
model: str,
temperature: float = 0.1,
max_tokens: int = 512,
top_p: float = 1.0,
extra_body: dict | None = None,
stop: list[str] | None = None,
) -> str:
import httpx
import time as _time
client = self._http # persistent connection pool from __init__
headers = {"Content-Type": "application/json"}
# Sanitize message content for httpx's json-encode path.
messages = [
{
**m,
"content": (
self._scrub_surrogates(m["content"])
if isinstance(m.get("content"), str)
else m.get("content")
),
}
for m in messages
]
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens,
"top_p": top_p,
}
if stop:
payload["stop"] = list(stop)
# extra_body merges into the payload root — vLLM accepts knobs
# like {"guided_json": {...schema...}} or {"guided_grammar": "..."}.
# Endpoints that don't recognize a key silently drop it.
extra_body = _qwen3_no_think_default(model, extra_body)
if extra_body:
for k, v in extra_body.items():
payload[k] = v
url = f"{self.base_url}/chat/completions"
last_exc: Exception | None = None
for attempt in range(self.max_retries):
try:
resp = client.post(url, headers=headers, json=payload)
if resp.status_code in self._RETRY_STATUS and attempt < self.max_retries - 1:
# Exponential backoff: 0.5s, 1.0s, 2.0s with the
# default base. Last attempt raises through.
sleep_s = self.retry_backoff_base_s * (2 ** attempt)
_time.sleep(sleep_s)
continue
resp.raise_for_status()
data = resp.json()
self.last_usage = data.get("usage")
return data["choices"][0]["message"]["content"]
except httpx.HTTPStatusError as e:
# Retry only the configured transient codes; raise others.
if e.response.status_code in self._RETRY_STATUS and attempt < self.max_retries - 1:
sleep_s = self.retry_backoff_base_s * (2 ** attempt)
_time.sleep(sleep_s)
last_exc = e
continue
raise
except (httpx.ConnectError, httpx.ReadTimeout, httpx.RemoteProtocolError) as e:
# Network-layer transient errors get the same retry.
if attempt < self.max_retries - 1:
sleep_s = self.retry_backoff_base_s * (2 ** attempt)
_time.sleep(sleep_s)
last_exc = e
continue
raise
# Defensive — loop should have returned or raised.
if last_exc is not None:
raise last_exc
raise RuntimeError("OpenAICompatibleClient.chat_completion: retry loop exhausted without raising")