Skip to content

Commit

Permalink
Fix conversion from arrow column to arrow RecordBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi committed Sep 20, 2023
1 parent 2175678 commit 5cbb8af
Show file tree
Hide file tree
Showing 15 changed files with 76 additions and 27 deletions.
6 changes: 6 additions & 0 deletions mars/dataframe/arrays.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,12 @@ def __repr__(self):
def _array(self):
return self._arrow_array if self._use_arrow else self._ndarray

def __arrow_array__(self, type=None):
    """Return this array's data as a single (un-chunked) arrow array.

    This is the hook pyarrow looks for when converting foreign array
    objects (for instance while building a ``RecordBatch`` from a pandas
    DataFrame that holds this extension array).

    Parameters
    ----------
    type : optional
        Target arrow type. When given, the combined array is cast to it;
        when ``None`` the data is returned with its current type.

    Returns
    -------
    The result of ``self._arrow_array.combine_chunks()`` (optionally cast)
    when arrow storage is in use, otherwise whatever the parent class'
    ``__arrow_array__`` returns.
    """
    if not self._use_arrow:
        # No arrow storage backing this array: defer to the parent class.
        return super().__arrow_array__(type=type)

    # presumably ``_arrow_array`` is a chunked arrow array; merge its chunks
    # so callers receive one contiguous array.
    combined = self._arrow_array.combine_chunks()

    # Compare against None explicitly instead of relying on truthiness, so a
    # requested type object that happens to evaluate as falsy (e.g. one that
    # defines ``__len__`` and reports 0) is still honoured.
    if type is not None:
        return combined.cast(type)
    return combined

Check warning on line 308 in mars/dataframe/arrays.py

View check run for this annotation

Codecov / codecov/patch

mars/dataframe/arrays.py#L306-L308

Added lines #L306 - L308 were not covered by tests

@property
def dtype(self) -> "Type[ArrowDtype]":
return self._dtype
Expand Down
5 changes: 5 additions & 0 deletions mars/dataframe/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
tokenize,
estimate_pandas_size,
calc_nsplits,
is_debugger_repr_thread,
)
from .utils import fetch_corner_data, ReprSeries, parse_index, merge_index_value

Expand Down Expand Up @@ -1430,6 +1431,10 @@ def __mars_tensor__(self, dtype=None, order="K"):
return tensor.astype(dtype=dtype, order=order, copy=False)

def iteritems(self, batch_size=10000, session=None):
if is_build_mode():

Check warning on line 1434 in mars/dataframe/core.py

View check run for this annotation

Codecov / codecov/patch

mars/dataframe/core.py#L1434

Added line #L1434 was not covered by tests
raise NotImplementedError("Not implemented when building dags")
if is_debugger_repr_thread() and len(self._executed_sessions) == 0:

Check warning on line 1436 in mars/dataframe/core.py

View check run for this annotation

Codecov / codecov/patch

mars/dataframe/core.py#L1436

Added line #L1436 was not covered by tests
raise NotImplementedError("Not implemented when not executed under debug")
for batch_data in self.iterbatch(batch_size=batch_size, session=session):
yield from getattr(batch_data, "iteritems")()

Expand Down
4 changes: 4 additions & 0 deletions mars/dataframe/tests/test_arrays.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,3 +480,7 @@ def test_to_pandas():
s2 = df2["b"].str[:2]
expected = df["b"].astype("string").str[:2]
pd.testing.assert_series_equal(s2, expected)

# test reverse conversion to arrow
arrow_data = pa.RecordBatch.from_pandas(df2)
assert arrow_data.num_rows == len(df2)
8 changes: 6 additions & 2 deletions mars/learn/contrib/lightgbm/classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
lightgbm = None


LGBMClassifier = make_import_error_func("lightgbm")
if lightgbm:
if not lightgbm:
LGBMClassifier = make_import_error_func("lightgbm")

Check warning on line 28 in mars/learn/contrib/lightgbm/classifier.py

View check run for this annotation

Codecov / codecov/patch

mars/learn/contrib/lightgbm/classifier.py#L28

Added line #L28 was not covered by tests
else:

class LGBMClassifier(LGBMScikitLearnBase, lightgbm.LGBMClassifier):
def fit(
Expand All @@ -42,7 +43,10 @@ def fit(
**kwargs
):
check_consistent_length(X, y, session=session, run_kwargs=run_kwargs)

params = self.get_params(True)
self._fix_verbose_args(kwargs, params)

Check warning on line 48 in mars/learn/contrib/lightgbm/classifier.py

View check run for this annotation

Codecov / codecov/patch

mars/learn/contrib/lightgbm/classifier.py#L48

Added line #L48 was not covered by tests

model = train(
params,
self._wrap_train_tuple(X, y, sample_weight, init_score),
Expand Down
13 changes: 12 additions & 1 deletion mars/learn/contrib/lightgbm/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
import numpy as np
import pandas as pd

from ....tensor import tensor as mars_tensor
from ....dataframe import DataFrame as MarsDataFrame, Series as MarsSeries
from ....lib.version import parse as parse_version
from ....tensor import tensor as mars_tensor


class LGBMModelType(enum.Enum):
Expand Down Expand Up @@ -58,6 +59,16 @@ def __init__(self, *args, **kwargs):
else:
super().__init__(*args, **kwargs)

@classmethod
def _fix_verbose_args(cls, kwds, params):
if "verbose" not in kwds:
return

Check warning on line 65 in mars/learn/contrib/lightgbm/core.py

View check run for this annotation

Codecov / codecov/patch

mars/learn/contrib/lightgbm/core.py#L65

Added line #L65 was not covered by tests

import lightgbm

Check warning on line 67 in mars/learn/contrib/lightgbm/core.py

View check run for this annotation

Codecov / codecov/patch

mars/learn/contrib/lightgbm/core.py#L67

Added line #L67 was not covered by tests

if parse_version(lightgbm.__version__).major >= 4:
params["verbose"] = kwds.pop("verbose")

Check warning on line 70 in mars/learn/contrib/lightgbm/core.py

View check run for this annotation

Codecov / codecov/patch

mars/learn/contrib/lightgbm/core.py#L70

Added line #L70 was not covered by tests

@classmethod
def _get_lgbm_class(cls):
try:
Expand Down
8 changes: 6 additions & 2 deletions mars/learn/contrib/lightgbm/ranker.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
lightgbm = None


LGBMRanker = make_import_error_func("lightgbm")
if lightgbm:
if not lightgbm:
LGBMRanker = make_import_error_func("lightgbm")

Check warning on line 28 in mars/learn/contrib/lightgbm/ranker.py

View check run for this annotation

Codecov / codecov/patch

mars/learn/contrib/lightgbm/ranker.py#L28

Added line #L28 was not covered by tests
else:

class LGBMRanker(LGBMScikitLearnBase, lightgbm.LGBMRanker):
def fit(
Expand All @@ -43,7 +44,10 @@ def fit(
**kwargs
):
check_consistent_length(X, y, session=session, run_kwargs=run_kwargs)

params = self.get_params(True)
self._fix_verbose_args(kwargs, params)

Check warning on line 49 in mars/learn/contrib/lightgbm/ranker.py

View check run for this annotation

Codecov / codecov/patch

mars/learn/contrib/lightgbm/ranker.py#L49

Added line #L49 was not covered by tests

model = train(
params,
self._wrap_train_tuple(X, y, sample_weight, init_score),
Expand Down
8 changes: 6 additions & 2 deletions mars/learn/contrib/lightgbm/regressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
lightgbm = None


LGBMRegressor = make_import_error_func("lightgbm")
if lightgbm:
if not lightgbm:
LGBMRegressor = make_import_error_func("lightgbm")

Check warning on line 28 in mars/learn/contrib/lightgbm/regressor.py

View check run for this annotation

Codecov / codecov/patch

mars/learn/contrib/lightgbm/regressor.py#L28

Added line #L28 was not covered by tests
else:

class LGBMRegressor(LGBMScikitLearnBase, lightgbm.LGBMRegressor):
def fit(
Expand All @@ -42,7 +43,10 @@ def fit(
**kwargs
):
check_consistent_length(X, y, session=session, run_kwargs=run_kwargs)

params = self.get_params(True)
self._fix_verbose_args(kwargs, params)

Check warning on line 48 in mars/learn/contrib/lightgbm/regressor.py

View check run for this annotation

Codecov / codecov/patch

mars/learn/contrib/lightgbm/regressor.py#L48

Added line #L48 was not covered by tests

model = train(
params,
self._wrap_train_tuple(X, y, sample_weight, init_score),
Expand Down
5 changes: 3 additions & 2 deletions mars/learn/contrib/xgboost/classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
from .core import xgboost, XGBScikitLearnBase


XGBClassifier = make_import_error_func("xgboost")
if xgboost:
if not xgboost:
XGBClassifier = make_import_error_func("xgboost")

Check warning on line 20 in mars/learn/contrib/xgboost/classifier.py

View check run for this annotation

Codecov / codecov/patch

mars/learn/contrib/xgboost/classifier.py#L20

Added line #L20 was not covered by tests
else:
from xgboost.sklearn import XGBClassifierBase

from .... import tensor as mt
Expand Down
5 changes: 3 additions & 2 deletions mars/learn/contrib/xgboost/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
from .dmatrix import MarsDMatrix


XGBScikitLearnBase = None
if xgboost:
if not xgboost:
XGBScikitLearnBase = None

Check warning on line 26 in mars/learn/contrib/xgboost/core.py

View check run for this annotation

Codecov / codecov/patch

mars/learn/contrib/xgboost/core.py#L26

Added line #L26 was not covered by tests
else:

class XGBScikitLearnBase(xgboost.XGBModel):
"""
Expand Down
5 changes: 3 additions & 2 deletions mars/learn/contrib/xgboost/regressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
from .core import xgboost, XGBScikitLearnBase


XGBRegressor = make_import_error_func("xgboost")
if xgboost:
if not xgboost:
XGBRegressor = make_import_error_func("xgboost")

Check warning on line 21 in mars/learn/contrib/xgboost/regressor.py

View check run for this annotation

Codecov / codecov/patch

mars/learn/contrib/xgboost/regressor.py#L21

Added line #L21 was not covered by tests
else:
from .core import wrap_evaluation_matrices
from .train import train
from .predict import predict
Expand Down
10 changes: 5 additions & 5 deletions mars/lib/mkl_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ class MKLVersion(ctypes.Structure):
]


mkl_free_buffers = None
mkl_get_version = None
mkl_mem_stat = None

mkl_rt = _load_mkl_rt("mkl_rt")
if mkl_rt:
if not mkl_rt:
mkl_free_buffers = None
mkl_get_version = None
mkl_mem_stat = None
else:
try:
mkl_free_buffers = mkl_rt.mkl_free_buffers
mkl_free_buffers.argtypes = []
Expand Down
16 changes: 12 additions & 4 deletions mars/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1673,7 +1673,7 @@ def get_func_token(func):


def _get_func_token_values(func):
if hasattr(func, "__code__"):
if hasattr(func, "__code__") and func.__code__.co_code:
tokens = [func.__code__.co_code]
if func.__closure__ is not None:
cvars = tuple([x.cell_contents for x in func.__closure__])
Expand All @@ -1684,10 +1684,13 @@ def _get_func_token_values(func):
while isinstance(func, functools.partial):
tokens.extend([func.args, func.keywords])
func = func.func
if hasattr(func, "__code__"):
tokens.extend(_get_func_token_values(func))
elif isinstance(func, types.BuiltinFunctionType):
if (
isinstance(func, types.BuiltinFunctionType)
or "cython" in type(func).__name__
):
tokens.extend([func.__module__, func.__name__])
elif hasattr(func, "__code__"):
tokens.extend(_get_func_token_values(func))

Check warning on line 1693 in mars/utils.py

View check run for this annotation

Codecov / codecov/patch

mars/utils.py#L1693

Added line #L1693 was not covered by tests
else:
tokens.append(func)
return tokens
Expand Down Expand Up @@ -1912,3 +1915,8 @@ def get_node_ip_address(address="8.8.8.8:53"):
s.close()

return node_ip_address


def is_debugger_repr_thread():
    """Tell whether the current thread looks like a debugger value-fetch thread.

    The check is purely name based: it inspects the class name of the thread
    object returned by ``threading.current_thread()`` and reports ``True``
    only when that name contains both ``"GetValue"`` and ``"Debug"``.

    NOTE(review): presumably this targets the helper threads IDE debuggers
    spawn to render variable values -- confirm against the debuggers in use.
    """
    current_cls_name = type(threading.current_thread()).__name__
    # Both markers must be present; bail out early when the first is missing.
    if "GetValue" not in current_cls_name:
        return False
    return "Debug" in current_cls_name

Check warning on line 1922 in mars/utils.py

View check run for this annotation

Codecov / codecov/patch

mars/utils.py#L1921-L1922

Added lines #L1921 - L1922 were not covered by tests
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ install_requires =
pandas>=1.0.0,<2.0.0
scipy>=1.0.0
scikit-learn>=0.20
numexpr>=2.6.4
numexpr>=2.6.4,!=2.8.5
cloudpickle>=1.5.0
pyyaml>=5.1
psutil>=5.9.0
Expand Down
5 changes: 2 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
import warnings
from sysconfig import get_config_vars

from pkg_resources import parse_version
from setuptools import setup, Extension, Command

import numpy as np
from Cython.Build import cythonize
from pkg_resources import parse_version
from setuptools import Command, Extension, setup
from setuptools.command.develop import develop
from setuptools.command.install import install
from setuptools.command.sdist import sdist
Expand Down
3 changes: 2 additions & 1 deletion versioneer.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,13 @@

import configparser
import errno
import functools
import json
import os
import re
import subprocess
import sys
from typing import Callable, Dict
import functools


class VersioneerConfig:
Expand Down Expand Up @@ -1872,6 +1872,7 @@ def run(self):

if "cx_Freeze" in sys.modules: # cx_freeze enabled?
from cx_Freeze.dist import build_exe as _build_exe

# nczeczulin reports that py2exe won't like the pep440-style string
# as FILEVERSION, but it can be used for PRODUCTVERSION, e.g.
# setup(console=[{
Expand Down

0 comments on commit 5cbb8af

Please sign in to comment.