
[HIL-SERL] Migrate threading to multiprocessing #759

Open · wants to merge 16 commits into base: user/adil-zouitine/2025-1-7-port-hil-serl-new from user/helper2424/add_mp

Conversation

helper2424 (Contributor)

What this does

Explain what this PR does. Feel free to tag your PR with the appropriate label(s).

Examples:

Title | Label
Fixes #[issue] | 🐛 Bug
Adds new dataset | 🗃️ Dataset
Optimizes something | ⚡️ Performance

How it was tested

Explain/show how you tested your changes.

Examples:

  • Added test_something in tests/test_stuff.py.
  • Added new_feature and checked that training converges with policy X on dataset/environment Y.
  • Optimized some_function, it now runs X times faster than previously.

How to checkout & try? (for the reviewer)

Provide a simple way for the reviewer to try out your changes.

Examples:

pytest -sx tests/test_stuff.py::test_something
python lerobot/scripts/train.py --some.option=true

SECTION TO REMOVE BEFORE SUBMITTING YOUR PR

Note: Anyone in the community is free to review the PR once the tests have passed. Feel free to tag
members/contributors who may be interested in your PR. Try to avoid tagging more than 3 people.

Note: Before submitting this PR, please read the contributor guideline.

step = 0

    logging.debug(
        f"{log_prefix} Queue updated, {queue.qsize()} items in the queue"
    )
@ChorntonYoel (Feb 24, 2025):
qsize() breaks on macOS unless the queues come from a manager, so I think either this part should change, or we should just create the queues like this in the actor_server:

    parameters_queue = manager.Queue(maxsize=1)
    transitions_queue = manager.Queue(maxsize=5000)
    interactions_queue = manager.Queue(maxsize=5000)
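
For context, a minimal sketch of the manager-based setup suggested here (queue sizes taken from the snippet above; the variable names are illustrative):

    from multiprocessing import Manager

    # A Manager-backed Queue proxies qsize() through the manager process,
    # so it also works on macOS, where a plain multiprocessing.Queue.qsize()
    # raises NotImplementedError.
    manager = Manager()
    parameters_queue = manager.Queue(maxsize=1)
    transitions_queue = manager.Queue(maxsize=5000)
    interactions_queue = manager.Queue(maxsize=5000)

    print(transitions_queue.qsize())  # safe on macOS with manager queues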

helper2424 (Contributor, Author):

Got it, fixed

helper2424 force-pushed the user/helper2424/add_mp branch from dd4b3e0 to 264a48e on March 1, 2025 10:15
helper2424 force-pushed the user/helper2424/add_mp branch from c3acad7 to 31af309 on March 1, 2025 14:30
Comment on lines +165 to +168
"/".join(
[".."] * (len(path2.parts) - len(common_parts))
+ list(path1.parts[len(common_parts) :])
)
Member:

What is the purpose of this function? Why do we need it in init_hydra_config 😄

helper2424 (Contributor, Author):

It's a good question. All these fixes came from the linter, not from me.

I have added the ability to use files in init_logging only.
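
(For reference, a hedged sketch of what the quoted expression computes, using made-up example paths: the relative path from path2 up to the common prefix and down to path1. os.path.relpath gives the same result.)

    import os
    from pathlib import Path

    # Illustrative paths only, not taken from the PR.
    path1 = Path("/repo/configs/policy/sac.yaml")
    path2 = Path("/repo/outputs/run/checkpoint")
    common_parts = ("/", "repo")

    rel = "/".join(
        [".."] * (len(path2.parts) - len(common_parts))
        + list(path1.parts[len(common_parts):])
    )
    print(rel)                                  # ../../../configs/policy/sac.yaml
    print(os.path.relpath(path1, start=path2))  # same result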

@@ -110,4 +108,4 @@ policy:
actor_learner_config:
learner_host: "127.0.0.1"
learner_port: 50051
policy_parameters_push_frequency: 15
policy_parameters_push_frequency: 1
@AdilZouitine (Member, Mar 3, 2025):

Isn't this too frequent? Can the network handle it? It also explains why you converge in 20k steps, since your updates are more frequent.

Member:

learner_port=8083 - why this change? The default port for gRPC is 50051 by convention.

helper2424 (Contributor, Author):

I have returned the port to 50051. I used another one because vast.ai uses port forwarding, so the internal port would be 8088 but the external one for the machine would be 50051. I made it for testing, but it has already been removed.

helper2424 (Contributor, Author):

Regarding the frequency - yeah, I have played with it. The situation looks like the following:

  • we have a learner, which updates its weights hundreds of times per second
  • we have an actor that collects data on its side

They both depend on each other: as soon as we have better weights we should deliver them to the actor, it collects better data, and we deliver that back to the learner so it can train on the new data and produce better weights.

Delivering weights every 15 seconds is very slow for such an architecture. The learner has weight updates every second, so we can deliver them to the actor at that rate. If the learning process were slower - e.g. with bigger NNs - then we couldn't deliver weights so often, but convergence would be slower too.
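
To make that concrete, a minimal sketch of a learner-side push loop driven by policy_parameters_push_frequency (the function and variable names here are illustrative, not the PR's actual helpers):

    import time

    def push_parameters_loop(policy, parameters_queue, push_frequency_s, shutdown_event):
        # Periodically snapshot the learner's weights and hand them to the actor.
        # parameters_queue is assumed to be a multiprocessing (or manager) queue.
        while not shutdown_event.is_set():
            # CPU copies can be pickled and sent across process boundaries.
            state_dict = {k: v.detach().cpu() for k, v in policy.state_dict().items()}
            if parameters_queue.full():
                # Drop the stale snapshot so the actor always sees the freshest weights.
                try:
                    parameters_queue.get_nowait()
                except Exception:
                    pass
            parameters_queue.put(state_dict)
            time.sleep(push_frequency_s)  # e.g. 1 s instead of 15 s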

@@ -24,6 +24,7 @@

Member:

We have to rebase to integrate the latest replay buffer modification; it helps a lot with performance.

helper2424 (Contributor, Author):

Yeah, it should already be rebased on the last commit from your branch.

transition (Optional): Transition data to be sent to the learner.
interaction_message (Optional): Interaction message providing additional statistics for logging.
"""
from lerobot.scripts.server.utils import get_last_item_from_queue
@AdilZouitine (Member, Mar 3, 2025):

Need to remove this file (debug.py) before the merge.

helper2424 (Contributor, Author):

Done



def transitions_stream(shutdown_event: Event, message_queue: queue.Queue):
def transitions_stream(
shutdown_event: Event, transitions_queue: Queue
Member:

I got this info from Pylance for Event: "Variable not allowed in type expression". Maybe we should mute Pylance for this line.
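
A small sketch of two ways to handle it: multiprocessing.Event is a factory function rather than a class (the concrete class lives in multiprocessing.synchronize), which is why Pylance complains. The function names below are illustrative:

    from multiprocessing import Event                     # a factory function, not a class
    from multiprocessing.synchronize import Event as EventClass

    # Option 1: keep the annotation and mute the checker on this line.
    def wait_for_shutdown(shutdown_event: Event):  # type: ignore
        shutdown_event.wait()

    # Option 2: annotate with the concrete class, which Pylance accepts.
    def wait_for_shutdown_typed(shutdown_event: EventClass):
        shutdown_event.wait()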


yield response
def interactions_stream(
shutdown_event: Event, interactions_queue: Queue
Member:

ditto, pylance

shutdown_event: Event,
message_queue: queue.Queue,
):
) -> hilserl_pb2.Empty:
"""
Streams data from the actor to the learner.

Member:

The docstring is not right; this function sends transitions only.

cfg: DictConfig,
robot: Robot,
reward_classifier: nn.Module,
shutdown_event: Event,
Member:

ditto, Pylance

@@ -458,39 +477,70 @@ def log_policy_frequency_issue(
)


def establish_learner_connection(stub, shutdown_event: Event, attempts=30):
Member:

ditto Pylance


logging.info("[ACTOR] Connection with Learner established")

parameters_queue = Queue()
Member:

Why not set the size of the Queue to one?

Member:

Some issue with macOS, like what @ChorntonYoel pointed out?

helper2424 (Contributor, Author):

I have explained it here - #759 (comment).

By default, if the queue has a maximum size, then when you try to put anything into a full queue the thread (or process) will block until somebody extracts an item on the other side with .get. So a bounded queue would throttle whichever side is putting data into it.

It's better here to allow putting data without any locks and just drain the queue on the other side until the last element is extracted.
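
A hedged sketch of that drain pattern (the PR's actual helper is get_last_item_from_queue; its exact signature may differ):

    import queue

    def drain_to_last_item(q):
        # Block until at least one item arrives, then keep swapping it for
        # anything fresher until the queue is empty again.
        item = q.get()
        try:
            while True:
                item = q.get_nowait()
        except queue.Empty:
            return item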


transition_queue = queue.Queue()
interaction_message_queue = queue.Queue()
import torch.multiprocessing as mp
Member:

I can feel a lot of work behind this line hahah
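
For context, a minimal before/after sketch of the switch this diff makes (the surrounding gRPC plumbing is omitted, and the variable names follow the diff above):

    # Before: standard-library queues, shareable only between threads.
    # import queue
    # transition_queue = queue.Queue()
    # interaction_message_queue = queue.Queue()

    # After: torch.multiprocessing queues, which cross process boundaries
    # and move tensors through shared memory instead of copying them.
    import torch.multiprocessing as mp

    transition_queue = mp.Queue()
    interaction_message_queue = mp.Queue()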

@@ -345,33 +434,39 @@ def add_actor_information_and_train(
interaction_step_shift = (
resume_interaction_step if resume_interaction_step is not None else 0
)
saved_data = False
Member:

Nice catch, this variable was useless.

@AdilZouitine (Member):

Overall, it looks good to me. I just have a minor remark or question.

I will run it to benchmark both now and after the rebase; the goal is to see an improvement in optimization speed. 😄

Thank you, Eugene! Great work!

helper2424 force-pushed the user/helper2424/add_mp branch from abf3769 to 9f0c1b5 on March 3, 2025 14:11