feat(ingest): glue - enable profiling (#4879)

This commit is contained in:
BZ 2022-05-30 07:46:35 -04:00 committed by GitHub
parent f40c250e9f
commit 5b55f25b69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 216 additions and 2 deletions

View File

@@ -14,6 +14,7 @@ from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.configuration.source_common import PlatformSourceConfigBase
from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import (
get_sys_time,
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
@@ -41,6 +42,7 @@ from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws import s3_util
from datahub.ingestion.source.aws.aws_common import AwsSourceConfig
from datahub.ingestion.source.aws.s3_util import make_s3_urn
from datahub.ingestion.source.glue_profiling_config import GlueProfilingConfig
from datahub.metadata.com.linkedin.pegasus2avro.common import Status
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
@@ -57,13 +59,17 @@ from datahub.metadata.schema_classes import (
DataJobInputOutputClass,
DataJobSnapshotClass,
DataPlatformInstanceClass,
DatasetFieldProfileClass,
DatasetLineageTypeClass,
DatasetProfileClass,
DatasetPropertiesClass,
GlobalTagsClass,
MetadataChangeEventClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
PartitionSpecClass,
PartitionTypeClass,
TagAssociationClass,
UpstreamClass,
UpstreamLineageClass,
@@ -77,7 +83,7 @@ DEFAULT_PLATFORM = "glue"
VALID_PLATFORMS = [DEFAULT_PLATFORM, "athena"]
class GlueSourceConfig(AwsSourceConfig, PlatformSourceConfigBase):
class GlueSourceConfig(AwsSourceConfig, PlatformSourceConfigBase, GlueProfilingConfig):
extract_owners: Optional[bool] = Field(
default=True,
@@ -109,7 +115,6 @@ class GlueSourceConfig(AwsSourceConfig, PlatformSourceConfigBase):
default=None,
description="The aws account id where the target glue catalog lives. If None, datahub will ingest glue in aws caller's account.",
)
use_s3_bucket_tags: Optional[bool] = Field(
default=False,
description="If an S3 Buckets Tags should be created for the Tables ingested by Glue. Please Note that this will not apply tags to any folders ingested, only the files.",
@@ -118,6 +123,10 @@ class GlueSourceConfig(AwsSourceConfig, PlatformSourceConfigBase):
default=False,
description="If an S3 Objects Tags should be created for the Tables ingested by Glue.",
)
profiling: GlueProfilingConfig = Field(
default=None,
description="Configs to ingest data profiles from glue table",
)
@property
def glue_client(self):
@@ -692,6 +701,147 @@ class GlueSource(Source):
return mcp
return None
def _create_profile_mcp(
    self,
    mce: MetadataChangeEventClass,
    table_stats: dict,
    column_stats: dict,
    partition_spec: Optional[str] = None,
) -> MetadataChangeProposalWrapper:
    """Build a ``datasetProfile`` MCP from Glue parameter maps.

    Args:
        mce: The dataset MCE whose snapshot URN the profile is attached to.
        table_stats: Glue table (or partition) ``Parameters`` dict holding
            table-level statistics keyed by user-configured parameter names.
        column_stats: Glue ``StorageDescriptor["Columns"]`` entries; each may
            carry a per-column ``Parameters`` dict of statistics.
        partition_spec: Optional stringified partition spec; when set, the
            profile is marked as a partition-level profile.

    Returns:
        A MetadataChangeProposalWrapper carrying the DatasetProfileClass aspect.
    """
    # instantiate profile class
    dataset_profile = DatasetProfileClass(timestampMillis=get_sys_time())

    # Inject table level stats. Glue stores all parameter values as strings,
    # so go through float() to tolerate values such as "123.0".
    if self.source_config.profiling.row_count in table_stats:
        dataset_profile.rowCount = int(
            float(table_stats[self.source_config.profiling.row_count])
        )
    # Fixed: the original guard tested `row_count` here but read
    # `column_count`, which could raise KeyError (or skip columnCount).
    if self.source_config.profiling.column_count in table_stats:
        dataset_profile.columnCount = int(
            float(table_stats[self.source_config.profiling.column_count])
        )

    # inject column level stats
    dataset_profile.fieldProfiles = []
    for profile in column_stats:
        column_name = profile["Name"]

        # some columns may not be profiled
        if "Parameters" in profile:
            column_params = profile["Parameters"]
        else:
            continue

        logger.debug(f"column_name: {column_name}")

        # instantiate column profile class for each column
        column_profile = DatasetFieldProfileClass(fieldPath=column_name)

        # Counts/proportions are parsed numerically; min/max/mean/median/stdev
        # are passed through as the raw strings Glue stored.
        if self.source_config.profiling.unique_count in column_params:
            column_profile.uniqueCount = int(
                float(column_params[self.source_config.profiling.unique_count])
            )
        if self.source_config.profiling.unique_proportion in column_params:
            column_profile.uniqueProportion = float(
                column_params[self.source_config.profiling.unique_proportion]
            )
        if self.source_config.profiling.null_count in column_params:
            column_profile.nullCount = int(
                float(column_params[self.source_config.profiling.null_count])
            )
        if self.source_config.profiling.null_proportion in column_params:
            column_profile.nullProportion = float(
                column_params[self.source_config.profiling.null_proportion]
            )
        if self.source_config.profiling.min in column_params:
            column_profile.min = column_params[self.source_config.profiling.min]
        if self.source_config.profiling.max in column_params:
            column_profile.max = column_params[self.source_config.profiling.max]
        if self.source_config.profiling.mean in column_params:
            column_profile.mean = column_params[self.source_config.profiling.mean]
        if self.source_config.profiling.median in column_params:
            column_profile.median = column_params[
                self.source_config.profiling.median
            ]
        if self.source_config.profiling.stdev in column_params:
            column_profile.stdev = column_params[self.source_config.profiling.stdev]

        dataset_profile.fieldProfiles.append(column_profile)

    if partition_spec:
        # inject partition level stats
        dataset_profile.partitionSpec = PartitionSpecClass(
            partition=partition_spec,
            type=PartitionTypeClass.PARTITION,
        )

    mcp = MetadataChangeProposalWrapper(
        entityType="dataset",
        entityUrn=mce.proposedSnapshot.urn,
        changeType=ChangeTypeClass.UPSERT,
        aspectName="datasetProfile",
        aspect=dataset_profile,
    )
    return mcp
def get_profile_if_enabled(
    self, mce: MetadataChangeEventClass, database_name: str, table_name: str
) -> List[MetadataChangeProposalWrapper]:
    """Return datasetProfile MCPs for a Glue table, or [] when profiling is off.

    Emits one profile per allowed partition for partitioned tables, otherwise
    a single table-level profile built from the table's parameters.
    """
    if not self.source_config.profiling:
        return []

    # CatalogId is only forwarded when set (cross-account ingestion).
    table_request = dict(
        DatabaseName=database_name,
        Name=table_name,
        CatalogId=self.source_config.catalog_id,
    )
    table_response = self.glue_client.get_table(
        **{key: value for key, value in table_request.items() if value}
    )
    partition_keys = table_response["Table"]["PartitionKeys"]

    if not partition_keys:
        # ingest data profile without partition
        return [
            self._create_profile_mcp(
                mce,
                table_response["Table"]["Parameters"],
                table_response["Table"]["StorageDescriptor"]["Columns"],
            )
        ]

    # ingest data profile with partitions
    # for cross-account ingestion
    partitions_request = dict(
        DatabaseName=database_name,
        TableName=table_name,
        CatalogId=self.source_config.catalog_id,
    )
    partitions_response = self.glue_client.get_partitions(
        **{key: value for key, value in partitions_request.items() if value}
    )
    key_names = [key["Name"] for key in partition_keys]

    profile_mcps = []
    for partition in partitions_response["Partitions"]:
        # only support single partition key
        partition_spec = str({key_names[0]: partition["Values"][0]})
        if not self.source_config.profiling.partition_patterns.allowed(
            partition_spec
        ):
            continue
        profile_mcps.append(
            self._create_profile_mcp(
                mce,
                partition["Parameters"],
                partition["StorageDescriptor"]["Columns"],
                partition_spec,
            )
        )
    return profile_mcps
def gen_database_key(self, database: str) -> DatabaseKey:
return DatabaseKey(
database=database,
@@ -795,6 +945,16 @@ class GlueSource(Source):
self.report.report_workunit(mcp_wu)
yield mcp_wu
mcps_profiling = self.get_profile_if_enabled(mce, database_name, table_name)
if mcps_profiling:
for mcp_index, mcp in enumerate(mcps_profiling):
mcp_wu = MetadataWorkUnit(
id=f"profile-{full_table_name}-partition-{mcp_index}",
mcp=mcps_profiling[mcp_index],
)
self.report.report_workunit(mcp_wu)
yield mcp_wu
if self.extract_transforms:
yield from self._transform_extraction()

View File

@@ -0,0 +1,54 @@
from pydantic.fields import Field
from datahub.configuration.common import AllowDenyPattern, ConfigModel
class GlueProfilingConfig(ConfigModel):
    """Maps DataHub profile statistics to the names of the Glue table or
    partition ``Parameters`` keys that hold each statistic.

    Every field stores a *parameter name* (a string key to look up in Glue's
    parameter maps), not the statistic value itself. A field left at its
    default of None means that statistic is not ingested.
    """

    row_count: str = Field(
        default=None,
        description="The parameter name for row count in glue table.",
    )
    column_count: str = Field(
        default=None,
        description="The parameter name for column count in glue table.",
    )
    unique_count: str = Field(
        default=None,
        description="The parameter name for the count of unique value in a column.",
    )
    unique_proportion: str = Field(
        default=None,
        description="The parameter name for the proportion of unique values in a column.",
    )
    # Fixed: was annotated `int`, but like every sibling field this holds the
    # *name* of a Glue parameter (used as a string dict key by the source).
    null_count: str = Field(
        default=None,
        description="The parameter name for the count of null values in a column.",
    )
    null_proportion: str = Field(
        default=None,
        description="The parameter name for the proportion of null values in a column.",
    )
    min: str = Field(
        default=None,
        description="The parameter name for the min value of a column.",
    )
    max: str = Field(
        default=None,
        description="The parameter name for the max value of a column.",
    )
    mean: str = Field(
        default=None,
        description="The parameter name for the mean value of a column.",
    )
    median: str = Field(
        default=None,
        description="The parameter name for the median value of a column.",
    )
    stdev: str = Field(
        default=None,
        description="The parameter name for the standard deviation of a column.",
    )
    partition_patterns: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="""Regex patterns for filtering partitions for profile. The pattern should be a string like: "{'key':'value'}".""",
    )