Improve capture performance with PCAP dump

PCAP dump is now performed into a separate thread. This greatly reduces
the chance for packet loss in root mode as well as preventing latency
spikes in VPN mode.
This commit is contained in:
emanuele-f 2022-01-28 18:00:40 +01:00
parent a2d795152f
commit 672e810bec
4 changed files with 97 additions and 43 deletions

View File

@ -99,7 +99,9 @@ public class CaptureService extends VpnService implements Runnable {
private Thread mCaptureThread; private Thread mCaptureThread;
private Thread mBlacklistsUpdateThread; private Thread mBlacklistsUpdateThread;
private Thread mConnUpdateThread; private Thread mConnUpdateThread;
private Thread mDumperThread;
private final LinkedBlockingDeque<Pair<ConnectionDescriptor[], ConnectionUpdate[]>> mPendingUpdates = new LinkedBlockingDeque<>(32); private final LinkedBlockingDeque<Pair<ConnectionDescriptor[], ConnectionUpdate[]>> mPendingUpdates = new LinkedBlockingDeque<>(32);
private LinkedBlockingDeque<byte[]> mDumpQueue;
private String vpn_ipv4; private String vpn_ipv4;
private String vpn_dns; private String vpn_dns;
private String dns_server; private String dns_server;
@ -273,6 +275,9 @@ public class CaptureService extends VpnService implements Runnable {
} }
if(mDumper != null) { if(mDumper != null) {
// Max memory usage = (JAVA_PCAP_BUFFER_SIZE * 64) = 32 MB
mDumpQueue = new LinkedBlockingDeque<>(64);
try { try {
mDumper.startDumper(); mDumper.startDumper();
} catch (IOException | SecurityException e) { } catch (IOException | SecurityException e) {
@ -358,6 +363,11 @@ public class CaptureService extends VpnService implements Runnable {
mConnUpdateThread = new Thread(this::connUpdateWork, "UpdateListener"); mConnUpdateThread = new Thread(this::connUpdateWork, "UpdateListener");
mConnUpdateThread.start(); mConnUpdateThread.start();
if(mDumper != null) {
mDumperThread = new Thread(this::dumpWork, "DumperThread");
mDumperThread.start();
}
// Start the native capture thread // Start the native capture thread
mQueueFull = false; mQueueFull = false;
mCaptureThread = new Thread(this, "PacketCapture"); mCaptureThread = new Thread(this, "PacketCapture");
@ -395,6 +405,7 @@ public class CaptureService extends VpnService implements Runnable {
e.printStackTrace(); e.printStackTrace();
} }
mDumper = null; mDumper = null;
mDumpQueue.clear();
} }
appsResolver = null; appsResolver = null;
@ -571,7 +582,10 @@ public class CaptureService extends VpnService implements Runnable {
private void stop() { private void stop() {
stopPacketLoop(); stopPacketLoop();
mPendingUpdates.offer(new Pair<>(null, null)); // signal termination to the mConnUpdateThread
// signal termination
mPendingUpdates.offer(new Pair<>(null, null));
mDumpQueue.offer(new byte[0]);
while((mCaptureThread != null) && (mCaptureThread.isAlive())) { while((mCaptureThread != null) && (mCaptureThread.isAlive())) {
try { try {
@ -594,6 +608,18 @@ public class CaptureService extends VpnService implements Runnable {
} }
mConnUpdateThread = null; mConnUpdateThread = null;
while((mDumperThread != null) && (mDumperThread.isAlive())) {
try {
Log.d(TAG, "Joining dumper thread...");
mDumperThread.join();
} catch (InterruptedException e) {
Log.e(TAG, "Joining dumper thread failed");
mDumpQueue.offer(new byte[0]);
}
}
mDumperThread = null;
mDumper = null;
if(mParcelFileDescriptor != null) { if(mParcelFileDescriptor != null) {
try { try {
mParcelFileDescriptor.close(); mParcelFileDescriptor.close();
@ -603,15 +629,6 @@ public class CaptureService extends VpnService implements Runnable {
mParcelFileDescriptor = null; mParcelFileDescriptor = null;
} }
if(mDumper != null) {
try {
mDumper.stopDumper();
} catch (IOException e) {
e.printStackTrace();
}
mDumper = null;
}
mPcapUri = null; mPcapUri = null;
mPendingUpdates.clear(); mPendingUpdates.clear();
unregisterNetworkCallbacks(); unregisterNetworkCallbacks();
@ -780,28 +797,60 @@ public class CaptureService extends VpnService implements Runnable {
} }
private void connUpdateWork() { private void connUpdateWork() {
try { while(true) {
while(true) { Pair<ConnectionDescriptor[], ConnectionUpdate[]> item;
Pair<ConnectionDescriptor[], ConnectionUpdate[]> item = mPendingUpdates.take(); try {
if(item.first == null) // termination request item = mPendingUpdates.take();
break; } catch (InterruptedException e) {
continue;
ConnectionDescriptor[] new_conns = item.first;
ConnectionUpdate[] conns_updates = item.second;
checkBlacklistsUpdates();
// synchronize the conn_reg to ensure that newConnections and connectionsUpdates run atomically
// thus preventing the ConnectionsAdapter from interleaving other operations
synchronized (conn_reg) {
if(new_conns.length > 0)
conn_reg.newConnections(new_conns);
if(conns_updates.length > 0)
conn_reg.connectionsUpdates(conns_updates);
}
} }
} catch (InterruptedException e) {
if(item.first == null) // termination request
break;
ConnectionDescriptor[] new_conns = item.first;
ConnectionUpdate[] conns_updates = item.second;
checkBlacklistsUpdates();
// synchronize the conn_reg to ensure that newConnections and connectionsUpdates run atomically
// thus preventing the ConnectionsAdapter from interleaving other operations
synchronized (conn_reg) {
if(new_conns.length > 0)
conn_reg.newConnections(new_conns);
if(conns_updates.length > 0)
conn_reg.connectionsUpdates(conns_updates);
}
}
}
private void dumpWork() {
while(true) {
byte[] data;
try {
data = mDumpQueue.take();
} catch (InterruptedException e) {
continue;
}
if(data.length == 0) // termination request
break;
try {
mDumper.dumpData(data);
} catch (IOException e) {
// Stop the capture
e.printStackTrace();
reportError(e.getLocalizedMessage());
mHandler.post(this::stop);
break;
}
}
try {
mDumper.stopDumper();
} catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
@ -941,13 +990,16 @@ public class CaptureService extends VpnService implements Runnable {
/* Exports a PCAP data chunk */ /* Exports a PCAP data chunk */
public void dumpPcapData(byte[] data) { public void dumpPcapData(byte[] data) {
if(mDumper != null) { if((mDumper != null) && (data.length > 0)) {
try { while(true) {
mDumper.dumpData(data); try {
} catch (IOException e) { // wait until the queue has space to insert the data. If the queue is full, we
e.printStackTrace(); // will experience slow-downs/drops but this is expected
reportError(e.getLocalizedMessage()); mDumpQueue.put(data);
stopPacketLoop(); break;
} catch (InterruptedException e) {
// retry
}
} }
} }
} }

View File

@ -528,12 +528,12 @@ int run_root(pcapdroid_t *pd) {
pd_refresh_time(pd); pd_refresh_time(pd);
if(!running)
break;
if(!FD_ISSET(sock, &fdset)) if(!FD_ISSET(sock, &fdset))
goto housekeeping; goto housekeeping;
if(!running)
break;
ssize_t xrv = xread(sock, &hdr, sizeof(hdr)); ssize_t xrv = xread(sock, &hdr, sizeof(hdr));
if(xrv != sizeof(hdr)) { if(xrv != sizeof(hdr)) {
if(xrv < 0) if(xrv < 0)

View File

@ -97,7 +97,7 @@ static void sendStatsDump(pcapdroid_t *pd) {
static void sendPcapDump(pcapdroid_t *pd) { static void sendPcapDump(pcapdroid_t *pd) {
JNIEnv *env = pd->env; JNIEnv *env = pd->env;
log_d("Exporting a %d B PCAP buffer", pd->pcap_dump.buffer_idx); //log_d("Exporting a %d B PCAP buffer", pd->pcap_dump.buffer_idx);
jbyteArray barray = (*env)->NewByteArray(env, pd->pcap_dump.buffer_idx); jbyteArray barray = (*env)->NewByteArray(env, pd->pcap_dump.buffer_idx);
if(jniCheckException(env)) if(jniCheckException(env))

View File

@ -915,7 +915,9 @@ cleanup:
if(rt.tun) if(rt.tun)
zdtun_finalize(rt.tun); zdtun_finalize(rt.tun);
log_i("Pkts: %u rcvd, %u drops, %u iface_drops", rt.stats.ps_recv, rt.stats.ps_drop, rt.stats.ps_ifdrop); log_i("Pkts: %u rcvd, %u drops (%.1f%%), %u iface_drops", rt.stats.ps_recv, rt.stats.ps_drop,
rt.stats.ps_drop * 100.f / (rt.stats.ps_recv + rt.stats.ps_drop + 1),
rt.stats.ps_ifdrop);
return rv; return rv;
} }