diff --git a/datajob/glue/glue_job.py b/datajob/glue/glue_job.py index 3ed203b..54bc876 100644 --- a/datajob/glue/glue_job.py +++ b/datajob/glue/glue_job.py @@ -40,7 +40,6 @@ def __init__( state_id: str = None, job_name: str = None, wait_for_completion=True, - *args, **kwargs, ): """ @@ -55,7 +54,6 @@ def __init__( :param role: you can provide a cdk iam role object as arg. if not provided this class will instantiate a role, :param worker_type: you can provide a worker type Standard / G.1X / G.2X :param number_of_workers: for pythonshell is this 0.0625 or 1. for glueetl is this minimum 2. - :param args: any extra args for the glue.CfnJob :param kwargs: any extra kwargs for the glue.CfnJob """ logger.info(f"creating glue job {name}") @@ -79,13 +77,12 @@ def __init__( self.state_id = self.unique_name if state_id is None else state_id self.wait_for_completion = wait_for_completion self.job_name = self.unique_name if job_name is None else job_name - self.args = args self.kwargs = kwargs self.sfn_task = GlueStartJobRunStep( state_id=self.state_id, wait_for_completion=self.wait_for_completion, parameters={"JobName": self.job_name}, - **kwargs, + **self.kwargs, ) logger.info(f"glue job {name} created.") @@ -106,7 +103,6 @@ def create(self): max_capacity=self.max_capacity, worker_type=self.worker_type, number_of_workers=self.number_of_workers, - *self.args, **self.kwargs, ) @@ -220,7 +216,6 @@ def _create_glue_job( max_capacity: int = None, worker_type: str = None, number_of_workers: str = None, - *args, **kwargs, ) -> None: """Create a glue job with the necessary configuration like, paths to @@ -247,6 +242,5 @@ def _create_glue_job( default_arguments=arguments, worker_type=worker_type, number_of_workers=number_of_workers, - *args, **kwargs, ) diff --git a/datajob/stepfunctions/stepfunctions_workflow.py b/datajob/stepfunctions/stepfunctions_workflow.py index ba038d4..487cc96 100644 --- a/datajob/stepfunctions/stepfunctions_workflow.py +++ b/datajob/stepfunctions/stepfunctions_workflow.py @@ -63,6 +63,7 @@ def __init__( region if region is not None else os.environ.get("AWS_DEFAULT_REGION") ) self.notification = self._setup_notification(notification) + self.kwargs = kwargs # init directed graph dict where values are a set. # we do it like this so that we can use toposort. self.directed_graph = defaultdict(set) @@ -138,6 +139,7 @@ def build_workflow(self): definition=self.chain_of_tasks, role=self.role.role_arn, client=sfn_client, + **self.kwargs, ) def create(self): @@ -151,6 +153,7 @@ def create(self): state_machine_name=self.unique_name, role_arn=self.role.role_arn, definition_string=cfn_template, + **self.kwargs, ) def _setup_notification(