diff --git a/util/eventbus/client.go b/util/eventbus/client.go index 0168acdd4..e8edf4395 100644 --- a/util/eventbus/client.go +++ b/util/eventbus/client.go @@ -146,7 +146,11 @@ func Subscribe[T any](c *Client) *Subscriber[T] { r := c.subscribeStateLocked() s := newSubscriber[T](r, logfForCaller(c.logger())) - r.addSubscriber(s) + // Register the non-generic core with the bus rather than the + // typed facade, mirroring SubscribeFunc and Publish: this + // keeps the bus's outputs map and subscriber-interface itab + // out of per-T cost. + r.addSubscriber(s.core) return s } diff --git a/util/eventbus/subscribe.go b/util/eventbus/subscribe.go index 45ac1403f..2ad4e97d3 100644 --- a/util/eventbus/subscribe.go +++ b/util/eventbus/subscribe.go @@ -183,50 +183,63 @@ func (s *subscribeState) closed() <-chan struct{} { // A Subscriber delivers one type of event from a [Client]. // Events are sent to the [Subscriber.Events] channel. +// +// Implementation note: like SubscriberFunc[T], Subscriber[T] is a +// thin user-facing facade. The package-private subscriber-interface +// implementation (Close, subscribeType, dispatch, slow timer, +// unregister) lives on the non-generic *subscriberCore. The only +// per-T state owned by the facade itself is the typed delivery +// channel; everything else is shared with SubscriberFunc[T]. type Subscriber[T any] struct { - stop stopFlag - read chan T - unregister func() - logf logger.Logf - slow *time.Timer // used to detect slow subscriber service + core *subscriberCore + read chan T } func newSubscriber[T any](r *subscribeState, logf logger.Logf) *Subscriber[T] { - slow := time.NewTimer(0) - slow.Stop() // reset in dispatch - return &Subscriber[T]{ - read: make(chan T), - unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) }, - logf: logf, - slow: slow, + core := newSubscriberCore(r, logf, reflect.TypeFor[T]()) + s := &Subscriber[T]{ + core: core, + read: make(chan T), } -} - -func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] { - ret := &Subscriber[T]{ - read: make(chan T, 100), // arbitrary, large + // Subscriber[T].dispatch can't share the non-generic dispatch + // loop the way SubscriberFunc does because its delivery is a + // typed channel send (case s.read <- t:) and Go does not allow + // a typed channel-send case inside a non-generic select. Going + // through a goroutine to bridge typed and untyped would make + // every event delivery pay a goroutine-allocation cost, which + // was measured at ~2.7x throughput on BenchmarkBasicThroughput. + // Instead, we keep the dispatch loop generic for Subscriber[T] + // and accept the per-shape stencil cost (~380 bytes per shape). + // The non-generic helper subscribeDispatch implements the + // shared cases; only the typed select is in the generic + // wrapper. + core.dispatchFn = func( + ctx context.Context, + vals *queue[DeliveredEvent], + acceptCh func() chan DeliveredEvent, + snapshot chan chan []DeliveredEvent, + ) bool { + return s.dispatchTyped(ctx, vals, acceptCh, snapshot) } - ret.unregister = attach(ret.monitor) - return ret + return s } -func (s *Subscriber[T]) subscribeType() reflect.Type { - return reflect.TypeFor[T]() -} - -func (s *Subscriber[T]) monitor(debugEvent T) { - select { - case s.read <- debugEvent: - case <-s.stop.Done(): - } -} - -func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool { +// dispatchTyped is the per-T dispatch loop for Subscriber[T]. It +// has to remain generic because the typed channel send +// `case s.read <- t:` must appear lexically inside the select. The +// rest of the cases match the non-generic dispatchFunc body to +// keep behavior aligned between Subscriber and SubscriberFunc. +func (s *Subscriber[T]) dispatchTyped( + ctx context.Context, + vals *queue[DeliveredEvent], + acceptCh func() chan DeliveredEvent, + snapshot chan chan []DeliveredEvent, +) bool { t := vals.Peek().Event.(T) start := time.Now() - s.slow.Reset(slowSubscriberTimeout) - defer s.slow.Stop() + s.core.slow.Reset(slowSubscriberTimeout) + defer s.core.slow.Stop() for { // Keep the cases in this select in sync with subscribeState.pump @@ -242,13 +255,34 @@ func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent return false case ch := <-snapshot: ch <- vals.Snapshot() - case <-s.slow.C: - s.logf("subscriber for %T is slow (%v elapsed)", t, time.Since(start)) - s.slow.Reset(slowSubscriberTimeout) + case <-s.core.slow.C: + s.core.logf("subscriber for %s is slow (%v elapsed)", s.core.typeName, time.Since(start)) + s.core.slow.Reset(slowSubscriberTimeout) } } } +func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] { + s := &Subscriber[T]{ + // Monitors don't go through the bus's dispatch path (they + // are attached directly to the debug hook), so they don't + // need a subscriberCore — only the typed delivery channel + // and an unregister callback. We give them a tiny placeholder + // core so Close() and other facade methods work uniformly. + core: &subscriberCore{}, + read: make(chan T, 100), // arbitrary, large + } + s.core.unregister = attach(s.monitor) + return s +} + +func (s *Subscriber[T]) monitor(debugEvent T) { + select { + case s.read <- debugEvent: + case <-s.core.stop.Done(): + } +} + // Events returns a channel on which the subscriber's events are // delivered. func (s *Subscriber[T]) Events() <-chan T { @@ -258,7 +292,7 @@ func (s *Subscriber[T]) Events() <-chan T { // Done returns a channel that is closed when the subscriber is // closed. func (s *Subscriber[T]) Done() <-chan struct{} { - return s.stop.Done() + return s.core.stop.Done() } // Close closes the Subscriber, indicating the caller no longer wishes @@ -268,30 +302,30 @@ func (s *Subscriber[T]) Done() <-chan struct{} { // If the Bus from which the Subscriber was created is closed, // the Subscriber is implicitly closed and does not need to be closed // separately. -func (s *Subscriber[T]) Close() { - s.stop.Stop() // unblock receivers - s.unregister() -} +func (s *Subscriber[T]) Close() { s.core.Close() } // A SubscriberFunc delivers one type of event from a [Client]. // Events are forwarded synchronously to a function provided at construction. // // Implementation note: SubscriberFunc[T] is a thin facade over a -// non-generic *subscriberFuncCore. All of the behavior — the +// non-generic *subscriberCore. All of the behavior — the // subscriber-interface implementation (Close, subscribeType, // dispatch), the slow-subscriber timer, the type assertion, and // the user callback invocation — lives on the core and is not // instantiated per T. The only per-T cost is the small forwarding // Close method below. type SubscriberFunc[T any] struct { - core *subscriberFuncCore + core *subscriberCore } -// subscriberFuncCore is the non-generic implementation of a -// SubscriberFunc. It implements the package-private subscriber -// interface so that the bus (and the subscribeState map) can store -// it without per-T itabs or dictionaries. -type subscriberFuncCore struct { +// subscriberCore is the non-generic backing for both Subscriber[T] +// and SubscriberFunc[T]. It implements the package-private +// subscriber interface so that the bus (and the subscribeState map) +// can store it without per-T itabs or dictionaries. The per-T +// behavior (type assertion plus either typed channel send or user +// callback invocation) is encapsulated in the dispatchFn closure +// set up by the constructor of the typed facade. +type subscriberCore struct { stop stopFlag unregister func() logf logger.Logf @@ -309,12 +343,12 @@ type subscriberFuncCore struct { typeName string // dispatchFn is the per-T dispatch closure. It performs the - // type assertion vals.Peek().Event.(T) and runs the user - // callback on the unboxed value. The closure body is - // non-generic; its only per-T contribution is the type - // assertion and the call through s.read(T), which sit inside - // a single small captured closure rather than across a full - // select-loop stencil per T. + // type assertion vals.Peek().Event.(T) and runs the typed + // delivery (either a user-callback invocation for + // SubscriberFunc[T] or a typed channel send for Subscriber[T]). + // The closure body is non-generic apart from those two T-bound + // operations; the bulk of the dispatch work happens in the + // non-generic dispatchFunc/dispatchSub helpers below. dispatchFn func( ctx context.Context, vals *queue[DeliveredEvent], @@ -324,13 +358,13 @@ type subscriberFuncCore struct { } func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] { - core := newSubscriberFuncCore(r, logf, reflect.TypeFor[T]()) + core := newSubscriberCore(r, logf, reflect.TypeFor[T]()) // The dispatch closure is the only piece that intrinsically // needs T: it performs the type assertion on the head queue // value and forwards the unboxed value to the user callback. // All non-generic setup (timer, core allocation, unregister - // closure) lives in newSubscriberFuncCore so it isn't - // duplicated per T. + // closure) lives in newSubscriberCore so it isn't duplicated + // per T. core.dispatchFn = func( ctx context.Context, vals *queue[DeliveredEvent], @@ -343,22 +377,21 @@ func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *S return &SubscriberFunc[T]{core: core} } -// newSubscriberFuncCore performs the non-generic portion of -// subscriber construction: timer setup, core struct allocation, -// and creation of the unregister closure that captures only the -// (non-generic) reflect.Type and *subscribeState. The caller fills -// in the per-T dispatchFn afterward. +// newSubscriberCore performs the non-generic portion of subscriber +// construction: timer setup, core struct allocation, and creation +// of the unregister closure that captures only the (non-generic) +// reflect.Type and *subscribeState. The caller fills in the per-T +// dispatchFn afterward. // -// Hoisting this out of newSubscriberFunc[T] eliminates the bulk of -// the constructor body's per-T stencil cost; the only T-typed -// instructions left in the generic constructor are the -// reflect.TypeFor[T]() call (whose body is shared via the -// internal/abi.TypeFor[T] dictionary) and the construction of the -// dispatch closure itself. -func newSubscriberFuncCore(r *subscribeState, logf logger.Logf, typ reflect.Type) *subscriberFuncCore { +// Hoisting this out of the typed constructors eliminates the bulk +// of their per-T stencil cost; the only T-typed instructions left +// in each generic constructor are the reflect.TypeFor[T]() call +// (whose body is shared via the internal/abi.TypeFor[T] dictionary) +// and the construction of the dispatch closure itself. +func newSubscriberCore(r *subscribeState, logf logger.Logf, typ reflect.Type) *subscriberCore { slow := time.NewTimer(0) slow.Stop() // reset in dispatch - core := &subscriberFuncCore{ + core := &subscriberCore{ logf: logf, slow: slow, typ: typ, @@ -377,18 +410,18 @@ func newSubscriberFuncCore(r *subscribeState, logf logger.Logf, typ reflect.Type func (s *SubscriberFunc[T]) Close() { s.core.Close() } // Close implements the subscriber interface and the user-facing -// (*SubscriberFunc[T]).Close. -func (c *subscriberFuncCore) Close() { +// Close on both Subscriber[T] and SubscriberFunc[T]. +func (c *subscriberCore) Close() { c.stop.Stop() c.unregister() } // subscribeType implements the subscriber interface. -func (c *subscriberFuncCore) subscribeType() reflect.Type { return c.typ } +func (c *subscriberCore) subscribeType() reflect.Type { return c.typ } // dispatch implements the subscriber interface by invoking the // per-T dispatch closure that was captured at construction time. -func (c *subscriberFuncCore) dispatch( +func (c *subscriberCore) dispatch( ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, @@ -408,7 +441,7 @@ func (c *subscriberFuncCore) dispatch( // callback synchronously on the unboxed value. func dispatchFunc( ctx context.Context, - core *subscriberFuncCore, + core *subscriberCore, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent,