Skip to content

Commit

Permalink
fix tilt and address comments
Browse files Browse the repository at this point in the history
Signed-off-by: pxp928 <[email protected]>
  • Loading branch information
pxp928 committed Jan 27, 2024
1 parent 3e48108 commit f653527
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 10 deletions.
2 changes: 1 addition & 1 deletion cmd/guaccollect/cmd/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func getCollectorPublish(ctx context.Context, blobStore *blob.BlobStore, pubsub
func initializeNATsandCollector(ctx context.Context, pubsubAddr string, blobAddr string) {
logger := logging.FromContext(ctx)

if strings.Contains(pubsubAddr, "nats://") {
if strings.HasPrefix(pubsubAddr, "nats://") {
// initialize jetstream
// TODO: pass in credentials file for NATS secure login
jetStream := emitter.NewJetStream(pubsubAddr, "", "")
Expand Down
2 changes: 1 addition & 1 deletion cmd/guacingest/cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func ingest(cmd *cobra.Command, args []string) {
ctx, cf := context.WithCancel(logging.WithLogger(context.Background()))
logger := logging.FromContext(ctx)

if strings.Contains(opts.pubsubAddr, "nats://") {
if strings.HasPrefix(opts.pubsubAddr, "nats://") {
// initialize jetstream
// TODO: pass in credentials file for NATS secure login
jetStream := emitter.NewJetStream(opts.pubsubAddr, "", "")
Expand Down
2 changes: 1 addition & 1 deletion internal/testing/cmd/pubsub_test/cmd/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func getCollectorPublish(ctx context.Context, blobStore *blob.BlobStore, pubsub
func initializeNATsandCollector(ctx context.Context, pubsubAddr string, blobAddr string) {
logger := logging.FromContext(ctx)

if strings.Contains(pubsubAddr, "nats://") {
if strings.HasPrefix(pubsubAddr, "nats://") {
// initialize jetstream
// TODO: pass in credentials file for NATS secure login
jetStream := emitter.NewJetStream(pubsubAddr, "", "")
Expand Down
6 changes: 3 additions & 3 deletions k8s/k8s.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ spec:
command: ["/opt/guac/guacingest"]
workingDir: /tmp
env:
- name: GUAC_NATS_ADDR
- name: GUAC_PUBSUB_ADDR
value: nats://nats:4222
---
apiVersion: apps/v1
Expand All @@ -136,7 +136,7 @@ spec:
command: ["/opt/guac/guaccollect", "image"]
workingDir: /tmp
env:
- name: GUAC_NATS_ADDR
- name: GUAC_PUBSUB_ADDR
value: nats://nats:4222
- name: GUAC_CSUB_ADDR
value: guac-collectsub:2782
Expand All @@ -161,7 +161,7 @@ spec:
command: ["/opt/guac/guaccollect", "deps_dev"]
workingDir: /tmp
env:
- name: GUAC_NATS_ADDR
- name: GUAC_PUBSUB_ADDR
value: nats://nats:4222
- name: GUAC_CSUB_ADDR
value: guac-collectsub:2782
4 changes: 2 additions & 2 deletions pkg/cli/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func init() {

// Set of all flags used across GUAC clis and subcommands. Use consistent
// names for config file.
set.String("pubsub-addr", "nats://127.0.0.1:4222", "address to connect to a pubsub (default is NATS)")
set.String("pubsub-addr", "nats://127.0.0.1:4222", "gocloud connection string for pubsub configured via https://gocloud.dev/howto/pubsub/ (default is nats://127.0.0.1:4222)")
set.String("csub-addr", "localhost:2782", "address to connect to collect-sub service")
set.Bool("csub-tls", false, "enable tls connection to the server")
set.Bool("csub-tls-skip-verify", false, "skip verifying server certificate (for self-signed certificates for example)")
Expand All @@ -58,7 +58,7 @@ func init() {
set.String("neo4j-realm", "neo4j", "realm to connect to graph db")

// blob store address
set.String("blob-addr", "file:///path/to/dir", "address to the blob store configured via https://gocloud.dev/howto/blob/")
set.String("blob-addr", "file:///path/to/dir", "gocloud connection string for blob store configured via https://gocloud.dev/howto/blob/")

set.String("neptune-endpoint", "localhost", "address to neptune db")
set.Int("neptune-port", 8182, "port used for neptune db connection")
Expand Down
4 changes: 2 additions & 2 deletions pkg/emitter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewEmitterPubSub(_ context.Context, serviceURL string) *EmitterPubSub {
// buildTopicURL constructs the full URL for a topic.
// If using NATS, additional parameters are needed for jetstream
func buildTopicURL(serviceURL string) string {
if strings.Contains(serviceURL, "nats://") {
if strings.HasPrefix(serviceURL, "nats://") {
return fmt.Sprintf("%s?subject=%s", serviceURL, subjectNameDocCollected)
} else {
return serviceURL
Expand All @@ -73,7 +73,7 @@ func buildTopicURL(serviceURL string) string {
// buildSubscriptionURL constructs the full URL for subscription.
// If using NATS, additional parameters are needed for jetstream
func buildSubscriptionURL(serviceURL string) string {
if strings.Contains(serviceURL, "nats://") {
if strings.HasPrefix(serviceURL, "nats://") {
return fmt.Sprintf("%s?%s&subject=%s&consumer_durable=%s&stream_name=%s&stream_subjects=%s", serviceURL, "jetstream", subjectNameDocCollected, durableProcessor, streamName, streamSubjects)
} else {
return serviceURL
Expand Down

0 comments on commit f653527

Please sign in to comment.