// Package event is the append-only event log for the harness. Every state
// transition (task created, message delivered, draft approved, run killed,
// policy violation, …) writes one row. The events table is the authoritative
// trail.
//
// The taxonomy is extensible at runtime: callers Register new Kinds with a
// Classification (mostly for dashboard grouping) before emitting them. Events
// of unregistered kinds are still persisted, but with the classification
// "unclassified", so the operator notices and registers them.
package event

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/flothus/tmux-xterm-research/server-go/internal/harness/store"
)

// Kind is the string identifier of an event type, e.g. "task.created",
// "message.delivered" or "policy.violation". The dotted prefix indicates the
// module domain. Emit and EmitInTx reject the empty Kind.
type Kind string

// Classification is the coarse grouping the dashboard uses (e.g. "lifecycle",
// "transport", "policy", "evaluation"). A Kind is mapped to its
// Classification through Bus.Register, at startup or by modules.
type Classification string

const (
	// ClassUnclassified is the classification Bus.Classify reports for a
	// Kind that has no entry in the taxonomy.
	ClassUnclassified Classification = "unclassified"
)

// Event is the row shape. The events table assigns the integer id; we keep
// the Go struct without an ID until persisted (Emit returns the id).
type Event struct {
	// RunID, AgentID and TaskID scope the event. An empty string is written
	// to the table as NULL.
	RunID          string         `json:"run_id,omitempty"`
	AgentID        string         `json:"agent_id,omitempty"`
	TaskID         string         `json:"task_id,omitempty"`
	// Kind identifies the event type; Emit and EmitInTx reject an empty Kind.
	Kind           Kind           `json:"kind"`
	// Classification is filled in from the Bus taxonomy when left empty; a
	// value set by the caller is stored as-is.
	Classification Classification `json:"classification,omitempty"`
	// Payload is marshalled to JSON for storage. A nil Payload is stored as
	// an empty JSON object.
	Payload        map[string]any `json:"payload,omitempty"`
}

// Bus is the runtime event hub: it owns the registered taxonomy, writes events
// to the store, and fans out to in-process subscribers (the dashboard, the
// trace bridge, tests). Fan-out runs on the emitting goroutine using
// non-blocking channel sends: a subscriber whose buffer is full misses the
// event (and has its drop counter bumped) instead of stalling the emitter.
type Bus struct {
	// st is the store the event rows are written to.
	st *store.Store

	// mu guards taxon, subs, drops and nextSub. The counters that drops
	// points at are updated atomically while only the read lock is held.
	mu      sync.RWMutex
	// taxon maps each registered Kind to its Classification.
	taxon   map[Kind]Classification
	// subs holds the live subscriber channels, keyed by subscriber id.
	subs    map[int]chan Event
	// drops is created lazily by the first Subscribe call; entries are
	// removed when the subscriber cancels.
	drops   map[int]*int64 // V104: per-subscriber drop counters
	// nextSub is the id handed to the next subscriber.
	nextSub int
}

// NewBus constructs a Bus that writes to st. The taxonomy starts out as the
// core default set and the subscriber table starts empty. The store must
// already be open.
func NewBus(st *store.Store) *Bus {
	b := new(Bus)
	b.st = st
	b.taxon = defaultTaxonomy()
	b.subs = make(map[int]chan Event)
	return b
}

// Register records c as the Classification of k in the taxonomy. Calling it
// again for the same Kind is safe: the most recent call wins. Modules call
// this at package init or startup.
func (b *Bus) Register(k Kind, c Classification) {
	b.mu.Lock()
	defer b.mu.Unlock()

	b.taxon[k] = c
}

// Classify looks k up in the taxonomy and returns its Classification. Kinds
// that were never registered yield ClassUnclassified.
func (b *Bus) Classify(k Kind) Classification {
	b.mu.RLock()
	class, known := b.taxon[k]
	b.mu.RUnlock()

	if !known {
		return ClassUnclassified
	}
	return class
}

// Emit persists e in its own store transaction and, once the insert has
// succeeded, fans the event out to in-process subscribers. It returns the id
// the events table assigned to the row.
//
// An empty Kind or a payload that cannot be marshalled to JSON is reported as
// an error before anything is written; store errors are returned unchanged.
// On any error the returned id is 0 and nothing is fanned out.
func (b *Bus) Emit(ctx context.Context, e Event) (int64, error) {
	e, pj, err := b.prepareEvent(e)
	if err != nil {
		return 0, err
	}

	var id int64
	err = b.st.Tx(ctx, func(q store.Querier) error {
		var txErr error
		id, txErr = insertEvent(q, e, pj)
		return txErr
	})
	if err != nil {
		return 0, err
	}

	// Subscribers are only notified after the row is stored. They receive the
	// event with its Classification resolved and the caller's Payload as given.
	b.fan(e)
	return id, nil
}

// EmitInTx writes the event row inside the caller's existing transaction so
// the event log is atomic with the underlying state change. The fan-out to
// in-process subscribers (Fan) is the caller's responsibility AFTER tx
// commit — we never fan inside a tx (subscribers may read DB rows the tx
// hasn't yet committed, which would observe phantom state).
//
// Use this for transitions where "the state changed AND the event row
// exists" must be all-or-nothing — e.g. orchestrator.Transition, where a
// post-commit Emit crash leaves the DB in a new state with no event row,
// violating the "every transition writes one event row" invariant.
//
// It performs the same validation and classification defaulting as Emit and
// returns the id of the inserted row, or 0 and an error.
func (b *Bus) EmitInTx(q store.Querier, e Event) (int64, error) {
	e, pj, err := b.prepareEvent(e)
	if err != nil {
		return 0, err
	}
	return insertEvent(q, e, pj)
}

// prepareEvent is the validation and encoding step shared by Emit and
// EmitInTx. It returns e with its Classification resolved, together with the
// JSON encoding of the payload that goes into the row.
func (b *Bus) prepareEvent(e Event) (Event, []byte, error) {
	// V111: empty Kind is a caller bug — the event would be unfindable
	// by kind. Better to fail loud than silently insert.
	if e.Kind == "" {
		return Event{}, nil, fmt.Errorf("event: empty Kind")
	}
	// A classification supplied by the caller wins; the taxonomy is only
	// consulted (and its read lock only taken) when none was given.
	if e.Classification == "" {
		e.Classification = b.Classify(e.Kind)
	}
	// json.Marshal encodes a nil map as "null"; substitute an empty map so
	// the stored payload is always a JSON object. e.Payload itself is left
	// untouched.
	payload := e.Payload
	if payload == nil {
		payload = map[string]any{}
	}
	pj, err := json.Marshal(payload)
	if err != nil {
		return Event{}, nil, fmt.Errorf("event: marshal payload: %w", err)
	}
	return e, pj, nil
}

// Fan hands e to every in-process subscriber. It is exported so that code
// using EmitInTx can finish the emit semantics itself once its transaction
// has committed.
func (b *Bus) Fan(e Event) {
	b.fan(e)
}

// insertEvent writes one row to the events table through q and returns the
// insert id reported by the result. It is the single SQL writer behind both
// Emit and EmitInTx. Empty RunID/AgentID/TaskID values are written as NULL,
// pj is stored as the payload JSON text, and the timestamp is taken from the
// store clock at insert time.
func insertEvent(q store.Querier, e Event, pj []byte) (int64, error) {
	const insertSQL = `INSERT INTO events(run_id, agent_id, task_id, kind, classification, payload_json, ts)
		 VALUES(?, ?, ?, ?, ?, ?, ?)`

	runID, agentID, taskID := nullable(e.RunID), nullable(e.AgentID), nullable(e.TaskID)
	ts := store.FmtTime(store.Now())

	result, err := q.Exec(insertSQL,
		runID, agentID, taskID,
		string(e.Kind), string(e.Classification), string(pj),
		ts,
	)
	if err != nil {
		return 0, err
	}
	return result.LastInsertId()
}

// Subscribe registers a new in-process subscriber and returns its receive
// channel together with a cancel func. The channel is buffered with capacity
// buf (256 when buf <= 0); when the buffer is full further events are dropped
// for this subscriber rather than blocking the emitter (V104: drops are
// counted and show up in SubscriberDropCount).
//
// cancel unregisters the subscriber, discards its drop counter and closes the
// channel. Calling it more than once is harmless.
func (b *Bus) Subscribe(buf int) (<-chan Event, func()) {
	size := buf
	if size < 1 {
		size = 256
	}
	ch := make(chan Event, size)

	b.mu.Lock()
	id := b.nextSub
	b.nextSub = id + 1
	b.subs[id] = ch
	// NewBus does not allocate the drops map; create it on first use.
	if b.drops == nil {
		b.drops = make(map[int]*int64)
	}
	b.drops[id] = new(int64)
	b.mu.Unlock()

	return ch, func() {
		b.mu.Lock()
		defer b.mu.Unlock()

		// Already cancelled: the channel is closed, so do nothing.
		if _, live := b.subs[id]; !live {
			return
		}
		delete(b.subs, id)
		delete(b.drops, id)
		// Closing under the write lock keeps fan (which sends under the
		// read lock) from ever sending on a closed channel.
		close(ch)
	}
}

// SubscriberDropCount returns the number of events dropped, summed over the
// subscribers that are registered at the time of the call. Use it to detect
// "are dashboards missing events under load."
//
// NOTE(review): a subscriber's counter is removed when it cancels, so its
// drops disappear from this total; the value is not a monotonic
// since-process-start figure.
func (b *Bus) SubscriberDropCount() int64 {
	var sum int64

	b.mu.RLock()
	for id := range b.drops {
		if ctr := b.drops[id]; ctr != nil {
			sum += atomic.LoadInt64(ctr)
		}
	}
	b.mu.RUnlock()

	return sum
}

// fan offers e to every registered subscriber without blocking. It holds the
// read lock for the whole pass, so subscribers cannot be added or cancelled
// while delivery is in progress.
func (b *Bus) fan(e Event) {
	b.mu.RLock()
	defer b.mu.RUnlock()

	for id, ch := range b.subs {
		select {
		case ch <- e:
			continue
		default:
		}
		// Slow consumer: the buffer is full, so the event is dropped for this
		// subscriber. V104: bump its drop counter so the loss is observable.
		// The drop is deliberately not reported through Emit — that would
		// re-enter fan while the read lock is already held.
		if ctr := b.drops[id]; ctr != nil {
			atomic.AddInt64(ctr, 1)
		}
	}
}

// defaultTaxonomy builds a fresh map holding the classification of every kind
// the core modules emit. Modules add their own via Register at startup.
func defaultTaxonomy() map[Kind]Classification {
	groups := []struct {
		class Classification
		kinds []Kind
	}{
		{"lifecycle", []Kind{
			KindRunStarted,
			KindRunEnded,
			KindRunKilled,
			KindAgentSpawned,
			KindAgentTerminated,
			KindAgentHeartbeat,
			// Spawn-pipeline visibility (Phase K diagnostic events). Without
			// these a stuck spawn used to look like "run just stops at
			// runtime_selected".
			"agent.spawn.routing",
			"agent.spawn.calling_runtime",
			"agent.spawn.failed",
			"agent.role_loaded",
			"agent.role_template_missing",
			"run.spawn_failed",
			"runtime.health_check",
		}},
		// Per-step agent activity (Phase observability extension).
		{"agent", []Kind{
			"agent.handle_one.started",
			"agent.handle_one.acked",
			"agent.llm.call_started",
			"agent.llm.call_ended",
			"agent.llm.call_failed",
			"agent.tool.called",
			"agent.tool.failed",
		}},
		{"task", []Kind{
			KindTaskCreated,
			KindTaskAssigned,
			KindTaskStateChanged,
			KindTaskCompleted,
			KindTaskFailed,
			KindTaskAbandoned,
			KindTaskStalled,
		}},
		{"transport", []Kind{
			KindMessageEnqueued,
			KindMessageDelivered,
			KindMessageAcked,
			KindMessageNacked,
			KindMessageRequeued,
			KindMessageFailed,
			KindParseFailure,
		}},
		{"policy", []Kind{
			"run.stalled",
			KindPolicyViolation,
			KindCoordinationLoop,
			KindCoordinationThrash,
			KindCostCeilingHit,
			KindInterruptUnacked,
		}},
		{"evaluation", []Kind{
			KindEvaluationRecorded,
		}},
	}

	taxonomy := make(map[Kind]Classification)
	for _, g := range groups {
		for _, k := range g.kinds {
			taxonomy[k] = g.class
		}
	}
	return taxonomy
}

// Known kinds emitted by the core modules; each one has an entry in the
// default taxonomy. Modules that need further kinds can define them inline
// and Register a classification for them.
const (
	KindRunStarted         Kind = "run.started"
	KindRunEnded           Kind = "run.ended"
	KindRunKilled          Kind = "run.killed"
	KindAgentSpawned       Kind = "agent.spawned"
	KindAgentTerminated    Kind = "agent.terminated"
	KindAgentHeartbeat     Kind = "agent.heartbeat"
	KindTaskCreated        Kind = "task.created"
	KindTaskAssigned       Kind = "task.assigned"
	KindTaskStateChanged   Kind = "task.state_changed"
	KindTaskCompleted      Kind = "task.completed"
	KindTaskFailed         Kind = "task.failed"
	KindTaskAbandoned      Kind = "task.abandoned"
	KindTaskStalled        Kind = "task.stalled"
	KindMessageEnqueued    Kind = "message.enqueued"
	KindMessageDelivered   Kind = "message.delivered"
	KindMessageAcked       Kind = "message.acked"
	KindMessageNacked      Kind = "message.nacked"
	KindMessageRequeued    Kind = "message.requeued"
	KindMessageFailed      Kind = "message.failed"
	KindPolicyViolation    Kind = "policy.violation"
	KindCoordinationLoop   Kind = "coordination.loop"
	KindCoordinationThrash Kind = "coordination.thrash"
	KindCostCeilingHit     Kind = "policy.cost_ceiling"
	KindInterruptUnacked   Kind = "interrupt.unacknowledged"
	KindEvaluationRecorded Kind = "evaluation.recorded"
	KindParseFailure       Kind = "sidecar.parse_failed"
)

// nullable converts s into a SQL argument: the empty string becomes nil (so
// the column is written as NULL) and any other value is passed through as a
// string.
func nullable(s string) any {
	if len(s) > 0 {
		return s
	}
	return nil
}
