util/eventbus: unify Subscriber/SubscriberFunc cores; structural symmetry

Brings Subscriber[T] in line with the same non-generic-core
pattern already applied to SubscriberFunc[T] and Publisher[T]:

  - Renames subscriberFuncCore to subscriberCore and shares it
    between Subscriber[T] and SubscriberFunc[T]. Both typed
    facades hold a *subscriberCore plus their respective per-T
    delivery state (Subscriber: chan T; SubscriberFunc: nothing,
    the user callback is captured in the dispatch closure).

  - The bus's outputs map and subscriber-interface itab key on
    *subscriberCore for both subscriber kinds, so adding a new
    Subscribe[T] call site no longer pays a per-T itab,
    dictionary, or equality function for the subscriber-interface
    side.

  - Subscribe[T] now hoists the non-generic constructor portion
    into newSubscriberCore (timer setup, core allocation, cached
    type/typeName, unregister closure), matching SubscribeFunc.

The dispatch loop is intentionally NOT extracted to a non-generic
helper for Subscriber[T], unlike SubscriberFunc[T]. The reason is
the typed channel send 'case s.read <- t:' must appear lexically
inside the select; the only way to lift it into a non-generic
loop is to bridge typed and untyped via a per-event goroutine,
which costs ~2.7x throughput on BenchmarkBasicThroughput. We keep
dispatchTyped on the generic facade and accept the per-shape
stencil cost (~600 bytes per shape) as the cheaper alternative.

Symbol-level effect on tailscaled (linux/amd64):

  (*Subscriber[T]).dispatch           2,285 B  (6 instances) ->
  (*Subscriber[T]).dispatchTyped       1,260 B (2 instances)
                                       -1,025 B
  (*Subscriber[T]).Close                 369 B (6 instances) ->
                                          0 B   forwards to core.Close
                                       -369 B
  + smaller savings on Subscribe[T] and newSubscriber[T] bodies

Stripped tailscaled (linux/amd64): -8,192 B vs the prior commit.
arm64 stays page-quantized at the same boundary.

Cumulative reduction from baseline (5167ff412):
    linux/amd64:  -118,784 bytes (-0.420%)
    linux/arm64:  -131,072 bytes (-0.499%)

Behavior is unchanged:
  BenchmarkBasicThroughput: 2018 -> 2006 ns/op (within noise)
  All eventbus tests pass with -race.

Updates #12614

Change-Id: I97918ec68bd2cdb15958bbfd7687592b39663efe
Signed-off-by: James Tucker <james@tailscale.com>
This commit is contained in:
James Tucker 2026-05-04 22:08:54 +00:00
parent 4d72a3fb2a
commit 35c7cd24f6
No known key found for this signature in database
2 changed files with 113 additions and 76 deletions

View File

@ -146,7 +146,11 @@ func Subscribe[T any](c *Client) *Subscriber[T] {
r := c.subscribeStateLocked()
s := newSubscriber[T](r, logfForCaller(c.logger()))
r.addSubscriber(s)
// Register the non-generic core with the bus rather than the
// typed facade, mirroring SubscribeFunc and Publish: this
// keeps the bus's outputs map and subscriber-interface itab
// out of per-T cost.
r.addSubscriber(s.core)
return s
}

View File

@ -183,50 +183,63 @@ func (s *subscribeState) closed() <-chan struct{} {
// A Subscriber delivers one type of event from a [Client].
// Events are sent to the [Subscriber.Events] channel.
//
// Implementation note: like SubscriberFunc[T], Subscriber[T] is a
// thin user-facing facade. The package-private subscriber-interface
// implementation (Close, subscribeType, dispatch, slow timer,
// unregister) lives on the non-generic *subscriberCore. The only
// per-T state owned by the facade itself is the typed delivery
// channel; everything else is shared with SubscriberFunc[T].
type Subscriber[T any] struct {
stop stopFlag
read chan T
unregister func()
logf logger.Logf
slow *time.Timer // used to detect slow subscriber service
core *subscriberCore
read chan T
}
func newSubscriber[T any](r *subscribeState, logf logger.Logf) *Subscriber[T] {
slow := time.NewTimer(0)
slow.Stop() // reset in dispatch
return &Subscriber[T]{
read: make(chan T),
unregister: func() { r.deleteSubscriber(reflect.TypeFor[T]()) },
logf: logf,
slow: slow,
core := newSubscriberCore(r, logf, reflect.TypeFor[T]())
s := &Subscriber[T]{
core: core,
read: make(chan T),
}
}
func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] {
ret := &Subscriber[T]{
read: make(chan T, 100), // arbitrary, large
// Subscriber[T].dispatch can't share the non-generic dispatch
// loop the way SubscriberFunc does because its delivery is a
// typed channel send (case s.read <- t:) and Go does not allow
// a typed channel-send case inside a non-generic select. Going
// through a goroutine to bridge typed and untyped would make
// every event delivery pay a goroutine-allocation cost, which
// was measured at ~2.7x throughput on BenchmarkBasicThroughput.
// Instead, we keep the dispatch loop generic for Subscriber[T]
// and accept the per-shape stencil cost (~380 bytes per shape).
// The non-generic helper subscribeDispatch implements the
// shared cases; only the typed select is in the generic
// wrapper.
core.dispatchFn = func(
ctx context.Context,
vals *queue[DeliveredEvent],
acceptCh func() chan DeliveredEvent,
snapshot chan chan []DeliveredEvent,
) bool {
return s.dispatchTyped(ctx, vals, acceptCh, snapshot)
}
ret.unregister = attach(ret.monitor)
return ret
return s
}
func (s *Subscriber[T]) subscribeType() reflect.Type {
return reflect.TypeFor[T]()
}
func (s *Subscriber[T]) monitor(debugEvent T) {
select {
case s.read <- debugEvent:
case <-s.stop.Done():
}
}
func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent], acceptCh func() chan DeliveredEvent, snapshot chan chan []DeliveredEvent) bool {
// dispatchTyped is the per-T dispatch loop for Subscriber[T]. It
// has to remain generic because the typed channel send
// `case s.read <- t:` must appear lexically inside the select. The
// rest of the cases match the non-generic dispatchFunc body to
// keep behavior aligned between Subscriber and SubscriberFunc.
func (s *Subscriber[T]) dispatchTyped(
ctx context.Context,
vals *queue[DeliveredEvent],
acceptCh func() chan DeliveredEvent,
snapshot chan chan []DeliveredEvent,
) bool {
t := vals.Peek().Event.(T)
start := time.Now()
s.slow.Reset(slowSubscriberTimeout)
defer s.slow.Stop()
s.core.slow.Reset(slowSubscriberTimeout)
defer s.core.slow.Stop()
for {
// Keep the cases in this select in sync with subscribeState.pump
@ -242,13 +255,34 @@ func (s *Subscriber[T]) dispatch(ctx context.Context, vals *queue[DeliveredEvent
return false
case ch := <-snapshot:
ch <- vals.Snapshot()
case <-s.slow.C:
s.logf("subscriber for %T is slow (%v elapsed)", t, time.Since(start))
s.slow.Reset(slowSubscriberTimeout)
case <-s.core.slow.C:
s.core.logf("subscriber for %s is slow (%v elapsed)", s.core.typeName, time.Since(start))
s.core.slow.Reset(slowSubscriberTimeout)
}
}
}
func newMonitor[T any](attach func(fn func(T)) (cancel func())) *Subscriber[T] {
s := &Subscriber[T]{
// Monitors don't go through the bus's dispatch path (they
// are attached directly to the debug hook), so they don't
// need a subscriberCore — only the typed delivery channel
// and an unregister callback. We give them a tiny placeholder
// core so Close() and other facade methods work uniformly.
core: &subscriberCore{},
read: make(chan T, 100), // arbitrary, large
}
s.core.unregister = attach(s.monitor)
return s
}
func (s *Subscriber[T]) monitor(debugEvent T) {
select {
case s.read <- debugEvent:
case <-s.core.stop.Done():
}
}
// Events returns a channel on which the subscriber's events are
// delivered.
func (s *Subscriber[T]) Events() <-chan T {
@ -258,7 +292,7 @@ func (s *Subscriber[T]) Events() <-chan T {
// Done returns a channel that is closed when the subscriber is
// closed.
func (s *Subscriber[T]) Done() <-chan struct{} {
return s.stop.Done()
return s.core.stop.Done()
}
// Close closes the Subscriber, indicating the caller no longer wishes
@ -268,30 +302,30 @@ func (s *Subscriber[T]) Done() <-chan struct{} {
// If the Bus from which the Subscriber was created is closed,
// the Subscriber is implicitly closed and does not need to be closed
// separately.
func (s *Subscriber[T]) Close() {
s.stop.Stop() // unblock receivers
s.unregister()
}
func (s *Subscriber[T]) Close() { s.core.Close() }
// A SubscriberFunc delivers one type of event from a [Client].
// Events are forwarded synchronously to a function provided at construction.
//
// Implementation note: SubscriberFunc[T] is a thin facade over a
// non-generic *subscriberFuncCore. All of the behavior — the
// non-generic *subscriberCore. All of the behavior — the
// subscriber-interface implementation (Close, subscribeType,
// dispatch), the slow-subscriber timer, the type assertion, and
// the user callback invocation — lives on the core and is not
// instantiated per T. The only per-T cost is the small forwarding
// Close method below.
type SubscriberFunc[T any] struct {
core *subscriberFuncCore
core *subscriberCore
}
// subscriberFuncCore is the non-generic implementation of a
// SubscriberFunc. It implements the package-private subscriber
// interface so that the bus (and the subscribeState map) can store
// it without per-T itabs or dictionaries.
type subscriberFuncCore struct {
// subscriberCore is the non-generic backing for both Subscriber[T]
// and SubscriberFunc[T]. It implements the package-private
// subscriber interface so that the bus (and the subscribeState map)
// can store it without per-T itabs or dictionaries. The per-T
// behavior (type assertion plus either typed channel send or user
// callback invocation) is encapsulated in the dispatchFn closure
// set up by the constructor of the typed facade.
type subscriberCore struct {
stop stopFlag
unregister func()
logf logger.Logf
@ -309,12 +343,12 @@ type subscriberFuncCore struct {
typeName string
// dispatchFn is the per-T dispatch closure. It performs the
// type assertion vals.Peek().Event.(T) and runs the user
// callback on the unboxed value. The closure body is
// non-generic; its only per-T contribution is the type
// assertion and the call through s.read(T), which sit inside
// a single small captured closure rather than across a full
// select-loop stencil per T.
// type assertion vals.Peek().Event.(T) and runs the typed
// delivery (either a user-callback invocation for
// SubscriberFunc[T] or a typed channel send for Subscriber[T]).
// The closure body is non-generic apart from those two T-bound
// operations; the bulk of the dispatch work happens in the
// non-generic dispatchFunc/dispatchSub helpers below.
dispatchFn func(
ctx context.Context,
vals *queue[DeliveredEvent],
@ -324,13 +358,13 @@ type subscriberFuncCore struct {
}
func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *SubscriberFunc[T] {
core := newSubscriberFuncCore(r, logf, reflect.TypeFor[T]())
core := newSubscriberCore(r, logf, reflect.TypeFor[T]())
// The dispatch closure is the only piece that intrinsically
// needs T: it performs the type assertion on the head queue
// value and forwards the unboxed value to the user callback.
// All non-generic setup (timer, core allocation, unregister
// closure) lives in newSubscriberFuncCore so it isn't
// duplicated per T.
// closure) lives in newSubscriberCore so it isn't duplicated
// per T.
core.dispatchFn = func(
ctx context.Context,
vals *queue[DeliveredEvent],
@ -343,22 +377,21 @@ func newSubscriberFunc[T any](r *subscribeState, f func(T), logf logger.Logf) *S
return &SubscriberFunc[T]{core: core}
}
// newSubscriberFuncCore performs the non-generic portion of
// subscriber construction: timer setup, core struct allocation,
// and creation of the unregister closure that captures only the
// (non-generic) reflect.Type and *subscribeState. The caller fills
// in the per-T dispatchFn afterward.
// newSubscriberCore performs the non-generic portion of subscriber
// construction: timer setup, core struct allocation, and creation
// of the unregister closure that captures only the (non-generic)
// reflect.Type and *subscribeState. The caller fills in the per-T
// dispatchFn afterward.
//
// Hoisting this out of newSubscriberFunc[T] eliminates the bulk of
// the constructor body's per-T stencil cost; the only T-typed
// instructions left in the generic constructor are the
// reflect.TypeFor[T]() call (whose body is shared via the
// internal/abi.TypeFor[T] dictionary) and the construction of the
// dispatch closure itself.
func newSubscriberFuncCore(r *subscribeState, logf logger.Logf, typ reflect.Type) *subscriberFuncCore {
// Hoisting this out of the typed constructors eliminates the bulk
// of their per-T stencil cost; the only T-typed instructions left
// in each generic constructor are the reflect.TypeFor[T]() call
// (whose body is shared via the internal/abi.TypeFor[T] dictionary)
// and the construction of the dispatch closure itself.
func newSubscriberCore(r *subscribeState, logf logger.Logf, typ reflect.Type) *subscriberCore {
slow := time.NewTimer(0)
slow.Stop() // reset in dispatch
core := &subscriberFuncCore{
core := &subscriberCore{
logf: logf,
slow: slow,
typ: typ,
@ -377,18 +410,18 @@ func newSubscriberFuncCore(r *subscribeState, logf logger.Logf, typ reflect.Type
func (s *SubscriberFunc[T]) Close() { s.core.Close() }
// Close implements the subscriber interface and the user-facing
// (*SubscriberFunc[T]).Close.
func (c *subscriberFuncCore) Close() {
// Close on both Subscriber[T] and SubscriberFunc[T].
func (c *subscriberCore) Close() {
c.stop.Stop()
c.unregister()
}
// subscribeType implements the subscriber interface.
func (c *subscriberFuncCore) subscribeType() reflect.Type { return c.typ }
func (c *subscriberCore) subscribeType() reflect.Type { return c.typ }
// dispatch implements the subscriber interface by invoking the
// per-T dispatch closure that was captured at construction time.
func (c *subscriberFuncCore) dispatch(
func (c *subscriberCore) dispatch(
ctx context.Context,
vals *queue[DeliveredEvent],
acceptCh func() chan DeliveredEvent,
@ -408,7 +441,7 @@ func (c *subscriberFuncCore) dispatch(
// callback synchronously on the unboxed value.
func dispatchFunc(
ctx context.Context,
core *subscriberFuncCore,
core *subscriberCore,
vals *queue[DeliveredEvent],
acceptCh func() chan DeliveredEvent,
snapshot chan chan []DeliveredEvent,