Skip to content

Commit

Permalink
updates conn manager to handle more logging
Browse files Browse the repository at this point in the history
  • Loading branch information
nickzelei committed Dec 2, 2024
1 parent eab0e9f commit c1fff42
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 92 deletions.
2 changes: 1 addition & 1 deletion backend/pkg/sqlmanager/sql-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (s *SqlManager) NewSqlConnection(
return nil, err
}
closer := func() {
s.config.mgr.ReleaseSession(session)
s.config.mgr.ReleaseSession(session, slogger)
}

switch connection.GetConnectionConfig().GetConfig().(type) {
Expand Down
2 changes: 1 addition & 1 deletion cli/internal/cmds/neosync/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (c *clisync) configureAndRunSync() error {
c.sourceConnection = sourceConnection

defer func() {
c.connmanager.ReleaseSession(c.session)
c.connmanager.ReleaseSession(c.session, c.logger)
}()

destConnection := cmdConfigToDestinationConnection(c.cmd)
Expand Down
88 changes: 37 additions & 51 deletions internal/connection-manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type ConnectionProvider[T any] interface {
GetConnectionClient(connectionConfig *mgmtv1alpha1.ConnectionConfig) (T, error)
GetConnectionClient(connectionConfig *mgmtv1alpha1.ConnectionConfig, logger *slog.Logger) (T, error)
CloseClientConnection(client T) error
}

Expand All @@ -28,6 +28,8 @@ type ConnectionManager[T any] struct {
groupConnMap map[string]map[string]T

shutdown chan any

isReaping bool
}

type ConnectionInput interface {
Expand All @@ -37,10 +39,9 @@ type ConnectionInput interface {

type Interface[T any] interface {
GetConnection(session SessionInterface, connection ConnectionInput, logger *slog.Logger) (T, error)
ReleaseSession(session SessionInterface) bool
ReleaseSessionGroup(grouper SessionGroupInterface) bool
Shutdown()
Reaper()
ReleaseSession(session SessionInterface, logger *slog.Logger) bool
Shutdown(logger *slog.Logger)
Reaper(logger *slog.Logger)
}

var _ Interface[any] = &ConnectionManager[any]{}
Expand Down Expand Up @@ -81,8 +82,9 @@ func NewConnectionManager[T any](
groupSessionMap: map[string]map[string]map[string]struct{}{},
groupConnMap: map[string]map[string]T{},
config: cfg,
shutdown: make(chan any),
shutdown: make(chan any, 1),
mu: &sync.Mutex{},
isReaping: false,
}
}

Expand Down Expand Up @@ -111,7 +113,7 @@ func (c *ConnectionManager[T]) GetConnection(
logger.Debug("no cached connection found, creating new connection client")

// Create new connection
connectionClient, err := c.connectionProvider.GetConnectionClient(connection.GetConnectionConfig())
connectionClient, err := c.connectionProvider.GetConnectionClient(connection.GetConnectionConfig(), logger)
if err != nil {
var result T
return result, err
Expand Down Expand Up @@ -139,18 +141,21 @@ func (c *ConnectionManager[T]) ensureSessionMapsExist(groupId, sessionId string)
}
}

func (c *ConnectionManager[T]) ReleaseSession(session SessionInterface) bool {
func (c *ConnectionManager[T]) ReleaseSession(session SessionInterface, logger *slog.Logger) bool {
c.mu.Lock()
defer c.mu.Unlock()
groupId := session.Group()
sessionId := session.Name()
logger = logger.With("session", session.String())
groupSessions, groupExists := c.groupSessionMap[groupId]
if !groupExists {
logger.Debug("session group not found during release")
return false
}

sessionConns, sessionExists := groupSessions[sessionId]
if !sessionExists || len(sessionConns) == 0 {
logger.Debug("session not found in group during release")
return false
}

Expand All @@ -159,46 +164,20 @@ func (c *ConnectionManager[T]) ReleaseSession(session SessionInterface) bool {

// If this was the last session in the group, clean up the group
if len(groupSessions) == 0 {
logger.Debug("cleaning up group, last session found")
delete(c.groupSessionMap, groupId)
}

if c.config.closeOnRelease {
logger.Debug("close on release is enabled, pruning connections that are not bound to any sessions in the group")
remainingConns := getUniqueConnectionIdsFromGroupSessions(groupSessions)
c.closeSpecificGroupConnections(groupId, sessionConnIds, remainingConns)
c.closeSpecificGroupConnections(groupId, sessionConnIds, remainingConns, logger)
}
return true
}

func (c *ConnectionManager[T]) ReleaseSessionGroup(grouper SessionGroupInterface) bool {
c.mu.Lock()
defer c.mu.Unlock()

groupId := grouper.Group()

groupSessions, groupExists := c.groupSessionMap[groupId]
if !groupExists || len(groupSessions) == 0 {
return false
}

// Get all connection IDs that are in the group
connIds := make([]string, 0)
for _, sessionConns := range groupSessions {
connIds = append(connIds, getConnectionIds(sessionConns)...)
}

// Remove all sessions in the group
delete(c.groupSessionMap, groupId)

if c.config.closeOnRelease {
// Since we're removing the entire group, there are no remaining connections
c.closeSpecificGroupConnections(groupId, connIds, make(map[string]struct{}))
}

return len(connIds) > 0
}

// does not handle locks as it assumes the parent caller holds the lock
func (c *ConnectionManager[T]) closeSpecificGroupConnections(groupId string, candidateConnIds []string, remainingConns map[string]struct{}) {
func (c *ConnectionManager[T]) closeSpecificGroupConnections(groupId string, candidateConnIds []string, remainingConns map[string]struct{}, logger *slog.Logger) {
groupConns, exists := c.groupConnMap[groupId]
if !exists {
return
Expand All @@ -207,8 +186,9 @@ func (c *ConnectionManager[T]) closeSpecificGroupConnections(groupId string, can
for _, connId := range candidateConnIds {
if _, stillInUse := remainingConns[connId]; !stillInUse {
if dbConn, exists := groupConns[connId]; exists {
logger.Debug(fmt.Sprintf("closing connection %q", connId))
if err := c.connectionProvider.CloseClientConnection(dbConn); err != nil {
slog.Error(fmt.Sprintf("unable to close client connection during release: %s", err.Error()))
logger.Error(fmt.Sprintf("unable to close client connection during release: %s", err.Error()))
}
delete(groupConns, connId)
}
Expand All @@ -221,23 +201,29 @@ func (c *ConnectionManager[T]) closeSpecificGroupConnections(groupId string, can
}
}

func (c *ConnectionManager[T]) Shutdown() {
func (c *ConnectionManager[T]) Shutdown(logger *slog.Logger) {
c.shutdown <- struct{}{}
if !c.isReaping {
logger.Debug("reaper is not turned on, hard closing")
c.hardClose(logger)
} else {
logger.Debug("sent shutdown signal to reaper")
}
}

func (c *ConnectionManager[T]) Reaper() {
func (c *ConnectionManager[T]) Reaper(logger *slog.Logger) {
for {
select {
case <-c.shutdown:
c.hardClose()
c.hardClose(logger)
return
case <-time.After(c.config.reapDuration):
c.cleanUnusedConnections()
c.cleanUnusedConnections(logger)
}
}
}

func (c *ConnectionManager[T]) cleanUnusedConnections() {
func (c *ConnectionManager[T]) cleanUnusedConnections(logger *slog.Logger) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -249,18 +235,18 @@ func (c *ConnectionManager[T]) cleanUnusedConnections() {
for session := range sessions {
groupSessions = append(groupSessions, session)
}
slog.Debug(fmt.Sprintf("[ConnectionManager][Reaper] group %q with sessions %s", groupId, strings.Join(groupSessions, ",")))
logger.Debug(fmt.Sprintf("[ConnectionManager][Reaper] group %q with sessions %s", groupId, strings.Join(groupSessions, ",")))
}

for groupId, groupConns := range c.groupConnMap {
slog.Debug(fmt.Sprintf("[ConnectionManager][Reaper] checking group %q with %d connection(s)", groupId, len(groupConns)))
logger.Debug(fmt.Sprintf("[ConnectionManager][Reaper] checking group %q with %d connection(s)", groupId, len(groupConns)))
sessionConns := groupSessionConnections[groupId]
for connId, dbConn := range groupConns {
slog.Debug(fmt.Sprintf("[ConnectionManager][Reaper] checking group %q for connection %q", groupId, connId))
logger.Debug(fmt.Sprintf("[ConnectionManager][Reaper] checking group %q for connection %q", groupId, connId))
if _, ok := sessionConns[connId]; !ok {
slog.Debug(fmt.Sprintf("[ConnectionManager][Reaper] closing client connection: %q in group %q", connId, groupId))
logger.Debug(fmt.Sprintf("[ConnectionManager][Reaper] closing client connection: %q in group %q", connId, groupId))
if err := c.connectionProvider.CloseClientConnection(dbConn); err != nil {
slog.Warn(fmt.Sprintf("[ConnectionManager][Reaper] unable to fully close client connection %q in group %q during cleanup: %s", connId, groupId, err.Error()))
logger.Warn(fmt.Sprintf("[ConnectionManager][Reaper] unable to fully close client connection %q in group %q during cleanup: %s", connId, groupId, err.Error()))
}
delete(groupConns, connId)
}
Expand All @@ -271,15 +257,15 @@ func (c *ConnectionManager[T]) cleanUnusedConnections() {
}
}

func (c *ConnectionManager[T]) hardClose() {
func (c *ConnectionManager[T]) hardClose(logger *slog.Logger) {
c.mu.Lock()
defer c.mu.Unlock()

// Close all connections in all groups
for groupId, groupConns := range c.groupConnMap {
for connId, dbConn := range groupConns {
if err := c.connectionProvider.CloseClientConnection(dbConn); err != nil {
slog.Error(fmt.Sprintf("unable to fully close client connection during hard close: %s", err.Error()))
logger.Error(fmt.Sprintf("unable to fully close client connection during hard close: %s", err.Error()))
}
delete(groupConns, connId)
}
Expand Down
28 changes: 14 additions & 14 deletions internal/connection-manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func Test_ConnectionTunnelManager_ReleaseSession(t *testing.T) {
provider := NewMockConnectionProvider[any](t)
mgr := NewConnectionManager(provider)

require.False(t, mgr.ReleaseSession(NewSession("111")), "currently no session")
require.False(t, mgr.ReleaseSession(NewSession("111"), testutil.GetTestLogger(t)), "currently no session")

conn := &mgmtv1alpha1.Connection{
Id: "1",
Expand All @@ -83,14 +83,14 @@ func Test_ConnectionTunnelManager_ReleaseSession(t *testing.T) {
_, err := mgr.GetConnection(NewSession("111"), conn, testutil.GetTestLogger(t))
require.NoError(t, err)

require.True(t, mgr.ReleaseSession(NewSession("111")), "released an existing session")
require.True(t, mgr.ReleaseSession(NewSession("111"), testutil.GetTestLogger(t)), "released an existing session")
}

func Test_ConnectionTunnelManager_cleanUnusedConnections(t *testing.T) {
provider := NewMockConnectionProvider[any](t)
mgr := NewConnectionManager(provider)

require.False(t, mgr.ReleaseSession(NewSession("111")), "currently no session")
require.False(t, mgr.ReleaseSession(NewSession("111"), testutil.GetTestLogger(t)), "currently no session")

conn := &mgmtv1alpha1.Connection{
Id: "1",
Expand All @@ -108,10 +108,10 @@ func Test_ConnectionTunnelManager_cleanUnusedConnections(t *testing.T) {
require.NoError(t, err)

require.NotEmpty(t, mgr.groupConnMap, "has an active connection")
mgr.cleanUnusedConnections()
mgr.cleanUnusedConnections(testutil.GetTestLogger(t))
require.NotEmpty(t, mgr.groupConnMap, "not empty due to active session")
require.True(t, mgr.ReleaseSession(NewSession("111")), "released an existing session")
mgr.cleanUnusedConnections()
require.True(t, mgr.ReleaseSession(NewSession("111"), testutil.GetTestLogger(t)), "released an existing session")
mgr.cleanUnusedConnections(testutil.GetTestLogger(t))
require.Empty(t, mgr.groupConnMap, "now empty due to no active sessions")
}

Expand All @@ -120,7 +120,7 @@ func Test_ConnectionTunnelManager_hardClose(t *testing.T) {
mgr := NewConnectionManager(provider)

session := NewSession("111")
require.False(t, mgr.ReleaseSession(session), "currently no session")
require.False(t, mgr.ReleaseSession(session, testutil.GetTestLogger(t)), "currently no session")

conn := &mgmtv1alpha1.Connection{
Id: "1",
Expand All @@ -137,7 +137,7 @@ func Test_ConnectionTunnelManager_hardClose(t *testing.T) {
require.NoError(t, err)

require.NotEmpty(t, mgr.groupConnMap, "has an active connection")
mgr.hardClose()
mgr.hardClose(testutil.GetTestLogger(t))
require.Empty(t, mgr.groupConnMap, "now empty due to no active sessions")
}

Expand Down Expand Up @@ -201,12 +201,12 @@ func Test_ConnectionManager_CloseOnRelease_Option(t *testing.T) {
require.NoError(t, err)

// Release session1 - conn1 should stay alive due to session2, but conn2 should be closed
require.True(t, mgr.ReleaseSession(session1))
require.True(t, mgr.ReleaseSession(session1, testutil.GetTestLogger(t)))
require.Len(t, mgr.groupConnMap[""], 1) // Only conn1 should still exist
provider.AssertNumberOfCalls(t, "CloseClientConnection", 1) // Only conn2 should be closed

// Release session2 - conn1 should now be closed
require.True(t, mgr.ReleaseSession(session2))
require.True(t, mgr.ReleaseSession(session2, testutil.GetTestLogger(t)))
require.Empty(t, mgr.groupConnMap) // All connections should be closed
provider.AssertNumberOfCalls(t, "CloseClientConnection", 2)
}
Expand Down Expand Up @@ -280,7 +280,7 @@ func Test_ConnectionManager_Error_During_Close(t *testing.T) {
require.NoError(t, err)

// Even with close error, session and connection should be removed
require.True(t, mgr.ReleaseSession(NewSession("session1")))
require.True(t, mgr.ReleaseSession(NewSession("session1"), testutil.GetTestLogger(t)))
require.Empty(t, mgr.groupConnMap)
require.Empty(t, mgr.groupSessionMap)
}
Expand Down Expand Up @@ -310,7 +310,7 @@ func Test_ConnectionManager_Concurrent_GetConnection_And_Release(t *testing.T) {
if err != nil {
return err
}
mgr.ReleaseSession(NewSession(sessionId))
mgr.ReleaseSession(NewSession(sessionId), testutil.GetTestLogger(t))
}
return nil
})
Expand Down Expand Up @@ -347,10 +347,10 @@ func Test_ConnectionManager_Reaper_With_Active_Sessions(t *testing.T) {
// Create and release session with conn2
_, err = mgr.GetConnection(NewSession("session2"), conn2, testutil.GetTestLogger(t))
require.NoError(t, err)
require.True(t, mgr.ReleaseSession(NewSession("session2")))
require.True(t, mgr.ReleaseSession(NewSession("session2"), testutil.GetTestLogger(t)))

// Run reaper
mgr.cleanUnusedConnections()
mgr.cleanUnusedConnections(testutil.GetTestLogger(t))

// Verify conn2 was cleaned up but conn1 remains
require.Len(t, mgr.groupConnMap, 1)
Expand Down
Loading

0 comments on commit c1fff42

Please sign in to comment.