diff --git a/src/Processors/Sources/PythonSource.cpp b/src/Processors/Sources/PythonSource.cpp index 6915cb710e8..e46c8e68bfc 100644 --- a/src/Processors/Sources/PythonSource.cpp +++ b/src/Processors/Sources/PythonSource.cpp @@ -1,3 +1,4 @@ +#include #include #include "base/scope_guard.h" @@ -106,7 +107,16 @@ void PythonSource::convert_string_array_to_block( offsets.reserve(row_count); for (size_t i = offset; i < offset + row_count; ++i) { - FillColumnString(buf[i], string_column); + auto * obj = buf[i]; + if (!PyUnicode_Check(obj)) + { + LOG_ERROR( + logger, + "Unsupported Python object type {}, Unicode string expected here. Try convert column type to str with `astype(str)`", + Py_TYPE(obj)->tp_name); + throw Exception(ErrorCodes::BAD_TYPE_OF_FIELD, "Unsupported Python object type {}", Py_TYPE(obj)->tp_name); + } + FillColumnString(obj, string_column); // Try to help reserve memory for the string column data every 100 rows to avoid frequent reallocations // Check the avg size of the string column data and reserve memory accordingly if ((i - offset) % 10 == 9) @@ -278,11 +288,34 @@ Chunk PythonSource::genChunk(size_t & num_rows, PyObjectVecPtr data) type->getName(), description.sample_block.getByPosition(i).name); } - catch (const Exception & e) + catch (Exception & e) + { + destory(data); + LOG_ERROR(logger, "Error processing column \"{}\": {}", description.sample_block.getByPosition(i).name, e.what()); + throw Exception( + ErrorCodes::PY_EXCEPTION_OCCURED, + "Error processing column \"{}\": {}", + description.sample_block.getByPosition(i).name, + e.what()); + } + catch (std::exception & e) + { + destory(data); + LOG_ERROR(logger, "Error processing column \"{}\": {}", description.sample_block.getByPosition(i).name, e.what()); + throw Exception( + ErrorCodes::PY_EXCEPTION_OCCURED, + "Error processing column \"{}\": {}", + description.sample_block.getByPosition(i).name, + e.what()); + } + catch (...) { destory(data); - LOG_ERROR(logger, "Error processing column {}: {}", i, e.what()); - throw; + LOG_ERROR(logger, "Error processing column \"{}\": unknown exception", description.sample_block.getByPosition(i).name); + throw Exception( + ErrorCodes::PY_EXCEPTION_OCCURED, + "Error processing column \"{}\": unknown exception", + description.sample_block.getByPosition(i).name); } } @@ -415,10 +448,20 @@ Chunk PythonSource::scanDataToChunk() // LOG_DEBUG(logger, "Column {} data: {}", col.name, ss.str()); } } - catch (const Exception & e) + catch (Exception & e) + { + LOG_ERROR(logger, "Error processing column \"{}\": {}", col.name, e.what()); + throw Exception(ErrorCodes::PY_EXCEPTION_OCCURED, "Error processing column \"{}\": {}", col.name, e.what()); + } + catch (std::exception & e) + { + LOG_ERROR(logger, "Error processing column \"{}\": {}", col.name, e.what()); + throw Exception(ErrorCodes::PY_EXCEPTION_OCCURED, "Error processing column \"{}\": {}", col.name, e.what()); + } + catch (...) { - LOG_ERROR(logger, "Error processing column {}: {}", i, e.what()); - throw; + LOG_ERROR(logger, "Error processing column \"{}\": unknown exception", col.name); + throw Exception(ErrorCodes::PY_EXCEPTION_OCCURED, "Error processing column \"{}\": unknown exception", col.name); } } cursor += count; diff --git a/tests/test_issue251.py b/tests/test_issue251.py new file mode 100644 index 00000000000..76b9a83d89f --- /dev/null +++ b/tests/test_issue251.py @@ -0,0 +1,56 @@ +#!python3 + +import os +import unittest +import zipfile +import urllib.request + +import pandas as pd +import chdb + + +class TestIssue251(unittest.TestCase): + def setUp(self): + # if /tmp/issue251/artifacts/create_final_community_reports.parquet not exists, + # download https://github.com/user-attachments/files/16361689/parquet-test-data.zip + # and unzip it to /tmp/issue251/ + if not os.path.exists( + "/tmp/issue251/artifacts/create_final_community_reports.parquet" + ): + print("Downloading parquet-test-data.zip") + + url = "https://github.com/user-attachments/files/16361689/parquet-test-data.zip" + os.makedirs("/tmp/issue251/", exist_ok=True) + urllib.request.urlretrieve(url, "/tmp/issue251/parquet-test-data.zip") + with zipfile.ZipFile("/tmp/issue251/parquet-test-data.zip", "r") as zip_ref: + zip_ref.extractall("/tmp/issue251/") + + def test_issue251(self): + df = pd.read_parquet( + "/tmp/issue251/artifacts/create_final_community_reports.parquet", + columns=[ + "id", + "community", + "level", + "title", + "summary", + "findings", + "rank", + "rank_explanation", + ], + ) + + # make pandas show all columns + pd.set_option("display.max_columns", None) + print(df.head(2)) + print(df.dtypes) + try: + chdb.query("FROM Python(df) SELECT * LIMIT 10") + except Exception as e: + self.assertTrue( + "Unsupported Python object type numpy.ndarray" in str(e) + ) + + +if __name__ == "__main__": + unittest.main()