@@ -2,6 +2,19 @@ defmodule HashRing.Worker do
2
2
@ moduledoc false
3
3
use GenServer
4
4
5
+ @ erpc_timeout 500
6
+ @ node_readiness_check_interval :timer . seconds ( 1 )
7
+
8
+ defstruct [
9
+ :table ,
10
+ :node_blacklist ,
11
+ :node_whitelist ,
12
+ :wait_for_readiness ,
13
+ :readiness_deps_set
14
+ ]
15
+
16
+ alias __MODULE__ , as: State
17
+
5
18
def nodes ( pid_or_name )
6
19
7
20
def nodes ( pid ) when is_pid ( pid ) do
@@ -14,7 +27,7 @@ defmodule HashRing.Worker do
14
27
|> get_ring ( )
15
28
|> HashRing . nodes ( )
16
29
rescue
17
- ArgumentError ->
30
+ ArgumentError ->
18
31
{ :error , :no_such_ring }
19
32
end
20
33
@@ -69,55 +82,135 @@ defmodule HashRing.Worker do
69
82
nodes = [ Node . self ( ) | Node . list ( :connected ) ]
70
83
node_blacklist = Keyword . get ( options , :node_blacklist , [ ~r/ ^remsh.*$/ , ~r/ ^rem-.*$/ ] )
71
84
node_whitelist = Keyword . get ( options , :node_whitelist , [ ] )
85
+ wait_for_readiness = Keyword . get ( options , :wait_for_readiness , false )
86
+ readiness_deps_set = Keyword . get ( options , :readiness_deps , [ ] ) |> MapSet . new ( )
72
87
73
88
ring =
74
89
Enum . reduce ( nodes , ring , fn node , acc ->
75
- cond do
76
- HashRing.Utils . ignore_node? ( node , node_blacklist , node_whitelist ) ->
77
- acc
78
-
79
- :else ->
90
+ if HashRing.Utils . ignore_node? ( node , node_blacklist , node_whitelist ) do
91
+ acc
92
+ else
93
+ if wait_for_readiness do
94
+ if node_ready? ( node , readiness_deps_set ) do
95
+ HashRing . add_node ( acc , node )
96
+ else
97
+ schedule_check_for_node_readiness ( node )
98
+ acc
99
+ end
100
+ else
80
101
HashRing . add_node ( acc , node )
102
+ end
81
103
end
82
104
end )
83
105
84
106
node_type = Keyword . get ( options , :node_type , :all )
85
107
:ok = :net_kernel . monitor_nodes ( true , node_type: node_type )
86
108
true = :ets . insert_new ( table , { :ring , ring } )
87
- { :ok , { table , node_blacklist , node_whitelist } }
109
+
110
+ { :ok ,
111
+ % State {
112
+ table: table ,
113
+ node_blacklist: node_blacklist ,
114
+ node_whitelist: node_whitelist ,
115
+ wait_for_readiness: wait_for_readiness ,
116
+ readiness_deps_set: readiness_deps_set
117
+ } }
88
118
89
119
:else ->
90
120
nodes = Keyword . get ( options , :nodes , [ ] )
91
121
ring = HashRing . add_nodes ( ring , nodes )
92
122
true = :ets . insert_new ( table , { :ring , ring } )
93
- { :ok , { table , [ ] , [ ] } }
123
+
124
+ { :ok ,
125
+ % State {
126
+ table: table ,
127
+ node_blacklist: [ ] ,
128
+ node_whitelist: [ ] ,
129
+ wait_for_readiness: false ,
130
+ readiness_deps_set: MapSet . new ( )
131
+ } }
94
132
end
95
133
end
96
134
97
- def handle_call ( :list_nodes , _from , { table , _b , _w } = state ) do
135
+ def handle_call ( :list_nodes , _from , % State { table: table } = state ) do
98
136
{ :reply , HashRing . nodes ( get_ring ( table ) ) , state }
99
137
end
100
138
101
- def handle_call ( { :key_to_node , key } , _from , { table , _b , _w } = state ) do
139
+ def handle_call ( { :key_to_node , key } , _from , % State { table: table } = state ) do
102
140
{ :reply , HashRing . key_to_node ( get_ring ( table ) , key ) , state }
103
141
end
104
142
105
- def handle_call ( { :add_node , node } , _from , { table , _b , _w } = state ) do
106
- get_ring ( table ) |> HashRing . add_node ( node ) |> update_ring ( table )
143
+ def handle_call (
144
+ { :add_node , node } ,
145
+ _from ,
146
+ % State {
147
+ table: table ,
148
+ wait_for_readiness: wait_for_readiness ,
149
+ readiness_deps_set: readiness_deps_set
150
+ } = state
151
+ ) do
152
+ if wait_for_readiness and not node_ready? ( node , readiness_deps_set ) do
153
+ schedule_check_for_node_readiness ( node )
154
+ else
155
+ get_ring ( table ) |> HashRing . add_node ( node ) |> update_ring ( table )
156
+ end
157
+
107
158
{ :reply , :ok , state }
108
159
end
109
160
110
- def handle_call ( { :add_node , node , weight } , _from , { table , _b , _w } = state ) do
111
- get_ring ( table ) |> HashRing . add_node ( node , weight ) |> update_ring ( table )
161
+ def handle_call (
162
+ { :add_node , node , weight } ,
163
+ _from ,
164
+ % State {
165
+ table: table ,
166
+ wait_for_readiness: wait_for_readiness ,
167
+ readiness_deps_set: readiness_deps_set
168
+ } = state
169
+ ) do
170
+ if wait_for_readiness and not node_ready? ( node , readiness_deps_set ) do
171
+ schedule_check_for_node_readiness ( { node , weight } )
172
+ else
173
+ get_ring ( table ) |> HashRing . add_node ( node , weight ) |> update_ring ( table )
174
+ end
175
+
112
176
{ :reply , :ok , state }
113
177
end
114
178
115
- def handle_call ( { :add_nodes , nodes } , _from , { table , _b , _w } = state ) do
116
- get_ring ( table ) |> HashRing . add_nodes ( nodes ) |> update_ring ( table )
179
+ def handle_call (
180
+ { :add_nodes , nodes } ,
181
+ _from ,
182
+ % State {
183
+ table: table ,
184
+ wait_for_readiness: wait_for_readiness ,
185
+ readiness_deps_set: readiness_deps_set
186
+ } = state
187
+ ) do
188
+ if wait_for_readiness do
189
+ % { true: ready_nodes , false: starting_nodes } =
190
+ Enum . group_by (
191
+ nodes ,
192
+ fn
193
+ { node , _weight } ->
194
+ node_ready? ( node , readiness_deps_set )
195
+
196
+ node ->
197
+ node_ready? ( node , readiness_deps_set )
198
+ end
199
+ )
200
+
201
+ get_ring ( table ) |> HashRing . add_nodes ( ready_nodes ) |> update_ring ( table )
202
+
203
+ for starting_node <- starting_nodes do
204
+ schedule_check_for_node_readiness ( starting_node )
205
+ end
206
+ else
207
+ get_ring ( table ) |> HashRing . add_nodes ( nodes ) |> update_ring ( table )
208
+ end
209
+
117
210
{ :reply , :ok , state }
118
211
end
119
212
120
- def handle_call ( { :remove_node , node } , _from , { table , _b , _w } = state ) do
213
+ def handle_call ( { :remove_node , node } , _from , % State { table: table } = state ) do
121
214
get_ring ( table ) |> HashRing . remove_node ( node ) |> update_ring ( table )
122
215
{ :reply , :ok , state }
123
216
end
@@ -127,19 +220,62 @@ defmodule HashRing.Worker do
127
220
{ :stop , :shutdown , state }
128
221
end
129
222
130
- def handle_info ( { :nodeup , node , _info } , { table , b , w } = state ) do
223
+ def handle_info (
224
+ { :nodeup , node , _info } ,
225
+ % State {
226
+ table: table ,
227
+ node_blacklist: b ,
228
+ node_whitelist: w ,
229
+ wait_for_readiness: wait_for_readiness ,
230
+ readiness_deps_set: readiness_deps_set
231
+ } = state
232
+ ) do
131
233
unless HashRing.Utils . ignore_node? ( node , b , w ) do
132
- get_ring ( table ) |> HashRing . add_node ( node ) |> update_ring ( table )
234
+ if wait_for_readiness and not node_ready? ( node , readiness_deps_set ) do
235
+ schedule_check_for_node_readiness ( node )
236
+ else
237
+ get_ring ( table ) |> HashRing . add_node ( node ) |> update_ring ( table )
238
+ end
133
239
end
134
240
135
241
{ :noreply , state }
136
242
end
137
243
138
- def handle_info ( { :nodedown , node , _info } , state = { table , _b , _w } ) do
244
+ def handle_info ( { :nodedown , node , _info } , % State { table: table } = state ) do
139
245
get_ring ( table ) |> HashRing . remove_node ( node ) |> update_ring ( table )
140
246
{ :noreply , state }
141
247
end
142
248
249
+ def handle_info (
250
+ { :check_node_readiness , node , weight } ,
251
+ % State { table: table , readiness_deps_set: readiness_deps_set } = state
252
+ ) do
253
+ if node_ready? ( node , readiness_deps_set ) do
254
+ get_ring ( table ) |> HashRing . add_node ( node , weight ) |> update_ring ( table )
255
+ else
256
+ schedule_check_for_node_readiness ( { node , weight } )
257
+ end
258
+
259
+ { :noreply , state }
260
+ end
261
+
262
+ def handle_info (
263
+ { :check_node_readiness , node } ,
264
+ % State { table: table , readiness_deps_set: readiness_deps_set } = state
265
+ ) do
266
+ if node_ready? ( node , readiness_deps_set ) do
267
+ get_ring ( table ) |> HashRing . add_node ( node ) |> update_ring ( table )
268
+ else
269
+ schedule_check_for_node_readiness ( node )
270
+ end
271
+
272
+ { :noreply , state }
273
+ end
274
+
275
+ def handle_info ( _msg , state ) do
276
+ { :noreply , state }
277
+ end
278
+
143
279
defp get_ets_name ( name ) , do: :"libring_#{ name } "
144
280
145
281
defp do_call ( pid_or_name , msg )
@@ -160,6 +296,33 @@ defmodule HashRing.Worker do
160
296
161
297
defp get_ring ( table ) , do: :ets . lookup_element ( table , :ring , 2 )
162
298
163
- defp update_ring ( ring , table ) ,
299
+ defp update_ring ( ring , table ) ,
164
300
do: :ets . update_element ( table , :ring , { 2 , ring } )
301
+
302
+ defp get_started_apps_set ( node ) do
303
+ try do
304
+ :erpc . call ( node , Application , :started_applications , [ ] , @ erpc_timeout )
305
+ |> Enum . map ( & elem ( & 1 , 0 ) )
306
+ |> MapSet . new ( )
307
+ rescue
308
+ _e -> MapSet . new ( )
309
+ end
310
+ end
311
+
312
+ defp node_ready? ( node , readiness_deps_set ) do
313
+ MapSet . difference ( readiness_deps_set , get_started_apps_set ( node ) )
314
+ |> MapSet . equal? ( MapSet . new ( ) )
315
+ end
316
+
317
+ defp schedule_check_for_node_readiness ( { node , weight } ) do
318
+ if node in Node . list ( ) do
319
+ :timer . send_after ( @ node_readiness_check_interval , { :check_node_readiness , node , weight } )
320
+ end
321
+ end
322
+
323
+ defp schedule_check_for_node_readiness ( node ) do
324
+ if node in Node . list ( ) do
325
+ :timer . send_after ( @ node_readiness_check_interval , { :check_node_readiness , node } )
326
+ end
327
+ end
165
328
end
0 commit comments