From f1b7255d8fe89b9a7f847287dadd16c58212d2ac Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Tue, 12 Aug 2025 14:53:26 -0500 Subject: [PATCH 1/4] Update docs --- CONTRIBUTING.md | 40 ++++++++++++++++++++++++++++----- RELEASE.md | 59 +++++++++++++++++++++++++++++++++++-------------- 2 files changed, 76 insertions(+), 23 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 4f73c361..ebf27708 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,18 +1,46 @@ # SingleStore Python SDK Contributing Guide Fork this repo and commit your changes to the forked repo. -From there make a Pull Request with your submission keeping the following in mind: +From there make a Pull Request with your submission keeping the +following in mind: ## Pre-commit checks on the clone of this repo -The CI pipeline in this repo runs a bunch of validation checks and code reformatting with pre-commit checks. If you don't install those checks in your clone of the repo, the code will likely not pass. To install the pre-commit tool in your clone run the following from your clone directory. This will force the checks before you can push. +The CI pipeline in this repo runs a bunch of validation checks and code +reformatting with pre-commit checks. If you don't install those checks +in your clone of the repo, the code will likely not pass. To install +the pre-commit tool in your clone run the following from your clone +directory. This will force the checks before you can push. 
-```bash -pip3 install pre-commit==3.7.1 +``` +pip install pre-commit==3.7.1 pre-commit install ``` -The checks run automatically when you attempt to commit, but you can run them manually as well with the following: -```bash +The checks run automatically when you attempt to commit, but you can run +them manually as well with the following: +``` pre-commit run --all-files ``` + +## Running tests + +To create a test environment, do the following: +``` +pip install -r requirements.txt +pip install -r test-requirements.txt +``` + +If you have Docker installed, you can run the tests as follows. Note that +you should run the tests using both standard protocol and Data API (HTTP): +``` +pytest -v singlestoredb/tests +USE_DATA_API=1 pytest -v singlestoredb/tests +``` + +If you need to run against a specific server version, you can specify +the URL of that server: +``` +SINGLESTOREDB_URL=user:pw@127.0.0.1:3306 pytest -v singlestoredb/tests +SINGLESTOREDB_URL=http://user:pw@127.0.0.1:8090 pytest -v singlestoredb/tests +``` diff --git a/RELEASE.md b/RELEASE.md index 932fd07c..f029053e 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,24 +1,49 @@ # Release process -1. Bump the version number in `setup.cfg` and `singlestoredb/__init__.py` using - semantic versioning rules: minor bump for new features, patch bump for - bug fixes. +## Bump the package version and build documentation -2. Add release notes to `docs/src/whatsnew.rst`. +Bump the version number in `setup.cfg` and `singlestoredb/__init__.py` using +semantic versioning rules: minor bump for new features, patch bump for +bug fixes. Add release notes to `docs/src/whatsnew.rst`. Run `make html` in +`docs/src` to generate documentation. -3. Run `SINGLESTOREDB_URL=root:@db-server:db-port make html` in `docs/src` to - generate documentation. You will need `sphinx` and `sphinx_rtd_theme` installed - for this step. You also need a SingleStoreDB server running at the given - IP and port to run samples against. 
+You will need `sphinx` and `sphinx_rtd_theme` installed for this step. You +also need a SingleStoreDB server running at the given IP and port to run +samples against. -4. Commit all changed files with a commit like "Prepare for vX.X.X release". +There is a utility to do this process for you, but you should check the +`docs/src/whatsnew.rst` to verify the release summary. Use the following +to run it: +``` +resources/bump_version.py <major|minor|patch> -5. The coverage tests will be triggered by the push, but you should also run - `Smoke test` workflow manually which does basic tests on all supported versions - of Python. +``` -6. Once all workflows are clean, create a new Github release with the name - "SingleStoreDB vX.X.X" and set the generated tag to the matching version - number. Add the release notes from the `whatsnew.rst` file to the release - notes. Creating the release will run the `Public packages` workflow which - builds the packages and pubsishes them to PyPI. +## Commit and push the changes + +After verifying the release summary in the documentation, commit the changes: +``` +# Make sure newly generated docs get added +git add docs + +# Commit changes +git commit -am "Prepare for vX.X.X release" + +git push + +``` + +## Run smoke tests + +The coverage tests will be triggered by the push, but you should also run +[Smoke test](https://github.com/singlestore-labs/singlestoredb-python/actions/workflows/smoke-test.yml) +workflow manually which does basic tests on all supported versions of Python. + +## Create the release on Github + +Once all workflows are clean, create a new Github release with the name +"SingleStoreDB vX.X.X" +and set the generated tag to the matching version +number. Add the release notes from the `whatsnew.rst` file to the release +notes. 
Creating the release will run the [Publish packages](https://github.com/singlestore-labs/singlestoredb-python/actions/workflows/publish.yml) +workflow which builds the packages and pubsishes them to PyPI. From 5a446879c759b6c8b975450a328c37e3951a38c8 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Wed, 13 Aug 2025 14:06:26 -0500 Subject: [PATCH 2/4] Start of materialized view command --- .../handlers/iceberg_materialized_view.py | 208 ++++++++++++++++++ 1 file changed, 208 insertions(+) create mode 100644 singlestoredb/fusion/handlers/iceberg_materialized_view.py diff --git a/singlestoredb/fusion/handlers/iceberg_materialized_view.py b/singlestoredb/fusion/handlers/iceberg_materialized_view.py new file mode 100644 index 00000000..bf32a6b2 --- /dev/null +++ b/singlestoredb/fusion/handlers/iceberg_materialized_view.py @@ -0,0 +1,208 @@ +#!/usr/bin/env python +"""Iceberg Materialized View handler for SingleStoreDB Fusion.""" +from __future__ import annotations + +import json +import uuid +from typing import Any +from typing import Dict +from typing import Optional + +from singlestoredb.connection import connect +from singlestoredb.fusion import result +from singlestoredb.fusion.handler import SQLHandler + + +class CreateIcebergMaterializedView(SQLHandler): + """ + CREATE ICEBERG MATERIALIZED VIEW + [ if_not_exists ] + view_name + ON iceberg_table + [ catalog ] + [ storage ] + ; + + # If not exists + if_not_exists = IF NOT EXISTS + + # View name + view_name = + + # Iceberg table + iceberg_table =
+ + # Catalog + catalog = CATALOG [ _catalog_config ] [ _catalog_creds ] + _catalog_config = CONFIG '' + _catalog_creds = CREDENTIALS '' + + # Storage + storage = LINK [ _link_config ] [ _link_creds ] + _link_config = S3 CONFIG '' + _link_creds = CREDENTIALS '' + + Description + ----------- + Create an Iceberg materialized view that syncs data from an Iceberg table + to a SingleStore table with automatic updates. + + Arguments + --------- + * ```` and ````: The catalog configuration. + * ```` and ````: The storage link configuration. + + Remarks + ------- + * ``CATALOG`` specifies the details of the catalog to connect to. + * ``LINK`` specifies the details of the data storage to connect to. + * The materialized view will keep the SingleStore table in sync with the + Iceberg table through an underlying pipeline. + + Examples + -------- + The following statement creates an Iceberg materialized view:: + + CREATE ICEBERG MATERIALIZED VIEW my_db.all_sales_orders + ON my_catalog.sales.orders + CATALOG CONFIG '{ + "catalog_type": "GLUE", + "table_format": "ICEBERG", + "catalog_id": "123456789012", + "catalog_region": "us-east-1" + }' + LINK S3 CONFIG '{ + "region": "us-east-1", + "endpoint_url": "s3://my-bucket" + }' + ; + + """ + + def run(self, params: Dict[str, Any]) -> Optional[result.FusionSQLResult]: + # Parse view name + if isinstance(params['view_name'], str): + view_database = None + view_table = params['view_name'] + else: + view_database, view_table = params['view_name'] + + # Parse Iceberg table reference + iceberg_parts = params['iceberg_table'] + if isinstance(iceberg_parts, str): + # Simple table name + catalog_name = None + iceberg_database = None + iceberg_table = iceberg_parts + elif len(iceberg_parts) == 2: + # database.table + catalog_name = None + iceberg_database, iceberg_table = iceberg_parts + elif len(iceberg_parts) == 3: + # catalog.database.table + catalog_name, iceberg_database, iceberg_table = iceberg_parts + else: + raise ValueError( + 
'Iceberg table reference must be in format: ' + '[catalog.]database.table', + ) + + # Iceberg expects lowercase + if iceberg_database: + iceberg_database = iceberg_database.lower() + if catalog_name: + catalog_name = catalog_name.lower() + + # Parse catalog configuration + catalog_config = json.loads( + params['catalog'].get('catalog_config', '{}') or '{}', + ) + catalog_creds = json.loads( + params['catalog'].get('catalog_creds', '{}') or '{}', + ) + + # Parse storage configuration + storage_config = json.loads( + (params.get('storage') or {}).get('link_config', '{}') or '{}', + ) + storage_creds = json.loads( + (params.get('storage') or {}).get('link_creds', '{}') or '{}', + ) + + storage_config['provider'] = 'S3' + + # Validate required fields + if iceberg_database is None: + raise ValueError( + 'Database name must be specified for Iceberg table', + ) + + if view_database is None: + with connect() as conn: + with conn.cursor() as cur: + cur.execute('SELECT DATABASE()') + res = cur.fetchone() + if not res: + raise ValueError( + 'No database selected. 
Please specify database ' + 'name for materialized view', + ) + if isinstance(res, (tuple, list)): + view_database = res[0] + elif isinstance(res, dict): + view_database = list(res.values())[0] + else: + raise ValueError( + 'Unexpected result type from SELECT DATABASE()', + ) + + # Merge configurations + config = {} + config.update(catalog_config) + config.update(storage_config) + config['table_id'] = f'{iceberg_database}.{iceberg_table}' + config_json = json.dumps(config) + + creds = {} + creds.update(catalog_creds) + creds.update(storage_creds) + creds_json = json.dumps(creds) + + # Create a unique pipeline name + pipeline_name = f'iceberg_mv_{view_database}_{view_table}_{uuid.uuid4().hex[:8]}' + + print('ICEBERG TABLE', iceberg_database, iceberg_table) + print('DB TABLE', view_database, view_table) + print('CONFIG', config) + print('CREDS', creds) + + # Create and start the pipeline + with connect() as conn: + with conn.cursor() as cur: + # Create the pipeline + cur.execute(rf''' + CREATE PIPELINE `{pipeline_name}` AS + LOAD DATA S3 '' + CONFIG '{config_json}' + CREDENTIALS '{creds_json}' + REPLACE INTO TABLE + `{view_database}`.`{view_table}` + FORMAT ICEBERG + ''') + + # Start the pipeline + cur.execute(rf'START PIPELINE `{pipeline_name}`') + + # Return result + res = result.FusionSQLResult() + res.add_field('MaterializedView', result.STRING) + res.add_field('Pipeline', result.STRING) + res.add_field('Status', result.STRING) + res.set_rows([ + (f'{view_database}.{view_table}', pipeline_name, 'Created'), + ]) + + return res + + +CreateIcebergMaterializedView.register(overwrite=True) From a6c2ba63ae56eb51488c502ba5e7915c3b352edc Mon Sep 17 00:00:00 2001 From: Maksym Lepekh Date: Wed, 27 Aug 2025 17:29:31 +0300 Subject: [PATCH 3/4] Use CREATE INFERRED statement and generate VIEW with a user-provided name --- .../fusion/handlers/iceberg_materialized_view.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git 
a/singlestoredb/fusion/handlers/iceberg_materialized_view.py b/singlestoredb/fusion/handlers/iceberg_materialized_view.py index bf32a6b2..cdb7c04b 100644 --- a/singlestoredb/fusion/handlers/iceberg_materialized_view.py +++ b/singlestoredb/fusion/handlers/iceberg_materialized_view.py @@ -160,7 +160,6 @@ def run(self, params: Dict[str, Any]) -> Optional[result.FusionSQLResult]: config = {} config.update(catalog_config) config.update(storage_config) - config['table_id'] = f'{iceberg_database}.{iceberg_table}' config_json = json.dumps(config) creds = {} @@ -170,6 +169,7 @@ def run(self, params: Dict[str, Any]) -> Optional[result.FusionSQLResult]: # Create a unique pipeline name pipeline_name = f'iceberg_mv_{view_database}_{view_table}_{uuid.uuid4().hex[:8]}' + table_id = f'{iceberg_database}.{iceberg_table}' print('ICEBERG TABLE', iceberg_database, iceberg_table) print('DB TABLE', view_database, view_table) @@ -179,20 +179,23 @@ def run(self, params: Dict[str, Any]) -> Optional[result.FusionSQLResult]: # Create and start the pipeline with connect() as conn: with conn.cursor() as cur: - # Create the pipeline + # Infer and create the pipeline. 
+ # It also creates a table (and optionally a view in case of merge pipeline) with the same name cur.execute(rf''' - CREATE PIPELINE `{pipeline_name}` AS - LOAD DATA S3 '' + CREATE INFERRED PIPELINE `{pipeline_name}` AS + LOAD DATA S3 '{table_id}' CONFIG '{config_json}' CREDENTIALS '{creds_json}' - REPLACE INTO TABLE - `{view_database}`.`{view_table}` FORMAT ICEBERG + OPTIONS = 'merge' ''') # Start the pipeline cur.execute(rf'START PIPELINE `{pipeline_name}`') + # Create view with user-provided name + cur.execute(rf'CREATE VIEW `{view_database}`.`{view_table}` AS SELECT * FROM `{pipeline_name}`') + # Return result res = result.FusionSQLResult() res.add_field('MaterializedView', result.STRING) From 3dd2887ae425cd494528491b23356d6b5b72fb40 Mon Sep 17 00:00:00 2001 From: Kevin Smith Date: Wed, 27 Aug 2025 14:14:48 -0500 Subject: [PATCH 4/4] Add WHERE clause to Iceberg materialized view --- .../handlers/iceberg_materialized_view.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/singlestoredb/fusion/handlers/iceberg_materialized_view.py b/singlestoredb/fusion/handlers/iceberg_materialized_view.py index cdb7c04b..fd835bff 100644 --- a/singlestoredb/fusion/handlers/iceberg_materialized_view.py +++ b/singlestoredb/fusion/handlers/iceberg_materialized_view.py @@ -21,6 +21,7 @@ class CreateIcebergMaterializedView(SQLHandler): ON iceberg_table [ catalog ] [ storage ] + [ where ] ; # If not exists @@ -42,6 +43,9 @@ class CreateIcebergMaterializedView(SQLHandler): _link_config = S3 CONFIG '' _link_creds = CREDENTIALS '' + # Where clause + where = WHERE + Description ----------- Create an Iceberg materialized view that syncs data from an Iceberg table @@ -107,6 +111,10 @@ def run(self, params: Dict[str, Any]) -> Optional[result.FusionSQLResult]: '[catalog.]database.table', ) + where = '' + if params.get('where'): + where = f' WHERE {params["where"]}' + # Iceberg expects lowercase if iceberg_database: iceberg_database = 
iceberg_database.lower() @@ -180,7 +188,8 @@ def run(self, params: Dict[str, Any]) -> Optional[result.FusionSQLResult]: with connect() as conn: with conn.cursor() as cur: # Infer and create the pipeline. - # It also creates a table (and optionally a view in case of merge pipeline) with the same name + # It also creates a table (and optionally a view in case of + # merge pipeline) with the same name cur.execute(rf''' CREATE INFERRED PIPELINE `{pipeline_name}` AS LOAD DATA S3 '{table_id}' @@ -188,13 +197,17 @@ def run(self, params: Dict[str, Any]) -> Optional[result.FusionSQLResult]: CREDENTIALS '{creds_json}' FORMAT ICEBERG OPTIONS = 'merge' + {where} ''') # Start the pipeline cur.execute(rf'START PIPELINE `{pipeline_name}`') # Create view with user-provided name - cur.execute(rf'CREATE VIEW `{view_database}`.`{view_table}` AS SELECT * FROM `{pipeline_name}`') + cur.execute(rf''' + CREATE VIEW `{view_database}`.`{view_table}` + AS SELECT * FROM `{pipeline_name}` + ''') # Return result res = result.FusionSQLResult()