-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathworker_service.py
75 lines (61 loc) · 2.06 KB
/
worker_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import os
import sys
from redis import Redis
from rq import Connection, Queue
from trader.batch.non_fork_worker import NonForkWorker
from trader.cli.command_line import cli_norepl, common_options, default_config
from trader.common.helpers import rich_dict
from trader.common.logging_helper import setup_logging
from typing import Dict, Optional
import click
import nest_asyncio
# Patch asyncio via nest_asyncio as an import-time side effect of this module.
# NOTE(review): presumably required so code executed inside the in-process worker
# can re-enter an already-running event loop — confirm against the job code.
nest_asyncio.apply()
# `logging` is bound to whatever the project's setup_logging helper returns; the
# stdlib `logging` module is not imported in the visible part of this file.
# Only `.debug()` is called on it below.
logging = setup_logging(module_name='worker_service')
class WorkerService:
    """Run and inspect RQ queues on a single Redis server.

    The Redis client is created lazily: the constructor only stores the
    settings, and ``start`` / ``list_queues`` build the client on first use.
    """

    def __init__(
        self,
        redis_server_address: str,
        redis_server_port: int,
        work_queue: str
    ):
        """Store the connection settings without creating a Redis client.

        Args:
            redis_server_address: Host passed to ``Redis(host=...)``.
            redis_server_port: Port passed to ``Redis(port=...)``.
            work_queue: Name of the RQ queue that ``start`` listens on.
        """
        self.redis_server_address = redis_server_address
        self.redis_server_port = redis_server_port
        self.work_queue = work_queue
        # Set by _connect() the first time a connection is needed.
        self.redis_conn: Optional[Redis] = None

    def _connect(self):
        """Create the Redis client from the stored host and port."""
        self.redis_conn = Redis(host=self.redis_server_address, port=self.redis_server_port)

    def start(self):
        """Run a ``NonForkWorker`` on ``work_queue``.

        Blocks for as long as the worker's ``work()`` call runs.
        """
        logging.debug('starting worker_service')
        if self.redis_conn is None:
            self._connect()

        queue = Queue(connection=self.redis_conn, name=self.work_queue)
        # Push the *configured* connection as RQ's implicit connection. Called
        # with no argument, Connection() builds a default Redis() client
        # (localhost) instead, so anything resolving the connection implicitly
        # inside this block would talk to the wrong server.
        # NOTE(review): newer RQ releases deprecate this context manager in
        # favour of explicit connection= arguments — confirm against the
        # pinned RQ version.
        with Connection(self.redis_conn):
            worker = NonForkWorker([queue], connection=self.redis_conn)
            worker.work()

    def list_queues(self) -> Dict[str, int]:
        """Return a mapping of queue name to ``count`` for every RQ queue.

        Queues are discovered with ``Queue.all`` on this service's Redis
        connection, so the result is not limited to ``work_queue``.
        """
        logging.debug('list_queues')
        if self.redis_conn is None:
            self._connect()
        return {q.name: q.count for q in Queue.all(connection=self.redis_conn)}
# CLI command `start`: builds a WorkerService for the given Redis server and
# queue, then runs its worker. The call blocks for as long as the worker runs.
#
# Documented with comments rather than a docstring on purpose: if cli_norepl is
# a click group (as the click.option usage suggests), a docstring would be shown
# as the command's --help text and change user-visible output.
@cli_norepl.command()
@click.option('--queue_name', required=True, help='name of RQ job queue to use, eg: history')
@common_options()
@default_config()
def start(
    redis_server_address: str,
    redis_server_port: int,
    queue_name: str,
    **args
):
    # **args collects any further options injected by the decorators; they are
    # not used here.
    WorkerService(
        redis_server_address=redis_server_address,
        redis_server_port=redis_server_port,
        work_queue=queue_name,
    ).start()
# CLI command `list`: renders the {queue name: count} mapping for every RQ queue
# on the given Redis server via rich_dict.
#
# The function name shadows the builtin `list` for the rest of this module; it
# is kept because it is this block's public name.
# Documented with comments rather than a docstring on purpose: if cli_norepl is
# a click group, a docstring would be shown as the command's --help text.
@cli_norepl.command()
@common_options()
@default_config()
def list(
    redis_server_address: str,
    redis_server_port: int,
    **args,
):
    # No specific queue is needed to enumerate queues, hence the empty name.
    queue_counts = WorkerService(
        redis_server_address=redis_server_address,
        redis_server_port=redis_server_port,
        work_queue='',
    ).list_queues()
    rich_dict(queue_counts)
# Script entry point: invoke the cli_norepl command group with an empty dict
# passed as `obj`, only when this file is executed directly (not on import).
if __name__ == '__main__':
    cli_norepl(obj={})