Additional wait logic for streaming destinations (rtdip#501)
* Additional wait logic for streaming destinations

Signed-off-by: GBBBAS <[email protected]>

* Remove rest api tests - 1

Signed-off-by: GBBBAS <[email protected]>

* Add test 2

Signed-off-by: GBBBAS <[email protected]>

* Add tests back

Signed-off-by: GBBBAS <[email protected]>

* Fix for rest api tests

Signed-off-by: GBBBAS <[email protected]>

* Fix for df

Signed-off-by: GBBBAS <[email protected]>

---------

Signed-off-by: GBBBAS <[email protected]>
GBBBAS authored Sep 21, 2023
1 parent 54b472f commit 159e181
Showing 11 changed files with 123 additions and 75 deletions.
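The substance of the change, repeated in each streaming destination below, replaces the previous unconditional polling loop with a wait that only runs when the new query_wait_interval argument is set. A minimal sketch of the pattern, using the names that appear in the diffs:

# Sketch of the wait logic each write_stream method now applies after starting its query.
# `query` is the StreamingQuery returned by .start() / .toTable(); `self.query_wait_interval`
# is the new optional constructor argument (default None, meaning no waiting).
if self.query_wait_interval:
    while query.isActive:
        if query.lastProgress:
            logging.info(query.lastProgress)
        time.sleep(self.query_wait_interval)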
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -132,7 +132,7 @@ jobs:
run: |
mkdocs build --strict
job_python_lint_black:
job_lint_python_black:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
12 changes: 6 additions & 6 deletions src/sdk/python/rtdip_sdk/pipelines/destinations/spark/delta.py
@@ -33,7 +33,7 @@ class SparkDeltaDestination(DestinationInterface):
mode (optional str): Method of writing to Delta Table - append/overwrite (batch), append/update/complete (stream). Default is append
trigger (optional str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds
query_name (optional str): Unique name for the query in associated SparkSession. (stream) Default is DeltaDestination
wait_while_query_active (optional bool): If True, waits for the streaming query to complete before returning. (stream) Default is False
query_wait_interval (optional int): If set, waits for the streaming query to complete before returning. (stream) Default is None
Attributes:
checkpointLocation (str): Path to checkpoint files. (Streaming)
@@ -51,7 +51,7 @@ class SparkDeltaDestination(DestinationInterface):
mode: str
trigger: str
query_name: str
wait_while_query_active: bool
query_wait_interval: int

def __init__(
self,
@@ -61,15 +61,15 @@ def __init__(
mode: str = "append",
trigger: str = "10 seconds",
query_name: str = "DeltaDestination",
wait_while_query_active: bool = False,
query_wait_interval: int = None,
) -> None:
self.data = data
self.options = options
self.destination = destination
self.mode = mode
self.trigger = trigger
self.query_name = query_name
self.wait_while_query_active = wait_while_query_active
self.query_wait_interval = query_wait_interval

@staticmethod
def system_type():
@@ -154,11 +154,11 @@ def write_stream(self):
.toTable(self.destination)
)

if self.wait_while_query_active == True:
if self.query_wait_interval:
while query.isActive:
if query.lastProgress:
logging.info(query.lastProgress)
time.sleep(10)
time.sleep(self.query_wait_interval)

except Py4JJavaError as e:
logging.exception(e.errmsg)
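For illustration, a usage sketch of the updated SparkDeltaDestination follows. It is not taken from the commit: the import path is inferred from the file location above, and streaming_df stands in for an existing streaming DataFrame.

# Hedged sketch: import path mirrors src/sdk/python/rtdip_sdk/pipelines/destinations/spark/delta.py
# and may differ from the package's public API; `streaming_df` is an assumed streaming DataFrame.
from rtdip_sdk.pipelines.destinations.spark.delta import SparkDeltaDestination

delta_destination = SparkDeltaDestination(
    data=streaming_df,
    options={"checkpointLocation": "/tmp/checkpoints/delta_example"},  # example path
    destination="example_table",                                       # hypothetical Delta table name
    mode="append",
    trigger="10 seconds",
    query_name="DeltaDestination",
    query_wait_interval=30,  # with a value set, write_stream polls and logs progress every 30 seconds
)
delta_destination.write_stream()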
@@ -45,14 +45,15 @@ class SparkDeltaMergeDestination(DestinationInterface):
destination (str): Either the name of the Hive Metastore or Unity Catalog Delta Table **or** the path to the Delta table
options (dict): Options that can be specified for a Delta Table read operation (See Attributes table below). Further information on the options is available for [batch](https://docs.delta.io/latest/delta-batch.html#write-to-a-table){ target="_blank" } and [streaming](https://docs.delta.io/latest/delta-streaming.html#delta-table-as-a-sink){ target="_blank" }.
merge_condition (str): Condition for matching records between dataframe and delta table. Reference Dataframe columns as `source` and Delta Table columns as `target`. For example `source.id = target.id`.
when_matched_update_list (list[DeltaMergeConditionValues]): Conditions(optional) and values to be used when updating rows that match the `merge_condition`. Specify `*` for Values if all columns from Dataframe should be inserted.
when_matched_delete_list (list[DeltaMergeCondition]): Conditions(optional) to be used when deleting rows that match the `merge_condition`.
when_not_matched_insert_list (list[DeltaMergeConditionValues]): Conditions(optional) and values to be used when inserting rows that do not match the `merge_condition`. Specify `*` for Values if all columns from Dataframe should be inserted.
when_not_matched_by_source_update_list (list[DeltaMergeConditionValues]): Conditions(optional) and values to be used when updating rows that do not match the `merge_condition`.
when_not_matched_by_source_delete_list (list[DeltaMergeCondition]): Conditions(optional) to be used when deleting rows that do not match the `merge_condition`.
try_broadcast_join (bool): Attempts to perform a broadcast join in the merge which can leverage data skipping using partition pruning and file pruning automatically. Can fail if dataframe being merged is large and therefore more suitable for streaming merges than batch merges
trigger (str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes"
query_name (str): Unique name for the query in associated SparkSession
when_matched_update_list (optional list[DeltaMergeConditionValues]): Conditions(optional) and values to be used when updating rows that match the `merge_condition`. Specify `*` for Values if all columns from Dataframe should be inserted.
when_matched_delete_list (optional list[DeltaMergeCondition]): Conditions(optional) to be used when deleting rows that match the `merge_condition`.
when_not_matched_insert_list (optional list[DeltaMergeConditionValues]): Conditions(optional) and values to be used when inserting rows that do not match the `merge_condition`. Specify `*` for Values if all columns from Dataframe should be inserted.
when_not_matched_by_source_update_list (optional list[DeltaMergeConditionValues]): Conditions(optional) and values to be used when updating rows that do not match the `merge_condition`.
when_not_matched_by_source_delete_list (optional list[DeltaMergeCondition]): Conditions(optional) to be used when deleting rows that do not match the `merge_condition`.
try_broadcast_join (optional bool): Attempts to perform a broadcast join in the merge which can leverage data skipping using partition pruning and file pruning automatically. Can fail if dataframe being merged is large and therefore more suitable for streaming merges than batch merges
trigger (optional str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds
query_name (optional str): Unique name for the query in associated SparkSession
query_wait_interval (optional int): If set, waits for the streaming query to complete before returning. (stream) Default is None
Attributes:
checkpointLocation (str): Path to checkpoint files. (Streaming)
@@ -71,6 +72,7 @@ class SparkDeltaMergeDestination(DestinationInterface):
try_broadcast_join: bool
trigger: str
query_name: str
query_wait_interval: int

def __init__(
self,
@@ -87,6 +89,7 @@ def __init__(
try_broadcast_join: bool = False,
trigger="10 seconds",
query_name: str = "DeltaMergeDestination",
query_wait_interval: int = None,
) -> None:
self.spark = spark
self.data = data
@@ -125,6 +128,7 @@ def __init__(
self.try_broadcast_join = try_broadcast_join
self.trigger = trigger
self.query_name = query_name
self.query_wait_interval = query_wait_interval

@staticmethod
def system_type():
@@ -275,10 +279,11 @@ def write_stream(self):
.start()
)

while query.isActive:
if query.lastProgress:
logging.info(query.lastProgress)
time.sleep(10)
if self.query_wait_interval:
while query.isActive:
if query.lastProgress:
logging.info(query.lastProgress)
time.sleep(self.query_wait_interval)

except Py4JJavaError as e:
logging.exception(e.errmsg)
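A comparable sketch for SparkDeltaMergeDestination is below. The merge-clause lists are omitted because the docstring above marks them optional, the merge condition is the docstring's own example, and the import path is an assumption since this excerpt does not show the file name.

# Hedged sketch: module path assumed (not shown in this excerpt); `spark` is an existing
# SparkSession and `streaming_df` an assumed streaming DataFrame.
from rtdip_sdk.pipelines.destinations.spark.delta_merge import SparkDeltaMergeDestination

delta_merge_destination = SparkDeltaMergeDestination(
    spark=spark,
    data=streaming_df,
    destination="example_table",                  # hypothetical Delta table
    options={"checkpointLocation": "/tmp/checkpoints/delta_merge_example"},  # example path
    merge_condition="source.id = target.id",      # example condition from the docstring above
    query_wait_interval=30,                       # opt in to the new wait-and-log behaviour
)
delta_merge_destination.write_stream()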
22 changes: 17 additions & 5 deletions src/sdk/python/rtdip_sdk/pipelines/destinations/spark/eventhub.py
@@ -36,8 +36,9 @@ class SparkEventhubDestination(DestinationInterface):
spark (SparkSession): Spark Session
data (DataFrame): Dataframe to be written to Eventhub
options (dict): A dictionary of Eventhub configurations (See Attributes table below). All Configuration options for Eventhubs can be found [here.](https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark.md#event-hubs-configuration){ target="_blank" }
trigger (str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes"
trigger (optional str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds
query_name (str): Unique name for the query in associated SparkSession
query_wait_interval (optional int): If set, waits for the streaming query to complete before returning. (stream) Default is None
Attributes:
checkpointLocation (str): Path to checkpoint files. (Streaming)
@@ -48,19 +49,28 @@ class SparkEventhubDestination(DestinationInterface):
maxEventsPerTrigger (long): Rate limit on maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume. (Stream)
"""

spark: SparkSession
data: DataFrame
options: dict
trigger: str
query_name: str
query_wait_interval: int

def __init__(
self,
spark: SparkSession,
data: DataFrame,
options: dict,
trigger="10 seconds",
query_name="EventhubDestination",
query_wait_interval: int = None,
) -> None:
self.spark = spark
self.data = data
self.options = options
self.trigger = trigger
self.query_name = query_name
self.query_wait_interval = query_wait_interval

@staticmethod
def system_type():
@@ -183,10 +193,12 @@ def write_stream(self):
.queryName(self.query_name)
.start()
)
while query.isActive:
if query.lastProgress:
logging.info(query.lastProgress)
time.sleep(10)

if self.query_wait_interval:
while query.isActive:
if query.lastProgress:
logging.info(query.lastProgress)
time.sleep(self.query_wait_interval)

except Py4JJavaError as e:
logging.exception(e.errmsg)
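For illustration, a hedged usage sketch of the updated SparkEventhubDestination; the import path is inferred from the file path above and the options dict is a placeholder for the Eventhubs configuration linked in the docstring.

# Hedged sketch: import path inferred from src/sdk/python/rtdip_sdk/pipelines/destinations/spark/eventhub.py.
from rtdip_sdk.pipelines.destinations.spark.eventhub import SparkEventhubDestination

eventhub_destination = SparkEventhubDestination(
    spark=spark,          # existing SparkSession
    data=streaming_df,    # assumed streaming DataFrame
    options={
        # Eventhubs configuration as per the linked documentation (connection string, etc.),
        # plus a checkpoint location for streaming writes:
        "checkpointLocation": "/tmp/checkpoints/eventhub_example",  # example path
    },
    trigger="10 seconds",
    query_name="EventhubDestination",
    query_wait_interval=30,  # block in write_stream, logging progress every 30 seconds
)
eventhub_destination.write_stream()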
21 changes: 16 additions & 5 deletions src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py
@@ -34,8 +34,9 @@ class SparkKafkaDestination(DestinationInterface):
Args:
data (DataFrame): Dataframe to be written to Kafka
options (dict): A dictionary of Kafka configurations (See Attributes tables below). For more information on configuration options see [here](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html){ target="_blank" }
trigger (str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes"
trigger (optional str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds
query_name (str): Unique name for the query in associated SparkSession
query_wait_interval (optional int): If set, waits for the streaming query to complete before returning. (stream) Default is None
The following options must be set for the Kafka destination for both batch and streaming queries.
@@ -50,17 +51,25 @@ class SparkKafkaDestination(DestinationInterface):
"""

data: DataFrame
options: dict
trigger: str
query_name: str
query_wait_interval: int

def __init__(
self,
data: DataFrame,
options: dict,
trigger="10 seconds",
query_name="KafkaDestination",
query_wait_interval: int = None,
) -> None:
self.data = data
self.options = options
self.trigger = trigger
self.query_name = query_name
self.query_wait_interval = query_wait_interval

@staticmethod
def system_type():
@@ -123,10 +132,12 @@ def write_stream(self):
.queryName(self.query_name)
.start()
)
while query.isActive:
if query.lastProgress:
logging.info(query.lastProgress)
time.sleep(10)

if self.query_wait_interval:
while query.isActive:
if query.lastProgress:
logging.info(query.lastProgress)
time.sleep(self.query_wait_interval)

except Py4JJavaError as e:
logging.exception(e.errmsg)
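Likewise, a hedged usage sketch of the updated SparkKafkaDestination; the Kafka option values are examples only.

# Hedged sketch: import path inferred from src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kafka.py;
# option values are illustrative.
from rtdip_sdk.pipelines.destinations.spark.kafka import SparkKafkaDestination

kafka_destination = SparkKafkaDestination(
    data=streaming_df,  # assumed streaming DataFrame
    options={
        "kafka.bootstrap.servers": "broker1:9092",               # example broker address
        "topic": "example-topic",                                 # hypothetical topic
        "checkpointLocation": "/tmp/checkpoints/kafka_example",   # example path
    },
    trigger="10 seconds",
    query_name="KafkaDestination",
    query_wait_interval=30,
)
kafka_destination.write_stream()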
@@ -50,8 +50,9 @@ class SparkKafkaEventhubDestination(DestinationInterface):
data (DataFrame): Any columns not listed in the required schema [here](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka){ target="_blank" } will be merged into a single column named "value", or ignored if "value" is an existing column
connection_string (str): Eventhubs connection string is required to connect to the Eventhubs service. This must include the Eventhub name as the `EntityPath` parameter. Example `"Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=test;SharedAccessKey=test_key;EntityPath=test_eventhub"`
options (dict): A dictionary of Kafka configurations (See Attributes tables below)
trigger (str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes"
trigger (optional str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds
query_name (str): Unique name for the query in associated SparkSession
query_wait_interval (optional int): If set, waits for the streaming query to complete before returning. (stream) Default is None
The following are commonly used parameters that may be included in the options dict. kafka.bootstrap.servers is the only required config. A full list of configs can be found [here](https://kafka.apache.org/documentation/#producerconfigs){ target="_blank" }
@@ -61,6 +62,16 @@ class SparkKafkaEventhubDestination(DestinationInterface):
includeHeaders (bool): Determines whether to include the Kafka headers in the row; defaults to False. (Streaming and Batch)
"""

spark: SparkSession
data: DataFrame
connection_string: str
options: dict
consumer_group: str
trigger: str
query_name: str
connection_string_properties: dict
query_wait_interval: int

def __init__(
self,
spark: SparkSession,
@@ -70,6 +81,7 @@ def __init__(
consumer_group: str,
trigger: str = "10 seconds",
query_name: str = "KafkaEventhubDestination",
query_wait_interval: int = None,
) -> None:
self.spark = spark
self.data = data
@@ -82,6 +94,7 @@ def __init__(
connection_string
)
self.options = self._configure_options(options)
self.query_wait_interval = query_wait_interval

@staticmethod
def system_type():
@@ -265,17 +278,18 @@ def write_stream(self) -> DataFrame:
else {"processingTime": self.trigger}
)
query = (
self.data.writeStream.trigger(**TRIGGER_OPTION)
df.writeStream.trigger(**TRIGGER_OPTION)
.format("kafka")
.options(**self.options)
.queryName(self.query_name)
.start()
)
while query.isActive:
if query.lastProgress:
logging.info(query.lastProgress)
time.sleep(10)
df.writeStream.format("kafka").options(**self.options).start()

if self.query_wait_interval:
while query.isActive:
if query.lastProgress:
logging.info(query.lastProgress)
time.sleep(self.query_wait_interval)

except Py4JJavaError as e:
logging.exception(e.errmsg)
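A hedged sketch for SparkKafkaEventhubDestination; the connection string format is the docstring's own example, while the import path and consumer group value are assumptions.

# Hedged sketch: module path assumed (file name not shown in this excerpt).
from rtdip_sdk.pipelines.destinations.spark.kafka_eventhub import SparkKafkaEventhubDestination

kafka_eventhub_destination = SparkKafkaEventhubDestination(
    spark=spark,          # existing SparkSession
    data=streaming_df,    # assumed streaming DataFrame
    connection_string=(
        "Endpoint=sb://test.servicebus.windows.net/;"
        "SharedAccessKeyName=test;SharedAccessKey=test_key;EntityPath=test_eventhub"
    ),  # example format from the docstring above
    options={"checkpointLocation": "/tmp/checkpoints/kafka_eventhub_example"},  # example path
    consumer_group="$Default",  # assumed consumer group name
    query_wait_interval=30,
)
kafka_eventhub_destination.write_stream()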
23 changes: 18 additions & 5 deletions src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kinesis.py
@@ -27,8 +27,9 @@ class SparkKinesisDestination(DestinationInterface):
data (DataFrame): Dataframe to be written to Delta
options (dict): A dictionary of Kinesis configurations (See Attributes table below). All Configuration options for Kinesis can be found [here.](https://github.com/qubole/kinesis-sql#kinesis-sink-configuration){ target="_blank" }
mode (str): Method of writing to Kinesis - append, complete, update
trigger (str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes"
trigger (optional str): Frequency of the write operation. Specify "availableNow" to execute a trigger once, otherwise specify a time period such as "30 seconds", "5 minutes". Set to "0 seconds" if you do not want to use a trigger. (stream) Default is 10 seconds
query_name (str): Unique name for the query in associated SparkSession
query_wait_interval (optional int): If set, waits for the streaming query to complete before returning. (stream) Default is None
Attributes:
endpointUrl (str): Endpoint of the kinesis stream.
@@ -37,19 +38,28 @@ class SparkKinesisDestination(DestinationInterface):
streamName (List[str]): Name of the streams in Kinesis to write to.
"""

data: DataFrame
options: dict
mode: str
trigger: str
query_name: str
query_wait_interval: int

def __init__(
self,
data: DataFrame,
options: dict,
mode: str = "update",
trigger: str = "10 seconds",
query_name="KinesisDestination",
query_wait_interval: int = None,
) -> None:
self.data = data
self.options = options
self.mode = mode
self.trigger = trigger
self.query_name = query_name
self.query_wait_interval = query_wait_interval

@staticmethod
def system_type():
@@ -105,10 +115,13 @@ def write_stream(self):
.queryName(self.query_name)
.start()
)
while query.isActive:
if query.lastProgress:
logging.info(query.lastProgress)
time.sleep(10)

if self.query_wait_interval:
while query.isActive:
if query.lastProgress:
logging.info(query.lastProgress)
time.sleep(self.query_wait_interval)

except Py4JJavaError as e:
logging.exception(e.errmsg)
raise e
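Finally, a hedged usage sketch of SparkKinesisDestination; the options dict is left as a placeholder for the Kinesis sink configuration linked above.

# Hedged sketch: import path inferred from src/sdk/python/rtdip_sdk/pipelines/destinations/spark/kinesis.py.
from rtdip_sdk.pipelines.destinations.spark.kinesis import SparkKinesisDestination

kinesis_destination = SparkKinesisDestination(
    data=streaming_df,  # assumed streaming DataFrame
    options={
        # Kinesis sink configuration as per the linked kinesis-sql documentation,
        # e.g. endpointUrl and streamName from the attributes table above.
    },
    mode="update",
    trigger="10 seconds",
    query_name="KinesisDestination",
    query_wait_interval=30,
)
kinesis_destination.write_stream()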
