fix(ingest/Glue): column upstream lineage between S3 and Glue (#10895)

This commit is contained in:
sagar-salvi-apptware 2024-07-19 14:39:19 +05:30 committed by GitHub
parent 3733a408fd
commit 348d449d8a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 2172 additions and 6 deletions

View File

@ -24,6 +24,7 @@ import yaml
from pydantic import validator
from pydantic.fields import Field
from datahub.api.entities.dataset.dataset import Dataset
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.emitter import mce_builder
@ -55,7 +56,11 @@ from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws import s3_util
from datahub.ingestion.source.aws.aws_common import AwsSourceConfig
from datahub.ingestion.source.aws.s3_util import is_s3_uri, make_s3_urn
from datahub.ingestion.source.aws.s3_util import (
is_s3_uri,
make_s3_urn,
make_s3_urn_for_lineage,
)
from datahub.ingestion.source.common.subtypes import (
DatasetContainerSubTypes,
DatasetSubTypes,
@ -90,6 +95,9 @@ from datahub.metadata.schema_classes import (
DatasetLineageTypeClass,
DatasetProfileClass,
DatasetPropertiesClass,
FineGrainedLineageClass,
FineGrainedLineageDownstreamTypeClass,
FineGrainedLineageUpstreamTypeClass,
GlobalTagsClass,
MetadataChangeEventClass,
OwnerClass,
@ -97,6 +105,7 @@ from datahub.metadata.schema_classes import (
OwnershipTypeClass,
PartitionSpecClass,
PartitionTypeClass,
SchemaMetadataClass,
TagAssociationClass,
UpstreamClass,
UpstreamLineageClass,
@ -171,6 +180,11 @@ class GlueSourceConfig(
description="If enabled, delta schemas can be alternatively fetched from table parameters.",
)
include_column_lineage: bool = Field(
default=True,
description="When enabled, column-level lineage will be extracted from the s3.",
)
def is_profiling_enabled(self) -> bool:
return self.profiling is not None and is_profiling_enabled(
self.profiling.operation_config
@ -283,6 +297,7 @@ class GlueSource(StatefulIngestionSourceBase):
def __init__(self, config: GlueSourceConfig, ctx: PipelineContext):
super().__init__(config, ctx)
self.ctx = ctx
self.extract_owners = config.extract_owners
self.source_config = config
self.report = GlueSourceReport()
@ -714,18 +729,43 @@ class GlueSource(StatefulIngestionSourceBase):
dataset_properties: Optional[
DatasetPropertiesClass
] = mce_builder.get_aspect_if_available(mce, DatasetPropertiesClass)
# extract dataset schema aspect
schema_metadata: Optional[
SchemaMetadataClass
] = mce_builder.get_aspect_if_available(mce, SchemaMetadataClass)
if dataset_properties and "Location" in dataset_properties.customProperties:
location = dataset_properties.customProperties["Location"]
if is_s3_uri(location):
s3_dataset_urn = make_s3_urn(location, self.source_config.env)
s3_dataset_urn = make_s3_urn_for_lineage(
location, self.source_config.env
)
assert self.ctx.graph
schema_metadata_for_s3: Optional[
SchemaMetadataClass
] = self.ctx.graph.get_schema_metadata(s3_dataset_urn)
if self.source_config.glue_s3_lineage_direction == "upstream":
fine_grained_lineages = None
if (
self.source_config.include_column_lineage
and schema_metadata
and schema_metadata_for_s3
):
fine_grained_lineages = self.get_fine_grained_lineages(
mce.proposedSnapshot.urn,
s3_dataset_urn,
schema_metadata,
schema_metadata_for_s3,
)
upstream_lineage = UpstreamLineageClass(
upstreams=[
UpstreamClass(
dataset=s3_dataset_urn,
type=DatasetLineageTypeClass.COPY,
)
]
],
fineGrainedLineages=fine_grained_lineages or None,
)
return MetadataChangeProposalWrapper(
entityUrn=mce.proposedSnapshot.urn,
@ -747,6 +787,49 @@ class GlueSource(StatefulIngestionSourceBase):
).as_workunit()
return None
def get_fine_grained_lineages(
    self,
    dataset_urn: str,
    s3_dataset_urn: str,
    schema_metadata: SchemaMetadata,
    schema_metadata_for_s3: SchemaMetadata,
) -> Optional[List[FineGrainedLineageClass]]:
    """Build column-level lineage from an S3 dataset to a Glue table.

    A Glue column is linked to an S3 column when both field paths reduce to
    the same simplified path via ``Dataset._simplify_field_path``. Both the
    upstream and the downstream schema-field URNs are built from that
    simplified path.

    Args:
        dataset_urn: URN of the Glue dataset (the downstream side).
        s3_dataset_urn: URN of the S3 dataset (the upstream side).
        schema_metadata: Schema of the Glue dataset.
        schema_metadata_for_s3: Schema of the S3 dataset.

    Returns:
        ``None`` when either schema is missing; otherwise one
        ``FineGrainedLineageClass`` per Glue field that has a matching S3
        field, in Glue field order. The list is empty when nothing matches.
    """
    if not (schema_metadata and schema_metadata_for_s3):
        return None

    simplify_field_path = Dataset._simplify_field_path

    # Simplify every S3 field path once up front, so each Glue field needs a
    # single set lookup instead of a rescan of the whole S3 schema.
    s3_field_paths = {
        simplify_field_path(s3_field.fieldPath)
        for s3_field in schema_metadata_for_s3.fields
    }

    fine_grained_lineages: List[FineGrainedLineageClass] = []
    for field in schema_metadata.fields:
        field_path_v1 = simplify_field_path(field.fieldPath)
        if field_path_v1 not in s3_field_paths:
            continue
        # A match means the S3 field simplifies to the same path, so the
        # same simplified path identifies the column on both sides.
        fine_grained_lineages.append(
            FineGrainedLineageClass(
                downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
                downstreams=[
                    mce_builder.make_schema_field_urn(dataset_urn, field_path_v1)
                ],
                upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
                upstreams=[
                    mce_builder.make_schema_field_urn(
                        s3_dataset_urn, field_path_v1
                    )
                ],
            )
        )
    return fine_grained_lineages
def _create_profile_mcp(
self,
mce: MetadataChangeEventClass,

View File

@ -0,0 +1,373 @@
[
{
"entityType": "container",
"entityUrn": "urn:li:container:89f32a7a37e2f61693aa4b720ace2a3c",
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"json": {
"customProperties": {
"platform": "glue",
"env": "PROD",
"database": "flights-database-lineage",
"param1": "value1",
"param2": "value2",
"LocationUri": "s3://test-bucket/test-prefix",
"CreateTime": "June 09, 2021 at 14:14:19"
},
"name": "flights-database-lineage",
"qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/flights-database-lineage"
}
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:89f32a7a37e2f61693aa4b720ace2a3c",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:89f32a7a37e2f61693aa4b720ace2a3c",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:glue"
}
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:89f32a7a37e2f61693aa4b720ace2a3c",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Database"
]
}
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"CrawlerSchemaDeserializerVersion": "1.0",
"CrawlerSchemaSerializerVersion": "1.0",
"UPDATED_BY_CRAWLER": "flights-crawler",
"averageRecordSize": "55",
"avro.schema.literal": "{\"type\":\"record\",\"name\":\"flights_avro_subset\",\"namespace\":\"default\",\"fields\":[{\"name\":\"yr\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"flightdate\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"uniquecarrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"airlineid\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"flightnum\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"origin\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dest\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"depdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrierdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"weatherdelay\",\"type\":[\"null\",\"int\"],\"default\":null}]}",
"classification": "avro",
"compressionType": "none",
"objectCount": "30",
"recordCount": "169222196",
"sizeKey": "9503351413",
"typeOfData": "file",
"Location": "s3://crawler-public-us-west-2/flight/avro/",
"InputFormat": "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat",
"OutputFormat": "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat",
"Compressed": "False",
"NumberOfBuckets": "-1",
"SerdeInfo": "{'SerializationLibrary': 'org.apache.hadoop.hive.serde2.avro.AvroSerDe', 'Parameters': {'avro.schema.literal': '{\"type\":\"record\",\"name\":\"flights_avro_subset\",\"namespace\":\"default\",\"fields\":[{\"name\":\"yr\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"flightdate\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"uniquecarrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"airlineid\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"flightnum\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"origin\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dest\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"depdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrierdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"weatherdelay\",\"type\":[\"null\",\"int\"],\"default\":null}]}', 'serialization.format': '1'}}",
"BucketColumns": "[]",
"SortColumns": "[]",
"StoredAsSubDirectories": "False"
},
"name": "avro",
"qualifiedName": "arn:aws:glue:us-west-2:123412341234:table/flights-database-lineage/avro",
"tags": []
}
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "flights-database-lineage.avro",
"platform": "urn:li:dataPlatform:glue",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.MySqlDDL": {
"tableSchema": ""
}
},
"fields": [
{
"fieldPath": "[version=2.0].[type=int].yr",
"nullable": true,
"description": "test comment",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "int",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"int\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=string].flightdate",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=string].uniquecarrier",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=int].airlineid",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "int",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"int\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=string].carrier",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=string].flightnum",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=string].origin",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=string].year",
"nullable": true,
"description": "partition test comment",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
}
]
}
},
{
"com.linkedin.pegasus2avro.common.DataPlatformInstance": {
"platform": "urn:li:dataPlatform:glue"
}
},
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:owner",
"type": "DATAOWNER"
}
],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
}
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Table"
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:89f32a7a37e2f61693aa4b720ace2a3c"
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:s3,crawler-public-us-west-2/flight/avro,PROD)",
"type": "COPY"
}
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:s3,crawler-public-us-west-2/flight/avro,PROD),yr)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD),yr)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:s3,crawler-public-us-west-2/flight/avro,PROD),flightdate)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD),flightdate)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:s3,crawler-public-us-west-2/flight/avro,PROD),uniquecarrier)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD),uniquecarrier)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:s3,crawler-public-us-west-2/flight/avro,PROD),airlineid)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD),airlineid)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:s3,crawler-public-us-west-2/flight/avro,PROD),carrier)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD),carrier)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:s3,crawler-public-us-west-2/flight/avro,PROD),flightnum)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD),flightnum)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:s3,crawler-public-us-west-2/flight/avro,PROD),origin)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-lineage.avro,PROD),origin)"
],
"confidenceScore": 1.0
}
]
}
}
}
]

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
import json
from pathlib import Path
from typing import Any, Dict, Optional, Tuple, Type, cast
from typing import Any, Callable, Dict, Optional, Tuple, Type, cast
from unittest.mock import patch
import pydantic
@ -8,8 +8,10 @@ import pytest
from botocore.stub import Stubber
from freezegun import freeze_time
import datahub.metadata.schema_classes as models
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.ingestion.sink.file import write_metadata_file
from datahub.ingestion.source.aws.glue import GlueSource, GlueSourceConfig
from datahub.ingestion.source.state.sql_common_state import (
@ -35,6 +37,7 @@ from tests.unit.test_glue_source_stubs import (
get_bucket_tagging,
get_databases_delta_response,
get_databases_response,
get_databases_response_for_lineage,
get_databases_response_with_resource_link,
get_dataflow_graph_response_1,
get_dataflow_graph_response_2,
@ -47,6 +50,7 @@ from tests.unit.test_glue_source_stubs import (
get_object_response_1,
get_object_response_2,
get_object_tagging,
get_tables_lineage_response_1,
get_tables_response_1,
get_tables_response_2,
get_tables_response_for_target_database,
@ -63,19 +67,28 @@ GMS_SERVER = f"http://localhost:{GMS_PORT}"
def glue_source(
platform_instance: Optional[str] = None,
mock_datahub_graph: Optional[Callable[[DatahubClientConfig], DataHubGraph]] = None,
use_s3_bucket_tags: bool = True,
use_s3_object_tags: bool = True,
extract_delta_schema_from_parameters: bool = False,
emit_s3_lineage: bool = False,
include_column_lineage: bool = False,
extract_transforms: bool = True,
) -> GlueSource:
pipeline_context = PipelineContext(run_id="glue-source-tes")
if mock_datahub_graph:
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())
return GlueSource(
ctx=PipelineContext(run_id="glue-source-test"),
ctx=pipeline_context,
config=GlueSourceConfig(
aws_region="us-west-2",
extract_transforms=True,
extract_transforms=extract_transforms,
platform_instance=platform_instance,
use_s3_bucket_tags=use_s3_bucket_tags,
use_s3_object_tags=use_s3_object_tags,
extract_delta_schema_from_parameters=extract_delta_schema_from_parameters,
emit_s3_lineage=emit_s3_lineage,
include_column_lineage=include_column_lineage,
),
)
@ -425,3 +438,206 @@ def test_glue_with_malformed_delta_schema_ingest(
output_path=tmp_path / "glue_malformed_delta_mces.json",
golden_path=test_resources_dir / "glue_malformed_delta_mces_golden.json",
)
# Golden-file test: run the Glue source with emit_s3_lineage=True against
# stubbed Glue and S3 clients and compare the emitted metadata with
# glue_mces_golden_table_lineage.json.
@pytest.mark.parametrize(
"platform_instance, mce_file, mce_golden_file",
[
(None, "glue_mces.json", "glue_mces_golden_table_lineage.json"),
],
)
@freeze_time(FROZEN_TIME)
def test_glue_ingest_include_table_lineage(
tmp_path: Path,
pytestconfig: PytestConfig,
mock_datahub_graph: Callable[[DatahubClientConfig], DataHubGraph],
platform_instance: str,
mce_file: str,
mce_golden_file: str,
) -> None:
# include_column_lineage is not passed here, so the glue_source() helper's
# default (False) applies.
glue_source_instance = glue_source(
platform_instance=platform_instance,
mock_datahub_graph=mock_datahub_graph,
emit_s3_lineage=True,
)
# Queue the canned Glue API responses; assert_no_pending_responses() further
# down checks that every queued response was consumed.
with Stubber(glue_source_instance.glue_client) as glue_stubber:
glue_stubber.add_response("get_databases", get_databases_response, {})
glue_stubber.add_response(
"get_tables",
get_tables_response_1,
{"DatabaseName": "flights-database"},
)
glue_stubber.add_response(
"get_tables",
get_tables_response_2,
{"DatabaseName": "test-database"},
)
glue_stubber.add_response(
"get_tables",
{"TableList": []},
{"DatabaseName": "empty-database"},
)
glue_stubber.add_response("get_jobs", get_jobs_response, {})
glue_stubber.add_response(
"get_dataflow_graph",
get_dataflow_graph_response_1,
{"PythonScript": get_object_body_1},
)
glue_stubber.add_response(
"get_dataflow_graph",
get_dataflow_graph_response_2,
{"PythonScript": get_object_body_2},
)
with Stubber(glue_source_instance.s3_client) as s3_stubber:
# The loop count is the total number of tables across both stubbed
# table lists.
for _ in range(
len(get_tables_response_1["TableList"])
+ len(get_tables_response_2["TableList"])
):
s3_stubber.add_response(
"get_bucket_tagging",
get_bucket_tagging(),
)
s3_stubber.add_response(
"get_object_tagging",
get_object_tagging(),
)
# Stubbed get_object responses for scripts/job-1.py and scripts/job-2.py.
s3_stubber.add_response(
"get_object",
get_object_response_1(),
{
"Bucket": "aws-glue-assets-123412341234-us-west-2",
"Key": "scripts/job-1.py",
},
)
s3_stubber.add_response(
"get_object",
get_object_response_2(),
{
"Bucket": "aws-glue-assets-123412341234-us-west-2",
"Key": "scripts/job-2.py",
},
)
# Drain the source and keep only the metadata payload of each work unit.
mce_objects = [wu.metadata for wu in glue_source_instance.get_workunits()]
glue_stubber.assert_no_pending_responses()
s3_stubber.assert_no_pending_responses()
write_metadata_file(tmp_path / mce_file, mce_objects)
# Verify the output.
test_resources_dir = pytestconfig.rootpath / "tests/unit/glue"
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / mce_file,
golden_path=test_resources_dir / mce_golden_file,
)
# Golden-file test: run the Glue source with both emit_s3_lineage and
# include_column_lineage enabled, serve a fake S3 schema from the graph
# client, and compare the emitted metadata with
# glue_mces_golden_table_column_lineage.json.
@pytest.mark.parametrize(
"platform_instance, mce_file, mce_golden_file",
[
(None, "glue_mces.json", "glue_mces_golden_table_column_lineage.json"),
],
)
@freeze_time(FROZEN_TIME)
def test_glue_ingest_include_column_lineage(
tmp_path: Path,
pytestconfig: PytestConfig,
mock_datahub_graph: Callable[[DatahubClientConfig], DataHubGraph],
platform_instance: str,
mce_file: str,
mce_golden_file: str,
) -> None:
# S3 tag lookups and transform extraction are switched off, and only the
# Glue client is stubbed further down.
glue_source_instance = glue_source(
platform_instance=platform_instance,
mock_datahub_graph=mock_datahub_graph,
emit_s3_lineage=True,
include_column_lineage=True,
use_s3_bucket_tags=False,
use_s3_object_tags=False,
extract_transforms=False,
)
# fake the server response
# Returns the same S3 schema regardless of entity_urn (the argument is unused).
# The field paths are plain column names.
def fake_schema_metadata(entity_urn: str) -> models.SchemaMetadataClass:
return models.SchemaMetadataClass(
schemaName="crawler-public-us-west-2/flight/avro",
platform="urn:li:dataPlatform:s3", # important <- platform must be an urn
version=0,
hash="",
platformSchema=models.OtherSchemaClass(
rawSchema="__insert raw schema here__"
),
fields=[
models.SchemaFieldClass(
fieldPath="yr",
type=models.SchemaFieldDataTypeClass(type=models.NumberTypeClass()),
nativeDataType="int",
# use this to provide the type of the field in the source system's vernacular
),
models.SchemaFieldClass(
fieldPath="flightdate",
type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
models.SchemaFieldClass(
fieldPath="uniquecarrier",
type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
models.SchemaFieldClass(
fieldPath="airlineid",
type=models.SchemaFieldDataTypeClass(type=models.NumberTypeClass()),
nativeDataType="int",
# use this to provide the type of the field in the source system's vernacular
),
models.SchemaFieldClass(
fieldPath="carrier",
type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
models.SchemaFieldClass(
fieldPath="flightnum",
type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
models.SchemaFieldClass(
fieldPath="origin",
type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
nativeDataType="VARCHAR(100)",
# use this to provide the type of the field in the source system's vernacular
),
],
)
# Replace the graph client's get_schema_metadata with the fake defined above.
glue_source_instance.ctx.graph.get_schema_metadata = fake_schema_metadata # type: ignore
# Queue the two Glue responses for this test: one database and its tables.
with Stubber(glue_source_instance.glue_client) as glue_stubber:
glue_stubber.add_response(
"get_databases", get_databases_response_for_lineage, {}
)
glue_stubber.add_response(
"get_tables",
get_tables_lineage_response_1,
{"DatabaseName": "flights-database-lineage"},
)
# Drain the source and keep only the metadata payload of each work unit.
mce_objects = [wu.metadata for wu in glue_source_instance.get_workunits()]
glue_stubber.assert_no_pending_responses()
write_metadata_file(tmp_path / mce_file, mce_objects)
# Verify the output.
test_resources_dir = pytestconfig.rootpath / "tests/unit/glue"
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / mce_file,
golden_path=test_resources_dir / mce_golden_file,
)

View File

@ -880,6 +880,98 @@ delta_tables_2 = [
]
get_delta_tables_response_2 = {"TableList": delta_tables_2}
# Canned Glue "get_databases" response holding a single database,
# "flights-database-lineage".
get_databases_response_for_lineage = {
"DatabaseList": [
{
"Name": "flights-database-lineage",
"CreateTime": datetime.datetime(2021, 6, 9, 14, 14, 19),
"CreateTableDefaultPermissions": [
{
"Principal": {
"DataLakePrincipalIdentifier": "IAM_ALLOWED_PRINCIPALS"
},
"Permissions": ["ALL"],
}
],
"CatalogId": "123412341234",
"LocationUri": "s3://test-bucket/test-prefix",
"Parameters": {"param1": "value1", "param2": "value2"},
},
]
}
tables_lineage_1 = [
{
"Name": "avro",
"DatabaseName": "flights-database-lineage",
"Owner": "owner",
"CreateTime": datetime.datetime(2021, 6, 9, 14, 17, 35),
"UpdateTime": datetime.datetime(2021, 6, 9, 14, 17, 35),
"LastAccessTime": datetime.datetime(2021, 6, 9, 14, 17, 35),
"Retention": 0,
"StorageDescriptor": {
"Columns": [
{"Name": "yr", "Type": "int", "Comment": "test comment"},
{"Name": "flightdate", "Type": "string"},
{"Name": "uniquecarrier", "Type": "string"},
{"Name": "airlineid", "Type": "int"},
{"Name": "carrier", "Type": "string"},
{"Name": "flightnum", "Type": "string"},
{"Name": "origin", "Type": "string"},
],
"Location": "s3://crawler-public-us-west-2/flight/avro/",
"InputFormat": "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat",
"OutputFormat": "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat",
"Compressed": False,
"NumberOfBuckets": -1,
"SerdeInfo": {
"SerializationLibrary": "org.apache.hadoop.hive.serde2.avro.AvroSerDe",
"Parameters": {
"avro.schema.literal": '{"type":"record","name":"flights_avro_subset","namespace":"default","fields":[{"name":"yr","type":["null","int"],"default":null},{"name":"flightdate","type":["null","string"],"default":null},{"name":"uniquecarrier","type":["null","string"],"default":null},{"name":"airlineid","type":["null","int"],"default":null},{"name":"carrier","type":["null","string"],"default":null},{"name":"flightnum","type":["null","string"],"default":null},{"name":"origin","type":["null","string"],"default":null},{"name":"dest","type":["null","string"],"default":null},{"name":"depdelay","type":["null","int"],"default":null},{"name":"carrierdelay","type":["null","int"],"default":null},{"name":"weatherdelay","type":["null","int"],"default":null}]}',
"serialization.format": "1",
},
},
"BucketColumns": [],
"SortColumns": [],
"Parameters": {
"CrawlerSchemaDeserializerVersion": "1.0",
"CrawlerSchemaSerializerVersion": "1.0",
"UPDATED_BY_CRAWLER": "flights-crawler",
"averageRecordSize": "55",
"avro.schema.literal": '{"type":"record","name":"flights_avro_subset","namespace":"default","fields":[{"name":"yr","type":["null","int"],"default":null},{"name":"flightdate","type":["null","string"],"default":null},{"name":"uniquecarrier","type":["null","string"],"default":null},{"name":"airlineid","type":["null","int"],"default":null},{"name":"carrier","type":["null","string"],"default":null},{"name":"flightnum","type":["null","string"],"default":null},{"name":"origin","type":["null","string"],"default":null},{"name":"dest","type":["null","string"],"default":null},{"name":"depdelay","type":["null","int"],"default":null},{"name":"carrierdelay","type":["null","int"],"default":null},{"name":"weatherdelay","type":["null","int"],"default":null}]}',
"classification": "avro",
"compressionType": "none",
"objectCount": "30",
"recordCount": "169222196",
"sizeKey": "9503351413",
"typeOfData": "file",
},
"StoredAsSubDirectories": False,
},
"PartitionKeys": [
{"Name": "year", "Type": "string", "Comment": "partition test comment"}
],
"TableType": "EXTERNAL_TABLE",
"Parameters": {
"CrawlerSchemaDeserializerVersion": "1.0",
"CrawlerSchemaSerializerVersion": "1.0",
"UPDATED_BY_CRAWLER": "flights-crawler",
"averageRecordSize": "55",
"avro.schema.literal": '{"type":"record","name":"flights_avro_subset","namespace":"default","fields":[{"name":"yr","type":["null","int"],"default":null},{"name":"flightdate","type":["null","string"],"default":null},{"name":"uniquecarrier","type":["null","string"],"default":null},{"name":"airlineid","type":["null","int"],"default":null},{"name":"carrier","type":["null","string"],"default":null},{"name":"flightnum","type":["null","string"],"default":null},{"name":"origin","type":["null","string"],"default":null},{"name":"dest","type":["null","string"],"default":null},{"name":"depdelay","type":["null","int"],"default":null},{"name":"carrierdelay","type":["null","int"],"default":null},{"name":"weatherdelay","type":["null","int"],"default":null}]}',
"classification": "avro",
"compressionType": "none",
"objectCount": "30",
"recordCount": "169222196",
"sizeKey": "9503351413",
"typeOfData": "file",
},
"CreatedBy": "arn:aws:sts::123412341234:assumed-role/AWSGlueServiceRole-flights-crawler/AWS-Crawler",
"IsRegisteredWithLakeFormation": False,
"CatalogId": "123412341234",
}
]
# Canned Glue "get_tables" response wrapping the tables_lineage_1 list.
get_tables_lineage_response_1 = {"TableList": tables_lineage_1}
def mock_get_object_response(raw_body: str) -> Dict[str, Any]:
"""