From 5be70826aa92ed043a636abeb55a453c607b058e Mon Sep 17 00:00:00 2001 From: open-trade Date: Sat, 7 Mar 2020 02:06:17 +0800 Subject: [PATCH] tested, will refactor to spawn --- Cargo.lock | 53 +++++-------------------------------- Cargo.toml | 2 +- src/rendezvous_server.rs | 56 ++++++++++++++++++++++++++++++---------- 3 files changed, 50 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 44ac300..4460f6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,11 +23,6 @@ dependencies = [ "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "autocfg" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "bitflags" version = "1.2.1" @@ -43,16 +38,6 @@ name = "cfg-if" version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "crossbeam-utils" -version = "0.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", - "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", - "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "env_logger" version = "0.7.1" @@ -84,11 +69,6 @@ name = "fuchsia-zircon-sys" version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "futures" -version = "0.1.29" -source = "registry+https://github.com/rust-lang/crates.io-index" - [[package]] name = "futures" version = "0.3.4" @@ -181,8 +161,8 @@ dependencies = [ "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf 2.10.2 (registry+https://github.com/rust-lang/crates.io-index)", "protobuf-codegen-pure 2.10.2 (registry+https://github.com/rust-lang/crates.io-index)", + "simple-error 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-timer 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-util 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -418,6 +398,11 @@ dependencies = [ "libc 0.2.67 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "simple-error" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "slab" version = "0.4.2" @@ -483,15 +468,6 @@ dependencies = [ "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "tokio-executor" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "tokio-macros" version = "0.2.5" @@ -502,17 +478,6 @@ dependencies = [ "syn 1.0.16 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "tokio-timer" -version = "0.2.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", - "futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)", - "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", - "tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "tokio-util" version = "0.3.0" @@ -582,16 +547,13 @@ dependencies = [ "checksum aho-corasick 0.7.9 (registry+https://github.com/rust-lang/crates.io-index)" = "d5e63fd144e18ba274ae7095c0197a870a7b9468abc801dd62f190d80817d2ec" "checksum arc-swap 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d7b8a9123b8027467bce0099fe556c628a53c8d83df0507084c31e9ba2e39aff" "checksum atty 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -"checksum autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" "checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" "checksum bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)" = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" "checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" -"checksum crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" "checksum env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" -"checksum futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)" = "1b980f2816d6ee8673b6517b52cb0e808a180efc92e5c19d02cdda79066703ef" "checksum futures 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5c329ae8753502fb44ae4fc2b622fa2a94652c41e795143765ba0927f92ab780" "checksum futures-channel 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c77d04ce8edd9cb903932b608268b3fffec4163dc053b3b402bf47eac1f1a8" "checksum futures-core 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "f25592f769825e89b92358db00d26f965761e094951ac44d3663ef25b7ac464a" @@ -630,15 +592,14 @@ dependencies = [ "checksum regex 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "322cf97724bea3ee221b78fe25ac9c46114ebb51747ad5babd51a2fc6a8235a8" "checksum regex-syntax 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)" = "1132f845907680735a84409c3bebc64d1364a5683ffbce899550cd09d5eaefc1" "checksum signal-hook-registry 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "94f478ede9f64724c5d173d7bb56099ec3e2d9fc2774aac65d34b8b890405f41" +"checksum simple-error 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "339844c9af2d844b9230bb28e8f819a7790cbf20a29b5cbd2b59916a03a1ef51" "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" "checksum socket2 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "e8b74de517221a2cb01a53349cf54182acdc31a074727d3079068448c0676d85" "checksum syn 1.0.16 (registry+https://github.com/rust-lang/crates.io-index)" = "123bd9499cfb380418d509322d7a6d52e5315f064fe4b3ad18a53d6b92c07859" "checksum termcolor 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bb6bfa289a4d7c5766392812c0a1f4c1ba45afa1ad47803c11e1f407d846d75f" "checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" "checksum tokio 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "0fa5e81d6bc4e67fe889d5783bd2a128ab2e0cfa487e0be16b6a8d177b101616" -"checksum tokio-executor 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "fb2d1b8f4548dbf5e1f7818512e9c406860678f29c300cdf0ebac72d1a3a1671" "checksum tokio-macros 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389" -"checksum tokio-timer 0.2.13 (registry+https://github.com/rust-lang/crates.io-index)" = "93044f2d313c95ff1cb7809ce9a7a05735b012288a888b62d4434fd58c94f296" "checksum tokio-util 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "af67cdce2b40f8dffb0ee04c853a24217b5d0d3e358f0f5ccc0b5332174ed9a8" "checksum unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" "checksum winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" diff --git a/Cargo.toml b/Cargo.toml index f7d6289..91ac527 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,11 +10,11 @@ edition = "2018" tokio = { version = "0.2", features = ["full"] } protobuf = "2.10" tokio-util = { version = "0.3", features = ["full"] } -tokio-timer = "0.2" log = "0.4" env_logger = "0.7" futures = "0.3" bytes = "0.5" +simple-error = "0.2" [build-dependencies] protobuf-codegen-pure = "2.10" diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs index 72ad23b..cd44fbe 100644 --- a/src/rendezvous_server.rs +++ b/src/rendezvous_server.rs @@ -1,24 +1,27 @@ use super::message_proto::*; use bytes::Bytes; -use futures::{FutureExt, SinkExt}; +use futures::SinkExt; use protobuf::{parse_from_bytes, Message as _}; use std::{ collections::HashMap, error::Error, net::{Ipv4Addr, SocketAddr, SocketAddrV4}, - time::{SystemTime, UNIX_EPOCH}, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; +use tokio::{ + net::UdpSocket, + stream::StreamExt, + time::{self, delay_for}, }; -use tokio::net::UdpSocket; -use tokio::stream::StreamExt; use tokio_util::{codec::BytesCodec, udp::UdpFramed}; /// Certain router and firewalls scan the packet and if they /// find an IP address belonging to their pool that they use to do the NAT mapping/translation, so here we mangle the ip address -pub struct V4AddrMangle(Vec); +pub struct V4AddrMangle(); impl V4AddrMangle { - pub fn encode(addr: &SocketAddrV4) -> Self { + pub fn encode(addr: &SocketAddrV4) -> Vec { let tm = (SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() @@ -35,12 +38,12 @@ impl V4AddrMangle { break; } } - Self(bytes[..(16 - n_padding)].to_vec()) + bytes[..(16 - n_padding)].to_vec() } - pub fn decode(&self) -> SocketAddrV4 { + pub fn decode(bytes: &[u8]) -> SocketAddrV4 { let mut padded = [0u8; 16]; - padded[..self.0.len()].copy_from_slice(&self.0); + padded[..bytes.len()].copy_from_slice(&bytes); let number = u128::from_ne_bytes(padded); let tm = (number >> 17) & (u32::max_value() as u128); let ip = (((number >> 49) - tm) as u32).to_ne_bytes(); @@ -83,11 +86,9 @@ impl RendezvousServer { }, ); } - tokio_timer::sleep(std::time::Duration::from_secs(60)); } Some(Message_oneof_union::peek_peer(pp)) => { rs.handle_peek_peer(&pp, addr, &mut socket).await?; - tokio_timer::sleep(std::time::Duration::from_secs(60)); } _ => {} } @@ -106,7 +107,7 @@ impl RendezvousServer { if let Some(peer) = self.peer_map.get(&pp.hbb_addr) { let mut msg_out = Message::new(); msg_out.set_peek_peer_response(PeekPeerResponse { - socket_addr: V4AddrMangle::encode(&peer.socket_addr).0.to_vec(), + socket_addr: V4AddrMangle::encode(&peer.socket_addr), ..Default::default() }); send_to(&msg_out, addr, socket).await?; @@ -123,13 +124,17 @@ pub async fn send_to(msg: &Message, addr: SocketAddr, socket: &mut FramedSocket) Ok(()) } +pub async fn sleep(sec: f32) { + delay_for(Duration::from_secs_f32(sec)).await; +} + #[cfg(test)] mod tests { use super::*; #[test] fn test_mangle() { let addr = SocketAddrV4::new(Ipv4Addr::new(192, 168, 16, 32), 21116); - assert_eq!(addr, V4AddrMangle::encode(&addr).decode()); + assert_eq!(addr, V4AddrMangle::decode(&V4AddrMangle::encode(&addr)[..])); } #[allow(unused_must_use)] @@ -140,15 +145,38 @@ mod tests { let to_addr = server_addr.parse().unwrap(); let f2 = async { let socket = UdpSocket::bind("127.0.0.1:0").await.unwrap(); + let local_addr = socket.local_addr().unwrap(); let mut socket = UdpFramed::new(socket, BytesCodec::new()); let mut msg_out = Message::new(); + msg_out.set_register_peer(RegisterPeer { + hbb_addr: "123".to_string(), + ..Default::default() + }); + send_to(&msg_out, to_addr, &mut socket).await; msg_out.set_peek_peer(PeekPeer { hbb_addr: "123".to_string(), ..Default::default() }); send_to(&msg_out, to_addr, &mut socket).await; + if let Ok(Some(Ok((bytes, _)))) = + time::timeout(Duration::from_millis(1), socket.next()).await + { + if let Ok(msg_in) = parse_from_bytes::(&bytes) { + assert_eq!( + local_addr, + SocketAddr::V4(V4AddrMangle::decode( + &msg_in.get_peek_peer_response().socket_addr[..] + )) + ); + } + } + if true { + Err(Box::new(simple_error::SimpleError::new("done"))) + } else { + Ok(()) + } }; - tokio::join!(f1, f2); + tokio::try_join!(f1, f2); } #[test]