diff --git a/fleetspeak/src/client/client.go b/fleetspeak/src/client/client.go index b3f94adc..ac307350 100644 --- a/fleetspeak/src/client/client.go +++ b/fleetspeak/src/client/client.go @@ -74,8 +74,9 @@ type Client struct { outHigh chan service.AckMessage outMedium chan service.AckMessage outLow chan service.AckMessage - // used to wait until the retry loop goroutines are done + // used to wait until the retry and sort loop goroutines are done retryLoopsDone sync.WaitGroup + sortLoopDone sync.WaitGroup acks chan common.MessageID errs chan *fspb.MessageErrorData @@ -149,7 +150,11 @@ func New(cfg config.Configuration, cmps Components) (*Client, error) { if f == nil { f = flow.NewFilter() } - go message.SortLoop(ret.outUnsorted, ret.outbox, f) + ret.sortLoopDone.Add(1) + go func() { + message.SortLoop(ret.outUnsorted, ret.outbox, f) + ret.sortLoopDone.Done() + }() ssd := &serviceData{ config: ret.sc, @@ -258,33 +263,49 @@ func (c *Client) ProcessMessage(ctx context.Context, am service.AckMessage) (err // Stop shuts the client down gracefully. This includes stopping all communicators and services. func (c *Client) Stop() { + log.Info("Stopping client...") if c.com != nil { c.com.Stop() } c.sc.Stop() c.config.Stop() - close(c.outLow) - close(c.outMedium) - close(c.outHigh) + log.Info("Components have been stopped.") + // From here, shutdown is a little subtle: // - // 1) At this point, the communicator is off, so nothing else should be - // draining outbox. We do this ourselves and Ack everything so that the - // RetryLoops are guaranteed to terminate. + // - At this point, the communicator is off, so nothing else should be + // draining outbox. We do this ourselves and Ack everything so that the + // RetryLoops are guaranteed to terminate. // - // 2) The fake Acks in 1) are safe because the config manager is stopped. - // This means that client services are shut down and the Acks will not be - // reported outside of this process. 
+ // - The fake Acks above are safe because the config manager is stopped. + // This means that client services are shut down and the Acks will not be + // reported outside of this process. + + close(c.outLow) + close(c.outMedium) + close(c.outHigh) + c.retryLoopsDone.Wait() + log.Info("Retry loops have terminated.") + + // - Now, no more messages enter outUnsorted. // - // 3) Then we close outUnsorted so that the SortLoop terminates. - for { - select { - case m := <-c.outbox: - m.Ack() - default: - c.retryLoopsDone.Wait() - close(c.outUnsorted) - return + // - We close outUnsorted and drain outbox, to make sure no messages are lost. + // Once these two things are complete, SortLoop will return, and the client + // can be shut down. + + done := make(chan struct{}) + close(c.outUnsorted) + go func() { + for { + select { + case m := <-c.outbox: + m.Ack() + case <-done: + return + } } - } + }() + c.sortLoopDone.Wait() + done <- struct{}{} + log.Info("Messages have been drained.") } diff --git a/fleetspeak/src/client/internal/message/retry.go b/fleetspeak/src/client/internal/message/retry.go index 31c5dd74..e3356e0d 100644 --- a/fleetspeak/src/client/internal/message/retry.go +++ b/fleetspeak/src/client/internal/message/retry.go @@ -49,6 +49,7 @@ func RetryLoop(in <-chan service.AckMessage, out chan<- comms.MessageInfo, stats if sm.m.Ack != nil { sm.m.Ack() } + stats.MessageAcknowledged(sm.m.M, sm.size) acks <- sm }, Nack: func() { nacks <- sm }, @@ -68,10 +69,9 @@ func RetryLoop(in <-chan service.AckMessage, out chan<- comms.MessageInfo, stats case sm := <-acks: size -= sm.size count-- - stats.MessageAcknowledged(sm.m.M, sm.size) case sm := <-nacks: - out <- makeInfo(sm) stats.BeforeMessageRetry(sm.m.M) + out <- makeInfo(sm) case m, ok := <-optIn: if !ok { return diff --git a/fleetspeak/src/client/internal/message/sort.go b/fleetspeak/src/client/internal/message/sort.go index c15b6a5f..5f2614ca 100644 --- a/fleetspeak/src/client/internal/message/sort.go +++ 
b/fleetspeak/src/client/internal/message/sort.go @@ -24,14 +24,28 @@ import ( ) // SortLoop connects in and out in a sorted manner. That is, it acts essentially -// as a buffered channel between in and out which sorts any messages within -// it. Caller is responsible for implementing any needed size limit. Returns -// when "in" is closed. +// as a buffered channel between in and out which sorts any messages within it. +// The caller is responsible for implementing any needed size limit. +// SortLoop returns when in is closed, and the buffered messages have been +// drained through out, to make sure no messages are lost. func SortLoop(in <-chan comms.MessageInfo, out chan<- comms.MessageInfo, f *flow.Filter) { // Keep a slice of messages for each priority level. These are used as fifo - // queues, appending to the end and retreiving from head. + // queues, appending to the end and retrieving from head. var low, medium, high []comms.MessageInfo + // Block until all messages have been drained. + defer func() { + for _, mi := range high { + out <- mi + } + for _, mi := range medium { + out <- mi + } + for _, mi := range low { + out <- mi + } + }() + // Append a message to the correct list. appendMI := func(mi comms.MessageInfo) { switch mi.M.Priority {