Skip to content

Commit

Permalink
Adding RabbitEventStore (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
Raezil authored Dec 7, 2024
1 parent a26eae6 commit 80dc5ed
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 0 deletions.
58 changes: 58 additions & 0 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,64 @@ func main() {
}
```

# Example 3
run RabbitMQ
```
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management
```
then run go run main.go
```
package main
import (
"fmt"
"log"
. "github.com/Raezil/GoEventBus"
)
func NewDispatcher() *RabbitDispatcher {
return &RabbitDispatcher{
"HouseWasSold": func(m map[string]interface{}) (Result, error) {
price, ok := m["price"].(float64)
if !ok {
return Result{}, fmt.Errorf("price not provided or invalid")
}
result := fmt.Sprintf("House was sold for %.2f", price)
return Result{Message: result}, nil
},
}
}
func main() {
dispatcher := NewDispatcher()
rabbitStore, err := NewRabbitEventStore(dispatcher, "amqp://guest:guest@localhost:5672/", "events_queue")
if err != nil {
log.Fatalf("Failed to initialize RabbitEventStore: %v", err)
}
rabbitStore.Publish(&Event{
Id: "12345",
Projection: "HouseWasSold",
Args: map[string]interface{}{
"price": 100.0,
},
})
rabbitStore.Publish(&Event{
Id: "123456",
Projection: "HouseWasSold",
Args: map[string]interface{}{
"price": 200.0,
},
})
go rabbitStore.Broadcast()
select {}
}
```
3. Get the dependency
```sh
go get github.com/Raezil/GoEventBus
Expand Down
134 changes: 134 additions & 0 deletions rabbitmq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package GoEventBus

import (
"encoding/json"
"fmt"
"log"
"sync"

"github.com/streadway/amqp"
)

// Dispatcher maps string representations of projections to handler functions
type RabbitDispatcher map[string]func(map[string]interface{}) (Result, error)

// RabbitEventStore handles publishing and dispatching events via RabbitMQ
type RabbitEventStore struct {
Mutex sync.Mutex
Dispatcher *RabbitDispatcher
RabbitConn *amqp.Connection
RabbitChannel *amqp.Channel
QueueName string
}

// NewRabbitEventStore initializes a RabbitEventStore
func NewRabbitEventStore(dispatcher *RabbitDispatcher, rabbitURL, queueName string) (*RabbitEventStore, error) {
conn, err := amqp.Dial(rabbitURL)
if err != nil {
return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
}

ch, err := conn.Channel()
if err != nil {
return nil, fmt.Errorf("failed to create a channel: %w", err)
}

_, err = ch.QueueDeclare(
queueName,
true, // Durable
false, // Delete when unused
false, // Exclusive
false, // No-wait
nil, // Arguments
)
if err != nil {
return nil, fmt.Errorf("failed to declare a queue: %w", err)
}

return &RabbitEventStore{
Dispatcher: dispatcher,
RabbitConn: conn,
RabbitChannel: ch,
QueueName: queueName,
}, nil
}

// Publish sends an event to the RabbitMQ queue
func (store *RabbitEventStore) Publish(event *Event) {
if event == nil {
log.Println("Attempted to publish a nil event")
return
}

body, err := json.Marshal(event)
if err != nil {
log.Printf("Error serializing event: %v", err)
return
}

err = store.RabbitChannel.Publish(
"", // Exchange
store.QueueName, // Routing key
false, // Mandatory
false, // Immediate
amqp.Publishing{
ContentType: "application/json",
Body: body,
},
)
if err != nil {
log.Printf("Error publishing event to RabbitMQ: %v", err)
}
}

// Commit processes a single event
func (store *RabbitEventStore) Commit(event *Event) error {
if store.Dispatcher == nil {
return fmt.Errorf("dispatcher is nil")
}

// Convert Projection to a string for use as a map key
projectionKey := fmt.Sprintf("%v", event.Projection)
handler, exists := (*store.Dispatcher)[projectionKey]
if !exists {
return fmt.Errorf("no handler for event projection: %v", event.Projection)
}

result, err := handler(event.Args)
if err != nil {
return fmt.Errorf("error handling event: %w", err)
}

log.Printf("Event id %s was successfully processed with result of %v", event.Id, result)
return nil
}

// Broadcast starts consuming events from RabbitMQ
func (store *RabbitEventStore) Broadcast() {
msgs, err := store.RabbitChannel.Consume(
store.QueueName,
"",
true, // Auto-acknowledge
false, // Exclusive
false, // No-local
false, // No-wait
nil, // Arguments
)
if err != nil {
log.Fatalf("Failed to consume messages: %v", err)
}

for msg := range msgs {
event := &Event{}
err := json.Unmarshal(msg.Body, event)
if err != nil {
log.Printf("Error deserializing event: %v", err)
continue
}

err = store.Commit(event)
if err != nil {
log.Printf("Error committing event: %v", err)
}
}
}

0 comments on commit 80dc5ed

Please sign in to comment.