diff --git a/Cargo.lock b/Cargo.lock index aa2304c4f..c94eff245 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -723,9 +723,9 @@ dependencies = [ [[package]] name = "curve25519-dalek" -version = "4.1.1" +version = "4.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e89b8c6a2e4b1f45971ad09761aafb85514a84744b67a95e32c3cc1352d1f65c" +checksum = "0a677b8922c94e01bdbb12126b0bc852f00447528dee1782229af9c720c3f348" dependencies = [ "cfg-if", "cpufeatures", @@ -996,6 +996,18 @@ dependencies = [ "syn 0.15.44", ] +[[package]] +name = "dns-lookup" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc" +dependencies = [ + "cfg-if", + "libc", + "socket2", + "windows-sys 0.48.0", +] + [[package]] name = "ecdsa" version = "0.16.9" @@ -1022,9 +1034,9 @@ dependencies = [ [[package]] name = "ed25519-dalek" -version = "2.1.0" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f628eaec48bfd21b865dc2950cfa014450c01d2fa2b69a86c2fd5844ec523c0" +checksum = "4a3daa8e81a3963a60642bcc1f90a670680bd4a77535faa384e9d1c79d620871" dependencies = [ "curve25519-dalek", "ed25519", @@ -1138,9 +1150,9 @@ dependencies = [ [[package]] name = "fiat-crypto" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27573eac26f4dd11e2b1916c3fe1baa56407c83c71a773a8ba17ec0bca03b6b7" +checksum = "1676f435fc1dadde4d03e43f5d62b259e1ce5f40bd4ffb21db2b42ebe59c1382" [[package]] name = "flagset" @@ -1444,9 +1456,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d3d0e0f38255e7fa3cf31335b3a56f05febd18025f4db5ef7a0cfb4f8da651f" +checksum = "d0c62115964e08cb8039170eb33c1d0e2388a256930279edca206fff675f82c3" [[package]] name = "hex-literal" @@ -1630,12 +1642,11 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdea9aac0dbe5a9240d68cfd9501e2db94222c6dc06843e06640b9e07f0fdc67" +checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" dependencies = [ "bytes", - "futures-channel", "futures-util", "http 1.0.0", "http-body 1.0.0", @@ -1643,14 +1654,13 @@ dependencies = [ "pin-project-lite", "socket2", "tokio", - "tracing", ] [[package]] name = "iana-time-zone" -version = "0.1.59" +version = "0.1.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6a67363e2aa4443928ce15e57ebae94fd8949958fd1223c4cfc0cd473ad7539" +checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141" dependencies = [ "android_system_properties", "core-foundation-sys", @@ -1681,9 +1691,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.1.0" +version = "2.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" +checksum = "824b2ae422412366ba479e8111fd301f7b5faece8149317bb81925979a53f520" dependencies = [ "equivalent", "hashbrown", @@ -1842,9 +1852,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.67" +version = "0.3.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a1d36f1235bc969acba30b7f5990b864423a6068a10f7c90ae8f0112e3a59d1" +checksum = "406cda4b368d531c842222cf9d2600a9a4acce8d29423695379c6868a143a9ee" dependencies = [ "wasm-bindgen", ] @@ -1869,9 +1879,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.152" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "libm" @@ -2007,9 +2017,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" [[package]] name = "miniz_oxide" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +checksum = "9d811f3e15f28568be3407c8e7fdb6514c1cda3cb30683f15b6a1a1dc4ea14a7" dependencies = [ "adler", ] @@ -2082,17 +2092,35 @@ dependencies = [ "tempfile", ] +[[package]] +name = "network-interface" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d68759ef97fe9c9e46f79ea8736c19f1d28992e24c8dc8ce86752918bfeaae7" +dependencies = [ + "cc", + "libc", + "thiserror", + "winapi", +] + [[package]] name = "network-scanner" version = "0.0.0" dependencies = [ "anyhow", + "crossbeam", + "dns-lookup", + "network-interface", "network-scanner-net", "network-scanner-proto", + "parking_lot", "socket2", + "thiserror", "tokio", "tracing", "tracing-subscriber", + "typed-builder", ] [[package]] @@ -2101,13 +2129,11 @@ version = "0.0.0" dependencies = [ "anyhow", "crossbeam", - "futures", "parking_lot", "polling", "socket2", "thiserror", "tokio", - "tokio-stream", "tracing", "tracing-cov-mark", "tracing-subscriber", @@ -2236,6 +2262,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-derive" version = "0.3.3" @@ -2249,9 +2281,9 @@ dependencies = [ [[package]] name = "num-derive" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfb77679af88f8b125209d354a202862602672222e7f2313fdd6dc349bad4712" +checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", @@ -2260,19 +2292,18 @@ dependencies = [ [[package]] name = "num-integer" -version = "0.1.45" +version = "0.1.46" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" dependencies = [ - "autocfg", "num-traits", ] [[package]] name = "num-iter" -version = "0.1.43" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d03e6c028c5dc5cac6e2dec0efda81fc887605bb3d884578bb6d6bf7514e252" +checksum = "d869c01cc0c455284163fd0092f1f93835385ccab5a98a0dcc497b2f8bf055a9" dependencies = [ "autocfg", "num-integer", @@ -2281,9 +2312,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.17" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" +checksum = "da0df0e5185db44f69b44f26786fe401b6c293d1907744beaa7fa62b2e5a517a" dependencies = [ "autocfg", "libm", @@ -2690,8 +2721,8 @@ checksum = "626dec3cac7cc0e1577a2ec3fc496277ec2baa084bebad95bb6fdbfae235f84c" [[package]] name = "polling" -version = "3.3.2" -source = "git+https://github.com/smol-rs/polling?rev=62430fd56e668559d08ca7071ab13a0e116ba515#62430fd56e668559d08ca7071ab13a0e116ba515" +version = "3.4.0" +source = "git+https://github.com/devolutions/polling.git?rev=c04e8ee40415cad551fe044457270f4a2d7c491d#c04e8ee40415cad551fe044457270f4a2d7c491d" dependencies = [ "cfg-if", "concurrent-queue", @@ -3022,9 +3053,9 @@ checksum = "e898588f33fdd5b9420719948f9f2a32c922a246964576f71ba7f24f80610fbc" [[package]] name = "reqwest" -version = "0.11.23" +version = "0.11.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" +checksum = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251" dependencies = [ "base64 0.21.7", "bytes", @@ -3044,9 +3075,11 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", + "sync_wrapper", "system-configuration", "tokio", "tokio-native-tls", @@ -3172,9 +3205,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.30" +version = "0.38.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "322394588aaf33c24007e8bb3238ee3e4c5c09c084ab32bc73890b99ff326bca" +checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" dependencies = [ "bitflags 2.4.2", "errno", @@ -3216,7 +3249,7 @@ dependencies = [ "log", "ring 0.17.7", "rustls-pki-types", - "rustls-webpki 0.102.1", + "rustls-webpki 0.102.2", "subtle", "zeroize", ] @@ -3267,9 +3300,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e9d979b3ce68192e42760c7810125eb6cf2ea10efae545a156063e61f314e2a" +checksum = "0a716eb65e3158e90e17cd93d855216e27bde02745ab842f2cab4a39dba1bacf" [[package]] name = "rustls-webpki" @@ -3283,9 +3316,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.102.1" +version = "0.102.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef4ca26037c909dedb327b48c3327d0ba91d3dd3c4e05dad328f210ffb68e95b" +checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610" dependencies = [ "ring 0.17.7", "rustls-pki-types", @@ -3427,9 +3460,9 @@ dependencies = [ [[package]] name = "serde_html_form" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "224e6a14f315852940f3ec103125aa6482f0e224732ed91ed3330ed633077c34" +checksum = "20e1066e1cfa6692a722cf40386a2caec36da5ddc4a2c16df592f0f609677e8c" dependencies = [ "form_urlencoded", "indexmap", @@ -3482,9 +3515,9 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.9.30" +version = "0.9.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1bf28c79a99f70ee1f1d83d10c875d2e70618417fda01ad1785e027579d9d38" +checksum = "adf8a49373e98a4c5f0ceb5d05aa7c648d75f63774981ed95b7c7443bbd50c6e" dependencies = [ "indexmap", "itoa", @@ -3642,7 +3675,7 @@ dependencies = [ "md-5", "md4", "num-bigint-dig", - "num-derive 0.4.1", + "num-derive 0.4.2", "num-traits", "oid", "picky", @@ -3790,13 +3823,12 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" [[package]] name = "tempfile" -version = "3.9.0" +version = "3.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa" +checksum = "a365e8cd18e44762ef95d87f284f4b5cd04107fec2ff3052bd6a3e6069669e67" dependencies = [ "cfg-if", "fastrand", - "redox_syscall", "rustix", "windows-sys 0.52.0", ] @@ -3846,13 +3878,14 @@ dependencies = [ [[package]] name = "time" -version = "0.3.31" +version = "0.3.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f657ba42c3f86e7680e53c8cd3af8abbe56b5491790b46e22e19c0d57463583e" +checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749" dependencies = [ "deranged", "itoa", "libc", + "num-conv", "num_threads", "powerfmt", "serde", @@ -3868,10 +3901,11 @@ checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" [[package]] name = "time-macros" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26197e33420244aeb70c3e8c78376ca46571bc4e701e4791c2cd9f57dcb3a43f" +checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774" dependencies = [ + "num-conv", "time-core", ] @@ -4052,9 +4086,9 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.8" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1a195ec8c9da26928f773888e0742ca3ca1040c6cd859c919c9f59c1954ab35" +checksum = "9a9aad4a3066010876e8dcf5a8a06e70a558751117a145c6ce2b82c2e2054290" dependencies = [ "serde", "serde_spanned", @@ -4073,9 +4107,9 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.21.0" +version = "0.22.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d34d383cd00a163b4a5b85053df514d45bc330f6de7737edfe0a93311d1eaa03" +checksum = "0c9ffdf896f8daaabf9b66ba8e77ea1ed5ed0f72821b398aba62352e95062951" dependencies = [ "indexmap", "serde", @@ -4499,9 +4533,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1223296a201415c7fad14792dbefaace9bd52b62d33453ade1c5b5f07555406" +checksum = "c1e124130aee3fb58c5bdd6b639a0509486b0338acaaae0c84a5124b0f588b7f" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -4509,9 +4543,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcdc935b63408d58a32f8cc9738a0bffd8f05cc7c002086c6ef20b7312ad9dcd" +checksum = "c9e7e1900c352b609c8488ad12639a311045f40a35491fb69ba8c12f758af70b" dependencies = [ "bumpalo", "log", @@ -4524,9 +4558,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.40" +version = "0.4.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bde2032aeb86bdfaecc8b261eef3cba735cc426c1f3a3416d1e0791be95fc461" +checksum = "877b9c3f61ceea0e56331985743b13f3d25c406a7098d45180fb5f09bc19ed97" dependencies = [ "cfg-if", "js-sys", @@ -4536,9 +4570,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e4c238561b2d428924c49815533a8b9121c664599558a5d9ec51f8a1740a999" +checksum = "b30af9e2d358182b5c7449424f017eba305ed32a7010509ede96cdc4696c46ed" dependencies = [ "quote 1.0.35", "wasm-bindgen-macro-support", @@ -4546,9 +4580,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bae1abb6806dc1ad9e560ed242107c0f6c84335f1749dd4e8ddb012ebd5e25a7" +checksum = "642f325be6301eb8107a83d12a8ac6c1e1c54345a7ef1a9261962dfefda09e66" dependencies = [ "proc-macro2 1.0.78", "quote 1.0.35", @@ -4559,15 +4593,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.90" +version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b" +checksum = "4f186bd2dcf04330886ce82d6f33dd75a7bfcf69ecf5763b89fcde53b6ac9838" [[package]] name = "web-sys" -version = "0.3.67" +version = "0.3.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58cd2333b6e0be7a39605f0e255892fd7418a682d8da8fe042fe25128794d2ed" +checksum = "96565907687f7aceb35bc5fc03770a8a0471d82e479f25832f54a0e3f4b28446" dependencies = [ "js-sys", "wasm-bindgen", @@ -4585,9 +4619,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.25.3" +version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1778a42e8b3b90bff8d0f5032bf22250792889a5cdc752aa0020c84abe3aaf10" +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" [[package]] name = "widestring" @@ -4860,9 +4894,9 @@ checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" [[package]] name = "winnow" -version = "0.5.35" +version = "0.5.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1931d78a9c73861da0134f453bb1f790ce49b2e30eba8410b4b79bac72b46a2d" +checksum = "5389a154b01683d28c77f8f68f49dea75f0a4da32557a58f68ee51ebba472d29" dependencies = [ "memchr", ] @@ -4907,9 +4941,9 @@ dependencies = [ [[package]] name = "x25519-dalek" -version = "2.0.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb66477291e7e8d2b0ff1bcb900bf29489a9692816d79874bea351e7a8b6de96" +checksum = "c7e468321c81fb07fa7f4c636c3972b9100f0346e5b6a9f2bd0603a52f7ed277" dependencies = [ "curve25519-dalek", "rand_core", diff --git a/crates/network-scanner-net/Cargo.toml b/crates/network-scanner-net/Cargo.toml index 41baa027f..fc4e95fb2 100644 --- a/crates/network-scanner-net/Cargo.toml +++ b/crates/network-scanner-net/Cargo.toml @@ -8,12 +8,10 @@ publish = false [dependencies] anyhow = "1.0.79" crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] } -futures = "0.3.30" parking_lot = "0.12.1" -polling = { git = "https://github.com/smol-rs/polling", rev = "62430fd56e668559d08ca7071ab13a0e116ba515" } +polling = { git = "https://github.com/devolutions/polling.git", rev = "c04e8ee40415cad551fe044457270f4a2d7c491d" } socket2 = { version = "0.5.5", features = ["all"] } thiserror = "1.0.56" -tokio-stream = "0.1.14" tracing = "0.1.40" [dev-dependencies] diff --git a/crates/network-scanner-net/examples/broadcast.rs b/crates/network-scanner-net/examples/broadcast.rs index 2136cc355..5d6d70ccb 100644 --- a/crates/network-scanner-net/examples/broadcast.rs +++ b/crates/network-scanner-net/examples/broadcast.rs @@ -5,7 +5,7 @@ use socket2::SockAddr; #[tokio::main] pub async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::SubscriberBuilder::default() - .with_max_level(tracing::Level::DEBUG) + .with_max_level(tracing::Level::TRACE) .with_thread_names(true) .init(); diff --git a/crates/network-scanner-net/examples/tcp_bench.rs b/crates/network-scanner-net/examples/tcp_bench.rs new file mode 100644 index 000000000..340488f34 --- /dev/null +++ b/crates/network-scanner-net/examples/tcp_bench.rs @@ -0,0 +1,42 @@ +use std::net::{IpAddr, SocketAddr}; + +use network_scanner_net::runtime::Socket2Runtime; +use socket2::{Domain, SockAddr, Type}; + +fn main() -> anyhow::Result<()> { + let rt = tokio::runtime::Runtime::new()?; + + rt.block_on(async move { + let runtime = Socket2Runtime::new(None)?; + + let mut futures = vec![]; + for ip in 0..255 { + let ip: IpAddr = format!("10.10.0.{}", ip).parse().unwrap(); + let runtime = runtime.clone(); + let ports = vec![22, 23, 80, 443, 3389]; + for port in ports { + let socket = runtime.new_socket(Domain::IPV4, Type::STREAM, None)?; + let addr = SocketAddr::from((ip, port)); + let addr = SockAddr::from(addr); + let future = async move { + socket.connect(&addr).await?; + anyhow::Ok(()) + }; + futures.push(future); + } + } + + let now = std::time::Instant::now(); + let hanldes: Vec<_> = futures.into_iter().map(|f| tokio::task::spawn(f)).collect(); + + for handle in hanldes { + let _ = handle.await; + } + + println!("elapsed: {:?}", now.elapsed()); + + anyhow::Ok(()) + })?; + + Ok(()) +} diff --git a/crates/network-scanner-net/src/runtime.rs b/crates/network-scanner-net/src/runtime.rs index 483954b94..c82f67622 100644 --- a/crates/network-scanner-net/src/runtime.rs +++ b/crates/network-scanner-net/src/runtime.rs @@ -7,10 +7,12 @@ use std::{ Arc, }, task::Waker, + time::Duration, }; use anyhow::Context; use crossbeam::channel::{Receiver, Sender}; + use parking_lot::Mutex; use polling::{Event, Events}; use socket2::Socket; @@ -23,8 +25,7 @@ pub struct Socket2Runtime { next_socket_id: AtomicUsize, is_terminated: Arc, register_sender: Sender, - event_receiver: Receiver, - event_cache: Mutex>, + event_history: Arc>>, } impl Drop for Socket2Runtime { @@ -43,27 +44,25 @@ impl Drop for Socket2Runtime { } } -const QUEUE_CAPACITY: usize = 1024; +const QUEUE_CAPACITY: usize = 8024; impl Socket2Runtime { - /// Create a new runtime with a queue capacity, default is 1024. + /// Create a new runtime with a queue capacity, default is 8024. pub fn new(queue_capacity: Option) -> anyhow::Result> { let poller = polling::Poller::new()?; let (register_sender, register_receiver) = crossbeam::channel::bounded(queue_capacity.unwrap_or(QUEUE_CAPACITY)); - let (event_sender, event_receiver) = crossbeam::channel::bounded(queue_capacity.unwrap_or(QUEUE_CAPACITY)); - + let event_history = Arc::new(Mutex::new(HashSet::new())); let runtime = Self { poller: Arc::new(poller), next_socket_id: AtomicUsize::new(0), is_terminated: Arc::new(AtomicBool::new(false)), register_sender, - event_receiver, - event_cache: Mutex::new(HashSet::new()), + event_history: event_history.clone(), }; let runtime = Arc::new(runtime); - runtime.start_loop(register_receiver, event_sender)?; + runtime.start_loop(register_receiver, event_history)?; Ok(runtime) } @@ -84,130 +83,105 @@ impl Socket2Runtime { pub(crate) fn remove_socket(&self, socket: &socket2::Socket, id: usize) -> anyhow::Result<()> { self.poller.delete(socket)?; // remove all events related to this socket - self.event_cache.lock().retain(|event| id == event.0.key); + self.remove_events_with_id_from_history(id); Ok(()) } fn start_loop( &self, register_receiver: Receiver, - event_sender: Sender, + event_history: Arc>>, ) -> anyhow::Result<()> { - // To prevent an Arc cycle with the Arc, we added additional indirection. - // Otherwise, the reference in the new thread would prevent the runtime from being dropped and shutdown. - let poller = self.poller.clone(); + // We make is_terminated Arc and poller Arc so that we can clone them and move them into the thread. + // The reason why we cannot hold a Arc in the thread is because it will create a cycle reference and the runtime will never be dropped. let is_terminated = self.is_terminated.clone(); - + let poller = self.poller.clone(); std::thread::Builder::new() .name("[raw-socket]:io-event-loop".to_string()) .spawn(move || { - let mut events = Events::with_capacity(NonZeroUsize::new(1024).unwrap()); + let mut events = Events::with_capacity(NonZeroUsize::new(QUEUE_CAPACITY).unwrap()); tracing::debug!("starting io event loop"); // events registered but not happened yet - let mut events_registered = HashMap::new(); - - // events happened but not registered yet - let mut events_happened = HashMap::new(); + let mut events_registered: HashMap = HashMap::new(); loop { if is_terminated.load(Ordering::Acquire) { break; } - tracing::debug!("polling events"); - if let Err(e) = poller.wait(&mut events, None) { + // The timeout 200ms is critical, sometimes the event might be registered after the event happened + // the timeout limit will allow the events to be checked periodically. + if let Err(e) = poller.wait(&mut events, Some(Duration::from_millis(200))) { tracing::error!(error = ?e, "failed to poll events"); is_terminated.store(true, Ordering::SeqCst); break; }; + for event in events.iter() { tracing::trace!(?event, "event happened"); - events_happened.insert(event.key, event); + // This is different from just insert, as the event wrapper will have the same hash, it actually does not replace the old one. + // by removing the old one first, we can make sure the new one is inserted. + event_history.lock().remove(&event.into()); + event_history.lock().insert(event.into()); } events.clear(); - while let Ok(event) = register_receiver.try_recv() { match event { - RegisterEvent::Register { id, waker } => { - events_registered.insert(id, waker); + RegisterEvent::Register { event, waker } => { + events_registered.insert(event.into(), waker); } - RegisterEvent::Unregister { id } => { - events_registered.remove(&id); + RegisterEvent::Unregister { event } => { + events_registered.remove(&event.into()); } } } - let intersection = events_happened - .keys() - .filter(|key| events_registered.contains_key(key)) - .cloned() - .collect::>(); - - intersection.into_iter().for_each(|ref key| { - let event = events_happened.remove(key).unwrap(); - let waker = events_registered.remove(key).unwrap(); - let _ = event_sender.try_send(event); - waker.wake_by_ref(); - tracing::trace!(?event, "waking up waker"); - }); + for (event, waker) in events_registered.iter() { + if event_history.lock().get(event).is_some() { + waker.wake_by_ref(); + } + } } - tracing::info!("io event loop terminated"); + tracing::debug!("io event loop terminated"); }) .with_context(|| "failed to spawn io event loop thread")?; Ok(()) } - /// Ideally, we should have a dedicated thread to handle events we received, but we don't really want to spawn a second thread - /// Alternatively, we can have all socket futures call this function to check if there is any event for them. - /// The number of times the socket futures is polled is almost guaranteed to be more than the number of registration we received. - /// hence the event receiver will not be blocked. - pub(crate) fn check_event(&self, event: Event, remove: bool) -> Option { - let mut event_cache = self.event_cache.lock(); - while let Ok(event) = self.event_receiver.try_recv() { - event_cache.insert(event.into()); + pub(crate) fn check_event_with_id(&self, id: usize) -> Vec { + let event_interested = vec![ + Event::readable(id), + Event::writable(id), + Event::all(id), + Event::none(id), + ]; + + let mut res = Vec::new(); + for event in event_interested { + if let Some(event) = self.event_history.lock().get(&event.into()) { + res.push(event.0); + } } - tracing::debug!("checking event cache {:?}", event_cache); - - let event = if remove { - event_cache.take(&event.into()) - } else if event_cache.contains(&event.into()) { - Some(event.into()) - } else { - None - }; - event.map(|event| event.into_inner()) + res } - pub(crate) fn check_event_with_id(&self, id: usize, remove: bool) -> Vec { - let mut event_cache = self.event_cache.lock(); - while let Ok(event) = self.event_receiver.try_recv() { - event_cache.insert(event.into()); - } + pub(crate) fn remove_event_from_history(&self, event: Event) { + self.event_history.lock().remove(&event.into()); + } + + pub(crate) fn remove_events_with_id_from_history(&self, id: usize) { let event_interested = vec![ Event::readable(id), Event::writable(id), Event::all(id), Event::none(id), ]; - let mut res = vec![]; - if remove { - event_interested.into_iter().for_each(|event| { - if let Some(event) = event_cache.take(&event.into()) { - res.push(event.into_inner()); - } - }); - } else { - event_interested.into_iter().for_each(|event| { - if event_cache.contains(&event.into()) { - res.push(event); - } - }); + for event in event_interested { + self.event_history.lock().remove(&event.into()); } - - res } pub(crate) fn register(&self, socket: &Socket, event: Event, waker: Waker) -> anyhow::Result<()> { @@ -222,24 +196,45 @@ impl Socket2Runtime { // it would be better to drop the register event then block the worker thread or main thread. // as the worker thread is shared for the entire application. self.register_sender - .try_send(RegisterEvent::Register { id: event.key, waker }) + .try_send(RegisterEvent::Register { event, waker }) .with_context(|| "failed to send register event to register loop") } - pub(crate) fn unregister(&self, id: usize) -> anyhow::Result<()> { + pub(crate) fn register_events(&self, socket: &Socket, events: &[Event], waker: Waker) -> anyhow::Result<()> { + if self.is_terminated.load(Ordering::Acquire) { + Err(ScannnerNetError::AsyncRuntimeError("runtime is terminated".to_string()))?; + } + + for event in events { + tracing::trace!(?event, ?socket, "registering event"); + self.poller.modify(socket, *event)?; + self.register_sender + .try_send(RegisterEvent::Register { + event: *event, + waker: waker.clone(), + }) + .with_context(|| "failed to send register event to register loop")?; + } + + Ok(()) + } + + pub(crate) fn unregister(&self, event: Event) -> anyhow::Result<()> { if self.is_terminated.load(Ordering::Acquire) { Err(ScannnerNetError::AsyncRuntimeError("runtime is terminated".to_string()))?; } self.register_sender - .try_send(RegisterEvent::Unregister { id }) - .with_context(|| "failed to send unregister event to register loop") + .try_send(RegisterEvent::Unregister { event }) + .with_context(|| "failed to send unregister event to register loop")?; + + Ok(()) } } #[derive(Debug)] enum RegisterEvent { - Register { id: usize, waker: Waker }, - Unregister { id: usize }, + Register { event: Event, waker: Waker }, + Unregister { event: Event }, } #[derive(Debug)] @@ -267,8 +262,8 @@ impl From for EventWrapper { } } -impl EventWrapper { - pub(crate) fn into_inner(self) -> Event { - self.0 +impl From<&Event> for EventWrapper { + fn from(event: &Event) -> Self { + Self(*event) } } diff --git a/crates/network-scanner-net/src/socket.rs b/crates/network-scanner-net/src/socket.rs index 61f305c89..69e901e4a 100644 --- a/crates/network-scanner-net/src/socket.rs +++ b/crates/network-scanner-net/src/socket.rs @@ -24,7 +24,6 @@ impl Debug for AsyncRawSocket { impl Drop for AsyncRawSocket { fn drop(&mut self) { - tracing::trace!(id = %self.id,socket = ?self.socket, "drop socket"); let _ = self // We ignore errors here, avoid crashing the thread .runtime .remove_socket(&self.socket, self.id) @@ -141,9 +140,6 @@ struct RecvFromFuture<'a> { impl Future for RecvFromFuture<'_> { type Output = std::io::Result<(usize, SockAddr)>; fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { - // By checking event at every call, it removes excessive events excessive in the cache and the channel - self.runtime.check_event(Event::readable(self.id), true); - let socket = &self.socket.clone(); // avoid borrow checker error match socket.recv_from(self.buf) { Ok(a) => std::task::Poll::Ready(Ok(a)), @@ -152,6 +148,13 @@ impl Future for RecvFromFuture<'_> { } } +impl Drop for RecvFromFuture<'_> { + fn drop(&mut self) { + self.runtime.remove_event_from_history(Event::readable(self.id)); + let _ = self.runtime.unregister(Event::readable(self.id)); + } +} + struct SendToFuture<'a> { socket: Arc, runtime: Arc, @@ -164,7 +167,6 @@ impl<'a> Future for SendToFuture<'a> { type Output = std::io::Result; fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { - self.runtime.check_event(Event::writable(self.id), true); match self.socket.send_to(self.data, self.addr) { Ok(a) => std::task::Poll::Ready(Ok(a)), Err(e) => resolve(e, &self.socket, &self.runtime, Event::writable(self.id), cx.waker()), @@ -172,6 +174,13 @@ impl<'a> Future for SendToFuture<'a> { } } +impl Drop for SendToFuture<'_> { + fn drop(&mut self) { + self.runtime.remove_event_from_history(Event::writable(self.id)); + let _ = self.runtime.unregister(Event::writable(self.id)); + } +} + struct AcceptFuture { socket: Arc, runtime: Arc, @@ -182,7 +191,6 @@ impl Future for AcceptFuture { type Output = std::io::Result<(AsyncRawSocket, SockAddr)>; fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { - self.runtime.check_event(Event::readable(self.id), true); match self.socket.accept() { Ok((socket, addr)) => { let socket = AsyncRawSocket::from_socket(socket, self.id, self.runtime.clone())?; @@ -192,6 +200,14 @@ impl Future for AcceptFuture { } } } + +impl Drop for AcceptFuture { + fn drop(&mut self) { + self.runtime.remove_event_from_history(Event::readable(self.id)); + let _ = self.runtime.unregister(Event::readable(self.id)); + } +} + struct ConnectFuture<'a> { socket: Arc, runtime: Arc, @@ -203,15 +219,32 @@ impl<'a> Future for ConnectFuture<'a> { type Output = std::io::Result<()>; fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { - let events = self.runtime.check_event_with_id(self.id, true); - if events.iter().any(|e| e.is_connect_failed().unwrap_or(false)) { - tracing::warn!(?self.socket, ?self.addr, "connect failed"); - return std::task::Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, "connect failed"))); - }; + let events = self.runtime.check_event_with_id(self.id); + for event in events { + tracing::trace!(?event, "event found"); + if event + .is_err() // For linux, failed connection is ERR and HUP, a sigle HUP does not indicate a failed connection + .expect("your platform does not support connect failed") + { + return std::task::Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::Other, + "connection failed", + ))); + } - if !events.is_empty() { - tracing::trace!(?events, "connect success"); - return std::task::Poll::Ready(Ok(())); + // This is a special case, this happens when using epoll to wait for a unconnected TCP socket. + // We clearly needs to call connect function, so we break the loop and call connect. + #[cfg(target_os = "linux")] + if event.is_interrupt() && !event.is_err().expect("your platform does not support connect failed") { + tracing::trace!("out and hup"); + self.runtime.remove_events_with_id_from_history(self.id); + break; + } + + if event.writable || event.readable { + self.runtime.remove_events_with_id_from_history(self.id); + return std::task::Poll::Ready(Ok(())); + } } let err = match self.socket.connect(self.addr) { @@ -230,13 +263,13 @@ impl<'a> Future for ConnectFuture<'a> { #[cfg(not(target_os = "linux"))] let in_progress = err.kind() == std::io::ErrorKind::WouldBlock; + let events_interested = [Event::readable(self.id), Event::writable(self.id), Event::all(self.id)]; if in_progress { - tracing::trace!("connect should register"); if let Err(e) = self .runtime - .register(&self.socket, Event::all(self.id), cx.waker().clone()) + .register_events(&self.socket, &events_interested, cx.waker().clone()) { - tracing::warn!(?self.socket, ?self.addr, "failed to register socket to poller"); + tracing::warn!(?self.socket, ?self.addr, "failed to register socket to poller, error: {}", e); return std::task::Poll::Ready(Err(std::io::Error::new( std::io::ErrorKind::Other, format!("failed to register socket to poller: {}", e), @@ -247,6 +280,16 @@ impl<'a> Future for ConnectFuture<'a> { } } +impl Drop for ConnectFuture<'_> { + fn drop(&mut self) { + self.runtime.remove_events_with_id_from_history(self.id); + let events = [Event::readable(self.id), Event::writable(self.id), Event::all(self.id)]; + events.into_iter().for_each(|event| { + self.runtime.remove_event_from_history(event); + }); + } +} + struct SendFuture<'a> { socket: Arc, runtime: Arc, @@ -258,7 +301,6 @@ impl<'a> Future for SendFuture<'a> { type Output = std::io::Result; fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { - self.runtime.check_event(Event::writable(self.id), true); match self.socket.send(self.data) { Ok(a) => std::task::Poll::Ready(Ok(a)), Err(e) => resolve(e, &self.socket, &self.runtime, Event::writable(self.id), cx.waker()), @@ -266,6 +308,13 @@ impl<'a> Future for SendFuture<'a> { } } +impl Drop for SendFuture<'_> { + fn drop(&mut self) { + self.runtime.remove_event_from_history(Event::writable(self.id)); + let _ = self.runtime.unregister(Event::writable(self.id)); + } +} + struct RecvFuture<'a> { socket: Arc, buf: &'a mut [MaybeUninit], @@ -277,7 +326,6 @@ impl<'a> Future for RecvFuture<'a> { type Output = std::io::Result; fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { - self.runtime.check_event(Event::readable(self.id), true); let socket = &self.socket.clone(); // avoid borrow checker error match socket.recv(self.buf) { Ok(a) => std::task::Poll::Ready(Ok(a)), @@ -286,26 +334,13 @@ impl<'a> Future for RecvFuture<'a> { } } -/// Non-blocking socket does not work with timeout, so we need impl Drop for unregistering socket from poller -/// Impl Drop for unregistering socket from poller, the caller can use other async timer for timeout -macro_rules! impl_drop { - ($type:ty) => { - impl Drop for $type { - fn drop(&mut self) { - tracing::trace!(id = %self.id,socket = ?self.socket, "drop future"); - let _ = self.runtime.unregister(self.id); - } - } - }; +impl Drop for RecvFuture<'_> { + fn drop(&mut self) { + self.runtime.remove_event_from_history(Event::readable(self.id)); + let _ = self.runtime.unregister(Event::readable(self.id)); + } } -impl_drop!(RecvFromFuture<'_>); -impl_drop!(SendToFuture<'_>); -impl_drop!(AcceptFuture); -impl_drop!(ConnectFuture<'_>); -impl_drop!(SendFuture<'_>); -impl_drop!(RecvFuture<'_>); - fn resolve( err: std::io::Error, socket: &Arc, @@ -314,9 +349,8 @@ fn resolve( waker: &std::task::Waker, ) -> std::task::Poll> { if err.kind() == std::io::ErrorKind::WouldBlock { - tracing::trace!(?event, "operation would block, expecting event"); if let Err(e) = runtime.register(socket, event, waker.clone()) { - tracing::warn!(?socket, ?event, "failed to register socket to poller"); + tracing::warn!(?socket, ?event, ?e, "failed to register socket to poller"); return std::task::Poll::Ready(Err(std::io::Error::new( std::io::ErrorKind::Other, format!("failed to register socket to poller: {}", e), diff --git a/crates/network-scanner-net/src/tests.rs b/crates/network-scanner-net/src/tests.rs index 2d0b2d83e..3b310b630 100644 --- a/crates/network-scanner-net/src/tests.rs +++ b/crates/network-scanner-net/src/tests.rs @@ -15,7 +15,6 @@ use tokio::{ use crate::socket::AsyncRawSocket; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[ignore = "TODO"] async fn multiple_udp() -> anyhow::Result<()> { let addr = local_udp_server()?; tokio::time::sleep(std::time::Duration::from_millis(200)).await; // wait for the other socket to start @@ -60,7 +59,6 @@ async fn multiple_udp() -> anyhow::Result<()> { } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[ignore = "TODO"] async fn test_connectivity() -> anyhow::Result<()> { let kill_server = Arc::new(AtomicBool::new(false)); let (addr, handle) = local_tcp_server(kill_server.clone()).await?; @@ -79,7 +77,6 @@ async fn test_connectivity() -> anyhow::Result<()> { } #[tokio::test(flavor = "multi_thread", worker_threads = 6)] -#[ignore = "TODO"] async fn multiple_tcp() -> anyhow::Result<()> { let kill_server = Arc::new(AtomicBool::new(false)); let (addr, handle) = local_tcp_server(kill_server.clone()).await?; @@ -129,7 +126,6 @@ async fn multiple_tcp() -> anyhow::Result<()> { } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] -#[ignore = "TODO"] async fn work_with_tokio_tcp() -> anyhow::Result<()> { let kill_server = Arc::new(AtomicBool::new(false)); let (addr, tcp_handle) = local_tcp_server(kill_server.clone()).await?; @@ -179,46 +175,23 @@ async fn work_with_tokio_tcp() -> anyhow::Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] pub async fn drop_runtime() -> anyhow::Result<()> { - let (cov, _guard) = tracing_cov_mark::init_cov_mark(); - let kill_server = Arc::new(AtomicBool::new(false)); - let (addr, handle) = local_tcp_server(kill_server.clone()).await?; - - tracing_subscriber::fmt::SubscriberBuilder::default() - .with_max_level(tracing::Level::TRACE) - .with_thread_names(true) - .init(); - { let runtime = crate::runtime::Socket2Runtime::new(None)?; { - let good_socket = runtime.new_socket(socket2::Domain::IPV4, socket2::Type::STREAM, None)?; let bad_socket = runtime.new_socket(socket2::Domain::IPV4, socket2::Type::STREAM, None)?; - tracing::info!("good_socket: {:?}", good_socket); tracing::info!("bad_socket: {:?}", bad_socket); let unused_port = 12345; - let available_addr = addr; let non_available_addr = SocketAddr::from(([127, 0, 0, 1], unused_port)); - let handle = - tokio::task::spawn(async move { good_socket.connect(&socket2::SockAddr::from(available_addr)).await }); - - let handle2 = + let _ = tokio::task::spawn( async move { bad_socket.connect(&socket2::SockAddr::from(non_available_addr)).await }, - ); - - let (a, b) = tokio::join!(handle, handle2); - // Remove the outer error from tokio task. - let a = a?; - let b = b?; - tracing::info!("should connect: {:?}", &a); - tracing::info!("should not connect: {:?}", &b); - assert!(a.is_ok()); - assert!(b.is_err()); + ) + .await; } tracing::info!("runtime arc count: {}", Arc::strong_count(&runtime)); @@ -226,11 +199,6 @@ pub async fn drop_runtime() -> anyhow::Result<()> { } tracing::info!("runtime should be dropped here"); - cov.assert_mark("socket2_runtime_drop"); - - kill_server.store(true, std::sync::atomic::Ordering::Relaxed); - handle.abort(); - Ok(()) } diff --git a/crates/network-scanner-proto/Cargo.toml b/crates/network-scanner-proto/Cargo.toml index dcfe9ee53..80c156260 100644 --- a/crates/network-scanner-proto/Cargo.toml +++ b/crates/network-scanner-proto/Cargo.toml @@ -1,12 +1,15 @@ [package] name = "network-scanner-proto" version = "0.0.0" -authors = ["Devolutions Inc. ","Jeremy Wall "] +authors = [ + "Devolutions Inc. ", + "Jeremy Wall ", + "Jon Grimes " +] edition = "2021" publish = false -license = "Apache-2.0" +license = "MIT/Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] byteorder = "1.5.0" diff --git a/crates/network-scanner-proto/README.md b/crates/network-scanner-proto/README.md index ef181c577..dffd9e7e5 100644 --- a/crates/network-scanner-proto/README.md +++ b/crates/network-scanner-proto/README.md @@ -2,4 +2,6 @@ This crates contains network packet struct definitions and parsers. ## Acknowledgement -This project includes portions of [icmp-socket](https://github.com/zaphar/icmp-socket), held by Jeremy Wall, 2022. \ No newline at end of file +This project includes portions of +1. [icmp-socket](https://github.com/zaphar/icmp-socket), held by Jeremy Wall, 2022. +2. [nbtscanner](https://github.com/jonkgrimes/nbtscanner/blob/main/src/nbt_packet.rs) held by Jon Grimes,2018 \ No newline at end of file diff --git a/crates/network-scanner-proto/src/icmp_v4.rs b/crates/network-scanner-proto/src/icmp_v4.rs index 6af8a1f30..e7d59a903 100644 --- a/crates/network-scanner-proto/src/icmp_v4.rs +++ b/crates/network-scanner-proto/src/icmp_v4.rs @@ -17,6 +17,7 @@ pub enum Icmpv4MessageType { InformationReply = 16, } +#[derive(Debug)] pub enum Icmpv4Message { EchoReply { // type 0 @@ -213,6 +214,8 @@ impl Icmpv4Message { bytes } } + +#[derive(Debug)] pub struct Icmpv4Packet { pub code: u8, pub checksum: u16, @@ -228,8 +231,8 @@ impl TryFrom<&[u8]> for Icmpv4Packet { } impl From for Vec { - fn from(value: Icmpv4Packet) -> Self { - value.to_bytes(true) + fn from(val: Icmpv4Packet) -> Self { + val.to_bytes(true) } } @@ -309,7 +312,6 @@ impl Icmpv4Packet { return Err(PacketParseError::UnrecognizedICMPType(t)); } }; - Ok(Icmpv4Packet { code, checksum, @@ -385,7 +387,6 @@ fn sum_big_endian_words(bs: &[u8]) -> u32 { let len = bs.len(); let mut data = bs; let mut sum = 0u32; - // Iterate by word which is two bytes. while data.len() >= 2 { sum += BigEndian::read_u16(&data[0..2]) as u32; @@ -397,6 +398,5 @@ fn sum_big_endian_words(bs: &[u8]) -> u32 { // If odd then checksum the last byte sum += (data[0] as u32) << 8; } - sum } diff --git a/crates/network-scanner-proto/src/lib.rs b/crates/network-scanner-proto/src/lib.rs index aa68e4424..6ddd3fe51 100644 --- a/crates/network-scanner-proto/src/lib.rs +++ b/crates/network-scanner-proto/src/lib.rs @@ -1 +1,2 @@ pub mod icmp_v4; +pub mod netbios; diff --git a/crates/network-scanner-proto/src/netbios.rs b/crates/network-scanner-proto/src/netbios.rs new file mode 100644 index 000000000..e1a16f1ba --- /dev/null +++ b/crates/network-scanner-proto/src/netbios.rs @@ -0,0 +1,202 @@ +/// original source: https://github.com/jonkgrimes/nbtscanner/blob/main/Cargo.toml +use std::fmt; +use std::fmt::Display; +use std::net::Ipv4Addr; + +const RESPONSE_BASE_LEN: usize = 57; +const RESPONSE_NAME_LEN: usize = 15; +const RESPONSE_NAME_BLOCK_LEN: usize = 18; + +pub struct NetBiosPacket<'a> { + pub ip: Ipv4Addr, + pub data: &'a [u8], +} + +impl<'a> Display for NetBiosPacket<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let mut values = String::new(); + for byte in self.data.iter() { + values.push_str(&format!("0x{:01$X}, ", byte, 2)); + } + write!(f, "[{}]", values) + } +} + +impl<'a> NetBiosPacket<'a> { + pub fn from(ip: Ipv4Addr, data: &[u8]) -> NetBiosPacket { + NetBiosPacket { ip, data } + } + + pub fn name(&self) -> String { + let offset = RESPONSE_BASE_LEN + RESPONSE_NAME_LEN; + let name_range = RESPONSE_BASE_LEN..offset; + let name_bytes = Vec::from(&self.data[name_range]); + + match String::from_utf8(name_bytes) { + Ok(name) => String::from(name.trim_end()), + Err(_) => { + eprintln!("Couldn't decode the name"); + String::from("N/A") + } + } + } + + pub fn group(&self) -> Option { + let offset = RESPONSE_BASE_LEN + RESPONSE_NAME_LEN + 2; + let block_range = offset..(offset + RESPONSE_NAME_BLOCK_LEN - 1); + let block_bytes = Vec::from(&self.data[block_range]); + + match String::from_utf8(block_bytes) { + Ok(group) => { + let trimmed_group = group.trim_matches('\u{0}').trim_end(); + Some(String::from(trimmed_group)) + } + Err(_) => None, + } + } + + pub fn group_and_name(&self) -> String { + if let Some(group) = self.group() { + if !group.is_empty() { + return format!("{}\\{}", group, self.name()); + } + } + self.name() + } + + pub fn mac_address(&self) -> String { + let name_count = self.data[RESPONSE_BASE_LEN - 1] as usize; + let mut name_bytes: [u8; 6] = [0; 6]; + for (n, byte) in name_bytes.iter_mut().enumerate() { + let offset = RESPONSE_BASE_LEN + RESPONSE_NAME_BLOCK_LEN * name_count + n; + *byte = self.data[offset]; + } + format!( + "{:02X}:{:02X}:{:02X}:{:02X}:{:02X}:{:02X}", + name_bytes[0], name_bytes[1], name_bytes[2], name_bytes[3], name_bytes[4], name_bytes[5] + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn create_nbt_packet_from_data_slice() { + let mut data = [0u8; 1024]; + let packet = [ + 0xA2, 0x48, 0x84, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x20, 0x43, 0x4B, 0x41, 0x41, 0x41, + 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, + 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x00, 0x00, 0x21, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x77, 0x04, 0x4A, 0x41, 0x43, 0x4B, 0x49, 0x45, 0x47, 0x2D, 0x57, 0x53, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x20, 0x44, 0x00, 0x4A, 0x41, 0x43, 0x4B, 0x49, 0x45, 0x47, 0x2D, 0x57, 0x53, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x00, 0x44, 0x00, 0x53, 0x50, 0x49, 0x43, 0x45, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x00, 0xC4, 0x00, 0x53, 0x50, 0x49, 0x43, 0x45, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x1E, 0xC4, 0x00, 0x2C, 0x41, 0x38, 0xBA, 0xC3, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + ]; + for (i, elem) in packet.iter().enumerate() { + data[i] = *elem; + } + let _actual = NetBiosPacket::from(Ipv4Addr::from([127, 0, 0, 1]), &data); + assert_eq!(true, true) + } + + #[test] + fn parse_name_from_data_correctly() { + let mut data = [0u8; 1024]; + let packet = [ + 0xA2, 0x48, 0x84, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x20, 0x43, 0x4B, 0x41, 0x41, 0x41, + 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, + 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x00, 0x00, 0x21, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x77, 0x04, 0x4A, 0x41, 0x43, 0x4B, 0x49, 0x45, 0x47, 0x2D, 0x57, 0x53, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x20, 0x44, 0x00, 0x4A, 0x41, 0x43, 0x4B, 0x49, 0x45, 0x47, 0x2D, 0x57, 0x53, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x00, 0x44, 0x00, 0x53, 0x50, 0x49, 0x43, 0x45, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x00, 0xC4, 0x00, 0x53, 0x50, 0x49, 0x43, 0x45, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x1E, 0xC4, 0x00, 0x2C, 0x41, 0x38, 0xBA, 0xC3, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + ]; + for (i, elem) in packet.iter().enumerate() { + data[i] = *elem; + } + let expected = "JACKIEG-WS"; + let actual = NetBiosPacket::from(Ipv4Addr::from([127, 0, 0, 1]), &data); + + assert_eq!(expected, actual.name()); + } + + #[test] + fn parse_group_from_data_correctly() { + let mut data = [0u8; 1024]; + let packet = [ + 0xA2, 0x48, 0x84, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x20, 0x43, 0x4B, 0x41, 0x41, 0x41, + 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, + 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x00, 0x00, 0x21, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x77, 0x04, 0x4A, 0x41, 0x43, 0x4B, 0x49, 0x45, 0x47, 0x2D, 0x57, 0x53, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x20, 0x44, 0x00, 0x4A, 0x41, 0x43, 0x4B, 0x49, 0x45, 0x47, 0x2D, 0x57, 0x53, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x00, 0x44, 0x00, 0x53, 0x50, 0x49, 0x43, 0x45, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x00, 0xC4, 0x00, 0x53, 0x50, 0x49, 0x43, 0x45, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x1E, 0xC4, 0x00, 0x2C, 0x41, 0x38, 0xBA, 0xC3, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + ]; + for (i, elem) in packet.iter().enumerate() { + data[i] = *elem; + } + let expected = String::from("JACKIEG-WS"); + let actual = NetBiosPacket::from(Ipv4Addr::from([127, 0, 0, 1]), &data); + + assert_eq!(Some(expected), actual.group()); + } + + #[test] + fn parse_name_and_group_from_data_correctly_2() { + let mut data = [0u8; 1024]; + let packet = [ + 0xA2, 0x48, 0x84, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x20, 0x43, 0x4B, 0x41, 0x41, 0x41, + 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, + 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x00, 0x00, 0x21, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x77, 0x04, 0x41, 0x4C, 0x45, 0x58, 0x4B, 0x2D, 0x50, 0x43, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x00, 0x44, 0x00, 0x53, 0x50, 0x49, 0x43, 0x45, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x00, 0xC4, 0x00, 0x41, 0x4C, 0x45, 0x58, 0x4B, 0x2D, 0x50, 0x43, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x20, 0x44, 0x00, 0x53, 0x50, 0x49, 0x43, 0x45, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x1E, 0xC4, 0x00, 0xD0, 0xBF, 0x9C, 0xE4, 0x24, 0x90, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + ]; + for (i, elem) in packet.iter().enumerate() { + data[i] = *elem; + } + let expected = "ALEXK-PC"; + let actual = NetBiosPacket::from(Ipv4Addr::from([127, 0, 0, 1]), &data); + + assert_eq!(expected, actual.name()); + } + + #[test] + fn parse_mac_from_data_correctly() { + let mut data = [0u8; 1024]; + let packet = [ + 0xA2, 0x48, 0x84, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x20, 0x43, 0x4B, 0x41, 0x41, 0x41, + 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, + 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x00, 0x00, 0x21, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x77, 0x04, 0x4A, 0x41, 0x43, 0x4B, 0x49, 0x45, 0x47, 0x2D, 0x57, 0x53, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x20, 0x44, 0x00, 0x4A, 0x41, 0x43, 0x4B, 0x49, 0x45, 0x47, 0x2D, 0x57, 0x53, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x00, 0x44, 0x00, 0x53, 0x50, 0x49, 0x43, 0x45, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x00, 0xC4, 0x00, 0x53, 0x50, 0x49, 0x43, 0x45, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, 0x20, + 0x1E, 0xC4, 0x00, 0x2C, 0x41, 0x38, 0xBA, 0xC3, 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + ]; + for (i, elem) in packet.iter().enumerate() { + data[i] = *elem; + } + let expected = "2C:41:38:BA:C3:64"; + let actual = NetBiosPacket::from(Ipv4Addr::from([127, 0, 0, 1]), &data); + + assert_eq!(expected, actual.mac_address()); + } +} diff --git a/crates/network-scanner/Cargo.toml b/crates/network-scanner/Cargo.toml index d399881ae..02aa5af9c 100644 --- a/crates/network-scanner/Cargo.toml +++ b/crates/network-scanner/Cargo.toml @@ -8,12 +8,18 @@ publish = false [dependencies] anyhow = "1.0.79" +crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] } +dns-lookup = "2.0.4" +network-interface = "1.1.1" network-scanner-net ={ path = "../network-scanner-net" } network-scanner-proto ={ path = "../network-scanner-proto" } +parking_lot = "0.12.1" socket2 = "0.5.5" -tokio = { version = "1.36.0", features = ["io-util"] } +thiserror = "1.0.56" +tokio = { version = "1.36.0", features = ["rt", "sync", "time"] } tracing = "0.1.40" +typed-builder = "0.18.1" [dev-dependencies] -tokio = { version = "1.36.0", features = ["rt","macros","rt-multi-thread"] } +tokio = { version = "1.36.0", features = ["rt","macros","rt-multi-thread","tracing"] } tracing-subscriber = "0.3.18" diff --git a/crates/network-scanner/examples/broadcast.rs b/crates/network-scanner/examples/broadcast.rs new file mode 100644 index 000000000..860934c64 --- /dev/null +++ b/crates/network-scanner/examples/broadcast.rs @@ -0,0 +1,33 @@ +use network_scanner::{broadcast::asynchronous::broadcast, ip_utils::get_subnets, task_utils::TaskManager}; +use network_scanner_net::runtime; +use std::time::Duration; + +#[tokio::main] +pub async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::SubscriberBuilder::default() + .with_max_level(tracing::Level::INFO) + .with_thread_names(true) + .init(); + + let runtime = runtime::Socket2Runtime::new(None)?; + let subnets = get_subnets()?; + let mut handles = vec![]; + for subnet in subnets { + tracing::info!("Broadcast: {:?}", subnet.broadcast); + let runtime = runtime.clone(); + let handle = tokio::spawn(async move { + let mut receiver = broadcast(subnet.broadcast, Duration::from_secs(3), runtime, TaskManager::new()) + .await + .unwrap(); + while let Some(addr) = receiver.recv().await { + tracing::info!("Received: {:?}", addr); + } + }); + handles.push(handle); + } + + for handle in handles { + handle.await?; + } + Ok(()) +} diff --git a/crates/network-scanner/examples/netbios.rs b/crates/network-scanner/examples/netbios.rs new file mode 100644 index 000000000..f9b7f68c9 --- /dev/null +++ b/crates/network-scanner/examples/netbios.rs @@ -0,0 +1,33 @@ +use std::{net::Ipv4Addr, time::Duration}; + +use network_scanner::task_utils::TaskManager; + +#[tokio::main] +pub async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::SubscriberBuilder::default() + .with_max_level(tracing::Level::DEBUG) + .with_thread_names(true) + .init(); + + let runtime = network_scanner_net::runtime::Socket2Runtime::new(None)?; + + let lower: Ipv4Addr = "10.10.0.0".parse()?; + let upper: Ipv4Addr = "10.10.0.125".parse()?; + let ip_range = + network_scanner::ip_utils::IpAddrRange::new(std::net::IpAddr::V4(lower), std::net::IpAddr::V4(upper))?; + let single_query_duration = Duration::from_secs(1); + let interval = Duration::from_millis(20); + let mut receiver = network_scanner::netbios::netbios_query_scan( + runtime, + ip_range, + single_query_duration, + interval, + TaskManager::new(), + )?; + + while let Some((ip, name)) = receiver.recv().await { + println!("{}: {:?}", ip, name); + } + + Ok(()) +} diff --git a/crates/network-scanner/examples/ping.rs b/crates/network-scanner/examples/ping.rs index 8c648a407..9332972e6 100644 --- a/crates/network-scanner/examples/ping.rs +++ b/crates/network-scanner/examples/ping.rs @@ -1,11 +1,16 @@ +use std::time::Duration; + +use anyhow::Context; use network_scanner::ping::ping; #[tokio::main] -pub async fn main() { +pub async fn main() -> anyhow::Result<()> { tracing_subscriber::fmt::SubscriberBuilder::default() .with_max_level(tracing::Level::TRACE) .init(); - let ip = std::net::Ipv4Addr::new(127, 0, 0, 1); - ping(ip).await.expect("Failed to ping"); + let ip = std::net::Ipv4Addr::new(8, 8, 8, 82); //famous google dns + let runtime = network_scanner_net::runtime::Socket2Runtime::new(None)?; + ping(runtime, ip, Duration::from_secs(1)).await.context("ping failed")?; // this will fail + Ok(()) } diff --git a/crates/network-scanner/examples/ping_range.rs b/crates/network-scanner/examples/ping_range.rs new file mode 100644 index 000000000..36c69e192 --- /dev/null +++ b/crates/network-scanner/examples/ping_range.rs @@ -0,0 +1,54 @@ +use std::{ + net::{IpAddr, Ipv4Addr}, + time::Duration, +}; + +use anyhow::Ok; +use network_scanner::ping::ping_range; +use network_scanner::task_utils::TaskManager; + +#[tokio::main] +pub async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::SubscriberBuilder::default() + .with_max_level(tracing::Level::INFO) + .init(); + + let lower = IpAddr::V4(Ipv4Addr::new(10, 10, 0, 0)); + let upper = IpAddr::V4(Ipv4Addr::new(10, 10, 0, 125)); + + let range = network_scanner::ip_utils::IpAddrRange::new(lower, upper)?; + + let runtime = network_scanner_net::runtime::Socket2Runtime::new(None)?; + + let ping_interval = Duration::from_millis(20); + + let ping_wait_time = Duration::from_secs(1); + + let should_ping = |ip: IpAddr| { + // if ip is even, ping it + ip.is_ipv4() && { + if let IpAddr::V4(v4) = ip { + v4.octets()[3] % 2 == 0 + } else { + false + } + } + }; + let now = std::time::Instant::now(); + let mut receiver = ping_range( + runtime, + range, + ping_interval, + ping_wait_time, + should_ping, + TaskManager::new(), + )?; + + while let Some(ip) = receiver.recv().await { + tracing::info!("{} is alive", ip); + } + + tracing::info!("Elapsed time: {:?}", now.elapsed()); + + Ok(()) +} diff --git a/crates/network-scanner/examples/port_discovery.rs b/crates/network-scanner/examples/port_discovery.rs new file mode 100644 index 000000000..0ca9b4026 --- /dev/null +++ b/crates/network-scanner/examples/port_discovery.rs @@ -0,0 +1,25 @@ +use std::time::Duration; + +use network_scanner::task_utils::TaskManager; + +#[tokio::main] +pub async fn main() { + tracing_subscriber::fmt::SubscriberBuilder::default() + .with_max_level(tracing::Level::DEBUG) + .with_thread_names(true) + .init(); + + let runtime = network_scanner_net::runtime::Socket2Runtime::new(None).expect("Failed to create runtime"); + + let ip = std::net::Ipv4Addr::new(127, 0, 0, 1); + // let port = 22,80,443,12345,3399,88 + let port = vec![22, 80, 443, 12345, 3399, 88]; + let mut res = + network_scanner::port_discovery::scan_ports(ip, &port, runtime, Duration::from_secs(5), TaskManager::new()) + .await + .expect("Failed to scan ports"); + + while let Some(res) = res.recv().await { + tracing::warn!("Port scan result: {:?}", res); + } +} diff --git a/crates/network-scanner/examples/scan.rs b/crates/network-scanner/examples/scan.rs new file mode 100644 index 000000000..a853698e1 --- /dev/null +++ b/crates/network-scanner/examples/scan.rs @@ -0,0 +1,51 @@ +use std::time::Duration; + +use anyhow::Context; +use network_scanner::scanner::{NetworkScanner, NetworkScannerParams}; + +fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::SubscriberBuilder::default() + .with_max_level(tracing::Level::DEBUG) + .with_file(true) + .with_line_number(true) + .with_thread_names(true) + .init(); + + let params = NetworkScannerParams { + ports: vec![22, 80, 443, 389, 636], + ping_interval: 20, + ping_timeout: 1000, + + broadcast_timeout: 2000, + + port_scan_timeout: 2000, + + netbios_timeout: 1000, + netbios_interval: 20, + + max_wait_time: 120 * 1000, + }; + let rt = tokio::runtime::Runtime::new()?; + rt.block_on(async move { + let scanner = NetworkScanner::new(params).unwrap(); + let stream = scanner.start()?; + let stream_clone = stream.clone(); + let now = std::time::Instant::now(); + while let Ok(Some(res)) = stream_clone + .recv_timeout(Duration::from_secs(120)) + .await + .with_context(|| { + tracing::error!("Failed to receive from stream"); + "Failed to receive from stream" + }) + { + tracing::warn!("Result: {:?}", res); + } + stream_clone.stop(); + tracing::warn!("Network Scan finished. elapsed: {:?}", now.elapsed()); + anyhow::Result::<()>::Ok(()) + })?; + + tracing::info!("Done"); + Ok(()) +} diff --git a/crates/network-scanner/src/broadcast/asynchronous.rs b/crates/network-scanner/src/broadcast/asynchronous.rs new file mode 100644 index 000000000..c84cb3f70 --- /dev/null +++ b/crates/network-scanner/src/broadcast/asynchronous.rs @@ -0,0 +1,63 @@ +use std::{ + mem::MaybeUninit, + net::{Ipv4Addr, SocketAddr}, + sync::Arc, + time::Duration, +}; + +use anyhow::Context; +use network_scanner_net::runtime; +use network_scanner_proto::icmp_v4; +use socket2::SockAddr; + +use crate::create_echo_request; + +/// Broadcasts a ping to the given ip address +/// caller need to make sure that the ip address is a broadcast address +/// The timeout is for the read, if for more than given time no packet is received, the stream will end +pub async fn broadcast( + ip: Ipv4Addr, + read_time_out: Duration, + runtime: Arc, + task_manager: crate::task_utils::TaskManager, +) -> anyhow::Result> { + let socket = runtime.new_socket( + socket2::Domain::IPV4, + socket2::Type::RAW, + Some(socket2::Protocol::ICMPV4), + )?; + + socket.set_broadcast(true)?; + + // skip verification, we are not interested in the response + let (packet, _) = create_echo_request()?; + let (sender, receiver) = tokio::sync::mpsc::channel(255); + + task_manager.spawn_no_sub_task(async move { + socket + .send_to(&packet.to_bytes(true), &SockAddr::from(SocketAddr::new(ip.into(), 0))) + .await?; + + tokio::time::timeout(read_time_out, loop_receive(socket, sender)).await??; + tracing::debug!("broadcast future dropped"); + anyhow::Ok(()) + }); + + async fn loop_receive( + mut socket: network_scanner_net::socket::AsyncRawSocket, + sender: tokio::sync::mpsc::Sender, + ) -> anyhow::Result<()> { + let mut buffer = [MaybeUninit::uninit(); icmp_v4::ICMPV4_MTU]; + loop { + if let Ok((_, addr)) = socket.recv_from(&mut buffer).await { + let ip = *addr + .as_socket_ipv4() + .context("unreachable: we only use ipv4 for broadcast")? + .ip(); + sender.send(ip).await?; + }; + } + } + + Ok(receiver) +} diff --git a/crates/network-scanner/src/broadcast/blocking.rs b/crates/network-scanner/src/broadcast/blocking.rs new file mode 100644 index 000000000..53b94d7f0 --- /dev/null +++ b/crates/network-scanner/src/broadcast/blocking.rs @@ -0,0 +1,86 @@ +use std::{ + mem::MaybeUninit, + net::{Ipv4Addr, SocketAddr}, + time::Duration, +}; + +use anyhow::Context; + +use network_scanner_proto::icmp_v4; + +use crate::create_echo_request; + +use super::BroadcastResponseEntry; + +pub struct BorcastBlockStream { + socket: socket2::Socket, + verifier: Vec, + should_verify: bool, +} + +impl Iterator for BorcastBlockStream { + type Item = anyhow::Result; + + fn next(&mut self) -> Option { + let mut buffer = [MaybeUninit::uninit(); icmp_v4::ICMPV4_MTU]; + let res = self.socket.recv_from(&mut buffer); + + let (size, addr) = match res { + Ok(res) => res, + Err(e) => { + return Some(Err(e.into())); + } + }; + + if size == 0 { + return None; + } + + let ping_result = unsafe { BroadcastResponseEntry::from_raw(addr, &buffer, size) }; + + let ping_result = match ping_result { + Ok(a) => a, + Err(e) => return Some(Err(e)), + }; + + if self.should_verify && !ping_result.verify(&self.verifier) { + return Some(Err(anyhow::anyhow!("failed to verify ping response"))); + } + + Some(Ok(ping_result)) + } +} + +impl BorcastBlockStream { + pub fn should_verify(&mut self, should_verify: bool) { + self.should_verify = should_verify; + } +} + +pub fn block_broadcast(ip: Ipv4Addr, read_time_out: Option) -> anyhow::Result { + let socket = socket2::Socket::new( + socket2::Domain::IPV4, + socket2::Type::RAW, + Some(socket2::Protocol::ICMPV4), + )?; + socket.set_broadcast(true)?; + + if let Some(time_out) = read_time_out { + socket.set_read_timeout(Some(time_out))?; + } + + let addr = SocketAddr::new(ip.into(), 0); + + let (packet, verifier) = create_echo_request()?; + + tracing::trace!(?packet, "sending packet"); + socket + .send_to(&packet.to_bytes(true), &addr.into()) + .with_context(|| format!("Failed to send packet to {}", ip))?; + + Ok(BorcastBlockStream { + socket, + verifier, + should_verify: true, + }) +} diff --git a/crates/network-scanner/src/broadcast/mod.rs b/crates/network-scanner/src/broadcast/mod.rs new file mode 100644 index 000000000..717564263 --- /dev/null +++ b/crates/network-scanner/src/broadcast/mod.rs @@ -0,0 +1,43 @@ +use std::{mem::MaybeUninit, net::Ipv4Addr}; + +use anyhow::Context; + +pub mod asynchronous; +pub mod blocking; + +#[derive(Debug)] +pub struct BroadcastResponseEntry { + pub addr: Ipv4Addr, + pub packet: network_scanner_proto::icmp_v4::Icmpv4Packet, +} + +impl BroadcastResponseEntry { + pub(crate) unsafe fn from_raw( + addr: socket2::SockAddr, + payload: &[MaybeUninit], + size: usize, + ) -> anyhow::Result { + let addr = *addr + .as_socket_ipv4() + .with_context(|| "sock addr is not ipv4".to_string())? + .ip(); // ip is private + + let payload = payload[..size] + .as_ref() + .iter() + .map(|u| unsafe { u.assume_init() }) + .collect::>(); + + let packet = network_scanner_proto::icmp_v4::Icmpv4Packet::parse(payload.as_slice())?; + + Ok(BroadcastResponseEntry { addr, packet }) + } + + pub fn verify(&self, verifier: &[u8]) -> bool { + if let network_scanner_proto::icmp_v4::Icmpv4Message::EchoReply { payload, .. } = &self.packet.message { + payload == verifier + } else { + false + } + } +} diff --git a/crates/network-scanner/src/ip_utils.rs b/crates/network-scanner/src/ip_utils.rs new file mode 100644 index 000000000..5d958aeea --- /dev/null +++ b/crates/network-scanner/src/ip_utils.rs @@ -0,0 +1,256 @@ +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + +use anyhow::Context; +use network_interface::{Addr, NetworkInterfaceConfig, V4IfAddr}; + +pub struct IpAddrRange { + lower: IpAddr, + upper: IpAddr, +} + +pub struct IpAddrRangeIter { + range: IpAddrRange, + current: Option, +} + +impl IpAddrRangeIter { + pub fn new(range: IpAddrRange) -> Self { + Self { + current: Some(range.lower), + range, + } + } +} + +impl Iterator for IpAddrRangeIter { + type Item = IpAddr; + + fn next(&mut self) -> Option { + let current = self.current.take()?; + if current > self.range.upper { + return None; + } + self.current = Some(increment_ip(current)); + Some(current) + } +} + +fn increment_ip(ip: IpAddr) -> IpAddr { + match ip { + IpAddr::V4(ip) => { + let mut octets = ip.octets(); + for i in (0..4).rev() { + if octets[i] < 255 { + octets[i] += 1; + break; + } else { + octets[i] = 0; + } + } + IpAddr::V4(Ipv4Addr::from(octets)) + } + IpAddr::V6(ip) => { + let mut segments = ip.segments(); + for i in (0..8).rev() { + if segments[i] < 0xffff { + segments[i] += 1; + break; + } else { + segments[i] = 0; + } + } + IpAddr::V6(Ipv6Addr::from(segments)) + } + } +} + +impl IpAddrRange { + pub fn is_ipv6(&self) -> bool { + self.lower.is_ipv6() && self.upper.is_ipv6() + } + + pub fn is_ipv4(&self) -> bool { + self.lower.is_ipv4() && self.upper.is_ipv4() + } + + pub fn new(lower: IpAddr, upper: IpAddr) -> anyhow::Result { + if lower.is_ipv4() != upper.is_ipv4() { + anyhow::bail!("IP range needs to be the same type"); + } + + if lower > upper { + return Ok(Self { + lower: upper, + upper: lower, + }); + } + + Ok(Self { lower, upper }) + } + + fn into_iter_inner(self) -> IpAddrRangeIter { + IpAddrRangeIter::new(self) + } + + pub fn has_overlap(&self, other: &Self) -> bool { + self.lower <= other.upper && self.upper >= other.lower + } +} + +impl IntoIterator for IpAddrRange { + type Item = IpAddr; + type IntoIter = IpAddrRangeIter; + + fn into_iter(self) -> Self::IntoIter { + self.into_iter_inner() + } +} + +impl TryFrom for IpAddrRange { + type Error = anyhow::Error; + + fn try_from(value: V4IfAddr) -> Result { + let V4IfAddr { + ip, + broadcast: _, + netmask, + } = value; + + let Some(netmask) = netmask else { + anyhow::bail!("Network interface does not have a netmask"); + }; + + let (lower, upper) = calculate_subnet_bounds(ip, netmask); + + Self::new(lower.into(), upper.into()) + } +} + +impl From for IpAddrRange { + fn from(value: Subnet) -> Self { + let Subnet { ip, netmask, .. } = value; + + let (lower, upper) = calculate_subnet_bounds(ip, netmask); + Self::new(lower.into(), upper.into()).unwrap() + } +} + +impl From<&Subnet> for IpAddrRange { + fn from(value: &Subnet) -> Self { + let Subnet { ip, netmask, .. } = value; + + let (lower, upper) = calculate_subnet_bounds(*ip, *netmask); + Self::new(lower.into(), upper.into()).unwrap() + } +} + +fn calculate_subnet_bounds(ip: Ipv4Addr, netmask: Ipv4Addr) -> (Ipv4Addr, Ipv4Addr) { + let ip_u32 = u32::from(ip); + let netmask_u32 = u32::from(netmask); + + // Calculate Network Address (Lower IP) + let network_address = Ipv4Addr::from(ip_u32 & netmask_u32); + + // Calculate Broadcast Address (Upper IP) + let wildcard_mask = !netmask_u32; + let broadcast_address = Ipv4Addr::from(ip_u32 | wildcard_mask); + + (network_address, broadcast_address) +} + +#[derive(Debug, Clone)] +pub struct Subnet { + pub ip: Ipv4Addr, + pub netmask: Ipv4Addr, + pub broadcast: Ipv4Addr, +} + +pub fn get_subnets() -> anyhow::Result> { + let interfaces = network_interface::NetworkInterface::show().context("Failed to get network interfaces")?; + + let subnet: Vec<_> = interfaces + .into_iter() + .map(|interface| { + interface + .addr + .into_iter() + .filter_map(|addr| { + let addr = match addr { + Addr::V4(v4) => { + if v4.ip.is_loopback() || v4.ip.is_link_local() { + return None; + } + v4 + } + Addr::V6(_) => return None, + }; + + if addr.broadcast.is_some() { + Some(addr) + } else { + None + } + }) + .collect::>() + }) + .flat_map(|addrs| addrs.into_iter()) + .map(|addr| { + let ip = addr.ip; + let netmask = addr.netmask.unwrap(); + let broadcast = addr.broadcast.unwrap(); + Subnet { ip, netmask, broadcast } + }) + .collect(); + + Ok(subnet) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_from_v4_if_addr() { + let ip = Ipv4Addr::new(192, 168, 1, 50); + let netmask = Ipv4Addr::new(255, 255, 255, 0); + let broadcast = Ipv4Addr::new(192, 168, 1, 255); + let v4_if_addr = V4IfAddr { + ip, + broadcast: Some(broadcast), + netmask: Some(netmask), + }; + + let range = IpAddrRange::try_from(v4_if_addr).unwrap(); + + assert_eq!(range.lower, "192.168.1.0".parse::().unwrap()); + assert_eq!(range.upper, broadcast); + } + + #[test] + fn test_from_bad_v4_if_addr() { + let ip = Ipv4Addr::new(192, 168, 1, 50); + let bad_v4_if_addr = V4IfAddr { + ip, + broadcast: None, + netmask: None, + }; + + let range = IpAddrRange::try_from(bad_v4_if_addr); + assert!(range.is_err()); + } + + #[test] + fn test_iter_ipv4() { + let lower = "10.10.0.0".parse::().unwrap(); + let upper = "10.10.0.2".parse::().unwrap(); + + let range = IpAddrRange::new(lower.into(), upper.into()).unwrap(); + + let mut iter = range.into_iter(); + + assert_eq!(iter.next(), Some(IpAddr::V4(Ipv4Addr::new(10, 10, 0, 0)))); + assert_eq!(iter.next(), Some(IpAddr::V4(Ipv4Addr::new(10, 10, 0, 1)))); + assert_eq!(iter.next(), Some(IpAddr::V4(Ipv4Addr::new(10, 10, 0, 2)))); + assert_eq!(iter.next(), None); + } +} diff --git a/crates/network-scanner/src/lib.rs b/crates/network-scanner/src/lib.rs index a766209cf..5acd0379d 100644 --- a/crates/network-scanner/src/lib.rs +++ b/crates/network-scanner/src/lib.rs @@ -1 +1,56 @@ +use std::{mem::MaybeUninit, net::IpAddr}; + +use anyhow::Context; +use network_interface::Addr; +use network_scanner_proto::icmp_v4; + +pub mod broadcast; +pub mod ip_utils; +pub mod netbios; pub mod ping; +pub mod port_discovery; +pub mod scanner; +pub mod task_utils; + +#[derive(Debug, thiserror::Error)] +pub enum ScannerError { + #[error("Ipv6 currently no t supported for this operation: {0}")] + DoesNotSupportIpv6(String), + + #[error("IP range needs to be the same type")] + IpRangeNeedsToBeTheSameType(IpAddr, IpAddr), + + #[error("Network interface does not have a netmask")] + InterfaceDoesNotHaveNetmask(Addr), + + #[error("Other error: {0}")] + Other(#[from] anyhow::Error), +} + +/// Assume the `buf`fer to be initialised. +/// +/// # Safety +/// +/// It is up to the caller to guarantee that the MaybeUninit elements really are in an initialized state. +/// Calling this when the content is not yet fully initialized causes undefined behavior. +// TODO: replace with `MaybeUninit::slice_assume_init_ref` once stable. +// https://github.com/rust-lang/rust/issues/63569 +pub(crate) unsafe fn assume_init(buf: &[MaybeUninit]) -> &[u8] { + &*(buf as *const [MaybeUninit] as *const [u8]) +} + +pub(crate) fn create_echo_request() -> anyhow::Result<(icmp_v4::Icmpv4Packet, Vec)> { + let time = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .with_context(|| "failed to get current time")? + .as_secs(); + + let echo_message = icmp_v4::Icmpv4Message::Echo { + identifier: 0, + sequence: 0, + payload: time.to_be_bytes().to_vec(), + }; + + let packet = icmp_v4::Icmpv4Packet::from_message(echo_message); + Ok((packet, time.to_be_bytes().to_vec())) +} diff --git a/crates/network-scanner/src/netbios.rs b/crates/network-scanner/src/netbios.rs new file mode 100644 index 000000000..72dfaf5f9 --- /dev/null +++ b/crates/network-scanner/src/netbios.rs @@ -0,0 +1,70 @@ +use std::{ + mem::MaybeUninit, + net::{IpAddr, SocketAddr}, + sync::Arc, +}; + +use network_scanner_net::{runtime::Socket2Runtime, socket::AsyncRawSocket}; +use network_scanner_proto::netbios::NetBiosPacket; +use socket2::{Domain, SockAddr, Type}; +use tokio::time::timeout; + +use crate::{assume_init, ip_utils::IpAddrRange, task_utils::IpReceiver, ScannerError}; + +const MESSAGE: [u8; 50] = [ + 0xA2, 0x48, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x20, 0x43, 0x4b, 0x41, 0x41, 0x41, 0x41, + 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, + 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x41, 0x00, 0x00, 0x21, 0x00, 0x01, +]; + +const NET_BIOS_PORT: u16 = 137; +pub fn netbios_query_scan( + runtime: Arc, + ip_range: IpAddrRange, + single_query_duration: std::time::Duration, + netbios_scan_interval: std::time::Duration, + task_manager: crate::task_utils::TaskManager, +) -> Result { + if ip_range.is_ipv6() { + return Err(ScannerError::DoesNotSupportIpv6("netbios".to_string())); + } + + let (sender, receiver) = tokio::sync::mpsc::channel(255); + task_manager.spawn(move |task_manager: crate::task_utils::TaskManager| async move { + for ip in ip_range.into_iter() { + let socket = runtime.new_socket(Domain::IPV4, Type::DGRAM, None)?; + let (sender, task_manager) = (sender.clone(), task_manager.clone()); + netbios_query_one(ip, socket, sender, single_query_duration, task_manager); + tokio::time::sleep(netbios_scan_interval).await; + } + anyhow::Ok(()) + }); + + Ok(receiver) +} + +pub(crate) fn netbios_query_one( + ip: IpAddr, + mut socket: AsyncRawSocket, + result_sender: crate::task_utils::IpSender, + duration: std::time::Duration, + task_manager: crate::task_utils::TaskManager, +) { + task_manager.spawn_no_sub_task(async move { + let socket_addr: SocketAddr = (ip, NET_BIOS_PORT).into(); + let addr = SockAddr::from(socket_addr); + + timeout(duration, socket.send_to(&MESSAGE, &addr)).await??; + let mut buf: [MaybeUninit; 1024] = [MaybeUninit::::uninit(); 1024]; + timeout(duration, socket.recv(&mut buf)).await??; + unsafe { + let buf = assume_init(&buf); + let IpAddr::V4(ipv4) = ip else { + anyhow::bail!("unreachable"); + }; + let packet = NetBiosPacket::from(ipv4, buf); + result_sender.send((ipv4.into(), Some(packet.name()))).await? + } + anyhow::Result::<()>::Ok(()) + }); +} diff --git a/crates/network-scanner/src/ping.rs b/crates/network-scanner/src/ping.rs index c16952b8b..f306562eb 100644 --- a/crates/network-scanner/src/ping.rs +++ b/crates/network-scanner/src/ping.rs @@ -1,13 +1,83 @@ use std::{ mem::MaybeUninit, - net::{Ipv4Addr, SocketAddr}, + net::{IpAddr, Ipv4Addr, SocketAddr}, + sync::Arc, + time::Duration, }; use anyhow::Context; +use network_scanner_net::{runtime::Socket2Runtime, socket::AsyncRawSocket}; use network_scanner_proto::icmp_v4; +use tokio::time::timeout; -pub async fn ping(ip: Ipv4Addr) -> anyhow::Result<()> { - tokio::task::spawn_blocking(move || blocking_ping(ip)).await? +use crate::{create_echo_request, ip_utils::IpAddrRange}; + +pub fn ping_range( + runtime: Arc, + range: IpAddrRange, + ping_interval: Duration, + ping_wait_time: Duration, + should_ping: impl Fn(IpAddr) -> bool + Send + Sync + 'static + Clone, + task_manager: crate::task_utils::TaskManager, +) -> anyhow::Result> { + let (sender, receiver) = tokio::sync::mpsc::channel(255); + let mut futures = vec![]; + for ip in range.into_iter() { + let socket = runtime.new_socket( + socket2::Domain::IPV4, + socket2::Type::RAW, + Some(socket2::Protocol::ICMPV4), + )?; + let addr = SocketAddr::new(ip, 0); + let sender = sender.clone(); + let should_ping = should_ping.clone(); + let ping_future = async move { + if !should_ping(ip) { + return anyhow::Ok(()); + } + if try_ping(addr.into(), socket).await.is_ok() { + sender.send(ip).await?; + } + anyhow::Ok(()) + }; + + futures.push(ping_future); + } + + task_manager.spawn(move |task_manager: crate::task_utils::TaskManager| async move { + for future in futures { + task_manager.spawn_no_sub_task(async move { + timeout(ping_wait_time, future).await??; + anyhow::Ok(()) + }); + tokio::time::sleep(ping_interval).await; + } + anyhow::Ok(()) + }); + + Ok(receiver) +} + +pub async fn ping(runtime: Arc, ip: impl Into, duration: Duration) -> anyhow::Result<()> { + let socket = runtime.new_socket( + socket2::Domain::IPV4, + socket2::Type::RAW, + Some(socket2::Protocol::ICMPV4), + )?; + let addr = SocketAddr::new(ip.into(), 0); + timeout(duration, try_ping(addr.into(), socket)).await? +} + +async fn try_ping(addr: socket2::SockAddr, mut socket: AsyncRawSocket) -> anyhow::Result<()> { + // skip verification, we are not interested in the response + let (packet, _) = create_echo_request()?; + let packet_bytes = packet.to_bytes(true); + + socket.send_to(&packet_bytes, &addr).await?; + + let mut buffer = [MaybeUninit::uninit(); icmp_v4::ICMPV4_MTU]; + socket.recv_from(&mut buffer).await?; + Ok(()) } pub fn blocking_ping(ip: Ipv4Addr) -> anyhow::Result<()> { @@ -19,58 +89,16 @@ pub fn blocking_ping(ip: Ipv4Addr) -> anyhow::Result<()> { let addr = SocketAddr::new(ip.into(), 0); - let (packet, verifier) = create_echo_request()?; + let (packet, _) = create_echo_request()?; socket .send_to(&packet.to_bytes(true), &addr.into()) .with_context(|| format!("failed to send packet to {}", ip))?; let mut buffer = [MaybeUninit::uninit(); icmp_v4::ICMPV4_MTU]; - let (size, _) = socket + let _ = socket .recv_from(&mut buffer) .with_context(|| format!("failed to receive packet from {}", ip))?; - // SAFETY: `recv_from` returns the number of bytes written into the buffer, so the `size` first - // elements are in an initialized state. - let filled = unsafe { assume_init(&buffer[..size]) }; - - let packet = icmp_v4::Icmpv4Packet::parse(filled).context("cannot parse icmp packet")?; - - if let icmp_v4::Icmpv4Message::EchoReply { payload, .. } = packet.message { - if payload != verifier { - anyhow::bail!("payload does not match for echo reply"); - } else { - Ok(()) - } - } else { - anyhow::bail!("received non-echo reply"); - } -} - -/// Assume the `buf`fer to be initialised. -/// -/// # Safety -/// -/// It is up to the caller to guarantee that the MaybeUninit elements really are in an initialized state. -/// Calling this when the content is not yet fully initialized causes undefined behavior. -// TODO: replace with `MaybeUninit::slice_assume_init_ref` once stable. -// https://github.com/rust-lang/rust/issues/63569 -pub(crate) unsafe fn assume_init(buf: &[MaybeUninit]) -> &[u8] { - &*(buf as *const [MaybeUninit] as *const [u8]) -} - -fn create_echo_request() -> anyhow::Result<(icmp_v4::Icmpv4Packet, Vec)> { - let time = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .with_context(|| "failed to get current time")? - .as_secs(); - - let echo_message = icmp_v4::Icmpv4Message::Echo { - identifier: 0, - sequence: 0, - payload: time.to_be_bytes().to_vec(), - }; - - let packet = icmp_v4::Icmpv4Packet::from_message(echo_message); - Ok((packet, time.to_be_bytes().to_vec())) + Ok(()) } diff --git a/crates/network-scanner/src/port_discovery.rs b/crates/network-scanner/src/port_discovery.rs new file mode 100644 index 000000000..89192011d --- /dev/null +++ b/crates/network-scanner/src/port_discovery.rs @@ -0,0 +1,85 @@ +use std::{ + net::{IpAddr, SocketAddr}, + sync::Arc, + time::Duration, +}; + +use anyhow::Context; +use network_scanner_net::runtime::Socket2Runtime; +use socket2::SockAddr; + +use crate::task_utils::TaskManager; + +pub async fn scan_ports( + ip: impl Into, + port: &[u16], + runtime: Arc, + timeout: Duration, + task_manager: TaskManager, +) -> anyhow::Result> { + let ip = ip.into(); + let mut sockets = vec![]; + for p in port { + let addr = SockAddr::from(SocketAddr::from((ip, *p))); + let socket = runtime.new_socket(socket2::Domain::IPV4, socket2::Type::STREAM, None)?; + sockets.push((socket, addr)); + } + + let (sender, receiver) = tokio::sync::mpsc::channel(port.len()); + for (socket, addr) in sockets { + let sender = sender.clone(); + task_manager.spawn_no_sub_task(async move { + let connect_future = socket.connect(&addr); + let addr = addr + .as_socket() + .context("Failed to scan port: only IPv4/IPv6 addresses are supported")?; + + match tokio::time::timeout(timeout, connect_future).await { + Ok(Ok(())) => { + // Successfully connected to the port + sender.send(PortScanResult::Open(addr)).await?; + } + Ok(Err(_)) => { + // Failed to connect, but not due to a timeout (e.g., port is closed) + sender.send(PortScanResult::Closed(addr)).await?; + } + Err(_) => { + // Operation timed out + sender.send(PortScanResult::Timeout(addr)).await?; + } + } + + Ok::<(), anyhow::Error>(()) + }); + } + + Ok(receiver) +} + +#[derive(Debug)] +pub enum PortScanResult { + Open(SocketAddr), + Closed(SocketAddr), + Timeout(SocketAddr), +} + +impl PortScanResult { + pub fn is_open(&self) -> bool { + matches!(self, PortScanResult::Open(_)) + } + + pub fn is_closed(&self) -> bool { + matches!(self, PortScanResult::Closed(_)) + } + + pub fn is_timeout(&self) -> bool { + matches!(self, PortScanResult::Timeout(_)) + } + + pub fn unwrap_open(self) -> SocketAddr { + match self { + PortScanResult::Open(addr) => addr, + _ => panic!("unwrap_open called on non-open result"), + } + } +} diff --git a/crates/network-scanner/src/scanner.rs b/crates/network-scanner/src/scanner.rs new file mode 100644 index 000000000..cdbbd7031 --- /dev/null +++ b/crates/network-scanner/src/scanner.rs @@ -0,0 +1,309 @@ +use crate::{ + ip_utils::IpAddrRange, + netbios::netbios_query_scan, + ping::ping_range, + port_discovery::{scan_ports, PortScanResult}, + task_utils::TaskManager, +}; + +use anyhow::Context; +use std::{fmt::Display, net::IpAddr, sync::Arc, time::Duration}; +use typed_builder::TypedBuilder; + +use tokio::sync::Mutex; + +use crate::{ + broadcast::asynchronous::broadcast, + task_utils::{TaskExecutionContext, TaskExecutionRunner}, +}; + +#[derive(Debug, Clone)] +pub struct NetworkScanner { + pub ports: Vec, + + pub(crate) runtime: Arc, + // TODO: use this + // scan_method: Vec, + pub ping_interval: Duration, // in milliseconds + pub ping_timeout: Duration, // in milliseconds + pub broadcast_timeout: Duration, // in milliseconds + pub port_scan_timeout: Duration, // in milliseconds + pub netbios_timeout: Duration, // in milliseconds + pub netbios_interval: Duration, // in milliseconds + pub max_wait_time: Duration, // max_wait for entire scan duration in milliseconds, suggested! +} + +impl NetworkScanner { + pub fn start(&self) -> anyhow::Result> { + let mut task_executor = TaskExecutionRunner::new(self.clone())?; + + task_executor.run(move |context, task_manager| async move { + let TaskExecutionContext { + ip_cache, + ip_receiver, + ports, + runtime, + port_scan_timeout, + port_sender, + .. + } = context; + let ip_cache = ip_cache.clone(); + while let Some((ip, host)) = ip_receiver.lock().await.recv().await { + if ip_cache.read().get(&ip).is_some() { + if host.is_some() { + ip_cache.write().insert(ip, host); + } + continue; + } + + ip_cache.write().insert(ip, host); + + let (runtime, ports, port_sender, ip_cache) = + (runtime.clone(), ports.clone(), port_sender.clone(), ip_cache.clone()); + + task_manager.spawn(move |task_manager| async move { + tracing::debug!(scanning_ip = ?ip); + + let dns_look_up_res = tokio::task::spawn_blocking(move || dns_lookup::lookup_addr(&ip).ok()); + + let mut port_scan_receiver = + scan_ports(ip, &ports, runtime, port_scan_timeout, task_manager).await?; + + let dns = dns_look_up_res.await?; + + ip_cache.write().insert(ip, dns.clone()); + + while let Some(res) = port_scan_receiver.recv().await { + tracing::trace!(port_scan_result = ?res); + if let PortScanResult::Open(socket_addr) = res { + let dns = ip_cache.read().get(&ip).cloned().flatten(); + port_sender.send((ip, dns, socket_addr.port())).await?; + } + } + anyhow::Ok(()) + }); + } + + anyhow::Ok(()) + }); + + task_executor.run(move |context, task_manager| async move { + let TaskExecutionContext { + subnets, + broadcast_timeout, + runtime, + ip_sender, + .. + } = context; + + for subnet in subnets { + let (runtime, ip_sender) = (runtime.clone(), ip_sender.clone()); + task_manager.spawn(move |task_manager: crate::task_utils::TaskManager| async move { + let mut receiver = broadcast(subnet.broadcast, broadcast_timeout, runtime, task_manager).await?; + while let Some(ip) = receiver.recv().await { + tracing::trace!(broadcast_sent_ip = ?ip); + ip_sender.send((ip.into(), None)).await?; + } + anyhow::Ok(()) + }); + } + anyhow::Ok(()) + }); + + task_executor.run(move |context, task_manager| async move { + let TaskExecutionContext { + subnets, + netbios_timeout, + netbios_interval, + runtime, + ip_sender, + .. + } = context; + + let ip_ranges: Vec = subnets.iter().map(|subnet| subnet.into()).collect(); + + for ip_range in ip_ranges { + let (runtime, ip_sender, task_manager) = (runtime.clone(), ip_sender.clone(), task_manager.clone()); + let mut receiver = + netbios_query_scan(runtime, ip_range, netbios_timeout, netbios_interval, task_manager)?; + while let Some(res) = receiver.recv().await { + tracing::debug!(netbios_query_sent_ip = ?res.0); + ip_sender.send(res).await?; + } + } + anyhow::Ok(()) + }); + + task_executor.run(move |context, task_manager| async move { + let TaskExecutionContext { + ping_interval, + ping_timeout, + runtime, + ip_sender, + subnets, + ip_cache, + .. + } = context; + + let ip_ranges: Vec = subnets.iter().map(|subnet| subnet.into()).collect(); + + let should_ping = move |ip: IpAddr| -> bool { !ip_cache.read().contains_key(&ip) }; + + for ip_range in ip_ranges { + let (task_manager, runtime, ip_sender) = (task_manager.clone(), runtime.clone(), ip_sender.clone()); + let should_ping = should_ping.clone(); + let mut receiver = ping_range( + runtime, + ip_range, + ping_interval, + ping_timeout, + should_ping, + task_manager, + )?; + + while let Some(ip) = receiver.recv().await { + tracing::debug!(ping_sent_ip = ?ip); + ip_sender.send((ip, None)).await?; + } + } + anyhow::Ok(()) + }); + + let TaskExecutionRunner { + context: TaskExecutionContext { port_receiver, .. }, + task_manager, + } = task_executor; + + task_manager.stop_timeout(self.max_wait_time); + + Ok({ + Arc::new(NetworkScannerStream { + result_receiver: port_receiver, + task_manager, + }) + }) + } + + pub fn new(params: NetworkScannerParams) -> anyhow::Result { + let NetworkScannerParams { + ports, + ping_timeout, + max_wait_time: max_wait, + ping_interval, + broadcast_timeout, + port_scan_timeout, + netbios_timeout, + netbios_interval, + } = params; + + let runtime = network_scanner_net::runtime::Socket2Runtime::new(None)?; + + let ping_timeout = Duration::from_millis(ping_timeout); + let ping_interval = Duration::from_millis(ping_interval); + let broadcast_timeout = Duration::from_millis(broadcast_timeout); + let port_scan_timeout = Duration::from_millis(port_scan_timeout); + let netbios_timeout = Duration::from_millis(netbios_timeout); + let netbios_interval = Duration::from_millis(netbios_interval); + let max_wait = Duration::from_millis(max_wait); + + Ok(Self { + runtime, + ports, + ping_interval, + ping_timeout, + broadcast_timeout, + port_scan_timeout, + netbios_timeout, + netbios_interval, + max_wait_time: max_wait, + }) + } +} + +type ResultReceiver = tokio::sync::mpsc::Receiver<(IpAddr, Option, u16)>; +pub struct NetworkScannerStream { + result_receiver: Arc>, + task_manager: TaskManager, +} + +impl NetworkScannerStream { + pub async fn recv(self: &Arc) -> Option<(IpAddr, Option, u16)> { + // the caller sometimes require Send, hence the Arc is necessary for socket_addr_receiver + self.result_receiver.lock().await.recv().await + } + pub async fn recv_timeout( + self: &Arc, + duration: Duration, + ) -> anyhow::Result, u16)>> { + tokio::time::timeout(duration, self.result_receiver.lock().await.recv()) + .await + .context("recv_timeout timed out") + } + + pub fn stop(self: Arc) { + self.task_manager.stop(); + } +} + +#[derive(Debug)] +pub struct ScanResult { + pub ip: IpAddr, + pub port: u16, + pub is_open: bool, +} + +pub struct NetworkScanEntry { + pub ip: IpAddr, + pub port: u16, +} + +impl TryFrom for NetworkScanner { + type Error = anyhow::Error; + + fn try_from(value: NetworkScannerParams) -> Result { + Self::new(value) + } +} + +#[derive(Debug, Clone)] +pub enum ScanMethod { + Ping, + Broadcast, + Zeroconf, +} + +impl TryFrom<&str> for ScanMethod { + type Error = anyhow::Error; + + fn try_from(value: &str) -> Result { + match value { + "ping" | "Ping" => Ok(ScanMethod::Ping), + "broadcast" | "Broadcast" => Ok(ScanMethod::Broadcast), + "zeroconf" | "ZeroConf" => Ok(ScanMethod::Zeroconf), + _ => Err(anyhow::anyhow!("Invalid scan method")), + } + } +} + +impl Display for ScanMethod { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let s = match self { + ScanMethod::Ping => "ping", + ScanMethod::Broadcast => "broadcast", + ScanMethod::Zeroconf => "zeroconf", + }; + write!(f, "{}", s) + } +} + +#[derive(Debug, Clone, TypedBuilder, Default)] +pub struct NetworkScannerParams { + pub ports: Vec, + pub ping_interval: u64, // in milliseconds + pub ping_timeout: u64, // in milliseconds + pub broadcast_timeout: u64, // in milliseconds + pub port_scan_timeout: u64, // in milliseconds + pub netbios_timeout: u64, // in milliseconds + pub netbios_interval: u64, // in milliseconds + pub max_wait_time: u64, // max_wait for entire scan duration in milliseconds, suggested! +} diff --git a/crates/network-scanner/src/task_utils.rs b/crates/network-scanner/src/task_utils.rs new file mode 100644 index 000000000..76be3cce6 --- /dev/null +++ b/crates/network-scanner/src/task_utils.rs @@ -0,0 +1,178 @@ +use std::collections::HashMap; +use std::sync::atomic::AtomicBool; +use std::{net::IpAddr, sync::Arc, time::Duration}; + +use std::future::Future; + +use tokio::sync::Mutex; + +use crate::{ + ip_utils::{get_subnets, Subnet}, + scanner::NetworkScanner, +}; + +pub(crate) type IpSender = tokio::sync::mpsc::Sender<(IpAddr, Option)>; +pub(crate) type IpReceiver = tokio::sync::mpsc::Receiver<(IpAddr, Option)>; +pub(crate) type PortSender = tokio::sync::mpsc::Sender<(IpAddr, Option, u16)>; +pub(crate) type PortReceiver = tokio::sync::mpsc::Receiver<(IpAddr, Option, u16)>; + +#[derive(Debug, Clone)] +pub(crate) struct TaskExecutionContext { + pub ip_sender: IpSender, + pub ip_receiver: Arc>, + + pub port_sender: PortSender, + pub port_receiver: Arc>, + + pub ip_cache: Arc>>>, + + pub ports: Vec, + + pub runtime: Arc, + pub ping_interval: Duration, // in milliseconds + pub ping_timeout: Duration, // in milliseconds + pub broadcast_timeout: Duration, // in milliseconds + pub port_scan_timeout: Duration, // in milliseconds + pub netbios_timeout: Duration, // in milliseconds + pub netbios_interval: Duration, // in milliseconds + + pub subnets: Vec, +} + +type HandlesReceiver = crossbeam::channel::Receiver>>; +type HandlesSender = crossbeam::channel::Sender>>; + +#[derive(Debug)] +pub(crate) struct TaskExecutionRunner { + pub(crate) context: TaskExecutionContext, + pub(crate) task_manager: TaskManager, +} + +impl TaskExecutionContext { + pub(crate) fn new(network_scanner: NetworkScanner) -> anyhow::Result { + let (ip_sender, ip_receiver) = tokio::sync::mpsc::channel(5); + let ip_receiver = Arc::new(Mutex::new(ip_receiver)); + + let (port_sender, port_receiver) = tokio::sync::mpsc::channel(100); + let port_receiver = Arc::new(Mutex::new(port_receiver)); + + let subnets = get_subnets()?; + let NetworkScanner { + ports, + ping_timeout, + ping_interval, + broadcast_timeout, + port_scan_timeout, + netbios_timeout, + runtime, + netbios_interval, + .. + } = network_scanner; + + let res = Self { + ip_sender, + ip_receiver, + port_sender, + port_receiver, + ip_cache: Arc::new(parking_lot::RwLock::new(HashMap::new())), + ports, + runtime, + ping_interval, + ping_timeout, + broadcast_timeout, + port_scan_timeout, + netbios_timeout, + netbios_interval, + subnets, + }; + + Ok(res) + } +} + +impl TaskExecutionRunner { + pub(crate) fn run(&mut self, task: T) + where + T: FnOnce(TaskExecutionContext, TaskManager) -> F + Send + 'static, + F: Future> + Send + 'static, + { + let context = self.context.clone(); + self.task_manager + .spawn_no_sub_task(task(context, self.task_manager.clone())); + } + + pub(crate) fn new(scanner: NetworkScanner) -> anyhow::Result { + Ok(Self { + context: TaskExecutionContext::new(scanner)?, + task_manager: TaskManager::new(), + }) + } +} + +/// A task manager that can spawn tasks and stop them. +/// Collects all the handles of the spawned tasks and stops them when the stop method is called. +/// Helps to manage the lifetime of the spawned tasks. +#[derive(Debug, Clone)] +pub struct TaskManager { + handles_sender: HandlesSender, + handles_receiver: Arc, + should_stop: Arc, +} + +impl Default for TaskManager { + fn default() -> Self { + Self::new() + } +} + +impl TaskManager { + pub fn new() -> Self { + // This channel needs to be unbounded. Because we only clear out the channel once when we stop the tasks. + // If the channel is bounded, all tokio workers will be blocked forever and eventually the program will hang. + let (handles_sender, handles_receiver) = crossbeam::channel::unbounded(); + Self { + handles_sender, + handles_receiver: Arc::new(handles_receiver), + should_stop: Arc::new(AtomicBool::new(false)), + } + } + + pub(crate) fn spawn(&self, task: T) + where + T: FnOnce(Self) -> F + Send + 'static, + F: Future> + Send + 'static, + { + // Avoid race condition when stopping the tasks. + // If the stop method is called, we should not spawn any more tasks. + if self.should_stop.load(std::sync::atomic::Ordering::SeqCst) { + return; + } + let clone = self.clone(); + let handle = tokio::spawn(task(clone)); + let _ = self.handles_sender.send(handle); + } + + pub(crate) fn spawn_no_sub_task(&self, task: F) + where + F: Future> + Send + 'static, + { + self.spawn(|_| task); + } + + pub(crate) fn stop(&self) { + self.should_stop.store(true, std::sync::atomic::Ordering::SeqCst); + let handles = self.handles_receiver.clone(); + while let Ok(handle) = handles.try_recv() { + handle.abort(); + } + tracing::debug!("All tasks stopped"); + } + + pub(crate) fn stop_timeout(&self, timeout: Duration) { + let self_clone = self.clone(); + tokio::spawn(async move { + tokio::time::sleep(timeout).await; + self_clone.stop(); + }); + } +}