Feat/long link#141
Conversation
一、总览(总)本分支核心目标:把 tRPC/HTTP 客户端从"伪长连接 + 粗暴空闲扫描关闭"改造成真正可靠的长连接。 旧方案本质问题:框架基于 pull 模型维护服务端 IP, 新方案按分层、分协议重构:Netty(tRPC)连接随 二、旧方案存在的问题(分·问题)⭐ 1. 会把请求发到"已断开 / 正在关闭"的链接,导致报错(最关键)原方案
2. 空闲即关闭,长连接形同虚设服务端/客户端 Handler 的 3. 半死连接(half-dead)无法探测"持续写 + 静默丢包"场景,客户端一直能写进内核缓冲区, 4. 其它并发竞态
5. epoll 与共享 IO 线程组互斥旧代码只有一个共享 6. HTTP 连接池几乎裸奔
7. 默认值不合理
三、当前解决方案(分·方案)A. tRPC 协议(Netty 真长连接)⭐ 1. 消除"发到断链"竞态(对应问题 1)
2. 移除粗暴关闭,连接常驻
3. 客户端 READ idle 关闭 + 懒重连
4. TCP keepalive 调优 + epoll 解耦
5. 惊群防护
B. 集群侧(统一缓存治理 + 非 Netty 兜底)
C. HTTP / HTTP2 协议
D. 默认值与配置
四、总结(总)主线一句话:从"pull 模型 + 全局粗暴空闲关闭"升级为"真长连接 + 分协议分层治理 + 懒重连 + 竞态收窄"。
最终效果:正常流量下连接稳定复用不再被无谓拆除;"正在关闭/对端已关"的竞态窗口里不再误发请求;异常(半死/RST/IP 漂移/黑洞)能在有界时间内被探测并自愈。配套约 4000+ 行测试覆盖上述竞态与恢复场景。 五、核心代码块清单⭐ 消除"发到断链"竞态1. invalidate-before-close(客户端 idle:先失效 slot 再异步关) private final class IdleCloseHandler extends ChannelDuplexHandler {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
io.netty.channel.Channel ioChannel = ctx.channel();
...
// 先失效 transport 里的 slot,让请求线程立刻看到“需要重连”
com.tencent.trpc.core.transport.Channel wrapper =
NettyChannelManager.getOrAddChannel(ioChannel, config);
if (wrapper != null) {
invalidateChannel(wrapper);
}
// 再异步关闭
ctx.close().addListener(future -> logger.info("[long-link][idle-close] ...", ...));
return;
}
super.userEventTriggered(ctx, evt);
}
}2. @Override
public void invalidateChannel(Channel target) {
...
for (int i = 0; i < channels.size(); i++) {
ChannelFutureItem item = channels.get(i);
...
if (ch != target) { continue; }
connLock.lock();
try {
ChannelFutureItem latest = channels.get(i);
if (latest != item) { return; } // 不覆盖别的线程刚刷新的 slot
channels.set(i, new ChannelFutureItem(null, config)); // 置空 → 下次必重连
item.close();
} finally { connLock.unlock(); }
return;
}
}3. /**
* Channel pool. Backed by {@link CopyOnWriteArrayList} so that the slot publication ...
* is visible to concurrent readers in {@link #getChannel0} with volatile semantics,
* eliminating the data race that an {@link java.util.ArrayList} would have.
*/
protected List<ChannelFutureItem> channels;4. 懒重连 + 惊群防护(锁内双检 + 状态机) protected void ensureChannelActive(int chIndex) {
if (!needsReconnect(channels.get(chIndex))) { return; }
connLock.lock();
try {
ChannelFutureItem latest = channels.get(chIndex);
if (!needsReconnect(latest)) { return; } // 双检防惊群
channels.set(chIndex, new ChannelFutureItem(createChannel().toCompletableFuture(), config));
latest.close();
} finally { connLock.unlock(); }
}
private static boolean needsReconnect(ChannelFutureItem item) {
if (item.isNotYetConnect()) { return true; }
return !item.isAvailable() && !item.isConnecting();
}tRPC 长连接其它核心5. READ-idle handler 安装 + TCP keepalive 调优 if (idleTimeoutMills > 0) {
p.addBefore("handler", "idleState",
new IdleStateHandler(idleTimeoutMills, 0L, 0L, TimeUnit.MILLISECONDS));
p.addBefore("handler", "idleClose", new IdleCloseHandler());
} private void applyTcpKeepAliveTuning(Bootstrap bootstrap) {
Integer idle = config.getTcpKeepAliveIdle();
Integer intvl = config.getTcpKeepAliveIntvl();
Integer cnt = config.getTcpKeepAliveCnt();
if (idle != null && idle > 0) { bootstrap.option(EpollChannelOption.TCP_KEEPIDLE, idle); }
if (intvl != null && intvl > 0){ bootstrap.option(EpollChannelOption.TCP_KEEPINTVL, intvl); }
if (cnt != null && cnt > 0) { bootstrap.option(EpollChannelOption.TCP_KEEPCNT, cnt); }
}6. 服务端不再装 IdleStateHandler( 7. NIO/EPOLL 双共享组(引用计数幂等) private static void acquireSharedGroup(SharedGroupKind kind, int ioThreads, String threadPoolName) { ... }
private static void releaseSharedGroup(SharedGroupKind kind) {
// 计数减到 0 才 shutdownGracefully 并置 null;负数纠正为 0 防重复 shutdown
}集群侧治理8. 非 Netty 空闲关闭:三重防竞态 private static void closeIfIdleResponseTimedOut(BackendConfig bConfig, String key, RpcClientProxy proxy) {
...
if (transporter == null || Constants.TRANSPORTER_NETTY.equalsIgnoreCase(transporter)) { return; }
if (proxy.inFlight.get() > 0) { return; } // ① 在途跳过
if (System.currentTimeMillis() - proxy.lastResponseTimeMs.get() < idleTimeoutMs) { return; }
if (!proxy.closing.compareAndSet(false, true)) { return; } // ② 单次 CAS 抢占
long idleMsAfterCas = System.currentTimeMillis() - proxy.lastResponseTimeMs.get();
if (idleMsAfterCas < idleTimeoutMs || proxy.inFlight.get() > 0) {// ③ CAS 后复查
proxy.closing.set(false);
return;
}
proxy.close();
}9. in-flight 计数 + 仅成功响应刷新活跃时间 public CompletionStage<Response> invoke(Request request) {
rpcClient.inFlight.incrementAndGet();
CompletionStage<Response> stage;
try { stage = delegate.invoke(request); }
catch (Throwable ex) { rpcClient.inFlight.decrementAndGet(); throw ex; }
return stage.whenComplete((response, throwable) -> {
try {
if (throwable == null && response != null) { rpcClient.markResponseReceived(); }
} finally { rpcClient.inFlight.decrementAndGet(); }
});
}10. CAS 摘除缓存(防误删新 proxy, if (value != null && !value.isAvailable()) {
invokerCache.remove(key, value); // CAS 删除
}
...
invokerCache.put(key, created);
rpcClient.closeFuture().whenComplete((r, e) -> {
boolean removed = invokerCache.remove(key, created); // CAS 删除
});HTTP / HTTP211. HTTP/1.1 连接池保活/驱逐治理 protected void doOpen() {
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
int maxConns = protocolConfig.getMaxConns();
cm.setMaxTotal(maxConns);
cm.setDefaultMaxPerRoute(maxConns);
cm.setValidateAfterInactivity(VALIDATE_AFTER_INACTIVITY_MS); // 2s
cm.setDefaultSocketConfig(SocketConfig.custom().setSoKeepAlive(true).build());
httpClient = HttpClients.custom().setConnectionManager(cm)
.evictExpiredConnections()
.evictIdleConnections(EVICT_IDLE_CONNECTIONS_SECONDS, TimeUnit.SECONDS) // 60s
.setKeepAliveStrategy(HttpRpcClient::resolveKeepAliveDuration) // 封顶 5min
.setConnectionTimeToLive(CONNECTION_TTL_MINUTES, TimeUnit.MINUTES) // 10min
.build();
}12. HTTP 健康信号驱动集群驱逐 public boolean isAvailable() {
if (!super.isAvailable()) { return false; }
if (consecutiveFailures.get() >= FAILURE_UNAVAILABLE_THRESHOLD) { return false; } // 连续失败 50
return (System.nanoTime() - lastUsedNanos) <= IDLE_UNAVAILABLE_THRESHOLD_NANOS; // 空闲 10min
}默认值13. DEFAULT_MAX_CONNECTIONS: "20480" → "200"
DEFAULT_CONNECTIONS_PERADDR: "2" → "4" // 服务端入向连接翻倍,需检查 fd ulimit / LB 限制
DEFAULT_TCP_KEEPALIVE_IDLE: 新增 "30"
DEFAULT_TCP_KEEPALIVE_INTVL: 新增 "10"
DEFAULT_TCP_KEEPALIVE_CNT: 新增 "3" // 30 + 10×3 ≈ 60s 半死探测窗口 |
No description provided.