#!/usr/bin/env python """ MOAT v5g minimal experiment harness. This is a self-contained NumPy prototype for the MOAT v5g-Final spec: - randomized 2D latent geometry per rollout - diagnostic controls that may use evaluator-only ground truth geometry - residual-only and action-only classifiers that do not receive geometry - high-energy / high-PE wrong-belief policy with reduced discriminative directional energy - horizon sweep and classifier-family robustness checks The script is intentionally conservative: it is a benchmark skeleton and measurement sanity check, not a full proposed-agent implementation. """ from __future__ import annotations import argparse import json import math from dataclasses import asdict, dataclass from pathlib import Path from typing import Callable, Dict, Iterable, List, Tuple import numpy as np Array = np.ndarray @dataclass class Config: seed: int = 7 n_train: int = 800 n_test: int = 400 horizons: Tuple[int, ...] = (5, 10, 20, 40) wrong_strengths: Tuple[float, ...] = (0.0, 0.25, 0.5, 0.75, 1.0) delta_b: float = 0.9 sigma_w: float = 0.25 input_energy: float = 2.0 min_directional_energy: float = 0.10 max_directional_energy: float = 0.50 theta_min_deg: float = 30.0 theta_max_deg: float = 150.0 delay: int = 3 pe_threshold: float = 0.15 energy_threshold: float = 1.0 auc_high: float = 0.75 auc_low: float = 0.60 auc_action_pass: float = 0.55 directional_energy_low: float = 0.35 rff_dim: int = 160 mlp_hidden: int = 24 train_steps: int = 180 learning_rate: float = 0.08 def unit_from_angle(theta: float) -> Array: return np.array([math.cos(theta), math.sin(theta)], dtype=float) def rotate(v: Array, theta: float) -> Array: c, s = math.cos(theta), math.sin(theta) return np.array([c * v[0] - s * v[1], s * v[0] + c * v[1]], dtype=float) def perp(v: Array) -> Array: return np.array([-v[1], v[0]], dtype=float) def auc_from_scores(scores: Array, labels: Array) -> float: scores = np.asarray(scores, dtype=float) labels = np.asarray(labels, dtype=int) pos = scores[labels == 1] neg = scores[labels == 0] if len(pos) == 0 or len(neg) == 0: return float("nan") # Rank AUC with average ranks for ties. order = np.argsort(scores) ranks = np.empty_like(order, dtype=float) sorted_scores = scores[order] i = 0 while i < len(scores): j = i + 1 while j < len(scores) and sorted_scores[j] == sorted_scores[i]: j += 1 ranks[order[i:j]] = 0.5 * (i + j - 1) + 1.0 i = j rank_sum_pos = ranks[labels == 1].sum() n_pos, n_neg = len(pos), len(neg) auc = (rank_sum_pos - n_pos * (n_pos + 1) / 2.0) / (n_pos * n_neg) return float(max(auc, 1.0 - auc)) def standardize(x_train: Array, x_test: Array) -> Tuple[Array, Array]: mu = x_train.mean(axis=0, keepdims=True) sd = x_train.std(axis=0, keepdims=True) sd = np.where(sd < 1e-8, 1.0, sd) return (x_train - mu) / sd, (x_test - mu) / sd def sigmoid(z: Array) -> Array: z = np.clip(z, -40.0, 40.0) return 1.0 / (1.0 + np.exp(-z)) def fit_logistic_auc( x_train: Array, y_train: Array, x_test: Array, y_test: Array, rng: np.random.Generator, steps: int, lr: float, l2: float = 1e-3, ) -> float: x_train, x_test = standardize(x_train, x_test) x_train = np.c_[x_train, np.ones(len(x_train))] x_test = np.c_[x_test, np.ones(len(x_test))] w = rng.normal(scale=0.02, size=x_train.shape[1]) y = y_train.astype(float) for _ in range(steps): p = sigmoid(x_train @ w) grad = x_train.T @ (p - y) / len(y) + l2 * w grad[-1] -= l2 * w[-1] w -= lr * grad return auc_from_scores(x_test @ w, y_test) def rff_features(x_train: Array, x_test: Array, rng: np.random.Generator, dim: int) -> Tuple[Array, Array]: x_train_s, x_test_s = standardize(x_train, x_test) sample = x_train_s[rng.choice(len(x_train_s), size=min(200, len(x_train_s)), replace=False)] dists = np.sum((sample[:, None, :] - sample[None, :, :]) ** 2, axis=-1) med = np.median(dists[dists > 1e-9]) if np.any(dists > 1e-9) else 1.0 gamma = 1.0 / max(med, 1e-6) w = rng.normal(scale=math.sqrt(2.0 * gamma), size=(x_train_s.shape[1], dim)) b = rng.uniform(0.0, 2.0 * math.pi, size=dim) scale = math.sqrt(2.0 / dim) return scale * np.cos(x_train_s @ w + b), scale * np.cos(x_test_s @ w + b) def fit_rff_auc( x_train: Array, y_train: Array, x_test: Array, y_test: Array, rng: np.random.Generator, dim: int, steps: int, lr: float, ) -> float: z_train, z_test = rff_features(x_train, x_test, rng, dim) return fit_logistic_auc(z_train, y_train, z_test, y_test, rng, steps, lr, l2=1e-3) def fit_mlp_auc( x_train: Array, y_train: Array, x_test: Array, y_test: Array, rng: np.random.Generator, hidden: int, steps: int, lr: float, ) -> float: x_train, x_test = standardize(x_train, x_test) n, d = x_train.shape w1 = rng.normal(scale=0.12 / math.sqrt(max(1, d)), size=(d, hidden)) b1 = np.zeros(hidden) w2 = rng.normal(scale=0.12 / math.sqrt(hidden), size=hidden) b2 = 0.0 y = y_train.astype(float) for _ in range(steps): h = np.tanh(x_train @ w1 + b1) logits = h @ w2 + b2 p = sigmoid(logits) dz = (p - y) / n gw2 = h.T @ dz + 1e-4 * w2 gb2 = float(dz.sum()) dh = dz[:, None] * w2[None, :] * (1.0 - h * h) gw1 = x_train.T @ dh + 1e-4 * w1 gb1 = dh.sum(axis=0) w2 -= lr * gw2 b2 -= lr * gb2 w1 -= lr * gw1 b1 -= lr * gb1 scores = np.tanh(x_test @ w1 + b1) @ w2 + b2 return auc_from_scores(scores, y_test) def moment_sequence_features(seq: Array) -> Array: # Leakage monitor substitute for a shallow sequence model when only NumPy is # available: raw flattened sequence plus low/high-order temporal moments. flat = seq.reshape(seq.shape[0], -1) mean = seq.mean(axis=1) var = seq.var(axis=1) third = ((seq - mean[:, None, :]) ** 3).mean(axis=1) diffs = np.diff(seq, axis=1) diff_stats = np.c_[diffs.mean(axis=1), diffs.var(axis=1)] return np.c_[flat, mean, var, third, diff_stats] def policy_directional_energy(strength: float, cfg: Config, policy: str) -> float: if policy in {"probe", "oracle"}: return cfg.max_directional_energy if policy != "wrong": raise ValueError(policy) return cfg.max_directional_energy - strength * (cfg.max_directional_energy - cfg.min_directional_energy) def sample_rollout( rng: np.random.Generator, cfg: Config, horizon: int, hypothesis: str, policy: str, wrong_strength: float, ) -> Dict[str, Array]: theta_b = rng.uniform(0.0, 2.0 * math.pi) v_b = unit_from_angle(theta_b) theta_q = math.radians(rng.uniform(cfg.theta_min_deg, cfg.theta_max_deg)) if rng.random() < 0.5: theta_q = -theta_q v_q = rotate(v_b, theta_q) v_p = perp(v_b) de_b = policy_directional_energy(wrong_strength, cfg, policy) var_b = cfg.input_energy * de_b var_p = cfg.input_energy * (1.0 - de_b) cov_u = var_b * np.outer(v_b, v_b) + var_p * np.outer(v_p, v_p) total_len = horizon + cfg.delay u = rng.multivariate_normal(np.zeros(2), cov_u, size=total_len) # Match Q-burst strength to the policy's energy along the B direction. # This is the operational version of the one-step indistinguishability # constraint delta_B^2 E[||u_B||^2] ~= delta_Q. delta_q = cfg.delta_b**2 * var_b e = np.empty_like(u) for i in range(total_len): if hypothesis == "B": mean = cfg.delta_b * v_b * float(v_b @ u[i]) cov = (cfg.sigma_w**2) * np.eye(2) elif hypothesis == "Q": mean = np.zeros(2) cov = (cfg.sigma_w**2) * np.eye(2) + delta_q * np.outer(v_q, v_q) else: raise ValueError(hypothesis) e[i] = rng.multivariate_normal(mean, cov) return {"u": u, "e": e, "v_b": v_b, "v_q": v_q, "cov_u": cov_u} def build_dataset( rng: np.random.Generator, cfg: Config, n_per_hypothesis: int, horizon: int, policy: str, wrong_strength: float, ) -> Dict[str, Array]: rows_e: List[Array] = [] rows_u: List[Array] = [] rows_joint: List[Array] = [] labels: List[int] = [] diag_scores: List[float] = [] dir_energy: List[float] = [] pe_vals: List[float] = [] energy_vals: List[float] = [] for label, hyp in [(1, "B"), (0, "Q")]: for _ in range(n_per_hypothesis): r = sample_rollout(rng, cfg, horizon, hyp, policy, wrong_strength) u_full, e_full, v_b, cov_u = r["u"], r["e"], r["v_b"], r["cov_u"] e_eval = e_full[cfg.delay : cfg.delay + horizon] u_eval = u_full[:horizon] u_aligned = u_full[cfg.delay : cfg.delay + horizon] rows_e.append(e_eval.reshape(-1)) rows_u.append(u_eval.reshape(-1)) rows_joint.append(np.c_[u_aligned, e_eval].reshape(-1)) labels.append(label) # Evaluator-only diagnostic: intervention response along true v_B. diag_scores.append(float(np.mean((e_eval @ v_b) * (u_aligned @ v_b)))) dir_energy.append(float(v_b @ cov_u @ v_b / np.trace(cov_u))) pe_vals.append(float(np.linalg.eigvalsh(cov_u).min())) energy_vals.append(float(np.trace(cov_u))) return { "x_e": np.asarray(rows_e), "x_u": np.asarray(rows_u), "x_u_moments": moment_sequence_features(np.asarray(rows_u).reshape(-1, horizon, 2)), "x_joint": np.asarray(rows_joint), "y": np.asarray(labels, dtype=int), "diag_score": np.asarray(diag_scores), "directional_energy_b": np.asarray(dir_energy), "pe_policy": np.asarray(pe_vals), "input_energy": np.asarray(energy_vals), } def classifier_suite_auc( x_train: Array, y_train: Array, x_test: Array, y_test: Array, rng: np.random.Generator, cfg: Config, ) -> Dict[str, float]: return { "linear": fit_logistic_auc( x_train, y_train, x_test, y_test, rng, cfg.train_steps, cfg.learning_rate ), "rff_rbf": fit_rff_auc( x_train, y_train, x_test, y_test, rng, cfg.rff_dim, cfg.train_steps, cfg.learning_rate ), "mlp": fit_mlp_auc( x_train, y_train, x_test, y_test, rng, cfg.mlp_hidden, cfg.train_steps, cfg.learning_rate * 0.6 ), } def evaluate_condition( rng: np.random.Generator, cfg: Config, horizon: int, wrong_strength: float, ) -> Dict[str, object]: train_wrong = build_dataset(rng, cfg, cfg.n_train, horizon, "wrong", wrong_strength) test_wrong = build_dataset(rng, cfg, cfg.n_test, horizon, "wrong", wrong_strength) train_probe = build_dataset(rng, cfg, cfg.n_train // 2, horizon, "probe", 0.0) test_probe = build_dataset(rng, cfg, cfg.n_test // 2, horizon, "probe", 0.0) train_oracle = build_dataset(rng, cfg, cfg.n_train // 2, horizon, "oracle", 0.0) test_oracle = build_dataset(rng, cfg, cfg.n_test // 2, horizon, "oracle", 0.0) residual_aucs = classifier_suite_auc( train_wrong["x_e"], train_wrong["y"], test_wrong["x_e"], test_wrong["y"], rng, cfg ) action_aucs = classifier_suite_auc( train_wrong["x_u"], train_wrong["y"], test_wrong["x_u"], test_wrong["y"], rng, cfg ) action_moment_auc = fit_rff_auc( train_wrong["x_u_moments"], train_wrong["y"], test_wrong["x_u_moments"], test_wrong["y"], rng, cfg.rff_dim, cfg.train_steps, cfg.learning_rate, ) d_probe_auc = auc_from_scores(test_probe["diag_score"], test_probe["y"]) d_oracle_auc = auc_from_scores(test_oracle["diag_score"], test_oracle["y"]) residual_auc_mean = float(np.mean(list(residual_aucs.values()))) action_auc_max = float(max(max(action_aucs.values()), action_moment_auc)) energy = float(test_wrong["input_energy"].mean()) pe = float(test_wrong["pe_policy"].mean()) directional_energy = float(test_wrong["directional_energy_b"].mean()) d_norm = residual_auc_mean / max(energy, 1e-9) criteria = { "probe_high": d_probe_auc > cfg.auc_high, "oracle_high": d_oracle_auc > cfg.auc_high, "pe_high": pe >= cfg.pe_threshold, "energy_high": energy >= cfg.energy_threshold, "directional_energy_low": directional_energy <= cfg.directional_energy_low, "residual_low": residual_auc_mean < cfg.auc_low, "action_leakage_pass": action_auc_max < cfg.auc_action_pass, } return { "horizon": horizon, "wrong_strength": wrong_strength, "D_probe_auc": d_probe_auc, "D_oracle_auc": d_oracle_auc, "AUC_residual": residual_aucs, "AUC_residual_mean": residual_auc_mean, "AUC_action": action_aucs, "AUC_action_moment": action_moment_auc, "AUC_action_max": action_auc_max, "PE_policy": pe, "InputEnergy": energy, "DirectionalEnergy_B": directional_energy, "D_norm": d_norm, "criteria": criteria, "sra_like_pass": all(criteria.values()), } def monotonic_nonincreasing(values: Iterable[float], tol: float = 0.01) -> bool: vals = list(values) return all(vals[i + 1] <= vals[i] + tol for i in range(len(vals) - 1)) def run(cfg: Config) -> Dict[str, object]: rng = np.random.default_rng(cfg.seed) results: List[Dict[str, object]] = [] for horizon in cfg.horizons: for strength in cfg.wrong_strengths: results.append(evaluate_condition(rng, cfg, horizon, strength)) by_horizon: Dict[str, Dict[str, object]] = {} for horizon in cfg.horizons: rows = [r for r in results if r["horizon"] == horizon] by_horizon[str(horizon)] = { "D_norm_nonincreasing": monotonic_nonincreasing([float(r["D_norm"]) for r in rows]), "DirectionalEnergy_B_nonincreasing": monotonic_nonincreasing( [float(r["DirectionalEnergy_B"]) for r in rows] ), "any_sra_like_pass": any(bool(r["sra_like_pass"]) for r in rows), } return {"config": asdict(cfg), "results": results, "by_horizon": by_horizon} def print_summary(report: Dict[str, object]) -> None: print("MOAT v5g minimal experiment") print("=" * 80) print( "horizon strength D_probe D_oracle PE Energy DirE_B " "AUC_resid AUC_action D_norm PASS" ) for r in report["results"]: print( f"{int(r['horizon']):>7} " f"{float(r['wrong_strength']):>8.2f} " f"{float(r['D_probe_auc']):>7.3f} " f"{float(r['D_oracle_auc']):>8.3f} " f"{float(r['PE_policy']):>5.3f} " f"{float(r['InputEnergy']):>6.3f} " f"{float(r['DirectionalEnergy_B']):>6.3f} " f"{float(r['AUC_residual_mean']):>9.3f} " f"{float(r['AUC_action_max']):>10.3f} " f"{float(r['D_norm']):>6.3f} " f"{'YES' if r['sra_like_pass'] else 'no'}" ) print("=" * 80) print("Per-horizon trend checks:") for h, row in report["by_horizon"].items(): print(f" k={h}: {row}") def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--seed", type=int, default=Config.seed) parser.add_argument("--n-train", type=int, default=Config.n_train) parser.add_argument("--n-test", type=int, default=Config.n_test) parser.add_argument("--out", type=Path, default=Path("moat_v5g_results.json")) parser.add_argument("--quick", action="store_true", help="Use a smaller run for smoke tests.") args = parser.parse_args() cfg = Config(seed=args.seed, n_train=args.n_train, n_test=args.n_test) if args.quick: cfg = Config(seed=args.seed, n_train=180, n_test=90, train_steps=80, rff_dim=80) report = run(cfg) print_summary(report) args.out.write_text(json.dumps(report, indent=2), encoding="utf-8") print(f"\nWrote {args.out}") if __name__ == "__main__": main()