Add streaming to LeRobotDataset #740

Draft: wants to merge 1 commit into base: main

@Cadene (Collaborator) commented Feb 17, 2025

What this does

A streaming mode could be useful for training at scale. Instead of downloading the entire dataset to a single location, subsets of the data are streamed to training nodes on demand, in a fully distributed fashion. It could also unlock federated learning at massive scale.
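
As a rough illustration of "subsets streamed to training nodes", the sketch below splits episode indices across nodes round-robin. The helper `episodes_for_node` and its `rank` / `world_size` parameters are hypothetical and not part of this PR; each node could then pass its subset through the existing `episodes` argument.

```python
def episodes_for_node(num_episodes: int, rank: int, world_size: int) -> list[int]:
    """Return the episode indices handled by node `rank` (hypothetical helper)."""
    if world_size < 1:
        raise ValueError("world_size must be at least 1")
    if not 0 <= rank < world_size:
        raise ValueError(f"rank must be in [0, {world_size}), got {rank}")
    # Round-robin split: node 0 gets 0, W, 2W, ...; node 1 gets 1, W + 1, ...
    return list(range(rank, num_episodes, world_size))


if __name__ == "__main__":
    # 10 episodes over 4 nodes: every episode lands on exactly one node.
    for rank in range(4):
        print(rank, episodes_for_node(10, rank, 4))
```

A round-robin split keeps per-node episode counts within one of each other, which is usually enough when episodes have similar lengths.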

Note: LeRobotDataset has already been used with federated learning tools, but not at massive scale. See the Twitter post and the Flower quickstart for LeRobot.

This PR adds a streaming: bool argument to LeRobotDataset. The changes are minimal because LeRobotDataset relies on the Hugging Face datasets library and torchvision.io.VideoReader, both of which already support streaming.

Note: In the current HF datasets implementation, streaming after a shuffle behaves as random access.
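
For context on shuffling a stream, one common general technique is a fixed-size shuffle buffer, which only approximates a full shuffle but never needs the whole dataset in memory. The sketch below is a generic illustration with a hypothetical `buffer_shuffle` function; it is not the code used by HF datasets or by this PR.

```python
import random
from collections.abc import Iterable, Iterator
from typing import TypeVar

T = TypeVar("T")


def buffer_shuffle(stream: Iterable[T], buffer_size: int, seed: int = 0) -> Iterator[T]:
    """Yield items from `stream` in approximately shuffled order."""
    if buffer_size < 1:
        raise ValueError("buffer_size must be at least 1")
    rng = random.Random(seed)
    buffer: list[T] = []
    for item in stream:
        if len(buffer) < buffer_size:
            # Fill the buffer before emitting anything.
            buffer.append(item)
            continue
        # Emit a random buffered item and put the new item in its slot.
        idx = rng.randrange(buffer_size)
        yield buffer[idx]
        buffer[idx] = item
    # Stream exhausted: flush what is left in random order.
    rng.shuffle(buffer)
    yield from buffer


if __name__ == "__main__":
    print(list(buffer_shuffle(range(20), buffer_size=5, seed=0)))
```

A larger buffer gives an order closer to a true shuffle at the cost of more memory; a buffer of size 1 leaves the order unchanged.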

TODO to unblock merge:

  • Compare against webdataset
  • Show a real use case where streaming=True is useful
  • Add tests
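
For the comparison item above, a small harness that measures items per second over any iterable could be a starting point. This is only a sketch: `measure_throughput` is a hypothetical helper, and the stand-in generator in the usage section would be replaced by the actual data loaders being compared.

```python
import time
from collections.abc import Iterable
from itertools import islice


def measure_throughput(items: Iterable, max_items: int) -> tuple[int, float]:
    """Consume up to `max_items` items and return (count, items_per_second)."""
    start = time.perf_counter()
    # islice stops at max_items or when the iterable is exhausted.
    count = sum(1 for _ in islice(items, max_items))
    elapsed = time.perf_counter() - start
    rate = count / elapsed if elapsed > 0 else float("inf")
    return count, rate


if __name__ == "__main__":
    # Stand-in for a real loader; replace with the datasets under comparison.
    stand_in = ({"index": i} for i in range(10_000))
    count, rate = measure_throughput(stand_in, max_items=1_000)
    print(f"{count} items at {rate:.0f} items/s")
```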

How it was tested

Load a single frame from a remote video:

from lerobot.common.datasets.video_utils import decode_video_frames_torchvision
video_path = "https://huggingface.co/datasets/pepijn223/mobileso100_drive_forward6/resolve/main/videos/chunk-000/observation.images.mobile/episode_000000.mp4"
frames = decode_video_frames_torchvision(video_path, [0.0], tolerance_s=0.2)
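
To make the role of the tolerance argument concrete, here is a pure-Python sketch of nearest-timestamp matching. It assumes `tolerance_s` bounds the allowed gap between a requested timestamp and the closest decoded frame; that reading is an assumption based on the parameter name, and `nearest_frame_indices` is a hypothetical helper, not the library function.

```python
def nearest_frame_indices(
    query_ts: list[float], frame_ts: list[float], tolerance_s: float
) -> list[int]:
    """For each queried timestamp, return the index of the closest frame."""
    if not frame_ts:
        raise ValueError("frame_ts must not be empty")
    indices = []
    for q in query_ts:
        # Closest decoded frame to the requested timestamp.
        idx = min(range(len(frame_ts)), key=lambda i: abs(frame_ts[i] - q))
        gap = abs(frame_ts[idx] - q)
        if gap > tolerance_s:
            raise ValueError(
                f"closest frame to {q}s is {gap:.3f}s away (tolerance {tolerance_s}s)"
            )
        indices.append(idx)
    return indices


if __name__ == "__main__":
    frame_ts = [0.0, 0.5, 1.0]
    print(nearest_frame_indices([0.1, 0.9], frame_ts, tolerance_s=0.2))
```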

Load items:

from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
import tqdm

def main():
    dataset = LeRobotDataset("pepijn223/mobileso100_drive_forward6", episodes=[0], streaming=True)

    for i in tqdm.tqdm(range(100000)):
        item = dataset[i]
        print(f'{item["index"]=}')
        print(f'{item["frame_index"]=}')
        print(f'{item["episode_index"]=}')
        print(f'{item["timestamp"]=}')

if __name__ == "__main__":
    main()
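
If a streamed dataset is consumed as an iterable rather than by index (an assumption; the script above indexes with dataset[i]), a bounded loop can be written with itertools.islice, which stops early when the stream runs out. `fake_frames` below is a hypothetical stand-in for a real stream, with keys mirroring the ones printed above.

```python
from collections.abc import Iterable, Iterator
from itertools import islice


def fake_frames(num_frames: int, fps: int = 30) -> Iterator[dict]:
    """Hypothetical stand-in for a streamed dataset: yields one dict per frame."""
    for i in range(num_frames):
        yield {"index": i, "frame_index": i, "episode_index": 0, "timestamp": i / fps}


def take(stream: Iterable[dict], limit: int) -> Iterator[dict]:
    """Yield at most `limit` items, ending early if the stream is exhausted."""
    return islice(stream, limit)


if __name__ == "__main__":
    # The stream holds 5 frames, so the loop ends after 5 items, not 100000.
    for item in take(fake_frames(5), limit=100_000):
        print(item["frame_index"], item["timestamp"])
```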

@Tavish9 Tavish9 left a comment

Nice feat! Look forward to it 🎊

@aopolin-lv commented

Hi, thanks for your effort. However, I noticed that the streaming mode does not support the 'n_action_step' setting. Would you consider adding this feature?

@Cadene (Collaborator, Author) commented Feb 24, 2025

> Hi, thanks for your effort. However, I noticed that the streaming mode does not support the 'n_action_step' setting. Would you consider adding this feature?

I'm sorry, I didn't understand. What do you mean? @aopolin-lv
