init function in .map_init called unexpectedly many times. #1214

Open
Neutron3529 opened this issue Dec 3, 2024 · 3 comments
@Neutron3529

Neutron3529 commented Dec 3, 2024

The main idea is that I want a counter that counts the frequency of all possible results. Since I know each result is a usize less than some small bound (for example, 300), I could allocate a vector of length 300 and tally the results into it. The problem is that there is no such .count() function, and allocating a vector like that per init might be slow.

Is it possible to provide some counter (e.g. .count_with_vec(vec_length) / .count_with_hash()) to quickly summarize the results?
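
A minimal sketch of the kind of counting I mean, built from the existing fold/reduce combinators (the bound of 300 and the dummy workload are placeholders):

use rayon::prelude::*;

fn main() {
    // Placeholder workload: every result is a usize known to be < 300.
    let counts: Vec<usize> = (0..1_000_000usize)
        .into_par_iter()
        .map(|i| i % 300)
        // One counter vector per job, filled locally...
        .fold(
            || vec![0usize; 300],
            |mut acc, v| {
                acc[v] += 1;
                acc
            },
        )
        // ...then the per-job vectors are summed pairwise.
        .reduce(
            || vec![0usize; 300],
            |mut a, b| {
                a.iter_mut().zip(b).for_each(|(a, b)| *a += b);
                a
            },
        );
    println!("zeros counted: {}", counts[0]);
}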


Old content, which I thought might be a bug; it duplicates #742 and #718:

use std::sync::atomic::{AtomicUsize, Ordering};
use rayon::prelude::*;
fn main() {
    let item = AtomicUsize::new(0);
    println!(
        "got {}",
        (0..1_0000_0000)
            .into_par_iter()
            .map_init(|| item.fetch_add(1, Ordering::SeqCst), |_, _| 1)
            .sum::<usize>()
    );
    println!("init called: {item:?}")
}

With cargo run --release, I found that item ends up at 3000~5000 after the iterator finishes, which is unexpectedly high.

$ cargo rr
    Finished `release` profile [optimized] target(s) in 0.01s
     Running `target/release/rayon-bug`
got 100000000
init called: 3498
$ cargo rr
    Finished `release` profile [optimized] target(s) in 0.01s
     Running `target/release/rayon-bug`
got 100000000
init called: 3154
$ cargo rr
    Finished `release` profile [optimized] target(s) in 0.01s
     Running `target/release/rayon-bug`
got 100000000
init called: 4342
$ cargo rr
    Finished `release` profile [optimized] target(s) in 0.01s
     Running `target/release/rayon-bug`
got 100000000
init called: 3670

Even when init is a heavy function, the result does not change:

use std::{
    sync::atomic::{AtomicUsize, Ordering},
    thread,
    time::Duration,
};

use rayon::prelude::*;
fn main() {
    rayon::ThreadPoolBuilder::new()
        .num_threads(16)
        .build_global()
        .unwrap();
    let item = AtomicUsize::new(0);
    println!(
        "got {}",
        (0..10_0000_0000)
            .into_par_iter()
            .map_init(
                || {
                    item.fetch_add(1, Ordering::SeqCst);
                    thread::sleep(Duration::from_millis(10)); // suppose we do a lot of things here.
                },
                |_, _| 1
            )
            .sum::<usize>()
    );
    println!("init called: {item:?}")
}

$ cargo rr
   Compiling rayon-bug v0.1.0 (/me/rayon-bug)
    Finished `release` profile [optimized] target(s) in 0.81s
     Running `target/release/rayon-bug`
got 1000000000
init called: 4018

I have no idea how to prevent init from being called so many times.

@cuviper
Member

cuviper commented Dec 3, 2024

The "adaptive" splitting is sometimes too aggressive in the way iterators use join to split jobs, and I haven't found a sound way to share the init data when it gets over-split (i.e. split but not actually stolen to a different thread). Maybe I should re-evaluate #857 to make it less aggressive -- I'd be interested to hear how that does on your real workload.

I have no idea how to prevent init from being called so many times.

If the source iterator is an IndexedParallelIterator, you can use with_min_len to set a lower bound, limiting the total number of splits.
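
For example, a sketch against the reproducer above (the minimum of 1_000_000 items per job is an arbitrary choice; tune it to the workload):

use std::sync::atomic::{AtomicUsize, Ordering};
use rayon::prelude::*;

fn main() {
    let item = AtomicUsize::new(0);
    println!(
        "got {}",
        (0..100_000_000usize)
            .into_par_iter()
            // Never split jobs smaller than 1_000_000 items, which also bounds
            // how many times the init closure can run.
            .with_min_len(1_000_000)
            .map_init(|| item.fetch_add(1, Ordering::SeqCst), |_, _| 1)
            .sum::<usize>()
    );
    println!("init called: {item:?}")
}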

@Neutron3529
Author

Is it possible to do something like this:

use std::{
    sync::atomic::{AtomicUsize, Ordering},
    thread,
};
#[inline(always)]
pub fn repeat<M, R: std::marker::Send, T, const N: usize>(
    to: usize,
    chunk_size: usize,
    init: impl std::marker::Send + Copy + Fn(usize) -> (R, T),
    map_reduce: impl std::marker::Send + Copy + Fn((R, T), usize) -> (R, T),
) -> [R; N] {
    let from = AtomicUsize::new(chunk_size * N);
    thread::scope(|s| {
        std::array::from_fn(|idx| {
            let from = &from;
            s.spawn(move || {
                let (mut result, mut data) = init(idx);
                let mut old = idx * chunk_size;
                let mut new = old + chunk_size;
                while new < to {
                    (result, data) = (old..new).fold((result, data), map_reduce);

                    old = from.load(Ordering::Acquire);
                    new = old + chunk_size;
                    while let Err(o) =
                        from.compare_exchange_weak(old, new, Ordering::AcqRel, Ordering::Acquire)
                    {
                        (old, new) = (o, o + chunk_size)
                    }
                }

                (old..to).fold((result, data), map_reduce).0
            })
        })
        .map(|jh| jh.join().unwrap())
    })
}

Since I have little knowledge about work stealing, I use a CAS queue here. The result seems about 1% slower than rayon's implementation, and I have no idea why.

The full program is here:

#![feature(iter_collect_into, gen_blocks)]
type Ty = u32;
#[derive(Debug)]
pub struct UnionFind {
    fa: Vec<Ty>,
}
impl UnionFind {
    pub fn new(fa: Vec<Ty>) -> Self {
        Self { fa }
    }

    pub fn find(&mut self, x: Ty) -> Ty {
        unsafe {
            assert_unchecked((x as usize) < self.fa.len());
        }
        let mut fa = self.fa[x as usize];
        if fa != x {
            // path compression
            fa = self.find(fa);
            unsafe {
                assert_unchecked((x as usize) < self.fa.len());
            }
            self.fa[x as usize] = fa;
        }
        fa
    }

    pub fn merge(&mut self, i: Ty, j: Ty) {
        unsafe {
            assert_unchecked((i as usize) < self.fa.len());
            assert_unchecked((j as usize) < self.fa.len());
        }
        let ifa = self.find(i);
        let jfa = self.find(j);
        unsafe {
            assert_unchecked((ifa as usize) < self.fa.len());
        }
        self.fa[ifa as usize] = jfa;
    }

    pub fn connected(&mut self, i: Ty, j: Ty) -> bool {
        self.find(i) == self.find(j)
    }
}
use rand::prelude::*;
#[derive(Debug)]
pub struct Sample<R: Rng> {
    x: Ty,
    y: Ty,
    connect: UnionFind,
    memory: Vec<Ty>,
    order: Vec<Ty>,
    rng: R,
}
impl<R: Rng + Default> Sample<R> {
    pub fn new(mut x: Ty, mut y: Ty) -> Self {
        use std::iter::{once, repeat_n};
        x = x + 2;
        y = y + 2;
        let memory = repeat_n(0, x as usize)
            .chain((1..(y - 1)).flat_map(|_| {
                once(x * y - 1)
                    .chain(repeat_n(Ty::MAX, y as usize - 2))
                    .chain(once(0))
            }))
            .chain(repeat_n(x * y - 1, x as usize))
            .collect::<Vec<Ty>>();
        Self {
            x,
            y,
            connect: UnionFind::new(memory.clone()),
            memory,
            order: (1..(x - 1))
                .flat_map(|xidx| (1..(y - 1)).map(move |yidx| xidx * x + yidx))
                .collect(),
            rng: Default::default(),
        }
    }
    pub fn reset(&mut self) {
        // self.order.shuffle(&mut self.rng);
        self.connect.fa.copy_from_slice(&self.memory);
    }
    pub fn order_iter<'a>(
        order: &'a mut Vec<Ty>,
        rng: &'a mut R,
    ) -> impl Iterator<Item = (Ty, Ty)> + 'a {
        gen {
            let l = order.len() as Ty;
            for idx in 0..l {
                order.swap(idx as usize, rng.gen_range(idx..l) as usize);
                yield (order[idx as usize], idx);
            }
        }
    }
    pub fn sample(&mut self) -> Ty {
        self.reset();
        for (idx, n) in Self::order_iter(&mut self.order, &mut self.rng) {
            if self.connect.fa[idx as usize] == Ty::MAX {
                self.connect.fa[idx as usize] = idx
            }
            for nidx in [
                idx - self.x - 1,
                idx - self.x,
                idx - self.x + 1,
                idx - 1,
                idx + 1,
                idx + self.x - 1,
                idx + self.x,
                idx + self.x + 1,
            ]
            .into_iter()
            {
                unsafe {
                    assert_unchecked((nidx as usize) < self.connect.fa.len());
                }
                // connect.fa[x as usize] == Ty::MAX is used here to mark a cell as passable
                if self.connect.fa[nidx as usize] < self.connect.fa.len() as Ty {
                    self.connect.merge(nidx, idx);
                }
            }
            if self.connect.connected(1, self.x) {
                return n;
            }
        }
        self.order.len() as Ty
    }
}
use rayon::prelude::*;
fn sampling(x: Ty, y: Ty, chunk: usize, repeat: usize) {
    let vec: Vec<_> = (0..repeat)
        .into_par_iter()
        .map_init(
            || Sample::<ThreadRng>::new(x, y),
            |sample, _| sample.sample(),
        )
        .fold_chunks(
            chunk,
            || vec![0; ((x + 2) * (y + 2) + 1) as usize],
            |mut s, res| {
                if let Some(x) = s.get_mut(res as usize) {
                    *x += 1
                }
                s
            },
        )
        .reduce_with(|mut a, b| {
            a.iter_mut()
                .zip(b.iter().copied())
                .for_each(|(a, b)| *a += b);
            a
        })
        .unwrap();
    println!(
        "{}/{repeat} done",
        vec.into_iter().enumerate().fold(0, |cumsum, (n, item)| {
            if cumsum < repeat {
                println!(
                    "k={n:w2$}, freq = {item:width$}/{repeat}",
                    item = repeat - cumsum,
                    width = repeat.max(1).ilog10() as usize + 1,
                    w2 = ((x + 2) * (y + 2)).ilog10() as usize + 1
                )
            }
            cumsum + item as usize
        })
    );
}
fn sampling_v(x: Ty, y: Ty, chunk: usize, repeat: usize) {
    let vec = puzzle::repeat::<Ty, Vec<Ty>, Sample<ThreadRng>, 16>(
        repeat,
        chunk,
        |_| (vec![0; ((x + 2) * (y + 2) + 1) as usize], Sample::new(x, y)),
        |(mut r, mut s), _| {
            if let Some(x) = r.get_mut(s.sample() as usize) {
                *x += 1
            }
            (r, s)
        },
    )
    .into_iter()
    .fold(Vec::new(), |b, mut a| {
        a.iter_mut()
            .zip(b.iter().copied())
            .for_each(|(a, b)| *a += b);
        a
    });
    println!(
        "{}/{repeat} done",
        vec.into_iter().enumerate().fold(0, |cumsum, (n, item)| {
            if cumsum < repeat {
                println!(
                    "k={n:w2$}, freq = {item:width$}/{repeat}",
                    item = repeat - cumsum,
                    width = repeat.max(1).ilog10() as usize + 1,
                    w2 = ((x + 2) * (y + 2)).ilog10() as usize + 1
                )
            }
            cumsum + item as usize
        })
    );
}
use std::{env, hint::assert_unchecked, time::Instant};
fn main() {
    println!("sample got {}", Sample::<ThreadRng>::new(10, 10).sample());
    dbg!([(); 10].map(|_| {
        let now = Instant::now();
        sampling(
            env::args().nth(1).unwrap_or_default().parse().unwrap_or(10),
            env::args().nth(2).unwrap_or_default().parse().unwrap_or(10),
            env::args().nth(3).unwrap_or_default().parse().unwrap_or(10),
            env::args().nth(4).unwrap_or_default().parse().unwrap_or(10),
        );
        let rayon = now.elapsed();
        let now = Instant::now();
        sampling_v(
            env::args().nth(1).unwrap_or_default().parse().unwrap_or(10),
            env::args().nth(2).unwrap_or_default().parse().unwrap_or(10),
            env::args().nth(3).unwrap_or_default().parse().unwrap_or(10),
            env::args().nth(4).unwrap_or_default().parse().unwrap_or(10),
        );
        let cas = now.elapsed();
        println!("{rayon:?} {cas:?}");
        (rayon, cas)
    }));
}

@cuviper
Member

cuviper commented Dec 6, 2024

I haven't read your code in depth, but I do believe it's possible to beat rayon with bespoke threading code. Rayon is operating under very general constraints, whereas custom code can make more assumptions. For example, a big blocker for Rayon is that work-stealing can make the map_init callback re-entrant, so we can't just use TLS here.
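
A minimal sketch of the kind of assumption custom code can make: if the per-item body never calls back into rayon (so the re-entrancy hazard above cannot bite), a thread_local scratch value can play the role of map_init's per-thread state. The SCRATCH name and the 300-element buffer are placeholders:

use std::cell::RefCell;
use rayon::prelude::*;

thread_local! {
    // Per-worker scratch buffer, built lazily at most once per thread.
    static SCRATCH: RefCell<Vec<usize>> = RefCell::new(Vec::new());
}

fn main() {
    let got: usize = (0..100_000_000usize)
        .into_par_iter()
        .map(|i| {
            SCRATCH.with(|s| {
                let mut s = s.borrow_mut();
                if s.is_empty() {
                    // Expensive per-thread setup happens here, once per worker.
                    s.resize(300, 0);
                }
                // Reuse the buffer as mutable per-thread state.
                s[i % 300] = i;
                1usize
            })
        })
        .sum();
    println!("got {got}");
}

The borrow of the RefCell must not span any call back into rayon, which is exactly the constraint rayon itself cannot assume.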
