Replies: 12 comments 4 replies
-
Thanks for the feedback , have you seen some of the examples from here? https://pkg.go.dev/github.com/nats-io/nats.go#example-JetStream |
Beta Was this translation helpful? Give feedback.
-
Hi, same issue here, betwwen the (outdated ?) official documentation that only references using the CLI, and the various implementations we can find, I am also lost, and it seems like my colleagues are as well. As a go developper interacting with Jetstream nats server, should I use : https://github.com/nats-io/nats.go (it contains jsm.go) Bref, If you can help me find the current reference implementation of jetstream go client that would be great. Best, EDIT : it seems like jsm.go is not to be used, that leaves go developpers with nats.go library : nats-io/jsm.go#258 @wallyqs it is true that we lack examples illustrating the concept of streams for instance. |
Beta Was this translation helpful? Give feedback.
-
Also no mention of the "STREAM" concept in jetstream exemples from nats.go |
Beta Was this translation helpful? Give feedback.
-
@aperrot42 We use Pull for the most part and they are not deprecated. Hope this helps, |
Beta Was this translation helpful? Give feedback.
-
sorry ro bother. does that mean in WorkQueuePolicy one subject only support one single consumer ? |
Beta Was this translation helpful? Give feedback.
-
Could you share your code? |
Beta Was this translation helpful? Give feedback.
-
_, err = js.AddStream(&nats.StreamConfig{
Name: "ORDERS",
Subjects: []string{"ORDERS.*"},
Storage: nats.FileStorage,
Replicas: 3,
Retention: nats.WorkQueuePolicy,
Discard: nats.DiscardNew,
MaxMsgs: -1,
MaxAge: time.Hour * 24 * 365,
})
if err != nil {
log.Println(err)
return
}
for i := 0; i < 10000; i++ {
szMsg := fmt.Sprintf("js msg %d", i)
js.PublishAsync(subjectName, []byte(szMsg))
}
<-js.PublishAsyncComplete()
func recvmsg(nWhich int) {
nc, err := nats.Connect(.........)
js, err := nc.JetStream()
durableName := fmt.Sprintf("consumer%d", nWhich)
sub, err := js.PullSubscribe("ORDERS.created", durableName, nats.BindStream("ORDERS"))
if err != nil {
fmt.Println("PullSubscribe:", err)
return
}
for {
msgs, _ := sub.Fetch(10, nats.Context(context.Background()))
for _, msg := range msgs {
log.Println(durableName+" recv data:", string(msg.Data))
msg.Ack()
}
}
}
for i:=0; i<2; i++ {
go recvmsg(i)
} |
Beta Was this translation helpful? Give feedback.
-
You should create a single durable consumer. Then bind subscribers to that consumer. sub, err := js.PullSubscribe("ORDERS.created", durableName, nats.Bind("ORDERS", durableName)) |
Beta Was this translation helpful? Give feedback.
-
thanks derekcollison , I follow your instruction, and got correct results ! |
Beta Was this translation helpful? Give feedback.
-
is there any convienent function like js.AddStreamIfNotExist, js.AddConsumerIfNotExist ? |
Beta Was this translation helpful? Give feedback.
-
@carr123 you can just call |
Beta Was this translation helpful? Give feedback.
-
|
Beta Was this translation helpful? Give feedback.
-
jetstream golang API is hard to understand and use.
is there any complete demonstration on how to invoke those APIs in different use scenerios, not just the "basic usage" ?
the jetstream doc on the official website using CLI to explains concepts. that doesn't help me too much. I want to talk to jetstream server programmably through golang API.
Beta Was this translation helpful? Give feedback.
All reactions