package runtime

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/flothus/tmux-xterm-research/server-go/internal/configsvc"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/envelope"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/event"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/store"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/transport"
)

// checkCrossSubOrg (V84) enforces the cross_suborg_requires_connector org
// policy for an outgoing message produced by `tool` and addressed to
// `recipient`.
//
// The check is a no-op (returns nil) when the policy is off, when this
// agent is itself a connector, or when no role→sub-org map was supplied.
// Otherwise the recipient is reduced to a bare role id — anything before
// the first "." in a "suborg.role" qualified name is dropped — and looked
// up in SubOrgByRole. Recipients that are not in the map (master, queue,
// reaper, user, …) are treated as cross-cutting and allowed, as are
// recipients in the sender's own sub-org.
//
// For a real cross-sub-org hop a policy.violation event is emitted first
// (best-effort; the emit result is ignored) so the V47 watcher can escalate
// repeat offenders, and then a descriptive error is returned.
func (a *Agent) checkCrossSubOrg(tool, recipient string) error {
	policyActive := a.CrossSubOrgConn && !a.IsConnector && len(a.SubOrgByRole) > 0
	if !policyActive {
		return nil
	}

	// Strip an optional "suborg." qualifier; only the first dot splits.
	roleID := recipient
	if _, rest, qualified := strings.Cut(recipient, "."); qualified {
		roleID = rest
	}

	targetSubOrg, isRole := a.SubOrgByRole[roleID]
	if !isRole || targetSubOrg == a.SubOrg {
		// Either a non-role recipient (or @-connector), or a hop that
		// stays inside the sender's own sub-org.
		return nil
	}

	violation := event.Event{
		Kind:    event.KindPolicyViolation,
		RunID:   a.RunID,
		AgentID: a.ID,
		Payload: map[string]any{
			"tool":              tool,
			"violation":         "cross_suborg_without_connector",
			"sender_sub_org":    a.SubOrg,
			"recipient":         recipient,
			"recipient_sub_org": targetSubOrg,
		},
	}
	_, _ = a.bus.Emit(context.Background(), violation)

	return fmt.Errorf("%s: cross-suborg hop %s→%s requires a connector role (sender is %q)", tool, a.SubOrg, targetSubOrg, a.Role)
}

// trunc cuts s to at most n characters (runes), replacing the tail with a
// single "…" when truncation happens. Used by the classify_prompt tool to
// keep the event payload small.
//
// Counting is done in runes, not bytes, so a multi-byte UTF-8 sequence is
// never split in half. A non-positive n yields the empty string instead of
// panicking on a negative slice bound.
func trunc(s string, n int) string {
	if n <= 0 {
		return ""
	}
	// Fast path: the byte length is an upper bound on the rune count.
	if len(s) <= n {
		return s
	}
	runes := 0 // runes seen before the current byte offset
	cut := 0   // byte offset just after the first n-1 runes
	for i := range s {
		if runes == n-1 {
			cut = i
		}
		if runes == n {
			// An (n+1)-th rune exists: keep n-1 runes plus the ellipsis.
			return s[:cut] + "…"
		}
		runes++
	}
	// s holds n runes or fewer even though it is longer than n bytes.
	return s
}

// sha256Sum returns the SHA-256 digest of s as a lowercase hex string.
func sha256Sum(s string) string {
	hasher := sha256.New()
	// hash.Hash.Write is documented to never return an error.
	_, _ = hasher.Write([]byte(s))
	return hex.EncodeToString(hasher.Sum(nil))
}

// Agent is the runtime wrapper that drives one LLM agent through the harness.
// It owns:
//   - the receive→LLM→tool-dispatch→ack loop
//   - heartbeat emission (around LLM calls, not from inside the prompt)
//   - token + cost accounting per turn → persisted into agents+runs columns
//   - tool execution including zone-scope checks on writes
//   - parse-failure tracking
//   - per-turn artifact persistence under ArtifactsDir for replay
//
// Plan §13. Heartbeats come from this wrapper, not the agent prompt — see
// §13 lock.
//
// An Agent embeds a sync.Mutex by value, so it must be handled through a
// pointer (NewAgent returns *Agent) and never copied.
type Agent struct {
	// Identity and static configuration, set once by NewAgent.
	//   - ID is the agents-table key and the inbox address passed to the
	//     queue on Receive.
	//   - RunID is stamped on emitted events and keys the runs-table
	//     aggregate updates.
	//   - ZoneScope is the set of paths write_file is checked against.
	//   - Tools is the per-role tool allowlist advertised to the LLM and
	//     enforced at dispatch.
	//   - Provider: presumably the LLM provider name — it is only stored
	//     in the code visible here; TODO confirm against callers.
	ID            string
	Role          string
	RunID         string
	ZoneScope     []string
	Tools         []string
	Provider      string
	CostPerKTok   float64 // USD per 1k tokens. 0 disables cost tracking.
	// ArtifactsDir is the directory where per-turn artifacts are written.
	// Defaults to ".td/runs/<run-id>/agents/<agent-id>/" via Agent.HandleOne.
	ArtifactsDir string
	// ProjectRoot anchors agent file writes. Every relative path the agent
	// emits (via write_file, persistTurn, etc.) is joined to ProjectRoot
	// before touching the filesystem. Empty means CWD-relative — that's
	// fine in production but fragile in tests where CWD is a subdir.
	ProjectRoot string

	// Harness collaborators injected by NewAgent: the LLM runtime, the
	// persistent store, the event bus, and the message queue this agent
	// receives from and acks/nacks against.
	rt    Runtime
	st    *store.Store
	bus   *event.Bus
	queue *transport.Queue
	// OC4: optional org/role config façade. When set, the agent exposes
	// org.* and role.* tools so in-run agents can build/validate orgs
	// without an out-of-band channel. nil = config tools refuse with a
	// clear error (tests without a Service still work).
	cfgSvc *configsvc.Service

	// mu guards the mutable per-agent state: the counters and
	// lastParseError below, the refusal-tracking fields further down,
	// and writes to cfgSvc via SetConfigSvc.
	// NOTE(review): turnCounter is not touched in the code visible here;
	// presumably it is also guarded by mu — confirm at its use site.
	mu              sync.Mutex
	introspectCalls int
	parseFailures   int
	tokensTotal     int64
	costUSDTotal    float64
	turnCounter     int
	// L8: last parse error string, set when a turn ends in parse
	// failure. Fed into the NEXT LLMRequest so the model sees corrective
	// feedback rather than blindly emitting the same malformed output.
	// Cleared on the first successful turn.
	lastParseError string

	// RefusalFallbackHook fires when the agent has nacked a user-
	// originated message N times in a row without emitting any tool
	// calls. The harness installs this on master in LLM mode to invoke
	// the deterministic fan-out as a structural fallback — guarantees
	// the run completes even when the LLM is being uncooperative. The
	// hook is called ONCE per stuck message; the agent stops nacking
	// after firing it.
	RefusalFallbackHook func(ctx context.Context, msg *envelope.Envelope)
	refusalThreshold    int    // 0 disables; runlive sets 3 for master
	refusalCount        int    // consecutive refusal nacks on currentMsgID
	currentMsgID        string // the message id being refused

	// PostInboxHook (LM-F) fires after a successful HandleOne turn —
	// after Ack, after tool dispatch. Used by runlive to attach
	// master-only side-effects (mark Report task completed, notify user,
	// run auto-evaluator) without baking that knowledge into Agent.
	// Workers leave it nil.
	PostInboxHook func(ctx context.Context, msg *envelope.Envelope, resp *LLMResponse)

	// CompleteTaskHook is the responsible-agent primitive. When the LLM
	// calls the complete_task tool, this hook performs the canonical
	// transition (orchestrator.Transition + auto-evaluator + user notify
	// + run-end check if this is the root task). Without the hook,
	// complete_task degrades to a direct state UPDATE — usable in tests
	// but skips the side-effect chain. Wired by runlive for master + any
	// role with `delegates_to`.
	CompleteTaskHook func(ctx context.Context, taskID, outcome, summary string) error

	// ResolveRoleAgent (added with V46 delegate tool) maps a role id to
	// a live agent id, lazy-spawning if the role hasn't been spawned
	// yet. Wired by OrgRunner at agent construction. When the LLM emits
	// a tool call targeting a role name (e.g. `delegate.to: "fe-worker"`)
	// the tool runtime resolves through this callback so the envelope's
	// `To` field is a real agent id and the Queue can deliver. Without
	// it, role-name targets get dropped by the Queue and the LLM's
	// downstream chain stalls silently. Empty return = unresolved.
	ResolveRoleAgent func(role string) string

	// StaticPeers is the team list derived from the org definition at
	// spawn time. Needed because the live agents-table query in
	// Introspect() returns nothing for the FIRST role spawned (master),
	// leaving {{peers}} empty in the rendered system prompt and making
	// the LLM say "I have no team, must be waiting." When set,
	// RenderSystemPrompt prefers this over the live table.
	StaticPeers []Peer

	// V84: cross-suborg routing policy. SubOrg is this agent's sub-org
	// id (empty for root-level roles). SubOrgByRole maps every role id
	// to its sub-org so we can resolve a recipient. CrossSubOrgConn +
	// IsConnector decide whether an agent is allowed to send to a role
	// in a different sub-org without going through a connector.
	// checkCrossSubOrg skips enforcement entirely when CrossSubOrgConn
	// is false, IsConnector is true, or SubOrgByRole is empty.
	SubOrg          string
	SubOrgByRole    map[string]string
	CrossSubOrgConn bool
	IsConnector     bool
}

// SetRefusalThreshold sets the number of consecutive user-prompt refusals
// (turns on a from=user Delegate where the LLM made no routing tool call)
// after which RefusalFallbackHook is invoked. A value of 0 turns the
// fallback off; runlive uses 3 for master in LLM mode. The write is done
// under the agent mutex.
func (a *Agent) SetRefusalThreshold(n int) {
	a.mu.Lock()
	a.refusalThreshold = n
	a.mu.Unlock()
}

// SetConfigSvc attaches (or, with nil, detaches) the org/role builder
// façade. With a Service installed the agent dispatches org.* / role.* tool
// calls against it; with nil those tools refuse with a clear error, which
// keeps test rigs without a Service working. The write is done under the
// agent mutex.
func (a *Agent) SetConfigSvc(s *configsvc.Service) {
	a.mu.Lock()
	a.cfgSvc = s
	a.mu.Unlock()
}

// NewAgent builds an Agent from its collaborators (runtime, store, event
// bus, queue) and its static configuration. Hooks, sub-org policy fields
// and the refusal threshold are left at their zero values for the caller to
// fill in. The agents row must already exist (the orchestrator creates it
// before calling Spawn).
func NewAgent(rt Runtime, st *store.Store, bus *event.Bus, q *transport.Queue, id, role, runID string, zone []string, tools []string, provider string, costPerKTok float64) *Agent {
	agent := new(Agent)

	// Identity and static configuration.
	agent.ID = id
	agent.Role = role
	agent.RunID = runID
	agent.ZoneScope = zone
	agent.Tools = tools
	agent.Provider = provider
	agent.CostPerKTok = costPerKTok

	// Harness collaborators.
	agent.rt = rt
	agent.st = st
	agent.bus = bus
	agent.queue = q

	return agent
}

// HandleOne runs one Receive→LLM→tools→Ack cycle. Returns (handled, error).
// handled=false means inbox was empty (or Receive itself failed, in which
// case the error is returned as well). A CallLLM failure nacks the message
// and returns (true, err); every other path returns (true, nil) after either
// acking or nacking the message.
//
// Emits step events for first-class observability:
//
//	agent.handle_one.started     — message picked up
//	agent.llm.call_started       — about to invoke runtime.CallLLM
//	agent.llm.call_ended         — runtime returned; payload has duration + tokens
//	agent.llm.call_failed        — runtime returned an error; message nacked
//	agent.tool.called            — each tool dispatched (one event per call)
//	agent.tool.failed            — tool dispatch error
//	agent.handle_one.nacked      — turn produced no usable work; message nacked
//	agent.refusal_fallback_fired — refusal threshold hit; fallback hook invoked
//	agent.handle_one.acked       — Ack succeeded; cycle complete
//
// These let the dashboard show per-agent activity and let the watchdog tell
// "agent is working" from "agent is stuck."
func (a *Agent) HandleOne(ctx context.Context) (bool, error) {
	msg, err := a.queue.Receive(ctx, a.ID)
	if err != nil {
		return false, err
	}
	if msg == nil {
		return false, nil
	}

	_, _ = a.bus.Emit(ctx, event.Event{
		Kind: "agent.handle_one.started", RunID: a.RunID, AgentID: a.ID, TaskID: msg.TaskID,
		Payload: map[string]any{"message_id": msg.ID, "type": string(msg.Type), "from": msg.From},
	})

	a.heartbeat(ctx)

	llmStart := time.Now()
	_, _ = a.bus.Emit(ctx, event.Event{
		Kind: "agent.llm.call_started", RunID: a.RunID, AgentID: a.ID, TaskID: msg.TaskID,
		Payload: map[string]any{"message_id": msg.ID, "tool_count": len(a.Tools)},
	})

	// V76: long LLM calls (>5 min) would otherwise let heartbeat go
	// stale and the dead-agent reaper false-kill us mid-call. A periodic
	// pulse goroutine refreshes heartbeat every 30s for the duration of
	// the call. Cancels when CallLLM returns.
	pulseCtx, stopPulse := context.WithCancel(ctx)
	// Deferred as a safety net: if CallLLM panics, the explicit
	// stopPulse() below is never reached and the pulser would otherwise
	// keep running until the parent ctx ends. Calling a CancelFunc twice
	// is harmless.
	defer stopPulse()
	go a.heartbeatPulse(pulseCtx, 30*time.Second)

	a.mu.Lock()
	priorErr := a.lastParseError
	a.mu.Unlock()
	// Advertise BOTH the per-role allowlist tools AND the implicit
	// always-available tools (self_report, report_environment_issue).
	// Without listing the implicit ones, the LLM doesn't know it can
	// call them and improvises — e.g. workers seeing only "introspect"
	// in their tool list will overload it instead of using the dedicated
	// self_report tool. Always include implicit tools so the model has
	// a clean shape to call.
	// NOTE(review): implicitTools is ranged by key; if it is a map the
	// order of the appended implicit tools varies from turn to turn —
	// confirm the runtime does not depend on a stable tool order.
	combinedTools := make([]string, 0, len(a.Tools)+len(implicitTools))
	combinedTools = append(combinedTools, a.Tools...)
	for t := range implicitTools {
		// Skip if already in the role allowlist (avoid duplicates).
		dup := false
		for _, existing := range a.Tools {
			if existing == t {
				dup = true
				break
			}
		}
		if !dup {
			combinedTools = append(combinedTools, t)
		}
	}
	resp, err := a.rt.CallLLM(ctx, a.ID, LLMRequest{
		IncomingMessage: msg,
		AvailableTools:  combinedTools,
		PriorParseError: priorErr,
	})
	stopPulse()
	llmDur := time.Since(llmStart)

	a.heartbeat(ctx)

	if err != nil {
		// L8: remember the parse error so the next turn's prompt can
		// include corrective context. Without this, the model keeps
		// emitting the same malformed shape until dead-letter.
		a.mu.Lock()
		a.lastParseError = err.Error()
		a.mu.Unlock()
		// L6/V116: capture whatever forensic information we have BEFORE
		// erroring out. resp can be non-nil on parse failure (the runtime
		// returns the raw output + parseFailures count). Without this
		// persisted artifact, diagnosing "what did the LLM actually say
		// that we couldn't parse?" requires the live tmux pane to still
		// exist — which it usually doesn't.
		artifactPath := a.persistFailedTurn(ctx, msg, resp, err, llmDur)
		payload := map[string]any{
			"message_id":  msg.ID,
			"duration_ms": llmDur.Milliseconds(),
			"error":       err.Error(),
		}
		if artifactPath != "" {
			payload["failed_turn_artifact"] = artifactPath
		}
		if resp != nil {
			payload["parse_failures"] = resp.ParseFailures
			payload["raw_text_bytes"] = len(resp.Text)
		}
		_, _ = a.bus.Emit(ctx, event.Event{
			Kind: "agent.llm.call_failed", RunID: a.RunID, AgentID: a.ID, TaskID: msg.TaskID,
			Payload: payload,
		})
		// Best-effort: the runtime error is what the caller needs to see.
		_ = a.queue.Nack(ctx, msg.ID, "runtime_error: "+err.Error())
		return true, err
	}

	// L8: success — clear the prior-parse-error so the next turn isn't
	// nagged with stale corrective text.
	a.mu.Lock()
	a.lastParseError = ""
	a.mu.Unlock()

	// Short preview of the LLM text for event payloads. The cut is moved
	// back to the start of a rune so a multi-byte UTF-8 sequence is never
	// split (a split would put invalid UTF-8 into the event).
	const snippetMaxBytes = 200
	textSnippet := resp.Text
	if len(textSnippet) > snippetMaxBytes {
		cut := snippetMaxBytes
		// UTF-8 continuation bytes have the bit pattern 10xxxxxx.
		for cut > 0 && textSnippet[cut]&0xC0 == 0x80 {
			cut--
		}
		textSnippet = textSnippet[:cut] + "…"
	}
	_, _ = a.bus.Emit(ctx, event.Event{
		Kind: "agent.llm.call_ended", RunID: a.RunID, AgentID: a.ID, TaskID: msg.TaskID,
		Payload: map[string]any{
			"message_id":     msg.ID,
			"duration_ms":    llmDur.Milliseconds(),
			"tokens_prompt":  resp.Tokens.Prompt,
			"tokens_compl":   resp.Tokens.Completion,
			"tool_calls":     len(resp.ToolCalls),
			"parse_failures": resp.ParseFailures,
			"text_snippet":   textSnippet,
		},
	})

	if resp.ParseFailures > 0 {
		a.mu.Lock()
		a.parseFailures += resp.ParseFailures
		pf := a.parseFailures
		a.mu.Unlock()
		// Best-effort persistence of the running total.
		_ = a.st.Tx(ctx, func(q store.Querier) error {
			_, err := q.Exec(`UPDATE agents SET parse_failures=? WHERE id=?`, pf, a.ID)
			return err
		})
		_, _ = a.bus.Emit(ctx, event.Event{
			Kind: event.KindParseFailure, RunID: a.RunID, AgentID: a.ID,
			Payload: map[string]any{"count": resp.ParseFailures, "total": pf},
		})
	}

	if resp.Tokens.Prompt+resp.Tokens.Completion > 0 {
		a.recordCost(ctx, resp.Tokens)
	}

	// Persist this turn as an artifact so it can be replayed/inspected.
	a.persistTurn(ctx, msg, resp, llmDur)

	// V90: cap tool calls per turn. A malformed/runaway LLM that emits
	// thousands of tool calls would otherwise execute them all (DoS).
	// 32 is generous: most turns produce 1-3 tool calls.
	const maxToolCallsPerTurn = 32
	toolCalls := resp.ToolCalls
	if len(toolCalls) > maxToolCallsPerTurn {
		_, _ = a.bus.Emit(ctx, event.Event{
			Kind: event.KindPolicyViolation, RunID: a.RunID, AgentID: a.ID, TaskID: msg.TaskID,
			Payload: map[string]any{
				"kind":         "tool_call_count_exceeded",
				"got":          len(toolCalls),
				"cap":          maxToolCallsPerTurn,
				"truncated_to": maxToolCallsPerTurn,
			},
		})
		toolCalls = toolCalls[:maxToolCallsPerTurn]
	}
	toolFailures := 0
	toolSuccesses := 0
	var lastToolErr error
	for _, tc := range toolCalls {
		toolStart := time.Now()
		// Pass through the originating message so tools that need task
		// context (request_clarification → which task is awaiting? what
		// to put in envelope.TaskID for round-trip?) can use it.
		err := a.executeTool(ctx, tc, msg)
		toolDur := time.Since(toolStart)
		if err != nil {
			toolFailures++
			lastToolErr = err
			_, _ = a.bus.Emit(ctx, event.Event{
				Kind: "agent.tool.failed", RunID: a.RunID, AgentID: a.ID, TaskID: msg.TaskID,
				Payload: map[string]any{
					"tool":        tc.Name,
					"duration_ms": toolDur.Milliseconds(),
					"error":       err.Error(),
				},
			})
			_, _ = a.bus.Emit(ctx, event.Event{
				Kind: event.KindPolicyViolation, RunID: a.RunID, AgentID: a.ID,
				Payload: map[string]any{"tool": tc.Name, "error": err.Error()},
			})
			continue
		}
		toolSuccesses++
		_, _ = a.bus.Emit(ctx, event.Event{
			Kind: "agent.tool.called", RunID: a.RunID, AgentID: a.ID, TaskID: msg.TaskID,
			Payload: map[string]any{
				"tool":        tc.Name,
				"duration_ms": toolDur.Milliseconds(),
				"args":        tc.Args,
			},
		})
	}

	// V38: if the LLM emitted tool calls AND every single one failed, the
	// worker absorbed the inbox message without producing output. Nack
	// so the queue's retry/dead-letter machinery surfaces the failure
	// (rather than silently dropping the work). Pure text-only responses
	// (no tool calls) still Ack — the LLM may have answered conversationally
	// and the sender can read text from the persistTurn artifact.
	if len(resp.ToolCalls) > 0 && toolSuccesses == 0 && toolFailures > 0 {
		reason := "all_tools_failed"
		if lastToolErr != nil {
			reason += ": " + lastToolErr.Error()
		}
		_ = a.queue.Nack(ctx, msg.ID, reason)
		_, _ = a.bus.Emit(ctx, event.Event{
			Kind: "agent.handle_one.nacked", RunID: a.RunID, AgentID: a.ID, TaskID: msg.TaskID,
			Payload: map[string]any{"message_id": msg.ID, "reason": reason, "tool_calls": len(resp.ToolCalls)},
		})
		return true, nil
	}
	// V73: if the LLM produced no tools AND no usable text AND parsing
	// failed, the turn was a wasted LLM call with no work product. Nack
	// instead of silently acking — otherwise the queue treats the message
	// as handled and the upstream sender's contract is broken.
	if resp.ParseFailures > 0 && len(resp.ToolCalls) == 0 && strings.TrimSpace(resp.Text) == "" {
		_ = a.queue.Nack(ctx, msg.ID, "parse_failure_no_output")
		_, _ = a.bus.Emit(ctx, event.Event{
			Kind: "agent.handle_one.nacked", RunID: a.RunID, AgentID: a.ID, TaskID: msg.TaskID,
			Payload: map[string]any{"message_id": msg.ID, "reason": "parse_failure_no_output", "parse_failures": resp.ParseFailures},
		})
		return true, nil
	}

	// LM-G safety: when a user-originated delegate produces zero tool
	// calls, the LLM is no-op'ing on the routing turn. Nack so the
	// message is retried (with L8 parse-error feedback included in the
	// next prompt). Without this, a confused master quietly acks the
	// prompt and the whole run stalls until the watchdog fires.
	// Did the LLM emit a TERMINAL tool call (one that actually moves
	// the work forward)? query_registry / classify_prompt / introspect
	// / self_report are inspection-only — by themselves they don't
	// dispatch anything. The user prompt path requires a delegate or
	// send_message to count as progress; otherwise we treat the turn
	// as a refusal for fallback-counting purposes.
	// NOTE(review): this scans every call the LLM emitted, including
	// ones that failed or were dropped by the V90 cap — confirm that
	// counting those as progress is intended.
	terminalAction := false
	for _, tc := range resp.ToolCalls {
		switch tc.Name {
		case "delegate", "send_message", "ask_user", "request_clarification":
			terminalAction = true
		}
		if terminalAction {
			break
		}
	}
	if msg.From == "user" && msg.Type == envelope.TypeDelegate && !terminalAction {
		nackReason := "user_prompt_no_tool_calls"
		a.mu.Lock()
		if a.currentMsgID != msg.ID {
			// First refusal on a new message: restart the streak.
			a.currentMsgID = msg.ID
			a.refusalCount = 0
		}
		a.refusalCount++
		count := a.refusalCount
		threshold := a.refusalThreshold
		a.lastParseError = "previous turn produced no tool_calls on a user prompt — you must emit at least one delegate / send_message tool call to route the work"
		a.mu.Unlock()
		// Threshold reached: invoke the structural fallback if installed.
		// We Ack the message (not Nack) so it stops bouncing in the queue —
		// the fallback takes ownership of routing the work.
		if threshold > 0 && count >= threshold && a.RefusalFallbackHook != nil {
			_ = a.queue.Ack(ctx, msg.ID)
			_, _ = a.bus.Emit(ctx, event.Event{
				Kind: "agent.refusal_fallback_fired", RunID: a.RunID, AgentID: a.ID, TaskID: msg.TaskID,
				Payload: map[string]any{
					"message_id":    msg.ID,
					"refusal_count": count,
					"threshold":     threshold,
					"last_text":     textSnippet,
				},
			})
			a.RefusalFallbackHook(ctx, msg)
			a.mu.Lock()
			a.currentMsgID = ""
			a.refusalCount = 0
			a.mu.Unlock()
			return true, nil
		}
		_ = a.queue.Nack(ctx, msg.ID, nackReason)
		_, _ = a.bus.Emit(ctx, event.Event{
			Kind: "agent.handle_one.nacked", RunID: a.RunID, AgentID: a.ID, TaskID: msg.TaskID,
			Payload: map[string]any{"message_id": msg.ID, "reason": nackReason, "text_snippet": textSnippet, "refusal_count": count},
		})
		return true, nil
	}

	_ = a.queue.Ack(ctx, msg.ID)
	_, _ = a.bus.Emit(ctx, event.Event{
		Kind: "agent.handle_one.acked", RunID: a.RunID, AgentID: a.ID, TaskID: msg.TaskID,
		Payload: map[string]any{"message_id": msg.ID},
	})
	// LM-F: post-inbox hook. When set, fires after a successful turn so
	// that side-effects previously owned by Master.ProcessInbox (mark
	// task completed on Report, notify user, run auto-evaluator) still
	// happen — even though the master agent's HandleOne now consumes
	// the inbox in LLM-driven mode. Hook stays nil on worker agents.
	if a.PostInboxHook != nil {
		a.PostInboxHook(ctx, msg, resp)
	}
	return true, nil
}

// heartbeat stamps the current store time into this agent's heartbeat_at
// column. It is best-effort: a failed transaction is ignored.
func (a *Agent) heartbeat(ctx context.Context) {
	stamp := store.FmtTime(store.Now())
	touch := func(q store.Querier) error {
		_, execErr := q.Exec(`UPDATE agents SET heartbeat_at=? WHERE id=?`, stamp, a.ID)
		return execErr
	}
	_ = a.st.Tx(ctx, touch)
}

// StartIdleHeartbeat launches a background pulser that refreshes the
// agent's heartbeat_at every interval, whether or not messages are being
// handled.
//
// Why it exists: HandleOne only heartbeats AFTER it has received a message,
// so an agent with an empty inbox would otherwise stop pulsing altogether.
// The dead-agent reaper would then treat a healthy idle agent as "dead"
// (heartbeat older than maxAge) and fail the tasks it owns, even though the
// agent process is alive and ready.
//
// Put differently, heartbeat is a PROCESS-LIVENESS signal, not a
// workflow-progress signal: liveness is reported regardless of whether work
// is happening, and detecting a stuck workflow is the stall-watchdog's job.
//
// The runlive layer calls this once per agent after spawn, passing the
// run-scoped ctx so the goroutine exits cleanly when the run ends. Calling
// it again is safe but simply starts another goroutine (callers should
// avoid that, though it is not catastrophic).
func (a *Agent) StartIdleHeartbeat(ctx context.Context, interval time.Duration) {
	go func() {
		a.heartbeatPulse(ctx, interval)
	}()
}

// heartbeatPulse calls heartbeat once per interval and returns when ctx is
// cancelled. It blocks, so callers run it in its own goroutine:
//   - StartIdleHeartbeat (run-scoped idle pulser, fixes the false-positive
//     reap of healthy idle agents)
//   - HandleOne around long LLM calls (per-call pulser, prevents reap
//     during a single multi-minute CallLLM)
func (a *Agent) heartbeatPulse(ctx context.Context, interval time.Duration) {
	// V113: contain panics so a heartbeat bug doesn't crash the harness,
	// and emit an event so the operator notices.
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		_, _ = a.bus.Emit(context.Background(), event.Event{
			Kind:    "goroutine.panic",
			RunID:   a.RunID,
			AgentID: a.ID,
			Payload: map[string]any{"where": "heartbeatPulse", "panic": fmt.Sprintf("%v", r)},
		})
	}()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	done := ctx.Done()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			a.heartbeat(ctx)
		}
	}
}

// transitionTo moves task taskID to state `to` and records a
// task.state_changed event inside the same transaction, so the FSM record
// and the event log cannot drift apart. If the task is already in the
// target state nothing is written and no event is emitted.
//
// The v4 enum trigger validates the target state, but no FSM-level legality
// check happens here: the agent is trusted to request only transitions it
// owns. For research-data analysis this is fine — canonical FSM enforcement
// lives in orchestrator.Transition; this path is for transitions the agent
// itself initiates mid-tool.
//
// Returns the lookup, update, or emit error, if any.
func (a *Agent) transitionTo(ctx context.Context, taskID, to string) error {
	stamp := store.FmtTime(store.Now())
	var (
		prevState string
		taskRunID string
	)
	apply := func(q store.Querier) error {
		row := q.QueryRow(`SELECT state, run_id FROM tasks WHERE id=?`, taskID)
		if scanErr := row.Scan(&prevState, &taskRunID); scanErr != nil {
			return scanErr
		}
		if prevState == to {
			// Already there: no write, no event.
			return nil
		}
		if _, updErr := q.Exec(`UPDATE tasks SET state=?, updated_at=? WHERE id=?`, to, stamp, taskID); updErr != nil {
			return updErr
		}
		_, emitErr := a.bus.EmitInTx(q, event.Event{
			Kind:    event.KindTaskStateChanged,
			RunID:   taskRunID,
			TaskID:  taskID,
			Payload: map[string]any{"from": prevState, "to": to, "by": a.ID},
		})
		return emitErr
	}
	return a.st.Tx(ctx, apply)
}

// recordCost folds one turn's token usage into the agent's running totals
// and persists them: the agents row receives the new absolute totals, the
// runs row is incremented by this turn's delta. Persistence is best-effort;
// a failed transaction is ignored.
//
// V85: LLM-reported token counts are sanitized first. A malformed runtime
// response could carry negative or absurdly large values; unclamped, they
// would corrupt run/agent aggregates and silently break cost-ceiling
// enforcement. Whenever a value had to be clamped, an
// agent.token_usage.clamped event records both the reported and the clamped
// numbers.
func (a *Agent) recordCost(ctx context.Context, t TokenUsage) {
	const maxPerTurn = 10_000_000 // 10M tokens/turn is already absurd for any real LLM

	prompt, completion := t.Prompt, t.Completion
	clamped := false
	switch {
	case prompt < 0:
		prompt, clamped = 0, true
	case prompt > maxPerTurn:
		prompt, clamped = maxPerTurn, true
	}
	switch {
	case completion < 0:
		completion, clamped = 0, true
	case completion > maxPerTurn:
		completion, clamped = maxPerTurn, true
	}
	if clamped {
		_, _ = a.bus.Emit(ctx, event.Event{
			Kind:    "agent.token_usage.clamped",
			RunID:   a.RunID,
			AgentID: a.ID,
			Payload: map[string]any{
				"reported_prompt":     t.Prompt,
				"reported_completion": t.Completion,
				"clamped_prompt":      prompt,
				"clamped_completion":  completion,
			},
		})
	}

	tokens := int64(prompt + completion)
	// Cost stays zero when CostPerKTok is not positive (tracking disabled).
	var dollars float64
	if a.CostPerKTok > 0 {
		dollars = float64(tokens) / 1000.0 * a.CostPerKTok
	}

	// Update in-memory totals under the lock and snapshot them for the
	// store write, which happens outside the lock.
	a.mu.Lock()
	a.tokensTotal += tokens
	a.costUSDTotal += dollars
	agentTokens, agentCost := a.tokensTotal, a.costUSDTotal
	a.mu.Unlock()

	persist := func(q store.Querier) error {
		if _, execErr := q.Exec(`UPDATE agents SET tokens_total=?, cost_usd_total=? WHERE id=?`, agentTokens, agentCost, a.ID); execErr != nil {
			return execErr
		}
		// Bump run totals.
		_, execErr := q.Exec(`UPDATE runs SET tokens_total=tokens_total+?, cost_usd_total=cost_usd_total+? WHERE id=?`, tokens, dollars, a.RunID)
		return execErr
	}
	_ = a.st.Tx(ctx, persist)
}

// executeTool dispatches a single tool call issued by the LLM during a turn.
//
// The tool name is first checked with a.allowed (implicit tools plus the
// per-role allowlist); a disallowed name returns an error before any side
// effect happens. This function itself does not emit a policy.violation for
// that case. Tools handled here:
//   - introspect: bumps a counter and emits the embedding bundle as an event
//   - write_file: zone-scope-checked filesystem write recorded as an artifact
//   - request_clarification: sends a clarify envelope AND transitions the
//     originating task to awaiting_clarification so the FSM reflects the
//     real state (V28)
//   - send_message: sends an LLM-shaped envelope with server-enforced
//     From/RunID (V34) and the cross-sub-org gate (V84)
//   - query_registry: emits registry query results as an event
//   - setup_zone: upserts a zone row, refusing to touch another agent's zone (V96)
//   - spawn_agent: emits a spawn request and persists it via requestDynamicSpawn
//   - forward_with_translation: sends a delegate envelope carrying a translation note
//   - delegate: creates a child task under the current task and sends it on
//   - ask_user: files a user notification and emits agent.ask_user
//   - classify_prompt: emits a classification event only
//   - complete_task: emits the decision, then calls CompleteTaskHook or
//     falls back to a direct transition
//   - org.* / role.*: forwarded to executeConfigTool
//   - self_report: emits and persists the agent's self-description
//   - report_environment_issue: emits the issue; "blocker" severity also
//     files a user notification
//
// `incoming` is the message that triggered this turn — used by tools that
// need to know "which task am I on right now" for FSM transitions and
// envelope task_id propagation. May be nil for synthetic test calls.
//
// Returns nil on success, a tool-specific error on bad arguments or a failed
// send/store operation, and an "unknown tool" error for names that pass the
// allowlist but have no case here.
func (a *Agent) executeTool(ctx context.Context, tc ToolCall, incoming *envelope.Envelope) error {
	if !a.allowed(tc.Name) {
		return fmt.Errorf("tool %q not in role allowlist", tc.Name)
	}
	switch tc.Name {
	case "introspect":
		a.mu.Lock()
		a.introspectCalls++
		a.mu.Unlock()
		// Return the embedding bundle via a structured event so the LLM
		// (next turn) and the dashboard both see it. The bundle answers
		// the anti-failure questions from the original prompt.
		emb, err := a.Introspect(ctx)
		if err != nil {
			return err
		}
		_, _ = a.bus.Emit(ctx, event.Event{
			Kind: "agent.introspect", RunID: a.RunID, AgentID: a.ID,
			Payload: map[string]any{
				"peers":      emb.Peers,
				"zone":       emb.ZoneScope,
				"tools":      emb.Tools,
				"open_tasks": emb.OpenTaskIDs,
				"plan_path":  emb.PlanPath,
				"escalate":   emb.EscalateTo,
			},
		})
		return nil
	case "write_file":
		// A missing or non-string "content" arg yields an empty file.
		path, _ := tc.Args["path"].(string)
		content, _ := tc.Args["content"].(string)
		if path == "" {
			return errors.New("write_file: path required")
		}
		if !a.pathAllowed(path) {
			return fmt.Errorf("zone-scope violation: %q not in %v", path, a.ZoneScope)
		}
		// Actually write to disk + record the artifact for downstream
		// evaluators and the UI to discover. The task_id from the
		// incoming envelope is required for the evaluator's
		// "most recent artifact for this task" lookup (V40).
		taskID := ""
		if incoming != nil {
			taskID = incoming.TaskID
		}
		return a.writeArtifactFile(ctx, taskID, path, content)
	case "request_clarification":
		q, _ := tc.Args["question"].(string)
		to, _ := tc.Args["to"].(string)
		if to == "" {
			to = "master"
		}
		// Propagate the originating task id so the master can record the
		// clarification request against the right row and the eventual
		// response can be routed back via RespondToClarification.
		taskID := ""
		if incoming != nil {
			taskID = incoming.TaskID
		}
		env := &envelope.Envelope{
			ID:    fmt.Sprintf("msg_clarify_%s_%d", a.ID, store.Now().UnixNano()),
			RunID: a.RunID, From: a.ID, To: to,
			Type: envelope.TypeClarify, TTLMs: 60000,
			TaskID:  taskID,
			Payload: envelope.Payload{Intent: q, Expects: envelope.ExpectsAnswer},
		}
		if err := a.queue.Send(ctx, env); err != nil {
			return err
		}
		// V28: the FSM must reflect that this task is awaiting a user
		// answer. Without the transition, dashboards can't filter
		// "awaiting feedback" tasks, and downstream openTasks queries
		// can't distinguish a working agent from one parked waiting.
		if taskID != "" {
			// Best-effort: legal transition is in_progress→awaiting_clarification.
			// If the task is in some other state, Transition will reject
			// — log via event but don't fail the tool call.
			if err := a.transitionTo(ctx, taskID, "awaiting_clarification"); err != nil {
				_, _ = a.bus.Emit(ctx, event.Event{
					Kind:  "agent.tool.clarify_transition_skipped",
					RunID: a.RunID, AgentID: a.ID, TaskID: taskID,
					Payload: map[string]any{"error": err.Error()},
				})
			}
		}
		return nil
	case "send_message":
		// Round-trip the args through JSON to shape them into an Envelope.
		// The Marshal error is ignored; a failure there surfaces as an
		// Unmarshal error on the next line.
		raw, _ := json.Marshal(tc.Args)
		var e envelope.Envelope
		if err := json.Unmarshal(raw, &e); err != nil {
			return err
		}
		// V34: identity is server-enforced. The LLM does not get to choose
		// the sender or the run scope — an LLM that tried to spoof
		// "From: master" while delivering a fake report would otherwise
		// trip every downstream handler that trusts the envelope. The
		// runtime overwrites both fields unconditionally.
		if e.From != "" && e.From != a.ID {
			_, _ = a.bus.Emit(ctx, event.Event{
				Kind: event.KindPolicyViolation, RunID: a.RunID, AgentID: a.ID,
				Payload: map[string]any{
					"tool": "send_message", "violation": "from_spoof_attempt",
					"attempted_from": e.From, "actual": a.ID,
				},
			})
		}
		if e.RunID != "" && e.RunID != a.RunID {
			_, _ = a.bus.Emit(ctx, event.Event{
				Kind: event.KindPolicyViolation, RunID: a.RunID, AgentID: a.ID,
				Payload: map[string]any{
					"tool": "send_message", "violation": "run_id_spoof_attempt",
					"attempted_run_id": e.RunID, "actual": a.RunID,
				},
			})
		}
		e.From = a.ID
		e.RunID = a.RunID
		// Auto-fill envelope ID when the LLM didn't provide one.
		// The LLM shouldn't need to mint message ids — that's a
		// harness concern. Without this, perfectly-shaped send_message
		// calls get rejected by the envelope validator as
		// "envelope: id required".
		if e.ID == "" {
			e.ID = fmt.Sprintf("msg_send_%s_%d", a.ID, store.Now().UnixNano())
		}
		if e.TTLMs == 0 {
			e.TTLMs = 60000
		}
		// V84: enforce cross_suborg_requires_connector. When a sender
		// in sub-org A targets a recipient in sub-org B and the policy
		// is enabled, only roles flagged as connectors may bridge the
		// gap. Without this check the policy was advisory text.
		if err := a.checkCrossSubOrg("send_message", e.To); err != nil {
			return err
		}
		// Resolve role-name recipients to agent ids. LLMs naturally
		// write `to: "master"` (the role); the Queue needs an agent
		// id. Only swap when the resolver returns a non-empty id —
		// callers may legitimately address the master role by its
		// literal id "master" in some setups, or the queue's
		// synthetic recipients (e.g. "user", "queue").
		if a.ResolveRoleAgent != nil {
			if id := a.ResolveRoleAgent(e.To); id != "" && id != e.To {
				e.To = id
			}
		}
		return a.queue.Send(ctx, &e)
	case "query_registry":
		// Returns peers/artifacts via a structured event so the LLM can see
		// it on the next turn. Args: {kind: "agents"|"artifacts",
		// filter: {role: "...", task_id: "...", run_id: "..."}}.
		kind, _ := tc.Args["kind"].(string)
		if kind == "" {
			kind = "agents"
		}
		filter, _ := tc.Args["filter"].(map[string]any)
		result := a.queryRegistry(ctx, kind, filter)
		_, _ = a.bus.Emit(ctx, event.Event{
			Kind: "agent.query_registry", RunID: a.RunID, AgentID: a.ID,
			Payload: map[string]any{"kind": kind, "filter": filter, "results": result},
		})
		return nil
	case "setup_zone":
		// Zoning agent registers a new zone. Args: {zone_id, name,
		// path_globs[], owner_role, doc_path}.
		zoneID, _ := tc.Args["zone_id"].(string)
		name, _ := tc.Args["name"].(string)
		ownerRole, _ := tc.Args["owner_role"].(string)
		docPath, _ := tc.Args["doc_path"].(string)
		globsRaw, _ := tc.Args["path_globs"].([]any)
		if zoneID == "" || len(globsRaw) == 0 {
			return errors.New("setup_zone: zone_id and path_globs required")
		}
		// Non-string entries in path_globs are silently dropped.
		globs := make([]string, 0, len(globsRaw))
		for _, g := range globsRaw {
			if s, ok := g.(string); ok {
				globs = append(globs, s)
			}
		}
		globsJSON, _ := json.Marshal(globs)
		// V96: zone-ownership-hijack guard. If a zone with this id already
		// has an owner_agent_id and it's NOT this agent, refuse the upsert
		// — otherwise any agent could steal another team's zone by setting
		// up a zone with their id. We also write owner_agent_id on the
		// INSERT path so newly-claimed zones have a real owner.
		return a.st.Tx(ctx, func(q store.Querier) error {
			var existingOwner string
			// NOTE(review): the Scan error is discarded, so "no such row"
			// and "query failed" both leave existingOwner empty and fall
			// through to the upsert — confirm that is intended.
			_ = q.QueryRow(`SELECT IFNULL(owner_agent_id,'') FROM zones WHERE id=?`, zoneID).Scan(&existingOwner)
			if existingOwner != "" && existingOwner != a.ID {
				// NOTE(review): this event is written through the same
				// Querier and the closure then returns an error; if Tx
				// rolls back on error the violation record may be lost —
				// verify against store.Tx / bus.EmitInTx.
				_, _ = a.bus.EmitInTx(q, event.Event{
					Kind: event.KindPolicyViolation, RunID: a.RunID, AgentID: a.ID,
					Payload: map[string]any{
						"kind": "zone_hijack_attempt", "zone_id": zoneID,
						"existing_owner": existingOwner, "attempted_by": a.ID,
					},
				})
				return fmt.Errorf("zone %q already owned by %s", zoneID, existingOwner)
			}
			// The UPDATE branch of the upsert refreshes name, globs and
			// doc_path only; owner_agent_id is written on INSERT alone.
			_, err := q.Exec(
				`INSERT INTO zones(id, name, path_globs_json, owner_agent_id, doc_path) VALUES(?, ?, ?, ?, ?)
				 ON CONFLICT(id) DO UPDATE SET name=excluded.name, path_globs_json=excluded.path_globs_json, doc_path=excluded.doc_path`,
				zoneID, name, string(globsJSON), a.ID, docPath,
			)
			if err != nil {
				return err
			}
			_, _ = a.bus.Emit(ctx, event.Event{
				Kind: "agent.zone_setup", RunID: a.RunID, AgentID: a.ID,
				Payload: map[string]any{
					"zone_id": zoneID, "name": name,
					"globs": globs, "owner": a.ID, "owner_role": ownerRole, "doc_path": docPath,
				},
			})
			return nil
		})
	case "spawn_agent":
		// Lets a responsible agent (e.g. master, lead) instantiate a fresh
		// teammate at runtime. Args: {role: "...", provider: "...",
		// zone_scope: [...], tools: [...]}. Bounded by role allowlist
		// already; the harness emits agent.spawned_dynamically.
		role, _ := tc.Args["role"].(string)
		provider, _ := tc.Args["provider"].(string)
		if role == "" {
			return errors.New("spawn_agent: role required")
		}
		_, _ = a.bus.Emit(ctx, event.Event{
			Kind: "agent.spawn_requested", RunID: a.RunID, AgentID: a.ID,
			Payload: map[string]any{"role": role, "provider": provider, "by": a.ID},
		})
		// The actual spawn is handled by the orchestrator's DynamicSpawner
		// (wired in runlive.Run). We persist the request so the spawner
		// picks it up on its next sweep.
		return a.requestDynamicSpawn(ctx, role, provider, tc.Args)
	case "forward_with_translation":
		// The connector's tool. Wraps a message for the target side with a
		// translation note. Plan §4.2.
		to, _ := tc.Args["to"].(string)
		intent, _ := tc.Args["intent"].(string)
		translation, _ := tc.Args["translation_note"].(string)
		taskID, _ := tc.Args["task_id"].(string)
		if to == "" || intent == "" {
			return errors.New("forward_with_translation: to + intent required")
		}
		extra := map[string]any{}
		if translation != "" {
			extra["translation_note"] = translation
		}
		env := &envelope.Envelope{
			ID:    fmt.Sprintf("msg_fwd_%s_%d", a.ID, nextFwdID()),
			RunID: a.RunID, From: a.ID, To: to,
			Type: envelope.TypeDelegate, TaskID: taskID, TTLMs: 60000,
			Payload: envelope.Payload{
				Intent: intent, Expects: envelope.ExpectsReport, Extra: extra,
			},
		}
		return a.queue.Send(ctx, env)
	case "delegate":
		// V46+V57: a worker delegating a sub-task to a downstream role.
		// Creates a child task under the agent's current task so the
		// task tree is no longer flat, then enqueues a Delegate envelope
		// for the target.
		toRole, _ := tc.Args["to"].(string)
		intent, _ := tc.Args["intent"].(string)
		if toRole == "" || intent == "" {
			return errors.New("delegate: to + intent required")
		}
		// Caller's current task becomes parent_task_id; without an
		// incoming envelope (synthetic spawn turn) we refuse — agents
		// can't delegate before they've been assigned work.
		if incoming == nil || incoming.TaskID == "" {
			return errors.New("delegate: no current task to attach as parent (only callable when handling a delegate/report)")
		}
		// V84: same cross-suborg gate as send_message.
		if err := a.checkCrossSubOrg("delegate", toRole); err != nil {
			return err
		}
		// Resolve role name → agent id (lazy-spawning if needed). LLMs
		// naturally write `to: "fe-worker"` (the role); the Queue needs
		// a real agent id to deliver. Without this step the envelope
		// gets dropped silently and the worker never wakes up.
		recipient := toRole
		if a.ResolveRoleAgent != nil {
			if id := a.ResolveRoleAgent(toRole); id != "" {
				recipient = id
			} else {
				_, _ = a.bus.Emit(ctx, event.Event{
					Kind: "agent.delegate.unresolved", RunID: a.RunID, AgentID: a.ID,
					Payload: map[string]any{"target_role": toRole, "hint": "no agent registered for this role and lazy spawn failed"},
				})
				return fmt.Errorf("delegate: could not resolve role %q to a live agent", toRole)
			}
		}
		parentTaskID := incoming.TaskID
		childTaskID := fmt.Sprintf("task_%d_%d", store.Now().UnixNano(), nextFwdID())
		now := store.FmtTime(store.Now())
		title := intent
		if len(title) > 80 {
			// Cut at the last rune boundary at or before byte 79 so a
			// multi-byte character is never split in half (a raw
			// title[:79] could store invalid UTF-8 in tasks.title).
			// Pure-ASCII intents are still cut at exactly 79 bytes.
			cut := 0
			for i := range title {
				if i > 79 {
					break
				}
				cut = i
			}
			title = title[:cut] + "…"
		}
		// Set owner_agent_id at creation so the dead-agent reaper can fail
		// this task if the recipient dies before completing. Without an
		// owner, ReapDeadAgents skips the row and the task lingers in
		// 'created' forever — keeping openTasks()>0 and preventing the
		// run from ending. Workers transition created→assigned→in_progress
		// when they actually start handling, but creation-time ownership
		// is what makes the row claimable by reapers.
		err := a.st.Tx(ctx, func(q store.Querier) error {
			_, err := q.Exec(
				`INSERT INTO tasks(id, run_id, parent_task_id, owner_agent_id, title, state, attempts, created_at, updated_at)
				 VALUES(?, ?, ?, ?, ?, 'created', 0, ?, ?)`,
				childTaskID, a.RunID, parentTaskID, recipient, title, now, now,
			)
			return err
		})
		if err != nil {
			return fmt.Errorf("delegate: create child task: %w", err)
		}
		env := &envelope.Envelope{
			ID:    fmt.Sprintf("msg_del_%s_%d", a.ID, store.Now().UnixNano()),
			RunID: a.RunID, From: a.ID, To: recipient,
			Type: envelope.TypeDelegate, TaskID: childTaskID, TTLMs: 60000,
			Payload: envelope.Payload{Intent: intent, Expects: envelope.ExpectsReport},
		}
		// The child task row is already committed at this point; a Send
		// failure returns the error but leaves that row in place.
		if err := a.queue.Send(ctx, env); err != nil {
			return err
		}
		_, _ = a.bus.Emit(ctx, event.Event{
			Kind: "agent.delegated", RunID: a.RunID, AgentID: a.ID, TaskID: childTaskID,
			Payload: map[string]any{
				"parent_task_id": parentTaskID, "to_role": toRole, "to_agent": recipient, "intent": intent,
			},
		})
		return nil
	case "ask_user":
		// V46: master/agent asks the user a direct question (vs request_clarification
		// which is an internal-loop primitive). Files a user_notification of kind
		// 'clarification' so the UI surfaces it.
		question, _ := tc.Args["question"].(string)
		if question == "" {
			return errors.New("ask_user: question required")
		}
		notifID := fmt.Sprintf("notif_ask_%s_%d", a.ID, store.Now().UnixNano())
		taskID := ""
		if incoming != nil {
			taskID = incoming.TaskID
		}
		// Best-effort insert: the Tx error is discarded, so the event
		// below is emitted (and nil returned) even if the notification
		// row was not stored.
		_ = a.st.Tx(ctx, func(q store.Querier) error {
			_, err := q.Exec(
				`INSERT INTO user_notifications(id, run_id, agent_id, kind, severity, body_text, created_at) VALUES(?, ?, ?, 'clarification', 'info', ?, ?)`,
				notifID, a.RunID, a.ID, question, store.Now().Unix(),
			)
			return err
		})
		_, _ = a.bus.Emit(ctx, event.Event{
			Kind: "agent.ask_user", RunID: a.RunID, AgentID: a.ID, TaskID: taskID,
			Payload: map[string]any{"question": question, "notification_id": notifID},
		})
		return nil
	case "classify_prompt":
		// V46: master classifies an inbound user prompt. The harness
		// already does classification at run-start; this tool exists so
		// the master LLM can re-classify a follow-up. Emits an event;
		// callers (master.ProcessInbox) can re-route on the result.
		text, _ := tc.Args["text"].(string)
		if text == "" {
			return errors.New("classify_prompt: text required")
		}
		_, _ = a.bus.Emit(ctx, event.Event{
			Kind: "agent.classify_prompt", RunID: a.RunID, AgentID: a.ID,
			Payload: map[string]any{
				"text_prefix": trunc(text, 200),
				"note":        "classification surfaced for caller to act on; harness does not re-route automatically",
			},
		})
		return nil
	case "complete_task":
		// Responsible-agent primitive: an agent (typically master or a
		// lead) declares one of its delegated tasks done. The actual
		// state transition + side effects (auto-evaluator, user
		// notification) are wired by the runlive layer via the
		// CompleteTaskHook callback — Agent itself doesn't import the
		// orchestrator. Args: {task_id, outcome?: "completed"|"failed",
		// summary?: string}. Defaults to completed.
		taskID, _ := tc.Args["task_id"].(string)
		if taskID == "" && incoming != nil {
			taskID = incoming.TaskID
		}
		if taskID == "" {
			return errors.New("complete_task: task_id required")
		}
		outcome, _ := tc.Args["outcome"].(string)
		if outcome == "" {
			outcome = "completed"
		}
		summary, _ := tc.Args["summary"].(string)
		_, _ = a.bus.Emit(ctx, event.Event{
			Kind: "agent.complete_task", RunID: a.RunID, AgentID: a.ID, TaskID: taskID,
			Payload: map[string]any{"outcome": outcome, "summary": summary, "by": a.ID},
		})
		if a.CompleteTaskHook != nil {
			return a.CompleteTaskHook(ctx, taskID, outcome, summary)
		}
		// Fallback: no hook installed (test rigs, deterministic mode).
		// Do an unvalidated direct transition so the task at least
		// reflects the decision; orchestrator-driven side-effects
		// (evaluator, user notification) are skipped.
		return a.transitionTo(ctx, taskID, outcome)
	case "org.list", "org.get", "org.write", "org.validate", "org.delete",
		"role.get", "role.write":
		return a.executeConfigTool(ctx, tc)
	case "self_report":
		// SR1: structured agent self-identification. Captures the
		// invariants the harness already knows about this agent (role,
		// provider, sub_org, tools, zone scope) plus an optional
		// note string from the LLM. Emitted as a first-class event so:
		//   - the grader can answer "did every declared agent identify itself?"
		//   - the dashboard surfaces who-is-who without inferring from logs
		//   - the harness CLI has a single source of truth for the run topology
		// We also persist a per-agent artifact (one JSON file) so the report
		// survives event-table pruning and shows up in the run dir alongside
		// turn artifacts (full transparency requirement).
		notes, _ := tc.Args["notes"].(string)
		incomingTaskID := ""
		incomingMsgID := ""
		if incoming != nil {
			incomingTaskID = incoming.TaskID
			incomingMsgID = incoming.ID
		}
		report := map[string]any{
			"agent_id":     a.ID,
			"role":         a.Role,
			"provider":     a.Provider,
			"sub_org":      a.SubOrg,
			"is_connector": a.IsConnector,
			"tools":        a.Tools,
			"zone_scope":   a.ZoneScope,
			"task_id":      incomingTaskID,
			"in_reply_to":  incomingMsgID,
			"notes":        notes,
			"ts":           store.FmtTime(store.Now()),
		}
		_, _ = a.bus.Emit(ctx, event.Event{
			Kind: "agent.self_report", RunID: a.RunID, AgentID: a.ID, TaskID: incomingTaskID,
			Payload: report,
		})
		// Persist artifact: .td/runs/<runID>/agents/<agentID>/self-report.json.
		// Use a timestamp-suffixed name on subsequent self-reports so
		// multiple introspection requests don't clobber the previous one.
		// Marshal and write errors are ignored: the artifact is best-effort.
		if a.ArtifactsDir != "" || a.RunID != "" {
			dir := a.artifactsDirEnsuring(ctx)
			if dir != "" {
				name := "self-report.json"
				path := filepath.Join(dir, name)
				if _, statErr := os.Stat(path); statErr == nil {
					name = fmt.Sprintf("self-report.%d.json", time.Now().UnixNano())
					path = filepath.Join(dir, name)
				}
				b, _ := json.MarshalIndent(report, "", "  ")
				_ = os.WriteFile(path, b, 0o644)
			}
		}
		return nil
	case "report_environment_issue":
		// L8: agent self-report. When a worker detects that it can't
		// operate (CLI returning empty bodies, missing tool on PATH,
		// auth flow blocking, modal in the way), it emits this tool so
		// the harness records the issue against the run + escalates
		// to the user / parent. Promoted to a structured event so the
		// dashboard and the grader both surface it.
		category, _ := tc.Args["category"].(string)
		detail, _ := tc.Args["detail"].(string)
		severity, _ := tc.Args["severity"].(string)
		if category == "" || detail == "" {
			return errors.New("report_environment_issue: category + detail required")
		}
		if severity == "" {
			severity = "warn"
		}
		_, _ = a.bus.Emit(ctx, event.Event{
			Kind: "agent.environment_issue", RunID: a.RunID, AgentID: a.ID,
			Payload: map[string]any{
				"category": category, "detail": detail, "severity": severity,
			},
		})
		// File a user notification for "blocker" severity so the human
		// sees it in the runs UI without having to inspect events.
		// The insert is best-effort; its error is discarded.
		if severity == "blocker" {
			notifID := fmt.Sprintf("notif_envissue_%s_%d", a.ID, store.Now().UnixNano())
			_ = a.st.Tx(ctx, func(q store.Querier) error {
				_, err := q.Exec(
					`INSERT INTO user_notifications(id, run_id, agent_id, kind, severity, body_text, created_at) VALUES(?, ?, ?, 'environment_issue', ?, ?, ?)`,
					notifID, a.RunID, a.ID, severity,
					fmt.Sprintf("[%s] %s", category, detail),
					store.Now().Unix(),
				)
				return err
			})
		}
		return nil
	}
	return fmt.Errorf("unknown tool %q", tc.Name)
}

// executeConfigTool runs one org.* / role.* tool call against a.cfgSvc.
//
// It fails immediately when no config service is installed. Successful
// calls publish an `agent.config.tool` event whose payload carries the
// tool name plus the structured result, so the dashboard can show what
// the LLM saw. org.write also publishes on failure (with the error text).
// Missing required string arguments yield a "<tool>: ... required" error;
// a name with no matching case yields "unknown config tool".
func (a *Agent) executeConfigTool(ctx context.Context, tc ToolCall) error {
	if a.cfgSvc == nil {
		return errors.New("config tools unavailable: SetConfigSvc was never called on this agent")
	}

	// publish stamps the tool name onto body and emits it; emit errors
	// are deliberately ignored.
	publish := func(body map[string]any) {
		body["tool"] = tc.Name
		_, _ = a.bus.Emit(ctx, event.Event{
			Kind: "agent.config.tool", RunID: a.RunID, AgentID: a.ID,
			Payload: body,
		})
	}
	// strArg / boolArg read a typed argument, yielding the zero value
	// when the key is absent or holds a different type.
	strArg := func(key string) string {
		s, _ := tc.Args[key].(string)
		return s
	}
	boolArg := func(key string) bool {
		b, _ := tc.Args[key].(bool)
		return b
	}

	switch tc.Name {
	case "org.list":
		list, loadErrs, err := a.cfgSvc.ListOrgs()
		if err != nil {
			return err
		}
		publish(map[string]any{"orgs": list, "errors": loadErrs})
		return nil

	case "org.get":
		orgName := strArg("name")
		if orgName == "" {
			return errors.New("org.get: name required")
		}
		def, raw, getErr := a.cfgSvc.GetOrg(orgName)
		if getErr != nil && def == nil {
			return getErr
		}
		// An error alongside a non-nil def is reported in the payload
		// rather than failing the call.
		body := map[string]any{"name": orgName, "def": def, "raw": string(raw)}
		if getErr != nil {
			body["parse_error"] = getErr.Error()
		}
		publish(body)
		return nil

	case "org.validate":
		candidate := strArg("yaml")
		if candidate == "" {
			return errors.New("org.validate: yaml required")
		}
		_, verdict := a.cfgSvc.ValidateCandidate(candidate)
		publish(map[string]any{"validation": verdict})
		return nil

	case "org.write":
		orgName := strArg("name")
		candidate := strArg("yaml")
		replace := boolArg("overwrite")
		if orgName == "" || candidate == "" {
			return errors.New("org.write: name + yaml required")
		}
		def, verdict, writeErr := a.cfgSvc.WriteOrg(orgName, candidate, replace)
		if writeErr != nil {
			publish(map[string]any{"name": orgName, "validation": verdict, "error": writeErr.Error()})
			return writeErr
		}
		publish(map[string]any{"name": orgName, "def": def, "validation": verdict})
		if verdict.OK {
			return nil
		}
		// Validation failed: surface an error so the LLM sees the tool
		// rejected its candidate (and the next turn can retry).
		return fmt.Errorf("org.write: candidate failed validation; see .validation in event payload")

	case "org.delete":
		orgName := strArg("name")
		if orgName == "" {
			return errors.New("org.delete: name required")
		}
		if err := a.cfgSvc.DeleteOrg(orgName); err != nil {
			return err
		}
		publish(map[string]any{"name": orgName, "deleted": true})
		return nil

	case "role.get":
		roleSet, roleID := strArg("set"), strArg("id")
		if roleSet == "" || roleID == "" {
			return errors.New("role.get: set + id required")
		}
		doc, err := a.cfgSvc.GetRole(roleSet, roleID)
		if err != nil {
			return err
		}
		publish(map[string]any{"role": doc})
		return nil

	case "role.write":
		roleSet, roleID := strArg("set"), strArg("id")
		body := strArg("markdown")
		mayCreateSet := boolArg("create_set")
		if roleSet == "" || roleID == "" {
			return errors.New("role.write: set + id required")
		}
		doc, err := a.cfgSvc.WriteRole(roleSet, roleID, body, mayCreateSet)
		if err != nil {
			return err
		}
		publish(map[string]any{"role": doc})
		return nil

	default:
		return fmt.Errorf("unknown config tool %q", tc.Name)
	}
}

// implicitTools is the set of tool names that allowed() accepts before it
// consults the per-role allowlist (a.Tools). The entries are
// self-identification / self-status tools; listing them here avoids
// forcing every org yaml to repeat them. Future tools that only report
// agent state should be added here rather than plumbed into every
// existing org's allowlist.
var implicitTools = map[string]bool{
	"self_report":              true,
	"report_environment_issue": true,
	// Responsible-agent primitive: any agent may declare its delegated
	// task done.
	// NOTE(review): the original rationale is that workers can only
	// complete tasks they own and that runlive's CompleteTaskHook gates
	// the side-effects. That gating is not visible in this file; when no
	// hook is installed, executeTool falls back to a direct transitionTo
	// on the supplied task id — confirm ownership is enforced somewhere.
	"complete_task": true,
}

// allowed reports whether this agent may invoke the tool called name:
// either the name is one of the implicitTools, or it appears in the
// agent's Tools allowlist.
func (a *Agent) allowed(name string) bool {
	granted := implicitTools[name]
	if !granted {
		for _, tool := range a.Tools {
			if tool == name {
				granted = true
				break
			}
		}
	}
	return granted
}

// pathAllowed reports whether the cleaned form of path is writable by this
// agent. An empty ZoneScope permits nothing (closed by default), apart from
// the run-directory exception below.
//
// Run-directory exception: when RunID is set, .td/runs/<RunID> and anything
// beneath it is always permitted. That tree holds harness operational state
// (plans, reports, per-turn artifacts) rather than project code, so agents
// may write there whatever their zone.
//
// Each ZoneScope entry is then tried in order:
//   - an entry containing "**/" is split at its first "**/"; the path must
//     start with the left part and its final component must match the
//     right part (see matchSuffix);
//   - otherwise the entry is tried as a filepath.Match pattern against the
//     whole cleaned path (a malformed pattern simply does not match);
//   - finally, an entry ending in "/" matches any path it prefixes.
func (a *Agent) pathAllowed(path string) bool {
	target := filepath.Clean(path)

	if a.RunID != "" {
		runDir := filepath.Clean(filepath.Join(".td", "runs", a.RunID))
		if target == runDir || strings.HasPrefix(target, runDir+string(filepath.Separator)) {
			return true
		}
	}

	for _, glob := range a.ZoneScope {
		if head, tail, recursive := strings.Cut(glob, "**/"); recursive {
			if strings.HasPrefix(target, head) && matchSuffix(target, tail) {
				return true
			}
			continue
		}
		if hit, _ := filepath.Match(glob, target); hit {
			return true
		}
		if strings.HasSuffix(glob, "/") && strings.HasPrefix(target, glob) {
			return true
		}
	}
	return false
}

// matchSuffix reports whether the final separator-delimited component of
// path matches the glob suffix (e.g. "*.vue"). A malformed pattern counts
// as no match.
func matchSuffix(path, suffix string) bool {
	leaf := path
	if cut := strings.LastIndex(path, string(filepath.Separator)); cut >= 0 {
		leaf = path[cut+1:]
	}
	matched, err := filepath.Match(suffix, leaf)
	return err == nil && matched
}

// IntrospectCalls returns the number of introspect tool calls this agent
// has made so far, read under the agent mutex. Intended for tests.
func (a *Agent) IntrospectCalls() int {
	a.mu.Lock()
	calls := a.introspectCalls
	a.mu.Unlock()
	return calls
}

// ParseFailures returns the agent's parse-failure retry counter, read
// under the agent mutex.
func (a *Agent) ParseFailures() int {
	a.mu.Lock()
	failures := a.parseFailures
	a.mu.Unlock()
	return failures
}

// TokensTotal returns the agent's accumulated token count, read under the
// agent mutex.
func (a *Agent) TokensTotal() int64 {
	a.mu.Lock()
	total := a.tokensTotal
	a.mu.Unlock()
	return total
}

// artifactsDirEnsuring resolves the directory for this agent's per-agent
// artifacts and creates it (with parents, mode 0o755) if necessary.
//
// The directory is ArtifactsDir when set, otherwise
// .td/runs/<RunID>/agents/<ID>; a relative result is anchored at
// ProjectRoot when that is non-empty. Returns "" if the directory cannot
// be created. The context argument is unused.
func (a *Agent) artifactsDirEnsuring(_ context.Context) string {
	target := a.ArtifactsDir
	if target == "" {
		target = filepath.Join(".td", "runs", a.RunID, "agents", a.ID)
	}
	if a.ProjectRoot != "" && !filepath.IsAbs(target) {
		target = filepath.Join(a.ProjectRoot, target)
	}
	if mkErr := os.MkdirAll(target, 0o755); mkErr != nil {
		return ""
	}
	return target
}

// persistTurn records one completed turn — the incoming envelope, the LLM's
// text, tool calls, token usage and parse-failure count, plus timing — as an
// indented JSON file named turn-<n>.json (n zero-padded to four digits).
//
// The file goes under ArtifactsDir, or .td/runs/<run-id>/agents/<agent-id>/
// when ArtifactsDir is empty; relative directories are anchored at
// ProjectRoot when it is set. The turn counter is advanced under the agent
// mutex before anything else, so it moves even if the write later fails.
// Every error (mkdir, marshal, write) is swallowed: this is best-effort
// observability, not a correctness path. resp must be non-nil.
func (a *Agent) persistTurn(ctx context.Context, in *envelope.Envelope, resp *LLMResponse, dur time.Duration) {
	a.mu.Lock()
	a.turnCounter++
	seq, outDir := a.turnCounter, a.ArtifactsDir
	a.mu.Unlock()

	if outDir == "" {
		outDir = filepath.Join(".td", "runs", a.RunID, "agents", a.ID)
	}
	if a.ProjectRoot != "" && !filepath.IsAbs(outDir) {
		outDir = filepath.Join(a.ProjectRoot, outDir)
	}
	if mkErr := os.MkdirAll(outDir, 0o755); mkErr != nil {
		return
	}

	record := map[string]any{
		"turn":           seq,
		"agent_id":       a.ID,
		"role":           a.Role,
		"run_id":         a.RunID,
		"ts":             time.Now().UTC().Format(time.RFC3339Nano),
		"duration_ms":    dur.Milliseconds(),
		"incoming":       in,
		"llm_text":       resp.Text,
		"tool_calls":     resp.ToolCalls,
		"tokens":         resp.Tokens,
		"parse_failures": resp.ParseFailures,
	}
	encoded, encErr := json.MarshalIndent(record, "", "  ")
	if encErr != nil {
		return
	}
	file := filepath.Join(outDir, fmt.Sprintf("turn-%04d.json", seq))
	_ = os.WriteFile(file, encoded, 0o644)
}

// persistFailedTurn (L6/V116) writes a forensic artifact whenever CallLLM
// returns an error. The artifact captures EVERYTHING we can salvage:
//   - the incoming message that triggered the turn
//   - the raw LLM output (resp.Text — non-nil even on parse failure when
//     the runtime returns the bytes it received before giving up)
//   - the error from the runtime (parse_failures, "max retries exhausted",
//     "context deadline", whatever)
//
// The file is turn-<n>.failed.json under the agent's artifact directory
// (ArtifactsDir, or .td/runs/<run-id>/agents/<agent-id>/ when unset,
// anchored at ProjectRoot if relative). If the JSON encoding itself fails,
// a one-line turn-<n>.failed.txt is written instead. The turn counter is
// advanced under the agent mutex before any I/O.
//
// Returns the path of the file that was actually written so the caller can
// include it in the agent.llm.call_failed event payload — that way the
// operator can navigate from the event log straight to the forensic file
// without grepping disk. Returns "" when nothing could be written (mkdir
// or write failure), on both the JSON and the plain-text path. resp may be
// nil; a nil callErr is recorded as "<nil>" rather than panicking.
//
// Without this, the model's actual output (the thing that would tell us
// WHY parsing failed — e.g. "the stop hook errored and Claude Code never
// emitted the final block") is thrown away. The live tmux pane is the
// ONLY place it lived, and panes get cleaned up.
func (a *Agent) persistFailedTurn(ctx context.Context, in *envelope.Envelope, resp *LLMResponse, callErr error, dur time.Duration) string {
	a.mu.Lock()
	a.turnCounter++
	n := a.turnCounter
	dir := a.ArtifactsDir
	a.mu.Unlock()
	if dir == "" {
		dir = filepath.Join(".td", "runs", a.RunID, "agents", a.ID)
	}
	if !filepath.IsAbs(dir) && a.ProjectRoot != "" {
		dir = filepath.Join(a.ProjectRoot, dir)
	}
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return ""
	}
	// Defensive: callers pass the CallLLM error, but never dereference a
	// nil error while trying to record a failure.
	errText := "<nil>"
	if callErr != nil {
		errText = callErr.Error()
	}
	failed := map[string]any{
		"turn":        n,
		"agent_id":    a.ID,
		"role":        a.Role,
		"run_id":      a.RunID,
		"ts":          time.Now().UTC().Format(time.RFC3339Nano),
		"duration_ms": dur.Milliseconds(),
		"incoming":    in,
		"error":       errText,
	}
	if resp != nil {
		failed["raw_llm_text"] = resp.Text
		failed["raw_tool_calls"] = resp.ToolCalls
		failed["parse_failures"] = resp.ParseFailures
		failed["tokens"] = resp.Tokens
	}
	raw, jerr := json.MarshalIndent(failed, "", "  ")
	if jerr != nil {
		// Even marshal failed; write a plain-text fallback rather than drop.
		path := filepath.Join(dir, fmt.Sprintf("turn-%04d.failed.txt", n))
		body := fmt.Sprintf("turn=%d agent=%s err=%s marshal_err=%s\n",
			n, a.ID, errText, jerr.Error())
		// Only advertise the path if the file really exists, matching
		// the JSON branch below.
		if err := os.WriteFile(path, []byte(body), 0o644); err != nil {
			return ""
		}
		return path
	}
	path := filepath.Join(dir, fmt.Sprintf("turn-%04d.failed.json", n))
	if err := os.WriteFile(path, raw, 0o644); err != nil {
		return ""
	}
	return path
}

// TurnCount returns the agent's turn counter, read under the agent mutex.
// The counter is advanced by both persistTurn and persistFailedTurn.
func (a *Agent) TurnCount() int {
	a.mu.Lock()
	turns := a.turnCounter
	a.mu.Unlock()
	return turns
}

// writeArtifactFile writes content to path (creating parent dirs), computes
// the content's sha256, inserts an artifact row, and emits
// agent.artifact_written.
//
// `path` may be relative; it is joined to a.ProjectRoot before the syscall.
// The artifact row and the event record that joined path, while the event
// additionally carries the caller-supplied path as rel_path. The zone-check
// gate is expected to have run on the *relative* (user-meaningful) path
// before this is called.
//
// V35: defence-in-depth on the final resolved path. The zone check earlier
// uses a logical glob match, but a malicious or buggy LLM could provide an
// absolute path or a path containing a symlink that escapes the project
// root. When ProjectRoot is set:
//
//   - absolute paths are rejected outright;
//   - the nearest existing ancestor of the target is resolved through
//     symlinks and must stay inside the (symlink-resolved) root;
//   - if the target itself already exists as a symlink, its destination
//     must also stay inside the root, because os.WriteFile follows it;
//   - any failure to resolve a path fails closed (the write is refused).
//
// When ProjectRoot is empty (test/sandbox mode) no containment check runs
// and the agent can write anywhere intentionally.
//
// Returns an error for a rejected path or a failed mkdir/write. Failures
// of the artifact-row insert and of the event emission are best-effort and
// do not fail the call, since the file is already on disk by then.
func (a *Agent) writeArtifactFile(ctx context.Context, taskID, path, content string) error {
	if a.ProjectRoot != "" && filepath.IsAbs(path) {
		return fmt.Errorf("write_file: absolute paths not permitted (got %q)", path)
	}
	abs := path
	if !filepath.IsAbs(abs) && a.ProjectRoot != "" {
		abs = filepath.Join(a.ProjectRoot, path)
	}
	if a.ProjectRoot != "" {
		rootAbs, err := filepath.Abs(a.ProjectRoot)
		if err != nil {
			return fmt.Errorf("write_file: resolve project root: %w", err)
		}
		// Normalize root through symlinks too. On macOS /tmp is a symlink
		// to /private/tmp; without this, a write path under /tmp would
		// resolve to /private/tmp/... and falsely look like an escape.
		if rootReal, err := filepath.EvalSymlinks(rootAbs); err == nil {
			rootAbs = rootReal
		}
		// insideRoot reports whether an already symlink-resolved path is
		// rootAbs itself or lives beneath it. The path is made absolute
		// first because filepath.Rel cannot relate an absolute base to a
		// relative target (the case when ProjectRoot is relative). Only a
		// whole ".." path element counts as an escape, so a directory that
		// is merely named "..something" is not misclassified.
		insideRoot := func(resolved string) bool {
			full, err := filepath.Abs(resolved)
			if err != nil {
				return false
			}
			rel, err := filepath.Rel(rootAbs, full)
			if err != nil {
				return false
			}
			return rel != ".." && !strings.HasPrefix(rel, ".."+string(filepath.Separator))
		}
		// Resolve the parent dir's real path (handles symlinks). Walk up
		// until we find an existing ancestor — required because the
		// target file's own parent may not exist yet.
		ancestor := filepath.Dir(abs)
		for {
			if _, err := os.Stat(ancestor); err == nil {
				break
			}
			parent := filepath.Dir(ancestor)
			if parent == ancestor {
				break
			}
			ancestor = parent
		}
		resolved, err := filepath.EvalSymlinks(ancestor)
		if err != nil {
			// Fail closed: an unverifiable location must not be written.
			return fmt.Errorf("write_file: resolve %q: %w", ancestor, err)
		}
		if !insideRoot(resolved) {
			return fmt.Errorf("write_file: target escapes project root (resolved %q, root %q)", resolved, rootAbs)
		}
		// The parent check alone does not cover a pre-existing symlink at
		// the target path itself: os.WriteFile would follow it.
		if fi, err := os.Lstat(abs); err == nil && fi.Mode()&os.ModeSymlink != 0 {
			dest, err := filepath.EvalSymlinks(abs)
			if err != nil {
				return fmt.Errorf("write_file: resolve symlink %q: %w", abs, err)
			}
			if !insideRoot(dest) {
				return fmt.Errorf("write_file: target escapes project root (resolved %q, root %q)", dest, rootAbs)
			}
		}
	}
	if err := os.MkdirAll(filepath.Dir(abs), 0o755); err != nil {
		return fmt.Errorf("write_file: mkdir %s: %w", filepath.Dir(abs), err)
	}
	if err := os.WriteFile(abs, []byte(content), 0o644); err != nil {
		return fmt.Errorf("write_file: %s: %w", abs, err)
	}
	sum := sha256Hex(content)
	now := store.Now()
	id := fmt.Sprintf("art_%s_%d", a.ID, now.UnixNano())
	// An empty taskID is stored as SQL NULL rather than as an empty string.
	var taskIDArg any
	if taskID != "" {
		taskIDArg = taskID
	}
	// Best-effort bookkeeping: the file is already written, so a failed
	// row insert is deliberately not reported as a write failure.
	_ = a.st.Tx(ctx, func(q store.Querier) error {
		_, err := q.Exec(
			`INSERT INTO artifacts(id, task_id, kind, path, sha256, created_at) VALUES(?, ?, 'file', ?, ?, ?)`,
			id, taskIDArg, abs, sum, store.FmtTime(now),
		)
		return err
	})
	// Event emission is likewise best-effort.
	_, _ = a.bus.Emit(ctx, event.Event{
		Kind: "agent.artifact_written", RunID: a.RunID, AgentID: a.ID,
		Payload: map[string]any{"path": abs, "rel_path": path, "bytes": len(content), "sha256": sum},
	})
	return nil
}

// queryRegistry resolves a structured query against the run's registry.
// Returns a JSON-serializable slice the caller emits as event payload.
//
// Supported kinds:
//   - "agents" / "team": agents of this run, optionally filtered by
//     filter["role"], oldest spawn first, at most 100.
//   - "artifacts": artifacts visible to this run, optionally filtered by
//     filter["task_id"], newest first, at most 50.
//   - "tasks": tasks of this run, oldest first, at most 100.
//
// The function has no error return: an unknown kind or a failed query
// yields nil, and a row that cannot be scanned is skipped instead of being
// reported as a zero-valued record. Filter values that are missing or not
// strings are treated as "no filter".
func (a *Agent) queryRegistry(ctx context.Context, kind string, filter map[string]any) []map[string]any {
	switch kind {
	case "agents", "team":
		// LM-D: returns a richer per-agent record so the master LLM can
		// reason about who to delegate to. `team` is an alias for
		// `agents` (clearer naming for the master's introspect-then-
		// delegate flow). Includes provider + heartbeat + the id of any
		// in-flight task the agent owns.
		role, _ := filter["role"].(string)
		q := `SELECT a.id, IFNULL(a.role,''), IFNULL(a.provider,''), a.status, IFNULL(a.heartbeat_at,''),
		             (SELECT id FROM tasks WHERE owner_agent_id=a.id AND state IN ('assigned','in_progress','awaiting_clarification','awaiting_subtask') ORDER BY created_at DESC LIMIT 1) AS in_flight
		        FROM agents a WHERE a.run_id=?`
		args := []any{a.RunID}
		if role != "" {
			q += ` AND a.role=?`
			args = append(args, role)
		}
		q += ` ORDER BY a.spawned_at ASC LIMIT 100`
		rows, err := a.st.DB().QueryContext(ctx, q, args...)
		if err != nil {
			return nil
		}
		defer rows.Close()
		var out []map[string]any
		for rows.Next() {
			var id, r, prov, s, hb string
			// The sub-select yields NULL when the agent owns no in-flight
			// task, hence the pointer.
			var inFlight *string
			if err := rows.Scan(&id, &r, &prov, &s, &hb, &inFlight); err != nil {
				// Skip rather than emit a half-populated agent record.
				continue
			}
			entry := map[string]any{
				"agent_id": id, "role": r, "provider": prov, "status": s, "heartbeat_at": hb,
			}
			if inFlight != nil && *inFlight != "" {
				entry["in_flight_task_id"] = *inFlight
			}
			out = append(out, entry)
		}
		return out
	case "artifacts":
		// V64: scope to this run. Without the JOIN, an agent in run A
		// could enumerate artifacts written by an agent in run B —
		// cross-run information leak.
		// NOTE(review): artifacts whose task_id is NULL match the first
		// arm of the WHERE clause and are therefore not run-scoped by
		// this query — confirm that is intended.
		tid, _ := filter["task_id"].(string)
		q := `SELECT a.id, a.kind, a.path, a.sha256, a.created_at
		        FROM artifacts a
		   LEFT JOIN tasks t ON t.id = a.task_id
		       WHERE (a.task_id IS NULL OR t.run_id = ?)`
		args := []any{a.RunID}
		if tid != "" {
			q += ` AND a.task_id=?`
			args = append(args, tid)
		}
		q += ` ORDER BY a.created_at DESC LIMIT 50`
		rows, err := a.st.DB().QueryContext(ctx, q, args...)
		if err != nil {
			return nil
		}
		defer rows.Close()
		var out []map[string]any
		for rows.Next() {
			var id, k, p, sum, ts string
			if err := rows.Scan(&id, &k, &p, &sum, &ts); err != nil {
				// Skip rather than emit a half-populated artifact record.
				continue
			}
			out = append(out, map[string]any{
				"id": id, "kind": k, "path": p, "sha256": sum, "created_at": ts,
			})
		}
		return out
	case "tasks":
		rows, err := a.st.DB().QueryContext(ctx,
			`SELECT id, title, state, IFNULL(owner_agent_id,'') FROM tasks WHERE run_id=? ORDER BY created_at ASC LIMIT 100`,
			a.RunID,
		)
		if err != nil {
			return nil
		}
		defer rows.Close()
		var out []map[string]any
		for rows.Next() {
			var id, title, state, owner string
			if err := rows.Scan(&id, &title, &state, &owner); err != nil {
				// Skip rather than emit a half-populated task record.
				continue
			}
			out = append(out, map[string]any{
				"task_id": id, "title": title, "state": state, "owner": owner,
			})
		}
		return out
	}
	return nil
}

// requestDynamicSpawn inserts a dynamic_spawn_requests row that the
// orchestrator's spawner polls. This is the "agents can set up agents"
// path the original prompt asked for.
//
// role and provider are stored as given; args is JSON-encoded into
// args_json. The new row relies on the column default for its status
// ('pending'). All statements run inside a single a.st.Tx call.
//
// V52 dedup: if a pending or fulfilled request for the same (run, role)
// already exists, short-circuit instead of inserting a duplicate. An LLM
// in a loop calling spawn_agent every turn would otherwise fill the
// table with pending rows. Emits agent.spawn_request_deduped so the
// loop is observable.
//
// (The v8 migration created the table; the ad-hoc CREATE TABLE IF NOT
// EXISTS remains for the transition window so deployments mid-upgrade
// don't race on schema availability.)
//
// Returns an error when the table bootstrap, the dedup lookup, the args
// encoding, or the insert fails. A deduplicated request returns nil.
func (a *Agent) requestDynamicSpawn(ctx context.Context, role, provider string, args map[string]any) error {
	return a.st.Tx(ctx, func(q store.Querier) error {
		if _, err := q.Exec(`CREATE TABLE IF NOT EXISTS dynamic_spawn_requests (
			id TEXT PRIMARY KEY,
			run_id TEXT NOT NULL,
			requested_by TEXT NOT NULL,
			role TEXT NOT NULL,
			provider TEXT,
			args_json TEXT,
			status TEXT NOT NULL DEFAULT 'pending',
			created_at TEXT NOT NULL,
			fulfilled_agent_id TEXT
		)`); err != nil {
			return err
		}
		// Dedup: skip if a pending or fulfilled request already exists
		// for this (run, role). A failed lookup must not be read as
		// "no existing request", otherwise the dedup guard is bypassed.
		var existing int
		if err := q.QueryRow(
			`SELECT COUNT(*) FROM dynamic_spawn_requests
			   WHERE run_id=? AND role=? AND status IN ('pending','fulfilled')`,
			a.RunID, role,
		).Scan(&existing); err != nil {
			return fmt.Errorf("dynamic spawn: dedup lookup: %w", err)
		}
		if existing > 0 {
			// Observability only; a failed emit does not fail the request.
			_, _ = a.bus.EmitInTx(q, event.Event{
				Kind: "agent.spawn_request_deduped", RunID: a.RunID, AgentID: a.ID,
				Payload: map[string]any{"role": role, "reason": "already pending or fulfilled"},
			})
			return nil
		}
		raw, err := json.Marshal(args)
		if err != nil {
			return fmt.Errorf("dynamic spawn: encode args: %w", err)
		}
		// Sample the clock once so the id and created_at agree.
		now := store.Now()
		_, err = q.Exec(
			`INSERT INTO dynamic_spawn_requests(id, run_id, requested_by, role, provider, args_json, created_at)
			 VALUES(?, ?, ?, ?, ?, ?, ?)`,
			fmt.Sprintf("dsr_%d", now.UnixNano()),
			a.RunID, a.ID, role, provider, string(raw),
			store.FmtTime(now),
		)
		return err
	})
}

// sha256Hex returns the digest string for s by delegating to sha256Sum,
// a package-level helper defined outside this part of the file. It exists
// as a thin, locally named wrapper so callers in this file (for example
// writeArtifactFile) do not reference the hashing helper directly.
//
// NOTE(review): presumably the result is the hex-encoded SHA-256 of s, as
// the name suggests — confirm against sha256Sum.
func sha256Hex(s string) string {
	// Original rationale, unverified from here: orchestrator/policy.go is
	// said to import crypto/sha256 for intentHash already, and this wrapper
	// was meant to keep the hashing dependency in one place.
	return sha256Sum(s)
}

// fwdMu guards fwdCounter, a process-wide sequence number mixed into
// forward IDs so that several forward_with_translation calls issued inside
// one turn (potentially within the same nanosecond) do not collide.
var (
	fwdMu      sync.Mutex
	fwdCounter int64
)

// nextFwdID returns the current store clock in Unix nanoseconds plus the
// freshly incremented sequence number. Both the increment and the clock
// read happen while fwdMu is held.
func nextFwdID() int64 {
	fwdMu.Lock()
	defer fwdMu.Unlock()

	seq := fwdCounter + 1
	fwdCounter = seq
	stamp := store.Now().UnixNano()
	return stamp + seq
}
