1
1
from collections import defaultdict
2
-
3
- try :
4
- import ujson as json
5
- except ImportError :
6
- import json
7
-
2
+ import json
8
3
from flask import Flask
9
4
import gevent
5
+ from gevent .event import AsyncResult
6
+ from gevent .timeout import Timeout
10
7
from gevent import monkey
11
- from gevent .queue import Channel
12
- from gevent .queue import Empty
13
8
import os
14
9
import logging
15
10
import redis
33
28
plugin = load_plugin (plugin_path )
34
29
application .register_blueprint (plugin )
35
30
36
- subscriptions = defaultdict (Channel )
31
+ subscriptions = defaultdict (AsyncResult )
37
32
38
33
class RedisSub (gevent .Greenlet ):
39
34
"""
@@ -69,9 +64,9 @@ def handle_message(self, message):
69
64
except :
70
65
logger .exception ('unable to parse the message %r' % message )
71
66
else :
72
- gevent_channel = subscriptions [channel ]
73
- while gevent_channel . getters :
74
- gevent_channel . put_nowait ( data )
67
+ async_result = subscriptions [channel ]
68
+ async_result . set ( data )
69
+ del subscriptions [ channel ]
75
70
76
71
def subscribe (self ):
77
72
connection = self .get_redis_connection ()
@@ -95,10 +90,8 @@ def subscribe(channel):
95
90
timeout = application .config ['LONGPOLLING_TIMEOUT' ]
96
91
try :
97
92
message = subscriptions [channel ].get (timeout = timeout )
98
- except Empty :
93
+ except Timeout :
99
94
message = application .config ['TIMEOUT_RESPONSE_MESSAGE' ]
100
- finally :
101
- del subscriptions [channel ]
102
95
return message
103
96
104
97
@application .before_first_request
@@ -112,7 +105,8 @@ def start_subscribe_loop():
112
105
gevent .spawn (pubsub .start )
113
106
114
107
def main ():
115
- application .run ()
108
+ from gevent .wsgi import WSGIServer
109
+ WSGIServer (('' , 5000 ), application ).serve_forever ()
116
110
117
111
if __name__ == '__main__' :
118
112
main ()
0 commit comments