
feat(spark): add some date functions #373


Merged
1 commit merged into substrait-io:main on Apr 11, 2025

Conversation

andrew-coleman
Contributor

The date/time functions in Spark don’t map directly to the Substrait equivalents. E.g.

  • date ± interval-days are handled by the DateAdd & DateSub functions in Spark, but by a variant of the arithmetic add function in Substrait.
  • The date/time component extraction functions are each handled by a different function in Spark, but by a single extract function in Substrait with an enum argument to specify which component (see the sketch below).

Neither of these could be handled using the existing function mapping capabilities in the spark module.

This commit extends this capability so that it can now handle these two scenarios in (I hope) a generic way.

I’ve added a few variants of the extract function; more can follow.

Adding this gives us a 100% pass rate for all the TPC-DS queries. The README is updated accordingly.
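
To illustrate the extract scenario, here is a minimal, hypothetical sketch (not the exact code in this PR) of how a Spark Year expression could round-trip through Substrait's extract function by way of an Enum placeholder expression like the one introduced here; the names ExtractMappingSketch, toSubstrait and toSpark are illustrative only:

  import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression, Unevaluable, Year}
  import org.apache.spark.sql.types.{DataType, NullType}

  // Placeholder that carries a Substrait enum argument (e.g. the component name
  // passed to extract) through Spark's Expression tree; it is never evaluated.
  case class Enum(value: String) extends LeafExpression with Unevaluable {
    override def dataType: DataType = NullType
    override def nullable: Boolean = true
  }

  object ExtractMappingSketch {
    // Spark -> Substrait: Year(e) becomes extract(YEAR, e)
    def toSubstrait(expr: Expression): Option[(String, Seq[Expression])] = expr match {
      case Year(child) => Some(("extract", Seq(Enum("YEAR"), child)))
      case _ => None
    }

    // Substrait -> Spark: dispatch on the function name and the enum value
    def toSpark(name: String, args: Seq[Expression]): Option[Expression] =
      (name, args) match {
        case ("extract", Seq(Enum("YEAR"), child)) => Some(Year(child))
        case _ => None
      }
  }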

@Blizzara Blizzara self-requested a review April 4, 2025 13:59
Contributor

@Blizzara Blizzara left a comment

Thanks! Makes sense for the extract parts, but for DateAdd and DateSub I don't think the signature is correct (the output type would be wrong). I think we should instead add the correct signatures.

}
}

private def validateOutputType(
Contributor

I'd just get rid of these calls, maybe leave a TODO in place instead

@@ -31,22 +30,10 @@ class TPCDSPlan extends TPCDSBase with SubstraitPlanTestBase {
spark.conf.set("spark.sql.readSideCharPadding", "false")
}

// spotless:off
val failingSQL: Set[String] = Set(
Contributor

nice! 🎉


val opTypesStr = operands.map {
case e: SExpression => e.getType.accept(ToTypeString.INSTANCE)
case t: Type => t.accept(ToTypeString.INSTANCE)
Contributor

do you know if we have any tests for this (t: Type) case?

Contributor Author

Yes, this gets driven by the last test in DateTimeSuite.scala and by three of the TPC-H tests (the ones that extract the year component; this was previously handled by an internal definition in spark.yaml).

Contributor

Don't these use the EnumArg case, not the Type case?

Contributor Author

Yes, sorry. The Type case is there for completeness. As far as I can see, there are no currently supported functions that use type arguments. I suppose we could throw an unsupported exception in this case, if you'd prefer.

}

def unapply(name_args: (String, Seq[Expression])): Option[Expression] = name_args match {
case ("add:date_iday", Seq(startDate, i @ Literal(_, DayTimeIntervalType.DEFAULT))) =>
Contributor

I don't think these are correct: add:date_iday has a return type of timestamp, while Spark returns date. We should rather add new definitions, either into Substrait itself or into spark.yaml here, for add:date_i32 -> date.

Contributor

Shouldn't we try to ensure that as many of the default Substrait functions as possible are mapped to Spark, and keep the Spark-specific mappings as minimal as possible? Especially for the Substrait -> Spark direction?

Contributor

When they match, yes, but in this case they don't. In Substrait you can pass an interval like 1d 12h here, so if the date is, say, today (2025-04-09), the result should be 2025-04-10T12:00:00, whereas Spark would return 2025-04-10.
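
For concreteness, a small illustration of the Spark side of that gap (a hypothetical spark-shell snippet, assuming a SparkSession named spark):

  // Spark: date_add takes a whole number of days and returns a DATE.
  spark.sql("SELECT date_add(DATE'2025-04-09', 1)").show()   // 2025-04-10
  // Substrait's add:date_iday, by contrast, accepts a day/time interval such as
  // 1 day 12 hours and declares a timestamp result (e.g. 2025-04-10T12:00:00),
  // so the two signatures don't line up.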

Contributor

@nielspardon nielspardon Apr 9, 2025

So we agree we would need to ensure that for this mapping the behavior in Spark matches the behavior in Substrait for the Substrait -> Spark direction?

I do understand that when we talk about the other direction Spark -> Substrait that we should make sure we are not losing something that can be expressed in Spark when mapping to Substrait.

I'm just a bit concerned that if we keep adding custom extensions for the Substrait-Spark mappings, we lose some of the utility of having a common format for query plans that is independent of engines.

Contributor

> would need to ensure that for this mapping the behavior in Spark matches the behavior in Substrait for the Substrait -> Spark direction?

Yes, which is currently not the case.

> I do understand that when we talk about the other direction Spark -> Substrait that we should make sure we are not losing something that can be expressed in Spark when mapping to Substrait.

In this case the Spark expression is more limited (it takes a number of days, while Substrait takes an interval), so nothing is lost. However, the return type is wrong: the Spark expression returns date, while the Substrait definition claims a timestamp.

> I'm just a bit concerned that if we keep adding custom extensions for the Substrait-Spark mappings we are losing some of the utility of having a common format for query plans independent of engines.

Yeah, that's tricky. It's a nice goal, and worth going for when possible, but it just isn't always doable (or at least not easy) since the engines support different things and in different ways.

Internally, we've found it most useful to map to Substrait extensions whatever maps nicely, and to use Spark-specific mappings for the rest (and then replicate those in the other engines we want to use). Not saying that's necessarily what this repo should do.

One way to solve this could be to map add:date_iday into Add(Cast(X as TimestampNTZ), Y); I guess that should match for types.
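
A rough Catalyst-level sketch of that suggestion (illustrative only; Spark's analyzer normally rewrites timestamp-plus-interval arithmetic, for example into TimeAdd, so the exact expression may differ):

  import org.apache.spark.sql.catalyst.expressions.{Add, Cast, Expression}
  import org.apache.spark.sql.types.TimestampNTZType

  // Widen the date operand so the result type lines up with Substrait's
  // add:date_iday, whose declared return type is a timestamp rather than a date.
  def addDateIday(date: Expression, interval: Expression): Expression =
    Add(Cast(date, TimestampNTZType), interval)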

Contributor Author

To move this forward (we don't use an internal fork), I've added add:date_i32 to spark.yaml for now. I'll work separately on pushing this down into the core substrait library.

import org.apache.spark.sql.catalyst.expressions.{LeafExpression, Unevaluable}
import org.apache.spark.sql.types.{DataType, NullType}

case class Enum(value: String) extends LeafExpression with Unevaluable {
Contributor

makes sense, mind adding a docstring to explain what this is for though?
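
For instance, something along these lines over the declaration shown above (hypothetical wording, not necessarily the docstring that ended up in the commit):

  /**
   * Placeholder expression used to carry a Substrait enum argument (such as the
   * date/time component passed to extract) through a Spark expression tree.
   * It is never evaluated; when the plan is converted to Substrait it is turned
   * into an EnumArg rather than a regular value argument.
   */
  case class Enum(value: String) extends LeafExpression with Unevaluable {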

@@ -196,7 +199,10 @@ abstract class ToSubstraitExpression extends HasOutputStack[Seq[Attribute]] {
case InSet(value, set) => translateIn(value, set.toSeq.map(v => Literal(v)))
case scalar @ ScalarFunction(children) =>
Util
.seqToOption(children.map(translateUp))
.seqToOption(children.map {
case Enum(value) => Some(ImmutableEnumArg.builder.value(Optional.of(value)).build)
Contributor

nit: if EnumArg doesn't have the builder on itself, you could add it there so that you could do just EnumArg.builder...

case _ => None
}
val tz =
if (Cast.needsTimeZone(childExp.dataType, tt))
Contributor

nice!

@Blizzara Blizzara (Contributor) commented Apr 9, 2025

FWIW, in our fork I've done something similar, but maybe even more generic still (yeah, I should pull it upstream too..):

  trait ToSpark {
    def apply(name: String, args: Seq[Expression], options: Seq[FunctionOption]): Expression
  }

  trait ToSubstrait {
    def apply(expr: Expression)
    : (String /* name */, Seq[Expression] /* args */, Seq[FunctionOption] /* options */ )
  }

  case class Mapping(
      sparkClasses: Seq[Class[_]],
      substraitNames: Seq[String],
      toSpark: ToSpark,
      toSubstrait: ToSubstrait) {}

  /** Helper for creating simple 1-to-1 mappings */
  def s[T <: Expression: ClassTag](name: String): Mapping = {
    val builder = FunctionRegistryBase.build[T](name, None)._2
    s[T](name, builder, (expr: Expression) => expr.children)
  }

  /** Helper for creating reasonably simple 1-to-1 mappings with custom handling for arguments */
  def s[T <: Expression: ClassTag](
      name: String,
      builder: Seq[Expression] => T,
      args: T => Seq[Expression]): Mapping = {
    // For cases requiring custom arguments handling
    Mapping(
      Seq(scala.reflect.classTag[T].runtimeClass),
      Seq(name),
      (_, args, _) => builder(args),
      expr => (name, args(expr.asInstanceOf[T]), Seq.empty)
    )
  }

This then allows you to do both simple and more complex mapping logic:

    s[IsNaN]("is_nan"),
    Mapping(
      Seq(classOf[Add], classOf[DateAddInterval]),
      Seq("add"),
      (_, args, _) => {
        (args.head.dataType, args.last.dataType) match {
          case (DataTypes.DateType, DataTypes.CalendarIntervalType) => DateAddInterval(args.head, args.last)
          case _ => Add(args.head, args.last)
        }
      },
      {
        case Add(left, right, _) => ("add", Seq(left, right), Seq.empty)
        case DateAddInterval(left, right, _, _) => ("add", Seq(left, right), Seq.empty)
      }
    ),

@Blizzara Blizzara merged commit 2ece486 into substrait-io:main Apr 11, 2025
12 checks passed
@andrew-coleman andrew-coleman deleted the date_functions branch April 11, 2025 12:38