From 4e5a3e1bdad845cac8c9ff318ef58e052ff2650f Mon Sep 17 00:00:00 2001
From: Flavian Alexandru 
Date: Tue, 13 Dec 2016 19:25:58 +0000
Subject: [PATCH] Adding the ability to map records lazily and paginate them (#572)

* Adding the ability to map records lazily and paginate them
* Fixing compilation errors.
* Adding logging configuration
* Fixed type on provider instance variable (#581)
* TravisCI: Bumping version to match CI definition [ci skip]
* Adding a 2.0 roadmap description [ci skip]
* Adding more details about 2.0.0 release [ci skip]
* Add support for Jdk8 LocalDateTime (#593)
* TravisCI: Bumping version to match CI definition [ci skip]
* Updating version to correct setup of 1.29.5 [ci skip] (#594)
* Adding missing comment
* Removing snapshot resolver
* Reverting macro changeset accidentally leaked (#596)
* Removing macros, re-adding correct dep, removing custom resolver.
* Small adjustments
* Fixing the 2 broken tests.
* Fixing generators
* Fixing compilation error
* Adding table name variable to prevent issues
* Adding an error handler.
* Adding a dual variant test
* Adding more safeguards
* Using macro type comparison symbol instead of equality checking
* Removing redundant conversions
* Fixing indentation [ci skip]
---
 README.md                                          |  67 +++++++
 build.sbt                                          |  29 +--
 .../phantom/connectors/KeySpaceBuilder.scala       |  12 +-
 .../phantom/connectors/Keyspace.scala              |   4 +-
 .../builder/query/ExecutableQuery.scala            | 188 ++++++++++++++----
 .../phantom/database/Database.scala                |   5 +-
 .../phantom/macros/TableHelper.scala               |  22 +-
 .../SelectQuerySerialisationTest.scala             |   2 +-
 .../phantom/tables/PrimitivesJoda.scala            |   2 +-
 .../phantom/jdk8/DefaultJava8Primitives.scala      |  25 +++
 .../outworkers/phantom/jdk8/dsl/package.scala      |  13 ++
 .../phantom/jdk8/OptionalPrimitivesJdk8.scala      |  15 +-
 .../jdk8/tables/ConcretePrimitivesJdk8.scala       |  15 +-
 .../phantom/jdk8/tables/package.scala              |   8 +-
 .../phantom/reactivestreams/package.scala          |  25 ++-
 .../src/test/logback-test.xml                      |  19 ++
 .../suites/PublisherIntegrationTest.scala          |   2 +-
 .../suites/iteratee/IteratorTest.scala             |  29 ++-
 project/plugins.sbt                                |   3 +-
 19 files changed, 374 insertions(+), 111 deletions(-)
 create mode 100644 phantom-reactivestreams/src/test/logback-test.xml

diff --git a/README.md b/README.md
index 314e8c9e1..47dabc4ce 100644
--- a/README.md
+++ b/README.md
@@ -128,6 +128,73 @@ Intermediary releases of phantom 2.0.x are already available via `Resolver.bintr
 - [ ] Add a documentation website on the main page.
 - [ ] Create a navigator that allows viewing the documentation at a particular point in time.
 
+### Roadmap to Phantom 2.0.0
+
+With the rapidly evolving requirements, Cassandra releases, and competition, it was only natural to keep Phantom up to scratch. In line with a lot of user feedback, the priorities of 2.0.0 were:
+
+- Go back to the flexible licensing model everyone knows and loves (especially your legal department). No one wants to go through corporate litigation and licensing compliance just to add a `build.sbt` dependency, and if you've ever worked in a bank we all know it's not happening.
+
+- Phantom was a really fun, time-saving introduction years ago, but since then Scala has evolved to a point where many features of more esoteric components, such as the macro API, have reached a degree of stability that we can now exploit to our great advantage: **boilerplate elimination**.
+
+- From type parameters to keys, table class cake patterns, having to define `fromRow`, and a whole lot of other boilerplate, we have eliminated these items one by one, reducing the amount of code you need to write to make it all work; the sketch below illustrates the goal. The future looks even brighter, as we plan on fully eliminating the mapping DSL very shortly in favour of even more lightweight techniques.
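An editor's sketch (not part of the patch) of what this elimination looks like in practice; `Recipe` and `Recipes` are hypothetical names, and the extractor is derived by the `TableHelper` macro whenever the case class fields line up with the table's columns:

```scala
import java.util.UUID
import com.outworkers.phantom.dsl._

case class Recipe(id: UUID, name: String)

// No fromRow override and no manual extractor boilerplate: when the
// case class fields match the table's columns, the macro generates it.
sealed class Recipes extends CassandraTable[Recipes, Recipe] {
  object id extends UUIDColumn(this) with PartitionKey[UUID]
  object name extends StringColumn(this)
}
```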
+
+Phantom's next major release is slowly approaching completion, and if you would like to know more about what's to come and what we have in store for you next, have a look at the list below. Feedback and contributions are welcome, and we are happy to prioritise any crucial features Phantom may currently be lacking.
+
+If a box is checked, the work is already complete on the gigantic [2.0.0 pull request](https://github.com/outworkers/phantom/pull/576). There is an initial "in progress" [migration guide](https://github.com/outworkers/phantom/tree/feature/2.0.0#200-migration-guide) to Phantom 2.0.0 that should give you more detail on the changes made.
+
+Intermediary releases of phantom 2.0.x are already available via `Resolver.bintrayRepo("outworkers", "oss-releases")` and the latest version is [![Bintray](https://api.bintray.com/packages/outworkers/oss-releases/phantom-dsl/images/download.svg) ](https://bintray.com/outworkers/oss-releases/phantom-dsl/_latestVersion).
+
+#### Licensing and distribution
+
+- [x] Revert all Outworkers projects and all their dependencies to the Apache V2 License.
+- [x] Publish `outworkers-util` and all sub modules to Maven Central.
+- [x] Publish `outworkers-diesel` and all sub modules to Maven Central.
+- [x] Remove all non-standard resolvers from Phantom; all dependencies should build from JCenter and Maven Central by default with no custom resolvers required.
+- [x] Change all package names and resolvers to reflect our business name change from `Websudos` to `Outworkers`.
+- [x] Create a `1.30.x` release that allows users to transition to a custom-resolver-free version of Phantom 1.x even before 2.0.0 is stable.
+
+#### Macro API to replace runtime features
+
+- [x] Replace the Scala reflection library with a macro that can figure out what the contents of a table are.
+- [x] Generate the name of a table using macros.
+- [x] Generate the primary key of a table using macros.
+- [x] Enforce primary key restrictions on a table using a macro.
+- [x] Generate the `fromRow` method of `CassandraTable` using a macro if the `case class` fields and `table` columns are matched.
+- [ ] Enforce a same-ordering restriction for case class fields and table columns to avoid generating invalid methods with the macro.
+- [ ] Generate the `fromRow` method when the fields match but are in arbitrary order, provided there are no duplicate types.
+- [ ] Allow arbitrary inheritance and usage patterns for Cassandra tables, and resolve inheritance with macros to correctly identify the desired table structures.
+
+#### Tech debt
+
+- [ ] Correctly implement Cassandra pagination using iterators; currently, setting a `fetchSize` on a query does not correctly propagate or consume the resulting iterator, which leads to API inconsistencies and `PagingState` not being set on any `ResultSet` (see the sketch after this list).
+- [ ] Add a build matrix that will test phantom against multiple versions of Cassandra in Travis for Scala 2.11, with support for all major releases of Cassandra.
+- [ ] Bump code coverage up to 100%.
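The first tech debt item is the heart of this patch. A minimal sketch of the intended usage, reusing the hypothetical `Recipes` table from the earlier sketch and assuming the usual implicit session, keyspace and execution context from a connector are in scope:

```scala
import scala.concurrent.Future

// Fetch one page of up to 50 records; ListResult keeps the driver's
// ResultSet next to the mapped rows, so the PagingState is not lost.
val firstPage: Future[ListResult[Recipe]] =
  database.recipes.select.paginateRecord(_.setFetchSize(50))

// Resume from the previous page's paging state to get the next batch.
val secondPage: Future[ListResult[Recipe]] = firstPage.flatMap { page =>
  database.recipes.select.paginateRecord(
    Option(page.result.getExecutionInfo.getPagingState)
  )
}
```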
+
+#### Features
+
+- [ ] Native support for multi-tenanted environments via cached sessions.
+- [ ] Case sensitive CQL.
+- [ ] Materialized views.
+- [ ] SASI index support.
+- [ ] Support for `PER PARTITION LIMIT` in `SelectQuery`.
+- [ ] Support for `GROUP BY` in `SelectQuery`.
+
+#### Scala 2.12 support
+
+- [ ] Add support for Scala 2.12 in the `util` library, remove all dependencies that don't comply.
+- [x] Add support for Scala 2.12 in the `diesel-engine`.
+- [ ] Add support for Scala 2.12 in `phantom-dsl`.
+- [ ] Add support for Scala 2.12 in `phantom-connectors`.
+- [ ] Add support for Scala 2.12 in `phantom-reactivestreams`.
+- [ ] Add support for Scala 2.12 in `phantom-finagle`.
+
+#### Documentation
+
+- [ ] Offer a complete migration guide for transitioning to Phantom 2.0.0. [Guide here](https://github.com/outworkers/phantom/tree/feature/2.0.0#200-migration-guide).
+- [ ] Move documentation back to the docs folder.
+- [ ] Add a documentation website on the main page.
+- [ ] Create a navigator that allows viewing the documentation at a particular point in time.
+
 Tutorials on phantom and Cassandra
 ======================================================================
 
diff --git a/build.sbt b/build.sbt
index fe7b602de..a1cc46315 100644
--- a/build.sbt
+++ b/build.sbt
@@ -19,15 +19,16 @@ import com.twitter.sbt._
 
 lazy val Versions = new {
   val logback = "1.1.7"
-  val util = "0.23.1"
+  val util = "0.25.0"
+  val json4s = "3.3.0"
   val datastax = "3.1.0"
   val scalatest = "2.2.4"
-  val shapeless = "2.2.5"
+  val shapeless = "2.3.2"
   val thrift = "0.8.0"
   val finagle = "6.37.0"
   val twitterUtil = "6.34.0"
   val scalameter = "0.6"
-  val diesel = "0.4.1"
+  val diesel = "0.5.0"
   val scalacheck = "1.13.0"
   val slf4j = "1.7.21"
   val reactivestreams = "1.0.0"
@@ -45,7 +46,7 @@ lazy val Versions = new {
 
   val lift: String => String = {
     s => CrossVersion.partialVersion(s) match {
-      case Some((major, minor)) if minor >= 11 => "3.0-RC3"
+      case Some((major, minor)) if minor >= 11 => "3.0"
       case _ => "3.0-M1"
     }
   }
@@ -79,16 +80,8 @@ lazy val Versions = new {
       }
     }
   }
-
 val defaultConcurrency = 4
 
-val scalaMacroDependencies: String => Seq[ModuleID] = {
-  s => CrossVersion.partialVersion(s) match {
-    case Some((major, minor)) if minor >= 11 => Seq.empty
-    case _ => Seq(compilerPlugin("org.scalamacros" % "paradise" % "2.1.0" cross CrossVersion.full))
-  }
-}
-
 val PerformanceTest = config("perf").extend(Test)
 
 lazy val performanceFilter: String => Boolean = _.endsWith("PerformanceTest")
@@ -102,8 +95,7 @@ val sharedSettings: Seq[Def.Setting[_]] = Defaults.coreDefaultSettings ++ Seq(
     "Twitter Repository" at "http://maven.twttr.com",
     Resolver.typesafeRepo("releases"),
     Resolver.sonatypeRepo("releases"),
-    Resolver.jcenterRepo,
-    Resolver.bintrayRepo("outworkers", "oss-releases")
+    Resolver.jcenterRepo
   ),
   scalacOptions ++= Seq(
     "-language:experimental.macros",
@@ -182,17 +174,12 @@ lazy val phantomDsl = (project in file("phantom-dsl")).configs(
   concurrentRestrictions in Test := Seq(
     Tags.limit(Tags.ForkedTestGroup, defaultConcurrency)
   ),
-  unmanagedSourceDirectories in Compile ++= Seq(
-    (sourceDirectory in Compile).value / ("scala-2." + {
-      CrossVersion.partialVersion(scalaBinaryVersion.value) match {
-        case Some((major, minor)) => minor
-      }
-    })),
   libraryDependencies ++= Seq(
     "org.typelevel" %% "macro-compat" % "1.1.1",
     "org.scala-lang" % "scala-compiler" % scalaVersion.value % "provided",
     compilerPlugin("org.scalamacros" % "paradise" % "2.1.0" cross CrossVersion.full),
-    "com.outworkers" %% "diesel-engine" % Versions.diesel,
+    "org.scala-lang" % "scala-reflect" % scalaVersion.value,
+    "com.outworkers" %% "diesel-reflection" % Versions.diesel,
     "com.chuusai" %% "shapeless" % Versions.shapeless,
     "joda-time" % "joda-time" % "2.9.4",
     "org.joda" % "joda-convert" % "1.8.1",
diff --git a/phantom-connectors/src/main/scala/com/outworkers/phantom/connectors/KeySpaceBuilder.scala b/phantom-connectors/src/main/scala/com/outworkers/phantom/connectors/KeySpaceBuilder.scala
index 70022cf66..fd62a1a9a 100644
--- a/phantom-connectors/src/main/scala/com/outworkers/phantom/connectors/KeySpaceBuilder.scala
+++ b/phantom-connectors/src/main/scala/com/outworkers/phantom/connectors/KeySpaceBuilder.scala
@@ -40,9 +40,8 @@ class KeySpaceBuilder(clusterBuilder: ClusterBuilder) {
    * @return A new cluster builder, with the heartbeat interval set to 0 (disabled).
    */
   def noHeartbeat(): KeySpaceBuilder = {
-    new KeySpaceBuilder(clusterBuilder andThen(_.withPoolingOptions(
-      new PoolingOptions().setHeartbeatIntervalSeconds(0))
-    )
+    new KeySpaceBuilder(clusterBuilder andThen (
+      _.withPoolingOptions(new PoolingOptions().setHeartbeatIntervalSeconds(0)))
     )
   }
 
@@ -56,7 +55,8 @@ class KeySpaceBuilder(clusterBuilder: ClusterBuilder) {
   def keySpace(
     name: String,
     autoinit: Boolean = true,
-    query: Option[(Session, KeySpace) => String] = None
+    query: Option[(Session, KeySpace) => String] = None,
+    errorHandler: Throwable => Throwable = identity
   ): CassandraConnection = {
     new CassandraConnection(name, clusterBuilder, autoinit, query)
   }
@@ -74,6 +74,4 @@ class KeySpaceBuilder(clusterBuilder: ClusterBuilder) {
     query: (Session, KeySpace) => String
   ): CassandraConnection = {
     new CassandraConnection(name, clusterBuilder, true, Some(query))
-  }
-
-}
+  }}
diff --git a/phantom-connectors/src/main/scala/com/outworkers/phantom/connectors/Keyspace.scala b/phantom-connectors/src/main/scala/com/outworkers/phantom/connectors/Keyspace.scala
index 05f39f791..adac3c00f 100644
--- a/phantom-connectors/src/main/scala/com/outworkers/phantom/connectors/Keyspace.scala
+++ b/phantom-connectors/src/main/scala/com/outworkers/phantom/connectors/Keyspace.scala
@@ -32,11 +32,11 @@ trait SessionAugmenter {
     protocolVersion.compareTo(pv) > 0
   }
 
-  def v3orNewer : Boolean = isNewerThan(ProtocolVersion.V2)
+  def v3orNewer: Boolean = isNewerThan(ProtocolVersion.V2)
 
   def protocolConsistency: Boolean = isNewerThan(ProtocolVersion.V1)
 
-  def v4orNewer : Boolean = isNewerThan(ProtocolVersion.V3)
+  def v4orNewer: Boolean = isNewerThan(ProtocolVersion.V3)
 }
 
 trait SessionAugmenterImplicits {
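A sketch of a call site for the widened `keySpace` signature above. The contact point and keyspace name are illustrative, and the new `errorHandler` parameter defaults to `identity`, so existing call sites compile unchanged:

```scala
import com.outworkers.phantom.connectors.{CassandraConnection, ContactPoint}

// errorHandler lets a caller translate connection failures into a
// domain-specific exception; by default the Throwable passes through.
val connection: CassandraConnection = ContactPoint.local.keySpace(
  name = "phantom",
  autoinit = true,
  query = None,
  errorHandler = e => new IllegalStateException("Could not initialise the keyspace", e)
)
```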
diff --git a/phantom-dsl/src/main/scala/com/outworkers/phantom/builder/query/ExecutableQuery.scala b/phantom-dsl/src/main/scala/com/outworkers/phantom/builder/query/ExecutableQuery.scala
index e3e70e129..ba30562ca 100644
--- a/phantom-dsl/src/main/scala/com/outworkers/phantom/builder/query/ExecutableQuery.scala
+++ b/phantom-dsl/src/main/scala/com/outworkers/phantom/builder/query/ExecutableQuery.scala
@@ -15,8 +15,7 @@
  */
 package com.outworkers.phantom.builder.query
 
-import java.util.{List => JavaList}
-
+import java.util.{List => JavaList, Iterator => JavaIterator}
 import com.datastax.driver.core._
 import com.outworkers.phantom.CassandraTable
 import com.outworkers.phantom.builder.{LimitBound, Unlimited}
@@ -34,6 +33,12 @@ trait RecordResult[R] {
 
 case class ListResult[R](records: List[R], result: ResultSet) extends RecordResult[R]
 
+object ListResult {
+  def apply[R](res: ResultSet, records: List[R]): ListResult[R] = ListResult(records, res)
+
+  def apply[R](rec: (ResultSet, List[R])): ListResult[R] = apply(rec._2, rec._1)
+}
+
 case class IteratorResult[R](records: Iterator[R], result: ResultSet) extends RecordResult[R]
 
 trait ExecutableStatement extends CassandraOperations {
@@ -144,6 +149,10 @@ private[phantom] trait RootExecutableQuery[R] {
   protected[this] def directMapper(results: JavaList[Row]): List[R] = {
     List.tabulate(results.size())(index => fromRow(results.get(index)))
   }
+
+  protected[this] def directMapper(results: JavaIterator[Row]): List[R] = {
+    results.asScala.map(fromRow).toList
+  }
 }
 
 /**
@@ -158,6 +167,21 @@ trait ExecutableQuery[T <: CassandraTable[T, _], R, Limit <: LimitBound]
 
   def fromRow(r: Row): R
 
+  protected[this] def greedyEval(
+    f: ScalaFuture[ResultSet]
+  )(implicit ex: ExecutionContextExecutor): ScalaFuture[ListResult[R]] = {
+    f map { r =>
+      val records = if (r.isFullyFetched) directMapper(r.all()) else directMapper(r.iterator())
+      ListResult(records, r)
+    }
+  }
+
+  protected[this] def lazyEval(
+    f: ScalaFuture[ResultSet]
+  )(implicit ex: ExecutionContextExecutor): ScalaFuture[IteratorResult[R]] = {
+    f map { r => IteratorResult(r.iterator().asScala.map(fromRow), r) }
+  }
+
   private[phantom] def singleFetch()(
     implicit session: Session,
     keySpace: KeySpace,
@@ -208,33 +232,32 @@ trait ExecutableQuery[T <: CassandraTable[T, _], R, Limit <: LimitBound]
    * @param ec The implicit Scala execution context.
    * @return A Scala future wrapping a list of mapped results.
    */
-  def fetch(state: PagingState)(
+  def fetch(modifyStatement : Modifier)(
     implicit session: Session,
     keySpace: KeySpace,
     ec: ExecutionContextExecutor
   ): ScalaFuture[List[R]] = {
-    future(_.setPagingState(state)) map {
-      resultSet => directMapper(resultSet.all)
-    }
+    future(modifyStatement) map { resultSet => directMapper(resultSet.all) }
   }
 
   /**
    * Returns a parsed sequence of [R]ows
-   * This is not suitable for big results set
+   * This is not suitable for big result sets, as it will attempt to fetch the entire result set
+   * as a List, circumventing pagination settings.
+   *
+   * Use [[paginateRecord()]] or other means if you'd like to deal with bigger result sets.
    *
    * @param session The implicit session provided by a [[com.outworkers.phantom.connectors.Connector]].
    * @param keySpace The implicit keySpace definition provided by a [[com.outworkers.phantom.connectors.Connector]].
    * @param ec The implicit Scala execution context.
    * @return A Scala future wrapping a list of mapped results.
    */
-  def fetch(modifyStatement : Modifier)(
+  def fetchRecord()(
     implicit session: Session,
     keySpace: KeySpace,
     ec: ExecutionContextExecutor
-  ): ScalaFuture[List[R]] = {
-    future(modifyStatement) map {
-      resultSet => directMapper(resultSet.all)
-    }
+  ): ScalaFuture[ListResult[R]] = {
+    future() map (resultSet => ListResult(directMapper(resultSet.all), resultSet))
   }
 
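An editor's illustration of how the plain and record-returning variants differ, with the same hypothetical table, imports and implicits as in the README sketches:

```scala
// fetch() yields only the mapped rows; fetchRecord() pairs them with
// the underlying ResultSet, so paging metadata is not thrown away.
val rows: Future[List[Recipe]] = database.recipes.select.fetch()

val withState: Future[(List[Recipe], Option[PagingState])] =
  database.recipes.select.fetchRecord() map {
    case ListResult(records, rs) =>
      // getPagingState returns null once the final page is consumed.
      (records, Option(rs.getExecutionInfo.getPagingState))
  }
```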
   /**
@@ -246,43 +269,53 @@ trait ExecutableQuery[T <: CassandraTable[T, _], R, Limit <: LimitBound]
    * @param session The implicit session provided by a [[com.outworkers.phantom.connectors.Connector]].
    * @param keySpace The implicit keySpace definition provided by a [[com.outworkers.phantom.connectors.Connector]].
    * @param ec The implicit Scala execution context.
    * @return A Scala future wrapping a list of mapped results.
    */
-  def fetchRecord()(
+  def fetchRecord(modifyStatement: Modifier)(
     implicit session: Session,
     keySpace: KeySpace,
     ec: ExecutionContextExecutor
   ): ScalaFuture[ListResult[R]] = {
-    future() map (resultSet => ListResult(directMapper(resultSet.all), resultSet))
+    future(modifyStatement) map {
+      set => ListResult(directMapper(set.all), set)
+    }
   }
 
   /**
-   * Returns a parsed sequence of [R]ows
-   * This is not suitable for big results set
+   * Returns a parsed sequence of [R]ows, paginating the results using the paging state.
+   * This will not consume or return the entire set of available results; it will
+   * instead return a number of records equal to the fetch size setting.
    *
    * @param session The implicit session provided by a [[com.outworkers.phantom.connectors.Connector]].
    * @param keySpace The implicit keySpace definition provided by a [[com.outworkers.phantom.connectors.Connector]].
    * @param ec The implicit Scala execution context.
    * @return A Scala future wrapping a list of mapped results.
    */
-  def fetchRecord(state: PagingState)(
+  def paginateRecord()(
     implicit session: Session,
     keySpace: KeySpace,
     ec: ExecutionContextExecutor
   ): ScalaFuture[ListResult[R]] = {
-    future(st => st.setPagingState(state)) map {
-      set => ListResult(directMapper(set.all), set)
-    }
+    greedyEval(future())
   }
 
-  def fetchRecord(state: Option[PagingState])(
+  /**
+   * Returns a parsed sequence of [R]ows.
+   * This will only fetch the number of records defined by the fetchSize setting.
+   * It will allow pagination of the inner result set as a [[scala.collection.immutable.List]].
+   *
+   * It will greedily evaluate the records inside a single fetch size batch, as it returns a list as opposed to
+   * an iterator. For a non-greedy variant of this method, use [[iterator()]].
+   *
+   * @param session The implicit session provided by a [[com.outworkers.phantom.connectors.Connector]].
+   * @param keySpace The implicit keySpace definition provided by a [[com.outworkers.phantom.connectors.Connector]].
+   * @param ec The implicit Scala execution context.
+   * @return A Scala future wrapping a list of mapped results.
+   */
+  def paginateRecord(state: PagingState)(
     implicit session: Session,
     keySpace: KeySpace,
     ec: ExecutionContextExecutor
   ): ScalaFuture[ListResult[R]] = {
-    state.fold(future().map {
-      set => ListResult(directMapper(set.all), set)
-    }) (state => future(_.setPagingState(state)) map {
-      set => ListResult(directMapper(set.all), set)
-    })
+    greedyEval(future(_.setPagingState(state)))
   }
 
   /**
@@ -294,37 +327,112 @@ trait ExecutableQuery[T <: CassandraTable[T, _], R, Limit <: LimitBound]
    * @param ec The implicit Scala execution context.
    * @return A Scala future wrapping a list of mapped results.
    */
-  def fetchRecord(modifyStatement: Modifier)(
+  def paginateRecord(state: Option[PagingState])(
     implicit session: Session,
     keySpace: KeySpace,
     ec: ExecutionContextExecutor
   ): ScalaFuture[ListResult[R]] = {
-    future(modifyStatement) map {
-      set => ListResult(directMapper(set.all), set)
+    state match {
+      case None => greedyEval(future())
+      case Some(defined) => greedyEval(future(_.setPagingState(defined)))
     }
   }
 
   /**
-   * Returns a parsed iterator of [R]ows
-   *
-   * @param session The implicit session provided by a [[com.outworkers.phantom.connectors.Connector]].
-   * @param keySpace The implicit keySpace definition provided by a [[com.outworkers.phantom.connectors.Connector]].
-   * @param ec The implicit Scala execution context.
-   * @return A Scala future wrapping scala iterator of mapped results.
-   */
+   * Returns a parsed sequence of [R]ows.
+   * This will only fetch the number of records defined by the fetchSize setting.
+   * It will allow pagination of the inner result set as a [[scala.collection.immutable.List]].
+   *
+   * It will greedily evaluate the records inside a single fetch size batch, as it returns a list as opposed to
+   * an iterator. For a non-greedy variant of this method, use [[iterator()]].
+   *
+   * @param session The implicit session provided by a [[com.outworkers.phantom.connectors.Connector]].
+   * @param keySpace The implicit keySpace definition provided by a [[com.outworkers.phantom.connectors.Connector]].
+   * @param ec The implicit Scala execution context.
+   * @return A Scala future wrapping a list of mapped results.
+   */
+  def paginateRecord(modifier: Modifier)(
+    implicit session: Session,
+    keySpace: KeySpace,
+    ec: ExecutionContextExecutor
+  ): ScalaFuture[ListResult[R]] = {
+    greedyEval(future(modifier))
+  }
+
+  /**
+   * Returns a parsed iterator of [R]ows, lazily evaluated. This will respect the fetch size setting
+   * of a query, meaning you will need to provide a paging state to fetch records beyond the regular fetch
+   * size.
+   *
+   * @param session The implicit session provided by a [[com.outworkers.phantom.connectors.Connector]].
+   * @param keySpace The implicit keySpace definition provided by a [[com.outworkers.phantom.connectors.Connector]].
+   * @param ec The implicit Scala execution context.
+   * @return A Scala future wrapping a scala iterator of mapped results.
+   */
   def iterator()(
     implicit session: Session,
     keySpace: KeySpace,
     ec: ExecutionContextExecutor
-  ): ScalaFuture[Iterator[R]] = {
-    future() map { _.iterator().asScala.map(fromRow) }
+  ): ScalaFuture[IteratorResult[R]] = {
+    future() map { res => IteratorResult(res.iterator().asScala.map(fromRow), res) }
   }
 
-  def iteratorRecord()(
+  /**
+   * Returns a parsed iterator of [R]ows, lazily evaluated. This will respect the fetch size setting
+   * of a query, meaning you will need to provide a paging state to fetch records beyond the regular fetch
+   * size.
+   *
+   * @param session The implicit session provided by a [[com.outworkers.phantom.connectors.Connector]].
+   * @param keySpace The implicit keySpace definition provided by a [[com.outworkers.phantom.connectors.Connector]].
+   * @param ec The implicit Scala execution context.
+   * @return A Scala future wrapping a scala iterator of mapped results.
+   */
+  def iterator(modifier: Modifier)(
     implicit session: Session,
     keySpace: KeySpace,
     ec: ExecutionContextExecutor
   ): ScalaFuture[IteratorResult[R]] = {
-    future() map { result => IteratorResult(result.iterator().asScala.map(fromRow), result) }
+    future(modifier) map { res => IteratorResult(res.iterator().asScala.map(fromRow), res) }
+  }
+
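A sketch contrasting the lazy variants with the greedy paginateRecord ones, with the same hypothetical table, imports and implicits as before:

```scala
// Rows are passed through fromRow only as the iterator is consumed,
// and a single fetch-size batch is loaded per call.
val firstBatch: Future[IteratorResult[Recipe]] =
  database.recipes.select.iterator(_.setFetchSize(100))

// Seed the next call with the paging state of the previous batch.
val nextBatch: Future[IteratorResult[Recipe]] = firstBatch.flatMap { batch =>
  database.recipes.select.iterator(
    Option(batch.result.getExecutionInfo.getPagingState)
  )
}
```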
+  /**
+   * Returns a parsed iterator of [R]ows, lazily evaluated, resuming from the given paging state.
+   * Only a single fetch size batch is loaded, and records are mapped on demand rather than
+   * greedily, as with the [[paginateRecord()]] variants.
+   *
+   * @param session The implicit session provided by a [[com.outworkers.phantom.connectors.Connector]].
+   * @param keySpace The implicit keySpace definition provided by a [[com.outworkers.phantom.connectors.Connector]].
+   * @param ec The implicit Scala execution context.
+   * @return A Scala future wrapping a lazy iterator of mapped results.
+   */
+  def iterator(state: PagingState)(
+    implicit session: Session,
+    keySpace: KeySpace,
+    ec: ExecutionContextExecutor
+  ): ScalaFuture[IteratorResult[R]] = {
+    lazyEval(future(_.setPagingState(state)))
+  }
+
+  /**
+   * Returns a parsed iterator of [R]ows, lazily evaluated, optionally resuming from a
+   * given paging state. If no paging state is supplied, iteration starts from the
+   * first available batch of results.
+   *
+   * @param session The implicit session provided by a [[com.outworkers.phantom.connectors.Connector]].
+   * @param keySpace The implicit keySpace definition provided by a [[com.outworkers.phantom.connectors.Connector]].
+   * @param ec The implicit Scala execution context.
+   * @return A Scala future wrapping a lazy iterator of mapped results.
+   */
+  def iterator(state: Option[PagingState])(
+    implicit session: Session,
+    keySpace: KeySpace,
+    ec: ExecutionContextExecutor
+  ): ScalaFuture[IteratorResult[R]] = {
+    state match {
+      case None => lazyEval(future())
+      case Some(defined) => lazyEval(future(_.setPagingState(defined)))
+    }
   }
 }
diff --git a/phantom-dsl/src/main/scala/com/outworkers/phantom/database/Database.scala b/phantom-dsl/src/main/scala/com/outworkers/phantom/database/Database.scala
index 4fd817a46..3931520f7 100644
--- a/phantom-dsl/src/main/scala/com/outworkers/phantom/database/Database.scala
+++ b/phantom-dsl/src/main/scala/com/outworkers/phantom/database/Database.scala
@@ -165,12 +165,11 @@ abstract class Database[
 }
 
 sealed class ExecutableCreateStatementsList(val queries: KeySpace => Seq[CreateQuery[_, _, _]]) {
-
-  def future()(
+  def future()(
     implicit session: Session,
     keySpace: KeySpace,
     ec: ExecutionContextExecutor
   ): Future[Seq[ResultSet]] = {
     Future.sequence(queries(keySpace).map(_.future()))
   }
-}
\ No newline at end of file
+}
diff --git a/phantom-dsl/src/main/scala/com/outworkers/phantom/macros/TableHelper.scala b/phantom-dsl/src/main/scala/com/outworkers/phantom/macros/TableHelper.scala
index 6259a935c..3a419a94b 100644
--- a/phantom-dsl/src/main/scala/com/outworkers/phantom/macros/TableHelper.scala
+++ b/phantom-dsl/src/main/scala/com/outworkers/phantom/macros/TableHelper.scala
@@ -152,7 +152,7 @@ class TableHelperMacro(override val c: blackbox.Context) extends MacroUtils(c) {
    * Alternatively, this will return an unimplemented ??? method, if a correct
    * definition could not be inferred.
*/ - def materializeExtractor(tableTpe: Type, recordTpe: Type, columns: List[Symbol]): Option[Tree] = { + def materializeExtractor[T](tableTpe: Type, recordTpe: Type, columns: List[Symbol]): Option[Tree] = { val columnNames = columns.map( tpe => { q"$tableTerm.${tpe.typeSignatureIn(tableTpe).typeSymbol.name.toTermName}.apply($rowTerm)" @@ -192,23 +192,35 @@ class TableHelperMacro(override val c: blackbox.Context) extends MacroUtils(c) { } } + val tableSymbolName = tableTpe.typeSymbol.name + if (recordMembers.size == colMembers.size) { - if (!recordMembers.toSeq.zip(colMembers).forall { case (rec, col) => rec != col }) { - Some(q"""new $recordTpe(..$columnNames)""") + if (recordMembers.zip(colMembers).forall { case (rec, col) => rec =:= col }) { + + val tree = q"""new $recordTpe(..$columnNames)""" + + Console.println(s"Automatically generated fromRow method as types matched for $tableSymbolName") + Console.println(showCode(tree)) + Some(tree) } else { - Console.println(s"The case class records did not match the column member types for ${tableTpe.typeSymbol.name}") + Console.println(s"The case class records did not match the column member types for $tableSymbolName") Console.println(recordMembers.map(_.typeSymbol.name.toTypeName.decodedName.toString).mkString(", ")) Console.println(colMembers.map(_.typeSymbol.name.toTermName.decodedName.toString).mkString(", ")) None } } else { - Console.println(s"There were ${recordMembers.size} case class fields and ${colMembers.size} columns for ${tableTpe.typeSymbol.name}") + Console.println(s"There were ${recordMembers.size} case class fields and ${colMembers.size} columns for ${tableSymbolName}") Console.println(recordMembers.map(_.typeSymbol.name.toTypeName.decodedName.toString).mkString(", ")) Console.println(colMembers.map(_.typeSymbol.name.toTermName.decodedName.toString).mkString(", ")) None } } + /** + * Finds the first type in the type hierarchy for which columns exist as direct members. + * @param tpe The type of the table. + * @return An optional symbol, if such a type was found in the type hierarchy. 
+   */
   def determineReferenceTable(tpe: Type): Option[Symbol] = {
     tpe.baseClasses.reverse.find(symbol => {
       symbol.typeSignature.decls.exists(_.typeSignature <:< typeOf[AbstractColumn[_]])
diff --git a/phantom-dsl/src/test/scala/com/outworkers/phantom/builder/serializers/SelectQuerySerialisationTest.scala b/phantom-dsl/src/test/scala/com/outworkers/phantom/builder/serializers/SelectQuerySerialisationTest.scala
index a6db1c00d..e76687c1c 100644
--- a/phantom-dsl/src/test/scala/com/outworkers/phantom/builder/serializers/SelectQuerySerialisationTest.scala
+++ b/phantom-dsl/src/test/scala/com/outworkers/phantom/builder/serializers/SelectQuerySerialisationTest.scala
@@ -184,7 +184,7 @@ class SelectQuerySerialisationTest extends QueryBuilderTest {
         .where(_.id eqs id)
         .qb.queryString
 
-      qb shouldEqual s"SELECT dateOf(id) FROM phantom.timeUUIDTable WHERE id = $id"
+      qb shouldEqual s"SELECT dateOf(id) FROM phantom.${TestDatabase.timeuuidTable.tableName} WHERE id = $id"
     }
   }
 }
diff --git a/phantom-dsl/src/test/scala/com/outworkers/phantom/tables/PrimitivesJoda.scala b/phantom-dsl/src/test/scala/com/outworkers/phantom/tables/PrimitivesJoda.scala
index d76631fff..fa7f3bfb6 100644
--- a/phantom-dsl/src/test/scala/com/outworkers/phantom/tables/PrimitivesJoda.scala
+++ b/phantom-dsl/src/test/scala/com/outworkers/phantom/tables/PrimitivesJoda.scala
@@ -47,7 +47,7 @@ abstract class ConcretePrimitivesJoda extends PrimitivesJoda with RootConnector
   }
 
   def fetchPage(limit: Int, paging: Option[PagingState]): Future[ListResult[JodaRow]] = {
-    select.limit(limit).fetchRecord(paging)
+    select.limit(limit).paginateRecord(paging)
   }
 
   override val tableName = "PrimitivesJoda"
diff --git a/phantom-jdk8/src/main/scala/com/outworkers/phantom/jdk8/DefaultJava8Primitives.scala b/phantom-jdk8/src/main/scala/com/outworkers/phantom/jdk8/DefaultJava8Primitives.scala
index 7b2ad41ed..957ff58e3 100644
--- a/phantom-jdk8/src/main/scala/com/outworkers/phantom/jdk8/DefaultJava8Primitives.scala
+++ b/phantom-jdk8/src/main/scala/com/outworkers/phantom/jdk8/DefaultJava8Primitives.scala
@@ -99,4 +99,29 @@ trait DefaultJava8Primitives {
     override def clz: Class[com.datastax.driver.core.LocalDate] = classOf[com.datastax.driver.core.LocalDate]
   }
 
+  implicit object JdkLocalDateTimeIsPrimitive extends Primitive[LocalDateTime] {
+
+    override type PrimitiveType = java.time.LocalDateTime
+
+    val cassandraType = CQLSyntax.Types.Timestamp
+
+    override def asCql(value: LocalDateTime): String = {
+      CQLQuery.empty.singleQuote(value.atZone(ZoneOffset.UTC).toString)
+    }
+
+    override def fromRow(column: String, row: GettableByNameData): Try[LocalDateTime] = nullCheck(column, row) {
+      r => LocalDateTime.ofInstant(Instant.ofEpochMilli(r.getTimestamp(column).getTime), ZoneOffset.UTC)
+    }
+
+    override def fromRow(index: Int, row: GettableByIndexData): Try[LocalDateTime] = nullCheck(index, row) {
+      r => LocalDateTime.ofInstant(Instant.ofEpochMilli(r.getTimestamp(index).getTime), ZoneOffset.UTC)
+    }
+
+    override def fromString(value: String): LocalDateTime = {
+      Instant.ofEpochMilli(value.toLong).atZone(ZoneOffset.UTC).toLocalDateTime
+    }
+
+    override def clz: Class[LocalDateTime] = classOf[LocalDateTime]
+  }
+
 }
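With this primitive in scope, a `java.time.LocalDateTime` column becomes a one-line declaration. An illustrative sketch (table and record names hypothetical), using the type aliases the next diff adds to the `jdk8.dsl` package:

```scala
import java.util.UUID
import com.outworkers.phantom.dsl._
import com.outworkers.phantom.jdk8.dsl._

case class Event(id: UUID, createdAt: JdkLocalDateTime)

// Values are persisted as Cassandra timestamps and read back at UTC,
// via the JdkLocalDateTimeIsPrimitive instance defined above.
sealed class Events extends CassandraTable[Events, Event] {
  object id extends UUIDColumn(this) with PartitionKey[UUID]
  object createdAt extends JdkLocalDateTimeColumn(this)
}
```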
diff --git a/phantom-jdk8/src/main/scala/com/outworkers/phantom/jdk8/dsl/package.scala b/phantom-jdk8/src/main/scala/com/outworkers/phantom/jdk8/dsl/package.scala
index d03ea29c4..e70971221 100644
--- a/phantom-jdk8/src/main/scala/com/outworkers/phantom/jdk8/dsl/package.scala
+++ b/phantom-jdk8/src/main/scala/com/outworkers/phantom/jdk8/dsl/package.scala
@@ -15,6 +15,8 @@
  */
 package com.outworkers.phantom.jdk8
 
+import java.time.LocalDateTime
+
 import com.outworkers.phantom.dsl.CassandraTable
 
 package object dsl extends DefaultJava8Primitives {
@@ -34,6 +36,11 @@ package object dsl extends DefaultJava8Primitives {
     Record
   ] = com.outworkers.phantom.column.PrimitiveColumn[Owner, Record, JdkLocalDate]
 
+  type JdkLocalDateTimeColumn[
+    Owner <: CassandraTable[Owner, Record],
+    Record
+  ] = com.outworkers.phantom.column.PrimitiveColumn[Owner, Record, LocalDateTime]
+
   type OptionalOffsetDateTimeColumn[
     Owner <: CassandraTable[Owner, Record],
     Record
@@ -49,8 +56,14 @@ package object dsl extends DefaultJava8Primitives {
     Record
   ] = com.outworkers.phantom.column.OptionalPrimitiveColumn[Owner, Record, JdkLocalDate]
 
+  type OptionalJdkLocalDateTimeColumn[
+    Owner <: CassandraTable[Owner, Record],
+    Record
+  ] = com.outworkers.phantom.column.OptionalPrimitiveColumn[Owner, Record, LocalDateTime]
+
   type OffsetDateTime = java.time.OffsetDateTime
   type ZonedDateTime = java.time.ZonedDateTime
   type JdkLocalDate = java.time.LocalDate
+  type JdkLocalDateTime = java.time.LocalDateTime
 }
diff --git a/phantom-jdk8/src/test/scala/com/outworkers/phantom/jdk8/OptionalPrimitivesJdk8.scala b/phantom-jdk8/src/test/scala/com/outworkers/phantom/jdk8/OptionalPrimitivesJdk8.scala
index a01a57427..0ba891eac 100644
--- a/phantom-jdk8/src/test/scala/com/outworkers/phantom/jdk8/OptionalPrimitivesJdk8.scala
+++ b/phantom-jdk8/src/test/scala/com/outworkers/phantom/jdk8/OptionalPrimitivesJdk8.scala
@@ -15,7 +15,7 @@
  */
 package com.outworkers.phantom.jdk8
 
-import java.time.{LocalDate, OffsetDateTime}
+import java.time.{LocalDate, LocalDateTime, OffsetDateTime}
 
 import com.outworkers.phantom.CassandraTable
 import com.outworkers.phantom.connectors.RootConnector
@@ -27,7 +27,8 @@ case class OptionalJdk8Row(
   pkey: String,
   offsetDateTime: Option[OffsetDateTime],
   zonedDateTime: Option[ZonedDateTime],
-  localDate: Option[LocalDate]
+  localDate: Option[LocalDate],
+  localDateTime: Option[LocalDateTime]
 )
 
 sealed class OptionalPrimitivesJdk8 extends CassandraTable[ConcreteOptionalPrimitivesJdk8, OptionalJdk8Row] {
@@ -40,14 +41,7 @@ sealed class OptionalPrimitivesJdk8 extends CassandraTable[ConcreteOptionalPrimi
 
   object localDate extends OptionalJdkLocalDateColumn(this)
 
-  override def fromRow(r: Row): OptionalJdk8Row = {
-    OptionalJdk8Row(
-      pkey = pkey(r),
-      offsetDateTime = offsetDateTime(r),
-      zonedDateTime = zonedDateTime(r),
-      localDate = localDate(r)
-    )
-  }
+  object localDateTime extends OptionalJdkLocalDateTimeColumn(this)
 }
 
 abstract class ConcreteOptionalPrimitivesJdk8 extends OptionalPrimitivesJdk8 with RootConnector {
@@ -57,6 +51,7 @@ abstract class ConcreteOptionalPrimitivesJdk8 extends OptionalPrimitivesJdk8 wit
       .value(_.offsetDateTime, primitive.offsetDateTime)
       .value(_.zonedDateTime, primitive.zonedDateTime)
       .value(_.localDate, primitive.localDate)
+      .value(_.localDateTime, primitive.localDateTime)
   }
 
   override val tableName = "OptionalPrimitivesJdk8"
diff --git a/phantom-jdk8/src/test/scala/com/outworkers/phantom/jdk8/tables/ConcretePrimitivesJdk8.scala b/phantom-jdk8/src/test/scala/com/outworkers/phantom/jdk8/tables/ConcretePrimitivesJdk8.scala
index f490344a1..baf5a317d 100644
--- a/phantom-jdk8/src/test/scala/com/outworkers/phantom/jdk8/tables/ConcretePrimitivesJdk8.scala
+++ b/phantom-jdk8/src/test/scala/com/outworkers/phantom/jdk8/tables/ConcretePrimitivesJdk8.scala
@@ -15,7 +15,7 @@
  */
 package com.outworkers.phantom.jdk8.tables
 
-import java.time.{LocalDate, OffsetDateTime}
+import java.time.{LocalDate, LocalDateTime, OffsetDateTime}
 
 import com.outworkers.phantom.CassandraTable
 import com.outworkers.phantom.connectors.RootConnector
@@ -27,7 +27,8 @@ case class Jdk8Row(
   pkey: String,
   offsetDateTime: OffsetDateTime,
   zonedDateTime: ZonedDateTime,
-  localDate: LocalDate
+  localDate: LocalDate,
+  localDateTime: LocalDateTime
 )
 
 sealed class PrimitivesJdk8 extends CassandraTable[ConcretePrimitivesJdk8, Jdk8Row] {
@@ -40,14 +41,7 @@ sealed class PrimitivesJdk8 extends CassandraTable[ConcretePrimitivesJdk8, Jdk8R
 
   object localDate extends JdkLocalDateColumn(this)
 
-  override def fromRow(r: Row): Jdk8Row = {
-    Jdk8Row(
-      pkey = pkey(r),
-      offsetDateTime = offsetDateTime(r),
-      zonedDateTime = zonedDateTime(r),
-      localDate = localDate(r)
-    )
-  }
+  object localDateTime extends JdkLocalDateTimeColumn(this)
 }
 
 abstract class ConcretePrimitivesJdk8 extends PrimitivesJdk8 with RootConnector {
@@ -57,6 +51,7 @@ abstract class ConcretePrimitivesJdk8 extends PrimitivesJdk8 with RootConnector
       .value(_.offsetDateTime, primitive.offsetDateTime)
       .value(_.zonedDateTime, primitive.zonedDateTime)
      .value(_.localDate, primitive.localDate)
+      .value(_.localDateTime, primitive.localDateTime)
   }
 
   override val tableName = "PrimitivesJdk8"
diff --git a/phantom-jdk8/src/test/scala/com/outworkers/phantom/jdk8/tables/package.scala b/phantom-jdk8/src/test/scala/com/outworkers/phantom/jdk8/tables/package.scala
index f935435a3..58fa5ef85 100644
--- a/phantom-jdk8/src/test/scala/com/outworkers/phantom/jdk8/tables/package.scala
+++ b/phantom-jdk8/src/test/scala/com/outworkers/phantom/jdk8/tables/package.scala
@@ -15,7 +15,7 @@
  */
 package com.outworkers.phantom.jdk8
 
-import java.time.{LocalDate, OffsetDateTime, ZoneOffset, ZonedDateTime}
+import java.time._
 
 import com.outworkers.util.testing.{Sample, _}
 
@@ -27,7 +27,8 @@ package object tables {
       gen[String],
       OffsetDateTime.now(ZoneOffset.UTC).plusSeconds(gen[Long]),
       ZonedDateTime.now(ZoneOffset.UTC).plusSeconds(gen[Long]),
-      LocalDate.now().plusDays(gen[Long])
+      LocalDate.now().plusDays(gen[Long]),
+      LocalDateTime.now().plusSeconds(gen[Long])
     )
   }
 }
@@ -38,7 +39,8 @@ package object tables {
       gen[String],
       Some(OffsetDateTime.now(ZoneOffset.UTC).plusSeconds(gen[Long])),
       Some(ZonedDateTime.now(ZoneOffset.UTC).plusSeconds(gen[Long])),
-      Some(LocalDate.now().plusDays(gen[Long]))
+      Some(LocalDate.now().plusDays(gen[Long])),
+      Some(LocalDateTime.now().plusSeconds(gen[Long]))
     )
   }
 }
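The next diff gives the reactive streams module a statement-modifier overload of `fetchEnumerator`. A sketch of how such an enumerator might be consumed with play-iteratees; the table is hypothetical and the module's implicits are assumed in scope:

```scala
import play.api.libs.iteratee.Iteratee
import com.outworkers.phantom.reactivestreams._

// Cap the fetch size on the underlying statement, then fold the
// stream with an Iteratee instead of building a full List in memory.
val total: Future[Int] =
  database.recipes.select
    .fetchEnumerator(_.setFetchSize(500))
    .run(Iteratee.fold(0)((acc: Int, _: Recipe) => acc + 1))
```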
diff --git a/phantom-reactivestreams/src/main/scala/com/outworkers/phantom/reactivestreams/package.scala b/phantom-reactivestreams/src/main/scala/com/outworkers/phantom/reactivestreams/package.scala
index 4ee8b85b8..48db94703 100644
--- a/phantom-reactivestreams/src/main/scala/com/outworkers/phantom/reactivestreams/package.scala
+++ b/phantom-reactivestreams/src/main/scala/com/outworkers/phantom/reactivestreams/package.scala
@@ -16,7 +16,7 @@
 package com.outworkers.phantom
 
 import akka.actor.ActorSystem
-import com.datastax.driver.core.Session
+import com.datastax.driver.core.{Session, Statement}
 import com.outworkers.phantom.batch.BatchType
 import com.outworkers.phantom.builder.LimitBound
 import com.outworkers.phantom.builder.query.{ExecutableQuery, RootSelectBlock}
@@ -195,7 +195,7 @@ package object reactivestreams {
    * @param session The Cassandra session in use.
    * @param keySpace The keyspace object in use.
    * @param ctx The Execution Context.
-   * @return
+   * @return A play enumerator containing the results of the query.
    */
   def fetchEnumerator()(
     implicit session: Session,
     keySpace: KeySpace,
     ctx: ExecutionContextExecutor
   ): PlayEnumerator[R] = {
     PlayEnumerator.flatten {
       query.future() map { res =>
         Enumerator.enumerator(res) through Enumeratee.map(query.fromRow)
       }
     }
   }
+
+  /**
+   * Produces an Enumerator for [R]ows.
+   * This enumerator can be consumed afterwards with an Iteratee.
+   *
+   * @param mod A modifier to apply to a statement.
+   * @param session The Cassandra session in use.
+   * @param keySpace The keyspace object in use.
+   * @param ctx The Execution Context.
+   * @return A play enumerator containing the results of the query.
+   */
+  def fetchEnumerator(mod: Statement => Statement)(
+    implicit session: Session,
+    keySpace: KeySpace,
+    ctx: ExecutionContextExecutor
+  ): PlayEnumerator[R] = {
+    PlayEnumerator.flatten {
+      query.future(mod) map { res =>
+        Enumerator.enumerator(res) through Enumeratee.map(query.fromRow)
+      }
+    }
+  }
 }
 }
diff --git a/phantom-reactivestreams/src/test/logback-test.xml b/phantom-reactivestreams/src/test/logback-test.xml
new file mode 100644
index 000000000..2e7418b67
--- /dev/null
+++ b/phantom-reactivestreams/src/test/logback-test.xml
@@ -0,0 +1,19 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="INFO">
+        <appender-ref ref="STDOUT" />
+    </root>
+
+</configuration>
\ No newline at end of file
diff --git a/phantom-reactivestreams/src/test/scala/com/outworkers/phantom/reactivestreams/suites/PublisherIntegrationTest.scala b/phantom-reactivestreams/src/test/scala/com/outworkers/phantom/reactivestreams/suites/PublisherIntegrationTest.scala
index 47a55294c..b5e0eed23 100644
--- a/phantom-reactivestreams/src/test/scala/com/outworkers/phantom/reactivestreams/suites/PublisherIntegrationTest.scala
+++ b/phantom-reactivestreams/src/test/scala/com/outworkers/phantom/reactivestreams/suites/PublisherIntegrationTest.scala
@@ -68,6 +68,6 @@ class PublisherIntegrationTest extends FlatSpec with StreamTest with TestImplici
 
     eventually {
       counter.get() shouldEqual generatorCount
-    } (defaultPatience)
+    }
   }
 }
diff --git a/phantom-reactivestreams/src/test/scala/com/outworkers/phantom/reactivestreams/suites/iteratee/IteratorTest.scala b/phantom-reactivestreams/src/test/scala/com/outworkers/phantom/reactivestreams/suites/iteratee/IteratorTest.scala
index 12e2c4d9c..3b1b633b9 100644
--- a/phantom-reactivestreams/src/test/scala/com/outworkers/phantom/reactivestreams/suites/iteratee/IteratorTest.scala
+++ b/phantom-reactivestreams/src/test/scala/com/outworkers/phantom/reactivestreams/suites/iteratee/IteratorTest.scala
@@ -36,13 +36,36 @@ class IteratorTest extends BigTest with ScalaFutures {
 
     val chain = for {
       store <- Future.sequence(rows.map(row => database.timeuuidTable.store(row).future()))
-      iterator <- TestDatabase.timeuuidTable.select.where(_.user eqs user).iterator()
+      iterator <- database.timeuuidTable.select.where(_.user eqs user).iterator()
     } yield iterator
 
     whenReady(chain) {
       res => {
-        res.size shouldEqual generationSize
-        res.foreach(x => rows should contain (x))
+        res.records.size shouldEqual generationSize
+        res.records.forall(rows contains _)
       }
     }
   }
+
+  ignore should "correctly paginate a query using an iterator" in {
+    val generationSize = 100
+    val fetchSize = generationSize / 2
+    val user = gen[UUID]
+    val rows = genList[TimeUUIDRecord](generationSize).map(_.copy(user = user))
+
+    val chain = for {
+      store <- Future.sequence(rows.map(row => database.timeuuidTable.store(row).future()))
+      firstHalf <- database.timeuuidTable.select.where(_.user eqs user).iterator(_.setFetchSize(fetchSize))
+      secondHalf <- database.timeuuidTable.select.where(_.user eqs user).iterator(firstHalf.pagingState)
+    } yield (firstHalf, secondHalf)
+
+    whenReady(chain) {
+      case (firstBatch, secondBatch) => {
+        firstBatch.records.size shouldEqual fetchSize
+        firstBatch.records.forall(rows contains _)
+
+        secondBatch.records.size shouldEqual fetchSize
+        secondBatch.records.forall(rows contains _)
+      }
+    }
+  }
 }
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 4f9228a6a..ec41d7747 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -26,8 +26,7 @@ def outworkersPattern: Patterns = {
 }
 
 resolvers ++= Seq(
-  "Sonatype snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/",
-  "sonatype-releases" at "https://oss.sonatype.org/content/repositories/releases/",
+  Resolver.sonatypeRepo("releases"),
   "jgit-repo" at "http://download.eclipse.org/jgit/maven",
   "Twitter Repo" at "http://maven.twttr.com/",
   Resolver.bintrayRepo("websudos", "oss-releases"),