Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

int: Use polling crate as interface to poll system #123

Merged
merged 9 commits into from
Apr 9, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ log = "0.4"
nix = { version = "0.26", default-features = false, features = ["event", "fs", "signal", "socket", "time"] }
futures-util = { version = "0.3.5", optional = true, default-features = false, features = ["std"]}
futures-io = { version = "0.3.5", optional = true }
slotmap = "1.0"
thiserror = "1.0"
vec_map = "0.8.2"
pin-utils = { version = "0.1.0", optional = true }
polling = "2.6.0"
slab = "0.4.7"

[dev-dependencies]
futures = "0.3.5"
Expand Down
1 change: 1 addition & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub enum Error {
impl From<nix::errno::Errno> for Error {
/// Converts a [`nix::Error`] into a wrapped version of the equivalent
/// [`std::io::Error`].
#[cfg_attr(coverage, no_coverage)]
fn from(err: nix::errno::Errno) -> Self {
Into::<std::io::Error>::into(err).into()
}
Expand Down
12 changes: 7 additions & 5 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use nix::fcntl::{fcntl, FcntlArg, OFlag};
use futures_io::{AsyncRead, AsyncWrite, IoSlice, IoSliceMut};

use crate::{
loop_logic::LoopInner, sources::EventDispatcher, Interest, Mode, Poll, PostAction, Readiness,
Token, TokenFactory,
loop_logic::{LoopInner, MAX_SOURCES_MASK},
sources::EventDispatcher,
Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory,
};

/// Adapter for async IO manipulations
Expand Down Expand Up @@ -63,7 +64,7 @@ impl<'l, F: AsFd> Async<'l, F> {
last_readiness: Readiness::EMPTY,
}));
let key = inner.sources.borrow_mut().insert(dispatcher.clone());
dispatcher.borrow_mut().token = Some(Token { key, sub_id: 0 });
dispatcher.borrow_mut().token = Some(Token { key });
inner.register(&dispatcher)?;

// Straightforward casting would require us to add the bound `Data: 'l` but we don't actually need it
Expand Down Expand Up @@ -198,11 +199,12 @@ impl<'l, Data> IoLoopInner for LoopInner<'l, Data> {
.borrow()
.token
.expect("No token for IO dispatcher")
.key;
.key
& MAX_SOURCES_MASK;
let _source = self
.sources
.borrow_mut()
.remove(key)
.try_remove(key)
.expect("Attempting to remove a non-existent source?!");
}
}
Expand Down
112 changes: 66 additions & 46 deletions src/loop_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,37 @@ use std::time::{Duration, Instant};
use std::future::Future;

use io_lifetimes::AsFd;
use slotmap::SlotMap;
use slab::Slab;

use crate::sources::{Dispatcher, EventSource, Idle, IdleDispatcher};
use crate::sys::Notifier;
use crate::{EventDispatcher, InsertError, Poll, PostAction, TokenFactory};

type IdleCallback<'i, Data> = Rc<RefCell<dyn IdleDispatcher<Data> + 'i>>;

slotmap::new_key_type! {
pub(crate) struct CalloopKey;
}
// The maximum number of sources that we are allowed to have.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not the maximum number of sources, it's the number of bits used to store the source id.

//
// This plus `MAX_SUBSOURCES` must equal the number of bits in `usize`.
#[cfg(target_pointer_width = "64")]
pub(crate) const MAX_SOURCES: u32 = 44;
#[cfg(target_pointer_width = "32")]
pub(crate) const MAX_SOURCES: u32 = 22;
#[cfg(target_pointer_width = "16")]
pub(crate) const MAX_SOURCES: u32 = 10;

// The maximum number of sub-sources that we are allowed to have.
//
// This plus `MAX_SOURCES` must equal the number of bits in `usize`.
#[cfg(target_pointer_width = "64")]
pub(crate) const MAX_SUBSOURCES: u32 = 20;
#[cfg(target_pointer_width = "32")]
pub(crate) const MAX_SUBSOURCES: u32 = 10;
#[cfg(target_pointer_width = "16")]
pub(crate) const MAX_SUBSOURCES: u32 = 6;

pub(crate) const MAX_SOURCES_TOTAL: usize = 1 << MAX_SOURCES;
pub(crate) const MAX_SUBSOURCES_TOTAL: usize = 1 << MAX_SUBSOURCES;
pub(crate) const MAX_SOURCES_MASK: usize = MAX_SOURCES_TOTAL - 1;

/// A token representing a registration in the [`EventLoop`].
///
Expand All @@ -29,12 +50,12 @@ slotmap::new_key_type! {
/// [remove](LoopHandle#method.remove) or [kill](LoopHandle#method.kill) it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RegistrationToken {
key: CalloopKey,
key: usize,
}

pub(crate) struct LoopInner<'l, Data> {
pub(crate) poll: RefCell<Poll>,
pub(crate) sources: RefCell<SlotMap<CalloopKey, Rc<dyn EventDispatcher<Data> + 'l>>>,
pub(crate) sources: RefCell<Slab<Rc<dyn EventDispatcher<Data> + 'l>>>,
idles: RefCell<Vec<IdleCallback<'l, Data>>>,
pending_action: Cell<PostAction>,
}
Expand Down Expand Up @@ -104,14 +125,24 @@ impl<'l, Data> LoopHandle<'l, Data> {
let mut sources = self.inner.sources.borrow_mut();
let mut poll = self.inner.poll.borrow_mut();

// Make sure we won't overflow the token.
if sources.vacant_key() >= MAX_SOURCES_TOTAL {
// We probably can't hit this branch unless we OOM.
#[cfg_attr(coverage, no_coverage)]
return Err(crate::Error::IoError(std::io::Error::new(
std::io::ErrorKind::Other,
"Too many sources",
)));
}

let key = sources.insert(dispatcher.clone_as_event_dispatcher());
let ret = sources
.get(key)
.unwrap()
.register(&mut poll, &mut TokenFactory::new(key));

if let Err(error) = ret {
sources.remove(key).expect("Source was just inserted?!");
sources.try_remove(key).expect("Source was just inserted?!");
return Err(error);
}

Expand Down Expand Up @@ -180,7 +211,7 @@ impl<'l, Data> LoopHandle<'l, Data> {

/// Removes this source from the event loop.
pub fn remove(&self, token: RegistrationToken) {
if let Some(source) = self.inner.sources.borrow_mut().remove(token.key) {
if let Some(source) = self.inner.sources.borrow_mut().try_remove(token.key) {
if let Err(e) = source.unregister(&mut self.inner.poll.borrow_mut()) {
log::warn!(
"[calloop] Failed to unregister source from the polling system: {:?}",
Expand Down Expand Up @@ -208,7 +239,6 @@ impl<'l, Data> LoopHandle<'l, Data> {
pub struct EventLoop<'l, Data> {
handle: LoopHandle<'l, Data>,
signals: Arc<Signals>,
ping: crate::sources::ping::Ping,
}

impl<'l, Data> std::fmt::Debug for EventLoop<'l, Data> {
Expand All @@ -233,7 +263,7 @@ impl<'l, Data> EventLoop<'l, Data> {
///
/// Fails if the initialization of the polling system failed.
pub fn try_new() -> crate::Result<Self> {
Self::inner_new(false)
Self::inner_new()
}

/// Create a new event loop in high precision mode
Expand All @@ -244,29 +274,28 @@ impl<'l, Data> EventLoop<'l, Data> {
///
/// Fails if the initialization of the polling system failed.
pub fn try_new_high_precision() -> crate::Result<Self> {
Self::inner_new(true)
// The polling crate uses high precision no matter what.
Self::inner_new()
}
notgull marked this conversation as resolved.
Show resolved Hide resolved

fn inner_new(high_precision: bool) -> crate::Result<Self> {
let poll = Poll::new(high_precision)?;
fn inner_new() -> crate::Result<Self> {
let poll = Poll::new()?;
let handle = LoopHandle {
inner: Rc::new(LoopInner {
poll: RefCell::new(poll),
sources: RefCell::new(SlotMap::with_key()),
sources: RefCell::new(Slab::new()),
idles: RefCell::new(Vec::new()),
pending_action: Cell::new(PostAction::Continue),
}),
};
let (ping, ping_source) = crate::sources::ping::make_ping()?;
handle.insert_source(ping_source, |_, _, _| {})?;

Ok(EventLoop {
handle,
signals: Arc::new(Signals {
stop: AtomicBool::new(false),
#[cfg(feature = "block_on")]
future_ready: AtomicBool::new(false),
}),
ping,
})
}

Expand All @@ -282,7 +311,7 @@ impl<'l, Data> EventLoop<'l, Data> {
) -> crate::Result<()> {
let now = Instant::now();
let events = {
let mut poll = self.handle.inner.poll.borrow_mut();
let poll = self.handle.inner.poll.borrow();
loop {
let result = poll.poll(timeout);

Expand All @@ -305,12 +334,15 @@ impl<'l, Data> EventLoop<'l, Data> {
};

for event in events {
// Get the registration token associated with the event.
let registroken_token = event.token.key & MAX_SOURCES_MASK;

let opt_disp = self
.handle
.inner
.sources
.borrow()
.get(event.token.key)
.get(registroken_token)
.cloned();

if let Some(disp) = opt_disp {
Expand Down Expand Up @@ -352,7 +384,7 @@ impl<'l, Data> EventLoop<'l, Data> {
.inner
.sources
.borrow()
.contains_key(event.token.key)
.contains(registroken_token)
{
// the source has been removed from within its callback, unregister it
let mut poll = self.handle.inner.poll.borrow_mut();
Expand Down Expand Up @@ -387,8 +419,8 @@ impl<'l, Data> EventLoop<'l, Data> {
.inner
.sources
.borrow()
.values()
.cloned()
.iter()
.map(|(_, source)| source.clone())
.collect::<Vec<_>>();

for source in sources {
Expand All @@ -404,8 +436,8 @@ impl<'l, Data> EventLoop<'l, Data> {
.inner
.sources
.borrow()
.values()
.cloned()
.iter()
.map(|(_, source)| source.clone())
.collect::<Vec<_>>();

for source in sources {
Expand Down Expand Up @@ -442,7 +474,7 @@ impl<'l, Data> EventLoop<'l, Data> {
pub fn get_signal(&self) -> LoopSignal {
LoopSignal {
signal: self.signals.clone(),
ping: self.ping.clone(),
notifier: self.handle.inner.poll.borrow().notifier(),
}
}

Expand Down Expand Up @@ -493,25 +525,19 @@ impl<'l, Data> EventLoop<'l, Data> {
use std::task::{Context, Poll, Wake, Waker};

/// A waker that will wake up the event loop when it is ready to make progress.
struct EventLoopWaker {
/// A signal that can be used to wake up the event loop.
ping: crate::sources::ping::Ping,

/// The signals to set.
signals: Arc<Signals>,
}
struct EventLoopWaker(LoopSignal);

impl Wake for EventLoopWaker {
fn wake(self: Arc<Self>) {
// Set the waker.
self.signals.future_ready.store(true, Ordering::Release);
self.ping.ping();
self.0.signal.future_ready.store(true, Ordering::Release);
self.0.notifier.notify().ok();
}

fn wake_by_ref(self: &Arc<Self>) {
// Set the waker.
self.signals.future_ready.store(true, Ordering::Release);
self.ping.ping();
self.0.signal.future_ready.store(true, Ordering::Release);
self.0.notifier.notify().ok();
}
}

Expand All @@ -520,10 +546,7 @@ impl<'l, Data> EventLoop<'l, Data> {

// Create a waker that will wake up the event loop when it is ready to make progress.
let waker = {
let handle = EventLoopWaker {
ping: self.ping.clone(),
signals: self.signals.clone(),
};
let handle = EventLoopWaker(self.get_signal());

Waker::from(Arc::new(handle))
};
Expand Down Expand Up @@ -563,7 +586,7 @@ impl<'l, Data> EventLoop<'l, Data> {
#[derive(Clone)]
pub struct LoopSignal {
signal: Arc<Signals>,
ping: crate::sources::ping::Ping,
notifier: Notifier,
}

impl std::fmt::Debug for LoopSignal {
Expand All @@ -579,7 +602,7 @@ impl LoopSignal {
/// Once this method is called, the next time the event loop has finished
/// waiting for events, it will return rather than starting to wait again.
///
/// This is only usefull if you are using the `EventLoop::run()` method.
/// This is only useful if you are using the `EventLoop::run()` method.
pub fn stop(&self) {
self.signal.stop.store(true, Ordering::Release);
}
Expand All @@ -591,7 +614,7 @@ impl LoopSignal {
/// ensures the event loop will terminate quickly if you specified a long
/// timeout (or no timeout at all) to the `dispatch` or `run` method.
pub fn wakeup(&self) {
self.ping.ping();
self.notifier.notify().ok();
}
}

Expand Down Expand Up @@ -742,9 +765,6 @@ mod tests {
.unwrap();
assert!(!dispatched);

// disabling it again is an error
event_loop.handle().disable(&ping_token).unwrap_err();

// reenable it, the previous ping now gets dispatched
event_loop.handle().enable(&ping_token).unwrap();
let mut dispatched = false;
Expand Down
Loading