Skip to content

Commit

Permalink
Applied etcd3 test fixes to etcd2
Browse files Browse the repository at this point in the history
  • Loading branch information
tcalmant committed Aug 23, 2024
1 parent 6b31c0a commit a58c49d
Showing 1 changed file with 88 additions and 67 deletions.
155 changes: 88 additions & 67 deletions tests/rsa/test_etcd_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
"""

import json
import threading
import unittest
from typing import Any, TypeVar

from pelix.internals.registry import ServiceReference
from pelix.utilities import EventData

try:
# Try to import modules
Expand Down Expand Up @@ -61,13 +61,14 @@
# ------------------------------------------------------------------------------


def start_framework_for_advertise(state_queue):
def start_framework_for_advertise(state_queue: Queue, order_queue: Queue):
"""
Starts a Pelix framework to advertise (via etcd) a helloimpl_xmlrpc
remote service instance. The tests can/will then discover this
service advertisement and test the EndpointEventListener notification
:param state_queue: Queue to communicate status and terminate
:param state_queue: Queue to communicate status
:param order_queue: Queue to receive termination order
"""
try:
# Start the framework
Expand Down Expand Up @@ -117,20 +118,16 @@ def start_framework_for_advertise(state_queue):
"service.exported.configs": "ecf.xmlrpc.server",
},
)
# Send that we are now ready
state_queue.put("ready")
# Loop until ready processed
while True:
if state_queue.empty():
break
# Loop until we receive done message
while True:
state = state_queue.get()
if state is None:
break
# stop the framework gracefully
framework.stop()
framework.delete()
try:
# Send that we are now ready
state_queue.put("ready")
# Wait until we receive the termination order
order = order_queue.get(timeout=60)
assert order == "quit"
finally:
# stop the framework gracefully
framework.stop()
framework.delete()
except Exception as ex:
state_queue.put(f"Error: {ex}")

Expand All @@ -145,12 +142,7 @@ def setUp(self):
print(f"EtcdDiscoveryListenerTest etcd_hostname={TEST_ETCD_HOSTNAME},toppath={TEST_ETCD_TOPPATH}")
# start external framework that publishes remote service
self.status_queue = Queue()
self.publisher_process = WrappedProcess(
target=start_framework_for_advertise, args=[self.status_queue]
)
self.publisher_process.start()
state = self.status_queue.get(timeout=10)
self.assertEqual(state, "ready")
self.order_queue = Queue()

# start a local framework
self.framework = create_framework(
Expand Down Expand Up @@ -178,64 +170,46 @@ def setUp(self):
{TopologyManager.ENDPOINT_LISTENER_SCOPE: ENDPOINT_LISTENER_SCOPE},
)

def prepareFrameworkProcess(self):
"""
Starts a framework in separate process to advertise a helloimpl
remote service
"""
# start external framework that publishes remote service
self.publisher_process = WrappedProcess(
target=start_framework_for_advertise, args=[self.status_queue, self.order_queue]
)
self.publisher_process.start()
state = self.status_queue.get(timeout=20)
self.assertEqual(state, "ready")

def tearDown(self):
"""
Cleans up external publishing framework for next test
"""
try:
self.status_queue.put(None)
self.publisher_process.join(1)
self.publisher_process.kill()
self.publisher_process.join(5)
self.publisher_process.close()
finally:
self.status_queue.close()
self.order_queue.close()
self.publisher = None
finally:

# Stop the framework
pelix.framework.FrameworkFactory.delete_framework()
self.framework = None

def test_etcd_discover(self):
test_done_event = threading.Event()
test_done_event = EventData()

def test_handler_1(endpoint_event, matched_filter):
self.assertTrue(matched_filter, ENDPOINT_LISTENER_SCOPE)
self.assertIsNotNone(endpoint_event, "endpoint_event is None")
self.assertTrue(isinstance(endpoint_event, EndpointEvent))
ee_type = endpoint_event.get_type()
self.assertTrue(ee_type == EndpointEvent.ADDED or ee_type == EndpointEvent.REMOVED)
ee_ed = endpoint_event.get_endpoint_description()
self.assertTrue(isinstance(ee_ed, EndpointDescription))
self.assertIsNotNone(ee_ed.get_id(), "endpoint_description id is None")
self.assertIsNotNone(
ee_ed.get_framework_uuid(),
"endpoint_description framework uuid is None",
)

interfaces = ee_ed.get_interfaces()
# test that service interfaces is not None and is of type list
self.assertIsNotNone(interfaces)
self.assertTrue(isinstance(interfaces, type([])))
self.assertTrue("org.eclipse.ecf.examples.hello.IHello" in interfaces)

# set the test_done_event, so tester thread will continue
test_done_event.set()

# set the handler to the test code above
self.listener.set_handler(test_handler_1)
# wait as much as 50 seconds to complete
test_done_event.wait(50)

def test_etcd_discover_remove(self):
test_done_event = threading.Event()

def test_handler_2(endpoint_event, matched_filter):
if endpoint_event.get_type() == EndpointEvent.ADDED:
# send shutdown to trigger the removal
self.status_queue.put(None)

elif endpoint_event.get_type() == EndpointEvent.REMOVED:
# do tests
try:
self.assertTrue(matched_filter, ENDPOINT_LISTENER_SCOPE)
self.assertIsNotNone(endpoint_event, "endpoint_event is None")
self.assertTrue(isinstance(endpoint_event, EndpointEvent))
ee_type = endpoint_event.get_type()
self.assertTrue(ee_type == EndpointEvent.ADDED or ee_type == EndpointEvent.REMOVED)
ee_ed = endpoint_event.get_endpoint_description()
self.assertTrue(isinstance(ee_ed, EndpointDescription))
self.assertIsNotNone(ee_ed.get_id(), "endpoint_description id is None")
Expand All @@ -250,14 +224,61 @@ def test_handler_2(endpoint_event, matched_filter):
self.assertTrue(isinstance(interfaces, type([])))
self.assertTrue("org.eclipse.ecf.examples.hello.IHello" in interfaces)

# finally set the test_done_event, so tester thread will
# continue
# set the test_done_event, so tester thread will continue
test_done_event.set()
except Exception as ex:
test_done_event.raise_exception(ex)

# set the handler to the test code above
self.listener.set_handler(test_handler_1)

# Start the remote process
self.prepareFrameworkProcess()

# wait as much as 50 seconds to complete
self.assertTrue(test_done_event.wait(50))

def test_etcd_discover_remove(self):
test_done_event = EventData()

def test_handler_2(endpoint_event, matched_filter):
try:
if endpoint_event.get_type() == EndpointEvent.ADDED:
# send shutdown to trigger the removal
self.order_queue.put("quit")
elif endpoint_event.get_type() == EndpointEvent.REMOVED:
# do tests
self.assertTrue(matched_filter, ENDPOINT_LISTENER_SCOPE)
self.assertIsNotNone(endpoint_event, "endpoint_event is None")
self.assertTrue(isinstance(endpoint_event, EndpointEvent))
ee_ed = endpoint_event.get_endpoint_description()
self.assertTrue(isinstance(ee_ed, EndpointDescription))
self.assertIsNotNone(ee_ed.get_id(), "endpoint_description id is None")
self.assertIsNotNone(
ee_ed.get_framework_uuid(),
"endpoint_description framework uuid is None",
)

interfaces = ee_ed.get_interfaces()
# test that service interfaces is not None and is of type list
self.assertIsNotNone(interfaces)
self.assertTrue(isinstance(interfaces, type([])))
self.assertTrue("org.eclipse.ecf.examples.hello.IHello" in interfaces)

# finally set the test_done_event, so tester thread will
# continue
test_done_event.set()
except Exception as ex:
test_done_event.raise_exception(ex)

# set the handler to the test code above
self.listener.set_handler(test_handler_2)

# Start the remote process
self.prepareFrameworkProcess()

# wait as much as 60 seconds to complete
test_done_event.wait(60)
self.assertTrue(test_done_event.wait(60))


class EtcdDiscoveryPublishTest(unittest.TestCase):
Expand Down

0 comments on commit a58c49d

Please sign in to comment.