Skip to content

Commit

Permalink
refactoring! (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
Raezil authored Dec 7, 2024
1 parent 410fea6 commit 83d681c
Showing 1 changed file with 134 additions and 0 deletions.
134 changes: 134 additions & 0 deletions eventstore.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package GoEventBus

import (
"encoding/json"
"fmt"
"log"
"sync"
Expand All @@ -23,6 +24,34 @@ type EventStore struct {
Channel *amqp.Channel
}

func (eventstore *EventStore) InitRabbitMQ() error {
var err error
eventstore.RabbitMQ, err = amqp.Dial(rabbitMQURL)
if err != nil {
return fmt.Errorf("failed to connect to RabbitMQ: %w", err)
}

eventstore.Channel, err = eventstore.RabbitMQ.Channel()
if err != nil {
return fmt.Errorf("failed to open RabbitMQ channel: %w", err)
}

err = eventstore.Channel.ExchangeDeclare(
rabbitMQExchange,
"fanout", // Exchange type
true, // Durable
false, // Auto-deleted
false, // Internal
false, // No-wait
nil, // Arguments
)
if err != nil {
return fmt.Errorf("failed to declare exchange: %w", err)
}

return nil
}

// NewEventStore initializes an EventStore with a dispatcher and an event pool
func NewEventStore(dispatcher *Dispatcher) *EventStore {
return &EventStore{
Expand Down Expand Up @@ -96,3 +125,108 @@ func (eventstore *EventStore) Broadcast() error {

return lastErr
}

func (eventstore *EventStore) PublishWithRabbitMQ(event *Event) {
if event == nil {
log.Println("Attempted to publish a nil event")
return
}

// Serialize the event (e.g., JSON)
eventData, err := json.Marshal(event)
if err != nil {
log.Printf("Failed to serialize event: %v", err)
return
}

err = eventstore.Channel.Publish(
rabbitMQExchange, // Exchange
"", // Routing key (empty for fanout exchange)
false, // Mandatory
false, // Immediate
amqp.Publishing{
ContentType: "application/json",
Body: eventData,
},
)
if err != nil {
log.Printf("Failed to publish event to RabbitMQ: %v", err)
return
}

log.Printf("Event %s published to RabbitMQ", event.Id)
eventstore.Events.Put(event)
}

func (eventstore *EventStore) BroadcastWithRabbitMQ(queueName string) error {
q, err := eventstore.Channel.QueueDeclare(
queueName, // Queue name
true, // Durable
false, // Delete when unused
false, // Exclusive
false, // No-wait
nil, // Arguments
)
if err != nil {
return fmt.Errorf("failed to declare queue: %w", err)
}

err = eventstore.Channel.QueueBind(
q.Name, // Queue name
"", // Routing key
rabbitMQExchange, // Exchange
false, // No-wait
nil, // Arguments
)
if err != nil {
return fmt.Errorf("failed to bind queue: %w", err)
}

messages, err := eventstore.Channel.Consume(
q.Name, // Queue name
"", // Consumer tag
true, // Auto-ack
false, // Exclusive
false, // No-local
false, // No-wait
nil, // Arguments
)
if err != nil {
return fmt.Errorf("failed to register consumer: %w", err)
}

go func() {
for msg := range messages {
var event Event
err := json.Unmarshal(msg.Body, &event)
if err != nil {
log.Printf("Failed to deserialize event: %v", err)
continue
}

// Dispatch the event
handler, exists := (*eventstore.Dispatcher)[event.Projection]
if !exists {
log.Printf("no handler for event projection: %s", event.Projection)
continue
}

// Execute the handler
_, err = handler(event.Args)
if err != nil {
continue
}
}
}()

return nil
}

func (eventstore *EventStore) CloseRabbitMQ() {
if eventstore.Channel != nil {
eventstore.Channel.Close()
}
if eventstore.RabbitMQ != nil {
eventstore.RabbitMQ.Close()
}
}

0 comments on commit 83d681c

Please sign in to comment.