diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
index 32b0888a98..a7171cfe42 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -14,6 +14,7 @@ from datahub.configuration.common import AllowDenyPattern, ConfigurationError
 from datahub.configuration.source_common import PlatformSourceConfigBase
 from datahub.emitter import mce_builder
 from datahub.emitter.mce_builder import (
+    get_sys_time,
     make_data_platform_urn,
     make_dataplatform_instance_urn,
     make_dataset_urn_with_platform_instance,
@@ -41,6 +42,7 @@ from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.aws import s3_util
 from datahub.ingestion.source.aws.aws_common import AwsSourceConfig
 from datahub.ingestion.source.aws.s3_util import make_s3_urn
+from datahub.ingestion.source.glue_profiling_config import GlueProfilingConfig
 from datahub.metadata.com.linkedin.pegasus2avro.common import Status
 from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
@@ -57,13 +59,17 @@ from datahub.metadata.schema_classes import (
     DataJobInputOutputClass,
     DataJobSnapshotClass,
     DataPlatformInstanceClass,
+    DatasetFieldProfileClass,
     DatasetLineageTypeClass,
+    DatasetProfileClass,
     DatasetPropertiesClass,
     GlobalTagsClass,
     MetadataChangeEventClass,
     OwnerClass,
     OwnershipClass,
     OwnershipTypeClass,
+    PartitionSpecClass,
+    PartitionTypeClass,
     TagAssociationClass,
     UpstreamClass,
     UpstreamLineageClass,
@@ -77,7 +83,7 @@
 DEFAULT_PLATFORM = "glue"
 VALID_PLATFORMS = [DEFAULT_PLATFORM, "athena"]
 
 
-class GlueSourceConfig(AwsSourceConfig, PlatformSourceConfigBase):
+class GlueSourceConfig(AwsSourceConfig, PlatformSourceConfigBase, GlueProfilingConfig):
     extract_owners: Optional[bool] = Field(
         default=True,
@@ -109,7 +115,6 @@ class GlueSourceConfig(AwsSourceConfig, PlatformSourceConfigBase):
         default=None,
         description="The aws account id where the target glue catalog lives. If None, datahub will ingest glue in aws caller's account.",
     )
-
     use_s3_bucket_tags: Optional[bool] = Field(
         default=False,
         description="If an S3 Buckets Tags should be created for the Tables ingested by Glue. Please Note that this will not apply tags to any folders ingested, only the files.",
@@ -118,6 +123,10 @@ class GlueSourceConfig(AwsSourceConfig, PlatformSourceConfigBase):
         default=False,
         description="If an S3 Objects Tags should be created for the Tables ingested by Glue.",
     )
+    profiling: Optional[GlueProfilingConfig] = Field(
+        default=None,
+        description="Configs to ingest data profiles from glue table.",
+    )
 
     @property
     def glue_client(self):
@@ -692,6 +701,147 @@ class GlueSource(Source):
             return mcp
         return None
 
+    def _create_profile_mcp(
+        self,
+        mce: MetadataChangeEventClass,
+        table_stats: dict,
+        column_stats: dict,
+        partition_spec: Optional[str] = None,
+    ) -> MetadataChangeProposalWrapper:
+        # instantiate profile class
+        dataset_profile = DatasetProfileClass(timestampMillis=get_sys_time())
+
+        # inject table-level stats
+        if self.source_config.profiling.row_count in table_stats:
+            dataset_profile.rowCount = int(
+                float(table_stats[self.source_config.profiling.row_count])
+            )
+        if self.source_config.profiling.column_count in table_stats:
+            dataset_profile.columnCount = int(
+                float(table_stats[self.source_config.profiling.column_count])
+            )
+
+        # inject column-level stats
+        dataset_profile.fieldProfiles = []
+        for profile in column_stats:
+            column_name = profile["Name"]
+
+            # some columns may not be profiled
+            if "Parameters" not in profile:
+                continue
+            column_params = profile["Parameters"]
+
+            logger.debug(f"column_name: {column_name}")
+            # instantiate a column profile class for each column
+            column_profile = DatasetFieldProfileClass(fieldPath=column_name)
+
+            if self.source_config.profiling.unique_count in column_params:
+                column_profile.uniqueCount = int(
+                    float(column_params[self.source_config.profiling.unique_count])
+                )
+            if self.source_config.profiling.unique_proportion in column_params:
+                column_profile.uniqueProportion = float(
+                    column_params[self.source_config.profiling.unique_proportion]
+                )
+            if self.source_config.profiling.null_count in column_params:
+                column_profile.nullCount = int(
+                    float(column_params[self.source_config.profiling.null_count])
+                )
+            if self.source_config.profiling.null_proportion in column_params:
+                column_profile.nullProportion = float(
+                    column_params[self.source_config.profiling.null_proportion]
+                )
+            if self.source_config.profiling.min in column_params:
+                column_profile.min = column_params[self.source_config.profiling.min]
+            if self.source_config.profiling.max in column_params:
+                column_profile.max = column_params[self.source_config.profiling.max]
+            if self.source_config.profiling.mean in column_params:
+                column_profile.mean = column_params[self.source_config.profiling.mean]
+            if self.source_config.profiling.median in column_params:
+                column_profile.median = column_params[
+                    self.source_config.profiling.median
+                ]
+            if self.source_config.profiling.stdev in column_params:
+                column_profile.stdev = column_params[self.source_config.profiling.stdev]
+
+            dataset_profile.fieldProfiles.append(column_profile)
+
+        if partition_spec:
+            # inject partition-level spec
+            dataset_profile.partitionSpec = PartitionSpecClass(
+                partition=partition_spec,
+                type=PartitionTypeClass.PARTITION,
+            )
+
+        mcp = MetadataChangeProposalWrapper(
+            entityType="dataset",
+            entityUrn=mce.proposedSnapshot.urn,
+            changeType=ChangeTypeClass.UPSERT,
+            aspectName="datasetProfile",
+            aspect=dataset_profile,
+        )
+        return mcp
+
+    def get_profile_if_enabled(
+        self, mce: MetadataChangeEventClass, database_name: str, table_name: str
+    ) -> List[MetadataChangeProposalWrapper]:
+        if self.source_config.profiling:
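+            # NOTE: Glue returns profiling stats as string values inside the
+            # "Parameters" maps of get_table/get_partitions responses; the
+            # profiling config fields name the keys to read from those maps.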
+            # for cross-account ingestion
+            kwargs = dict(
+                DatabaseName=database_name,
+                Name=table_name,
+                CatalogId=self.source_config.catalog_id,
+            )
+            response = self.glue_client.get_table(
+                **{k: v for k, v in kwargs.items() if v}
+            )
+
+            partition_keys = response["Table"].get("PartitionKeys")
+
+            # check if this table is partitioned
+            if partition_keys:
+                # ingest data profile with partitions
+                # for cross-account ingestion
+                kwargs = dict(
+                    DatabaseName=database_name,
+                    TableName=table_name,
+                    CatalogId=self.source_config.catalog_id,
+                )
+                response = self.glue_client.get_partitions(
+                    **{k: v for k, v in kwargs.items() if v}
+                )
+
+                partitions = response["Partitions"]
+                partition_keys = [k["Name"] for k in partition_keys]
+
+                mcps = []
+                for p in partitions:
+                    table_stats = p["Parameters"]
+                    column_stats = p["StorageDescriptor"]["Columns"]
+
+                    # only a single partition key is supported
+                    partition_spec = str({partition_keys[0]: p["Values"][0]})
+
+                    if self.source_config.profiling.partition_patterns.allowed(
+                        partition_spec
+                    ):
+                        mcps.append(
+                            self._create_profile_mcp(
+                                mce, table_stats, column_stats, partition_spec
+                            )
+                        )
+                return mcps
+            else:
+                # ingest data profile without partitions
+                table_stats = response["Table"].get("Parameters", {})
+                column_stats = response["Table"]["StorageDescriptor"]["Columns"]
+                return [self._create_profile_mcp(mce, table_stats, column_stats)]
+
+        return []
+
     def gen_database_key(self, database: str) -> DatabaseKey:
         return DatabaseKey(
             database=database,
@@ -795,6 +945,15 @@ class GlueSource(Source):
             self.report.report_workunit(mcp_wu)
             yield mcp_wu
 
+            mcps_profiling = self.get_profile_if_enabled(mce, database_name, table_name)
+            for mcp_index, mcp in enumerate(mcps_profiling):
+                mcp_wu = MetadataWorkUnit(
+                    id=f"profile-{full_table_name}-partition-{mcp_index}",
+                    mcp=mcp,
+                )
+                self.report.report_workunit(mcp_wu)
+                yield mcp_wu
+
         if self.extract_transforms:
             yield from self._transform_extraction()
 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/glue_profiling_config.py b/metadata-ingestion/src/datahub/ingestion/source/glue_profiling_config.py
new file mode 100644
index 0000000000..e1d0a7dd5e
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/glue_profiling_config.py
@@ -0,0 +1,56 @@
+from typing import Optional
+
+from pydantic.fields import Field
+
+from datahub.configuration.common import AllowDenyPattern, ConfigModel
+
+
+class GlueProfilingConfig(ConfigModel):
+    row_count: Optional[str] = Field(
+        default=None,
+        description="The parameter name for row count in glue table.",
+    )
+    column_count: Optional[str] = Field(
+        default=None,
+        description="The parameter name for column count in glue table.",
+    )
+    unique_count: Optional[str] = Field(
+        default=None,
+        description="The parameter name for the count of unique values in a column.",
+    )
+    unique_proportion: Optional[str] = Field(
+        default=None,
+        description="The parameter name for the proportion of unique values in a column.",
+    )
+    null_count: Optional[str] = Field(
+        default=None,
+        description="The parameter name for the count of null values in a column.",
+    )
+    null_proportion: Optional[str] = Field(
+        default=None,
+        description="The parameter name for the proportion of null values in a column.",
+    )
+    min: Optional[str] = Field(
+        default=None,
+        description="The parameter name for the min value of a column.",
+    )
+    max: Optional[str] = Field(
+        default=None,
+        description="The parameter name for the max value of a column.",
+    )
+    mean: Optional[str] = Field(
+        default=None,
+        description="The parameter name for the mean value of a column.",
+    )
+    median: Optional[str] = Field(
+        default=None,
+        description="The parameter name for the median value of a column.",
+    )
+    stdev: Optional[str] = Field(
+        default=None,
+        description="The parameter name for the standard deviation of a column.",
+    )
+    partition_patterns: AllowDenyPattern = Field(
+        default=AllowDenyPattern.allow_all(),
+        description="""Regex patterns for filtering partitions for profiling. The pattern should be a string like: "{'key':'value'}".""",
+    )
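
For illustration, a minimal sketch of how GlueProfilingConfig resolves stats from Glue metadata (assumes this patch is applied and datahub is importable; the right-hand-side names such as "distinct_count" are hypothetical keys that a user's Glue jobs would write into the table/partition "Parameters" maps, not defaults shipped with this change):

    from datahub.ingestion.source.glue_profiling_config import GlueProfilingConfig

    # Map each profile field to the (hypothetical) Parameters key that holds it.
    profiling = GlueProfilingConfig.parse_obj(
        {
            "row_count": "row_count",
            "column_count": "column_count",
            "unique_count": "distinct_count",
            "null_count": "null_count",
            "partition_patterns": {"allow": [r"\{'year'.*"]},
        }
    )

    # Glue returns all Parameters values as strings, which is why the source
    # coerces with int(float(...)) before writing them into DatasetProfileClass.
    partition_parameters = {"row_count": "1000", "column_count": "7"}
    assert profiling.row_count in partition_parameters
    assert int(float(partition_parameters[profiling.row_count])) == 1000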