#!/usr/bin/env python3 """Supervised kernelized chamber learning + dense-lattice implementation. This variant is tuned for the paper's supervised pipeline and for stable practical use. It supports: * demo-gcd on objects {±1,...,±100}, * a built-in conceptual-spaces drinks demo, * root-lattice targets A2,...,A8 and E8, plus the rooted 24D Niemeier choice D24PLUS, * an experimental Conway/Lorentz 24D option that is OFF by default in practice. Important design choice: The standard recommended lattices are A2..A8 and E8. The optional CONWAY_L24_EXPERIMENTAL basis is only a numerical Lorentz-quotient candidate built from the isotropic vector (0,1,2,...,24;70). It is not guaranteed to realize the actual Leech lattice chamber construction faithfully. """ import argparse import json import math import os from dataclasses import dataclass from typing import Dict, List, Optional, Sequence, Tuple import matplotlib matplotlib.use('Agg') import matplotlib.pyplot as plt import numpy as np import pandas as pd from scipy.optimize import minimize from scipy.special import logsumexp def ensure_dir(path: str) -> None: os.makedirs(path, exist_ok=True) def sigmoid(x: np.ndarray) -> np.ndarray: x = np.clip(x, -60.0, 60.0) return 1.0 / (1.0 + np.exp(-x)) def softplus(x: np.ndarray) -> np.ndarray: x = np.clip(x, -60.0, 60.0) return np.where(x > 20.0, x, np.log1p(np.exp(x))) def symmetrize_psd(K: np.ndarray, tol: float = 1e-10) -> np.ndarray: K = 0.5 * (K + K.T) evals, evecs = np.linalg.eigh(K) evals = np.where(evals > tol, evals, 0.0) return (evecs * evals) @ evecs.T def center_gram(K: np.ndarray) -> np.ndarray: n = K.shape[0] H = np.eye(n) - np.ones((n, n), dtype=float) / n return H @ K @ H def smooth_min(X: np.ndarray, tau: float) -> Tuple[np.ndarray, np.ndarray]: Y = -X / tau lse = logsumexp(Y, axis=1, keepdims=True) W = np.exp(Y - lse) vals = (-tau * lse[:, 0]) return vals, W # ---------------- Demo kernel / axioms ---------------- def demo_objects_pm1_pm100() -> List[int]: return [i for i in range(-100, 0)] + [i for i in range(1, 101)] def gcd_kernel(a: int, b: int) -> float: g = math.gcd(abs(a), abs(b)) return (g * g) / (a * b) def build_demo_gram(objects: Sequence[int]) -> np.ndarray: n = len(objects) K = np.empty((n, n), dtype=float) for i, a in enumerate(objects): for j, b in enumerate(objects): K[i, j] = gcd_kernel(a, b) return K def normalized_perspective_truth(values: Sequence[int], w: int = 3) -> np.ndarray: return np.array([(1.0 + gcd_kernel(w, a)) / 2.0 for a in values], dtype=float) def build_demo_axiom_edges(objects: Sequence[int], axiom_abs_max: int = 10, w: int = 3): axioms = [a for a in objects if 1 <= abs(a) <= axiom_abs_max] tau = normalized_perspective_truth(axioms, w=w) imp = np.minimum(1.0, 1.0 - tau[:, None] + tau[None, :]) label_to_idx = {str(x): i for i, x in enumerate(objects)} pos_edges = [] for i, a in enumerate(axioms): for j, b in enumerate(axioms): if i != j and imp[i, j] >= 1.0 - 1e-12: pos_edges.append((label_to_idx[str(a)], label_to_idx[str(b)])) truth_df = pd.DataFrame({'object': axioms, f'tau_w{w}': tau}) imp_df = pd.DataFrame(imp, index=[str(a) for a in axioms], columns=[str(a) for a in axioms]) return pos_edges, truth_df, imp_df # ---------------- Conceptual-spaces drinks demo ---------------- def build_concept_drinks_demo() -> Tuple[List[str], np.ndarray, List[Tuple[int, int]], pd.DataFrame]: data = [ ('water', 0.00, 0.00, 0.00), ('sparkling_water', 0.00, 0.00, 0.00), ('black_coffee', 0.05, 0.00, 0.00), ('espresso', 0.02, 0.00, 0.00), ('iced_tea', 0.45, 0.00, 0.20), ('cola', 0.85, 0.00, 0.10), ('lemonade', 0.82, 0.00, 0.55), ('orange_juice', 0.70, 0.00, 0.95), ('milk', 0.10, 0.55, 0.00), ('latte', 0.30, 0.45, 0.00), ('hot_chocolate', 0.80, 0.70, 0.10), ('milkshake', 0.92, 0.95, 0.60), ] df = pd.DataFrame(data, columns=['object', 'sweetness', 'creaminess', 'fruitiness']) labels = df['object'].tolist() X = df[['sweetness', 'creaminess', 'fruitiness']].values.astype(float) # isotropic Gaussian / RBF conceptual similarity pair_sq = np.sum((X[:, None, :] - X[None, :, :]) ** 2, axis=2) sigma2 = 0.18 K = np.exp(-pair_sq / (2.0 * sigma2)) idx = {lbl: i for i, lbl in enumerate(labels)} edges_named = [ ('milkshake', 'cola'), ('milkshake', 'water'), ('cola', 'water'), ('orange_juice', 'water'), ] pos_edges = [(idx[a], idx[b]) for a, b in edges_named] return labels, K, pos_edges, df # ---------------- Reading files ---------------- def read_gram_csv(path: str) -> Tuple[List[str], np.ndarray]: df = pd.read_csv(path, index_col=0) return list(map(str, df.index.tolist())), df.values.astype(float) def read_edges(path: str, labels: Sequence[str]) -> List[Tuple[int, int]]: label_to_idx = {str(x): i for i, x in enumerate(labels)} if path.lower().endswith('.json'): with open(path, 'r', encoding='utf-8') as f: obj = json.load(f) if 'edges' not in obj: raise ValueError('JSON must contain key "edges".') return [ (label_to_idx[str(s)], label_to_idx[str(t)]) for s, t in obj['edges'] if str(s) in label_to_idx and str(t) in label_to_idx and str(s) != str(t) ] df0 = pd.read_csv(path) cols = [c.lower() for c in df0.columns] if 'source' in cols and 'target' in cols: s_col = df0.columns[cols.index('source')] t_col = df0.columns[cols.index('target')] out = [] for s, t in zip(df0[s_col], df0[t_col]): s, t = str(s), str(t) if s in label_to_idx and t in label_to_idx and s != t: out.append((label_to_idx[s], label_to_idx[t])) return out df = pd.read_csv(path, index_col=0) row_labels = list(map(str, df.index.tolist())) col_labels = list(map(str, df.columns.tolist())) out = [] for i, rl in enumerate(row_labels): for j, cl in enumerate(col_labels): if float(df.iloc[i, j]) > 0.5 and rl in label_to_idx and cl in label_to_idx and rl != cl: out.append((label_to_idx[rl], label_to_idx[cl])) return out # ---------------- Lattice families ---------------- def a_n_cartan(n: int) -> np.ndarray: G = 2.0 * np.eye(n) for i in range(n - 1): G[i, i + 1] = -1.0 G[i + 1, i] = -1.0 return G def d_n_cartan(n: int) -> np.ndarray: if n < 4: raise ValueError('D_n requires n >= 4.') G = 2.0 * np.eye(n) for i in range(n - 1): G[i, i + 1] = -1.0 G[i + 1, i] = -1.0 G[n - 3, n - 1] = -1.0 G[n - 1, n - 3] = -1.0 G[n - 2, n - 1] = 0.0 G[n - 1, n - 2] = 0.0 return G def d24plus_quant_basis() -> Tuple[np.ndarray, np.ndarray, float, Dict[str, float]]: # Quantization basis for the Niemeier lattice with root system D24: # D24^+ = D24 union (D24 + (1/2,...,1/2)). # Chamber basis remains the standard D24 simple-root basis. n = 24 E = np.eye(n) roots = [] for i in range(n - 2): roots.append(E[i] - E[i + 1]) roots.append(E[n - 2] - E[n - 1]) roots.append(E[n - 2] + E[n - 1]) chamber = np.column_stack(roots) quant = [] for i in range(n - 1): quant.append(E[i] - E[i + 1]) quant.append(0.5 * np.ones(n)) quant = np.column_stack(quant) info = { 'family': 'niemeier_D24_plus', 'root_system': 'D24', 'quant_basis_determinant_abs': float(abs(np.linalg.det(quant))), 'chamber_basis_determinant_abs': float(abs(np.linalg.det(chamber))), } return quant, chamber, 2.0, info def conway_lorentz_candidate_basis() -> Tuple[np.ndarray, float, Dict[str, float]]: """Experimental 24D basis from the isotropic vector (0,1,...,24;70). Ambient form: diag(1,...,1,-1) on R^26. We build w^perp numerically, then quotient out the null line by dropping the null eigen-direction of the induced Gram form. This yields a positive-definite 24D Euclidean basis candidate, but it is *not* guaranteed to be the true Leech lattice basis. """ w = np.array(list(range(25)) + [70], dtype=float) # One linear equation sum_{i=0}^{24} i x_i - 70 x_25 = 0. # A convenient spanning set for w^perp consists of standard nullspace basis vectors: N = [] for j in range(25): v = np.zeros(26, dtype=float) v[j] = 70.0 v[25] = float(j) N.append(v) N = np.stack(N, axis=1) # 26 x 25 G_lor = np.diag([1.0] * 25 + [-1.0]) Gram25 = N.T @ G_lor @ N vals, U = np.linalg.eigh(0.5 * (Gram25 + Gram25.T)) order = np.argsort(vals) vals = vals[order] U = U[:, order] # Drop the numerically null direction corresponding to the image of w. pos_vals = vals[1:] U24 = U[:, 1:] if np.min(pos_vals) <= 1e-10: raise ValueError('Experimental Conway quotient basis is numerically singular.') Gram24 = U24.T @ Gram25 @ U24 B = np.linalg.cholesky(0.5 * (Gram24 + Gram24.T)).T diag = np.diag(Gram24) info = { 'experimental_conway_min_eig': float(np.min(np.linalg.eigvalsh(Gram24))), 'experimental_conway_max_eig': float(np.max(np.linalg.eigvalsh(Gram24))), 'experimental_conway_min_diag': float(np.min(diag)), 'experimental_conway_max_diag': float(np.max(diag)), } # This candidate does not certify the Leech minimal norm 4, so keep nn target at 1. return B, 1.0, info def lattice_basis(name: str) -> Tuple[str, np.ndarray, np.ndarray, float, Dict[str, float]]: name_u = name.upper() if name_u.startswith('A') and name_u[1:].isdigit(): n = int(name_u[1:]) if not (2 <= n <= 8): raise ValueError('Supported A_n targets are A2..A8.') G = a_n_cartan(n) B = np.linalg.cholesky(G).T return f'A{n}', B, B, 2.0, {'family': 'A_n_root_lattice'} if name_u in ('A3_FCC', 'FCC'): G = a_n_cartan(3) B = np.linalg.cholesky(G).T return 'A3_FCC', B, B, 2.0, {'family': 'A_n_root_lattice'} if name_u == 'E8': G = np.array([ [2, -1, 0, 0, 0, 0, 0, 0], [-1, 2, -1, 0, 0, 0, 0, 0], [0, -1, 2, -1, 0, 0, 0, 0], [0, 0, -1, 2, -1, 0, 0, 0], [0, 0, 0, -1, 2, -1, 0, -1], [0, 0, 0, 0, -1, 2, -1, 0], [0, 0, 0, 0, 0, -1, 2, 0], [0, 0, 0, 0, -1, 0, 0, 2], ], dtype=float) B = np.linalg.cholesky(G).T return 'E8', B, B, 2.0, {'family': 'exceptional_root_lattice'} if name_u in ('D24PLUS', 'NIEMEIER_D24', 'D24+', 'NIEMEIER24'): Bq, S, min_norm_sq, info = d24plus_quant_basis() return 'D24PLUS', Bq, S, min_norm_sq, info if name_u in ('CONWAY_L24_EXPERIMENTAL', 'L24X'): B, min_norm_sq, info = conway_lorentz_candidate_basis() info = dict(info) info['family'] = 'experimental_lorentz_quotient' info['warning'] = 'This is an experimental Conway/Lorentz quotient candidate, not a certified Leech basis.' return 'CONWAY_L24_EXPERIMENTAL', B, B, min_norm_sq, info raise ValueError(f'Unknown lattice {name}. Use A2..A8, E8, D24PLUS, or CONWAY_L24_EXPERIMENTAL.') # ---------------- KPCA + supervised chamber learning ---------------- @dataclass class LearningResult: F: np.ndarray Kc: np.ndarray kpca_init: np.ndarray report: Dict[str, float] def kpca_coords(Kc: np.ndarray, d: int) -> np.ndarray: evals, evecs = np.linalg.eigh(Kc) order = np.argsort(evals)[::-1] evals = evals[order] evecs = evecs[:, order] take = min(d, int(np.sum(evals > 1e-10))) Z = np.zeros((Kc.shape[0], d), dtype=float) if take > 0: Z[:, :take] = evecs[:, :take] * np.sqrt(np.maximum(evals[:take], 0.0)) return Z def sample_negative_edges(n: int, pos_edges: Sequence[Tuple[int, int]], max_negs: int = 4000) -> List[Tuple[int, int]]: pos_set = set(pos_edges) negs: List[Tuple[int, int]] = [] for i, j in pos_edges: if i != j and (j, i) not in pos_set: negs.append((j, i)) if len(negs) < max_negs: for i in range(n): for j in range(n): if i == j or (i, j) in pos_set: continue negs.append((i, j)) if len(negs) >= max_negs: break if len(negs) >= max_negs: break seen = set() uniq = [] for e in negs: if e not in seen: uniq.append(e) seen.add(e) return uniq[:max_negs] def edge_satisfaction(F: np.ndarray, edges: Sequence[Tuple[int, int]]) -> float: if not edges: return float('nan') i = np.array([a for a, _ in edges], dtype=int) j = np.array([b for _, b in edges], dtype=int) D = F[j] - F[i] return float(np.mean(np.min(D, axis=1) >= -1e-10)) def learn_supervised_chamber_coordinates( K: np.ndarray, pos_edges: Sequence[Tuple[int, int]], d: int, margin: float = 0.5, c_pos: float = 1.0, c_neg: float = 0.2, reg: float = 1e-2, neg_tau: float = 0.25, maxiter: int = 200, ) -> LearningResult: n = K.shape[0] Kc = symmetrize_psd(center_gram(K)) Z0 = kpca_coords(Kc, d) ridge_eye = 1e-6 * np.eye(n) A0 = np.linalg.pinv(Kc + ridge_eye) @ Z0 neg_edges = sample_negative_edges(n, pos_edges) pos_i = np.array([i for i, _ in pos_edges], dtype=int) pos_j = np.array([j for _, j in pos_edges], dtype=int) neg_i = np.array([i for i, _ in neg_edges], dtype=int) neg_j = np.array([j for _, j in neg_edges], dtype=int) def unpack(v: np.ndarray) -> np.ndarray: return v.reshape(n, d) def objective(v: np.ndarray) -> Tuple[float, np.ndarray]: A = unpack(v) F = Kc @ A KA = Kc @ A loss = 0.0 gradF = np.zeros_like(F) if len(pos_i) > 0: Dp = F[pos_j] - F[pos_i] Zp = margin - Dp Sp = sigmoid(Zp) loss += c_pos * float(np.sum(softplus(Zp))) Gp = c_pos * Sp for r in range(d): np.add.at(gradF[:, r], pos_i, Gp[:, r]) np.add.at(gradF[:, r], pos_j, -Gp[:, r]) if len(neg_i) > 0 and c_neg > 0: Dn = F[neg_j] - F[neg_i] mvals, weights = smooth_min(Dn, tau=neg_tau) Zn = margin + mvals Sn = c_neg * sigmoid(Zn) loss += float(np.sum(c_neg * softplus(Zn))) gradDn = Sn[:, None] * weights for r in range(d): np.add.at(gradF[:, r], neg_i, -gradDn[:, r]) np.add.at(gradF[:, r], neg_j, gradDn[:, r]) reg_term = 0.5 * reg * float(np.sum(A * KA)) loss += reg_term gradA = Kc @ gradF + reg * (Kc @ A) return float(loss), gradA.ravel() res = minimize( lambda x: objective(x)[0], A0.ravel(), jac=lambda x: objective(x)[1], method='L-BFGS-B', options={'maxiter': maxiter}, ) A = unpack(res.x) F = Kc @ A report = { 'success': bool(res.success), 'status': int(res.status), 'message': str(res.message), 'objective': float(res.fun), 'n_iter': int(getattr(res, 'nit', -1)), 'dimension': int(d), 'n_pos_edges': int(len(pos_edges)), 'n_neg_edges': int(len(neg_edges)), 'train_edge_satisfaction_learned': edge_satisfaction(F, pos_edges), 'train_edge_satisfaction_kpca_init': edge_satisfaction(Z0, pos_edges), } return LearningResult(F=F, Kc=Kc, kpca_init=Z0, report=report) # ---------------- Quantization ---------------- def babai_local(B: np.ndarray, z: np.ndarray, max_rounds: int = 100) -> np.ndarray: c = np.linalg.lstsq(B, z, rcond=None)[0] m = np.rint(c).astype(int) best = float(np.linalg.norm(z - B @ m) ** 2) d = len(m) improved = True rounds = 0 while improved and rounds < max_rounds: improved = False rounds += 1 for i in range(d): for sgn in (-1, 1): cand = m.copy() cand[i] += sgn val = float(np.linalg.norm(z - B @ cand) ** 2) if val + 1e-12 < best: m, best = cand, val improved = True return m def fixed_nn_scale(min_norm_sq: float, target_nn_distance: float = 1.0) -> float: return target_nn_distance / math.sqrt(min_norm_sq) def quantize_all( F: np.ndarray, B: np.ndarray, min_norm_sq: float, scale_mode: str = 'fixed_nn', scale_value: Optional[float] = None, target_nn_distance: float = 1.0, ) -> Tuple[np.ndarray, np.ndarray, float]: def quantize_with_scale(s: float): Bs = s * B coeffs, quantized, errs = [], [], [] for z in F: m = babai_local(Bs, z) q = Bs @ m coeffs.append(m) quantized.append(q) errs.append(float(np.sum((z - q) ** 2))) return np.asarray(coeffs, dtype=int), np.asarray(quantized, dtype=float), float(np.mean(errs)) if scale_value is not None: coeffs, Q, _ = quantize_with_scale(float(scale_value)) return coeffs, Q, float(scale_value) if scale_mode == 'fixed_nn': s = fixed_nn_scale(min_norm_sq, target_nn_distance=target_nn_distance) coeffs, Q, _ = quantize_with_scale(s) return coeffs, Q, float(s) if scale_mode != 'auto': raise ValueError("scale_mode must be 'fixed_nn' or 'auto'.") row_norm = np.median(np.linalg.norm(F, axis=1)) + 1e-9 col_norm = np.median(np.linalg.norm(B, axis=0)) + 1e-9 base = row_norm / col_norm candidates = base * (2.0 ** np.linspace(-4, 4, 17)) best = None for s in candidates: coeffs, quantized, err = quantize_with_scale(float(s)) if best is None or err < best[2]: best = (coeffs, quantized, err, float(s)) assert best is not None return best[0], best[1], best[3] # ---------------- Logic ---------------- def chamber_rescale(coords: np.ndarray) -> np.ndarray: lo = coords.min(axis=0) hi = coords.max(axis=0) den = np.where(hi > lo, hi - lo, 1.0) return (coords - lo) / den def hard_truth(chamber_coords: np.ndarray, tol: float = 1e-9) -> np.ndarray: diff = chamber_coords[None, :, :] - chamber_coords[:, None, :] return (diff.min(axis=2) >= -tol).astype(int) def fuzzy_truth(chamber_coords: np.ndarray) -> np.ndarray: diff = chamber_coords[None, :, :] - chamber_coords[:, None, :] margin = diff.min(axis=2) return sigmoid(margin) def hard_entailment_from_truth(T: np.ndarray) -> np.ndarray: n = T.shape[0] out = np.zeros((n, n), dtype=int) for a in range(n): for b in range(n): out[a, b] = int(np.all(T[a, :] <= T[b, :])) return out def godel_imp(u: np.ndarray, v: np.ndarray) -> np.ndarray: return np.where(u <= v, 1.0, v) def luka_imp(u: np.ndarray, v: np.ndarray) -> np.ndarray: return np.minimum(1.0, 1.0 - u + v) def global_entailment(MU: np.ndarray, mode: str) -> np.ndarray: n = MU.shape[0] out = np.zeros((n, n), dtype=float) imp = godel_imp if mode == 'godel' else luka_imp for a in range(n): ua = MU[a] for b in range(n): out[a, b] = float(np.min(imp(ua, MU[b]))) return out # ---------------- Save / plots ---------------- def save_matrix_csv(path: str, labels: Sequence[str], M: np.ndarray) -> None: pd.DataFrame(M, index=labels, columns=labels).to_csv(path) def save_coords_csv(path: str, labels: Sequence[str], X: np.ndarray, prefix: str) -> None: cols = [f'{prefix}{i+1}' for i in range(X.shape[1])] df = pd.DataFrame(X, columns=cols) df.insert(0, 'object', labels) df.to_csv(path, index=False) def save_truth_csv(path: str, labels: Sequence[str], M: np.ndarray, prefix: str) -> None: df = pd.DataFrame(M.T, columns=[f'{prefix}_{lbl}' for lbl in labels]) df.insert(0, 'world', labels) df.to_csv(path, index=False) def heatmap(path: str, M: np.ndarray, title: str) -> None: fig, ax = plt.subplots(figsize=(8, 7)) im = ax.imshow(M, aspect='auto') ax.set_title(title) ax.set_xlabel('consequent') ax.set_ylabel('antecedent') fig.colorbar(im, ax=ax) fig.savefig(path, dpi=180, bbox_inches='tight') plt.close(fig) def plot_2d(path: str, labels: Sequence[str], F: np.ndarray, Q: np.ndarray, title: str) -> None: fig, ax = plt.subplots(figsize=(7, 7)) ax.scatter(F[:, 0], F[:, 1], s=10, label='learned') ax.scatter(Q[:, 0], Q[:, 1], s=10, marker='x', label='quantized') for i, lbl in enumerate(labels[:30]): ax.text(F[i, 0], F[i, 1], lbl, fontsize=6) ax.legend() ax.set_title(title) ax.set_aspect('equal', adjustable='box') fig.savefig(path, dpi=180, bbox_inches='tight') plt.close(fig) # ---------------- Main ---------------- def main() -> None: ap = argparse.ArgumentParser(description='Supervised kernelized chamber learning + lattice implementation layer') ap.add_argument('--demo-gcd', action='store_true', help='use demo objects ±1..±100 with gcd kernel and expert axioms ±1..±10 via perspective w=3') ap.add_argument('--demo-concept-drinks', action='store_true', help='use built-in conceptual spaces drinks demo with 12 objects and 4 axioms') ap.add_argument('--gram', help='Gram matrix CSV with labels in first column/header') ap.add_argument('--edges', help='expert entailment graph: edge list CSV/JSON or adjacency CSV') ap.add_argument('--perspective', type=int, default=3) ap.add_argument('--axiom-abs-max', type=int, default=10) ap.add_argument('--dim', '--dimension', dest='dimension', type=int, default=2) ap.add_argument('--lattice', default='A2', choices=['A2', 'A3', 'A4', 'A5', 'A6', 'A7', 'A8', 'E8', 'D24PLUS', 'CONWAY_L24_EXPERIMENTAL']) ap.add_argument('--logic', default='all', choices=['hard', 'godel', 'lukasiewicz', 'all']) ap.add_argument('--maxiter', type=int, default=200) ap.add_argument('--margin', type=float, default=0.5) ap.add_argument('--cpos', type=float, default=1.0) ap.add_argument('--cneg', type=float, default=0.2) ap.add_argument('--reg', type=float, default=1e-2) ap.add_argument('--neg-tau', type=float, default=0.25) ap.add_argument('--scale-mode', default='fixed_nn', choices=['fixed_nn', 'auto']) ap.add_argument('--scale', type=float, default=None, help='explicit quantization scale; overrides --scale-mode') ap.add_argument('--target-nn-distance', type=float, default=1.0) ap.add_argument('--outdir', default='kernel_chamber_out') args = ap.parse_args() ensure_dir(args.outdir) n_demos = int(args.demo_gcd) + int(args.demo_concept_drinks) if n_demos > 1: raise ValueError('Choose at most one built-in demo at a time.') dataset_info: Dict[str, object] = {} if args.demo_gcd: objects = demo_objects_pm1_pm100() labels = [str(x) for x in objects] K = build_demo_gram(objects) save_matrix_csv(os.path.join(args.outdir, 'demo_gram.csv'), labels, K) if args.edges: pos_edges = read_edges(args.edges, labels) else: pos_edges, truth_df, imp_df = build_demo_axiom_edges(objects, axiom_abs_max=args.axiom_abs_max, w=args.perspective) truth_df.to_csv(os.path.join(args.outdir, 'axiom_truth.csv'), index=False) imp_df.to_csv(os.path.join(args.outdir, 'axiom_lukasiewicz_implication.csv')) pd.DataFrame({'source': [labels[i] for i, j in pos_edges], 'target': [labels[j] for i, j in pos_edges]}).to_csv( os.path.join(args.outdir, 'axiom_edges.csv'), index=False ) dataset_info['dataset'] = 'demo_gcd_pm1_pm100' elif args.demo_concept_drinks: labels, K, pos_edges, feature_df = build_concept_drinks_demo() feature_df.to_csv(os.path.join(args.outdir, 'concept_drinks_features.csv'), index=False) save_matrix_csv(os.path.join(args.outdir, 'concept_drinks_gram.csv'), labels, K) pd.DataFrame({'source': [labels[i] for i, j in pos_edges], 'target': [labels[j] for i, j in pos_edges]}).to_csv( os.path.join(args.outdir, 'concept_drinks_axiom_edges.csv'), index=False ) dataset_info['dataset'] = 'conceptual_spaces_drinks' else: if not args.gram or not args.edges: raise ValueError('Either use a built-in demo or provide both --gram and --edges.') labels, K = read_gram_csv(args.gram) pos_edges = read_edges(args.edges, labels) dataset_info['dataset'] = 'custom_csv_input' lattice_name, B, chamber_basis, min_norm_sq, lattice_info = lattice_basis(args.lattice) if B.shape[1] != args.dimension: print(f'Info: requested dim {args.dimension} overridden by lattice dimension {B.shape[1]} for {lattice_name}.') d = B.shape[1] result = learn_supervised_chamber_coordinates( K=K, pos_edges=pos_edges, d=d, margin=args.margin, c_pos=args.cpos, c_neg=args.cneg, reg=args.reg, neg_tau=args.neg_tau, maxiter=args.maxiter, ) F = result.F coeffs, Q, chosen_scale = quantize_all( F, B, min_norm_sq=min_norm_sq, scale_mode=args.scale_mode, scale_value=args.scale, target_nn_distance=args.target_nn_distance, ) eta_learned = chamber_rescale(F) chamber_coords = np.linalg.solve(chamber_basis, Q.T).T eta_quantized = chamber_rescale(chamber_coords) save_coords_csv(os.path.join(args.outdir, 'kpca_init_coords.csv'), labels, result.kpca_init, 'z') save_coords_csv(os.path.join(args.outdir, 'learned_coords.csv'), labels, F, 'f') save_coords_csv(os.path.join(args.outdir, 'learned_coords_rescaled.csv'), labels, eta_learned, 'eta') save_coords_csv(os.path.join(args.outdir, 'lattice_coeffs.csv'), labels, coeffs, 'c') save_coords_csv(os.path.join(args.outdir, 'lattice_coords_rescaled.csv'), labels, eta_quantized, 'etaq') save_coords_csv(os.path.join(args.outdir, 'quantized_chamber_coords.csv'), labels, chamber_coords, 'qc') if d == 2: plot_2d(os.path.join(args.outdir, 'learned_vs_quantized.png'), labels, F, Q, f'{lattice_name}: learned vs quantized') H = hard_truth(chamber_coords) MU = fuzzy_truth(chamber_coords) save_truth_csv(os.path.join(args.outdir, 'hard_truth_table.csv'), labels, H, 'P') save_truth_csv(os.path.join(args.outdir, 'fuzzy_truth_table.csv'), labels, MU, 'mu') HE = hard_entailment_from_truth(H) save_matrix_csv(os.path.join(args.outdir, 'hard_entailment.csv'), labels, HE) heatmap(os.path.join(args.outdir, 'hard_entailment_heatmap.png'), HE, f'{lattice_name}: hard entailment') if args.logic in ('godel', 'all'): GE = global_entailment(MU, 'godel') save_matrix_csv(os.path.join(args.outdir, 'godel_entailment.csv'), labels, GE) heatmap(os.path.join(args.outdir, 'godel_entailment_heatmap.png'), GE, f'{lattice_name}: Gödel entailment') if args.logic in ('lukasiewicz', 'all'): LE = global_entailment(MU, 'lukasiewicz') save_matrix_csv(os.path.join(args.outdir, 'lukasiewicz_entailment.csv'), labels, LE) heatmap(os.path.join(args.outdir, 'lukasiewicz_entailment_heatmap.png'), LE, f'{lattice_name}: Łukasiewicz entailment') uniq, counts = np.unique(coeffs, axis=0, return_counts=True) report = dict(result.report) report.update(dataset_info) report.update(lattice_info) report.update({ 'pipeline': 'supervised_kernel_chamber_learning', 'used_centered_gram': True, 'n_objects': int(len(labels)), 'lattice': lattice_name, 'quantization_scale_mode': args.scale_mode if args.scale is None else 'explicit', 'quantization_scale': float(chosen_scale), 'target_nn_distance': float(args.target_nn_distance), 'mean_quantization_error': float(np.mean(np.sum((F - Q) ** 2, axis=1))), 'n_unique_lattice_points': int(len(uniq)), 'top_multiplicities': [int(x) for x in sorted(counts, reverse=True)[:10]], 'train_edge_satisfaction_quantized': edge_satisfaction(chamber_coords, pos_edges), }) with open(os.path.join(args.outdir, 'optimization_report.json'), 'w', encoding='utf-8') as f: json.dump(report, f, indent=2) print(json.dumps(report, indent=2)) if __name__ == '__main__': main()