Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(network scan): A working scanner with ping, port discovery, broadcast, and netbios #683

Merged
merged 46 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
bc31f68
Network Scanner finished
irvingoujAtDevolution Feb 5, 2024
4c60074
Cargo.lock
irvingoujAtDevolution Feb 5, 2024
cda38e5
update cargo
irvingoujAtDevolution Feb 5, 2024
178c6c0
remove drop runtime
irvingoujAtDevolution Feb 5, 2024
7c6f256
checkout master for examples
irvingoujAtDevolution Feb 5, 2024
c9d7af4
checkout master for tests
irvingoujAtDevolution Feb 5, 2024
2d2f054
fix ci test
irvingoujAtDevolution Feb 5, 2024
b145e36
clippy and fmt
irvingoujAtDevolution Feb 5, 2024
20df4fd
Fix Runtime Logic
irvingoujAtDevolution Feb 5, 2024
e68b37d
fix connect logic
irvingoujAtDevolution Feb 5, 2024
00e2865
review fix
irvingoujAtDevolution Feb 5, 2024
a9c41a9
Update test.rs
irvingoujAtDevolution Feb 5, 2024
6325c7f
Debuggin test
irvingoujAtDevolution Feb 5, 2024
a227487
CI and Review fix
irvingoujAtDevolution Feb 5, 2024
66249d6
Fix tests
irvingoujAtDevolution Feb 5, 2024
d05cf0b
Fix linux test
irvingoujAtDevolution Feb 5, 2024
d92ec84
remove code not related to PR
irvingoujAtDevolution Feb 5, 2024
1b37f06
small fixes, change log level
irvingoujAtDevolution Feb 5, 2024
2411062
clippy
irvingoujAtDevolution Feb 5, 2024
14277e0
Merge branch 'master' into Network-Scan-Scanner
irvingoujAtDevolution Feb 6, 2024
fd41583
Update crates/network-scanner-proto/Cargo.toml
irvingoujAtDevolution Feb 6, 2024
8d5e3ce
Fix, add task mamanger
irvingoujAtDevolution Feb 7, 2024
857510c
remove dashmap
irvingoujAtDevolution Feb 7, 2024
7f36173
Update crates/network-scanner-proto/README.md
irvingoujAtDevolution Feb 7, 2024
1d0f4e1
Merge branch 'Network-Scan-Scanner' of github.com:Devolutions/devolut…
irvingoujAtDevolution Feb 7, 2024
29a227c
remove unnecessary count
irvingoujAtDevolution Feb 7, 2024
1dac52d
update polling
irvingoujAtDevolution Feb 7, 2024
8941a61
fix linux
irvingoujAtDevolution Feb 7, 2024
a68569f
fix linux
irvingoujAtDevolution Feb 7, 2024
a81b396
fix linux
irvingoujAtDevolution Feb 7, 2024
bb515c0
review fixes
irvingoujAtDevolution Feb 9, 2024
28634b9
fix examples
irvingoujAtDevolution Feb 9, 2024
2efeb0c
Update crates/network-scanner/src/task_utils.rs
irvingoujAtDevolution Feb 9, 2024
f298fa2
review fix
irvingoujAtDevolution Feb 9, 2024
5104f51
Merge branch 'Network-Scan-Scanner' of github.com:Devolutions/devolut…
irvingoujAtDevolution Feb 9, 2024
b607462
review fix
irvingoujAtDevolution Feb 9, 2024
e646b99
review fix
irvingoujAtDevolution Feb 9, 2024
ffe8982
review fix and better tracing
irvingoujAtDevolution Feb 9, 2024
06a10fd
merge master
irvingoujAtDevolution Feb 9, 2024
bdc9edf
review fix
irvingoujAtDevolution Feb 9, 2024
586f64d
review fix
irvingoujAtDevolution Feb 9, 2024
657d46f
review fix
irvingoujAtDevolution Feb 9, 2024
a541598
adjust queue size
irvingoujAtDevolution Feb 9, 2024
58cf8d7
add netbios interval
irvingoujAtDevolution Feb 9, 2024
ef29417
add dns resolver
irvingoujAtDevolution Feb 9, 2024
877ebe3
Fix dependencies
CBenoit Feb 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 114 additions & 78 deletions Cargo.lock
irvingoujAtDevolution marked this conversation as resolved.
Show resolved Hide resolved

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/network-scanner-net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ anyhow = "1.0.79"
crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] }
futures = "0.3.30"
parking_lot = "0.12.1"
polling = { git = "https://github.com/smol-rs/polling", rev = "62430fd56e668559d08ca7071ab13a0e116ba515" }
polling = { git = "https://github.com/devolutions/polling.git", branch = "fix-detect-connection-fail" }
socket2 = { version = "0.5.5", features = ["all"] }
thiserror = "1.0.56"
tokio-stream = "0.1.14"
Expand Down
2 changes: 1 addition & 1 deletion crates/network-scanner-net/examples/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use socket2::SockAddr;
#[tokio::main]
pub async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::SubscriberBuilder::default()
.with_max_level(tracing::Level::DEBUG)
.with_max_level(tracing::Level::TRACE)
.with_thread_names(true)
.init();

Expand Down
42 changes: 42 additions & 0 deletions crates/network-scanner-net/examples/tcp_bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::net::{IpAddr, SocketAddr};

use network_scanner_net::runtime::Socket2Runtime;
use socket2::{Domain, SockAddr, Type};

fn main() -> anyhow::Result<()> {
let rt = tokio::runtime::Runtime::new()?;

rt.block_on(async move {
let runtime = Socket2Runtime::new(None)?;

let mut futures = vec![];
for ip in 0..255 {
let ip: IpAddr = format!("10.10.0.{}", ip).parse().unwrap();
let runtime = runtime.clone();
let ports = vec![22, 23, 80, 443, 3389];
for port in ports {
let socket = runtime.new_socket(Domain::IPV4, Type::STREAM, None)?;
let addr = SocketAddr::from((ip, port));
let addr = SockAddr::from(addr);
let future = async move {
socket.connect(&addr).await?;
anyhow::Ok(())
};
futures.push(future);
}
}

let now = std::time::Instant::now();
let hanldes: Vec<_> = futures.into_iter().map(|f| tokio::task::spawn(f)).collect();

for handle in hanldes {
let _ = handle.await;
}

println!("elapsed: {:?}", now.elapsed());

anyhow::Ok(())
})?;

Ok(())
}
173 changes: 84 additions & 89 deletions crates/network-scanner-net/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use std::{
Arc,
},
task::Waker,
time::Duration,
};

use anyhow::Context;
use crossbeam::channel::{Receiver, Sender};

use parking_lot::Mutex;
use polling::{Event, Events};
use socket2::Socket;
Expand All @@ -23,8 +25,7 @@ pub struct Socket2Runtime {
next_socket_id: AtomicUsize,
is_terminated: Arc<AtomicBool>,
register_sender: Sender<RegisterEvent>,
event_receiver: Receiver<Event>,
event_cache: Mutex<HashSet<EventWrapper>>,
event_history: Arc<Mutex<HashSet<EventWrapper>>>,
}

impl Drop for Socket2Runtime {
Expand All @@ -40,27 +41,25 @@ impl Drop for Socket2Runtime {
}
}

const QUEUE_CAPACITY: usize = 1024;
const QUEUE_CAPACITY: usize = 8024;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe too big?

Copy link
Member

@CBenoit CBenoit Feb 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason why you changed? It wasn’t big enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it was causing errors on registering the events.

impl Socket2Runtime {
/// Create a new runtime with a queue capacity, default is 1024.
/// Create a new runtime with a queue capacity, default is 8024.
pub fn new(queue_capacity: Option<usize>) -> anyhow::Result<Arc<Self>> {
let poller = polling::Poller::new()?;

let (register_sender, register_receiver) =
crossbeam::channel::bounded(queue_capacity.unwrap_or(QUEUE_CAPACITY));

let (event_sender, event_receiver) = crossbeam::channel::bounded(queue_capacity.unwrap_or(QUEUE_CAPACITY));

let event_history = Arc::new(Mutex::new(HashSet::new()));
let runtime = Self {
poller: Arc::new(poller),
next_socket_id: AtomicUsize::new(0),
is_terminated: Arc::new(AtomicBool::new(false)),
register_sender,
event_receiver,
event_cache: Mutex::new(HashSet::new()),
event_history: event_history.clone(),
};
let runtime = Arc::new(runtime);
runtime.start_loop(register_receiver, event_sender)?;
runtime.start_loop(register_receiver, event_history)?;
Ok(runtime)
}

Expand All @@ -81,130 +80,105 @@ impl Socket2Runtime {
pub(crate) fn remove_socket(&self, socket: &socket2::Socket, id: usize) -> anyhow::Result<()> {
self.poller.delete(socket)?;
// remove all events related to this socket
self.event_cache.lock().retain(|event| id == event.0.key);
self.remove_events_with_id_from_history(id);
Ok(())
}

fn start_loop(
&self,
register_receiver: Receiver<RegisterEvent>,
event_sender: Sender<Event>,
event_history: Arc<Mutex<HashSet<EventWrapper>>>,
) -> anyhow::Result<()> {
// To prevent an Arc cycle with the Arc<Socket2Runtime>, we added additional indirection.
// Otherwise, the reference in the new thread would prevent the runtime from being dropped and shutdown.
let poller = self.poller.clone();
// We make is_terminated Arc<AtomicBool> and poller Arc<Poller> so that we can clone them and move them into the thread.
// The reason why we cannot hold a Arc<Socket2Runtime> in the thread is because it will create a cycle reference and the runtime will never be dropped.
let is_terminated = self.is_terminated.clone();

let poller = self.poller.clone();
std::thread::Builder::new()
.name("[raw-socket]:io-event-loop".to_string())
.spawn(move || {
let mut events = Events::with_capacity(NonZeroUsize::new(1024).unwrap());
let mut events = Events::with_capacity(NonZeroUsize::new(QUEUE_CAPACITY).unwrap());
tracing::debug!("starting io event loop");
// events registered but not happened yet
let mut events_registered = HashMap::new();

// events happened but not registered yet
let mut events_happened = HashMap::new();
let mut events_registered: HashMap<EventWrapper, Waker> = HashMap::new();

loop {
if is_terminated.load(Ordering::Acquire) {
break;
}

tracing::debug!("polling events");
if let Err(e) = poller.wait(&mut events, None) {
// The timeout 200ms is critical, sometimes the event might be registered after the event happened
// the timeout limit will allow the events to be checked periodically.
if let Err(e) = poller.wait(&mut events, Some(Duration::from_millis(200))) {
tracing::error!(error = ?e, "failed to poll events");
is_terminated.store(true, Ordering::SeqCst);
break;
};

for event in events.iter() {
tracing::trace!(?event, "event happened");
events_happened.insert(event.key, event);
// This is different from just insert, as the event wrapper will have the same hash, it actually does not replace the old one.
// by removing the old one first, we can make sure the new one is inserted.
event_history.lock().remove(&event.into());
event_history.lock().insert(event.into());
}
events.clear();

while let Ok(event) = register_receiver.try_recv() {
match event {
RegisterEvent::Register { id, waker } => {
events_registered.insert(id, waker);
RegisterEvent::Register { event, waker } => {
events_registered.insert(event.into(), waker);
}
RegisterEvent::Unregister { id } => {
events_registered.remove(&id);
RegisterEvent::Unregister { event } => {
events_registered.remove(&event.into());
}
}
}

let intersection = events_happened
.keys()
.filter(|key| events_registered.contains_key(key))
.cloned()
.collect::<Vec<_>>();

intersection.into_iter().for_each(|ref key| {
let event = events_happened.remove(key).unwrap();
let waker = events_registered.remove(key).unwrap();
let _ = event_sender.try_send(event);
waker.wake_by_ref();
tracing::trace!(?event, "waking up waker");
});
for (event, waker) in events_registered.iter() {
if event_history.lock().get(event).is_some() {
waker.wake_by_ref();
}
}
}
tracing::info!("io event loop terminated");
tracing::debug!("io event loop terminated");
})
.with_context(|| "failed to spawn io event loop thread")?;

Ok(())
}

/// Ideally, we should have a dedicated thread to handle events we received, but we don't really want to spawn a second thread
/// Alternatively, we can have all socket futures call this function to check if there is any event for them.
/// The number of times the socket futures is polled is almost guaranteed to be more than the number of registration we received.
/// hence the event receiver will not be blocked.
pub(crate) fn check_event(&self, event: Event, remove: bool) -> Option<Event> {
let mut event_cache = self.event_cache.lock();
while let Ok(event) = self.event_receiver.try_recv() {
event_cache.insert(event.into());
pub(crate) fn check_event_with_id(&self, id: usize) -> Vec<Event> {
let event_interested = vec![
Event::readable(id),
Event::writable(id),
Event::all(id),
Event::none(id),
];

let mut res = Vec::new();
for event in event_interested {
if let Some(event) = self.event_history.lock().get(&event.into()) {
res.push(event.0);
}
}
tracing::debug!("checking event cache {:?}", event_cache);

let event = if remove {
event_cache.take(&event.into())
} else if event_cache.contains(&event.into()) {
Some(event.into())
} else {
None
};

event.map(|event| event.into_inner())
res
}

pub(crate) fn check_event_with_id(&self, id: usize, remove: bool) -> Vec<Event> {
let mut event_cache = self.event_cache.lock();
while let Ok(event) = self.event_receiver.try_recv() {
event_cache.insert(event.into());
}
pub(crate) fn remove_event_from_history(&self, event: Event) {
self.event_history.lock().remove(&event.into());
}

pub(crate) fn remove_events_with_id_from_history(&self, id: usize) {
let event_interested = vec![
Event::readable(id),
Event::writable(id),
Event::all(id),
Event::none(id),
];
let mut res = vec![];

if remove {
event_interested.into_iter().for_each(|event| {
if let Some(event) = event_cache.take(&event.into()) {
res.push(event.into_inner());
}
});
} else {
event_interested.into_iter().for_each(|event| {
if event_cache.contains(&event.into()) {
res.push(event);
}
});
for event in event_interested {
self.event_history.lock().remove(&event.into());
}

res
}

pub(crate) fn register(&self, socket: &Socket, event: Event, waker: Waker) -> anyhow::Result<()> {
Expand All @@ -219,24 +193,45 @@ impl Socket2Runtime {
// it would be better to drop the register event then block the worker thread or main thread.
// as the worker thread is shared for the entire application.
self.register_sender
.try_send(RegisterEvent::Register { id: event.key, waker })
.try_send(RegisterEvent::Register { event, waker })
.with_context(|| "failed to send register event to register loop")
}

pub(crate) fn unregister(&self, id: usize) -> anyhow::Result<()> {
pub(crate) fn register_events(&self, socket: &Socket, events: &[Event], waker: Waker) -> anyhow::Result<()> {
if self.is_terminated.load(Ordering::Acquire) {
Err(ScannnerNetError::AsyncRuntimeError("runtime is terminated".to_string()))?;
}

for event in events {
tracing::trace!(?event, ?socket, "registering event");
self.poller.modify(socket, *event)?;
self.register_sender
.try_send(RegisterEvent::Register {
event: *event,
waker: waker.clone(),
})
.with_context(|| "failed to send register event to register loop")?;
}

Ok(())
}

pub(crate) fn unregister(&self, event: Event) -> anyhow::Result<()> {
if self.is_terminated.load(Ordering::Acquire) {
Err(ScannnerNetError::AsyncRuntimeError("runtime is terminated".to_string()))?;
}
self.register_sender
.try_send(RegisterEvent::Unregister { id })
.with_context(|| "failed to send unregister event to register loop")
.try_send(RegisterEvent::Unregister { event })
.with_context(|| "failed to send unregister event to register loop")?;

Ok(())
}
}

#[derive(Debug)]
enum RegisterEvent {
Register { id: usize, waker: Waker },
Unregister { id: usize },
Register { event: Event, waker: Waker },
Unregister { event: Event },
}

#[derive(Debug)]
Expand Down Expand Up @@ -264,8 +259,8 @@ impl From<Event> for EventWrapper {
}
}

impl EventWrapper {
pub(crate) fn into_inner(self) -> Event {
self.0
impl From<&Event> for EventWrapper {
fn from(event: &Event) -> Self {
Self(*event)
}
}
Loading