From 33e92de72c7ff622e81d3f76238f419144abc02f Mon Sep 17 00:00:00 2001 From: Mathias Brulatout Date: Fri, 25 Aug 2023 14:10:02 +0200 Subject: [PATCH] Drop tornado/twisted support Retargeting the project towards an async-only behavior. Future-Proofing: As asyncio is part of the Python standard library, it's likely to receive continued updates and improvements. By aligning with asyncio, we're ensuring that this project stays relevant and can benefit from the latest advancements in Python's asynchronous capabilities. This also aligns with community momentum which heavily shifted towards asyncio for async operations while providing simplified maintenance and reduced dependencies by dropping two major frameworks. --- consul/tornado.py | 57 ------- consul/twisted.py | 123 -------------- docs/index.rst | 50 ------ tests-requirements.txt | 3 - tests/test_base.py | 6 +- tests/test_tornado.py | 353 ----------------------------------------- tests/test_twisted.py | 306 ----------------------------------- 7 files changed, 3 insertions(+), 895 deletions(-) delete mode 100644 consul/tornado.py delete mode 100644 consul/twisted.py delete mode 100644 tests/test_tornado.py delete mode 100644 tests/test_twisted.py diff --git a/consul/tornado.py b/consul/tornado.py deleted file mode 100644 index 0ece898..0000000 --- a/consul/tornado.py +++ /dev/null @@ -1,57 +0,0 @@ -from tornado import gen, httpclient - -from consul import base - -__all__ = ["Consul"] - - -class HTTPClient(base.HTTPClient): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.client = httpclient.AsyncHTTPClient() - - def response(self, response): - return base.Response(response.code, response.headers, response.body.decode("utf-8")) - - @gen.coroutine - def _request(self, callback, request): - try: - response = yield self.client.fetch(request) - except httpclient.HTTPError as e: - if e.code == 599: - raise base.Timeout - response = e.response - raise gen.Return(callback(self.response(response))) - - def get(self, callback, path, params=None): - uri = self.uri(path, params) - return self._request(callback, uri) - - def put(self, callback, path, params=None, data=""): - uri = self.uri(path, params) - request = httpclient.HTTPRequest( - uri, method="PUT", body="" if data is None else data, validate_cert=self.verify - ) - return self._request(callback, request) - - def delete(self, callback, path, params=None): - uri = self.uri(path, params) - request = httpclient.HTTPRequest(uri, method="DELETE", validate_cert=self.verify) - return self._request(callback, request) - - def post(self, callback, path, params=None, data=""): - uri = self.uri(path, params) - request = httpclient.HTTPRequest(uri, method="POST", body=data, validate_cert=self.verify) - return self._request(callback, request) - - def close(self): - self.client.close() - - -class Consul(base.Consul): - def http_connect(self, host, port, scheme, verify=True, cert=None): - return HTTPClient(host, port, scheme, verify=verify, cert=cert) - - def close(self): - """Close all opened http connections""" - return self.http.close() diff --git a/consul/twisted.py b/consul/twisted.py deleted file mode 100644 index ae5657a..0000000 --- a/consul/twisted.py +++ /dev/null @@ -1,123 +0,0 @@ -# noinspection PyUnresolvedReferences -from treq.client import HTTPClient as TreqHTTPClient -from twisted.internet import reactor -from twisted.internet.defer import inlineCallbacks, returnValue -from twisted.internet.error import ConnectError -from twisted.internet.ssl import ClientContextFactory -from twisted.web._newclient import RequestTransmissionFailed, ResponseNeverReceived -from twisted.web.client import Agent, HTTPConnectionPool - -from consul import base -from consul.base import ConsulException - -__all__ = ["Consul"] - - -# noinspection PyClassHasNoInit -class InsecureContextFactory(ClientContextFactory): - """ - This is an insecure context factory implementation. Note that this is not - intended for production use. It is recommended either a treq/twisted - provided factory be used or a custom factory for this purpose. - - https://twistedmatrix.com/documents/current/core/howto/ssl.html - """ - - def getContext(self): - return ClientContextFactory.getContext(self) - - -class HTTPClient(base.HTTPClient): - def __init__(self, contextFactory, *args, **kwargs): - super().__init__(*args, **kwargs) - agent_kwargs = {"reactor": reactor, "pool": HTTPConnectionPool(reactor)} - if contextFactory is not None: - # use the provided context factory - agent_kwargs["contextFactory"] = contextFactory - elif not self.verify: - # if no context is provided and verify is set to false, use the - # insecure context factory implementation - agent_kwargs["contextFactory"] = InsecureContextFactory() - - self.client = TreqHTTPClient(Agent(**agent_kwargs)) - - @staticmethod - def response(code, headers, text): - return base.Response(code, headers, text) - - @staticmethod - def compat_string(value): - """ - Provide a python2/3 compatible string representation of the value - :type value: - :rtype : - """ - if isinstance(value, bytes): - return value.decode(encoding="utf-8") - return str(value) - - @inlineCallbacks - def _get_resp(self, response): - # Merge multiple header values as per RFC2616 - # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.2 - headers = { - self.compat_string(k): ",".join(map(self.compat_string, v)) - for k, v in dict(response.headers.getAllRawHeaders()).items() - } - body = yield response.text(encoding="utf-8") - returnValue((response.code, headers, body)) - - @inlineCallbacks - def request(self, callback, method, url, **kwargs): - if "data" in kwargs and not isinstance(kwargs["data"], bytes): - # python2/3 compatibility - data = kwargs.pop("data") - kwargs["data"] = data.encode(encoding="utf-8") if hasattr(data, "encode") else bytes(data) - - try: - response = yield self.client.request(method, url, **kwargs) - parsed = yield self._get_resp(response) - returnValue(callback(self.response(*parsed))) - except ConnectError as e: - raise ConsulException(f"{e.__class__.__name__}: {e.MESSAGE}") from e - except ResponseNeverReceived as exc: - # this exception is raised if the connection to the server is lost - # when yielding a response, this could be due to network issues or - # server restarts - raise ConsulException(f"Server connection lost: {method.upper()} {url}") from exc - except RequestTransmissionFailed as exc: - # this exception is expected if the reactor is stopped mid request - raise ConsulException(f"Request incomplete: {method.upper()} {url}") from exc - - @inlineCallbacks - def get(self, callback, path, params=None): - uri = self.uri(path, params) - response = yield self.request(callback, "get", uri, params=params) - returnValue(response) - - @inlineCallbacks - def put(self, callback, path, params=None, data=""): - uri = self.uri(path, params) - response = yield self.request(callback, "put", uri, data=data) - returnValue(response) - - @inlineCallbacks - def post(self, callback, path, params=None, data=""): - uri = self.uri(path, params) - response = yield self.request(callback, "post", uri, data=data) - returnValue(response) - - @inlineCallbacks - def delete(self, callback, path, params=None): - uri = self.uri(path, params) - response = yield self.request(callback, "delete", uri, params=params) - returnValue(response) - - @inlineCallbacks - def close(self): - pass - - -class Consul(base.Consul): - def http_connect(self, host, port, scheme, verify=True, cert=None, contextFactory=None, **kwargs): - return HTTPClient(contextFactory, host, port, scheme, verify=verify, cert=cert, **kwargs) diff --git a/docs/index.rst b/docs/index.rst index 68a4cc7..d1893ec 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -44,48 +44,6 @@ library from the python standard lib, including the `requests`_ library can be made non-blocking via monkey patching. This means the standard python-consul client will just work asynchronously with `gevent`_. -Tornado -~~~~~~~ - -There is a `Tornado`_ client which makes use of `gen.coroutine`_. The API for -this client is identical to the standard python-consul client except that you -need to *yield* the result of each API call. This client is available in -*consul.tornado*. - -.. code:: python - - from tornado.ioloop import IOLoop - from tornado.gen import coroutine - from consul.base import Timeout - from consul.tornado import Consul - - - class Config: - def __init__(self, loop): - self.foo = None - loop.add_callback(self.watch) - - @coroutine - def watch(self): - c = Consul() - - # asynchronously poll for updates - index = None - while True: - try: - index, data = yield c.kv.get('foo', index=index) - if data is not None: - self.foo = data['Value'] - except Timeout: - # gracefully handle request timeout - pass - - if __name__ == '__main__': - loop = IOLoop.instance() - _ = Config(loop) - loop.start() - - asyncio ~~~~~~~ @@ -124,11 +82,6 @@ result of each API call. This client is available in *consul.aio*. loop.run_until_complete(go()) -Wanted -~~~~~~ - -Adaptors for `Twisted`_ and a `thread pool`_ based adaptor. - Tools ----- @@ -306,12 +259,9 @@ Consul.txn .. _requests: http://python-requests.org .. _Vanilla: https://github.com/cablehead/vanilla .. _gevent: http://www.gevent.org -.. _Tornado: http://www.tornadoweb.org -.. _gen.coroutine: https://tornado.readthedocs.io/en/latest/gen.html .. _asyncio.coroutine: https://docs.python.org/3/library/asyncio-task.html#coroutines .. _aiohttp: https://github.com/KeepSafe/aiohttp .. _asyncio: https://docs.python.org/3/library/asyncio.html -.. _Twisted: https://twistedmatrix.com/trac/ .. _thread pool: https://docs.python.org/2/library/threading.html .. _ianitor: https://github.com/ClearcodeHQ/ianitor diff --git a/tests-requirements.txt b/tests-requirements.txt index d8afcc9..194531f 100644 --- a/tests-requirements.txt +++ b/tests-requirements.txt @@ -9,13 +9,10 @@ pytest pytest_asyncio pytest-cov pytest-rerunfailures -pytest-twisted pytest-xdist ruff setuptools -tornado tox treq -twisted virtualenv wheel \ No newline at end of file diff --git a/tests/test_base.py b/tests/test_base.py index 91450e2..ac80bcd 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -111,9 +111,9 @@ def test_node_meta(self): c = Consul() for r in _should_support_node_meta(c): assert r().params == [] - assert sorted(r(node_meta={"env": "prod", "net": 1}).params) == sorted( - [("node-meta", "net:1"), ("node-meta", "env:prod")] - ) + assert sorted(r(node_meta={"env": "prod", "net": 1}).params) == sorted([ + ("node-meta", "net:1"), ("node-meta", "env:prod") + ]) class TestMeta: diff --git a/tests/test_tornado.py b/tests/test_tornado.py deleted file mode 100644 index 27edbdf..0000000 --- a/tests/test_tornado.py +++ /dev/null @@ -1,353 +0,0 @@ -import base64 -import struct -import time - -import pytest -from tornado import gen, ioloop -from tornado.ioloop import IOLoop - -import consul -import consul.tornado - -Check = consul.Check - - -@pytest.fixture(autouse=True) -def ensure_tornado_ioloop(): - assert isinstance(IOLoop.current(), IOLoop), "Not using Tornado's IOLoop!" - - -@pytest.fixture -def consul_obj(consul_port): - c = consul.tornado.Consul(port=consul_port) - yield c - loop = IOLoop.current() - loop.run_sync(c.close) - - -@pytest.fixture -def consul_acl_obj(acl_consul): - c = consul.tornado.Consul(port=acl_consul.port, token=acl_consul.token) - yield c - loop = IOLoop.current() - loop.run_sync(c.close) - - -class TestConsul: - async def test_kv(self, consul_obj): - c = consul_obj - _index, data = await c.kv.get("foo") - assert data is None - response = await c.kv.put("foo", "bar") - assert response is True - _index, data = await c.kv.get("foo") - assert data["Value"] == b"bar" - - async def test_kv_binary(self, consul_obj): - c = consul_obj - await c.kv.put("foo", struct.pack("i", 1000)) - _index, data = await c.kv.get("foo") - assert struct.unpack("i", data["Value"]) == (1000,) - - def test_kv_missing(self, consul_obj): - c = consul_obj - - @gen.coroutine - def main(): - yield c.kv.put("index", "bump") - index, data = yield c.kv.get("foo") - assert data is None - index, data = yield c.kv.get("foo", index=index) - assert data["Value"] == b"bar" - loop.stop() - - @gen.coroutine - def put(): - yield c.kv.put("foo", "bar") - - loop = ioloop.IOLoop.current() - loop.add_timeout(time.time() + (2.0 / 100), put) - loop.run_sync(main) - - async def test_kv_put_flags(self, consul_obj): - c = consul_obj - yield c.kv.put("foo", "bar") - _index, data = yield c.kv.get("foo") - assert data["Flags"] == 0 - - response = yield c.kv.put("foo", "bar", flags=50) - assert response is True - _index, data = yield c.kv.get("foo") - assert data["Flags"] == 50 - - async def test_kv_delete(self, consul_obj): - c = consul_obj - await c.kv.put("foo1", "1") - await c.kv.put("foo2", "2") - await c.kv.put("foo3", "3") - _index, data = await c.kv.get("foo", recurse=True) - assert [x["Key"] for x in data] == ["foo1", "foo2", "foo3"] - - response = await c.kv.delete("foo2") - assert response is True - _index, data = await c.kv.get("foo", recurse=True) - assert [x["Key"] for x in data] == ["foo1", "foo3"] - response = await c.kv.delete("foo", recurse=True) - assert response is True - _index, data = await c.kv.get("foo", recurse=True) - assert data is None - - def test_kv_subscribe(self, consul_obj): - c = consul_obj - - @gen.coroutine - def get(): - index, data = yield c.kv.get("foo") - assert data is None - index, data = yield c.kv.get("foo", index=index) - assert data["Value"] == b"bar" - loop.stop() - - @gen.coroutine - def put(): - response = yield c.kv.put("foo", "bar") - assert response is True - - loop = ioloop.IOLoop.current() - loop.add_timeout(time.time() + (1.0 / 100), put) - loop.run_sync(get) - - async def test_kv_encoding(self, consul_obj): - c = consul_obj - - # test binary - response = await c.kv.put("foo", struct.pack("i", 1000)) - assert response is True - _index, data = await c.kv.get("foo") - assert struct.unpack("i", data["Value"]) == (1000,) - - # test unicode - response = await c.kv.put("foo", "bar") - assert response is True - _index, data = await c.kv.get("foo") - assert data["Value"] == b"bar" - - # test empty-string comes back as `None` - response = await c.kv.put("foo", "") - assert response is True - _index, data = await c.kv.get("foo") - assert data["Value"] is None - - # test None - response = await c.kv.put("foo", None) - assert response is True - _index, data = await c.kv.get("foo") - assert data["Value"] is None - - # check unencoded values raises assert - with pytest.raises(AssertionError): - await c.kv.put("foo", {1: 2}) - - async def test_transaction(self, consul_obj): - c = consul_obj - value = base64.b64encode(b"1").decode("utf8") - d = {"KV": {"Verb": "set", "Key": "asdf", "Value": value}} - r = await c.txn.put([d]) - assert r["Errors"] is None - - d = {"KV": {"Verb": "get", "Key": "asdf"}} - r = await c.txn.put([d]) - assert r["Results"][0]["KV"]["Value"] == value - - async def test_agent_services(self, consul_obj): - c = consul_obj - services = await c.agent.services() - assert services == {} - response = await c.agent.service.register("foo") - assert response is True - services = await c.agent.services() - assert services == { - "foo": { - "Port": 0, - "ID": "foo", - "CreateIndex": 0, - "ModifyIndex": 0, - "EnableTagOverride": False, - "Service": "foo", - "Tags": [], - "Meta": {}, - "Address": "", - }, - } - response = await c.agent.service.deregister("foo") - assert response is True - services = await c.agent.services() - assert services == {} - - def test_catalog(self, consul_obj): - c = consul_obj - - @gen.coroutine - def nodes(): - index, nodes = yield c.catalog.nodes() - assert len(nodes) == 1 - current = nodes[0] - - index, nodes = yield c.catalog.nodes(index=index) - nodes.remove(current) - assert [x["Node"] for x in nodes] == ["n1"] - - index, nodes = yield c.catalog.nodes(index=index) - nodes.remove(current) - assert [x["Node"] for x in nodes] == [] - loop.stop() - - @gen.coroutine - def register(): - response = yield c.catalog.register("n1", "10.1.10.11") - assert response is True - yield gen.sleep(50 / 1000.0) - response = yield c.catalog.deregister("n1") - assert response is True - - loop = ioloop.IOLoop.current() - loop.add_timeout(time.time() + (1.0 / 100), register) - loop.run_sync(nodes) - - async def test_health_service(self, consul_obj): - c = consul_obj - # check there are no nodes for the service 'foo' - _index, nodes = await c.health.service("foo") - assert nodes == [] - - # register two nodes, one with a long ttl, the other shorter - await c.agent.service.register("foo", service_id="foo:1", check=Check.ttl("10s")) - await c.agent.service.register("foo", service_id="foo:2", check=Check.ttl("100ms")) - - await gen.sleep(30 / 1000.0) - - # check the nodes show for the /health/service endpoint - _index, nodes = await c.health.service("foo") - assert [node["Service"]["ID"] for node in nodes] == ["foo:1", "foo:2"] - - # but that they aren't passing their health check - _index, nodes = await c.health.service("foo", passing=True) - assert nodes == [] - - # ping the two node's health check - await c.agent.check.ttl_pass("service:foo:1") - await c.agent.check.ttl_pass("service:foo:2") - - await gen.sleep(50 / 1000.0) - - # both nodes are now available - _index, nodes = await c.health.service("foo", passing=True) - assert [node["Service"]["ID"] for node in nodes] == ["foo:1", "foo:2"] - - # wait until the short ttl node fails - await gen.sleep(120 / 1000.0) - - # only one node available - _index, nodes = await c.health.service("foo", passing=True) - assert [node["Service"]["ID"] for node in nodes] == ["foo:1"] - - # ping the failed node's health check - await c.agent.check.ttl_pass("service:foo:2") - - await gen.sleep(30 / 1000.0) - - # check both nodes are available - _index, nodes = await c.health.service("foo", passing=True) - assert [node["Service"]["ID"] for node in nodes] == ["foo:1", "foo:2"] - - # deregister the nodes - await c.agent.service.deregister("foo:1") - await c.agent.service.deregister("foo:2") - - await gen.sleep(30 / 1000.0) - - _index, nodes = await c.health.service("foo") - assert nodes == [] - - def test_health_service_subscribe(self, consul_obj): - c = consul_obj - - class Config: - nodes = [] - - config = Config() - - @gen.coroutine - def monitor(): - yield c.agent.service.register("foo", service_id="foo:1", check=Check.ttl("40ms")) - index = None - while True: - index, nodes = yield c.health.service("foo", index=index, passing=True) - config.nodes = [node["Service"]["ID"] for node in nodes] - - @gen.coroutine - def keepalive(): - # give the monitor a chance to register the service - yield gen.sleep(50 / 1000.0) - assert config.nodes == [] - - # ping the service's health check - yield c.agent.check.ttl_pass("service:foo:1") - yield gen.sleep(30 / 1000.0) - assert config.nodes == ["foo:1"] - - # the service should fail - yield gen.sleep(60 / 1000.0) - assert config.nodes == [] - - yield c.agent.service.deregister("foo:1") - loop.stop() - - loop = ioloop.IOLoop.current() - loop.add_callback(monitor) - loop.run_sync(keepalive) - - @pytest.mark.tornado - async def test_session(self, consul_obj): - c = consul_obj - - async def monitor(): - index, services = await c.session.list() - assert services == [] - await gen.sleep(20 / 1000.0) - - index, services = await c.session.list(index=index) - assert len(services) - - index, services = await c.session.list(index=index) - assert services == [] - loop.stop() - - async def register(): - session_id = await c.session.create() - await gen.sleep(50 / 1000.0) - response = await c.session.destroy(session_id) - assert response is True - - loop = ioloop.IOLoop.current() - loop.add_timeout(time.time() + (1.0 / 100), register) - await monitor() - - async def test_acl(self, consul_acl_obj): - c = consul_acl_obj - - rules = """ - key "" { - policy = "read" - } - key "private/" { - policy = "deny" - } - """ - token = await c.acl.create(rules=rules) - - with pytest.raises(consul.ACLPermissionDenied): - await c.acl.list(token=token) - - destroyed = await c.acl.destroy(token) - assert destroyed is True diff --git a/tests/test_twisted.py b/tests/test_twisted.py deleted file mode 100644 index fb56d58..0000000 --- a/tests/test_twisted.py +++ /dev/null @@ -1,306 +0,0 @@ -import base64 -import struct - -import pytest -import pytest_twisted -from twisted.internet import defer, reactor - -import consul -import consul.twisted - -Check = consul.Check - - -def sleep(seconds): - """ - An asynchronous sleep function using twsited. Source: - http://twistedmatrix.com/pipermail/twisted-python/2009-October/020788.html - - :type seconds: float - """ - d = defer.Deferred() - reactor.callLater(seconds, d.callback, seconds) - return d - - -@pytest.fixture(autouse=True) -def check_twisted_reactor(): - assert reactor, "Not using Twisted's reactor!" - - -@pytest.fixture -def consul_obj(consul_port): - c = consul.twisted.Consul(port=consul_port) - yield c - - -@pytest.fixture -def consul_acl_obj(acl_consul): - c = consul.twisted.Consul(port=acl_consul.port, token=acl_consul.token) - yield c - - -class TestConsul: - @pytest_twisted.inlineCallbacks - def test_kv(self, consul_obj): - c = consul_obj - _index, data = yield c.kv.get("foo") - assert data is None - response = yield c.kv.put("foo", "bar") - assert response is True - _index, data = yield c.kv.get("foo") - assert data["Value"] == b"bar" - - @pytest_twisted.inlineCallbacks - def test_kv_binary(self, consul_obj): - c = consul_obj - yield c.kv.put("foo", struct.pack("i", 1000)) - _index, data = yield c.kv.get("foo") - assert struct.unpack("i", data["Value"]) == (1000,) - - @pytest_twisted.inlineCallbacks - def test_kv_missing(self, consul_obj): - c = consul_obj - reactor.callLater(2.0 / 100, c.kv.put, "foo", "bar") - yield c.kv.put("index", "bump") - index, data = yield c.kv.get("foo") - assert data is None - index, data = yield c.kv.get("foo", index=index) - assert data["Value"] == b"bar" - - @pytest_twisted.inlineCallbacks - def test_kv_put_flags(self, consul_obj): - c = consul_obj - yield c.kv.put("foo", "bar") - _index, data = yield c.kv.get("foo") - assert data["Flags"] == 0 - - response = yield c.kv.put("foo", "bar", flags=50) - assert response is True - _index, data = yield c.kv.get("foo") - assert data["Flags"] == 50 - - @pytest_twisted.inlineCallbacks - def test_kv_delete(self, consul_obj): - c = consul_obj - yield c.kv.put("foo1", "1") - yield c.kv.put("foo2", "2") - yield c.kv.put("foo3", "3") - _index, data = yield c.kv.get("foo", recurse=True) - assert [x["Key"] for x in data] == ["foo1", "foo2", "foo3"] - - response = yield c.kv.delete("foo2") - assert response is True - _index, data = yield c.kv.get("foo", recurse=True) - assert [x["Key"] for x in data] == ["foo1", "foo3"] - response = yield c.kv.delete("foo", recurse=True) - assert response is True - _index, data = yield c.kv.get("foo", recurse=True) - assert data is None - - @pytest_twisted.inlineCallbacks - def test_kv_subscribe(self, consul_obj): - c = consul_obj - - @defer.inlineCallbacks - def put(): - response = yield c.kv.put("foo", "bar") - assert response is True - - reactor.callLater(1.0 / 100, put) - index, data = yield c.kv.get("foo") - assert data is None - index, data = yield c.kv.get("foo", index=index) - assert data["Value"] == b"bar" - - @pytest_twisted.inlineCallbacks - def test_transaction(self, consul_obj): - c = consul_obj - value = base64.b64encode(b"1").decode("utf8") - d = {"KV": {"Verb": "set", "Key": "asdf", "Value": value}} - r = yield c.txn.put([d]) - assert r["Errors"] is None - - d = {"KV": {"Verb": "get", "Key": "asdf"}} - r = yield c.txn.put([d]) - assert r["Results"][0]["KV"]["Value"] == value - - @pytest_twisted.inlineCallbacks - def test_agent_services(self, consul_obj): - c = consul_obj - services = yield c.agent.services() - assert services == {} - response = yield c.agent.service.register("foo") - assert response is True - services = yield c.agent.services() - assert services == { - "foo": { - "Port": 0, - "ID": "foo", - "CreateIndex": 0, - "ModifyIndex": 0, - "EnableTagOverride": False, - "Service": "foo", - "Tags": [], - "Meta": {}, - "Address": "", - } - } - response = yield c.agent.service.deregister("foo") - assert response is True - services = yield c.agent.services() - assert services == {} - - @pytest_twisted.inlineCallbacks - def test_catalog(self, consul_obj): - c = consul_obj - - @defer.inlineCallbacks - def register(): - response = yield c.catalog.register("n1", "10.1.10.11") - assert response is True - yield sleep(50 / 1000.0) - response = yield c.catalog.deregister("n1") - assert response is True - - reactor.callLater(1.0 / 100, register) - - index, nodes = yield c.catalog.nodes() - assert len(nodes) == 1 - current = nodes[0] - - index, nodes = yield c.catalog.nodes(index=index) - nodes.remove(current) - assert [x["Node"] for x in nodes] == ["n1"] - - index, nodes = yield c.catalog.nodes(index=index) - nodes.remove(current) - assert [x["Node"] for x in nodes] == [] - - @pytest_twisted.inlineCallbacks - def test_health_service(self, consul_obj): - c = consul_obj - - # check there are no nodes for the service 'foo' - _index, nodes = yield c.health.service("foo") - assert nodes == [] - - # register two nodes, one with a long ttl, the other shorter - yield c.agent.service.register("foo", service_id="foo:1", check=Check.ttl("10s")) - yield c.agent.service.register("foo", service_id="foo:2", check=Check.ttl("100ms")) - - yield sleep(1.0) - - # check the nodes show for the /health/service endpoint - _index, nodes = yield c.health.service("foo") - assert [node["Service"]["ID"] for node in nodes] == ["foo:1", "foo:2"] - - # but that they aren't passing their health check - _index, nodes = yield c.health.service("foo", passing=True) - assert nodes == [] - - # ping the two node's health check - yield c.agent.check.ttl_pass("service:foo:1") - yield c.agent.check.ttl_pass("service:foo:2") - - yield sleep(0.05) - - # both nodes are now available - _index, nodes = yield c.health.service("foo", passing=True) - assert [node["Service"]["ID"] for node in nodes] == ["foo:1", "foo:2"] - - # wait until the short ttl node fails - yield sleep(0.5) - - # only one node available - _index, nodes = yield c.health.service("foo", passing=True) - assert [node["Service"]["ID"] for node in nodes] == ["foo:1"] - - # ping the failed node's health check - yield c.agent.check.ttl_pass("service:foo:2") - - yield sleep(0.05) - - # check both nodes are available - _index, nodes = yield c.health.service("foo", passing=True) - assert [node["Service"]["ID"] for node in nodes] == ["foo:1", "foo:2"] - - # deregister the nodes - yield c.agent.service.deregister("foo:1") - yield c.agent.service.deregister("foo:2") - - yield sleep(2) - _index, nodes = yield c.health.service("foo") - assert nodes == [] - - @pytest_twisted.inlineCallbacks - def test_health_service_subscribe(self, consul_obj): - c = consul_obj - - class Config: - def __init__(self): - self.nodes = [] - self.index = None - - @defer.inlineCallbacks - def update(self): - self.index, nodes = yield c.health.service("foo", index=None, passing=True) - self.nodes = [node["Service"]["ID"] for node in nodes] - - config = Config() - yield c.agent.service.register("foo", service_id="foo:1", check=Check.ttl("40ms")) - yield config.update() - assert config.nodes == [] - - # ping the service's health check - yield c.agent.check.ttl_pass("service:foo:1") - yield config.update() - assert config.nodes == ["foo:1"] - - # the service should fail - yield sleep(0.8) - yield config.update() - assert config.nodes == [] - - yield c.agent.service.deregister("foo:1") - - @pytest_twisted.inlineCallbacks - def test_session(self, consul_obj): - c = consul_obj - - index, services = yield c.session.list() - assert services == [] - - session_id = yield c.session.create() - index, services = yield c.session.list(index=index) - assert len(services) - - response = yield c.session.destroy(session_id) - assert response is True - - index, services = yield c.session.list(index=index) - assert services == [] - - @pytest_twisted.inlineCallbacks - def test_acl(self, consul_acl_obj): - c = consul_acl_obj - - rules = """ - key "" { - policy = "read" - } - key "private/" { - policy = "deny" - } - """ - token = yield c.acl.create(rules=rules) - - raised = False - try: - yield c.acl.list(token=token) - except consul.ACLPermissionDenied: - raised = True - assert raised - - destroyed = yield c.acl.destroy(token) - assert destroyed is True