Skip to content

Commit

Permalink
Merge pull request #713 from Banno/avro4s-split
Browse files Browse the repository at this point in the history
feat!: split Avro4s functionality into its own project
  • Loading branch information
Kazark authored Jan 25, 2023
2 parents 022047f + 8120d2a commit faf3a94
Show file tree
Hide file tree
Showing 24 changed files with 562 additions and 321 deletions.
39 changes: 39 additions & 0 deletions avro4s/src/main/scala/Avro4sConsumer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2019 Jack Henry & Associates, Inc.®
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.banno.kafka.avro4s

import cats.*
import cats.effect.*
import cats.syntax.all.*
import com.banno.kafka.consumer.*
import com.sksamuel.avro4s.FromRecord
import org.apache.avro.generic.GenericRecord

object Avro4sConsumer {
def apply[F[_]: Functor, K, V](
c: ConsumerApi[F, GenericRecord, GenericRecord]
)(implicit
kfr: FromRecord[K],
vfr: FromRecord[V],
): ConsumerApi[F, K, V] =
c.bimap(kfr.from, vfr.from)

def resource[F[_]: Async, K: FromRecord, V: FromRecord](
configs: (String, AnyRef)*
): Resource[F, ConsumerApi[F, K, V]] =
ConsumerApi.Avro.Generic.resource[F](configs: _*).map(Avro4sConsumer(_))
}
37 changes: 37 additions & 0 deletions avro4s/src/main/scala/Avro4sProducer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2019 Jack Henry & Associates, Inc.®
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.banno.kafka.avro4s

import cats.effect.*
import com.banno.kafka.producer.*
import com.sksamuel.avro4s.ToRecord
import org.apache.avro.generic.GenericRecord

object Avro4sProducer {
def apply[F[_], K, V](
p: ProducerApi[F, GenericRecord, GenericRecord]
)(implicit
ktr: ToRecord[K],
vtr: ToRecord[V],
): ProducerApi[F, K, V] =
p.contrabimap(ktr.to, vtr.to)

def resource[F[_]: Async, K: ToRecord, V: ToRecord](
configs: (String, AnyRef)*
): Resource[F, ProducerApi[F, K, V]] =
ProducerApi.Avro.Generic.resource[F](configs: _*).map(Avro4sProducer(_))
}
45 changes: 45 additions & 0 deletions avro4s/src/main/scala/SchemaObjectAvro4sOps.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2019 Jack Henry & Associates, Inc.®
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.banno.kafka
package avro4s

import com.sksamuel.avro4s.*
import org.apache.avro.{Schema as JSchema}
import org.apache.avro.generic.GenericRecord
import scala.util.*

object SchemaObjectAvro4sOps {
private def fromGeneric[A](
gr: GenericRecord
)(implicit FR: FromRecord[A]): Try[A] =
Try(FR.from(gr))

private def toGeneric[A](
x: A
)(implicit TR: ToRecord[A]): GenericRecord =
TR.to(x)

private def schema[A](implicit SF: SchemaFor[A]): JSchema =
SF.schema(DefaultFieldMapper)

def apply[A: FromRecord: ToRecord: SchemaFor]: Schema[A] =
Schema(
schema,
fromGeneric(_),
toGeneric(_),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,25 @@
* limitations under the License.
*/

package com.banno.kafka.schemaregistry
package com.banno.kafka.avro4s

import org.apache.avro.Schema
import com.sksamuel.avro4s.{DefaultFieldMapper, SchemaFor}
import cats.FlatMap
import cats.syntax.all._

case class SchemaRegistryOps[F[_]](registry: SchemaRegistryApi[F]) {

def keySubject(topic: String): String = topic + "-key"
def valueSubject(topic: String): String = topic + "-value"
import cats.*
import cats.effect.*
import cats.syntax.all.*
import com.banno.kafka.schemaregistry.*
import com.sksamuel.avro4s.*

final class SchemaRegistryAvro4sOps[F[_]](
private val registry: SchemaRegistryApi[F]
) extends AnyVal {
def register[A](subject: String)(implicit SF: SchemaFor[A]): F[Int] =
registry.register(subject, SF.schema(DefaultFieldMapper).asParsedSchema)

def registerKey[K: SchemaFor](topic: String): F[Int] =
register[K](keySubject(topic))
register[K](registry.keySubject(topic))

def registerValue[V: SchemaFor](topic: String): F[Int] =
register[V](valueSubject(topic))
register[V](registry.valueSubject(topic))

def register[K: SchemaFor, V: SchemaFor](
topic: String
Expand All @@ -43,17 +42,14 @@ case class SchemaRegistryOps[F[_]](registry: SchemaRegistryApi[F]) {
v <- registerValue[V](topic)
} yield (k, v)

def isCompatible(subject: String, schema: Schema): F[Boolean] =
registry.testCompatibility(subject, schema.asParsedSchema)

def isCompatible[A](subject: String)(implicit SF: SchemaFor[A]): F[Boolean] =
isCompatible(subject, SF.schema(DefaultFieldMapper))
registry.isCompatible(subject, SF.schema(DefaultFieldMapper))

def isKeyCompatible[K: SchemaFor](topic: String): F[Boolean] =
isCompatible[K](keySubject(topic))
isCompatible[K](registry.keySubject(topic))

def isValueCompatible[V: SchemaFor](topic: String): F[Boolean] =
isCompatible[V](valueSubject(topic))
isCompatible[V](registry.valueSubject(topic))

def isCompatible[K: SchemaFor, V: SchemaFor](
topic: String
Expand All @@ -62,5 +58,17 @@ case class SchemaRegistryOps[F[_]](registry: SchemaRegistryApi[F]) {
k <- isKeyCompatible[K](topic)
v <- isValueCompatible[V](topic)
} yield (k, v)
}

object SchemaRegistryApiObjectAvro4sOps {
def register[F[_]: Sync, K: SchemaFor, V: SchemaFor](
baseUrl: String,
topic: String,
configs: Map[String, Object] = Map.empty,
) =
for {
schemaRegistry <- SchemaRegistryApi(baseUrl, configs)
k <- schemaRegistry.avro4s.registerKey[K](topic)
v <- schemaRegistry.avro4s.registerValue[V](topic)
} yield (k, v)
}
50 changes: 50 additions & 0 deletions avro4s/src/main/scala/TopicObjectAvro4sOps.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2019 Jack Henry & Associates, Inc.®
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.banno.kafka
package avro4s

import com.sksamuel.avro4s.*

object TopicObjectAvro4sOps {
def apply[
K: FromRecord: ToRecord: SchemaFor,
V: FromRecord: ToRecord: SchemaFor,
](
topic: String,
topicPurpose: TopicPurpose,
): Topic[K, V] =
Topic(
topic,
topicPurpose,
Schema.avro4s[K],
Schema.avro4s[V],
)

def builder[
K: FromRecord: ToRecord: SchemaFor,
V: FromRecord: ToRecord: SchemaFor,
](
topic: String,
topicPurpose: TopicPurpose,
): Topic.Builder[K, V] =
Topic.builder(
topic,
topicPurpose,
Schema.avro4s[K],
Schema.avro4s[V],
)
}
83 changes: 83 additions & 0 deletions avro4s/src/main/scala/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2019 Jack Henry & Associates, Inc.®
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.banno.kafka

import com.banno.kafka.producer.*
import com.banno.kafka.schemaregistry.*
import com.sksamuel.avro4s.*
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord

package object avro4s {
implicit final class SchemaRegistryAvro4sOpsOps[F[_]](
private val r: SchemaRegistryApi[F]
) extends AnyVal {
def avro4s: SchemaRegistryAvro4sOps[F] =
new SchemaRegistryAvro4sOps(r)
}

implicit final class SchemaRegistryObjectAvro4sOpsOps(
private val x: SchemaRegistryApi.type
) extends AnyVal {
def avro4s = SchemaRegistryApiObjectAvro4sOps
}

implicit final class TopicObjectAvro4sOpsOps(
private val x: Topic.type
) extends AnyVal {
def avro4s = TopicObjectAvro4sOps
}

implicit final class SchemaObjectAvro4sOpsOps(
private val x: Schema.type
) extends AnyVal {
def avro4s = SchemaObjectAvro4sOps
}

implicit final class GenericProducerAvro4sOps[F[_]](
private val producer: ProducerApi[F, GenericRecord, GenericRecord]
) extends AnyVal {
def toAvro4s[K: ToRecord, V: ToRecord]: ProducerApi[F, K, V] =
Avro4sProducer[F, K, V](producer)
}

implicit final class GenericConsumerRecordAvro4sOps(
private val cr: ConsumerRecord[GenericRecord, GenericRecord]
) extends AnyVal {
def maybeKeyAs[K](implicit kfr: FromRecord[K]): Option[K] =
cr.maybeKey.map(kfr.from)
def maybeValueAs[V](implicit vfr: FromRecord[V]): Option[V] =
cr.maybeValue.map(vfr.from)

// note that these will probably throw NPE if key/value is null
def keyAs[K](implicit kfr: FromRecord[K]): K = kfr.from(cr.key)
def valueAs[V](implicit vfr: FromRecord[V]): V = vfr.from(cr.value)
}

implicit final class ProducerRecordAvro4sOps[K, V](
private val pr: ProducerRecord[K, V]
) extends AnyVal {

/** This only works when both key and value are non-null. */
def toGenericRecord(implicit
ktr: ToRecord[K],
vtr: ToRecord[V],
): ProducerRecord[GenericRecord, GenericRecord] =
pr.bimap(ktr.to, vtr.to)
}
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,25 @@

package com.banno.kafka

import org.scalacheck._
import cats.syntax.all._
import cats.effect._
import org.apache.kafka.clients.producer.ProducerRecord

import scala.concurrent.duration._
import com.banno.kafka.producer._
import com.banno.kafka.consumer._
import fs2._

import scala.util.Random
import org.scalacheck.magnolia._
import cats.effect.*
import cats.syntax.all.*
import com.banno.kafka.avro4s.*
import com.banno.kafka.consumer.*
import com.banno.kafka.producer.*
import com.sksamuel.avro4s.RecordFormat
import org.apache.kafka.common.TopicPartition
import org.scalatestplus.scalacheck._

import scala.jdk.CollectionConverters._
import fs2.*
import java.util.ConcurrentModificationException

import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.scalacheck.*
import org.scalacheck.magnolia.*
import org.scalatest.EitherValues
import org.scalatest.matchers.should.Matchers
import org.scalatest.propspec.AnyPropSpec
import org.scalatestplus.scalacheck.*
import scala.concurrent.duration.*
import scala.jdk.CollectionConverters.*
import scala.util.Random

class ConsumerAndProducerApiSpec
extends AnyPropSpec
Expand Down Expand Up @@ -312,13 +309,13 @@ class ConsumerAndProducerApiSpec
forAll { values: Vector[(PersonId, Person2)] =>
val actual = (for {
p <- Stream.resource(
ProducerApi.Avro4s.resource[IO, PersonId, Person2](
Avro4sProducer.resource[IO, PersonId, Person2](
BootstrapServers(bootstrapServer),
SchemaRegistryUrl(schemaRegistryUrl),
)
)
c <- Stream.resource(
ConsumerApi.Avro4s.resource[IO, PersonId, Person2](
Avro4sConsumer.resource[IO, PersonId, Person2](
BootstrapServers(bootstrapServer),
GroupId(groupId),
AutoOffsetReset.earliest,
Expand Down
Loading

0 comments on commit faf3a94

Please sign in to comment.