From ac345e0ade98c2924545e0f2f36a937f850067e3 Mon Sep 17 00:00:00 2001 From: aseembansal-gogo <69132220+aseembansal-gogo@users.noreply.github.com> Date: Fri, 6 Aug 2021 22:49:21 +0530 Subject: [PATCH] feat(ingest): add underlying platform for glue (#3035) --- metadata-ingestion/README.md | 1 + .../src/datahub/ingestion/source/aws/glue.py | 17 +++++++++--- .../datahub/ingestion/source/sql/postgres.py | 2 +- .../src/datahub/metadata/schema.avsc | 12 ++++----- .../src/datahub/metadata/schema_classes.py | 24 +++++++++++------ .../metadata/schemas/MetadataChangeEvent.avsc | 12 ++++----- .../schemas/MetadataChangeProposal.avsc | 12 ++++----- .../tests/unit/test_glue_source.py | 26 +++++++++++++++++++ 8 files changed, 75 insertions(+), 31 deletions(-) diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md index fa51b7d067..cdb1593722 100644 --- a/metadata-ingestion/README.md +++ b/metadata-ingestion/README.md @@ -646,6 +646,7 @@ source: aws_secret_access_key: # Optional. aws_session_token: # Optional. aws_role: # Optional (Role chaining supported by using a sorted list). 
+ underlying_platform: # Optional (can change the platform name to athena) ``` ### Druid `druid` diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py index f1ef98544e..43506838d9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py @@ -47,6 +47,7 @@ from datahub.metadata.schema_classes import ( class GlueSourceConfig(AwsSourceConfig): extract_transforms: Optional[bool] = True + underlying_platform: Optional[str] = None @property def glue_client(self): @@ -80,6 +81,7 @@ class GlueSource(Source): self.glue_client = config.glue_client self.s3_client = config.s3_client self.extract_transforms = config.extract_transforms + self.underlying_platform = config.underlying_platform self.env = config.env @classmethod @@ -87,6 +89,11 @@ class GlueSource(Source): config = GlueSourceConfig.parse_obj(config_dict) return cls(config, ctx) + def get_underlying_platform(self): + if self.underlying_platform in ["athena"]: + return self.underlying_platform + return "glue" + def get_all_jobs(self): """ List all jobs in Glue. 
@@ -195,7 +202,7 @@ class GlueSource(Source): full_table_name = f"{node_args['database']}.{node_args['table_name']}" # we know that the table will already be covered when ingesting Glue tables - node_urn = f"urn:li:dataset:(urn:li:dataPlatform:glue,{full_table_name},{self.env})" + node_urn = f"urn:li:dataset:(urn:li:dataPlatform:{self.get_underlying_platform()},{full_table_name},{self.env})" # if data object is S3 bucket elif node_args.get("connection_type") == "s3": @@ -447,7 +454,9 @@ class GlueSource(Source): for job in self.get_all_jobs(): - flow_urn = mce_builder.make_data_flow_urn("glue", job["Name"], self.env) + flow_urn = mce_builder.make_data_flow_urn( + self.get_underlying_platform(), job["Name"], self.env + ) flow_wu = self.get_dataflow_wu(flow_urn, job) self.report.report_workunit(flow_wu) @@ -550,13 +559,13 @@ class GlueSource(Source): schemaName=table_name, version=0, fields=fields, - platform="urn:li:dataPlatform:glue", + platform=f"urn:li:dataPlatform:{self.get_underlying_platform()}", hash="", platformSchema=MySqlDDL(tableSchema=""), ) dataset_snapshot = DatasetSnapshot( - urn=f"urn:li:dataset:(urn:li:dataPlatform:glue,{table_name},{self.env})", + urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.get_underlying_platform()},{table_name},{self.env})", aspects=[], ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py b/metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py index 097c931057..0fc38e4c16 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py @@ -27,7 +27,7 @@ class PostgresConfig(BasicSQLAlchemyConfig): # defaults scheme = "postgresql+psycopg2" - def get_identifier(self, schema: str, table: str) -> str: + def get_identifier(self: BasicSQLAlchemyConfig, schema: str, table: str) -> str: regular = f"{schema}.{table}" if self.database_alias: return f"{self.database_alias}.{regular}" diff --git 
a/metadata-ingestion/src/datahub/metadata/schema.avsc b/metadata-ingestion/src/datahub/metadata/schema.avsc index feae9c9f73..c1c038e30d 100644 --- a/metadata-ingestion/src/datahub/metadata/schema.avsc +++ b/metadata-ingestion/src/datahub/metadata/schema.avsc @@ -5219,20 +5219,20 @@ "fields": [ { "type": [ - "null", - "long" + "long", + "null" ], "name": "lastObserved", - "default": null, + "default": 0, "doc": "The timestamp the metadata was observed at" }, { "type": [ - "null", - "string" + "string", + "null" ], "name": "runId", - "default": null, + "default": "no-run-id-provided", "doc": "The run id that produced the metadata" }, { diff --git a/metadata-ingestion/src/datahub/metadata/schema_classes.py b/metadata-ingestion/src/datahub/metadata/schema_classes.py index 1b1e533fc9..b211a36bb3 100644 --- a/metadata-ingestion/src/datahub/metadata/schema_classes.py +++ b/metadata-ingestion/src/datahub/metadata/schema_classes.py @@ -7381,14 +7381,22 @@ class SystemMetadataClass(DictWrapper): RECORD_SCHEMA = get_schema_type("com.linkedin.pegasus2avro.mxe.SystemMetadata") def __init__(self, - lastObserved: Union[None, int]=None, - runId: Union[None, str]=None, + lastObserved: Optional[Union[int, None]]=None, + runId: Optional[Union[str, None]]=None, properties: Union[None, Dict[str, str]]=None, ): super().__init__() - self.lastObserved = lastObserved - self.runId = runId + if lastObserved is None: + # default: 0 + self.lastObserved = self.RECORD_SCHEMA.field_map["lastObserved"].default + else: + self.lastObserved = lastObserved + if runId is None: + # default: 'no-run-id-provided' + self.runId = self.RECORD_SCHEMA.field_map["runId"].default + else: + self.runId = runId self.properties = properties @classmethod @@ -7405,23 +7413,23 @@ class SystemMetadataClass(DictWrapper): @property - def lastObserved(self) -> Union[None, int]: + def lastObserved(self) -> Union[int, None]: """Getter: The timestamp the metadata was observed at""" return self._inner_dict.get('lastObserved') 
# type: ignore @lastObserved.setter - def lastObserved(self, value: Union[None, int]) -> None: + def lastObserved(self, value: Union[int, None]) -> None: """Setter: The timestamp the metadata was observed at""" self._inner_dict['lastObserved'] = value @property - def runId(self) -> Union[None, str]: + def runId(self) -> Union[str, None]: """Getter: The run id that produced the metadata""" return self._inner_dict.get('runId') # type: ignore @runId.setter - def runId(self, value: Union[None, str]) -> None: + def runId(self, value: Union[str, None]) -> None: """Setter: The run id that produced the metadata""" self._inner_dict['runId'] = value diff --git a/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeEvent.avsc b/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeEvent.avsc index 17ae20c4b6..e8bcce4460 100644 --- a/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeEvent.avsc +++ b/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeEvent.avsc @@ -5163,20 +5163,20 @@ { "name": "lastObserved", "type": [ - "null", - "long" + "long", + "null" ], "doc": "The timestamp the metadata was observed at", - "default": null + "default": 0 }, { "name": "runId", "type": [ - "null", - "string" + "string", + "null" ], "doc": "The run id that produced the metadata", - "default": null + "default": "no-run-id-provided" }, { "name": "properties", diff --git a/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeProposal.avsc b/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeProposal.avsc index cadf7e1319..f1bd5328cd 100644 --- a/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeProposal.avsc +++ b/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeProposal.avsc @@ -185,20 +185,20 @@ { "name": "lastObserved", "type": [ - "null", - "long" + "long", + "null" ], "doc": "The timestamp the metadata was observed at", - "default": null + "default": 0 }, { "name": "runId", "type": [ - "null", - "string" 
+ "string", + "null" ], "doc": "The run id that produced the metadata", - "default": null + "default": "no-run-id-provided" }, { "name": "properties", diff --git a/metadata-ingestion/tests/unit/test_glue_source.py b/metadata-ingestion/tests/unit/test_glue_source.py index 0cbf8b1567..6dc91d3b37 100644 --- a/metadata-ingestion/tests/unit/test_glue_source.py +++ b/metadata-ingestion/tests/unit/test_glue_source.py @@ -143,3 +143,29 @@ def test_glue_ingest(tmp_path, pytestconfig): output_path=tmp_path / "glue_mces.json", golden_path=test_resources_dir / "glue_mces_golden.json", ) + + +def test_underlying_platform_takes_precedence(): + source = GlueSource( + ctx=PipelineContext(run_id="glue-source-test"), + config=GlueSourceConfig(aws_region="us-west-2", underlying_platform="athena"), + ) + assert source.get_underlying_platform() == "athena" + + +def test_underlying_platform_cannot_be_other_than_athena(): + source = GlueSource( + ctx=PipelineContext(run_id="glue-source-test"), + config=GlueSourceConfig( + aws_region="us-west-2", underlying_platform="data-warehouse" + ), + ) + assert source.get_underlying_platform() == "glue" + + +def test_without_underlying_platform(): + source = GlueSource( + ctx=PipelineContext(run_id="glue-source-test"), + config=GlueSourceConfig(aws_region="us-west-2"), + ) + assert source.get_underlying_platform() == "glue"