Skip to content

Commit

Permalink
fix!
Browse files Browse the repository at this point in the history
  • Loading branch information
Raezil committed Dec 7, 2024
1 parent 83d681c commit a68c581
Showing 1 changed file with 31 additions and 20 deletions.
51 changes: 31 additions & 20 deletions eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func (eventstore *EventStore) PublishWithRabbitMQ(event *Event) {
}

func (eventstore *EventStore) BroadcastWithRabbitMQ(queueName string) error {
// Declare the queue
q, err := eventstore.Channel.QueueDeclare(
queueName, // Queue name
true, // Durable
Expand All @@ -171,6 +172,7 @@ func (eventstore *EventStore) BroadcastWithRabbitMQ(queueName string) error {
return fmt.Errorf("failed to declare queue: %w", err)
}

// Bind the queue to the exchange
err = eventstore.Channel.QueueBind(
q.Name, // Queue name
"", // Routing key
Expand All @@ -182,10 +184,11 @@ func (eventstore *EventStore) BroadcastWithRabbitMQ(queueName string) error {
return fmt.Errorf("failed to bind queue: %w", err)
}

// Consume messages from the queue
messages, err := eventstore.Channel.Consume(
q.Name, // Queue name
"", // Consumer tag
true, // Auto-ack
true, // Auto-acknowledge
false, // Exclusive
false, // No-local
false, // No-wait
Expand All @@ -197,31 +200,39 @@ func (eventstore *EventStore) BroadcastWithRabbitMQ(queueName string) error {

go func() {
for msg := range messages {
var event Event
err := json.Unmarshal(msg.Body, &event)
if err != nil {
log.Printf("Failed to deserialize event: %v", err)
continue
}

// Dispatch the event
handler, exists := (*eventstore.Dispatcher)[event.Projection]
if !exists {
log.Printf("no handler for event projection: %s", event.Projection)
continue
}

// Execute the handler
_, err = handler(event.Args)
if err != nil {
continue
}
// Process each message in a separate goroutine
go eventstore.processMessage(msg)
}
}()

return nil
}

func (eventstore *EventStore) processMessage(msg amqp.Delivery) {
// Deserialize the event
var event Event
if err := json.Unmarshal(msg.Body, &event); err != nil {
log.Printf("Failed to deserialize event: %v. Payload: %s", err, string(msg.Body))
return
}

// Retrieve the handler
handler, exists := (*eventstore.Dispatcher)[event.Projection]
if !exists {
log.Printf("No handler found for projection: %s", event.Projection)
return
}

// Execute the handler
result, err := handler(event.Args)
if err != nil {
log.Printf("Handler error for event ID %s: %v", event.Id, err)
return
}

log.Printf("Event %s processed successfully. Result: %v", event.Id, result)
}

func (eventstore *EventStore) CloseRabbitMQ() {
if eventstore.Channel != nil {
eventstore.Channel.Close()
Expand Down

0 comments on commit a68c581

Please sign in to comment.