Skip to content

Commit

Permalink
refactor(flow): hide mutex methods in window flows (#138)
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn authored Aug 17, 2024
1 parent fe0876e commit af55736
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 15 deletions.
10 changes: 5 additions & 5 deletions flow/session_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// Session windows do not overlap and do not have a fixed start and end time.
// T indicates the incoming element type, and the outgoing element type is []T.
type SessionWindow[T any] struct {
sync.Mutex
mu sync.Mutex
inactivityGap time.Duration
in chan any
out chan any
Expand Down Expand Up @@ -74,9 +74,9 @@ func (sw *SessionWindow[T]) transmit(inlet streams.Inlet) {
// It resets the inactivity timer on each new element.
func (sw *SessionWindow[T]) receive() {
for element := range sw.in {
sw.Lock()
sw.mu.Lock()
sw.buffer = append(sw.buffer, element.(T))
sw.Unlock()
sw.mu.Unlock()
sw.notifyTimerReset() // signal to reset the inactivity timer
}
close(sw.done)
Expand Down Expand Up @@ -121,10 +121,10 @@ func (sw *SessionWindow[T]) emit() {
// dispatchWindow creates a window from buffered elements and resets the buffer.
// It sends the slice of elements to the output channel if the window is not empty.
func (sw *SessionWindow[T]) dispatchWindow() {
sw.Lock()
sw.mu.Lock()
windowElements := sw.buffer
sw.buffer = nil
sw.Unlock()
sw.mu.Unlock()

// send elements if the window is not empty
if len(windowElements) > 0 {
Expand Down
10 changes: 5 additions & 5 deletions flow/sliding_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type timedElement[T any] struct {
// In this case elements are assigned to multiple windows.
// T indicates the incoming element type, and the outgoing element type is []T.
type SlidingWindow[T any] struct {
sync.Mutex
mu sync.Mutex
windowSize time.Duration
slidingInterval time.Duration
queue []timedElement[T]
Expand Down Expand Up @@ -126,13 +126,13 @@ func (sw *SlidingWindow[T]) timestamp(element T) int64 {
// wrapping the original item into a timedElement along with its timestamp.
func (sw *SlidingWindow[T]) receive() {
for element := range sw.in {
sw.Lock()
sw.mu.Lock()
timed := timedElement[T]{
element: element.(T),
timestamp: sw.timestamp(element.(T)),
}
sw.queue = append(sw.queue, timed)
sw.Unlock()
sw.mu.Unlock()
}
close(sw.done)
}
Expand Down Expand Up @@ -162,7 +162,7 @@ func (sw *SlidingWindow[T]) emit() {
// dispatchWindow is responsible for sending the elements in the current
// window to the output channel and moving the window to the next position.
func (sw *SlidingWindow[T]) dispatchWindow(tick time.Time) {
sw.Lock()
sw.mu.Lock()

// sort elements in the queue by their timestamp
sort.Slice(sw.queue, func(i, j int) bool {
Expand All @@ -186,7 +186,7 @@ func (sw *SlidingWindow[T]) dispatchWindow(tick time.Time) {
// move the window
sw.queue = nextWindowQueue

sw.Unlock()
sw.mu.Unlock()

// send elements downstream if the current window is not empty
if len(windowElements) > 0 {
Expand Down
10 changes: 5 additions & 5 deletions flow/tumbling_window.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// Tumbling windows have a fixed size and do not overlap.
// T indicates the incoming element type, and the outgoing element type is []T.
type TumblingWindow[T any] struct {
sync.Mutex
mu sync.Mutex
windowSize time.Duration
in chan any
out chan any
Expand Down Expand Up @@ -71,9 +71,9 @@ func (tw *TumblingWindow[T]) transmit(inlet streams.Inlet) {
// receive buffers the incoming elements.
func (tw *TumblingWindow[T]) receive() {
for element := range tw.in {
tw.Lock()
tw.mu.Lock()
tw.buffer = append(tw.buffer, element.(T))
tw.Unlock()
tw.mu.Unlock()
}
close(tw.done)
}
Expand All @@ -99,10 +99,10 @@ func (tw *TumblingWindow[T]) emit() {
// dispatchWindow creates a window from buffered elements and resets the buffer.
// It sends the slice of elements to the output channel if the window is not empty.
func (tw *TumblingWindow[T]) dispatchWindow() {
tw.Lock()
tw.mu.Lock()
windowElements := tw.buffer
tw.buffer = nil
tw.Unlock()
tw.mu.Unlock()

// send elements if the window is not empty
if len(windowElements) > 0 {
Expand Down

0 comments on commit af55736

Please sign in to comment.