Skip to content

Commit

Permalink
Lock sources list with rwlock instead of mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
notgull committed Feb 21, 2023
1 parent 28689dd commit 90e5013
Showing 1 changed file with 7 additions and 9 deletions.
16 changes: 7 additions & 9 deletions src/iocp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::marker::PhantomPinned;
use std::os::windows::io::{AsRawHandle, RawHandle, RawSocket};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::time::{Duration, Instant};

#[cfg(not(polling_no_io_safety))]
Expand All @@ -36,7 +36,7 @@ pub(super) struct Poller {
afd: Mutex<Vec<Arc<Afd<Packet>>>>,

/// The state of the sources registered with this poller.
sources: Mutex<HashMap<RawSocket, Packet>>,
sources: RwLock<HashMap<RawSocket, Packet>>,

/// Sockets with pending updates.
pending_updates: ConcurrentQueue<Packet>,
Expand Down Expand Up @@ -72,7 +72,7 @@ impl Poller {
Ok(Poller {
port,
afd: Mutex::new(Vec::new()),
sources: Mutex::new(HashMap::new()),
sources: RwLock::new(HashMap::new()),
pending_updates: ConcurrentQueue::bounded(1024),
polling: AtomicBool::new(false),
packets: Mutex::new(Vec::with_capacity(1024)),
Expand Down Expand Up @@ -133,7 +133,7 @@ impl Poller {

// Keep track of the source in the poller.
{
let mut sources = self.sources.lock().unwrap_or_else(|e| e.into_inner());
let mut sources = self.sources.write().unwrap_or_else(|e| e.into_inner());

match sources.entry(socket) {
Entry::Vacant(v) => {
Expand Down Expand Up @@ -174,7 +174,7 @@ impl Poller {

// Get a reference to the source.
let source = {
let sources = self.sources.lock().unwrap_or_else(|e| e.into_inner());
let sources = self.sources.read().unwrap_or_else(|e| e.into_inner());

match sources.get(&socket) {
Some(s) => s.clone(),
Expand All @@ -198,7 +198,7 @@ impl Poller {

// Get a reference to the source.
let source = {
let mut sources = self.sources.lock().unwrap_or_else(|e| e.into_inner());
let mut sources = self.sources.write().unwrap_or_else(|e| e.into_inner());

match sources.remove(&socket) {
Some(s) => s,
Expand Down Expand Up @@ -575,7 +575,7 @@ impl PacketUnwrapped {
if events.contains(AfdPollMask::LOCAL_CLOSE) {
let source = {
let mut sources =
poller.sources.lock().unwrap_or_else(|e| e.into_inner());
poller.sources.write().unwrap_or_else(|e| e.into_inner());
sources.remove(&socket_state.socket).unwrap()
};
return source.begin_delete().map(|()| FeedEventResult::NoEvent);
Expand Down Expand Up @@ -731,8 +731,6 @@ enum FeedEventResult {
Notified,
}

impl SocketState {}

fn event_to_afd_mask(readable: bool, writable: bool, error: bool) -> afd::AfdPollMask {
use afd::AfdPollMask as AfdPoll;

Expand Down

0 comments on commit 90e5013

Please sign in to comment.