Skip to content

Commit

Permalink
added create and drop volume to interface
Browse files Browse the repository at this point in the history
  • Loading branch information
donotpush committed Jan 16, 2025
1 parent 1efe565 commit b60b3d3
Show file tree
Hide file tree
Showing 14 changed files with 76 additions and 33 deletions.
6 changes: 6 additions & 0 deletions dlt/destinations/impl/athena/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ def close_connection(self) -> None:
self._conn.close()
self._conn = None

def create_volume(self) -> None:
    """No-op: this client takes no action when asked to create a volume."""

def drop_volume(self) -> None:
    """No-op: this client takes no action when asked to drop a volume."""

@property
def native_connection(self) -> Connection:
return self._conn
Expand Down
6 changes: 6 additions & 0 deletions dlt/destinations/impl/bigquery/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ def close_connection(self) -> None:
self._client.close()
self._client = None

def create_volume(self) -> None:
    """No-op: this client takes no action when asked to create a volume."""

def drop_volume(self) -> None:
    """No-op: this client takes no action when asked to drop a volume."""

@contextmanager
@raise_database_error
def begin_transaction(self) -> Iterator[DBTransaction]:
Expand Down
6 changes: 6 additions & 0 deletions dlt/destinations/impl/clickhouse/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ def open_connection(self) -> clickhouse_driver.dbapi.connection.Connection:
self._conn = clickhouse_driver.connect(dsn=self.credentials.to_native_representation())
return self._conn

def create_volume(self) -> None:
    """No-op: this client takes no action when asked to create a volume."""

def drop_volume(self) -> None:
    """No-op: this client takes no action when asked to drop a volume."""

@raise_open_connection_error
def close_connection(self) -> None:
if self._conn:
Expand Down
34 changes: 1 addition & 33 deletions dlt/destinations/impl/databricks/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ def __init__(
self._job_client: "DatabricksClient" = None

self._sql_client = None
self._workspace_client = None
self._created_volume = None

def run(self) -> None:
self._sql_client = self._job_client.sql_client
Expand Down Expand Up @@ -93,41 +91,17 @@ def run(self) -> None:

self._sql_client.execute_sql(statement)

self._cleanup_volume()

def _handle_local_file_upload(self, local_file_path: str) -> tuple[str, str]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import catalog
import time
import io

w = WorkspaceClient(
host=self._job_client.config.credentials.server_hostname,
token=self._job_client.config.credentials.access_token,
)
self._workspace_client = w

# Create a temporary volume
volume_name = "_dlt_temp_load_volume"
# created_volume = w.volumes.create(
# catalog_name=self._sql_client.database_name,
# schema_name=self._sql_client.dataset_name,
# name=volume_name,
# volume_type=catalog.VolumeType.MANAGED,
# )
# self._created_volume = created_volume # store to delete later

qualified_volume_name = (
f"{self._sql_client.database_name}.{self._sql_client.dataset_name}.{volume_name}"
)
self._sql_client.execute_sql(f"""
CREATE VOLUME IF NOT EXISTS {qualified_volume_name}
""")

logger.info(f"datrabricks volume created {qualified_volume_name}")

# Compute volume paths
volume_path = f"/Volumes/{self._sql_client.database_name}/{self._sql_client.dataset_name}/{volume_name}"
volume_path = f"/Volumes/{self._sql_client.database_name}/{self._sql_client.dataset_name}/{self._sql_client.volume_name}"
volume_folder = f"file_{time.time_ns()}"
volume_folder_path = f"{volume_path}/{volume_folder}"

Expand Down Expand Up @@ -273,12 +247,6 @@ def _build_copy_into_statement(
{format_options_clause}
"""

def _cleanup_volume(self) -> None:
print("lalal")
# if self._workspace_client and self._created_volume:
# self._workspace_client.volumes.delete(name=self._created_volume.full_name)
# logger.info(f"Deleted temporary volume [{self._created_volume.full_name}]")

@staticmethod
def ensure_databricks_abfss_url(
bucket_path: str, azure_storage_account_name: str = None, account_host: str = None
Expand Down
11 changes: 11 additions & 0 deletions dlt/destinations/impl/databricks/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def iter_df(self, chunk_size: int) -> Generator[DataFrame, None, None]:

class DatabricksSqlClient(SqlClientBase[DatabricksSqlConnection], DBTransaction):
dbapi: ClassVar[DBApi] = databricks_lib
volume_name: str = "_dlt_temp_load_volume"

def __init__(
self,
Expand Down Expand Up @@ -102,6 +103,16 @@ def close_connection(self) -> None:
self._conn.close()
self._conn = None

def create_volume(self) -> None:
    """Issue ``CREATE VOLUME IF NOT EXISTS`` for this client's volume.

    The volume is addressed as ``<fully qualified dataset name>.<volume_name>``.
    The ``IF NOT EXISTS`` clause is part of the statement that is sent.
    """
    statement = f"""
CREATE VOLUME IF NOT EXISTS {self.fully_qualified_dataset_name()}.{self.volume_name}
"""
    self.execute_sql(statement)

def drop_volume(self) -> None:
    """Issue ``DROP VOLUME IF EXISTS`` for this client's volume.

    The volume is addressed as ``<fully qualified dataset name>.<volume_name>``.
    The ``IF EXISTS`` clause is part of the statement that is sent.
    """
    statement = f"""
DROP VOLUME IF EXISTS {self.fully_qualified_dataset_name()}.{self.volume_name}
"""
    self.execute_sql(statement)

@contextmanager
def begin_transaction(self) -> Iterator[DBTransaction]:
# Databricks does not support transactions
Expand Down
6 changes: 6 additions & 0 deletions dlt/destinations/impl/dremio/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ def close_connection(self) -> None:
self._conn.close()
self._conn = None

def create_volume(self) -> None:
    """No-op: this client takes no action when asked to create a volume."""

def drop_volume(self) -> None:
    """No-op: this client takes no action when asked to drop a volume."""

@contextmanager
@raise_database_error
def begin_transaction(self) -> Iterator[DBTransaction]:
Expand Down
6 changes: 6 additions & 0 deletions dlt/destinations/impl/duckdb/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ def close_connection(self) -> None:
self.credentials.return_conn(self._conn)
self._conn = None

def create_volume(self) -> None:
    """No-op: this client takes no action when asked to create a volume."""

def drop_volume(self) -> None:
    """No-op: this client takes no action when asked to drop a volume."""

@contextmanager
@raise_database_error
def begin_transaction(self) -> Iterator[DBTransaction]:
Expand Down
6 changes: 6 additions & 0 deletions dlt/destinations/impl/mssql/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ def close_connection(self) -> None:
self._conn.close()
self._conn = None

def create_volume(self) -> None:
    """No-op: this client takes no action when asked to create a volume."""

def drop_volume(self) -> None:
    """No-op: this client takes no action when asked to drop a volume."""

@contextmanager
def begin_transaction(self) -> Iterator[DBTransaction]:
try:
Expand Down
6 changes: 6 additions & 0 deletions dlt/destinations/impl/postgres/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ def close_connection(self) -> None:
self._conn.close()
self._conn = None

def create_volume(self) -> None:
    """No-op: this client takes no action when asked to create a volume."""

def drop_volume(self) -> None:
    """No-op: this client takes no action when asked to drop a volume."""

@contextmanager
def begin_transaction(self) -> Iterator[DBTransaction]:
try:
Expand Down
6 changes: 6 additions & 0 deletions dlt/destinations/impl/snowflake/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ def close_connection(self) -> None:
self._conn.close()
self._conn = None

def create_volume(self) -> None:
    """No-op: this client takes no action when asked to create a volume."""

def drop_volume(self) -> None:
    """No-op: this client takes no action when asked to drop a volume."""

@contextmanager
def begin_transaction(self) -> Iterator[DBTransaction]:
try:
Expand Down
6 changes: 6 additions & 0 deletions dlt/destinations/impl/sqlalchemy/db_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ def close_connection(self) -> None:
self._current_connection = None
self._current_transaction = None

def create_volume(self) -> None:
    """No-op: this client takes no action when asked to create a volume."""

def drop_volume(self) -> None:
    """No-op: this client takes no action when asked to drop a volume."""

@property
def native_connection(self) -> Connection:
if not self._current_connection:
Expand Down
1 change: 1 addition & 0 deletions dlt/destinations/job_client_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None:
self.sql_client.create_dataset()
elif truncate_tables:
self.sql_client.truncate_tables(*truncate_tables)
self.sql_client.create_volume()

def is_storage_initialized(self) -> bool:
return self.sql_client.has_dataset()
Expand Down
8 changes: 8 additions & 0 deletions dlt/destinations/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ def close_connection(self) -> None:
def begin_transaction(self) -> ContextManager[DBTransaction]:
pass

def create_volume(self) -> None:
    """Optional hook: create any volume this client needs for loading.

    The base implementation does nothing, so clients that have no notion of
    a volume need not override it. It is deliberately not abstract: making
    it abstract forces every subclass, including ones maintained outside
    this package, to add an empty stub before it can be instantiated.
    Clients that do need a volume override this method.
    """

def drop_volume(self) -> None:
    """Optional hook: drop the volume this client created for loading.

    The base implementation does nothing, so clients that have no notion of
    a volume need not override it. It is deliberately not abstract: making
    it abstract forces every subclass, including ones maintained outside
    this package, to add an empty stub before it can be instantiated.
    Clients that do create a volume override this method.
    """

def __getattr__(self, name: str) -> Any:
# pass unresolved attrs to native connections
if not self.native_connection:
Expand Down
1 change: 1 addition & 0 deletions dlt/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,7 @@ def load(
runner.run_pool(load_step.config, load_step)
info: LoadInfo = self._get_step_info(load_step)

self.sql_client().drop_volume()
self.first_run = False
return info
except Exception as l_ex:
Expand Down

0 comments on commit b60b3d3

Please sign in to comment.