Skip to content

Commit 58b5374

Browse files
committed
fix: deadlock for tokio::select
1 parent fc08577 commit 58b5374

3 files changed

Lines changed: 69 additions & 39 deletions

File tree

src/client/main.rs

Lines changed: 58 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use clap::Parser;
22
use std::sync::{Arc};
33
use std::time::Duration;
4+
use tokio::sync::{mpsc, RwLock};
45
use tokio::time::interval;
56
use crate::client::{Args, P2P_HOLE_PUNCH_PORT, P2P_UDP_PORT};
67
use crate::client::relay::{RelayHandler, new_relay_handler};
@@ -56,8 +57,8 @@ pub async fn run_client() {
5657
}
5758
};
5859

59-
// Initialize P2P handler if enabled
60-
let mut p2p_handler = if args.enable_p2p {
60+
// Initialize P2P handler if enabled (wrapped in Arc<RwLock<>> for sharing with device packet task)
61+
let p2p_handler = if args.enable_p2p {
6162
tracing::info!("P2P mode enabled");
6263

6364
let mut handler = PeerHandler::new(
@@ -67,7 +68,7 @@ pub async fn run_client() {
6768
handler.run_peer_service();
6869
handler.rewrite_peers(device_config.peer_details.clone()).await;
6970
handler.start_probe_timer().await;
70-
Some(handler)
71+
Some(Arc::new(RwLock::new(handler)))
7172
} else {
7273
tracing::info!("P2P mode disabled, using relay only");
7374
None
@@ -109,7 +110,7 @@ pub async fn run_client() {
109110
}
110111

111112
// Run main event loop
112-
run_event_loop(&mut relay_handler, &mut p2p_handler, &mut dev).await;
113+
run_event_loop(&mut relay_handler, p2p_handler, &mut dev).await;
113114
}
114115

115116
async fn init_device(device_config: &HandshakeReplyFrame, enable_masq: bool) -> crate::Result<DeviceHandler> {
@@ -138,31 +139,51 @@ async fn init_device(device_config: &HandshakeReplyFrame, enable_masq: bool) ->
138139

139140
async fn run_event_loop(
140141
client_handler: &mut RelayHandler,
141-
p2p_handler: &mut Option<PeerHandler>,
142+
p2p_handler: Option<Arc<RwLock<PeerHandler>>>,
142143
dev: &mut DeviceHandler,
143144
) {
144145
let mut refresh_ticker = interval(Duration::from_secs(30));
145-
146-
loop {
147-
tokio::select! {
148-
// TUN device -> Network (P2P or Relay)
149-
packet = dev.recv() => {
150-
if let Some(packet) = packet {
151-
handle_device_packet(client_handler, p2p_handler, packet).await;
146+
let relay_outbound = match client_handler.get_outbound_tx() {
147+
Some(tx) => tx,
148+
None => return,
149+
};
150+
151+
let mut dev_inbound = match dev.get_dev_inbound() {
152+
Some(dev) => dev,
153+
None => return,
154+
};
155+
156+
let p2p_for_spawn = p2p_handler.clone();
157+
158+
tokio::spawn(async move {
159+
loop {
160+
tokio::select! {
161+
packet = dev_inbound.recv() => {
162+
if let Some(packet) = packet {
163+
handle_device_packet(relay_outbound.clone(), p2p_for_spawn.clone(), packet).await;
164+
}
152165
}
153166
}
167+
}
168+
});
154169

170+
loop {
171+
tokio::select! {
155172
// Server -> TUN device or route update
156173
frame = client_handler.recv_frame() => {
157174
if let Ok(frame) = frame {
158-
handle_relay_frame(frame, p2p_handler, dev).await;
175+
handle_relay_frame(frame, &p2p_handler, dev).await;
159176
}
160177
}
161178

162179
// P2P -> TUN device (only if P2P enabled)
163180
frame = async {
164-
match p2p_handler {
165-
Some(handler) => handler.recv_frame().await,
181+
match p2p_handler.as_ref() {
182+
Some(arc) => {
183+
let arc = arc.clone();
184+
let mut guard = arc.write().await;
185+
guard.recv_frame().await
186+
}
166187
None => std::future::pending().await, // Never resolves if no P2P
167188
}
168189
} => {
@@ -176,25 +197,31 @@ async fn run_event_loop(
176197

177198
// refresh config and status
178199
_ = refresh_ticker.tick() => {
179-
get_status(client_handler, p2p_handler.as_ref(), dev).await;
180-
200+
match &p2p_handler {
201+
Some(arc) => {
202+
let guard = arc.read().await;
203+
get_status(client_handler, Some(&*guard), dev).await;
204+
}
205+
None => get_status(client_handler, None, dev).await,
206+
}
181207
}
182208
}
183209
}
184210
}
185211

186-
/// Handle outbound packet from TUN device
212+
/// Handle outbound packet from TUN device: try P2P first if available, then fallback to relay.
187213
async fn handle_device_packet(
188-
client_handler: &mut RelayHandler,
189-
p2p_handler: &mut Option<PeerHandler>,
214+
relay_outbound: mpsc::Sender<Frame>,
215+
p2p_handler: Option<Arc<RwLock<PeerHandler>>>,
190216
packet: Vec<u8>,
191217
) {
192218
let data_frame = DataFrame { payload: packet.clone() };
193-
219+
194220
// Try P2P first if available
195-
if let Some(p2p) = p2p_handler {
221+
if let Some(arc) = &p2p_handler {
196222
let dst = data_frame.dst();
197-
match p2p.send_frame(Frame::Data(data_frame), dst.as_str()).await {
223+
let guard = arc.read().await;
224+
match guard.send_frame(Frame::Data(data_frame.clone()), dst.as_str()).await {
198225
Ok(_) => {
199226
tracing::debug!("Device -> P2P: {} bytes", packet.len());
200227
return;
@@ -204,18 +231,18 @@ async fn handle_device_packet(
204231
}
205232
}
206233
}
207-
234+
208235
// Fallback to relay (or direct if no P2P)
209236
let frame = Frame::Data(DataFrame { payload: packet });
210-
if let Err(e) = client_handler.send_frame(frame).await {
237+
if let Err(e) = RelayHandler::send_frame(relay_outbound, frame).await {
211238
tracing::error!("Failed to send via relay: {}", e);
212239
}
213240
}
214241

215242
/// Handle frame received from relay server
216243
async fn handle_relay_frame(
217244
frame: Frame,
218-
p2p_handler: &mut Option<PeerHandler>,
245+
p2p_handler: &Option<Arc<RwLock<PeerHandler>>>,
219246
dev: &mut DeviceHandler,
220247
) {
221248
match frame {
@@ -227,13 +254,14 @@ async fn handle_relay_frame(
227254
}
228255
Frame::KeepAlive(keepalive) => {
229256
tracing::debug!("Received keepalive with {:?} peer details", keepalive.peer_details);
230-
257+
231258
// Update routes in device handler
232259
dev.reload_route(keepalive.peer_details.clone()).await;
233-
260+
234261
// Update P2P peer information if P2P is enabled
235-
if let Some(p2p) = p2p_handler {
236-
p2p.insert_or_update(keepalive.peer_details).await;
262+
if let Some(arc) = p2p_handler {
263+
let mut guard = arc.write().await;
264+
guard.insert_or_update(keepalive.peer_details).await;
237265
}
238266
}
239267
_ => {}

src/client/relay.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -310,19 +310,17 @@ impl RelayHandler {
310310
});
311311
}
312312

313-
pub async fn send_frame(&mut self, frame: Frame) -> crate::Result<()> {
314-
self.metrics.tx_frame += 1;
315-
let outbound_tx = match self.outbound_tx.clone() {
316-
Some(tx) => tx,
317-
None => {
318-
self.metrics.tx_error += 1;
319-
return Err("relay connection disconnect".into())}
320-
};
313+
pub fn get_outbound_tx(&self)-> Option<mpsc::Sender<Frame>> {
314+
self.outbound_tx.clone()
315+
}
316+
317+
pub async fn send_frame(outbound_tx: mpsc::Sender<Frame>, frame: Frame) -> crate::Result<()> {
318+
// self.metrics.tx_frame += 1;
321319
let result = outbound_tx.send_timeout(frame, Duration::from_secs(1)).await;
322320
match result {
323321
Ok(()) => Ok(()),
324322
Err(e) => {
325-
self.metrics.tx_error += 1;
323+
// self.metrics.tx_error += 1;
326324
Err(format!("device=> server fail {:?}", e).into())
327325
},
328326
}

src/utils/device.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,10 @@ impl DeviceHandler {
183183
Ok(tun_index)
184184
}
185185

186+
pub fn get_dev_inbound(&mut self) -> Option<mpsc::Receiver<Vec<u8>>> {
187+
self.inbound_rx.take()
188+
}
189+
186190
pub async fn recv(&mut self) -> Option<Vec<u8>> {
187191
let inbound_rx = match self.inbound_rx.as_mut() {
188192
Some(rx) => rx,

0 commit comments

Comments
 (0)