feat(ingest): add underlying platform for glue (#3035)

aseembansal-gogo 2021-08-06 22:49:21 +05:30, committed by GitHub
parent 916b374a0e
commit ac345e0ade
8 changed files with 75 additions and 31 deletions

View File

@@ -646,6 +646,7 @@ source:
   aws_secret_access_key: # Optional.
   aws_session_token: # Optional.
   aws_role: # Optional (Role chaining supported by using a sorted list).
+  underlying_platform: # Optional (can change the platform name to athena)
 ```

 ### Druid `druid`
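The new option can also be exercised programmatically rather than from a recipe. A minimal sketch, assuming the source module path datahub.ingestion.source.glue (the exact path may differ by release) and a placeholder region:

```python
from datahub.ingestion.source.glue import GlueSourceConfig

# Placeholder region. With underlying_platform="athena", dataset and flow URNs
# are emitted under the athena platform instead of glue.
config = GlueSourceConfig(aws_region="us-west-2", underlying_platform="athena")
```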

View File

@@ -47,6 +47,7 @@ from datahub.metadata.schema_classes import (
 class GlueSourceConfig(AwsSourceConfig):

     extract_transforms: Optional[bool] = True
+    underlying_platform: Optional[str] = None

     @property
     def glue_client(self):
@@ -80,6 +81,7 @@ class GlueSource(Source):
         self.glue_client = config.glue_client
         self.s3_client = config.s3_client
         self.extract_transforms = config.extract_transforms
+        self.underlying_platform = config.underlying_platform
         self.env = config.env

     @classmethod
@@ -87,6 +89,11 @@ class GlueSource(Source):
         config = GlueSourceConfig.parse_obj(config_dict)
         return cls(config, ctx)

+    def get_underlying_platform(self):
+        if self.underlying_platform in ["athena"]:
+            return self.underlying_platform
+        return "glue"
+
     def get_all_jobs(self):
         """
         List all jobs in Glue.
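The gate above is an allow-list of one: anything other than "athena" falls back to "glue". A standalone restatement of the behavior, matching the unit tests added at the end of this commit:

```python
from typing import Optional

def get_underlying_platform(underlying_platform: Optional[str]) -> str:
    # Only "athena" is honored; unknown values and None fall back to "glue".
    if underlying_platform in ["athena"]:
        return underlying_platform
    return "glue"

assert get_underlying_platform("athena") == "athena"
assert get_underlying_platform("data-warehouse") == "glue"
assert get_underlying_platform(None) == "glue"
```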
@@ -195,7 +202,7 @@
             full_table_name = f"{node_args['database']}.{node_args['table_name']}"

             # we know that the table will already be covered when ingesting Glue tables
-            node_urn = f"urn:li:dataset:(urn:li:dataPlatform:glue,{full_table_name},{self.env})"
+            node_urn = f"urn:li:dataset:(urn:li:dataPlatform:{self.get_underlying_platform()},{full_table_name},{self.env})"

         # if data object is S3 bucket
         elif node_args.get("connection_type") == "s3":
@@ -447,7 +454,9 @@
         for job in self.get_all_jobs():

-            flow_urn = mce_builder.make_data_flow_urn("glue", job["Name"], self.env)
+            flow_urn = mce_builder.make_data_flow_urn(
+                self.get_underlying_platform(), job["Name"], self.env
+            )

             flow_wu = self.get_dataflow_wu(flow_urn, job)
             self.report.report_workunit(flow_wu)
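make_data_flow_urn interpolates its arguments into a dataFlow URN, so the platform swap carries through to job entities too. A sketch with a hypothetical job name and environment:

```python
import datahub.emitter.mce_builder as mce_builder

# Hypothetical job name and environment, for illustration.
flow_urn = mce_builder.make_data_flow_urn("athena", "nightly-etl", "PROD")
# -> "urn:li:dataFlow:(athena,nightly-etl,PROD)"
```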
@@ -550,13 +559,13 @@
             schemaName=table_name,
             version=0,
             fields=fields,
-            platform="urn:li:dataPlatform:glue",
+            platform=f"urn:li:dataPlatform:{self.get_underlying_platform()}",
             hash="",
             platformSchema=MySqlDDL(tableSchema=""),
         )

         dataset_snapshot = DatasetSnapshot(
-            urn=f"urn:li:dataset:(urn:li:dataPlatform:glue,{table_name},{self.env})",
+            urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.get_underlying_platform()},{table_name},{self.env})",
             aspects=[],
         )
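The same table therefore renders to a different snapshot URN depending on configuration. A sketch with a hypothetical table name:

```python
# Hypothetical table name and environment, for illustration.
table_name, env = "billing.invoices", "PROD"
for platform in ("glue", "athena"):
    print(f"urn:li:dataset:(urn:li:dataPlatform:{platform},{table_name},{env})")
# urn:li:dataset:(urn:li:dataPlatform:glue,billing.invoices,PROD)
# urn:li:dataset:(urn:li:dataPlatform:athena,billing.invoices,PROD)
```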

View File

@@ -27,7 +27,7 @@ class PostgresConfig(BasicSQLAlchemyConfig):
     # defaults
     scheme = "postgresql+psycopg2"

-    def get_identifier(self, schema: str, table: str) -> str:
+    def get_identifier(self: BasicSQLAlchemyConfig, schema: str, table: str) -> str:
         regular = f"{schema}.{table}"
         if self.database_alias:
             return f"{self.database_alias}.{regular}"
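The annotation change only types self against the base class for type checking; runtime behavior is unchanged. A standalone sketch of what get_identifier computes, with hypothetical names:

```python
from typing import Optional

def get_identifier(schema: str, table: str, database_alias: Optional[str] = None) -> str:
    # Mirrors the method above: prefix the schema-qualified name with the alias, if set.
    regular = f"{schema}.{table}"
    return f"{database_alias}.{regular}" if database_alias else regular

assert get_identifier("public", "users") == "public.users"
assert get_identifier("public", "users", "analytics") == "analytics.public.users"
```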

View File

@@ -5219,20 +5219,20 @@
             "fields": [
                 {
                     "type": [
-                        "null",
-                        "long"
+                        "long",
+                        "null"
                     ],
                     "name": "lastObserved",
-                    "default": null,
+                    "default": 0,
                     "doc": "The timestamp the metadata was observed at"
                 },
                 {
                     "type": [
-                        "null",
-                        "string"
+                        "string",
+                        "null"
                     ],
                     "name": "runId",
-                    "default": null,
+                    "default": "no-run-id-provided",
                     "doc": "The run id that produced the metadata"
                 },
                 {
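The reordering is not cosmetic: the Avro spec requires a union field's default value to match the first branch of the union, so a non-null default forces the non-null type to the front. In sketch form:

```python
# Per the Avro spec, a union field's default must match the union's FIRST branch.
ok = {"name": "runId", "type": ["string", "null"], "default": "no-run-id-provided"}
bad = {"name": "runId", "type": ["null", "string"], "default": "no-run-id-provided"}
# `bad` is invalid: the first branch is "null", but the default is a string.
```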

View File

@@ -7381,14 +7381,22 @@ class SystemMetadataClass(DictWrapper):
     RECORD_SCHEMA = get_schema_type("com.linkedin.pegasus2avro.mxe.SystemMetadata")

     def __init__(self,
-        lastObserved: Union[None, int]=None,
-        runId: Union[None, str]=None,
+        lastObserved: Optional[Union[int, None]]=None,
+        runId: Optional[Union[str, None]]=None,
         properties: Union[None, Dict[str, str]]=None,
     ):
         super().__init__()

-        self.lastObserved = lastObserved
-        self.runId = runId
+        if lastObserved is None:
+            # default: 0
+            self.lastObserved = self.RECORD_SCHEMA.field_map["lastObserved"].default
+        else:
+            self.lastObserved = lastObserved
+        if runId is None:
+            # default: 'no-run-id-provided'
+            self.runId = self.RECORD_SCHEMA.field_map["runId"].default
+        else:
+            self.runId = runId
         self.properties = properties

     @classmethod
@@ -7405,23 +7413,23 @@ class SystemMetadataClass(DictWrapper):
     @property
-    def lastObserved(self) -> Union[None, int]:
+    def lastObserved(self) -> Union[int, None]:
         """Getter: The timestamp the metadata was observed at"""
         return self._inner_dict.get('lastObserved')  # type: ignore

     @lastObserved.setter
-    def lastObserved(self, value: Union[None, int]) -> None:
+    def lastObserved(self, value: Union[int, None]) -> None:
         """Setter: The timestamp the metadata was observed at"""
         self._inner_dict['lastObserved'] = value

     @property
-    def runId(self) -> Union[None, str]:
+    def runId(self) -> Union[str, None]:
         """Getter: The run id that produced the metadata"""
         return self._inner_dict.get('runId')  # type: ignore

     @runId.setter
-    def runId(self, value: Union[None, str]) -> None:
+    def runId(self, value: Union[str, None]) -> None:
         """Setter: The run id that produced the metadata"""
         self._inner_dict['runId'] = value
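The net effect for callers is that a bare SystemMetadataClass() now carries the schema defaults instead of None. A sketch, assuming a datahub install that ships this generated class:

```python
from datahub.metadata.schema_classes import SystemMetadataClass

sm = SystemMetadataClass()  # no arguments: falls through to the schema defaults
assert sm.lastObserved == 0
assert sm.runId == "no-run-id-provided"

# Explicit values still win over the defaults (timestamp is a hypothetical epoch-millis).
sm = SystemMetadataClass(lastObserved=1628262000000, runId="my-run")
assert sm.runId == "my-run"
```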

View File

@@ -5163,20 +5163,20 @@
                 {
                     "name": "lastObserved",
                     "type": [
-                        "null",
-                        "long"
+                        "long",
+                        "null"
                     ],
                     "doc": "The timestamp the metadata was observed at",
-                    "default": null
+                    "default": 0
                 },
                 {
                     "name": "runId",
                     "type": [
-                        "null",
-                        "string"
+                        "string",
+                        "null"
                     ],
                     "doc": "The run id that produced the metadata",
-                    "default": null
+                    "default": "no-run-id-provided"
                 },
                 {
                     "name": "properties",

View File

@@ -185,20 +185,20 @@
                 {
                     "name": "lastObserved",
                     "type": [
-                        "null",
-                        "long"
+                        "long",
+                        "null"
                     ],
                     "doc": "The timestamp the metadata was observed at",
-                    "default": null
+                    "default": 0
                 },
                 {
                     "name": "runId",
                     "type": [
-                        "null",
-                        "string"
+                        "string",
+                        "null"
                     ],
                     "doc": "The run id that produced the metadata",
-                    "default": null
+                    "default": "no-run-id-provided"
                },
                 {
                     "name": "properties",

View File

@@ -143,3 +143,29 @@ def test_glue_ingest(tmp_path, pytestconfig):
         output_path=tmp_path / "glue_mces.json",
         golden_path=test_resources_dir / "glue_mces_golden.json",
     )
+
+
+def test_underlying_platform_takes_precedence():
+    source = GlueSource(
+        ctx=PipelineContext(run_id="glue-source-test"),
+        config=GlueSourceConfig(aws_region="us-west-2", underlying_platform="athena"),
+    )
+    assert source.get_underlying_platform() == "athena"
+
+
+def test_underlying_platform_cannot_be_other_than_athena():
+    source = GlueSource(
+        ctx=PipelineContext(run_id="glue-source-test"),
+        config=GlueSourceConfig(
+            aws_region="us-west-2", underlying_platform="data-warehouse"
+        ),
+    )
+    assert source.get_underlying_platform() == "glue"
+
+
+def test_without_underlying_platform():
+    source = GlueSource(
+        ctx=PipelineContext(run_id="glue-source-test"),
+        config=GlueSourceConfig(aws_region="us-west-2"),
+    )
+    assert source.get_underlying_platform() == "glue"
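Note the design choice these tests pin down: an unrecognized underlying_platform is accepted at config-parse time and only normalized when URNs are built. A sketch:

```python
from datahub.ingestion.source.glue import GlueSourceConfig  # path as assumed above

cfg = GlueSourceConfig(aws_region="us-west-2", underlying_platform="data-warehouse")
assert cfg.underlying_platform == "data-warehouse"  # stored verbatim, not rejected
# GlueSource.get_underlying_platform() will return "glue" for this value.
```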