From 9eb17e7ad627fea52232becc338368ecb48a1178 Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal <12948312+k-anshul@users.noreply.github.com> Date: Fri, 20 Dec 2024 11:43:43 +0530 Subject: [PATCH] review comments --- runtime/metricsview/executor.go | 15 ++++++- runtime/query.go | 18 +++++--- runtime/resolver.go | 3 ++ runtime/resolvers/metricsview_cache_key.go | 50 ++++++---------------- runtime/resolvers/sql.go | 7 --- runtime/resolvers/util.go | 18 ++++++-- 6 files changed, 55 insertions(+), 56 deletions(-) diff --git a/runtime/metricsview/executor.go b/runtime/metricsview/executor.go index 5884a411ebe..c9b297ba82a 100644 --- a/runtime/metricsview/executor.go +++ b/runtime/metricsview/executor.go @@ -13,6 +13,7 @@ import ( "github.com/rilldata/rill/runtime" "github.com/rilldata/rill/runtime/drivers" "github.com/rilldata/rill/runtime/drivers/druid" + "github.com/rilldata/rill/runtime/pkg/jsonval" ) const ( @@ -97,16 +98,26 @@ func (e *Executor) CacheKey(ctx context.Context) ([]byte, bool, error) { return nil, false, err } defer res.Close() - var key string + var key any if res.Next() { if err := res.Scan(&key); err != nil { return nil, false, err } + + key, err = jsonval.ToValue(key, res.Schema.Fields[0].Type) + if err != nil { + return nil, false, err + } } if res.Err() != nil { return nil, false, err } - return []byte(key), true, nil + + keyBytes, err := json.Marshal(key) + if err != nil { + return nil, false, err + } + return keyBytes, true, nil } // ValidateQuery validates the provided query against the executor's metrics view. diff --git a/runtime/query.go b/runtime/query.go index f2f9686bdc0..fe0e6add7fc 100644 --- a/runtime/query.go +++ b/runtime/query.go @@ -2,6 +2,7 @@ package runtime import ( "context" + "errors" "fmt" "io" "strings" @@ -80,10 +81,12 @@ func (r *Runtime) Query(ctx context.Context, instanceID string, query Query, pri // Using StateUpdatedOn instead of StateVersion because the state version is reset when the resource is deleted and recreated. key := fmt.Sprintf("%s:%s:%d:%d", res.Meta.Name.Kind, res.Meta.Name.Name, res.Meta.StateUpdatedOn.Seconds, res.Meta.StateUpdatedOn.Nanos/int32(time.Millisecond)) if mv := res.GetMetricsView(); mv != nil { - cacheKey, err := r.metricsViewCacheKey(ctx, instanceID, res.Meta.Name.Name, priority) + cacheKey, ok, err := r.metricsViewCacheKey(ctx, instanceID, res.Meta.Name.Name, priority) if err != nil { + return err + } + if !ok { // skip caching - // the cache_key_resolver should ideally only return an error if caching is disabled or context is cancelled return query.Resolve(ctx, r, instanceID, priority) } key = key + ":" + string(cacheKey) @@ -141,7 +144,7 @@ func (r *Runtime) Query(ctx context.Context, instanceID string, query Query, pri return nil } -func (r *Runtime) metricsViewCacheKey(ctx context.Context, instanceID, name string, priority int) ([]byte, error) { +func (r *Runtime) metricsViewCacheKey(ctx context.Context, instanceID, name string, priority int) ([]byte, bool, error) { cacheKeyResolver, err := r.Resolve(ctx, &ResolveOptions{ InstanceID: instanceID, Resolver: "metrics_cache_key", @@ -150,13 +153,16 @@ func (r *Runtime) metricsViewCacheKey(ctx context.Context, instanceID, name stri Claims: &SecurityClaims{SkipChecks: true}, }) if err != nil { - return nil, err + if errors.Is(err, ErrMetricsViewCachingDisabled) { + return nil, false, nil + } + return nil, false, err } cacheKey, err := cacheKeyResolver.MarshalJSON() if err != nil { - return nil, err + return nil, false, err } - return cacheKey, nil + return cacheKey, true, nil } type queryCacheKey struct { diff --git a/runtime/resolver.go b/runtime/resolver.go index 33452c10db6..b68fca82562 100644 --- a/runtime/resolver.go +++ b/runtime/resolver.go @@ -6,6 +6,7 @@ import ( "encoding/binary" "encoding/hex" "encoding/json" + "errors" "fmt" "io" "strconv" @@ -16,6 +17,8 @@ import ( "github.com/rilldata/rill/runtime/pkg/jsonval" ) +var ErrMetricsViewCachingDisabled = errors.New("metrics_cache_key: caching is disabled") + // Resolver represents logic, such as a SQL query, that produces output data. // Resolvers are used to evaluate API requests, alerts, reports, etc. // diff --git a/runtime/resolvers/metricsview_cache_key.go b/runtime/resolvers/metricsview_cache_key.go index a2c2cfea50d..b07dbcb7fe6 100644 --- a/runtime/resolvers/metricsview_cache_key.go +++ b/runtime/resolvers/metricsview_cache_key.go @@ -5,19 +5,14 @@ import ( "errors" "fmt" "io" - "strconv" - "strings" "time" - "github.com/mitchellh/hashstructure/v2" runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime" "github.com/rilldata/rill/runtime/metricsview" "github.com/rilldata/rill/runtime/pkg/mapstructureutil" ) -var errCachingDisabled = errors.New("metrics_cache_key: caching is disabled") - func init() { runtime.RegisterResolverInitializer("metrics_cache_key", newMetricsViewCacheKeyResolver) } @@ -36,12 +31,12 @@ type metricsViewCacheKeyResolverArgs struct { Priority int `mapstructure:"priority"` } -type metricsViewCacheKey struct { +type metricsViewCacheKeyProps struct { MetricsView string `mapstructure:"metrics_view"` } func newMetricsViewCacheKeyResolver(ctx context.Context, opts *runtime.ResolverOptions) (runtime.Resolver, error) { - tr := &metricsViewCacheKey{} + tr := &metricsViewCacheKeyProps{} if err := mapstructureutil.WeakDecode(opts.Properties, tr); err != nil { return nil, err } @@ -95,36 +90,22 @@ func (r *metricsViewCacheKeyResolver) Close() error { return nil } -func (r *metricsViewCacheKeyResolver) Cacheable() bool { - return true -} - func (r *metricsViewCacheKeyResolver) CacheKey(ctx context.Context) ([]byte, bool, error) { - var sb strings.Builder - sb.WriteString(runtime.ResourceKindMetricsView) - sb.WriteString(":") - sb.WriteString(r.mvName) - sb.WriteString(":") - sb.WriteString("cahe_key") - ttlSeconds := r.mv.CacheKeyTtlSeconds - if ttlSeconds == 0 && r.streaming { + var key string + ttl := time.Duration(r.mv.CacheKeyTtlSeconds) * time.Second + if ttl == 0 && r.streaming { // If streaming, we need to cache the key for 60 seconds - // For non streaming metrics view we don't need to expire the key as data itself will be invalidated basis the ref's state version - ttlSeconds = 60 + // For non streaming metrics view we don't need to expire the key as it will be invalidated basis the ref's state version + ttl = time.Minute } - if ttlSeconds != 0 { - sb.WriteString(":") - sb.WriteString(truncateTime(time.Now(), ttlSeconds).Format(time.RFC3339)) + if ttl != 0 { + key = time.Now().Truncate(ttl).Format(time.RFC3339) } - hash, err := hashstructure.Hash(sb.String(), hashstructure.FormatV2, nil) - if err != nil { - return nil, false, err - } - return []byte(strconv.FormatUint(hash, 16)), true, nil + return []byte(key), true, nil } func (r *metricsViewCacheKeyResolver) Refs() []*runtimev1.ResourceName { - return []*runtimev1.ResourceName{} + return []*runtimev1.ResourceName{{Kind: runtime.ResourceKindMetricsView, Name: r.mvName}} } func (r *metricsViewCacheKeyResolver) Validate(ctx context.Context) error { @@ -137,7 +118,7 @@ func (r *metricsViewCacheKeyResolver) ResolveInteractive(ctx context.Context) (r return nil, err } if !ok { - return nil, errCachingDisabled + return nil, runtime.ErrMetricsViewCachingDisabled } schema := &runtimev1.StructType{ Fields: []*runtimev1.StructType_Field{ @@ -150,10 +131,3 @@ func (r *metricsViewCacheKeyResolver) ResolveInteractive(ctx context.Context) (r func (r *metricsViewCacheKeyResolver) ResolveExport(ctx context.Context, w io.Writer, opts *runtime.ResolverExportOptions) error { return errors.New("not implemented") } - -func truncateTime(t time.Time, seconds int64) time.Time { - // Convert x seconds to a duration - duration := time.Duration(seconds) * time.Second - // Truncate the time to the nearest x seconds - return t.Truncate(duration) -} diff --git a/runtime/resolvers/sql.go b/runtime/resolvers/sql.go index 123a70a0d96..a387b8ca202 100644 --- a/runtime/resolvers/sql.go +++ b/runtime/resolvers/sql.go @@ -87,13 +87,6 @@ func (r *sqlResolver) Close() error { return nil } -func (r *sqlResolver) Cacheable() bool { - if r.olap.Dialect() == drivers.DialectDuckDB { - return len(r.refs) != 0 - } - return false -} - func (r *sqlResolver) CacheKey(ctx context.Context) ([]byte, bool, error) { if r.olap.Dialect() == drivers.DialectDuckDB || r.olap.Dialect() == drivers.DialectClickHouse { return []byte(r.sql), len(r.refs) != 0, nil diff --git a/runtime/resolvers/util.go b/runtime/resolvers/util.go index 8f5df91b577..15fedadb1b0 100644 --- a/runtime/resolvers/util.go +++ b/runtime/resolvers/util.go @@ -3,6 +3,8 @@ package resolvers import ( "context" "errors" + "fmt" + "io" "slices" "strings" @@ -36,14 +38,24 @@ func cacheKeyForMetricsView(ctx context.Context, r *runtime.Runtime, instanceID, Claims: &runtime.SecurityClaims{SkipChecks: true}, }) if err != nil { - if errors.Is(err, errCachingDisabled) { + if errors.Is(err, runtime.ErrMetricsViewCachingDisabled) { return nil, false, nil } return nil, false, err } - cacheKey, err := cacheKeyResolver.MarshalJSON() + defer cacheKeyResolver.Close() + + row, err := cacheKeyResolver.Next() if err != nil { + if errors.Is(err, io.EOF) { + return nil, false, fmt.Errorf("`metrics_cache_key` resolver returned no rows") + } return nil, false, err } - return cacheKey, true, nil + res, ok := row["key"].(string) + if !ok { + // should never happen but just in case + return nil, false, errors.New("`metrics_cache_key`: expected a column key of type string in result") + } + return []byte(res), true, nil }