diff --git a/eventstream/iface.go b/eventstream/iface.go new file mode 100644 index 0000000..12b60cb --- /dev/null +++ b/eventstream/iface.go @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2022-2023 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package eventstream + +// Stream defines the stream interface +type Stream interface { +} diff --git a/eventstream/logger.go b/eventstream/logger.go new file mode 100644 index 0000000..a3d97ce --- /dev/null +++ b/eventstream/logger.go @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2022-2023 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package eventstream + +import ( + natsserver "github.com/nats-io/nats-server/v2/server" + "github.com/tochemey/goakt/log" +) + +// nLogger defines the NATS JetStream logger +type nLogger struct { + logger log.Logger +} + +// newNLogger creates an instance of nLogger +func newNLogger(logger log.Logger) *nLogger { + return &nLogger{logger: logger} +} + +// Noticef implementation +func (n nLogger) Noticef(format string, v ...interface{}) { + n.logger.Infof(format, v...) +} + +// Warnf implementation +func (n nLogger) Warnf(format string, v ...interface{}) { + n.logger.Warnf(format, v...) +} + +// Fatalf implementation +func (n nLogger) Fatalf(format string, v ...interface{}) { + n.logger.Fatalf(format, v...) +} + +// Errorf implementation +func (n nLogger) Errorf(format string, v ...interface{}) { + n.logger.Errorf(format, v...) +} + +// Debugf implementation +func (n nLogger) Debugf(format string, v ...interface{}) { + n.logger.Debugf(format, v...) +} + +// Tracef implementation +func (n nLogger) Tracef(format string, v ...interface{}) { + // TODO: maybe use info or implements trace + n.logger.Warnf(format, v...) +} + +// enforce the implementation of the NATS logger interface +var _ natsserver.Logger = (*nLogger)(nil) diff --git a/eventstream/server.go b/eventstream/server.go new file mode 100644 index 0000000..534fc23 --- /dev/null +++ b/eventstream/server.go @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2022-2023 Tochemey + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package eventstream + +import ( + "context" + "time" + + natsserver "github.com/nats-io/nats-server/v2/server" + natsclient "github.com/nats-io/nats.go" + "github.com/pkg/errors" + "github.com/tochemey/goakt/log" + "go.uber.org/atomic" +) + +var ( + // ErrStreamServerNotStarted is returned when the stream server is not started + ErrStreamServerNotStarted = errors.New("events stream server not started") + // ErrStreamServerStartFailure is returned when the stream server fails to start + ErrStreamServerStartFailure = errors.New("events stream server failed to start") + // ErrStreamServerAlreadyStarted is returned when the stream server is already started + ErrStreamServerAlreadyStarted = errors.New("events stream server already started") +) + +// EventsStream defines the events stream engine +type EventsStream struct { + server *natsserver.Server + logger log.Logger + + started *atomic.Bool + startTimeout time.Duration + + publisher *natsclient.Conn +} + +// NewEventsStream creates an instance of EventsStream +func NewEventsStream() (*EventsStream, error) { + // create an instance of the stream server + // TODO: add optional params to overwrite this default settings + streamServer := &EventsStream{ + logger: log.DefaultLogger, + started: atomic.NewBool(false), + startTimeout: 5 * time.Second, + } + + // create the instance of the NATS JetStream server + // TODO: enhance the configuration when needed + server, err := natsserver.NewServer(&natsserver.Options{ + DontListen: true, + JetStream: true, + Logtime: true, + NoLog: false, + }) + // return an error in case there is one + if err != nil { + return nil, err + } + + // create the NATS logger + l := newNLogger(streamServer.logger) + // set the debug flag + debugFlag := streamServer.logger.LogLevel() == log.DebugLevel + // set the server logger + server.SetLogger(l, debugFlag, true) + // configure the logger + server.ConfigureLogger() + // set the stream server underlying server + streamServer.server = server + + // successful + return streamServer, nil +} + +// Start starts the stream server +// nolint +func (x *EventsStream) Start(ctx context.Context) error { + // add a logging information + x.logger.Info("starting the events stream server...") + // check whether the server is already started + if x.started.Load() { + return ErrStreamServerAlreadyStarted + } + + // start the NATS JetStream server + x.server.Start() + // wait for the server to be ready + if !x.server.ReadyForConnections(x.startTimeout) { + x.logger.Error("stream server failed to start") + return ErrStreamServerStartFailure + } + + // set the started + x.started.Store(true) + // add a logging information + x.logger.Info("events stream server started successfully") + return nil +} + +// Stop stops the stream server +func (x *EventsStream) Stop() error { + // add a logging information + x.logger.Info("stopping the events stream server...") + // check whether the server is already started + if !x.started.Load() { + return ErrStreamServerNotStarted + } + + // shutdown the underlying NATS JetStream server + x.server.Shutdown() + // set the started to false + x.started.Store(false) + return nil +} diff --git a/go.mod b/go.mod index a81c34c..2468139 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,8 @@ require ( github.com/flowchartsman/retry v1.2.0 github.com/google/uuid v1.3.1 github.com/hashicorp/go-memdb v1.3.4 + github.com/nats-io/nats-server/v2 v2.10.3 + github.com/nats-io/nats.go v1.30.2 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.4 github.com/tochemey/goakt v1.0.0 @@ -65,15 +67,20 @@ require ( github.com/hashicorp/memberlist v0.5.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.0 // indirect github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect github.com/lib/pq v1.10.9 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/miekg/dns v1.1.55 // indirect + github.com/minio/highwayhash v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mschoch/smat v0.2.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/nats-io/jwt/v2 v2.5.2 // indirect + github.com/nats-io/nkeys v0.4.5 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect github.com/opencontainers/runc v1.1.9 // indirect @@ -91,6 +98,7 @@ require ( go.opentelemetry.io/otel/metric v1.19.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect + golang.org/x/crypto v0.13.0 // indirect golang.org/x/mod v0.12.0 // indirect golang.org/x/net v0.15.0 // indirect golang.org/x/oauth2 v0.11.0 // indirect diff --git a/go.sum b/go.sum index 1bdebd3..537333c 100644 --- a/go.sum +++ b/go.sum @@ -178,6 +178,8 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= +github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -202,6 +204,8 @@ github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKju github.com/miekg/dns v1.1.45/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7XnME= github.com/miekg/dns v1.1.55 h1:GoQ4hpsj0nFLYe+bWiCToyrBEJXkQfOOIvFGFy0lEgo= github.com/miekg/dns v1.1.55/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY= +github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= +github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -216,6 +220,16 @@ github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOl github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU= +github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= +github.com/nats-io/nats-server/v2 v2.10.3 h1:nk2QVLpJUh3/AhZCJlQdTfj2oeLDvWnn1Z6XzGlNFm0= +github.com/nats-io/nats-server/v2 v2.10.3/go.mod h1:lzrskZ/4gyMAh+/66cCd+q74c6v7muBypzfWhP/MAaM= +github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY= +github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM= +github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= +github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= @@ -332,6 +346,8 @@ golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck= +golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= +golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -367,6 +383,7 @@ golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/readme.md b/readme.md index 1e628e0..d10bb4a 100644 --- a/readme.md +++ b/readme.md @@ -4,7 +4,7 @@ [![codecov](https://codecov.io/gh/Tochemey/ego/branch/main/graph/badge.svg?token=Z5b9gM6Mnt)](https://codecov.io/gh/Tochemey/ego) [![GitHub tag (with filter)](https://img.shields.io/github/v/tag/tochemey/ego)](https://github.com/Tochemey/ego/tags) -eGo is a minimal library that help build event-sourcing and CQRS application through a simple interface, and it allows developers to describe their commands, events and states are defined using google protocol buffers. +eGo is a Golang library that help build event-sourcing and CQRS application through a simple interface, and it allows developers to describe their commands, events and states are defined using google protocol buffers. Under the hood, ego leverages [goakt](https://github.com/Tochemey/goakt) to scale out and guarantee performant, reliable persistence. ### Features