Skip to content

Commit

Permalink
Added some branchless parsing to the mix.
Browse files Browse the repository at this point in the history
  • Loading branch information
royvanrijn committed Jan 2, 2024
1 parent 9bc4789 commit ab1ede3
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 69 deletions.
10 changes: 8 additions & 2 deletions calculate_average_royvanrijn.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,11 @@
#


JAVA_OPTS=""
time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_royvanrijn
# Added for fun, doesn't seem to be making a difference...
if [ -f "target/calculate_average_royvanrijn.jsa" ]; then
JAVA_OPTS="-XX:SharedArchiveFile=target/calculate_average_royvanrijn.jsa -Xshare:on"
else
# First run, create the archive:
JAVA_OPTS="-XX:ArchiveClassesAtExit=target/calculate_average_royvanrijn.jsa"
fi
java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_royvanrijn
169 changes: 102 additions & 67 deletions src/main/java/dev/morling/onebrc/CalculateAverage_royvanrijn.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,91 +21,101 @@
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

/**
* Initial submission:
* Took: 62347
* Best runtimes per version:
*
* Initial submission: 62347 ms
* Chunked reader: 16347 ms
* Optimized parser: 12499 ms
* Branchless methods: 10514 ms
*
* Best performance on MacBook M2 Pro: 21.0.1-graal
* `sdk use java 21.0.1-graal`
*/
public class CalculateAverage_royvanrijn {

private static final String FILE = "./measurements.txt";
private static final int CHUNKS = 16;

// mutable state now instead of records, ugh, less instantiation.
static final class Measurement {
int min, max, count;
long sum;

public Measurement() {
this.min = 10000;
this.max = -10000;
}

private record Measurement(double min, double max, double sum, long count) {
public Measurement(int min, int max, long sum, int count) {
this.min = min;
this.max = max;
this.sum = sum;
this.count = count;
}

Measurement(double initialMeasurement) {
this(initialMeasurement, initialMeasurement, initialMeasurement, 1);
public Measurement updateWith(int measurement) {
min = min(min, measurement);
max = max(max, measurement);
sum += measurement;
count++;
return this;
}

public static Measurement combineWith(Measurement m1, Measurement m2) {
return new Measurement(
Math.min(m1.min, m2.min),
Math.max(m1.max, m2.max),
m1.sum + m2.sum,
m1.count + m2.count
);
public Measurement updateWith(Measurement measurement) {
min = min(min, measurement.min);
max = max(max, measurement.max);
sum += measurement.sum;
count += measurement.count;
return this;
}

public String toString() {
return round(min) + "/" + round(sum / count) + "/" + round(max);
return round(min) + "/" + round((1.0 * sum) / count) + "/" + round(max);
}

private double round(double value) {
return Math.round(value * 10.0) / 10.0;
return Math.round(value) / 10.0;
}
}

public static void main(String[] args) throws Exception {

long before = System.currentTimeMillis();
public static final void main(String[] args) throws Exception {

// Split the file into numberOfChunks, use random access to read and process in parallel.

int numberOfChunks = 42; // Of course.

long fileSize = new File(FILE).length();
long chunkSize = fileSize / numberOfChunks;

List<Future<Map<String, Measurement>>> futures = new ArrayList<>();
final long fileSize = new File(FILE).length();
final long chunkSize = fileSize / CHUNKS;

ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
final var es = Executors.newVirtualThreadPerTaskExecutor();
final var cs = new ExecutorCompletionService<Map<String, Measurement>>(es);

long lastEnd = 0;
for (int i = 0; i < numberOfChunks; i++) {
for (int i = 0; i < CHUNKS; i++) {
// Method findNextLineStart finds the correct point to break;
long start = lastEnd;
long end = (i == numberOfChunks - 1) ? fileSize : findNextLineStart(FILE, (i + 1) * chunkSize);
futures.add(executor.submit(() -> processChunk(FILE, start, end)));
long end = (i == CHUNKS - 1) ? fileSize : findNextLineStart(FILE, (i + 1) * chunkSize);

cs.submit(() -> processChunk(FILE, start, end));
lastEnd = end;
}

// Combine results from all futures, is there a better way to do this?
Map<String, Measurement> combinedResult = new TreeMap<>();
for (Future<Map<String, Measurement>> future : futures) {
final Map<String, Measurement> resultMap = future.get();
for (Map.Entry<String, Measurement> entry : resultMap.entrySet()) {
combinedResult.merge(entry.getKey(), entry.getValue(), Measurement::combineWith);
final Map<String, Measurement> mergedMeasurements = new HashMap<>(700);
for (int i = 0; i < CHUNKS; i++) {
for (var entry : cs.take().get().entrySet()) {
mergedMeasurements.computeIfAbsent(entry.getKey(), (k) -> new Measurement()).updateWith(entry.getValue());
}
}

System.out.print("{");
System.out.print(
combinedResult.entrySet().stream().map(Object::toString).collect(Collectors.joining(", ")));
System.out.println("}");

System.out.println("Took: " + (System.currentTimeMillis() - before));

es.shutdown();
System.out.println("{" + mergedMeasurements.entrySet().stream().sorted(Map.Entry.comparingByKey()).map(Object::toString).collect(Collectors.joining(", ")) + "}");
}

public static Map<String, Measurement> processChunk(String filePath, long start, long end) throws Exception {
private static Map<String, Measurement> processChunk(final String filePath, final long start, final long end) {

try (RandomAccessFile file = new RandomAccessFile(filePath, "r");
FileInputStream fileInputStream = new FileInputStream(file.getFD());
Expand All @@ -117,43 +127,68 @@ public static Map<String, Measurement> processChunk(String filePath, long start,
file.seek(start);
}

long bytesProcessed = 0;
long processedUpTo = start;

Map<String, Measurement> measurements = new HashMap<>();
Map<String, Measurement> measurements = new HashMap<>(700); // kenough room.

String line;
while ((line = bufferedReader.readLine()) != null && (start + bytesProcessed) < end) {
while (processedUpTo < end && (line = bufferedReader.readLine()) != null) {

processedUpTo += line.length() + 1;

// Process the line
processLine(line, measurements);
bytesProcessed += line.getBytes().length + 1;
int pivot = line.indexOf(";");
String key = line.substring(0, pivot);
int measured = branchlessParseInt(line.substring(pivot + 1));

// Tried merge() and compute() but this seems to be fastest
Measurement measurement = measurements.computeIfAbsent(key, (k) -> new Measurement());
measurement.updateWith(measured);
}
return measurements;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}

private static void processLine(final String record, final Map<String, Measurement> measurements) {
// Specialized base-10 parser, skips - and .
private static int branchlessParseInt(final String input) {
// Branchless parser, goes from:
// "-1.2" to -12
// "40.1" to 401
// etc.

// Magic.
int off = ~(input.charAt(0) >> 4) & 1;
int neg = 1 - off - off;
int has4 = ((input.length() - off) >> 2) & 1;
return neg * ((input.charAt(off) - '0') * 100 * has4 + (input.charAt(off + has4) - '0') * 10 + (input.charAt(2 + off + has4) - '0'));
}

int pivot = record.indexOf(";");
String key = record.substring(0, pivot);
double measured = Double.parseDouble(record.substring(pivot + 1));
// branchless max (unprecise for large numbers, but good enough)
static int max(final int a, final int b) {
final int diff = a - b;
final int dsgn = diff >> 31;
return a - (diff & dsgn);
}

measurements.compute(key, (k, v) -> {
Measurement measurement = new Measurement(measured);
if (v == null) {
return measurement;
}
return Measurement.combineWith(measurement, v);
});
// branchless min (unprecise for large numbers, but good enough)
static int min(final int a, final int b) {
final int diff = a - b;
final int dsgn = diff >> 31;
return b + (diff & dsgn);
}

// Find the next valid start position:
public static long findNextLineStart(String filePath, long requestedStart) throws Exception {
public static long findNextLineStart(final String filePath, final long requestedStart) throws Exception {
try (RandomAccessFile file = new RandomAccessFile(filePath, "r")) {
file.seek(requestedStart);
if (requestedStart != 0) {
// Skip to the next line if not at the start of the file
while (file.read() != '\n')
;
int r;
while ((r = file.read()) != '\n' && r != -1) {
// Skip to the next line if not at the start of the file
}
}
return file.getFilePointer();
}
Expand Down

0 comments on commit ab1ede3

Please sign in to comment.