// Package runlive ties the harness modules together into a single "run a
// prompt against an org" function. Used by both `cmd/harness` and the HTTP
// API (Phase M). Keeps the assembly logic in one place.
package runlive

import (
	"context"
	"fmt"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"time"

	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/envelope"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/evaluator"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/event"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/orchestrator"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/org"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/prompt"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/runtime"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/runtime/cli"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/runtime/scripted"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/runtime/tmux"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/runtime/tmux/claudecode"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/runtime/tmux/codex"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/runtime/tmux/gemini"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/runtime/tmux/sidecarchan"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/store"
	"github.com/flothus/tmux-xterm-research/server-go/internal/configsvc"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/deterministic"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/transport"
)

// Config is the input to Run. Only OrgPath is mandatory; Run substitutes
// defaults for zero-valued MaxTurns, MaxWall, IdleTimeout, StallTimeout and
// RunID before the run row is created.
type Config struct {
	OrgPath        string        // path to org yaml (required: Run errors when empty)
	Prompt         string        // user prompt
	RunID          string        // optional; Run generates "run_<unix-nanos>" if empty
	RuntimeMode    string        // "claude-code", "scripted", or "auto"
	MaxTurns       int           // ceiling on handled Agent.HandleOne calls; 0 selects 200
	MaxWall        time.Duration // ceiling on wall-clock per run; 0 selects 5 minutes
	IdleTimeout    time.Duration // poll-loop sleep when no agent has work; 0 selects 50ms
	TmuxLogsDir    string        // where to write per-agent pane logs (claude-code mode)
	// StallTimeout is how long the watchdog will wait without any event for
	// this run before declaring it stalled and killing it. A zero value does
	// NOT disable the watchdog: Run replaces 0 with the 90s default. Useful
	// for catching deadlock-class bugs without leaving runs in 'running'
	// forever.
	StallTimeout time.Duration
	// OC4: optional org/role config façade. When non-nil, in-run agents
	// can call org.* / role.* tools to build/validate orgs. Wired by the
	// server when /api/config is mounted.
	ConfigSvc *configsvc.Service
	// ProjectRoot is the absolute path the harness treats as `.` for agent
	// file writes. When empty, Run infers it from OrgPath by walking up
	// from the org yaml's directory to the nearest ancestor that contains
	// a `.td` entry (i.e. the repo root that contains `.td/`). Without
	// this, tests running from a subdir would resolve `.td/...` paths into
	// the wrong place.
	ProjectRoot string
}

// Result is the structured outcome of a run. Run fills the status, failure
// and token fields by reading the run's row back from the store after
// cleanup, and the task counters by counting the run's task rows.
type Result struct {
	RunID           string  // id of the run (Config.RunID, or the generated one)
	OrgName         string  // name of the loaded org definition
	Status          string  // runs.status as stored when Run finished
	FailureCategory string  // runs.failure_category as stored when Run finished
	FailureDetail   string  // runs.failure_detail as stored when Run finished
	KillReason      string // legacy: equals FailureDetail; retained for callers that haven't migrated
	TasksTotal      int     // number of task rows recorded for this run
	TasksDone       int     // number of those tasks whose state is 'completed'
	TokensTotal     int64   // runs.tokens_total as stored when Run finished
	WallSeconds     float64 // EndedAt minus StartedAt, in seconds
	StartedAt       time.Time // taken right after the prompt was dispatched, not at Run entry
	EndedAt         time.Time // taken after all spawned agents were terminated
}

// Run executes one full harness run against the given org and prompt. It:
//  1. Loads the org yaml and fills in Config defaults.
//  2. Sweeps orphaned runs, agent rows and tmux sessions left behind by
//     earlier sessions.
//  3. Creates a run row and records the org and requested runtime mode.
//  4. Picks the runtimes for RuntimeMode and pre-flights the ones the
//     org's roles actually use.
//  5. Spawns all roles via OrgRunner, builds a Master tied to the runner
//     and (outside deterministic mode) installs the master-agent hooks.
//  6. Dispatches the prompt.
//  7. Polls all agents in round-robin, calling HandleOne on each, until the
//     run terminates (root task done, ceiling hit, or ctx canceled).
//  8. Terminates every spawned agent and returns a Result read back from
//     the store.
//
// The caller owns the store/bus/queue/orch so multiple concurrent runs can
// share them (the dashboard creates them once at boot).
//
// Run returns a nil Result and a non-nil error when setup fails (missing
// OrgPath, org load, run creation, runtime selection, pre-flight, spawn or
// dispatch). Every failure after CreateRun also ends the run row as
// killed/spawn_failed so it never stays at status=running. Once the poll
// loop has started Run returns a Result and a nil error; the outcome is
// carried in Result.Status and the failure fields.
func Run(ctx context.Context, st *store.Store, bus *event.Bus, q *transport.Queue, orch *orchestrator.Orchestrator, cfg Config) (*Result, error) {
	if cfg.OrgPath == "" {
		return nil, fmt.Errorf("runlive: OrgPath required")
	}
	def, err := org.Load(cfg.OrgPath)
	if err != nil {
		return nil, fmt.Errorf("runlive: load org: %w", err)
	}
	if cfg.MaxTurns == 0 {
		cfg.MaxTurns = 200
	}
	if cfg.MaxWall == 0 {
		cfg.MaxWall = 5 * time.Minute
	}
	if cfg.IdleTimeout == 0 {
		cfg.IdleTimeout = 50 * time.Millisecond
	}
	if cfg.StallTimeout == 0 {
		cfg.StallTimeout = 90 * time.Second
	}
	if cfg.RunID == "" {
		cfg.RunID = fmt.Sprintf("run_%d", time.Now().UnixNano())
	}

	// bookkeepingCtx is used for terminal writes and read-backs (ending the
	// run, cleanup events, building the Result). These must still work
	// after ctx has been canceled: context-aware database calls made with a
	// canceled context fail immediately, which would leave the run row at
	// status=running and the Result without a status. The agent cleanup
	// below already used context.Background() for the same reason.
	bookkeepingCtx := context.Background()

	// V93: sweep orphaned 'running' runs from interrupted previous
	// sessions before creating ours. 1-hour threshold is generous enough
	// that legitimate concurrent runs from sibling processes aren't
	// false-positive killed.
	if swept, err := orch.SweepOrphanedRuns(ctx, 1*time.Hour); err == nil && swept > 0 {
		_, _ = bus.Emit(ctx, event.Event{
			Kind: "run.orphans_swept", Payload: map[string]any{"count": swept},
		})
	}
	// V102: also sweep agents whose parent run is already terminal.
	// These accumulate across server restarts and lie about which
	// agents are alive.
	if swept, err := orch.SweepOrphanedAgents(ctx); err == nil && swept > 0 {
		_, _ = bus.Emit(ctx, event.Event{
			Kind: "agent.orphans_swept", Payload: map[string]any{"count": swept},
		})
	}
	// L5: sweep live tmux sessions that don't correspond to a 'running'
	// agent. Pre-fix, Ctrl-C'd runs left harness-* tmux sessions alive
	// indefinitely; user saw 10 stale sessions still holding open claude
	// subprocesses.
	if killed, err := sweepOrphanedTmuxSessions(ctx, st); err == nil && killed > 0 {
		_, _ = bus.Emit(ctx, event.Event{
			Kind: "tmux.orphans_swept", Payload: map[string]any{"count": killed},
		})
	}
	if err := orch.CreateRun(ctx, cfg.RunID, cfg.Prompt); err != nil {
		return nil, err
	}

	// V51: never return an error after CreateRun without also terminating
	// the run — otherwise the row sits at status=running forever and
	// pollutes the research data. failSpawn is the single place that
	// records such a setup failure (EndRun is first-write-wins).
	failSpawn := func(detail string) {
		_ = orch.EndRun(bookkeepingCtx, cfg.RunID, orchestrator.RunOutcome{
			Status:          orchestrator.RunKilled,
			FailureCategory: orchestrator.FailSpawnFailed,
			FailureDetail:   detail,
		})
	}

	if err := orch.SetRunOrg(ctx, cfg.RunID, def.Name, def.Version, def.RoleSet); err != nil {
		failSpawn("set_org error: " + err.Error())
		return nil, err
	}
	// Persist what the user asked for (cfg.RuntimeMode — may be "auto").
	// `runtime_mode_resolved` is filled in just below once pickRuntimes
	// resolves it. Rerun uses the *requested* mode so the same resolution
	// path fires; the resolved value is informational ground truth.
	// Best-effort: a failed UPDATE only loses metadata, so it is ignored.
	_ = st.Tx(ctx, func(tx store.Querier) error {
		_, err := tx.Exec(
			`UPDATE runs SET runtime_mode=?, max_wall_secs=?, max_turns=? WHERE id=?`,
			cfg.RuntimeMode, int(cfg.MaxWall.Seconds()), cfg.MaxTurns, cfg.RunID,
		)
		return err
	})

	runtimes, modeUsed, err := pickRuntimes(cfg, def)
	if err != nil {
		failSpawn("pick_runtime error: " + err.Error())
		return nil, err
	}
	// L2: pre-flight every subprocess-mode CLI we actually need. Only
	// the providers declared by roles in THIS org's def get checked —
	// gemini being missing from PATH must not block a run that uses
	// only claude+codex (even though the mixed-mode runtimes map
	// registers gemini). Deduplicates by runtime pointer so the same
	// binary isn't probed twice when multiple provider strings alias
	// to one runtime (e.g. "claude-code" + "claude-haiku-4-5-20251001"
	// both map to the same instance).
	usedProviders := map[string]bool{}
	for _, role := range def.Roles {
		if role.Provider != "" {
			usedProviders[role.Provider] = true
		}
	}
	preflightedRuntimes := map[any]bool{}
	for provider := range usedProviders {
		rt, ok := runtimes[provider]
		if !ok {
			// L3 already rejected unknown providers in pickRuntimes — but
			// belt-and-braces, fail loudly rather than silently skip.
			failSpawn(fmt.Sprintf("provider %q declared by an org role but not registered in runtime mode %q", provider, modeUsed))
			return nil, fmt.Errorf("runlive: provider %q not registered in mode %q", provider, modeUsed)
		}
		if preflightedRuntimes[rt] {
			continue
		}
		preflightedRuntimes[rt] = true
		// PreFlight is optional: runtimes that don't implement it are
		// simply not probed.
		pf, ok := rt.(interface{ PreFlight(context.Context) error })
		if !ok {
			continue
		}
		if err := pf.PreFlight(ctx); err != nil {
			failSpawn(fmt.Sprintf("pre-flight failed for provider %q: %v", provider, err))
			return nil, fmt.Errorf("runlive: pre-flight %q: %w", provider, err)
		}
	}
	// Best-effort metadata write, same as the runtime_mode UPDATE above.
	_ = st.Tx(ctx, func(tx store.Querier) error {
		_, err := tx.Exec(
			`UPDATE runs SET runtime_mode_resolved=? WHERE id=?`,
			modeUsed, cfg.RunID,
		)
		return err
	})
	_, _ = bus.Emit(ctx, event.Event{
		Kind:    "run.runtime_selected",
		RunID:   cfg.RunID,
		Payload: map[string]any{"requested": cfg.RuntimeMode, "resolved": modeUsed},
	})

	runner := orchestrator.NewOrgRunner(orch, q, runtimes, def)
	runner.ConfigSvc = cfg.ConfigSvc
	runner.ProjectRoot = cfg.ProjectRoot
	if runner.ProjectRoot == "" {
		// Default: the directory that contains the .td/ dir which contains
		// the org yaml. This way `.td/runs/...` and `.td/demo-project/...`
		// always anchor to the same place regardless of caller's CWD.
		runner.ProjectRoot = inferProjectRoot(cfg.OrgPath)
	}
	// Wire AgentResolver into any scripted runtime in the map so role
	// behaviors can substitute "<role>-AGENTID" placeholders with real
	// spawned agent ids. (Real-CLI runtimes don't need this — agents
	// discover peers via query_registry.)
	//
	// V11/V13: when a behavior references a role that hasn't been
	// spawned yet, lazy-spawn it on demand. Lets behavior chains like
	// fe-lead → fe-worker work without requiring SpawnAll upfront.
	for _, rt := range runtimes {
		scr, ok := rt.(*scripted.Runtime)
		if !ok {
			continue
		}
		runID := cfg.RunID
		scr.AgentResolver = func(role string) string {
			if id, ok := runner.AgentIDFor(role); ok {
				return id
			}
			if _, err := runner.SpawnRole(context.Background(), runID, role); err != nil {
				return ""
			}
			id, _ := runner.AgentIDFor(role)
			return id
		}
	}
	// SpawnAll runs at startup as the safety net. Lazy spawn primitives
	// (V11/V13) live alongside it: AgentResolver above lazy-spawns on
	// demand for scripted behavior chains, master.delegate lazy-spawns
	// its target (master.go), the V46 worker delegate tool spawns via
	// the dynamic_spawn fulfiller, and dead-agent reaping respawns. The
	// upfront SpawnAll stays because the scripted production tests rely
	// on every role being present from t=0 — removing it requires per-
	// role startup-prompt suppression that lands in a follow-up.
	if _, err := runner.SpawnAll(ctx, cfg.RunID); err != nil {
		_, _ = bus.Emit(ctx, event.Event{
			Kind: "run.spawn_failed", RunID: cfg.RunID,
			Payload: map[string]any{"error": err.Error()},
		})
		for _, s := range runner.SpawnedList() {
			_ = orch.MarkAgentTerminated(context.Background(), s.AgentID, "spawn_failed_partial")
		}
		failSpawn(err.Error())
		return nil, fmt.Errorf("runlive: spawn: %w", err)
	}

	// Start an always-on idle heartbeat per agent. Without this, an agent
	// that has drained its inbox stops emitting heartbeats (HandleOne only
	// pulses when it picks up a message), and ReapDeadAgents misclassifies
	// healthy idle agents as dead after maxAge. This separates process-
	// liveness (heartbeat) from workflow-progress (the stall watchdog) —
	// see runWatchdog for the latter. Pulse interval is set well below the
	// reaper's maxAge so we never miss the window.
	//
	// The heartbeats get their own cancellable context so they end with
	// the run: the caller's ctx may outlive Run (e.g. a long-lived server
	// context), and nothing in Run ever cancels it. hbCtx is cancelled at
	// end-of-run cleanup and, via the defer, on every early return.
	hbCtx, stopHeartbeats := context.WithCancel(ctx)
	defer stopHeartbeats()
	for _, ag := range runner.SpawnedAgents() {
		ag.StartIdleHeartbeat(hbCtx, 30*time.Second)
	}

	master := orchestrator.NewMaster(orch, q, prompt.NewClassifierFromEnv(), cfg.RunID)
	master.Runner = runner

	// newCoordinator builds the deterministic coordinator from the master's
	// current wiring. It reads master.* at call time, so it picks up the
	// AutoEvaluator that is installed further down.
	newCoordinator := func() *deterministic.Coordinator {
		return &deterministic.Coordinator{
			Orch: orch, Queue: q, Runner: runner, Classer: master.Classer,
			RunID: cfg.RunID, MasterRole: master.MasterRole,
			AutoEvaluator: master.AutoEvaluator,
			NotifyUser:    master.NotifyUser,
			RecordPrompt:  master.RecordPrompt,
			OpenTasks:     master.OpenTasks,
		}
	}

	// LM-F: install the post-inbox side-effect hook on the master AGENT
	// (only in LLM mode — deterministic mode keeps ProcessInbox draining
	// the inbox separately). When the master LLM acks a Report from a
	// worker, this hook fires the same side-effects ProcessInbox used to
	// (transition task → completed, notify user, run auto-evaluator) so
	// the user-facing loop closes without the LLM needing to do it.
	if def.DispatchMode != "deterministic" {
		masterRoleID := master.MasterRole
		if masterAgent, ok := runner.AgentByRole(masterRoleID); ok && masterAgent != nil {
			// LM-G: after 3 consecutive user-prompt refusals, fall back
			// to the deterministic coordinator to actually dispatch the
			// work. Keeps "verifiably correct run" achievable even when
			// the LLM master refuses to cooperate.
			masterAgent.SetRefusalThreshold(3)
			masterAgent.RefusalFallbackHook = func(ctx context.Context, msg *envelope.Envelope) {
				if msg == nil {
					return
				}
				coord := newCoordinator()
				intent := msg.Payload.Intent
				// The event carries at most the first 120 bytes of the
				// prompt. A byte slice can cut a multi-byte rune in half;
				// ToValidUTF8 drops such a dangling fragment so the
				// payload stays valid UTF-8.
				preview := strings.ToValidUTF8(intent[:min(120, len(intent))], "")
				_, _ = bus.Emit(ctx, event.Event{
					Kind: "master.refusal_fallback", RunID: cfg.RunID, TaskID: msg.TaskID,
					Payload: map[string]any{"prompt": preview},
				})
				_, _, _ = coord.Dispatch(ctx, intent)

				// For introspection-shaped prompts, also emit structural
				// self_report records for EVERY agent in the org. This
				// guarantees the observability answer ("who's in this
				// run?") even when downstream LLMs don't cooperate —
				// every record is sourced as "orchestrator" so callers
				// can distinguish structural emits from genuine LLM
				// self-reports.
				if !isIntrospectionIntent(intent) {
					return
				}
				subOrgs := org.SubOrgByRole(def)
				for _, r := range def.Roles {
					aid, ok := runner.AgentIDFor(r.ID)
					if !ok {
						continue
					}
					_, _ = bus.Emit(ctx, event.Event{
						Kind: "agent.self_report", RunID: cfg.RunID, AgentID: aid, TaskID: msg.TaskID,
						Payload: map[string]any{
							"agent_id":     aid,
							"role":         r.ID,
							"provider":     r.Provider,
							"sub_org":      subOrgs[r.ID],
							"is_connector": org.IsConnectorRole(r),
							"tools":        r.Tools,
							"notes":        "structural self-report emitted by runlive.RefusalFallbackHook (LLM master refused to dispatch; harness completed observability on its behalf)",
							"source":       "orchestrator",
						},
					})
				}
				// For introspection prompts, "every agent self-reported"
				// IS the completion criterion. The structural fallback
				// just emitted one for each role, so the work is
				// definitively done — close out the root task via the
				// orchestrator so the run reaches a terminal state
				// instead of waiting on max_wall timeout. Without this
				// the run hangs in 'running' until killed by the
				// watchdog or max_wall — semantically incorrect since
				// the work is observably complete.
				if msg.TaskID == "" {
					return
				}
				if err := orch.Transition(ctx, msg.TaskID, orchestrator.StateCompleted); err != nil {
					return
				}
				_ = master.NotifyUser(ctx, msg.TaskID, "completed",
					"introspection complete: structural self-reports for all declared agents")
				if master.AutoEvaluator != nil {
					master.AutoEvaluator.OnTaskCompleted(ctx, cfg.RunID, msg.TaskID,
						"introspection complete via refusal-fallback")
				}
				// Responsible-agent decision: the introspection
				// criterion IS met; the work is observably done.
				// End the run immediately via EndRun (first-
				// write-wins, so safe even if the poll loop also
				// races to end). Without this, the run hangs ~4
				// minutes waiting for the dead-agent reaper to
				// fail in-flight child tasks before the
				// openTasks==0 branch in the poll loop fires.
				_ = orch.EndRun(ctx, cfg.RunID, orchestrator.RunOutcome{
					Status: orchestrator.RunCompleted,
				})
			}
			// Responsible-agent primitive: when master calls complete_task,
			// fire the canonical transition path (orchestrator.Transition +
			// auto-evaluator + user notify) AND end the run early if this
			// closes the root task. Without this, complete_task degrades
			// to a raw state UPDATE that skips evaluation and leaves the
			// run waiting on openTasks==0.
			masterAgent.CompleteTaskHook = func(ctx context.Context, taskID, outcome, summary string) error {
				targetState := orchestrator.StateCompleted
				if outcome == "failed" {
					targetState = orchestrator.StateFailed
				}
				if err := orch.Transition(ctx, taskID, targetState); err != nil {
					return err
				}
				_ = master.NotifyUser(ctx, taskID, string(targetState), summary)
				if master.AutoEvaluator != nil && targetState == orchestrator.StateCompleted {
					master.AutoEvaluator.OnTaskCompleted(ctx, cfg.RunID, taskID, summary)
				}
				// If this closes the root task, end the run immediately
				// (the deterministic Coordinator does the equivalent).
				// A failed lookup leaves rootID empty, which simply skips
				// the early end; the poll loop still ends the run later.
				var rootID string
				_ = st.DB().QueryRowContext(ctx,
					`SELECT IFNULL(root_task_id,'') FROM runs WHERE id=?`, cfg.RunID,
				).Scan(&rootID)
				if rootID != "" && rootID == taskID {
					runStatus := orchestrator.RunCompleted
					if targetState == orchestrator.StateFailed {
						runStatus = orchestrator.RunFailed
					}
					_ = orch.EndRun(ctx, cfg.RunID, orchestrator.RunOutcome{Status: runStatus})
				}
				return nil
			}
			masterAgent.PostInboxHook = func(ctx context.Context, msg *envelope.Envelope, _ *runtime.LLMResponse) {
				if msg == nil {
					return
				}
				switch msg.Type {
				case envelope.TypeReport:
					if msg.TaskID != "" {
						_ = orch.Transition(ctx, msg.TaskID, orchestrator.StateCompleted)
					}
					_ = master.NotifyUser(ctx, msg.TaskID, "completed", msg.Payload.Intent)
					if master.AutoEvaluator != nil && msg.TaskID != "" {
						master.AutoEvaluator.OnTaskCompleted(ctx, cfg.RunID, msg.TaskID, msg.Payload.Intent)
					}
				case envelope.TypeClarify:
					_ = master.NotifyUser(ctx, msg.TaskID, "clarification", msg.Payload.Intent)
				}
			}
		}
	}

	// Auto-evaluator: register the heuristic + a light LLM-judge-shaped
	// stub. Every report → multi-judge scoring. Closes the "did the work
	// succeed" loop.
	evReg := evaluator.NewRegistry(st, bus)
	_ = evReg.Register(ctx, evaluator.HeuristicEvaluator{Name: "auto-heuristic"})
	master.AutoEvaluator = &runAutoEvaluator{reg: evReg, st: st}

	// LM-B/H: branch on dispatch_mode. Default (empty / "llm") shovels
	// the prompt into master's inbox via master.Dispatch — LLM-driven
	// from there. "deterministic" hands off to the classifier-driven
	// workflow runner in internal/harness/deterministic.
	var dispatchErr error
	if def.DispatchMode == "deterministic" {
		_, _, dispatchErr = newCoordinator().Dispatch(ctx, cfg.Prompt)
	} else {
		_, _, dispatchErr = master.Dispatch(ctx, cfg.Prompt)
	}
	if dispatchErr != nil {
		// V50: Dispatch error used to leave the run dangling at
		// status='running' forever — the caller saw an error and
		// returned, but no one closed the run. Fail it explicitly
		// (EndRun is first-write-wins so this is safe even if the
		// no-dispatch path already ended it as unrouted).
		failSpawn("dispatch error: " + dispatchErr.Error())
		// V67: agents are spawned at this point; cleanup is at the
		// bottom of runlive.Run and gets skipped on early return.
		for _, s := range runner.SpawnedList() {
			_ = orch.MarkAgentTerminated(context.Background(), s.AgentID, "dispatch_failed")
		}
		return nil, fmt.Errorf("runlive: dispatch: %w", dispatchErr)
	}

	started := time.Now()
	turns := 0
	deadline := started.Add(cfg.MaxWall)

	// Watchdog: subscribe to the bus and track the timestamp of the most
	// recent event for this run. If we go StallTimeout without any event,
	// auto-kill the run with reason=stalled. Catches deadlock-class bugs
	// (a future Open() that hangs forever, a CLI that never responds, etc).
	stallCtx, stopWatchdog := context.WithCancel(ctx)
	defer stopWatchdog()
	// 4096-event buffer guards the watchdog against bus.fan drops during
	// event bursts (per-LLM-turn the agent emits ~6 events, plus tool
	// events; 50 concurrent turns × 10 events = 500 events instantaneously).
	// A small buffer here causes the watchdog to miss run.X events and
	// false-positive stall-kill.
	sub, cancelSub := bus.Subscribe(4096)
	go runWatchdog(stallCtx, bus, sub, cancelSub, orch, cfg.RunID, cfg.StallTimeout)

	// V19: wire the reapers. Before this commit they existed but had zero
	// production callers, so the "robust communication" guarantees in the
	// brief (retries / requeue / dead-letters / cost ceilings / dead-agent
	// detection) were theoretical. The reapers run on a single ticker
	// inside the runlive context, so they stop cleanly when the run ends.
	//
	// Cost ceilings are read from the org's policies; if max_tokens or
	// max_cost are unset they're effectively disabled.
	reaperCtx, stopReapers := context.WithCancel(ctx)
	defer stopReapers()
	go reaperLoop(reaperCtx, bus, q, orch, cfg.RunID,
		def.Policies.MaxTokensPerRun, def.Policies.MaxCostUSDPerRun,
		def.Policies.ThrashMaxExchanges)

	// V33: fulfiller for dynamic_spawn_requests. Agents can request that
	// the harness spawn additional sibling/child agents at runtime
	// (the "agents can set up agents" feature in the brief). Without this
	// loop the requests pile up unanswered, and the requesting agent
	// blocks indefinitely waiting for a peer that will never appear.
	go dynamicSpawnFulfiller(reaperCtx, bus, q, runner, st, cfg.RunID)

	// V47: policy-violation watcher. Without this, policy.violation
	// events accumulate silently — an agent that's repeatedly spoofing
	// envelopes or trying to escape its zone gets caught per-event but
	// never escalated. Threshold: 5 violations for the same agent within
	// a 60s window → kill the agent + user notification.
	go policyViolationWatcher(reaperCtx, bus, st, cfg.RunID)

	// killRun ends the run as killed with the given category. It writes
	// with bookkeepingCtx so the outcome is recorded even when the reason
	// for ending is that ctx itself was canceled.
	killRun := func(outcome orchestrator.RunOutcome) {
		outcome.Status = orchestrator.RunKilled
		_ = orch.EndRun(bookkeepingCtx, cfg.RunID, outcome)
	}

PollLoop:
	for {
		// If the run already terminated (e.g. master.Dispatch hit the
		// unrouted path, or the watchdog killed us), exit immediately.
		// EndRun is first-write-wins, so the original cause is preserved.
		if !runStillRunning(st, cfg.RunID) {
			break PollLoop
		}
		// Check ctx + ceilings.
		if ctx.Err() != nil {
			killRun(orchestrator.RunOutcome{FailureCategory: orchestrator.FailCtxCanceled})
			break PollLoop
		}
		if turns >= cfg.MaxTurns {
			killRun(orchestrator.RunOutcome{FailureCategory: orchestrator.FailMaxTurns})
			break PollLoop
		}
		if time.Now().After(deadline) {
			killRun(orchestrator.RunOutcome{FailureCategory: orchestrator.FailMaxWall})
			break PollLoop
		}
		// All non-terminal tasks drained → reflect the root task's
		// disposition into the run. If root_task_id is NULL here, the
		// no-dispatch path in master.Dispatch should have already ended
		// the run — runStillRunning would have returned false above. If
		// somehow we got here without a root task, end with a typed
		// failure rather than silently completing.
		if openTasks(st, cfg.RunID) == 0 {
			endByRootDisposition(ctx, st, orch, cfg.RunID)
			break PollLoop
		}

		anyHandled := false
		// Master inbox processing: in deterministic mode, ProcessInbox
		// drains master's inbox and runs side-effects (notify user,
		// auto-evaluator). In LLM mode the master AGENT's HandleOne
		// owns the inbox — running ProcessInbox would race for the
		// same messages and prevent the LLM from ever seeing them.
		// (The side-effects move to a post-HandleOne hook in LM-F.)
		if def.DispatchMode == "deterministic" {
			for {
				handled, err := master.ProcessInbox(ctx)
				if err != nil {
					_, _ = bus.Emit(ctx, event.Event{
						Kind: "master.error", RunID: cfg.RunID,
						Payload: map[string]any{"error": err.Error()},
					})
					break
				}
				if !handled {
					break
				}
				anyHandled = true
			}
		}
		// Poll EVERY agent — master included. In LLM-driven mode the
		// master agent IS the routing layer: its LLM reads the inbox via
		// HandleOne and emits delegate / send_message tool calls, and the
		// report/clarify side-effects run from PostInboxHook (LM-F). In
		// deterministic mode ProcessInbox above has already drained the
		// master's inbox for this round. SpawnedAgents() is re-read every
		// round because the set grows as roles are lazily spawned.
		for _, ag := range runner.SpawnedAgents() {
			handled, err := ag.HandleOne(ctx)
			if err != nil {
				_, _ = bus.Emit(ctx, event.Event{
					Kind: "agent.error", RunID: cfg.RunID, AgentID: ag.ID,
					Payload: map[string]any{"error": err.Error()},
				})
				continue
			}
			if handled {
				anyHandled = true
				turns++
			}
		}
		if !anyHandled {
			// Nothing to do this round: back off for IdleTimeout, but
			// wake up immediately if the caller cancels.
			select {
			case <-ctx.Done():
				killRun(orchestrator.RunOutcome{FailureCategory: orchestrator.FailCtxCanceled})
				break PollLoop
			case <-time.After(cfg.IdleTimeout):
			}
		}
	}

	// V81: stop reapers BEFORE cleanup so the dynamic-spawn fulfiller
	// can't race-spawn a new agent during the cleanup iteration. The
	// deferred stopReapers() would fire on Run's return, which is AFTER
	// cleanup completes — too late. Heartbeats stop here too: the agents
	// are about to be terminated.
	stopReapers()
	stopWatchdog()
	stopHeartbeats()

	// Cleanup: terminate every spawned agent so tmux sessions don't
	// accumulate. Best-effort on the runtime side (errors logged via
	// events but never fatal), but the agents-table UPDATE is mandatory:
	// the table must not lie about which agents are still alive — the
	// research-data integrity principle in the brief.
	//
	// With lazy spawn the spawned set is whatever the runner has actually
	// brought up — which may be empty (unrouted runs) or partial (only
	// roles the master delegated to). The cleanup loop iterates only
	// the live set rather than all roles in the org definition.
	spawned := runner.SpawnedList()
	// Fallback provider for agents whose role can't be matched below.
	// Guarded so an org with no roles can't index out of range.
	fallbackProvider := ""
	if len(def.Roles) > 0 {
		fallbackProvider = def.Roles[0].Provider
	}
	for _, s := range spawned {
		rt := runner.Runtimes[fallbackProvider] // any runtime works for Terminate
		// Better: pick the runtime that actually spawned this agent,
		// re-resolved via the agent's role.
		for _, role := range def.Roles {
			if role.ID == s.RoleID {
				if pr, ok := runtimes[role.Provider]; ok {
					rt = pr
				}
				break
			}
		}
		if rt != nil {
			_ = rt.Terminate(context.Background(), s.AgentID, "run_ended")
		}
		// Truthfully record the termination in the agents table, regardless
		// of whether the runtime call succeeded — the row must never stay
		// at status='running' after the run ends.
		if err := orch.MarkAgentTerminated(context.Background(), s.AgentID, "run_ended"); err != nil {
			_, _ = bus.Emit(bookkeepingCtx, event.Event{
				Kind: "agent.terminate.db_failed", RunID: cfg.RunID, AgentID: s.AgentID,
				Payload: map[string]any{"error": err.Error()},
			})
		}
	}
	_, _ = bus.Emit(bookkeepingCtx, event.Event{
		Kind: "run.cleanup_done", RunID: cfg.RunID,
		Payload: map[string]any{"terminated_agents": len(spawned)},
	})

	// Build result. The reads use bookkeepingCtx: after a cancellation the
	// caller's ctx would make QueryRowContext fail and leave the Result
	// without a status. The IFNULL wrappers keep a NULL failure column
	// from aborting the Scan before tokens_total is read. Scan errors are
	// ignored on purpose: the Result is best-effort reporting and the
	// affected fields simply keep their zero values.
	res := &Result{
		RunID:     cfg.RunID,
		OrgName:   def.Name,
		StartedAt: started,
		EndedAt:   time.Now(),
	}
	res.WallSeconds = res.EndedAt.Sub(res.StartedAt).Seconds()
	_ = st.DB().QueryRowContext(bookkeepingCtx,
		`SELECT status, IFNULL(kill_reason,''), IFNULL(failure_category,''), IFNULL(failure_detail,''), IFNULL(tokens_total,0) FROM runs WHERE id=?`,
		cfg.RunID,
	).Scan(&res.Status, &res.KillReason, &res.FailureCategory, &res.FailureDetail, &res.TokensTotal)
	_ = st.DB().QueryRowContext(bookkeepingCtx,
		`SELECT COUNT(*), IFNULL(SUM(CASE WHEN state='completed' THEN 1 ELSE 0 END),0) FROM tasks WHERE run_id=?`,
		cfg.RunID,
	).Scan(&res.TasksTotal, &res.TasksDone)
	return res, nil
}

// runAutoEvaluator implements orchestrator.AutoEvaluator. Reads the latest
// artifact for the task and feeds it to the multi-judge registry.
type runAutoEvaluator struct {
	reg *evaluator.Registry // registry whose EvaluateAll scores each completed task
	st  *store.Store        // store queried for the task's most recent artifact row
}

// OnTaskCompleted evaluates a finished task with every evaluator in the
// registry. The evaluated body is the content of the task's most recent
// artifact file when one is recorded, readable and non-empty; otherwise it
// falls back to reportIntent (the text of the completion report). runID is
// accepted for the orchestrator.AutoEvaluator contract but is not used here.
// Evaluation is best-effort: lookup, read and evaluation errors are not
// reported to the caller.
func (e *runAutoEvaluator) OnTaskCompleted(ctx context.Context, runID, taskID, reportIntent string) {
	// Find the most recent artifact for this task (if any). The Scan error
	// is ignored on purpose: "no artifact row" is the normal case and just
	// leaves artifactPath empty.
	var artifactPath string
	_ = e.st.DB().QueryRowContext(ctx,
		`SELECT path FROM artifacts WHERE task_id=? ORDER BY created_at DESC LIMIT 1`, taskID,
	).Scan(&artifactPath)

	body := reportIntent
	if artifactPath != "" {
		// An unreadable or empty artifact falls back to the report text
		// rather than evaluating nothing.
		if content, err := os.ReadFile(artifactPath); err == nil && len(content) > 0 {
			body = string(content)
		}
	}
	_, _ = e.reg.EvaluateAll(ctx, evaluator.Target{
		Kind: evaluator.TargetTask,
		ID:   taskID,
		Path: artifactPath,
		Body: body,
	})
}

// inferProjectRoot returns the absolute project root for an org yaml path.
// Starting at the directory that holds the org yaml, it checks that
// directory and then its ancestors (six directories in total) and returns
// the first one that contains a `.td` entry. When none is found it falls
// back to the org yaml's own directory, as an absolute path so the result
// does not depend on the process's working directory later on. It returns
// "" only when orgPath cannot be made absolute.
func inferProjectRoot(orgPath string) string {
	// maxLevels bounds the upward walk so a stray `.td` far up the tree
	// (e.g. in a home directory) is not picked up by accident.
	const maxLevels = 6

	abs, err := filepath.Abs(orgPath)
	if err != nil {
		return ""
	}
	orgDir := filepath.Dir(abs)

	dir := orgDir
	for i := 0; i < maxLevels; i++ {
		if _, err := os.Stat(filepath.Join(dir, ".td")); err == nil {
			return dir
		}
		parent := filepath.Dir(dir)
		if parent == dir {
			// Reached the filesystem root.
			break
		}
		dir = parent
	}
	return orgDir
}

// sweepOrphanedTmuxSessions kills `harness-*` tmux sessions (on the
// dedicated `harness` tmux socket) whose agent_id component is NOT
// associated with an agent row in status 'spawning' or 'running'. Without
// this, Ctrl-C'd runs leave their tmux sessions alive forever, each holding
// open a claude / codex / gemini subprocess that may still be emitting hooks
// and writing to log files.
//
// Mirrors V93 (orphan runs) + V102 (orphan agent rows) but for actual live
// processes.
//
// It returns the number of sessions successfully killed. The error result
// is always nil: a missing tmux binary, a failing `tmux ls` (typically "no
// server running") and failed kills are all treated as "nothing to sweep".
// A session whose agent row cannot be looked up is kept, never killed — a
// transient database error must not take down live agents.
func sweepOrphanedTmuxSessions(ctx context.Context, st *store.Store) (int, error) {
	if !tmux.TmuxAvailable() {
		return 0, nil
	}
	out, err := exec.CommandContext(ctx, "tmux", "-L", "harness", "ls", "-F", "#{session_name}").Output()
	if err != nil {
		// "no server running" is a normal state, not an error worth reporting.
		return 0, nil
	}

	const sessionPrefix = "harness-"
	killed := 0
	for _, line := range strings.Split(string(out), "\n") {
		session := strings.TrimSpace(line)
		if !strings.HasPrefix(session, sessionPrefix) {
			continue
		}
		agentID := strings.TrimPrefix(session, sessionPrefix)

		// An agent that is still 'spawning' or 'running' per the DB owns
		// its session; everything else is an orphan.
		var live int
		lookupErr := st.DB().QueryRowContext(ctx,
			`SELECT COUNT(*) FROM agents WHERE id=? AND status IN ('spawning','running')`,
			agentID,
		).Scan(&live)
		if lookupErr != nil {
			// Fail safe: without a definite answer the session might
			// belong to a live agent, so leave it alone.
			continue
		}
		if live > 0 {
			continue
		}
		if err := exec.CommandContext(ctx, "tmux", "-L", "harness", "kill-session", "-t", session).Run(); err == nil {
			killed++
		}
	}
	return killed, nil
}

// recoverGoroutine is the shared panic guard for the long-running
// background goroutines in this file (reaperLoop, runWatchdog, ...). It
// must be installed directly with `defer recoverGoroutine(...)` — recover
// only has an effect when called by the deferred function itself.
//
// Without it a panic in any of those goroutines would crash the entire
// harness process (V113). With it the panic is swallowed and reported as a
// "goroutine.panic" event carrying where it happened and the panic value,
// so the operator can react while the run stays alive.
func recoverGoroutine(bus *event.Bus, where, runID string) {
	cause := recover()
	if cause == nil {
		return
	}
	// The goroutine's own ctx may already be cancelled at this point, so
	// the report is emitted on a background context.
	details := map[string]any{
		"where": where,
		"panic": fmt.Sprint(cause),
	}
	_, _ = bus.Emit(context.Background(), event.Event{
		Kind:    "goroutine.panic",
		RunID:   runID,
		Payload: details,
	})
}

// openTasks counts the tasks of runID that are not yet in a terminal state
// (completed, failed, abandoned or escalated). The query error is
// discarded, so a failed lookup is indistinguishable from "no open tasks"
// and yields 0.
func openTasks(st *store.Store, runID string) int {
	open := 0
	row := st.DB().QueryRow(`SELECT COUNT(*) FROM tasks WHERE run_id=? AND state NOT IN ('completed','failed','abandoned','escalated')`, runID)
	_ = row.Scan(&open)
	return open
}

// runStillRunning reports whether runs.status is exactly 'running' for
// runID. The poll loop uses it to stop once a side path (master.Dispatch
// no-op, watchdog) has already ended the run. EndRun is first-write-wins,
// so the original cause is never overwritten; this check only prevents
// pointless spinning.
//
// A missing row or a failed query leaves the status empty and therefore
// yields false.
func runStillRunning(st *store.Store, runID string) bool {
	current := ""
	row := st.DB().QueryRow(`SELECT status FROM runs WHERE id=?`, runID)
	_ = row.Scan(&current)
	return current == "running"
}

// endByRootDisposition ends a run once openTasks has reached 0, choosing
// the outcome from the final state of the run's root task:
//
//   - no root task id     → RunKilled (dispatch invariant violated)
//   - root "completed"    → RunCompleted
//   - root "failed", "abandoned" or "escalated" → RunFailed
//   - anything else       → RunKilled (state inconsistent with openTasks==0)
//
// A run without a root task should have been ended earlier by Dispatch's
// unrouted path; if that ever slips through we fail closed with a typed
// failure instead of silently completing. The lookup error and EndRun's
// error are both discarded.
func endByRootDisposition(ctx context.Context, st *store.Store, orch *orchestrator.Orchestrator, runID string) {
	var rootID, rootState string
	_ = st.DB().QueryRowContext(ctx,
		`SELECT IFNULL(r.root_task_id,''), IFNULL(t.state,'')
		   FROM runs r LEFT JOIN tasks t ON t.id = r.root_task_id
		  WHERE r.id = ?`, runID).Scan(&rootID, &rootState)

	var outcome orchestrator.RunOutcome
	switch {
	case rootID == "":
		// Should be unreachable: the master's no-dispatch branch ends the
		// run before the poll loop gets here. Surface it loudly if not.
		outcome = orchestrator.RunOutcome{
			Status:          orchestrator.RunKilled,
			FailureCategory: orchestrator.FailTaskFailed,
			FailureDetail:   "poll loop exited with no root_task_id — dispatch invariant violated",
		}
	case rootState == "completed":
		outcome = orchestrator.RunOutcome{Status: orchestrator.RunCompleted}
	case rootState == "failed" || rootState == "abandoned" || rootState == "escalated":
		outcome = orchestrator.RunOutcome{
			Status:          orchestrator.RunFailed,
			FailureCategory: orchestrator.FailTaskFailed,
			FailureDetail:   "root task in state=" + rootState,
		}
	default:
		// The root task is still open although openTasks() said 0 — an
		// internal inconsistency. Kill the run so its row becomes terminal
		// instead of dangling at 'running'.
		outcome = orchestrator.RunOutcome{
			Status:          orchestrator.RunKilled,
			FailureCategory: orchestrator.FailTaskFailed,
			FailureDetail:   "openTasks=0 but root task state=" + rootState,
		}
	}
	_ = orch.EndRun(ctx, runID, outcome)
}

// reaperLoop runs the harness's housekeeping sweeps once per second until
// ctx is cancelled. Each tick performs, in this order:
//
//  1. Queue.ReclaimExpired — visibility-expired messages are redelivered,
//     or dead-lettered to the sender once attempts are exhausted (the
//     crash-loop guard).
//  2. Queue.SweepExpiredTTL (V54) — TTL-expired messages are removed so the
//     table does not accumulate forever-stale rows.
//  3. Orchestrator.ReapStalledTasks — tasks past their deadline become
//     failed, so dashboards don't show forever-stuck rows.
//  4. Orchestrator.ReapDeadAgents — agents without a heartbeat for five
//     minutes are reaped; each resulting notice is turned into a TypeNack
//     for the parent agent (V45).
//  5. Orchestrator.EnforceCostCeiling (V31) — only when maxTokens or
//     maxCostUSD is positive; zero values disable the ceiling.
//  6. Orchestrator.DetectThrash (V55) — only when thrashMax is positive.
//
// A summary event is emitted for a step only when it succeeded and its
// count is non-zero, which keeps the event stream readable; the reapers
// emit their own per-row events. Errors from the individual steps are
// dropped and the step is retried on the next tick.
//
// The one-second interval is deliberately conservative: the visibility and
// deadline windows are seconds-to-minutes, so polling faster would only
// hammer SQLite.
func reaperLoop(ctx context.Context, bus *event.Bus, q *transport.Queue, orch *orchestrator.Orchestrator, runID string, maxTokens int64, maxCostUSD float64, thrashMax int) {
	defer recoverGoroutine(bus, "reaperLoop", runID)

	// Per-LLM-call heartbeats can fall well outside 60s during long tool
	// sequences (file ops, multiple sub-calls); a tighter threshold would
	// false-positive on healthy agents.
	const deadAgentMaxAge = 5 * time.Minute

	costCeilingOn := maxTokens > 0 || maxCostUSD > 0

	sweepOnce := func() {
		reclaimed, reclaimErr := q.ReclaimExpired(ctx)
		if reclaimErr == nil && reclaimed > 0 {
			_, _ = bus.Emit(ctx, event.Event{
				Kind:    "reaper.queue_reclaimed",
				RunID:   runID,
				Payload: map[string]any{"count": reclaimed},
			})
		}

		expired, ttlErr := q.SweepExpiredTTL(ctx)
		if ttlErr == nil && expired > 0 {
			_, _ = bus.Emit(ctx, event.Event{
				Kind:    "reaper.ttl_swept",
				RunID:   runID,
				Payload: map[string]any{"count": expired},
			})
		}

		stalled, stallErr := orch.ReapStalledTasks(ctx)
		if stallErr == nil && stalled > 0 {
			_, _ = bus.Emit(ctx, event.Event{
				Kind:    "reaper.tasks_stalled",
				RunID:   runID,
				Payload: map[string]any{"count": stalled},
			})
		}

		dead, notices, deadErr := orch.ReapDeadAgents(ctx, deadAgentMaxAge)
		if deadErr == nil && dead > 0 {
			_, _ = bus.Emit(ctx, event.Event{
				Kind:    "reaper.agents_dead",
				RunID:   runID,
				Payload: map[string]any{"count": dead, "notices": len(notices)},
			})
			// Tell each parent task's owner that its delegate is gone, so
			// the broken handoff is reported instead of silently stalling
			// until the run watchdog fires. Send failures are ignored.
			for _, notice := range notices {
				_ = q.Send(ctx, &envelope.Envelope{
					ID:     "dead_" + notice.FailedTaskID,
					RunID:  notice.RunID,
					From:   "reaper",
					To:     notice.ParentAgentID,
					Type:   envelope.TypeNack,
					TaskID: notice.FailedTaskID,
					TTLMs:  60000,
					Payload: envelope.Payload{
						Reason: "delegate agent " + notice.DeadAgentID + " was reaped (heartbeat lost)",
						Intent: "child task failed: " + notice.FailedTaskID,
					},
				})
			}
		}

		// EnforceCostCeiling will EndRun any run that exceeds either
		// ceiling and fail its open tasks (V18 wired the latter into
		// EndRun itself).
		if costCeilingOn {
			killed, costErr := orch.EnforceCostCeiling(ctx, runID, maxTokens, maxCostUSD)
			if costErr == nil && killed > 0 {
				_, _ = bus.Emit(ctx, event.Event{
					Kind:    "reaper.cost_ceiling_enforced",
					RunID:   runID,
					Payload: map[string]any{"runs_killed": killed},
				})
			}
		}

		// Thrash detection emits coordination.thrash events for
		// (from, to, task) tuples exceeding ThrashMaxExchanges. Without it
		// the self-improvement loop in OrgBuilder has no signal to act on.
		if thrashMax > 0 {
			escalations, thrashErr := orch.DetectThrash(ctx, runID, orchestrator.Policy{ThrashMaxExchanges: thrashMax})
			if thrashErr == nil && escalations > 0 {
				_, _ = bus.Emit(ctx, event.Event{
					Kind:    "reaper.thrash_detected",
					RunID:   runID,
					Payload: map[string]any{"escalations": escalations},
				})
			}
		}
	}

	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			sweepOnce()
		}
	}
}

// dynamicSpawnFulfiller services the dynamic_spawn_requests table for one
// run. Every 500ms, until ctx is cancelled, it loads up to eight pending
// requests and calls runner.SpawnRole for each:
//
//   - On success the row is set to 'fulfilled' with the new agent's id, the
//     agent's idle heartbeat is started, and "dynamic_spawn.fulfilled" is
//     emitted.
//   - On failure the row is set to 'rejected', "dynamic_spawn.rejected" is
//     emitted with the error text, and — when the request names a
//     requester — a TypeNack is sent to it (V115) so it does not block
//     waiting for a peer that never arrives.
//
// A failed poll query skips the tick; a failed row scan ends the batch
// early and the rows read so far are still processed. Errors from the
// status updates, event emission and nack delivery are ignored.
//
// Ticking at 500ms is fast enough that "I asked for an agent" → "the agent
// exists in my peer list" round-trips within a single LLM turn.
func dynamicSpawnFulfiller(ctx context.Context, bus *event.Bus, q *transport.Queue, runner *orchestrator.OrgRunner, st *store.Store, runID string) {
	defer recoverGoroutine(bus, "dynamicSpawnFulfiller", runID)

	type spawnReq struct{ id, role, requestedBy string }

	// loadPending reads the current batch of pending requests. requested_by
	// is included so a rejection can be reported back to the requester.
	loadPending := func() []spawnReq {
		rows, err := st.DB().QueryContext(ctx,
			`SELECT id, role, requested_by FROM dynamic_spawn_requests WHERE run_id=? AND status='pending' LIMIT 8`,
			runID,
		)
		if err != nil {
			return nil
		}
		defer rows.Close()

		var batch []spawnReq
		for rows.Next() {
			var next spawnReq
			if scanErr := rows.Scan(&next.id, &next.role, &next.requestedBy); scanErr != nil {
				break
			}
			batch = append(batch, next)
		}
		return batch
	}

	// reject records a failed spawn and notifies whoever asked for it.
	reject := func(r spawnReq, cause error) {
		reason := cause.Error()
		_ = st.Tx(ctx, func(tx store.Querier) error {
			_, execErr := tx.Exec(
				`UPDATE dynamic_spawn_requests SET status='rejected' WHERE id=?`, r.id,
			)
			return execErr
		})
		_, _ = bus.Emit(ctx, event.Event{
			Kind:    "dynamic_spawn.rejected",
			RunID:   runID,
			Payload: map[string]any{"request_id": r.id, "role": r.role, "error": reason},
		})
		if r.requestedBy == "" {
			return
		}
		_ = q.Send(ctx, &envelope.Envelope{
			ID:    "dsr_rej_" + r.id,
			RunID: runID,
			From:  "spawner",
			To:    r.requestedBy,
			Type:  envelope.TypeNack,
			TTLMs: 60000,
			Payload: envelope.Payload{
				Reason: "spawn_rejected: " + reason,
				Intent: "dynamic_spawn role=" + r.role + " rejected",
			},
		})
	}

	// serve spawns the requested role and records the result.
	serve := func(r spawnReq) {
		ag, err := runner.SpawnRole(ctx, runID, r.role)
		if err != nil {
			reject(r, err)
			return
		}
		_ = st.Tx(ctx, func(tx store.Querier) error {
			_, execErr := tx.Exec(
				`UPDATE dynamic_spawn_requests SET status='fulfilled', fulfilled_agent_id=? WHERE id=?`,
				ag.ID, r.id,
			)
			return execErr
		})
		// A dynamically spawned agent needs the same idle heartbeat as the
		// SpawnAll-spawned ones — otherwise it would never pulse and would
		// be false-positive-reaped after maxAge.
		ag.StartIdleHeartbeat(ctx, 30*time.Second)
		_, _ = bus.Emit(ctx, event.Event{
			Kind:    "dynamic_spawn.fulfilled",
			RunID:   runID,
			AgentID: ag.ID,
			Payload: map[string]any{"request_id": r.id, "role": r.role},
		})
	}

	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			for _, pending := range loadPending() {
				serve(pending)
			}
		}
	}
}

// isWatchdogActivity reports whether an event kind indicates real agent or
// task activity that should reset the stall timer.
//
// It returns false for:
//   - the empty kind;
//   - "reaper.*" — reaper events are produced by the periodic housekeeping
//     loop rather than by agents, so counting them would falsify the stall
//     signal;
//   - "queue.*" — administrative events such as queue.dead_letter_notified,
//     not user-visible work.
//
// Every other kind (agent.*, task.*, message.*, run.*, master.*,
// evaluation.*, ...) counts as activity.
func isWatchdogActivity(kind string) bool {
	switch {
	case kind == "":
		return false
	case strings.HasPrefix(kind, "reaper."):
		return false
	case strings.HasPrefix(kind, "queue."):
		return false
	}
	return true
}

// runWatchdog kills a run that has gone quiet. It consumes events from sub
// and restarts a stallTimeout countdown whenever an event for runID
// arrives whose kind isWatchdogActivity accepts. If the countdown expires
// it emits "run.stalled", ends the run as RunKilled/FailStalled, and
// returns.
//
// It also returns when ctx is cancelled or sub is closed. cancelSub is
// always invoked on the way out.
func runWatchdog(ctx context.Context, bus *event.Bus, sub <-chan event.Event, cancelSub func(), orch *orchestrator.Orchestrator, runID string, stallTimeout time.Duration) {
	defer recoverGoroutine(bus, "runWatchdog", runID)
	defer cancelSub()

	countdown := time.NewTimer(stallTimeout)
	defer countdown.Stop()

	// rearm restarts the countdown from zero. If the timer already fired,
	// the pending tick is drained first so it cannot be mistaken for a
	// fresh expiry after the Reset.
	rearm := func() {
		if !countdown.Stop() {
			select {
			case <-countdown.C:
			default:
			}
		}
		countdown.Reset(stallTimeout)
	}

	for {
		select {
		case <-ctx.Done():
			return

		case ev, open := <-sub:
			if !open {
				return
			}
			// V91: only real agent/task activity for this run counts.
			// Reaper events would otherwise keep the watchdog alive even
			// when every agent is truly stuck.
			if ev.RunID != runID || !isWatchdogActivity(string(ev.Kind)) {
				continue
			}
			rearm()

		case <-countdown.C:
			timeoutMs := stallTimeout.Milliseconds()
			_, _ = bus.Emit(ctx, event.Event{
				Kind:  "run.stalled",
				RunID: runID,
				Payload: map[string]any{
					"stall_timeout_ms": timeoutMs,
					"hint":             "no event for this run within the stall window; killing. Likely a deadlock in a runtime adapter, or an LLM hung at an interactive prompt.",
				},
			})
			_ = orch.EndRun(ctx, runID, orchestrator.RunOutcome{
				Status:          orchestrator.RunKilled,
				FailureCategory: orchestrator.FailStalled,
				FailureDetail:   fmt.Sprintf("no event within %dms", timeoutMs),
			})
			return
		}
	}
}

// pickRuntimes resolves cfg.RuntimeMode into a table that maps provider
// names (as used by org roles, plus "default") to the runtime that should
// serve them. It returns the table, the mode that was actually selected —
// which differs from cfg.RuntimeMode when that is "" or "auto" — and an
// error for an unknown mode or a failed sidecar-channel setup.
//
// Auto-selection: without tmux the mode is "scripted". With tmux it is
// "mixed" as soon as any role in def declares provider "codex" or "gemini"
// (the only automatic choice that satisfies a heterogeneous org), and
// "claude-code" otherwise.
func pickRuntimes(cfg Config, def *org.Definition) (map[string]runtime.Runtime, string, error) {
	mode := cfg.RuntimeMode
	if mode == "" || mode == "auto" {
		mode = "scripted"
		if tmux.TmuxAvailable() {
			mode = "claude-code"
			if def != nil {
				for _, role := range def.Roles {
					if role.Provider == "codex" || role.Provider == "gemini" {
						mode = "mixed"
						break
					}
				}
			}
		}
	}

	// Scripted fallback shared by every real-CLI mode, so a role with
	// provider=scripted still works in a tmux or CLI run. The mixed modes
	// additionally use it as their "default" runtime.
	fb := scripted.New()
	fb.DefaultAutoComplete = true

	// routes is the table under construction; bind points every listed
	// provider name at rt.
	routes := map[string]runtime.Runtime{}
	bind := func(rt runtime.Runtime, providers ...string) {
		for _, name := range providers {
			routes[name] = rt
		}
	}

	// Constructors for the individual runtimes. The interactive claude
	// runtime is the only one that gets a sidecar channel attached.
	claudeTUI := func() (runtime.Runtime, error) {
		rt := tmux.New(claudecode.PaneFactory("harness", cfg.TmuxLogsDir))
		rt.PreFlightFn = claudecode.CheckAuth
		if err := attachSidecarChannel(rt); err != nil {
			return nil, err
		}
		return rt, nil
	}
	codexTUI := func() runtime.Runtime {
		rt := tmux.New(codex.PaneFactory("harness", cfg.TmuxLogsDir))
		rt.PreFlightFn = codex.CheckAuth
		return rt
	}
	geminiTUI := func() runtime.Runtime {
		rt := tmux.New(gemini.PaneFactory("harness", cfg.TmuxLogsDir))
		rt.PreFlightFn = gemini.CheckAuth
		return rt
	}
	// The non-interactive CLI runtimes write their per-agent transcript
	// logs to the same directory the tmux runtimes use, so the dashboard's
	// pane viewer renders CLI output uniformly across runtime modes.
	codexExec := func() runtime.Runtime {
		rt := cli.NewRuntime(cli.CodexExecProfile)
		rt.LogDir = cfg.TmuxLogsDir
		return rt
	}
	geminiPrint := func() runtime.Runtime {
		rt := cli.NewRuntime(cli.GeminiPrintProfile)
		rt.LogDir = cfg.TmuxLogsDir
		return rt
	}

	switch mode {
	case "scripted":
		// Every real-provider name routes to the scripted runtime in this
		// mode, so org yamls that name a real provider can still be
		// exercised without a CLI install.
		rt := scripted.New()
		rt.DefaultAutoComplete = true
		rt.RoleBehaviors = scripted.ProductionBehaviors()
		bind(rt, "default", "scripted", "claude-code", "codex", "gemini", "anthropic", "claude-haiku-4-5-20251001")
		return routes, mode, nil

	case "claude-code":
		rt, err := claudeTUI()
		if err != nil {
			return nil, "", err
		}
		bind(rt, "default", "claude-code", "claude-haiku-4-5-20251001")
		bind(fb, "scripted")
		return routes, mode, nil

	case "codex":
		bind(codexTUI(), "default", "codex")
		bind(fb, "scripted")
		return routes, mode, nil

	case "gemini":
		bind(geminiTUI(), "default", "gemini")
		bind(fb, "scripted")
		return routes, mode, nil

	case "claude-print":
		// Non-interactive `claude -p`: one subprocess per turn, clean
		// stdout/stderr per turn. The production-grade real-CLI path.
		rt := cli.NewRuntime(cli.ClaudePrintProfile)
		rt.LogDir = cfg.TmuxLogsDir
		bind(rt, "default", "claude-code", "claude-print", "claude-haiku-4-5-20251001")
		bind(fb, "scripted")
		return routes, mode, nil

	case "codex-exec":
		bind(codexExec(), "default", "codex", "codex-exec")
		bind(fb, "scripted")
		return routes, mode, nil

	case "gemini-print":
		bind(geminiPrint(), "default", "gemini", "gemini-print")
		bind(fb, "scripted")
		return routes, mode, nil

	case "mixed":
		// Mixed-provider mode, with defaults picked for research
		// correctness:
		//   - claude → interactive TUI (L1 ANSI stripping shipped;
		//              subscription)
		//   - codex  → `codex exec` (non-interactive, still
		//              subscription-billed per OpenAI; no TUI modal traps,
		//              clean stdout)
		//   - gemini → `gemini -p` (non-interactive; uses cached Code
		//              Assist OAuth quota same as TUI when present, else
		//              API key)
		// The TUI variants for codex/gemini caused run 1779424201617823000
		// (an update modal blocked every keystroke). Use `mixed-tui` to opt
		// back in to full-pane subscription behavior.
		cc, err := claudeTUI()
		if err != nil {
			return nil, "", err
		}
		bind(cc, "claude-code", "claude-haiku-4-5-20251001")
		bind(codexExec(), "codex", "codex-exec")
		bind(geminiPrint(), "gemini", "gemini-print")
		bind(fb, "default", "scripted")
		return routes, mode, nil

	case "mixed-tui":
		// Opt-in: every provider uses its interactive TUI. Subscription
		// billing across the board, but exposes the harness to per-CLI
		// modal traps (update prompts, EULA, auth) and TUI-redraw output
		// parsing.
		cc, err := claudeTUI()
		if err != nil {
			return nil, "", err
		}
		bind(cc, "claude-code", "claude-haiku-4-5-20251001")
		bind(codexTUI(), "codex")
		bind(geminiTUI(), "gemini")
		bind(fb, "default", "scripted")
		return routes, mode, nil
	}
	return nil, "", fmt.Errorf("runlive: unknown runtime mode %q", mode)
}

// attachSidecarChannel asks sidecarchan.FromEnv for the channel selected by
// the process environment and stores the result in rt.Channel. On error rt
// is left untouched and the error is returned.
//
// Per the configuration contract this wrapper was written against, the
// selector is HARNESS_SIDECAR_CHANNEL (with an optional
// HARNESS_SIDECAR_BASEDIR override), yielding a SocketChannel or
// MailboxChannel when active; the default ("" or "pane") leaves rt.Channel
// nil so the legacy pane-scraping path is used. Those details live in
// sidecarchan.FromEnv — confirm there before relying on them.
//
// The base directory passed to FromEnv is $TMPDIR/harness-channels. macOS
// unix sockets are bound to sun_path[104], so the path has to be short —
// TMPDIR (typically /var/folders/.../T) is the longest common ancestor
// that still fits when combined with the agent-id filename.
func attachSidecarChannel(rt *tmux.Runtime) error {
	ch, err := sidecarchan.FromEnv(os.Getenv, filepath.Join(os.TempDir(), "harness-channels"))
	if err == nil {
		rt.Channel = ch
	}
	return err
}

// policyViolationWatcher (V47) subscribes to the event bus and tracks
// policy.violation events for this run per agent in a sliding window.
// Without it, a misbehaving agent that spoofs envelopes or repeatedly
// tries to escape its zone gets caught per-event but is never escalated.
//
// Threshold: 5 violations within a 60s window. On hit it
//   - emits `agent.policy_threshold_exceeded`,
//   - files a user notification (kind=policy, severity=warn) so the human
//     sees it, and
//   - ignores further violations from that agent for five minutes.
//
// NOTE(review): this function does not itself terminate the offending
// agent — it has no orchestrator handle. If termination is expected, it
// has to happen in whatever reacts to the emitted event; confirm.
//
// It returns when ctx is cancelled or the subscription channel is closed.
func policyViolationWatcher(ctx context.Context, bus *event.Bus, st *store.Store, runID string) {
	defer recoverGoroutine(bus, "policyViolationWatcher", runID)

	events, unsubscribe := bus.Subscribe(256)
	defer unsubscribe()

	const threshold = 5
	const window = 60 * time.Second

	// recent holds, per agent, the violation timestamps still inside the
	// window; mutedUntil holds, per agent, the end of its cooldown.
	recent := map[string][]time.Time{}
	mutedUntil := map[string]time.Time{}

	for {
		select {
		case <-ctx.Done():
			return

		case ev, open := <-events:
			if !open {
				return
			}
			agent := ev.AgentID
			if ev.Kind != event.KindPolicyViolation || ev.RunID != runID || agent == "" {
				continue
			}

			now := time.Now()
			if until, muted := mutedUntil[agent]; muted && now.Before(until) {
				continue
			}

			// Drop timestamps that fell out of the window (filtering in
			// place), then record this violation.
			oldest := now.Add(-window)
			stamps := recent[agent]
			live := stamps[:0]
			for _, ts := range stamps {
				if ts.After(oldest) {
					live = append(live, ts)
				}
			}
			live = append(live, now)
			recent[agent] = live
			if len(live) < threshold {
				continue
			}

			// Threshold reached: escalate.
			windowSecs := int(window.Seconds())
			_, _ = bus.Emit(ctx, event.Event{
				Kind:    "agent.policy_threshold_exceeded",
				RunID:   runID,
				AgentID: agent,
				Payload: map[string]any{
					"count":     len(live),
					"window_s":  windowSecs,
					"threshold": threshold,
				},
			})

			notifID := fmt.Sprintf("notif_policy_%s_%d", agent, now.UnixNano())
			bodyText := fmt.Sprintf("agent %s exceeded %d policy violations within %ds", agent, threshold, windowSecs)
			_ = st.Tx(ctx, func(tx store.Querier) error {
				_, execErr := tx.Exec(
					`INSERT OR IGNORE INTO user_notifications(id, run_id, agent_id, kind, severity, body_text, created_at) VALUES(?, ?, ?, 'policy', 'warn', ?, ?)`,
					notifID, runID, agent,
					bodyText,
					now.Unix(),
				)
				return execErr
			})

			// 5-minute cooldown so the watcher doesn't spam if violations
			// keep arriving from the same agent; the window starts empty
			// again afterwards.
			mutedUntil[agent] = now.Add(5 * time.Minute)
			recent[agent] = nil
		}
	}
}

// isIntrospectionIntent reports whether the user prompt asks the org to
// introspect / self-report. The match is a case-insensitive substring
// search for any of the trigger phrases below; these are meant to be the
// same trigger words as the deterministic classifier's KindIntrospection
// rule.
func isIntrospectionIntent(s string) bool {
	triggers := [...]string{
		"self-report", "self report", "introspect",
		"every subworker", "every agent", "all agents", "all workers",
		"each subworker", "each agent", "each worker",
	}
	lowered := strings.ToLower(s)
	for i := range triggers {
		if strings.Contains(lowered, triggers[i]) {
			return true
		}
	}
	return false
}
