Skip to content

Commit

Permalink
group: sync result collection. #47
Browse files Browse the repository at this point in the history
  • Loading branch information
fgeller committed Mar 27, 2017
1 parent d6b6ce5 commit 06cbab2
Showing 1 changed file with 39 additions and 14 deletions.
53 changes: 39 additions & 14 deletions group.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,7 @@ func (cmd *groupCmd) run(args []string, q chan struct{}) {
for _, grp := range groups {
for _, top := range topics {
go func(grp, topic string) {
target := group{Name: grp, Topic: topic, Offsets: map[int32]groupOffset{}}
cmd.fetchGroupTopicOffset(target.Offsets, grp, topic)
if len(target.Offsets) > 0 {
buf, _ := json.Marshal(target)
fmt.Println(string(buf))
}
cmd.printGroupTopicOffset(grp, topic)
wg.Done()
}(grp, top)
}
Expand All @@ -114,19 +109,47 @@ func (cmd *groupCmd) run(args []string, q chan struct{}) {
}
}

func (cmd *groupCmd) fetchGroupTopicOffset(target map[int32]groupOffset, grp, top string) {
func (cmd *groupCmd) printGroupTopicOffset(grp, top string) {
target := group{Name: grp, Topic: top, Offsets: map[int32]groupOffset{}}
results := make(chan groupOffsetResult)
done := make(chan struct{})
parts := []int32{cmd.partition}
if cmd.partition == -23 {
parts = cmd.fetchPartitions(top)
}
wg := &sync.WaitGroup{}
wg.Add(len(parts))
for _, part := range parts {
go cmd.fetchGroupOffset(wg, target, grp, top, part)
go cmd.fetchGroupOffset(wg, grp, top, part, results)
}
go func() { wg.Wait(); close(done) }()

awaitGroupOffsets:
for {
select {
case res := <-results:
if !res.empty {
target.Offsets[res.partition] = groupOffset{Offset: res.offset, Lag: res.lag}
}
case <-done:
break awaitGroupOffsets
}
}
wg.Wait()

if len(target.Offsets) > 0 {
buf, _ := json.Marshal(target)
fmt.Println(string(buf))
}
}

type groupOffsetResult struct {
partition int32
offset int64
lag int64
empty bool
}
func (cmd *groupCmd) fetchGroupOffset(wg *sync.WaitGroup, target map[int32]groupOffset, grp, top string, part int32) {

func (cmd *groupCmd) fetchGroupOffset(wg *sync.WaitGroup, grp, top string, part int32, results chan groupOffsetResult) {
var (
err error
groupOff int64
Expand All @@ -139,13 +162,17 @@ func (cmd *groupCmd) fetchGroupOffset(wg *sync.WaitGroup, target map[int32]group
if offsetManager, err = sarama.NewOffsetManagerFromClient(grp, cmd.client); err != nil {
failf("failed to create client err=%v", err)
}
defer logClose("offset manager", offsetManager)

pom, err := offsetManager.ManagePartition(top, part)
if err != nil {
failf("failed to manage partition group=%s topic=%s partition=%d err=%v", grp, top, part, err)
}
defer logClose("partition offset manager", pom)

groupOff, _ = pom.NextOffset()
if groupOff == sarama.OffsetNewest {
// no offset recorded yet
results <- groupOffsetResult{empty: true}
return
}

Expand All @@ -158,9 +185,7 @@ func (cmd *groupCmd) fetchGroupOffset(wg *sync.WaitGroup, target map[int32]group
failf("failed to read partition offset for topic=%s partition=%d err=%v", top, part, err)
}

target[part] = groupOffset{Offset: groupOff, Lag: partOff - groupOff}
logClose("partition offset manager", pom)
logClose("offset manager", offsetManager)
results <- groupOffsetResult{partition: part, offset: groupOff, lag: partOff - groupOff}
}

func (cmd *groupCmd) fetchTopics() []string {
Expand Down

0 comments on commit 06cbab2

Please sign in to comment.