diff --git a/Cargo.lock b/Cargo.lock index 73f98afdf..420970d9f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -47,12 +47,6 @@ dependencies = [ "unicode-segmentation", ] -[[package]] -name = "autocfg" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1fdabc7756949593fe60f30ec81974b613357de856987752631dea1e3394c80" - [[package]] name = "backtrace" version = "0.3.71" @@ -137,15 +131,6 @@ dependencies = [ "unicode-segmentation", ] -[[package]] -name = "concurrent-queue" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "console" version = "0.15.8" @@ -155,7 +140,7 @@ dependencies = [ "encode_unicode", "lazy_static", "libc", - "windows-sys 0.52.0", + "windows-sys", ] [[package]] @@ -207,7 +192,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys", ] [[package]] @@ -219,7 +204,7 @@ dependencies = [ "cfg-if", "libc", "redox_syscall", - "windows-sys 0.52.0", + "windows-sys", ] [[package]] @@ -393,28 +378,6 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" -[[package]] -name = "pin-project-lite" -version = "0.2.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" - -[[package]] -name = "polling" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" -dependencies = [ - "autocfg", - "bitflags 1.3.2", - "cfg-if", - "concurrent-queue", - "libc", - "log", - "pin-project-lite", - "windows-sys 0.48.0", -] - [[package]] name = "ppv-lite86" version = "0.2.17" @@ -525,7 +488,7 @@ dependencies = [ "libc", "spin", "untrusted", - "windows-sys 0.52.0", + "windows-sys", ] [[package]] @@ -536,7 +499,6 @@ dependencies = [ "crossbeam-queue", "crossbeam-utils", "libc", - "polling", "rand", "rustix", "socket2", @@ -559,7 +521,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys", ] [[package]] @@ -626,7 +588,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys", ] [[package]] @@ -782,37 +744,13 @@ dependencies = [ "rustls-pki-types", ] -[[package]] -name = "windows-sys" -version = "0.48.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" -dependencies = [ - "windows-targets 0.48.5", -] - [[package]] name = "windows-sys" version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets 0.52.4", -] - -[[package]] -name = "windows-targets" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" -dependencies = [ - "windows_aarch64_gnullvm 0.48.5", - "windows_aarch64_msvc 0.48.5", - "windows_i686_gnu 0.48.5", - "windows_i686_msvc 0.48.5", - "windows_x86_64_gnu 0.48.5", - "windows_x86_64_gnullvm 0.48.5", - "windows_x86_64_msvc 0.48.5", + "windows-targets", ] [[package]] @@ -821,93 +759,51 @@ version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7dd37b7e5ab9018759f893a1952c9420d060016fc19a472b4bb20d1bdd694d1b" dependencies = [ - "windows_aarch64_gnullvm 0.52.4", - "windows_aarch64_msvc 0.52.4", - "windows_i686_gnu 0.52.4", - "windows_i686_msvc 0.52.4", - "windows_x86_64_gnu 0.52.4", - "windows_x86_64_gnullvm 0.52.4", - "windows_x86_64_msvc 0.52.4", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", ] -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" - [[package]] name = "windows_aarch64_gnullvm" version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bcf46cf4c365c6f2d1cc93ce535f2c8b244591df96ceee75d8e83deb70a9cac9" -[[package]] -name = "windows_aarch64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" - [[package]] name = "windows_aarch64_msvc" version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da9f259dd3bcf6990b55bffd094c4f7235817ba4ceebde8e6d11cd0c5633b675" -[[package]] -name = "windows_i686_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" - [[package]] name = "windows_i686_gnu" version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b474d8268f99e0995f25b9f095bc7434632601028cf86590aea5c8a5cb7801d3" -[[package]] -name = "windows_i686_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" - [[package]] name = "windows_i686_msvc" version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1515e9a29e5bed743cb4415a9ecf5dfca648ce85ee42e15873c3cd8610ff8e02" -[[package]] -name = "windows_x86_64_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" - [[package]] name = "windows_x86_64_gnu" version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5eee091590e89cc02ad514ffe3ead9eb6b660aedca2183455434b93546371a03" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" - [[package]] name = "windows_x86_64_gnullvm" version = "0.52.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77ca79f2451b49fa9e2af39f0747fe999fcda4f5e241b2898624dca97a1f2177" -[[package]] -name = "windows_x86_64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" - [[package]] name = "windows_x86_64_msvc" version = "0.52.4" diff --git a/rt/Cargo.toml b/rt/Cargo.toml index d7f635112..5f5121c4a 100644 --- a/rt/Cargo.toml +++ b/rt/Cargo.toml @@ -19,10 +19,9 @@ crossbeam-queue = "^0.3" # (e.g. signal management) we still need to use libc in those cases. libc = "^0.2" rand = { version = "^0.8", features = ["default", "small_rng"] } -polling = "^2.8" unicode-segmentation = "^1.10" backtrace = "^0.3" -rustix = { version = "^0.38", features = ["fs", "mm", "param", "process", "net", "std", "time"], default-features = false } +rustix = { version = "^0.38", features = ["fs", "mm", "param", "process", "net", "std", "time", "event"], default-features = false } [dependencies.socket2] version = "^0.5" diff --git a/rt/src/network_poller.rs b/rt/src/network_poller.rs index e0eb95f64..e28c278d1 100644 --- a/rt/src/network_poller.rs +++ b/rt/src/network_poller.rs @@ -1,64 +1,33 @@ //! Polling of non-blocking sockets using the system's polling mechanism. -use crate::process::{ProcessPointer, RescheduleRights}; +use crate::process::RescheduleRights; use crate::state::RcState; -use polling::{Event, Poller, Source}; -use std::io; -/// The type of event a poller should wait for. -pub(crate) enum Interest { - /// We're only interested in read operations. - Read, - - /// We're only interested in write operations. - Write, -} - -/// A poller for non-blocking sockets. -pub(crate) struct NetworkPoller { - poller: Poller, -} +#[cfg(target_os = "linux")] +mod epoll; -impl NetworkPoller { - pub(crate) fn new() -> Self { - NetworkPoller { - poller: Poller::new().expect("Failed to set up the network poller"), - } - } +#[cfg(any(target_os = "freebsd", target_os = "macos"))] +mod kqueue; - pub(crate) fn poll(&self, events: &mut Vec) -> io::Result { - self.poller.wait(events, None) - } +#[cfg(target_os = "linux")] +use crate::network_poller::epoll as sys; - pub(crate) fn add( - &self, - process: ProcessPointer, - source: impl Source, - interest: Interest, - ) -> io::Result<()> { - self.poller.add(source, self.event(process, interest)) - } +#[cfg(any(target_os = "freebsd", target_os = "macos"))] +use crate::network_poller::kqueue as sys; - pub(crate) fn modify( - &self, - process: ProcessPointer, - source: impl Source, - interest: Interest, - ) -> io::Result<()> { - self.poller.modify(source, self.event(process, interest)) - } +/// The maximum number of events to poll in a single call. +/// +/// We deliberately use a large capacity here in order to reduce the amount of +/// poll wakeups, improving performance when many sockets become available at +/// the same time. +const CAPACITY: usize = 1024; - pub(crate) fn delete(&self, source: impl Source) -> io::Result<()> { - self.poller.delete(source) - } - - fn event(&self, process: ProcessPointer, interest: Interest) -> Event { - let key = process.identifier(); +/// A poller for non-blocking sockets. +pub(crate) type NetworkPoller = sys::Poller; - match interest { - Interest::Read => Event::readable(key), - Interest::Write => Event::writable(key), - } - } +/// The type of event a poller should wait for. +pub(crate) enum Interest { + Read, + Write, } /// A thread that polls a poller and reschedules processes. @@ -72,47 +41,32 @@ impl Worker { Worker { id, state } } - pub(crate) fn run(&self) { - let mut events = Vec::new(); + pub(crate) fn run(&mut self) { + let mut events = sys::Events::with_capacity(CAPACITY); let poller = &self.state.network_pollers[self.id]; loop { - if let Err(err) = poller.poll(&mut events) { - if err.kind() != io::ErrorKind::Interrupted { - // It's not entirely clear if/when we ever run into this, - // but should we run into any error that's _not_ an - // interrupt then there's probably more going on, and all we - // can do is abort. - panic!("Polling for IO events failed: {:?}", err); - } - } - - let processes = events - .iter() - .filter_map(|ev| { - let proc = unsafe { ProcessPointer::new(ev.key as *mut _) }; - let mut state = proc.state(); - let rights = state.try_reschedule_for_io(); - - // A process may have also been registered with the timeout - // thread (e.g. when using a timeout). As such we should - // only reschedule the process if the timout thread didn't - // already do this for us. - match rights { - RescheduleRights::Failed => None, - RescheduleRights::Acquired => Some(proc), - RescheduleRights::AcquiredWithTimeout => { - self.state - .timeout_worker - .increase_expired_timeouts(); - Some(proc) - } + let mut processes = poller.poll(&mut events); + + processes.retain(|proc| { + let mut state = proc.state(); + let rights = state.try_reschedule_for_io(); + + // A process may have also been registered with the timeout + // thread (e.g. when using a timeout). As such we should only + // reschedule the process if the timout thread didn't already do + // this for us. + match rights { + RescheduleRights::Failed => false, + RescheduleRights::Acquired => true, + RescheduleRights::AcquiredWithTimeout => { + self.state.timeout_worker.increase_expired_timeouts(); + true } - }) - .collect(); + } + }); self.state.scheduler.schedule_multiple(processes); - events.clear(); } } } @@ -122,7 +76,6 @@ mod tests { use super::*; use crate::test::{empty_process_class, new_process}; use std::net::UdpSocket; - use std::time::Duration; #[test] fn test_add() { @@ -131,7 +84,7 @@ mod tests { let output = UdpSocket::bind("0.0.0.0:0").unwrap(); let poller = NetworkPoller::new(); - assert!(poller.add(*process, &output, Interest::Read).is_ok()); + poller.add(*process, &output, Interest::Read); } #[test] @@ -141,8 +94,8 @@ mod tests { let class = empty_process_class("A"); let process = new_process(*class); - assert!(poller.add(*process, &output, Interest::Read).is_ok()); - assert!(poller.modify(*process, &output, Interest::Write).is_ok()); + poller.add(*process, &output, Interest::Read); + poller.modify(*process, &output, Interest::Write); } #[test] @@ -151,18 +104,9 @@ mod tests { let poller = NetworkPoller::new(); let class = empty_process_class("A"); let process = new_process(*class); - let mut events = Vec::with_capacity(1); - - assert!(poller.add(*process, &output, Interest::Write).is_ok()); - assert!(poller.delete(&output).is_ok()); - - let len = poller - .poller - .wait(&mut events, Some(Duration::from_millis(10))) - .unwrap(); - assert_eq!(len, 0); - assert_eq!(events.len(), 0); + poller.add(*process, &output, Interest::Write); + poller.delete(&output); } #[test] @@ -171,14 +115,13 @@ mod tests { let poller = NetworkPoller::new(); let class = empty_process_class("A"); let process = new_process(*class); - let mut events = Vec::with_capacity(1); + let mut events = sys::Events::with_capacity(1); - poller.add(*process, &output, Interest::Write).unwrap(); + poller.add(*process, &output, Interest::Write); + let procs = poller.poll(&mut events); - assert!(poller.poll(&mut events).is_ok()); - assert_eq!(events.capacity(), 1); - assert_eq!(events.len(), 1); - assert_eq!(events[0].key, process.identifier()); + assert_eq!(procs.len(), 1); + assert_eq!(events.len(), 0); } #[test] @@ -187,14 +130,21 @@ mod tests { let sock2 = UdpSocket::bind("0.0.0.0:0").unwrap(); let poller = NetworkPoller::new(); let class = empty_process_class("A"); - let process = new_process(*class); - let mut events = Vec::with_capacity(1); + let proc1 = new_process(*class); + let proc2 = new_process(*class); + let mut events = sys::Events::with_capacity(1); + + poller.add(*proc1, &sock1, Interest::Write); + poller.add(*proc2, &sock2, Interest::Write); - poller.add(*process, &sock1, Interest::Write).unwrap(); - poller.add(*process, &sock2, Interest::Write).unwrap(); + let procs = poller.poll(&mut events); - assert!(poller.poll(&mut events).is_ok()); - assert!(events.capacity() >= 2); - assert_eq!(events.len(), 2); + assert_eq!(procs.len(), 1); + assert_eq!(events.len(), 0); + + let procs = poller.poll(&mut events); + + assert_eq!(procs.len(), 1); + assert_eq!(events.len(), 0); } } diff --git a/rt/src/network_poller/epoll.rs b/rt/src/network_poller/epoll.rs new file mode 100644 index 000000000..6fecea8d8 --- /dev/null +++ b/rt/src/network_poller/epoll.rs @@ -0,0 +1,74 @@ +use crate::network_poller::Interest; +use crate::process::ProcessPointer; +use rustix::event::epoll::{ + add, create, delete, modify, wait, CreateFlags, EventData, EventFlags, + EventVec, +}; +use rustix::fd::{AsFd, OwnedFd}; +use rustix::io::Errno; + +fn flags_for(interest: Interest) -> EventFlags { + let flags = match interest { + Interest::Read => EventFlags::IN, + Interest::Write => EventFlags::OUT, + }; + + flags | EventFlags::ET | EventFlags::ONESHOT +} + +pub(crate) type Events = EventVec; + +pub(crate) struct Poller { + fd: OwnedFd, +} + +impl Poller { + pub(crate) fn new() -> Poller { + let fd = create(CreateFlags::CLOEXEC).expect("epoll_create() failed"); + + Poller { fd } + } + + pub(crate) fn poll(&self, events: &mut Events) -> Vec { + match wait(&self.fd, events, -1) { + Ok(_) | Err(Errno::INTR) => {} + Err(_) => panic!("epoll_wait() failed"), + } + + let procs = events + .iter() + .map(|e| unsafe { ProcessPointer::new(e.data.u64() as *mut _) }) + .collect(); + + events.clear(); + procs + } + + pub(crate) fn add( + &self, + process: ProcessPointer, + source: impl AsFd, + interest: Interest, + ) { + let data = EventData::new_u64(process.identifier() as _); + + add(&self.fd, source, data, flags_for(interest)) + .expect("epoll_ctl() failed"); + } + + pub(crate) fn modify( + &self, + process: ProcessPointer, + source: impl AsFd, + interest: Interest, + ) { + let data = EventData::new_u64(process.identifier() as _); + + modify(&self.fd, source, data, flags_for(interest)) + .expect("epoll_ctl() failed"); + } + + pub(crate) fn delete(&self, source: impl AsFd) { + delete(&self.fd, source).expect("epoll_ctl() failed"); + } +} diff --git a/rt/src/network_poller/kqueue.rs b/rt/src/network_poller/kqueue.rs new file mode 100644 index 000000000..4d3790277 --- /dev/null +++ b/rt/src/network_poller/kqueue.rs @@ -0,0 +1,107 @@ +use crate::network_poller::Interest; +use crate::process::ProcessPointer; +use rustix::event::kqueue::{kevent, kqueue, Event, EventFilter, EventFlags}; +use rustix::fd::{AsFd, AsRawFd, OwnedFd}; +use rustix::io::Errno; + +pub(crate) type Events = Vec; + +pub(crate) struct Poller { + fd: OwnedFd, +} + +impl Poller { + pub(crate) fn new() -> Poller { + let fd = kqueue().expect("failed to create the kqueue descriptor"); + + Poller { fd } + } + + pub(crate) fn poll(&self, events: &mut Events) -> Vec { + match unsafe { kevent(&self.fd, &[], events, None) } { + Ok(_) | Err(Errno::INTR) => {} + Err(_) => panic!("kevent() failed"), + } + + let procs = events + .iter() + .map(|e| unsafe { + ProcessPointer::new(e.udata() as usize as *mut _) + }) + .collect(); + + events.clear(); + procs + } + + pub(crate) fn add( + &self, + process: ProcessPointer, + source: impl AsFd, + interest: Interest, + ) { + let fd = source.as_fd().as_raw_fd(); + let (add, del) = match interest { + Interest::Read => (EventFilter::Read(fd), EventFilter::Write(fd)), + Interest::Write => (EventFilter::Write(fd), EventFilter::Read(fd)), + }; + let id = process.identifier() as isize; + let flags = + EventFlags::CLEAR | EventFlags::ONESHOT | EventFlags::RECEIPT; + let events = [ + Event::new(add, EventFlags::ADD | flags, id), + Event::new(del, EventFlags::DELETE, 0), + ]; + + self.apply(&events); + } + + pub(crate) fn modify( + &self, + process: ProcessPointer, + source: impl AsFd, + interest: Interest, + ) { + self.add(process, source, interest); + } + + pub(crate) fn delete(&self, source: impl AsFd) { + let fd = source.as_fd().as_raw_fd(); + let events = [ + Event::new(EventFilter::Read(fd), EventFlags::DELETE, 0), + Event::new(EventFilter::Write(fd), EventFlags::DELETE, 0), + ]; + + self.apply(&events); + } + + fn apply(&self, events: &[Event; 2]) { + let mut changes = Vec::with_capacity(events.len()); + + unsafe { + match kevent(&self.fd, events, &mut changes, None) { + Ok(_) | Err(Errno::INTR) => {} + Err(e) => panic!("kevent() failed: {}", e), + } + }; + + for event in changes { + let data = event.data() as i32; + + // Per https://github.com/tokio-rs/mio/issues/582 we ignore PIPE, + // though it's highly unlikely to ever occur in reality given the + // affected macOS versions are quite old at this point. + if event.flags().contains(EventFlags::ERROR) + && data != 0 + && data != Errno::NOENT.raw_os_error() + && data != Errno::PIPE.raw_os_error() + { + let err = Errno::from_raw_os_error(data); + + // In the extremely unlikely event we reach this code, there's + // nothing we can really do but abort. + panic!("kevent() failed to apply the changes: {}", err); + } + } + } +} diff --git a/rt/src/runtime/socket.rs b/rt/src/runtime/socket.rs index 665981504..352ebd75a 100644 --- a/rt/src/runtime/socket.rs +++ b/rt/src/runtime/socket.rs @@ -55,7 +55,7 @@ fn blocking( proc_state.waiting_for_io(None); } - socket.register(state, process, poll_id, interest)?; + socket.register(state, process, poll_id, interest); } // Safety: the current thread is holding on to the process' run lock, so if diff --git a/rt/src/socket.rs b/rt/src/socket.rs index 9026affdd..c45288267 100644 --- a/rt/src/socket.rs +++ b/rt/src/socket.rs @@ -226,7 +226,7 @@ impl Socket { process: ProcessPointer, thread_poller_id: usize, interest: Interest, - ) -> io::Result<()> { + ) { let existing_id = self.registered.load(Ordering::Acquire); // Once registered, the process might be rescheduled immediately if @@ -242,11 +242,11 @@ impl Socket { self.registered.store(thread_poller_id as i8, Ordering::Release); - poller.add(process, &self.inner, interest) + poller.add(process, &self.inner, interest); } else { let poller = &state.network_pollers[existing_id as usize]; - poller.modify(process, &self.inner, interest) + poller.modify(process, &self.inner, interest); } // *DO NOT* use "self" from here on, as the socket/process may already // be running on a different thread. @@ -254,7 +254,8 @@ impl Socket { pub(crate) fn deregister(&mut self, state: &State) { let poller_id = self.registered.load(Ordering::Acquire) as usize; - let _ = state.network_pollers[poller_id].delete(&self.inner); + + state.network_pollers[poller_id].delete(&self.inner); } pub(crate) fn accept(&self) -> io::Result {