1
1
function Tress ( worker , concurrency ) { // function worker(job, done)
2
-
3
- if ( ! ( this instanceof Tress ) ) { return new Tress ( worker , concurrency ) ; }
2
+ if ( ! ( this instanceof Tress ) ) return new Tress ( worker , concurrency ) ;
4
3
5
4
if ( concurrency === 0 ) throw new Error ( 'Concurrency can not be 0' ) ;
6
- var _concurrency = concurrency > 0 ? concurrency : 1 ;
7
- var _delay = concurrency < 0 ? - concurrency : 0 ;
8
- var _buffer = _concurrency / 4 ;
9
- var _paused = false ;
10
- var _started = false ;
11
- var _queue = {
5
+ let _concurrency = concurrency > 0 ? concurrency : 1 ;
6
+ let _delay = concurrency < 0 ? - concurrency : 0 ;
7
+ let _buffer = _concurrency / 4 ;
8
+ let _paused = false ;
9
+ let _started = false ;
10
+ let _queue = {
12
11
waiting : [ ] ,
13
12
active : [ ] ,
14
13
failed : [ ] ,
15
- finished : [ ]
14
+ finished : [ ] ,
16
15
} ;
17
16
18
- var _onDrain = function ( ) { } ;
19
- var _onEmpty = function ( ) { } ;
20
- var _onSaturated = function ( ) { } ;
21
- var _onUnsaturated = function ( ) { } ;
22
- var _onError = function ( ) { } ;
23
- var _onSuccess = function ( ) { } ;
24
- var _onRetry = function ( ) { } ;
17
+ let _onDrain = function ( ) { } ;
18
+ let _onEmpty = function ( ) { } ;
19
+ let _onSaturated = function ( ) { } ;
20
+ let _onUnsaturated = function ( ) { } ;
21
+ let _onError = function ( ) { } ;
22
+ let _onSuccess = function ( ) { } ;
23
+ let _onRetry = function ( ) { } ;
25
24
26
- var _startJob = function ( delayable ) {
25
+ const _startJob = function ( delayable ) {
27
26
if ( _queue . waiting . length === 0 && _queue . active . length === 0 ) _onDrain ( ) ;
28
27
29
- if ( _paused || _queue . active . length >= _concurrency || _queue . waiting . length === 0 ) return ;
28
+ if ( _paused || _queue . active . length >= _concurrency || _queue . waiting . length === 0 ) return ;
30
29
31
- var job = _queue . waiting . shift ( ) ;
30
+ const job = _queue . waiting . shift ( ) ;
32
31
if ( _queue . waiting . length === 0 ) _onEmpty ( ) ;
33
32
34
33
_queue . active . push ( job ) ;
35
34
if ( _queue . active . length === _concurrency ) _onSaturated ( ) ;
36
35
37
- var doneCalled = false ;
36
+ let doneCalled = false ;
38
37
39
- setTimeout ( worker , delayable ? _delay : 0 , job . data , function ( err ) {
38
+ setTimeout ( worker , delayable ? _delay : 0 , job . data , function ( err , ... args ) {
40
39
if ( doneCalled ) {
41
40
throw new Error ( 'Too many callback calls in worker' ) ;
42
41
} else {
43
42
doneCalled = true ;
44
43
}
45
- _queue . active = _queue . active . filter ( ( v ) => v !== job ) ;
46
- if ( _queue . active . length <= _concurrency - this . buffer ) _onUnsaturated ( ) ;
47
- if ( typeof err === 'boolean' ) {
44
+ _queue . active = _queue . active . filter ( v => v !== job ) ;
45
+ if ( _queue . active . length <= _concurrency - this . buffer ) _onUnsaturated ( ) ;
46
+ if ( typeof err === 'boolean' ) {
48
47
_queue . waiting [ err ? 'unshift' : 'push' ] ( job ) ;
49
- _onRetry . apply ( job . data , Array . prototype . slice . call ( arguments , 1 ) ) ;
48
+ _onRetry . call ( job . data , ... args ) ;
50
49
} else {
51
50
_queue [ err ? 'failed' : 'finished' ] . push ( job ) ;
52
- job . callback && job . callback . apply ( job . data , arguments ) ;
53
- err && _onError . apply ( job . data , [ err , job . data ] . concat ( Array . prototype . slice . call ( arguments , 1 ) ) ) ;
54
- ! err && _onSuccess . apply ( job . data , Array . prototype . slice . call ( arguments , 1 ) ) ;
51
+ if ( job . callback ) job . callback . call ( job . data , err , ... args ) ;
52
+ if ( err ) _onError . call ( job . data , err , job . data , ... args ) ;
53
+ if ( ! err ) _onSuccess . call ( job . data , ... args ) ;
55
54
}
56
55
_startJob ( true ) ;
57
56
} ) ;
58
57
59
58
_startJob ( ) ;
60
59
} ;
61
60
62
- var _addJob = function ( job , callback , prior ) {
61
+ const _addJob = function ( job , callback , prior ) {
63
62
_started = true ;
64
63
callback = _set ( callback ) ;
65
- var jobType = Object . prototype . toString . call ( job ) . slice ( 8 , - 1 ) ;
64
+ const jobType = Object . prototype . toString . call ( job ) . slice ( 8 , - 1 ) ;
66
65
switch ( jobType ) {
67
- case 'Array' :
68
- for ( var i = 0 ; i < job . length ; i ++ ) {
69
- _addJob ( job [ i ] , callback , prior ) ;
70
- }
71
- return ;
72
- case 'Function' :
73
- case 'Undefined' :
74
- throw new Error ( 'Unable to add ' + jobType + ' to queue' ) ;
66
+ case 'Array' :
67
+ for ( let i = 0 ; i < job . length ; i ++ ) {
68
+ _addJob ( job [ i ] , callback , prior ) ;
69
+ }
70
+ return ;
71
+ case 'Function' :
72
+ case 'Undefined' :
73
+ throw new Error ( `Unable to add ${ jobType } to queue` ) ;
74
+ default : break ;
75
75
}
76
- var jobObject = {
76
+ const jobObject = {
77
77
data : job ,
78
- callback : callback
79
- }
80
- if ( prior ) {
78
+ callback,
79
+ } ;
80
+ if ( prior ) {
81
81
_queue . waiting . unshift ( jobObject ) ;
82
82
} else {
83
83
_queue . waiting . push ( jobObject ) ;
@@ -86,88 +86,123 @@ function Tress(worker, concurrency){ // function worker(job, done)
86
86
setTimeout ( _startJob , 0 ) ;
87
87
} ;
88
88
89
- var _push = ( job , callback ) => _addJob ( job , callback ) ;
90
- var _unshift = ( job , callback ) => _addJob ( job , callback , true ) ;
91
- var _length = ( ) => _queue . waiting . length ;
92
- var _running = ( ) => _queue . active . length ;
93
- var _workersList = ( ) => _queue . active ;
94
- var _idle = ( ) => _queue . waiting . length + _queue . active . length === 0 ;
95
- var _pause = ( ) => _paused = true ;
96
- var _resume = ( ) => {
89
+ const _push = ( job , callback ) => _addJob ( job , callback ) ;
90
+ const _unshift = ( job , callback ) => _addJob ( job , callback , true ) ;
91
+ const _length = ( ) => _queue . waiting . length ;
92
+ const _running = ( ) => _queue . active . length ;
93
+ const _workersList = ( ) => _queue . active ;
94
+ const _idle = ( ) => _queue . waiting . length + _queue . active . length === 0 ;
95
+ const _pause = ( ) => {
96
+ _paused = true ;
97
+ } ;
98
+ const _resume = ( ) => {
97
99
_paused = false ;
98
100
_startJob ( ) ;
99
101
} ;
100
- var _kill = ( ) => {
102
+ const _kill = ( ) => {
101
103
_onDrain = function ( ) { } ;
102
104
_queue . waiting = [ ] ;
103
105
} ;
104
- var _save = ( callback ) => callback ( {
105
- waiting : _queue . waiting . slice ( ) . concat ( _queue . active ) . map ( ( v ) => v . data ) ,
106
- failed : _queue . failed . slice ( ) . map ( ( v ) => v . data ) ,
107
- finished : _queue . finished . slice ( ) . map ( ( v ) => v . data )
106
+ const _save = callback => callback ( {
107
+ waiting : _queue . waiting . slice ( ) . concat ( _queue . active ) . map ( v => v . data ) ,
108
+ failed : _queue . failed . slice ( ) . map ( v => v . data ) ,
109
+ finished : _queue . finished . slice ( ) . map ( v => v . data ) ,
108
110
} ) ;
109
- var _load = ( data ) => {
110
- if ( _started ) throw new Error ( 'Unable to load data after queue started' ) ;
111
- var mapper = ( v ) => { return { data : v , callback : _set ( ) } } ;
111
+ const _load = data => {
112
+ if ( _started ) throw new Error ( 'Unable to load data after queue started' ) ;
113
+ const mapper = v => ( { data : v , callback : _set ( ) } ) ;
112
114
_queue = {
113
115
waiting : data . waiting . map ( mapper ) ,
114
116
active : [ ] ,
115
117
failed : data . failed . map ( mapper ) ,
116
- finished : data . finished . map ( mapper )
118
+ finished : data . finished . map ( mapper ) ,
117
119
} ;
118
- ! _paused && _startJob ( ) ;
120
+ if ( ! _paused ) _startJob ( ) ;
119
121
} ;
120
- var _status = ( job ) => {
122
+ const _status = job =>
121
123
_queue . waiting . indexOf ( job ) >= 0 ? 'waiting' :
122
- _queue . active . indexOf ( job ) >= 0 ? 'active' :
123
- _queue . finished . indexOf ( job ) >= 0 ? 'finished' :
124
- _queue . failed . indexOf ( job ) >= 0 ? 'failed' :
125
- 'missing'
126
- } ;
127
-
128
- Object . defineProperty ( this , 'drain' , { set : ( f ) => { _onDrain = _set ( f ) ; } } ) ;
129
- Object . defineProperty ( this , 'empty' , { set : ( f ) => { _onEmpty = _set ( f ) ; } } ) ;
130
- Object . defineProperty ( this , 'saturated' , { set : ( f ) => { _onSaturated = _set ( f ) ; } } ) ;
131
- Object . defineProperty ( this , 'unsaturated' , { set : ( f ) => { _onUnsaturated = _set ( f ) ; } } ) ;
132
- Object . defineProperty ( this , 'error' , { set : ( f ) => { _onError = _set ( f ) ; } } ) ;
133
- Object . defineProperty ( this , 'success' , { set : ( f ) => { _onSuccess = _set ( f ) ; } } ) ;
134
- Object . defineProperty ( this , 'retry' , { set : ( f ) => { _onRetry = _set ( f ) ; } } ) ;
124
+ _queue . active . indexOf ( job ) >= 0 ? 'active' :
125
+ _queue . finished . indexOf ( job ) >= 0 ? 'finished' :
126
+ _queue . failed . indexOf ( job ) >= 0 ? 'failed' :
127
+ 'missing' ;
128
+
129
+ Object . defineProperty ( this , 'drain' , {
130
+ set : f => {
131
+ _onDrain = _set ( f ) ;
132
+ } ,
133
+ } ) ;
134
+ Object . defineProperty ( this , 'empty' , {
135
+ set : f => {
136
+ _onEmpty = _set ( f ) ;
137
+ } ,
138
+ } ) ;
139
+ Object . defineProperty ( this , 'saturated' , {
140
+ set : f => {
141
+ _onSaturated = _set ( f ) ;
142
+ } ,
143
+ } ) ;
144
+ Object . defineProperty ( this , 'unsaturated' , {
145
+ set : f => {
146
+ _onUnsaturated = _set ( f ) ;
147
+ } ,
148
+ } ) ;
149
+ Object . defineProperty ( this , 'error' , {
150
+ set : f => {
151
+ _onError = _set ( f ) ;
152
+ } ,
153
+ } ) ;
154
+ Object . defineProperty ( this , 'success' , {
155
+ set : f => {
156
+ _onSuccess = _set ( f ) ;
157
+ } ,
158
+ } ) ;
159
+ Object . defineProperty ( this , 'retry' , {
160
+ set : f => {
161
+ _onRetry = _set ( f ) ;
162
+ } ,
163
+ } ) ;
135
164
Object . defineProperty ( this , 'concurrency' , {
136
- get : ( ) => ( _delay > 0 ? - _delay : _concurrency ) ,
137
- set : ( v ) => {
165
+ get : ( ) => _delay > 0 ? - _delay : _concurrency ,
166
+ set : v => {
138
167
_concurrency = v > 0 ? v : 1 ;
139
168
_delay = v < 0 ? - v : 0 ;
140
- }
169
+ } ,
141
170
} ) ;
142
- Object . defineProperty ( this , 'paused' , { get : ( ) => _paused } ) ;
143
- Object . defineProperty ( this , 'started' , { get : ( ) => _started } ) ;
144
- Object . defineProperty ( this , 'waiting' , { get : ( ) => _queue . waiting } ) ;
145
- Object . defineProperty ( this , 'active' , { get : ( ) => _queue . active } ) ;
146
- Object . defineProperty ( this , 'failed' , { get : ( ) => _queue . failed } ) ;
147
- Object . defineProperty ( this , 'finished' , { get : ( ) => _queue . finished } ) ;
148
-
149
- Object . defineProperty ( this , 'push' , { get : ( ) => _push } ) ;
150
- Object . defineProperty ( this , 'unshift' , { get : ( ) => _unshift } ) ;
151
- Object . defineProperty ( this , 'length' , { get : ( ) => _length } ) ;
152
- Object . defineProperty ( this , 'running' , { get : ( ) => _running } ) ;
153
- Object . defineProperty ( this , 'workersList' , { get : ( ) => _workersList } ) ;
154
- Object . defineProperty ( this , 'idle' , { get : ( ) => _idle } ) ;
155
- Object . defineProperty ( this , 'buffer' , { get : ( ) => _buffer , set : ( v ) => {
156
- if ( typeof v === 'number' ) { _buffer = v ; } else { throw new Error ( 'Buffer must be a number' ) ; }
157
- } } ) ;
158
- Object . defineProperty ( this , 'pause' , { get : ( ) => _pause } ) ;
159
- Object . defineProperty ( this , 'resume' , { get : ( ) => _resume } ) ;
160
- Object . defineProperty ( this , 'kill' , { get : ( ) => _kill } ) ;
161
- Object . defineProperty ( this , 'save' , { get : ( ) => _save } ) ;
162
- Object . defineProperty ( this , 'load' , { get : ( ) => _load } ) ;
163
- Object . defineProperty ( this , 'status' , { get : ( ) => _status } ) ;
164
-
171
+ Object . defineProperty ( this , 'paused' , { get : ( ) => _paused } ) ;
172
+ Object . defineProperty ( this , 'started' , { get : ( ) => _started } ) ;
173
+ Object . defineProperty ( this , 'waiting' , { get : ( ) => _queue . waiting } ) ;
174
+ Object . defineProperty ( this , 'active' , { get : ( ) => _queue . active } ) ;
175
+ Object . defineProperty ( this , 'failed' , { get : ( ) => _queue . failed } ) ;
176
+ Object . defineProperty ( this , 'finished' , { get : ( ) => _queue . finished } ) ;
177
+
178
+ Object . defineProperty ( this , 'push' , { get : ( ) => _push } ) ;
179
+ Object . defineProperty ( this , 'unshift' , { get : ( ) => _unshift } ) ;
180
+ Object . defineProperty ( this , 'length' , { get : ( ) => _length } ) ;
181
+ Object . defineProperty ( this , 'running' , { get : ( ) => _running } ) ;
182
+ Object . defineProperty ( this , 'workersList' , { get : ( ) => _workersList } ) ;
183
+ Object . defineProperty ( this , 'idle' , { get : ( ) => _idle } ) ;
184
+ Object . defineProperty ( this , 'buffer' , {
185
+ get : ( ) => _buffer ,
186
+ set : v => {
187
+ if ( typeof v === 'number' ) {
188
+ _buffer = v ;
189
+ } else {
190
+ throw new Error ( 'Buffer must be a number' ) ;
191
+ }
192
+ } ,
193
+ } ) ;
194
+ Object . defineProperty ( this , 'pause' , { get : ( ) => _pause } ) ;
195
+ Object . defineProperty ( this , 'resume' , { get : ( ) => _resume } ) ;
196
+ Object . defineProperty ( this , 'kill' , { get : ( ) => _kill } ) ;
197
+ Object . defineProperty ( this , 'save' , { get : ( ) => _save } ) ;
198
+ Object . defineProperty ( this , 'load' , { get : ( ) => _load } ) ;
199
+ Object . defineProperty ( this , 'status' , { get : ( ) => _status } ) ;
165
200
}
166
201
167
202
module . exports = Tress ;
168
203
169
204
function _set ( v ) {
170
- if ( v === undefined || v === null ) return function ( ) { } ;
171
- if ( typeof v === 'function' ) return v ;
205
+ if ( v === undefined || v === null ) return function ( ) { } ;
206
+ if ( typeof v === 'function' ) return v ;
172
207
throw new Error ( 'Type must be function' ) ;
173
208
}
0 commit comments