tailscale/ipn/ipnlocal/bus_test.go
Brad Fitzpatrick aa5da2e5f2 ipn/ipnlocal, control/controlclient: process node adds/removes in constant time
For large tailnets (~50k+ nodes) with frequent peer churn (ephemeral
GitHub Actions workers etc.), tailscaled used to rebuild the full
netmap and fan it out on the IPN bus on every MapResponse that
added or removed a peer. There were two O(N) costs per delta: the
full netmap rebuild + every Notify.NetMap encode to every bus watcher.

This change tackles both:

  1. Plumb O(1) peer add/remove through the delta path. PeersChanged
     and PeersRemoved no longer prevent the delta happy path; instead,
     they mutate the per-node-backend peer map in place.

  2. Restrict ipn.Notify.NetMap emission to the platforms whose host
     GUIs still depend on it (Windows, macOS, iOS) and migrate
     in-tree consumers off it everywhere else:

     - Migrate reactive consumers (containerboot, kube agents,
       sniproxy, tsconsensus, etc.) off Notify.NetMap to the
       previously-added Notify.SelfChange signal so they no longer
       have to subscribe to the full netmap.
     - Add ipn.NotifyNoNetMap so GUI clients on "legacy-emit" platforms
       that have already migrated can opt out of the per-watcher
       NetMap encode.
     - Gate Notify.NetMap emission on the producer side by a compile-
       time GOOS check, so the supporting code is dead-code-eliminated
       on Linux and other geese where no GUI consumer needs it.

Re-running BenchmarkGiantTailnet from tstest/largetailnet (added along
with baseline numbers on unmodified main in ad5436af0d) shows that
the per-delta cost (one peer add+remove pair) is now ~O(1) regardless
of tailnet size N:

    N         no-watcher (ms/op)            bus-watcher (ms/op)
              before    now     factor      before    now     factor
     10000        32   0.11       300x         166   0.13      1300x
     50000       222   0.11      2000x         865   0.13      6700x
    100000       504   0.12      4100x        1765   0.13     13400x
    250000      1551   0.12     12500x        4696   0.15     32400x

Updates #12542

Change-Id: I94e34b37331d1a8ec74c299deffadf4d061fda9e
Signed-off-by: Brad Fitzpatrick <bradfitz@tailscale.com>
2026-05-21 09:26:19 -07:00

328 lines
8.8 KiB
Go

// Copyright (c) Tailscale Inc & contributors
// SPDX-License-Identifier: BSD-3-Clause
package ipnlocal
import (
"context"
"reflect"
"slices"
"testing"
"time"
"tailscale.com/drive"
"tailscale.com/ipn"
"tailscale.com/tailcfg"
"tailscale.com/tstest"
"tailscale.com/tstime"
"tailscale.com/types/logger"
"tailscale.com/types/netmap"
"tailscale.com/types/views"
)
// TestIsNotableNotify verifies which ipn.Notify values isNotableNotify
// reports as notable.
//
// A handful of fields are covered by hand-written cases. Every other field
// of ipn.Notify is discovered via reflection, populated with a non-zero
// value, and expected to be notable, so that a field added in the future
// fails this test until isNotableNotify (or the skip list below) is updated.
func TestIsNotableNotify(t *testing.T) {
	type testCase struct {
		name   string
		notify *ipn.Notify
		want   bool
	}
	tests := []testCase{
		{"nil", nil, false},
		{"empty", &ipn.Notify{}, false},
		{"version", &ipn.Notify{Version: "foo"}, false},
		{"netmap", &ipn.Notify{NetMap: new(netmap.NetworkMap)}, false},
		{"peerchanges", &ipn.Notify{PeerChangedPatch: []*tailcfg.PeerChange{{}}}, false},
		{"peerschanged", &ipn.Notify{PeersChanged: []*tailcfg.Node{{}}}, false},
		{"peersremoved", &ipn.Notify{PeersRemoved: []tailcfg.NodeID{1}}, false},
		{"userprofiles", &ipn.Notify{UserProfiles: map[tailcfg.UserID]tailcfg.UserProfileView{1: (&tailcfg.UserProfile{}).View()}}, false},
		{"engine", &ipn.Notify{Engine: new(ipn.EngineStatus)}, false},
		{"selfchange", &ipn.Notify{SelfChange: &tailcfg.Node{}}, true},
	}

	// Then for all other fields, assume they're notable.
	// We use reflect to catch fields that might be added in the future without
	// remembering to update the [isNotableNotify] function.
	rt := reflect.TypeFor[ipn.Notify]()
	for sf := range rt.Fields() {
		n := &ipn.Notify{}
		switch sf.Name {
		case "_", "NetMap", "PeerChangedPatch", "SelfChange", "PeersChanged", "PeersRemoved", "UserProfiles", "Engine", "Version":
			// Already covered above or not applicable.
			continue
		case "DriveShares":
			// Set explicitly instead of via the generic kind switch below
			// (presumably because the view type is not a plain slice kind).
			n.DriveShares = views.SliceOfViews[*drive.Share, drive.ShareView](make([]*drive.Share, 1))
		default:
			rf := reflect.ValueOf(n).Elem().FieldByIndex(sf.Index)
			switch rf.Kind() {
			case reflect.Pointer:
				rf.Set(reflect.New(rf.Type().Elem()))
			case reflect.String:
				rf.SetString("foo")
			case reflect.Slice:
				rf.Set(reflect.MakeSlice(rf.Type(), 1, 1))
			default:
				t.Errorf("unhandled field kind %v for %q", rf.Kind(), sf.Name)
				// n is still the zero Notify here, so adding it as a
				// want-true case would only produce a second, misleading
				// failure for the same field.
				continue
			}
		}
		tests = append(tests, testCase{
			name:   "field-" + sf.Name,
			notify: n,
			want:   true,
		})
	}

	for _, tt := range tests {
		if got := isNotableNotify(tt.notify); got != tt.want {
			t.Errorf("%v: got %v; want %v", tt.name, got, tt.want)
		}
	}
}
// rateLimitingBusSenderTester is a small test harness around a
// rateLimitingBusSender. It drives the sender with a fake clock and records
// the notifications that the sender passes to its callback.
type rateLimitingBusSenderTester struct {
// tb is the test used to mark helpers and report fatal failures.
tb testing.TB
// got collects, in order, each Notify handed to the callback that init
// installs on the sender.
got []*ipn.Notify
// clock is the fake clock created by init and moved forward by advance.
clock *tstest.Clock
// s is the sender under test; it is nil until init has run.
s *rateLimitingBusSender
}
// init lazily sets up the tester's fake clock and the sender under test.
// It does nothing when the sender already exists, so it is safe to call
// repeatedly.
func (st *rateLimitingBusSenderTester) init() {
	if st.s != nil {
		// Already initialized; keep the existing clock and sender.
		return
	}

	// Fixed start time: the moment this test was originally written.
	start := time.Unix(1731777537, 0)
	st.clock = tstest.NewClock(tstest.ClockOpts{Start: start})

	// Record every Notify the sender hands to its callback and report success.
	record := func(n *ipn.Notify) bool {
		st.got = append(st.got, n)
		return true
	}
	st.s = &rateLimitingBusSender{
		clock: tstime.DefaultClock{Clock: st.clock},
		fn:    record,
	}
}
// send passes n to the sender under test, initializing the tester first if
// needed, and fails the test immediately if the sender reports failure.
func (st *rateLimitingBusSenderTester) send(n *ipn.Notify) {
	st.tb.Helper()
	st.init()
	if ok := st.s.send(n); ok {
		return
	}
	st.tb.Fatal("unexpected send failed")
}
// advance moves the fake clock forward by d. If the sender's flush channel
// is ready afterwards, it flushes the sender and fails the test when the
// flush reports failure; otherwise it returns without flushing.
func (st *rateLimitingBusSenderTester) advance(d time.Duration) {
	st.tb.Helper()
	st.clock.Advance(d)

	// Non-blocking check: only flush when the sender signals it is due.
	select {
	case <-st.s.flushChan():
	default:
		return
	}

	if ok := st.s.flush(); !ok {
		st.tb.Fatal("unexpected flush failed")
	}
}
// TestRateLimitingBusSender exercises rateLimitingBusSender in three ways:
//
//   - "unbuffered": with no interval set, every Notify is expected to be
//     delivered individually and unmodified.
//   - "buffered": with an interval set, the first Notify is delivered
//     immediately and later ones are held until the clock advances, at which
//     point they arrive merged into a single Notify.
//   - "run": the same merging behavior, driven through the Run method with
//     notifications arriving on a channel.
func TestRateLimitingBusSender(t *testing.T) {
	// Both share NodeID 1 so merge collapses to a single PeerChange and
	// the later one (nm2) wins.
	nm1 := &ipn.Notify{PeerChangedPatch: []*tailcfg.PeerChange{{NodeID: 1, DERPRegion: 1}}}
	nm2 := &ipn.Notify{PeerChangedPatch: []*tailcfg.PeerChange{{NodeID: 1, DERPRegion: 2}}}
	eng1 := &ipn.Notify{Engine: new(ipn.EngineStatus)}
	eng2 := &ipn.Notify{Engine: new(ipn.EngineStatus)}

	t.Run("unbuffered", func(t *testing.T) {
		st := &rateLimitingBusSenderTester{tb: t}
		st.send(nm1)
		st.send(nm2)
		st.send(eng1)
		st.send(eng2)
		// Pointer equality: the very same Notify values must come out.
		if !slices.Equal(st.got, []*ipn.Notify{nm1, nm2, eng1, eng2}) {
			t.Errorf("got %d items; want 4 specific ones, unmodified", len(st.got))
		}
	})

	t.Run("buffered", func(t *testing.T) {
		st := &rateLimitingBusSenderTester{tb: t}
		st.init()
		st.s.interval = 1 * time.Second
		st.send(&ipn.Notify{Version: "initial"})
		if len(st.got) != 1 {
			t.Fatalf("got %d items; expected 1 (first to flush immediately)", len(st.got))
		}
		st.send(nm1)
		st.send(nm2)
		st.send(eng1)
		st.send(eng2)
		if len(st.got) != 1 {
			t.Fatalf("got %d items; expected still just that first 1", len(st.got))
		}

		// But moving the clock should flush the rest, coalesced into one new one.
		st.advance(5 * time.Second)
		if len(st.got) != 2 {
			t.Fatalf("got %d items; want 2", len(st.got))
		}
		gotn := st.got[1]
		if !reflect.DeepEqual(gotn.PeerChangedPatch, nm2.PeerChangedPatch) {
			t.Errorf("got wrong PeerChangedPatch; got %v want %v", gotn.PeerChangedPatch, nm2.PeerChangedPatch)
		}
		if gotn.Engine != eng2.Engine {
			t.Errorf("got wrong Engine; got %p", gotn.Engine)
		}
		if t.Failed() {
			t.Logf("failed Notify was: %v", logger.AsJSON(gotn))
		}
	})

	// Test the Run method
	t.Run("run", func(t *testing.T) {
		st := &rateLimitingBusSenderTester{tb: t}
		st.init()
		st.s.interval = 1 * time.Second
		st.s.lastFlush = st.clock.Now() // pretend we just flushed

		// Replace the recording callback so flushed notifications are
		// handed to the driver goroutine below.
		flushc := make(chan *ipn.Notify, 1)
		st.s.fn = func(n *ipn.Notify) bool {
			flushc <- n
			return true
		}

		// didSend is signaled through the sender's test hook; waitSend
		// blocks until one signal arrives or a timeout elapses.
		didSend := make(chan bool, 2)
		st.s.didSendTestHook = func() { didSend <- true }
		waitSend := func() {
			select {
			case <-didSend:
			case <-time.After(5 * time.Second):
				t.Error("timeout waiting for call to send")
			}
		}

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		incoming := make(chan *ipn.Notify, 2)
		go func() {
			incoming <- nm1
			waitSend()
			incoming <- nm2
			waitSend()
			st.advance(5 * time.Second)
			select {
			case n := <-flushc:
				if !reflect.DeepEqual(n.PeerChangedPatch, nm2.PeerChangedPatch) {
					t.Errorf("got wrong PeerChangedPatch; got %v want %v", n.PeerChangedPatch, nm2.PeerChangedPatch)
				}
			case <-time.After(10 * time.Second):
				t.Error("timeout")
			}
			// Canceling the context is what lets Run below return.
			cancel()
		}()

		st.s.Run(ctx, incoming)
	})
}
// TestMergePeerChangedPatch checks how mergePeerChangedPatch combines an
// earlier slice of peer changes with a later one: entries for new NodeIDs
// are appended, entries for an already-present NodeID are merged in place
// with the later values winning, and fields the later entry leaves unset
// keep their earlier values.
func TestMergePeerChangedPatch(t *testing.T) {
	online := true
	offline := false

	// wantOffline reports a failure unless pc.Online is set and false. It
	// checks for nil first so that a regression fails the test rather than
	// panicking on a nil dereference.
	wantOffline := func(t *testing.T, pc *tailcfg.PeerChange) {
		t.Helper()
		switch {
		case pc.Online == nil:
			t.Error("Online = nil; want false (from new)")
		case *pc.Online:
			t.Errorf("Online = %v; want false (from new)", *pc.Online)
		}
	}

	t.Run("no_overlap_appends", func(t *testing.T) {
		prev := []*tailcfg.PeerChange{
			{NodeID: 1, DERPRegion: 1},
		}
		next := []*tailcfg.PeerChange{
			{NodeID: 2, DERPRegion: 2},
		}
		got := mergePeerChangedPatch(prev, next)
		if len(got) != 2 {
			t.Fatalf("len = %d; want 2", len(got))
		}
		if got[0].NodeID != 1 || got[1].NodeID != 2 {
			t.Errorf("got NodeIDs %d, %d; want 1, 2", got[0].NodeID, got[1].NodeID)
		}
	})

	t.Run("overlap_merges", func(t *testing.T) {
		prev := []*tailcfg.PeerChange{
			{NodeID: 1, DERPRegion: 1, Online: &online},
			{NodeID: 2, DERPRegion: 10},
		}
		next := []*tailcfg.PeerChange{
			{NodeID: 1, DERPRegion: 5, Online: &offline},
		}
		got := mergePeerChangedPatch(prev, next)
		if len(got) != 2 {
			t.Fatalf("len = %d; want 2 (merged, not appended)", len(got))
		}
		if got[0].DERPRegion != 5 {
			t.Errorf("DERPRegion = %d; want 5 (from new)", got[0].DERPRegion)
		}
		wantOffline(t, got[0])
		// Node 2 should be untouched.
		if got[1].NodeID != 2 || got[1].DERPRegion != 10 {
			t.Errorf("node 2 was modified unexpectedly")
		}
	})

	t.Run("partial_overlap_merges_and_appends", func(t *testing.T) {
		prev := []*tailcfg.PeerChange{
			{NodeID: 1, DERPRegion: 1},
		}
		next := []*tailcfg.PeerChange{
			{NodeID: 1, DERPRegion: 2},
			{NodeID: 3, DERPRegion: 30},
		}
		got := mergePeerChangedPatch(prev, next)
		if len(got) != 2 {
			t.Fatalf("len = %d; want 2", len(got))
		}
		if got[0].NodeID != 1 || got[0].DERPRegion != 2 {
			t.Errorf("node 1: DERPRegion = %d; want 2", got[0].DERPRegion)
		}
		if got[1].NodeID != 3 || got[1].DERPRegion != 30 {
			t.Errorf("node 3: DERPRegion = %d; want 30", got[1].DERPRegion)
		}
	})

	t.Run("preserves_old_fields_on_merge", func(t *testing.T) {
		prev := []*tailcfg.PeerChange{
			{NodeID: 1, DERPRegion: 1, Online: &online, Cap: 10},
		}
		// The later change sets only Online; DERPRegion and Cap are left at
		// their zero values and must not overwrite the earlier ones.
		next := []*tailcfg.PeerChange{
			{NodeID: 1, Online: &offline},
		}
		got := mergePeerChangedPatch(prev, next)
		if len(got) != 1 {
			t.Fatalf("len = %d; want 1", len(got))
		}
		if got[0].DERPRegion != 1 {
			t.Errorf("DERPRegion = %d; want 1 (preserved from old)", got[0].DERPRegion)
		}
		if got[0].Cap != 10 {
			t.Errorf("Cap = %d; want 10 (preserved from old)", got[0].Cap)
		}
		wantOffline(t, got[0])
	})

	t.Run("nil_old", func(t *testing.T) {
		next := []*tailcfg.PeerChange{
			{NodeID: 1, DERPRegion: 1},
		}
		got := mergePeerChangedPatch(nil, next)
		if len(got) != 1 {
			t.Fatalf("len = %d; want 1", len(got))
		}
		if got[0].NodeID != 1 {
			t.Errorf("NodeID = %d; want 1", got[0].NodeID)
		}
	})
}