From a2ed76bf872db9588703bfc6a4d72787d6bdc0e0 Mon Sep 17 00:00:00 2001 From: Poovamraj T T Date: Mon, 20 Jan 2025 16:32:52 +0530 Subject: [PATCH 1/3] Support tuples import job --- cmd/import/create.go | 144 +++++++++++++++++++++++++++++++++++++ cmd/import/import.go | 36 ++++++++++ cmd/import/list.go | 35 +++++++++ cmd/import/retry.go | 36 ++++++++++ cmd/import/status.go | 36 ++++++++++ cmd/root.go | 2 + example/tuples.csv | 4 ++ go.mod | 9 ++- go.sum | 45 ++++++++++++ internal/storage/sqlite.go | 143 ++++++++++++++++++++++++++++++++++++ 10 files changed, 489 insertions(+), 1 deletion(-) create mode 100644 cmd/import/create.go create mode 100644 cmd/import/import.go create mode 100644 cmd/import/list.go create mode 100644 cmd/import/retry.go create mode 100644 cmd/import/status.go create mode 100644 example/tuples.csv create mode 100644 internal/storage/sqlite.go diff --git a/cmd/import/create.go b/cmd/import/create.go new file mode 100644 index 0000000..01f090e --- /dev/null +++ b/cmd/import/create.go @@ -0,0 +1,144 @@ +/* +Copyright © 2023 OpenFGA + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package _import + +import ( + "context" + "fmt" + "github.com/oklog/ulid/v2" + "github.com/openfga/cli/internal/cmdutils" + "github.com/openfga/cli/internal/output" + "github.com/openfga/cli/internal/storage" + "github.com/openfga/cli/internal/tuplefile" + "github.com/openfga/go-sdk/client" + "github.com/spf13/cobra" + _ "modernc.org/sqlite" + "sync/atomic" +) + +type CreateImportJobResponse struct { + JobId string `json:"job_id"` +} + +// ImportJobTuples receives a client.ClientWriteRequest and imports the tuples to the store. It can be used to import +// either writes or deletes. +// It returns a pointer to an ImportResponse and an error. +// The ImportResponse contains the tuples that were successfully imported and the tuples that failed to be imported. +// Deletes and writes are put together in the same ImportResponse. +func ImportJobTuples( + fgaClient client.SdkClient, + tuples []client.ClientTupleKey, + storeID string, +) (*CreateImportJobResponse, error) { + bulkJobID := ulid.Make().String() + db, err := storage.NewDatabase() + if err != nil { + return nil, err + } + err = storage.InsertTuples(db, bulkJobID, storeID, tuples) + if err != nil { + return nil, err + } + fmt.Printf("Job created successfully - %s\n", bulkJobID) + + notInsertedTuplesCount, insertedTuplesCount, err := storage.GetTotalAndRemainingTuples(db, bulkJobID) + totalTuplesCount := insertedTuplesCount + notInsertedTuplesCount + if err != nil { + return nil, err + } + completedTuples := atomic.Int64{} + completedTuples.Store(insertedTuplesCount) + + for i := completedTuples; i.Load() < totalTuplesCount; { + remainingTuples, err := storage.GetRemainingTuples(db, bulkJobID, 3) + if err != nil { + return nil, err + } + for _, tuple := range remainingTuples { + go func() { + _, err = fgaClient. + WriteTuples(context.Background()). + Body(client.ClientWriteTuplesBody{tuple.Tuple}). + Options(client.ClientWriteOptions{}). + Execute() + if err != nil { + err.Error() + } else { + println("Success") + } + completedTuples.Add(1) + }() + } + } + + // Write 1 tuple per request + // Start with 20 requests per second + // Slowly ramp up - find ramp up logic + // Each time a request is successful we have to write it to a file + // --Resuming + // If resuming, we can ignore all the successful writes, start with failed writes + // --Failed + // Once fully completed, we can show a summary + // No. of successful writes, number of failed writes + // --While executing we can show + // Percentage completed, current RPS, expected time to complete + return &CreateImportJobResponse{JobId: bulkJobID}, nil +} + +// createCmd represents the import command. +var createCmd = &cobra.Command{ + Use: "create", + Short: "Import Relationship Tuples as a job", + Long: "Imports Relationship Tuples to the store. " + + "This will write the tuples in chunks and at the end will report the tuple chunks that failed.", + RunE: func(cmd *cobra.Command, _ []string) error { + clientConfig := cmdutils.GetClientConfig(cmd) + + storeID, err := cmd.Flags().GetString("store-id") + if err != nil { + return fmt.Errorf("failed to get store-id: %w", err) + } + + fgaClient, err := clientConfig.GetFgaClient() + if err != nil { + return fmt.Errorf("failed to initialize FGA Client due to %w", err) + } + + fileName, err := cmd.Flags().GetString("file") + if err != nil { + return fmt.Errorf("failed to parse file name due to %w", err) + } + + var tuples []client.ClientTupleKey + tuples, err = tuplefile.ReadTupleFile(fileName) + if err != nil { + return err //nolint:wrapcheck + } + + result, err := ImportJobTuples(fgaClient, tuples, storeID) + if err != nil { + return err + } + + return output.Display(*result) + }, +} + +func init() { + createCmd.Flags().String("file", "", "Tuples file") + createCmd.Flags().String("store-id", "", "Store ID") +} diff --git a/cmd/import/import.go b/cmd/import/import.go new file mode 100644 index 0000000..7f8ac87 --- /dev/null +++ b/cmd/import/import.go @@ -0,0 +1,36 @@ +/* +Copyright © 2023 OpenFGA + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package _import contains commands to manage import job into OpenFGA. +package _import + +import ( + "github.com/spf13/cobra" +) + +// ImportCmd represents the store command. +var ImportCmd = &cobra.Command{ + Use: "import", + Short: "Create a job to insert a large volume of tuples into OpenFGA", + Long: "import jobs are backed by database that can track the successful inserts of tuples and we can retry failed inserts or follow the status of the job.", +} + +func init() { + ImportCmd.AddCommand(createCmd) + ImportCmd.AddCommand(statusCmd) + ImportCmd.AddCommand(retryCmd) + ImportCmd.AddCommand(listCmd) +} diff --git a/cmd/import/list.go b/cmd/import/list.go new file mode 100644 index 0000000..afa325a --- /dev/null +++ b/cmd/import/list.go @@ -0,0 +1,35 @@ +/* +Copyright © 2023 OpenFGA + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package _import + +import ( + "github.com/spf13/cobra" + _ "modernc.org/sqlite" +) + +// listCmd represents the import command. +var listCmd = &cobra.Command{ + Use: "list", + Short: "List all the import jobs", + Long: "List all the import jobs", + RunE: func(cmd *cobra.Command, _ []string) error { + return nil + }, +} + +func init() { +} diff --git a/cmd/import/retry.go b/cmd/import/retry.go new file mode 100644 index 0000000..411a972 --- /dev/null +++ b/cmd/import/retry.go @@ -0,0 +1,36 @@ +/* +Copyright © 2023 OpenFGA + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package _import + +import ( + "github.com/spf13/cobra" + _ "modernc.org/sqlite" +) + +// statusCmd represents the import command. +var retryCmd = &cobra.Command{ + Use: "retry", + Short: "Retry a import job", + Long: "Retry a import job", + RunE: func(cmd *cobra.Command, _ []string) error { + return nil + }, +} + +func init() { + retryCmd.Flags().String("job-id", "", "Job ID") +} diff --git a/cmd/import/status.go b/cmd/import/status.go new file mode 100644 index 0000000..692b921 --- /dev/null +++ b/cmd/import/status.go @@ -0,0 +1,36 @@ +/* +Copyright © 2023 OpenFGA + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package _import + +import ( + "github.com/spf13/cobra" + _ "modernc.org/sqlite" +) + +// statusCmd represents the import command. +var statusCmd = &cobra.Command{ + Use: "status", + Short: "Check the status of the import job", + Long: "The status command is used to check if the import job is running.", + RunE: func(cmd *cobra.Command, _ []string) error { + return nil + }, +} + +func init() { + statusCmd.Flags().String("job-id", "", "Job ID") +} diff --git a/cmd/root.go b/cmd/root.go index 74eacc2..7c011ab 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -19,6 +19,7 @@ package cmd import ( "fmt" + _import "github.com/openfga/cli/cmd/import" "os" "strings" @@ -81,6 +82,7 @@ func init() { rootCmd.AddCommand(model.ModelCmd) rootCmd.AddCommand(tuple.TupleCmd) rootCmd.AddCommand(query.QueryCmd) + rootCmd.AddCommand(_import.ImportCmd) } // initConfig reads in config file and ENV variables if set. diff --git a/example/tuples.csv b/example/tuples.csv new file mode 100644 index 0000000..b2f7810 --- /dev/null +++ b/example/tuples.csv @@ -0,0 +1,4 @@ +user_type,user_id,user_relation,relation,object_type,object_id,condition_name,condition_context +user,anne,,owner,folder,product,inOfficeIP, +folder,product,,parent,folder,product-2021,inOfficeIP,"{""ip_addr"":""10.0.0.1""}" +team,fga,member,viewer,folder,product-2021,, diff --git a/go.mod b/go.mod index fee7b75..96c29e7 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/openfga/api/proto v0.0.0-20240807201305-c96ec773cae9 github.com/openfga/go-sdk v0.5.0 github.com/openfga/language/pkg/go v0.2.0-beta.0 - github.com/openfga/openfga v1.5.8 + github.com/openfga/openfga v1.5.0 github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.19.0 @@ -24,13 +24,20 @@ require ( ) require ( + github.com/dustin/go-humanize v1.0.1 // indirect github.com/emirpasic/gods v1.18.1 // indirect github.com/google/uuid v1.6.0 // indirect github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/ncruces/go-strftime v0.1.9 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rivo/uniseg v0.4.7 // indirect golang.org/x/term v0.23.0 // indirect golang.org/x/time v0.6.0 // indirect + modernc.org/libc v1.55.3 // indirect + modernc.org/mathutil v1.6.0 // indirect + modernc.org/memory v1.8.0 // indirect + modernc.org/sqlite v1.34.5 // indirect ) require ( diff --git a/go.sum b/go.sum index c3b6b09..46ddc19 100644 --- a/go.sum +++ b/go.sum @@ -37,13 +37,19 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0= github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= +github.com/docker/docker v24.0.7+incompatible h1:Wo6l37AuwP3JaMnZa226lzVXGA3F9Ig1seQen0cKYlM= +github.com/docker/docker v25.0.5+incompatible h1:UmQydMduGkrD5nQde1mecF/YnSbTOaPeFIeP5C4W+DE= +github.com/docker/docker v26.0.0+incompatible h1:Ng2qi+gdKADUa/VM+6b6YaY2nlZhk/lVJiKR/2bMudU= github.com/docker/docker v26.0.2+incompatible h1:yGVmKUFGgcxA6PXWAokO0sQL22BrQ67cgVjko8tGdXE= github.com/docker/docker v26.0.2+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/docker v27.1.1+incompatible h1:hO/M4MtV36kzKldqnA37IWhebRA+LnqqcqDja6kVaKY= +github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -72,6 +78,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= +github.com/go-sql-driver/mysql v1.8.1-0.20240317050433-65395d853d2c h1:oCLNJYPr2xNTnHhZNMMKlLZa8nJl1GaITjrPe4NAiHg= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -121,6 +129,7 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA= github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= @@ -190,12 +199,15 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/natefinch/wrap v0.2.0 h1:IXzc/pw5KqxJv55gV0lSOcKHYuEZPGbQrOOXr/bamRk= github.com/natefinch/wrap v0.2.0/go.mod h1:6gMHlAl12DwYEfKP3TkuykYUfLSEAvHw67itm4/KAS8= +github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= +github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/nwidger/jsoncolor v0.3.2 h1:rVJJlwAWDJShnbTYOQ5RM7yTA20INyKXlJ/fg4JMhHQ= github.com/nwidger/jsoncolor v0.3.2/go.mod h1:Cs34umxLbJvgBMnVNVqhji9BhoT/N/KinHqZptQ7cf4= github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.1.0-rc5 h1:Ygwkfw9bpDvs+c9E34SdgGOj41dX/cbdlwvlWt0pnFI= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= github.com/openfga/api/proto v0.0.0-20240613184530-f33cb24bcd97 h1:nhLlC41cgggzKnfmf3QtSAqm6fEWmOGenHgUrBTxXxU= @@ -206,10 +218,28 @@ github.com/openfga/go-sdk v0.5.0 h1:1IuAu6Xf4eBxgc2AyMfosK7QzApxuZ5yi7jmFaftnl0= github.com/openfga/go-sdk v0.5.0/go.mod h1:AoMnFlPw65sU/7O4xOPpCb2vXA8ZD9K9xp2hZjcvt4g= github.com/openfga/language/pkg/go v0.2.0-beta.0 h1:dTvgDkQImfNnH1iDvxnUIbz4INvKr4kS46dI12oAEzM= github.com/openfga/language/pkg/go v0.2.0-beta.0/go.mod h1:mCwEY2IQvyNgfEwbfH0C0ERUwtL8z6UjSAF8zgn5Xbg= +github.com/openfga/openfga v1.3.0 h1:uKm7DRX8K0O55acPWzxnXyRITSsqNa/WgHlSyTy4NsY= +github.com/openfga/openfga v1.3.0/go.mod h1:VizhpcpmAF20gy2OIeXw80jSq1qwPI0j00FOcFGyma8= +github.com/openfga/openfga v1.4.0 h1:7qdeJcOTAY8C7WO3OptZtqcbQx4fbaGtb61Z67urKDc= +github.com/openfga/openfga v1.4.0/go.mod h1:h0ZXOZN34WJdFtQ7KaTb8ghxY0e9BA889nwtxCWRw34= +github.com/openfga/openfga v1.5.0 h1:SfQIN3MnmOx8hCrDKi/uMnKeF4y8gYwUA7lmB/O2Jxk= +github.com/openfga/openfga v1.5.0/go.mod h1:ht3w17zJEx1Zzo+iaAwxF+BtwnZAmUDwpJWw5yxrDQU= +github.com/openfga/openfga v1.5.2 h1:CqBGMYv5S0oBZkxxDDlErF7mE6rieL04bTvx5nEHZnM= +github.com/openfga/openfga v1.5.2/go.mod h1:kL/pxf40rxuqasF8TWnGZAVF1aCkJG8xQZ8U1rVzA/c= +github.com/openfga/openfga v1.5.3 h1:Uynmlsx3iz/eiP7wW9n2dbwxDh/kiS/27W6C24Y7oyY= +github.com/openfga/openfga v1.5.3/go.mod h1:IcQBDtytjhBxjfJ+1zCzUxQvQa1BB/4Ed+POpZKojTI= +github.com/openfga/openfga v1.5.4 h1:mVrp0uB9jNWX/5+OtZLM6YOx5Y9Y4r/D/O+LNBF/FGQ= +github.com/openfga/openfga v1.5.4/go.mod h1:+PoZg9BJeq+h3L0eR52tqNTwghSapCFtmaVKHsUK7QM= github.com/openfga/openfga v1.5.5 h1:KPVX176JuHOCX9iARSacbS06VqxL5+jZ3WvIGws/xQw= github.com/openfga/openfga v1.5.5/go.mod h1:9R4YjXJZZsd7x+oV2qCVFZNbEOck8DCnXwkaJ3zowwY= +github.com/openfga/openfga v1.5.6 h1:V5VPXbDnThXHORJaP0Hv0kdw0gtS62eV4H0IQk0EqfE= +github.com/openfga/openfga v1.5.6/go.mod h1:Iv2BfL2b6ANYrqWIANSoEveZPh51LV2YnoexrUI8bvU= +github.com/openfga/openfga v1.5.7 h1:Nq08mSnRdOFtMEsnF+E/wxqyqUZl/NNFAXp07RwHZrY= +github.com/openfga/openfga v1.5.7/go.mod h1:SqyUWQBLCJhCGeO1nhxI8KlUsuIRQsjl6xqf++Xf5Bw= github.com/openfga/openfga v1.5.8 h1:/Me5pNAz0/drzasrSF6JryeSqiDBIyBLlnsYZE4ul90= github.com/openfga/openfga v1.5.8/go.mod h1:1OF1qR8nXdIirtosRZq0mPx5B6nuY5phPGk61Yh+9Lc= +github.com/openfga/openfga v1.5.9 h1:1x+9YdBOzbYPbkEUZjPPYt255GXDUbouC0ConpMRtL8= +github.com/openfga/openfga v1.5.9/go.mod h1:1OF1qR8nXdIirtosRZq0mPx5B6nuY5phPGk61Yh+9Lc= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= @@ -222,6 +252,7 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= +github.com/pressly/goose/v3 v3.19.2 h1:z1yuD41jS4iaqLkyjkzGkKBz4rgyz/BYtCyMMGHlgzQ= github.com/pressly/goose/v3 v3.20.0 h1:uPJdOxF/Ipj7ABVNOAMJXSxwFXZGwMGHNqjC8e61VA0= github.com/pressly/goose/v3 v3.20.0/go.mod h1:BRfF2GcG4FTG12QfdBVy3q1yveaf4ckL9vWwEcIO3lA= github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= @@ -235,6 +266,8 @@ github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= @@ -291,10 +324,13 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +github.com/testcontainers/testcontainers-go v0.29.1 h1:z8kxdFlovA2y97RWx98v/TQ+tR+SXZm6p35M+xB92zk= github.com/testcontainers/testcontainers-go v0.30.0 h1:jmn/XS22q4YRrcMwWg0pAwlClzs/abopbsBzrepyc4E= github.com/testcontainers/testcontainers-go v0.30.0/go.mod h1:K+kHNGiM5zjklKjgTtcrEetF3uhWbMUyqAQoyoh8Pf0= +github.com/testcontainers/testcontainers-go/modules/mysql v0.29.1 h1:SnJtZNcskgxOMyVAT7M+MQjpveP59nwKzlBw2ItX+C8= github.com/testcontainers/testcontainers-go/modules/mysql v0.30.0 h1:wrePvxfU/2HFALKyBqpNs6VoPPvThzHy9aN+PCxse9g= github.com/testcontainers/testcontainers-go/modules/mysql v0.30.0/go.mod h1:Srnlf7wwA7s6K4sKKhjAoBHJcKorRINR/i5dCA4ZyGk= +github.com/testcontainers/testcontainers-go/modules/postgres v0.29.1 h1:hTn3MzhR9w4btwfzr/NborGCaeNZG0MPBpufeDj10KA= github.com/testcontainers/testcontainers-go/modules/postgres v0.30.0 h1:D3HFqpZS90iRGAN7M85DFiuhPfvYvFNnx8urQ6mPAvo= github.com/testcontainers/testcontainers-go/modules/postgres v0.30.0/go.mod h1:e1sKxwUOkqzvaqdHl/oV9mUtFmkDPTfBGp0po2tnWQU= github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= @@ -305,6 +341,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 h1:Xs2Ncz0gNihqu9iosIZ5SkBbWo5T8JhhLJFMQL1qmLI= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0/go.mod h1:vy+2G/6NvVMpwGX/NyLqcC41fxepnuKHk16E6IZUcJc= go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= @@ -471,3 +508,11 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +modernc.org/libc v1.55.3 h1:AzcW1mhlPNrRtjS5sS+eW2ISCgSOLLNyFzRh/V3Qj/U= +modernc.org/libc v1.55.3/go.mod h1:qFXepLhz+JjFThQ4kzwzOjA/y/artDeg+pcYnY+Q83w= +modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= +modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= +modernc.org/memory v1.8.0 h1:IqGTL6eFMaDZZhEWwcREgeMXYwmW83LYW8cROZYkg+E= +modernc.org/memory v1.8.0/go.mod h1:XPZ936zp5OMKGWPqbD3JShgd/ZoQ7899TUuQqxY+peU= +modernc.org/sqlite v1.34.5 h1:Bb6SR13/fjp15jt70CL4f18JIN7p7dnMExd+UFnF15g= +modernc.org/sqlite v1.34.5/go.mod h1:YLuNmX9NKs8wRNK2ko1LW1NGYcc9FkBO69JOt1AR9JE= diff --git a/internal/storage/sqlite.go b/internal/storage/sqlite.go new file mode 100644 index 0000000..0d506f8 --- /dev/null +++ b/internal/storage/sqlite.go @@ -0,0 +1,143 @@ +package storage + +import ( + "database/sql" + "fmt" + openfga "github.com/openfga/go-sdk" + "github.com/openfga/go-sdk/client" + "strings" + "time" +) + +const ( + NOT_INSERTED = iota + INSERTED +) + +var CREATE_TABLE = ` + create table if not exists import_job + ( + bulk_job_id INTEGER, STORE_ID CHAR(26), inserted_at INT NOT NULL, + imported_at INT, subject VARCHAR(256), relation VARCHAR(256), + object VARCHAR(256), condition TEXT, status INT, reason TEXT +)` + +var INSERT_TUPLES = ` + INSERT INTO import_job ( + bulk_job_id, store_id, inserted_at, + imported_at, subject, object, relation, + condition, status, reason) values %s +` + +var READ_TUPLES = `SELECT + ROWID, subject, object, relation, condition + from import_job where bulk_job_id = ? and status = ? order by ROWID limit ? +` + +var GET_TUPLES_COUNT = ` + SELECT COUNT(*) from import_job where bulk_job_id = ? and status = ? +` + +func NewDatabase() (*sql.DB, error) { + dsnURI := "file:cli.db?_journal_mode=WAL" + db, err := sql.Open("sqlite", dsnURI) + if err != nil { + return nil, err + } + _, err = db.Exec(CREATE_TABLE) + if err != nil { + return nil, err + } + return db, nil +} + +func InsertTuples(db *sql.DB, bulkJobID string, storeID string, tuples []client.ClientTupleKey) error { + valueStrings := make([]string, 0, len(tuples)) + valueArgs := make([]interface{}, 0, len(tuples)*10) + for _, tuple := range tuples { + valueStrings = append(valueStrings, "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + valueArgs = append(valueArgs, bulkJobID) + valueArgs = append(valueArgs, storeID) + valueArgs = append(valueArgs, time.Now().Unix()) + valueArgs = append(valueArgs, nil) + valueArgs = append(valueArgs, tuple.User) + valueArgs = append(valueArgs, tuple.Object) + valueArgs = append(valueArgs, tuple.Relation) + if tuple.Condition != nil { + json, err := tuple.Condition.MarshalJSON() + if err != nil { + return err + } + valueArgs = append(valueArgs, json) + } else { + valueArgs = append(valueArgs, "") + } + valueArgs = append(valueArgs, NOT_INSERTED) + valueArgs = append(valueArgs, "") + } + + stmt := fmt.Sprintf(INSERT_TUPLES, strings.Join(valueStrings, ",")) + _, err := db.Exec(stmt, valueArgs...) + if err != nil { + return err + } + + return nil +} + +func GetRemainingTuples(db *sql.DB, bulkJobID string, count int) ([]TuplesResult, error) { + row, err := db.Query(READ_TUPLES, bulkJobID, NOT_INSERTED, count) + if err != nil { + return nil, err + } + defer row.Close() + var tuples []TuplesResult + for row.Next() { + result := client.ClientTupleKey{Condition: &openfga.RelationshipCondition{}} + var rowid int64 + var condition string + err = row.Scan(&rowid, &result.User, &result.Object, &result.Relation, &condition) + if err != nil { + return nil, err + } + if condition != "" { + nullable := openfga.NullableRelationshipCondition{} + err = nullable.UnmarshalJSON([]byte(condition)) + if err != nil { + return nil, err + } + if nullable.IsSet() { + result.Condition = nullable.Get() + } else { + result.Condition = nil + } + } else { + result.Condition = nil + } + tuples = append(tuples, TuplesResult{Rowid: rowid, Tuple: result}) + } + fmt.Printf("%+v", tuples) + return tuples, nil +} + +type TuplesResult struct { + Tuple client.ClientTupleKey + Rowid int64 +} + +func GetTotalAndRemainingTuples(db *sql.DB, bulkJobID string) (int64, int64, error) { + var notInsertedCount, insertedCount int64 + row, err := db.Query(GET_TUPLES_COUNT, bulkJobID, NOT_INSERTED) + if err != nil { + return 0, 0, err + } + row.Next() + row.Scan(¬InsertedCount) + row, err = db.Query(GET_TUPLES_COUNT, bulkJobID, INSERTED) + if err != nil { + return 0, 0, err + } + row.Next() + row.Scan(&insertedCount) + return notInsertedCount, insertedCount, nil +} From a71f0b51d8baeb6357642ee03c331ab74f25ec60 Mon Sep 17 00:00:00 2001 From: Poovamraj T T Date: Thu, 23 Jan 2025 20:47:27 +0530 Subject: [PATCH 2/3] Import Job feature in CLI --- cmd/import/create.go | 86 ++++++++++++++------------------------ cmd/import/list.go | 16 +++++-- cmd/import/retry.go | 38 +++++++++++++++++ cmd/import/status.go | 15 +++++++ example/tuples.json | 17 ++++++++ internal/job/job.go | 78 ++++++++++++++++++++++++++++++++++ internal/storage/sqlite.go | 79 +++++++++++++++++++++++++++++----- 7 files changed, 261 insertions(+), 68 deletions(-) create mode 100644 example/tuples.json create mode 100644 internal/job/job.go diff --git a/cmd/import/create.go b/cmd/import/create.go index 01f090e..a60d830 100644 --- a/cmd/import/create.go +++ b/cmd/import/create.go @@ -17,23 +17,16 @@ limitations under the License. package _import import ( - "context" "fmt" - "github.com/oklog/ulid/v2" "github.com/openfga/cli/internal/cmdutils" - "github.com/openfga/cli/internal/output" + "github.com/openfga/cli/internal/job" "github.com/openfga/cli/internal/storage" "github.com/openfga/cli/internal/tuplefile" "github.com/openfga/go-sdk/client" "github.com/spf13/cobra" _ "modernc.org/sqlite" - "sync/atomic" ) -type CreateImportJobResponse struct { - JobId string `json:"job_id"` -} - // ImportJobTuples receives a client.ClientWriteRequest and imports the tuples to the store. It can be used to import // either writes or deletes. // It returns a pointer to an ImportResponse and an error. @@ -43,60 +36,28 @@ func ImportJobTuples( fgaClient client.SdkClient, tuples []client.ClientTupleKey, storeID string, -) (*CreateImportJobResponse, error) { - bulkJobID := ulid.Make().String() - db, err := storage.NewDatabase() + requestRate int, + maxRequests int, + rampIntervalInSeconds int64, +) error { + conn, err := storage.NewDatabase() if err != nil { - return nil, err + return err } - err = storage.InsertTuples(db, bulkJobID, storeID, tuples) + bulkJobID, err := job.CreateJob(conn, storeID, tuples) if err != nil { - return nil, err + return err } fmt.Printf("Job created successfully - %s\n", bulkJobID) - notInsertedTuplesCount, insertedTuplesCount, err := storage.GetTotalAndRemainingTuples(db, bulkJobID) - totalTuplesCount := insertedTuplesCount + notInsertedTuplesCount + err = job.ImportTuples(conn, bulkJobID, fgaClient, requestRate, maxRequests, rampIntervalInSeconds) if err != nil { - return nil, err - } - completedTuples := atomic.Int64{} - completedTuples.Store(insertedTuplesCount) - - for i := completedTuples; i.Load() < totalTuplesCount; { - remainingTuples, err := storage.GetRemainingTuples(db, bulkJobID, 3) - if err != nil { - return nil, err - } - for _, tuple := range remainingTuples { - go func() { - _, err = fgaClient. - WriteTuples(context.Background()). - Body(client.ClientWriteTuplesBody{tuple.Tuple}). - Options(client.ClientWriteOptions{}). - Execute() - if err != nil { - err.Error() - } else { - println("Success") - } - completedTuples.Add(1) - }() - } + return err } - // Write 1 tuple per request - // Start with 20 requests per second - // Slowly ramp up - find ramp up logic - // Each time a request is successful we have to write it to a file - // --Resuming - // If resuming, we can ignore all the successful writes, start with failed writes - // --Failed - // Once fully completed, we can show a summary - // No. of successful writes, number of failed writes - // --While executing we can show - // Percentage completed, current RPS, expected time to complete - return &CreateImportJobResponse{JobId: bulkJobID}, nil + success, failed, err := storage.GetSummary(conn, bulkJobID) + fmt.Printf("The status for Job ID - %s: Success - %d, Failed - %d", bulkJobID, success, failed) + return nil } // createCmd represents the import command. @@ -123,18 +84,33 @@ var createCmd = &cobra.Command{ return fmt.Errorf("failed to parse file name due to %w", err) } + initialRequestRate, err := cmd.Flags().GetInt("initial-request-rate") + if initialRequestRate <= 0 || err != nil { + initialRequestRate = 20 + } + + maxRequests, err := cmd.Flags().GetInt("max-requests") + if maxRequests <= 0 || err != nil { + maxRequests = 2000 + } + + rampInterval, err := cmd.Flags().GetInt64("ramp-interval-seconds") + if rampInterval <= 0 || err != nil { + rampInterval = 120 + } + var tuples []client.ClientTupleKey tuples, err = tuplefile.ReadTupleFile(fileName) if err != nil { return err //nolint:wrapcheck } - result, err := ImportJobTuples(fgaClient, tuples, storeID) + err = ImportJobTuples(fgaClient, tuples, storeID, initialRequestRate, maxRequests, rampInterval) if err != nil { return err } - return output.Display(*result) + return nil }, } diff --git a/cmd/import/list.go b/cmd/import/list.go index afa325a..22db1a4 100644 --- a/cmd/import/list.go +++ b/cmd/import/list.go @@ -17,6 +17,8 @@ limitations under the License. package _import import ( + "fmt" + "github.com/openfga/cli/internal/storage" "github.com/spf13/cobra" _ "modernc.org/sqlite" ) @@ -27,9 +29,17 @@ var listCmd = &cobra.Command{ Short: "List all the import jobs", Long: "List all the import jobs", RunE: func(cmd *cobra.Command, _ []string) error { + conn, err := storage.NewDatabase() + if err != nil { + return err + } + results, err := storage.GetAllJobs(conn) + if err != nil { + return err + } + for _, result := range results { + fmt.Println(result) + } return nil }, } - -func init() { -} diff --git a/cmd/import/retry.go b/cmd/import/retry.go index 411a972..226efc6 100644 --- a/cmd/import/retry.go +++ b/cmd/import/retry.go @@ -17,6 +17,10 @@ limitations under the License. package _import import ( + "fmt" + "github.com/openfga/cli/internal/cmdutils" + "github.com/openfga/cli/internal/job" + "github.com/openfga/cli/internal/storage" "github.com/spf13/cobra" _ "modernc.org/sqlite" ) @@ -27,6 +31,40 @@ var retryCmd = &cobra.Command{ Short: "Retry a import job", Long: "Retry a import job", RunE: func(cmd *cobra.Command, _ []string) error { + clientConfig := cmdutils.GetClientConfig(cmd) + conn, err := storage.NewDatabase() + bulkJobID, err := cmd.Flags().GetString("job-id") + if err != nil { + return fmt.Errorf("failed to get job-id: %w", err) + } + + fgaClient, err := clientConfig.GetFgaClient() + if err != nil { + return fmt.Errorf("failed to initialize FGA Client due to %w", err) + } + + initialRequestRate, err := cmd.Flags().GetInt("initial-request-rate") + if initialRequestRate <= 0 || err != nil { + initialRequestRate = 20 + } + + maxRequests, err := cmd.Flags().GetInt("max-requests") + if maxRequests <= 0 || err != nil { + maxRequests = 2000 + } + + rampInterval, err := cmd.Flags().GetInt64("ramp-interval-seconds") + if rampInterval <= 0 || err != nil { + rampInterval = 120 + } + + err = job.ImportTuples(conn, bulkJobID, fgaClient, initialRequestRate, maxRequests, rampInterval) + if err != nil { + return err + } + + success, failed, err := storage.GetSummary(conn, bulkJobID) + fmt.Printf("The status for Job ID - %s: Success - %d, Failed - %d", bulkJobID, success, failed) return nil }, } diff --git a/cmd/import/status.go b/cmd/import/status.go index 692b921..dbca83e 100644 --- a/cmd/import/status.go +++ b/cmd/import/status.go @@ -17,6 +17,8 @@ limitations under the License. package _import import ( + "fmt" + "github.com/openfga/cli/internal/storage" "github.com/spf13/cobra" _ "modernc.org/sqlite" ) @@ -27,6 +29,19 @@ var statusCmd = &cobra.Command{ Short: "Check the status of the import job", Long: "The status command is used to check if the import job is running.", RunE: func(cmd *cobra.Command, _ []string) error { + bulkJobID, err := cmd.Flags().GetString("job-id") + if err != nil { + return fmt.Errorf("failed to get job-id: %w", err) + } + conn, err := storage.NewDatabase() + if err != nil { + return err + } + success, failed, err := storage.GetSummary(conn, bulkJobID) + if err != nil { + return err + } + fmt.Printf("The status for Job ID - %s: Success - %d, Failed - %d", bulkJobID, success, failed) return nil }, } diff --git a/example/tuples.json b/example/tuples.json new file mode 100644 index 0000000..2f180b7 --- /dev/null +++ b/example/tuples.json @@ -0,0 +1,17 @@ +[ + { + "user": "user:anne", + "relation": "owner", + "object": "folder:product" + }, + { + "user": "folder:product", + "relation": "parent", + "object": "folder:product-2021" + }, + { + "user": "user:beth", + "relation": "viewer", + "object": "folder:product-2021" + } +] diff --git a/internal/job/job.go b/internal/job/job.go new file mode 100644 index 0000000..6687894 --- /dev/null +++ b/internal/job/job.go @@ -0,0 +1,78 @@ +package job + +import ( + "context" + "database/sql" + "fmt" + "github.com/oklog/ulid/v2" + "github.com/openfga/cli/internal/storage" + "github.com/openfga/go-sdk/client" + "math" + "sync/atomic" + "time" +) + +func CreateJob(conn *sql.Conn, storeID string, tuples []client.ClientTupleKey) (string, error) { + bulkJobID := ulid.Make().String() + err := storage.InsertTuples(conn, bulkJobID, storeID, tuples) + if err != nil { + return "", err + } + return bulkJobID, nil +} + +func ImportTuples(conn *sql.Conn, bulkJobID string, + fgaClient client.SdkClient, + requestRate int, + maxRequests int, + rampIntervalInSeconds int64, +) error { + notInsertedTuplesCount, insertedTuplesCount, err := storage.GetTotalAndRemainingTuples(conn, bulkJobID) + totalTuplesCount := insertedTuplesCount + notInsertedTuplesCount + if err != nil { + return err + } + completedTuples := atomic.Int64{} + completedTuples.Store(insertedTuplesCount) + + rampStartTime := time.Now().Unix() + currentRequestRate := requestRate + for completedTuples.Load() < totalTuplesCount { + startTime := time.Now() + if rampStartTime > time.Now().Unix()+rampIntervalInSeconds { + currentRequestRate = GetRequestRate(requestRate, maxRequests) + rampStartTime = time.Now().Unix() + } + remainingTuples, e := storage.GetRemainingTuples(conn, bulkJobID, currentRequestRate) + if e != nil { + return e + } + for _, tuple := range remainingTuples { + _, e = fgaClient. + WriteTuples(context.Background()). + Body(client.ClientWriteTuplesBody{tuple.Tuple}). + Options(client.ClientWriteOptions{}). + Execute() + if e != nil { + errStr := e.Error() + storage.UpdateStatus(conn, tuple.Rowid, storage.NOT_INSERTED, errStr) + } else { + storage.UpdateStatus(conn, tuple.Rowid, storage.INSERTED, "") + } + completedTuples.Add(1) + } + elapsed := time.Since(startTime) + + fmt.Printf("Completed %d/%d. Requests Per Second - %d\n", completedTuples.Load(), totalTuplesCount, currentRequestRate) + time.Sleep(time.Second - elapsed) + } + return nil +} + +func GetRequestRate(currentRequestRate int, maxRequests int) int { + increasedRequestRate := currentRequestRate + int(math.Ceil(float64(currentRequestRate)*0.3)) + if increasedRequestRate > maxRequests { + return maxRequests + } + return increasedRequestRate +} diff --git a/internal/storage/sqlite.go b/internal/storage/sqlite.go index 0d506f8..3b24bed 100644 --- a/internal/storage/sqlite.go +++ b/internal/storage/sqlite.go @@ -1,6 +1,7 @@ package storage import ( + "context" "database/sql" "fmt" openfga "github.com/openfga/go-sdk" @@ -38,7 +39,19 @@ var GET_TUPLES_COUNT = ` SELECT COUNT(*) from import_job where bulk_job_id = ? and status = ? ` -func NewDatabase() (*sql.DB, error) { +var UPDATE_TUPLE_STATUS = ` + UPDATE import_job set status = ?, reason = ? where ROWID = ? +` + +var JOB_STATUS = ` + SELECT status, count(status) FROM import_job where bulk_job_id = ? group by status +` + +var GET_ALL_JOBS = ` + SELECT DISTINCT bulk_job_id FROM import_job +` + +func NewDatabase() (*sql.Conn, error) { dsnURI := "file:cli.db?_journal_mode=WAL" db, err := sql.Open("sqlite", dsnURI) if err != nil { @@ -48,10 +61,11 @@ func NewDatabase() (*sql.DB, error) { if err != nil { return nil, err } - return db, nil + conn, err := db.Conn(context.Background()) + return conn, nil } -func InsertTuples(db *sql.DB, bulkJobID string, storeID string, tuples []client.ClientTupleKey) error { +func InsertTuples(db *sql.Conn, bulkJobID string, storeID string, tuples []client.ClientTupleKey) error { valueStrings := make([]string, 0, len(tuples)) valueArgs := make([]interface{}, 0, len(tuples)*10) for _, tuple := range tuples { @@ -77,7 +91,7 @@ func InsertTuples(db *sql.DB, bulkJobID string, storeID string, tuples []client. } stmt := fmt.Sprintf(INSERT_TUPLES, strings.Join(valueStrings, ",")) - _, err := db.Exec(stmt, valueArgs...) + _, err := db.ExecContext(context.Background(), stmt, valueArgs...) if err != nil { return err } @@ -85,8 +99,8 @@ func InsertTuples(db *sql.DB, bulkJobID string, storeID string, tuples []client. return nil } -func GetRemainingTuples(db *sql.DB, bulkJobID string, count int) ([]TuplesResult, error) { - row, err := db.Query(READ_TUPLES, bulkJobID, NOT_INSERTED, count) +func GetRemainingTuples(db *sql.Conn, bulkJobID string, count int) ([]TuplesResult, error) { + row, err := db.QueryContext(context.Background(), READ_TUPLES, bulkJobID, NOT_INSERTED, count) if err != nil { return nil, err } @@ -116,7 +130,6 @@ func GetRemainingTuples(db *sql.DB, bulkJobID string, count int) ([]TuplesResult } tuples = append(tuples, TuplesResult{Rowid: rowid, Tuple: result}) } - fmt.Printf("%+v", tuples) return tuples, nil } @@ -125,15 +138,15 @@ type TuplesResult struct { Rowid int64 } -func GetTotalAndRemainingTuples(db *sql.DB, bulkJobID string) (int64, int64, error) { +func GetTotalAndRemainingTuples(db *sql.Conn, bulkJobID string) (int64, int64, error) { var notInsertedCount, insertedCount int64 - row, err := db.Query(GET_TUPLES_COUNT, bulkJobID, NOT_INSERTED) + row, err := db.QueryContext(context.Background(), GET_TUPLES_COUNT, bulkJobID, NOT_INSERTED) if err != nil { return 0, 0, err } row.Next() row.Scan(¬InsertedCount) - row, err = db.Query(GET_TUPLES_COUNT, bulkJobID, INSERTED) + row, err = db.QueryContext(context.Background(), GET_TUPLES_COUNT, bulkJobID, INSERTED) if err != nil { return 0, 0, err } @@ -141,3 +154,49 @@ func GetTotalAndRemainingTuples(db *sql.DB, bulkJobID string) (int64, int64, err row.Scan(&insertedCount) return notInsertedCount, insertedCount, nil } + +func UpdateStatus(db *sql.Conn, rowid int64, status int, reason string) error { + _, err := db.ExecContext(context.Background(), UPDATE_TUPLE_STATUS, status, reason, rowid) + if err != nil { + return err + } + return nil +} + +func GetSummary(db *sql.Conn, bulkJobID string) (int, int, error) { + var success, failed, status, result int + row, err := db.QueryContext(context.Background(), JOB_STATUS, bulkJobID) + if err != nil { + return 0, 0, err + } + for row.Next() { + err = row.Scan(&status, &result) + if err != nil { + return 0, 0, err + } + switch status { + case NOT_INSERTED: + failed = result + break + case INSERTED: + success = result + break + } + } + + return success, failed, nil +} + +func GetAllJobs(db *sql.Conn) ([]string, error) { + results := make([]string, 0) + var result string + row, err := db.QueryContext(context.Background(), GET_ALL_JOBS) + if err != nil { + return nil, err + } + for row.Next() { + row.Scan(&result) + results = append(results, result) + } + return results, nil +} From ec100eb2c4e11db34bb4ad2cccad550c575af77a Mon Sep 17 00:00:00 2001 From: Poovamraj T T Date: Thu, 23 Jan 2025 20:52:12 +0530 Subject: [PATCH 3/3] Update check.go --- cmd/query/check.go | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/cmd/query/check.go b/cmd/query/check.go index 092d401..65cd5c8 100644 --- a/cmd/query/check.go +++ b/cmd/query/check.go @@ -20,7 +20,6 @@ import ( "context" "fmt" - openfga "github.com/openfga/go-sdk" "github.com/openfga/go-sdk/client" "github.com/spf13/cobra" @@ -35,7 +34,6 @@ func check( object string, contextualTuples []client.ClientContextualTupleKey, queryContext *map[string]interface{}, - consistency *openfga.ConsistencyPreference, ) (*client.ClientCheckResponse, error) { body := &client.ClientCheckRequest{ User: user, @@ -46,14 +44,9 @@ func check( } options := &client.ClientCheckOptions{} - // Don't set if UNSPECIFIED has been provided, it's the default anyway - if *consistency != openfga.CONSISTENCYPREFERENCE_UNSPECIFIED { - options.Consistency = consistency - } - response, err := fgaClient.Check(context.Background()).Body(*body).Options(*options).Execute() if err != nil { - return nil, err //nolint:wrapcheck + return nil, fmt.Errorf("failed to check due to %w", err) } return response, nil @@ -63,7 +56,7 @@ func check( var checkCmd = &cobra.Command{ Use: "check", Short: "Check", - Example: `fga query check --store-id="01H4P8Z95KTXXEP6Z03T75Q984" user:anne can_view document:roadmap --context '{"ip_address":"127.0.0.1"}' --consistency "HIGHER_CONSISTENCY"`, //nolint:lll + Example: `fga query check --store-id="01H4P8Z95KTXXEP6Z03T75Q984" user:anne can_view document:roadmap --context '{"ip_address":"127.0.0.1"}'`, //nolint:lll Long: "Check if a user has a particular relation with an object.", Args: cobra.ExactArgs(3), //nolint:mnd RunE: func(cmd *cobra.Command, args []string) error { @@ -83,14 +76,9 @@ var checkCmd = &cobra.Command{ return fmt.Errorf("error parsing query context for check: %w", err) } - consistency, err := cmdutils.ParseConsistencyFromCmd(cmd) - if err != nil { - return fmt.Errorf("error parsing consistency for check: %w", err) - } - - response, err := check(fgaClient, args[0], args[1], args[2], contextualTuples, queryContext, consistency) + response, err := check(fgaClient, args[0], args[1], args[2], contextualTuples, queryContext) if err != nil { - return fmt.Errorf("check failed: %w", err) + return fmt.Errorf("failed to check due to %w", err) } return output.Display(*response)