From 22280f47bf755e4b625fc7c810b7c96f753796a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristo=20Kuusk=C3=BCll?= Date: Mon, 14 Oct 2024 10:31:38 +0300 Subject: [PATCH] Adding thread utils, base91 and largebitmap support. (#49) Added ThreadUtils, Base91 and LargeBitmap. --- .github/workflows/build.yaml | 56 ++-- CHANGELOG.md | 16 +- build.common.gradle | 30 +- build.gradle | 15 +- build.libraries.gradle | 10 +- gradle.properties | 2 +- gradle/wrapper/gradle-wrapper.properties | 2 +- tw-base-utils/build.gradle | 4 +- .../common/baseutils/bitmap/LargeBitmap.java | 30 ++ .../baseutils/bitmap/LargeBitmapImpl.java | 302 ++++++++++++++++++ .../bitmap/LargeBitmapSerializer.java | 37 +++ .../bitmap/LargeBitmapSerializerImpl.java | 230 +++++++++++++ .../common/baseutils/encoding/Base91.java | 166 ++++++++++ .../jackson/DefaultJsonConverter.java | 1 - .../common/baseutils/threads/ThreadInfo.java | 20 ++ .../common/baseutils/threads/ThreadUtils.java | 112 +++++++ .../baseutils/clock/TestClockSpec.groovy | 18 -- .../DiscardingQueueProcessorSpec.groovy | 151 --------- .../bitmap/LargeBitmapSerializerTest.java | 156 +++++++++ .../baseutils/bitmap/LargeBitmapTest.java | 270 ++++++++++++++++ .../DiscardingQueueProcessorTest.java | 141 ++++++++ .../SimpleScheduledTaskExecutorTest.java | 3 +- .../common/baseutils/encoding/Base91Test.java | 157 +++++++++ .../baseutils/jackson/JsonConverterTest.java | 1 + .../baseutils/jdbc/MockConnectionProxy.java | 2 + .../baseutils/threads/ThreadUtilsTest.java | 67 ++++ 26 files changed, 1755 insertions(+), 244 deletions(-) create mode 100644 tw-base-utils/src/main/java/com/transferwise/common/baseutils/bitmap/LargeBitmap.java create mode 100644 tw-base-utils/src/main/java/com/transferwise/common/baseutils/bitmap/LargeBitmapImpl.java create mode 100644 tw-base-utils/src/main/java/com/transferwise/common/baseutils/bitmap/LargeBitmapSerializer.java create mode 100644 tw-base-utils/src/main/java/com/transferwise/common/baseutils/bitmap/LargeBitmapSerializerImpl.java create mode 100644 tw-base-utils/src/main/java/com/transferwise/common/baseutils/encoding/Base91.java create mode 100644 tw-base-utils/src/main/java/com/transferwise/common/baseutils/threads/ThreadInfo.java create mode 100644 tw-base-utils/src/main/java/com/transferwise/common/baseutils/threads/ThreadUtils.java delete mode 100644 tw-base-utils/src/test/groovy/com/transferwise/common/baseutils/clock/TestClockSpec.groovy delete mode 100644 tw-base-utils/src/test/groovy/com/transferwise/common/baseutils/concurrency/DiscardingQueueProcessorSpec.groovy create mode 100644 tw-base-utils/src/test/java/com/transferwise/common/baseutils/bitmap/LargeBitmapSerializerTest.java create mode 100644 tw-base-utils/src/test/java/com/transferwise/common/baseutils/bitmap/LargeBitmapTest.java create mode 100644 tw-base-utils/src/test/java/com/transferwise/common/baseutils/concurrency/DiscardingQueueProcessorTest.java create mode 100644 tw-base-utils/src/test/java/com/transferwise/common/baseutils/encoding/Base91Test.java create mode 100644 tw-base-utils/src/test/java/com/transferwise/common/baseutils/threads/ThreadUtilsTest.java diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index b3ae82a..fa63d75 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -8,6 +8,13 @@ on: branches: - master +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: ${{ github.ref != 'refs/heads/master' }} + +env: + GRADLE_OPTS: -Dorg.gradle.console=plain -Djava.security.egd=file:/dev/./urandom + jobs: matrix_build: name: "Matrix Build" @@ -26,50 +33,35 @@ jobs: SPRING_BOOT_VERSION: ${{ matrix.spring_boot_version }} steps: - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Setup Gradle - uses: gradle/gradle-build-action@v2 - with: - gradle-home-cache-cleanup: true - # Comment out when you are upgrading gradle in a branch and doing tons of commits you would need to test. - # cache-read-only: false + uses: gradle/actions/setup-gradle@v4 - name: "Assemble jar" - uses: gradle/gradle-build-action@v2 - with: - arguments: assemble --console=plain --info --stacktrace --parallel + run: ./gradlew assemble --console=plain --info --stacktrace --parallel - name: "Compile tests" - uses: gradle/gradle-build-action@v2 - with: - arguments: compileTestJava compileTestGroovy --console=plain --info --stacktrace --parallel + run: ./gradlew compileTestJava --console=plain --info --stacktrace --parallel - name: "Run checks" - uses: gradle/gradle-build-action@v2 - with: - arguments: check -x test --console=plain --stacktrace + run: ./gradlew check -x test --console=plain --stacktrace - name: "Run tests" - uses: gradle/gradle-build-action@v2 - with: - arguments: -Dspring.profiles.include=continuous-integration test --console=plain --info --stacktrace + run: ./gradlew -Dspring.profiles.include=continuous-integration test --console=plain --info --stacktrace - name: "Test if publishing works" - uses: gradle/gradle-build-action@v2 - with: - arguments: publishToMavenLocal --console=plain --info --stacktrace + run: ./gradlew publishToMavenLocal --console=plain --info --stacktrace - name: "Publish Test Report" - uses: mikepenz/action-junit-report@v3 + uses: mikepenz/action-junit-report@v4 if: always() with: check_name: Test Report-(${{ matrix.spring_boot_version }}) report_paths: '**/build/test-results/**/*.xml' - github_token: ${{ secrets.GITHUB_TOKEN }} require_tests: true - name: Publish checkstyle report if: always() - uses: jwgmeligmeyling/checkstyle-github-action@master + uses: lcollins/checkstyle-github-action@v3.1.0 with: name: Checkstyle Report-(${{ matrix.spring_boot_version }}) path: '**/build/reports/**/*.xml' - name: Publish spotbugs report if: always() - uses: jwgmeligmeyling/spotbugs-github-action@master + uses: lcollins/spotbugs-github-action@v3.1.0 with: name: Spotbugs Report-(${{ matrix.spring_boot_version }}) path: '**/build/reports/**/*.xml' @@ -78,7 +70,7 @@ jobs: tar -zcvf all-test-reports-${{ matrix.spring_boot_version }}.tar.gz **/build/reports if: always() - name: "Store test results" - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 if: always() with: name: all-test-reports-${{ matrix.spring_boot_version }} @@ -98,15 +90,9 @@ jobs: if: ${{ needs.matrix_build.result != 'success' }} run: exit 1 - name: Checkout repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Setup Gradle - uses: gradle/gradle-build-action@v2 - with: - gradle-home-cache-cleanup: true - # Comment out when you are upgrading gradle in a branch and doing tons of commits you would need to test. - # cache-read-only: false + uses: gradle/actions/setup-gradle@v4 - name: "Tag release" if: github.ref == 'refs/heads/master' - uses: gradle/gradle-build-action@v2 - with: - arguments: tagRelease --console=plain --info --stacktrace + run: ./gradlew tagRelease --console=plain --info --stacktrace diff --git a/CHANGELOG.md b/CHANGELOG.md index 08634e0..6af7b9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,13 +5,24 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.13.0] - 2024-10-11 + +### Added + +* Base91 encoder. +* RoaringBitmap based LargeBitmap with better 64 bits support. +* ThreadUtils to take safe-point free thread dumps. Those thread dumps will be inconsistent, but still suitable in many cases. + +### Changed + +* Java 17+ is now required. + ## [1.12.5] - 2024-07-16 ### Changed * Added support for Spring Boot 3.3. - ## [1.12.4] - 2024-02-22 ### Changed @@ -64,7 +75,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -* Ported `JavaTimeModuleFactory` from old common lib so that the lib can be removed in the future. It provides consistent millisecond serialisation of `Instant` and `ZonedDateTime` objects. +* Ported `JavaTimeModuleFactory` from old common lib so that the lib can be removed in the future. It provides consistent millisecond serialisation of + `Instant` and `ZonedDateTime` objects. ## [1.10.2] - 2023-07-28 diff --git a/build.common.gradle b/build.common.gradle index 8456799..2fb70f8 100644 --- a/build.common.gradle +++ b/build.common.gradle @@ -12,7 +12,6 @@ apply plugin: "java-library" apply plugin: "checkstyle" apply plugin: "idea" apply plugin: "com.github.spotbugs" -apply plugin: "groovy" apply from: "$rootProject.rootDir/build.libraries.gradle" @@ -57,14 +56,8 @@ dependencies { } java { - if (springBootVersion.startsWith("3.")) { - sourceCompatibility = JavaVersion.VERSION_17 - targetCompatibility = JavaVersion.VERSION_17 - } - else{ - sourceCompatibility = JavaVersion.VERSION_11 - targetCompatibility = JavaVersion.VERSION_11 - } + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 withSourcesJar() withJavadocJar() } @@ -89,20 +82,13 @@ compileJava { } compileTestJava { - sourceCompatibility = JavaVersion.VERSION_17 - targetCompatibility = JavaVersion.VERSION_17 - javaCompiler = javaToolchains.compilerFor { - languageVersion = JavaLanguageVersion.of(17) - } - options.fork = true options.forkOptions.jvmArgs << '-Xmx256m' } test { - javaLauncher = javaToolchains.launcherFor { - languageVersion = JavaLanguageVersion.of(17) - } + // To test ThreadUtils + // jvmArgs "-Xlog:safepoint=info" useJUnitPlatform() @@ -116,8 +102,6 @@ test { } tasks.findAll { it.name.startsWith("spotbugs") }*.configure { - effort = "max" - excludeFilter = file("$rootProject.rootDir/spotbugs-exclude.xml") reports { @@ -126,11 +110,7 @@ tasks.findAll { it.name.startsWith("spotbugs") }*.configure { } } -spotbugs { - spotbugsTest.enabled = false -} - -tasks.withType(Checkstyle) { +tasks.withType(Checkstyle).configureEach { config = resources.text.fromFile(file("$rootProject.rootDir/google_checks.xml")) // Deprecated, checkstyle does not like the namings. diff --git a/build.gradle b/build.gradle index 8aabf17..e44ed13 100644 --- a/build.gradle +++ b/build.gradle @@ -1,10 +1,12 @@ import org.eclipse.jgit.api.errors.RefAlreadyExistsException +import com.github.spotbugs.snom.Confidence +import com.github.spotbugs.snom.Effort plugins { - id "com.github.spotbugs" version "5.0.14" + id "com.github.spotbugs" version "6+" id "idea" - id 'org.ajoberstar.grgit' version '5.2.0' - id 'io.github.gradle-nexus.publish-plugin' version "1.1.0" + id 'org.ajoberstar.grgit' version '5.3.0' + id 'io.github.gradle-nexus.publish-plugin' version "2.0.0" } idea.project { @@ -19,7 +21,7 @@ ext.projectArtifactName = "tw-base-utils" apply from: 'build.common.gradle' -task tagRelease { +tasks.register('tagRelease') { doLast { try { grgit.tag.add { @@ -43,4 +45,9 @@ nexusPublishing { password = System.getenv("SONATYPE_PASSWORD") } } +} + +spotbugs { + effort = Effort.valueOf('MAX') + reportLevel = Confidence.valueOf('DEFAULT') } \ No newline at end of file diff --git a/build.libraries.gradle b/build.libraries.gradle index 81c0a2d..d46bb40 100644 --- a/build.libraries.gradle +++ b/build.libraries.gradle @@ -1,17 +1,19 @@ ext { - springBootVersion = "${System.getenv("SPRING_BOOT_VERSION") ?: "2.7.18"}" + springBootVersion = "${System.getenv("SPRING_BOOT_VERSION") ?: "3.2.2"}" libraries = [ // version defined - awaitility : "org.awaitility:awaitility:4.2.0", - guava : 'com.google.guava:guava:33.0.0-jre', + awaitility : "org.awaitility:awaitility:4.2.2", + commonsIo : "commons-io:commons-io:2.17.0", + guava : 'com.google.guava:guava:33.3.1-jre', jakartaValidationApi : 'jakarta.validation:jakarta.validation-api:3.0.2', javaxValidationApi : "javax.validation:validation-api:2.0.1.Final", - spockCore : "org.spockframework:spock-core:2.3-groovy-4.0", + roaringBitmap : 'org.roaringbitmap:RoaringBitmap:1.3.0', springBootDependencies: "org.springframework.boot:spring-boot-dependencies:${springBootVersion}", // versions managed by spring-boot-dependencies platform commonsLang3 : "org.apache.commons:commons-lang3", + junitJupiter : "org.junit.jupiter:junit-jupiter", logbackClassic : "ch.qos.logback:logback-classic", lombok : "org.projectlombok:lombok", diff --git a/gradle.properties b/gradle.properties index 950bc9f..ef52e60 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=1.12.5 \ No newline at end of file +version=1.13.0 \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index ac72c34..0aaefbc 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.1-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/tw-base-utils/build.gradle b/tw-base-utils/build.gradle index 08a0685..63402b4 100644 --- a/tw-base-utils/build.gradle +++ b/tw-base-utils/build.gradle @@ -12,15 +12,17 @@ dependencies { compileOnly libraries.javaxValidationApi implementation libraries.commonsLang3 + implementation libraries.commonsIo implementation libraries.slf4jApi implementation libraries.jacksonDatabind implementation libraries.jacksonJsr310 implementation libraries.jacksonJdk8 implementation libraries.guava + implementation libraries.roaringBitmap + implementation libraries.spotbugsAnnotations testImplementation libraries.awaitility testImplementation libraries.guava testImplementation libraries.junitJupiter testImplementation libraries.micrometerCore - testImplementation libraries.spockCore } diff --git a/tw-base-utils/src/main/java/com/transferwise/common/baseutils/bitmap/LargeBitmap.java b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/bitmap/LargeBitmap.java new file mode 100644 index 0000000..9fcbcaf --- /dev/null +++ b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/bitmap/LargeBitmap.java @@ -0,0 +1,30 @@ +package com.transferwise.common.baseutils.bitmap; + +public interface LargeBitmap { + + boolean isEmpty(); + + void set(long bit); + + void set(long startBit, long endBit); + + boolean checkedSet(long bit); + + void clear(long startBit, long endBit); + + void clear(long bit); + + void clear(); + + boolean checkedClear(long bit); + + boolean isSet(long bit); + + long getFirstClearBit(long offset); + + long getFirstSetBit(long offset); + + long getFirstSetBit(); + + LargeBitmap copy(); +} diff --git a/tw-base-utils/src/main/java/com/transferwise/common/baseutils/bitmap/LargeBitmapImpl.java b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/bitmap/LargeBitmapImpl.java new file mode 100644 index 0000000..ede00de --- /dev/null +++ b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/bitmap/LargeBitmapImpl.java @@ -0,0 +1,302 @@ +package com.transferwise.common.baseutils.bitmap; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeSet; +import javax.annotation.concurrent.NotThreadSafe; +import org.roaringbitmap.RoaringBitmap; + +/** + * The reason with this class is to overcome the Integer size limitation of RoaringBitmap. + */ +@NotThreadSafe +public class LargeBitmapImpl implements LargeBitmap { + + private static final long MAX_REL_BIT = 0xFFFFFFFFL; + + private final TreeSet indexes = new TreeSet<>(); + private final Map bitmaps = new HashMap<>(); + + @Override + public boolean isEmpty() { + return indexes.isEmpty(); + } + + @Override + public void set(long bit) { + Preconditions.checkArgument(bit >= 0); + + var bitmapIdx = getBitmapIndex(bit); + var bitmap = getOrCreateBitmap(bitmapIdx); + + int relBit = getBitInBitmap(bit); + bitmap.add(relBit); + } + + @Override + public void set(long startBit, long endBit) { + Preconditions.checkArgument(startBit >= 0 && endBit >= 0 && endBit >= startBit); + + var bitmapIdx = getBitmapIndex(startBit); + var endBitmapIdx = getBitmapIndex(endBit); + var bitmap = getOrCreateBitmap(bitmapIdx); + + long startRelBit = getBitInBitmapAsLong(startBit); + + while (endBitmapIdx > bitmapIdx) { + bitmap.add(startRelBit, MAX_REL_BIT + 1); + bitmapIdx++; + bitmap = getOrCreateBitmap(bitmapIdx); + startRelBit = 0; + } + + long endRelBit = getBitInBitmapAsLong(endBit); + bitmap.add(startRelBit, endRelBit + 1); + } + + @Override + public boolean checkedSet(long bit) { + Preconditions.checkArgument(bit >= 0); + + var bitmapIdx = getBitmapIndex(bit); + var bitmap = getOrCreateBitmap(bitmapIdx); + + int relBit = getBitInBitmap(bit); + return bitmap.checkedAdd(relBit); + } + + @Override + public void clear(long startBit, long endBit) { + Preconditions.checkArgument(startBit >= 0 && endBit >= 0 && endBit >= startBit); + + var startBitmapIdx = getBitmapIndex(startBit); + var firstBitmapIdx = indexes.first(); + var endBitmapIdx = getBitmapIndex(endBit); + + var bitmapIdx = startBitmapIdx; + if (firstBitmapIdx > bitmapIdx) { + bitmapIdx = firstBitmapIdx; + } + + while (bitmapIdx != -1 && endBitmapIdx >= bitmapIdx) { + if (bitmapIdx > startBitmapIdx && bitmapIdx < endBitmapIdx) { + removeBitmap(bitmapIdx); + } else { + long startRelBit; + long endRelBit; + if (bitmapIdx == startBitmapIdx) { + startRelBit = getBitInBitmapAsLong(startBit); + } else { + startRelBit = 0; + } + + if (bitmapIdx == endBitmapIdx) { + endRelBit = getBitInBitmapAsLong(endBit); + } else { + endRelBit = MAX_REL_BIT; + } + + var bitmap = bitmaps.get(bitmapIdx); + bitmap.remove(startRelBit, endRelBit + 1); + if (bitmap.isEmpty()) { + removeBitmap(bitmapIdx); + } + } + + var nextIdx = indexes.higher(bitmapIdx); + bitmapIdx = nextIdx == null ? -1 : nextIdx; + } + } + + @Override + public void clear(long bit) { + Preconditions.checkArgument(bit >= 0); + + var bitmapIdx = getBitmapIndex(bit); + var bitmap = bitmaps.get(bitmapIdx); + + if (bitmap != null) { + int relBit = getBitInBitmap(bit); + bitmap.remove(relBit); + + if (bitmap.isEmpty()) { + removeBitmap(bitmapIdx); + } + } + } + + @Override + public void clear() { + indexes.clear(); + bitmaps.clear(); + } + + @Override + public boolean checkedClear(long bit) { + Preconditions.checkArgument(bit >= 0); + + var bitmapIdx = getBitmapIndex(bit); + var bitmap = bitmaps.get(bitmapIdx); + + if (bitmap == null) { + return false; + } + + int relBit = getBitInBitmap(bit); + boolean removed = bitmap.checkedRemove(relBit); + + if (bitmap.isEmpty()) { + removeBitmap(bitmapIdx); + } + + return removed; + } + + @Override + public boolean isSet(long bit) { + Preconditions.checkArgument(bit >= 0); + + var bitmapIdx = getBitmapIndex(bit); + var bitmap = bitmaps.get(bitmapIdx); + if (bitmap == null) { + return false; + } + + int relBit = getBitInBitmap(bit); + return bitmap.contains(relBit); + } + + @Override + public long getFirstClearBit(long offset) { + Preconditions.checkArgument(offset >= 0); + + if (isEmpty()) { + return offset; + } + + var bitmapIdxAtOffset = getBitmapIndex(offset); + + var bitmapIdx = indexes.higher(bitmapIdxAtOffset - 1); + if (bitmapIdx == null) { + return offset; + } + + var prevBitmapIdx = bitmapIdxAtOffset - 1; + + var relBit = bitmapIdx == bitmapIdxAtOffset ? getBitInBitmap(offset) : 0; + + while (true) { + if (bitmapIdx == null || bitmapIdx > prevBitmapIdx + 1) { + return toAbsoluteBit(prevBitmapIdx + 1, 0); + } + + var absentValue = bitmaps.get(bitmapIdx).nextAbsentValue(relBit); + if (absentValue != -1L) { + return toAbsoluteBit(bitmapIdx, absentValue); + } + + prevBitmapIdx = bitmapIdx; + bitmapIdx = indexes.higher(bitmapIdx); + relBit = 0; + } + } + + @Override + public long getFirstSetBit(long offset) { + Preconditions.checkArgument(offset >= 0); + + if (indexes.isEmpty()) { + return -1L; + } + + int offsetBitmapIdx = getBitmapIndex(offset); + + var bitmapIdx = indexes.higher(offsetBitmapIdx - 1); + + int relBit = 0; + + // This does perform as O(N), when we have consecutive bitmaps all filled with bits. + while (true) { + if (bitmapIdx == null) { + return -1L; + } + if (bitmapIdx == offsetBitmapIdx) { + relBit = getBitInBitmap(offset); + } + + long nextBit = bitmaps.get(bitmapIdx).nextValue(relBit); + if (nextBit == -1L) { + bitmapIdx = indexes.higher(bitmapIdx); + relBit = 0; + } else { + return toAbsoluteBit(bitmapIdx, nextBit); + } + } + } + + @Override + public long getFirstSetBit() { + return getFirstSetBit(0); + } + + @Override + public LargeBitmapImpl copy() { + var clone = new LargeBitmapImpl(); + + clone.indexes.addAll(indexes); + + for (var entry : bitmaps.entrySet()) { + clone.bitmaps.put(entry.getKey(), entry.getValue().clone()); + } + + return clone; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof LargeBitmapImpl other) { + return indexes.equals(other.indexes) && bitmaps.equals(other.bitmaps); + } + return false; + } + + @Override + public int hashCode() { + int result = 17; + result = 31 * result + indexes.hashCode(); + result = 31 * result + bitmaps.hashCode(); + return result; + } + + protected void removeBitmap(int idx) { + indexes.remove(idx); + bitmaps.remove(idx); + } + + protected RoaringBitmap getOrCreateBitmap(int idx) { + var bitmap = bitmaps.get(idx); + if (bitmap == null) { + bitmaps.put(idx, bitmap = RoaringBitmap.bitmapOf()); + indexes.add(idx); + } + + return bitmap; + } + + protected int getBitmapIndex(long bit) { + return (int) (bit >> 32); + } + + protected int getBitInBitmap(long bit) { + return (int) bit; + } + + protected long getBitInBitmapAsLong(long bit) { + return bit & 0xFFFFFFFFL; + } + + protected long toAbsoluteBit(int bitmapIdx, long relativeBit) { + return ((long) bitmapIdx << 32) + relativeBit; + } +} diff --git a/tw-base-utils/src/main/java/com/transferwise/common/baseutils/bitmap/LargeBitmapSerializer.java b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/bitmap/LargeBitmapSerializer.java new file mode 100644 index 0000000..719455a --- /dev/null +++ b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/bitmap/LargeBitmapSerializer.java @@ -0,0 +1,37 @@ +package com.transferwise.common.baseutils.bitmap; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public interface LargeBitmapSerializer { + + SerializationResult serialize(LargeBitmap bitmap, OutputStream os) throws IOException; + + DeserializationResult deserializeInto(LargeBitmap bitmap, InputStream in) throws IOException; + + interface SerializationResult { + + SerializationStats getStats(); + + interface SerializationStats { + + long getSerializedBytesCount(); + + long getBitsCount(); + } + } + + interface DeserializationResult { + + DeserializationStats getStats(); + + interface DeserializationStats { + + long getDeserializedBytesCount(); + + long getBitsCount(); + } + } + +} diff --git a/tw-base-utils/src/main/java/com/transferwise/common/baseutils/bitmap/LargeBitmapSerializerImpl.java b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/bitmap/LargeBitmapSerializerImpl.java new file mode 100644 index 0000000..03ddda7 --- /dev/null +++ b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/bitmap/LargeBitmapSerializerImpl.java @@ -0,0 +1,230 @@ +package com.transferwise.common.baseutils.bitmap; + +import com.google.common.primitives.Longs; +import com.transferwise.common.baseutils.bitmap.LargeBitmapSerializerImpl.DeserializationResultImpl.DeserializationStatsImpl; +import com.transferwise.common.baseutils.bitmap.LargeBitmapSerializerImpl.SerializationResultImpl.SerializationStatsImpl; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import javax.annotation.concurrent.ThreadSafe; +import lombok.Data; +import lombok.RequiredArgsConstructor; + +@ThreadSafe +public class LargeBitmapSerializerImpl implements LargeBitmapSerializer { + + // Bitset container is a long, but we leave the last bit to mark if the bitset continues or ends. + private static final int BITS_PER_SET = 63; + private static final int BITS_PER_TWO_SETS = 63 * 2; + + @Override + public SerializationResult serialize(LargeBitmap bitmap, OutputStream os) throws IOException { + var stats = new SerializationStatsImpl(); + var result = new SerializationResultImpl(stats); + + if (bitmap == null || bitmap.isEmpty()) { + return result; + } + + long offset = bitmap.getFirstSetBit(0); + + // We will deduct startOffset from every single offset with the hope that it will allow better compression later. + final long startOffset = offset; + + writeLong(os, stats, startOffset); + + while (offset != -1) { + var clearIdx = bitmap.getFirstClearBit(offset); + if (clearIdx - offset > BITS_PER_SET) { + // range + writeRangeOffset(os, stats, startOffset, offset); + + var rangeLength = clearIdx - offset; + writeLong(os, stats, rangeLength); + stats.bitsCount += rangeLength; + + offset = bitmap.getFirstSetBit(clearIdx); + } else { + // bitset + writeBitSetOffset(os, stats, startOffset, offset); + stats.bitsCount++; + + var bitset = 0L; + var idx = offset; + + while (true) { + idx = bitmap.getFirstSetBit(idx + 1); + if (idx == -1 || idx - offset > BITS_PER_TWO_SETS) { + writeLong(os, stats, bitset); + offset = idx; + break; + } else if (idx - offset > BITS_PER_SET) { + // Last set bit indicates a continuation of bitsets chain. + bitset = setLastBit(bitset); + writeLong(os, stats, bitset); + + offset = offset + BITS_PER_SET; + bitset = setBit(0L, (int) (idx - offset - 1)); + stats.bitsCount++; + } else { + bitset = setBit(bitset, (int) (idx - offset - 1)); + stats.bitsCount++; + } + } + } + } + + os.flush(); + return result; + } + + @Override + public DeserializationResult deserializeInto(LargeBitmap bitmap, InputStream in) + throws IOException { + var reusableLongBuffer = new byte[8]; + var stats = new DeserializationStatsImpl(); + var result = new DeserializationResultImpl(stats); + + long startOffset = readLong(in, stats, reusableLongBuffer); + + while (true) { + var offset = readLong(in, stats, reusableLongBuffer); + + if (offset == -1) { + break; + } + + boolean isRange = isLastBitSet(offset); + var absOffset = startOffset + (offset >> 1); + + if (isRange) { + var length = readLong(in, stats, reusableLongBuffer); + bitmap.set(absOffset, absOffset + length - 1); + stats.bitsCount += length; + } else { + bitmap.set(absOffset); + stats.bitsCount++; + + while (true) { + var bitset = readLong(in, stats, reusableLongBuffer); + for (int i = 0; i < 63; i++) { + if (isBitSet(bitset, i)) { + bitmap.set(absOffset + 1 + i); + stats.bitsCount++; + } + } + if (!isLastBitSet(bitset)) { + break; + } + absOffset = absOffset + 63; + } + } + } + return result; + } + + protected void writeRangeOffset(OutputStream os, SerializationStatsImpl stats, long startOffset, + long offset) + throws IOException { + var rangeOffset = offset - startOffset; + + // Shift left and set last bit to indicate a range. + rangeOffset = rangeOffset << 1; + rangeOffset = setLastBit(rangeOffset); + + writeLong(os, stats, rangeOffset); + } + + protected void writeBitSetOffset(OutputStream os, SerializationStatsImpl stats, long startOffset, + long offset) + throws IOException { + var bitSetOffset = offset - startOffset; + + // Shift left and clear last bit to indicate a bitset + bitSetOffset = bitSetOffset << 1; + + writeLong(os, stats, bitSetOffset); + } + + protected long readLong(InputStream in, DeserializationStatsImpl stats, byte[] buffer) + throws IOException { + + int cnt = 0; + while (cnt < 8) { + // Some input-streams like the compression ones, can give you only part of the buffer, but it does not mark the end of stream. + int len = in.read(buffer, cnt, 8 - cnt); + if (len == -1) { + if (cnt != 0) { + throw new IOException( + "Unexpected stream content, got only " + cnt + " / 8 bytes at position " + stats.deserializedBytesCount + "."); + } + return -1; + } else { + cnt += len; + } + } + + stats.deserializedBytesCount += cnt; + + return Longs.fromByteArray(buffer); + } + + protected void writeLong(OutputStream out, SerializationStatsImpl stats, long value) + throws IOException { + out.write((byte) (value >>> 56)); + out.write((byte) (value >>> 48)); + out.write((byte) (value >>> 40)); + out.write((byte) (value >>> 32)); + out.write((byte) (value >>> 24)); + out.write((byte) (value >>> 16)); + out.write((byte) (value >>> 8)); + out.write((byte) (value)); + + stats.serializedBytesCount += 8; + } + + protected boolean isBitSet(long value, int n) { + return (value & (1L << (63 - n))) != 0; + } + + protected long setBit(long value, int n) { + return value | (1L << (63 - n)); + } + + protected boolean isLastBitSet(long value) { + long mask = 0x1L; + return (value & mask) != 0; + } + + protected long setLastBit(long value) { + return value | 0x1L; + } + + @Data + @RequiredArgsConstructor + protected static class SerializationResultImpl implements SerializationResult { + + private final SerializationStats stats; + + @Data + protected static class SerializationStatsImpl implements SerializationStats { + + private long serializedBytesCount; + private long bitsCount; + } + } + + @Data + @RequiredArgsConstructor + protected static class DeserializationResultImpl implements DeserializationResult { + + private final DeserializationStats stats; + + @Data + protected static class DeserializationStatsImpl implements DeserializationStats { + + private long deserializedBytesCount; + private long bitsCount; + } + } +} diff --git a/tw-base-utils/src/main/java/com/transferwise/common/baseutils/encoding/Base91.java b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/encoding/Base91.java new file mode 100644 index 0000000..76345a4 --- /dev/null +++ b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/encoding/Base91.java @@ -0,0 +1,166 @@ +package com.transferwise.common.baseutils.encoding; + +import java.io.FilterInputStream; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import javax.annotation.concurrent.NotThreadSafe; +import javax.annotation.concurrent.ThreadSafe; +import lombok.NonNull; + +@ThreadSafe +public class Base91 { + + private static final int BASE = 91; + + private static final char[] toBase91 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!#$%&()*+,./:;<=>?@[]^_`{|}~\"".toCharArray(); + + private static final int[] fromBase91 = new int[256]; + + static { + Arrays.fill(fromBase91, -1); + for (int i = 0; i < toBase91.length; i++) { + fromBase91[toBase91[i]] = i; + } + } + + @NotThreadSafe + private static class EncOutputStream extends FilterOutputStream { + + private int bitsExtracted; + private int extractedValue; + private boolean closed; + + private EncOutputStream(OutputStream out) { + super(out); + } + + @Override + public void write(int b) throws IOException { + extractedValue |= (b & 255) << bitsExtracted; + bitsExtracted += 8; + if (bitsExtracted > 13) { + int encodedValue = extractedValue & 0x1FFF; // extract 13 bits + if (encodedValue > 88) { + extractedValue >>= 13; + bitsExtracted -= 13; + } else { + encodedValue = extractedValue & 0x3FFF; // extract 14 bits + extractedValue >>= 14; + bitsExtracted -= 14; + } + + out.write(toBase91[encodedValue % BASE]); + out.write(toBase91[encodedValue / BASE]); + } + } + + @Override + public void write(byte[] data, int offset, int length) throws IOException { + for (int i = offset; i < length; ++i) { + write(data[i]); + } + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + + if (bitsExtracted > 0) { + out.write(toBase91[extractedValue % BASE]); + if (bitsExtracted > 7 || extractedValue >= BASE) { + out.write(toBase91[extractedValue / BASE]); + } + } + + closed = true; + + super.close(); + } + } + + public OutputStream wrap(@NonNull OutputStream os) { + return new EncOutputStream(os); + } + + public InputStream wrap(@NonNull InputStream in) { + return new DecInputStream(in); + } + + @NotThreadSafe + private static class DecInputStream extends FilterInputStream { + + private int extractedValue = 0; + private int extractedBits = 0; + private int decodedValue = -1; + + private final byte[] sbBuf = new byte[1]; + + private boolean doWriteLoop = false; + + private DecInputStream(InputStream in) { + super(in); + } + + @Override + public int read() throws IOException { + return read(sbBuf, 0, 1) == -1 ? -1 : sbBuf[0] & 0xff; + } + + + @Override + public int read(byte[] bytes, int off, int len) throws IOException { + int pos = 0; + + while (true) { + if (doWriteLoop) { + do { + if (pos >= len) { + return pos; + } + bytes[off + pos++] = (byte) (extractedValue & 255); + extractedValue >>= 8; + extractedBits -= 8; + } while (extractedBits > 7); + decodedValue = -1; + doWriteLoop = false; + if (pos >= len) { + return pos; + } + } + + int next = in.read(); + if (next == -1) { + break; + } + + int nextValue = fromBase91[next]; + if (nextValue == -1) { + throw new IllegalArgumentException("Invalid Base91 character index of " + next + "."); + } + if (decodedValue < 0) { + decodedValue = nextValue; + } else { + decodedValue += nextValue * BASE; + + this.extractedValue |= decodedValue << extractedBits; + extractedBits += (decodedValue & 0x1FFF) > 88 ? 13 : 14; + + doWriteLoop = true; + } + } + + if (pos < len && decodedValue >= 0) { + bytes[off + pos++] = (byte) ((extractedValue | decodedValue << extractedBits) & 255); + decodedValue = -1; + } + + return pos > 0 ? pos : -1; + } + } + +} \ No newline at end of file diff --git a/tw-base-utils/src/main/java/com/transferwise/common/baseutils/jackson/DefaultJsonConverter.java b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/jackson/DefaultJsonConverter.java index d03e80e..3bd5c50 100644 --- a/tw-base-utils/src/main/java/com/transferwise/common/baseutils/jackson/DefaultJsonConverter.java +++ b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/jackson/DefaultJsonConverter.java @@ -7,7 +7,6 @@ import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.transferwise.common.baseutils.ExceptionUtils; import java.util.List; -import org.springframework.stereotype.Component; public final class DefaultJsonConverter implements JsonConverter { diff --git a/tw-base-utils/src/main/java/com/transferwise/common/baseutils/threads/ThreadInfo.java b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/threads/ThreadInfo.java new file mode 100644 index 0000000..2f44aa2 --- /dev/null +++ b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/threads/ThreadInfo.java @@ -0,0 +1,20 @@ +package com.transferwise.common.baseutils.threads; + +import lombok.Data; +import lombok.experimental.Accessors; + +@Data +@Accessors(chain = true) +public class ThreadInfo { + + private Long id; + private long cpuTime; + private long userTime; + private Thread.State state; + private int priority; + private String name; + private String groupName; + private boolean daemon; + + private StackTraceElement[] stackTrace; +} \ No newline at end of file diff --git a/tw-base-utils/src/main/java/com/transferwise/common/baseutils/threads/ThreadUtils.java b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/threads/ThreadUtils.java new file mode 100644 index 0000000..75a6799 --- /dev/null +++ b/tw-base-utils/src/main/java/com/transferwise/common/baseutils/threads/ThreadUtils.java @@ -0,0 +1,112 @@ +package com.transferwise.common.baseutils.threads; + +import com.google.common.util.concurrent.MoreExecutors; +import com.transferwise.common.baseutils.ExceptionUtils; +import java.lang.management.ManagementFactory; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class ThreadUtils { + + public static String toString(ThreadInfo threadInfo) { + var sb = new StringBuilder(); + sb.append("\"") + .append(threadInfo.getName()) + .append("\"") + .append(" group=\"").append(threadInfo.getGroupName()).append("\"") + .append(" id=").append(threadInfo.getId()) + .append(" prio=").append(threadInfo.getPriority()) + .append(" daemon=").append(threadInfo.isDaemon()) + .append(" cpuTime=").append(threadInfo.getCpuTime() / 1_000_000) + .append(" userTime=").append(threadInfo.getUserTime() / 1_000_000) + .append(" ").append(threadInfo.getState()); + + for (var stackTraceElement : threadInfo.getStackTrace()) { + sb.append("\n").append("\tat ").append(stackTraceElement.toString()); + } + + return sb.toString(); + } + + public static String toString(ThreadInfo[] threadInfos) { + var sb = new StringBuilder(); + boolean first = true; + for (var threadInfo : threadInfos) { + if (!first) { + sb.append("\n\n"); + } else { + first = false; + } + sb.append(toString(threadInfo)); + } + return sb.toString(); + } + + public static ThreadInfo[] getInconsistentThreadDump() { + return getInconsistentThreadDump(MoreExecutors.newDirectExecutorService(), 1); + } + + public static ThreadInfo[] getInconsistentThreadDump(ExecutorService executorService, int concurrency) { + long startTimeMs = System.currentTimeMillis(); + + try { + return ExceptionUtils.doUnchecked(() -> { + var threadMxBean = ManagementFactory.getThreadMXBean(); + + var threads = org.apache.commons.lang3.ThreadUtils.findThreads((Predicate) Objects::nonNull); + + var threadInfos = new ThreadInfo[threads.size()]; + + var semaphore = new Semaphore(concurrency); + var latch = new CountDownLatch(threads.size()); + + int i = 0; + for (var thread : threads) { + var threadId = thread.getId(); + semaphore.acquire(); + int finalI = i; + + executorService.submit(() -> { + try { + StackTraceElement[] stackTrace = thread.getStackTrace(); + + var threadState = new ThreadInfo() + .setStackTrace(stackTrace) + .setId(threadId) + .setState(thread.getState()) + .setName(thread.getName()) + .setPriority(thread.getPriority()) + .setCpuTime(threadMxBean.getThreadCpuTime(threadId)) + .setUserTime(threadMxBean.getThreadUserTime(threadId)) + .setGroupName(thread.getThreadGroup().getName()) + .setDaemon(thread.isDaemon()); + + threadInfos[finalI] = threadState; + } finally { + semaphore.release(); + latch.countDown(); + } + }); + i++; + } + + if (!latch.await(1, TimeUnit.MINUTES)) { + throw new IllegalStateException("We were unable to complete inconsistent thread dump."); + } + + return threadInfos; + }); + } finally { + if (log.isDebugEnabled()) { + log.debug("Inconsistent thread dump took {} ms.", (System.currentTimeMillis() - startTimeMs)); + } + } + } + +} diff --git a/tw-base-utils/src/test/groovy/com/transferwise/common/baseutils/clock/TestClockSpec.groovy b/tw-base-utils/src/test/groovy/com/transferwise/common/baseutils/clock/TestClockSpec.groovy deleted file mode 100644 index 90d5760..0000000 --- a/tw-base-utils/src/test/groovy/com/transferwise/common/baseutils/clock/TestClockSpec.groovy +++ /dev/null @@ -1,18 +0,0 @@ -package com.transferwise.common.baseutils.clock - -import spock.lang.Specification - -import java.time.Instant - -class TestClockSpec extends Specification { - def "plus can be used from groovy"() { - given: - def clock = new TestClock(Instant.parse('2021-11-04T00:00:00Z')) - - when: - clock + 'P1D' - - then: - clock.instant().toString() == '2021-11-05T00:00:00Z' - } -} diff --git a/tw-base-utils/src/test/groovy/com/transferwise/common/baseutils/concurrency/DiscardingQueueProcessorSpec.groovy b/tw-base-utils/src/test/groovy/com/transferwise/common/baseutils/concurrency/DiscardingQueueProcessorSpec.groovy deleted file mode 100644 index e81c685..0000000 --- a/tw-base-utils/src/test/groovy/com/transferwise/common/baseutils/concurrency/DiscardingQueueProcessorSpec.groovy +++ /dev/null @@ -1,151 +0,0 @@ -package com.transferwise.common.baseutils.concurrency - - -import spock.lang.Specification - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors - -import static org.awaitility.Awaitility.await - -public class DiscardingQueueProcessorSpec extends Specification { - private ExecutorService executorService - private DiscardingQueueProcessor processor - private List results - private List errors; - - def setup() { - results = Collections.synchronizedList(new ArrayList<>()); - errors = Collections.synchronizedList(new ArrayList<>()); - executorService = Executors.newCachedThreadPool() - processor = new DiscardingQueueProcessor(executorService, { payload -> - results.add(payload.data) - }) { - @Override - void onError(Throwable t) { - errors.add(t); - } - } - processor.setSoftQueueLimit(5) - processor.setHardQueueLimit(10) - - processor.start() - } - - def cleanup() { - processor.stop({ - executorService.shutdown() - }) - } - - def "processing a single event works"() { - when: - processor.schedule("Hello TransferWise!") - await().until() { - results.size() == 1 - } - then: - results[0] == "Hello TransferWise!" - when: - processor.schedule("Hi") - await().until { - results.size() == 2 - } - then: - results[1] == "Hi" - } - - def "soft queue size will discard similar messages"() { - when: - CountDownLatch latch = new CountDownLatch(20) - - processor.processor = { payload -> - latch.await() - results.add(payload.data) - } - - processor.setSoftLimitPredicate({ data -> true }) - - int softDiscardedCount = 0 - 20.times { - def result = processor.schedule("${it}") - if (!result.scheduled && result.discardReason == DiscardingQueueProcessor.DiscardReason.SOFT_LIMIT) { - softDiscardedCount++ - } - latch.countDown() - } - - await().until() { - println(results.size()) - results.size() == 5 - } - then: - results.size() == 5 - softDiscardedCount == 15 - } - - def "hard queue limit is applied"() { - when: - CountDownLatch latch = new CountDownLatch(20) - - processor.processor = { payload -> - latch.await() - results.add(payload.data) - } - - processor.setSoftLimitPredicate({ data -> false }) - - 20.times { - processor.schedule("${it}") - latch.countDown() - } - - await().until() { - results.size() == 10 - } - then: - results.size() == 10 - } - - def "processor errors will not stop processing"() { - when: - CountDownLatch latch = new CountDownLatch(20) - - processor.processor = { payload -> - latch.await() - throw new Exception("Bad Things Do Happen"); - } - - 20.times { - processor.schedule("${it}") - latch.countDown() - } - - await().until({ - errors.size() == 10 - }) - then: - results.size() == 0 - errors.size() == 10 - errors[0].getMessage() == "Bad Things Do Happen" - when: - latch = new CountDownLatch(20) - processor.processor = { payload -> - latch.await() - results.add(payload.data) - } - - 20.times { - processor.schedule("${it}") - latch.countDown() - } - - await().until() { - results.size() == 10 - } - then: - results.size() == 10 - } - -} diff --git a/tw-base-utils/src/test/java/com/transferwise/common/baseutils/bitmap/LargeBitmapSerializerTest.java b/tw-base-utils/src/test/java/com/transferwise/common/baseutils/bitmap/LargeBitmapSerializerTest.java new file mode 100644 index 0000000..aafad1f --- /dev/null +++ b/tw-base-utils/src/test/java/com/transferwise/common/baseutils/bitmap/LargeBitmapSerializerTest.java @@ -0,0 +1,156 @@ +package com.transferwise.common.baseutils.bitmap; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +@Slf4j +public class LargeBitmapSerializerTest { + + @Test + @SneakyThrows + public void testSerializeSingleBitset() { + var originalBitmap = new LargeBitmapImpl(); + + originalBitmap.set(1); + originalBitmap.set(3); + + var deserialized = serializedCopy(originalBitmap); + assertThat(deserialized, equalTo(originalBitmap)); + } + + @Test + @SneakyThrows + public void testSerializeSingleRange() { + var originalBitmap = new LargeBitmapImpl(); + + for (int i = 0; i < 100; i++) { + originalBitmap.set(100 + i); + } + + var deserialized = serializedCopy(originalBitmap); + assertThat(deserialized, equalTo(originalBitmap)); + } + + @Test + @SneakyThrows + public void testSerializeContinuousBitsets() { + var originalBitmap = new LargeBitmapImpl(); + + for (int i = 0; i < 130; i++) { + if (i % 2 == 0) { + originalBitmap.set(100 + i); + } + } + + var deserialized = serializedCopy(originalBitmap); + assertThat(deserialized, equalTo(originalBitmap)); + } + + static int[] getSeeds() { + var n = 128; + var result = new int[n]; + for (int i = 0; i < n; i++) { + result[i] = ThreadLocalRandom.current().nextInt(); + } + return result; + } + + @ParameterizedTest + @MethodSource("getSeeds") + @SneakyThrows + public void testRandomAtIntBoundary(int seed) { + var rnd = new Random(seed); + + var originalBitmap = new LargeBitmapImpl(); + + var range = rnd.nextInt(2, 10000); + var bits = rnd.nextInt(10000); + + var origin = (long) Integer.MAX_VALUE - Integer.MIN_VALUE - range / 2; + var bound = origin + range; + + log.info("Testing at int boundary with seed {}, range of {} and {} bits.", seed, range, bits); + + for (int i = 0; i < bits; i++) { + originalBitmap.set(rnd.nextLong(origin, bound)); + } + + var deserialized = serializedCopy(originalBitmap); + assertThat(deserialized, equalTo(originalBitmap)); + } + + @ParameterizedTest + @MethodSource("getSeeds") + @SneakyThrows + public void testRandomSparseBitsAtLargeRange(int seed) { + var rnd = new Random(seed); + + var originalBitmap = new LargeBitmapImpl(); + + var bits = rnd.nextInt(10000); + + var origin = rnd.nextLong(Long.MAX_VALUE / 4); + var bound = origin + rnd.nextLong(Long.MAX_VALUE / 4); + + log.info("Testing with seed {}, range of {} and {} bits.", seed, bound - origin, bits); + + for (int i = 0; i < bits; i++) { + originalBitmap.set(rnd.nextLong(origin, bound)); + } + + var deserialized = serializedCopy(originalBitmap); + assertThat(deserialized, equalTo(originalBitmap)); + } + + @ParameterizedTest + @MethodSource("getSeeds") + @SneakyThrows + public void testRandomContinuousBitsAtLargeRange(int seed) { + var rnd = new Random(seed); + + var originalBitmap = new LargeBitmapImpl(); + + var blocks = rnd.nextInt(1024); + + var origin = rnd.nextLong(Long.MAX_VALUE / 4); + var bound = origin + rnd.nextLong(Long.MAX_VALUE / 4); + var blockSize = rnd.nextInt(1024); + + log.info( + "Testing with seed {}, range of {}, block size of {} and {} blocks", seed, bound - origin, + blockSize, blocks); + + for (int i = 0; i < blocks; i++) { + var blockOffset = rnd.nextLong(origin, bound); + + originalBitmap.set(blockOffset, blockOffset + blockSize); + } + + var deserialized = serializedCopy(originalBitmap); + assertThat(deserialized, equalTo(originalBitmap)); + } + + protected LargeBitmap serializedCopy(LargeBitmap originalBitmap) throws IOException { + var serializer = new LargeBitmapSerializerImpl(); + + var bos = new ByteArrayOutputStream(); + serializer.serialize(originalBitmap, bos); + + var bis = new ByteArrayInputStream(bos.toByteArray()); + var copy = new LargeBitmapImpl(); + serializer.deserializeInto(copy, bis); + return copy; + } + +} diff --git a/tw-base-utils/src/test/java/com/transferwise/common/baseutils/bitmap/LargeBitmapTest.java b/tw-base-utils/src/test/java/com/transferwise/common/baseutils/bitmap/LargeBitmapTest.java new file mode 100644 index 0000000..d385ce2 --- /dev/null +++ b/tw-base-utils/src/test/java/com/transferwise/common/baseutils/bitmap/LargeBitmapTest.java @@ -0,0 +1,270 @@ +package com.transferwise.common.baseutils.bitmap; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Random; +import java.util.TreeSet; +import java.util.concurrent.ThreadLocalRandom; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +@Slf4j +@SuppressFBWarnings("DMI") +public class LargeBitmapTest { + + private static final long INT_SIZE = 0x100000000L; + + @Test + public void testOverwritingBits() { + var bitmap = new LargeBitmapImpl(); + + bitmap.set(666); + assertThat(bitmap.isSet(666), equalTo(true)); + bitmap.set(666); + assertThat(bitmap.isSet(666), equalTo(true)); + + bitmap.clear(666); + assertThat(bitmap.isSet(666), equalTo(false)); + bitmap.set(666); + assertThat(bitmap.isSet(666), equalTo(true)); + + bitmap.clear(666); + assertThat(bitmap.isSet(666), equalTo(false)); + bitmap.clear(666); + assertThat(bitmap.isSet(666), equalTo(false)); + } + + @Test + public void testTwoPartitions() { + final var bitmap = new LargeBitmapImpl(); + + final long i = INT_SIZE; + + bitmap.set(i); + bitmap.set(i + 1); + + assertThat(bitmap.getFirstSetBit(), equalTo(i)); + assertThat(bitmap.getFirstSetBit(i + 1), equalTo(i + 1)); + + bitmap.clear(i); + assertThat(bitmap.getFirstSetBit(), equalTo(1L + i)); + + bitmap.clear(1L + i); + assertThat(bitmap.isEmpty(), equalTo(true)); + } + + @Test + void testGettingFistSetBit() { + final var bitmap = new LargeBitmapImpl(); + + bitmap.set(4294967254L); + bitmap.set(INT_SIZE + 64); + assertThat(bitmap.getFirstSetBit(4294967254L + 1), equalTo(INT_SIZE + 64L)); + assertThat(bitmap.getFirstSetBit(INT_SIZE + 65L), equalTo(-1L)); + + bitmap.clear(); + assertThat(bitmap.getFirstSetBit(0), equalTo(-1L)); + + bitmap.set(1000); + assertThat(bitmap.getFirstSetBit(), equalTo(1000L)); + assertThat(bitmap.getFirstSetBit(1000), equalTo(1000L)); + assertThat(bitmap.getFirstSetBit(1001), equalTo(-1L)); + } + + @Test + void testGettingFirstClearBit() { + final var bitmap = new LargeBitmapImpl(); + + bitmap.set(INT_SIZE - 1); + bitmap.set(INT_SIZE + 1); + + assertThat(bitmap.getFirstClearBit(INT_SIZE - 1), equalTo(INT_SIZE)); + + bitmap.clear(); + bitmap.set(INT_SIZE - 1); + bitmap.set(INT_SIZE + 1); + assertThat(bitmap.getFirstClearBit(INT_SIZE - 1), equalTo(INT_SIZE)); + + bitmap.clear(); + bitmap.set(INT_SIZE - 1); + bitmap.set(INT_SIZE * 2 + 1); + assertThat(bitmap.getFirstClearBit(INT_SIZE - 1), equalTo(INT_SIZE)); + + bitmap.clear(); + bitmap.set(INT_SIZE * 5 + 10); + assertThat(bitmap.getFirstClearBit(0), equalTo(0L)); + } + + @Test + public void testRangeOperations() { + final var bitmap = new LargeBitmapImpl(); + final var start = 1L; + final var end = 999L; + + bitmap.set(start, end); + for (long i = start; i <= end; i++) { + assertThat(bitmap.isSet(i), equalTo(true)); + } + + assertThat(bitmap.isSet(start - 1), equalTo(false)); + assertThat(bitmap.isSet(end + 1), equalTo(false)); + + bitmap.clear(start, end); + assertThat(bitmap.isEmpty(), equalTo(true)); + } + + @Test + public void testRangeOperationsAtBoundary() { + final var bitmap = new LargeBitmapImpl(); + final var start = INT_SIZE - 500; + final var end = INT_SIZE + 500; + + bitmap.set(start, end); + for (long i = start; i <= end; i++) { + assertThat(bitmap.isSet(i), equalTo(true)); + } + + assertThat(bitmap.isSet(start - 1), equalTo(false)); + assertThat(bitmap.isSet(end + 1), equalTo(false)); + + assertThat(bitmap.getFirstSetBit(start), equalTo(start)); + assertThat(bitmap.getFirstClearBit(start), equalTo(end + 1)); + + bitmap.clear(start, end); + assertThat(bitmap.isEmpty(), equalTo(true)); + } + + @Test + public void testRangeOperationsOverMultipleBoundaries() { + final var bitmap = new LargeBitmapImpl(); + final var start = INT_SIZE - 500; + final var end = INT_SIZE * 5 + 500; + + bitmap.set(start, end); + + assertThat(bitmap.isSet(start - 1), equalTo(false)); + assertThat(bitmap.isSet(end + 1), equalTo(false)); + + assertThat(bitmap.getFirstSetBit(start), equalTo(start)); + assertThat(bitmap.getFirstClearBit(start), equalTo(end + 1)); + + bitmap.clear(start, end); + assertThat(bitmap.isEmpty(), equalTo(true)); + } + + static int[] getSeeds() { + final var n = 128; + var result = new int[n]; + for (int i = 0; i < n; i++) { + result[i] = ThreadLocalRandom.current().nextInt(); + } + return result; + } + + @ParameterizedTest + @MethodSource("getSeeds") + void testRandomRangeOverBoundary(int seed) { + var bitmap = new LargeBitmapImpl(); + var bits = new TreeSet(); + var rnd = new Random(seed); + + long range = rnd.nextLong(10000); + long origin = INT_SIZE - range / 2; + long bound = INT_SIZE + range / 2; + long iterations = rnd.nextLong(1000); + + log.info("Testing random range over boundary with range {} and iterations: {}.", range, + iterations); + + for (int i = 0; i < iterations; i++) { + var bit = rnd.nextLong(origin, bound); + + bits.add(bit); + bitmap.set(bit); + + assertThat(bits.first(), equalTo(bitmap.getFirstSetBit())); + assertThat(bits.last(), equalTo(bitmap.getFirstSetBit(bits.last()))); + } + + for (int i = 0; i < iterations; i++) { + var bit = rnd.nextLong(origin, bound); + + bits.remove(bit); + bitmap.clear(bit); + + if (bits.isEmpty()) { + assertThat(bitmap.isEmpty(), equalTo(true)); + } else { + assertThat(bits.first(), equalTo(bitmap.getFirstSetBit())); + assertThat(bits.last(), equalTo(bitmap.getFirstSetBit(bits.last()))); + } + } + + if (bits.isEmpty()) { + assertThat(bitmap.isEmpty(), equalTo(true)); + } else { + var lastBit = bits.first() - 1; + for (var bit : bits) { + for (long i = lastBit + 1; i < bit; i++) { + assertThat(bitmap.isSet(i), equalTo(false)); + } + assertThat(bitmap.isSet(bit), equalTo(true)); + + lastBit = bit; + } + } + } + + @ParameterizedTest + @MethodSource("getSeeds") + void testRandomRange(int seed) { + var bitmap = new LargeBitmapImpl(); + var bits = new TreeSet(); + var rnd = new Random(seed); + long iterations = rnd.nextLong(1000); + + log.info("Testing random range with iterations {}.", iterations); + + for (int i = 0; i < iterations; i++) { + var bit = rnd.nextLong(0, Long.MAX_VALUE); + + bits.add(bit); + bitmap.set(bit); + + assertThat(bits.first(), equalTo(bitmap.getFirstSetBit())); + assertThat(bits.last(), equalTo(bitmap.getFirstSetBit(bits.last()))); + } + + var it = bits.iterator(); + + while (it.hasNext()) { + var bit = it.next(); + it.remove(); + bitmap.clear(bit); + + if (bits.isEmpty()) { + assertThat(bitmap.isEmpty(), equalTo(true)); + } else { + assertThat(bits.first(), equalTo(bitmap.getFirstSetBit())); + assertThat(bits.last(), equalTo(bitmap.getFirstSetBit(bits.last()))); + } + } + } + + @Test + void testCheckedOperations() { + var bitmap = new LargeBitmapImpl(); + + assertThat(bitmap.checkedSet(1), equalTo(true)); + assertThat(bitmap.checkedSet(1), equalTo(false)); + + assertThat(bitmap.checkedClear(1), equalTo(true)); + assertThat(bitmap.checkedClear(1), equalTo(false)); + + assertThat(bitmap.checkedSet(1), equalTo(true)); + } +} diff --git a/tw-base-utils/src/test/java/com/transferwise/common/baseutils/concurrency/DiscardingQueueProcessorTest.java b/tw-base-utils/src/test/java/com/transferwise/common/baseutils/concurrency/DiscardingQueueProcessorTest.java new file mode 100644 index 0000000..39fe3a9 --- /dev/null +++ b/tw-base-utils/src/test/java/com/transferwise/common/baseutils/concurrency/DiscardingQueueProcessorTest.java @@ -0,0 +1,141 @@ +package com.transferwise.common.baseutils.concurrency; + +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import com.google.common.base.Preconditions; +import com.transferwise.common.baseutils.ExceptionUtils; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class DiscardingQueueProcessorTest { + + private ExecutorService executorService; + private DiscardingQueueProcessor processor; + private List results; + private List errors; + + @BeforeEach + void beforeEach() { + results = Collections.synchronizedList(new ArrayList<>()); + errors = Collections.synchronizedList(new ArrayList<>()); + executorService = Executors.newCachedThreadPool(); + processor = new DiscardingQueueProcessor<>(executorService, payload -> results.add(payload.getData())) { + @Override + public void onError(Throwable t) { + errors.add(t); + } + }; + processor.setSoftQueueLimit(5); + processor.setHardQueueLimit(10); + + processor.start(); + } + + @AfterEach + void afterEach() { + processor.stop(() -> executorService.shutdown()); + } + + @Test + void processingSingleEventWorks() { + processor.schedule("Hello TransferWise!"); + await().until(() -> results.size(), equalTo(1)); + assertThat(results.get(0), equalTo("Hello TransferWise!")); + + processor.schedule("Hi"); + await().until(() -> results.size(), equalTo(2)); + assertThat(results.get(1), equalTo("Hi")); + } + + @Test + void softQueueSizeWillDiscardSimilarMessages() { + var latch = new CountDownLatch(20); + + processor.setProcessor(payload -> { + awaitOrThrow(latch); + results.add(payload.getData()); + }); + + processor.setSoftLimitPredicate(data -> true); + + var softDiscardedCount = 0; + for (var i = 0; i < 20; i++) { + var result = processor.schedule(String.valueOf(i)); + if (!result.isScheduled() && result.getDiscardReason() == DiscardingQueueProcessor.DiscardReason.SOFT_LIMIT) { + softDiscardedCount++; + } + latch.countDown(); + } + + await().until(() -> results.size(), equalTo(5)); + + assertThat(results.size(), equalTo(5)); + assertThat(softDiscardedCount, equalTo(15)); + } + + @Test + void hardQueueLimitIsApplied() { + var latch = new CountDownLatch(20); + + processor.setProcessor(payload -> { + awaitOrThrow(latch); + results.add(payload.getData()); + }); + + processor.setSoftLimitPredicate(data -> false); + + for (var i = 0; i < 20; i++) { + processor.schedule(String.valueOf(i)); + latch.countDown(); + } + + await().until(() -> results.size(), equalTo(10)); + } + + @Test + void processorErrorsWillNotStopProcessing() { + var latch = new CountDownLatch(20); + + processor.setProcessor(payload -> { + awaitOrThrow(latch); + throw new RuntimeException("Bad Things Do Happen"); + }); + + for (var i = 0; i < 20; i++) { + processor.schedule(String.valueOf(i)); + latch.countDown(); + } + + await().until(() -> errors.size(), equalTo(10)); + assertThat(results.size(), equalTo(0)); + assertThat(errors.size(), equalTo(10)); + assertThat(errors.get(0).getMessage(), equalTo("Bad Things Do Happen")); + + var latch1 = new CountDownLatch(20); + processor.setProcessor(payload -> { + awaitOrThrow(latch1); + results.add(payload.getData()); + }); + + for (var i = 0; i < 20; i++) { + processor.schedule(String.valueOf(i)); + latch1.countDown(); + } + + await().until(() -> results.size(), equalTo(10)); + } + + void awaitOrThrow(CountDownLatch latch) { + ExceptionUtils.doUnchecked(() -> Preconditions.checkState(latch.await(10, TimeUnit.SECONDS))); + } +} diff --git a/tw-base-utils/src/test/java/com/transferwise/common/baseutils/concurrency/SimpleScheduledTaskExecutorTest.java b/tw-base-utils/src/test/java/com/transferwise/common/baseutils/concurrency/SimpleScheduledTaskExecutorTest.java index 9239216..40a74c1 100644 --- a/tw-base-utils/src/test/java/com/transferwise/common/baseutils/concurrency/SimpleScheduledTaskExecutorTest.java +++ b/tw-base-utils/src/test/java/com/transferwise/common/baseutils/concurrency/SimpleScheduledTaskExecutorTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.google.common.base.Preconditions; import com.transferwise.common.baseutils.BaseTest; import com.transferwise.common.baseutils.clock.TestClock; import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor.TaskHandle; @@ -146,7 +147,7 @@ public void testParallelExecution() { return true; }); - latch.await(1, TimeUnit.MINUTES); + Preconditions.checkState(latch.await(1, TimeUnit.MINUTES)); await().until(() -> { testClock.tick(Duration.ofSeconds(1)); diff --git a/tw-base-utils/src/test/java/com/transferwise/common/baseutils/encoding/Base91Test.java b/tw-base-utils/src/test/java/com/transferwise/common/baseutils/encoding/Base91Test.java new file mode 100644 index 0000000..7ddf19b --- /dev/null +++ b/tw-base-utils/src/test/java/com/transferwise/common/baseutils/encoding/Base91Test.java @@ -0,0 +1,157 @@ +package com.transferwise.common.baseutils.encoding; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.charset.StandardCharsets; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.output.WriterOutputStream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +@Slf4j +public class Base91Test { + + static String[][] encodingOfStringsSource() { + return new String[][]{ + {"Kristo Kuusküll", "qz,<[[oCU6R;;m%);jwJ"}, + {"Конечно! Вот предложение на русском: \"Сегодня прекрасная погода.\"", + "n;f#,4PE~<\"Kn%k/]E;CVNg^?k&t^|)/mu~PNlr&#kyXDI`#?^fS+4\"D3[xX+Hx/VxIiZBVK8kwXjIk&8\"tB]4h9>`)4RE.f{{~|oLt~nS44\"D>=NBPJ" + + "(/nu!QeiA~%F*~! { + runningThreads.incrementAndGet(); + while (shouldRun.get()) { + ExceptionUtils.doUnchecked(() -> Thread.sleep(1)); + } + }); + } + + await().until(() -> runningThreads.get() == threadsCount); + + long start = System.currentTimeMillis(); + var threadInfos = ThreadUtils.getInconsistentThreadDump(executorService, threadsCount); + + shouldRun.set(false); + + System.out.println("It took " + (System.currentTimeMillis() - start) + "ms"); + + System.out.println(ThreadUtils.toString(threadInfos)); + + int suitableThreadsFound = 0; + + for (var threadInfo : threadInfos) { + var st = ThreadUtils.toString(threadInfo); + if (st.contains("at ") + && st.contains(ThreadUtilsTest.class.getName()) + && st.contains("group=\"main\"")) { + suitableThreadsFound++; + } + } + + // 1 is the test's thread itself + assertThat(suitableThreadsFound, equalTo(threadsCount + 1)); + } + + @Test + @SneakyThrows + void takingThreadDumpWithCurrentThreadWorks() { + assertThat(ThreadUtils.getInconsistentThreadDump().length, greaterThan(0)); + } +}