diff --git a/provider-service/base/pkg/util/util.go b/provider-service/base/pkg/util/util.go new file mode 100644 index 00000000..8c2770e7 --- /dev/null +++ b/provider-service/base/pkg/util/util.go @@ -0,0 +1,51 @@ +package util + +import ( + "fmt" + "github.com/sky-uk/kfp-operator/argo/common" + "regexp" + "strings" +) + +var FieldMatcher = regexp.MustCompile(`\S+`) + +const OutputSeparator = " " +const StdNumFields = 5 +const GoNumFields = 6 + +type CronSchedule struct { + fields []string +} + +func (cs CronSchedule) PrintStandard() string { + return strings.Join(cs.fields[1:], OutputSeparator) +} + +func (cs CronSchedule) PrintGo() string { + return strings.Join(cs.fields, OutputSeparator) +} + +func ParseCron(schedule string) (CronSchedule, error) { + fields := FieldMatcher.FindAllString(schedule, -1) + + if len(fields) > GoNumFields { + return CronSchedule{}, fmt.Errorf("too many fields for go cron schedule") + } + if len(fields) < StdNumFields { + return CronSchedule{}, fmt.Errorf("too few fields for standard cron schedule") + } + + if len(fields) == StdNumFields { + return CronSchedule{ + fields: append([]string{"0"}, fields...), + }, nil + } else { + return CronSchedule{ + fields: fields, + }, nil + } +} + +func ResourceNameFromNamespacedName(namespacedName common.NamespacedName) (string, error) { + return namespacedName.SeparatedString("-") +} diff --git a/provider-service/base/pkg/util/util_test.go b/provider-service/base/pkg/util/util_test.go new file mode 100644 index 00000000..31c48d95 --- /dev/null +++ b/provider-service/base/pkg/util/util_test.go @@ -0,0 +1,53 @@ +//go:build unit + +package util + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/sky-uk/kfp-operator/argo/common" +) + +var _ = Context("cron parser", func() { + _ = Describe("should accept standard cron schedules", func() { + schedule, err := ParseCron(" a b c d e f ") + Expect(err).NotTo(HaveOccurred()) + Expect(schedule.PrintGo()).To(Equal("a b c d e f")) + Expect(schedule.PrintStandard()).To(Equal("b c d e f")) + }) + + _ = Describe("should accept go cron schedules", func() { + schedule, err := ParseCron(" b c d e f ") + Expect(err).NotTo(HaveOccurred()) + Expect(schedule.PrintGo()).To(Equal("0 b c d e f")) + Expect(schedule.PrintStandard()).To(Equal("b c d e f")) + }) + + _ = Describe("should not parse when fields are missing", func() { + _, err := ParseCron("* * * *") + Expect(err).To(HaveOccurred()) + }) + + _ = Describe("should not parse when too many fields are present", func() { + _, err := ParseCron("* * * * * * *") + Expect(err).To(HaveOccurred()) + }) +}) + +var _ = Context("ResourceNameFromNamespacedName", func() { + _ = Describe("should return string separated with hyphens", func() { + result, err := ResourceNameFromNamespacedName(common.NamespacedName{ + Namespace: "my-namespace", + Name: "my-name", + }) + Expect(err).NotTo(HaveOccurred()) + Expect(result).To(Equal("my-namespace-my-name")) + }) + + _ = Describe("should return error when namespace only set", func() { + _, err := ResourceNameFromNamespacedName(common.NamespacedName{ + Namespace: "my-namespace", + }) + Expect(err).To(HaveOccurred()) + }) +}) diff --git a/provider-service/vai/go.mod b/provider-service/vai/go.mod index afca729d..d6afbb11 100644 --- a/provider-service/vai/go.mod +++ b/provider-service/vai/go.mod @@ -3,9 +3,9 @@ module github.com/sky-uk/kfp-operator/provider-service/vai go 1.20 require ( - cloud.google.com/go/aiplatform v1.48.0 + cloud.google.com/go/aiplatform v1.54.0 cloud.google.com/go/pubsub v1.33.0 - github.com/go-logr/logr v1.2.3 + github.com/go-logr/logr v1.3.0 github.com/golang/mock v1.6.0 github.com/googleapis/gax-go/v2 v2.12.0 github.com/onsi/ginkgo/v2 v2.8.0 @@ -13,25 +13,29 @@ require ( github.com/sky-uk/kfp-operator/argo/common v0.0.0-20241107144142-477120a374b9 github.com/sky-uk/kfp-operator/provider-service/base v0.0.0-00010101000000-000000000000 go.uber.org/zap v1.24.0 - google.golang.org/api v0.136.0 + google.golang.org/api v0.154.0 google.golang.org/protobuf v1.31.0 ) replace github.com/sky-uk/kfp-operator/provider-service/base => ../base require ( - cloud.google.com/go v0.110.6 // indirect - cloud.google.com/go/compute v1.23.0 // indirect + cloud.google.com/go v0.110.10 // indirect + cloud.google.com/go/compute v1.23.3 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect - cloud.google.com/go/iam v1.1.1 // indirect - cloud.google.com/go/longrunning v0.5.1 // indirect + cloud.google.com/go/iam v1.1.5 // indirect + cloud.google.com/go/longrunning v0.5.4 // indirect + cloud.google.com/go/storage v1.36.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/docker/distribution v2.7.1+incompatible // indirect github.com/emicklei/go-restful/v3 v3.8.0 // indirect github.com/evanphx/json-patch v5.6.0+incompatible // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect + github.com/fsouza/fake-gcs-server v1.47.7 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-logr/zapr v1.2.3 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.20.0 // indirect @@ -41,11 +45,14 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/google/gnostic v0.5.7-v3refs // indirect - github.com/google/go-cmp v0.5.9 // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect - github.com/google/s2a-go v0.1.4 // indirect - github.com/google/uuid v1.3.0 // indirect - github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect + github.com/google/renameio/v2 v2.0.0 // indirect + github.com/google/s2a-go v0.1.7 // indirect + github.com/google/uuid v1.5.0 // indirect + github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect + github.com/gorilla/handlers v1.5.2 // indirect + github.com/gorilla/mux v1.8.1 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/imdario/mergo v0.3.15 // indirect github.com/josharian/intern v1.0.0 // indirect @@ -60,6 +67,7 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/pkg/xattr v0.4.9 // indirect github.com/prometheus/client_golang v1.15.1 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect @@ -73,22 +81,28 @@ require ( github.com/subosito/gotenv v1.4.2 // indirect github.com/thanhpk/randstr v1.0.4 // indirect go.opencensus.io v0.24.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect + go.opentelemetry.io/otel v1.21.0 // indirect + go.opentelemetry.io/otel/metric v1.21.0 // indirect + go.opentelemetry.io/otel/trace v1.21.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.8.0 // indirect golang.org/x/crypto v0.25.0 // indirect golang.org/x/net v0.27.0 // indirect - golang.org/x/oauth2 v0.11.0 // indirect + golang.org/x/oauth2 v0.15.0 // indirect golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.22.0 // indirect golang.org/x/term v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.6.0 // indirect + golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect - google.golang.org/appengine v1.6.7 // indirect - google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20230803162519-f966b187b2e5 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577 // indirect - google.golang.org/grpc v1.57.0 // indirect + google.golang.org/appengine v1.6.8 // indirect + google.golang.org/genproto v0.0.0-20231120223509-83a465c0220f // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20231120223509-83a465c0220f // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 // indirect + google.golang.org/grpc v1.60.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/provider-service/vai/go.sum b/provider-service/vai/go.sum index 96c230ec..14b723ad 100644 --- a/provider-service/vai/go.sum +++ b/provider-service/vai/go.sum @@ -22,8 +22,12 @@ cloud.google.com/go v0.79.0/go.mod h1:3bzgcEeQlzbuEAYu4mrWhKqWjmpprinYgKJLgKHnbb cloud.google.com/go v0.81.0/go.mod h1:mk/AM35KwGk/Nm2YSeZbxXdrNK3KZOYHmLkOqC2V6E0= cloud.google.com/go v0.110.6 h1:8uYAkj3YHTP/1iwReuHPxLSbdcyc+dSBbzFMrVwDR6Q= cloud.google.com/go v0.110.6/go.mod h1:+EYjdK8e5RME/VY/qLCAtuyALQ9q67dvuum8i+H5xsI= +cloud.google.com/go v0.110.10 h1:LXy9GEO+timppncPIAZoOj3l58LIU9k+kn48AN7IO3Y= +cloud.google.com/go v0.110.10/go.mod h1:v1OoFqYxiBkUrruItNM3eT4lLByNjxmJSV/xDKJNnic= cloud.google.com/go/aiplatform v1.48.0 h1:M5davZWCTzE043rJCn+ZLW6hSxfG1KAx4vJTtas2/ec= cloud.google.com/go/aiplatform v1.48.0/go.mod h1:Iu2Q7sC7QGhXUeOhAj/oCK9a+ULz1O4AotZiqjQ8MYA= +cloud.google.com/go/aiplatform v1.54.0 h1:wH7OYl9Vq/5tupok0BPTFY9xaTLb0GxkReHtB5PF7cI= +cloud.google.com/go/aiplatform v1.54.0/go.mod h1:pwZMGvqe0JRkI1GWSZCtnAfrR4K1bv65IHILGA//VEU= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc= @@ -32,6 +36,8 @@ cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4g cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ= cloud.google.com/go/compute v1.23.0 h1:tP41Zoavr8ptEqaW6j+LQOnyBBhO7OkOMAGrgLopTwY= cloud.google.com/go/compute v1.23.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM= +cloud.google.com/go/compute v1.23.3 h1:6sVlXXBmbd7jNX0Ipq0trII3e4n1/MsADLK6a+aiVlk= +cloud.google.com/go/compute v1.23.3/go.mod h1:VCgBUoMnIVIR0CscqQiPJLAG25E3ZRZMzcFZeQ+h8CI= cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= cloud.google.com/go/compute/metadata v0.2.3/go.mod h1:VAV5nSsACxMJvgaAuX6Pk2AawlZn8kiOGuCv6gTkwuA= cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE= @@ -39,9 +45,14 @@ cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1 cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk= cloud.google.com/go/iam v1.1.1 h1:lW7fzj15aVIXYHREOqjRBV9PsH0Z6u8Y46a1YGvQP4Y= cloud.google.com/go/iam v1.1.1/go.mod h1:A5avdyVL2tCppe4unb0951eI9jreack+RJ0/d+KUZOU= +cloud.google.com/go/iam v1.1.5 h1:1jTsCu4bcsNsE4iiqNT5SHwrDRCfRmIaaaVFhRveTJI= +cloud.google.com/go/iam v1.1.5/go.mod h1:rB6P/Ic3mykPbFio+vo7403drjlgvoWfYpJhMXEbzv8= cloud.google.com/go/kms v1.15.0 h1:xYl5WEaSekKYN5gGRyhjvZKM22GVBBCzegGNVPy+aIs= +cloud.google.com/go/kms v1.15.5 h1:pj1sRfut2eRbD9pFRjNnPNg/CzJPuQAzUujMIM1vVeM= cloud.google.com/go/longrunning v0.5.1 h1:Fr7TXftcqTudoyRJa113hyaqlGdiBQkp0Gq7tErFDWI= cloud.google.com/go/longrunning v0.5.1/go.mod h1:spvimkwdz6SPWKEt/XBij79E9fiTkHSQl/fRUUQJYJc= +cloud.google.com/go/longrunning v0.5.4 h1:w8xEcbZodnA2BbW6sVirkkoC+1gP8wS57EUUgGS0GVg= +cloud.google.com/go/longrunning v0.5.4/go.mod h1:zqNVncI0BOP8ST6XQD1+VcvuShMmq7+xFSzOL++V0dI= cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I= cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= @@ -54,6 +65,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= +cloud.google.com/go/storage v1.36.0 h1:P0mOkAcaJxhCTvAkMhxMfrTKiNcub4YmmPBtlhAyTr8= +cloud.google.com/go/storage v1.36.0/go.mod h1:M6M/3V/D3KpzMTJyPOR/HU6n2Si5QdaXYEsng2xgOs8= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/go-ansiterm v0.0.0-20210608223527-2377c96fe795/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= @@ -154,6 +167,8 @@ github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCv github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY= @@ -161,6 +176,8 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= +github.com/fsouza/fake-gcs-server v1.47.7 h1:56/U4rKY081TaNbq0gHWi7/71UxC2KROqcnrD9BRJhs= +github.com/fsouza/fake-gcs-server v1.47.7/go.mod h1:4vPUynN8/zZlxk5Jpy6LvvTTxItdTAObK4DYnp89Jys= github.com/getkin/kin-openapi v0.76.0/go.mod h1:660oXbgy5JFMKreazJaQTw7o+X00qeSyhcnluiMv+Xg= github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -179,6 +196,10 @@ github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbV github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= +github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-logr/zapr v1.2.0/go.mod h1:Qa4Bsj2Vb+FAVeAKsLD8RLQ+YRJB8YDmOAKxaBQf7Ro= github.com/go-logr/zapr v1.2.3 h1:a9vnzlIBPQBBkeaR9IuMUfmVOrQlkoC4YfPoFkX3T7A= github.com/go-logr/zapr v1.2.3/go.mod h1:eIauM6P8qSvTw5o2ez6UEAfGjQKrxQTl5EoK+Qa2oG4= @@ -259,6 +280,7 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= @@ -278,14 +300,23 @@ github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/renameio v0.1.0 h1:GOZbcHa3HfsPKPlmyPyN2KEohoMXOhdMbHrvbpl2QaA= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/renameio/v2 v2.0.0 h1:UifI23ZTGY8Tt29JbYFiuyIU3eX+RNFtUwefq9qAhxg= +github.com/google/renameio/v2 v2.0.0/go.mod h1:BtmJXm5YlszgC+TD4HOEEUFgkJP3nLxehU6hfe7jRt4= github.com/google/s2a-go v0.1.4 h1:1kZ/sQM3srePvKs3tXAvQzo66XfcReoqFpIpIccE7Oc= github.com/google/s2a-go v0.1.4/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A= +github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= +github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= +github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/enterprise-certificate-proxy v0.2.5 h1:UR4rDjcgpgEnqpIEvkiqTYKBCKLNmlge2eVjoZfySzM= github.com/googleapis/enterprise-certificate-proxy v0.2.5/go.mod h1:RxW0N9901Cko1VOCW3SXCpWP+mlIEkk2tP7jnHy9a3w= +github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= +github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas= @@ -294,7 +325,11 @@ github.com/googleapis/gnostic v0.5.1/go.mod h1:6U4PtQXGIEt/Z3h5MAT7FNofLnw9vXk2c github.com/googleapis/gnostic v0.5.5/go.mod h1:7+EbHbldMins07ALC74bsA81Ovc97DwqyJO1AENw9kA= github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8/go.mod h1:dvDLG8qkwmyD9a/MJJN3XJcT3xFxOKAvTZGvuZmac9g= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE= +github.com/gorilla/handlers v1.5.2/go.mod h1:dX+xVpaxdSw+q0Qek8SSsl3dfMk3jNddUkMzo0GtH0w= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= +github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= @@ -435,6 +470,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI= github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg= +github.com/pkg/xattr v0.4.9 h1:5883YPCtkSd8LFbs13nXplj9g9tlrwoJRjgpgMu1/fE= +github.com/pkg/xattr v0.4.9/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= @@ -562,17 +599,28 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= +go.opentelemetry.io/contrib v0.20.0 h1:ubFQUn0VCZ0gPwIoJfBJVpeBlyRMxu8Mm/huKWYd9p0= go.opentelemetry.io/contrib v0.20.0/go.mod h1:G/EtFaa6qaN7+LxqfIAT3GiZa7Wv5DTBUzl5H4LY0Kc= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0/go.mod h1:oVGt1LRbBOBq1A5BQLlUg9UaU/54aiHw8cgjV3aWZ/E= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 h1:SpGay3w+nEwMpfVnbqOLH5gY52/foP8RE8UzTZ1pdSE= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1/go.mod h1:4UoMYEZOC0yN/sPGH76KPkkU7zgiEWYWL9vwmbnTJPE= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.20.0/go.mod h1:2AboqHi0CiIZU0qwhtUfCYD1GeUzvvIXWNkhDt7ZMG4= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 h1:aFJWCqJMNjENlcleuuOkGAPH82y0yULBScfXcIEdS24= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1/go.mod h1:sEGXWArGqc3tVa+ekntsN65DmVbVeW+7lTKTjZF3/Fo= go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo= +go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc= +go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo= go.opentelemetry.io/otel/exporters/otlp v0.20.0/go.mod h1:YIieizyaN77rtLJra0buKiNBOm9XQfkPEKBeuhoMwAM= go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU= +go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4= +go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM= go.opentelemetry.io/otel/oteltest v0.20.0/go.mod h1:L7bgKf9ZB7qCwT9Up7i9/pn0PWIa9FqQ2IQ8LoxiGnw= go.opentelemetry.io/otel/sdk v0.20.0/go.mod h1:g/IcepuwNsoiX5Byy2nNV0ySUF1em498m7hBWC279Yc= go.opentelemetry.io/otel/sdk/export/metric v0.20.0/go.mod h1:h7RBNMsDJ5pmI1zExLi+bJK+Dr8NQCh0qGhm1KDnNlE= go.opentelemetry.io/otel/sdk/metric v0.20.0/go.mod h1:knxiS8Xd4E/N+ZqKmUPf3gTTZ4/0TjTXukfxjzSTpHE= go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw= +go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc= +go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= @@ -709,6 +757,8 @@ golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.11.0 h1:vPL4xzxBM4niKCW6g9whtaWVXTJf1U5e4aZxxFx/gbU= golang.org/x/oauth2 v0.11.0/go.mod h1:LdF7O/8bLR/qWK9DrpXmbHLTouvRHK0SgJl0GmDBchk= +golang.org/x/oauth2 v0.15.0 h1:s8pnnxNVzjWyrvYdFUQq5llS1PX2zhPXmccZv99h7uQ= +golang.org/x/oauth2 v0.15.0/go.mod h1:q48ptWNTY5XWf+JNten23lcvHpLJ0ZSxF5ttTHKVCAM= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -787,6 +837,7 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -881,6 +932,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= +golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= gomodules.xyz/jsonpatch/v2 v2.2.0 h1:4pT439QV83L+G9FkcCriY6EkpcK6r6bK+A5FBUMI7qY= gomodules.xyz/jsonpatch/v2 v2.2.0/go.mod h1:WXp+iVDkoLQqPudfQ9GBlwB2eZ5DKOnjQZCYdOS8GPY= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= @@ -907,6 +960,8 @@ google.golang.org/api v0.43.0/go.mod h1:nQsDGjRXMo4lvh5hP0TKqF244gqhGcr/YSIykhUk google.golang.org/api v0.44.0/go.mod h1:EBOGZqzyhtvMDoxwS97ctnh0zUmYY6CxqXsc1AvkYD8= google.golang.org/api v0.136.0 h1:e/6enzUE1s4tGPa6Q3ZYShKTtvRc+1Jq0rrafhppmOs= google.golang.org/api v0.136.0/go.mod h1:XtJfF+V2zgUxelOn5Zs3kECtluMxneJG8ZxUTlLNTPA= +google.golang.org/api v0.154.0 h1:X7QkVKZBskztmpPKWQXgjJRPA2dJYrL6r+sYPRLj050= +google.golang.org/api v0.154.0/go.mod h1:qhSMkM85hgqiokIYsrRyKxrjfBeIhgl4Z2JmeRkYylc= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -915,6 +970,8 @@ google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCID google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= +google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= @@ -964,10 +1021,16 @@ google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxH google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 h1:L6iMMGrtzgHsWofoFcihmDEMYeDR9KN/ThbPWGrh++g= google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5/go.mod h1:oH/ZOT02u4kWEp7oYBGYFFkCdKS/uYR9Z7+0/xuuFp8= +google.golang.org/genproto v0.0.0-20231120223509-83a465c0220f h1:Vn+VyHU5guc9KjB5KrjI2q0wCOWEOIh0OEsleqakHJg= +google.golang.org/genproto v0.0.0-20231120223509-83a465c0220f/go.mod h1:nWSwAFPb+qfNJXsoeO3Io7zf4tMSfN8EA8RlDA04GhY= google.golang.org/genproto/googleapis/api v0.0.0-20230803162519-f966b187b2e5 h1:nIgk/EEq3/YlnmVVXVnm14rC2oxgs1o0ong4sD/rd44= google.golang.org/genproto/googleapis/api v0.0.0-20230803162519-f966b187b2e5/go.mod h1:5DZzOUPCLYL3mNkQ0ms0F3EuUNZ7py1Bqeq6sxzI7/Q= +google.golang.org/genproto/googleapis/api v0.0.0-20231120223509-83a465c0220f h1:2yNACc1O40tTnrsbk9Cv6oxiW8pxI/pXj0wRtdlYmgY= +google.golang.org/genproto/googleapis/api v0.0.0-20231120223509-83a465c0220f/go.mod h1:Uy9bTZJqmfrw2rIBxgGLnamc78euZULUBrLZ9XTITKI= google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577 h1:wukfNtZmZUurLN/atp2hiIeTKn7QJWIQdHzqmsOnAOk= google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4 h1:DC7wcm+i+P1rN3Ff07vL+OndGg5OhNddHyTA+ocPqYE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231127180814-3a041ad873d4/go.mod h1:eJVxU6o+4G1PSczBr85xmyvSNYAKvAYgkub40YGomFM= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -993,6 +1056,8 @@ google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9K google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= +google.golang.org/grpc v1.60.0 h1:6FQAR0kM31P6MRdeluor2w2gPaS4SVNrD/DNTxrQ15k= +google.golang.org/grpc v1.60.0/go.mod h1:OlCHIeLYqSSsLi6i49B5QGdzaMZK9+M7LXN2FKz4eGM= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/provider-service/vai/go.work.sum b/provider-service/vai/go.work.sum index d80e8502..44a99277 100644 --- a/provider-service/vai/go.work.sum +++ b/provider-service/vai/go.work.sum @@ -2,7 +2,6 @@ bazil.org/fuse v0.0.0-20180421153158-65cc252bf669 h1:FNCRpXiquG1aoyqcIWVFmpTSKVc cel.dev/expr v0.15.0 h1:O1jzfJCQBfL5BFoYktaxwIhuttaQPsVWerH9/EEKx0w= cel.dev/expr v0.15.0/go.mod h1:TRSuuV7DlVCE/uwv5QbAiW/v8l5O8C4eEPHeu7gf7Sg= cloud.google.com/go v0.110.8/go.mod h1:Iz8AkXJf1qmxC3Oxoep8R1T36w8B92yU29PcBhHO5fk= -cloud.google.com/go v0.110.10/go.mod h1:v1OoFqYxiBkUrruItNM3eT4lLByNjxmJSV/xDKJNnic= cloud.google.com/go v0.112.0/go.mod h1:3jEEVwZ/MHU4djK5t5RHuKOA/GbLddgTdVubX1qnPD4= cloud.google.com/go v0.112.1 h1:uJSeirPke5UNZHIb4SxfZklVSiWWVqW4oXlETwZziwM= cloud.google.com/go v0.112.1/go.mod h1:+Vbu+Y1UU+I1rjmzeMOb/8RfkKJK2Gyxi1X6jJCZLo4= @@ -78,7 +77,6 @@ cloud.google.com/go/cloudtasks v1.12.1 h1:cMh9Q6dkvh+Ry5LAPbD/U2aw6KAqdiU6Fttwhb cloud.google.com/go/cloudtasks v1.12.6 h1:EUt1hIZ9bLv8Iz9yWaCrqgMnIU+Tdh0yXM1MMVGhjfE= cloud.google.com/go/cloudtasks v1.12.6/go.mod h1:b7c7fe4+TJsFZfDyzO51F7cjq7HLUlRi/KZQLQjDsaY= cloud.google.com/go/compute v1.23.1/go.mod h1:CqB3xpmPKKt3OJpW2ndFIXnA9A4xAy/F3Xp1ixncW78= -cloud.google.com/go/compute v1.23.3/go.mod h1:VCgBUoMnIVIR0CscqQiPJLAG25E3ZRZMzcFZeQ+h8CI= cloud.google.com/go/compute v1.23.4/go.mod h1:/EJMj55asU6kAFnuZET8zqgwgJ9FvXWXOkkfQZa4ioI= cloud.google.com/go/compute v1.24.0/go.mod h1:kw1/T+h/+tK2LJK0wiPPx1intgdAM3j/g3hFDlscY40= cloud.google.com/go/compute v1.25.1 h1:ZRpHJedLtTpKgr3RV1Fx23NuaAEN1Zfx9hw1u4aJdjU= @@ -176,7 +174,6 @@ cloud.google.com/go/gsuiteaddons v1.6.1 h1:mi9jxZpzVjLQibTS/XfPZvl+Jr6D5Bs8pGqUj cloud.google.com/go/gsuiteaddons v1.6.5 h1:CZEbaBwmbYdhFw21Fwbo+C35HMe36fTE0FBSR4KSfWg= cloud.google.com/go/gsuiteaddons v1.6.5/go.mod h1:Lo4P2IvO8uZ9W+RaC6s1JVxo42vgy+TX5a6hfBZ0ubs= cloud.google.com/go/iam v1.1.3/go.mod h1:3khUlaBXfPKKe7huYgEpDn6FtgRyMEqbkvBxrQyY5SE= -cloud.google.com/go/iam v1.1.5/go.mod h1:rB6P/Ic3mykPbFio+vo7403drjlgvoWfYpJhMXEbzv8= cloud.google.com/go/iam v1.1.6 h1:bEa06k05IO4f4uJonbB5iAgKTPpABy1ayxaIZV/GHVc= cloud.google.com/go/iam v1.1.6/go.mod h1:O0zxdPeGBoFdWW3HWmBxJsk0pfvNM/p/qa82rWOGTwI= cloud.google.com/go/iap v1.8.1 h1:X1tcp+EoJ/LGX6cUPt3W2D4H2Kbqq0pLAsldnsCjLlE= @@ -200,7 +197,6 @@ cloud.google.com/go/lifesciences v0.9.5/go.mod h1:OdBm0n7C0Osh5yZB7j9BXyrMnTRGBJ cloud.google.com/go/logging v1.7.0 h1:CJYxlNNNNAMkHp9em/YEXcfJg+rPDg7YfwoRpMU+t5I= cloud.google.com/go/logging v1.9.0 h1:iEIOXFO9EmSiTjDmfpbRjOxECO7R8C7b8IXUGOj7xZw= cloud.google.com/go/logging v1.9.0/go.mod h1:1Io0vnZv4onoUnsVUQY3HZ3Igb1nBchky0A0y7BBBhE= -cloud.google.com/go/longrunning v0.5.4/go.mod h1:zqNVncI0BOP8ST6XQD1+VcvuShMmq7+xFSzOL++V0dI= cloud.google.com/go/longrunning v0.5.5 h1:GOE6pZFdSrTb4KAiKnXsJBtlE6mEyaW44oKyMILWnOg= cloud.google.com/go/longrunning v0.5.5/go.mod h1:WV2LAxD8/rg5Z1cNW6FJ/ZpX4E4VnDnoTk0yawPBB7s= cloud.google.com/go/managedidentities v1.6.1 h1:2/qZuOeLgUHorSdxSQGtnOu9xQkBn37+j+oZQv/KHJY= @@ -577,8 +573,6 @@ github.com/fatih/color v1.14.1 h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w= github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg= github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= -github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= -github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 h1:BHsljHzVlRcyQhjrss6TZTdY2VfCqZPbv5k3iBFa2ZQ= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90 h1:WXb3TSNmHp2vHoCroCIB1foO/yQ36swABL8aOVeDpgg= github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c= @@ -614,8 +608,6 @@ github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= -github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= -github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-openapi/analysis v0.21.2 h1:hXFrOYFHUAMQdu6zwAiKKJHJQ8kqZs1ux/ru1P1wLJU= github.com/go-openapi/analysis v0.21.4 h1:ZDFLvSNxpDaomuCueM0BlSXxpANBlFYiBvr+GXrvIHc= github.com/go-openapi/errors v0.20.2 h1:dxy7PGTqEh94zj2E3h1cUmQQWiM1+aeCROfAr02EmK8= @@ -697,14 +689,9 @@ github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdf github.com/google/martian/v3 v3.3.2/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE= github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= -github.com/google/renameio v0.1.0 h1:GOZbcHa3HfsPKPlmyPyN2KEohoMXOhdMbHrvbpl2QaA= -github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= -github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= github.com/google/subcommands v1.0.1 h1:/eqq+otEXm5vhfBrbREPCSVQbvofip6kIz+mX5TUH7k= github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/wire v0.4.0 h1:kXcsA/rIGzJImVqPdhfnr6q0xsS9gU0515q1EPpJ9fE= -github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfFxPRy3Bf7vr3h0cechB90XaQs= -github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0= github.com/googleapis/gax-go/v2 v2.12.1/go.mod h1:61M8vcyyXR2kqKFxKrfA22jaA8JGF7Dc8App1U3H6jc= github.com/googleapis/gax-go/v2 v2.12.2/go.mod h1:61M8vcyyXR2kqKFxKrfA22jaA8JGF7Dc8App1U3H6jc= github.com/googleapis/gax-go/v2 v2.12.3 h1:5/zPPDvw8Q1SuXjrqrZslrqT7dL/uJT2CQii/cLCKqA= @@ -788,7 +775,6 @@ github.com/itchyny/gojq v0.12.6 h1:VjaFn59Em2wTxDNGcrRkDK9ZHMNa8IksOgL13sLL4d0= github.com/itchyny/gojq v0.12.12 h1:x+xGI9BXqKoJQZkr95ibpe3cdrTbY8D9lonrK433rcA= github.com/itchyny/timefmt-go v0.1.3 h1:7M3LGVDsqcd0VZH2U+x393obrzZisp7C0uEe921iRkU= github.com/itchyny/timefmt-go v0.1.5 h1:G0INE2la8S6ru/ZI5JecgyzbbJNs5lG1RcBqa7Jm6GE= -github.com/jarcoal/httpmock v1.3.1/go.mod h1:3yb8rc4BI7TCBhFY8ng0gjuLKJNquuDNiPaZjnENuYg= github.com/jawher/mow.cli v1.1.0 h1:NdtHXRc0CwZQ507wMvQ/IS+Q3W3x2fycn973/b8Zuk8= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= @@ -824,6 +810,8 @@ github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4 github.com/kisielk/errcheck v1.5.0 h1:e8esj/e4R+SAOwFwN+n3zr0nYeCyeweozKfO23MvHzY= github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg= github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= +github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= +github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/klauspost/cpuid v1.3.1 h1:5JNjFYYQrZeKRJ0734q51WCEEn2huer72Dc7K+R/b6s= github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk= github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg= @@ -1073,7 +1061,6 @@ go.etcd.io/etcd/raft/v3 v3.5.0 h1:kw2TmO3yFTgE+F0mdKkG7xMxkit2duBDa2Hu6D/HMlw= go.etcd.io/etcd/server/v3 v3.5.0 h1:jk8D/lwGEDlQU9kZXUFMSANkE22Sg5+mW27ip8xcF9E= go.mongodb.org/mongo-driver v1.8.2 h1:8ssUXufb90ujcIvR6MyE1SchaNj0SFxsakiZgxIyrMk= go.mongodb.org/mongo-driver v1.11.1 h1:QP0znIRTuL0jf1oBQoAoM0C6ZJfBK4kx0Uumtv1A7w8= -go.opentelemetry.io/contrib v0.20.0 h1:ubFQUn0VCZ0gPwIoJfBJVpeBlyRMxu8Mm/huKWYd9p0= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0 h1:sO4WKdPAudZGKPcpZT4MJn6JaDmpyLrMPDGGyA1SttE= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0/go.mod h1:r9vWsPS/3AQItv3OSlEJ/E4mbrhUbbw18meOjArPtKQ= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.48.0/go.mod h1:tIKj3DbO8N9Y2xo52og3irLsPI4GW02DSMtrVgNMgxg= @@ -1149,7 +1136,6 @@ golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.5.0/go.mod h1:9/XBHVqLaWO3/BRHs5jbpYCnOZVjj5V0ndyaAM7KB4I= golang.org/x/oauth2 v0.13.0/go.mod h1:/JMhi4ZRXAf4HG9LiNmxvk+45+96RUlVThiH8FzNBn0= -golang.org/x/oauth2 v0.15.0/go.mod h1:q48ptWNTY5XWf+JNten23lcvHpLJ0ZSxF5ttTHKVCAM= golang.org/x/oauth2 v0.16.0/go.mod h1:hqZ+0LWXsiVoZpeld6jVt06P3adbS2Uu911W1SsJv2o= golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA= golang.org/x/oauth2 v0.18.0/go.mod h1:Wf7knwG0MPoWIMMBgFlEaSUDaKskp0dCfrlJRJXbBi8= @@ -1201,8 +1187,6 @@ golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc= golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= -golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= -golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= gomodules.xyz/envconfig v1.3.1-0.20190308184047-426f31af0d45 h1:juzzlx91nWAOsHuOVfXZPMXHtJEKouZvY9bBbwlOeYs= @@ -1219,8 +1203,6 @@ google.golang.org/api v0.167.0/go.mod h1:4FcBc686KFi7QI/U51/2GKKevfZMpM17sCdibqe google.golang.org/api v0.169.0/go.mod h1:gpNOiMA2tZ4mf5R9Iwf4rK/Dcz0fbdIgWYWVoxmsyLg= google.golang.org/api v0.171.0 h1:w174hnBPqut76FzW5Qaupt7zY8Kql6fiVjgys4f58sU= google.golang.org/api v0.171.0/go.mod h1:Hnq5AHm4OTMt2BUVjael2CWZFD6vksJdWCWiUAmjC9o= -google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= -google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:CgAqfJo+Xmu0GwA0411Ht3OU3OntXwsGmrmjI8ioGXI= google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17/go.mod h1:J7XzRzVy1+IPwWHZUzoD0IccYZIrXILAQpc+Qy9CMhY= google.golang.org/genproto v0.0.0-20240116215550-a9fa1716bcac/go.mod h1:+Rvu7ElI+aLzyDQhpHMFMMltsD6m7nqpuWDd2CwJw3k= diff --git a/provider-service/vai/internal/client/pipeline_job_client.go b/provider-service/vai/internal/client/pipeline_job_client.go index 1e9e378b..233be349 100644 --- a/provider-service/vai/internal/client/pipeline_job_client.go +++ b/provider-service/vai/internal/client/pipeline_job_client.go @@ -13,4 +13,10 @@ type PipelineJobClient interface { req *aiplatformpb.GetPipelineJobRequest, opts ...gax.CallOption, ) (*aiplatformpb.PipelineJob, error) + + CreatePipelineJob( + ctx context.Context, + req *aiplatformpb.CreatePipelineJobRequest, + opts ...gax.CallOption, + ) (*aiplatformpb.PipelineJob, error) } diff --git a/provider-service/vai/internal/client/schedule_client.go b/provider-service/vai/internal/client/schedule_client.go new file mode 100644 index 00000000..d13baad4 --- /dev/null +++ b/provider-service/vai/internal/client/schedule_client.go @@ -0,0 +1,29 @@ +package client + +import ( + "context" + + aiplatform "cloud.google.com/go/aiplatform/apiv1" + "cloud.google.com/go/aiplatform/apiv1/aiplatformpb" + "github.com/googleapis/gax-go/v2" +) + +type ScheduleClient interface { + CreateSchedule( + ctx context.Context, + req *aiplatformpb.CreateScheduleRequest, + opts ...gax.CallOption, + ) (*aiplatformpb.Schedule, error) + + DeleteSchedule( + ctx context.Context, + req *aiplatformpb.DeleteScheduleRequest, + opts ...gax.CallOption, + ) (*aiplatform.DeleteScheduleOperation, error) + + UpdateSchedule( + ctx context.Context, + req *aiplatformpb.UpdateScheduleRequest, + opts ...gax.CallOption, + ) (*aiplatformpb.Schedule, error) +} diff --git a/provider-service/vai/internal/mocks/file_handler.go b/provider-service/vai/internal/mocks/file_handler.go new file mode 100644 index 00000000..e3ef9524 --- /dev/null +++ b/provider-service/vai/internal/mocks/file_handler.go @@ -0,0 +1,24 @@ +package mocks + +import "github.com/stretchr/testify/mock" + +type MockFileHandler struct{ mock.Mock } + +func (m *MockFileHandler) Write(p []byte, bucket string, filePath string) error { + args := m.Called(p, bucket, filePath) + return args.Error(0) +} + +func (m *MockFileHandler) Delete(id string, bucket string) error { + args := m.Called(id, bucket) + return args.Error(0) +} + +func (m *MockFileHandler) Read(bucket string, filePath string) (map[string]any, error) { + args := m.Called(bucket, filePath) + var data map[string]any + if arg0 := args.Get(0); arg0 != nil { + data = arg0.(map[string]any) + } + return data, args.Error(1) +} diff --git a/provider-service/vai/internal/mocks/job_builder.go b/provider-service/vai/internal/mocks/job_builder.go new file mode 100644 index 00000000..53912d98 --- /dev/null +++ b/provider-service/vai/internal/mocks/job_builder.go @@ -0,0 +1,45 @@ +package mocks + +import ( + "cloud.google.com/go/aiplatform/apiv1/aiplatformpb" + "github.com/sky-uk/kfp-operator/provider-service/base/pkg/server/resource" + "github.com/stretchr/testify/mock" +) + +type MockJobBuilder struct{ mock.Mock } + +func (m *MockJobBuilder) MkRunPipelineJob( + rd resource.RunDefinition, +) (*aiplatformpb.PipelineJob, error) { + args := m.Called(rd) + var pipelineJob *aiplatformpb.PipelineJob + if arg0 := args.Get(0); arg0 != nil { + pipelineJob = arg0.(*aiplatformpb.PipelineJob) + } + return pipelineJob, args.Error(1) +} + +func (m *MockJobBuilder) MkRunSchedulePipelineJob( + rsd resource.RunScheduleDefinition, +) (*aiplatformpb.PipelineJob, error) { + args := m.Called(rsd) + var pipelineJob *aiplatformpb.PipelineJob + if arg0 := args.Get(0); arg0 != nil { + pipelineJob = arg0.(*aiplatformpb.PipelineJob) + } + return pipelineJob, args.Error(1) +} + +func (m *MockJobBuilder) MkSchedule( + rsd resource.RunScheduleDefinition, + pipelineJob *aiplatformpb.PipelineJob, + parent string, + maxConcurrentRunCount int64, +) (*aiplatformpb.Schedule, error) { + args := m.Called(rsd, pipelineJob, parent, maxConcurrentRunCount) + var schedule *aiplatformpb.Schedule + if arg0 := args.Get(0); arg0 != nil { + schedule = arg0.(*aiplatformpb.Schedule) + } + return schedule, args.Error(1) +} diff --git a/provider-service/vai/internal/mocks/job_enricher.go b/provider-service/vai/internal/mocks/job_enricher.go new file mode 100644 index 00000000..fbdb5cc8 --- /dev/null +++ b/provider-service/vai/internal/mocks/job_enricher.go @@ -0,0 +1,20 @@ +package mocks + +import ( + "cloud.google.com/go/aiplatform/apiv1/aiplatformpb" + "github.com/stretchr/testify/mock" +) + +type MockJobEnricher struct{ mock.Mock } + +func (m *MockJobEnricher) Enrich( + job *aiplatformpb.PipelineJob, + raw map[string]any, +) (*aiplatformpb.PipelineJob, error) { + args := m.Called(job, raw) + var pipelineJob *aiplatformpb.PipelineJob + if arg0 := args.Get(0); arg0 != nil { + pipelineJob = arg0.(*aiplatformpb.PipelineJob) + } + return pipelineJob, args.Error(1) +} diff --git a/provider-service/vai/internal/mocks/label_gen.go b/provider-service/vai/internal/mocks/label_gen.go new file mode 100644 index 00000000..3701da76 --- /dev/null +++ b/provider-service/vai/internal/mocks/label_gen.go @@ -0,0 +1,24 @@ +package mocks + +import ( + "fmt" + + "github.com/sky-uk/kfp-operator/provider-service/base/pkg/server/resource" +) + +type MockLabelGen struct{} + +func (lg MockLabelGen) GenerateLabels(value any) (map[string]string, error) { + switch v := value.(type) { + case resource.RunDefinition: + return map[string]string{ + "rd-key": "rd-value", + }, nil + case resource.RunScheduleDefinition: + return map[string]string{ + "rsd-key": "rsd-value", + }, nil + default: + return nil, fmt.Errorf("Unexpected value of type %T", v) + } +} diff --git a/provider-service/vai/internal/mocks/pipeline_job_client.go b/provider-service/vai/internal/mocks/pipeline_job_client.go index 19d8610f..81e7c8d9 100644 --- a/provider-service/vai/internal/mocks/pipeline_job_client.go +++ b/provider-service/vai/internal/mocks/pipeline_job_client.go @@ -18,7 +18,6 @@ func (m *MockPipelineJobClient) GetPipelineJob( _ ...gax.CallOption, ) (*aiplatformpb.PipelineJob, error) { args := m.Called(req) - var pipelineJob *aiplatformpb.PipelineJob if arg0 := args.Get(0); arg0 != nil { pipelineJob = arg0.(*aiplatformpb.PipelineJob) @@ -26,3 +25,16 @@ func (m *MockPipelineJobClient) GetPipelineJob( return pipelineJob, args.Error(1) } + +func (m *MockPipelineJobClient) CreatePipelineJob( + _ context.Context, + req *aiplatformpb.CreatePipelineJobRequest, + _ ...gax.CallOption, +) (*aiplatformpb.PipelineJob, error) { + args := m.Called(req) + var pipelineJob *aiplatformpb.PipelineJob + if arg1 := args.Get(0); arg1 != nil { + pipelineJob = arg1.(*aiplatformpb.PipelineJob) + } + return pipelineJob, args.Error(1) +} diff --git a/provider-service/vai/internal/mocks/schedule_client.go b/provider-service/vai/internal/mocks/schedule_client.go new file mode 100644 index 00000000..c07d9a58 --- /dev/null +++ b/provider-service/vai/internal/mocks/schedule_client.go @@ -0,0 +1,50 @@ +package mocks + +import ( + "context" + + aiplatform "cloud.google.com/go/aiplatform/apiv1" + "cloud.google.com/go/aiplatform/apiv1/aiplatformpb" + "github.com/googleapis/gax-go/v2" + "github.com/stretchr/testify/mock" +) + +type MockScheduleClient struct{ mock.Mock } + +func (m *MockScheduleClient) CreateSchedule( + _ context.Context, + req *aiplatformpb.CreateScheduleRequest, + _ ...gax.CallOption, +) (*aiplatformpb.Schedule, error) { + args := m.Called(req) + var schedule *aiplatformpb.Schedule + if arg1 := args.Get(0); arg1 != nil { + schedule = arg1.(*aiplatformpb.Schedule) + } + return schedule, args.Error(1) +} + +func (m *MockScheduleClient) DeleteSchedule( + _ context.Context, + req *aiplatformpb.DeleteScheduleRequest, + _ ...gax.CallOption, +) (*aiplatform.DeleteScheduleOperation, error) { + args := m.Called(req) + var operation *aiplatform.DeleteScheduleOperation + if arg1 := args.Get(0); arg1 != nil { + operation = arg1.(*aiplatform.DeleteScheduleOperation) + } + return operation, args.Error(1) +} +func (m *MockScheduleClient) UpdateSchedule( + _ context.Context, + req *aiplatformpb.UpdateScheduleRequest, + _ ...gax.CallOption, +) (*aiplatformpb.Schedule, error) { + args := m.Called(req) + var schedule *aiplatformpb.Schedule + if arg1 := args.Get(0); arg1 != nil { + schedule = arg1.(*aiplatformpb.Schedule) + } + return schedule, args.Error(1) +} diff --git a/provider-service/vai/internal/provider/file_handler.go b/provider-service/vai/internal/provider/file_handler.go new file mode 100644 index 00000000..db54a67b --- /dev/null +++ b/provider-service/vai/internal/provider/file_handler.go @@ -0,0 +1,114 @@ +package provider + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + + "cloud.google.com/go/storage" + "google.golang.org/api/iterator" + "google.golang.org/api/option" +) + +type FileHandler interface { + Write(content []byte, bucket string, filePath string) error + Delete(id string, bucket string) error + Read(bucket string, filePath string) (map[string]any, error) +} + +type GcsFileHandler struct { + ctx context.Context + gcsClient storage.Client +} + +func NewGcsFileHandler( + ctx context.Context, + gcsEndpoint string, +) (GcsFileHandler, error) { + var client *storage.Client + var err error + if gcsEndpoint != "" { + client, err = storage.NewClient( + ctx, + option.WithoutAuthentication(), + option.WithEndpoint(gcsEndpoint), + ) + } else { + client, err = storage.NewClient(ctx) + } + + if err != nil { + return GcsFileHandler{}, err + } + + return GcsFileHandler{ctx: ctx, gcsClient: *client}, nil +} + +// Write writes bytes into the location inferred by GCS bucket name and +// file path (relative to GCS bucket location). +func (g *GcsFileHandler) Write( + content []byte, + bucket string, + filePath string, +) error { + writer := g.gcsClient.Bucket(bucket).Object(filePath).NewWriter(g.ctx) + + _, err := io.Writer(writer).Write(content) + if err != nil { + return err + } + if err = writer.Close(); err != nil { + return err + } + return nil +} + +// Delete deletes all files inferred by the GCS bucket name and id. +func (g *GcsFileHandler) Delete(id string, bucket string) error { + query := &storage.Query{Prefix: fmt.Sprintf("%s/", id)} + + it := g.gcsClient.Bucket(bucket).Objects(g.ctx, query) + for { + attrs, err := it.Next() + if errors.Is(err, iterator.Done) { + break + } + if err != nil { + return err + } + + err = g.gcsClient.Bucket(bucket).Object(attrs.Name).Delete(g.ctx) + if err != nil { + return err + } + } + return nil +} + +// Read reads and returns the unmarshalled from the location inferred by the +// GCS bucket name and file path. +func (g *GcsFileHandler) Read( + bucket string, + filePath string, +) (map[string]any, error) { + reader, err := g.gcsClient.Bucket(bucket).Object(filePath).NewReader(g.ctx) + if err != nil { + return nil, err + } + + buf := new(bytes.Buffer) + _, err = buf.ReadFrom(reader) + if err != nil { + return nil, err + } + + raw := map[string]any{} + err = json.Unmarshal(buf.Bytes(), &raw) + if err != nil { + return nil, err + } + return raw, nil +} diff --git a/provider-service/vai/internal/provider/file_handler_test.go b/provider-service/vai/internal/provider/file_handler_test.go new file mode 100644 index 00000000..2619205f --- /dev/null +++ b/provider-service/vai/internal/provider/file_handler_test.go @@ -0,0 +1,85 @@ +//go:build unit + +package provider + +import ( + "context" + "encoding/json" + + "github.com/fsouza/fake-gcs-server/fakestorage" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("GcsFileHandler", Ordered, func() { + var ( + ctx context.Context + server *fakestorage.Server + handler GcsFileHandler + bucket string + filePath string + testData map[string]any + testBytes []byte + ) + + BeforeAll(func() { + var err error + server = fakestorage.NewServer(nil) + Expect(err).ShouldNot(HaveOccurred()) + + ctx = context.Background() + + // fake GCS server doesn't share data between different clients + // (required for a round-trip test), so the same client must be used + // throughout. + handler = GcsFileHandler{ctx, *server.Client()} + Expect(err).ShouldNot(HaveOccurred()) + + bucket = "test-bucket" + filePath = "test-folder/test-file.json" + testData = map[string]any{"key": "value"} + testBytes, _ = json.Marshal(testData) + + err = server.Client().Bucket(bucket).Create(ctx, "test-project", nil) + Expect(err).ShouldNot(HaveOccurred()) + }) + + AfterAll(func() { + server.Stop() + }) + + Context("Write, Read And Delete round trip", func() { + When("Write", func() { + It("should write data to the specified bucket and file path", func() { + err := handler.Write(testBytes, bucket, filePath) + err = handler.Write(testBytes, bucket, "test-folder/test-file2.json") + Expect(err).ShouldNot(HaveOccurred()) + + obj, err := server.Client().Bucket(bucket).Object(filePath).Attrs(ctx) + Expect(err).ShouldNot(HaveOccurred()) + Expect(obj.Name).To(Equal(filePath)) + }) + }) + When("Read", func() { + It("should extract the written data from the bucket", func() { + readData, err := handler.Read(bucket, filePath) + Expect(err).ShouldNot(HaveOccurred()) + Expect(readData).To(Equal(testData)) + }) + }) + When("Delete", func() { + It("should delete the file in the bucket", func() { + err := handler.Delete("test-folder", bucket) + Expect(err).ShouldNot(HaveOccurred()) + + readData, err := handler.Read(bucket, filePath) + Expect(err).Should(HaveOccurred()) + Expect(readData).Should(BeNil()) + + readData, err = handler.Read(bucket, "test-folder/test-file2.json") + Expect(err).Should(HaveOccurred()) + Expect(readData).Should(BeNil()) + }) + }) + }) +}) diff --git a/provider-service/vai/internal/provider/job_builder.go b/provider-service/vai/internal/provider/job_builder.go new file mode 100644 index 00000000..34a66970 --- /dev/null +++ b/provider-service/vai/internal/provider/job_builder.go @@ -0,0 +1,149 @@ +package provider + +import ( + "fmt" + + "cloud.google.com/go/aiplatform/apiv1/aiplatformpb" + "github.com/sky-uk/kfp-operator/provider-service/base/pkg/server/resource" + baseUtil "github.com/sky-uk/kfp-operator/provider-service/base/pkg/util" + "github.com/sky-uk/kfp-operator/provider-service/vai/internal/util" + "google.golang.org/protobuf/types/known/timestamppb" +) + +type JobBuilder interface { + MkRunPipelineJob( + rd resource.RunDefinition, + ) (*aiplatformpb.PipelineJob, error) + MkRunSchedulePipelineJob( + rsd resource.RunScheduleDefinition, + ) (*aiplatformpb.PipelineJob, error) + MkSchedule( + rsd resource.RunScheduleDefinition, + pipelineJob *aiplatformpb.PipelineJob, + parent string, + maxConcurrentRunCount int64, + ) (*aiplatformpb.Schedule, error) +} + +type DefaultJobBuilder struct { + serviceAccount string + pipelineBucket string + labelGen LabelGen +} + +// MkRunPipelineJob creates a vai pipeline job for a run that can be submitted +// to a vai pipeline job client. +func (jb DefaultJobBuilder) MkRunPipelineJob( + rd resource.RunDefinition, +) (*aiplatformpb.PipelineJob, error) { + params := make(map[string]*aiplatformpb.Value, len(rd.RuntimeParameters)) + for name, value := range rd.RuntimeParameters { + params[name] = &aiplatformpb.Value{ + Value: &aiplatformpb.Value_StringValue{ + StringValue: value, + }, + } + } + + templateUri, err := util.PipelineUri( + rd.PipelineName, + rd.PipelineVersion, + jb.pipelineBucket, + ) + if err != nil { + return nil, err + } + + labels, err := jb.labelGen.GenerateLabels(rd) + if err != nil { + return nil, err + } + + job := &aiplatformpb.PipelineJob{ + Labels: labels, + RuntimeConfig: &aiplatformpb.PipelineJob_RuntimeConfig{ + Parameters: params, + }, + ServiceAccount: jb.serviceAccount, + TemplateUri: templateUri, + } + return job, nil +} + +// MkRunScheudlePipelineJob creates a vai pipeline job for a run schedule that +// can be used to create a vai schedule. +func (jb DefaultJobBuilder) MkRunSchedulePipelineJob( + rsd resource.RunScheduleDefinition, +) (*aiplatformpb.PipelineJob, error) { + params := make(map[string]*aiplatformpb.Value, len(rsd.RuntimeParameters)) + for name, value := range rsd.RuntimeParameters { + params[name] = &aiplatformpb.Value{ + Value: &aiplatformpb.Value_StringValue{ + StringValue: value, + }, + } + } + + // Note: unable to migrate from `Parameters` to `ParameterValues` at this + // point as `PipelineJob.pipeline_spec.schema_version` used by TFX + // is 2.0.0 see deprecated comment. + templateUri, err := util.PipelineUri( + rsd.PipelineName, + rsd.PipelineVersion, + jb.pipelineBucket, + ) + if err != nil { + return nil, err + } + + labels, err := jb.labelGen.GenerateLabels(rsd) + if err != nil { + return nil, err + } + + job := &aiplatformpb.PipelineJob{ + Labels: labels, + ServiceAccount: jb.serviceAccount, + RuntimeConfig: &aiplatformpb.PipelineJob_RuntimeConfig{ + Parameters: params, + }, + TemplateUri: templateUri, + } + return job, nil +} + +// MkSchedule create a vai schedule using a vai pipeline job that can be +// submitted to a vai schedule client. +func (jb DefaultJobBuilder) MkSchedule( + rsd resource.RunScheduleDefinition, + pipelineJob *aiplatformpb.PipelineJob, + parent string, + maxConcurrentRunCount int64, +) (*aiplatformpb.Schedule, error) { + cron, err := baseUtil.ParseCron(rsd.Schedule.CronExpression) + if err != nil { + return nil, err + } + + schedule := &aiplatformpb.Schedule{ + TimeSpecification: &aiplatformpb.Schedule_Cron{Cron: cron.PrintStandard()}, + Request: &aiplatformpb.Schedule_CreatePipelineJobRequest{ + CreatePipelineJobRequest: &aiplatformpb.CreatePipelineJobRequest{ + Parent: parent, + PipelineJob: pipelineJob, + }, + }, + DisplayName: fmt.Sprintf("rc-%s-%s", rsd.Name.Namespace, rsd.Name.Name), + MaxConcurrentRunCount: maxConcurrentRunCount, + AllowQueueing: true, + } + + if rsd.Schedule.StartTime != nil { + schedule.StartTime = timestamppb.New(rsd.Schedule.StartTime.Time) + } + + if rsd.Schedule.EndTime != nil { + schedule.EndTime = timestamppb.New(rsd.Schedule.EndTime.Time) + } + return schedule, nil +} diff --git a/provider-service/vai/internal/provider/job_builder_test.go b/provider-service/vai/internal/provider/job_builder_test.go new file mode 100644 index 00000000..957baee9 --- /dev/null +++ b/provider-service/vai/internal/provider/job_builder_test.go @@ -0,0 +1,166 @@ +//go:build unit + +package provider + +import ( + "fmt" + + "cloud.google.com/go/aiplatform/apiv1/aiplatformpb" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/sky-uk/kfp-operator/provider-service/vai/internal/mocks" + "github.com/sky-uk/kfp-operator/provider-service/vai/internal/provider/testutil" + "google.golang.org/protobuf/types/known/timestamppb" +) + +var _ = Describe("JobBuilder", func() { + var jb = DefaultJobBuilder{ + serviceAccount: "service-account", + pipelineBucket: "pipeline-bucket", + labelGen: mocks.MockLabelGen{}, + } + + Context("MkRunPipelineJob", func() { + When("templateUri is valid", func() { + It("should make a run pipeline job", func() { + rd := testutil.RandomBasicRunDefinition() + job, err := jb.MkRunPipelineJob(rd) + expectedTemplateUri := fmt.Sprintf( + "gs://%s/%s/%s/%s", + jb.pipelineBucket, + rd.PipelineName.Namespace, + rd.PipelineName.Name, + rd.PipelineVersion, + ) + + Expect(err).ToNot(HaveOccurred()) + Expect(job.Labels).To(Equal(map[string]string{"rd-key": "rd-value"})) + for k, v := range job.RuntimeConfig.Parameters { + Expect(v.GetStringValue).To(Equal(rd.RuntimeParameters[k])) + } + Expect(job.ServiceAccount).To(Equal(jb.serviceAccount)) + Expect(job.TemplateUri).To(Equal(expectedTemplateUri)) + }) + }) + When("templateUri is invalid", func() { + It("should return error", func() { + rd := testutil.RandomBasicRunDefinition() + rd.PipelineName.Name = "" + _, err := jb.MkRunPipelineJob(rd) + + Expect(err).To(HaveOccurred()) + }) + }) + }) + + Context("MkRunSchedulePipelineJob", func() { + When("templateUri is valid", func() { + It("should make a run schedule pipeline job", func() { + rsd := testutil.RandomRunScheduleDefinition() + job, err := jb.MkRunSchedulePipelineJob(rsd) + expectedTemplateUri := fmt.Sprintf( + "gs://%s/%s/%s/%s", + jb.pipelineBucket, + rsd.PipelineName.Namespace, + rsd.PipelineName.Name, + rsd.PipelineVersion, + ) + + Expect(err).ToNot(HaveOccurred()) + Expect(job.Labels).To(Equal(map[string]string{"rsd-key": "rsd-value"})) + for k, v := range job.RuntimeConfig.Parameters { + Expect(v.GetStringValue).To(Equal(rsd.RuntimeParameters[k])) + } + Expect(job.ServiceAccount).To(Equal(jb.serviceAccount)) + Expect(job.TemplateUri).To(Equal(expectedTemplateUri)) + }) + }) + When("templateUri is invalid", func() { + It("should return error", func() { + rsd := testutil.RandomRunScheduleDefinition() + rsd.PipelineName.Name = "" + _, err := jb.MkRunSchedulePipelineJob(rsd) + + Expect(err).To(HaveOccurred()) + }) + }) + }) + + Context("MkSchedule", func() { + It("should make a Schedule", func() { + rsd := testutil.RandomRunScheduleDefinition() + expectedCron := aiplatformpb.Schedule_Cron{Cron: "1 2 * 1 2"} + rsd.Schedule.CronExpression = expectedCron.Cron + + expectedPipelineJob := aiplatformpb.PipelineJob{Name: "test"} + schedule, err := jb.MkSchedule( + rsd, + &expectedPipelineJob, + "parent", + 1000, + ) + + expectedPipelineJobReq := aiplatformpb.Schedule_CreatePipelineJobRequest{ + CreatePipelineJobRequest: &aiplatformpb.CreatePipelineJobRequest{ + Parent: "parent", + PipelineJob: &expectedPipelineJob, + }, + } + + Expect(err).ToNot(HaveOccurred()) + Expect(schedule.TimeSpecification).To(Equal(&expectedCron)) + Expect(schedule.Request).To(Equal(&expectedPipelineJobReq)) + Expect(schedule.DisplayName).To(Equal(fmt.Sprintf("rc-%s-%s", rsd.Name.Namespace, rsd.Name.Name))) + Expect(schedule.StartTime).To(Equal(timestamppb.New(testutil.Now.Time))) + Expect(schedule.EndTime).To(Equal(timestamppb.New(testutil.Now.Time))) + Expect(schedule.MaxConcurrentRunCount).To(Equal(int64(1000))) + Expect(schedule.AllowQueueing).To(BeTrue()) + }) + When("schedule cron expression is invalid", func() { + It("returns an error", func() { + rsd := testutil.RandomRunScheduleDefinition() + rsd.Schedule.CronExpression = "invalid cron" + + _, err := jb.MkSchedule( + rsd, + &aiplatformpb.PipelineJob{Name: "test"}, + "parent", + 1000, + ) + Expect(err).To(HaveOccurred()) + }) + }) + When("run definition schedule start time is empty", func() { + It("should create a scheudle without start time", func() { + rsd := testutil.RandomRunScheduleDefinition() + rsd.Schedule.StartTime = nil + schedule, err := jb.MkSchedule( + rsd, + &aiplatformpb.PipelineJob{Name: "test"}, + "parent", + 1000, + ) + + Expect(err).ToNot(HaveOccurred()) + Expect(schedule.StartTime).To(BeNil()) + Expect(schedule.EndTime).To(Equal(timestamppb.New(testutil.Now.Time))) + }) + }) + When("run definition schedule end time is empty", func() { + It("should create a scheudle without end time", func() { + rsd := testutil.RandomRunScheduleDefinition() + rsd.Schedule.EndTime = nil + schedule, err := jb.MkSchedule( + rsd, + &aiplatformpb.PipelineJob{Name: "test"}, + "parent", + 1000, + ) + + Expect(err).ToNot(HaveOccurred()) + Expect(schedule.StartTime).To(Equal(timestamppb.New(testutil.Now.Time))) + Expect(schedule.EndTime).To(BeNil()) + }) + }) + }) +}) diff --git a/provider-service/vai/internal/provider/job_enricher.go b/provider-service/vai/internal/provider/job_enricher.go new file mode 100644 index 00000000..3184cffa --- /dev/null +++ b/provider-service/vai/internal/provider/job_enricher.go @@ -0,0 +1,85 @@ +package provider + +import ( + "fmt" + + "cloud.google.com/go/aiplatform/apiv1/aiplatformpb" + "google.golang.org/protobuf/types/known/structpb" +) + +type JobEnricher interface { + Enrich( + job *aiplatformpb.PipelineJob, + raw map[string]any, + ) (*aiplatformpb.PipelineJob, error) +} + +type DefaultJobEnricher struct{} + +// Enrich enriches a pipeline job with specific data from the compiled pipeline; +// which is represented by an unstructured map. However it expects the values to +// have a particular structure. +func (je DefaultJobEnricher) Enrich( + job *aiplatformpb.PipelineJob, + raw map[string]any, +) (*aiplatformpb.PipelineJob, error) { + displayName, ok := raw["displayName"].(string) + if !ok { + return nil, fmt.Errorf( + "expected string for 'displayName', got %T", + raw["displayName"], + ) + } + job.DisplayName = displayName + + pipelineSpec, ok := raw["pipelineSpec"].(map[string]any) + if !ok { + return nil, fmt.Errorf( + "expected map for 'pipelineSpec', got %T", + raw["pipelineSpec"], + ) + } + + pipelineSpecStruct, err := structpb.NewStruct(pipelineSpec) + if err != nil { + return nil, err + } + job.PipelineSpec = pipelineSpecStruct + + labels, ok := raw["labels"].(map[string]any) + if !ok { + return nil, fmt.Errorf( + "expected map for 'labels', got %T", + raw["labels"], + ) + } + if job.Labels == nil { + job.Labels = map[string]string{} + } + for k, v := range labels { + job.Labels[k] = v.(string) + } + + runtimeConfig, ok := raw["runtimeConfig"].(map[string]any) + if !ok { + return nil, fmt.Errorf( + "expected map for 'runtimeConfig', got %T", + raw["runtimeConfig"], + ) + } + + gcsOutputDirectory, ok := runtimeConfig["gcsOutputDirectory"].(string) + if !ok { + return nil, fmt.Errorf( + "expected string for 'gcsOutputDirectory', got %T", + runtimeConfig["gcsOutputDirectory"], + ) + } + + if job.RuntimeConfig == nil { + return nil, fmt.Errorf("job has no RuntimeConfig") + } + job.RuntimeConfig.GcsOutputDirectory = gcsOutputDirectory + + return job, nil +} diff --git a/provider-service/vai/internal/provider/job_enricher_test.go b/provider-service/vai/internal/provider/job_enricher_test.go new file mode 100644 index 00000000..cd796d4b --- /dev/null +++ b/provider-service/vai/internal/provider/job_enricher_test.go @@ -0,0 +1,119 @@ +//go:build unit + +package provider + +import ( + "cloud.google.com/go/aiplatform/apiv1/aiplatformpb" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "google.golang.org/protobuf/types/known/structpb" +) + +var _ = Describe("JobEnricher", func() { + var job aiplatformpb.PipelineJob + var raw map[string]any + var je = DefaultJobEnricher{} + + BeforeEach(func() { + job = aiplatformpb.PipelineJob{ + Labels: map[string]string{"key": "value"}, + RuntimeConfig: &aiplatformpb.PipelineJob_RuntimeConfig{}, + } + + raw = map[string]any{ + "displayName": "test-display-name", + "pipelineSpec": map[string]any{ + "key": "value", + }, + "labels": map[string]any{ + "label-key-from-raw": "label-value-from-raw", + }, + "runtimeConfig": map[string]any{ + "gcsOutputDirectory": "gs://test-bucket", + }, + } + }) + + Context("Enrich", Ordered, func() { + It("should enrich the pipeline job with values from the raw map", func() { + _, err := je.Enrich(&job, raw) + Expect(err).ToNot(HaveOccurred()) + Expect(job.DisplayName).To(Equal(raw["displayName"])) + pipelineSpec, err := structpb.NewStruct( + map[string]any{ + "key": "value", + }, + ) + Expect(err).ToNot(HaveOccurred()) + Expect(job.PipelineSpec).To(Equal(pipelineSpec)) + Expect(job.Labels).To(Equal( + map[string]string{ + "key": "value", + "label-key-from-raw": "label-value-from-raw", + }, + )) + Expect(job.RuntimeConfig.GcsOutputDirectory).To(Equal("gs://test-bucket")) + }) + When("job has no label field", func() { + It("should set the label field to an empty map", func() { + job.Labels = nil + + _, err := je.Enrich(&job, raw) + Expect(err).ToNot(HaveOccurred()) + Expect(job.Labels).To(Equal(map[string]string{ + "label-key-from-raw": "label-value-from-raw", + })) + }) + }) + When("displayName is not a string", func() { + It("should return error", func() { + raw["displayName"] = 123 + + _, err := je.Enrich(&job, raw) + Expect(err).To(HaveOccurred()) + }) + }) + When("pipelineSpec is not a map", func() { + It("should return error", func() { + raw["pipelineSpec"] = 123 + + _, err := je.Enrich(&job, raw) + Expect(err).To(HaveOccurred()) + }) + }) + When("labels is not a map", func() { + It("should return error", func() { + raw["labels"] = 123 + + _, err := je.Enrich(&job, raw) + Expect(err).To(HaveOccurred()) + }) + }) + When("runtimeConfig is not a map", func() { + It("should return error", func() { + raw["runtimeConfig"] = 123 + + _, err := je.Enrich(&job, raw) + Expect(err).To(HaveOccurred()) + }) + }) + When("runtimeConfig.gcsOutputDirectory is not a string", func() { + It("should return error", func() { + raw["runtimeConfig"] = map[string]any{ + "gcsOutputDirectory": 123, + } + + _, err := je.Enrich(&job, raw) + Expect(err).To(HaveOccurred()) + }) + }) + When("job has no RuntimeConfig", func() { + It("should return error", func() { + job.RuntimeConfig = nil + + _, err := je.Enrich(&job, raw) + Expect(err).To(HaveOccurred()) + }) + }) + }) +}) diff --git a/provider-service/vai/internal/provider/label_gen.go b/provider-service/vai/internal/provider/label_gen.go new file mode 100644 index 00000000..9469cff9 --- /dev/null +++ b/provider-service/vai/internal/provider/label_gen.go @@ -0,0 +1,81 @@ +package provider + +import ( + "fmt" + "strings" + + "github.com/sky-uk/kfp-operator/argo/common" + "github.com/sky-uk/kfp-operator/provider-service/base/pkg/server/resource" + "github.com/sky-uk/kfp-operator/provider-service/vai/internal/label" +) + +type LabelGen interface { + GenerateLabels(value any) (map[string]string, error) +} + +type DefaultLabelGen struct{} + +// GenerateLabels generates labels for vertex ai runs and schedules to show +// which run configuration it originated from. +func (lg DefaultLabelGen) GenerateLabels(value any) (map[string]string, error) { + switch v := value.(type) { + case resource.RunDefinition: + return lg.runLabelsFromRunDefinition(v), nil + case resource.RunScheduleDefinition: + return lg.runLabelsFromSchedule(v), nil + default: + return nil, fmt.Errorf( + "Unexpected definition received [%T], expected %T or %T", + value, + resource.RunDefinition{}, + resource.RunScheduleDefinition{}, + ) + } +} + +func (lg DefaultLabelGen) runLabelsFromPipeline( + pipelineName common.NamespacedName, + pipelineVersion string, +) map[string]string { + return map[string]string{ + label.PipelineName: pipelineName.Name, + label.PipelineNamespace: pipelineName.Namespace, + label.PipelineVersion: strings.ReplaceAll(pipelineVersion, ".", "-"), + } +} + +func (lg DefaultLabelGen) runLabelsFromRunDefinition( + rd resource.RunDefinition, +) map[string]string { + runLabels := lg.runLabelsFromPipeline( + rd.PipelineName, + rd.PipelineVersion, + ) + + if !rd.RunConfigurationName.Empty() { + runLabels[label.RunConfigurationName] = + rd.RunConfigurationName.Name + runLabels[label.RunConfigurationNamespace] = + rd.RunConfigurationName.Namespace + } + + if !rd.Name.Empty() { + runLabels[label.RunName] = rd.Name.Name + runLabels[label.RunNamespace] = rd.Name.Namespace + } + + return runLabels +} + +func (lg DefaultLabelGen) runLabelsFromSchedule( + rsd resource.RunScheduleDefinition, +) map[string]string { + runLabels := lg.runLabelsFromPipeline(rsd.PipelineName, rsd.PipelineVersion) + + if !rsd.RunConfigurationName.Empty() { + runLabels[label.RunConfigurationName] = rsd.RunConfigurationName.Name + runLabels[label.RunConfigurationNamespace] = rsd.RunConfigurationName.Namespace + } + + return runLabels +} diff --git a/provider-service/vai/internal/provider/label_gen_test.go b/provider-service/vai/internal/provider/label_gen_test.go new file mode 100644 index 00000000..fd582428 --- /dev/null +++ b/provider-service/vai/internal/provider/label_gen_test.go @@ -0,0 +1,131 @@ +//go:build unit + +package provider + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/sky-uk/kfp-operator/argo/common" + "github.com/sky-uk/kfp-operator/provider-service/vai/internal/label" + "github.com/sky-uk/kfp-operator/provider-service/vai/internal/provider/testutil" +) + +var _ = Describe("DefaultLabelGen", func() { + var lg = DefaultLabelGen{} + + Context("GenerateLabels", func() { + When("value is RunDefinition", func() { + It("should not error", func() { + rd := testutil.RandomBasicRunDefinition() + _, err := lg.GenerateLabels(rd) + + Expect(err).ToNot(HaveOccurred()) + }) + }) + When("value is RunScheduleDefinition", func() { + It("should not error", func() { + rsd := testutil.RandomRunScheduleDefinition() + _, err := lg.GenerateLabels(rsd) + + Expect(err).ToNot(HaveOccurred()) + }) + }) + When("value is not RunDefinition or RunScheduleDefinition", func() { + It("should return error", func() { + _, err := lg.GenerateLabels(0) + + Expect(err).To(HaveOccurred()) + }) + }) + }) + + Context("runLabelsFromRunDefinition", func() { + When("RunConfigurationName and RunName is present", func() { + It("generates run labels with RunConfigurationName and RunName", func() { + rd := testutil.RandomBasicRunDefinition() + rl := lg.runLabelsFromRunDefinition(rd) + + Expect(rl[label.PipelineName]).To(Equal(rd.PipelineName.Name)) + Expect(rl[label.PipelineNamespace]).To(Equal(rd.PipelineName.Namespace)) + Expect(rl[label.PipelineVersion]).To(Equal(rd.PipelineVersion)) + Expect(rl[label.RunConfigurationName]).To(Equal(rd.RunConfigurationName.Name)) + Expect(rl[label.RunConfigurationNamespace]).To(Equal(rd.RunConfigurationName.Namespace)) + Expect(rl[label.RunName]).To(Equal(rd.Name.Name)) + Expect(rl[label.RunNamespace]).To(Equal(rd.Name.Namespace)) + }) + }) + When("RunConfigurationName is empty", func() { + It("generates run labels with RunName", func() { + rd := testutil.RandomBasicRunDefinition() + rd.RunConfigurationName = common.NamespacedName{} + rl := lg.runLabelsFromRunDefinition(rd) + + Expect(rl[label.PipelineName]).To(Equal(rd.PipelineName.Name)) + Expect(rl[label.PipelineNamespace]).To(Equal(rd.PipelineName.Namespace)) + Expect(rl[label.PipelineVersion]).To(Equal(rd.PipelineVersion)) + Expect(rl[label.RunName]).To(Equal(rd.Name.Name)) + Expect(rl[label.RunNamespace]).To(Equal(rd.Name.Namespace)) + Expect(rl).NotTo(HaveKey(label.RunConfigurationName)) + Expect(rl).NotTo(HaveKey(label.RunConfigurationNamespace)) + }) + }) + When("RunName is empty", func() { + It("generates run labels with RunName", func() { + rd := testutil.RandomBasicRunDefinition() + rd.Name = common.NamespacedName{} + rl := lg.runLabelsFromRunDefinition(rd) + + Expect(rl[label.PipelineName]).To(Equal(rd.PipelineName.Name)) + Expect(rl[label.PipelineNamespace]).To(Equal(rd.PipelineName.Namespace)) + Expect(rl[label.PipelineVersion]).To(Equal(rd.PipelineVersion)) + Expect(rl[label.RunConfigurationName]).To(Equal(rd.RunConfigurationName.Name)) + Expect(rl[label.RunConfigurationNamespace]).To(Equal(rd.RunConfigurationName.Namespace)) + Expect(rl).NotTo(HaveKey(label.RunName)) + Expect(rl).NotTo(HaveKey(label.RunNamespace)) + }) + }) + It("replaces fullstops with dashes in pipelineVersion", func() { + rd := testutil.RandomBasicRunDefinition() + rd.PipelineVersion = "0.4.0" + rl := lg.runLabelsFromRunDefinition(rd) + + Expect(rl[label.PipelineVersion]).To(Equal("0-4-0")) + }) + }) + + Context("runLabelsFromSchedule", func() { + When("RunConfigurationName is present", func() { + It("generates run labels with RunConfiguration name and namespace", func() { + rsd := testutil.RandomRunScheduleDefinition() + rl := lg.runLabelsFromSchedule(rsd) + + Expect(rl[label.PipelineName]).To(Equal(rsd.PipelineName.Name)) + Expect(rl[label.PipelineNamespace]).To(Equal(rsd.PipelineName.Namespace)) + Expect(rl[label.PipelineVersion]).To(Equal(rsd.PipelineVersion)) + Expect(rl[label.RunConfigurationName]).To(Equal(rsd.RunConfigurationName.Name)) + Expect(rl[label.RunConfigurationNamespace]).To(Equal(rsd.RunConfigurationName.Namespace)) + }) + }) + When("RunConfigurationName is empty", func() { + It("generates run labels without RunConfiguration name and namespace", func() { + rsd := testutil.RandomRunScheduleDefinition() + rsd.RunConfigurationName = common.NamespacedName{} + rl := lg.runLabelsFromSchedule(rsd) + + Expect(rl[label.PipelineName]).To(Equal(rsd.PipelineName.Name)) + Expect(rl[label.PipelineNamespace]).To(Equal(rsd.PipelineName.Namespace)) + Expect(rl[label.PipelineVersion]).To(Equal(rsd.PipelineVersion)) + Expect(rl).NotTo(HaveKey(label.RunConfigurationName)) + Expect(rl).NotTo(HaveKey(label.RunConfigurationNamespace)) + }) + }) + + It("replaces fullstops with dashes in pipelineVersion", func() { + rd := testutil.RandomBasicRunDefinition() + rd.PipelineVersion = "0.4.0" + rl := lg.runLabelsFromRunDefinition(rd) + + Expect(rl[label.PipelineVersion]).To(Equal("0-4-0")) + }) + }) +}) diff --git a/provider-service/vai/internal/provider/provider.go b/provider-service/vai/internal/provider/provider.go new file mode 100644 index 00000000..0c729328 --- /dev/null +++ b/provider-service/vai/internal/provider/provider.go @@ -0,0 +1,322 @@ +package provider + +import ( + "context" + "errors" + "fmt" + + aiplatform "cloud.google.com/go/aiplatform/apiv1" + "cloud.google.com/go/aiplatform/apiv1/aiplatformpb" + "github.com/sky-uk/kfp-operator/argo/common" + "github.com/sky-uk/kfp-operator/provider-service/base/pkg/server/resource" + "github.com/sky-uk/kfp-operator/provider-service/vai/internal/client" + "github.com/sky-uk/kfp-operator/provider-service/vai/internal/config" + "github.com/sky-uk/kfp-operator/provider-service/vai/internal/util" + "google.golang.org/api/option" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/fieldmaskpb" +) + +type VAIProvider struct { + ctx context.Context + config config.VAIProviderConfig + fileHandler FileHandler + pipelineClient client.PipelineJobClient + scheduleClient client.ScheduleClient + jobBuilder JobBuilder + jobEnricher JobEnricher +} + +func NewProvider( + ctx context.Context, + config config.VAIProviderConfig, +) (*VAIProvider, error) { + fh, err := NewGcsFileHandler(ctx, config.Parameters.GcsEndpoint) + if err != nil { + return nil, err + } + + pc, err := aiplatform.NewPipelineClient( + ctx, + option.WithEndpoint(config.VaiEndpoint()), + ) + if err != nil { + return nil, err + } + + sc, err := aiplatform.NewScheduleClient( + ctx, + option.WithEndpoint(config.VaiEndpoint()), + ) + if err != nil { + return nil, err + } + + return &VAIProvider{ + ctx: ctx, + config: config, + fileHandler: &fh, + pipelineClient: pc, + scheduleClient: sc, + jobBuilder: DefaultJobBuilder{ + serviceAccount: config.Parameters.VaiJobServiceAccount, + pipelineBucket: config.Parameters.PipelineBucket, + labelGen: DefaultLabelGen{}, + }, + jobEnricher: DefaultJobEnricher{}, + }, nil +} + +func (vaip *VAIProvider) CreatePipeline( + pdw resource.PipelineDefinitionWrapper, +) (string, error) { + pipelineId, err := vaip.UpdatePipeline(pdw, "") + if err != nil { + return "", err + } + return pipelineId, nil +} + +func (vaip *VAIProvider) UpdatePipeline( + pdw resource.PipelineDefinitionWrapper, + _ string, +) (string, error) { + pipelineId, err := pdw.PipelineDefinition.Name.String() + if err != nil { + return "", err + } + + storageObject, err := util.PipelineStorageObject( + pdw.PipelineDefinition.Name, + pdw.PipelineDefinition.Version, + ) + if err != nil { + return pipelineId, err + } + + if err = vaip.fileHandler.Write( + pdw.CompiledPipeline, + vaip.config.Parameters.PipelineBucket, + storageObject, + ); err != nil { + return "", err + } + return pipelineId, nil +} + +func (vaip *VAIProvider) DeletePipeline(id string) error { + if err := vaip.fileHandler.Delete( + id, + vaip.config.Parameters.PipelineBucket, + ); err != nil { + return err + } + return nil +} + +func (vaip *VAIProvider) CreateRun(rd resource.RunDefinition) (string, error) { + logger := common.LoggerFromContext(vaip.ctx) + + pipelinePath, err := util.PipelineStorageObject( + rd.PipelineName, + rd.PipelineVersion, + ) + if err != nil { + return "", err + } + + raw, err := vaip.fileHandler.Read( + vaip.config.Parameters.PipelineBucket, + pipelinePath, + ) + if err != nil { + return "", err + } + + job, err := vaip.jobBuilder.MkRunPipelineJob(rd) + if err != nil { + return "", err + } + + enrichedJob, err := vaip.jobEnricher.Enrich(job, raw) + if err != nil { + return "", err + } + + runId := fmt.Sprintf("%s-%s", rd.Name.Namespace, rd.Name.Name) + pipelineJobId := fmt.Sprintf("%s-%s", runId, rd.Version) + + req := &aiplatformpb.CreatePipelineJobRequest{ + Parent: vaip.config.Parent(), + PipelineJobId: pipelineJobId, + PipelineJob: enrichedJob, + } + + _, err = vaip.pipelineClient.CreatePipelineJob(vaip.ctx, req) + if err != nil { + logger.Error(err, "CreatePipelineJob failed", "pipelineJobId", pipelineJobId) + return "", err + } + + return runId, nil +} + +func (vaip *VAIProvider) DeleteRun(_ string) error { + return nil +} + +func (vaip *VAIProvider) CreateRunSchedule( + rsd resource.RunScheduleDefinition, +) (string, error) { + logger := common.LoggerFromContext(vaip.ctx) + + pipelinePath, err := util.PipelineStorageObject( + rsd.PipelineName, + rsd.PipelineVersion, + ) + if err != nil { + return "", err + } + + raw, err := vaip.fileHandler.Read( + vaip.config.Parameters.PipelineBucket, + pipelinePath, + ) + if err != nil { + return "", err + } + + job, err := vaip.jobBuilder.MkRunSchedulePipelineJob(rsd) + if err != nil { + return "", err + } + + enrichedJob, err := vaip.jobEnricher.Enrich(job, raw) + if err != nil { + return "", err + } + + schedule, err := vaip.jobBuilder.MkSchedule( + rsd, + enrichedJob, + vaip.config.Parent(), + vaip.config.GetMaxConcurrentRunCountOrDefault(), + ) + if err != nil { + return "", err + } + + createdSchedule, err := vaip.scheduleClient.CreateSchedule( + vaip.ctx, + &aiplatformpb.CreateScheduleRequest{ + Parent: vaip.config.Parent(), + Schedule: schedule, + }, + ) + if err != nil { + logger.Error(err, "CreateScheduleRequest failed") + return "", err + } + logger.Info("CreateScheduleRequest succeeded", "schedule name", createdSchedule.Name) + + return createdSchedule.Name, nil +} + +func (vaip *VAIProvider) UpdateRunSchedule( + rsd resource.RunScheduleDefinition, + _ string, +) (string, error) { + logger := common.LoggerFromContext(vaip.ctx) + + pipelinePath, err := util.PipelineStorageObject( + rsd.PipelineName, + rsd.PipelineVersion, + ) + if err != nil { + return "", err + } + + raw, err := vaip.fileHandler.Read( + vaip.config.Parameters.PipelineBucket, + pipelinePath, + ) + if err != nil { + return "", err + } + + job, err := vaip.jobBuilder.MkRunSchedulePipelineJob(rsd) + if err != nil { + return "", err + } + + enrichedJob, err := vaip.jobEnricher.Enrich(job, raw) + if err != nil { + return "", err + } + + schedule, err := vaip.jobBuilder.MkSchedule( + rsd, + enrichedJob, + vaip.config.Parent(), + vaip.config.GetMaxConcurrentRunCountOrDefault(), + ) + if err != nil { + return "", err + } + + updateSchedule, err := vaip.scheduleClient.UpdateSchedule( + vaip.ctx, + &aiplatformpb.UpdateScheduleRequest{ + Schedule: schedule, + UpdateMask: &fieldmaskpb.FieldMask{ + Paths: []string{ + "schedule", + }, + }, + }, + ) + if err != nil { + logger.Error(err, "UpdateScheduleRequest failed", "parent", vaip.config.Parent()) + return "", err + } + + return updateSchedule.Name, nil +} + +func (vaip *VAIProvider) DeleteRunSchedule(id string) error { + schedule, err := vaip.scheduleClient.DeleteSchedule( + vaip.ctx, + &aiplatformpb.DeleteScheduleRequest{ + Name: id, + }, + ) + if err != nil { + return ignoreNotFound(err) + } + return ignoreNotFound(schedule.Wait(vaip.ctx)) +} + +func ignoreNotFound(err error) error { + if status.Code(err) == codes.NotFound { + return nil + } + return err +} + +func (vaip *VAIProvider) CreateExperiment( + _ resource.ExperimentDefinition, +) (string, error) { + return "", errors.New("not implemented") +} + +func (vaip *VAIProvider) UpdateExperiment( + _ resource.ExperimentDefinition, + _ string, +) (string, error) { + return "", errors.New("not implemented") +} + +func (vaip *VAIProvider) DeleteExperiment(_ string) error { + return errors.New("not implemented") +} diff --git a/provider-service/vai/internal/provider/provider_test.go b/provider-service/vai/internal/provider/provider_test.go new file mode 100644 index 00000000..eb8ca054 --- /dev/null +++ b/provider-service/vai/internal/provider/provider_test.go @@ -0,0 +1,512 @@ +//go:build unit + +package provider + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + + "cloud.google.com/go/aiplatform/apiv1/aiplatformpb" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/sky-uk/kfp-operator/provider-service/vai/internal/config" + "github.com/sky-uk/kfp-operator/provider-service/vai/internal/mocks" + "github.com/sky-uk/kfp-operator/provider-service/vai/internal/provider/testutil" + "github.com/stretchr/testify/mock" + "google.golang.org/protobuf/types/known/fieldmaskpb" +) + +var _ = Describe("Provider", func() { + var ( + mockFileHandler mocks.MockFileHandler + mockPipelineClient mocks.MockPipelineJobClient + mockScheduleClient mocks.MockScheduleClient + mockJobBuilder mocks.MockJobBuilder + mockJobEnricher mocks.MockJobEnricher + vaiProvider VAIProvider + ) + + BeforeEach(func() { + mockFileHandler = mocks.MockFileHandler{} + mockPipelineClient = mocks.MockPipelineJobClient{} + mockScheduleClient = mocks.MockScheduleClient{} + mockJobBuilder = mocks.MockJobBuilder{} + mockJobEnricher = mocks.MockJobEnricher{} + vaiProvider = VAIProvider{ + ctx: context.Background(), + config: config.VAIProviderConfig{}, + fileHandler: &mockFileHandler, + pipelineClient: &mockPipelineClient, + scheduleClient: &mockScheduleClient, + jobBuilder: &mockJobBuilder, + jobEnricher: &mockJobEnricher, + } + }) + + Context("CreatePipeline", func() { + When("creating a pipeline", func() { + It("should return the pipeline ID", func() { + mockFileHandler.On( + "Write", + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(nil) + + pdw := testutil.RandomPipelineDefinitionWrapper() + pid, err := vaiProvider.CreatePipeline(pdw) + + Expect(err).ToNot(HaveOccurred()) + Expect(pid).To(Equal(fmt.Sprintf( + "%s/%s", + pdw.PipelineDefinition.Name.Namespace, + pdw.PipelineDefinition.Name.Name, + ))) + }) + + It("return an error when the file handler write fails", func() { + mockFileHandler.On( + "Write", + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(errors.New("failed")) + + pdw := testutil.RandomPipelineDefinitionWrapper() + _, err := vaiProvider.CreatePipeline(pdw) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed")) + }) + }) + }) + + Context("UpdatePipeline", func() { + When("updating a pipeline", func() { + It("should return the pipeline ID", func() { + pdw := testutil.RandomPipelineDefinitionWrapper() + mockFileHandler.On( + "Write", + mock.MatchedBy(func(j json.RawMessage) bool { + return bytes.Equal(j, pdw.CompiledPipeline) + }), + vaiProvider.config.Parameters.PipelineBucket, + fmt.Sprintf( + "%s/%s/%s", + pdw.PipelineDefinition.Name.Namespace, + pdw.PipelineDefinition.Name.Name, + pdw.PipelineDefinition.Version, + ), + ).Return(nil) + + pid, err := vaiProvider.UpdatePipeline(pdw, "") + + Expect(err).ToNot(HaveOccurred()) + Expect(pid).To(Equal(fmt.Sprintf( + "%s/%s", pdw.PipelineDefinition.Name.Namespace, pdw.PipelineDefinition.Name.Name, + ))) + }) + + It("return an error when the file handler write fails", func() { + pdw := testutil.RandomPipelineDefinitionWrapper() + mockFileHandler.On( + "Write", + mock.Anything, + vaiProvider.config.Parameters.PipelineBucket, + mock.Anything, + ).Return(errors.New("failed")) + + _, err := vaiProvider.CreatePipeline(pdw) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed")) + }) + }) + }) + + Context("DeletePipeline", func() { + When("deleting a pipeline", func() { + It("should not return an error", func() { + id := "id" + mockFileHandler.On( + "Delete", + id, + vaiProvider.config.Parameters.PipelineBucket, + ).Return(nil) + err := vaiProvider.DeletePipeline(id) + Expect(err).ToNot(HaveOccurred()) + }) + + It("return an error when the file handler delete fails", func() { + mockFileHandler.On( + "Delete", + "pipelineId", + vaiProvider.config.Parameters.PipelineBucket, + ).Return(errors.New("failed")) + err := vaiProvider.DeletePipeline("pipelineId") + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed")) + }) + }) + }) + + Context("CreateRun", func() { + When("creating a run", func() { + It("return a run ID", func() { + rd := testutil.RandomBasicRunDefinition() + pj := aiplatformpb.PipelineJob{} + mockFileHandler.On( + "Read", + vaiProvider.config.Parameters.PipelineBucket, + fmt.Sprintf( + "%s/%s/%s", + rd.PipelineName.Namespace, + rd.PipelineName.Name, + rd.PipelineVersion, + ), + ).Return(map[string]any{}, nil) + mockJobBuilder.On("MkRunPipelineJob", rd).Return(&pj, nil) + mockJobEnricher.On("Enrich", &pj, map[string]any{}).Return(&pj, nil) + mockPipelineClient.On( + "CreatePipelineJob", + &aiplatformpb.CreatePipelineJobRequest{ + Parent: vaiProvider.config.Parent(), + PipelineJobId: fmt.Sprintf("%s-%s-%s", rd.Name.Namespace, rd.Name.Name, rd.Version), + PipelineJob: &pj, + }, + ).Return(&pj, nil) + runId, err := vaiProvider.CreateRun(rd) + + Expect(err).ToNot(HaveOccurred()) + Expect(runId).To(Equal(fmt.Sprintf("%s-%s", rd.Name.Namespace, rd.Name.Name))) + }) + + It("return an error when the file handler read fails", func() { + rd := testutil.RandomBasicRunDefinition() + mockFileHandler.On( + "Read", + vaiProvider.config.Parameters.PipelineBucket, + mock.Anything, + ).Return(map[string]any{}, errors.New("failed")) + _, err := vaiProvider.CreateRun(rd) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed")) + }) + + It("return an error when the job builder fails", func() { + rd := testutil.RandomBasicRunDefinition() + mockFileHandler.On( + "Read", + vaiProvider.config.Parameters.PipelineBucket, + mock.Anything, + ).Return(map[string]any{}, nil) + mockJobBuilder.On("MkRunPipelineJob", rd).Return(nil, errors.New("failed")) + _, err := vaiProvider.CreateRun(rd) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed")) + }) + + It("return an error when the job enricher fails", func() { + rd := testutil.RandomBasicRunDefinition() + pj := aiplatformpb.PipelineJob{} + mockFileHandler.On( + "Read", + vaiProvider.config.Parameters.PipelineBucket, + mock.Anything, + ).Return(map[string]any{}, nil) + mockJobBuilder.On("MkRunPipelineJob", rd).Return(&pj, nil) + mockJobEnricher.On("Enrich", &pj, map[string]any{}).Return(nil, errors.New("failed")) + _, err := vaiProvider.CreateRun(rd) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed")) + }) + + It("return an error when the pipeline client fails", func() { + rd := testutil.RandomBasicRunDefinition() + pj := aiplatformpb.PipelineJob{} + mockFileHandler.On( + "Read", + vaiProvider.config.Parameters.PipelineBucket, + mock.Anything, + ).Return(map[string]any{}, nil) + mockJobBuilder.On("MkRunPipelineJob", rd).Return(&pj, nil) + mockJobEnricher.On("Enrich", &pj, map[string]any{}).Return(&pj, nil) + mockPipelineClient.On("CreatePipelineJob", mock.Anything).Return(nil, errors.New("failed")) + _, err := vaiProvider.CreateRun(rd) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed")) + }) + }) + }) + + Context("CreateRunSchedule", func() { + When("creating a run schedule", func() { + It("returns a schedule name", func() { + rsd := testutil.RandomRunScheduleDefinition() + pj := aiplatformpb.PipelineJob{} + schedule := aiplatformpb.Schedule{} + mockFileHandler.On( + "Read", + vaiProvider.config.Parameters.PipelineBucket, + fmt.Sprintf( + "%s/%s/%s", + rsd.PipelineName.Namespace, + rsd.PipelineName.Name, + rsd.PipelineVersion, + ), + ).Return(map[string]any{}, nil) + mockJobBuilder.On("MkRunSchedulePipelineJob", rsd).Return(&pj, nil) + mockJobEnricher.On("Enrich", &pj, map[string]any{}).Return(&pj, nil) + mockJobBuilder.On( + "MkSchedule", + rsd, + &pj, + vaiProvider.config.Parent(), + vaiProvider.config.GetMaxConcurrentRunCountOrDefault(), + ).Return(&schedule, nil) + mockScheduleClient.On( + "CreateSchedule", + &aiplatformpb.CreateScheduleRequest{ + Parent: vaiProvider.config.Parent(), + Schedule: &schedule, + }, + ).Return(&schedule, nil) + scheduleName, err := vaiProvider.CreateRunSchedule(rsd) + + Expect(err).ToNot(HaveOccurred()) + Expect(scheduleName).To(Equal(schedule.Name)) + }) + + It("return an error when the file handler read fails", func() { + rsd := testutil.RandomRunScheduleDefinition() + mockFileHandler.On( + "Read", + vaiProvider.config.Parameters.PipelineBucket, + mock.Anything, + ).Return(map[string]any{}, errors.New("failed")) + _, err := vaiProvider.CreateRunSchedule(rsd) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed")) + }) + + It("return an error when the job builder fails to build a pipeline job", func() { + rsd := testutil.RandomRunScheduleDefinition() + mockFileHandler.On( + "Read", + vaiProvider.config.Parameters.PipelineBucket, + mock.Anything, + ).Return(map[string]any{}, nil) + mockJobBuilder.On("MkRunSchedulePipelineJob", rsd).Return(nil, errors.New("failed")) + _, err := vaiProvider.CreateRunSchedule(rsd) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed")) + }) + + It("return an error when the job enricher fails", func() { + rsd := testutil.RandomRunScheduleDefinition() + pj := aiplatformpb.PipelineJob{} + mockFileHandler.On( + "Read", + vaiProvider.config.Parameters.PipelineBucket, + mock.Anything, + ).Return(map[string]any{}, nil) + mockJobBuilder.On("MkRunSchedulePipelineJob", rsd).Return(&pj, nil) + mockJobEnricher.On("Enrich", &pj, map[string]any{}).Return(nil, errors.New("failed")) + _, err := vaiProvider.CreateRunSchedule(rsd) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed")) + }) + + It("return an error when the job builder fails to build a schedule", func() { + rsd := testutil.RandomRunScheduleDefinition() + pj := aiplatformpb.PipelineJob{} + mockFileHandler.On( + "Read", + vaiProvider.config.Parameters.PipelineBucket, + mock.Anything, + ).Return(map[string]any{}, nil) + mockJobBuilder.On("MkRunSchedulePipelineJob", rsd).Return(&pj, nil) + mockJobEnricher.On("Enrich", &pj, map[string]any{}).Return(&pj, nil) + mockJobBuilder.On( + "MkSchedule", + rsd, + &pj, + vaiProvider.config.Parent(), + vaiProvider.config.GetMaxConcurrentRunCountOrDefault(), + ).Return(nil, errors.New("failed")) + _, err := vaiProvider.CreateRunSchedule(rsd) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed")) + }) + + It("return an error when the schedule client fails", func() { + rsd := testutil.RandomRunScheduleDefinition() + pj := aiplatformpb.PipelineJob{} + mockFileHandler.On( + "Read", + vaiProvider.config.Parameters.PipelineBucket, + mock.Anything, + ).Return(map[string]any{}, nil) + mockJobBuilder.On("MkRunSchedulePipelineJob", rsd).Return(&pj, nil) + mockJobEnricher.On("Enrich", &pj, map[string]any{}).Return(&pj, nil) + mockJobBuilder.On( + "MkSchedule", + mock.Anything, + &pj, + mock.Anything, + mock.Anything, + ).Return(&aiplatformpb.Schedule{}, nil) + mockScheduleClient.On("CreateSchedule", mock.Anything).Return(nil, errors.New("failed")) + _, err := vaiProvider.CreateRunSchedule(rsd) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed")) + }) + }) + }) + + Context("UpdateRunSchedule", func() { + When("updating a run schedule", func() { + It("returns a schedule name", func() { + rsd := testutil.RandomRunScheduleDefinition() + pj := aiplatformpb.PipelineJob{} + mockFileHandler.On( + "Read", + vaiProvider.config.Parameters.PipelineBucket, + fmt.Sprintf( + "%s/%s/%s", + rsd.PipelineName.Namespace, + rsd.PipelineName.Name, + rsd.PipelineVersion, + ), + ).Return(map[string]any{}, nil) + mockJobBuilder.On("MkRunSchedulePipelineJob", rsd).Return(&pj, nil) + mockJobEnricher.On("Enrich", &pj, map[string]any{}).Return(&pj, nil) + mockJobBuilder.On( + "MkSchedule", + rsd, + &pj, + vaiProvider.config.Parent(), + mock.Anything, + ).Return(&aiplatformpb.Schedule{}, nil) + schedule := aiplatformpb.Schedule{} + mockScheduleClient.On( + "UpdateSchedule", + &aiplatformpb.UpdateScheduleRequest{ + Schedule: &schedule, + UpdateMask: &fieldmaskpb.FieldMask{ + Paths: []string{ + "schedule", + }, + }, + }, + ).Return(&schedule, nil) + scheduleName, err := vaiProvider.UpdateRunSchedule(rsd, "") + + Expect(err).ToNot(HaveOccurred()) + Expect(scheduleName).To(Equal(schedule.Name)) + }) + + It("return an error when the file handler read fails", func() { + rsd := testutil.RandomRunScheduleDefinition() + mockFileHandler.On( + "Read", + vaiProvider.config.Parameters.PipelineBucket, + mock.Anything, + ).Return(map[string]any{}, errors.New("failed")) + _, err := vaiProvider.UpdateRunSchedule(rsd, "") + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed")) + }) + + It("return an error when the job builder fails to build a pipeline job", func() { + rsd := testutil.RandomRunScheduleDefinition() + mockFileHandler.On( + "Read", + vaiProvider.config.Parameters.PipelineBucket, + mock.Anything, + ).Return(map[string]any{}, nil) + mockJobBuilder.On("MkRunSchedulePipelineJob", rsd).Return(nil, errors.New("failed")) + _, err := vaiProvider.UpdateRunSchedule(rsd, "") + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed")) + }) + + It("return an error when the job enricher fails", func() { + rsd := testutil.RandomRunScheduleDefinition() + pj := aiplatformpb.PipelineJob{} + mockFileHandler.On( + "Read", + vaiProvider.config.Parameters.PipelineBucket, + mock.Anything, + ).Return(map[string]any{}, nil) + mockJobBuilder.On("MkRunSchedulePipelineJob", rsd).Return(&pj, nil) + mockJobEnricher.On("Enrich", &pj, map[string]any{}).Return(nil, errors.New("failed")) + _, err := vaiProvider.UpdateRunSchedule(rsd, "") + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed")) + }) + + It("return an error when the job builder fails to build a schedule", func() { + rsd := testutil.RandomRunScheduleDefinition() + pj := aiplatformpb.PipelineJob{} + mockFileHandler.On( + "Read", + vaiProvider.config.Parameters.PipelineBucket, + mock.Anything, + ).Return(map[string]any{}, nil) + mockJobBuilder.On("MkRunSchedulePipelineJob", rsd).Return(&pj, nil) + mockJobEnricher.On("Enrich", &pj, map[string]any{}).Return(&pj, nil) + mockJobBuilder.On( + "MkSchedule", + mock.Anything, + &pj, + vaiProvider.config.Parent(), + mock.Anything, + ).Return(nil, errors.New("failed")) + _, err := vaiProvider.UpdateRunSchedule(rsd, "") + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed")) + }) + + It("return an error when the schedule client fails", func() { + rsd := testutil.RandomRunScheduleDefinition() + pj := aiplatformpb.PipelineJob{} + mockFileHandler.On( + "Read", + vaiProvider.config.Parameters.PipelineBucket, + mock.Anything, + ).Return(map[string]any{}, nil) + mockJobBuilder.On("MkRunSchedulePipelineJob", rsd).Return(&pj, nil) + mockJobEnricher.On("Enrich", &pj, map[string]any{}).Return(&pj, nil) + mockJobBuilder.On( + "MkSchedule", + mock.Anything, + &pj, + vaiProvider.config.Parent(), + mock.Anything, + ).Return(nil, errors.New("failed")) + mockScheduleClient.On("UpdateSchedule", mock.Anything).Return(nil, errors.New("failed")) + _, err := vaiProvider.UpdateRunSchedule(rsd, "") + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError("failed")) + }) + }) + }) +}) diff --git a/provider-service/vai/internal/provider/suite_unit_test.go b/provider-service/vai/internal/provider/suite_unit_test.go new file mode 100644 index 00000000..249f9c02 --- /dev/null +++ b/provider-service/vai/internal/provider/suite_unit_test.go @@ -0,0 +1,15 @@ +//go:build unit + +package provider + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestUnitSuite(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "VAI Provider Unit Suite") +} diff --git a/provider-service/vai/internal/provider/testutil/testutil.go b/provider-service/vai/internal/provider/testutil/testutil.go new file mode 100644 index 00000000..562c0873 --- /dev/null +++ b/provider-service/vai/internal/provider/testutil/testutil.go @@ -0,0 +1,57 @@ +package testutil + +import ( + "encoding/json" + + "github.com/sky-uk/kfp-operator/apis" + "github.com/sky-uk/kfp-operator/provider-service/base/pkg/server/resource" + + pipelinesv1 "github.com/sky-uk/kfp-operator/apis/pipelines/v1alpha6" + "github.com/sky-uk/kfp-operator/argo/common" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func RandomBasicRunDefinition() resource.RunDefinition { + return resource.RunDefinition{ + Name: common.RandomNamespacedName(), + PipelineName: common.RandomNamespacedName(), + PipelineVersion: common.RandomString(), + RunConfigurationName: common.RandomNamespacedName(), + } +} + +var Now = metav1.Now() + +func RandomRunScheduleDefinition() resource.RunScheduleDefinition { + return resource.RunScheduleDefinition{ + Name: common.RandomNamespacedName(), + Version: common.RandomString(), + PipelineName: common.RandomNamespacedName(), + PipelineVersion: common.RandomString(), + RunConfigurationName: common.RandomNamespacedName(), + ExperimentName: common.RandomNamespacedName(), + Schedule: pipelinesv1.Schedule{ + CronExpression: "1 1 0 0 0", + StartTime: &Now, + EndTime: &Now, + }, + } +} + +func RandomPipelineDefinition() resource.PipelineDefinition { + return resource.PipelineDefinition{ + Name: common.RandomNamespacedName(), + Version: common.RandomString(), + Image: common.RandomString(), + TfxComponents: common.RandomString(), + Env: make([]apis.NamedValue, 0), + BeamArgs: make([]apis.NamedValue, 0), + } +} + +func RandomPipelineDefinitionWrapper() resource.PipelineDefinitionWrapper { + return resource.PipelineDefinitionWrapper{ + PipelineDefinition: RandomPipelineDefinition(), + CompiledPipeline: json.RawMessage{}, + } +}