Skip to content

Commit

Permalink
Adding RabbitMq (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
Raezil authored Dec 6, 2024
1 parent 4439125 commit dd73881
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 2 deletions.
45 changes: 45 additions & 0 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,51 @@ func main() {
eventstore.Broadcast()
}
```
Example 3
```
package main
import (
. "github.com/Raezil/GoEventBus"
"fmt"
"log"
)
type HouseWasSoldEvent struct{}
// NewDispatcher initializes the dispatcher with event handlers
func NewDispatcher() *Dispatcher {
return &Dispatcher{
HouseWasSoldEvent{}: func(m map[string]interface{}) (Result, error) {
price, ok := m["price"].(int)
if !ok {
return Result{}, fmt.Errorf("price not provided or invalid")
}
result := fmt.Sprintf("House was sold for %d", price)
log.Println(result)
return Result{
Message: result,
}, nil
},
}
}
func main() {
// Initialize dispatcher and event store
dispatcher := NewDispatcher()
eventstore := NewEventStoreWithRabbitMq(dispatcher)
eventstore.PublishToRabbitMQ(NewEvent(
HouseWasSoldEvent{},
map[string]interface{}{
"price": 100,
},
))
// Broadcast the event
eventstore.BroadcastWithRabbitMq()
}
```

3. Get the dependency
Expand Down
107 changes: 107 additions & 0 deletions eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,23 @@ import (
"fmt"
"log"
"sync"

"github.com/streadway/amqp"
)

// RabbitMQ settings
const (
rabbitMQURL = "amqp://guest:guest@localhost:5672/"
rabbitMQExchange = "events_exchange"
)

// EventStore handles publishing and dispatching events
type EventStore struct {
Mutex sync.Mutex
Dispatcher *Dispatcher
Events *sync.Pool
RabbitMQ *amqp.Connection
Channel *amqp.Channel
}

// NewEventStore initializes an EventStore with a dispatcher and an event pool
Expand Down Expand Up @@ -86,3 +96,100 @@ func (eventstore *EventStore) Broadcast() error {

return lastErr
}

// NewEventStore initializes an EventStore with a dispatcher and an event pool
func NewEventStoreWithRabbitMq(dispatcher *Dispatcher) *EventStore {
conn, err := amqp.Dial(rabbitMQURL)
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}

ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}

// Declare an exchange
err = ch.ExchangeDeclare(
rabbitMQExchange, // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("Failed to declare an exchange: %v", err)
}

return &EventStore{
Mutex: sync.Mutex{},
Dispatcher: dispatcher,
Events: &sync.Pool{
New: func() interface{} {
return &Event{} // Return a new Event
},
},
RabbitMQ: conn,
Channel: ch,
}
}

// PublishToRabbitMQ sends an event to RabbitMQ
func (eventstore *EventStore) PublishToRabbitMQ(event *Event) error {
body := fmt.Sprintf("EventID: %v, Args: %v", event.Id, event.Args)
err := eventstore.Channel.Publish(
rabbitMQExchange, // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
},
)
if err != nil {
log.Printf("Failed to publish a message: %v", err)
}
handler, exists := (*eventstore.Dispatcher)[event.Projection]
if !exists {
return fmt.Errorf("no handler for event projection: %s", event.Projection)
}

// Execute the handler
_, err = handler(event.Args)
if err != nil {
return fmt.Errorf("error handling event: %w", err)
}
return err
}

// CloseRabbitMQ cleans up RabbitMQ resources
func (es *EventStore) CloseRabbitMQ() {
if es.Channel != nil {
es.Channel.Close()
}
if es.RabbitMQ != nil {
es.RabbitMQ.Close()
}
}

// Broadcast sends all stored events to RabbitMQ
func (eventstore *EventStore) BroadcastWithRabbitMq() {
eventstore.Mutex.Lock()
defer eventstore.Mutex.Unlock()

for {
// Fetch an event from the pool
event := eventstore.Events.Get().(*Event)
if event == nil {
break
}

// Publish the event to RabbitMQ
if err := eventstore.PublishToRabbitMQ(event); err != nil {
log.Printf("Failed to broadcast event %v: %v", event.Id, err)
}
}
}
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ module github.com/Raezil/GoEventBus

go 1.22.2

require github.com/google/uuid v1.6.0
require (
github.com/google/uuid v1.6.0
github.com/streadway/amqp v1.1.0
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM=
github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg=
4 changes: 3 additions & 1 deletion go.work
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
go 1.22.2

use .
use (
.
)

0 comments on commit dd73881

Please sign in to comment.