40 changes: 34 additions & 6 deletions CONTRIBUTING.md
# SingleStore Python SDK Contributing Guide

Fork this repo and commit your changes to the forked repo.
From there, make a Pull Request with your submission, keeping the
following in mind:

## Pre-commit checks on the clone of this repo

The CI pipeline in this repo runs a number of validation checks and code
reformatting steps using pre-commit hooks. If you don't install those hooks
in your clone of the repo, the code will likely not pass those checks. To
install the pre-commit tool in your clone, run the following from your clone
directory. This will force the checks to run before you can commit.

```bash
pip install pre-commit==3.7.1
pre-commit install
```

The checks run automatically when you attempt to commit, but you can run
them manually as well with the following:
```bash
pre-commit run --all-files
```

## Running tests

To create a test environment, do the following:
```
pip install -r requirements.txt
pip install -r test-requirements.txt
```

If you have Docker installed, you can run the tests as follows. Note that
you should run the tests using both the standard protocol and the Data API (HTTP):
```
pytest -v singlestoredb/tests
USE_DATA_API=1 pytest -v singlestoredb/tests
```

If you need to run against a specific server version, you can specify
the URL of that server:
```
SINGLESTOREDB_URL=user:password@127.0.0.1:3306 pytest -v singlestoredb/tests
SINGLESTOREDB_URL=http://user:password@127.0.0.1:8090 pytest -v singlestoredb/tests
```
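
The tests need a SingleStoreDB server to connect to. If you want to run one
locally yourself rather than pointing at an existing deployment, one option
is the SingleStoreDB dev container. The following is only a sketch; the
image, ports, and password are illustrative, so adjust them to your setup:
```
# Start a local SingleStoreDB dev container (password and ports are examples)
docker run -d --name singlestoredb-dev \
    -e ROOT_PASSWORD=secretpw \
    -p 3306:3306 -p 8080:8080 -p 9000:9000 \
    ghcr.io/singlestore-labs/singlestoredb-dev

# Point the tests at the container
SINGLESTOREDB_URL=root:secretpw@127.0.0.1:3306 pytest -v singlestoredb/tests
```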
59 changes: 42 additions & 17 deletions RELEASE.md
# Release process

## Bump the package version and build documentation

Bump the version number in `setup.cfg` and `singlestoredb/__init__.py` using
semantic versioning rules: minor bump for new features, patch bump for
bug fixes. Add release notes to `docs/src/whatsnew.rst`. Run `make html` in
`docs/src` to generate documentation.

You will need `sphinx` and `sphinx_rtd_theme` installed for this step. You
also need a running SingleStoreDB server (for example, one specified with the
`SINGLESTOREDB_URL` environment variable) to run the samples against.

There is a utility to do this process for you, but you should check
`docs/src/whatsnew.rst` to verify the release summary. Use the following
to run it:
```
resources/bump_version.py < major | minor | patch >
```

## Commit and push the changes

After verifying the release summary in the documentation, commit the changes:
```
# Make sure newly generated docs get added
git add docs

# Commit changes
git commit -am "Prepare for vX.X.X release"

git push
```

## Run smoke tests

The coverage tests will be triggered by the push, but you should also run the
[Smoke test](https://github.com/singlestore-labs/singlestoredb-python/actions/workflows/smoke-test.yml)
workflow manually, which runs basic tests on all supported versions of Python.
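
If you prefer the command line to the Actions page, the workflow can also be
started with the GitHub CLI. This is only a sketch; it assumes `gh` is
installed and authenticated, and that the workflow file is `smoke-test.yml`,
as in the link above:
```
# Manually trigger the smoke tests (branch name is an example)
gh workflow run smoke-test.yml --ref main
```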

## Create the release on Github

Once all workflows are clean, create a new Github release with the name
"SingleStoreDB vX.X.X" at <https://github.com/singlestore-labs/singlestoredb-python/releases>
and set the generated tag to the matching version
number. Add the release notes from the `whatsnew.rst` file to the release
notes. Creating the release will run the [Publish packages](https://github.com/singlestore-labs/singlestoredb-python/actions/workflows/publish.yml)
workflow, which builds the packages and publishes them to PyPI.
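
The release can also be created from the command line with the GitHub CLI
instead of the web UI. This is only a sketch; the tag, title, and notes file
are placeholders:
```
# Create the release; the tag is generated if it does not already exist
gh release create vX.X.X --title "SingleStoreDB vX.X.X" --notes-file release-notes.md
```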
224 changes: 224 additions & 0 deletions singlestoredb/fusion/handlers/iceberg_materialized_view.py
#!/usr/bin/env python
"""Iceberg Materialized View handler for SingleStoreDB Fusion."""
from __future__ import annotations

import json
import uuid
from typing import Any
from typing import Dict
from typing import Optional

from singlestoredb.connection import connect
from singlestoredb.fusion import result
from singlestoredb.fusion.handler import SQLHandler


class CreateIcebergMaterializedView(SQLHandler):
"""
CREATE ICEBERG MATERIALIZED VIEW
[ if_not_exists ]
view_name
ON iceberg_table
[ catalog ]
[ storage ]
[ where ]
;

# If not exists
if_not_exists = IF NOT EXISTS

# View name
view_name = <table>

# Iceberg table
iceberg_table = <table>

# Catalog
catalog = CATALOG [ _catalog_config ] [ _catalog_creds ]
_catalog_config = CONFIG '<catalog-config>'
_catalog_creds = CREDENTIALS '<catalog-creds>'

# Storage
storage = LINK [ _link_config ] [ _link_creds ]
_link_config = S3 CONFIG '<link-config>'
_link_creds = CREDENTIALS '<link-creds>'

# Where clause
where = WHERE <expr>

Description
-----------
Create an Iceberg materialized view that syncs data from an Iceberg table
to a SingleStore table with automatic updates.

Arguments
---------
* ``<catalog-config>`` and ``<catalog-creds>``: The catalog configuration.
* ``<link-config>`` and ``<link-creds>``: The storage link configuration.

Remarks
-------
* ``CATALOG`` specifies the details of the catalog to connect to.
* ``LINK`` specifies the details of the data storage to connect to.
* The materialized view will keep the SingleStore table in sync with the
Iceberg table through an underlying pipeline.

Examples
--------
The following statement creates an Iceberg materialized view::

CREATE ICEBERG MATERIALIZED VIEW my_db.all_sales_orders
ON my_catalog.sales.orders
CATALOG CONFIG '{
"catalog_type": "GLUE",
"table_format": "ICEBERG",
"catalog_id": "123456789012",
"catalog_region": "us-east-1"
}'
LINK S3 CONFIG '{
"region": "us-east-1",
"endpoint_url": "s3://my-bucket"
}'
;

"""

def run(self, params: Dict[str, Any]) -> Optional[result.FusionSQLResult]:
# Parse view name
if isinstance(params['view_name'], str):
view_database = None
view_table = params['view_name']
else:
view_database, view_table = params['view_name']

# Parse Iceberg table reference
iceberg_parts = params['iceberg_table']
if isinstance(iceberg_parts, str):
# Simple table name
catalog_name = None
iceberg_database = None
iceberg_table = iceberg_parts
elif len(iceberg_parts) == 2:
# database.table
catalog_name = None
iceberg_database, iceberg_table = iceberg_parts
elif len(iceberg_parts) == 3:
# catalog.database.table
catalog_name, iceberg_database, iceberg_table = iceberg_parts
else:
raise ValueError(
'Iceberg table reference must be in format: '
'[catalog.]database.table',
)

where = ''
if params.get('where'):
where = f' WHERE {params["where"]}'

# Iceberg expects lowercase
if iceberg_database:
iceberg_database = iceberg_database.lower()
if catalog_name:
catalog_name = catalog_name.lower()

# Parse catalog configuration
        catalog_config = json.loads(
            (params.get('catalog') or {}).get('catalog_config', '{}') or '{}',
        )
        catalog_creds = json.loads(
            (params.get('catalog') or {}).get('catalog_creds', '{}') or '{}',
        )

# Parse storage configuration
storage_config = json.loads(
(params.get('storage') or {}).get('link_config', '{}') or '{}',
)
storage_creds = json.loads(
(params.get('storage') or {}).get('link_creds', '{}') or '{}',
)

storage_config['provider'] = 'S3'

# Validate required fields
if iceberg_database is None:
raise ValueError(
'Database name must be specified for Iceberg table',
)

if view_database is None:
with connect() as conn:
with conn.cursor() as cur:
cur.execute('SELECT DATABASE()')
res = cur.fetchone()
if not res:
raise ValueError(
'No database selected. Please specify database '
'name for materialized view',
)
if isinstance(res, (tuple, list)):
view_database = res[0]
elif isinstance(res, dict):
view_database = list(res.values())[0]
else:
raise ValueError(
'Unexpected result type from SELECT DATABASE()',
)

# Merge configurations
config = {}
config.update(catalog_config)
config.update(storage_config)
config_json = json.dumps(config)

creds = {}
creds.update(catalog_creds)
creds.update(storage_creds)
creds_json = json.dumps(creds)

# Create a unique pipeline name
pipeline_name = f'iceberg_mv_{view_database}_{view_table}_{uuid.uuid4().hex[:8]}'
table_id = f'{iceberg_database}.{iceberg_table}'

print('ICEBERG TABLE', iceberg_database, iceberg_table)
print('DB TABLE', view_database, view_table)
print('CONFIG', config)
print('CREDS', creds)

# Create and start the pipeline
with connect() as conn:
with conn.cursor() as cur:
# Infer and create the pipeline.
# It also creates a table (and optionally a view in case of
# merge pipeline) with the same name
cur.execute(rf'''
CREATE INFERRED PIPELINE `{pipeline_name}` AS
LOAD DATA S3 '{table_id}'
CONFIG '{config_json}'
CREDENTIALS '{creds_json}'
FORMAT ICEBERG
OPTIONS = 'merge'
{where}
''')

# Start the pipeline
cur.execute(rf'START PIPELINE `{pipeline_name}`')

# Create view with user-provided name
cur.execute(rf'''
CREATE VIEW `{view_database}`.`{view_table}`
AS SELECT * FROM `{pipeline_name}`
''')

# Return result
res = result.FusionSQLResult()
res.add_field('MaterializedView', result.STRING)
res.add_field('Pipeline', result.STRING)
res.add_field('Status', result.STRING)
res.set_rows([
(f'{view_database}.{view_table}', pipeline_name, 'Created'),
])

return res


CreateIcebergMaterializedView.register(overwrite=True)
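

# A minimal usage sketch (hypothetical database, catalog, and config values;
# assumes Fusion SQL handling is enabled so that this statement is intercepted
# by CreateIcebergMaterializedView instead of being sent to the server as-is):
#
#     from singlestoredb import connect
#
#     with connect() as conn:
#         with conn.cursor() as cur:
#             cur.execute('''
#                 CREATE ICEBERG MATERIALIZED VIEW my_db.all_sales_orders
#                 ON my_catalog.sales.orders
#                 CATALOG CONFIG '{"catalog_type": "GLUE",
#                                  "table_format": "ICEBERG",
#                                  "catalog_id": "123456789012",
#                                  "catalog_region": "us-east-1"}'
#                 LINK S3 CONFIG '{"region": "us-east-1"}'
#             ''')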