Skip to content

Commit

Permalink
Merge pull request #35 from jxsl13/feature-pause-rework
Browse files Browse the repository at this point in the history
pause/resume rework
  • Loading branch information
jxsl13 authored Nov 16, 2023
2 parents b054a69 + 6b244f1 commit 459aecd
Show file tree
Hide file tree
Showing 39 changed files with 3,104 additions and 554 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ jobs:
run: docker-compose up -d

- name: Code Coverage
run: go test ./... -timeout 600s -race -count=1 -covermode=atomic -coverprofile=coverage.txt
run: go test ./... -timeout 900s -race -count=1 -covermode=atomic -coverprofile=coverage.txt

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
Expand Down
453 changes: 453 additions & 0 deletions DEBUG.md

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@
environment:
docker-compose up -d

down:
docker-compose down

test:
go test -v -race -count=1 ./...
go test -v -race -count=1 ./...
21 changes: 12 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- connection & session (channel) pooling
- reconnect handling
- batch processing
- pause/resume consumers
- clean shutdown handling
- sane defaults
- resilience & robustness over performance by default (publisher & subscriber acks)
Expand All @@ -37,28 +38,29 @@ import (
"os/signal"

"github.com/jxsl13/amqpx"
"github.com/jxsl13/amqpx/pool"
"github.com/jxsl13/amqpx/logging"
)

func main() {
ctx, cancel := signal.NotifyContext(context.Background())
defer cancel()

amqpx.RegisterTopologyCreator(func(t *amqpx.Topologer) error {
amqpx.RegisterTopologyCreator(func(t *pool.Topologer) error {
// error handling omitted for brevity
t.ExchangeDeclare("example-exchange", "topic") // durable exchange by default
t.QueueDeclare("example-queue") // durable quorum queue by default
t.QueueBind("example-queue", "route.name.v1.event", "example-exchange")
return nil
})
amqpx.RegisterTopologyDeleter(func(t *amqpx.Topologer) error {
amqpx.RegisterTopologyDeleter(func(t *pool.Topologer) error {
// error handling omitted for brevity
t.QueueDelete("example-queue")
t.ExchangeDelete("example-exchange")
return nil
})

amqpx.RegisterHandler("example-queue", func(msg amqpx.Delivery) error {
amqpx.RegisterHandler("example-queue", func(msg pool.Delivery) error {
fmt.Println("received message:", string(msg.Body))
fmt.Println("canceling context")
cancel()
Expand All @@ -73,7 +75,7 @@ func main() {
)
defer amqpx.Close()

amqpx.Publish("example-exchange", "route.name.v1.event", amqpx.Publishing{
amqpx.Publish("example-exchange", "route.name.v1.event", pool.Publishing{
ContentType: "application/json",
Body: []byte("my test event"),
})
Expand All @@ -95,6 +97,7 @@ import (
"os/signal"

"github.com/jxsl13/amqpx"
"github.com/jxsl13/amqpx/pool"
"github.com/jxsl13/amqpx/logging"
)

Expand All @@ -113,18 +116,18 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background())
defer cancel()

amqpx.RegisterTopologyCreator(func(t *amqpx.Topologer) error {
amqpx.RegisterTopologyCreator(func(t *pool.Topologer) error {
// error handling omitted for brevity

t.ExchangeDeclare("example-exchange", "topic",
amqpx.ExchangeDeclareOptions{
pool.ExchangeDeclareOptions{
Durable: true,
},
)
t.QueueDeclare("example-queue",
amqpx.QueueDeclareOptions{
pool.QueueDeclareOptions{
Durable: true,
Args: amqpx.QuorumQueue,
Args: pool.QuorumQueue,
},
)
t.QueueBind("example-queue", "route.name.v1.event", "example-exchange")
Expand All @@ -139,7 +142,7 @@ func main() {

amqpx.RegisterHandler("example-queue",
ExampleConsumer(cancel),
amqpx.ConsumeOptions{
pool.ConsumeOptions{
ConsumerTag: "example-queue-cunsumer",
Exclusive: true,
},
Expand Down
Loading

0 comments on commit 459aecd

Please sign in to comment.