Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Raezil committed Dec 7, 2024
1 parent a68c581 commit 0d4a0e9
Showing 1 changed file with 21 additions and 7 deletions.
28 changes: 21 additions & 7 deletions eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ func (eventstore *EventStore) PublishWithRabbitMQ(event *Event) {
}

log.Printf("Event %s published to RabbitMQ", event.Id)
eventstore.Events.Put(event)
}

func (eventstore *EventStore) BroadcastWithRabbitMQ(queueName string) error {
Expand Down Expand Up @@ -205,32 +204,47 @@ func (eventstore *EventStore) BroadcastWithRabbitMQ(queueName string) error {
}
}()

log.Printf("Started consuming messages from queue: %s", queueName)
return nil
}

func (eventstore *EventStore) processMessage(msg amqp.Delivery) {
// Deserialize the event
var event Event
if err := json.Unmarshal(msg.Body, &event); err != nil {
log.Printf("Failed to deserialize event: %v. Payload: %s", err, string(msg.Body))
err := json.Unmarshal(msg.Body, &event)
if err != nil {
log.Printf("Failed to deserialize message. Error: %v, Payload: %s", err, string(msg.Body))
// Acknowledge the message to remove it from the queue, as it's malformed
msg.Ack(false)
return
}

// Ensure the Dispatcher is not nil
if eventstore.Dispatcher == nil {
log.Printf("Dispatcher is nil, unable to process event ID: %s", event.Id)
msg.Nack(false, false) // Reject and do not requeue the message
return
}

// Retrieve the handler
// Retrieve the handler for the event
handler, exists := (*eventstore.Dispatcher)[event.Projection]
if !exists {
log.Printf("No handler found for projection: %s", event.Projection)
log.Printf("No handler found for projection: %s (Event ID: %s)", event.Projection, event.Id)
msg.Nack(false, false) // Reject and do not requeue the message
return
}

// Execute the handler
result, err := handler(event.Args)
if err != nil {
log.Printf("Handler error for event ID %s: %v", event.Id, err)
log.Printf("Error processing event ID: %s. Handler error: %v", event.Id, err)
msg.Nack(false, true) // Reject and requeue the message
return
}

log.Printf("Event %s processed successfully. Result: %v", event.Id, result)
// Successfully processed the event
log.Printf("Event ID: %s processed successfully. Result: %v", event.Id, result)
msg.Ack(false) // Acknowledge the message
}

func (eventstore *EventStore) CloseRabbitMQ() {
Expand Down

0 comments on commit 0d4a0e9

Please sign in to comment.