diff --git a/cmd/import/create.go b/cmd/import/create.go new file mode 100644 index 0000000..a60d830 --- /dev/null +++ b/cmd/import/create.go @@ -0,0 +1,120 @@ +/* +Copyright © 2023 OpenFGA + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package _import + +import ( + "fmt" + "github.com/openfga/cli/internal/cmdutils" + "github.com/openfga/cli/internal/job" + "github.com/openfga/cli/internal/storage" + "github.com/openfga/cli/internal/tuplefile" + "github.com/openfga/go-sdk/client" + "github.com/spf13/cobra" + _ "modernc.org/sqlite" +) + +// ImportJobTuples receives a client.ClientWriteRequest and imports the tuples to the store. It can be used to import +// either writes or deletes. +// It returns a pointer to an ImportResponse and an error. +// The ImportResponse contains the tuples that were successfully imported and the tuples that failed to be imported. +// Deletes and writes are put together in the same ImportResponse. +func ImportJobTuples( + fgaClient client.SdkClient, + tuples []client.ClientTupleKey, + storeID string, + requestRate int, + maxRequests int, + rampIntervalInSeconds int64, +) error { + conn, err := storage.NewDatabase() + if err != nil { + return err + } + bulkJobID, err := job.CreateJob(conn, storeID, tuples) + if err != nil { + return err + } + fmt.Printf("Job created successfully - %s\n", bulkJobID) + + err = job.ImportTuples(conn, bulkJobID, fgaClient, requestRate, maxRequests, rampIntervalInSeconds) + if err != nil { + return err + } + + success, failed, err := storage.GetSummary(conn, bulkJobID) + fmt.Printf("The status for Job ID - %s: Success - %d, Failed - %d", bulkJobID, success, failed) + return nil +} + +// createCmd represents the import command. +var createCmd = &cobra.Command{ + Use: "create", + Short: "Import Relationship Tuples as a job", + Long: "Imports Relationship Tuples to the store. " + + "This will write the tuples in chunks and at the end will report the tuple chunks that failed.", + RunE: func(cmd *cobra.Command, _ []string) error { + clientConfig := cmdutils.GetClientConfig(cmd) + + storeID, err := cmd.Flags().GetString("store-id") + if err != nil { + return fmt.Errorf("failed to get store-id: %w", err) + } + + fgaClient, err := clientConfig.GetFgaClient() + if err != nil { + return fmt.Errorf("failed to initialize FGA Client due to %w", err) + } + + fileName, err := cmd.Flags().GetString("file") + if err != nil { + return fmt.Errorf("failed to parse file name due to %w", err) + } + + initialRequestRate, err := cmd.Flags().GetInt("initial-request-rate") + if initialRequestRate <= 0 || err != nil { + initialRequestRate = 20 + } + + maxRequests, err := cmd.Flags().GetInt("max-requests") + if maxRequests <= 0 || err != nil { + maxRequests = 2000 + } + + rampInterval, err := cmd.Flags().GetInt64("ramp-interval-seconds") + if rampInterval <= 0 || err != nil { + rampInterval = 120 + } + + var tuples []client.ClientTupleKey + tuples, err = tuplefile.ReadTupleFile(fileName) + if err != nil { + return err //nolint:wrapcheck + } + + err = ImportJobTuples(fgaClient, tuples, storeID, initialRequestRate, maxRequests, rampInterval) + if err != nil { + return err + } + + return nil + }, +} + +func init() { + createCmd.Flags().String("file", "", "Tuples file") + createCmd.Flags().String("store-id", "", "Store ID") +} diff --git a/cmd/import/import.go b/cmd/import/import.go new file mode 100644 index 0000000..7f8ac87 --- /dev/null +++ b/cmd/import/import.go @@ -0,0 +1,36 @@ +/* +Copyright © 2023 OpenFGA + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package _import contains commands to manage import job into OpenFGA. +package _import + +import ( + "github.com/spf13/cobra" +) + +// ImportCmd represents the store command. +var ImportCmd = &cobra.Command{ + Use: "import", + Short: "Create a job to insert a large volume of tuples into OpenFGA", + Long: "import jobs are backed by database that can track the successful inserts of tuples and we can retry failed inserts or follow the status of the job.", +} + +func init() { + ImportCmd.AddCommand(createCmd) + ImportCmd.AddCommand(statusCmd) + ImportCmd.AddCommand(retryCmd) + ImportCmd.AddCommand(listCmd) +} diff --git a/cmd/import/list.go b/cmd/import/list.go new file mode 100644 index 0000000..22db1a4 --- /dev/null +++ b/cmd/import/list.go @@ -0,0 +1,45 @@ +/* +Copyright © 2023 OpenFGA + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package _import + +import ( + "fmt" + "github.com/openfga/cli/internal/storage" + "github.com/spf13/cobra" + _ "modernc.org/sqlite" +) + +// listCmd represents the import command. +var listCmd = &cobra.Command{ + Use: "list", + Short: "List all the import jobs", + Long: "List all the import jobs", + RunE: func(cmd *cobra.Command, _ []string) error { + conn, err := storage.NewDatabase() + if err != nil { + return err + } + results, err := storage.GetAllJobs(conn) + if err != nil { + return err + } + for _, result := range results { + fmt.Println(result) + } + return nil + }, +} diff --git a/cmd/import/retry.go b/cmd/import/retry.go new file mode 100644 index 0000000..226efc6 --- /dev/null +++ b/cmd/import/retry.go @@ -0,0 +1,74 @@ +/* +Copyright © 2023 OpenFGA + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package _import + +import ( + "fmt" + "github.com/openfga/cli/internal/cmdutils" + "github.com/openfga/cli/internal/job" + "github.com/openfga/cli/internal/storage" + "github.com/spf13/cobra" + _ "modernc.org/sqlite" +) + +// statusCmd represents the import command. +var retryCmd = &cobra.Command{ + Use: "retry", + Short: "Retry a import job", + Long: "Retry a import job", + RunE: func(cmd *cobra.Command, _ []string) error { + clientConfig := cmdutils.GetClientConfig(cmd) + conn, err := storage.NewDatabase() + bulkJobID, err := cmd.Flags().GetString("job-id") + if err != nil { + return fmt.Errorf("failed to get job-id: %w", err) + } + + fgaClient, err := clientConfig.GetFgaClient() + if err != nil { + return fmt.Errorf("failed to initialize FGA Client due to %w", err) + } + + initialRequestRate, err := cmd.Flags().GetInt("initial-request-rate") + if initialRequestRate <= 0 || err != nil { + initialRequestRate = 20 + } + + maxRequests, err := cmd.Flags().GetInt("max-requests") + if maxRequests <= 0 || err != nil { + maxRequests = 2000 + } + + rampInterval, err := cmd.Flags().GetInt64("ramp-interval-seconds") + if rampInterval <= 0 || err != nil { + rampInterval = 120 + } + + err = job.ImportTuples(conn, bulkJobID, fgaClient, initialRequestRate, maxRequests, rampInterval) + if err != nil { + return err + } + + success, failed, err := storage.GetSummary(conn, bulkJobID) + fmt.Printf("The status for Job ID - %s: Success - %d, Failed - %d", bulkJobID, success, failed) + return nil + }, +} + +func init() { + retryCmd.Flags().String("job-id", "", "Job ID") +} diff --git a/cmd/import/status.go b/cmd/import/status.go new file mode 100644 index 0000000..dbca83e --- /dev/null +++ b/cmd/import/status.go @@ -0,0 +1,51 @@ +/* +Copyright © 2023 OpenFGA + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package _import + +import ( + "fmt" + "github.com/openfga/cli/internal/storage" + "github.com/spf13/cobra" + _ "modernc.org/sqlite" +) + +// statusCmd represents the import command. +var statusCmd = &cobra.Command{ + Use: "status", + Short: "Check the status of the import job", + Long: "The status command is used to check if the import job is running.", + RunE: func(cmd *cobra.Command, _ []string) error { + bulkJobID, err := cmd.Flags().GetString("job-id") + if err != nil { + return fmt.Errorf("failed to get job-id: %w", err) + } + conn, err := storage.NewDatabase() + if err != nil { + return err + } + success, failed, err := storage.GetSummary(conn, bulkJobID) + if err != nil { + return err + } + fmt.Printf("The status for Job ID - %s: Success - %d, Failed - %d", bulkJobID, success, failed) + return nil + }, +} + +func init() { + statusCmd.Flags().String("job-id", "", "Job ID") +} diff --git a/cmd/query/check.go b/cmd/query/check.go index 092d401..65cd5c8 100644 --- a/cmd/query/check.go +++ b/cmd/query/check.go @@ -20,7 +20,6 @@ import ( "context" "fmt" - openfga "github.com/openfga/go-sdk" "github.com/openfga/go-sdk/client" "github.com/spf13/cobra" @@ -35,7 +34,6 @@ func check( object string, contextualTuples []client.ClientContextualTupleKey, queryContext *map[string]interface{}, - consistency *openfga.ConsistencyPreference, ) (*client.ClientCheckResponse, error) { body := &client.ClientCheckRequest{ User: user, @@ -46,14 +44,9 @@ func check( } options := &client.ClientCheckOptions{} - // Don't set if UNSPECIFIED has been provided, it's the default anyway - if *consistency != openfga.CONSISTENCYPREFERENCE_UNSPECIFIED { - options.Consistency = consistency - } - response, err := fgaClient.Check(context.Background()).Body(*body).Options(*options).Execute() if err != nil { - return nil, err //nolint:wrapcheck + return nil, fmt.Errorf("failed to check due to %w", err) } return response, nil @@ -63,7 +56,7 @@ func check( var checkCmd = &cobra.Command{ Use: "check", Short: "Check", - Example: `fga query check --store-id="01H4P8Z95KTXXEP6Z03T75Q984" user:anne can_view document:roadmap --context '{"ip_address":"127.0.0.1"}' --consistency "HIGHER_CONSISTENCY"`, //nolint:lll + Example: `fga query check --store-id="01H4P8Z95KTXXEP6Z03T75Q984" user:anne can_view document:roadmap --context '{"ip_address":"127.0.0.1"}'`, //nolint:lll Long: "Check if a user has a particular relation with an object.", Args: cobra.ExactArgs(3), //nolint:mnd RunE: func(cmd *cobra.Command, args []string) error { @@ -83,14 +76,9 @@ var checkCmd = &cobra.Command{ return fmt.Errorf("error parsing query context for check: %w", err) } - consistency, err := cmdutils.ParseConsistencyFromCmd(cmd) - if err != nil { - return fmt.Errorf("error parsing consistency for check: %w", err) - } - - response, err := check(fgaClient, args[0], args[1], args[2], contextualTuples, queryContext, consistency) + response, err := check(fgaClient, args[0], args[1], args[2], contextualTuples, queryContext) if err != nil { - return fmt.Errorf("check failed: %w", err) + return fmt.Errorf("failed to check due to %w", err) } return output.Display(*response) diff --git a/cmd/root.go b/cmd/root.go index 25018a8..77fd426 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -19,6 +19,7 @@ package cmd import ( "fmt" + _import "github.com/openfga/cli/cmd/import" "os" "strings" @@ -82,6 +83,7 @@ func init() { rootCmd.AddCommand(model.ModelCmd) rootCmd.AddCommand(tuple.TupleCmd) rootCmd.AddCommand(query.QueryCmd) + rootCmd.AddCommand(_import.ImportCmd) } // initConfig reads in config file and ENV variables if set. diff --git a/example/tuples.csv b/example/tuples.csv new file mode 100644 index 0000000..b2f7810 --- /dev/null +++ b/example/tuples.csv @@ -0,0 +1,4 @@ +user_type,user_id,user_relation,relation,object_type,object_id,condition_name,condition_context +user,anne,,owner,folder,product,inOfficeIP, +folder,product,,parent,folder,product-2021,inOfficeIP,"{""ip_addr"":""10.0.0.1""}" +team,fga,member,viewer,folder,product-2021,, diff --git a/example/tuples.json b/example/tuples.json new file mode 100644 index 0000000..2f180b7 --- /dev/null +++ b/example/tuples.json @@ -0,0 +1,17 @@ +[ + { + "user": "user:anne", + "relation": "owner", + "object": "folder:product" + }, + { + "user": "folder:product", + "relation": "parent", + "object": "folder:product-2021" + }, + { + "user": "user:beth", + "relation": "viewer", + "object": "folder:product-2021" + } +] diff --git a/go.mod b/go.mod index 8d49505..1b2e061 100644 --- a/go.mod +++ b/go.mod @@ -21,21 +21,28 @@ require ( go.uber.org/mock v0.5.0 google.golang.org/protobuf v1.36.3 gopkg.in/yaml.v3 v3.0.1 + modernc.org/sqlite v1.34.5 ) require ( cel.dev/expr v0.18.0 // indirect github.com/Yiling-J/theine-go v0.6.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/emirpasic/gods v1.18.1 // indirect github.com/google/uuid v1.6.0 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/ncruces/go-strftime v0.1.9 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect golang.org/x/term v0.28.0 // indirect gonum.org/v1/gonum v0.15.1 // indirect + modernc.org/libc v1.55.3 // indirect + modernc.org/mathutil v1.6.0 // indirect + modernc.org/memory v1.8.0 // indirect ) require ( diff --git a/go.sum b/go.sum index e9000ef..b9059bd 100644 --- a/go.sum +++ b/go.sum @@ -84,6 +84,8 @@ github.com/google/cel-go v0.22.1/go.mod h1:BuznPXXfQDpXKWQ9sPW3TzlAJN5zzFe+i9tIs github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd h1:gbpYu9NMq8jhDVbvlGkMFWCjLFlqqEZjEmObmhUy6Vo= +github.com/google/pprof v0.0.0-20240409012703-83162a5b38cd/go.mod h1:kf6iHlnVGwgKolg33glAes7Yg/8iWP8ukqeldJSO7jw= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI= @@ -101,9 +103,6 @@ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+l github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU= github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= -github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= -github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= -github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -391,16 +390,26 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 h1:5D53IMaUuA5InSeMu9eJtlQXS2NxAhyWQvkKEgXZhHI= -modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4= +modernc.org/cc/v4 v4.21.4 h1:3Be/Rdo1fpr8GrQ7IVw9OHtplU4gWbb+wNgeoBMmGLQ= +modernc.org/cc/v4 v4.21.4/go.mod h1:HM7VJTZbUCR3rV8EYBi9wxnJ0ZBRiGE5OeGXNA0IsLQ= +modernc.org/ccgo/v4 v4.19.2 h1:lwQZgvboKD0jBwdaeVCTouxhxAyN6iawF3STraAal8Y= +modernc.org/ccgo/v4 v4.19.2/go.mod h1:ysS3mxiMV38XGRTTcgo0DQTeTmAO4oCmJl1nX9VFI3s= +modernc.org/fileutil v1.3.0 h1:gQ5SIzK3H9kdfai/5x41oQiKValumqNTDXMvKo62HvE= +modernc.org/fileutil v1.3.0/go.mod h1:XatxS8fZi3pS8/hKG2GH/ArUogfxjpEKs3Ku3aK4JyQ= +modernc.org/gc/v2 v2.4.1 h1:9cNzOqPyMJBvrUipmynX0ZohMhcxPtMccYgGOJdOiBw= +modernc.org/gc/v2 v2.4.1/go.mod h1:wzN5dK1AzVGoH6XOzc3YZ+ey/jPgYHLuVckd62P0GYU= modernc.org/libc v1.55.3 h1:AzcW1mhlPNrRtjS5sS+eW2ISCgSOLLNyFzRh/V3Qj/U= modernc.org/libc v1.55.3/go.mod h1:qFXepLhz+JjFThQ4kzwzOjA/y/artDeg+pcYnY+Q83w= modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= modernc.org/memory v1.8.0 h1:IqGTL6eFMaDZZhEWwcREgeMXYwmW83LYW8cROZYkg+E= modernc.org/memory v1.8.0/go.mod h1:XPZ936zp5OMKGWPqbD3JShgd/ZoQ7899TUuQqxY+peU= -modernc.org/sqlite v1.34.4 h1:sjdARozcL5KJBvYQvLlZEmctRgW9xqIZc2ncN7PU0P8= -modernc.org/sqlite v1.34.4/go.mod h1:3QQFCG2SEMtc2nv+Wq4cQCH7Hjcg+p/RMlS1XK+zwbk= +modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4= +modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= +modernc.org/sortutil v1.2.0 h1:jQiD3PfS2REGJNzNCMMaLSp/wdMNieTbKX920Cqdgqc= +modernc.org/sortutil v1.2.0/go.mod h1:TKU2s7kJMf1AE84OoiGppNHJwvB753OYfNl2WRb++Ss= +modernc.org/sqlite v1.34.5 h1:Bb6SR13/fjp15jt70CL4f18JIN7p7dnMExd+UFnF15g= +modernc.org/sqlite v1.34.5/go.mod h1:YLuNmX9NKs8wRNK2ko1LW1NGYcc9FkBO69JOt1AR9JE= modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA= modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0= modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= diff --git a/internal/job/job.go b/internal/job/job.go new file mode 100644 index 0000000..6687894 --- /dev/null +++ b/internal/job/job.go @@ -0,0 +1,78 @@ +package job + +import ( + "context" + "database/sql" + "fmt" + "github.com/oklog/ulid/v2" + "github.com/openfga/cli/internal/storage" + "github.com/openfga/go-sdk/client" + "math" + "sync/atomic" + "time" +) + +func CreateJob(conn *sql.Conn, storeID string, tuples []client.ClientTupleKey) (string, error) { + bulkJobID := ulid.Make().String() + err := storage.InsertTuples(conn, bulkJobID, storeID, tuples) + if err != nil { + return "", err + } + return bulkJobID, nil +} + +func ImportTuples(conn *sql.Conn, bulkJobID string, + fgaClient client.SdkClient, + requestRate int, + maxRequests int, + rampIntervalInSeconds int64, +) error { + notInsertedTuplesCount, insertedTuplesCount, err := storage.GetTotalAndRemainingTuples(conn, bulkJobID) + totalTuplesCount := insertedTuplesCount + notInsertedTuplesCount + if err != nil { + return err + } + completedTuples := atomic.Int64{} + completedTuples.Store(insertedTuplesCount) + + rampStartTime := time.Now().Unix() + currentRequestRate := requestRate + for completedTuples.Load() < totalTuplesCount { + startTime := time.Now() + if rampStartTime > time.Now().Unix()+rampIntervalInSeconds { + currentRequestRate = GetRequestRate(requestRate, maxRequests) + rampStartTime = time.Now().Unix() + } + remainingTuples, e := storage.GetRemainingTuples(conn, bulkJobID, currentRequestRate) + if e != nil { + return e + } + for _, tuple := range remainingTuples { + _, e = fgaClient. + WriteTuples(context.Background()). + Body(client.ClientWriteTuplesBody{tuple.Tuple}). + Options(client.ClientWriteOptions{}). + Execute() + if e != nil { + errStr := e.Error() + storage.UpdateStatus(conn, tuple.Rowid, storage.NOT_INSERTED, errStr) + } else { + storage.UpdateStatus(conn, tuple.Rowid, storage.INSERTED, "") + } + completedTuples.Add(1) + } + elapsed := time.Since(startTime) + + fmt.Printf("Completed %d/%d. Requests Per Second - %d\n", completedTuples.Load(), totalTuplesCount, currentRequestRate) + time.Sleep(time.Second - elapsed) + } + return nil +} + +func GetRequestRate(currentRequestRate int, maxRequests int) int { + increasedRequestRate := currentRequestRate + int(math.Ceil(float64(currentRequestRate)*0.3)) + if increasedRequestRate > maxRequests { + return maxRequests + } + return increasedRequestRate +} diff --git a/internal/storage/sqlite.go b/internal/storage/sqlite.go new file mode 100644 index 0000000..3b24bed --- /dev/null +++ b/internal/storage/sqlite.go @@ -0,0 +1,202 @@ +package storage + +import ( + "context" + "database/sql" + "fmt" + openfga "github.com/openfga/go-sdk" + "github.com/openfga/go-sdk/client" + "strings" + "time" +) + +const ( + NOT_INSERTED = iota + INSERTED +) + +var CREATE_TABLE = ` + create table if not exists import_job + ( + bulk_job_id INTEGER, STORE_ID CHAR(26), inserted_at INT NOT NULL, + imported_at INT, subject VARCHAR(256), relation VARCHAR(256), + object VARCHAR(256), condition TEXT, status INT, reason TEXT +)` + +var INSERT_TUPLES = ` + INSERT INTO import_job ( + bulk_job_id, store_id, inserted_at, + imported_at, subject, object, relation, + condition, status, reason) values %s +` + +var READ_TUPLES = `SELECT + ROWID, subject, object, relation, condition + from import_job where bulk_job_id = ? and status = ? order by ROWID limit ? +` + +var GET_TUPLES_COUNT = ` + SELECT COUNT(*) from import_job where bulk_job_id = ? and status = ? +` + +var UPDATE_TUPLE_STATUS = ` + UPDATE import_job set status = ?, reason = ? where ROWID = ? +` + +var JOB_STATUS = ` + SELECT status, count(status) FROM import_job where bulk_job_id = ? group by status +` + +var GET_ALL_JOBS = ` + SELECT DISTINCT bulk_job_id FROM import_job +` + +func NewDatabase() (*sql.Conn, error) { + dsnURI := "file:cli.db?_journal_mode=WAL" + db, err := sql.Open("sqlite", dsnURI) + if err != nil { + return nil, err + } + _, err = db.Exec(CREATE_TABLE) + if err != nil { + return nil, err + } + conn, err := db.Conn(context.Background()) + return conn, nil +} + +func InsertTuples(db *sql.Conn, bulkJobID string, storeID string, tuples []client.ClientTupleKey) error { + valueStrings := make([]string, 0, len(tuples)) + valueArgs := make([]interface{}, 0, len(tuples)*10) + for _, tuple := range tuples { + valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + valueArgs = append(valueArgs, bulkJobID) + valueArgs = append(valueArgs, storeID) + valueArgs = append(valueArgs, time.Now().Unix()) + valueArgs = append(valueArgs, nil) + valueArgs = append(valueArgs, tuple.User) + valueArgs = append(valueArgs, tuple.Object) + valueArgs = append(valueArgs, tuple.Relation) + if tuple.Condition != nil { + json, err := tuple.Condition.MarshalJSON() + if err != nil { + return err + } + valueArgs = append(valueArgs, json) + } else { + valueArgs = append(valueArgs, "") + } + valueArgs = append(valueArgs, NOT_INSERTED) + valueArgs = append(valueArgs, "") + } + + stmt := fmt.Sprintf(INSERT_TUPLES, strings.Join(valueStrings, ",")) + _, err := db.ExecContext(context.Background(), stmt, valueArgs...) + if err != nil { + return err + } + + return nil +} + +func GetRemainingTuples(db *sql.Conn, bulkJobID string, count int) ([]TuplesResult, error) { + row, err := db.QueryContext(context.Background(), READ_TUPLES, bulkJobID, NOT_INSERTED, count) + if err != nil { + return nil, err + } + defer row.Close() + var tuples []TuplesResult + for row.Next() { + result := client.ClientTupleKey{Condition: &openfga.RelationshipCondition{}} + var rowid int64 + var condition string + err = row.Scan(&rowid, &result.User, &result.Object, &result.Relation, &condition) + if err != nil { + return nil, err + } + if condition != "" { + nullable := openfga.NullableRelationshipCondition{} + err = nullable.UnmarshalJSON([]byte(condition)) + if err != nil { + return nil, err + } + if nullable.IsSet() { + result.Condition = nullable.Get() + } else { + result.Condition = nil + } + } else { + result.Condition = nil + } + tuples = append(tuples, TuplesResult{Rowid: rowid, Tuple: result}) + } + return tuples, nil +} + +type TuplesResult struct { + Tuple client.ClientTupleKey + Rowid int64 +} + +func GetTotalAndRemainingTuples(db *sql.Conn, bulkJobID string) (int64, int64, error) { + var notInsertedCount, insertedCount int64 + row, err := db.QueryContext(context.Background(), GET_TUPLES_COUNT, bulkJobID, NOT_INSERTED) + if err != nil { + return 0, 0, err + } + row.Next() + row.Scan(¬InsertedCount) + row, err = db.QueryContext(context.Background(), GET_TUPLES_COUNT, bulkJobID, INSERTED) + if err != nil { + return 0, 0, err + } + row.Next() + row.Scan(&insertedCount) + return notInsertedCount, insertedCount, nil +} + +func UpdateStatus(db *sql.Conn, rowid int64, status int, reason string) error { + _, err := db.ExecContext(context.Background(), UPDATE_TUPLE_STATUS, status, reason, rowid) + if err != nil { + return err + } + return nil +} + +func GetSummary(db *sql.Conn, bulkJobID string) (int, int, error) { + var success, failed, status, result int + row, err := db.QueryContext(context.Background(), JOB_STATUS, bulkJobID) + if err != nil { + return 0, 0, err + } + for row.Next() { + err = row.Scan(&status, &result) + if err != nil { + return 0, 0, err + } + switch status { + case NOT_INSERTED: + failed = result + break + case INSERTED: + success = result + break + } + } + + return success, failed, nil +} + +func GetAllJobs(db *sql.Conn) ([]string, error) { + results := make([]string, 0) + var result string + row, err := db.QueryContext(context.Background(), GET_ALL_JOBS) + if err != nil { + return nil, err + } + for row.Next() { + row.Scan(&result) + results = append(results, result) + } + return results, nil +}