From 672e810becf20bec295e2815f9614ac147fa4712 Mon Sep 17 00:00:00 2001 From: emanuele-f Date: Fri, 28 Jan 2022 18:00:40 +0100 Subject: [PATCH] Improve capture performance with PCAP dump PCAP dump is now performed into a separate thread. This greatly reduces the chance for packet loss in root mode as well as preventing latency spikes in VPN mode. --- .../remote_capture/CaptureService.java | 128 ++++++++++++------ app/src/main/jni/core/capture_root.c | 6 +- app/src/main/jni/core/jni_impl.c | 2 +- app/src/main/jni/pcapd/pcapd.c | 4 +- 4 files changed, 97 insertions(+), 43 deletions(-) diff --git a/app/src/main/java/com/emanuelef/remote_capture/CaptureService.java b/app/src/main/java/com/emanuelef/remote_capture/CaptureService.java index 5f022d6e..2302d34d 100644 --- a/app/src/main/java/com/emanuelef/remote_capture/CaptureService.java +++ b/app/src/main/java/com/emanuelef/remote_capture/CaptureService.java @@ -99,7 +99,9 @@ public class CaptureService extends VpnService implements Runnable { private Thread mCaptureThread; private Thread mBlacklistsUpdateThread; private Thread mConnUpdateThread; + private Thread mDumperThread; private final LinkedBlockingDeque> mPendingUpdates = new LinkedBlockingDeque<>(32); + private LinkedBlockingDeque mDumpQueue; private String vpn_ipv4; private String vpn_dns; private String dns_server; @@ -273,6 +275,9 @@ public class CaptureService extends VpnService implements Runnable { } if(mDumper != null) { + // Max memory usage = (JAVA_PCAP_BUFFER_SIZE * 64) = 32 MB + mDumpQueue = new LinkedBlockingDeque<>(64); + try { mDumper.startDumper(); } catch (IOException | SecurityException e) { @@ -358,6 +363,11 @@ public class CaptureService extends VpnService implements Runnable { mConnUpdateThread = new Thread(this::connUpdateWork, "UpdateListener"); mConnUpdateThread.start(); + if(mDumper != null) { + mDumperThread = new Thread(this::dumpWork, "DumperThread"); + mDumperThread.start(); + } + // Start the native capture thread mQueueFull = false; mCaptureThread = new Thread(this, "PacketCapture"); @@ -395,6 +405,7 @@ public class CaptureService extends VpnService implements Runnable { e.printStackTrace(); } mDumper = null; + mDumpQueue.clear(); } appsResolver = null; @@ -571,7 +582,10 @@ public class CaptureService extends VpnService implements Runnable { private void stop() { stopPacketLoop(); - mPendingUpdates.offer(new Pair<>(null, null)); // signal termination to the mConnUpdateThread + + // signal termination + mPendingUpdates.offer(new Pair<>(null, null)); + mDumpQueue.offer(new byte[0]); while((mCaptureThread != null) && (mCaptureThread.isAlive())) { try { @@ -594,6 +608,18 @@ public class CaptureService extends VpnService implements Runnable { } mConnUpdateThread = null; + while((mDumperThread != null) && (mDumperThread.isAlive())) { + try { + Log.d(TAG, "Joining dumper thread..."); + mDumperThread.join(); + } catch (InterruptedException e) { + Log.e(TAG, "Joining dumper thread failed"); + mDumpQueue.offer(new byte[0]); + } + } + mDumperThread = null; + mDumper = null; + if(mParcelFileDescriptor != null) { try { mParcelFileDescriptor.close(); @@ -603,15 +629,6 @@ public class CaptureService extends VpnService implements Runnable { mParcelFileDescriptor = null; } - if(mDumper != null) { - try { - mDumper.stopDumper(); - } catch (IOException e) { - e.printStackTrace(); - } - mDumper = null; - } - mPcapUri = null; mPendingUpdates.clear(); unregisterNetworkCallbacks(); @@ -780,28 +797,60 @@ public class CaptureService extends VpnService implements Runnable { } private void connUpdateWork() { - try { - while(true) { - Pair item = mPendingUpdates.take(); - if(item.first == null) // termination request - break; - - ConnectionDescriptor[] new_conns = item.first; - ConnectionUpdate[] conns_updates = item.second; - - checkBlacklistsUpdates(); - - // synchronize the conn_reg to ensure that newConnections and connectionsUpdates run atomically - // thus preventing the ConnectionsAdapter from interleaving other operations - synchronized (conn_reg) { - if(new_conns.length > 0) - conn_reg.newConnections(new_conns); - - if(conns_updates.length > 0) - conn_reg.connectionsUpdates(conns_updates); - } + while(true) { + Pair item; + try { + item = mPendingUpdates.take(); + } catch (InterruptedException e) { + continue; } - } catch (InterruptedException e) { + + if(item.first == null) // termination request + break; + + ConnectionDescriptor[] new_conns = item.first; + ConnectionUpdate[] conns_updates = item.second; + + checkBlacklistsUpdates(); + + // synchronize the conn_reg to ensure that newConnections and connectionsUpdates run atomically + // thus preventing the ConnectionsAdapter from interleaving other operations + synchronized (conn_reg) { + if(new_conns.length > 0) + conn_reg.newConnections(new_conns); + + if(conns_updates.length > 0) + conn_reg.connectionsUpdates(conns_updates); + } + } + } + + private void dumpWork() { + while(true) { + byte[] data; + try { + data = mDumpQueue.take(); + } catch (InterruptedException e) { + continue; + } + + if(data.length == 0) // termination request + break; + + try { + mDumper.dumpData(data); + } catch (IOException e) { + // Stop the capture + e.printStackTrace(); + reportError(e.getLocalizedMessage()); + mHandler.post(this::stop); + break; + } + } + + try { + mDumper.stopDumper(); + } catch (IOException e) { e.printStackTrace(); } } @@ -941,13 +990,16 @@ public class CaptureService extends VpnService implements Runnable { /* Exports a PCAP data chunk */ public void dumpPcapData(byte[] data) { - if(mDumper != null) { - try { - mDumper.dumpData(data); - } catch (IOException e) { - e.printStackTrace(); - reportError(e.getLocalizedMessage()); - stopPacketLoop(); + if((mDumper != null) && (data.length > 0)) { + while(true) { + try { + // wait until the queue has space to insert the data. If the queue is full, we + // will experience slow-downs/drops but this is expected + mDumpQueue.put(data); + break; + } catch (InterruptedException e) { + // retry + } } } } diff --git a/app/src/main/jni/core/capture_root.c b/app/src/main/jni/core/capture_root.c index 856dc717..6181bf43 100644 --- a/app/src/main/jni/core/capture_root.c +++ b/app/src/main/jni/core/capture_root.c @@ -528,12 +528,12 @@ int run_root(pcapdroid_t *pd) { pd_refresh_time(pd); - if(!running) - break; - if(!FD_ISSET(sock, &fdset)) goto housekeeping; + if(!running) + break; + ssize_t xrv = xread(sock, &hdr, sizeof(hdr)); if(xrv != sizeof(hdr)) { if(xrv < 0) diff --git a/app/src/main/jni/core/jni_impl.c b/app/src/main/jni/core/jni_impl.c index c879d651..e216649e 100644 --- a/app/src/main/jni/core/jni_impl.c +++ b/app/src/main/jni/core/jni_impl.c @@ -97,7 +97,7 @@ static void sendStatsDump(pcapdroid_t *pd) { static void sendPcapDump(pcapdroid_t *pd) { JNIEnv *env = pd->env; - log_d("Exporting a %d B PCAP buffer", pd->pcap_dump.buffer_idx); + //log_d("Exporting a %d B PCAP buffer", pd->pcap_dump.buffer_idx); jbyteArray barray = (*env)->NewByteArray(env, pd->pcap_dump.buffer_idx); if(jniCheckException(env)) diff --git a/app/src/main/jni/pcapd/pcapd.c b/app/src/main/jni/pcapd/pcapd.c index 3172cf23..e6ae976c 100644 --- a/app/src/main/jni/pcapd/pcapd.c +++ b/app/src/main/jni/pcapd/pcapd.c @@ -915,7 +915,9 @@ cleanup: if(rt.tun) zdtun_finalize(rt.tun); - log_i("Pkts: %u rcvd, %u drops, %u iface_drops", rt.stats.ps_recv, rt.stats.ps_drop, rt.stats.ps_ifdrop); + log_i("Pkts: %u rcvd, %u drops (%.1f%%), %u iface_drops", rt.stats.ps_recv, rt.stats.ps_drop, + rt.stats.ps_drop * 100.f / (rt.stats.ps_recv + rt.stats.ps_drop + 1), + rt.stats.ps_ifdrop); return rv; }