Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
k-anshul committed Dec 20, 2024
1 parent 0197df2 commit 9eb17e7
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 56 deletions.
15 changes: 13 additions & 2 deletions runtime/metricsview/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/rilldata/rill/runtime"
"github.com/rilldata/rill/runtime/drivers"
"github.com/rilldata/rill/runtime/drivers/druid"
"github.com/rilldata/rill/runtime/pkg/jsonval"
)

const (
Expand Down Expand Up @@ -97,16 +98,26 @@ func (e *Executor) CacheKey(ctx context.Context) ([]byte, bool, error) {
return nil, false, err
}
defer res.Close()
var key string
var key any
if res.Next() {
if err := res.Scan(&key); err != nil {
return nil, false, err
}

key, err = jsonval.ToValue(key, res.Schema.Fields[0].Type)
if err != nil {
return nil, false, err
}
}
if res.Err() != nil {
return nil, false, err
}
return []byte(key), true, nil

keyBytes, err := json.Marshal(key)
if err != nil {
return nil, false, err
}
return keyBytes, true, nil
}

// ValidateQuery validates the provided query against the executor's metrics view.
Expand Down
18 changes: 12 additions & 6 deletions runtime/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package runtime

import (
"context"
"errors"
"fmt"
"io"
"strings"
Expand Down Expand Up @@ -80,10 +81,12 @@ func (r *Runtime) Query(ctx context.Context, instanceID string, query Query, pri
// Using StateUpdatedOn instead of StateVersion because the state version is reset when the resource is deleted and recreated.
key := fmt.Sprintf("%s:%s:%d:%d", res.Meta.Name.Kind, res.Meta.Name.Name, res.Meta.StateUpdatedOn.Seconds, res.Meta.StateUpdatedOn.Nanos/int32(time.Millisecond))
if mv := res.GetMetricsView(); mv != nil {
cacheKey, err := r.metricsViewCacheKey(ctx, instanceID, res.Meta.Name.Name, priority)
cacheKey, ok, err := r.metricsViewCacheKey(ctx, instanceID, res.Meta.Name.Name, priority)
if err != nil {
return err
}
if !ok {
// skip caching
// the cache_key_resolver should ideally only return an error if caching is disabled or context is cancelled
return query.Resolve(ctx, r, instanceID, priority)
}
key = key + ":" + string(cacheKey)
Expand Down Expand Up @@ -141,7 +144,7 @@ func (r *Runtime) Query(ctx context.Context, instanceID string, query Query, pri
return nil
}

func (r *Runtime) metricsViewCacheKey(ctx context.Context, instanceID, name string, priority int) ([]byte, error) {
func (r *Runtime) metricsViewCacheKey(ctx context.Context, instanceID, name string, priority int) ([]byte, bool, error) {
cacheKeyResolver, err := r.Resolve(ctx, &ResolveOptions{
InstanceID: instanceID,
Resolver: "metrics_cache_key",
Expand All @@ -150,13 +153,16 @@ func (r *Runtime) metricsViewCacheKey(ctx context.Context, instanceID, name stri
Claims: &SecurityClaims{SkipChecks: true},
})
if err != nil {
return nil, err
if errors.Is(err, ErrMetricsViewCachingDisabled) {
return nil, false, nil
}
return nil, false, err
}
cacheKey, err := cacheKeyResolver.MarshalJSON()
if err != nil {
return nil, err
return nil, false, err
}
return cacheKey, nil
return cacheKey, true, nil
}

type queryCacheKey struct {
Expand Down
3 changes: 3 additions & 0 deletions runtime/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"strconv"
Expand All @@ -16,6 +17,8 @@ import (
"github.com/rilldata/rill/runtime/pkg/jsonval"
)

var ErrMetricsViewCachingDisabled = errors.New("metrics_cache_key: caching is disabled")

// Resolver represents logic, such as a SQL query, that produces output data.
// Resolvers are used to evaluate API requests, alerts, reports, etc.
//
Expand Down
50 changes: 12 additions & 38 deletions runtime/resolvers/metricsview_cache_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,14 @@ import (
"errors"
"fmt"
"io"
"strconv"
"strings"
"time"

"github.com/mitchellh/hashstructure/v2"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime"
"github.com/rilldata/rill/runtime/metricsview"
"github.com/rilldata/rill/runtime/pkg/mapstructureutil"
)

var errCachingDisabled = errors.New("metrics_cache_key: caching is disabled")

func init() {
runtime.RegisterResolverInitializer("metrics_cache_key", newMetricsViewCacheKeyResolver)
}
Expand All @@ -36,12 +31,12 @@ type metricsViewCacheKeyResolverArgs struct {
Priority int `mapstructure:"priority"`
}

type metricsViewCacheKey struct {
type metricsViewCacheKeyProps struct {
MetricsView string `mapstructure:"metrics_view"`
}

func newMetricsViewCacheKeyResolver(ctx context.Context, opts *runtime.ResolverOptions) (runtime.Resolver, error) {
tr := &metricsViewCacheKey{}
tr := &metricsViewCacheKeyProps{}
if err := mapstructureutil.WeakDecode(opts.Properties, tr); err != nil {
return nil, err
}
Expand Down Expand Up @@ -95,36 +90,22 @@ func (r *metricsViewCacheKeyResolver) Close() error {
return nil
}

func (r *metricsViewCacheKeyResolver) Cacheable() bool {
return true
}

func (r *metricsViewCacheKeyResolver) CacheKey(ctx context.Context) ([]byte, bool, error) {
var sb strings.Builder
sb.WriteString(runtime.ResourceKindMetricsView)
sb.WriteString(":")
sb.WriteString(r.mvName)
sb.WriteString(":")
sb.WriteString("cahe_key")
ttlSeconds := r.mv.CacheKeyTtlSeconds
if ttlSeconds == 0 && r.streaming {
var key string
ttl := time.Duration(r.mv.CacheKeyTtlSeconds) * time.Second
if ttl == 0 && r.streaming {
// If streaming, we need to cache the key for 60 seconds
// For non streaming metrics view we don't need to expire the key as data itself will be invalidated basis the ref's state version
ttlSeconds = 60
// For non streaming metrics view we don't need to expire the key as it will be invalidated basis the ref's state version
ttl = time.Minute
}
if ttlSeconds != 0 {
sb.WriteString(":")
sb.WriteString(truncateTime(time.Now(), ttlSeconds).Format(time.RFC3339))
if ttl != 0 {
key = time.Now().Truncate(ttl).Format(time.RFC3339)
}
hash, err := hashstructure.Hash(sb.String(), hashstructure.FormatV2, nil)
if err != nil {
return nil, false, err
}
return []byte(strconv.FormatUint(hash, 16)), true, nil
return []byte(key), true, nil
}

func (r *metricsViewCacheKeyResolver) Refs() []*runtimev1.ResourceName {
return []*runtimev1.ResourceName{}
return []*runtimev1.ResourceName{{Kind: runtime.ResourceKindMetricsView, Name: r.mvName}}
}

func (r *metricsViewCacheKeyResolver) Validate(ctx context.Context) error {
Expand All @@ -137,7 +118,7 @@ func (r *metricsViewCacheKeyResolver) ResolveInteractive(ctx context.Context) (r
return nil, err
}
if !ok {
return nil, errCachingDisabled
return nil, runtime.ErrMetricsViewCachingDisabled
}
schema := &runtimev1.StructType{
Fields: []*runtimev1.StructType_Field{
Expand All @@ -150,10 +131,3 @@ func (r *metricsViewCacheKeyResolver) ResolveInteractive(ctx context.Context) (r
func (r *metricsViewCacheKeyResolver) ResolveExport(ctx context.Context, w io.Writer, opts *runtime.ResolverExportOptions) error {
return errors.New("not implemented")
}

func truncateTime(t time.Time, seconds int64) time.Time {
// Convert x seconds to a duration
duration := time.Duration(seconds) * time.Second
// Truncate the time to the nearest x seconds
return t.Truncate(duration)
}
7 changes: 0 additions & 7 deletions runtime/resolvers/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,6 @@ func (r *sqlResolver) Close() error {
return nil
}

func (r *sqlResolver) Cacheable() bool {
if r.olap.Dialect() == drivers.DialectDuckDB {
return len(r.refs) != 0
}
return false
}

func (r *sqlResolver) CacheKey(ctx context.Context) ([]byte, bool, error) {
if r.olap.Dialect() == drivers.DialectDuckDB || r.olap.Dialect() == drivers.DialectClickHouse {
return []byte(r.sql), len(r.refs) != 0, nil
Expand Down
18 changes: 15 additions & 3 deletions runtime/resolvers/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package resolvers
import (
"context"
"errors"
"fmt"
"io"
"slices"
"strings"

Expand Down Expand Up @@ -36,14 +38,24 @@ func cacheKeyForMetricsView(ctx context.Context, r *runtime.Runtime, instanceID,
Claims: &runtime.SecurityClaims{SkipChecks: true},
})
if err != nil {
if errors.Is(err, errCachingDisabled) {
if errors.Is(err, runtime.ErrMetricsViewCachingDisabled) {
return nil, false, nil
}
return nil, false, err
}
cacheKey, err := cacheKeyResolver.MarshalJSON()
defer cacheKeyResolver.Close()

row, err := cacheKeyResolver.Next()
if err != nil {
if errors.Is(err, io.EOF) {
return nil, false, fmt.Errorf("`metrics_cache_key` resolver returned no rows")
}
return nil, false, err
}
return cacheKey, true, nil
res, ok := row["key"].(string)
if !ok {
// should never happen but just in case
return nil, false, errors.New("`metrics_cache_key`: expected a column key of type string in result")
}
return []byte(res), true, nil
}

0 comments on commit 9eb17e7

Please sign in to comment.