package testutil

import (
	"context"
	"math/rand"
	"path/filepath"
	"strconv"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/envelope"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/event"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/orchestrator"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/store"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/transport"
)

// TestChaos50Agents1000Messages is the Phase A bar from the plan §17:
// 50 nullagents, 1000 messages, random kills → zero orphaned tasks.
// The kill schedule is drawn from a fixed-seed RNG, so the sequence of
// killed slots is reproducible even though goroutine scheduling is not.
//
// Setup:
//   - 50 nullagents subscribed to inboxes "a0".."a49"
//   - 1000 tasks created, task i delegated round-robin to agent "a(i%50)"
//   - Agents reply with Report; replies are consumed by a "master" agent
//   - During execution, a chaos goroutine randomly kills (cancels) some agents
//     and respawns them, simulating runtime crashes
//   - Reaper sweeps reclaim visibility-expired messages
//
// Success criteria:
//   - All 1000 delegate messages get delivered+acked (no message lost)
//   - All 1000 reports arrive at the master (no orphan reply)
//   - Tasks all transition to completed (no stalled tasks)
//   - All messages end in status='acked' (no leftover queued/delivered)
//
// Shutdown ordering matters: the chaos goroutine is stopped before the agents
// are cancelled, so it can never respawn an agent (calling wg.Add) while the
// teardown is running or wg.Wait is in progress.
func TestChaos50Agents1000Messages(t *testing.T) {
	// Skip before doing any setup work.
	if testing.Short() {
		t.Skip("chaos test is a stress test; skipped under -short")
	}

	tmp := t.TempDir()
	st, err := store.Open(filepath.Join(tmp, "harness.db"))
	if err != nil {
		t.Fatal(err)
	}
	defer st.Close()
	bus := event.NewBus(st)
	q := transport.New(st, bus)
	q.VisibilityTimeout = 200 * time.Millisecond // short so reclaim is exercised
	q.MaxAttempts = 10                            // allow many re-deliveries through chaos

	orch := orchestrator.New(st, bus)
	// 10 minutes accommodates race-detector overhead. Without -race this
	// test finishes in ~7s.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()

	const (
		Agents   = 50
		Messages = 1000
	)
	// Only the chaos goroutine draws from rng, so it needs no locking.
	rng := rand.New(rand.NewSource(7))

	if err := orch.CreateRun(ctx, "run-chaos", "stress"); err != nil {
		t.Fatal(err)
	}
	// Insert agent rows (FK target for tasks). A failed insert would make
	// every later assertion meaningless, so fail fast.
	for i := 0; i < Agents; i++ {
		if _, err := st.DB().Exec(`INSERT INTO agents(id, run_id, status, spawned_at) VALUES(?, 'run-chaos', 'running', ?)`,
			"a"+strconv.Itoa(i), store.FmtTime(time.Now().UTC())); err != nil {
			t.Fatalf("insert agent a%d: %v", i, err)
		}
	}
	if _, err := st.DB().Exec(`INSERT INTO agents(id, run_id, status, spawned_at) VALUES('master', 'run-chaos', 'running', ?)`,
		store.FmtTime(time.Now().UTC())); err != nil {
		t.Fatalf("insert agent master: %v", err)
	}

	// Master agent: receives reports and tracks per-task completion. Duplicate
	// reports are expected (reclaimed deliveries → multiple workers may complete
	// the same task). Master is idempotent on completion: it transitions once
	// per unique task_id.
	masterDone := make(chan struct{})
	completedTasks := make(map[string]bool)
	var completedMu sync.Mutex
	go func() {
		defer close(masterDone)
		for {
			completedMu.Lock()
			n := len(completedTasks)
			completedMu.Unlock()
			if n >= Messages {
				return
			}
			e, err := q.Receive(ctx, "master")
			if err != nil {
				return
			}
			if e == nil {
				// Inbox empty: back off briefly, but stay responsive to cancellation.
				select {
				case <-ctx.Done():
					return
				case <-time.After(5 * time.Millisecond):
					continue
				}
			}
			if e.Type == envelope.TypeReport && e.TaskID != "" {
				completedMu.Lock()
				first := !completedTasks[e.TaskID]
				if first {
					completedTasks[e.TaskID] = true
				}
				completedMu.Unlock()
				if first {
					_ = orch.Transition(ctx, e.TaskID, orchestrator.StateCompleted)
				}
			}
			_ = q.Ack(ctx, e.ID)
		}
	}()

	// Worker definition: replies to delegate with a report.
	workerStep := func(agentID string) Step {
		return func(in *envelope.Envelope) (*envelope.Envelope, bool, bool) {
			if in.Type != envelope.TypeDelegate {
				return nil, true, false
			}
			reply := &envelope.Envelope{
				ID: "r-" + in.ID, RunID: in.RunID,
				From: agentID, To: in.From, Type: envelope.TypeReport,
				TaskID: in.TaskID, InReplyTo: in.ID, TTLMs: 0,
				Payload: envelope.Payload{Intent: "done", Expects: envelope.ExpectsNone},
			}
			return reply, true, false
		}
	}

	// Launch initial agent generation. Each slot holds the cancel func of the
	// agent currently occupying it; the mutex serializes kill and respawn.
	var wg sync.WaitGroup
	type slot struct {
		mu     sync.Mutex
		cancel context.CancelFunc
	}
	slots := make([]*slot, Agents)
	spawn := func(i int) {
		slots[i].mu.Lock()
		defer slots[i].mu.Unlock()
		aCtx, aCancel := context.WithCancel(ctx)
		slots[i].cancel = aCancel
		agent := &NullAgent{
			ID:    "a" + strconv.Itoa(i),
			Queue: q,
			Step:  workerStep("a" + strconv.Itoa(i)),
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			agent.Run(aCtx)
		}()
	}
	for i := 0; i < Agents; i++ {
		slots[i] = &slot{}
		spawn(i)
	}

	// Create and dispatch 1000 tasks, each as a delegate from master to agent
	// a(i%Agents).
	// NOTE(review): task deadlines are 60s while the run budget is 10m for
	// -race; if ReapStalledTasks fails tasks past their deadline, slow runs
	// may report failed tasks — confirm against orchestrator semantics.
	var dispatchErrors atomic.Int64
	for i := 0; i < Messages; i++ {
		taskID := "t" + strconv.Itoa(i)
		if err := orch.CreateTask(ctx, orchestrator.Task{
			ID: taskID, RunID: "run-chaos", Title: "synthetic " + taskID,
			Deadline: ptrTime(time.Now().UTC().Add(60 * time.Second)),
		}); err != nil {
			t.Logf("CreateTask %s: %v", taskID, err)
			dispatchErrors.Add(1)
			continue
		}
		target := "a" + strconv.Itoa(i%Agents)
		if err := orch.AssignOwner(ctx, taskID, target); err != nil {
			t.Logf("AssignOwner %s: %v", taskID, err)
			dispatchErrors.Add(1)
		}
		if err := orch.Transition(ctx, taskID, orchestrator.StateInProgress); err != nil {
			t.Logf("Transition %s→in_progress: %v", taskID, err)
			dispatchErrors.Add(1)
		}
		msg := &envelope.Envelope{
			ID:    "d-" + taskID,
			RunID: "run-chaos",
			From:  "master", To: target, Type: envelope.TypeDelegate,
			TaskID: taskID, TTLMs: 0,
			Payload: envelope.Payload{Intent: "synthetic", Expects: envelope.ExpectsReport},
		}
		if err := q.Send(ctx, msg); err != nil {
			t.Fatalf("send delegate: %v", err)
		}
	}
	if n := dispatchErrors.Load(); n > 0 {
		t.Logf("dispatch errors: %d", n)
	}

	// Chaos: every 50ms, kill+respawn a random agent. Runs for ~3s, or until
	// the master has finished, whichever comes first.
	chaosDone := make(chan struct{})
	go func() {
		defer close(chaosDone)
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()
		timeout := time.After(3 * time.Second)
		for {
			select {
			case <-ctx.Done():
				return
			case <-masterDone:
				return
			case <-timeout:
				return
			case <-ticker.C:
				// select picks randomly among ready cases, so re-check: once
				// the master is done no new kill/respawn may begin.
				select {
				case <-masterDone:
					return
				default:
				}
				i := rng.Intn(Agents)
				slots[i].mu.Lock()
				slots[i].cancel()
				slots[i].mu.Unlock()
				// Wait briefly to let it die, then respawn.
				time.Sleep(10 * time.Millisecond)
				spawn(i)
			}
		}
	}()

	// Reaper: every 100ms, reclaim expired messages and stalled tasks.
	reaperDone := make(chan struct{})
	go func() {
		defer close(reaperDone)
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-masterDone:
				return
			case <-ticker.C:
				_, _ = q.ReclaimExpired(ctx)
				_, _ = orch.ReapStalledTasks(ctx)
			}
		}
	}()

	// shutdown stops every background goroutine in a safe order. Chaos goes
	// first, so no respawn (wg.Add) can race with the teardown or wg.Wait.
	// Then all agents are cancelled and awaited, and finally the reaper and
	// master. Every goroutine exits on either masterDone or ctx.Done, so this
	// works on both the success path and the timeout path.
	shutdown := func() {
		<-chaosDone
		for i := 0; i < Agents; i++ {
			slots[i].mu.Lock()
			slots[i].cancel()
			slots[i].mu.Unlock()
		}
		wg.Wait()
		<-reaperDone
		<-masterDone
	}

	// Wait for master to have completed all tasks.
	select {
	case <-masterDone:
	case <-ctx.Done():
		completedMu.Lock()
		n := len(completedTasks)
		completedMu.Unlock()
		// Stop goroutines before failing so none of them touch the store
		// after the deferred st.Close runs.
		shutdown()
		t.Fatalf("chaos test timed out; unique tasks completed: %d/%d", n, Messages)
	}
	shutdown()

	completedMu.Lock()
	uniqueCompleted := len(completedTasks)
	completedMu.Unlock()
	if uniqueCompleted != Messages {
		t.Fatalf("unique tasks completed = %d, want %d", uniqueCompleted, Messages)
	}

	// Invariants. Query errors are fatal: if a count query fails, its count
	// stays 0 and the invariant would pass vacuously.
	// (1) every task completed.
	var open int
	if err := st.DB().QueryRow(`SELECT COUNT(*) FROM tasks WHERE state NOT IN ('completed','failed','abandoned','escalated')`).Scan(&open); err != nil {
		t.Fatalf("count open tasks: %v", err)
	}
	if open != 0 {
		rows, err := st.DB().Query(`SELECT id, state FROM tasks WHERE state NOT IN ('completed','failed','abandoned','escalated') LIMIT 5`)
		if err != nil {
			t.Logf("list open tasks: %v", err)
		} else {
			for rows.Next() {
				var id, state string
				if err := rows.Scan(&id, &state); err != nil {
					t.Logf("scan open task: %v", err)
					break
				}
				t.Logf("open task: id=%s state=%s", id, state)
			}
			rows.Close()
		}
		t.Errorf("tasks still open after chaos: %d", open)
	}
	var failed int
	if err := st.DB().QueryRow(`SELECT COUNT(*) FROM tasks WHERE state='failed'`).Scan(&failed); err != nil {
		t.Fatalf("count failed tasks: %v", err)
	}
	if failed != 0 {
		// In our scripted chaos, no agent fails work — failures would mean a
		// reaper killed a task that had no chance to complete. The bar is zero.
		t.Errorf("failed tasks: %d (want 0)", failed)
	}
	// (2) no messages left in flight.
	var stuck int
	if err := st.DB().QueryRow(`SELECT COUNT(*) FROM messages WHERE status IN ('queued','delivered')`).Scan(&stuck); err != nil {
		t.Fatalf("count in-flight messages: %v", err)
	}
	if stuck != 0 {
		t.Errorf("messages stuck in queued/delivered: %d", stuck)
	}
}

// ptrTime returns a pointer to a fresh copy of t. It is convenient for
// filling optional *time.Time fields from an expression.
func ptrTime(t time.Time) *time.Time {
	p := new(time.Time)
	*p = t
	return p
}
