// Package transport is the SQLite-backed message queue. Same envelope, two
// fronts: agents Send and Receive; the orchestrator and runtime use the same
// API. See plan §7.
//
// Invariants:
//   - Send writes a "queued" row and emits message.enqueued.
//   - Receive uses TxImmediate (BEGIN IMMEDIATE) to atomically pick the next
//     visible message for a target and bump next_visible_at out by the
//     visibility timeout. A consumer that crashes between Receive and Ack
//     lets the message become visible again automatically.
//   - Ack marks the row acked (stamping acked_at). Nack either requeues with
//     exponential backoff or, once the attempts counter (incremented by
//     Receive on every delivery) has reached MaxAttempts, marks the message
//     failed, emits message.failed, and enqueues a dead-letter nack back to
//     the original sender.
//   - Interrupts have priority=100 so they're dequeued before normal traffic
//     for the same recipient.
//
// We never call bus.Emit from inside a store.Tx — that would nest transactions
// on the same DB, blocking on the connection pool. Each method completes its
// Tx first, then emits.
package transport

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"math"
	"time"

	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/envelope"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/event"
	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/store"
)

// Queue is the transport handle. One per process.
//
// The exported fields are the delivery policy; New populates them with
// defaults.
type Queue struct {
	// st is the store all queue transactions run against; bus is where the
	// message.* events are emitted once those transactions have returned.
	st  *store.Store
	bus *event.Bus

	// Delivery policy:
	//   - VisibilityTimeout: how far past "now" Receive pushes a message's
	//     next_visible_at when it hands the message out.
	//   - MaxAttempts: the attempts count at which Nack and ReclaimExpired
	//     stop requeueing and mark the message failed instead.
	//   - BackoffBase / BackoffMax: starting delay and upper bound used by
	//     backoffFor when Nack requeues a message.
	VisibilityTimeout time.Duration
	MaxAttempts       int
	BackoffBase       time.Duration
	BackoffMax        time.Duration
}

// New builds a Queue on top of st and bus using the default delivery policy:
// a 30s visibility timeout, 3 attempts, and a retry backoff that starts at
// 250ms and is capped at 30s.
func New(st *store.Store, bus *event.Bus) *Queue {
	q := &Queue{st: st, bus: bus}
	q.VisibilityTimeout = 30 * time.Second
	q.MaxAttempts = 3
	q.BackoffBase = 250 * time.Millisecond
	q.BackoffMax = 30 * time.Second
	return q
}

// Send validates e, fills in its defaults, writes it as a 'queued' row in a
// transaction of its own, and then announces it with message.enqueued.
//
// Side effects on e: a zero Priority is replaced by
// envelope.PriorityFor(e.Type) and an empty CreatedAt is stamped with the
// current time. The event is emitted (via EmitSent) only after the
// transaction has returned without error, never from inside it.
//
// Returns the wrapped validation error, the payload marshalling error, or
// the transaction error; nil once the row has been written.
func (q *Queue) Send(ctx context.Context, e *envelope.Envelope) error {
	if verr := e.Validate(); verr != nil {
		return fmt.Errorf("transport: invalid envelope: %w", verr)
	}

	if e.Priority == 0 {
		e.Priority = envelope.PriorityFor(e.Type)
	}
	enqueuedAt := store.Now()
	if e.CreatedAt == "" {
		e.CreatedAt = store.FmtTime(enqueuedAt)
	}

	payloadJSON, merr := json.Marshal(e.Payload)
	if merr != nil {
		return merr
	}

	write := func(qry store.Querier) error {
		return insertMessage(qry, e, payloadJSON, enqueuedAt)
	}
	if txErr := q.st.Tx(ctx, write); txErr != nil {
		return txErr
	}

	// Tx is done; it is now safe to touch the bus.
	q.EmitSent(ctx, e)
	return nil
}

// SendInTx enqueues e through qry, i.e. inside a transaction the caller
// already owns. It exists so an enqueue can be bundled atomically with other
// writes — e.g. master.delegate inserting a task and enqueuing the delegate
// envelope together, so that either both writes commit or neither does and
// the "broken handoff" failure mode cannot occur.
//
// Like Send, it validates e, defaults a zero Priority from the message type,
// and stamps an empty CreatedAt. Unlike Send, it emits nothing: the caller
// must fire message.enqueued (see EmitSent) AFTER its transaction commits,
// because this package never calls bus.Emit inside a tx.
func (q *Queue) SendInTx(qry store.Querier, e *envelope.Envelope) error {
	if verr := e.Validate(); verr != nil {
		return fmt.Errorf("transport: invalid envelope: %w", verr)
	}

	// Fill in defaults the same way Send does.
	if e.Priority == 0 {
		e.Priority = envelope.PriorityFor(e.Type)
	}
	stamp := store.Now()
	if e.CreatedAt == "" {
		e.CreatedAt = store.FmtTime(stamp)
	}

	encoded, merr := json.Marshal(e.Payload)
	if merr != nil {
		return merr
	}
	return insertMessage(qry, e, encoded, stamp)
}

// EmitSent fires message.enqueued for e. Callers of SendInTx invoke it once
// their transaction has committed, which keeps database writes and bus
// emits separated as required by the package-level invariant.
//
// The event carries e's run, sender (as AgentID) and task, plus a payload of
// id, to, type and priority. The result of the emit is discarded.
func (q *Queue) EmitSent(ctx context.Context, e *envelope.Envelope) {
	enqueued := event.Event{
		Kind:    event.KindMessageEnqueued,
		RunID:   e.RunID,
		AgentID: e.From,
		TaskID:  e.TaskID,
		Payload: map[string]any{
			"id":       e.ID,
			"to":       e.To,
			"type":     string(e.Type),
			"priority": e.Priority,
		},
	}
	_, _ = q.bus.Emit(ctx, enqueued)
}

// insertMessage writes e into the messages table as a fresh 'queued' row
// with zero attempts. Send (own tx) and SendInTx (caller's tx) both go
// through it so the INSERT has exactly one definition.
//
// pj is the already-marshalled payload. now is written to both
// next_visible_at and the created_at column, so the row is visible
// immediately. Empty InReplyTo / TaskID are stored as NULL.
func insertMessage(qry store.Querier, e *envelope.Envelope, pj []byte, now time.Time) error {
	const insertSQL = `INSERT INTO messages(
			id, run_id, from_agent, to_agent, type, payload_json, in_reply_to, task_id,
			status, attempts, ttl_ms, priority, next_visible_at, created_at
		) VALUES(?, ?, ?, ?, ?, ?, ?, ?, 'queued', 0, ?, ?, ?, ?)`

	stamp := store.FmtTime(now)
	_, err := qry.Exec(insertSQL,
		e.ID, e.RunID, e.From, e.To, string(e.Type), string(pj),
		nullableStr(e.InReplyTo), nullableStr(e.TaskID),
		e.TTLMs, e.Priority, stamp, stamp,
	)
	return err
}

// Receive hands out the next message addressed to agentID, or (nil, nil)
// when nothing is currently deliverable — callers are expected to poll.
//
// Inside a single TxImmediate it selects the highest-priority, oldest row
// that is 'queued' or 'delivered', already visible (next_visible_at <= now)
// and not past its TTL; it then marks the row 'delivered', increments
// attempts, and sets next_visible_at to now + VisibilityTimeout so the
// message reappears if the consumer never acks. message.delivered is emitted
// only after the transaction has returned.
//
// NOTE(review): a row whose payload_json fails to decode makes the closure
// return an error; assuming the store rolls back on error, that same row
// will be selected again on the next call — confirm this is intended.
func (q *Queue) Receive(ctx context.Context, agentID string) (*envelope.Envelope, error) {
	// TTL enforcement at dequeue time (V54): rows where created_at + ttl_ms
	// is in the past are skipped here and left for SweepExpiredTTL to mark
	// 'failed'. ttl_ms=0 means "no TTL". julianday() yields fractional days;
	// scaling by 86_400_000 converts to milliseconds while keeping the
	// sub-second precision strftime('%s') would have dropped.
	const selectNext = `SELECT id, run_id, from_agent, to_agent, type, payload_json, in_reply_to,
			        task_id, ttl_ms, priority, created_at, attempts
			   FROM messages
			  WHERE to_agent=? AND status IN ('queued','delivered') AND next_visible_at<=?
			    AND (ttl_ms = 0 OR (julianday(?) - julianday(created_at)) * 86400000.0 < ttl_ms)
			  ORDER BY priority DESC, created_at ASC
			  LIMIT 1`
	const markDelivered = `UPDATE messages SET status='delivered', delivered_at=?, next_visible_at=?, attempts=attempts+1 WHERE id=?`

	receivedAt := store.Now()
	receivedStr := store.FmtTime(receivedAt)

	var msg *envelope.Envelope
	txErr := q.st.TxImmediate(ctx, func(qry store.Querier) error {
		var id, runID, sender, recipient, kind, rawPayload, createdAt string
		var replyTo, task sql.NullString
		var ttl int64
		var priority, attempts int

		scanErr := qry.QueryRow(selectNext, agentID, receivedStr, receivedStr).Scan(
			&id, &runID, &sender, &recipient, &kind, &rawPayload,
			&replyTo, &task, &ttl, &priority, &createdAt, &attempts,
		)
		switch {
		case errors.Is(scanErr, sql.ErrNoRows):
			// Nothing deliverable right now; msg stays nil.
			return nil
		case scanErr != nil:
			return scanErr
		}

		// Take the lease: the row stays invisible until leaseEnd.
		leaseEnd := store.FmtTime(receivedAt.Add(q.VisibilityTimeout))
		if _, err := qry.Exec(markDelivered, receivedStr, leaseEnd, id); err != nil {
			return err
		}

		var body envelope.Payload
		if err := json.Unmarshal([]byte(rawPayload), &body); err != nil {
			return fmt.Errorf("transport: payload decode: %w", err)
		}
		msg = &envelope.Envelope{
			ID:        id,
			RunID:     runID,
			From:      sender,
			To:        recipient,
			Type:      envelope.Type(kind),
			InReplyTo: replyTo.String,
			TaskID:    task.String,
			TTLMs:     ttl,
			Priority:  priority,
			CreatedAt: createdAt,
			Payload:   body,
		}
		return nil
	})
	if txErr != nil {
		return nil, txErr
	}
	if msg == nil {
		return nil, nil
	}

	_, _ = q.bus.Emit(ctx, event.Event{
		Kind:    event.KindMessageDelivered,
		RunID:   msg.RunID,
		AgentID: agentID,
		TaskID:  msg.TaskID,
		Payload: map[string]any{"id": msg.ID, "type": string(msg.Type)},
	})
	return msg, nil
}

// Ack records that msgID was processed successfully by setting its status
// to 'acked' and stamping acked_at. Rows that are already 'acked' are left
// untouched, which makes repeated calls harmless; message.acked is emitted
// only when a row actually changed, and only after the transaction returned.
func (q *Queue) Ack(ctx context.Context, msgID string) error {
	ackedAt := store.FmtTime(store.Now())

	transitioned := false
	txErr := q.st.Tx(ctx, func(qry store.Querier) error {
		res, err := qry.Exec(`UPDATE messages SET status='acked', acked_at=? WHERE id=? AND status<>'acked'`, ackedAt, msgID)
		if err != nil {
			return err
		}
		affected, _ := res.RowsAffected()
		transitioned = affected > 0
		return nil
	})
	if txErr != nil {
		return txErr
	}
	if !transitioned {
		// Unknown id or already acked: nothing to announce.
		return nil
	}

	_, _ = q.bus.Emit(ctx, event.Event{
		Kind:    event.KindMessageAcked,
		Payload: map[string]any{"id": msgID},
	})
	return nil
}

// Nack reports that delivery of msgID failed; reason is carried in the
// emitted events and in any dead-letter notice.
//
// While the message's attempts counter is below MaxAttempts the row goes
// back to 'queued' and becomes visible again after backoffFor(attempts).
// Otherwise the row is marked 'failed' and a TypeNack dead-letter envelope
// (ID "dl_"+msgID, From "queue") is enqueued to the original sender so they
// can recover — requeue, escalate, or notify the user. Silently dropping the
// message is the "broken handoff chain" failure mode the brief forbids.
//
// The failed mark and the notice are written in the same transaction, so
// either both land or neither does. No notice is sent when the sender is
// empty or a synthetic agent (V92): such names have no real inbox and
// queue/reaper messages would otherwise dead-letter to themselves and cycle.
//
// If the lookup of msgID fails, that Scan error is returned unchanged. All
// events are emitted after the transaction has returned.
func (q *Queue) Nack(ctx context.Context, msgID string, reason string) error {
	type disposition int
	const (
		requeued disposition = iota
		failed
	)

	nackedAt := store.Now()

	// Filled in by the transaction, read by the emit phase below.
	var result disposition
	var runID, sender, recipient, msgType string
	var task sql.NullString
	var attempts int
	var delay time.Duration
	var notice *envelope.Envelope

	txErr := q.st.TxImmediate(ctx, func(qry store.Querier) error {
		lookup := qry.QueryRow(
			`SELECT attempts, run_id, from_agent, to_agent, type, task_id FROM messages WHERE id=?`, msgID,
		)
		if err := lookup.Scan(&attempts, &runID, &sender, &recipient, &msgType, &task); err != nil {
			return err
		}

		// Retry budget left: put the row back with a backoff delay.
		if attempts < q.MaxAttempts {
			delay = q.backoffFor(attempts)
			visibleAt := store.FmtTime(nackedAt.Add(delay))
			if _, err := qry.Exec(
				`UPDATE messages SET status='queued', next_visible_at=?, delivered_at=NULL WHERE id=?`,
				visibleAt, msgID,
			); err != nil {
				return err
			}
			result = requeued
			return nil
		}

		// Retries exhausted: terminal failure.
		if _, err := qry.Exec(`UPDATE messages SET status='failed' WHERE id=?`, msgID); err != nil {
			return err
		}
		result = failed

		if sender == "" || isSyntheticAgent(sender) {
			// Nobody to notify (V92).
			return nil
		}
		notice = &envelope.Envelope{
			ID:     "dl_" + msgID,
			RunID:  runID,
			From:   "queue",
			To:     sender,
			Type:   envelope.TypeNack,
			TaskID: task.String,
			TTLMs:  60000,
			Payload: envelope.Payload{
				Reason: reason,
				Intent: "dead-letter: " + msgType + " to " + recipient + " exhausted retries",
			},
		}
		notice.Priority = envelope.PriorityFor(notice.Type)
		body, err := json.Marshal(notice.Payload)
		if err != nil {
			return err
		}
		return insertMessage(qry, notice, body, nackedAt)
	})
	if txErr != nil {
		return txErr
	}

	if result == requeued {
		_, _ = q.bus.Emit(ctx, event.Event{
			Kind:  event.KindMessageRequeued,
			RunID: runID,
			Payload: map[string]any{
				"id":         msgID,
				"attempt":    attempts,
				"backoff_ms": delay.Milliseconds(),
				"reason":     reason,
			},
		})
		return nil
	}

	// result == failed
	_, _ = q.bus.Emit(ctx, event.Event{
		Kind:  event.KindMessageFailed,
		RunID: runID,
		Payload: map[string]any{
			"id":     msgID,
			"reason": reason,
			"type":   msgType,
			"from":   sender,
			"to":     recipient,
		},
	})
	if notice == nil {
		return nil
	}
	q.EmitSent(ctx, notice)
	_, _ = q.bus.Emit(ctx, event.Event{
		Kind:  "queue.dead_letter_notified",
		RunID: runID,
		Payload: map[string]any{
			"original_id": msgID,
			"notice_id":   notice.ID,
			"sender":      sender,
			"reason":      reason,
		},
	})
	return nil
}

// ReclaimExpired finds messages whose visibility lease expired (status
// 'delivered' with next_visible_at <= now). For each one:
//   - if attempts >= MaxAttempts → mark it 'failed' and, unless the original
//     sender is empty or synthetic, enqueue a dead-letter TypeNack notice
//     ("dl_"+id) back to that sender. This closes the crash-loop where a
//     worker picks up a message and dies without ever calling Nack.
//   - otherwise → set status back to 'queued' for redelivery.
//
// Everything happens in one TxImmediate; events (message.requeued,
// message.failed, message.enqueued and queue.dead_letter_notified) are
// emitted only after it has returned.
//
// Returns the number of messages requeued. Dead-lettered messages are not
// counted — that is a terminal disposition, not a retry. On error the count
// is 0.
func (q *Queue) ReclaimExpired(ctx context.Context) (int, error) {
	now := store.Now()
	nowStr := store.FmtTime(now)

	// expired is one lease-expired row, plus the dead-letter notice that was
	// enqueued for it (nil when the row was requeued or had no notifiable
	// sender).
	type expired struct {
		id, runID, fromAgt, toAgt, typ, taskID string
		attempts                               int
		notice                                 *envelope.Envelope
	}

	var (
		requeued int
		failed   []expired
	)
	err := q.st.TxImmediate(ctx, func(qry store.Querier) error {
		// Start from a clean slate so that a re-run of this closure (should
		// the store ever retry it) cannot double-count.
		requeued, failed = 0, nil

		rows, err := qry.Query(
			`SELECT id, attempts, run_id, from_agent, to_agent, type, IFNULL(task_id,'')
			   FROM messages WHERE status='delivered' AND next_visible_at<=?`, nowStr,
		)
		if err != nil {
			return err
		}
		var cands []expired
		for rows.Next() {
			var c expired
			if err := rows.Scan(&c.id, &c.attempts, &c.runID, &c.fromAgt, &c.toAgt, &c.typ, &c.taskID); err != nil {
				rows.Close()
				return err
			}
			cands = append(cands, c)
		}
		// Next also returns false when iteration breaks part-way; without
		// this check a truncated candidate list would be processed as if it
		// were complete. The cursor is closed before any UPDATE is issued.
		iterErr := rows.Err()
		rows.Close()
		if iterErr != nil {
			return iterErr
		}

		for _, c := range cands {
			if c.attempts < q.MaxAttempts {
				// Requeue path: the row is already visible, only the status flips.
				if _, err := qry.Exec(`UPDATE messages SET status='queued' WHERE id=?`, c.id); err != nil {
					return err
				}
				requeued++
				continue
			}

			// Dead-letter path.
			if _, err := qry.Exec(`UPDATE messages SET status='failed' WHERE id=?`, c.id); err != nil {
				return err
			}
			if c.fromAgt != "" && !isSyntheticAgent(c.fromAgt) {
				dl := &envelope.Envelope{
					ID:     "dl_" + c.id,
					RunID:  c.runID,
					From:   "queue",
					To:     c.fromAgt,
					Type:   envelope.TypeNack,
					TaskID: c.taskID,
					TTLMs:  60000,
					Payload: envelope.Payload{
						Reason: "delivery lease expired " + intToStr(c.attempts) + " times without ack",
						Intent: "dead-letter: " + c.typ + " to " + c.toAgt + " — recipient unresponsive (crash loop?)",
					},
				}
				dl.Priority = envelope.PriorityFor(dl.Type)
				pj, err := json.Marshal(dl.Payload)
				if err != nil {
					return err
				}
				if err := insertMessage(qry, dl, pj, now); err != nil {
					return err
				}
				c.notice = dl
			}
			failed = append(failed, c)
		}
		return nil
	})
	if err != nil {
		return 0, err
	}

	if requeued > 0 {
		_, _ = q.bus.Emit(ctx, event.Event{
			Kind:    event.KindMessageRequeued,
			Payload: map[string]any{"reclaimed_by": "reaper", "count": requeued},
		})
	}
	// All message.failed events go out first, then the notice events, in
	// the order the rows were processed.
	for _, f := range failed {
		_, _ = q.bus.Emit(ctx, event.Event{
			Kind: event.KindMessageFailed, RunID: f.runID,
			Payload: map[string]any{
				"id": f.id, "reason": "reclaim_exhausted", "type": f.typ,
				"from": f.fromAgt, "to": f.toAgt,
			},
		})
	}
	for _, f := range failed {
		if f.notice == nil {
			continue
		}
		q.EmitSent(ctx, f.notice)
		_, _ = q.bus.Emit(ctx, event.Event{
			Kind:  "queue.dead_letter_notified",
			RunID: f.notice.RunID,
			Payload: map[string]any{
				"original_id": f.id,
				"notice_id":   f.notice.ID,
				"sender":      f.fromAgt,
				"reason":      "reclaim_exhausted",
			},
		})
	}
	return requeued, nil
}

// intToStr formats n in base 10. It is a tiny helper for the dead-letter
// Reason field, kept local so that one integer does not need a formatting
// package.
//
// Every int value is handled, including the most negative one: the digits
// are produced from the unsigned magnitude, because negating the minimum
// int in signed arithmetic wraps back to itself and would yield no digits.
func intToStr(n int) string {
	mag := uint(n)
	if n < 0 {
		// Unsigned negation is well defined and gives |n| even for the
		// minimum int.
		mag = -mag
	}

	// 20 bytes hold a sign plus the 19 digits of a 64-bit magnitude.
	var buf [20]byte
	pos := len(buf)
	for {
		pos--
		buf[pos] = byte('0' + mag%10)
		mag /= 10
		if mag == 0 {
			break
		}
	}
	if n < 0 {
		pos--
		buf[pos] = '-'
	}
	return string(buf[pos:])
}

// SweepExpiredTTL marks every 'queued' or 'delivered' message whose TTL has
// elapsed (created_at + ttl_ms <= now) as 'failed', so stale rows do not pile
// up in the table. Rows with ttl_ms = 0 never expire. Intended to be run
// periodically by the reaper loop.
//
// Returns the number of rows swept (0 on error). When at least one row was
// swept, a single message.failed event with reason "ttl_expired" and the
// count is emitted after the transaction has returned.
func (q *Queue) SweepExpiredTTL(ctx context.Context) (int, error) {
	const sweepSQL = `UPDATE messages SET status='failed'
			  WHERE status IN ('queued','delivered')
			    AND ttl_ms > 0
			    AND (julianday(?) - julianday(created_at)) * 86400000.0 >= ttl_ms`

	sweepAt := store.Now()

	expired := 0
	txErr := q.st.TxImmediate(ctx, func(qry store.Querier) error {
		res, err := qry.Exec(sweepSQL, store.FmtTime(sweepAt))
		if err != nil {
			return err
		}
		affected, _ := res.RowsAffected()
		expired = int(affected)
		return nil
	})
	if txErr != nil {
		return 0, txErr
	}
	if expired == 0 {
		return 0, nil
	}

	_, _ = q.bus.Emit(ctx, event.Event{
		Kind:    event.KindMessageFailed,
		Payload: map[string]any{"reason": "ttl_expired", "count": expired},
	})
	return expired, nil
}

// Depth reports how many messages addressed to agentID are still
// outstanding, i.e. in status 'queued' or 'delivered'. It reads straight
// from the store's DB handle rather than opening a transaction.
func (q *Queue) Depth(ctx context.Context, agentID string) (int, error) {
	var pending int
	err := q.st.DB().QueryRowContext(ctx,
		`SELECT COUNT(*) FROM messages WHERE to_agent=? AND status IN ('queued','delivered')`,
		agentID,
	).Scan(&pending)
	if err != nil {
		return 0, err
	}
	return pending, nil
}

// backoffFor returns the requeue delay for a message that has been delivered
// attempt times: BackoffBase doubled once for every attempt after the first,
// capped at BackoffMax. Attempts below 1 are treated as the first attempt.
//
// The delay is built by repeated doubling with an explicit headroom check
// instead of BackoffBase * 2^(attempt-1): for large attempt counts (possible
// when MaxAttempts is raised) that product overflows time.Duration and could
// come out negative, slipping past the BackoffMax cap. A non-positive
// BackoffBase is returned unscaled.
func (q *Queue) backoffFor(attempt int) time.Duration {
	d := q.BackoffBase
	for i := 1; i < attempt && d > 0 && d < q.BackoffMax; i++ {
		if d > q.BackoffMax/2 {
			// One more doubling would exceed the cap (and might overflow).
			return q.BackoffMax
		}
		d *= 2
	}
	if d > q.BackoffMax {
		d = q.BackoffMax
	}
	return d
}

// isSyntheticAgent reports whether name is one of the pseudo-agent sender
// names — "queue", "reaper", "watchdog" or "system". These are treated as
// having no real inbox, so dead-letter notices are never addressed to them;
// doing so would cycle forever (V92). The match is exact and case-sensitive.
func isSyntheticAgent(name string) bool {
	return name == "queue" ||
		name == "reaper" ||
		name == "watchdog" ||
		name == "system"
}

// nullableStr converts s into a SQL argument: the empty string becomes nil
// (so the column is stored as NULL), anything else is passed through as is.
func nullableStr(s string) any {
	if len(s) > 0 {
		return s
	}
	return nil
}
