mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-25 17:26:33 +00:00
fix(ingest): don't assume Glue job description always exists (#3019)
This commit is contained in:
parent
35aca2cad6
commit
07bbd50ad2
@ -301,21 +301,30 @@ class GlueSource(Source):
|
|||||||
|
|
||||||
region = self.source_config.aws_region
|
region = self.source_config.aws_region
|
||||||
|
|
||||||
|
custom_props = {
|
||||||
|
"role": job["Role"],
|
||||||
|
}
|
||||||
|
|
||||||
|
if job.get("CreatedOn") is not None:
|
||||||
|
custom_props["created"] = str(job["CreatedOn"])
|
||||||
|
|
||||||
|
if job.get("LastModifiedOn") is not None:
|
||||||
|
custom_props["modified"] = str(job["LastModifiedOn"])
|
||||||
|
|
||||||
|
command = job.get("Command", {}).get("ScriptLocation")
|
||||||
|
if command is not None:
|
||||||
|
custom_props["command"] = command
|
||||||
|
|
||||||
mce = MetadataChangeEventClass(
|
mce = MetadataChangeEventClass(
|
||||||
proposedSnapshot=DataFlowSnapshotClass(
|
proposedSnapshot=DataFlowSnapshotClass(
|
||||||
urn=flow_urn,
|
urn=flow_urn,
|
||||||
aspects=[
|
aspects=[
|
||||||
DataFlowInfoClass(
|
DataFlowInfoClass(
|
||||||
name=job["Name"],
|
name=job["Name"],
|
||||||
description=job["Description"],
|
description=job.get("Description"),
|
||||||
externalUrl=f"https://{region}.console.aws.amazon.com/gluestudio/home?region={region}#/editor/job/{job['Name']}/graph",
|
externalUrl=f"https://{region}.console.aws.amazon.com/gluestudio/home?region={region}#/editor/job/{job['Name']}/graph",
|
||||||
# specify a few Glue-specific properties
|
# specify a few Glue-specific properties
|
||||||
customProperties={
|
customProperties=custom_props,
|
||||||
"role": job["Role"],
|
|
||||||
"created": str(job["CreatedOn"]),
|
|
||||||
"modified": str(job["LastModifiedOn"]),
|
|
||||||
"command": job["Command"]["ScriptLocation"],
|
|
||||||
},
|
|
||||||
),
|
),
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@ -430,7 +439,13 @@ class GlueSource(Source):
|
|||||||
self.report.report_workunit(flow_wu)
|
self.report.report_workunit(flow_wu)
|
||||||
yield flow_wu
|
yield flow_wu
|
||||||
|
|
||||||
dag = self.get_dataflow_graph(job["Command"]["ScriptLocation"])
|
job_script_location = job.get("Command", {}).get("ScriptLocation")
|
||||||
|
|
||||||
|
dag: Optional[Dict[str, Any]] = None
|
||||||
|
|
||||||
|
if job_script_location is not None:
|
||||||
|
|
||||||
|
dag = self.get_dataflow_graph(job_script_location)
|
||||||
|
|
||||||
dags[flow_urn] = dag
|
dags[flow_urn] = dag
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user