Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel event sources earlier #634

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

sc68cal
Copy link
Contributor

@sc68cal sc68cal commented Dec 28, 2023

This is an attempt to address #633 but it's very ham-fisted.

This is an attempt to address ansible#633 but it's very ham-fisted.
@sc68cal sc68cal force-pushed the cancel_sources_graceful branch from 49f75ff to 3670e97 Compare December 28, 2023 20:49
@sc68cal
Copy link
Contributor Author

sc68cal commented Dec 28, 2023

With this PR and the RabbitMQ event source plugin, you can see that the ruleset runs a playbook (that just runs pause for 30 seconds to simulate work being done), and stops pulling from the RabbitMQ event source, while also completing all the work that it already pulled from the event source before the shutdown event was processed.

---
- name: Rulebook for OpenStack TSF Controller

  hosts: all

  sources:
    - ansible.eda.rabbitmq:
        host: localhost
        port: 5672
        queue: "openstack-tsf-network-controller-{{GENERATION}}"
        user: guest
        password: guest
        routing_key: 'platform.openstack.*.tsf.rendering.network.*'
        exchange: "platform_requests"

    - ansible.eda.webhook:
        port: 5000

  rules:
    - name: "tsf network create rabbitmq event"
      condition: event.body.event == 'createOpenStackNetwork'
      actions:
        - run_module:
            name: pause
            module_args:
              seconds: 30

    - name: "shutdown via webhook"
      condition: event.payload.action == "shutdown"
      action:
        shutdown:
          message: "Shutting down"
          delay: 600
2023-12-28 15:46:51,600 - ansible_rulebook.rule_set_runner - INFO - Ruleset: Rulebook for OpenStack TSF Controller, received shutdown: Shutdown(message='Shutting down', delay=600, kind='graceful', source_plugin='')
2023-12-28 15:46:51,600 - ansible_rulebook.rule_set_runner - INFO - calling task cancel here
2023-12-28 15:46:51,600 - ansible_rulebook.rule_set_runner - INFO - calling task cancel here
2023-12-28 15:46:51,600 - ansible_rulebook.rule_set_runner - INFO - ruleset: Rulebook for OpenStack TSF Controller waiting 600.000000 for shutdown
2023-12-28 15:46:51,600 - aiorabbit.client - DEBUG - Writing frame: <Basic.Cancel object at 0x11d14b390>
2023-12-28 15:46:51,601 - aiorabbit.client - DEBUG - Set state to 0x73: Cancelling Consumer while state is 0x71: Sent message acknowledgement - {83: {}, 34: {}, 81: {}, 119: {}} [None]
2023-12-28 15:46:51,601 - aiorabbit.client - DEBUG - Transition to 0x73: Cancelling Consumer from 0x71: Sent message acknowledgement after 39.0956 seconds
2023-12-28 15:46:51,601 - aiorabbit.client - DEBUG - Waiter 2400351569003208 waiting on (116: Consumer cancelled || 34: Channel Close Received) while in 0x73: Cancelling Consumer
2023-12-28 15:46:51,601 - <run_path> - INFO - Webhook Plugin Task Cancelled
2023-12-28 15:46:51,601 - ansible_rulebook.engine - INFO - Broadcast shutdown to all source plugins
2023-12-28 15:46:51,601 - ansible_rulebook.rule_set_runner - INFO - Task action::shutdown::Rulebook for OpenStack TSF Controller::shutdown via webhook finished, active actions 0
2023-12-28 15:46:51,602 - ansible_rulebook.rule_set_runner - DEBUG - Creating action task action::run_module::Rulebook for OpenStack TSF Controller::tsf network create rabbitmq event
2023-12-28 15:46:51,602 - ansible_rulebook.engine - INFO - Broadcast to queues: [<Queue at 0x119a9f4d0 maxsize=1 _queue=[Shutdown(message='Shutting down', delay=600, kind='graceful', source_plugin='')] tasks=8>, <Queue at 0x119a9f4d0 maxsize=1 _queue=[Shutdown(message='Shutting down', delay=600, kind='graceful', source_plugin='')] tasks=8>]
2023-12-28 15:46:51,602 - ansible_rulebook.engine - INFO - Broadcasting shutdown: Shutdown(message='Source ansible.eda.webhook initiated shutdown at 2023-12-28 15:46:51.601843', delay=60.0, kind='graceful', source_plugin='ansible.eda.webhook')
2023-12-28 15:46:51,602 - ansible_rulebook.rule_set_runner - INFO - call_action run_module
2023-12-28 15:46:51,602 - ansible_rulebook.rule_set_runner - INFO - action args: {'name': 'pause', 'module_args': {'seconds': 30}}
2023-12-28 15:46:51,610 - ansible_rulebook.action.run_playbook - INFO - ruleset: Rulebook for OpenStack TSF Controller, rule: tsf network create rabbitmq event
2023-12-28 15:46:51,610 - ansible_rulebook.action.run_playbook - DEBUG - private data dir /var/folders/kl/56c62hm51dd742lfbstbchgc0000gp/T/edatskbydkr
2023-12-28 15:46:51,622 - ansible_rulebook.action.run_playbook - DEBUG - project_data_file: None
2023-12-28 15:46:51,624 - ansible_rulebook.action.run_playbook - INFO - Calling Ansible runner
2023-12-28 15:46:51,625 - aiorabbit.client - DEBUG - Set state to 0x74: Consumer cancelled while state is 0x73: Cancelling Consumer - {83: {}, 34: {2400351569003208: <asyncio.locks.Event object at 0x119a9dd50 [unset]>}, 81: {}, 119: {}, 116: {2400351569003208: <asyncio.locks.Event object at 0x119cd2ad0 [unset]>}} [None]
2023-12-28 15:46:51,625 - aiorabbit.client - DEBUG - Transition to 0x74: Consumer cancelled from 0x73: Cancelling Consumer after 0.0239 seconds
2023-12-28 15:46:51,626 - aiorabbit.client - DEBUG - Waiter 2400351569003208 wait on 0x74: Consumer cancelled has finished [None]
2023-12-28 15:46:51,626 - aiorabbit.client - DEBUG - Writing frame: <Channel.Close object at 0x11d13b890>
2023-12-28 15:46:51,626 - aiorabbit.client - DEBUG - Set state to 0x23: Channel Close Sent while state is 0x74: Consumer cancelled - {83: {}, 34: {}, 81: {}, 119: {}, 116: {}} [None]
2023-12-28 15:46:51,626 - aiorabbit.client - DEBUG - Transition to 0x23: Channel Close Sent from 0x74: Consumer cancelled after 0.0015 seconds
2023-12-28 15:46:51,626 - aiorabbit.client - DEBUG - Waiter 2400351594439708 waiting on (36: Channel CloseOk Received || 34: Channel Close Received) while in 0x23: Channel Close Sent
2023-12-28 15:46:51,627 - aiorabbit.client - DEBUG - Set state to 0x24: Channel CloseOk Received while state is 0x23: Channel Close Sent - {83: {}, 34: {2400351594439708: <asyncio.locks.Event object at 0x11d16ea10 [unset]>}, 81: {}, 119: {}, 116: {}, 36: {2400351594439708: <asyncio.locks.Event object at 0x105239910 [unset]>}} [None]
2023-12-28 15:46:51,627 - aiorabbit.client - DEBUG - Transition to 0x24: Channel CloseOk Received from 0x23: Channel Close Sent after 0.0011 seconds
2023-12-28 15:46:51,628 - aiorabbit.client - DEBUG - Waiter 2400351594439708 wait on 0x24: Channel CloseOk Received has finished [None]
2023-12-28 15:46:51,628 - aiorabbit.client - DEBUG - Set state to 0x102: Closing while state is 0x24: Channel CloseOk Received - {83: {}, 34: {}, 81: {}, 119: {}, 116: {}, 36: {}} [None]
2023-12-28 15:46:51,628 - aiorabbit.client - DEBUG - Transition to 0x102: Closing from 0x24: Channel CloseOk Received after 0.0002 seconds
2023-12-28 15:46:51,628 - aiorabbit.channel0 - DEBUG - Set state to 0x20: Connection Close Sent while state is 0x18: Heartbeat Sent - {22: {}, 34: {}} [None]
2023-12-28 15:46:51,628 - aiorabbit.channel0 - DEBUG - Transition to 0x20: Connection Close Sent from 0x18: Heartbeat Sent after 4.8139 seconds
2023-12-28 15:46:51,628 - aiorabbit.channel0 - DEBUG - Waiter 2400351595868708 waiting on (33: Connection CloseOk Received) while in 0x20: Connection Close Sent
2023-12-28 15:46:51,628 - aiorabbit.channel0 - DEBUG - Set state to 0x21: Connection CloseOk Received while state is 0x20: Connection Close Sent - {22: {}, 34: {}, 33: {2400351595868708: <asyncio.locks.Event object at 0x1076af1d0 [unset]>}} [None]
2023-12-28 15:46:51,628 - aiorabbit.channel0 - DEBUG - Transition to 0x21: Connection CloseOk Received from 0x20: Connection Close Sent after 0.0006 seconds
2023-12-28 15:46:51,629 - aiorabbit.channel0 - DEBUG - Waiter 2400351595868708 wait on 0x21: Connection CloseOk Received has finished [None]
2023-12-28 15:46:51,629 - aiorabbit.client - DEBUG - Set state to 0x103: Closed while state is 0x102: Closing - {83: {}, 34: {}, 81: {}, 119: {}, 116: {}, 36: {}} [None]
2023-12-28 15:46:51,629 - aiorabbit.client - DEBUG - Transition to 0x103: Closed from 0x102: Closing after 0.0015 seconds
2023-12-28 15:46:51,629 - aiorabbit.client - DEBUG - Resetting internal state
2023-12-28 15:46:51,629 - ansible_rulebook.engine - INFO - Task cancelled Source ansible.eda.rabbitmq task cancelled, initiated shutdown at 2023-12-28 15:46:51.629794
2023-12-28 15:46:51,629 - ansible_rulebook.engine - INFO - Broadcast shutdown to all source plugins
2023-12-28 15:46:51,630 - aiorabbit.client - DEBUG - Disconnected: None
2023-12-28 15:46:51,630 - ansible_rulebook.engine - INFO - Broadcast to queues: [<Queue at 0x119a9f4d0 maxsize=1 _queue=[Shutdown(message='Shutting down', delay=600, kind='graceful', source_plugin='')] _putters[1] tasks=8>, <Queue at 0x119a9f4d0 maxsize=1 _queue=[Shutdown(message='Shutting down', delay=600, kind='graceful', source_plugin='')] _putters[1] tasks=8>]
2023-12-28 15:46:51,630 - ansible_rulebook.engine - INFO - Broadcasting shutdown: Shutdown(message='Source ansible.eda.rabbitmq task cancelled, initiated shutdown at 2023-12-28 15:46:51.629794', delay=60.0, kind='graceful', source_plugin='ansible.eda.rabbitmq')

PLAY [wrapper] *****************************************************************

TASK [Module wrapper] **********************************************************
Pausing for 30 seconds
(ctrl+C then 'C' = continue early, ctrl+C then 'A' = abort)
ok: [localhost]

TASK [save result] *************************************************************
ok: [localhost]
fatal: [example.net]: FAILED! => {"msg": "The task includes an option with an undefined variable. The error was: 'module_result' is undefined. 'module_result' is undefined\n\nThe error appears to be in '/var/folders/kl/56c62hm51dd742lfbstbchgc0000gp/T/edatskbydkr/wrapper.yml': line 9, column 5, but may\nbe elsewhere in the file depending on the exact syntax problem.\n\nThe offending line appears to be:\n\n    register: module_result\n  - ansible.builtin.set_fact:\n    ^ here\n"}

PLAY RECAP *********************************************************************
localhost                  : ok=2    changed=0    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0
example.net : ok=0    changed=0    unreachable=0    failed=1    skipped=0    rescued=0    ignored=0
2023-12-28 15:47:22,972 - ansible_rulebook.action.runner - DEBUG - Cancel Queue reading task
2023-12-28 15:47:22,973 - ansible_rulebook.action.runner - INFO - Ansible runner Queue task cancelled
2023-12-28 15:47:22,980 - ansible_rulebook.action.run_playbook - INFO - Ansible runner rc: 2, status: failed
2023-12-28 15:47:22,982 - ansible_rulebook.action.run_playbook - ERROR -
PLAY [wrapper] *****************************************************************

TASK [Module wrapper] **********************************************************
Pausing for 30 seconds
(ctrl+C then 'C' = continue early, ctrl+C then 'A' = abort)
ok: [localhost]

TASK [save result] *************************************************************
ok: [localhost]
fatal: [example.net]: FAILED! => {"msg": "The task includes an option with an undefined variable. The error was: 'module_result' is undefined. 'module_result' is undefined\n\nThe error appears to be in '/var/folders/kl/56c62hm51dd742lfbstbchgc0000gp/T/edatskbydkr/wrapper.yml': line 9, column 5, but may\nbe elsewhere in the file depending on the exact syntax problem.\n\nThe offending line appears to be:\n\n    register: module_result\n  - ansible.builtin.set_fact:\n    ^ here\n"}

PLAY RECAP *********************************************************************
localhost                  : ok=2    changed=0    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0
example.net : ok=0    changed=0    unreachable=0    failed=1    skipped=0    rescued=0    ignored=0

2023-12-28 15:47:23,013 - ansible_rulebook.rule_set_runner - INFO - Task action::run_module::Rulebook for OpenStack TSF Controller::tsf network create rabbitmq event finished, active actions 0
2023-12-28 15:47:23,013 - ansible_rulebook.rule_set_runner - DEBUG - Creating action task action::run_module::Rulebook for OpenStack TSF Controller::tsf network create rabbitmq event
2023-12-28 15:47:23,013 - ansible_rulebook.rule_set_runner - INFO - call_action run_module
2023-12-28 15:47:23,013 - ansible_rulebook.rule_set_runner - INFO - action args: {'name': 'pause', 'module_args': {'seconds': 30}}
2023-12-28 15:47:23,014 - ansible_rulebook.action.run_playbook - INFO - ruleset: Rulebook for OpenStack TSF Controller, rule: tsf network create rabbitmq event
2023-12-28 15:47:23,014 - ansible_rulebook.action.run_playbook - DEBUG - private data dir /var/folders/kl/56c62hm51dd742lfbstbchgc0000gp/T/eda_yd4czdt
2023-12-28 15:47:23,027 - ansible_rulebook.action.run_playbook - DEBUG - project_data_file: None
2023-12-28 15:47:23,028 - ansible_rulebook.action.run_playbook - INFO - Calling Ansible runner

PLAY [wrapper] *****************************************************************

TASK [Module wrapper] **********************************************************
Pausing for 30 seconds
(ctrl+C then 'C' = continue early, ctrl+C then 'A' = abort)
ok: [localhost]

TASK [save result] *************************************************************
ok: [localhost]
fatal: [example.net]: FAILED! => {"msg": "The task includes an option with an undefined variable. The error was: 'module_result' is undefined. 'module_result' is undefined\n\nThe error appears to be in '/var/folders/kl/56c62hm51dd742lfbstbchgc0000gp/T/eda_yd4czdt/wrapper.yml': line 9, column 5, but may\nbe elsewhere in the file depending on the exact syntax problem.\n\nThe offending line appears to be:\n\n    register: module_result\n  - ansible.builtin.set_fact:\n    ^ here\n"}

PLAY RECAP *********************************************************************
localhost                  : ok=2    changed=0    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0
example.net : ok=0    changed=0    unreachable=0    failed=1    skipped=0    rescued=0    ignored=0
2023-12-28 15:47:54,249 - ansible_rulebook.action.runner - DEBUG - Cancel Queue reading task
2023-12-28 15:47:54,249 - ansible_rulebook.action.runner - INFO - Ansible runner Queue task cancelled
2023-12-28 15:47:54,253 - ansible_rulebook.action.run_playbook - INFO - Ansible runner rc: 2, status: failed
2023-12-28 15:47:54,255 - ansible_rulebook.action.run_playbook - ERROR -
PLAY [wrapper] *****************************************************************

TASK [Module wrapper] **********************************************************
Pausing for 30 seconds
(ctrl+C then 'C' = continue early, ctrl+C then 'A' = abort)
ok: [localhost]

TASK [save result] *************************************************************
ok: [localhost]
fatal: [example.net]: FAILED! => {"msg": "The task includes an option with an undefined variable. The error was: 'module_result' is undefined. 'module_result' is undefined\n\nThe error appears to be in '/var/folders/kl/56c62hm51dd742lfbstbchgc0000gp/T/eda_yd4czdt/wrapper.yml': line 9, column 5, but may\nbe elsewhere in the file depending on the exact syntax problem.\n\nThe offending line appears to be:\n\n    register: module_result\n  - ansible.builtin.set_fact:\n    ^ here\n"}

PLAY RECAP *********************************************************************
localhost                  : ok=2    changed=0    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0
example.net : ok=0    changed=0    unreachable=0    failed=1    skipped=0    rescued=0    ignored=0

2023-12-28 15:47:54,290 - ansible_rulebook.rule_set_runner - INFO - Task action::run_module::Rulebook for OpenStack TSF Controller::tsf network create rabbitmq event finished, active actions 0
2023-12-28 15:47:54,290 - ansible_rulebook.rule_set_runner - INFO - All actions done
2023-12-28 15:47:54,290 - ansible_rulebook.rule_set_runner - DEBUG - Action Plan Task Cancelled for ruleset Rulebook for OpenStack TSF Controller
2023-12-28 15:47:54,290 - ansible_rulebook.rule_set_runner - INFO - Cleaning up ruleset Rulebook for OpenStack TSF Controller
2023-12-28 15:47:54 320 [main] INFO org.drools.ansible.rulebook.integration.api.RulesExecutor - Disposing session with id: 1; SessionStats{start='2023-12-28T20:45:16.430644Z', end='2023-12-28T20:47:54.299216Z', lastClockTime='2023-12-28T20:47:54.231Z', clockAdvanceCount=1578, numberOfRules=2, numberOfDisabledRules=0, rulesTriggered=6, eventsProcessed=6, eventsMatched=6, eventsSuppressed=0, permanentStorageCount=0, permanentStorageSize=0, asyncResponses=0, bytesSentOnAsync=0, sessionId=1, ruleSetName='Rulebook for OpenStack TSF Controller', lastRuleFired='tsf network create rabbitmq event', lastRuleFiredAt='2023-12-28T20:46:12.431Z'}
2023-12-28 15:47:54,328 - ansible_rulebook.rule_set_runner - INFO - {'asyncResponses': 0,
 'bytesSentOnAsync': 0,
 'clockAdvanceCount': 1578,
 'end': '2023-12-28T20:47:54.299216Z',
 'eventsMatched': 6,
 'eventsProcessed': 6,
 'eventsSuppressed': 0,
 'lastClockTime': '2023-12-28T20:47:54.231Z',
 'lastRuleFired': 'tsf network create rabbitmq event',
 'lastRuleFiredAt': '2023-12-28T20:46:12.431Z',
 'numberOfDisabledRules': 0,
 'numberOfRules': 2,
 'permanentStorageCount': 0,
 'permanentStorageSize': 0,
 'ruleSetName': 'Rulebook for OpenStack TSF Controller',
 'rulesTriggered': 6,
 'sessionId': 1,
 'start': '2023-12-28T20:45:16.430644Z'}
2023-12-28 15:47:54,328 - ansible_rulebook.rule_set_runner - DEBUG - Source Task Cancelled for ruleset Rulebook for OpenStack TSF Controller
2023-12-28 15:47:54,328 - ansible_rulebook.engine - INFO - Cancelling all ruleset tasks
2023-12-28 15:47:54,328 - ansible_rulebook.engine - INFO - Waiting on gather
2023-12-28 15:47:54,328 - ansible_rulebook.engine - INFO - Returning from run_rulesets
2023-12-28 15:47:54,328 - ansible_rulebook.app - INFO - Cancelling event source tasks
2023-12-28 15:47:54,328 - drools.dispatch - DEBUG - Shutting down async channel
2023-12-28 15:47:54,330 - ansible_rulebook.app - INFO - Main complete

I also confirm that there are still messages in the RabbitMQ queue, via the management plugin
Screen Shot 2023-12-28 at 4 48 12 PM

@mkanoor
Copy link
Contributor

mkanoor commented Dec 31, 2023

@sc68cal Would it make more sense if we write a function called cancel_all_sources which can be called when a shutdown is issued. We would have to store all the tasks in a global variable in addition to this and use that global variable in cancel_all_sources, we would have to check if the task is done before we call cancel on it.. This function can be added in the app.py file. We would also need a task.done check here

@sc68cal
Copy link
Contributor Author

sc68cal commented Jan 2, 2024

Would it make more sense if we write a function called cancel_all_sources which can be called when a shutdown is issued.

Where would this new function be declared? The issue is that the coroutine for the app is in an await state waiting for the run_rulesets to return before it continues down to that line you referenced.

We would have to store all the tasks in a global variable

Generally, think global variables are harmful and should be avoided. It makes the code harder to follow. Plus, we'd have to use locks or flags from asyncio to coordinate access, which may not be worth doing.

we would have to check if the task is done before we call cancel on it..

I'm not sure what you mean here - because at least on the RabbitMQ plugin I've written, it has a coroutine that awaits on new messages from the queue, so it's never done, it just awaits forever.

We would also need a task.done check here

I'm not sure why we would be checking for done here, because we are trying to cancel consuming from event sources, that are never done

@sc68cal sc68cal marked this pull request as ready for review January 2, 2024 17:09
tests/test_engine.py Outdated Show resolved Hide resolved
@sc68cal
Copy link
Contributor Author

sc68cal commented Jan 2, 2024

Some of the tests create a SourceTask object and associate a source with it. I tried passing it in to run_ruleset instead of just passing an empty list

@@ -1045,7 +1045,7 @@ async def test_36_multiple_rulesets_both_fired():
         ):
             await run_rulesets(
                 event_log,
-                [],
+                [ruleset_queues[1][0].sources[0]],
                 ruleset_queues,
                 dict(),
                 "playbooks/inventory.yml",

but test_36_multiple_rulesets_both_fired hangs and never finishes. Investigating...

@sc68cal
Copy link
Contributor Author

sc68cal commented Jan 2, 2024

This also changes some of the behaviors of the end to end tests. I think it's because of the ordering of the messages and when the shutdown message is sent, but I'm not 100% sure.

FAILED tests/e2e/test_match_multiple_rules.py::test_match_multiple_rules - assert 4 == 5
FAILED tests/e2e/test_run_module_output.py::test_run_module_output - assert 1 == 2
FAILED tests/e2e/test_websocket.py::test_websocket_messages - assert 701 == 2000

tasks was a variable that contained the return from spawn_sources
as well as the websocket feedback task. These are different things,
and should not be passed into run_rulesets, since the feedback task
is not an event source.

tasks is now used as a list of all tasks, only for the asyncio.gather call
@sc68cal sc68cal changed the title [WIP] cancel sources earlier Cancel event sources earlier Jan 2, 2024
@sc68cal
Copy link
Contributor Author

sc68cal commented Jan 2, 2024

I think it's because of the ordering of the messages and when the shutdown message is sent, but I'm not 100% sure.

This is fixed by handling the feedback task differently in the app module. 143127d resolves

@sc68cal
Copy link
Contributor Author

sc68cal commented Apr 15, 2024

Hi,

Can I get some feedback on this pull request from the maintainers? I did a pretty deep dive on the internals of ansible-rulebook to track this down and feel that it is an important thing to fix in ansible-rulebook

Copy link

Quality Gate Failed Quality Gate failed

Failed conditions
12.1% Duplication on New Code (required ≤ 3%)

See analysis details on SonarCloud

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants