Skip to content

Commit 79e12b5

Browse files
authored
Merge pull request #17 from rog-golang-buddies/queue_setup
Queue setup
2 parents b18c7c6 + be9fddf commit 79e12b5

25 files changed

+844
-14
lines changed

.github/workflows/golangci-lint.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ jobs:
1010
steps:
1111
- uses: actions/setup-go@v3
1212
with:
13-
go-version: 1.17
13+
go-version: 1.18
1414
- name: Checkout
1515
uses: actions/checkout@v3
1616
- name: Run linters
1717
uses: golangci/golangci-lint-action@v3
1818
with:
1919
# Optional: version of golangci-lint to use in form of v1.2 or v1.2.3 or `latest` to use the latest version
20-
version: v1.29
20+
version: v1.47.2
2121

2222
# Optional: working directory, useful for monorepos
2323
# working-directory: somedir

README.md

+7
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,10 @@ In other words this service processes content of Open API file, transforms it to
1313
4. Validate content
1414
5. Parse content into an ASD model
1515
6. Put ASD model with metadata to the storage and update service queue
16+
17+
### Starting service
18+
The easiest way to start an application is to do it with docker.
19+
If you have docker you just need to run a command from the project root
20+
`docker-compose -f ./docker/docker-compose-dev.yml up -d --build`.
21+
And `docker-compose -f ./docker/docker-compose-dev.yml down` to stop.
22+
You can observe queues, and send and retrieve messages from queues using the web interface available by address http://localhost:15672 .

cmd/main.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package main
22

3-
import "fmt"
3+
import (
4+
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal"
5+
"os"
6+
)
47

58
func main() {
6-
// Feel free to delete this file.
7-
fmt.Println("Hello Gophers")
9+
os.Exit(internal.Start())
810
}

docker/docker-compose-dev.yml

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
version: '3.9'
2+
3+
services:
4+
rabbit:
5+
image: rabbitmq:3-management #you may open management UI via http://localhost:15672/#/ login&password == guest
6+
container_name: rabbit
7+
ports:
8+
- "5672:5672"
9+
- "15672:15672"
10+
11+
data-scraping-service:
12+
container_name: dss
13+
build:
14+
context: ../.
15+
dockerfile: Dockerfile
16+
restart: unless-stopped
17+
depends_on:
18+
- rabbit

go.mod

+13
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
11
module github.com/rog-golang-buddies/api-hub_data-scraping-service
22

33
go 1.18
4+
5+
require (
6+
github.com/golang/mock v1.6.0
7+
github.com/rabbitmq/amqp091-go v1.4.0
8+
github.com/stretchr/testify v1.7.0
9+
github.com/wagslane/go-rabbitmq v0.10.0
10+
)
11+
12+
require (
13+
github.com/davecgh/go-spew v1.1.0 // indirect
14+
github.com/pmezard/go-difflib v1.0.0 // indirect
15+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
16+
)

go.sum

+21-9
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,29 @@
1+
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
12
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
2-
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
3-
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
43
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
54
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
5+
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
6+
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
7+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
8+
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
9+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
610
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
711
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
12+
github.com/rabbitmq/amqp091-go v1.4.0 h1:T2G+J9W9OY4p64Di23J6yH7tOkMocgnESvYeBjuG9cY=
13+
github.com/rabbitmq/amqp091-go v1.4.0/go.mod h1:JsV0ofX5f1nwOGafb8L5rBItt9GyhfQfcJj+oyz0dGg=
814
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
9-
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
10-
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
11-
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
12-
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
15+
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
16+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
17+
github.com/wagslane/go-rabbitmq v0.10.0 h1:y9Bw8Q/9gOvsHfjMOGQjCW3033aYTKabxDm8eyjUGjs=
18+
github.com/wagslane/go-rabbitmq v0.10.0/go.mod h1:u6xM1V7OO4D0szUy/F6Bya/9r0lLae/2FXBijkAQmn0=
1319
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
20+
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
21+
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
1422
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
1523
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
24+
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
1625
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
26+
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
1727
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
1828
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
1929
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
@@ -28,13 +38,15 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn
2838
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
2939
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
3040
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
41+
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
3142
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
3243
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
44+
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
3345
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
3446
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
3547
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
36-
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
3748
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
49+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
50+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
51+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
3852
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
39-
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
40-
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

internal/app.go

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/config"
6+
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/queue"
7+
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/queue/handler"
8+
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/queue/publisher"
9+
"log"
10+
)
11+
12+
func Start() int {
13+
ctx, cancel := context.WithCancel(context.Background())
14+
defer cancel()
15+
16+
conf := config.ReadConfig() //read configuration from file & env
17+
//initialize publisher connection to the queue
18+
//this library assumes using one publisher and one consumer per application
19+
//https://github.com/wagslane/go-rabbitmq/issues/79
20+
pub, err := publisher.NewPublisher(conf.QueueConfig) //TODO pass logger here and add it to publisher options
21+
if err != nil {
22+
log.Println("error while starting publisher: ", err)
23+
return 1
24+
}
25+
defer publisher.ClosePublisher(pub)
26+
//initialize consumer connection to the queue
27+
consumer, err := queue.NewConsumer(conf.QueueConfig) //TODO pass logger here and add it to consumer options
28+
if err != nil {
29+
log.Println("error while connecting to the queue: ", err)
30+
return 1
31+
}
32+
defer queue.CloseConsumer(consumer)
33+
34+
handl := handler.NewApiSpecDocHandler(pub, conf.QueueConfig)
35+
listener := queue.NewListener()
36+
err = listener.Start(consumer, &conf.QueueConfig, handl)
37+
if err != nil {
38+
log.Println("error while listening queue ", err)
39+
return 1
40+
}
41+
42+
<-ctx.Done()
43+
44+
log.Println("application stopped gracefully (not)")
45+
return 0
46+
}

internal/config/application.go

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package config
2+
3+
type ApplicationConfig struct {
4+
QueueConfig QueueConfig
5+
}
6+
7+
func ReadConfig() ApplicationConfig {
8+
//Stub this method before the configuration task is not resolved
9+
//https://github.com/rog-golang-buddies/api-hub_data-scraping-service/issues/10
10+
//TODO implement with the method to read configuration from file and env
11+
return ApplicationConfig{
12+
QueueConfig: QueueConfig{
13+
UrlRequestQueue: "data-scraping-asd",
14+
ScrapingResultQueue: "storage-update-asd",
15+
NotificationQueue: "gateway-scrape_notifications",
16+
Url: "amqp://guest:guest@rabbit:5672/",
17+
Concurrency: 10,
18+
},
19+
}
20+
}

internal/config/queue.go

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package config
2+
3+
//QueueConfig queue configuration
4+
type QueueConfig struct {
5+
UrlRequestQueue string //UrlRequestQueue name to listen to the new events
6+
ScrapingResultQueue string //Queue name to send processed ApiSpecDoc
7+
NotificationQueue string //Queue name to notify a user about error or success (if required)
8+
Url string //RabbitMQ url
9+
Concurrency int //Number of parallel handlers
10+
}

internal/dto/scrapingResult.go

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package dto
2+
3+
import "github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/dto/apiSpecDoc"
4+
5+
type ScrapingResult struct {
6+
IsNotifyUser bool
7+
8+
ApiSpecDoc apiSpecDoc.ApiSpecDoc
9+
}

internal/dto/urlRequest.go

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package dto
2+
3+
//UrlRequest represents listening request model
4+
type UrlRequest struct {
5+
//File url to scrape data
6+
FileUrl string
7+
8+
//A flag is a notification required related to an error notification in case of an error
9+
//Notification is required when this is the request from the user and doesn't require it
10+
//if it is the request from the storage and update service.
11+
IsNotifyUser bool
12+
}

internal/dto/userNotification.go

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package dto
2+
3+
import "fmt"
4+
5+
//UserNotification represents basic DTO notification to the user if requested
6+
//Initially, it supposed to be simple - if err != nil => error happens, else all is ok
7+
type UserNotification struct {
8+
Error *ProcessingError
9+
}
10+
11+
func NewUserNotification(procErr *ProcessingError) UserNotification {
12+
return UserNotification{Error: procErr}
13+
}
14+
15+
//ProcessingError represents basic DTO to provide information about the error
16+
//when the processing request contains a notification request
17+
type ProcessingError struct {
18+
Cause error
19+
20+
Message string
21+
}
22+
23+
func (pe *ProcessingError) Error() string {
24+
return fmt.Sprintf("%s: %v", pe.Message, pe.Cause)
25+
}
26+
27+
func NewProcessingError(message string, err error) ProcessingError {
28+
return ProcessingError{
29+
Cause: err,
30+
Message: message,
31+
}
32+
}

internal/queue/consumer.go

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package queue
2+
3+
import (
4+
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/config"
5+
"github.com/wagslane/go-rabbitmq"
6+
"io"
7+
"log"
8+
)
9+
10+
//Consumer is just an interface for the library consumer which doesn't have one.
11+
//go:generate mockgen -source=consumer.go -destination=./mocks/consumer.go
12+
type Consumer interface {
13+
io.Closer
14+
StartConsuming(
15+
handler rabbitmq.Handler,
16+
queue string,
17+
routingKeys []string,
18+
optionFuncs ...func(*rabbitmq.ConsumeOptions),
19+
) error
20+
}
21+
22+
func NewConsumer(conf config.QueueConfig) (Consumer, error) {
23+
consumer, err := rabbitmq.NewConsumer(
24+
conf.Url,
25+
rabbitmq.Config{},
26+
rabbitmq.WithConsumerOptionsLogging,
27+
)
28+
if err != nil {
29+
return nil, err
30+
}
31+
return &consumer, nil
32+
}
33+
34+
func CloseConsumer(consumer Consumer) {
35+
log.Println("closing consumer")
36+
err := consumer.Close()
37+
if err != nil {
38+
log.Println("error while closing consumer: ", err)
39+
}
40+
}

internal/queue/consumer_test.go

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package queue_test
2+
3+
import (
4+
"github.com/golang/mock/gomock"
5+
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/queue"
6+
mock_queue "github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/queue/mocks"
7+
"testing"
8+
)
9+
10+
func TestClosePublisher(t *testing.T) {
11+
ctrl := gomock.NewController(t)
12+
consumer := mock_queue.NewMockConsumer(ctrl)
13+
consumer.EXPECT().Close().Return(nil)
14+
queue.CloseConsumer(consumer)
15+
}

internal/queue/handler/apispec.go

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package handler
2+
3+
import (
4+
"encoding/json"
5+
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/config"
6+
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/dto"
7+
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/dto/apiSpecDoc"
8+
"github.com/rog-golang-buddies/api-hub_data-scraping-service/internal/queue/publisher"
9+
"github.com/wagslane/go-rabbitmq"
10+
"log"
11+
)
12+
13+
type ApiSpecDocHandler struct {
14+
publisher publisher.Publisher
15+
config config.QueueConfig
16+
}
17+
18+
func (asdh *ApiSpecDocHandler) Handle(delivery rabbitmq.Delivery) rabbitmq.Action {
19+
log.Printf("consumed: %v", string(delivery.Body))
20+
//call process here
21+
var req dto.UrlRequest
22+
err := json.Unmarshal(delivery.Body, &req)
23+
if err != nil {
24+
log.Printf("error unmarshalling message: '%v', err: %s\n", string(delivery.Body), err)
25+
return rabbitmq.NackDiscard
26+
}
27+
//here processing of the request happens...
28+
asd := apiSpecDoc.ApiSpecDoc{} //TODO replace this stub with process call
29+
30+
//publish to the required queue success or error
31+
result := dto.ScrapingResult{IsNotifyUser: req.IsNotifyUser, ApiSpecDoc: asd}
32+
err = asdh.publish(&delivery, result, asdh.config.ScrapingResultQueue)
33+
if err != nil {
34+
log.Println("error while publishing: ", err)
35+
//Here is some error while publishing happened - probably something wrong with the queue
36+
return rabbitmq.NackDiscard
37+
}
38+
if req.IsNotifyUser {
39+
err = asdh.publish(&delivery, dto.NewUserNotification(nil), asdh.config.NotificationQueue)
40+
if err != nil {
41+
log.Println("error while notifying user")
42+
//don't discard this message because it was published to the storage service successfully
43+
}
44+
}
45+
log.Println("Url scraped successfully")
46+
return rabbitmq.Ack
47+
}
48+
49+
func (asdh *ApiSpecDocHandler) publish(delivery *rabbitmq.Delivery, message any, queue string) error {
50+
content, err := json.Marshal(message)
51+
if err != nil {
52+
log.Println("error while marshalling: ", err)
53+
return err
54+
}
55+
return asdh.publisher.Publish(content,
56+
[]string{queue},
57+
rabbitmq.WithPublishOptionsCorrelationID(delivery.CorrelationId),
58+
rabbitmq.WithPublishOptionsContentType("application/json"),
59+
rabbitmq.WithPublishOptionsPersistentDelivery,
60+
)
61+
}
62+
63+
func NewApiSpecDocHandler(publisher publisher.Publisher, config config.QueueConfig) Handler {
64+
return &ApiSpecDocHandler{
65+
publisher: publisher,
66+
config: config,
67+
}
68+
}

0 commit comments

Comments
 (0)