From c46ba0a1a5392d262840fb8ae714ebaae02c0829 Mon Sep 17 00:00:00 2001 From: ouertani Date: Tue, 3 Apr 2018 10:36:31 +0300 Subject: [PATCH 1/6] add KafkaStreamsS --- project/Versions.scala | 2 +- project/build.properties | 2 +- .../kafka/scala/streams/KafkaStreamsS.scala | 89 +++++++++++++++++++ .../kafka/scala/server/KafkaLocalServer.scala | 3 +- .../scala/streams/KafkaStreamsTest.scala | 10 +-- ...bilisticCountingScalaIntegrationTest.scala | 9 +- .../kafka/scala/streams/PunctuateTest.scala | 9 +- ...inScalaIntegrationTestImplicitSerdes.scala | 14 ++- ...ntegrationTestImplicitSerdesWithAvro.scala | 28 +++--- 9 files changed, 118 insertions(+), 48 deletions(-) create mode 100644 src/main/scala/com/lightbend/kafka/scala/streams/KafkaStreamsS.scala diff --git a/project/Versions.scala b/project/Versions.scala index 042d24ad0..4e56ded62 100644 --- a/project/Versions.scala +++ b/project/Versions.scala @@ -7,7 +7,7 @@ object Versions { val CuratorVersion = "4.0.0" val MinitestVersion = "2.0.0" val JDKVersion = "1.8" - val Scala_2_12_Version = "2.12.4" + val Scala_2_12_Version = "2.12.5" val Scala_2_11_Version = "2.11.11" val Avro4sVersion = "1.8.3" val CrossScalaVersions = Seq(Scala_2_12_Version, Scala_2_11_Version ) diff --git a/project/build.properties b/project/build.properties index 8b697bbb9..05313438a 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.1.0 +sbt.version=1.1.2 diff --git a/src/main/scala/com/lightbend/kafka/scala/streams/KafkaStreamsS.scala b/src/main/scala/com/lightbend/kafka/scala/streams/KafkaStreamsS.scala new file mode 100644 index 000000000..17971e741 --- /dev/null +++ b/src/main/scala/com/lightbend/kafka/scala/streams/KafkaStreamsS.scala @@ -0,0 +1,89 @@ +package com.lightbend.kafka.scala.streams + +import java.util.Properties + +import org.apache.kafka.common.{Metric, MetricName} +import org.apache.kafka.streams.processor.{StateRestoreListener, StreamPartitioner, ThreadMetadata} +import org.apache.kafka.streams.state.{QueryableStoreType, StreamsMetadata} +import org.apache.kafka.streams.{KafkaClientSupplier, KafkaStreams, StreamsConfig, Topology} + +import scala.collection.JavaConverters._ + +class KafkaStreamsS(inner: KafkaStreams) { + + def allMetadata(): Iterable[StreamsMetadata] = { + inner.allMetadata().asScala + } + + def allMetadataForStore(storeName: String): Iterable[StreamsMetadata] = { + inner.allMetadataForStore(storeName).asScala + } + + def cleanUp() = { + inner.cleanUp() + this + } + + def close() = { + inner.close() + } + + def close(timeout: Long, timeUnit: java.util.concurrent.TimeUnit) = { + inner.close(timeout, timeUnit) + } + + def localThreadsMetadata(): Set[ThreadMetadata] = { + inner.localThreadsMetadata.asScala.toSet + } + + def metadataForKey[K](storeName: String, key: K, keySerializer: Serializer[K]): StreamsMetadata = { + inner.metadataForKey(storeName, key, keySerializer) + } + + def metadataForKey[K](storeName: String, key: K, partitioner: StreamPartitioner[_ >: K, _]): StreamsMetadata = { + inner.metadataForKey(storeName, key, partitioner) + } + + def metrics(): Map[MetricName, _ <: Metric] = { + inner.metrics().asScala.toMap + } + + def withGlobalStateRestoreListener(globalStateRestoreListener: StateRestoreListener) = { + inner.setGlobalStateRestoreListener(globalStateRestoreListener) + this + } + + def withStateListener(listener: KafkaStreams.StateListener) = { + inner.setStateListener(listener) + this + } + + def withUncaughtExceptionHandler(eh: java.lang.Thread.UncaughtExceptionHandler) = 
{ + inner.setUncaughtExceptionHandler(eh) + this + } + + def start(): KafkaStreamsS = { + inner.start() + this + } + + def state(): KafkaStreams.State = { + inner.state() + } + + def store[T](storeName: String, queryableStoreType: QueryableStoreType[T]) = { + inner.store(storeName, queryableStoreType) + } +} + +object KafkaStreamsS { + def apply(s: StreamsBuilderS, p: Properties): KafkaStreamsS = new KafkaStreamsS(new KafkaStreams(s.build(), p)) + + def apply(topology: Topology, p: Properties): KafkaStreamsS = new KafkaStreamsS(new KafkaStreams(topology, p)) + + def apply(topology: Topology, config: StreamsConfig) = new KafkaStreamsS(new KafkaStreams(topology, config)) + + def apply(topology: Topology, config: StreamsConfig, clientSupplier: KafkaClientSupplier) = new KafkaStreamsS(new KafkaStreams(topology, config, clientSupplier)) + +} diff --git a/src/test/scala/com/lightbend/kafka/scala/server/KafkaLocalServer.scala b/src/test/scala/com/lightbend/kafka/scala/server/KafkaLocalServer.scala index f65217545..174a6cb49 100644 --- a/src/test/scala/com/lightbend/kafka/scala/server/KafkaLocalServer.scala +++ b/src/test/scala/com/lightbend/kafka/scala/server/KafkaLocalServer.scala @@ -29,10 +29,11 @@ class KafkaLocalServer private (kafkaProperties: Properties, zooKeeperServer: Zo private var zkUtils : ZkUtils = ZkUtils.apply(s"localhost:${zooKeeperServer.getPort()}", DEFAULT_ZK_SESSION_TIMEOUT_MS, DEFAULT_ZK_CONNECTION_TIMEOUT_MS, false) - def start(): Unit = { + def start(): KafkaLocalServer = { broker = KafkaServerStartable.fromProps(kafkaProperties) broker.startup() + this } //scalastyle:off null diff --git a/src/test/scala/com/lightbend/kafka/scala/streams/KafkaStreamsTest.scala b/src/test/scala/com/lightbend/kafka/scala/streams/KafkaStreamsTest.scala index 356d135fe..a9ef0d61f 100644 --- a/src/test/scala/com/lightbend/kafka/scala/streams/KafkaStreamsTest.scala +++ b/src/test/scala/com/lightbend/kafka/scala/streams/KafkaStreamsTest.scala @@ -11,16 +11,14 @@ import com.lightbend.kafka.scala.server.{KafkaLocalServer, MessageListener, Mess import minitest.TestSuite import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization._ -import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig} +import org.apache.kafka.streams.{ KeyValue, StreamsConfig} import ImplicitConversions._ import com.typesafe.scalalogging.LazyLogging object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestData with LazyLogging { override def setup(): KafkaLocalServer = { - val s = KafkaLocalServer(cleanOnStart = true, Some(localStateDir)) - s.start() - s + KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start() } override def tearDown(server: KafkaLocalServer): Unit = { @@ -57,8 +55,8 @@ object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestDa wordCounts.toStream.to(outputTopic) - val streams = new KafkaStreams(builder.build(), streamsConfiguration) - streams.start() + val streams = KafkaStreamsS(builder, streamsConfiguration).start() + // // Step 2: Produce some input data to the input topic. 
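A note on the new wrapper's call-site style before the remaining test diffs: every with* setter on KafkaStreamsS, as well as start(), returns this, which is what lets the tests in this series collapse the old "new KafkaStreams(...); setXxx(...); start()" sequence into a single expression. The following is a minimal, self-contained sketch of that style, not code from this patch: the application id, broker address and topic names are hypothetical, and the trivial pass-through topology exists only so the chain has something to start.

    import java.util.Properties

    import com.lightbend.kafka.scala.streams.KafkaStreamsS
    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}

    object FluentStartSketch extends App {
      // Hypothetical configuration values, for illustration only.
      val props = new Properties()
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafkastreamss-sketch")
      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)

      // A trivial pass-through topology, just so there is something to run.
      val topology = new Topology
      topology.addSource("source", "input-topic")
      topology.addSink("sink", "output-topic", "source")

      // The with* setters and start() each return this, so configuration
      // and startup read as one fluent chain.
      val streams: KafkaStreamsS =
        KafkaStreamsS(topology, props)
          .withStateListener((next: KafkaStreams.State, prev: KafkaStreams.State) =>
            println(s"streams state: $prev -> $next"))
          .withUncaughtExceptionHandler((_: Thread, e: Throwable) => e.printStackTrace())
          .start()

      sys.addShutdownHook(streams.close())
    }
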
diff --git a/src/test/scala/com/lightbend/kafka/scala/streams/ProbabilisticCountingScalaIntegrationTest.scala b/src/test/scala/com/lightbend/kafka/scala/streams/ProbabilisticCountingScalaIntegrationTest.scala
index 10a2d4949..d865b811b 100644
--- a/src/test/scala/com/lightbend/kafka/scala/streams/ProbabilisticCountingScalaIntegrationTest.scala
+++ b/src/test/scala/com/lightbend/kafka/scala/streams/ProbabilisticCountingScalaIntegrationTest.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.serialization._
 import org.apache.kafka.streams.kstream.Transformer
 import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
+import org.apache.kafka.streams.{ KeyValue, StreamsConfig}
 import ImplicitConversions._
 import com.typesafe.scalalogging.LazyLogging
@@ -70,9 +70,7 @@ object ProbabilisticCountingScalaIntegrationTest extends TestSuite[KafkaLocalSer
     with ProbabilisticCountingScalaIntegrationTestData {

   override def setup(): KafkaLocalServer = {
-    val s = KafkaLocalServer(cleanOnStart = true, Some(localStateDir))
-    s.start()
-    s
+    KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start()
   }

   override def tearDown(server: KafkaLocalServer): Unit = {
@@ -151,8 +149,7 @@ object ProbabilisticCountingScalaIntegrationTest extends TestSuite[KafkaLocalSer
       .transform(() => new ProbabilisticCounter, cmsStoreName)
       .to(outputTopic)

-    val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration)
-    streams.start()
+    val streams: KafkaStreamsS = KafkaStreamsS(builder, streamsConfiguration).start()

     // Step 2: Publish some input text lines.
     val sender = MessageSender[String, String](brokers, classOf[StringSerializer].getName, classOf[StringSerializer].getName)
diff --git a/src/test/scala/com/lightbend/kafka/scala/streams/PunctuateTest.scala b/src/test/scala/com/lightbend/kafka/scala/streams/PunctuateTest.scala
index 3301ea60f..579700d96 100644
--- a/src/test/scala/com/lightbend/kafka/scala/streams/PunctuateTest.scala
+++ b/src/test/scala/com/lightbend/kafka/scala/streams/PunctuateTest.scala
@@ -11,7 +11,7 @@ import com.typesafe.scalalogging.LazyLogging
 import minitest.TestSuite
 import org.apache.kafka.common.serialization._
 import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext, PunctuationType}
-import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}
+import org.apache.kafka.streams.{ StreamsConfig, Topology}

 /**
  * This sample demonstrates the usage of punctuate, which is significantly changed in version 1.0 and
@@ -23,9 +23,7 @@ import org.apache.kafka.streams.{ StreamsConfig, Topology}
 object PunctuateTest extends TestSuite[KafkaLocalServer] with PunctuateTestData with LazyLogging {

   override def setup(): KafkaLocalServer = {
-    val s = KafkaLocalServer(cleanOnStart = true, Some(localStateDir))
-    s.start()
-    s
+    KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start()
   }

   override def tearDown(server: KafkaLocalServer): Unit = {
@@ -53,8 +51,7 @@ object PunctuateTest extends TestSuite[KafkaLocalServer] with PunctuateTestData
     topology.addSource("data", inputTopic)
     // Processors
     topology.addProcessor("data processor", () => new SampleProcessor(5000), "data")
-    val streams = new KafkaStreams(topology, streamsConfiguration)
-    streams.start()
+    val streams = KafkaStreamsS(topology, streamsConfiguration).start()

     // Allow time for the streams to start up
     Thread.sleep(5000L)
diff --git 
a/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala index 34ba96731..dbd5d9c1b 100644 --- a/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala +++ b/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala @@ -47,9 +47,7 @@ object StreamToTableJoinScalaIntegrationTestImplicitSerdes extends TestSuite[Kaf with StreamToTableJoinTestData with LazyLogging { override def setup(): KafkaLocalServer = { - val s = KafkaLocalServer(cleanOnStart = true, Some(localStateDir)) - s.start() - s + KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start() } override def tearDown(server: KafkaLocalServer): Unit = { @@ -106,21 +104,19 @@ object StreamToTableJoinScalaIntegrationTestImplicitSerdes extends TestSuite[Kaf // Write the (continuously updating) results to the output topic. clicksPerRegion.toStream.to(outputTopic) - val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration) + val streams: KafkaStreamsS = KafkaStreamsS(builder, streamsConfiguration) - streams.setUncaughtExceptionHandler((_: Thread, e: Throwable) => try { + streams.withUncaughtExceptionHandler((_: Thread, e: Throwable) => try { logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e) e.printStackTrace() - val closed: Unit = streams.close() + val closed = streams.close() logger.info(s"Exiting application after streams close ($closed)") } catch { case x: Exception => x.printStackTrace() } finally { logger.debug("Exiting application ..") System.exit(-1) - }) - - streams.start() + }).start() // // Step 2: Publish user-region information. diff --git a/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala b/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala index 805b0ba57..e56250159 100644 --- a/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala +++ b/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala @@ -30,9 +30,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization._ import org.apache.kafka.streams._ import ImplicitConversions._ +import com.typesafe.scalalogging.LazyLogging object StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro - extends TestSuite[KafkaLocalServer] with StreamToTableJoinTestData { + extends TestSuite[KafkaLocalServer] with StreamToTableJoinTestData with LazyLogging{ case class UserClicks(clicks: Long) @@ -82,9 +83,7 @@ object StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro override def setup(): KafkaLocalServer = { - val s = KafkaLocalServer(true, Some(localStateDir)) - s.start() - s + KafkaLocalServer(true, Some(localStateDir)).start() } override def tearDown(server: KafkaLocalServer): Unit = { @@ -141,28 +140,21 @@ object StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro // Write the (continuously updating) results to the output topic. 
clicksPerRegion.toStream.to(outputTopic) - val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration) - - streams - .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + val streams: KafkaStreamsS = KafkaStreamsS(builder.build(), streamsConfiguration) + streams.withUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { override def uncaughtException(t: Thread, e: Throwable): Unit = try { - println( + logger.error( s"Stream terminated because of uncaught exception .. Shutting " + - s"down app", - - e) - e.printStackTrace + s"down app", e) val closed = streams.close() - println(s"Exiting application after streams close ($closed)") + logger.debug(s"Exiting application after streams close ($closed)") } catch { case x: Exception => x.printStackTrace } finally { - println("Exiting application ..") + logger.debug("Exiting application ..") System.exit(-1) } - }) - - streams.start() + }).start() // // Step 2: Publish user-region information. From b12d426a4f0300df492946d6169d5d80fa5eb5e6 Mon Sep 17 00:00:00 2001 From: ouertani Date: Tue, 3 Apr 2018 10:42:29 +0300 Subject: [PATCH 2/6] update travis scala version to 2.12.5 --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 11d1ea110..20771af53 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,7 @@ sudo: false jdk: oraclejdk8 scala: - 2.11.11 -- 2.12.4 +- 2.12.5 sbt_args: -mem 2000 script: - sbt "++ ${TRAVIS_SCALA_VERSION}!" test From 9f80c65a98da6d060f803223fdd6752344537ed8 Mon Sep 17 00:00:00 2001 From: ouertani Date: Thu, 3 May 2018 14:49:20 +0300 Subject: [PATCH 3/6] scalafmt format --- ...ntegrationTestImplicitSerdesWithAvro.scala | 41 ++++++++++--------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala b/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala index 7dba75c94..34a5a4fb7 100644 --- a/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala +++ b/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro.scala @@ -32,8 +32,9 @@ import ImplicitConversions._ import com.typesafe.scalalogging.LazyLogging object StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro - extends TestSuite[KafkaLocalServer] with StreamToTableJoinTestData with LazyLogging{ - + extends TestSuite[KafkaLocalServer] + with StreamToTableJoinTestData + with LazyLogging { case class UserClicks(clicks: Long) @@ -79,9 +80,8 @@ object StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro * must be `static` and `public`) to a workaround combination of `@Rule * def` and a `private val`. */ - override def setup(): KafkaLocalServer = { + override def setup(): KafkaLocalServer = KafkaLocalServer(true, Some(localStateDir)).start() - } override def tearDown(server: KafkaLocalServer): Unit = server.stop() @@ -137,21 +137,24 @@ object StreamToTableJoinScalaIntegrationTestImplicitSerdesWithAvro // Write the (continuously updating) results to the output topic. 
clicksPerRegion.toStream.to(outputTopic) - val streams: KafkaStreamsS = KafkaStreamsS(builder.build(), streamsConfiguration) - streams.withUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { - override def uncaughtException(t: Thread, e: Throwable): Unit = try { - logger.error( - s"Stream terminated because of uncaught exception .. Shutting " + - s"down app", e) - val closed = streams.close() - logger.debug(s"Exiting application after streams close ($closed)") - } catch { - case x: Exception => x.printStackTrace - } finally { - logger.debug("Exiting application ..") - System.exit(-1) - } - }).start() + val streams: KafkaStreamsS = KafkaStreamsS(builder.build(), streamsConfiguration) + streams + .withUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + override def uncaughtException(t: Thread, e: Throwable): Unit = + try { + logger.error(s"Stream terminated because of uncaught exception .. Shutting " + + s"down app", + e) + val closed = streams.close() + logger.debug(s"Exiting application after streams close ($closed)") + } catch { + case x: Exception => x.printStackTrace + } finally { + logger.debug("Exiting application ..") + System.exit(-1) + } + }) + .start() // // Step 2: Publish user-region information. From b866befa59f5271bd2e087e2f262505a461d8a75 Mon Sep 17 00:00:00 2001 From: ouertani Date: Thu, 3 May 2018 14:56:34 +0300 Subject: [PATCH 4/6] scalafmt format --- .../kafka/scala/streams/KafkaStreamsS.scala | 33 +++++++------------ 1 file changed, 12 insertions(+), 21 deletions(-) diff --git a/src/main/scala/com/lightbend/kafka/scala/streams/KafkaStreamsS.scala b/src/main/scala/com/lightbend/kafka/scala/streams/KafkaStreamsS.scala index 17971e741..f77f50bb0 100644 --- a/src/main/scala/com/lightbend/kafka/scala/streams/KafkaStreamsS.scala +++ b/src/main/scala/com/lightbend/kafka/scala/streams/KafkaStreamsS.scala @@ -11,42 +11,34 @@ import scala.collection.JavaConverters._ class KafkaStreamsS(inner: KafkaStreams) { - def allMetadata(): Iterable[StreamsMetadata] = { + def allMetadata(): Iterable[StreamsMetadata] = inner.allMetadata().asScala - } - def allMetadataForStore(storeName: String): Iterable[StreamsMetadata] = { + def allMetadataForStore(storeName: String): Iterable[StreamsMetadata] = inner.allMetadataForStore(storeName).asScala - } def cleanUp() = { inner.cleanUp() this } - def close() = { + def close() = inner.close() - } - def close(timeout: Long, timeUnit: java.util.concurrent.TimeUnit) = { + def close(timeout: Long, timeUnit: java.util.concurrent.TimeUnit) = inner.close(timeout, timeUnit) - } - def localThreadsMetadata(): Set[ThreadMetadata] = { + def localThreadsMetadata(): Set[ThreadMetadata] = inner.localThreadsMetadata.asScala.toSet - } - def metadataForKey[K](storeName: String, key: K, keySerializer: Serializer[K]): StreamsMetadata = { + def metadataForKey[K](storeName: String, key: K, keySerializer: Serializer[K]): StreamsMetadata = inner.metadataForKey(storeName, key, keySerializer) - } - def metadataForKey[K](storeName: String, key: K, partitioner: StreamPartitioner[_ >: K, _]): StreamsMetadata = { + def metadataForKey[K](storeName: String, key: K, partitioner: StreamPartitioner[_ >: K, _]): StreamsMetadata = inner.metadataForKey(storeName, key, partitioner) - } - def metrics(): Map[MetricName, _ <: Metric] = { + def metrics(): Map[MetricName, _ <: Metric] = inner.metrics().asScala.toMap - } def withGlobalStateRestoreListener(globalStateRestoreListener: StateRestoreListener) = { 
inner.setGlobalStateRestoreListener(globalStateRestoreListener) @@ -68,13 +60,11 @@ class KafkaStreamsS(inner: KafkaStreams) { this } - def state(): KafkaStreams.State = { + def state(): KafkaStreams.State = inner.state() - } - def store[T](storeName: String, queryableStoreType: QueryableStoreType[T]) = { + def store[T](storeName: String, queryableStoreType: QueryableStoreType[T]) = inner.store(storeName, queryableStoreType) - } } object KafkaStreamsS { @@ -84,6 +74,7 @@ object KafkaStreamsS { def apply(topology: Topology, config: StreamsConfig) = new KafkaStreamsS(new KafkaStreams(topology, config)) - def apply(topology: Topology, config: StreamsConfig, clientSupplier: KafkaClientSupplier) = new KafkaStreamsS(new KafkaStreams(topology, config, clientSupplier)) + def apply(topology: Topology, config: StreamsConfig, clientSupplier: KafkaClientSupplier) = + new KafkaStreamsS(new KafkaStreams(topology, config, clientSupplier)) } From a92686594f47900ec12132283b4d6365778de7b3 Mon Sep 17 00:00:00 2001 From: ouertani Date: Thu, 3 May 2018 14:59:53 +0300 Subject: [PATCH 5/6] scalafmt format --- .../lightbend/kafka/scala/streams/KafkaStreamsTest.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/test/scala/com/lightbend/kafka/scala/streams/KafkaStreamsTest.scala b/src/test/scala/com/lightbend/kafka/scala/streams/KafkaStreamsTest.scala index def1e7d1b..bff177453 100644 --- a/src/test/scala/com/lightbend/kafka/scala/streams/KafkaStreamsTest.scala +++ b/src/test/scala/com/lightbend/kafka/scala/streams/KafkaStreamsTest.scala @@ -10,15 +10,14 @@ import com.lightbend.kafka.scala.server.{KafkaLocalServer, MessageListener, Mess import minitest.TestSuite import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization._ -import org.apache.kafka.streams.{ KeyValue, StreamsConfig} +import org.apache.kafka.streams.{KeyValue, StreamsConfig} import ImplicitConversions._ import com.typesafe.scalalogging.LazyLogging object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestData with LazyLogging { - override def setup(): KafkaLocalServer = { + override def setup(): KafkaLocalServer = KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start() - } override def tearDown(server: KafkaLocalServer): Unit = server.stop() @@ -53,8 +52,7 @@ object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestDa wordCounts.toStream.to(outputTopic) - val streams = KafkaStreamsS(builder, streamsConfiguration).start() - + val streams = KafkaStreamsS(builder, streamsConfiguration).start() // // Step 2: Produce some input data to the input topic. 
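Patches 4/6 and 5/6 above are formatting-only, so behavior is unchanged; in particular, store remains a thin delegate to the underlying KafkaStreams#store. For illustration, a sketch of an interactive query going through that delegate. The store name "word-counts" and the value type are assumptions made for the example; QueryableStoreTypes is the stock Kafka Streams factory.

    import com.lightbend.kafka.scala.streams.KafkaStreamsS
    import org.apache.kafka.streams.state.{QueryableStoreTypes, ReadOnlyKeyValueStore}

    // Look up one key in a (hypothetical) key-value store materialized by the
    // topology. The instance must be RUNNING for local stores to be queryable.
    def lookupCount(streams: KafkaStreamsS, word: String): Option[Long] = {
      val store: ReadOnlyKeyValueStore[String, java.lang.Long] =
        streams.store("word-counts", QueryableStoreTypes.keyValueStore[String, java.lang.Long]())
      Option(store.get(word)).map(_.longValue)
    }
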
From 8cf1c959d00073a4b3b016fac78d1086c21eeedf Mon Sep 17 00:00:00 2001
From: ouertani
Date: Thu, 3 May 2018 15:09:37 +0300
Subject: [PATCH 6/6] scalafmt format

---
 ...bilisticCountingScalaIntegrationTest.scala |  5 ++-
 .../kafka/scala/streams/PunctuateTest.scala   |  7 ++--
 ...inScalaIntegrationTestImplicitSerdes.scala | 34 +++++++++++--------
 3 files changed, 24 insertions(+), 22 deletions(-)

diff --git a/src/test/scala/com/lightbend/kafka/scala/streams/ProbabilisticCountingScalaIntegrationTest.scala b/src/test/scala/com/lightbend/kafka/scala/streams/ProbabilisticCountingScalaIntegrationTest.scala
index c52a1b952..66a8e2d12 100644
--- a/src/test/scala/com/lightbend/kafka/scala/streams/ProbabilisticCountingScalaIntegrationTest.scala
+++ b/src/test/scala/com/lightbend/kafka/scala/streams/ProbabilisticCountingScalaIntegrationTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.serialization._
 import org.apache.kafka.streams.kstream.Transformer
 import org.apache.kafka.streams.processor.ProcessorContext
-import org.apache.kafka.streams.{ KeyValue, StreamsConfig}
+import org.apache.kafka.streams.{KeyValue, StreamsConfig}
 import ImplicitConversions._
 import com.typesafe.scalalogging.LazyLogging
@@ -69,9 +69,8 @@ object ProbabilisticCountingScalaIntegrationTest
   extends TestSuite[KafkaLocalServer]
     with ProbabilisticCountingScalaIntegrationTestData {

-  override def setup(): KafkaLocalServer = {
+  override def setup(): KafkaLocalServer =
     KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start()
-  }

   override def tearDown(server: KafkaLocalServer): Unit =
     server.stop()
diff --git a/src/test/scala/com/lightbend/kafka/scala/streams/PunctuateTest.scala b/src/test/scala/com/lightbend/kafka/scala/streams/PunctuateTest.scala
index 2034205ac..0d2da9438 100644
--- a/src/test/scala/com/lightbend/kafka/scala/streams/PunctuateTest.scala
+++ b/src/test/scala/com/lightbend/kafka/scala/streams/PunctuateTest.scala
@@ -10,7 +10,7 @@ import com.typesafe.scalalogging.LazyLogging
 import minitest.TestSuite
 import org.apache.kafka.common.serialization._
 import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorContext, PunctuationType}
-import org.apache.kafka.streams.{ StreamsConfig, Topology}
+import org.apache.kafka.streams.{StreamsConfig, Topology}

 /**
  * This sample demonstrates the usage of punctuate, which is significantly changed in version 1.0 and
@@ -20,9 +20,8 @@ import org.apache.kafka.streams.{StreamsConfig, Topology}
  */
 object PunctuateTest extends TestSuite[KafkaLocalServer] with PunctuateTestData with LazyLogging {

-  override def setup(): KafkaLocalServer = {
+  override def setup(): KafkaLocalServer =
     KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start()
-  }

   override def tearDown(server: KafkaLocalServer): Unit =
     server.stop()
@@ -47,7 +46,7 @@ object PunctuateTest extends TestSuite[KafkaLocalServer] with PunctuateTestData
     topology.addSource("data", inputTopic)
     // Processors
     topology.addProcessor("data processor", () => new SampleProcessor(5000), "data")
-    val streams = KafkaStreamsS(topology, streamsConfiguration).start()
+    val streams = KafkaStreamsS(topology, streamsConfiguration).start()

     // Allow time for the streams to start up
     Thread.sleep(5000L)
diff --git a/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 81e44a244..e3ea0e3b3 100644
--- 
a/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala +++ b/src/test/scala/com/lightbend/kafka/scala/streams/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala @@ -46,9 +46,8 @@ object StreamToTableJoinScalaIntegrationTestImplicitSerdes with StreamToTableJoinTestData with LazyLogging { - override def setup(): KafkaLocalServer = { + override def setup(): KafkaLocalServer = KafkaLocalServer(cleanOnStart = true, Some(localStateDir)).start() - } override def tearDown(server: KafkaLocalServer): Unit = server.stop() @@ -104,19 +103,24 @@ object StreamToTableJoinScalaIntegrationTestImplicitSerdes // Write the (continuously updating) results to the output topic. clicksPerRegion.toStream.to(outputTopic) - val streams: KafkaStreamsS = KafkaStreamsS(builder, streamsConfiguration) - - streams.withUncaughtExceptionHandler((_: Thread, e: Throwable) => try { - logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e) - e.printStackTrace() - val closed = streams.close() - logger.info(s"Exiting application after streams close ($closed)") - } catch { - case x: Exception => x.printStackTrace() - } finally { - logger.debug("Exiting application ..") - System.exit(-1) - }).start() + val streams: KafkaStreamsS = KafkaStreamsS(builder, streamsConfiguration) + + streams + .withUncaughtExceptionHandler( + (_: Thread, e: Throwable) => + try { + logger.error(s"Stream terminated because of uncaught exception .. Shutting down app", e) + e.printStackTrace() + val closed = streams.close() + logger.info(s"Exiting application after streams close ($closed)") + } catch { + case x: Exception => x.printStackTrace() + } finally { + logger.debug("Exiting application ..") + System.exit(-1) + } + ) + .start() // // Step 2: Publish user-region information.
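Finally, teardown through the wrapper, as a sketch that uses only methods KafkaStreamsS itself defines (close(timeout, unit) and cleanUp()); the 30-second bound is an arbitrary illustrative choice, not something these patches prescribe.

    import java.util.concurrent.TimeUnit

    import com.lightbend.kafka.scala.streams.KafkaStreamsS

    def shutdown(streams: KafkaStreamsS): Unit = {
      // Bounded close, so a wedged processing thread cannot hang the JVM.
      streams.close(30, TimeUnit.SECONDS)
      // cleanUp() deletes the instance's local state directory; KafkaStreams
      // only allows this while the instance is not running, hence after close().
      streams.cleanUp()
    }

Like the other mutators, cleanUp() returns this, so on a freshly constructed instance it can equally be chained directly in front of start() when reprocessing from scratch is wanted.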