Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring! #4

Merged
merged 1 commit into from
Dec 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions eventstore.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package GoEventBus

import (
"encoding/json"
"fmt"
"log"
"sync"
Expand All @@ -23,6 +24,34 @@ type EventStore struct {
Channel *amqp.Channel
}

func (eventstore *EventStore) InitRabbitMQ() error {
var err error
eventstore.RabbitMQ, err = amqp.Dial(rabbitMQURL)
if err != nil {
return fmt.Errorf("failed to connect to RabbitMQ: %w", err)
}

eventstore.Channel, err = eventstore.RabbitMQ.Channel()
if err != nil {
return fmt.Errorf("failed to open RabbitMQ channel: %w", err)
}

err = eventstore.Channel.ExchangeDeclare(
rabbitMQExchange,
"fanout", // Exchange type
true, // Durable
false, // Auto-deleted
false, // Internal
false, // No-wait
nil, // Arguments
)
if err != nil {
return fmt.Errorf("failed to declare exchange: %w", err)
}

return nil
}

// NewEventStore initializes an EventStore with a dispatcher and an event pool
func NewEventStore(dispatcher *Dispatcher) *EventStore {
return &EventStore{
Expand Down Expand Up @@ -96,3 +125,108 @@ func (eventstore *EventStore) Broadcast() error {

return lastErr
}

func (eventstore *EventStore) PublishWithRabbitMQ(event *Event) {
if event == nil {
log.Println("Attempted to publish a nil event")
return
}

// Serialize the event (e.g., JSON)
eventData, err := json.Marshal(event)
if err != nil {
log.Printf("Failed to serialize event: %v", err)
return
}

err = eventstore.Channel.Publish(
rabbitMQExchange, // Exchange
"", // Routing key (empty for fanout exchange)
false, // Mandatory
false, // Immediate
amqp.Publishing{
ContentType: "application/json",
Body: eventData,
},
)
if err != nil {
log.Printf("Failed to publish event to RabbitMQ: %v", err)
return
}

log.Printf("Event %s published to RabbitMQ", event.Id)
eventstore.Events.Put(event)
}

func (eventstore *EventStore) BroadcastWithRabbitMQ(queueName string) error {
q, err := eventstore.Channel.QueueDeclare(
queueName, // Queue name
true, // Durable
false, // Delete when unused
false, // Exclusive
false, // No-wait
nil, // Arguments
)
if err != nil {
return fmt.Errorf("failed to declare queue: %w", err)
}

err = eventstore.Channel.QueueBind(
q.Name, // Queue name
"", // Routing key
rabbitMQExchange, // Exchange
false, // No-wait
nil, // Arguments
)
if err != nil {
return fmt.Errorf("failed to bind queue: %w", err)
}

messages, err := eventstore.Channel.Consume(
q.Name, // Queue name
"", // Consumer tag
true, // Auto-ack
false, // Exclusive
false, // No-local
false, // No-wait
nil, // Arguments
)
if err != nil {
return fmt.Errorf("failed to register consumer: %w", err)
}

go func() {
for msg := range messages {
var event Event
err := json.Unmarshal(msg.Body, &event)
if err != nil {
log.Printf("Failed to deserialize event: %v", err)
continue
}

// Dispatch the event
handler, exists := (*eventstore.Dispatcher)[event.Projection]
if !exists {
log.Printf("no handler for event projection: %s", event.Projection)
continue
}

// Execute the handler
_, err = handler(event.Args)
if err != nil {
continue
}
}
}()

return nil
}

func (eventstore *EventStore) CloseRabbitMQ() {
if eventstore.Channel != nil {
eventstore.Channel.Close()
}
if eventstore.RabbitMQ != nil {
eventstore.RabbitMQ.Close()
}
}
Loading