代理优化,解决背压下载大文件中途断开的问题

This commit is contained in:
aoshiguchen 2023-09-23 23:21:13 +08:00
parent 840dd2d344
commit 268e17273e
10 changed files with 92 additions and 21 deletions

View File

@ -49,6 +49,17 @@ public class RealServerChannelHandler extends SimpleChannelInboundHandler<ByteBu
// 代理客户端连接断开
ctx.channel().close();
} else {
if (proxyChannel.isWritable()) {
if (!realServerChannel.config().isAutoRead()) {
realServerChannel.config().setAutoRead(true);
}
} else {
if (realServerChannel.config().isAutoRead()) {
realServerChannel.config().setAutoRead(false);
}
}
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
String visitorId = ProxyUtil.getVisitorIdByRealServerChannel(realServerChannel);

View File

@ -63,16 +63,18 @@ public class TcpProxyChannelHandler extends SimpleChannelInboundHandler<ProxyMes
IdleStateEvent event = (IdleStateEvent)evt;
switch (event.state()) {
case READER_IDLE:
// 读超时断开连接
log.info("[TCP Proxy Channel]Read timeout");
ctx.channel().close();
if (ctx.channel().isWritable()) {
// 读超时断开连接
log.info("[TCP Proxy Channel]Read timeout");
ctx.channel().close();
}
break;
case WRITER_IDLE:
ctx.channel().writeAndFlush(ProxyMessage.buildHeartbeatMessage());
break;
case ALL_IDLE:
log.debug("[TCP Proxy Channel]ReadWrite timeout");
ctx.close();
// log.debug("[TCP Proxy Channel]ReadWrite timeout");
// ctx.close();
break;
}
}

View File

@ -23,6 +23,10 @@ public class ProxyMessageTransferHandler implements ProxyMessageHandler {
public void handle(ChannelHandlerContext ctx, ProxyMessage proxyMessage) {
Channel realServerChannel = ctx.channel().attr(Constants.NEXT_CHANNEL).get();
if (realServerChannel != null) {
// 自己可写则设置来源可读自己不可写则设置来源不可读
realServerChannel.config().setAutoRead(ctx.channel().isWritable());
ByteBuf buf = ctx.alloc().buffer(proxyMessage.getData().length);
buf.writeBytes(proxyMessage.getData());
realServerChannel.writeAndFlush(buf);

View File

@ -9,9 +9,9 @@ neutrino:
length-field-length: 4
initial-bytes-to-strip: 0
length-adjustment: 0
read-idle-time: 8
write-idle-time: 3
all-idle-time-seconds: 8
read-idle-time: 120
write-idle-time: 20
all-idle-time-seconds: 0
logger:
# 日志级别
level: ${LOG_LEVEL:info}

View File

@ -70,10 +70,10 @@ public class ProxyTunnelChannelHandler extends SimpleChannelInboundHandler<Proxy
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel userChannel = ctx.channel().attr(Constants.NEXT_CHANNEL).get();
if (userChannel != null) {
userChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());
}
// Channel userChannel = ctx.channel().attr(Constants.NEXT_CHANNEL).get();
// if (userChannel != null) {
// userChannel.config().setOption(ChannelOption.AUTO_READ, ctx.channel().isWritable());
// }
super.channelWritabilityChanged(ctx);
}
@ -127,9 +127,10 @@ public class ProxyTunnelChannelHandler extends SimpleChannelInboundHandler<Proxy
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
if (ctx.channel().isActive()) {
ctx.channel().close();
}
// if (ctx.channel().isActive()) {
// ctx.channel().close();
// }
log.error("[Tunnel Channel] error", cause);
}
@Override
@ -138,9 +139,11 @@ public class ProxyTunnelChannelHandler extends SimpleChannelInboundHandler<Proxy
IdleStateEvent event = (IdleStateEvent)evt;
switch (event.state()) {
case READER_IDLE:
// 读超时断开连接
log.debug("Read timeout");
ctx.channel().close();
if (ctx.channel().isWritable()) {
// 读超时断开连接
log.warn("[Tunnel Channel]Read timeout");
ctx.channel().close();
}
break;
case WRITER_IDLE:
ctx.channel().writeAndFlush(ProxyMessage.buildHeartbeatMessage());

View File

@ -43,6 +43,10 @@ public class TcpVisitorChannelHandler extends SimpleChannelInboundHandler<ByteBu
ctx.channel().close();
return;
}
// 代理通道可写则设置访问通道可读代理通道不可写则设置访问通道不可读
visitorChannel.config().setAutoRead(proxyChannel.isWritable());
// 转发代理数据
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
@ -129,7 +133,8 @@ public class TcpVisitorChannelHandler extends SimpleChannelInboundHandler<ByteBu
if (null == cmdChannel) {
// 该端口还没有代理客户端
ctx.channel().close();
} else {
}
else {
Channel proxyChannel = visitorChannel.attr(Constants.NEXT_CHANNEL).get();
if (null != proxyChannel) {
proxyChannel.config().setOption(ChannelOption.AUTO_READ, visitorChannel.isWritable());

View File

@ -127,4 +127,26 @@ public class UdpVisitorChannelHandler extends SimpleChannelInboundHandler<Datagr
ctx.close();
log.error("[UDP Visitor Channel]VisitorChannel error", cause);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
// 通知代理客户端
Channel visitorChannel = ctx.channel();
InetSocketAddress sa = (InetSocketAddress) visitorChannel.localAddress();
Channel cmdChannel = ProxyUtil.getCmdChannelByServerPort(sa.getPort());
if (null == cmdChannel) {
// 该端口还没有代理客户端
ctx.channel().close();
}
else {
Channel proxyChannel = visitorChannel.attr(Constants.NEXT_CHANNEL).get();
if (null != proxyChannel) {
proxyChannel.config().setOption(ChannelOption.AUTO_READ, visitorChannel.isWritable());
}
}
super.channelWritabilityChanged(ctx);
}
}

View File

@ -27,6 +27,17 @@ public class ProxyMessageTransferHandler implements ProxyMessageHandler {
public void handle(ChannelHandlerContext ctx, ProxyMessage proxyMessage) {
Channel visitorChannel = ctx.channel().attr(Constants.NEXT_CHANNEL).get();
if (null != visitorChannel) {
if (!visitorChannel.isWritable()) {
//自己不可写通道可以读让通道关闭读
//自己可写通道不可以读让通道打开读
if (ctx.channel().config().isAutoRead()) {
ctx.channel().config().setAutoRead(false);
}
} else {
if (ctx.channel().config().isAutoRead()) {
ctx.channel().config().setAutoRead(true);
}
}
ByteBuf buf = ctx.alloc().buffer(proxyMessage.getData().length);
buf.writeBytes(proxyMessage.getData());
visitorChannel.writeAndFlush(buf);

View File

@ -35,6 +35,19 @@ public class UdpProxyMessageTransferHandler implements ProxyMessageHandler {
Channel visitorChannel = ctx.channel().attr(Constants.NEXT_CHANNEL).get();
if (null != visitorChannel) {
if (!visitorChannel.isWritable()) {
//自己不可写通道可以读让通道关闭读
//自己可写通道不可以读让通道打开读
if (ctx.channel().config().isAutoRead()) {
ctx.channel().config().setAutoRead(false);
}
} else {
if (ctx.channel().config().isAutoRead()) {
ctx.channel().config().setAutoRead(true);
}
}
// InetSocketAddress address = new InetSocketAddress(udpBaseInfo.getVisitorIp(), udpBaseInfo.getVisitorPort());
// ByteBuf byteBuf = Unpooled.copiedBuffer(proxyMessage.getData());
// visitorChannel.writeAndFlush(new DatagramPacket(byteBuf, address));

View File

@ -11,8 +11,8 @@ neutrino:
length-field-length: 4
initial-bytes-to-strip: 0
length-adjustment: 0
read-idle-time: 30
write-idle-time: 5
read-idle-time: 120
write-idle-time: 20
all-idle-time-seconds: 0
logger:
# 日志级别