diff --git a/.github/workflows/stale.yml b/.github/workflows/stale.yml new file mode 100644 index 0000000..73b9327 --- /dev/null +++ b/.github/workflows/stale.yml @@ -0,0 +1,44 @@ +name: Stale + +on: + workflow_dispatch: + schedule: + - cron: '0 12 * * *' + +jobs: + Stale: + runs-on: ubuntu-latest + name: Run stale + steps: + - name: Checkout + id: checkout + uses: actions/checkout@v4 + - name: Stale + id: stale + uses: sonia-corporation/stale@2.5.0 + with: + pull-request-stale-label: stale-label + pull-request-days-before-stale: 14 + pull-request-days-before-close: 10 + pull-request-ignore-all-assignees: true + pull-request-delete-branch-after-close: true + pull-request-stale-comment: | + This pull is inactive since 14 days! + If there is no activity, it will be closed in two weeks. + + You should take one of the following actions: + - Manually close this PR if it is no longer relevant + - Push new commits or comment if you have more information to share + issue-stale-label: inactive + issue-ignore-all-assignees: true + issue-days-before-stale: 14 + issue-days-before-close: 10 + issue-stale-comment: | + This issue is inactive since 14 days! + If there is no activity, it will be closed in two weeks. + + You should take one of the following actions: + - Manually close this issue if it is no longer relevant + - Comment if you have more information to share + issue-add-labels-after-close: | + closed-due-to-inactivity diff --git a/actor.go b/actor.go index 94d6ca7..efabf6f 100644 --- a/actor.go +++ b/actor.go @@ -28,10 +28,8 @@ import ( "context" "errors" "fmt" - "sync" "time" - "go.uber.org/atomic" "golang.org/x/sync/errgroup" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/timestamppb" @@ -42,7 +40,6 @@ import ( "github.com/tochemey/ego/v3/egopb" "github.com/tochemey/ego/v3/eventstore" "github.com/tochemey/ego/v3/eventstream" - "github.com/tochemey/ego/v3/internal/telemetry" ) var ( @@ -57,10 +54,10 @@ type actor struct { // specifies the current state currentState State - eventsCounter *atomic.Uint64 + eventsCounter uint64 lastCommandTime time.Time - mu sync.RWMutex - eventsStream eventstream.Stream + + eventsStream eventstream.Stream } // enforce compilation error @@ -72,8 +69,6 @@ func newActor(behavior EntityBehavior, eventsStore eventstore.EventsStore, event return &actor{ eventsStore: eventsStore, EntityBehavior: behavior, - eventsCounter: atomic.NewUint64(0), - mu: sync.RWMutex{}, eventsStream: eventsStream, } } @@ -81,16 +76,11 @@ func newActor(behavior EntityBehavior, eventsStore eventstore.EventsStore, event // PreStart pre-starts the actor // At this stage we connect to the various stores func (entity *actor) PreStart(ctx context.Context) error { - spanCtx, span := telemetry.SpanContext(ctx, "PreStart") - defer span.End() - entity.mu.Lock() - defer entity.mu.Unlock() - if entity.eventsStore == nil { return errors.New("events store is not defined") } - if err := entity.eventsStore.Ping(spanCtx); err != nil { + if err := entity.eventsStore.Ping(ctx); err != nil { return fmt.Errorf("failed to connect to the events store: %v", err) } @@ -99,12 +89,6 @@ func (entity *actor) PreStart(ctx context.Context) error { // Receive processes any message dropped into the actor mailbox. func (entity *actor) Receive(ctx *actors.ReceiveContext) { - _, span := telemetry.SpanContext(ctx.Context(), "Receive") - defer span.End() - - entity.mu.Lock() - defer entity.mu.Unlock() - // grab the command sent switch command := ctx.Message().(type) { case *goaktpb.PostStart: @@ -119,23 +103,16 @@ func (entity *actor) Receive(ctx *actors.ReceiveContext) { } // PostStop prepares the actor to gracefully shutdown +// nolint func (entity *actor) PostStop(ctx context.Context) error { - _, span := telemetry.SpanContext(ctx, "PostStop") - defer span.End() - - entity.mu.Lock() - defer entity.mu.Unlock() - + entity.eventsCounter = 0 return nil } // recoverFromSnapshot reset the persistent actor to the latest snapshot in case there is one // this is vital when the entity actor is restarting. func (entity *actor) recoverFromSnapshot(ctx context.Context) error { - spanCtx, span := telemetry.SpanContext(ctx, "RecoverFromSnapshot") - defer span.End() - - event, err := entity.eventsStore.GetLatestEvent(spanCtx, entity.ID()) + event, err := entity.eventsStore.GetLatestEvent(ctx, entity.ID()) if err != nil { return fmt.Errorf("failed to recover the latest journal: %w", err) } @@ -148,7 +125,7 @@ func (entity *actor) recoverFromSnapshot(ctx context.Context) error { } entity.currentState = currentState - entity.eventsCounter.Store(event.GetSequenceNumber()) + entity.eventsCounter = event.GetSequenceNumber() return nil } @@ -209,7 +186,7 @@ func (entity *actor) processCommandAndReply(ctx *actors.ReceiveContext, command StateReply: &egopb.StateReply{ PersistenceId: entity.ID(), State: resultingState, - SequenceNumber: entity.eventsCounter.Load(), + SequenceNumber: entity.eventsCounter, Timestamp: entity.lastCommandTime.Unix(), }, }, @@ -230,7 +207,7 @@ func (entity *actor) processCommandAndReply(ctx *actors.ReceiveContext, command return } - entity.eventsCounter.Inc() + entity.eventsCounter++ entity.currentState = resultingState entity.lastCommandTime = timestamppb.Now().AsTime() @@ -239,7 +216,7 @@ func (entity *actor) processCommandAndReply(ctx *actors.ReceiveContext, command envelope := &egopb.Event{ PersistenceId: entity.ID(), - SequenceNumber: entity.eventsCounter.Load(), + SequenceNumber: entity.eventsCounter, IsDeleted: false, Event: event, ResultingState: state, @@ -272,7 +249,7 @@ func (entity *actor) processCommandAndReply(ctx *actors.ReceiveContext, command StateReply: &egopb.StateReply{ PersistenceId: entity.ID(), State: state, - SequenceNumber: entity.eventsCounter.Load(), + SequenceNumber: entity.eventsCounter, Timestamp: entity.lastCommandTime.Unix(), }, }, diff --git a/actor_test.go b/actor_test.go index af6e5e4..e41ae8f 100644 --- a/actor_test.go +++ b/actor_test.go @@ -37,12 +37,13 @@ import ( "github.com/tochemey/goakt/v2/actors" "github.com/tochemey/goakt/v2/log" - "github.com/tochemey/gopack/postgres" "github.com/tochemey/ego/v3/egopb" "github.com/tochemey/ego/v3/eventstore/memory" pgeventstore "github.com/tochemey/ego/v3/eventstore/postgres" "github.com/tochemey/ego/v3/eventstream" + "github.com/tochemey/ego/v3/internal/lib" + "github.com/tochemey/ego/v3/internal/postgres" testpb "github.com/tochemey/ego/v3/test/data/pb/v3" ) @@ -61,7 +62,7 @@ func TestActor(t *testing.T) { err = actorSystem.Start(ctx) require.NoError(t, err) - time.Sleep(time.Second) + lib.Pause(time.Second) // create the event store eventStore := memory.NewEventsStore() @@ -74,7 +75,7 @@ func TestActor(t *testing.T) { err = eventStore.Connect(ctx) require.NoError(t, err) - time.Sleep(time.Second) + lib.Pause(time.Second) // create an instance of events stream eventStream := eventstream.New() @@ -85,7 +86,7 @@ func TestActor(t *testing.T) { pid, _ := actorSystem.Spawn(ctx, behavior.ID(), actor) require.NotNil(t, pid) - time.Sleep(time.Second) + lib.Pause(time.Second) var command proto.Message @@ -144,7 +145,7 @@ func TestActor(t *testing.T) { err = eventStore.Disconnect(ctx) require.NoError(t, err) - time.Sleep(time.Second) + lib.Pause(time.Second) // close the stream eventStream.Close() @@ -167,7 +168,7 @@ func TestActor(t *testing.T) { err = actorSystem.Start(ctx) require.NoError(t, err) - time.Sleep(time.Second) + lib.Pause(time.Second) // create the event store eventStore := memory.NewEventsStore() @@ -180,7 +181,7 @@ func TestActor(t *testing.T) { err = eventStore.Connect(ctx) require.NoError(t, err) - time.Sleep(time.Second) + lib.Pause(time.Second) // create an instance of events stream eventStream := eventstream.New() @@ -191,7 +192,7 @@ func TestActor(t *testing.T) { pid, _ := actorSystem.Spawn(ctx, behavior.ID(), persistentActor) require.NotNil(t, pid) - time.Sleep(time.Second) + lib.Pause(time.Second) var command proto.Message @@ -240,7 +241,7 @@ func TestActor(t *testing.T) { // close the stream eventStream.Close() - time.Sleep(time.Second) + lib.Pause(time.Second) // stop the actor system err = actorSystem.Stop(ctx) @@ -261,7 +262,7 @@ func TestActor(t *testing.T) { err = actorSystem.Start(ctx) require.NoError(t, err) - time.Sleep(time.Second) + lib.Pause(time.Second) // create the event store eventStore := memory.NewEventsStore() @@ -283,7 +284,7 @@ func TestActor(t *testing.T) { pid, _ := actorSystem.Spawn(ctx, behavior.ID(), persistentActor) require.NotNil(t, pid) - time.Sleep(time.Second) + lib.Pause(time.Second) command := &testpb.TestSend{} // send the command to the actor @@ -313,7 +314,7 @@ func TestActor(t *testing.T) { // create an actor system actorSystem, err := actors.NewActorSystem("TestActorSystem", actors.WithPassivationDisabled(), - actors.WithLogger(log.DefaultLogger), + actors.WithLogger(log.DiscardLogger), actors.WithActorInitMaxRetries(3), ) require.NoError(t, err) @@ -323,7 +324,7 @@ func TestActor(t *testing.T) { err = actorSystem.Start(ctx) require.NoError(t, err) - time.Sleep(time.Second) + lib.Pause(time.Second) // create the event store var ( @@ -339,7 +340,7 @@ func TestActor(t *testing.T) { schemaUtils := pgeventstore.NewSchemaUtils(db) require.NoError(t, schemaUtils.CreateTable(ctx)) - config := &postgres.Config{ + config := &pgeventstore.Config{ DBHost: testContainer.Host(), DBPort: testContainer.Port(), DBName: testDatabase, @@ -350,7 +351,7 @@ func TestActor(t *testing.T) { eventStore := pgeventstore.NewEventsStore(config) require.NoError(t, eventStore.Connect(ctx)) - time.Sleep(time.Second) + lib.Pause(time.Second) // create a persistence id persistenceID := uuid.NewString() @@ -361,7 +362,7 @@ func TestActor(t *testing.T) { err = eventStore.Connect(ctx) require.NoError(t, err) - time.Sleep(time.Second) + lib.Pause(time.Second) // create an instance of event stream eventStream := eventstream.New() @@ -373,7 +374,7 @@ func TestActor(t *testing.T) { require.NoError(t, err) require.NotNil(t, pid) - time.Sleep(time.Second) + lib.Pause(time.Second) var command proto.Message @@ -429,13 +430,13 @@ func TestActor(t *testing.T) { assert.True(t, proto.Equal(expected, resultingState)) // wait a while - time.Sleep(time.Second) + lib.Pause(time.Second) // restart the actor pid, err = actorSystem.ReSpawn(ctx, behavior.ID()) require.NoError(t, err) - time.Sleep(time.Second) + lib.Pause(time.Second) // fetch the current state command = &egopb.GetStateCommand{} @@ -463,7 +464,7 @@ func TestActor(t *testing.T) { // close the stream eventStream.Close() - time.Sleep(time.Second) + lib.Pause(time.Second) err = actorSystem.Stop(ctx) assert.NoError(t, err) @@ -483,7 +484,7 @@ func TestActor(t *testing.T) { err = actorSystem.Start(ctx) require.NoError(t, err) - time.Sleep(time.Second) + lib.Pause(time.Second) // create the event store eventStore := memory.NewEventsStore() @@ -496,7 +497,7 @@ func TestActor(t *testing.T) { err = eventStore.Connect(ctx) require.NoError(t, err) - time.Sleep(time.Second) + lib.Pause(time.Second) // create an instance of event stream eventStream := eventstream.New() @@ -507,7 +508,7 @@ func TestActor(t *testing.T) { pid, _ := actorSystem.Spawn(ctx, behavior.ID(), actor) require.NotNil(t, pid) - time.Sleep(time.Second) + lib.Pause(time.Second) var command proto.Message @@ -590,7 +591,7 @@ func TestActor(t *testing.T) { // close the stream eventStream.Close() - time.Sleep(time.Second) + lib.Pause(time.Second) // stop the actor system err = actorSystem.Stop(ctx) @@ -610,7 +611,7 @@ func TestActor(t *testing.T) { err = actorSystem.Start(ctx) require.NoError(t, err) - time.Sleep(time.Second) + lib.Pause(time.Second) // create the event store eventStore := memory.NewEventsStore() @@ -623,7 +624,7 @@ func TestActor(t *testing.T) { err = eventStore.Connect(ctx) require.NoError(t, err) - time.Sleep(time.Second) + lib.Pause(time.Second) // create an instance of events stream eventStream := eventstream.New() @@ -634,7 +635,7 @@ func TestActor(t *testing.T) { pid, _ := actorSystem.Spawn(ctx, behavior.ID(), actor) require.NotNil(t, pid) - time.Sleep(time.Second) + lib.Pause(time.Second) // send the command to the actor reply, err := actors.Ask(ctx, pid, &testpb.CreateAccount{AccountBalance: 500.00}, 5*time.Second) @@ -673,7 +674,7 @@ func TestActor(t *testing.T) { // disconnect from the event store require.NoError(t, eventStore.Disconnect(ctx)) - time.Sleep(time.Second) + lib.Pause(time.Second) // close the stream eventStream.Close() diff --git a/engine.go b/engine.go index fc75451..fc6149e 100644 --- a/engine.go +++ b/engine.go @@ -38,7 +38,6 @@ import ( "github.com/tochemey/goakt/v2/actors" "github.com/tochemey/goakt/v2/discovery" "github.com/tochemey/goakt/v2/log" - "github.com/tochemey/goakt/v2/telemetry" "github.com/tochemey/ego/v3/egopb" "github.com/tochemey/ego/v3/eventstore" @@ -64,7 +63,6 @@ type Engine struct { actorSystem actors.ActorSystem // actorSystem is the underlying actor system logger log.Logger // logger is the logging engine to use discoveryProvider discovery.Provider // discoveryProvider is the discovery provider for clustering - telemetry *telemetry.Telemetry // telemetry is the observability engine partitionsCount uint64 // partitionsCount specifies the number of partitions started atomic.Bool hostName string @@ -82,8 +80,7 @@ func NewEngine(name string, eventsStore eventstore.EventsStore, opts ...Option) name: name, eventsStore: eventsStore, enableCluster: atomic.NewBool(false), - logger: log.New(log.InfoLevel, os.Stderr), - telemetry: telemetry.New(), + logger: log.New(log.ErrorLevel, os.Stderr), eventStream: eventstream.New(), locker: &sync.Mutex{}, } @@ -103,7 +100,6 @@ func (x *Engine) Start(ctx context.Context) error { actors.WithPassivationDisabled(), actors.WithActorInitMaxRetries(1), actors.WithReplyTimeout(5 * time.Second), - actors.WithTelemetry(x.telemetry), actors.WithSupervisorDirective(actors.NewStopDirective()), } diff --git a/engine_test.go b/engine_test.go index 446f69d..f7e3858 100644 --- a/engine_test.go +++ b/engine_test.go @@ -45,6 +45,7 @@ import ( "github.com/tochemey/ego/v3/egopb" "github.com/tochemey/ego/v3/eventstore/memory" samplepb "github.com/tochemey/ego/v3/example/pbs/sample/pb/v1" + "github.com/tochemey/ego/v3/internal/lib" offsetstore "github.com/tochemey/ego/v3/offsetstore/memory" "github.com/tochemey/ego/v3/projection" ) @@ -81,15 +82,16 @@ func TestEgo(t *testing.T) { provider.EXPECT().Close().Return(nil) // create a projection message handler - handler := projection.NewDiscardHandler(log.DefaultLogger) + handler := projection.NewDiscardHandler(log.DiscardLogger) // create the ego engine engine := NewEngine("Sample", eventStore, + WithLogger(log.DiscardLogger), WithCluster(provider, 4, 1, host, remotingPort, gossipPort, clusterPort)) // start ego engine err := engine.Start(ctx) // wait for the cluster to fully start - time.Sleep(time.Second) + lib.Pause(time.Second) // add projection err = engine.AddProjection(ctx, "discard", handler, offsetStore) @@ -117,7 +119,7 @@ func TestEgo(t *testing.T) { } // wait for the cluster to fully start - time.Sleep(time.Second) + lib.Pause(time.Second) // send the command to the actor. Please don't ignore the error in production grid code resultingState, revision, err := engine.SendCommand(ctx, entityID, command, time.Minute) @@ -169,7 +171,7 @@ func TestEgo(t *testing.T) { // connect to the event store require.NoError(t, eventStore.Connect(ctx)) // create the ego engine - engine := NewEngine("Sample", eventStore) + engine := NewEngine("Sample", eventStore, WithLogger(log.DiscardLogger)) // start ego engine err := engine.Start(ctx) require.NoError(t, err) @@ -222,7 +224,7 @@ func TestEgo(t *testing.T) { require.NoError(t, eventStore.Connect(ctx)) // create the ego engine - engine := NewEngine("Sample", eventStore) + engine := NewEngine("Sample", eventStore, WithLogger(log.DiscardLogger)) // create a persistence id entityID := uuid.NewString() @@ -239,7 +241,7 @@ func TestEgo(t *testing.T) { require.NoError(t, eventStore.Connect(ctx)) // create the ego engine - engine := NewEngine("Sample", eventStore) + engine := NewEngine("Sample", eventStore, WithLogger(log.DiscardLogger)) err := engine.Start(ctx) require.NoError(t, err) @@ -260,7 +262,7 @@ func TestEgo(t *testing.T) { require.NoError(t, eventStore.Connect(ctx)) // create the ego engine - engine := NewEngine("Sample", eventStore) + engine := NewEngine("Sample", eventStore, WithLogger(log.DiscardLogger)) err := engine.Start(ctx) require.NoError(t, err) diff --git a/eventstore/postgres/config.go b/eventstore/postgres/config.go new file mode 100644 index 0000000..d407d6b --- /dev/null +++ b/eventstore/postgres/config.go @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package postgres + +// Config defines the postgres events store configuration +type Config struct { + DBHost string // DBHost represents the database host + DBPort int // DBPort is the database port + DBName string // DBName is the database name + DBUser string // DBUser is the database user used to connect + DBPassword string // DBPassword is the database password + DBSchema string // DBSchema represents the database schema +} diff --git a/eventstore/postgres/helper_test.go b/eventstore/postgres/helper_test.go index b666fcf..8fe971a 100644 --- a/eventstore/postgres/helper_test.go +++ b/eventstore/postgres/helper_test.go @@ -29,7 +29,7 @@ import ( "os" "testing" - "github.com/tochemey/gopack/postgres" + "github.com/tochemey/ego/v3/internal/postgres" ) var testContainer *postgres.TestContainer diff --git a/eventstore/postgres/postgres.go b/eventstore/postgres/postgres.go index 6fa934f..a4c1290 100644 --- a/eventstore/postgres/postgres.go +++ b/eventstore/postgres/postgres.go @@ -26,18 +26,17 @@ package postgres import ( "context" - "database/sql" "errors" "fmt" sq "github.com/Masterminds/squirrel" + "github.com/jackc/pgx/v5" "go.uber.org/atomic" "google.golang.org/protobuf/proto" - "github.com/tochemey/gopack/postgres" - "github.com/tochemey/ego/v3/egopb" "github.com/tochemey/ego/v3/eventstore" + "github.com/tochemey/ego/v3/internal/postgres" ) var ( @@ -76,9 +75,9 @@ type EventsStore struct { var _ eventstore.EventsStore = (*EventsStore)(nil) // NewEventsStore creates a new instance of PostgresEventStore -func NewEventsStore(config *postgres.Config) *EventsStore { +func NewEventsStore(config *Config) *EventsStore { // create the underlying db connection - db := postgres.New(config) + db := postgres.New(postgres.NewConfig(config.DBHost, config.DBPort, config.DBUser, config.DBPassword, config.DBName)) return &EventsStore{ db: db, sb: sq.StatementBuilder.PlaceholderFormat(sq.Dollar), @@ -197,7 +196,7 @@ func (s *EventsStore) WriteEvents(ctx context.Context, events []*egopb.Event) er } // let us begin a database transaction to make sure we atomically write those events into the database - tx, err := s.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted}) + tx, err := s.db.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.ReadCommitted}) // return the error in case we are unable to get a database transaction if err != nil { return fmt.Errorf("failed to obtain a database transaction: %w", err) @@ -242,10 +241,10 @@ func (s *EventsStore) WriteEvents(ctx context.Context, events []*egopb.Event) er return fmt.Errorf("unable to build sql insert statement: %w", err) } // insert into the table - _, execErr := tx.ExecContext(ctx, query, args...) + _, execErr := tx.Exec(ctx, query, args...) if execErr != nil { // attempt to roll back the transaction and log the error in case there is an error - if err = tx.Rollback(); err != nil { + if err = tx.Rollback(ctx); err != nil { return fmt.Errorf("unable to rollback db transaction: %w", err) } // return the main error @@ -258,7 +257,7 @@ func (s *EventsStore) WriteEvents(ctx context.Context, events []*egopb.Event) er } // commit the transaction - if commitErr := tx.Commit(); commitErr != nil { + if commitErr := tx.Commit(ctx); commitErr != nil { // return the commit error in case there is one return fmt.Errorf("failed to record events: %w", commitErr) } diff --git a/eventstore/postgres/postgres_test.go b/eventstore/postgres/postgres_test.go index 82ddcb5..b7b513a 100644 --- a/eventstore/postgres/postgres_test.go +++ b/eventstore/postgres/postgres_test.go @@ -35,8 +35,6 @@ import ( "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/timestamppb" - "github.com/tochemey/gopack/postgres" - "github.com/tochemey/ego/v3/egopb" "github.com/tochemey/ego/v3/eventstore" testpb "github.com/tochemey/ego/v3/test/data/pb/v3" @@ -44,13 +42,13 @@ import ( func TestPostgresEventsStore(t *testing.T) { t.Run("testNewEventsStore", func(t *testing.T) { - config := &postgres.Config{ - DBHost: testContainer.Host(), - DBPort: testContainer.Port(), - DBName: testDatabase, - DBUser: testUser, - DBPassword: testDatabasePassword, - DBSchema: testContainer.Schema(), + config := &Config{ + testContainer.Host(), + testContainer.Port(), + testDatabase, + testUser, + testDatabasePassword, + testContainer.Schema(), } estore := NewEventsStore(config) @@ -61,7 +59,7 @@ func TestPostgresEventsStore(t *testing.T) { }) t.Run("testConnect:happy path", func(t *testing.T) { ctx := context.TODO() - config := &postgres.Config{ + config := &Config{ DBHost: testContainer.Host(), DBPort: testContainer.Port(), DBName: testDatabase, @@ -79,7 +77,7 @@ func TestPostgresEventsStore(t *testing.T) { }) t.Run("testConnect:database does not exist", func(t *testing.T) { ctx := context.TODO() - config := &postgres.Config{ + config := &Config{ DBHost: testContainer.Host(), DBPort: testContainer.Port(), DBName: "testDatabase", @@ -92,11 +90,10 @@ func TestPostgresEventsStore(t *testing.T) { assert.NotNil(t, store) err := store.Connect(ctx) assert.Error(t, err) - assert.EqualError(t, err, "failed to ping database connection: pq: database \"testDatabase\" does not exist") }) t.Run("testWriteAndReplayEvents", func(t *testing.T) { ctx := context.TODO() - config := &postgres.Config{ + config := &Config{ DBHost: testContainer.Host(), DBPort: testContainer.Port(), DBName: testDatabase, @@ -184,13 +181,13 @@ func TestPostgresEventsStore(t *testing.T) { }) t.Run("testGetLatestEvent", func(t *testing.T) { ctx := context.TODO() - config := &postgres.Config{ - DBHost: testContainer.Host(), - DBPort: testContainer.Port(), - DBName: testDatabase, - DBUser: testUser, - DBPassword: testDatabasePassword, - DBSchema: testContainer.Schema(), + config := &Config{ + testContainer.Host(), + testContainer.Port(), + testDatabase, + testUser, + testDatabasePassword, + testContainer.Schema(), } store := NewEventsStore(config) @@ -259,13 +256,13 @@ func TestPostgresEventsStore(t *testing.T) { }) t.Run("testDeleteEvents", func(t *testing.T) { ctx := context.TODO() - config := &postgres.Config{ - DBHost: testContainer.Host(), - DBPort: testContainer.Port(), - DBName: testDatabase, - DBUser: testUser, - DBPassword: testDatabasePassword, - DBSchema: testContainer.Schema(), + config := &Config{ + testContainer.Host(), + testContainer.Port(), + testDatabase, + testUser, + testDatabasePassword, + testContainer.Schema(), } store := NewEventsStore(config) @@ -341,13 +338,13 @@ func TestPostgresEventsStore(t *testing.T) { }) t.Run("testShardNumbers", func(t *testing.T) { ctx := context.TODO() - config := &postgres.Config{ - DBHost: testContainer.Host(), - DBPort: testContainer.Port(), - DBName: testDatabase, - DBUser: testUser, - DBPassword: testDatabasePassword, - DBSchema: testContainer.Schema(), + config := &Config{ + testContainer.Host(), + testContainer.Port(), + testDatabase, + testUser, + testDatabasePassword, + testContainer.Schema(), } store := NewEventsStore(config) diff --git a/eventstore/postgres/schema_utils.go b/eventstore/postgres/schema_utils.go index a9bf1a8..9f8fc4b 100644 --- a/eventstore/postgres/schema_utils.go +++ b/eventstore/postgres/schema_utils.go @@ -27,7 +27,7 @@ package postgres import ( "context" - "github.com/tochemey/gopack/postgres" + "github.com/tochemey/ego/v3/internal/postgres" ) // SchemaUtils help create the various test tables in unit/integration tests diff --git a/example/main.go b/example/main.go index 498661c..a500c33 100644 --- a/example/main.go +++ b/example/main.go @@ -69,7 +69,7 @@ func main() { // send the command to the actor. Please don't ignore the error in production grid code reply, _, _ := engine.SendCommand(ctx, entityID, command, time.Minute) account := reply.(*samplepb.Account) - log.Printf("current balance: %v", account.GetAccountBalance()) + log.Printf("current balance on opening: %v", account.GetAccountBalance()) // send another command to credit the balance command = &samplepb.CreditAccount{ @@ -79,7 +79,7 @@ func main() { reply, _, _ = engine.SendCommand(ctx, entityID, command, time.Minute) account = reply.(*samplepb.Account) - log.Printf("current balance: %v", account.GetAccountBalance()) + log.Printf("current balance after a credit of 250: %v", account.GetAccountBalance()) // capture ctrl+c interruptSignal := make(chan os.Signal, 1) diff --git a/go.mod b/go.mod index 7d58acf..de7f393 100644 --- a/go.mod +++ b/go.mod @@ -6,14 +6,16 @@ require ( github.com/Masterminds/squirrel v1.5.4 github.com/deckarep/golang-set/v2 v2.6.0 github.com/flowchartsman/retry v1.2.0 + github.com/georgysavva/scany/v2 v2.1.3 github.com/google/uuid v1.6.0 github.com/hashicorp/go-memdb v1.3.4 + github.com/jackc/pgx/v5 v5.7.1 + github.com/lib/pq v1.10.9 + github.com/ory/dockertest/v3 v3.11.0 + github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.9.0 - github.com/tochemey/goakt/v2 v2.6.2 - github.com/tochemey/gopack v0.0.0-20240704194040-eaa380774969 + github.com/tochemey/goakt/v2 v2.7.0 github.com/travisjeffery/go-dynaport v1.0.0 - go.opentelemetry.io/otel v1.30.0 - go.opentelemetry.io/otel/trace v1.30.0 go.uber.org/atomic v1.11.0 go.uber.org/goleak v1.3.0 golang.org/x/sync v0.8.0 @@ -22,28 +24,27 @@ require ( require ( connectrpc.com/connect v1.17.0 // indirect - connectrpc.com/otelconnect v0.7.1 // indirect + dario.cat/mergo v1.0.0 // indirect github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect github.com/RoaringBitmap/roaring v1.9.4 // indirect - github.com/XSAM/otelsql v0.32.0 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/bits-and-blooms/bitset v1.14.3 // indirect github.com/buraksezer/consistent v0.10.0 // indirect github.com/buraksezer/olric v0.5.6-0.20240510193155-81e12546eb39 // indirect - github.com/cenkalti/backoff v2.2.1+incompatible // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/containerd/continuity v0.4.3 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/docker/cli v26.1.4+incompatible // indirect + github.com/docker/docker v27.1.1+incompatible // indirect github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/emicklei/go-restful/v3 v3.12.1 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect - github.com/georgysavva/scany/v2 v2.1.3 // indirect github.com/go-logr/logr v1.4.2 // indirect - github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/swag v0.23.0 // indirect @@ -53,34 +54,38 @@ require ( github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect + github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/go-msgpack/v2 v2.1.2 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect - github.com/hashicorp/go-sockaddr v1.0.6 // indirect + github.com/hashicorp/go-sockaddr v1.0.7 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/hashicorp/logutils v1.0.0 // indirect github.com/hashicorp/memberlist v0.5.1 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect - github.com/lib/pq v1.10.9 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/miekg/dns v1.1.62 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/moby/docker-image-spec v1.3.1 // indirect + github.com/moby/term v0.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mschoch/smat v0.2.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect - github.com/opencontainers/runc v1.1.13 // indirect - github.com/ory/dockertest v3.3.5+incompatible // indirect - github.com/pkg/errors v0.9.1 // indirect + github.com/opencontainers/runc v1.1.14 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/redis/go-redis/v9 v9.6.1 // indirect - github.com/reugn/go-quartz v0.12.0 // indirect + github.com/reugn/go-quartz v0.13.0 // indirect github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/stretchr/objx v0.5.2 // indirect @@ -90,10 +95,13 @@ require ( github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect github.com/x448/float16 v0.8.4 // indirect + github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect + github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect + github.com/xeipuuv/gojsonschema v1.2.0 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect - go.opentelemetry.io/otel/metric v1.30.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect + golang.org/x/crypto v0.27.0 // indirect golang.org/x/mod v0.21.0 // indirect golang.org/x/net v0.29.0 // indirect golang.org/x/oauth2 v0.23.0 // indirect diff --git a/go.sum b/go.sum index aa6117b..538065e 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,9 @@ connectrpc.com/connect v1.17.0 h1:W0ZqMhtVzn9Zhn2yATuUokDLO5N+gIuBWMOnsQrfmZk= connectrpc.com/connect v1.17.0/go.mod h1:0292hj1rnx8oFrStN7cB4jjVBeqs+Yx5yDIC2prWDO8= -connectrpc.com/otelconnect v0.7.1 h1:scO5pOb0i4yUE66CnNrHeK1x51yq0bE0ehPg6WvzXJY= -connectrpc.com/otelconnect v0.7.1/go.mod h1:dh3bFgHBTb2bkqGCeVVOtHJreSns7uu9wwL2Tbz17ms= +dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= +dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= @@ -17,8 +19,6 @@ github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8 github.com/RoaringBitmap/roaring v1.6.0/go.mod h1:plvDsJQpxOC5bw8LRteu/MLWHsHez/3y6cubLI4/1yE= github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ= github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90= -github.com/XSAM/otelsql v0.32.0 h1:vDRE4nole0iOOlTaC/Bn6ti7VowzgxK39n3Ll1Kt7i0= -github.com/XSAM/otelsql v0.32.0/go.mod h1:Ary0hlyVBbaSwo8atZB8Aoothg9s/LBJj/N/p5qDmLM= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -43,8 +43,8 @@ github.com/buraksezer/consistent v0.10.0 h1:hqBgz1PvNLC5rkWcEBVAL9dFMBWz6I0VgUCW github.com/buraksezer/consistent v0.10.0/go.mod h1:6BrVajWq7wbKZlTOUPs/XVfR8c0maujuPowduSpZqmw= github.com/buraksezer/olric v0.5.6-0.20240510193155-81e12546eb39 h1:ZB2KxErCucUiYTz9gZlujrurj4WcOe+j4t1sB1okuGw= github.com/buraksezer/olric v0.5.6-0.20240510193155-81e12546eb39/go.mod h1:b1aCKANfqosRKIeYR5NPETD8/VnxcVdlbYGMZ97GPNA= -github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= -github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -55,6 +55,8 @@ github.com/cockroachdb/cockroach-go/v2 v2.2.0 h1:/5znzg5n373N/3ESjHF5SMLxiW4RKB0 github.com/cockroachdb/cockroach-go/v2 v2.2.0/go.mod h1:u3MiKYGupPPjkn3ozknpMUpxPaNLTFWAya419/zv6eI= github.com/containerd/continuity v0.4.3 h1:6HVkalIp+2u1ZLH1J/pYX2oBVXlJZvh1X1A7bEZ9Su8= github.com/containerd/continuity v0.4.3/go.mod h1:F6PTNCKepoxEaXLQp3wDAjygEnImnZ/7o4JzpodfroQ= +github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= +github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -63,6 +65,10 @@ github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80N github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/docker/cli v26.1.4+incompatible h1:I8PHdc0MtxEADqYJZvhBrW9bo8gawKwwenxRM7/rLu8= +github.com/docker/cli v26.1.4+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= +github.com/docker/docker v27.1.1+incompatible h1:hO/M4MtV36kzKldqnA37IWhebRA+LnqqcqDja6kVaKY= +github.com/docker/docker v27.1.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= @@ -80,17 +86,16 @@ github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2 github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= -github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= -github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= -github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF2+tg1TRrwQ= github.com/go-openapi/jsonreference v0.21.0/go.mod h1:LmZmgsrTkVg9LG4EaHeY8cBDslNPMo06cago5JNLkm4= github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE= github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= @@ -99,10 +104,6 @@ github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14j github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9 h1:au07oEsX2xN0ktxqI+Sida1w446QrXBRJ0nee3SNZlA= -github.com/golang-sql/civil v0.0.0-20220223132316-b832511892a9/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= -github.com/golang-sql/sqlexp v0.1.0 h1:ZCD6MBpcuOVfGVqsEmY5/4FtYiKz6tSyUv9LPEDei6A= -github.com/golang-sql/sqlexp v0.1.0/go.mod h1:J4ad9Vo8ZCWQ2GMrC4UCQy1JpCbwU9m3EOqtpKwwwHI= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -123,12 +124,12 @@ github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 h1:FKHo8hFI3A+7w0aUQuYXQ+6EN5stWmeY/AZqtM8xk9k= github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo= +github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= +github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/gotestyourself/gotestyourself v2.2.0+incompatible h1:AQwinXlbQR2HvPjQZOmDhRqsv5mZf+Jb1RnSLxcqZcI= -github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -148,8 +149,8 @@ github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9 github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= github.com/hashicorp/go-sockaddr v1.0.5/go.mod h1:uoUUmtwU7n9Dv3O4SNLeFvg0SxQ3lyjsj6+CCykpaxI= -github.com/hashicorp/go-sockaddr v1.0.6 h1:RSG8rKU28VTUTvEKghe5gIhIQpv8evvNpnDEyqO4u9I= -github.com/hashicorp/go-sockaddr v1.0.6/go.mod h1:uoUUmtwU7n9Dv3O4SNLeFvg0SxQ3lyjsj6+CCykpaxI= +github.com/hashicorp/go-sockaddr v1.0.7 h1:G+pTkSO01HpR5qCxg7lxfsFEZaG+C0VssTy/9dbT+Fw= +github.com/hashicorp/go-sockaddr v1.0.7/go.mod h1:FZQbEYa1pxkQ7WLpyXJ6cbjpT8q0YgQaK/JakXqGyWw= github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -166,12 +167,12 @@ github.com/huandu/xstrings v1.3.2/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= -github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= -github.com/jackc/pgx/v5 v5.0.0 h1:3UdmB3yUeTnJtZ+nDv3Mxzd4GHHvHkl9XN3oboIbOrY= -github.com/jackc/pgx/v5 v5.0.0/go.mod h1:JBbvW3Hdw77jKl9uJrEDATUZIFM2VFPzRq4RWIhkF4o= -github.com/jackc/puddle/v2 v2.0.0 h1:Kwk/AlLigcnZsDssc3Zun1dk1tAtQNPaBBxBHWn0Mjc= -github.com/jackc/puddle/v2 v2.0.0/go.mod h1:itE7ZJY8xnoo0JqJEpSMprN0f+NQkMCuEV/N9j8h0oc= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs= +github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= @@ -205,8 +206,6 @@ github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= -github.com/microsoft/go-mssqldb v1.6.0 h1:mM3gYdVwEPFrlg/Dvr2DNVEgYFG7L42l+dGc67NNNpc= -github.com/microsoft/go-mssqldb v1.6.0/go.mod h1:00mDtPbeQCRGC1HwOOR5K/gr30P1NcEG0vx6Kbv2aJU= github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.45/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7XnME= github.com/miekg/dns v1.1.62 h1:cN8OuEF1/x5Rq6Np+h1epln8OiyPWV+lROx9LxcGgIQ= @@ -216,7 +215,13 @@ github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0 github.com/mitchellh/cli v1.1.5/go.mod h1:v8+iFts2sPIKUV1ltktPXMCC8fumSKFItNcD2cLtRR4= github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw= github.com/mitchellh/go-wordwrap v1.0.1/go.mod h1:R62XHJLzvMFRBbcrT7m7WgmE1eOyTSsCt+hzestvNj0= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= +github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= +github.com/moby/term v0.5.0 h1:xt8Q1nalod/v7BqbG21f8mQPqH+xAaC9C3N3wfWbVP0= +github.com/moby/term v0.5.0/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -231,8 +236,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nats-io/jwt/v2 v2.6.0 h1:yXoBTdEotZw3NujMT+Nnu1UPNlFWdKQ3d0JJF/+pJag= github.com/nats-io/jwt/v2 v2.6.0/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= -github.com/nats-io/nats-server/v2 v2.10.20 h1:CXDTYNHeBiAKBTAIP2gjpgbWap2GhATnTLgP8etyvEI= -github.com/nats-io/nats-server/v2 v2.10.20/go.mod h1:hgcPnoUtMfxz1qVOvLZGurVypQ+Cg6GXVXjG53iHk+M= +github.com/nats-io/nats-server/v2 v2.10.21 h1:gfG6T06wBdI25XyY2IsauarOc2srWoFxxfsOKjrzoRA= +github.com/nats-io/nats-server/v2 v2.10.21/go.mod h1:I1YxSAEWbXCfy0bthwvNb5X43WwIWMz7gx5ZVPDr5Rc= github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= @@ -247,10 +252,10 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= -github.com/opencontainers/runc v1.1.13 h1:98S2srgG9vw0zWcDpFMn5TRrh8kLxa/5OFUstuUhmRs= -github.com/opencontainers/runc v1.1.13/go.mod h1:R016aXacfp/gwQBYw2FDGa9m+n6atbLWrYY8hNMT/sA= -github.com/ory/dockertest v3.3.5+incompatible h1:iLLK6SQwIhcbrG783Dghaaa3WPzGc+4Emza6EbVUUGA= -github.com/ory/dockertest v3.3.5+incompatible/go.mod h1:1vX4m9wsvi00u5bseYwXaSnhNrne+V0E6LAcBILJdPs= +github.com/opencontainers/runc v1.1.14 h1:rgSuzbmgz5DUJjeSnw337TxDbRuqjs6iqQck/2weR6w= +github.com/opencontainers/runc v1.1.14/go.mod h1:E4C2z+7BxR7GHXp0hAY53mek+x49X1LjPNeMTfRGvOA= +github.com/ory/dockertest/v3 v3.11.0 h1:OiHcxKAvSDUwsEVh2BjxQQc/5EHz9n0va9awCtNGuyA= +github.com/ory/dockertest/v3 v3.11.0/go.mod h1:VIPxS1gwT9NpPOrfD3rACs8Y9Z7yhzO4SB194iUDnUI= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= @@ -276,8 +281,8 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4= github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA= -github.com/reugn/go-quartz v0.12.0 h1:RsrklW++R5Swc7mCPYseXM06PTWN4N7/f1rsYkhHiww= -github.com/reugn/go-quartz v0.12.0/go.mod h1:no4ktgYbAAuY0E1SchR8cTx1LF4jYIzdgaQhzRPSkpk= +github.com/reugn/go-quartz v0.13.0 h1:0eMxvj28Qu1npIDdN9Mzg9hwyksGH6XJt4Cz0QB8EUk= +github.com/reugn/go-quartz v0.13.0/go.mod h1:0ghKksELp8MJ4h84T203aTHRF3Kug5BrxEW3ErBvhzY= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/ryanuber/columnize v2.1.2+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= @@ -315,10 +320,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/redcon v1.6.2 h1:5qfvrrybgtO85jnhSravmkZyC0D+7WstbfCs3MmPhow= github.com/tidwall/redcon v1.6.2/go.mod h1:p5Wbsgeyi2VSTBWOcA5vRXrOb9arFTcU2+ZzFjqV75Y= -github.com/tochemey/goakt/v2 v2.6.2 h1:GbD+aR4h2xUivKCdxMIsOkVe7/Pvdbfc371/4iiHF5s= -github.com/tochemey/goakt/v2 v2.6.2/go.mod h1:uuJtrNQ3OgWZRM5uARRFMSv7U1g+crDKfm/v1ASkz0w= -github.com/tochemey/gopack v0.0.0-20240704194040-eaa380774969 h1:SDUoPcLRz28UitaOlMjOTC2T6XYBqx3qWDLDyA0jX3k= -github.com/tochemey/gopack v0.0.0-20240704194040-eaa380774969/go.mod h1:3Qt0XoTLDsLzcaMXs7tYXd+iKOehoWqykWMQNBAwTdA= +github.com/tochemey/goakt/v2 v2.7.0 h1:LTQ4ILFJTbl5Y3KmIWrlSwjojfWlDl1CUbGLXn/6P7w= +github.com/tochemey/goakt/v2 v2.7.0/go.mod h1:xrSs/irYSoD0pBDjvDs6LEmRPkCM6If9qqUzXKNN94c= github.com/travisjeffery/go-dynaport v1.0.0 h1:m/qqf5AHgB96CMMSworIPyo1i7NZueRsnwdzdCJ8Ajw= github.com/travisjeffery/go-dynaport v1.0.0/go.mod h1:0LHuDS4QAx+mAc4ri3WkQdavgVoBIZ7cE9ob17KIAJk= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= @@ -328,6 +331,13 @@ github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAh github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= +github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= +github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= +github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= +github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0= +github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= +github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74= +github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= @@ -335,16 +345,6 @@ github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= -go.opentelemetry.io/otel v1.30.0 h1:F2t8sK4qf1fAmY9ua4ohFS/K+FUuOPemHUIXHtktrts= -go.opentelemetry.io/otel v1.30.0/go.mod h1:tFw4Br9b7fOS+uEao81PJjVMjW/5fvNCbpsDIXqP0pc= -go.opentelemetry.io/otel/metric v1.30.0 h1:4xNulvn9gjzo4hjg+wzIKG7iNFEaBMX00Qd4QIZs7+w= -go.opentelemetry.io/otel/metric v1.30.0/go.mod h1:aXTfST94tswhWEb+5QjlSqG+cZlmyXy/u8jFpor3WqQ= -go.opentelemetry.io/otel/sdk v1.30.0 h1:cHdik6irO49R5IysVhdn8oaiR9m8XluDaJAs4DfOrYE= -go.opentelemetry.io/otel/sdk v1.30.0/go.mod h1:p14X4Ok8S+sygzblytT1nqG98QG2KYKv++HE0LY/mhg= -go.opentelemetry.io/otel/sdk/metric v1.30.0 h1:QJLT8Pe11jyHBHfSAgYH7kEmT24eX792jZO1bo4BXkM= -go.opentelemetry.io/otel/sdk/metric v1.30.0/go.mod h1:waS6P3YqFNzeP01kuo/MBBYqaoBJl7efRQHOaydhy1Y= -go.opentelemetry.io/otel/trace v1.30.0 h1:7UBkkYzeg3C7kQX8VAidWh2biiQbtAKjyIML8dQ9wmc= -go.opentelemetry.io/otel/trace v1.30.0/go.mod h1:5EyKqTzzmyqB9bwtCCq6pDLktPK6fmGf/Dph+8VI02o= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -454,8 +454,8 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= -gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= +gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU= +gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= k8s.io/api v0.31.1 h1:Xe1hX/fPW3PXYYv8BlozYqw63ytA92snr96zMW9gWTU= k8s.io/api v0.31.1/go.mod h1:sbN1g6eY6XVLeqNsZGLnI5FwVseTrZX7Fv3O26rhAaI= k8s.io/apimachinery v0.31.1 h1:mhcUBbj7KUjaVhyXILglcVjuS4nYXiwC+KKFBgIVy7U= diff --git a/internal/telemetry/span_context.go b/internal/lib/lib.go similarity index 76% rename from internal/telemetry/span_context.go rename to internal/lib/lib.go index a18fe96..5b0117c 100644 --- a/internal/telemetry/span_context.go +++ b/internal/lib/lib.go @@ -1,7 +1,7 @@ /* * MIT License * - * Copyright (c) 2022-2024 Tochemey + * Copyright (c) 2024 Tochemey * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -22,18 +22,18 @@ * SOFTWARE. */ -package telemetry +package lib import ( - "context" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/trace" + "time" ) -func SpanContext(ctx context.Context, methodName string) (context.Context, trace.Span) { - // Create a span - tracer := otel.GetTracerProvider() - spanCtx, span := tracer.Tracer("").Start(ctx, methodName) - return spanCtx, span +// Pause pauses the running process for some time period +func Pause(duration time.Duration) { + stopCh := make(chan struct{}, 1) + timer := time.AfterFunc(duration, func() { + stopCh <- struct{}{} + }) + <-stopCh + timer.Stop() } diff --git a/internal/postgres/config.go b/internal/postgres/config.go new file mode 100644 index 0000000..c4282b9 --- /dev/null +++ b/internal/postgres/config.go @@ -0,0 +1,60 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package postgres + +import "time" + +// Config defines the postgres configuration +// This configuration does not take into consideration the SSL mode +// TODO: enhance with SSL mode +type Config struct { + DBHost string // DBHost represents the database host + DBPort int // DBPort is the database port + DBName string // DBName is the database name + DBUser string // DBUser is the database user used to connect + DBPassword string // DBPassword is the database password + DBSchema string // DBSchema represents the database schema + MaxConnections int // MaxConnections represents the number of max connections in the pool + MinConnections int // MinConnections represents the number of minimum connections in the pool + MaxConnectionLifetime time.Duration // MaxConnectionLifetime represents the duration since creation after which a connection will be automatically closed. + MaxConnIdleTime time.Duration // MaxConnIdleTime is the duration after which an idle connection will be automatically closed by the health check. + HealthCheckPeriod time.Duration // HeathCheckPeriod is the duration between checks of the health of idle connections. +} + +// NewConfig creates an instance of Config +func NewConfig(host string, port int, user, password, dbName string) *Config { + return &Config{ + DBHost: host, + DBPort: port, + DBName: dbName, + DBUser: user, + DBPassword: password, + MaxConnections: 4, + MinConnections: 0, + MaxConnectionLifetime: time.Hour, + MaxConnIdleTime: 30 * time.Minute, + HealthCheckPeriod: time.Minute, + } +} diff --git a/internal/postgres/postgres.go b/internal/postgres/postgres.go new file mode 100644 index 0000000..705cf98 --- /dev/null +++ b/internal/postgres/postgres.go @@ -0,0 +1,169 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package postgres + +import ( + "context" + "fmt" + + "github.com/georgysavva/scany/v2/pgxscan" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgxpool" +) + +// Postgres will be implemented by concrete RDBMS store +type Postgres interface { + // Connect connects to the underlying database + Connect(ctx context.Context) error + // Disconnect closes the underlying opened underlying connection database + Disconnect(ctx context.Context) error + // Select fetches a single row from the database and automatically scanned it into the dst. + // It returns an error in case of failure. When there is no record no errors is return. + Select(ctx context.Context, dst any, query string, args ...any) error + // SelectAll fetches a set of rows as defined by the query and scanned those record in the dst. + // It returns nil when there is no records to fetch. + SelectAll(ctx context.Context, dst any, query string, args ...any) error + // Exec executes an SQL statement against the database and returns the appropriate result or an error. + Exec(ctx context.Context, query string, args ...any) (pgconn.CommandTag, error) + // BeginTx helps start an SQL transaction. The return transaction object is expected to be used in + // the subsequent queries following the BeginTx. + BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error) +} + +// Postgres helps interact with the Postgres database +type postgres struct { + connStr string + pool *pgxpool.Pool + config *Config +} + +var _ Postgres = (*postgres)(nil) + +const postgresDriver = "postgres" +const instrumentationName = "ego.events_store" + +// New returns a store connecting to the given Postgres database. +func New(config *Config) Postgres { + postgres := new(postgres) + postgres.config = config + postgres.connStr = createConnectionString(config.DBHost, config.DBPort, config.DBName, config.DBUser, config.DBPassword, config.DBSchema) + return postgres +} + +// Connect will connect to our Postgres database +func (pg *postgres) Connect(ctx context.Context) error { + // create the connection config + config, err := pgxpool.ParseConfig(pg.connStr) + if err != nil { + return fmt.Errorf("failed to parse connection string: %w", err) + } + + // amend some of the configuration + config.MaxConns = int32(pg.config.MaxConnections) + config.MaxConnLifetime = pg.config.MaxConnectionLifetime + config.MaxConnIdleTime = pg.config.MaxConnIdleTime + config.MinConns = int32(pg.config.MinConnections) + config.HealthCheckPeriod = pg.config.HealthCheckPeriod + + // connect to the pool + dbpool, err := pgxpool.NewWithConfig(ctx, config) + if err != nil { + return fmt.Errorf("failed to create the connection pool: %w", err) + } + + // let us test the connection + if err := dbpool.Ping(ctx); err != nil { + return fmt.Errorf("failed to ping the database connection: %w", err) + } + + // set the db handle + pg.pool = dbpool + return nil +} + +// createConnectionString will create the Postgres connection string from the +// supplied connection details +// TODO: enhance this with the SSL settings +func createConnectionString(host string, port int, name, user string, password string, schema string) string { + info := fmt.Sprintf("host=%s port=%d user=%s dbname=%s sslmode=disable", host, port, user, name) + // The Postgres driver gets confused in cases where the user has no password + // set but a password is passed, so only set password if its non-empty + if password != "" { + info += fmt.Sprintf(" password=%s", password) + } + + if schema != "" { + info += fmt.Sprintf(" search_path=%s", schema) + } + + return info +} + +// Exec executes a sql query without returning rows against the database +func (pg *postgres) Exec(ctx context.Context, query string, args ...interface{}) (pgconn.CommandTag, error) { + return pg.pool.Exec(ctx, query, args...) +} + +// BeginTx starts a new database transaction +func (pg *postgres) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error) { + return pg.pool.BeginTx(ctx, txOptions) +} + +// SelectAll fetches rows +func (pg *postgres) SelectAll(ctx context.Context, dst interface{}, query string, args ...interface{}) error { + err := pgxscan.Select(ctx, pg.pool, dst, query, args...) + if err != nil { + if pgxscan.NotFound(err) { + return nil + } + + return err + } + return nil +} + +// Select fetches only one row +func (pg *postgres) Select(ctx context.Context, dst interface{}, query string, args ...interface{}) error { + err := pgxscan.Get(ctx, pg.pool, dst, query, args...) + if err != nil { + if pgxscan.NotFound(err) { + return nil + } + return err + } + + return nil +} + +// Disconnect the database connection. +// nolint +func (pg *postgres) Disconnect(ctx context.Context) error { + if pg.pool == nil { + return nil + } + pg.pool.Close() + return nil +} diff --git a/internal/postgres/postgres_test.go b/internal/postgres/postgres_test.go new file mode 100644 index 0000000..717132e --- /dev/null +++ b/internal/postgres/postgres_test.go @@ -0,0 +1,325 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package postgres + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/suite" +) + +// account is a test struct +type account struct { + AccountID string + AccountName string +} + +// PostgresTestSuite will run the Postgres tests +type PostgresTestSuite struct { + suite.Suite + container *TestContainer +} + +// SetupSuite starts the Postgres database engine and set the container +// host and port to use in the tests +func (s *PostgresTestSuite) SetupSuite() { + s.container = NewTestContainer("testdb", "test", "test") +} + +func (s *PostgresTestSuite) TearDownSuite() { + s.container.Cleanup() +} + +// In order for 'go test' to run this suite, we need to create +// a normal test function and pass our suite to suite.Run +func TestPostgresTestSuite(t *testing.T) { + suite.Run(t, new(PostgresTestSuite)) +} + +func (s *PostgresTestSuite) TestConnect() { + s.Run("with valid connection settings", func() { + ctx := context.TODO() + db := s.container.GetTestDB() + + err := db.Connect(ctx) + s.Assert().NoError(err) + }) + + s.Run("with invalid database port", func() { + ctx := context.TODO() + db := New(&Config{ + DBUser: "test", + DBName: "testdb", + DBPassword: "test", + DBSchema: s.container.Schema(), + DBHost: s.container.Host(), + DBPort: -2, + MaxConnections: 4, + MinConnections: 0, + MaxConnectionLifetime: time.Hour, + MaxConnIdleTime: 30 * time.Minute, + HealthCheckPeriod: time.Minute, + }) + err := db.Connect(ctx) + s.Assert().Error(err) + }) + + s.Run("with invalid database name", func() { + ctx := context.TODO() + db := New(&Config{ + DBUser: "test", + DBName: "wrong-name", + DBPassword: "test", + DBSchema: s.container.Schema(), + DBHost: s.container.Host(), + DBPort: s.container.Port(), + MaxConnections: 4, + MinConnections: 0, + MaxConnectionLifetime: time.Hour, + MaxConnIdleTime: 30 * time.Minute, + HealthCheckPeriod: time.Minute, + }) + err := db.Connect(ctx) + s.Assert().Error(err) + }) + + s.Run("with invalid database user", func() { + ctx := context.TODO() + db := New(&Config{ + DBUser: "test-user", + DBName: "testdb", + DBPassword: "test", + DBSchema: s.container.Schema(), + DBHost: s.container.Host(), + DBPort: s.container.Port(), + MaxConnections: 4, + MinConnections: 0, + MaxConnectionLifetime: time.Hour, + MaxConnIdleTime: 30 * time.Minute, + HealthCheckPeriod: time.Minute, + }) + err := db.Connect(ctx) + s.Assert().Error(err) + }) + + s.Run("with invalid database password", func() { + ctx := context.TODO() + db := New(&Config{ + DBUser: "test", + DBName: "testdb", + DBPassword: "invalid-db-pass", + DBSchema: s.container.Schema(), + DBHost: s.container.Host(), + DBPort: s.container.Port(), + MaxConnections: 4, + MinConnections: 0, + MaxConnectionLifetime: time.Hour, + MaxConnIdleTime: 30 * time.Minute, + HealthCheckPeriod: time.Minute, + }) + + err := db.Connect(ctx) + s.Assert().Error(err) + }) +} + +func (s *PostgresTestSuite) TestExec() { + ctx := context.TODO() + db := s.container.GetTestDB() + err := db.Connect(ctx) + s.Assert().NoError(err) + + s.Run("with valid SQL statement", func() { + // let us create a test table + const schemaDDL = ` + CREATE TABLE accounts + ( + account_id UUID, + account_name VARCHAR(255) NOT NULL, + PRIMARY KEY (account_id) + ); + ` + _, err = db.Exec(ctx, schemaDDL) + s.Assert().NoError(err) + }) + + s.Run("with invalid SQL statement", func() { + const schemaDDL = `SOME-INVALID-SQL` + _, err = db.Exec(ctx, schemaDDL) + s.Assert().Error(err) + }) +} + +func (s *PostgresTestSuite) TestSelect() { + ctx := context.TODO() + db := s.container.GetTestDB() + err := db.Connect(ctx) + s.Assert().NoError(err) + + const selectSQL = `SELECT account_id, account_name FROM accounts WHERE account_id = $1` + + s.Run("with valid record", func() { + // first drop the table + err = db.DropTable(ctx, "accounts") + s.Assert().NoError(err) + + // create the database table + err = createTable(ctx, db) + s.Assert().NoError(err) + + // let us insert into that table + inserted := &account{ + AccountID: uuid.New().String(), + AccountName: "some-account", + } + err = insertInto(ctx, db, inserted) + s.Assert().NoError(err) + + // let us select the record inserted + selected := &account{} + err = db.Select(ctx, selected, selectSQL, inserted.AccountID) + s.Assert().NoError(err) + + // let us compare the selected data and the record added + s.Assert().Equal(inserted.AccountID, selected.AccountID) + s.Assert().Equal(inserted.AccountName, selected.AccountName) + }) + + s.Run("with no records", func() { + // first drop the table + err = db.DropTable(ctx, "accounts") + s.Assert().NoError(err) + + // create the database table + err = createTable(ctx, db) + s.Assert().NoError(err) + + var selected *account + err = db.Select(ctx, selected, selectSQL, uuid.New().String()) + s.Assert().NoError(err) + s.Assert().Nil(selected) + }) + + s.Run("with invalid SQL statement", func() { + var selected *account + err = db.Select(ctx, selected, "weird-sql", uuid.New().String()) + s.Assert().Error(err) + s.Assert().Nil(selected) + }) +} + +func (s *PostgresTestSuite) TestSelectAll() { + ctx := context.TODO() + db := s.container.GetTestDB() + err := db.Connect(ctx) + s.Assert().NoError(err) + + const selectSQL = `SELECT account_id, account_name FROM accounts;` + + s.Run("with valid records", func() { + // first drop the table + err = db.DropTable(ctx, "accounts") + s.Assert().NoError(err) + + // create the database table + err = createTable(ctx, db) + s.Assert().NoError(err) + + // let us insert into that table + inserted := &account{ + AccountID: uuid.New().String(), + AccountName: "some-account", + } + err = insertInto(ctx, db, inserted) + s.Assert().NoError(err) + + // let us select the record inserted + var selected []*account + err = db.SelectAll(ctx, &selected, selectSQL) + s.Assert().NoError(err) + s.Assert().Equal(1, len(selected)) + }) + + s.Run("with no records", func() { + // first drop the table + err = db.DropTable(ctx, "accounts") + s.Assert().NoError(err) + + // create the database table + err = createTable(ctx, db) + s.Assert().NoError(err) + + var selected []*account + err = db.SelectAll(ctx, &selected, selectSQL) + s.Assert().NoError(err) + s.Assert().Nil(selected) + }) + + s.Run("with invalid SQL statement", func() { + var selected []*account + err = db.SelectAll(ctx, selected, "weird-sql", uuid.New().String()) + s.Assert().Error(err) + s.Assert().Nil(selected) + }) +} + +func (s *PostgresTestSuite) TestClose() { + ctx := context.TODO() + db := s.container.GetTestDB() + err := db.Connect(ctx) + s.Assert().NoError(err) + + // close the db connection + err = db.Disconnect(ctx) + s.Assert().NoError(err) + + // let us execute a query against a closed connection + err = db.TableExists(ctx, "accounts") + s.Assert().Error(err) + s.Assert().EqualError(err, "closed pool") +} + +func createTable(ctx context.Context, db Postgres) error { + // let us create a test table + const schemaDDL = ` + CREATE TABLE IF NOT EXISTS accounts + ( + account_id UUID, + account_name VARCHAR(255) NOT NULL, + PRIMARY KEY (account_id) + ); + ` + _, err := db.Exec(ctx, schemaDDL) + return err +} + +func insertInto(ctx context.Context, db Postgres, account *account) error { + const insertSQL = `INSERT INTO accounts(account_id, account_name) VALUES($1, $2);` + _, err := db.Exec(ctx, insertSQL, account.AccountID, account.AccountName) + return err +} diff --git a/internal/postgres/testkit.go b/internal/postgres/testkit.go new file mode 100644 index 0000000..77a8bf8 --- /dev/null +++ b/internal/postgres/testkit.go @@ -0,0 +1,252 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package postgres + +import ( + "context" + "database/sql" + "errors" + "fmt" + "log" + "net" + "strconv" + "time" + + _ "github.com/lib/pq" //nolint + dockertest "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" +) + +// TestContainer helps creates a Postgres docker container to +// run unit tests +type TestContainer struct { + host string + port int + schema string + + resource *dockertest.Resource + pool *dockertest.Pool + + // connection credentials + dbUser string + dbName string + dbPass string +} + +// NewTestContainer create a Postgres test container useful for unit and integration tests +// This function will exit when there is an error.Call this function inside your SetupTest to create the container before each test. +func NewTestContainer(dbName, dbUser, dbPassword string) *TestContainer { + // create the docker pool + pool, err := dockertest.NewPool("") + if err != nil { + log.Fatalf("Could not connect to docker: %s", err) + } + // pulls an image, creates a container based on it and runs it + resource, err := pool.RunWithOptions(&dockertest.RunOptions{ + Repository: "postgres", + Tag: "11", + Env: []string{ + fmt.Sprintf("POSTGRES_PASSWORD=%s", dbPassword), + fmt.Sprintf("POSTGRES_USER=%s", dbUser), + fmt.Sprintf("POSTGRES_DB=%s", dbName), + "listen_addresses = '*'", + }, + Cmd: []string{ + "postgres", "-c", "log_statement=all", "-c", "log_connections=on", "-c", "log_disconnections=on", + }, + }, func(config *docker.HostConfig) { + // set AutoRemove to true so that stopped container goes away by itself + config.AutoRemove = true + config.RestartPolicy = docker.RestartPolicy{Name: "no"} + }) + // handle the error + if err != nil { + log.Fatalf("Could not start resource: %s", err) + } + // get the host and port of the database connection + hostAndPort := resource.GetHostPort("5432/tcp") + databaseURL := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable", dbUser, dbPassword, hostAndPort, dbName) + log.Println("Connecting to database on url: ", databaseURL) + // Tell docker to hard kill the container in 120 seconds + _ = resource.Expire(120) + // exponential backoff-retry, because the application in the container might not be ready to accept connections yet + pool.MaxWait = 120 * time.Second + + if err = pool.Retry(func() error { + db, err := sql.Open("postgres", databaseURL) + if err != nil { + return err + } + return db.Ping() + }); err != nil { + log.Fatalf("Could not connect to docker: %s", err) + } + + // create an instance of TestContainer + container := new(TestContainer) + container.pool = pool + container.resource = resource + host, port, err := splitHostAndPort(hostAndPort) + if err != nil { + log.Fatalf("Unable to get database host and port: %s", err) + } + // set the container host, port and schema + container.dbName = dbName + container.dbUser = dbUser + container.dbPass = dbPassword + container.host = host + container.port = port + container.schema = "public" + return container +} + +// GetTestDB returns a Postgres TestDB that can be used in the tests +// to perform some database queries +func (c TestContainer) GetTestDB() *TestDB { + return &TestDB{ + New(&Config{ + DBHost: c.host, + DBPort: c.port, + DBName: c.dbName, + DBUser: c.dbUser, + DBPassword: c.dbPass, + DBSchema: c.schema, + MaxConnections: 4, + MinConnections: 0, + MaxConnectionLifetime: time.Hour, + MaxConnIdleTime: 30 * time.Minute, + HealthCheckPeriod: time.Minute, + }), + } +} + +// Host return the host of the test container +func (c TestContainer) Host() string { + return c.host +} + +// Port return the port of the test container +func (c TestContainer) Port() int { + return c.port +} + +// Schema return the test schema of the test container +func (c TestContainer) Schema() string { + return c.schema +} + +// Cleanup frees the resource by removing a container and linked volumes from docker. +// Call this function inside your TearDownSuite to clean-up resources after each test +func (c TestContainer) Cleanup() { + if err := c.pool.Purge(c.resource); err != nil { + log.Fatalf("Could not purge resource: %s", err) + } +} + +// TestDB is used in test to perform +// some database queries +type TestDB struct { + Postgres +} + +// DropTable utility function to drop a database table +func (c TestDB) DropTable(ctx context.Context, tableName string) error { + var dropSQL = fmt.Sprintf("DROP TABLE IF EXISTS %s CASCADE;", tableName) + _, err := c.Exec(ctx, dropSQL) + return err +} + +// TableExists utility function to help check the existence of table in Postgres +// tableName is in the format: . e.g: public.users +func (c TestDB) TableExists(ctx context.Context, tableName string) error { + var stmt = fmt.Sprintf("SELECT to_regclass('%s');", tableName) + _, err := c.Exec(ctx, stmt) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil + } + return err + } + + return nil +} + +// Count utility function to help count the number of rows in a Postgres table. +// tableName is in the format: . e.g: public.users +// It returns -1 when there is an error +func (c TestDB) Count(ctx context.Context, tableName string) (int, error) { + var count int + if err := c.Select(ctx, &count, fmt.Sprintf("SELECT COUNT(*) FROM %s", tableName)); err != nil { + return -1, err + } + return count, nil +} + +// CreateSchema helps create a test schema in a Postgres database +func (c TestDB) CreateSchema(ctx context.Context, schemaName string) error { + stmt := fmt.Sprintf("CREATE SCHEMA %s", schemaName) + if _, err := c.Exec(ctx, stmt); err != nil { + return err + } + return nil +} + +// SchemaExists helps check the existence of a Postgres schema. Very useful when implementing tests +func (c TestDB) SchemaExists(ctx context.Context, schemaName string) (bool, error) { + stmt := fmt.Sprintf("SELECT schema_name FROM information_schema.schemata WHERE schema_name = '%s';", schemaName) + var check string + if err := c.Select(ctx, &check, stmt); err != nil { + return false, err + } + + // this redundant check is necessary + if check == schemaName { + return true, nil + } + + return false, nil +} + +// DropSchema utility function to drop a database schema +func (c TestDB) DropSchema(ctx context.Context, schemaName string) error { + var dropSQL = fmt.Sprintf("DROP SCHEMA IF EXISTS %s CASCADE;", schemaName) + _, err := c.Exec(ctx, dropSQL) + return err +} + +// splitHostAndPort helps get the host address and port of and address +func splitHostAndPort(hostAndPort string) (string, int, error) { + host, port, err := net.SplitHostPort(hostAndPort) + if err != nil { + return "", -1, err + } + + portValue, err := strconv.Atoi(port) + if err != nil { + return "", -1, err + } + + return host, portValue, nil +} diff --git a/internal/postgres/testkit_test.go b/internal/postgres/testkit_test.go new file mode 100644 index 0000000..9d9360c --- /dev/null +++ b/internal/postgres/testkit_test.go @@ -0,0 +1,215 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package postgres + +import ( + "context" + "testing" + + "github.com/stretchr/testify/suite" +) + +type testkitSuite struct { + suite.Suite + container *TestContainer +} + +// SetupSuite starts the Postgres database engine and set the container +// host and port to use in the tests +func (s *testkitSuite) SetupSuite() { + s.container = NewTestContainer("testdb", "test", "test") +} + +func (s *testkitSuite) TearDownSuite() { + s.container.Cleanup() +} + +// In order for 'go test' to run this suite, we need to create +// a normal test function and pass our suite to suite.Run +func TestTestKitSuite(t *testing.T) { + suite.Run(t, new(testkitSuite)) +} + +func (s *testkitSuite) TestDropTable() { + s.Run("with no table defined", func() { + ctx := context.TODO() + db := s.container.GetTestDB() + + err := db.Connect(ctx) + s.Assert().NoError(err) + + // drop fake table + err = db.DropTable(ctx, "fake") + s.Assert().NoError(err) + s.Assert().Nil(err) + + err = db.Disconnect(ctx) + s.Assert().NoError(err) + }) +} + +func (s *testkitSuite) TestTableExist() { + s.Run("with no table defined", func() { + ctx := context.TODO() + db := s.container.GetTestDB() + + err := db.Connect(ctx) + s.Assert().NoError(err) + + // check fake table existence + err = db.TableExists(ctx, "fake") + s.Assert().NoError(err) + s.Assert().Nil(err) + err = db.Disconnect(ctx) + s.Assert().NoError(err) + }) +} + +func (s *testkitSuite) TestCreateAndCheckExistence() { + s.Run("happy path", func() { + ctx := context.TODO() + const schemaName = "example" + + db := s.container.GetTestDB() + + err := db.Connect(ctx) + s.Assert().NoError(err) + + err = db.CreateSchema(ctx, schemaName) + s.Assert().NoError(err) + + ok, err := db.SchemaExists(ctx, schemaName) + s.Assert().NoError(err) + s.Assert().True(ok) + + err = db.DropSchema(ctx, schemaName) + s.Assert().NoError(err) + + err = db.Disconnect(ctx) + s.Assert().NoError(err) + }) + s.Run("schema does not exist", func() { + ctx := context.TODO() + const schemaName = "example" + + db := s.container.GetTestDB() + + err := db.Connect(ctx) + s.Assert().NoError(err) + ok, err := db.SchemaExists(ctx, schemaName) + s.Assert().NoError(err) + s.Assert().False(ok) + + err = db.Disconnect(ctx) + s.Assert().NoError(err) + }) +} + +func (s *testkitSuite) TestCreateTable() { + s.Run("happy path", func() { + ctx := context.TODO() + const stmt = `create table mangoes(id serial, taste varchar(10));` + + db := s.container.GetTestDB() + + err := db.Connect(ctx) + s.Assert().NoError(err) + + _, err = db.Exec(ctx, stmt) + s.Assert().NoError(err) + + err = db.TableExists(ctx, "public.mangoes") + s.Assert().NoError(err) + s.Assert().Nil(err) + + err = db.DropTable(ctx, "public.mangoes") + s.Assert().NoError(err) + + err = db.Disconnect(ctx) + s.Assert().NoError(err) + }) + s.Run("happy path in a different schema", func() { + ctx := context.TODO() + const schemaName = "example" + const stmt = `create table example.mangoes(id serial, taste varchar(10));` + + db := s.container.GetTestDB() + + err := db.Connect(ctx) + s.Assert().NoError(err) + + err = db.CreateSchema(ctx, schemaName) + s.Assert().NoError(err) + + ok, err := db.SchemaExists(ctx, schemaName) + s.Assert().NoError(err) + s.Assert().True(ok) + + _, err = db.Exec(ctx, stmt) + s.Assert().NoError(err) + + err = db.TableExists(ctx, "example.mangoes") + s.Assert().NoError(err) + s.Assert().Nil(err) + + err = db.DropSchema(ctx, schemaName) + s.Assert().NoError(err) + }) +} + +func (s *testkitSuite) TestCount() { + ctx := context.TODO() + const schemaName = "example" + const stmt = `create table example.mangoes(id serial, taste varchar(10));` + + db := s.container.GetTestDB() + + err := db.Connect(ctx) + s.Assert().NoError(err) + + err = db.CreateSchema(ctx, schemaName) + s.Assert().NoError(err) + + ok, err := db.SchemaExists(ctx, schemaName) + s.Assert().NoError(err) + s.Assert().True(ok) + + _, err = db.Exec(ctx, stmt) + s.Assert().NoError(err) + + err = db.TableExists(ctx, "example.mangoes") + s.Assert().NoError(err) + s.Assert().Nil(err) + + count, err := db.Count(ctx, "example.mangoes") + s.Assert().NoError(err) + s.Assert().Equal(0, count) + + err = db.DropSchema(ctx, schemaName) + s.Assert().NoError(err) + + err = db.Disconnect(ctx) + s.Assert().NoError(err) +} diff --git a/internal/queue/queue.go b/internal/queue/queue.go index e71119b..7b866e5 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -25,6 +25,7 @@ package queue import ( + "sync" "sync/atomic" "unsafe" ) @@ -34,6 +35,7 @@ type Queue struct { head unsafe.Pointer // pointer to the head of the queue tail unsafe.Pointer // pointer to the tail of the queue len uint64 // length of the queue + pool sync.Pool } // item is a single node in the queue. @@ -49,12 +51,19 @@ func NewQueue() *Queue { return &Queue{ head: unsafe.Pointer(dummy), // both head and tail point to the dummy node tail: unsafe.Pointer(dummy), + pool: sync.Pool{ + New: func() interface{} { + return &item{} + }, + }, } } // Enqueue adds a value to the tail of the queue. func (q *Queue) Enqueue(v interface{}) { - xitem := &item{v: v} + xitem := q.pool.Get().(*item) + xitem.next = nil + xitem.v = v for { last := loadItem(&q.tail) // Load current tail lastNext := loadItem(&last.next) // Load the next pointer of tail @@ -95,7 +104,8 @@ func (q *Queue) Dequeue() interface{} { // Try to swing the head to the next node if casItem(&q.head, head, next) { atomic.AddUint64(&q.len, ^uint64(0)) // decrement length - return v // return the dequeued value + q.pool.Put(next) + return v // return the dequeued value } } } diff --git a/offsetstore/postgres/config.go b/offsetstore/postgres/config.go new file mode 100644 index 0000000..b1717c4 --- /dev/null +++ b/offsetstore/postgres/config.go @@ -0,0 +1,35 @@ +/* + * MIT License + * + * Copyright (c) 2022-2024 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package postgres + +// Config defines the postgres offset store configuration +type Config struct { + DBHost string // DBHost represents the database host + DBPort int // DBPort is the database port + DBName string // DBName is the database name + DBUser string // DBUser is the database user used to connect + DBPassword string // DBPassword is the database password + DBSchema string // DBSchema represents the database schema +} diff --git a/offsetstore/postgres/helper_test.go b/offsetstore/postgres/helper_test.go index b666fcf..8fe971a 100644 --- a/offsetstore/postgres/helper_test.go +++ b/offsetstore/postgres/helper_test.go @@ -29,7 +29,7 @@ import ( "os" "testing" - "github.com/tochemey/gopack/postgres" + "github.com/tochemey/ego/v3/internal/postgres" ) var testContainer *postgres.TestContainer diff --git a/offsetstore/postgres/postgres.go b/offsetstore/postgres/postgres.go index 208c31d..4000227 100644 --- a/offsetstore/postgres/postgres.go +++ b/offsetstore/postgres/postgres.go @@ -26,18 +26,17 @@ package postgres import ( "context" - "database/sql" "errors" "fmt" "time" sq "github.com/Masterminds/squirrel" + "github.com/jackc/pgx/v5" "go.uber.org/atomic" "google.golang.org/protobuf/proto" - "github.com/tochemey/gopack/postgres" - "github.com/tochemey/ego/v3/egopb" + "github.com/tochemey/ego/v3/internal/postgres" "github.com/tochemey/ego/v3/offsetstore" ) @@ -84,9 +83,9 @@ type OffsetStore struct { var _ offsetstore.OffsetStore = (*OffsetStore)(nil) // NewOffsetStore creates an instance of OffsetStore -func NewOffsetStore(config *postgres.Config) *OffsetStore { +func NewOffsetStore(config *Config) *OffsetStore { // create the underlying db connection - db := postgres.New(config) + db := postgres.New(postgres.NewConfig(config.DBHost, config.DBPort, config.DBUser, config.DBPassword, config.DBName)) return &OffsetStore{ db: db, sb: sq.StatementBuilder.PlaceholderFormat(sq.Dollar), @@ -143,7 +142,7 @@ func (x *OffsetStore) WriteOffset(ctx context.Context, offset *egopb.Offset) err } // let us begin a database transaction to make sure we atomically write those events into the database - tx, err := x.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted}) + tx, err := x.db.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.ReadCommitted}) // return the error in case we are unable to get a database transaction if err != nil { return fmt.Errorf("failed to obtain a database transaction: %w", err) @@ -168,10 +167,10 @@ func (x *OffsetStore) WriteOffset(ctx context.Context, offset *egopb.Offset) err } // execute the query - _, execErr := tx.ExecContext(ctx, query, args...) + _, execErr := tx.Exec(ctx, query, args...) if execErr != nil { // attempt to roll back the transaction and log the error in case there is an error - if err = tx.Rollback(); err != nil { + if err = tx.Rollback(ctx); err != nil { return fmt.Errorf("unable to rollback db transaction: %w", err) } // return the main error @@ -196,10 +195,10 @@ func (x *OffsetStore) WriteOffset(ctx context.Context, offset *egopb.Offset) err } // insert into the table - _, execErr = tx.ExecContext(ctx, query, args...) + _, execErr = tx.Exec(ctx, query, args...) if execErr != nil { // attempt to roll back the transaction and log the error in case there is an error - if err = tx.Rollback(); err != nil { + if err = tx.Rollback(ctx); err != nil { return fmt.Errorf("unable to rollback db transaction: %w", err) } // return the main error @@ -207,7 +206,7 @@ func (x *OffsetStore) WriteOffset(ctx context.Context, offset *egopb.Offset) err } // commit the transaction - if commitErr := tx.Commit(); commitErr != nil { + if commitErr := tx.Commit(ctx); commitErr != nil { // return the commit error in case there is one return fmt.Errorf("failed to record events: %w", commitErr) } @@ -257,7 +256,7 @@ func (x *OffsetStore) ResetOffset(ctx context.Context, projectionName string, va } // let us begin a database transaction to make sure we atomically write those events into the database - tx, err := x.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted}) + tx, err := x.db.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.ReadCommitted}) // return the error in case we are unable to get a database transaction if err != nil { return fmt.Errorf("failed to obtain a database transaction: %w", err) @@ -281,10 +280,10 @@ func (x *OffsetStore) ResetOffset(ctx context.Context, projectionName string, va } // insert into the table - _, execErr := tx.ExecContext(ctx, query, args...) + _, execErr := tx.Exec(ctx, query, args...) if execErr != nil { // attempt to roll back the transaction and log the error in case there is an error - if err = tx.Rollback(); err != nil { + if err = tx.Rollback(ctx); err != nil { return fmt.Errorf("unable to rollback db transaction: %w", err) } // return the main error @@ -292,7 +291,7 @@ func (x *OffsetStore) ResetOffset(ctx context.Context, projectionName string, va } // commit the transaction - if commitErr := tx.Commit(); commitErr != nil { + if commitErr := tx.Commit(ctx); commitErr != nil { // return the commit error in case there is one return fmt.Errorf("failed to record events: %w", commitErr) } diff --git a/offsetstore/postgres/postgres_test.go b/offsetstore/postgres/postgres_test.go index 56dd4c7..64c965d 100644 --- a/offsetstore/postgres/postgres_test.go +++ b/offsetstore/postgres/postgres_test.go @@ -33,21 +33,19 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" - "github.com/tochemey/gopack/postgres" - "github.com/tochemey/ego/v3/egopb" "github.com/tochemey/ego/v3/offsetstore" ) func TestPostgresOffsetStore(t *testing.T) { t.Run("testNewOffsetStore", func(t *testing.T) { - config := &postgres.Config{ - DBHost: testContainer.Host(), - DBPort: testContainer.Port(), - DBName: testDatabase, - DBUser: testUser, - DBPassword: testDatabasePassword, - DBSchema: testContainer.Schema(), + config := &Config{ + testContainer.Host(), + testContainer.Port(), + testDatabase, + testUser, + testDatabasePassword, + testContainer.Schema(), } estore := NewOffsetStore(config) @@ -58,13 +56,13 @@ func TestPostgresOffsetStore(t *testing.T) { }) t.Run("testConnect:happy path", func(t *testing.T) { ctx := context.TODO() - config := &postgres.Config{ - DBHost: testContainer.Host(), - DBPort: testContainer.Port(), - DBName: testDatabase, - DBUser: testUser, - DBPassword: testDatabasePassword, - DBSchema: testContainer.Schema(), + config := &Config{ + testContainer.Host(), + testContainer.Port(), + testDatabase, + testUser, + testDatabasePassword, + testContainer.Schema(), } store := NewOffsetStore(config) @@ -76,30 +74,29 @@ func TestPostgresOffsetStore(t *testing.T) { }) t.Run("testConnect:database does not exist", func(t *testing.T) { ctx := context.TODO() - config := &postgres.Config{ - DBHost: testContainer.Host(), - DBPort: testContainer.Port(), - DBName: "testDatabase", - DBUser: testUser, - DBPassword: testDatabasePassword, - DBSchema: testContainer.Schema(), + config := &Config{ + testContainer.Host(), + testContainer.Port(), + "testDatabase", + testUser, + testDatabasePassword, + testContainer.Schema(), } store := NewOffsetStore(config) assert.NotNil(t, store) err := store.Connect(ctx) assert.Error(t, err) - assert.EqualError(t, err, "failed to ping database connection: pq: database \"testDatabase\" does not exist") }) t.Run("testWriteOffset", func(t *testing.T) { ctx := context.TODO() - config := &postgres.Config{ - DBHost: testContainer.Host(), - DBPort: testContainer.Port(), - DBName: testDatabase, - DBUser: testUser, - DBPassword: testDatabasePassword, - DBSchema: testContainer.Schema(), + config := &Config{ + testContainer.Host(), + testContainer.Port(), + testDatabase, + testUser, + testDatabasePassword, + testContainer.Schema(), } db, err := dbHandle(ctx) @@ -139,13 +136,13 @@ func TestPostgresOffsetStore(t *testing.T) { }) t.Run("testResetOffset", func(t *testing.T) { ctx := context.TODO() - config := &postgres.Config{ - DBHost: testContainer.Host(), - DBPort: testContainer.Port(), - DBName: testDatabase, - DBUser: testUser, - DBPassword: testDatabasePassword, - DBSchema: testContainer.Schema(), + config := &Config{ + testContainer.Host(), + testContainer.Port(), + testDatabase, + testUser, + testDatabasePassword, + testContainer.Schema(), } db, err := dbHandle(ctx) diff --git a/offsetstore/postgres/schema_utils.go b/offsetstore/postgres/schema_utils.go index f716178..4397594 100644 --- a/offsetstore/postgres/schema_utils.go +++ b/offsetstore/postgres/schema_utils.go @@ -27,7 +27,7 @@ package postgres import ( "context" - "github.com/tochemey/gopack/postgres" + "github.com/tochemey/ego/v3/internal/postgres" ) // SchemaUtils help create the various test tables in unit/integration tests diff --git a/option.go b/option.go index 02dc24a..6679938 100644 --- a/option.go +++ b/option.go @@ -29,7 +29,6 @@ import ( "github.com/tochemey/goakt/v2/discovery" "github.com/tochemey/goakt/v2/log" - "github.com/tochemey/goakt/v2/telemetry" ) // Option is the interface that applies a configuration option. @@ -68,10 +67,3 @@ func WithLogger(logger log.Logger) Option { e.logger = logger }) } - -// WithTelemetry sets the telemetry engine -func WithTelemetry(telemetry *telemetry.Telemetry) Option { - return OptionFunc(func(e *Engine) { - e.telemetry = telemetry - }) -} diff --git a/option_test.go b/option_test.go index c8d6131..7e77d97 100644 --- a/option_test.go +++ b/option_test.go @@ -32,7 +32,6 @@ import ( "github.com/tochemey/goakt/v2/discovery/kubernetes" "github.com/tochemey/goakt/v2/log" - "github.com/tochemey/goakt/v2/telemetry" ) func TestOptions(t *testing.T) { @@ -40,7 +39,6 @@ func TestOptions(t *testing.T) { logger := log.DefaultLogger // create a discovery provider discoveryProvider := kubernetes.NewDiscovery(&kubernetes.Config{}) - tel := telemetry.New() testCases := []struct { name string @@ -66,11 +64,6 @@ func TestOptions(t *testing.T) { option: WithLogger(logger), expected: Engine{logger: logger}, }, - { - name: "WithTelemetry", - option: WithTelemetry(tel), - expected: Engine{telemetry: tel}, - }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { diff --git a/projection/actor_test.go b/projection/actor_test.go index 0d88912..ff0dd7a 100644 --- a/projection/actor_test.go +++ b/projection/actor_test.go @@ -41,6 +41,7 @@ import ( "github.com/tochemey/ego/v3/egopb" "github.com/tochemey/ego/v3/eventstore/memory" + "github.com/tochemey/ego/v3/internal/lib" memoffsetstore "github.com/tochemey/ego/v3/offsetstore/memory" testpb "github.com/tochemey/ego/v3/test/data/pb/v3" ) @@ -61,12 +62,12 @@ func TestActor(t *testing.T) { err = actorSystem.Start(ctx) require.NoError(t, err) - time.Sleep(time.Second) + lib.Pause(time.Second) projectionName := "db-writer" persistenceID := uuid.NewString() shardNumber := uint64(9) - logger := log.DefaultLogger + logger := log.DiscardLogger // set up the event store journalStore := memory.NewEventsStore() @@ -87,7 +88,7 @@ func TestActor(t *testing.T) { require.NoError(t, err) require.NotNil(t, pid) - time.Sleep(time.Second) + lib.Pause(time.Second) // start the projection require.NoError(t, actors.Tell(ctx, pid, Start)) @@ -117,7 +118,7 @@ func TestActor(t *testing.T) { require.NoError(t, journalStore.WriteEvents(ctx, journals)) // wait for the data to be persisted by the database since this an eventual consistency case - time.Sleep(time.Second) + lib.Pause(time.Second) // create the projection id projectionID := &egopb.ProjectionId{ diff --git a/projection/runner_test.go b/projection/runner_test.go index fc2d048..ec72355 100644 --- a/projection/runner_test.go +++ b/projection/runner_test.go @@ -42,6 +42,7 @@ import ( "github.com/tochemey/ego/v3/egopb" "github.com/tochemey/ego/v3/eventstore/memory" + "github.com/tochemey/ego/v3/internal/lib" memoffsetstore "github.com/tochemey/ego/v3/offsetstore/memory" testpb "github.com/tochemey/ego/v3/test/data/pb/v3" ) @@ -104,7 +105,7 @@ func TestProjection(t *testing.T) { require.True(t, runner.started.Load()) // wait for the data to be persisted by the database since this an eventual consistency case - time.Sleep(time.Second) + lib.Pause(time.Second) // create the projection id projectionID := &egopb.ProjectionId{ @@ -178,7 +179,7 @@ func TestProjection(t *testing.T) { require.True(t, runner.started.Load()) // wait for the data to be persisted by the database since this an eventual consistency case - time.Sleep(time.Second) + lib.Pause(time.Second) // here due to the default recovery strategy the projection is stopped require.False(t, runner.started.Load()) @@ -245,7 +246,7 @@ func TestProjection(t *testing.T) { require.True(t, runner.started.Load()) // wait for the data to be persisted by the database since this an eventual consistency case - time.Sleep(1 * time.Second) + lib.Pause(1 * time.Second) // let us grab the current offset require.False(t, runner.started.Load()) @@ -313,7 +314,7 @@ func TestProjection(t *testing.T) { require.True(t, runner.started.Load()) // wait for the data to be persisted by the database since this an eventual consistency case - time.Sleep(time.Second) + lib.Pause(time.Second) projectionID := &egopb.ProjectionId{ ProjectionName: projectionName, @@ -389,7 +390,7 @@ func TestProjection(t *testing.T) { require.True(t, runner.started.Load()) // wait for the data to be persisted by the database since this an eventual consistency case - time.Sleep(time.Second) + lib.Pause(time.Second) projectionID := &egopb.ProjectionId{ ProjectionName: projectionName,