Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

abstract pubsub service via gocloud #1664

Merged
merged 18 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ not required.
therefore don't use camelCase anywhere.

- Descriptive on its own - The flag names are also used in the guac.yaml config
file. Therefore a name should be self descriptive. Good: `nats-addr`, Bad:
file. Therefore a name should be self descriptive. Good: `pubsub-addr`, Bad:
`type`. If it is something that has the same meaning everywhere, it is ok to
be short: ex: interval.

Expand Down
15 changes: 7 additions & 8 deletions cmd/guaccollect/cmd/deps_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/guacsec/guac/pkg/cli"
"github.com/guacsec/guac/pkg/collectsub/client"
csubclient "github.com/guacsec/guac/pkg/collectsub/client"
"github.com/guacsec/guac/pkg/collectsub/datasource"
"github.com/guacsec/guac/pkg/collectsub/datasource/csubsource"
Expand All @@ -38,8 +37,8 @@ import (
type depsDevOptions struct {
// datasource for the collector
dataSource datasource.CollectSource
// address for NATS connection
natsAddr string
// address for pubsub connection
pubsubAddr string
// address for blob store
blobAddr string
// run as poll collector
Expand Down Expand Up @@ -75,7 +74,7 @@ you have access to read and write to the respective blob store.`,
logger := logging.FromContext(ctx)

opts, err := validateDepsDevFlags(
viper.GetString("nats-addr"),
viper.GetString("pubsub-addr"),
viper.GetString("blob-addr"),
viper.GetString("csub-addr"),
viper.GetBool("csub-tls"),
Expand Down Expand Up @@ -111,22 +110,22 @@ you have access to read and write to the respective blob store.`,
}()
}

initializeNATsandCollector(ctx, opts.natsAddr, opts.blobAddr)
initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr)
},
}

func validateDepsDevFlags(natsAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, useCsub bool, poll bool, retrieveDependencies bool, args []string,
func validateDepsDevFlags(pubsubAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, useCsub bool, poll bool, retrieveDependencies bool, args []string,
enablePrometheus bool, prometheusPort int,
) (depsDevOptions, error) {
var opts depsDevOptions
opts.natsAddr = natsAddr
opts.pubsubAddr = pubsubAddr
opts.blobAddr = blobAddr
opts.poll = poll
opts.retrieveDependencies = retrieveDependencies
opts.enablePrometheus = enablePrometheus
opts.prometheusPort = prometheusPort
if useCsub {
csubOpts, err := client.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify)
csubOpts, err := csubclient.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify)
if err != nil {
return opts, fmt.Errorf("unable to validate csub client flags: %w", err)
}
Expand Down
43 changes: 24 additions & 19 deletions cmd/guaccollect/cmd/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
Expand All @@ -37,8 +38,8 @@ import (
type filesOptions struct {
// path to folder with documents to collect
path string
// address for NATS connection
natsAddr string
// address for pubsub connection
pubsubAddr string
// address for blob store
blobAddr string
// poll location
Expand Down Expand Up @@ -66,7 +67,7 @@ you have access to read and write to the respective blob store.`,
Run: func(cmd *cobra.Command, args []string) {

opts, err := validateFilesFlags(
viper.GetString("nats-addr"),
viper.GetString("pubsub-addr"),
viper.GetString("blob-addr"),
viper.GetBool("service-poll"),
args)
Expand All @@ -86,14 +87,14 @@ you have access to read and write to the respective blob store.`,
logger.Errorf("unable to register file collector: %v", err)
}

initializeNATsandCollector(ctx, opts.natsAddr, opts.blobAddr)
initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr)
},
}

func validateFilesFlags(natsAddr string, blobAddr string, poll bool, args []string) (filesOptions, error) {
func validateFilesFlags(pubsubAddr string, blobAddr string, poll bool, args []string) (filesOptions, error) {
var opts filesOptions

opts.natsAddr = natsAddr
opts.pubsubAddr = pubsubAddr
opts.blobAddr = blobAddr
opts.poll = poll

Expand All @@ -106,33 +107,37 @@ func validateFilesFlags(natsAddr string, blobAddr string, poll bool, args []stri
return opts, nil
}

func getCollectorPublish(ctx context.Context) (func(*processor.Document) error, error) {
func getCollectorPublish(ctx context.Context, blobStore *blob.BlobStore, pubsub *emitter.EmitterPubSub) (func(*processor.Document) error, error) {
return func(d *processor.Document) error {
return collector.Publish(ctx, d)
return collector.Publish(ctx, d, blobStore, pubsub)
}, nil
}

func initializeNATsandCollector(ctx context.Context, natsAddr string, blobAddr string) {
func initializeNATsandCollector(ctx context.Context, pubsubAddr string, blobAddr string) {
logger := logging.FromContext(ctx)
// initialize jetstream
// TODO: pass in credentials file for NATS secure login
jetStream := emitter.NewJetStream(natsAddr, "", "")
ctx, err := jetStream.JetStreamInit(ctx)
if err != nil {
logger.Errorf("jetStream initialization failed with error: %v", err)
os.Exit(1)

if strings.HasPrefix(pubsubAddr, "nats://") {
// initialize jetstream
// TODO: pass in credentials file for NATS secure login
jetStream := emitter.NewJetStream(pubsubAddr, "", "")
if err := jetStream.JetStreamInit(ctx); err != nil {
logger.Errorf("jetStream initialization failed with error: %v", err)
os.Exit(1)
}
defer jetStream.Close()
}
defer jetStream.Close()

// initialize blob store
blobStore, err := blob.NewBlobStore(ctx, blobAddr)
if err != nil {
logger.Errorf("unable to connect to blog store: %v", err)
}

ctx = blob.WithBlobStore(ctx, blobStore)
// initialize pubsub
pubsub := emitter.NewEmitterPubSub(ctx, pubsubAddr)

// Get pipeline of components
collectorPubFunc, err := getCollectorPublish(ctx)
collectorPubFunc, err := getCollectorPublish(ctx, blobStore, pubsub)
if err != nil {
logger.Errorf("error: %v", err)
os.Exit(1)
Expand Down
15 changes: 7 additions & 8 deletions cmd/guaccollect/cmd/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/guacsec/guac/internal/client/githubclient"
"github.com/guacsec/guac/pkg/collectsub/client"
csubclient "github.com/guacsec/guac/pkg/collectsub/client"
"github.com/guacsec/guac/pkg/collectsub/datasource"
"github.com/guacsec/guac/pkg/collectsub/datasource/csubsource"
Expand All @@ -37,8 +36,8 @@ import (
type githubOptions struct {
// datasource for the collector
dataSource datasource.CollectSource
// address for NATS connection
natsAddr string
// address for pubsub connection
pubsubAddr string
// address for blob store
blobAddr string
// run as poll collector
Expand Down Expand Up @@ -68,7 +67,7 @@ you have access to read and write to the respective blob store.`,
logger := logging.FromContext(ctx)

opts, err := validateGithubFlags(
viper.GetString("nats-addr"),
viper.GetString("pubsub-addr"),
viper.GetString("blob-addr"),
viper.GetString("csub-addr"),
viper.GetBool("csub-tls"),
Expand Down Expand Up @@ -109,18 +108,18 @@ you have access to read and write to the respective blob store.`,
logger.Errorf("unable to register Github collector: %v", err)
}

initializeNATsandCollector(ctx, opts.natsAddr, opts.blobAddr)
initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr)
},
}

func validateGithubFlags(natsAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, useCsub bool, poll bool, args []string) (githubOptions, error) {
func validateGithubFlags(pubsubAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, useCsub bool, poll bool, args []string) (githubOptions, error) {
var opts githubOptions
opts.natsAddr = natsAddr
opts.pubsubAddr = pubsubAddr
opts.blobAddr = blobAddr
opts.poll = poll

if useCsub {
csubOpts, err := client.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify)
csubOpts, err := csubclient.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify)
if err != nil {
return opts, fmt.Errorf("unable to validate csub client flags: %w", err)
}
Expand Down
15 changes: 7 additions & 8 deletions cmd/guaccollect/cmd/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"os"
"time"

"github.com/guacsec/guac/pkg/collectsub/client"
csubclient "github.com/guacsec/guac/pkg/collectsub/client"
"github.com/guacsec/guac/pkg/collectsub/datasource"
"github.com/guacsec/guac/pkg/collectsub/datasource/csubsource"
Expand All @@ -37,8 +36,8 @@ import (
type ociOptions struct {
// datasource for the collector
dataSource datasource.CollectSource
// address for NATS connection
natsAddr string
// address for pubsub connection
pubsubAddr string
// address for blob store
blobAddr string
// run as poll collector
Expand Down Expand Up @@ -68,7 +67,7 @@ you have access to read and write to the respective blob store.`,
logger := logging.FromContext(ctx)

opts, err := validateOCIFlags(
viper.GetString("nats-addr"),
viper.GetString("pubsub-addr"),
viper.GetString("blob-addr"),
viper.GetString("csub-addr"),
viper.GetBool("csub-tls"),
Expand All @@ -92,18 +91,18 @@ you have access to read and write to the respective blob store.`,
logger.Errorf("unable to register oci collector: %v", err)
}

initializeNATsandCollector(ctx, opts.natsAddr, opts.blobAddr)
initializeNATsandCollector(ctx, opts.pubsubAddr, opts.blobAddr)
},
}

func validateOCIFlags(natsAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, useCsub bool, poll bool, args []string) (ociOptions, error) {
func validateOCIFlags(pubsubAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, useCsub bool, poll bool, args []string) (ociOptions, error) {
var opts ociOptions
opts.natsAddr = natsAddr
opts.pubsubAddr = pubsubAddr
opts.blobAddr = blobAddr
opts.poll = poll

if useCsub {
csubOpts, err := client.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify)
csubOpts, err := csubclient.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify)
if err != nil {
return opts, fmt.Errorf("unable to validate csub client flags: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/guaccollect/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
func init() {
cobra.OnInitialize(cli.InitConfig)

set, err := cli.BuildFlags([]string{"nats-addr", "blob-addr", "csub-addr", "use-csub", "service-poll", "enable-prometheus", "prometheus-addr"})
set, err := cli.BuildFlags([]string{"pubsub-addr", "blob-addr", "csub-addr", "use-csub", "service-poll", "enable-prometheus", "prometheus-addr"})
if err != nil {
fmt.Fprintf(os.Stderr, "failed to setup flag: %v", err)
os.Exit(1)
Expand Down
32 changes: 18 additions & 14 deletions cmd/guacingest/cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"os"
"os/signal"
"strings"
"sync"
"syscall"

Expand All @@ -36,7 +37,7 @@ import (
)

type options struct {
natsAddr string
pubsubAddr string
blobAddr string
csubClientOptions client.CsubClientOptions
graphqlEndpoint string
Expand All @@ -45,7 +46,7 @@ type options struct {
func ingest(cmd *cobra.Command, args []string) {

opts, err := validateFlags(
viper.GetString("nats-addr"),
viper.GetString("pubsub-addr"),
viper.GetString("blob-addr"),
viper.GetString("csub-addr"),
viper.GetBool("csub-tls"),
Expand All @@ -61,22 +62,25 @@ func ingest(cmd *cobra.Command, args []string) {
ctx, cf := context.WithCancel(logging.WithLogger(context.Background()))
logger := logging.FromContext(ctx)

// initialize jetstream
// TODO: pass in credentials file for NATS secure login
jetStream := emitter.NewJetStream(opts.natsAddr, "", "")
ctx, err = jetStream.JetStreamInit(ctx)
if err != nil {
logger.Errorf("jetStream initialization failed with error: %v", err)
os.Exit(1)
if strings.HasPrefix(opts.pubsubAddr, "nats://") {
// initialize jetstream
// TODO: pass in credentials file for NATS secure login
jetStream := emitter.NewJetStream(opts.pubsubAddr, "", "")
if err := jetStream.JetStreamInit(ctx); err != nil {
logger.Errorf("jetStream initialization failed with error: %v", err)
os.Exit(1)
}
defer jetStream.Close()
}
defer jetStream.Close()

// initialize blob store
blobStore, err := blob.NewBlobStore(ctx, opts.blobAddr)
if err != nil {
logger.Errorf("unable to connect to blog store: %v", err)
}

ctx = blob.WithBlobStore(ctx, blobStore)
// initialize pubsub
pubsub := emitter.NewEmitterPubSub(ctx, opts.pubsubAddr)

// initialize collectsub client
csubClient, err := csub_client.NewClient(opts.csubClientOptions)
Expand All @@ -95,7 +99,7 @@ func ingest(cmd *cobra.Command, args []string) {
wg.Add(1)
go func() {
defer wg.Done()
if err := process.Subscribe(ctx, emit); err != nil {
if err := process.Subscribe(ctx, emit, blobStore, pubsub); err != nil {
logger.Errorf("processor ended with error: %v", err)
}
}()
Expand All @@ -110,9 +114,9 @@ func ingest(cmd *cobra.Command, args []string) {
wg.Wait()
}

func validateFlags(natsAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, graphqlEndpoint string, args []string) (options, error) {
func validateFlags(pubsubAddr string, blobAddr string, csubAddr string, csubTls bool, csubTlsSkipVerify bool, graphqlEndpoint string, args []string) (options, error) {
var opts options
opts.natsAddr = natsAddr
opts.pubsubAddr = pubsubAddr
opts.blobAddr = blobAddr
csubOpts, err := client.ValidateCsubClientFlags(csubAddr, csubTls, csubTlsSkipVerify)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/guacingest/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
func init() {
cobra.OnInitialize(cli.InitConfig)

set, err := cli.BuildFlags([]string{"nats-addr", "blob-addr", "csub-addr", "gql-addr"})
set, err := cli.BuildFlags([]string{"pubsub-addr", "blob-addr", "csub-addr", "gql-addr"})
if err != nil {
fmt.Fprintf(os.Stderr, "failed to setup flag: %v", err)
os.Exit(1)
Expand Down
4 changes: 2 additions & 2 deletions container_files/arango/guac.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# nats
nats-addr: nats://nats:4222
# pubsub - default nats
pubsub-addr: nats://nats:4222

# blob store setup. Setup with blob store of choice via https://gocloud.dev/howto/blob/
blob-addr: file:///path/to/dir
Expand Down
4 changes: 2 additions & 2 deletions container_files/ent/guac.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# nats
nats-addr: nats://nats:4222
# pubsub - default nats
pubsub-addr: nats://nats:4222

# blob store setup. Setup with blob store of choice via https://gocloud.dev/howto/blob/
blob-addr: file:///path/to/dir
Expand Down
4 changes: 2 additions & 2 deletions container_files/guac/guac.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# nats
nats-addr: nats://nats:4222
# pubsub - default nats
pubsub-addr: nats://nats:4222

# blob store setup. Setup with blob store of choice via https://gocloud.dev/howto/blob/
blob-addr: file:///path/to/dir
Expand Down
2 changes: 1 addition & 1 deletion container_files/nats/js.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// change max payload from default 1MB to 64MB
max_payload: 64MB
max_payload: 1MB
// enables jetstream, an empty block will enable and use defaults
jetstream {}
Loading
Loading