Skip to content

Commit ceb29db

Browse files
author
liubaohai
committed
add close zk fun
1 parent 8a68a79 commit ceb29db

File tree

8 files changed

+47
-27
lines changed

8 files changed

+47
-27
lines changed

cgrouparch/manager.py

+3-6
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from pykit import threadutil
99
from pykit import utfjson
10+
from pykit import zkutil
1011
from pykit.cgrouparch import account
1112
from pykit.cgrouparch import cgroup_manager
1213
from pykit.cgrouparch import communicate
@@ -85,11 +86,7 @@ def init_arch_conf(context):
8586
except Exception as e:
8687
logger.warn('failed to get arch conf from zk: %s' % repr(e))
8788

88-
try:
89-
context['zk_client'].stop()
90-
except Exception as e:
91-
logger.info('failed to stop zk client: ' + repr(e))
92-
89+
zkutil.close_zk(context['zk_client'])
9390
context['zk_client'] = None
9491
time.sleep(10)
9592

@@ -129,7 +126,7 @@ def get_cgexec_arg(cgroup_names, **argkv):
129126
zk_path = '%s/arch_conf' % context['zk_prefix']
130127
resp = zk_client.get(zk_path)
131128

132-
zk_client.stop()
129+
zkutil.close_zk(zk_client)
133130

134131
context['arch_conf'] = {
135132
'version': resp[1].version,

zktx/zktx.py

+1-4
Original file line numberDiff line numberDiff line change
@@ -411,10 +411,7 @@ def _close(self):
411411
self.modifications = {}
412412

413413
if self.owning_zk:
414-
try:
415-
self.zke.stop()
416-
except KazooException as e:
417-
logger.info(repr(e) + ' while zkclient.stop()')
414+
zkutil.close_zk(self.zke)
418415

419416
def release_all_key_locks(self):
420417

zkutil/README.md

+17
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
- [zkutil.parse_kazoo_acl](#zkutilparse_kazoo_acl)
1717
- [zkutil.perm_to_long](#zkutilperm_to_long)
1818
- [zkutil.perm_to_short](#zkutilperm_to_short)
19+
- [zkutil.close_zk](#zkutilclose_zk)
1920
- [Conditioned access methods](#conditioned-access-methods)
2021
- [zkutil.get_next](#zkutilget_next)
2122
- [zkutil.wait_absent](#zkutilwait_absent)
@@ -334,6 +335,22 @@ such as `cdrw`.
334335
a string of short permissions.
335336
336337
338+
## zkutil.close_zk
339+
340+
**syntax**:
341+
`zkutil.close_zk(zk)`
342+
343+
Stop and close a zk client.
344+
345+
**arguments**:
346+
347+
- `zk`:
348+
a `KazooClient` or `KazooClientExt` instance, otherwise raise a `TypeError`.
349+
350+
**return**:
351+
nothing
352+
353+
337354
# Conditioned access methods
338355
339356
A collection of helper functions those block until a condition satisfied before

zkutil/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from .zkutil import (
1717
PermTypeError,
1818

19+
close_zk,
1920
init_hierarchy,
2021
export_hierarchy,
2122
is_backward_locking,
@@ -55,6 +56,7 @@
5556

5657
"kazoo_client_ext",
5758

59+
"close_zk",
5860
"init_hierarchy",
5961
"export_hierarchy",
6062
"is_backward_locking",

zkutil/cached_reader.py

+2-5
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
import threading
66

77
from . import zkconf
8+
from . import zkutil
89
from . import exceptions
9-
from kazoo.exceptions import KazooException
1010

1111
logger = logging.getLogger(__name__)
1212

@@ -57,10 +57,7 @@ def close(self):
5757

5858
self.zke.remove_listener(self.conn_change_cb)
5959
if self.owning_zk:
60-
try:
61-
self.zke.stop()
62-
except KazooException as e:
63-
logger.exception(repr(e) + ' while close kazoo client')
60+
zkutil.close_zk(self.zke)
6461

6562
def _on_conn_change(self, state):
6663
logger.info('state changed: {state}'.format(state=state))

zkutil/zkacid.py

+2-4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from pykit import txutil
66

77
from .zkconf import kazoo_client_ext
8+
from . import zkutil
89

910
logger = logging.getLogger(__name__)
1011

@@ -22,7 +23,4 @@ def setter(path, val, zstat):
2223
yield curr
2324
finally:
2425
if owning_zk:
25-
try:
26-
zkclient.stop()
27-
except Exception as e:
28-
logger.info(repr(e) + ' while stop owning zkclient')
26+
zkutil.close_zk(zkclient)

zkutil/zklock.py

+1-7
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import time
77

88
from kazoo.client import KazooClient
9-
from kazoo.exceptions import KazooException
109
from kazoo.exceptions import LockTimeout
1110
from kazoo.exceptions import NodeExistsError
1211
from kazoo.exceptions import NoNodeError
@@ -213,13 +212,8 @@ def close(self):
213212
self.zkclient.remove_listener(self.on_connection_change)
214213

215214
if self.owning_client:
216-
217215
logger.info('zk client is made by me, close it')
218-
219-
try:
220-
self.zkclient.stop()
221-
except KazooException as e:
222-
logger.info(repr(e) + ' while stop my own client')
216+
zkutil.close_zk(self.zkclient)
223217

224218
def is_locked(self):
225219

zkutil/zkutil.py

+19-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from kazoo import security
1111
from kazoo.client import KazooClient
1212
from kazoo.exceptions import NoNodeError
13+
from kazoo.exceptions import KazooException
1314
from pykit import config
1415
from pykit import net
1516
from pykit import utfjson
@@ -47,6 +48,23 @@ class ZkPathError(Exception):
4748
pass
4849

4950

51+
def close_zk(zk):
52+
if not isinstance(zk, KazooClient):
53+
raise TypeError('expect KazooClient or KazooClientExt, but got {t}'.format(t=type(zk)))
54+
55+
try:
56+
zk.stop()
57+
58+
except KazooException as e:
59+
logger.exception(repr(e) + ' while stop zk client')
60+
61+
try:
62+
zk.close()
63+
64+
except Exception as e:
65+
logger.exception(repr(e) + ' while close zk client')
66+
67+
5068
def lock_data(node_id=None):
5169
# deprecated
5270
return lock_id(node_id=node_id)
@@ -251,7 +269,7 @@ def _init_hierarchy(hierarchy, parent_path):
251269
_init_hierarchy(children, path)
252270

253271
_init_hierarchy(hierarchy, '/')
254-
zkcli.stop()
272+
close_zk(zkcli)
255273

256274

257275
def _make_zk_path(*paths):

0 commit comments

Comments
 (0)