diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py index f3380fa814..eeba0d8649 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py @@ -304,7 +304,9 @@ class GlueSource(StatefulIngestionSourceBase): return jobs - def get_dataflow_graph(self, script_path: str) -> Optional[Dict[str, Any]]: + def get_dataflow_graph( + self, script_path: str, flow_urn: str + ) -> Optional[Dict[str, Any]]: """ Get the DAG of transforms and data sources/sinks for a job. @@ -320,8 +322,8 @@ class GlueSource(StatefulIngestionSourceBase): # catch any other cases where the script path is invalid if not script_path.startswith("s3://"): - self.report.report_warning( - script_path, + self.report_warning( + flow_urn, f"Error parsing DAG for Glue job. The script {script_path} is not a valid S3 path.", ) self.report.num_job_script_location_invalid += 1 @@ -338,8 +340,8 @@ class GlueSource(StatefulIngestionSourceBase): try: obj = self.s3_client.get_object(Bucket=bucket, Key=key) except botocore.exceptions.ClientError as e: - self.report.report_failure( - script_path, + self.report_warning( + flow_urn, f"Unable to download DAG for Glue job from {script_path}, so job subtasks and lineage will be missing: {e}", ) self.report.num_job_script_failed_download += 1 @@ -353,8 +355,8 @@ class GlueSource(StatefulIngestionSourceBase): # sometimes the Python script can be user-modified and the script is not valid for graph extraction except self.glue_client.exceptions.InvalidInputException as e: - self.report.report_warning( - script_path, + self.report_warning( + flow_urn, f"Error parsing DAG for Glue job. 
The script {script_path} cannot be processed by Glue (this usually occurs when it has been user-modified): {e}", ) self.report.num_job_script_failed_parsing += 1 @@ -425,9 +427,9 @@ class GlueSource(StatefulIngestionSourceBase): s3_uri = self.get_s3_uri(node_args) if s3_uri is None: - self.report.report_warning( - f"{node['Nodetype']}-{node['Id']}", - f"Could not find script path for job {node['Nodetype']}-{node['Id']} in flow {flow_urn}. Skipping", + self.report_warning( + flow_urn, + f"Could not find script path for job {node['NodeType']}-{node['Id']} in flow {flow_urn}. Skipping", ) return None @@ -461,12 +463,9 @@ class GlueSource(StatefulIngestionSourceBase): else: if self.source_config.ignore_unsupported_connectors: - logger.debug( - f"Unrecognized node {node['Nodetype']}-{node['Id']} in flow {flow_urn}. Args : {node_args}", - ) - self.report.report_warning( - f"{node['Nodetype']}-{node['Id']}", - f"Unrecognized node {node['Nodetype']}-{node['Id']} in flow {flow_urn}. Skipping", + self.report_warning( + flow_urn, + f"Unrecognized node {node['NodeType']}-{node['Id']} in flow {flow_urn}. Args: {node_args}. Skipping", ) return None else: @@ -528,7 +527,7 @@ class GlueSource(StatefulIngestionSourceBase): # Source and Target for some edges is not available # in nodes. this may lead to broken edge in lineage. if source_node is None or target_node is None: - self.report.report_warning( + self.report_warning( flow_urn, f"Unrecognized source or target node in edge: {edge}. Skipping."
"This may lead to missing lineage", @@ -992,7 +991,7 @@ class GlueSource(StatefulIngestionSourceBase): dag: Optional[Dict[str, Any]] = None if job_script_location is not None: - dag = self.get_dataflow_graph(job_script_location) + dag = self.get_dataflow_graph(job_script_location, flow_urn) else: self.report.num_job_script_location_missing += 1 @@ -1213,3 +1212,7 @@ class GlueSource(StatefulIngestionSourceBase): def get_report(self): return self.report + + def report_warning(self, key: str, reason: str) -> None: + logger.warning(f"{key}: {reason}") + self.report.report_warning(key, reason)