Skip to content

Commit

Permalink
Refactoring EventStore (#3)
Browse files Browse the repository at this point in the history
* fix: bug in eventstore.go

* refactoring!
  • Loading branch information
Raezil authored Dec 7, 2024
1 parent dd73881 commit 9f6873d
Showing 1 changed file with 62 additions and 34 deletions.
96 changes: 62 additions & 34 deletions eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ func NewEventStore(dispatcher *Dispatcher) *EventStore {

// Publish adds an event to the event pool
func (eventstore *EventStore) Publish(event *Event) {
if event == nil {
log.Println("Attempted to publish a nil event")
return
}
eventstore.Events.Put(event)
}

Expand Down Expand Up @@ -69,8 +73,7 @@ func (eventstore *EventStore) Commit() error {
return fmt.Errorf("error handling event: %w", err)
}

log.Printf("Event id: %s was successfully published", event.Id)

log.Printf("Event id: %s was successfully processed", event.Id)
return nil
}

Expand All @@ -80,25 +83,22 @@ func (eventstore *EventStore) Broadcast() error {
defer eventstore.Mutex.Unlock()

var lastErr error
// Try to commit an event
for {
err := eventstore.Commit()
if err != nil {
// If there are no more events to process, break the loop
if err.Error() != "" {
if err.Error() == "no events to process" {
break
}
// Capture the last error if something else goes wrong
lastErr = err
log.Printf("Error processing event: %v", err)
}

}

return lastErr
}

// NewEventStore initializes an EventStore with a dispatcher and an event pool
func NewEventStoreWithRabbitMq(dispatcher *Dispatcher) *EventStore {
// NewEventStoreWithRabbitMQ initializes an EventStore with RabbitMQ integration
func NewEventStoreWithRabbitMQ(dispatcher *Dispatcher) *EventStore {
conn, err := amqp.Dial(rabbitMQURL)
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
Expand Down Expand Up @@ -138,58 +138,86 @@ func NewEventStoreWithRabbitMq(dispatcher *Dispatcher) *EventStore {

// PublishToRabbitMQ sends an event to RabbitMQ
func (eventstore *EventStore) PublishToRabbitMQ(event *Event) error {
body := fmt.Sprintf("EventID: %v, Args: %v", event.Id, event.Args)
if event == nil {
return fmt.Errorf("cannot publish a nil event")
}

// Format the event data to publish
body := fmt.Sprintf("EventID: %s, Args: %v", event.Id, event.Args)
err := eventstore.Channel.Publish(
rabbitMQExchange, // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
ContentType: "application/json",
Body: []byte(body),
},
)
if err != nil {
log.Printf("Failed to publish a message: %v", err)
}
handler, exists := (*eventstore.Dispatcher)[event.Projection]
if !exists {
return fmt.Errorf("no handler for event projection: %s", event.Projection)
return fmt.Errorf("failed to publish message to RabbitMQ: %w", err)
}

// Execute the handler
_, err = handler(event.Args)
if err != nil {
return fmt.Errorf("error handling event: %w", err)
}
return err
// Log successful publishing
log.Printf("Successfully published event ID: %s to RabbitMQ", event.Id)
return nil
}

// CloseRabbitMQ cleans up RabbitMQ resources
func (es *EventStore) CloseRabbitMQ() {
if es.Channel != nil {
es.Channel.Close()
func (eventstore *EventStore) CloseRabbitMQ() {
if eventstore.Channel != nil {
eventstore.Channel.Close()
}
if es.RabbitMQ != nil {
es.RabbitMQ.Close()
if eventstore.RabbitMQ != nil {
eventstore.RabbitMQ.Close()
}
}

// Broadcast sends all stored events to RabbitMQ
func (eventstore *EventStore) BroadcastWithRabbitMq() {
// BroadcastWithRabbitMq processes and publishes all events in the pool
func (eventstore *EventStore) BroadcastWithRabbitMQ() error {
eventstore.Mutex.Lock()
defer eventstore.Mutex.Unlock()

var lastErr error

for {
// Fetch an event from the pool
event := eventstore.Events.Get().(*Event)
if event == nil {
// Get an event from the pool
curr := eventstore.Events.Get()
if curr == nil {
// Exit if no more events are in the pool
break
}

// Type assert the event
event, ok := curr.(*Event)
if !ok || event == nil {
log.Println("Invalid event retrieved from pool")
continue
}

// Publish the event to RabbitMQ
if err := eventstore.PublishToRabbitMQ(event); err != nil {
log.Printf("Failed to broadcast event %v: %v", event.Id, err)
err := eventstore.PublishToRabbitMQ(event)
if err != nil {
log.Printf("Failed to publish event ID %s: %v", event.Id, err)
lastErr = err // Record the last error encountered
continue
}

// Optionally, handle the event if there's a dispatcher
if eventstore.Dispatcher != nil {
handler, exists := (*eventstore.Dispatcher)[event.Projection]
if exists {
_, err := handler(event.Args)
if err != nil {
log.Printf("Error handling event ID %s: %v", event.Id, err)
lastErr = err
}
} else {
log.Printf("No handler found for event projection: %s", event.Projection)
}
}
}

// Return the last encountered error, if any
return lastErr
}

0 comments on commit 9f6873d

Please sign in to comment.