How to perform deduplication in a cluster environment? #759

Open
Dzg0309 opened this issue Nov 30, 2023 · 5 comments

Dzg0309 commented Nov 30, 2023

When setting up a cluster environment, I want to run a deduplication task on a large dataset (1 TB, stored locally), but how should I load the data? Should I put all the data on the supervisor node and load it from there? Or should I divide the data equally among the nodes and then run xorbits.init(address="http://supervisor_ip:web_port") on the supervisor node so that the data on all nodes is loaded for deduplication? Please advise, thank you~

@XprobeBot XprobeBot added this to the v0.7.2 milestone Nov 30, 2023
ChengjieLi28 (Contributor) commented Nov 30, 2023

Hi, @Dzg0309. You should upload your data to a filesystem like S3 and read it with the Xorbits interface that matches your data format, such as read_csv or read_parquet, then use the deduplication operation to do what you want.
Here's an example:

  1. First, start your Xorbits cluster and get its endpoint.

```python
import xorbits
import xorbits.pandas as pd

# Connect to the running cluster.
xorbits.init("<your xorbits cluster endpoint>")

# Use the reader that matches your dataset (read_csv, read_parquet, ...).
# This step processes your data in a distributed way.
df = pd.read_csv("s3://xxx")
# Then use Xorbits operators to complete your task.
```

Dzg0309 (Author) commented Dec 1, 2023

> You should upload your data to a filesystem like S3 and use xorbits interfaces like read_csv, read_parquet (According to your data) to read it,

Is it necessary to use the S3 file system? Or can I use the Hadoop file system, or read the data files directly from local disk? I tried putting all the data on the supervisor, but when I started the cluster, the other workers reported file-not-found errors. When I split the data across all nodes (keeping the data directory path the same on every node), a file-not-found error still occurs during the operation, which confuses me.

Below is my code:

```python
import xorbits
import xorbits.pandas as pd
from xorbits.experimental import dedup
import xorbits.datasets as xdatasets

xorbits.init(address='http://xx.xx.xxx.xx:xxxx', session_id='xorbits_dedup_test_09')
ds = xdatasets.from_huggingface("/passdata/xorbits_data/mnbvc_test", split='train', cache_dir='/passdata/.cache')
df = ds.to_dataframe()

# for 'minhash' method
res = dedup(df, col="content", method="minhash", threshold=0.7, num_perm=128, min_length=5, ngrams=5, seed=42)
res.to_parquet('/passdata/xorbits_data/output')
xorbits.shutdown()
```
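For readers unfamiliar with method="minhash", the following plain-Python sketch illustrates the general MinHash near-duplicate technique, with parameter names (threshold, num_perm, ngrams, seed) mirroring the dedup call above. It is not Xorbits's implementation: the word-level shingling, the hash family, and the helper names (word_ngrams, minhash_signature, minhash_dedup) are assumptions made for illustration, and min_length is left out because its exact semantics are not described in this thread.

```python
import hashlib
import random

# Large Mersenne prime for the universal hash family h(x) = (a*x + b) mod P.
_P = (1 << 61) - 1


def _stable_hash(token):
    # Built-in hash() is salted per process, so use a stable 64-bit digest instead.
    return int.from_bytes(hashlib.sha1(token.encode("utf-8")).digest()[:8], "little")


def word_ngrams(text, n=5):
    """Set of word n-grams (shingles); texts shorter than n become one shingle."""
    words = text.split()
    if len(words) < n:
        return {" ".join(words)} if words else set()
    return {" ".join(words[i:i + n]) for i in range(len(words) - n + 1)}


def minhash_signature(text, num_perm=128, ngrams=5, seed=42):
    # The same seed yields the same hash family, so signatures are comparable.
    rng = random.Random(seed)
    params = [(rng.randrange(1, _P), rng.randrange(0, _P)) for _ in range(num_perm)]
    hashes = [_stable_hash(g) for g in word_ngrams(text, ngrams)]
    if not hashes:
        return [_P] * num_perm  # sentinel signature for empty text
    # One minimum per simulated permutation.
    return [min((a * h + b) % _P for h in hashes) for a, b in params]


def estimated_jaccard(sig_a, sig_b):
    # The fraction of slots whose minima agree estimates Jaccard similarity.
    return sum(x == y for x, y in zip(sig_a, sig_b)) / len(sig_a)


def minhash_dedup(texts, threshold=0.7, num_perm=128, ngrams=5, seed=42):
    """Keep the first text of each near-duplicate group (all-pairs, O(n^2))."""
    kept, kept_sigs = [], []
    for text in texts:
        sig = minhash_signature(text, num_perm, ngrams, seed)
        if all(estimated_jaccard(sig, s) < threshold for s in kept_sigs):
            kept.append(text)
            kept_sigs.append(sig)
    return kept
```

The all-pairs comparison in minhash_dedup is quadratic; at the 1 TB scale discussed here, MinHash deduplication is usually paired with locality-sensitive hashing (banding the signatures into buckets) so that only candidate pairs are compared.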

codingl2k1 (Contributor) commented Dec 1, 2023

If the data is in a local directory, then each worker needs its own identical copy of the data at the same local path. Alternatively, you can put the data in an S3 directory, and each worker then fetches its partition from S3 directly.

If your data is in CSV or Parquet format, you can try the read_csv or read_parquet APIs (e.g. read_parquet: https://doc.xorbits.io/en/stable/reference/pandas/generated/xorbits.pandas.read_parquet.html#xorbits.pandas.read_parquet). These APIs allow more flexible slicing of the data, but local data still needs to be copied to each worker in advance.
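Since the file-not-found errors in this thread come from workers missing local files, one way to catch the problem before submitting a job is to build a file manifest on the node that holds the full copy and check it on every worker. A minimal sketch in plain Python, assuming you can run a script on each node yourself (for example over SSH); list_relative_files and missing_files are hypothetical helpers, not part of Xorbits:

```python
from pathlib import Path


def list_relative_files(root):
    """Return every file under root as a sorted list of POSIX-style relative paths."""
    base = Path(root)
    return sorted(p.relative_to(base).as_posix() for p in base.rglob("*") if p.is_file())


def missing_files(root, manifest):
    """Return the manifest entries that are not present as files under root on this node."""
    base = Path(root)
    return [rel for rel in manifest if not (base / rel).is_file()]
```

Run list_relative_files on the reference node, copy the resulting list to each worker, and call missing_files there with the same root path; an empty list means that worker sees the same files at the same location.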

Dzg0309 (Author) commented Dec 4, 2023

OK, I copied the data to each node and it worked, but two other problems came up:

  1. Can Xorbits's read_json read a data folder path directly and load the JSON files in parallel?
  2. After performing the deduplication, I used to_parquet('/passdata/xorbits_data/output') to save the data. It was very slow and only saved a single 0.parquet file to one of the nodes, which is a headache. I want the output saved to multiple nodes in parallel to speed it up. What should I do?

codingl2k1 (Contributor) commented Dec 5, 2023

  1. Currently, the read_json API is not implemented, so it falls back to pandas read_json, which is not distributed. If your JSON data is in JSONL format (each line is a JSON string), we can schedule a PR to implement a distributed read_json.

  2. to_parquet accepts a path containing `*`, in which case each chunk is written to the node that generated it. If your to_parquet call only saves the data to one node, check the following:

    • Is there a `*` in your save path? If not, add one.
    • Is the data passed to to_parquet already tiled into multiple chunks? If not, rechunk it or use a distributed data source, e.g. read_csv.
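On item 1: JSONL is easier to distribute because a file can be cut at newline boundaries and each piece parsed independently. The following is a plain-Python sketch of that idea, with a local thread pool standing in for workers. It is not Xorbits's read_json; the helper names (line_aligned_ranges, read_jsonl_range, read_jsonl_folder) and the *.jsonl file pattern are assumptions for illustration.

```python
import glob
import json
import os
from concurrent.futures import ThreadPoolExecutor


def line_aligned_ranges(path, num_chunks):
    """Split a file into up to num_chunks contiguous (start, end) byte ranges starting on line boundaries."""
    size = os.path.getsize(path)
    if size == 0:
        return []
    step = max(1, size // num_chunks)
    bounds = [0]
    with open(path, "rb") as f:
        pos = step
        while pos < size:
            f.seek(pos)
            f.readline()  # skip past the (possibly partial) line containing pos
            pos = f.tell()  # now at the start of the next full line
            if pos >= size:
                break
            bounds.append(pos)
            pos += step
    bounds.append(size)
    return list(zip(bounds[:-1], bounds[1:]))


def read_jsonl_range(path, start, end):
    """Parse the JSON lines stored in bytes [start, end) of path."""
    with open(path, "rb") as f:
        f.seek(start)
        data = f.read(end - start)
    return [json.loads(line) for line in data.splitlines() if line.strip()]


def read_jsonl_folder(folder, chunks_per_file=4, max_workers=4):
    """Read every *.jsonl file in folder, parsing line-aligned chunks concurrently."""
    tasks = [
        (path, start, end)
        for path in sorted(glob.glob(os.path.join(folder, "*.jsonl")))
        for start, end in line_aligned_ranges(path, chunks_per_file)
    ]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        parts = list(pool.map(lambda task: read_jsonl_range(*task), tasks))
    # pool.map preserves task order, so records come back in file order.
    return [record for part in parts for record in part]
```

A distributed reader would hand each (path, start, end) task to a different worker instead of a local thread. The same line-aligned splitting is what makes JSONL, unlike a single large JSON array, straightforward to partition.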

@XprobeBot XprobeBot modified the milestones: v0.7.2, v0.7.3 Jan 5, 2024
@XprobeBot XprobeBot modified the milestones: v0.7.3, v0.7.4 Aug 22, 2024
@luweizheng luweizheng removed this from the v0.7.4 milestone Dec 16, 2024