usbip: align cancel wire with linux and keep stale exports in snapshot

The peer now owns submit/cancel lifecycle: pendingSubmit retains the
original DevID per submit, CMD_UNLINK carries it (stub_rx valid_request
was silently rejecting devid=0), and RET_UNLINK finalizes the bound
transaction. UrbTransaction.Cancel delegates instead of assembling the
wire itself. Stale exports stay in the Linux and Darwin host snapshots
so the ledger broadcasts State: unavailable updates rather than Removed.
This commit is contained in:
世界 2026-05-16 00:03:20 +08:00
parent d99e3b0b4a
commit 0cda1659bc
No known key found for this signature in database
GPG Key ID: CD109927C34A63C4
7 changed files with 225 additions and 52 deletions

View File

@ -42,10 +42,12 @@ type Export interface {
}
// ExportSnapshot Backend and StableID fields are populated
// unconditionally. Unavailable snapshots should keep a cached Entry,
// unconditionally. Unavailable snapshots keep a cached Entry,
// including BusID, so the caller can broadcast a state transition
// instead of removing the device outright; snapshots without a BusID
// are treated as non-broadcastable.
// are treated as non-broadcastable. ExportHost.Reconcile returns
// stale entries in its snapshot for the same reason — the ledger
// filters non-broadcastable snapshots in one place, not the host.
type ExportSnapshot struct {
Entry DeviceEntry
Backend string

View File

@ -186,13 +186,7 @@ func (h *darwinExportHost) Reconcile(ctx context.Context, isBusy func(busid stri
for _, exp := range toAdd {
h.exports[exp.busid] = exp
}
out := make(map[string]Export, len(h.exports))
for busid, exp := range h.exports {
if exp.stale {
continue
}
out[busid] = exp
}
out := snapshotDarwinExports(h.exports)
h.access.Unlock()
for _, exp := range toRemove {
@ -253,11 +247,16 @@ func (h *darwinExportHost) FinishImport(ctx context.Context, busid string) (bool
func (h *darwinExportHost) snapshotSelf() map[string]Export {
h.access.Lock()
defer h.access.Unlock()
out := make(map[string]Export, len(h.exports))
for busid, exp := range h.exports {
if exp.stale {
continue
}
return snapshotDarwinExports(h.exports)
}
// snapshotDarwinExports returns every tracked export, including stale
// ones, matching the ExportSnapshot contract: stale exports surface
// to the ledger so they broadcast as State: unavailable updates
// instead of disappearing.
func snapshotDarwinExports(exports map[string]*darwinExport) map[string]Export {
out := make(map[string]Export, len(exports))
for busid, exp := range exports {
out[busid] = exp
}
return out

View File

@ -430,12 +430,14 @@ func (h *linuxExportHost) snapshotSelf() map[string]Export {
return snapshotLinuxExports(h.exports)
}
// snapshotLinuxExports returns every tracked export, including stale
// ones. The ledger treats stale entries as broadcastable State:
// unavailable updates via Export.Snapshot, which is what the
// ExportSnapshot contract requires; filtering here would surface a
// removed device instead of an updated one.
func snapshotLinuxExports(exports map[string]*linuxExport) map[string]Export {
out := make(map[string]Export, len(exports))
for busid, exp := range exports {
if exp.stale {
continue
}
out[busid] = exp
}
return out

View File

@ -53,7 +53,7 @@ func (t *UrbTransaction) Cancel(ctx context.Context) error {
type writeResult struct{ err error }
resultCh := make(chan writeResult, 1)
go func() {
resultCh <- writeResult{err: t.peer.writeUnlink(t.seqnum)}
resultCh <- writeResult{err: t.peer.cancel(t.seqnum)}
}()
select {

View File

@ -35,6 +35,12 @@ func TestUrbTransactionWaitContextCancel(t *testing.T) {
require.ErrorIs(t, err, context.DeadlineExceeded)
}
// TestUrbTransactionCancelIdempotent exercises the canonical Linux
// cancel wire: after CMD_UNLINK the server replies only with
// RET_UNLINK (status ECONNRESET), never a parallel RET_SUBMIT. The
// peer must finalize the transaction off RET_UNLINK alone, and N
// concurrent Cancel() callers must produce exactly one CMD_UNLINK on
// the wire.
func TestUrbTransactionCancelIdempotent(t *testing.T) {
peer, server, _ := newPeerPair(t)
@ -44,18 +50,17 @@ func TestUrbTransactionCancelIdempotent(t *testing.T) {
go func() {
defer serverDone.Done()
submit := server.readSubmit(t)
// Read exactly one CMD_UNLINK.
unlink := server.readUnlink(t)
unlinks.Add(1)
require.Equal(t, submit.Header.SeqNum, unlink.SeqNum)
server.writeSubmitResponse(t, USBIPDirIn, submit.Header.SeqNum, usbipStatusECONNRESET, nil, nil)
server.writeUnlinkResponse(t, unlink.Header.SeqNum, 0)
require.Equal(t, submit.Header.DevID, unlink.Header.DevID)
server.writeUnlinkResponse(t, unlink.Header.SeqNum, usbipStatusECONNRESET)
}()
transaction, err := peer.Submit(SubmitCommand{
Header: DataHeader{
Command: CmdSubmit,
DevID: 1,
DevID: 0xCAFEF00D,
Direction: USBIPDirIn,
Endpoint: 1,
},
@ -82,6 +87,91 @@ func TestUrbTransactionCancelIdempotent(t *testing.T) {
require.Equal(t, int32(1), unlinks.Load(), "Cancel wrote CMD_UNLINK more than once")
}
// TestUrbTransactionCancelAfterUrbCompleted covers the race in Linux's
// stub_recv_cmd_unlink where the URB has already completed on the
// server by the time CMD_UNLINK arrives. The scripted server answers
// with a successful RET_SUBMIT carrying data, followed by RET_UNLINK
// with status 0 ("nothing to cancel"). Because the client called
// Cancel while the submit was in flight, the test expects Wait to
// report ErrCanceled, and expects the trailing RET_UNLINK to be
// absorbed without breaking the read loop.
//
// NOTE(review): the require.* calls inside the server goroutine end in
// t.FailNow on failure, which the testing package only supports on the
// test goroutine — consider t.Errorf-style checks there.
func TestUrbTransactionCancelAfterUrbCompleted(t *testing.T) {
	peer, server, _ := newPeerPair(t)

	var serverGroup sync.WaitGroup
	serverGroup.Add(1)
	go func() {
		defer serverGroup.Done()
		submitted := server.readSubmit(t)
		unlinked := server.readUnlink(t)
		require.Equal(t, submitted.Header.SeqNum, unlinked.SeqNum)
		// The transfer finished first, so its real payload goes out
		// in RET_SUBMIT and RET_UNLINK follows with status 0.
		server.writeSubmitResponse(t, USBIPDirIn, submitted.Header.SeqNum, 0, []byte{1, 2, 3, 4}, nil)
		server.writeUnlinkResponse(t, unlinked.Header.SeqNum, 0)
	}()

	command := SubmitCommand{
		Header: DataHeader{
			Command:   CmdSubmit,
			DevID:     1,
			Direction: USBIPDirIn,
			Endpoint:  1,
		},
		TransferBufferLength: 4,
	}
	transaction, err := peer.Submit(command)
	require.NoError(t, err)

	ctx := context.Background()
	require.NoError(t, transaction.Cancel(ctx))
	_, err = transaction.Wait(ctx)
	require.ErrorIs(t, err, ErrCanceled)

	serverGroup.Wait()
	// A clean Close shows the peer is still healthy after the late
	// RET_UNLINK was consumed.
	require.NoError(t, peer.Close())
}
// TestUrbTransactionCancelWireCarriesDevID guards the requirement
// behind Linux's stub_rx valid_request check: the CMD_UNLINK header
// must repeat the DevID of the submit being cancelled. The assertion
// runs on the header the scripted server reads back from the peer
// pair, so it observes what was actually written to the wire rather
// than how the command was built.
//
// NOTE(review): the require.* calls inside the server goroutine end in
// t.FailNow on failure, which the testing package only supports on the
// test goroutine — consider t.Errorf-style checks there.
func TestUrbTransactionCancelWireCarriesDevID(t *testing.T) {
	// A distinctive value so a zeroed or defaulted DevID cannot pass.
	const wantDevID uint32 = 0xDEADBEEF

	peer, server, _ := newPeerPair(t)

	var serverGroup sync.WaitGroup
	serverGroup.Add(1)
	go func() {
		defer serverGroup.Done()
		submitted := server.readSubmit(t)
		unlinked := server.readUnlink(t)
		require.Equal(t, wantDevID, unlinked.Header.DevID)
		require.Equal(t, submitted.Header.SeqNum, unlinked.SeqNum)
		server.writeUnlinkResponse(t, unlinked.Header.SeqNum, usbipStatusECONNRESET)
	}()

	command := SubmitCommand{
		Header: DataHeader{
			Command:   CmdSubmit,
			DevID:     wantDevID,
			Direction: USBIPDirOut,
			Endpoint:  2,
		},
		TransferBufferLength: 4,
		Buffer:               []byte{5, 6, 7, 8},
	}
	transaction, err := peer.Submit(command)
	require.NoError(t, err)

	ctx := context.Background()
	require.NoError(t, transaction.Cancel(ctx))
	_, err = transaction.Wait(ctx)
	require.ErrorIs(t, err, ErrCanceled)

	serverGroup.Wait()
}
func TestUrbTransactionCancelAfterTerminalNoWire(t *testing.T) {
peer, server, _ := newPeerPair(t)

View File

@ -26,18 +26,33 @@ var (
ErrCanceled error = urbCanceledError{}
)
// pendingSubmit is the bookkeeping record the peer keeps for one
// in-flight submit. Besides the transaction to finalize, it remembers
// the DevID the submit was sent with so that a later CMD_UNLINK can
// repeat it and RET_UNLINK can be tied back to the submit. Linux's
// stub_rx valid_request check needs that DevID; cancellation packets
// that omit it are silently rejected.
type pendingSubmit struct {
	// transaction is the handle finalized once the submit resolves.
	transaction *UrbTransaction
	// devID is the DevID taken from the originating submit's header.
	devID uint32
}
type UsbIpPeer struct {
ctx context.Context
cancel context.CancelFunc
logger log.ContextLogger
conn net.Conn
ctx context.Context
closeCtx context.CancelFunc
logger log.ContextLogger
conn net.Conn
seq atomic.Uint32
writeAccess sync.Mutex
pendingAccess sync.Mutex
pending map[uint32]*UrbTransaction
// pendingAccess guards both pending and unlinkBindings: a
// CMD_UNLINK write registers a binding and inspects pending
// atomically so RET_UNLINK arriving immediately afterwards can
// always be matched back to its submit.
pendingAccess sync.Mutex
pending map[uint32]*pendingSubmit
unlinkBindings map[uint32]uint32
done chan struct{}
closeOnce sync.Once
@ -47,14 +62,15 @@ type UsbIpPeer struct {
}
func NewUsbIpPeer(ctx context.Context, logger log.ContextLogger, conn net.Conn) *UsbIpPeer {
ctx, cancel := context.WithCancel(ctx)
ctx, closeCtx := context.WithCancel(ctx)
peer := &UsbIpPeer{
ctx: ctx,
cancel: cancel,
logger: logger,
conn: conn,
pending: make(map[uint32]*UrbTransaction),
done: make(chan struct{}),
ctx: ctx,
closeCtx: closeCtx,
logger: logger,
conn: conn,
pending: make(map[uint32]*pendingSubmit),
unlinkBindings: make(map[uint32]uint32),
done: make(chan struct{}),
}
go peer.readLoop()
return peer
@ -79,7 +95,10 @@ func (p *UsbIpPeer) Submit(command SubmitCommand) (*UrbTransaction, error) {
p.pendingAccess.Unlock()
return nil, ErrPeerClosed
}
p.pending[seqnum] = transaction
p.pending[seqnum] = &pendingSubmit{
transaction: transaction,
devID: command.Header.DevID,
}
p.pendingAccess.Unlock()
p.writeAccess.Lock()
@ -104,24 +123,51 @@ func (p *UsbIpPeer) Err() error {
func (p *UsbIpPeer) Close() error {
p.closeOnce.Do(func() {
p.cancel()
p.closeCtx()
_ = p.conn.Close()
})
<-p.done
return nil
}
func (p *UsbIpPeer) writeUnlink(submitSeqnum uint32) error {
// cancel asks the remote side to abort the submit registered under
// submitSeqnum by writing a CMD_UNLINK that carries the DevID recorded
// for that submit.
//
// Nothing is written and nil is returned when the submit is not in the
// pending table, i.e. it was already finalized, was never registered,
// or the table itself has been dropped. Otherwise the unlink-to-submit
// binding is stored under pendingAccess before the write, so that a
// RET_UNLINK can be matched back to the submit. If the write fails the
// binding is removed again and the write error is returned.
func (p *UsbIpPeer) cancel(submitSeqnum uint32) error {
	// The sequence number is taken up front, so it is consumed even
	// when the call turns out to be a no-op.
	unlinkSeqnum := p.seq.Add(1)

	p.pendingAccess.Lock()
	// Indexing a nil map reports "not found", so this single lookup
	// also handles a pending table that has been set to nil.
	entry, pending := p.pending[submitSeqnum]
	if !pending {
		p.pendingAccess.Unlock()
		return nil
	}
	devID := entry.devID
	p.unlinkBindings[unlinkSeqnum] = submitSeqnum
	p.pendingAccess.Unlock()

	command := UnlinkCommand{
		Header: DataHeader{
			Command: CmdUnlink,
			SeqNum:  unlinkSeqnum,
			DevID:   devID,
		},
		SeqNum: submitSeqnum,
	}
	p.writeAccess.Lock()
	err := WriteUnlinkCommand(p.conn, command)
	p.writeAccess.Unlock()
	if err == nil {
		return nil
	}

	// The command never reached the wire: drop the binding so it does
	// not linger in the table.
	p.pendingAccess.Lock()
	delete(p.unlinkBindings, unlinkSeqnum)
	p.pendingAccess.Unlock()
	return err
}
@ -139,7 +185,11 @@ func (p *UsbIpPeer) lookupPending(seqnum uint32) *UrbTransaction {
if p.pending == nil {
return nil
}
return p.pending[seqnum]
submit, found := p.pending[seqnum]
if !found {
return nil
}
return submit.transaction
}
func (p *UsbIpPeer) consumePending(seqnum uint32) *UrbTransaction {
@ -148,16 +198,40 @@ func (p *UsbIpPeer) consumePending(seqnum uint32) *UrbTransaction {
if p.pending == nil {
return nil
}
transaction := p.pending[seqnum]
if transaction != nil {
delete(p.pending, seqnum)
submit, found := p.pending[seqnum]
if !found {
return nil
}
return transaction
delete(p.pending, seqnum)
return submit.transaction
}
// consumeUnlink maps the sequence number of a RET_UNLINK back to the
// transaction of the submit it was issued for, holding pendingAccess
// for the whole lookup.
//
// A known binding is always removed. The submit is removed from the
// pending table and its transaction returned only if it is still
// pending. nil is returned when either table is nil, when no binding
// exists for unlinkSeqnum, or when the submit is no longer pending
// (for example because an earlier RET_SUBMIT already consumed it).
func (p *UsbIpPeer) consumeUnlink(unlinkSeqnum uint32) *UrbTransaction {
	p.pendingAccess.Lock()
	defer p.pendingAccess.Unlock()

	if p.pending == nil || p.unlinkBindings == nil {
		return nil
	}

	submitSeqnum, bound := p.unlinkBindings[unlinkSeqnum]
	if !bound {
		return nil
	}
	delete(p.unlinkBindings, unlinkSeqnum)

	if entry, pending := p.pending[submitSeqnum]; pending {
		delete(p.pending, submitSeqnum)
		return entry.transaction
	}
	return nil
}
func (p *UsbIpPeer) readLoop() {
defer close(p.done)
defer p.cancel()
defer p.closeCtx()
defer p.drainPending()
for {
@ -188,9 +262,11 @@ func (p *UsbIpPeer) readLoop() {
p.setReadError(err)
return
}
// The submit seqnum bound to this unlink is unknown without an
// additional map; we rely on the matching RET_SUBMIT (with
// status=ECONNRESET on cancel) to finalize the transaction.
transaction := p.consumeUnlink(header.SeqNum)
if transaction == nil {
continue
}
transaction.finalize(SubmitResponse{}, ErrCanceled)
default:
p.setReadError(E.New(fmt.Sprintf("unexpected USB/IP response 0x%08x", header.Command)))
return
@ -213,9 +289,10 @@ func (p *UsbIpPeer) drainPending() {
p.pendingAccess.Lock()
pending := p.pending
p.pending = nil
p.unlinkBindings = nil
p.pendingAccess.Unlock()
for _, transaction := range pending {
transaction.finalize(SubmitResponse{}, ErrPeerClosed)
for _, submit := range pending {
submit.transaction.finalize(SubmitResponse{}, ErrPeerClosed)
}
}

View File

@ -144,6 +144,10 @@ func TestUsbIpPeerSessionCloseFailsPending(t *testing.T) {
require.ErrorIs(t, err, ErrPeerClosed)
}
// TestUsbIpPeerCancelMidFlight uses the canonical Linux cancel wire:
// after CMD_UNLINK the server emits only RET_UNLINK (status
// ECONNRESET), never a parallel RET_SUBMIT. The peer finalizes the
// transaction off RET_UNLINK alone.
func TestUsbIpPeerCancelMidFlight(t *testing.T) {
peer, server, _ := newPeerPair(t)
@ -156,9 +160,8 @@ func TestUsbIpPeerCancelMidFlight(t *testing.T) {
require.Equal(t, USBIPDirIn, submit.Header.Direction)
unlink := server.readUnlink(t)
require.Equal(t, submit.Header.SeqNum, unlink.SeqNum)
// RET_SUBMIT with ECONNRESET (canceled), buffer empty.
server.writeSubmitResponse(t, USBIPDirIn, submit.Header.SeqNum, usbipStatusECONNRESET, nil, nil)
server.writeUnlinkResponse(t, unlink.Header.SeqNum, 0)
require.Equal(t, submit.Header.DevID, unlink.Header.DevID)
server.writeUnlinkResponse(t, unlink.Header.SeqNum, usbipStatusECONNRESET)
}()
transaction, err := peer.Submit(SubmitCommand{