-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgraph.go
163 lines (144 loc) · 4.25 KB
/
graph.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package graphblast
import (
"bufio"
"io"
"strconv"
"time"
)
type Range struct {
Min Countable
Max Countable
}
func (r *Range) Contains(c Countable) bool {
return c >= r.Min && c <= r.Max
}
func (r *Range) MarshalJSON() ([]byte, error) {
return []byte("null"), nil
}
type Graph interface {
Changed(int) (bool, int)
Read(io.Reader) error
// TODO Make it possible to determine and send deltas
}
// NewGraphFromType returns an unconfigured graph object for the type of graph
// corresponding to graphType, or nil if there is no type of graph with that
// name.
func NewGraphFromType(graphType string) Graph {
switch graphType {
case "logfile":
return NewLogFile()
case "timeseries":
return NewTimeSeries()
case "scatterplot":
return NewScatterPlot()
case "histogram":
return NewHistogram()
default:
return nil
}
}
type Graphs struct {
named map[string]Graph
changed map[string]int
}
// GraphRequest sequences modifications to an internal collection of Graphs.
type GraphRequest func(*Graphs, Subscribers)
// CreateGraph adds a new Graph to a collection.
func CreateGraph(name string, graph Graph) GraphRequest {
return func(graphs *Graphs, subs Subscribers) {
graphs.named[name] = graph
body := map[string]string{"name": name}
subs.Send(NewJSONMessage("__created", body))
}
}
// CompleteGraph notifies subscribers that a Graph is no longer being updated,
// possibly due to an error.
func CompleteGraph(name string, err error) GraphRequest {
return func(graphs *Graphs, subs Subscribers) {
if err != nil {
graphs.changed[name] = 0
body := map[string]string{"name": name, "reason": err.Error()}
subs.Send(NewJSONMessage("__completed", body))
}
}
}
// DumpGraphs sends all Graphs in a collection to a single subscriber.
func DumpGraphs(subscriber string) GraphRequest {
return func(graphs *Graphs, subs Subscribers) {
for name, graph := range graphs.named {
CreateGraph(name, graph)(graphs, subs)
subs.Send(NewJSONMessageTo([]string{subscriber}, name, graph))
}
}
}
// NotifyChanges sends all Graphs that have changed (since the last call to
// NotifyChanges) to all subscribers.
func NotifyChanges() GraphRequest {
return func(graphs *Graphs, subs Subscribers) {
for name, graph := range graphs.named {
changed, indicator := graph.Changed(graphs.changed[name])
graphs.changed[name] = indicator
if !changed {
continue
}
subs.Send(NewJSONMessage(name, graph))
}
}
}
// ProcessGraphRequests maintains an internal collection of Graphs, listens for
// GraphRequests, and applies them to the collection.
func ProcessGraphRequests(requests <-chan GraphRequest, subs Subscribers) {
graphs := &Graphs{make(map[string]Graph), make(map[string]int)}
for requestFunc := range requests {
requestFunc(graphs, subs)
}
}
// PeriodicallyNotifyChanges sends a NotifyChanges request on a channel every n
// seconds.
func PeriodicallyNotifyChanges(requests chan<- GraphRequest, seconds int) {
updateFreq := time.Duration(seconds) * time.Second
for _ = range time.Tick(updateFreq) {
requests <- NotifyChanges()
}
}
// PopulateGraph is a convenience function for updating a Graph from an input
// stream, sending creation and completion requests at the appropriate times.
func PopulateGraph(name string, graph Graph, input io.Reader, requests chan<- GraphRequest) {
requests <- CreateGraph(name, graph)
var err error
defer func() {
requests <- CompleteGraph(name, err)
}()
err = graph.Read(input)
}
// The type of the items to parse from stdin and count in the histogram.
type Countable float64
// Parses a countable value from a string, and returns a non-nil error if
// parsing fails.
func Parse(str string) (Countable, error) {
d, err := strconv.ParseFloat(str, 64)
return Countable(d), err
}
// Returns the bucket (as a string) of which the countable value should
// increment the count, given the bucket size.
func (d Countable) Bucket(size int) string {
if size <= 0 {
size = 1
}
if d < 0 {
d -= Countable(size)
}
return strconv.Itoa(int(d) / size * size)
}
func doRead(input io.Reader, process func(string)) error {
Log("starting to read data")
reader := bufio.NewReader(input)
for {
line, err := reader.ReadString('\n')
if err != nil {
Log("finished reading data due to %v", err)
return err
}
process(line)
}
}