fix(glue): fix typo in reported warning, report with flow_urn (#8138)

This commit is contained in:
Mayuri Nehate 2023-06-12 17:38:20 +05:30 committed by GitHub
parent ab3fe0da81
commit f3cf9b7d5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -304,7 +304,9 @@ class GlueSource(StatefulIngestionSourceBase):
return jobs
def get_dataflow_graph(self, script_path: str) -> Optional[Dict[str, Any]]:
def get_dataflow_graph(
self, script_path: str, flow_urn: str
) -> Optional[Dict[str, Any]]:
"""
Get the DAG of transforms and data sources/sinks for a job.
@ -320,8 +322,8 @@ class GlueSource(StatefulIngestionSourceBase):
# catch any other cases where the script path is invalid
if not script_path.startswith("s3://"):
self.report.report_warning(
script_path,
self.report_warning(
flow_urn,
f"Error parsing DAG for Glue job. The script {script_path} is not a valid S3 path.",
)
self.report.num_job_script_location_invalid += 1
@ -338,8 +340,8 @@ class GlueSource(StatefulIngestionSourceBase):
try:
obj = self.s3_client.get_object(Bucket=bucket, Key=key)
except botocore.exceptions.ClientError as e:
self.report.report_failure(
script_path,
self.report_warning(
flow_urn,
f"Unable to download DAG for Glue job from {script_path}, so job subtasks and lineage will be missing: {e}",
)
self.report.num_job_script_failed_download += 1
@ -353,8 +355,8 @@ class GlueSource(StatefulIngestionSourceBase):
# sometimes the Python script can be user-modified and the script is not valid for graph extraction
except self.glue_client.exceptions.InvalidInputException as e:
self.report.report_warning(
script_path,
self.report_warning(
flow_urn,
f"Error parsing DAG for Glue job. The script {script_path} cannot be processed by Glue (this usually occurs when it has been user-modified): {e}",
)
self.report.num_job_script_failed_parsing += 1
@ -425,9 +427,9 @@ class GlueSource(StatefulIngestionSourceBase):
s3_uri = self.get_s3_uri(node_args)
if s3_uri is None:
self.report.report_warning(
f"{node['Nodetype']}-{node['Id']}",
f"Could not find script path for job {node['Nodetype']}-{node['Id']} in flow {flow_urn}. Skipping",
self.report_warning(
flow_urn,
f"Could not find script path for job {node['NodeType']}-{node['Id']} in flow {flow_urn}. Skipping",
)
return None
@ -461,12 +463,9 @@ class GlueSource(StatefulIngestionSourceBase):
else:
if self.source_config.ignore_unsupported_connectors:
logger.debug(
f"Unrecognized node {node['Nodetype']}-{node['Id']} in flow {flow_urn}. Args : {node_args}",
)
self.report.report_warning(
f"{node['Nodetype']}-{node['Id']}",
f"Unrecognized node {node['Nodetype']}-{node['Id']} in flow {flow_urn}. Skipping",
self.report_warning(
flow_urn,
f"Unrecognized node {node['NodeType']}-{node['Id']} in flow {flow_urn}. Args: {node_args} Skipping",
)
return None
else:
@ -528,7 +527,7 @@ class GlueSource(StatefulIngestionSourceBase):
# Source and Target for some edges is not available
# in nodes. this may lead to broken edge in lineage.
if source_node is None or target_node is None:
self.report.report_warning(
self.report_warning(
flow_urn,
f"Unrecognized source or target node in edge: {edge}. Skipping."
"This may lead to missing lineage",
@ -992,7 +991,7 @@ class GlueSource(StatefulIngestionSourceBase):
dag: Optional[Dict[str, Any]] = None
if job_script_location is not None:
dag = self.get_dataflow_graph(job_script_location)
dag = self.get_dataflow_graph(job_script_location, flow_urn)
else:
self.report.num_job_script_location_missing += 1
@ -1213,3 +1212,7 @@ class GlueSource(StatefulIngestionSourceBase):
def get_report(self):
return self.report
def report_warning(self, key: str, reason: str) -> None:
logger.warning(f"{key}: {reason}")
self.report.report_warning(key, reason)