package orchestrator

import (
	"context"
	"database/sql"
	"strings"
	"time"

	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/event"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/store"
)

// SweepOrphanedAgents terminates agents still recorded as 'running' or
// 'spawning' whose parent run is in any status other than 'running' or
// 'awaiting_user'. Such rows are orphans from interrupted previous sessions:
// the per-run cleanup never ran, so the agents table lies about the world.
// terminated_at is stamped with the current store time.
//
// It returns the number of agent rows updated. When that number is positive
// a best-effort "reaper.orphan_agents_swept" event carrying the count is
// emitted; emit failures are ignored.
//
// V102: called at runlive startup alongside SweepOrphanedRuns.
func (o *Orchestrator) SweepOrphanedAgents(ctx context.Context) (int, error) {
	const orphanAgentsSQL = `UPDATE agents SET status='terminated', terminated_at=?
			   WHERE status IN ('running','spawning')
			     AND run_id IN (
			       SELECT id FROM runs WHERE status NOT IN ('running','awaiting_user')
			     )`

	stamp := store.FmtTime(store.Now())
	count := 0
	if err := o.St.Tx(ctx, func(q store.Querier) error {
		result, execErr := q.Exec(orphanAgentsSQL, stamp)
		if execErr != nil {
			return execErr
		}
		// The RowsAffected error is intentionally discarded; whatever value
		// it reports is used as the count.
		affected, _ := result.RowsAffected()
		count = int(affected)
		return nil
	}); err != nil {
		return 0, err
	}
	if count <= 0 {
		return count, nil
	}
	_, _ = o.Bus.Emit(ctx, event.Event{
		Kind:    "reaper.orphan_agents_swept",
		Payload: map[string]any{"count": count},
	})
	return count, nil
}

// SweepOrphanedRuns ends runs still at status='running' that started before
// the cutoff (now - olderThan) and have no events newer than that same
// cutoff. These are orphans from interrupted previous sessions — no per-run
// reaperLoop survives across process restarts to terminate them, so they sit
// at status='running' forever and poison cross-run analytics.
//
// Each orphan is ended as RunKilled/FailStalled and, once actually ended,
// announced with a best-effort "reaper.orphan_run_swept" event.
//
// V93: returns the count of runs ended. A run whose EndRun fails is neither
// counted nor announced; it is left for a later sweep. Should be called once
// at startup, not per-run, with a generous threshold (e.g. 1 hour) to avoid
// killing runs from concurrently-running sibling processes.
func (o *Orchestrator) SweepOrphanedRuns(ctx context.Context, olderThan time.Duration) (int, error) {
	cutoff := store.FmtTime(store.Now().Add(-olderThan))
	var orphanIDs []string
	err := o.St.Tx(ctx, func(q store.Querier) error {
		rows, err := q.Query(
			`SELECT id FROM runs
			   WHERE status='running'
			     AND started_at < ?
			     AND NOT EXISTS (
			       SELECT 1 FROM events e
			        WHERE e.run_id = runs.id AND e.ts > ?
			     )`,
			cutoff, cutoff,
		)
		if err != nil {
			return err
		}
		defer rows.Close()
		for rows.Next() {
			var id string
			if err := rows.Scan(&id); err != nil {
				return err
			}
			orphanIDs = append(orphanIDs, id)
		}
		return rows.Err()
	})
	if err != nil {
		return 0, err
	}

	// Loop-invariant strings, built once.
	window := olderThan.String()
	detail := "orphan run from previous session (no events in " + window + ")"

	ended := 0
	for _, id := range orphanIDs {
		if err := o.EndRun(ctx, id, RunOutcome{
			Status:          RunKilled,
			FailureCategory: FailStalled,
			FailureDetail:   detail,
		}); err != nil {
			// Best effort: keep sweeping the remaining orphans, but don't
			// report a run as swept when it was not actually ended.
			continue
		}
		ended++
		_, _ = o.Bus.Emit(ctx, event.Event{
			Kind:    "reaper.orphan_run_swept",
			RunID:   id,
			Payload: map[string]any{"older_than": window},
		})
	}
	return ended, nil
}

// ReapStalledTasks finds tasks whose deadline has passed (deadline <= now)
// while they are still in a non-terminal state, transitions them to failed
// and emits a KindTaskStalled event (reason "deadline_exceeded") for each.
// Tasks with a NULL deadline are never reaped.
//
// Non-terminal = anything except completed/abandoned/failed/escalated.
//
// Returns the number of tasks actually transitioned to failed. A task whose
// transition is rejected is neither announced nor counted. This happens, for
// example, if it reached a terminal state between the SELECT and the
// transition. Such a task is re-examined on the next call if it is still
// overdue and non-terminal.
func (o *Orchestrator) ReapStalledTasks(ctx context.Context) (int, error) {
	nowStr := store.FmtTime(store.Now())
	type stalled struct {
		id    string
		runID string
	}
	var found []stalled
	err := o.St.Tx(ctx, func(q store.Querier) error {
		rows, err := q.Query(
			`SELECT id, run_id FROM tasks
			  WHERE deadline IS NOT NULL AND deadline <= ?
			    AND state NOT IN ('completed','abandoned','failed','escalated')`,
			nowStr,
		)
		if err != nil {
			return err
		}
		defer rows.Close()
		for rows.Next() {
			var s stalled
			if err := rows.Scan(&s.id, &s.runID); err != nil {
				return err
			}
			found = append(found, s)
		}
		return rows.Err()
	})
	if err != nil {
		return 0, err
	}
	reaped := 0
	for _, s := range found {
		if err := o.Transition(ctx, s.id, StateFailed); err != nil {
			// Best effort: continue with the other tasks, but don't emit a
			// stalled event for a task that was not actually failed.
			continue
		}
		reaped++
		_, _ = o.Bus.Emit(ctx, event.Event{
			Kind:    event.KindTaskStalled,
			RunID:   s.runID,
			TaskID:  s.id,
			Payload: map[string]any{"reason": "deadline_exceeded"},
		})
	}
	return reaped, nil
}

// FailedTaskNotice records one task that the dead-agent reaper
// (ReapDeadAgents) failed, together with the agent that should be told about
// it. Callers can iterate these and enqueue a TypeNack envelope so the parent
// learns its delegate is dead — closing the V45 broken-handoff hole.
//
// ParentAgentID is the owner of the failed task's parent task or, when the
// task has no parent owner, the run's master agent (V58).
type FailedTaskNotice struct {
	FailedTaskID  string // task that was moved to failed
	ParentAgentID string // agent to notify about the failure
	DeadAgentID   string // agent whose heartbeat was lost
	RunID         string // run of the dead agent; "" when its run_id is NULL
}

// ReapDeadAgents terminates agents in 'spawning' or 'running' whose
// heartbeat_at is older than maxAge (agents with a NULL heartbeat_at are
// never reaped). It then moves every task they own that is not already
// completed/failed/abandoned/escalated to failed, so a parent can decide to
// requeue or escalate.
//
// It returns the number of agents this call actually terminated, together
// with the failure notices. The caller delivers those notices to the affected
// parent agents (the orchestrator package doesn't own a queue). Only errors
// from the agent select/terminate transactions are returned. Per-task work
// (task lookup, state transition, event emission) is best effort.
func (o *Orchestrator) ReapDeadAgents(ctx context.Context, maxAge time.Duration) (int, []FailedTaskNotice, error) {
	cutoff := store.FmtTime(store.Now().Add(-maxAge))
	type dead struct {
		id    string
		runID string
	}
	var candidates []dead
	err := o.St.Tx(ctx, func(q store.Querier) error {
		rows, err := q.Query(
			`SELECT id, run_id FROM agents
			  WHERE status IN ('spawning','running') AND heartbeat_at IS NOT NULL AND heartbeat_at < ?`,
			cutoff,
		)
		if err != nil {
			return err
		}
		defer rows.Close()
		for rows.Next() {
			var d dead
			var runID *string // run_id may be NULL
			if err := rows.Scan(&d.id, &runID); err != nil {
				return err
			}
			if runID != nil {
				d.runID = *runID
			}
			candidates = append(candidates, d)
		}
		return rows.Err()
	})
	if err != nil {
		return 0, nil, err
	}
	if len(candidates) == 0 {
		return 0, nil, nil
	}

	// Terminate the candidates. The UPDATE re-checks the selection criteria
	// because the SELECT above ran in a separate transaction. In between, an
	// agent may have heartbeated again or been terminated elsewhere, and we
	// must neither kill a live agent nor overwrite an existing terminated_at.
	nowStr := store.FmtTime(store.Now())
	var reaped []dead
	err = o.St.Tx(ctx, func(q store.Querier) error {
		for _, d := range candidates {
			res, err := q.Exec(
				`UPDATE agents SET status='terminated', terminated_at=?
				  WHERE id=? AND status IN ('spawning','running')
				    AND heartbeat_at IS NOT NULL AND heartbeat_at < ?`,
				nowStr, d.id, cutoff,
			)
			if err != nil {
				return err
			}
			// If the driver cannot report affected rows, assume the update
			// applied rather than silently dropping the agent.
			if n, err := res.RowsAffected(); err != nil || n > 0 {
				reaped = append(reaped, d)
			}
		}
		return nil
	})
	if err != nil {
		return 0, nil, err
	}

	type taskRow struct{ id, parentOwner string }
	var notices []FailedTaskNotice
	for _, d := range reaped {
		_, _ = o.Bus.Emit(ctx, event.Event{
			Kind:    event.KindAgentTerminated,
			RunID:   d.runID,
			AgentID: d.id,
			Payload: map[string]any{"reason": "heartbeat_lost"},
		})

		// Collect the open tasks owned by this agent, each with the owner of
		// its parent task, so the caller can notify that owner (V45). Best
		// effort: on a query error this agent's tasks are skipped instead of
		// aborting the whole sweep, and unscannable rows are skipped.
		rows, err := o.St.DB().QueryContext(ctx,
			`SELECT t.id, IFNULL(p.owner_agent_id,'')
			   FROM tasks t LEFT JOIN tasks p ON p.id = t.parent_task_id
			  WHERE t.owner_agent_id=?
			    AND t.state NOT IN ('completed','failed','abandoned','escalated')`,
			d.id,
		)
		if err != nil {
			continue
		}
		var tasks []taskRow
		for rows.Next() {
			var tr taskRow
			if err := rows.Scan(&tr.id, &tr.parentOwner); err == nil {
				tasks = append(tasks, tr)
			}
		}
		rows.Close()
		if len(tasks) == 0 {
			continue // nothing to fail, so no need to look up the master
		}

		// V58: if the failed task has no parent (root task), fall back
		// to the master agent for this run so SOMETHING gets the
		// failure notice. With the current flat-tree data (V57), every
		// task is rootless and would otherwise have parentOwner="".
		// Any Scan error (including no master row) leaves masterAgent "".
		masterAgent := ""
		_ = o.St.DB().QueryRowContext(ctx,
			`SELECT id FROM agents WHERE run_id=? AND role='master' LIMIT 1`,
			d.runID,
		).Scan(&masterAgent)
		for _, tr := range tasks {
			// NOTE(review): the transition result is ignored, so the parent
			// is notified even if the task could not be moved to failed.
			_ = o.Transition(ctx, tr.id, StateFailed)
			parent := tr.parentOwner
			if parent == "" {
				parent = masterAgent
			}
			// Never address a notice to the dead agent itself.
			if parent != "" && parent != d.id {
				notices = append(notices, FailedTaskNotice{
					FailedTaskID:  tr.id,
					ParentAgentID: parent,
					DeadAgentID:   d.id,
					RunID:         d.runID,
				})
			}
		}
	}
	return len(reaped), notices, nil
}

// EnforceCostCeiling kills every 'running' run whose tokens_total exceeds
// maxTokens or whose cost_usd_total exceeds maxCostUSD. A ceiling <= 0
// disables that check. For Phase A org policies are not loaded here; the
// caller passes the ceilings. When both ceilings are exceeded, the token
// ceiling is reported as the reason.
//
// If runIDFilter is non-empty, only that run is considered; otherwise all
// running runs are considered (e.g. an admin-level sweep).
//
// For each run over a ceiling a KindCostCeilingHit event is emitted (best
// effort) and the run is ended as RunKilled/FailCostCeiling. Returns the
// number of runs actually ended. A run whose EndRun fails is not counted. It
// stays a candidate for the next call while it is still 'running'.
//
// V41: scoping matters because each runlive invocation runs its own
// reaperLoop. Without the filter, two concurrent runs each enforce ceilings
// on the other — wasteful and confusing in the event log.
//
// V42: dangling task transitions are now handled by EndRun (V18), so this
// function no longer re-iterates tasks itself.
func (o *Orchestrator) EnforceCostCeiling(ctx context.Context, runIDFilter string, maxTokens int64, maxCostUSD float64) (int, error) {
	type rkill struct {
		id     string
		reason string
	}
	var kills []rkill
	err := o.St.Tx(ctx, func(q store.Querier) error {
		var rows *sql.Rows
		var err error
		if runIDFilter != "" {
			rows, err = q.Query(
				`SELECT id, tokens_total, cost_usd_total FROM runs WHERE status='running' AND id=?`,
				runIDFilter,
			)
		} else {
			rows, err = q.Query(
				`SELECT id, tokens_total, cost_usd_total FROM runs WHERE status='running'`,
			)
		}
		if err != nil {
			return err
		}
		defer rows.Close()
		for rows.Next() {
			var (
				id     string
				tokens int64
				cost   float64
			)
			if err := rows.Scan(&id, &tokens, &cost); err != nil {
				return err
			}
			switch {
			case maxTokens > 0 && tokens > maxTokens:
				kills = append(kills, rkill{id, "tokens_over_ceiling"})
			case maxCostUSD > 0 && cost > maxCostUSD:
				kills = append(kills, rkill{id, "cost_usd_over_ceiling"})
			}
		}
		return rows.Err()
	})
	if err != nil {
		return 0, err
	}
	killed := 0
	for _, k := range kills {
		_, _ = o.Bus.Emit(ctx, event.Event{
			Kind:    event.KindCostCeilingHit,
			RunID:   k.id,
			Payload: map[string]any{"reason": k.reason},
		})
		// EndRun transitions every dangling task to failed atomically
		// (V18). No need to re-iterate here.
		if err := o.EndRun(ctx, k.id, RunOutcome{
			Status:          RunKilled,
			FailureCategory: FailCostCeiling,
			FailureDetail:   k.reason,
		}); err != nil {
			// Best effort: keep enforcing on the remaining runs, but only
			// count runs that were actually ended.
			continue
		}
		killed++
	}
	return killed, nil
}

// Blank-identifier reference that keeps the "strings" import compiling: no
// other code in this file uses package strings, and Go rejects unused
// imports. The original note said this guards against a future "migrate"
// formatting change.
// NOTE(review): if that is no longer relevant, drop this line and the import.
var _ = strings.TrimSpace
