-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathcreate.go
262 lines (233 loc) · 7.23 KB
/
create.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
package main
import (
"fmt"
"time"
"github.com/Sirupsen/logrus"
"github.com/codegangsta/cli"
)
// createCommand registers the "create" sub-command:
//
//	create [--replicas N] host1:port1 ... hostN:portN
//
// The action insists on at least one positional argument and then hands the
// parsed context to RedisTrib.CreateClusterCmd, returning whatever error that
// call produces.
var createCommand = cli.Command{
	Name:        "create",
	Usage:       "create a new redis cluster.",
	ArgsUsage:   `<host1:port1 ... hostN:portN>`,
	Description: `The create command create a redis cluster.`,
	Flags: []cli.Flag{
		cli.IntFlag{
			Name:  "replicas, r",
			Value: 0,
			Usage: `Slave number for every master created, the default value is none.
$ redis-trib create <--replicas 1> <host1:port1 ... hostN:portN>`,
		},
	},
	Action: func(context *cli.Context) error {
		if context.NArg() <= 0 {
			// No addresses given: print the usage hint and the command help,
			// then report the problem through logrus.Fatalf.
			fmt.Printf("Incorrect Usage.\n\n")
			cli.ShowCommandHelp(context, "create")
			logrus.Fatalf("Must provide at least one \"host:port\" for create command!")
		}

		return NewRedisTrib().CreateClusterCmd(context)
	},
}
// CreateClusterCmd runs the "create" workflow for the addresses passed as
// positional arguments.
//
// In order, it: stores the --replicas value; builds, connects and registers a
// node for every non-empty address (reporting via logrus.Fatalf when a node
// fails AssertCluster); validates the node/replica counts; allocates slots and
// calls ShowNodes; asks for confirmation through YesOrDie; flushes the node
// configuration; assigns config epochs; joins the nodes; waits for the join;
// flushes the configuration once more and runs CheckCluster.
//
// It always returns nil.
func (self *RedisTrib) CreateClusterCmd(context *cli.Context) error {
	replicas := context.Int("replicas")
	self.SetReplicasNum(replicas)

	logrus.Printf(">>> Creating cluster")
	for _, hostPort := range context.Args() {
		if hostPort != "" {
			candidate := NewClusterNode(hostPort)
			candidate.Connect(true)
			if isCluster := candidate.AssertCluster(); !isCluster {
				logrus.Fatalf("Node %s is not configured as a cluster node.", candidate.String())
			}
			candidate.LoadInfo(false)
			candidate.AssertEmpty()
			self.AddNode(candidate)
		}
	}

	self.CheckCreateParameters()

	nodeTotal := len(self.Nodes())
	logrus.Printf(">>> Performing hash slots allocation on %d nodes...", nodeTotal)
	self.AllocSlots()
	self.ShowNodes()
	YesOrDie("Can I set the above configuration?")

	self.FlushNodesConfig()
	logrus.Printf(">>> Nodes configuration updated")

	logrus.Printf(">>> Assign a different config epoch to each node")
	self.AssignConfigEpoch()

	logrus.Printf(">>> Sending CLUSTER MEET messages to join the cluster")
	self.JoinCluster()

	// Pause for one second so the join has time to start. Without the pause,
	// WaitClusterJoin could see every node agreeing on the configuration
	// simply because they are all still empty with unassigned slots.
	time.Sleep(time.Second)
	self.WaitClusterJoin()

	// Second flush: useful for the replicas.
	self.FlushNodesConfig()
	self.CheckCluster(false)

	return nil
}
// CheckCreateParameters validates the node count against the requested
// number of replicas per master before a cluster is created.
//
// Two conditions are reported through logrus.Fatalf:
//   - the replica count is negative (checked first, so the master count is
//     never computed with a zero or negative divisor);
//   - fewer than three masters would result from
//     len(Nodes()) / (ReplicasNum() + 1).
//
// It returns true whenever it returns.
func (self *RedisTrib) CheckCreateParameters() bool {
	replicas := self.ReplicasNum()
	nodeCount := len(self.Nodes())

	// The cases are evaluated in order, so the division in the second case is
	// only reached once the replica count is known to be non-negative.
	switch {
	case replicas < 0:
		logrus.Fatalf("*** ERROR: Invalid configuration for cluster creation.\n"+
			"\t *** The number of replicas per master must not be negative (got %d).", replicas)
	case nodeCount/(replicas+1) < 3:
		logrus.Fatalf("*** ERROR: Invalid configuration for cluster creation.\n"+
			"\t *** Redis Cluster requires at least 3 master nodes.\n"+
			"\t *** This is not possible with %d nodes and %d replicas per node.\n"+
			"\t *** At least %d nodes are required.", nodeCount, replicas, 3*(replicas+1))
	}

	return true
}
// FlushNodesConfig calls FlushNodeConfig on every node returned by Nodes().
func (self *RedisTrib) FlushNodesConfig() {
	members := self.Nodes()
	for i := range members {
		members[i].FlushNodeConfig()
	}
}
// JoinCluster introduces every node to the first node yielded by Nodes().
//
// The first node is only used to build a "host:port" address; each later
// node then receives ClusterAddNode with that address. With zero or one node
// nothing is sent.
func (self *RedisTrib) JoinCluster() {
	var (
		seed     *ClusterNode
		seedAddr string
	)

	for _, member := range self.Nodes() {
		if seed != nil {
			member.ClusterAddNode(seedAddr)
			continue
		}

		// First node seen: remember it as the meeting point for the others.
		seed = member
		seedAddr = fmt.Sprintf("%s:%d", member.Host(), member.Port())
	}
}
// AllocSlots chooses the master nodes, divides the hash-slot range between
// them and attaches every remaining node to a master as a replica.
//
// Nodes are grouped by Host() and interleaved round-robin, one node per host
// per pass, so that neighbouring entries come from different hosts whenever
// possible. Hosts are visited in the order in which they first appear in
// Nodes(), so the resulting layout is reproducible for a given node order.
// The first len(Nodes())/(ReplicasNum()+1) interleaved nodes become masters;
// the rest form the replica pool.
//
// The method assumes ReplicasNum() is non-negative. CreateClusterCmd calls
// CheckCreateParameters before calling this method.
func (self *RedisTrib) AllocSlots() {
	nodes := self.Nodes()
	replicas := self.ReplicasNum()
	mastersNum := len(nodes) / (replicas + 1)

	// The first step is to split instances by host. This is useful as we'll
	// try to allocate master nodes on different physical machines (as much as
	// possible) and to allocate the replicas of a given master on different
	// physical machines as well.
	//
	// This only assumes that a different host most likely means a different
	// physical machine, or at least a different virtual machine.
	byHost := make(map[string][]*ClusterNode)
	var hosts []string // hosts in first-seen order; map iteration order is random
	for _, node := range nodes {
		host := node.Host()
		if _, seen := byHost[host]; !seen {
			hosts = append(hosts, host)
		}
		byHost[host] = append(byHost[host], node)
	}

	// Select master instances.
	logrus.Printf("Using %d masters:", mastersNum)

	// Take one node from each host per pass until every node has been moved.
	// While nodes remain, at least one host is non-empty, so every pass makes
	// progress and the loop also ends immediately when there are no nodes.
	interleaved := make([]*ClusterNode, 0, len(nodes))
	for len(interleaved) < len(nodes) {
		for _, host := range hosts {
			pending := byHost[host]
			if len(pending) == 0 {
				continue
			}
			interleaved = append(interleaved, pending[0])
			byHost[host] = pending[1:]
		}
	}

	// The capacity of masters is capped so later edits to the replica pool
	// can never reach into it.
	masters := interleaved[:mastersNum:mastersNum]
	interleaved = interleaved[mastersNum:]
	for _, master := range masters {
		logrus.Printf(" -> %s", master.String())
	}

	// Alloc slots on masters. The fractional cursor keeps the ranges evenly
	// sized; the last master always ends at the final slot.
	slotsPerNode := float64(ClusterHashSlots) / float64(mastersNum)
	first := 0
	cursor := 0.0
	for index, master := range masters {
		last := Round(cursor + slotsPerNode - 1)
		if last > ClusterHashSlots || index == len(masters)-1 {
			last = ClusterHashSlots - 1
		}
		if last < first {
			last = first
		}
		master.AddSlots(first, last)
		first = last + 1
		cursor += slotsPerNode
	}

	// Select N replicas for every master.
	// We try to split the replicas among all the hosts with spare nodes,
	// trying to avoid the host where the master is running, if possible.
	//
	// Note we loop two times. The first pass ("required") assigns the
	// requested number of replicas to each master. The second pass ("unused")
	// hands out leftover instances as extra replicas, at most one per master,
	// so some masters may end up with more replicas than requested.
	assignVerbose := false // flip to true to trace every placement decision
	for _, assign := range []string{"required", "unused"} {
		for _, master := range masters {
			assignedReplicas := 0
			for assignedReplicas < replicas {
				if len(interleaved) == 0 {
					break
				}

				if assignVerbose {
					switch assign {
					case "required":
						logrus.Printf("Requesting total of %d replicas (%d replicas assigned so far with %d total remaining).",
							replicas, assignedReplicas, len(interleaved))
					case "unused":
						logrus.Printf("Assigning extra instance to replication role too (%d remaining).", len(interleaved))
					}
				}

				// Prefer the first pooled node on a different host than the
				// master; if there is none, fall back to a same-host replica.
				var slave *ClusterNode
				if index := getNodeFromSlice(master, interleaved); index != -1 {
					slave = interleaved[index]
					interleaved = append(interleaved[:index], interleaved[index+1:]...)
				} else {
					slave, interleaved = interleaved[0], interleaved[1:]
				}

				slave.SetReplicate(master.Name())
				assignedReplicas++
				logrus.Printf("Adding replica %s to %s", slave.String(), master.String())

				// In the "unused" pass each master gets one extra replica
				// before any master gets a second, i.e. extras are handed out
				// round-robin.
				if assign == "unused" {
					break
				}
			}
		}
	}
}
// getNodeFromSlice returns the position of the first entry in nodes whose
// Host() differs from m's Host(), or -1 when nodes is empty or every entry
// shares m's host.
func getNodeFromSlice(m *ClusterNode, nodes [](*ClusterNode)) (index int) {
	index = -1
	for pos := range nodes {
		if nodes[pos].Host() != m.Host() {
			index = pos
			break
		}
	}
	return index
}