From 1417754b23b855ba415ebd412b176fa3c469b232 Mon Sep 17 00:00:00 2001 From: open-trade Date: Fri, 25 Sep 2020 17:57:06 +0800 Subject: [PATCH] refactor --- src/rendezvous_server.rs | 44 +++++++++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs index daf6436..8d5c6d3 100644 --- a/src/rendezvous_server.rs +++ b/src/rendezvous_server.rs @@ -169,16 +169,26 @@ impl RendezvousServer { log::debug!("Tcp connection from {:?}", addr); let (a, mut b) = Framed::new(stream, BytesCodec::new()).split(); let tcp_punch = rs.tcp_punch.clone(); - tcp_punch.lock().unwrap().insert(addr, a); let mut rs = rs.clone(); tokio::spawn(async move { + let mut sender = Some(a); while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await { if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) { match msg_in.union { Some(rendezvous_message::Union::punch_hole_request(ph)) => { + if let Some(sender) = sender.take() { + tcp_punch.lock().unwrap().insert(addr, sender); + } else { + break; + } allow_err!(rs.handle_tcp_punch_hole_request(addr, ph.id).await); } Some(rendezvous_message::Union::request_relay(mut rf)) => { + if let Some(sender) = sender.take() { + tcp_punch.lock().unwrap().insert(addr, sender); + } else { + break; + } if let Some(peer) = rs.pm.map.read().unwrap().get(&rf.id).map(|x| x.clone()) { let mut msg_out = RendezvousMessage::new(); rf.socket_addr = AddrMangle::encode(addr); @@ -191,12 +201,7 @@ impl RendezvousServer { rfr.socket_addr = Default::default(); let mut msg_out = RendezvousMessage::new(); msg_out.set_request_relay_response(rfr); - let sender_b = rs.tcp_punch.lock().unwrap().remove(&addr_b); - if let Some(mut sender_b) = sender_b { - if let Ok(bytes) = msg_out.write_to_bytes() { - allow_err!(sender_b.send(Bytes::from(bytes)).await); - } - } + allow_err!(rs.send_to_tcp_sync(&msg_out, addr_b).await); break; } Some(rendezvous_message::Union::punch_hole_sent(phs)) => { @@ -207,6 +212,19 @@ impl RendezvousServer { allow_err!(rs.handle_local_addr(la, addr, None).await); break; } + Some(rendezvous_message::Union::test_nat_request(_)) => { + let mut msg_out = RendezvousMessage::new(); + msg_out.set_test_nat_response(TestNatResponse { + port: addr.port() as _, + ..Default::default() + }); + if let Some(tcp) = sender.as_mut() { + if let Ok(bytes) = msg_out.write_to_bytes() { + allow_err!(tcp.send(Bytes::from(bytes)).await); + } + } + break; + } _ => { break; } @@ -215,7 +233,9 @@ impl RendezvousServer { break; } } - rs.tcp_punch.lock().unwrap().remove(&addr); + if sender.is_none() { + rs.tcp_punch.lock().unwrap().remove(&addr); + } log::debug!("Tcp connection from {:?} closed", addr); }); } @@ -321,14 +341,6 @@ impl RendezvousServer { socket.send(&msg_out, addr).await?; } } - Some(rendezvous_message::Union::test_nat_request(_)) => { - let mut msg_out = RendezvousMessage::new(); - msg_out.set_test_nat_response(TestNatResponse { - port: addr.port() as _, - ..Default::default() - }); - socket.send(&msg_out, addr).await?; - } _ => {} } }