Skip to content

Commit

Permalink
TANGO-2124-unit-tests : Stable unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
adamallegro committed Aug 28, 2024
1 parent 92ca69e commit 484a073
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ public SingleRecipientMessageSenderAdapter(CompletableFutureAwareMessageSender a
this.adaptee = adaptee;
}

public CompletableFutureAwareMessageSender getAdaptee() {
return adaptee;
}

@Override
public CompletableFuture<MessageSendingResult> send(Message message) {
return resilientMessageSender.send(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ public class GoogleBigQueryJsonStreamWriterFactory implements GoogleBigQueryStre
private final Credentials credentials;
private final BigQueryWriteClient writeClient;

public GoogleBigQueryJsonStreamWriterFactory(CredentialsProvider credentials, BigQueryWriteSettings writeSettings) throws IOException {
public GoogleBigQueryJsonStreamWriterFactory(CredentialsProvider credentials, GoogleBigQueryJsonWriteClientProvider writeClientProvider) throws IOException {
this.credentials = credentials.getCredentials();
this.writeClient = BigQueryWriteClient.create(writeSettings);
this.writeClient = writeClientProvider.getWriteClient();
}

public JsonStreamWriter getWriterForStream(String stream) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.json;

import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class GoogleBigQueryJsonWriteClientProvider {
private final BigQueryWriteSettings writeSettings;
public GoogleBigQueryJsonWriteClientProvider(BigQueryWriteSettings writeSettings) {
this.writeSettings = writeSettings;
}

public BigQueryWriteClient getWriteClient() throws IOException {
return BigQueryWriteClient.create(this.writeSettings);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery

import com.google.api.gax.core.NoCredentialsProvider
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings
import pl.allegro.tech.hermes.api.ContentType
import pl.allegro.tech.hermes.api.DeliveryType
import pl.allegro.tech.hermes.api.EndpointAddress
import pl.allegro.tech.hermes.api.Subscription
import pl.allegro.tech.hermes.api.SubscriptionMode
import pl.allegro.tech.hermes.consumers.consumer.ResilientMessageSender
import pl.allegro.tech.hermes.consumers.consumer.rate.SerialConsumerRateLimiter
import pl.allegro.tech.hermes.consumers.consumer.sender.MessageSender
import pl.allegro.tech.hermes.consumers.consumer.sender.SingleRecipientMessageSenderAdapter
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.avro.GoogleBigQueryAvroDataWriterPool
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.avro.GoogleBigQueryAvroMessageTransformer
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.avro.GoogleBigQueryAvroStreamWriterFactory
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.avro.GoogleBigQueryAvroToProtoConverter
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.json.GoogleBigQueryJsonDataWriterPool
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.json.GoogleBigQueryJsonMessageTransformer
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.json.GoogleBigQueryJsonSender
import pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.json.GoogleBigQueryJsonStreamWriterFactory
import pl.allegro.tech.hermes.domain.CredentialsRepository
import spock.lang.Specification

class GoogleBigQueryMessageSenderProviderTest extends Specification {
def 'should create sender provider'() {
given:
GoogleBigQueryMessageSenderProvider provider = new GoogleBigQueryMessageSenderProvider(
Mock(GoogleBigQuerySenderTargetResolver),
Mock(GoogleBigQueryJsonMessageTransformer),
Mock(GoogleBigQueryAvroMessageTransformer),
Mock(GoogleBigQueryJsonDataWriterPool),
Mock(GoogleBigQueryAvroDataWriterPool)
)

EndpointAddress endpointAddress = EndpointAddress.of("googlebigquery://projects/project/datasets/dataset/tables/table")
Subscription subscription = Subscription.create("pl.allegro.group.topicname",
"subscription",
endpointAddress,
Subscription.State.PENDING,
"test",
Map.of(),
false,
null,
null,
null,
contentType,
DeliveryType.SERIAL,
[],
SubscriptionMode.ANYCAST,
[],
null,
null,
false,
false,
0,
false,
false
)

ResilientMessageSender resilientMessageSender = Mock(ResilientMessageSender)

when:

MessageSender sender = provider.create(subscription, resilientMessageSender)


then:
(sender as SingleRecipientMessageSenderAdapter).getAdaptee().getClass().name == adapteeClassName

where:
contentType || adapteeClassName
ContentType.JSON || "pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.json.GoogleBigQueryJsonSender"
ContentType.AVRO || "pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.avro.GoogleBigQueryAvroSender"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery

import com.google.cloud.bigquery.storage.v1.TableName
import pl.allegro.tech.hermes.api.EndpointAddress
import spock.lang.Specification

class GoogleBigQuerySenderTargetResolverTest extends Specification {
def 'should resolve endpoint address'() {

given:
EndpointAddress endpointAddress = EndpointAddress.of("googlebigquery://projects/project/datasets/dataset/tables/table")
GoogleBigQuerySenderTargetResolver resolver = new GoogleBigQuerySenderTargetResolver()

TableName expected = TableName.of("project", "dataset", "table")



when:
GoogleBigQuerySenderTarget target = resolver.resolve(endpointAddress)

then:
target.getTableName().project == expected.project
target.getTableName().dataset == expected.dataset
target.getTableName().table == expected.table
}

def 'should throw exception when endpoint address is invalid'() {

given:
EndpointAddress endpointAddress = EndpointAddress.of("googlebigquery://projects/project/datasets/dataset/tables/table/streams/_default")
GoogleBigQuerySenderTargetResolver resolver = new GoogleBigQuerySenderTargetResolver()

when:
resolver.resolve(endpointAddress)

then:
thrown(IllegalArgumentException)

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery

import com.google.cloud.bigquery.storage.v1.TableName
import spock.lang.Specification

class GoogleBigQuerySenderTargetTest extends Specification{
def 'should create sender target'(){

given:
TableName tableName = TableName.of("project", "dataset", "table")

when:
GoogleBigQuerySenderTarget target = GoogleBigQuerySenderTarget.newBuilder().withTableName(tableName).build()

then:
tableName == target.tableName
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package pl.allegro.tech.hermes.consumers.consumer.sender.googlebigquery.json

import org.json.JSONException
import org.json.JSONObject
import pl.allegro.tech.hermes.api.ContentType
import pl.allegro.tech.hermes.consumers.consumer.Message
import pl.allegro.tech.hermes.consumers.test.MessageBuilder
import spock.lang.Specification

class GoogleBigQueryJsonMessageTransformerTest extends Specification {
def 'should translate valid message to json'() {
given:
GoogleBigQueryJsonMessageTransformer transformer = new GoogleBigQueryJsonMessageTransformer()
byte[] content = "{'message': 'some text'}".bytes
String expected = "{\"message\":\"some text\"}"
Message message = MessageBuilder.withTestMessage()
.withContentType(ContentType.JSON)
.withContent(content)
.build()

when:
JSONObject object = transformer.fromHermesMessage(message)

then:
object.toString() == expected
}

def 'should throw an exception when json is invalid'() {
given:
GoogleBigQueryJsonMessageTransformer transformer = new GoogleBigQueryJsonMessageTransformer()
byte[] content = "not a json".bytes
Message message = MessageBuilder.withTestMessage()
.withContentType(ContentType.JSON)
.withContent(content)
.build()

when:
transformer.fromHermesMessage(message)

then:
thrown(JSONException)
}
def 'should throw an exception when message is not a json'() {
given:
GoogleBigQueryJsonMessageTransformer transformer = new GoogleBigQueryJsonMessageTransformer()
byte[] content = "not a json".bytes
Message message = MessageBuilder.withTestMessage()
.withContentType(ContentType.AVRO)
.withContent(content)
.build()

when:
transformer.fromHermesMessage(message)

then:
thrown(IllegalArgumentException)
}
}

0 comments on commit 484a073

Please sign in to comment.