package transport

import (
	"context"
	"path/filepath"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/envelope"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/event"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/store"
)

// TestLoadContendedDequeue is the explicit "BEGIN IMMEDIATE under contention"
// stress test the plan calls for (§17 Phase A). It enqueues many messages and
// dequeues from many goroutines simultaneously, ensuring:
//   - no message is delivered twice
//   - no message is lost
//   - throughput is reasonable (the timeout itself is the bar)
//
// If BEGIN IMMEDIATE were misused (e.g. holding the writer lock too long, or a
// real "I read then someone else writes" race), this test would time out or
// hit a deadlock — explicitly the failure mode the critic warned about.
func TestLoadContendedDequeue(t *testing.T) {
	tmp := t.TempDir()
	st, err := store.Open(filepath.Join(tmp, "harness.db"))
	if err != nil {
		t.Fatal(err)
	}
	defer st.Close()
	bus := event.NewBus(st)
	q := New(st, bus)

	const (
		Messages   = 5000
		Producers  = 4
		Consumers  = 16
		Recipients = 4
	)

	// 5 minutes accommodates race-detector overhead (~20× normal). Without
	// -race this test completes in ~3s; with -race the BEGIN IMMEDIATE
	// serialization combined with detector instrumentation can stretch it.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	// Producers split the work; each enqueues its share.
	var pwg sync.WaitGroup
	for p := 0; p < Producers; p++ {
		pwg.Add(1)
		go func(p int) {
			defer pwg.Done()
			perProducer := Messages / Producers
			for i := 0; i < perProducer; i++ {
				id := "p" + itoa(p) + "-m" + itoa(i)
				to := "r" + itoa(i%Recipients)
				e := &envelope.Envelope{
					ID: id, RunID: "load", From: "src", To: to,
					// TTL=0 = infinite (V72). The load test runs slow
					// under -race; finite TTL would have V54 enforcement
					// filtering late messages out.
					Type: envelope.TypeDelegate, TTLMs: 0,
					Payload: envelope.Payload{Intent: "x", Expects: envelope.ExpectsReport},
				}
				if err := q.Send(ctx, e); err != nil {
					t.Errorf("send %s: %v", id, err)
					return
				}
			}
		}(p)
	}
	pwg.Wait()

	var got int64
	dupCheck := sync.Map{}
	var consumerErr atomic.Int64

	var cwg sync.WaitGroup
	for c := 0; c < Consumers; c++ {
		cwg.Add(1)
		go func(c int) {
			defer cwg.Done()
			target := "r" + itoa(c%Recipients)
			for {
				e, err := q.Receive(ctx, target)
				if err != nil {
					consumerErr.Add(1)
					return
				}
				if e == nil {
					return
				}
				if _, loaded := dupCheck.LoadOrStore(e.ID, c); loaded {
					t.Errorf("duplicate delivery: %s", e.ID)
				}
				atomic.AddInt64(&got, 1)
				if err := q.Ack(ctx, e.ID); err != nil {
					t.Errorf("ack %s: %v", e.ID, err)
				}
			}
		}(c)
	}
	cwg.Wait()

	if consumerErr.Load() > 0 {
		t.Fatalf("consumer errors: %d", consumerErr.Load())
	}
	if got != int64(Messages) {
		t.Fatalf("got %d, want %d", got, Messages)
	}

	// Sanity: no message stuck in queued/delivered.
	var stuck int
	_ = st.DB().QueryRow(`SELECT COUNT(*) FROM messages WHERE status IN ('queued','delivered')`).Scan(&stuck)
	if stuck != 0 {
		t.Errorf("messages still in queue: %d", stuck)
	}
}
