usbip: defer darwin RET_UNLINK until completion; route VHCI through primary; close lease-consume race

Three independent correctness bugs that all violated "decide before
publishing": each surface (wire reply, sysfs path, broadcast state)
committed to a state the code later had to contradict.

darwin: switch IOUSBHostPipe abort to IOUSBHostAbortOptionSynchronous and
add the EP-0 path via abortDeviceRequestsWithOption (previously a silent
no-op for control transfers). After abort returns, await a per-pending
drained channel before writing RET_UNLINK so the late RetSubmit
suppression in finishSubmit has actually taken effect — the drained
channel is allocated lazily by markSubmitUnlinked, so the hot submit
path is unchanged.

linux: collapse the per-controller VHCI abstraction. The kernel exposes
attach/detach/status/status.N only on vhci_hcd.0 with globally unique
port numbers; the previous code wrote vhci_hcd.N/attach for N>0 and
silently failed past the primary controller's port range. Glob status*
on the primary, key reservations on bare port int, and carry the
status-suffix as a diagnostic-only secondary index for Description().

export_ledger: ConsumeLeaseAndReserve no longer publishes busy=true and
then conditionally rolls it back. Read seq under fast first, then make
every decision (including the lease.Generation check) inside one slow
critical section before any busy mutation. This removes the window
where a concurrent BroadcastIfChanged could observe transient busy and
poison l.state with no corrective broadcast on rollback.
This commit is contained in:
世界 2026-05-15 17:32:30 +08:00
parent 8e0b405b1b
commit 00a2704ecf
No known key found for this signature in database
GPG Key ID: CD109927C34A63C4
6 changed files with 174 additions and 168 deletions

View File

@ -277,57 +277,57 @@ func (l *exportLedger) IssueLease(ctx context.Context, subID uint64, request con
// ConsumeLeaseAndReserve atomically validates a lease, removes its entry,
// and marks the busid busy under a single slow-lock critical section so
// no concurrent import can observe the consume-before-reserve gap. It
// keeps the consume-on-read semantics of the old ConsumeLease: the entry
// is removed on every outcome except a nonce/ID mismatch, which preserves
// the lease for the legitimate holder. LeaseCheck is not re-run — the
// lease itself attests that LeaseCheck passed at issue time, and the
// generation equality test below confirms the export has not been
// reconciled since. The caller must pair every success with a later
// ReleaseImport.
// no concurrent import can observe the consume-before-reserve gap, and
// no concurrent broadcast can observe a transient busy=true that we
// later roll back. The generation snapshot is taken before the slow
// critical section so the comparison either matches the lease exactly
// or is strictly stale (the safe rejection direction): a broadcast that
// fires between the snapshot and slow.Lock can only advance seq.
//
// Consume-on-read semantics from the old ConsumeLease are preserved:
// the lease entry is removed on every outcome except a nonce/ID
// mismatch (which preserves the lease for the legitimate holder).
// LeaseCheck is not re-run — the lease attests that it passed at issue
// time, and the generation equality test confirms the export has not
// been reconciled since. The caller must pair every success with a
// later ReleaseImport.
func (l *exportLedger) ConsumeLeaseAndReserve(request ImportExtRequest) (Export, bool, string) {
l.fast.Lock()
currentGeneration := l.seq
l.fast.Unlock()
l.slow.Lock()
defer l.slow.Unlock()
now := l.now()
l.cleanupExpiredLocked(now)
lease, found := l.leases[request.BusID]
if !found {
l.slow.Unlock()
return nil, false, "lease not found"
}
if lease.ID != request.LeaseID || lease.ClientNonce != request.ClientNonce {
l.slow.Unlock()
return nil, false, "lease mismatch"
}
if lease.Generation != currentGeneration {
delete(l.leases, request.BusID)
return nil, false, "lease stale"
}
if !now.Before(lease.Expires) {
delete(l.leases, request.BusID)
l.slow.Unlock()
return nil, false, "lease expired"
}
export, stillExported := l.exports[request.BusID]
if !stillExported {
delete(l.leases, request.BusID)
l.slow.Unlock()
return nil, false, "unknown busid"
}
if l.busy[request.BusID] {
delete(l.leases, request.BusID)
l.slow.Unlock()
return nil, false, deviceStateBusy
}
delete(l.leases, request.BusID)
l.busy[request.BusID] = true
leaseGeneration := lease.Generation
l.slow.Unlock()
l.fast.Lock()
currentGeneration := l.seq
l.fast.Unlock()
if leaseGeneration != currentGeneration {
l.slow.Lock()
delete(l.busy, request.BusID)
l.slow.Unlock()
return nil, false, "lease stale"
}
return export, true, ""
}

View File

@ -369,6 +369,7 @@ type darwinServerDataSession struct {
type darwinServerPendingSubmit struct {
endpoint uint8
unlinked bool
drained chan struct{}
}
func newDarwinServerDataSession(ctx context.Context, logger log.ContextLogger, conn net.Conn, device *darwinUSBHostDevice) *darwinServerDataSession {
@ -474,11 +475,13 @@ func (s *darwinServerDataSession) serve() error {
return err
}
status := int32(0)
if endpoint, ok := s.markSubmitUnlinked(command.SeqNum); ok {
endpoint, drained, found := s.markSubmitUnlinked(command.SeqNum)
if found {
abortErr := s.device.abortEndpoint(endpoint)
if abortErr != nil {
s.logger.Debug("abort endpoint 0x", hex8(endpoint), ": ", abortErr)
}
<-drained
status = usbipStatusECONNRESET
}
s.writeAccess.Lock()
@ -588,27 +591,36 @@ func (s *darwinServerDataSession) trackSubmit(seq uint32, endpoint uint8) {
s.pending[seq] = darwinServerPendingSubmit{endpoint: endpoint}
}
func (s *darwinServerDataSession) markSubmitUnlinked(seq uint32) (uint8, bool) {
func (s *darwinServerDataSession) markSubmitUnlinked(seq uint32) (uint8, <-chan struct{}, bool) {
s.access.Lock()
defer s.access.Unlock()
pending, ok := s.pending[seq]
if !ok {
return 0, false
pending, found := s.pending[seq]
if !found {
return 0, nil, false
}
pending.unlinked = true
if pending.drained == nil {
pending.drained = make(chan struct{})
}
s.pending[seq] = pending
return pending.endpoint, true
return pending.endpoint, pending.drained, true
}
func (s *darwinServerDataSession) finishSubmit(seq uint32) bool {
s.access.Lock()
defer s.access.Unlock()
pending, ok := s.pending[seq]
if !ok {
pending, found := s.pending[seq]
if !found {
s.access.Unlock()
return true
}
delete(s.pending, seq)
return !pending.unlinked
drained := pending.drained
unlinked := pending.unlinked
s.access.Unlock()
if drained != nil {
close(drained)
}
return !unlinked
}
func (s *darwinServerDataSession) abortPendingSubmits() {

View File

@ -29,7 +29,7 @@ func newPlatformExportHost(logger log.ContextLogger, matches []option.USBIPDevic
func newPlatformImportHost(logger log.ContextLogger) (ImportHost, error) {
return &linuxImportHost{
logger: logger,
ports: make(map[vhciPortKey]struct{}),
ports: make(map[int]struct{}),
}, nil
}
@ -459,33 +459,14 @@ func (e *linuxExport) NewServerDataSession(ctx context.Context, conn net.Conn) (
}
type linuxImportHost struct {
logger log.ContextLogger
controllers []vhciController
logger log.ContextLogger
portsAccess sync.Mutex
ports map[vhciPortKey]struct{}
ports map[int]struct{}
}
func (h *linuxImportHost) Start(ctx context.Context) error {
controllers, err := discoverVHCIControllers()
if err != nil {
return E.Cause(err, "discover vhci controllers")
}
if len(controllers) == 0 {
err = ensureKernelPath(sysVHCIControllerV0, "vhci-hcd", "vhci_hcd.0")
if err != nil {
return err
}
controllers, err = discoverVHCIControllers()
if err != nil {
return E.Cause(err, "discover vhci controllers")
}
if len(controllers) == 0 {
return E.New("no vhci controllers present after loading vhci-hcd")
}
}
h.controllers = controllers
return nil
return ensureKernelPath(sysVHCIControllerV0, "vhci-hcd", "vhci_hcd.0")
}
func (h *linuxImportHost) Close() error {
@ -502,75 +483,87 @@ func (h *linuxImportHost) Attach(ctx context.Context, info DeviceInfoTruncated,
mode = "relay"
}
h.logger.Debug("usbip client handoff ", info.BusIDString(), ": ", mode)
ctrl, port, attachErr := h.attachOnce(ctx, info, handoff)
port, secondary, attachErr := h.attachOnce(ctx, info, handoff)
if attachErr != nil {
_ = handoff.Close()
return nil, attachErr
}
_ = handoff.Start()
return &linuxClientSession{
handoff: handoff,
host: h,
controller: ctrl,
port: port,
handoff: handoff,
host: h,
port: port,
secondary: secondary,
}, nil
}
func (h *linuxImportHost) attachOnce(ctx context.Context, info DeviceInfoTruncated, handoff *kernelHandoffSession) (vhciController, int, error) {
triedPorts := make(map[vhciPortKey]struct{})
func (h *linuxImportHost) attachOnce(ctx context.Context, info DeviceInfoTruncated, handoff *kernelHandoffSession) (int, int, error) {
triedPorts := make(map[int]struct{})
for {
ctrl, port, err := vhciPickFreePort(h.controllers, info.Speed, triedPorts)
port, err := vhciPickFreePort(info.Speed, triedPorts)
if err != nil {
return "", -1, err
return -1, 0, err
}
key := vhciPortKey{controller: ctrl, port: port}
if !h.reservePort(ctrl, port) {
triedPorts[key] = struct{}{}
if !h.reservePort(port) {
triedPorts[port] = struct{}{}
continue
}
attachLine := fmt.Sprintf("%d %d %d %d", port, int(handoff.file.Fd()), info.DevID(), info.Speed)
err = writeSysfs(filepath.Join(string(ctrl), "attach"), attachLine)
err = writeSysfs(filepath.Join(sysVHCIControllerV0, "attach"), attachLine)
if err != nil {
h.releasePort(ctrl, port)
h.releasePort(port)
if errors.Is(err, unix.EBUSY) {
triedPorts[key] = struct{}{}
triedPorts[port] = struct{}{}
continue
}
return "", -1, E.Cause(err, "vhci attach")
return -1, 0, E.Cause(err, "vhci attach")
}
err = handoff.closeKernelFD()
if err != nil {
h.logger.Debug("close kernel fd ", info.BusIDString(), ": ", err)
}
return ctrl, port, nil
return port, lookupSecondaryForPort(port), nil
}
}
func (h *linuxImportHost) reservePort(ctrl vhciController, port int) bool {
key := vhciPortKey{controller: ctrl, port: port}
func lookupSecondaryForPort(port int) int {
records, err := readPrimaryVHCIStatus()
if err != nil {
return 0
}
for _, record := range records {
if record.port == port {
return record.secondary
}
}
return 0
}
func (h *linuxImportHost) reservePort(port int) bool {
h.portsAccess.Lock()
defer h.portsAccess.Unlock()
if _, exists := h.ports[key]; exists {
h.logger.Debug(ctrl.name(), " port ", port, " already reserved locally")
_, exists := h.ports[port]
if exists {
h.logger.Debug("vhci port ", port, " already reserved locally")
return false
}
h.logger.Debug("reserve ", ctrl.name(), " port ", port)
h.ports[key] = struct{}{}
h.logger.Debug("reserve vhci port ", port)
h.ports[port] = struct{}{}
return true
}
func (h *linuxImportHost) releasePort(ctrl vhciController, port int) {
func (h *linuxImportHost) releasePort(port int) {
h.portsAccess.Lock()
defer h.portsAccess.Unlock()
h.logger.Debug("release ", ctrl.name(), " port ", port)
delete(h.ports, vhciPortKey{controller: ctrl, port: port})
h.logger.Debug("release vhci port ", port)
delete(h.ports, port)
}
type linuxClientSession struct {
handoff *kernelHandoffSession
host *linuxImportHost
controller vhciController
port int
handoff *kernelHandoffSession
host *linuxImportHost
port int
secondary int
closeOnce sync.Once
closeErr error
@ -590,14 +583,17 @@ func (s *linuxClientSession) Start() error {
func (s *linuxClientSession) Close() error {
s.closeOnce.Do(func() {
detachErr := writeSysfs(filepath.Join(string(s.controller), "detach"), strconv.Itoa(s.port))
detachErr := writeSysfs(filepath.Join(sysVHCIControllerV0, "detach"), strconv.Itoa(s.port))
closeErr := s.handoff.Close()
s.host.releasePort(s.controller, s.port)
s.host.releasePort(s.port)
s.closeErr = E.Errors(detachErr, closeErr)
})
return s.closeErr
}
func (s *linuxClientSession) Description() string {
return fmt.Sprintf("%s port %d", s.controller.name(), s.port)
if s.secondary == 0 {
return fmt.Sprintf("vhci_hcd.0 port %d", s.port)
}
return fmt.Sprintf("vhci_hcd.0 (controller %d) port %d", s.secondary, s.port)
}

View File

@ -230,7 +230,7 @@ func waitForUSBIPTeardown(condition func() bool) bool {
func detachUsedVHCIPorts() {
for _, record := range readAllVHCIStatus() {
if record.state == 6 {
_ = writeSysfs(filepath.Join(string(record.controller), "detach"), strconv.Itoa(record.port))
_ = writeSysfs(filepath.Join(sysVHCIControllerV0, "detach"), strconv.Itoa(record.port))
}
}
}

View File

@ -21,7 +21,6 @@ import (
const (
sysBusUSBDevices = "/sys/bus/usb/devices"
sysUsbipHostDriver = "/sys/bus/usb/drivers/usbip-host"
sysVHCIPlatform = "/sys/devices/platform"
sysVHCIControllerV0 = "/sys/devices/platform/vhci_hcd.0"
usbipStatusAvailable = 1
@ -29,18 +28,6 @@ const (
usbipStatusError = 3
)
// vhciController is the canonical sysfs path of a vhci_hcd platform device,
// e.g. /sys/devices/platform/vhci_hcd.0. The kernel module instantiates
// controllers at load time and never adds more at runtime.
type vhciController string
func (c vhciController) name() string { return filepath.Base(string(c)) }
type vhciPortKey struct {
controller vhciController
port int
}
type sysfsDevice struct {
BusID string
Path string
@ -79,11 +66,15 @@ func (d *sysfsDevice) toProtocol() DeviceInfoTruncated {
return info
}
// vhciStatusRecord is one row of /sys/devices/platform/vhci_hcd.0/status
// or status.N. The kernel emits globally unique port numbers across every
// status* file; secondary identifies which file the row came from
// (0 for status, N for status.N) and exists only for diagnostic logging.
type vhciStatusRecord struct {
controller vhciController
hub string
port int
state int
secondary int
hub string
port int
state int
}
func listUSBDevices() ([]sysfsDevice, error) {
@ -218,76 +209,74 @@ func waitForUsbipStatusCleared(ctx context.Context, busid string) {
}
}
func discoverVHCIControllers() ([]vhciController, error) {
matches, err := filepath.Glob(filepath.Join(sysVHCIPlatform, "vhci_hcd.*"))
// readPrimaryVHCIStatus reads every status* file under
// /sys/devices/platform/vhci_hcd.0 and concatenates the rows in lexical
// order. status reports controller 0; status.N reports controller N.
// Port numbers are already globally unique — no remapping is needed.
func readPrimaryVHCIStatus() ([]vhciStatusRecord, error) {
matches, err := filepath.Glob(filepath.Join(sysVHCIControllerV0, "status*"))
if err != nil {
return nil, err
}
sort.Strings(matches)
out := make([]vhciController, 0, len(matches))
records := make([]vhciStatusRecord, 0)
for _, path := range matches {
out = append(out, vhciController(path))
secondary, parseErr := vhciSecondaryFromStatusFile(filepath.Base(path))
if parseErr != nil {
continue
}
raw, readErr := os.ReadFile(path)
if readErr != nil {
return nil, readErr
}
records = append(records, parseVHCIStatus(secondary, string(raw))...)
}
return out, nil
return records, nil
}
func vhciPickFreePort(controllers []vhciController, speed uint32, skip map[vhciPortKey]struct{}) (vhciController, int, error) {
func readAllVHCIStatus() []vhciStatusRecord {
records, err := readPrimaryVHCIStatus()
if err != nil {
return nil
}
return records
}
func vhciPickFreePort(speed uint32, skip map[int]struct{}) (int, error) {
targetHub := "hs"
switch speed {
case SpeedSuper, SpeedSuperPlus:
targetHub = "ss"
}
var firstErr error
for _, ctrl := range controllers {
records, err := readVHCIStatus(ctrl)
if err != nil {
if firstErr == nil {
firstErr = err
}
records, err := readPrimaryVHCIStatus()
if err != nil {
return -1, err
}
for _, record := range records {
if record.hub != targetHub || record.state != 4 {
continue
}
for _, record := range records {
if record.hub != targetHub || record.state != 4 {
continue
}
key := vhciPortKey{controller: ctrl, port: record.port}
if _, skipped := skip[key]; skipped {
continue
}
return ctrl, record.port, nil
}
}
if firstErr != nil {
return "", -1, firstErr
}
return "", -1, E.New("no free ", targetHub, " vhci port")
}
func readVHCIStatus(ctrl vhciController) ([]vhciStatusRecord, error) {
raw, err := os.ReadFile(filepath.Join(string(ctrl), "status"))
if err != nil {
return nil, err
}
return parseVHCIStatus(ctrl, string(raw)), nil
}
func readAllVHCIStatus() []vhciStatusRecord {
controllers, err := discoverVHCIControllers()
if err != nil {
return nil
}
var out []vhciStatusRecord
for _, ctrl := range controllers {
records, err := readVHCIStatus(ctrl)
if err != nil {
_, skipped := skip[record.port]
if skipped {
continue
}
out = append(out, records...)
return record.port, nil
}
return out
return -1, E.New("no free ", targetHub, " vhci port")
}
func parseVHCIStatus(ctrl vhciController, raw string) []vhciStatusRecord {
func vhciSecondaryFromStatusFile(name string) (int, error) {
if name == "status" {
return 0, nil
}
suffix := strings.TrimPrefix(name, "status.")
if suffix == name {
return 0, E.New("not a status file: ", name)
}
return strconv.Atoi(suffix)
}
func parseVHCIStatus(secondary int, raw string) []vhciStatusRecord {
scanner := bufio.NewScanner(strings.NewReader(raw))
records := make([]vhciStatusRecord, 0)
first := true
@ -313,10 +302,10 @@ func parseVHCIStatus(ctrl vhciController, raw string) []vhciStatusRecord {
continue
}
records = append(records, vhciStatusRecord{
controller: ctrl,
hub: fields[0],
port: port,
state: state,
secondary: secondary,
hub: fields[0],
port: port,
state: state,
})
}
return records

View File

@ -664,14 +664,23 @@ bool box_usbhost_device_iso(box_usbhost_device_t *device, uint8_t endpoint, uint
bool box_usbhost_device_abort_endpoint(box_usbhost_device_t *device, uint8_t endpoint, char **error_out) {
BoxUSBHostDevice *box = box_device(device);
IOUSBHostPipe *pipe = box_pipe_for_endpoint(box, endpoint);
if (pipe == nil) {
return true;
if (box == nil) {
box_set_error_string(error_out, @"IOUSBHost abort: invalid device handle");
return false;
}
NSError *error = nil;
BOOL ok = [pipe abortWithOption:IOUSBHostAbortOptionAsynchronous error:&error];
BOOL ok;
if ((endpoint & kIOUSBEndpointDescriptorNumber) == 0) {
ok = [box.device abortDeviceRequestsWithOption:IOUSBHostAbortOptionSynchronous error:&error];
} else {
IOUSBHostPipe *pipe = box_pipe_for_endpoint(box, endpoint);
if (pipe == nil) {
return true;
}
ok = [pipe abortWithOption:IOUSBHostAbortOptionSynchronous error:&error];
}
if (!ok) {
box_set_error_from_nserror(error_out, @"IOUSBHostPipe abortWithOption", error);
box_set_error_from_nserror(error_out, @"IOUSBHost abort", error);
}
return ok;
}