Skip to content

Feat/long link#141

Open
wardseptember wants to merge 12 commits into
trpc-group:masterfrom
wardseptember:feat/long_link
Open

Feat/long link#141
wardseptember wants to merge 12 commits into
trpc-group:masterfrom
wardseptember:feat/long_link

Conversation

@wardseptember

Copy link
Copy Markdown
Collaborator

No description provided.

@codecov

codecov Bot commented May 16, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 90.40767% with 40 lines in your changes missing coverage. Please review.
✅ Project coverage is 86.34286%. Comparing base (063c0d2) to head (8f706bc).

Files with missing lines Patch % Lines
.../trpc/transport/netty/NettyTcpClientTransport.java 85.48387% 9 Missing ⚠️
...t/trpc/core/transport/AbstractClientTransport.java 83.78378% 6 Missing ⚠️
...ent/trpc/core/cluster/RpcClusterClientManager.java 94.38202% 5 Missing ⚠️
...ncent/trpc/core/cluster/def/DefClusterInvoker.java 55.55556% 4 Missing ⚠️
...t/trpc/proto/http/client/Http2ConsumerInvoker.java 73.33333% 4 Missing ⚠️
...nt/trpc/proto/http/client/HttpConsumerInvoker.java 71.42857% 4 Missing ⚠️
.../transport/netty/NettyAbstractClientTransport.java 93.10345% 4 Missing ⚠️
.../trpc/transport/netty/NettyUdpClientTransport.java 66.66667% 3 Missing ⚠️
...m/tencent/trpc/core/transport/ClientTransport.java 0.00000% 1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@                  Coverage Diff                  @@
##                master        #141         +/-   ##
=====================================================
+ Coverage     85.87630%   86.34286%   +0.46656%     
- Complexity        4328        4398         +70     
=====================================================
  Files              436         437          +1     
  Lines            14373       14659        +286     
  Branches          1287        1338         +51     
=====================================================
+ Hits             12343       12657        +314     
+ Misses            2030        2002         -28     
Files with missing lines Coverage Δ
...n/java/com/tencent/trpc/core/common/Constants.java 66.66667% <ø> (ø)
...tencent/trpc/core/common/config/BackendConfig.java 97.17868% <100.00000%> (+0.02678%) ⬆️
...nt/trpc/core/common/config/BaseProtocolConfig.java 100.00000% <100.00000%> (ø)
...tencent/trpc/proto/http/client/Http2RpcClient.java 100.00000% <100.00000%> (+9.09090%) ⬆️
...encent/trpc/proto/http/client/Http2cRpcClient.java 100.00000% <100.00000%> (+18.75000%) ⬆️
.../tencent/trpc/proto/http/client/HttpRpcClient.java 100.00000% <100.00000%> (+15.78946%) ⬆️
...t/configuration/schema/AbstractProtocolSchema.java 100.00000% <100.00000%> (ø)
...ncent/trpc/transport/netty/NettyClientHandler.java 100.00000% <ø> (ø)
...ncent/trpc/transport/netty/NettyServerHandler.java 100.00000% <ø> (ø)
.../trpc/transport/netty/NettyTcpServerTransport.java 85.18519% <100.00000%> (+0.36375%) ⬆️
... and 9 more

... and 6 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@wardseptember

wardseptember commented May 16, 2026

Copy link
Copy Markdown
Collaborator Author

一、总览(总)

本分支核心目标:把 tRPC/HTTP 客户端从"伪长连接 + 粗暴空闲扫描关闭"改造成真正可靠的长连接

旧方案本质问题:框架基于 pull 模型维护服务端 IP,RpcClusterClientManager 用全局空闲扫描器不分协议lastUsedNanos 粗暴关闭客户端;Netty 客户端/服务端 Handler 又各自在 IdleStateEvent 里直接 channel.close()。这套机制导致连接被无谓销毁、半死连接探测不到、请求被发到已断开/正在关闭的链接而报错,以及大量并发竞态。

新方案按分层、分协议重构:Netty(tRPC)连接随 BackendConfig 常驻、空闲治理下沉传输层、靠"懒重连 + closeFuture 回调"恢复;非 Netty(HTTP)靠连接池保活 + 30s 集群扫描兜底;并系统性修复并发竞态、收敛默认参数。


二、旧方案存在的问题(分·问题)

⭐ 1. 会把请求发到"已断开 / 正在关闭"的链接,导致报错(最关键)

原方案 ensureChannelActive 在取 channel 前重建检查,稳态下不会一直打死链;但在以下三类竞态/缺陷下会把请求打到断链上:

  • (a) 客户端 idle 异步 close,且关闭前不失效 slotNettyClientHandler 收到 idle 事件直接 channel.close()(异步派发到 EventLoop)。从"决定关闭"到"channel.isConnected() 翻转为 false"之间有时间窗;窗口内请求线程 isAvailable() 仍返回 true → 判定无需重建 → 把请求发到正在关闭的 channel。
  • (b) channelsArrayList,存在内存可见性 data racegetChannel0 无锁读、ensureChannelActive 锁内写,缺乏 happens-before,读线程可能读到过期 item,发到已被替换/已断开的旧 channel。
  • (c) "检查可用 → send" 非原子 + 死链探测慢:检查那刻还连着,真正 send 时可能已被 RST/FIN 断开;原方案无 TCP keepalive 快速探测、且服务端还会主动 idle(240s)关连接发 FIN,断连感知延迟大、窗口更宽(典型 connection reset/IOException)。

2. 空闲即关闭,长连接形同虚设

服务端/客户端 Handler 的 userEventTriggered 直接 close;RpcClusterClientManager 还按 lastUsedNanos 关"长时间未使用"的客户端——对 Netty 客户端尤其致命(牵连共享 EventLoopGroup、打断在途请求)。

3. 半死连接(half-dead)无法探测

"持续写 + 静默丢包"场景,客户端一直能写进内核缓冲区,ALL_IDLE/WRITE_IDLE 永不触发,完全感知不到对端已死。

4. 其它并发竞态

  • 断连后一波请求各自重建 slot → 惊群 connect/disconnect 风暴和大量 TIME_WAIT
  • 空闲扫描器关闭客户端时若有在途请求 → 在途请求报伪 IO 失败
  • DefClusterInvoker 用非 CAS 的 invokerCache.remove(key) → 可能误删别的线程刚装好的新 proxy。

5. epoll 与共享 IO 线程组互斥

旧代码只有一个共享 NioEventLoopGroup,开 epoll 就被迫关 ioThreadGroupShare

6. HTTP 连接池几乎裸奔

maxConns 没生效(退化到 25/5)、无 validateAfterInactivity(stale/NoHttpResponseException)、无空闲驱逐(fd 泄漏)、服务端不返回 Keep-Alive 头按无限保活被 NAT/LB 丢弃、无连接 TTL(IP 漂移不恢复)、黑洞场景无兜底。

7. 默认值不合理

maxConnections=20480(过大)、connsPerAddr=2


三、当前解决方案(分·方案)

A. tRPC 协议(Netty 真长连接)

⭐ 1. 消除"发到断链"竞态(对应问题 1)

  • invalidate-before-close:客户端 idle 触发时invalidateChannel 把 slot 置空(下次必然重连),再异步 ctx.close(),使请求线程立刻看到"需要重连",消除 (a) 窗口。
  • channelsArrayListCopyOnWriteArrayList,提供 volatile 可见性,修复 (b) race。
  • READ-idle + Linux epoll TCP keepalive(~60s 探测) + 服务端不再主动 idle 关连接,大幅缩短 (c) 的断连感知延迟和窗口。

2. 移除粗暴关闭,连接常驻

  • 删除 NettyServerHandler/NettyClientHandler 中 idle 直接 close 的逻辑;服务端 NettyTcpServerTransport 不再装 IdleStateHandleridleTimeout 仅保留兼容)。
  • RpcClusterClientManager 对 Netty 客户端不再按空闲关闭,随 BackendConfig 生命周期常驻。

3. 客户端 READ idle 关闭 + 懒重连

  • NettyTcpClientTransportREAD idleIdleStateHandler(默认 180000ms):只有"长时间收不到回包"才触发,正对应半死连接。
  • 打印 [long-link][idle-fire]/[idle-close] 运维日志。

4. TCP keepalive 调优 + epoll 解耦

  • 新增 tcpKeepAliveIdle/Intvl/Cnt(Dubbo 风格 30/10/3),~60s 内被内核 RST。
  • 维护 NIO + EPOLL 两个带引用计数的共享组,开 epoll 不再被迫关共享;构造期急切获取引用避免 TOCTOU,acquire/release 幂等。

5. 惊群防护

  • ensureChannelActiveconnLock双重检查 + needsReconnect 状态机(notYetConnect/connecting/available),杜绝重连风暴。

B. 集群侧(统一缓存治理 + 非 Netty 兜底)

  • 30s 空闲扫描器只处理非 Netty 客户端:超 idleTimeout 无成功响应才关;用 in-flight 计数 + 单次 CAS 抢占 + CAS 后复查时间戳 三重手段把竞态窗口收到最窄,避免误杀在途请求。
  • 缓存清理统一改 CAS 删除remove(key, value)),closeFuture 回调摘除后下次请求懒重建,避免误删新 proxy。

C. HTTP / HTTP2 协议

  • 连接池治理:maxConns 生效、validateAfterInactivity=2sevictExpired+evictIdle(60s)keepAliveStrategy 封顶 5min、connectionTimeToLive=10minSO_KEEPALIVE
  • 健康信号:HttpConsumerInvoker 上报 markUsed/Success/FailureisAvailable() 在空闲 >10min 或连续失败 ≥50 次时报不可用,供集群扫描驱逐。

D. 默认值与配置

  • maxConnections 20480 → 200connsPerAddr 2 → 4(注意服务端入向连接翻倍)、新增 keepalive 默认值;BaseProtocolConfig 与 Spring AbstractProtocolSchema 打通 tcp_keep_alive_* 配置。

四、总结(总)

主线一句话:从"pull 模型 + 全局粗暴空闲关闭"升级为"真长连接 + 分协议分层治理 + 懒重连 + 竞态收窄"

维度 旧方案问题 新方案
发到断链报错 idle 异步 close 不失效 slot、ArrayList 可见性 race、死链探测慢+服务端主动关 invalidate-before-closeCopyOnWriteArrayList、READ-idle+keepalive、服务端不主动关
空闲治理 客户端/服务端/集群三处粗暴 close 服务端不主动关;Netty READ-idle+懒重连;HTTP 池+集群扫描兜底
半死连接 持续写探测不到 READ-idle + epoll TCP keepalive(~60s)
并发竞态 惊群重连、在途请求误杀、非 CAS 误删缓存 锁内双检、in-flight+CAS 双复查、CAS 删除
HTTP 池 25/5 上限、stale 连接、fd 泄漏、IP 漂移不恢复 maxConns 生效、validate/evict、keepalive 封顶、TTL、SO_KEEPALIVE
资源/默认值 epoll 与共享互斥、默认值粗放 NIO/EPOLL 双共享组、默认值收敛并参数化

最终效果:正常流量下连接稳定复用不再被无谓拆除;"正在关闭/对端已关"的竞态窗口里不再误发请求;异常(半死/RST/IP 漂移/黑洞)能在有界时间内被探测并自愈。配套约 4000+ 行测试覆盖上述竞态与恢复场景。


五、核心代码块清单

⭐ 消除"发到断链"竞态

1. invalidate-before-close(客户端 idle:先失效 slot 再异步关)

    private final class IdleCloseHandler extends ChannelDuplexHandler {
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                io.netty.channel.Channel ioChannel = ctx.channel();
                ...
                // 先失效 transport 里的 slot,让请求线程立刻看到“需要重连”
                com.tencent.trpc.core.transport.Channel wrapper =
                        NettyChannelManager.getOrAddChannel(ioChannel, config);
                if (wrapper != null) {
                    invalidateChannel(wrapper);
                }
                // 再异步关闭
                ctx.close().addListener(future -> logger.info("[long-link][idle-close] ...", ...));
                return;
            }
            super.userEventTriggered(ctx, evt);
        }
    }

2. invalidateChannel:锁内复读置空 slot(AbstractClientTransport

    @Override
    public void invalidateChannel(Channel target) {
        ...
        for (int i = 0; i < channels.size(); i++) {
            ChannelFutureItem item = channels.get(i);
            ...
            if (ch != target) { continue; }
            connLock.lock();
            try {
                ChannelFutureItem latest = channels.get(i);
                if (latest != item) { return; }   // 不覆盖别的线程刚刷新的 slot
                channels.set(i, new ChannelFutureItem(null, config));  // 置空 → 下次必重连
                item.close();
            } finally { connLock.unlock(); }
            return;
        }
    }

3. channelsCopyOnWriteArrayList(修复可见性 race)

    /**
     * Channel pool. Backed by {@link CopyOnWriteArrayList} so that the slot publication ...
     * is visible to concurrent readers in {@link #getChannel0} with volatile semantics,
     * eliminating the data race that an {@link java.util.ArrayList} would have.
     */
    protected List<ChannelFutureItem> channels;

4. 懒重连 + 惊群防护(锁内双检 + 状态机)

    protected void ensureChannelActive(int chIndex) {
        if (!needsReconnect(channels.get(chIndex))) { return; }
        connLock.lock();
        try {
            ChannelFutureItem latest = channels.get(chIndex);
            if (!needsReconnect(latest)) { return; }   // 双检防惊群
            channels.set(chIndex, new ChannelFutureItem(createChannel().toCompletableFuture(), config));
            latest.close();
        } finally { connLock.unlock(); }
    }

    private static boolean needsReconnect(ChannelFutureItem item) {
        if (item.isNotYetConnect()) { return true; }
        return !item.isAvailable() && !item.isConnecting();
    }

tRPC 长连接其它核心

5. READ-idle handler 安装 + TCP keepalive 调优

                if (idleTimeoutMills > 0) {
                    p.addBefore("handler", "idleState",
                            new IdleStateHandler(idleTimeoutMills, 0L, 0L, TimeUnit.MILLISECONDS));
                    p.addBefore("handler", "idleClose", new IdleCloseHandler());
                }
    private void applyTcpKeepAliveTuning(Bootstrap bootstrap) {
        Integer idle = config.getTcpKeepAliveIdle();
        Integer intvl = config.getTcpKeepAliveIntvl();
        Integer cnt = config.getTcpKeepAliveCnt();
        if (idle != null && idle > 0)  { bootstrap.option(EpollChannelOption.TCP_KEEPIDLE, idle); }
        if (intvl != null && intvl > 0){ bootstrap.option(EpollChannelOption.TCP_KEEPINTVL, intvl); }
        if (cnt != null && cnt > 0)    { bootstrap.option(EpollChannelOption.TCP_KEEPCNT, cnt); }
    }

6. 服务端不再装 IdleStateHandler(NettyTcpServerTransport

// Long-connection mode: do NOT install IdleStateHandler. The server never proactively
// closes a client connection due to idle.
if (codec != null) {
    p.addLast("encode", nettyCodec.getEncoder()).addLast("decode", nettyCodec.getDecoder());
}

7. NIO/EPOLL 双共享组(引用计数幂等)

    private static void acquireSharedGroup(SharedGroupKind kind, int ioThreads, String threadPoolName) { ... }
    private static void releaseSharedGroup(SharedGroupKind kind) {
        // 计数减到 0 才 shutdownGracefully 并置 null;负数纠正为 0 防重复 shutdown
    }

集群侧治理

8. 非 Netty 空闲关闭:三重防竞态

    private static void closeIfIdleResponseTimedOut(BackendConfig bConfig, String key, RpcClientProxy proxy) {
        ...
        if (transporter == null || Constants.TRANSPORTER_NETTY.equalsIgnoreCase(transporter)) { return; }
        if (proxy.inFlight.get() > 0) { return; }                       // ① 在途跳过
        if (System.currentTimeMillis() - proxy.lastResponseTimeMs.get() < idleTimeoutMs) { return; }
        if (!proxy.closing.compareAndSet(false, true)) { return; }      // ② 单次 CAS 抢占
        long idleMsAfterCas = System.currentTimeMillis() - proxy.lastResponseTimeMs.get();
        if (idleMsAfterCas < idleTimeoutMs || proxy.inFlight.get() > 0) {// ③ CAS 后复查
            proxy.closing.set(false);
            return;
        }
        proxy.close();
    }

9. in-flight 计数 + 仅成功响应刷新活跃时间

        public CompletionStage<Response> invoke(Request request) {
            rpcClient.inFlight.incrementAndGet();
            CompletionStage<Response> stage;
            try { stage = delegate.invoke(request); }
            catch (Throwable ex) { rpcClient.inFlight.decrementAndGet(); throw ex; }
            return stage.whenComplete((response, throwable) -> {
                try {
                    if (throwable == null && response != null) { rpcClient.markResponseReceived(); }
                } finally { rpcClient.inFlight.decrementAndGet(); }
            });
        }

10. CAS 摘除缓存(防误删新 proxy,DefClusterInvoker / RpcClusterClientManager

                if (value != null && !value.isAvailable()) {
                    invokerCache.remove(key, value);          // CAS 删除
                }
                ...
                invokerCache.put(key, created);
                rpcClient.closeFuture().whenComplete((r, e) -> {
                    boolean removed = invokerCache.remove(key, created);   // CAS 删除
                });

HTTP / HTTP2

11. HTTP/1.1 连接池保活/驱逐治理

    protected void doOpen() {
        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        int maxConns = protocolConfig.getMaxConns();
        cm.setMaxTotal(maxConns);
        cm.setDefaultMaxPerRoute(maxConns);
        cm.setValidateAfterInactivity(VALIDATE_AFTER_INACTIVITY_MS);          // 2s
        cm.setDefaultSocketConfig(SocketConfig.custom().setSoKeepAlive(true).build());
        httpClient = HttpClients.custom().setConnectionManager(cm)
                .evictExpiredConnections()
                .evictIdleConnections(EVICT_IDLE_CONNECTIONS_SECONDS, TimeUnit.SECONDS)  // 60s
                .setKeepAliveStrategy(HttpRpcClient::resolveKeepAliveDuration)           // 封顶 5min
                .setConnectionTimeToLive(CONNECTION_TTL_MINUTES, TimeUnit.MINUTES)       // 10min
                .build();
    }

12. HTTP 健康信号驱动集群驱逐

    public boolean isAvailable() {
        if (!super.isAvailable()) { return false; }
        if (consecutiveFailures.get() >= FAILURE_UNAVAILABLE_THRESHOLD) { return false; }   // 连续失败 50
        return (System.nanoTime() - lastUsedNanos) <= IDLE_UNAVAILABLE_THRESHOLD_NANOS;     // 空闲 10min
    }

默认值

13. Constants 关键默认值

DEFAULT_MAX_CONNECTIONS:     "20480""200"
DEFAULT_CONNECTIONS_PERADDR: "2""4"      // 服务端入向连接翻倍,需检查 fd ulimit / LB 限制
DEFAULT_TCP_KEEPALIVE_IDLE:  新增 "30"
DEFAULT_TCP_KEEPALIVE_INTVL: 新增 "10"
DEFAULT_TCP_KEEPALIVE_CNT:   新增 "3"           // 30 + 10×3 ≈ 60s 半死探测窗口

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant