feat(ingestion) Add more info to glue entities (#5874)

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
This commit is contained in:
skrydal 2022-09-14 21:25:09 +02:00 committed by GitHub
parent 43edc3b656
commit f61a040555
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 483 additions and 1073 deletions

View File

@ -1,13 +1,14 @@
import logging
import typing
from collections import defaultdict
from dataclasses import dataclass, field as dataclass_field
from typing import (
Any,
DefaultDict,
Dict,
Iterable,
Iterator,
List,
Mapping,
Optional,
Set,
Tuple,
@ -65,7 +66,7 @@ from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import Status
from datahub.metadata.com.linkedin.pegasus2avro.common import Status, SubTypes
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
@ -414,7 +415,7 @@ class GlueSource(StatefulIngestionSourceBase):
flow_urn: str,
new_dataset_ids: List[str],
new_dataset_mces: List[MetadataChangeEvent],
s3_formats: typing.DefaultDict[str, Set[Union[str, None]]],
s3_formats: DefaultDict[str, Set[Union[str, None]]],
) -> Optional[Dict[str, Any]]:
node_type = node["NodeType"]
@ -509,7 +510,7 @@ class GlueSource(StatefulIngestionSourceBase):
self,
dataflow_graph: Dict[str, Any],
flow_urn: str,
s3_formats: typing.DefaultDict[str, Set[Union[str, None]]],
s3_formats: DefaultDict[str, Set[Union[str, None]]],
) -> Tuple[Dict[str, Dict[str, Any]], List[str], List[MetadataChangeEvent]]:
"""
Prepare a job's DAG for ingestion.
@ -654,7 +655,9 @@ class GlueSource(StatefulIngestionSourceBase):
return MetadataWorkUnit(id=f'{job_name}-{node["Id"]}', mce=mce)
def get_all_tables(self) -> List[dict]:
def get_all_tables_and_databases(
self,
) -> Tuple[Dict, List[Dict]]:
def get_tables_from_database(database_name: str) -> List[dict]:
new_tables = []
@ -673,8 +676,8 @@ class GlueSource(StatefulIngestionSourceBase):
return new_tables
def get_database_names() -> List[str]:
database_names = []
def get_databases() -> List[Mapping[str, Any]]:
databases = []
# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_databases
paginator = self.glue_client.get_paginator("get_databases")
@ -689,19 +692,25 @@ class GlueSource(StatefulIngestionSourceBase):
for page in paginator_response:
for db in page["DatabaseList"]:
if self.source_config.database_pattern.allowed(db["Name"]):
database_names.append(db["Name"])
databases.append(db)
return database_names
return databases
if self.source_config.database_pattern.is_fully_specified_allow_list():
database_names = self.source_config.database_pattern.get_allowed_list()
else:
database_names = get_database_names()
all_databases = get_databases()
all_tables: List[dict] = []
for database in database_names:
all_tables += get_tables_from_database(database)
return all_tables
databases = {
database["Name"]: database
for database in all_databases
if self.source_config.database_pattern.allowed(database["Name"])
}
all_tables: List[dict] = [
table
for databaseName in databases.keys()
for table in get_tables_from_database(databaseName)
]
return databases, all_tables
def get_lineage_if_enabled(
self, mce: MetadataChangeEventClass
@ -905,14 +914,17 @@ class GlueSource(StatefulIngestionSourceBase):
else self.source_config.env,
)
def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(database)
database_container_key = self.gen_database_key(database)
def gen_database_containers(
self, database: Mapping[str, Any]
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(database["Name"])
database_container_key = self.gen_database_key(database["Name"])
container_workunits = gen_containers(
container_key=database_container_key,
name=database,
name=database["Name"],
sub_types=["Database"],
domain_urn=domain_urn,
description=database.get("Description"),
)
for wu in container_workunits:
@ -955,7 +967,7 @@ class GlueSource(StatefulIngestionSourceBase):
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
database_seen = set()
tables = self.get_all_tables()
databases, tables = self.get_all_tables_and_databases()
for table in tables:
database_name = table["DatabaseName"]
@ -969,19 +981,29 @@ class GlueSource(StatefulIngestionSourceBase):
continue
if database_name not in database_seen:
database_seen.add(database_name)
yield from self.gen_database_containers(database_name)
yield from self.gen_database_containers(databases[database_name])
mce = self._extract_record(table, full_table_name)
workunit = MetadataWorkUnit(full_table_name, mce=mce)
self.report.report_workunit(workunit)
yield workunit
dataset_urn: str = make_dataset_urn_with_platform_instance(
dataset_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=full_table_name,
env=self.env,
platform_instance=self.source_config.platform_instance,
)
mce = self._extract_record(dataset_urn, table, full_table_name)
workunit = MetadataWorkUnit(full_table_name, mce=mce)
self.report.report_workunit(workunit)
yield workunit
# We also want to assign "table" subType to the dataset representing glue table - unfortunately it is not
# possible via Dataset snapshot embedded in a mce, so we have to generate a mcp.
workunit = MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=SubTypes(typeNames=["table"]),
).as_workunit()
self.report.report_workunit(workunit)
yield workunit
yield from self._get_domain_wu(
dataset_name=full_table_name,
entity_urn=dataset_urn,
@ -1044,9 +1066,7 @@ class GlueSource(StatefulIngestionSourceBase):
# in Glue, it's possible for two buckets to have files of different extensions
# if this happens, we append the extension in the URN so the sources can be distinguished
# see process_dataflow_node() for details
s3_formats: typing.DefaultDict[str, Set[Optional[str]]] = defaultdict(
lambda: set()
)
s3_formats: DefaultDict[str, Set[Optional[str]]] = defaultdict(lambda: set())
for dag in dags.values():
if dag is not None:
for s3_name, extension in self.get_dataflow_s3_names(dag):
@ -1074,7 +1094,9 @@ class GlueSource(StatefulIngestionSourceBase):
yield dataset_wu
# flake8: noqa: C901
def _extract_record(self, table: Dict, table_name: str) -> MetadataChangeEvent:
def _extract_record(
self, dataset_urn: str, table: Dict, table_name: str
) -> MetadataChangeEvent:
def get_owner() -> Optional[OwnershipClass]:
owner = table.get("Owner")
if owner:
@ -1172,7 +1194,7 @@ class GlueSource(StatefulIngestionSourceBase):
)
return new_tags
def get_schema_metadata(glue_source: GlueSource) -> SchemaMetadata:
def get_schema_metadata() -> SchemaMetadata:
schema = table["StorageDescriptor"]["Columns"]
fields: List[SchemaField] = []
for field in schema:
@ -1214,27 +1236,21 @@ class GlueSource(StatefulIngestionSourceBase):
else None,
)
dataset_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=table_name,
env=self.env,
platform_instance=self.source_config.platform_instance,
)
dataset_snapshot = DatasetSnapshot(
urn=dataset_urn,
aspects=[],
aspects=[
Status(removed=False),
get_dataset_properties(),
get_schema_metadata(),
get_data_platform_instance(),
],
)
dataset_snapshot.aspects.append(Status(removed=False))
if self.extract_owners:
optional_owner_aspect = get_owner()
if optional_owner_aspect is not None:
dataset_snapshot.aspects.append(optional_owner_aspect)
dataset_snapshot.aspects.append(get_dataset_properties())
dataset_snapshot.aspects.append(get_schema_metadata(self))
dataset_snapshot.aspects.append(get_data_platform_instance())
if (
self.source_config.use_s3_bucket_tags
or self.source_config.use_s3_object_tags

View File

@ -1,9 +1,7 @@
[
{
"auditHeader": null,
"entityType": "container",
"entityUrn": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
@ -12,17 +10,12 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-2020_04_14-07_00_00",
"registryName": null,
"registryVersion": null,
"properties": null
"runId": "glue-2020_04_14-07_00_00"
}
},
{
"auditHeader": null,
"entityType": "container",
"entityUrn": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
@ -31,17 +24,12 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-2020_04_14-07_00_00",
"registryName": null,
"registryVersion": null,
"properties": null
"runId": "glue-2020_04_14-07_00_00"
}
},
{
"auditHeader": null,
"entityType": "container",
"entityUrn": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
@ -50,14 +38,10 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-2020_04_14-07_00_00",
"registryName": null,
"registryVersion": null,
"properties": null
"runId": "glue-2020_04_14-07_00_00"
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_jsons_markers,PROD)",
@ -67,22 +51,6 @@
"removed": false
}
},
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:owner",
"type": "DATAOWNER",
"source": null
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
}
}
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
@ -106,11 +74,6 @@
"SortColumns": "[]",
"StoredAsSubDirectories": "False"
},
"externalUrl": null,
"name": null,
"qualifiedName": null,
"description": null,
"uri": null,
"tags": []
}
},
@ -121,17 +84,12 @@
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
"actor": "urn:li:corpuser:unknown"
},
"deleted": null,
"dataset": null,
"cluster": null,
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.MySqlDDL": {
@ -141,11 +99,7 @@
"fields": [
{
"fieldPath": "[version=2.0].[type=struct].[type=array].[type=struct].markers",
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.ArrayType": {
@ -157,19 +111,12 @@
},
"nativeDataType": "array<struct<name:string,position:array<double>,location:array<double>>>",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": "{\"native_data_type\": \"array<struct<name:string,position:array<double>,location:array<double>>>\"}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=array].[type=struct].markers.[type=string].name",
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
@ -177,19 +124,12 @@
},
"nativeDataType": "string",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=array].[type=struct].markers.[type=array].[type=double].position",
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.ArrayType": {
@ -201,19 +141,12 @@
},
"nativeDataType": "array<double>",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": "{\"native_data_type\": \"array<double>\"}"
},
{
"fieldPath": "[version=2.0].[type=struct].[type=array].[type=struct].markers.[type=array].[type=double].location",
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.ArrayType": {
@ -225,41 +158,56 @@
},
"nativeDataType": "array<double>",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": "{\"native_data_type\": \"array<double>\"}"
}
],
"primaryKeys": null,
"foreignKeysSpecs": null,
"foreignKeys": null
]
}
},
{
"com.linkedin.pegasus2avro.common.DataPlatformInstance": {
"platform": "urn:li:dataPlatform:glue",
"instance": null
"platform": "urn:li:dataPlatform:glue"
}
},
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:owner",
"type": "DATAOWNER"
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-2020_04_14-07_00_00",
"registryName": null,
"registryVersion": null,
"properties": null
"runId": "glue-2020_04_14-07_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_jsons_markers,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"value": "{\"typeNames\": [\"table\"]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-2020_04_14-07_00_00"
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_jsons_markers,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
@ -268,14 +216,10 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-2020_04_14-07_00_00",
"registryName": null,
"registryVersion": null,
"properties": null
"runId": "glue-2020_04_14-07_00_00"
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)",
@ -285,22 +229,6 @@
"removed": false
}
},
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:owner",
"type": "DATAOWNER",
"source": null
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
}
}
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
@ -324,11 +252,6 @@
"SortColumns": "[]",
"StoredAsSubDirectories": "False"
},
"externalUrl": null,
"name": null,
"qualifiedName": null,
"description": null,
"uri": null,
"tags": []
}
},
@ -339,17 +262,12 @@
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
"actor": "urn:li:corpuser:unknown"
},
"deleted": null,
"dataset": null,
"cluster": null,
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.MySqlDDL": {
@ -359,11 +277,7 @@
"fields": [
{
"fieldPath": "[version=2.0].[type=int].yr",
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
@ -371,19 +285,12 @@
},
"nativeDataType": "int",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": "{\"native_data_type\": \"int\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=int].quarter",
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
@ -391,19 +298,12 @@
},
"nativeDataType": "int",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": "{\"native_data_type\": \"int\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=int].month",
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
@ -411,19 +311,12 @@
},
"nativeDataType": "int",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": "{\"native_data_type\": \"int\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=int].dayofmonth",
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
@ -431,19 +324,12 @@
},
"nativeDataType": "int",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": "{\"native_data_type\": \"int\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=string].year",
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
@ -451,41 +337,56 @@
},
"nativeDataType": "string",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
}
],
"primaryKeys": null,
"foreignKeysSpecs": null,
"foreignKeys": null
]
}
},
{
"com.linkedin.pegasus2avro.common.DataPlatformInstance": {
"platform": "urn:li:dataPlatform:glue",
"instance": null
"platform": "urn:li:dataPlatform:glue"
}
},
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:owner",
"type": "DATAOWNER"
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-2020_04_14-07_00_00",
"registryName": null,
"registryVersion": null,
"properties": null
"runId": "glue-2020_04_14-07_00_00"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"value": "{\"typeNames\": [\"table\"]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-2020_04_14-07_00_00"
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
@ -494,17 +395,12 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-2020_04_14-07_00_00",
"registryName": null,
"registryVersion": null,
"properties": null
"runId": "glue-2020_04_14-07_00_00"
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database.avro,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
@ -513,10 +409,7 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "glue-2020_04_14-07_00_00",
"registryName": null,
"registryVersion": null,
"properties": null
"runId": "glue-2020_04_14-07_00_00"
}
}
]

File diff suppressed because it is too large Load Diff

View File

@ -30,6 +30,8 @@ from tests.test_helpers.state_helpers import (
)
from tests.test_helpers.type_helpers import PytestConfig
from tests.unit.test_glue_source_stubs import (
databases_1,
databases_2,
get_bucket_tagging,
get_databases_response,
get_dataflow_graph_response_1,
@ -277,13 +279,13 @@ def test_glue_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
) as mock_checkpoint:
mock_checkpoint.return_value = mock_datahub_graph
with patch(
"datahub.ingestion.source.aws.glue.GlueSource.get_all_tables",
) as mock_get_all_tables:
"datahub.ingestion.source.aws.glue.GlueSource.get_all_tables_and_databases",
) as mock_get_all_tables_and_databases:
tables_on_first_call = tables_1
tables_on_second_call = tables_2
mock_get_all_tables.side_effect = [
tables_on_first_call,
tables_on_second_call,
mock_get_all_tables_and_databases.side_effect = [
(databases_1, tables_on_first_call),
(databases_2, tables_on_second_call),
]
pipeline_run1 = run_and_get_pipeline(pipeline_config_dict)

View File

@ -34,6 +34,8 @@ get_databases_response = {
},
]
}
databases_1 = {"flights-database": {"Name": "flights-database"}}
databases_2 = {"test-database": {"Name": "test-database"}}
tables_1 = [
{
"Name": "avro",