fix(ingestion/glue): Add support for missing config options for profiling in Glue (#10858)

This commit is contained in:
sagar-salvi-apptware 2024-07-29 16:04:07 +05:30 committed by GitHub
parent d85da39a86
commit a09575fb6f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 1813 additions and 1313 deletions

View File

@ -55,6 +55,19 @@ New (optional fields `systemMetadata` and `headers`):
"headers": {}
}
```
- #10858 The profiling configuration for the Glue source has been updated.
Previously, profiling was enabled by the mere presence of a `profiling` section:
```yaml
profiling: {}
```
Now, it needs to be:
```yaml
profiling:
enabled: true
```
### Potential Downtime

View File

@ -167,8 +167,8 @@ class GlueSourceConfig(
default=False,
description="If an S3 Objects Tags should be created for the Tables ingested by Glue.",
)
profiling: Optional[GlueProfilingConfig] = Field(
default=None,
profiling: GlueProfilingConfig = Field(
default_factory=GlueProfilingConfig,
description="Configs to ingest data profiles from glue table",
)
# Custom Stateful Ingestion settings
@ -186,7 +186,7 @@ class GlueSourceConfig(
)
def is_profiling_enabled(self) -> bool:
return self.profiling is not None and is_profiling_enabled(
return self.profiling.enabled and is_profiling_enabled(
self.profiling.operation_config
)
@ -867,34 +867,39 @@ class GlueSource(StatefulIngestionSourceBase):
# instantiate column profile class for each column
column_profile = DatasetFieldProfileClass(fieldPath=column_name)
if self.source_config.profiling.unique_count in column_params:
column_profile.uniqueCount = int(
float(column_params[self.source_config.profiling.unique_count])
)
if self.source_config.profiling.unique_proportion in column_params:
column_profile.uniqueProportion = float(
column_params[self.source_config.profiling.unique_proportion]
)
if self.source_config.profiling.null_count in column_params:
column_profile.nullCount = int(
float(column_params[self.source_config.profiling.null_count])
)
if self.source_config.profiling.null_proportion in column_params:
column_profile.nullProportion = float(
column_params[self.source_config.profiling.null_proportion]
)
if self.source_config.profiling.min in column_params:
column_profile.min = column_params[self.source_config.profiling.min]
if self.source_config.profiling.max in column_params:
column_profile.max = column_params[self.source_config.profiling.max]
if self.source_config.profiling.mean in column_params:
column_profile.mean = column_params[self.source_config.profiling.mean]
if self.source_config.profiling.median in column_params:
column_profile.median = column_params[
self.source_config.profiling.median
]
if self.source_config.profiling.stdev in column_params:
column_profile.stdev = column_params[self.source_config.profiling.stdev]
if not self.source_config.profiling.profile_table_level_only:
if self.source_config.profiling.unique_count in column_params:
column_profile.uniqueCount = int(
float(column_params[self.source_config.profiling.unique_count])
)
if self.source_config.profiling.unique_proportion in column_params:
column_profile.uniqueProportion = float(
column_params[self.source_config.profiling.unique_proportion]
)
if self.source_config.profiling.null_count in column_params:
column_profile.nullCount = int(
float(column_params[self.source_config.profiling.null_count])
)
if self.source_config.profiling.null_proportion in column_params:
column_profile.nullProportion = float(
column_params[self.source_config.profiling.null_proportion]
)
if self.source_config.profiling.min in column_params:
column_profile.min = column_params[self.source_config.profiling.min]
if self.source_config.profiling.max in column_params:
column_profile.max = column_params[self.source_config.profiling.max]
if self.source_config.profiling.mean in column_params:
column_profile.mean = column_params[
self.source_config.profiling.mean
]
if self.source_config.profiling.median in column_params:
column_profile.median = column_params[
self.source_config.profiling.median
]
if self.source_config.profiling.stdev in column_params:
column_profile.stdev = column_params[
self.source_config.profiling.stdev
]
dataset_profile.fieldProfiles.append(column_profile)
@ -914,9 +919,7 @@ class GlueSource(StatefulIngestionSourceBase):
def get_profile_if_enabled(
self, mce: MetadataChangeEventClass, database_name: str, table_name: str
) -> Iterable[MetadataWorkUnit]:
# We don't need both checks only the second one
# but then lint believes that GlueProfilingConfig can be None
if self.source_config.profiling and self.source_config.is_profiling_enabled():
if self.source_config.is_profiling_enabled():
# for cross-account ingestion
kwargs = dict(
DatabaseName=database_name,

View File

@ -7,6 +7,14 @@ from datahub.ingestion.source_config.operation_config import OperationConfig
class GlueProfilingConfig(ConfigModel):
enabled: bool = Field(
default=False,
description="Whether profiling should be done.",
)
profile_table_level_only: bool = Field(
default=False,
description="Whether to perform profiling at table-level only, or include column-level profiling as well.",
)
row_count: Optional[str] = Field(
default=None,
description="The parameter name for row count in glue table.",

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,289 @@
[
{
"entityType": "container",
"entityUrn": "urn:li:container:77f8f4c39b47069d3a71191de1333b0e",
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"json": {
"customProperties": {
"platform": "glue",
"env": "PROD",
"database": "flights-database-profiling",
"param1": "value1",
"param2": "value2",
"LocationUri": "s3://test-bucket/test-prefix",
"CreateTime": "June 09, 2021 at 14:14:19"
},
"name": "flights-database-profiling",
"qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/flights-database-profiling"
}
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:77f8f4c39b47069d3a71191de1333b0e",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:77f8f4c39b47069d3a71191de1333b0e",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:glue"
}
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:77f8f4c39b47069d3a71191de1333b0e",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Database"
]
}
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-profiling.avro-profiling,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"CrawlerSchemaDeserializerVersion": "1.0",
"CrawlerSchemaSerializerVersion": "1.0",
"UPDATED_BY_CRAWLER": "flights-crawler",
"averageRecordSize": "55",
"avro.schema.literal": "{\"type\":\"record\",\"name\":\"flights_avro_subset\",\"namespace\":\"default\",\"fields\":[{\"name\":\"yr\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"flightdate\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"uniquecarrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"airlineid\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"flightnum\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"origin\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dest\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"depdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrierdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"weatherdelay\",\"type\":[\"null\",\"int\"],\"default\":null}]}",
"classification": "avro",
"compressionType": "none",
"objectCount": "30",
"recordCount": "169222196",
"sizeKey": "9503351413",
"typeOfData": "file",
"Location": "s3://crawler-public-us-west-2/flight/avro/",
"InputFormat": "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat",
"OutputFormat": "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat",
"Compressed": "False",
"NumberOfBuckets": "-1",
"SerdeInfo": "{'SerializationLibrary': 'org.apache.hadoop.hive.serde2.avro.AvroSerDe', 'Parameters': {'avro.schema.literal': '{\"type\":\"record\",\"name\":\"flights_avro_subset\",\"namespace\":\"default\",\"fields\":[{\"name\":\"yr\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"flightdate\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"uniquecarrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"airlineid\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrier\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"flightnum\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"origin\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dest\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"depdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"carrierdelay\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"weatherdelay\",\"type\":[\"null\",\"int\"],\"default\":null}]}', 'serialization.format': '1'}}",
"BucketColumns": "[]",
"SortColumns": "[]",
"StoredAsSubDirectories": "False"
},
"name": "avro-profiling",
"qualifiedName": "arn:aws:glue:us-west-2:123412341234:table/flights-database-profiling/avro-profiling",
"tags": []
}
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "flights-database-profiling.avro-profiling",
"platform": "urn:li:dataPlatform:glue",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.MySqlDDL": {
"tableSchema": ""
}
},
"fields": [
{
"fieldPath": "[version=2.0].[type=int].yr",
"nullable": true,
"description": "test comment",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "int",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"int\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=string].flightdate",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=string].uniquecarrier",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=int].airlineid",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "int",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"int\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=string].carrier",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=string].flightnum",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
},
{
"fieldPath": "[version=2.0].[type=string].origin",
"nullable": true,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "string",
"recursive": false,
"isPartOfKey": false,
"jsonProps": "{\"native_data_type\": \"string\", \"_nullable\": true}"
}
]
}
},
{
"com.linkedin.pegasus2avro.common.DataPlatformInstance": {
"platform": "urn:li:dataPlatform:glue"
}
},
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:owner",
"type": "DATAOWNER"
}
],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
}
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-profiling.avro-profiling,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Table"
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-profiling.avro-profiling,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:77f8f4c39b47069d3a71191de1333b0e"
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database-profiling.avro-profiling,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProfile",
"aspect": {
"json": {
"timestampMillis": 1586847600000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"fieldProfiles": [
{
"fieldPath": "yr",
"uniqueCount": 1,
"uniqueProportion": 2.0,
"nullCount": 0,
"nullProportion": 11.0,
"min": "1",
"max": "10",
"mean": "1",
"median": "2",
"stdev": "3"
}
]
}
}
}
]

View File

@ -13,7 +13,11 @@ from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.ingestion.sink.file import write_metadata_file
from datahub.ingestion.source.aws.glue import GlueSource, GlueSourceConfig
from datahub.ingestion.source.aws.glue import (
GlueProfilingConfig,
GlueSource,
GlueSourceConfig,
)
from datahub.ingestion.source.state.sql_common_state import (
BaseSQLAlchemyCheckpointState,
)
@ -38,6 +42,7 @@ from tests.unit.test_glue_source_stubs import (
get_databases_delta_response,
get_databases_response,
get_databases_response_for_lineage,
get_databases_response_profiling,
get_databases_response_with_resource_link,
get_dataflow_graph_response_1,
get_dataflow_graph_response_2,
@ -54,9 +59,11 @@ from tests.unit.test_glue_source_stubs import (
get_tables_response_1,
get_tables_response_2,
get_tables_response_for_target_database,
get_tables_response_profiling_1,
resource_link_database,
tables_1,
tables_2,
tables_profiling_1,
target_database_tables,
)
@ -93,6 +100,42 @@ def glue_source(
)
def glue_source_with_profiling(
    platform_instance: Optional[str] = None,
    use_s3_bucket_tags: bool = False,
    use_s3_object_tags: bool = False,
    extract_delta_schema_from_parameters: bool = False,
) -> GlueSource:
    """Build a GlueSource with a fully-populated profiling configuration.

    Profiling is enabled at column level, and every profiling metric is
    mapped to a Glue column parameter of the same name (e.g. the ``min``
    metric is read from the ``"min"`` key of a column's Parameters dict).
    """
    # Every metric field's Glue parameter key equals the field name, so the
    # mapping can be generated instead of spelled out one keyword at a time.
    metric_fields = (
        "row_count",
        "column_count",
        "unique_count",
        "unique_proportion",
        "null_count",
        "null_proportion",
        "min",
        "max",
        "mean",
        "median",
        "stdev",
    )
    profiling = GlueProfilingConfig(
        enabled=True,
        profile_table_level_only=False,
        **{field: field for field in metric_fields},
    )
    source_config = GlueSourceConfig(
        aws_region="us-west-2",
        extract_transforms=False,
        platform_instance=platform_instance,
        use_s3_bucket_tags=use_s3_bucket_tags,
        use_s3_object_tags=use_s3_object_tags,
        extract_delta_schema_from_parameters=extract_delta_schema_from_parameters,
        profiling=profiling,
    )
    return GlueSource(
        ctx=PipelineContext(run_id="glue-source-test"),
        config=source_config,
    )
column_type_test_cases: Dict[str, Tuple[str, Type]] = {
"char": ("char", StringTypeClass),
"array": ("array<int>", ArrayTypeClass),
@ -641,3 +684,41 @@ def test_glue_ingest_include_column_lineage(
output_path=tmp_path / mce_file,
golden_path=test_resources_dir / mce_golden_file,
)
@freeze_time(FROZEN_TIME)
def test_glue_ingest_with_profiling(
    tmp_path: Path,
    pytestconfig: PytestConfig,
) -> None:
    """End-to-end ingestion test for the Glue source with profiling enabled.

    Stubs the three Glue API calls the source makes (get_databases,
    get_tables, get_table), runs ingestion, and compares the emitted
    metadata against the profiling golden file.
    """
    glue_source_instance = glue_source_with_profiling()
    mce_file = "glue_mces.json"
    mce_golden_file = "glue_mces_golden_profiling.json"
    with Stubber(glue_source_instance.glue_client) as glue_stubber:
        # Responses must be queued in the exact order the source calls the
        # Glue API: databases first, then the table listing, then the
        # per-table fetch done for profiling.
        glue_stubber.add_response("get_databases", get_databases_response_profiling, {})
        glue_stubber.add_response(
            "get_tables",
            get_tables_response_profiling_1,
            {"DatabaseName": "flights-database-profiling"},
        )
        glue_stubber.add_response(
            "get_table",
            {"Table": tables_profiling_1[0]},
            {"DatabaseName": "flights-database-profiling", "Name": "avro-profiling"},
        )
        mce_objects = [wu.metadata for wu in glue_source_instance.get_workunits()]
        # Every stubbed response must have been consumed exactly once.
        glue_stubber.assert_no_pending_responses()

    write_metadata_file(tmp_path / mce_file, mce_objects)

    # Verify the output.
    test_resources_dir = pytestconfig.rootpath / "tests/unit/glue"
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=tmp_path / mce_file,
        golden_path=test_resources_dir / mce_golden_file,
    )

View File

@ -973,6 +973,112 @@ tables_lineage_1 = [
get_tables_lineage_response_1 = {"TableList": tables_lineage_1}

# Stubbed `glue.get_databases` response for the profiling ingestion test:
# a single database whose tables carry column-level profile parameters.
get_databases_response_profiling = {
    "DatabaseList": [
        {
            "Name": "flights-database-profiling",
            "CreateTime": datetime.datetime(2021, 6, 9, 14, 14, 19),
            "CreateTableDefaultPermissions": [
                {
                    "Principal": {
                        "DataLakePrincipalIdentifier": "IAM_ALLOWED_PRINCIPALS"
                    },
                    "Permissions": ["ALL"],
                }
            ],
            "CatalogId": "123412341234",
            "LocationUri": "s3://test-bucket/test-prefix",
            "Parameters": {"param1": "value1", "param2": "value2"},
        },
    ]
}

# Stubbed Glue table for the profiling test. The "yr" column's Parameters
# dict carries the per-column profiling metrics (unique_count, null_count,
# min/max/mean/median/stdev, ...) that the source maps into a datasetProfile
# aspect; the remaining columns have no profile parameters.
tables_profiling_1 = [
    {
        "Name": "avro-profiling",
        "DatabaseName": "flights-database-profiling",
        "Owner": "owner",
        "CreateTime": datetime.datetime(2021, 6, 9, 14, 17, 35),
        "UpdateTime": datetime.datetime(2021, 6, 9, 14, 17, 35),
        "LastAccessTime": datetime.datetime(2021, 6, 9, 14, 17, 35),
        "Retention": 0,
        "StorageDescriptor": {
            "Columns": [
                {
                    "Name": "yr",
                    "Type": "int",
                    "Comment": "test comment",
                    # Keys match the parameter names configured in
                    # GlueProfilingConfig by the test fixture.
                    "Parameters": {
                        "unique_proportion": "2",
                        "min": "1",
                        "median": "2",
                        "max": "10",
                        "mean": "1",
                        "null_proportion": "11",
                        "unique_count": "1",
                        "stdev": "3",
                        "null_count": "0",
                    },
                },
                {"Name": "flightdate", "Type": "string"},
                {"Name": "uniquecarrier", "Type": "string"},
                {"Name": "airlineid", "Type": "int"},
                {"Name": "carrier", "Type": "string"},
                {"Name": "flightnum", "Type": "string"},
                {"Name": "origin", "Type": "string"},
            ],
            "Location": "s3://crawler-public-us-west-2/flight/avro/",
            "InputFormat": "org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat",
            "Compressed": False,
            "NumberOfBuckets": -1,
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.serde2.avro.AvroSerDe",
                "Parameters": {
                    "avro.schema.literal": '{"type":"record","name":"flights_avro_subset","namespace":"default","fields":[{"name":"yr","type":["null","int"],"default":null},{"name":"flightdate","type":["null","string"],"default":null},{"name":"uniquecarrier","type":["null","string"],"default":null},{"name":"airlineid","type":["null","int"],"default":null},{"name":"carrier","type":["null","string"],"default":null},{"name":"flightnum","type":["null","string"],"default":null},{"name":"origin","type":["null","string"],"default":null},{"name":"dest","type":["null","string"],"default":null},{"name":"depdelay","type":["null","int"],"default":null},{"name":"carrierdelay","type":["null","int"],"default":null},{"name":"weatherdelay","type":["null","int"],"default":null}]}',
                    "serialization.format": "1",
                },
            },
            "BucketColumns": [],
            "SortColumns": [],
            "Parameters": {
                "CrawlerSchemaDeserializerVersion": "1.0",
                "CrawlerSchemaSerializerVersion": "1.0",
                "UPDATED_BY_CRAWLER": "flights-crawler",
                "averageRecordSize": "55",
                "avro.schema.literal": '{"type":"record","name":"flights_avro_subset","namespace":"default","fields":[{"name":"yr","type":["null","int"],"default":null},{"name":"flightdate","type":["null","string"],"default":null},{"name":"uniquecarrier","type":["null","string"],"default":null},{"name":"airlineid","type":["null","int"],"default":null},{"name":"carrier","type":["null","string"],"default":null},{"name":"flightnum","type":["null","string"],"default":null},{"name":"origin","type":["null","string"],"default":null},{"name":"dest","type":["null","string"],"default":null},{"name":"depdelay","type":["null","int"],"default":null},{"name":"carrierdelay","type":["null","int"],"default":null},{"name":"weatherdelay","type":["null","int"],"default":null}]}',
                "classification": "avro",
                "compressionType": "none",
                "objectCount": "30",
                "recordCount": "169222196",
                "sizeKey": "9503351413",
                "typeOfData": "file",
            },
            "StoredAsSubDirectories": False,
        },
        "PartitionKeys": [],
        "TableType": "EXTERNAL_TABLE",
        # Table-level parameters are surfaced as dataset custom properties.
        "Parameters": {
            "CrawlerSchemaDeserializerVersion": "1.0",
            "CrawlerSchemaSerializerVersion": "1.0",
            "UPDATED_BY_CRAWLER": "flights-crawler",
            "averageRecordSize": "55",
            "avro.schema.literal": '{"type":"record","name":"flights_avro_subset","namespace":"default","fields":[{"name":"yr","type":["null","int"],"default":null},{"name":"flightdate","type":["null","string"],"default":null},{"name":"uniquecarrier","type":["null","string"],"default":null},{"name":"airlineid","type":["null","int"],"default":null},{"name":"carrier","type":["null","string"],"default":null},{"name":"flightnum","type":["null","string"],"default":null},{"name":"origin","type":["null","string"],"default":null},{"name":"dest","type":["null","string"],"default":null},{"name":"depdelay","type":["null","int"],"default":null},{"name":"carrierdelay","type":["null","int"],"default":null},{"name":"weatherdelay","type":["null","int"],"default":null}]}',
            "classification": "avro",
            "compressionType": "none",
            "objectCount": "30",
            "recordCount": "169222196",
            "sizeKey": "9503351413",
            "typeOfData": "file",
        },
        "CreatedBy": "arn:aws:sts::123412341234:assumed-role/AWSGlueServiceRole-flights-crawler/AWS-Crawler",
        "IsRegisteredWithLakeFormation": False,
        "CatalogId": "123412341234",
    }
]
# Stubbed `glue.get_tables` response wrapping the table above.
get_tables_response_profiling_1 = {"TableList": tables_profiling_1}
def mock_get_object_response(raw_body: str) -> Dict[str, Any]:
"""
Mock s3 client get_object() response object.