Skip to content

Commit

Permalink
Optimize single-element last() Gatherer (#44)
Browse files Browse the repository at this point in the history
<img width="1428" alt="Screenshot 2024-10-15 at 13 51 58" src="https://github.com/user-attachments/assets/4b6f0e0c-9f7b-4cbc-9d54-77814cc89ff2">
  • Loading branch information
pivovarit authored Oct 15, 2024
1 parent d4bf153 commit 659881d
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 39 deletions.
119 changes: 81 additions & 38 deletions src/main/java/com/pivovarit/gatherers/LastGatherer.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,56 +6,99 @@
import java.util.function.Supplier;
import java.util.stream.Gatherer;

record LastGatherer<T>(
long n) implements Gatherer<T, LastGatherer.AppendOnlyCircularBuffer<T>, T> {

LastGatherer {
if (n <= 0) {
throw new IllegalArgumentException("number of elements can't be lower than one");
}
final class LastGatherer {
private LastGatherer() {
}

@Override
public Supplier<LastGatherer.AppendOnlyCircularBuffer<T>> initializer() {
return () -> new LastGatherer.AppendOnlyCircularBuffer<>((int) n);
static <T> Gatherer<T, ?, T> size(int n) {
return switch (n) {
case 1 -> new SingleElementLastGatherer<>();
default -> new CircularBufferLastGatherer<>(n);
};
}

@Override
public Integrator<LastGatherer.AppendOnlyCircularBuffer<T>, T, T> integrator() {
return Integrator.ofGreedy((state, element, _) -> {
state.add(element);
return true;
});
}
private record CircularBufferLastGatherer<T>(
long n) implements Gatherer<T, CircularBufferLastGatherer.AppendOnlyCircularBuffer<T>, T> {

@Override
public BiConsumer<LastGatherer.AppendOnlyCircularBuffer<T>, Downstream<? super T>> finisher() {
return (state, downstream) -> state.forEach(downstream::push);
}
CircularBufferLastGatherer {
if (n <= 0) {
throw new IllegalArgumentException("number of elements can't be lower than one");
}
}

static class AppendOnlyCircularBuffer<T> {
private final T[] buffer;
private final int maxSize;
private final AtomicInteger endIdx = new AtomicInteger(0);
private final AtomicInteger size = new AtomicInteger(0);
@Override
public Supplier<AppendOnlyCircularBuffer<T>> initializer() {
return () -> new AppendOnlyCircularBuffer<>((int) n);
}

public AppendOnlyCircularBuffer(int size) {
this.maxSize = size;
this.buffer = (T[]) new Object[size];
@Override
public Integrator<AppendOnlyCircularBuffer<T>, T, T> integrator() {
return Integrator.ofGreedy((state, element, _) -> {
state.add(element);
return true;
});
}

public void add(T element) {
buffer[endIdx.getAndIncrement() % maxSize] = element;
if (size.get() < maxSize) {
size.incrementAndGet();
}
@Override
public BiConsumer<AppendOnlyCircularBuffer<T>, Downstream<? super T>> finisher() {
return (state, downstream) -> state.forEach(downstream::push);
}

public void forEach(Consumer<T> consumer) {
int startIdx = (endIdx.get() - size.get() + maxSize) % maxSize;
for (int i = 0; i < size.get(); i++) {
consumer.accept(buffer[(startIdx + i) % maxSize]);
static class AppendOnlyCircularBuffer<T> {
private final T[] buffer;
private final int maxSize;
private final AtomicInteger endIdx = new AtomicInteger(0);
private final AtomicInteger size = new AtomicInteger(0);

public AppendOnlyCircularBuffer(int size) {
this.maxSize = size;
this.buffer = (T[]) new Object[size];
}

public void add(T element) {
buffer[endIdx.getAndIncrement() % maxSize] = element;
if (size.get() < maxSize) {
size.incrementAndGet();
}
}

public void forEach(Consumer<T> consumer) {
int startIdx = (endIdx.get() - size.get() + maxSize) % maxSize;
for (int i = 0; i < size.get(); i++) {
consumer.accept(buffer[(startIdx + i) % maxSize]);
}
}
}
}

private record SingleElementLastGatherer<T>() implements Gatherer<T, SingleElementLastGatherer.ValueHolder<T>, T> {

@Override
public Supplier<ValueHolder<T>> initializer() {
return ValueHolder::new;
}

@Override
public Integrator<ValueHolder<T>, T, T> integrator() {
return Integrator.ofGreedy((state, element, _) -> {
state.value = element;
state.isSet = true;
return true;
});
}

@Override
public BiConsumer<ValueHolder<T>, Downstream<? super T>> finisher() {
return (state, downstream) -> {
if (state.isSet) {
downstream.push(state.value);
}
};
}

static class ValueHolder<T> {
private T value;
private boolean isSet;
}
}
}
2 changes: 1 addition & 1 deletion src/main/java/com/pivovarit/gatherers/MoreGatherers.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ private MoreGatherers() {
* @return a {@link Gatherer} that collects the last {@code n} elements
*/
public static <T> Gatherer<T, ?, T> last(int n) {
return new LastGatherer<>(n);
return LastGatherer.size(n);
}

/**
Expand Down

0 comments on commit 659881d

Please sign in to comment.