diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 4f73c361..ebf27708 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,18 +1,46 @@ # SingleStore Python SDK Contributing Guide Fork this repo and commit your changes to the forked repo. -From there make a Pull Request with your submission keeping the following in mind: +From there make a Pull Request with your submission keeping the +following in mind: ## Pre-commit checks on the clone of this repo -The CI pipeline in this repo runs a bunch of validation checks and code reformatting with pre-commit checks. If you don't install those checks in your clone of the repo, the code will likely not pass. To install the pre-commit tool in your clone run the following from your clone directory. This will force the checks before you can push. +The CI pipeline in this repo runs a bunch of validation checks and code +reformatting with pre-commit checks. If you don't install those checks +in your clone of the repo, the code will likely not pass. To install +the pre-commit tool in your clone run the following from your clone +directory. This will force the checks before you can push. -```bash -pip3 install pre-commit==3.7.1 +``` +pip install pre-commit==3.7.1 pre-commit install ``` -The checks run automatically when you attempt to commit, but you can run them manually as well with the following: -```bash +The checks run automatically when you attempt to commit, but you can run +them manually as well with the following: +``` pre-commit run --all-files ``` + +## Running tests + +To create a test environment, do the following: +``` +pip install -r requirements.txt +pip install -r test-requirements.txt +``` + +If you have Docker installed, you can run the tests as follows. 
Note that +you should run the tests using both standard protocol and Data API (HTTP): +``` +pytest -v singlestoredb/tests +USE_DATA_API=1 pytest -v singlestoredb/tests +``` + +If you need to run against a specific server version, you can specify +the URL of that server: +``` +SINGLESTOREDB_URL=user:pw@127.0.0.1:3306 pytest -v singlestoredb/tests +SINGLESTOREDB_URL=http://user:pw@127.0.0.1:8090 pytest -v singlestoredb/tests +``` diff --git a/RELEASE.md b/RELEASE.md index 932fd07c..f029053e 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,24 +1,49 @@ # Release process -1. Bump the version number in `setup.cfg` and `singlestoredb/__init__.py` using - semantic versioning rules: minor bump for new features, patch bump for - bug fixes. +## Bump the package version and build documentation -2. Add release notes to `docs/src/whatsnew.rst`. +Bump the version number in `setup.cfg` and `singlestoredb/__init__.py` using +semantic versioning rules: minor bump for new features, patch bump for +bug fixes. Add release notes to `docs/src/whatsnew.rst`. Run `make html` in +`docs/src` to generate documentation. -3. Run `SINGLESTOREDB_URL=root:@db-server:db-port make html` in `docs/src` to - generate documentation. You will need `sphinx` and `sphinx_rtd_theme` installed - for this step. You also need a SingleStoreDB server running at the given - IP and port to run samples against. +You will need `sphinx` and `sphinx_rtd_theme` installed for this step. You +also need a running SingleStoreDB server to run +samples against. -4. Commit all changed files with a commit like "Prepare for vX.X.X release". +There is a utility to do this process for you, but you should check the +`docs/src/whatsnew.rst` to verify the release summary. Use the following +to run it: +``` +resources/bump_version.py <major|minor|patch> -5.
The coverage tests will be triggered by the push, but you should also run - `Smoke test` workflow manually which does basic tests on all supported versions - of Python. +``` -6. Once all workflows are clean, create a new Github release with the name - "SingleStoreDB vX.X.X" and set the generated tag to the matching version - number. Add the release notes from the `whatsnew.rst` file to the release - notes. Creating the release will run the `Public packages` workflow which - builds the packages and pubsishes them to PyPI. +## Commit and push the changes + +After verifying the release summary in the documentation, commit the changes: +``` +# Make sure newly generated docs get added +git add docs + +# Commit changes +git commit -am "Prepare for vX.X.X release" + +git push + +``` + +## Run smoke tests + +The coverage tests will be triggered by the push, but you should also run +[Smoke test](https://github.com/singlestore-labs/singlestoredb-python/actions/workflows/smoke-test.yml) +workflow manually which does basic tests on all supported versions of Python. + +## Create the release on Github + +Once all workflows are clean, create a new Github release with the name +"SingleStoreDB vX.X.X" at https://github.com/singlestore-labs/singlestoredb-python/releases +and set the generated tag to the matching version +number. Add the release notes from the `whatsnew.rst` file to the release +notes. Creating the release will run the [Publish packages](https://github.com/singlestore-labs/singlestoredb-python/actions/workflows/publish.yml) +workflow which builds the packages and publishes them to PyPI. 
diff --git a/singlestoredb/fusion/handlers/iceberg_materialized_view.py b/singlestoredb/fusion/handlers/iceberg_materialized_view.py new file mode 100644 index 00000000..fd835bff --- /dev/null +++ b/singlestoredb/fusion/handlers/iceberg_materialized_view.py @@ -0,0 +1,224 @@ +#!/usr/bin/env python +"""Iceberg Materialized View handler for SingleStoreDB Fusion.""" +from __future__ import annotations + +import json +import uuid +from typing import Any +from typing import Dict +from typing import Optional + +from singlestoredb.connection import connect +from singlestoredb.fusion import result +from singlestoredb.fusion.handler import SQLHandler + + +class CreateIcebergMaterializedView(SQLHandler): + """ + CREATE ICEBERG MATERIALIZED VIEW + [ if_not_exists ] + view_name + ON iceberg_table + [ catalog ] + [ storage ] + [ where ] + ; + + # If not exists + if_not_exists = IF NOT EXISTS + + # View name + view_name = + + # Iceberg table + iceberg_table =
+ + # Catalog + catalog = CATALOG [ _catalog_config ] [ _catalog_creds ] + _catalog_config = CONFIG '' + _catalog_creds = CREDENTIALS '' + + # Storage + storage = LINK [ _link_config ] [ _link_creds ] + _link_config = S3 CONFIG '' + _link_creds = CREDENTIALS '' + + # Where clause + where = WHERE + + Description + ----------- + Create an Iceberg materialized view that syncs data from an Iceberg table + to a SingleStore table with automatic updates. + + Arguments + --------- + * ```` and ````: The catalog configuration. + * ```` and ````: The storage link configuration. + + Remarks + ------- + * ``CATALOG`` specifies the details of the catalog to connect to. + * ``LINK`` specifies the details of the data storage to connect to. + * The materialized view will keep the SingleStore table in sync with the + Iceberg table through an underlying pipeline. + + Examples + -------- + The following statement creates an Iceberg materialized view:: + + CREATE ICEBERG MATERIALIZED VIEW my_db.all_sales_orders + ON my_catalog.sales.orders + CATALOG CONFIG '{ + "catalog_type": "GLUE", + "table_format": "ICEBERG", + "catalog_id": "123456789012", + "catalog_region": "us-east-1" + }' + LINK S3 CONFIG '{ + "region": "us-east-1", + "endpoint_url": "s3://my-bucket" + }' + ; + + """ + + def run(self, params: Dict[str, Any]) -> Optional[result.FusionSQLResult]: + # Parse view name + if isinstance(params['view_name'], str): + view_database = None + view_table = params['view_name'] + else: + view_database, view_table = params['view_name'] + + # Parse Iceberg table reference + iceberg_parts = params['iceberg_table'] + if isinstance(iceberg_parts, str): + # Simple table name + catalog_name = None + iceberg_database = None + iceberg_table = iceberg_parts + elif len(iceberg_parts) == 2: + # database.table + catalog_name = None + iceberg_database, iceberg_table = iceberg_parts + elif len(iceberg_parts) == 3: + # catalog.database.table + catalog_name, iceberg_database, iceberg_table = iceberg_parts + 
else: + raise ValueError( + 'Iceberg table reference must be in format: ' + '[catalog.]database.table', + ) + + where = '' + if params.get('where'): + where = f' WHERE {params["where"]}' + + # Iceberg expects lowercase + if iceberg_database: + iceberg_database = iceberg_database.lower() + if catalog_name: + catalog_name = catalog_name.lower() + + # Parse catalog configuration + catalog_config = json.loads( + params['catalog'].get('catalog_config', '{}') or '{}', + ) + catalog_creds = json.loads( + params['catalog'].get('catalog_creds', '{}') or '{}', + ) + + # Parse storage configuration + storage_config = json.loads( + (params.get('storage') or {}).get('link_config', '{}') or '{}', + ) + storage_creds = json.loads( + (params.get('storage') or {}).get('link_creds', '{}') or '{}', + ) + + storage_config['provider'] = 'S3' + + # Validate required fields + if iceberg_database is None: + raise ValueError( + 'Database name must be specified for Iceberg table', + ) + + if view_database is None: + with connect() as conn: + with conn.cursor() as cur: + cur.execute('SELECT DATABASE()') + res = cur.fetchone() + if not res: + raise ValueError( + 'No database selected. 
Please specify database ' + 'name for materialized view', + ) + if isinstance(res, (tuple, list)): + view_database = res[0] + elif isinstance(res, dict): + view_database = list(res.values())[0] + else: + raise ValueError( + 'Unexpected result type from SELECT DATABASE()', + ) + + # Merge configurations + config = {} + config.update(catalog_config) + config.update(storage_config) + config_json = json.dumps(config) + + creds = {} + creds.update(catalog_creds) + creds.update(storage_creds) + creds_json = json.dumps(creds) + + # Create a unique pipeline name + pipeline_name = f'iceberg_mv_{view_database}_{view_table}_{uuid.uuid4().hex[:8]}' + table_id = f'{iceberg_database}.{iceberg_table}' + + print('ICEBERG TABLE', iceberg_database, iceberg_table) + print('DB TABLE', view_database, view_table) + print('CONFIG', config) + print('CREDS', creds) + + # Create and start the pipeline + with connect() as conn: + with conn.cursor() as cur: + # Infer and create the pipeline. + # It also creates a table (and optionally a view in case of + # merge pipeline) with the same name + cur.execute(rf''' + CREATE INFERRED PIPELINE `{pipeline_name}` AS + LOAD DATA S3 '{table_id}' + CONFIG '{config_json}' + CREDENTIALS '{creds_json}' + FORMAT ICEBERG + OPTIONS = 'merge' + {where} + ''') + + # Start the pipeline + cur.execute(rf'START PIPELINE `{pipeline_name}`') + + # Create view with user-provided name + cur.execute(rf''' + CREATE VIEW `{view_database}`.`{view_table}` + AS SELECT * FROM `{pipeline_name}` + ''') + + # Return result + res = result.FusionSQLResult() + res.add_field('MaterializedView', result.STRING) + res.add_field('Pipeline', result.STRING) + res.add_field('Status', result.STRING) + res.set_rows([ + (f'{view_database}.{view_table}', pipeline_name, 'Created'), + ]) + + return res + + +CreateIcebergMaterializedView.register(overwrite=True)