Skip to content

[ENH] : Implementation of read_archive function #1438

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,4 @@ Contributors
- [@joranbeasley](https://github.com/joranbeasley) | [contributions](https://github.com/pyjanitor-devs/pyjanitor/issues?q=is%3Aclosed+mentions%3Ajoranbeasley)
- [@kianmeng](https://github.com/kianmeng) | [contributions](https://github.com/pyjanitor-devs/pyjanitor/pull/1290#issue-1906020324)
- [@lbeltrame](https://github.com/lbeltrame) | [contributions](https://github.com/pyjanitor-devs/pyjanitor/pull/1401)
- [@Sabrina-Hassaim](https://github.com/Sabrina-Hassaim) | [contributions](https://github.com/pyjanitor-devs/pyjanitor/issues?q=is%3Aclosed+mentions%3ASabrina-Hassaim)
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
2 changes: 1 addition & 1 deletion README.md
186 changes: 186 additions & 0 deletions janitor/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import inspect
import os
import subprocess
import tarfile
import warnings
import zipfile
from collections import defaultdict
from glob import glob
from io import StringIO
Expand Down Expand Up @@ -689,3 +691,187 @@ def _object_to_dict(obj):
data[key] = _object_to_dict(value)
return data
return obj


#################################################################


def read_archive(
file_path: str,
extract_to_df: bool = True,
file_type: str | None = None,
selected_files: list[str] | None = None,
) -> pd.DataFrame | list[str]:
"""
Reads an archive file (.zip, .tar, .tar.gz) and optionally lists its content
or extracts specific files into a DataFrame.

Args:
file_path: The path to the archive file.
extract_to_df: Whether to read the contents into a DataFrame
(for CSV or similar formats). Default is True.
file_type: Optional file type hint ('zip', 'tar', 'tar.gz').
If None, it will be inferred from the file extension.
selected_files: List of files to read directly without user interaction.

Returns:
- A pandas DataFrame if extract_to_df is True
and the user selects a file to load.
- A list of dataframes that contains
compatible file names in the archive otherwise.
"""
file_type = file_type or _infer_file_type(file_path)

if file_type == "zip":
return _process_zip_archive(file_path, extract_to_df, selected_files)
elif file_type in {"tar", "tar.gz"}:
return _process_tar_archive(file_path, extract_to_df, selected_files)
else:
raise ValueError(
"Unsupported archive format. Supported formats are .zip, .tar, or .tar.gz."
)


def _process_zip_archive(
    file_path: str, extract_to_df: bool, selected_files: list[str] | None
) -> pd.DataFrame | list[str]:
    """
    Open a ZIP archive and either list or load its compatible members.

    Args:
        file_path: Path to the ZIP archive.
        extract_to_df: If True, load the selected members; if False,
            return only the names of the compatible members.
        selected_files: Member names to load, or None to ask the user.

    Returns:
        The loaded data when `extract_to_df` is True, otherwise the list
        of compatible member names.
    """
    with zipfile.ZipFile(file_path) as zip_handle:
        candidates = _list_compatible_files(zip_handle.namelist())

        # Listing only: no member needs to be opened.
        if not extract_to_df:
            return candidates

        return _select_and_extract_from_zip(
            zip_handle, candidates, selected_files
        )


def _process_tar_archive(
    file_path: str, extract_to_df: bool, selected_files: list[str] | None
) -> pd.DataFrame | list[pd.DataFrame] | list[str]:
    """
    Open a TAR archive and either list or load its compatible members.

    Args:
        file_path: Path to the TAR archive (compressed or not).
        extract_to_df: If True, load the selected members; if False,
            return only the names of the compatible members.
        selected_files: Member names to load, or None to ask the user.

    Returns:
        The loaded data when `extract_to_df` is True, otherwise the list
        of compatible member names.
    """
    # "r:*" makes tarfile detect the compression from the file contents,
    # so the result no longer depends on how the file happens to be named.
    with tarfile.open(file_path, "r:*") as archive:
        compatible_files = _list_compatible_files(archive.getnames())

        if extract_to_df:
            return _select_and_extract_from_tar(
                archive, compatible_files, selected_files
            )
        return compatible_files


def _select_and_extract_from_zip(
archive: zipfile.ZipFile,
compatible_files: list[str],
selected_files: list[str] | None,
) -> pd.DataFrame | list[pd.DataFrame]:
"""Select and read specific files from a ZIP archive."""
if not selected_files:
selected_files = _select_files_interactively(compatible_files)

dfs = []
for selected_file in selected_files:
with archive.open(selected_file) as file:
if selected_file.endswith(".csv"):
dfs.append(pd.read_csv(file))
elif selected_file.endswith(".xlsx"):
dfs.append(pd.read_excel(file))
return dfs if len(dfs) > 1 else dfs[0]


def _select_and_extract_from_tar(
archive: tarfile.TarFile,
compatible_files: list[str],
selected_files: list[str] | None,
) -> pd.DataFrame | list[pd.DataFrame]:
"""Select and read specific files from a TAR archive."""
if not selected_files:
selected_files = _select_files_interactively(compatible_files)

dfs = []
for selected_file in selected_files:
member = archive.getmember(selected_file)
with archive.extractfile(member) as file:
if selected_file.endswith(".csv"):
dfs.append(pd.read_csv(file))
elif selected_file.endswith(".xlsx"):
dfs.append(pd.read_excel(file))
return dfs if len(dfs) > 1 else dfs[0]


def _select_files_interactively(compatible_files: list[str]) -> list[str]:
    """
    Allow the user to select files from a list interactively.

    The numbered list is printed to standard output and the choice is
    read with `input()`. Entries that are not a number within the range
    of the list are ignored.

    Args:
        compatible_files: List of compatible file names.

    Returns:
        List of selected file names, in the order they were entered.

    Raises:
        ValueError: If no entry refers to a listed file.
    """
    print("Compatible files found in the archive:")
    for idx, file_name in enumerate(compatible_files, 1):
        print(f"{idx}. {file_name}")

    raw_selection = input(
        "Enter the numbers of the files to read, "
        "separated by commas (e.g., 1,2,3): "
    )

    selected_files = []
    for token in raw_selection.split(","):
        token = token.strip()
        # isdecimal() rather than isdigit(): isdigit() also accepts
        # characters such as superscript digits, which int() rejects.
        if token.isdecimal() and 0 < int(token) <= len(compatible_files):
            selected_files.append(compatible_files[int(token) - 1])

    if not selected_files:
        raise ValueError("No valid files selected.")
    return selected_files


def _list_compatible_files(file_names: list[str]) -> list[str]:
    """
    Keep only the archive members that can be loaded (.csv or .xlsx).

    The detected names are also printed to standard output.

    Args:
        file_names: Names of all members of the archive.

    Returns:
        The names ending in `.csv` or `.xlsx`, in their original order.

    Raises:
        ValueError: If no name has a supported extension.
    """
    supported_suffixes = (".csv", ".xlsx")
    matches = list(
        filter(lambda name: name.endswith(supported_suffixes), file_names)
    )
    print("Compatible files detected :", matches)
    if matches:
        return matches
    raise ValueError("No compatible files found in the archive.")


def _infer_file_type(file_path: str) -> str:
    """
    Infer the type of the archive based on the file extension.

    The comparison is case-insensitive, and `.tgz` is treated as the
    usual shorthand for `.tar.gz`.

    Args:
        file_path: Path to the file (string or path-like object).

    Returns:
        A string representing the archive type ('zip', 'tar', 'tar.gz').

    Raises:
        ValueError: If the file extension is unsupported.
    """
    # Lower-case once so that e.g. "DATA.ZIP" is recognised as well.
    normalized = os.fspath(file_path).lower()
    if normalized.endswith(".zip"):
        return "zip"
    if normalized.endswith((".tar.gz", ".tgz")):
        return "tar.gz"
    if normalized.endswith(".tar"):
        return "tar"
    raise ValueError(
        "Cannot infer file type from the file extension. "
        "Please specify the 'file_type' parameter."
    )
2 changes: 1 addition & 1 deletion mkdocs/AUTHORS.md
2 changes: 1 addition & 1 deletion mkdocs/CHANGELOG.md
3 changes: 3 additions & 0 deletions test.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
col1,col2
1,2
3,4
77 changes: 77 additions & 0 deletions tests/io/test_read_archive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import io
import tarfile
import zipfile

import pandas as pd
import pytest

from janitor.io import read_archive


@pytest.fixture
def zip_test_file(tmp_path):
    """Create a ZIP archive holding two small CSV files for the tests."""
    members = {
        "file1.csv": "col1,col2\n1,2\n3,4",
        "file2.csv": "col3,col4\n5,6\n7,8",
    }
    archive_path = tmp_path / "test.zip"
    with zipfile.ZipFile(archive_path, mode="w") as archive:
        for member_name, content in members.items():
            archive.writestr(member_name, content)
    return archive_path


@pytest.fixture
def tar_test_file(tmp_path):
    """Create a gzip-compressed TAR archive holding two small CSV files."""
    members = {
        "file1.csv": b"col1,col2\n1,2\n3,4",
        "file2.csv": b"col3,col4\n5,6\n7,8",
    }
    archive_path = tmp_path / "test.tar.gz"
    with tarfile.open(archive_path, mode="w:gz") as archive:
        for member_name, payload in members.items():
            header = tarfile.TarInfo(name=member_name)
            # The header must carry the payload size for addfile().
            header.size = len(payload)
            archive.addfile(header, io.BytesIO(payload))
    return archive_path


def test_read_zip_archive(zip_test_file):
    """Loading one named CSV from a ZIP archive yields a single DataFrame."""
    frame = read_archive(
        str(zip_test_file), extract_to_df=True, selected_files=["file1.csv"]
    )
    assert isinstance(frame, pd.DataFrame)
    assert frame.shape == (2, 2)
    assert list(frame.columns) == ["col1", "col2"]


def test_list_files_in_zip(zip_test_file):
    """With extract_to_df=False the ZIP's CSV member names are listed."""
    listing = read_archive(str(zip_test_file), extract_to_df=False)
    assert isinstance(listing, list)
    for expected_name in ("file1.csv", "file2.csv"):
        assert expected_name in listing


def test_no_compatible_files(tmp_path):
    """An archive without CSV/XLSX members is rejected with a ValueError."""
    archive_path = tmp_path / "empty.zip"
    with zipfile.ZipFile(archive_path, mode="w") as archive:
        archive.writestr("file1.txt", "Just some text")

    expected_message = "No compatible files found in the archive"
    with pytest.raises(ValueError, match=expected_message):
        read_archive(str(archive_path))


def test_read_tar_archive(tar_test_file):
    """Loading one named CSV from a TAR archive yields a single DataFrame."""
    frame = read_archive(
        str(tar_test_file), extract_to_df=True, selected_files=["file1.csv"]
    )
    assert isinstance(frame, pd.DataFrame)
    assert frame.shape == (2, 2)
    assert list(frame.columns) == ["col1", "col2"]


def test_list_files_in_tar(tar_test_file):
    """With extract_to_df=False the TAR's CSV member names are listed."""
    listing = read_archive(str(tar_test_file), extract_to_df=False)
    assert isinstance(listing, list)
    for expected_name in ("file1.csv", "file2.csv"):
        assert expected_name in listing
Loading