From df0b4f258bdd66b5e1af0ba1588704a0111be80f Mon Sep 17 00:00:00 2001 From: Swaroop Jagadish <67564030+swaroopjagadish@users.noreply.github.com> Date: Thu, 30 Sep 2021 23:04:47 -0700 Subject: [PATCH] feat(ingest): support for primary and foreign key extraction from sql sources (#3316) --- .../ingestion/source/sql/sql_common.py | 67 +++++- .../src/datahub/metadata/schema.avsc | 191 ++++++++-------- .../src/datahub/metadata/schema_classes.py | 24 +-- .../metadata/schemas/MetadataChangeEvent.avsc | 189 ++++++++-------- .../integration/hive/hive_mces_golden.json | 18 +- .../tests/integration/hive/hive_setup.sql | 2 +- .../integration/mysql/mysql_mces_golden.json | 131 ++++++++++- .../tests/integration/mysql/setup/setup.sql | 15 ++ .../sql_server/mssql_mces_golden.json | 203 +++++++++++++++++- .../integration/sql_server/setup/setup.sql | 19 ++ 10 files changed, 643 insertions(+), 216 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 80e9a920d9..edc2154600 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -39,6 +39,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.schema import ( BytesTypeClass, DateTypeClass, EnumTypeClass, + ForeignKeyConstraint, MySqlDDL, NullTypeClass, NumberTypeClass, @@ -239,9 +240,15 @@ def get_column_type( def get_schema_metadata( - sql_report: SQLSourceReport, dataset_name: str, platform: str, columns: List[dict] + sql_report: SQLSourceReport, + dataset_name: str, + platform: str, + columns: List[dict], + pk_constraints: dict = None, + foreign_keys: List[ForeignKeyConstraint] = None, ) -> SchemaMetadata: canonical_schema: List[SchemaField] = [] + for column in columns: field = SchemaField( fieldPath=column["name"], @@ -251,6 +258,12 @@ def get_schema_metadata( nullable=column["nullable"], recursive=False, ) + if ( + pk_constraints is not None + and isinstance(pk_constraints, dict) # some dialects (hive) return list + and column["name"] in pk_constraints.get("constrained_columns", []) + ): + field.isPartOfKey = True canonical_schema.append(field) schema_metadata = SchemaMetadata( @@ -261,6 +274,9 @@ def get_schema_metadata( platformSchema=MySqlDDL(tableSchema=""), fields=canonical_schema, ) + if foreign_keys is not None and foreign_keys != []: + schema_metadata.foreignKeys = foreign_keys + return schema_metadata @@ -337,6 +353,27 @@ class SQLAlchemySource(Source): else: return f"{schema}.{entity}" + def get_foreign_key_metadata(self, datasetUrn, fk_dict, inspector): + referred_dataset_name = self.get_identifier( + schema=fk_dict["referred_schema"], + entity=fk_dict["referred_table"], + inspector=inspector, + ) + + source_fields = [ + f"urn:li:schemaField:({datasetUrn}, {f})" + for f in fk_dict["constrained_columns"] + ] + foreign_dataset = f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{referred_dataset_name},{self.config.env})" + foreign_fields = [ + f"urn:li:schemaField:({foreign_dataset}, {f})" + for f in fk_dict["referred_columns"] + ] + + return ForeignKeyConstraint( + fk_dict["name"], foreign_fields, source_fields, foreign_dataset + ) + def loop_tables( self, inspector: Inspector, @@ -373,21 +410,39 @@ class SQLAlchemySource(Source): # The "properties" field is a non-standard addition to SQLAlchemy's interface. properties = table_info.get("properties", {}) - # TODO: capture inspector.get_pk_constraint - # TODO: capture inspector.get_sorted_table_and_fkc_names - + datasetUrn = f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})" dataset_snapshot = DatasetSnapshot( - urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})", + urn=datasetUrn, aspects=[], ) + if description is not None or properties: dataset_properties = DatasetPropertiesClass( description=description, customProperties=properties, ) dataset_snapshot.aspects.append(dataset_properties) + + pk_constraints: dict = inspector.get_pk_constraint(table, schema) + try: + foreign_keys = [ + self.get_foreign_key_metadata(datasetUrn, fk_rec, inspector) + for fk_rec in inspector.get_foreign_keys(table, schema) + ] + except KeyError: + # certain databases like MySQL cause issues due to lower-case/upper-case irregularities + logger.debug( + f"{datasetUrn}: failure in foreign key extraction... skipping" + ) + foreign_keys = [] + schema_metadata = get_schema_metadata( - self.report, dataset_name, self.platform, columns + self.report, + dataset_name, + self.platform, + columns, + pk_constraints, + foreign_keys, ) dataset_snapshot.aspects.append(schema_metadata) diff --git a/metadata-ingestion/src/datahub/metadata/schema.avsc b/metadata-ingestion/src/datahub/metadata/schema.avsc index 96988ced0b..55c62f4732 100644 --- a/metadata-ingestion/src/datahub/metadata/schema.avsc +++ b/metadata-ingestion/src/datahub/metadata/schema.avsc @@ -686,6 +686,94 @@ } ], "doc": "Shared aspect containing Browse Paths to be indexed for an entity." + }, + { + "type": "record", + "Aspect": { + "name": "glossaryTerms" + }, + "name": "GlossaryTerms", + "namespace": "com.linkedin.pegasus2avro.common", + "fields": [ + { + "type": { + "type": "array", + "items": { + "type": "record", + "name": "GlossaryTermAssociation", + "namespace": "com.linkedin.pegasus2avro.common", + "fields": [ + { + "Searchable": { + "fieldName": "glossaryTerms", + "fieldType": "URN_PARTIAL" + }, + "java": { + "class": "com.linkedin.pegasus2avro.common.urn.GlossaryTermUrn" + }, + "type": "string", + "name": "urn", + "doc": "Urn of the applied glossary term" + } + ], + "doc": "Properties of an applied glossary term." + } + }, + "name": "terms", + "doc": "The related business terms" + }, + { + "type": "com.linkedin.pegasus2avro.common.AuditStamp", + "name": "auditStamp", + "doc": "Audit stamp containing who reported the related business term" + } + ], + "doc": "Related business terms information" + }, + { + "type": "record", + "Aspect": { + "name": "institutionalMemory" + }, + "name": "InstitutionalMemory", + "namespace": "com.linkedin.pegasus2avro.common", + "fields": [ + { + "type": { + "type": "array", + "items": { + "type": "record", + "name": "InstitutionalMemoryMetadata", + "namespace": "com.linkedin.pegasus2avro.common", + "fields": [ + { + "java": { + "class": "com.linkedin.pegasus2avro.common.url.Url", + "coercerClass": "com.linkedin.pegasus2avro.common.url.UrlCoercer" + }, + "type": "string", + "name": "url", + "doc": "Link to an engineering design document or a wiki page." + }, + { + "type": "string", + "name": "description", + "doc": "Description of the link." + }, + { + "type": "com.linkedin.pegasus2avro.common.AuditStamp", + "name": "createStamp", + "doc": "Audit stamp associated with creation of this record" + } + ], + "doc": "Metadata corresponding to a record of institutional memory." + } + }, + "name": "elements", + "doc": "List of records that represent institutional memory of an entity. Each record consists of a link, description, creator and timestamps associated with that record." + } + ], + "doc": "Institutional memory of an entity. This is a way to link to relevant documentation and provide description of the documentation. Institutional or tribal knowledge is very important for users to leverage the entity." } ] }, @@ -1340,7 +1428,9 @@ "com.linkedin.pegasus2avro.common.Ownership", "com.linkedin.pegasus2avro.common.Status", "com.linkedin.pegasus2avro.common.GlobalTags", - "com.linkedin.pegasus2avro.common.BrowsePaths" + "com.linkedin.pegasus2avro.common.BrowsePaths", + "com.linkedin.pegasus2avro.common.GlossaryTerms", + "com.linkedin.pegasus2avro.common.InstitutionalMemory" ] }, "name": "aspects", @@ -1536,7 +1626,9 @@ "com.linkedin.pegasus2avro.common.Ownership", "com.linkedin.pegasus2avro.common.Status", "com.linkedin.pegasus2avro.common.GlobalTags", - "com.linkedin.pegasus2avro.common.BrowsePaths" + "com.linkedin.pegasus2avro.common.BrowsePaths", + "com.linkedin.pegasus2avro.common.GlossaryTerms", + "com.linkedin.pegasus2avro.common.InstitutionalMemory" ] }, "name": "aspects", @@ -1882,7 +1974,9 @@ "com.linkedin.pegasus2avro.common.Ownership", "com.linkedin.pegasus2avro.common.Status", "com.linkedin.pegasus2avro.common.GlobalTags", - "com.linkedin.pegasus2avro.common.BrowsePaths" + "com.linkedin.pegasus2avro.common.BrowsePaths", + "com.linkedin.pegasus2avro.common.GlossaryTerms", + "com.linkedin.pegasus2avro.common.InstitutionalMemory" ] }, "name": "aspects", @@ -2303,51 +2397,7 @@ ], "doc": "Upstream lineage of a dataset" }, - { - "type": "record", - "Aspect": { - "name": "institutionalMemory" - }, - "name": "InstitutionalMemory", - "namespace": "com.linkedin.pegasus2avro.common", - "fields": [ - { - "type": { - "type": "array", - "items": { - "type": "record", - "name": "InstitutionalMemoryMetadata", - "namespace": "com.linkedin.pegasus2avro.common", - "fields": [ - { - "java": { - "class": "com.linkedin.pegasus2avro.common.url.Url", - "coercerClass": "com.linkedin.pegasus2avro.common.url.UrlCoercer" - }, - "type": "string", - "name": "url", - "doc": "Link to an engineering design document or a wiki page." - }, - { - "type": "string", - "name": "description", - "doc": "Description of the link." - }, - { - "type": "com.linkedin.pegasus2avro.common.AuditStamp", - "name": "createStamp", - "doc": "Audit stamp associated with creation of this record" - } - ], - "doc": "Metadata corresponding to a record of institutional memory." - } - }, - "name": "elements", - "doc": "List of records that represent institutional memory of an entity. Each record consists of a link, description, creator and timestamps associated with that record." - } - ], - "doc": "Institutional memory of an entity. This is a way to link to relevant documentation and provide description of the documentation. Institutional or tribal knowledge is very important for users to leverage the entity." - }, + "com.linkedin.pegasus2avro.common.InstitutionalMemory", "com.linkedin.pegasus2avro.common.Ownership", "com.linkedin.pegasus2avro.common.Status", { @@ -2823,49 +2873,7 @@ }, "type": [ "null", - { - "type": "record", - "Aspect": { - "name": "glossaryTerms" - }, - "name": "GlossaryTerms", - "namespace": "com.linkedin.pegasus2avro.common", - "fields": [ - { - "type": { - "type": "array", - "items": { - "type": "record", - "name": "GlossaryTermAssociation", - "namespace": "com.linkedin.pegasus2avro.common", - "fields": [ - { - "Searchable": { - "fieldName": "glossaryTerms", - "fieldType": "URN_PARTIAL" - }, - "java": { - "class": "com.linkedin.pegasus2avro.common.urn.GlossaryTermUrn" - }, - "type": "string", - "name": "urn", - "doc": "Urn of the applied glossary term" - } - ], - "doc": "Properties of an applied glossary term." - } - }, - "name": "terms", - "doc": "The related business terms" - }, - { - "type": "com.linkedin.pegasus2avro.common.AuditStamp", - "name": "auditStamp", - "doc": "Audit stamp containing who reported the related business term" - } - ], - "doc": "Related business terms information" - } + "com.linkedin.pegasus2avro.common.GlossaryTerms" ], "name": "glossaryTerms", "default": null, @@ -3152,6 +3160,7 @@ "keyAspect": "dataProcessKey", "name": "dataProcess" }, + "deprecated": "Use DataJob instead.", "name": "DataProcessSnapshot", "namespace": "com.linkedin.pegasus2avro.metadata.snapshot", "fields": [ diff --git a/metadata-ingestion/src/datahub/metadata/schema_classes.py b/metadata-ingestion/src/datahub/metadata/schema_classes.py index 2bb62f5434..0578dd2ea4 100644 --- a/metadata-ingestion/src/datahub/metadata/schema_classes.py +++ b/metadata-ingestion/src/datahub/metadata/schema_classes.py @@ -5188,7 +5188,7 @@ class ChartSnapshotClass(DictWrapper): RECORD_SCHEMA = get_schema_type("com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot") def __init__(self, urn: str, - aspects: List[Union["ChartKeyClass", "ChartInfoClass", "ChartQueryClass", "EditableChartPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]], + aspects: List[Union["ChartKeyClass", "ChartInfoClass", "ChartQueryClass", "EditableChartPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]], ): super().__init__() @@ -5219,12 +5219,12 @@ class ChartSnapshotClass(DictWrapper): @property - def aspects(self) -> List[Union["ChartKeyClass", "ChartInfoClass", "ChartQueryClass", "EditableChartPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]]: + def aspects(self) -> List[Union["ChartKeyClass", "ChartInfoClass", "ChartQueryClass", "EditableChartPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]]: """Getter: The list of metadata aspects associated with the chart. Depending on the use case, this can either be all, or a selection, of supported aspects.""" return self._inner_dict.get('aspects') # type: ignore @aspects.setter - def aspects(self, value: List[Union["ChartKeyClass", "ChartInfoClass", "ChartQueryClass", "EditableChartPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]]) -> None: + def aspects(self, value: List[Union["ChartKeyClass", "ChartInfoClass", "ChartQueryClass", "EditableChartPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]]) -> None: """Setter: The list of metadata aspects associated with the chart. Depending on the use case, this can either be all, or a selection, of supported aspects.""" self._inner_dict['aspects'] = value @@ -5329,7 +5329,7 @@ class DashboardSnapshotClass(DictWrapper): RECORD_SCHEMA = get_schema_type("com.linkedin.pegasus2avro.metadata.snapshot.DashboardSnapshot") def __init__(self, urn: str, - aspects: List[Union["DashboardKeyClass", "DashboardInfoClass", "EditableDashboardPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]], + aspects: List[Union["DashboardKeyClass", "DashboardInfoClass", "EditableDashboardPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]], ): super().__init__() @@ -5360,12 +5360,12 @@ class DashboardSnapshotClass(DictWrapper): @property - def aspects(self) -> List[Union["DashboardKeyClass", "DashboardInfoClass", "EditableDashboardPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]]: + def aspects(self) -> List[Union["DashboardKeyClass", "DashboardInfoClass", "EditableDashboardPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]]: """Getter: The list of metadata aspects associated with the dashboard. Depending on the use case, this can either be all, or a selection, of supported aspects.""" return self._inner_dict.get('aspects') # type: ignore @aspects.setter - def aspects(self, value: List[Union["DashboardKeyClass", "DashboardInfoClass", "EditableDashboardPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]]) -> None: + def aspects(self, value: List[Union["DashboardKeyClass", "DashboardInfoClass", "EditableDashboardPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]]) -> None: """Setter: The list of metadata aspects associated with the dashboard. Depending on the use case, this can either be all, or a selection, of supported aspects.""" self._inner_dict['aspects'] = value @@ -5376,7 +5376,7 @@ class DataFlowSnapshotClass(DictWrapper): RECORD_SCHEMA = get_schema_type("com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot") def __init__(self, urn: str, - aspects: List[Union["DataFlowKeyClass", "DataFlowInfoClass", "EditableDataFlowPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]], + aspects: List[Union["DataFlowKeyClass", "DataFlowInfoClass", "EditableDataFlowPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]], ): super().__init__() @@ -5407,12 +5407,12 @@ class DataFlowSnapshotClass(DictWrapper): @property - def aspects(self) -> List[Union["DataFlowKeyClass", "DataFlowInfoClass", "EditableDataFlowPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]]: + def aspects(self) -> List[Union["DataFlowKeyClass", "DataFlowInfoClass", "EditableDataFlowPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]]: """Getter: The list of metadata aspects associated with the data flow. Depending on the use case, this can either be all, or a selection, of supported aspects.""" return self._inner_dict.get('aspects') # type: ignore @aspects.setter - def aspects(self, value: List[Union["DataFlowKeyClass", "DataFlowInfoClass", "EditableDataFlowPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]]) -> None: + def aspects(self, value: List[Union["DataFlowKeyClass", "DataFlowInfoClass", "EditableDataFlowPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]]) -> None: """Setter: The list of metadata aspects associated with the data flow. Depending on the use case, this can either be all, or a selection, of supported aspects.""" self._inner_dict['aspects'] = value @@ -5470,7 +5470,7 @@ class DataJobSnapshotClass(DictWrapper): RECORD_SCHEMA = get_schema_type("com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot") def __init__(self, urn: str, - aspects: List[Union["DataJobKeyClass", "DataJobInfoClass", "DataJobInputOutputClass", "EditableDataJobPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]], + aspects: List[Union["DataJobKeyClass", "DataJobInfoClass", "DataJobInputOutputClass", "EditableDataJobPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]], ): super().__init__() @@ -5501,12 +5501,12 @@ class DataJobSnapshotClass(DictWrapper): @property - def aspects(self) -> List[Union["DataJobKeyClass", "DataJobInfoClass", "DataJobInputOutputClass", "EditableDataJobPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]]: + def aspects(self) -> List[Union["DataJobKeyClass", "DataJobInfoClass", "DataJobInputOutputClass", "EditableDataJobPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]]: """Getter: The list of metadata aspects associated with the data job. Depending on the use case, this can either be all, or a selection, of supported aspects.""" return self._inner_dict.get('aspects') # type: ignore @aspects.setter - def aspects(self, value: List[Union["DataJobKeyClass", "DataJobInfoClass", "DataJobInputOutputClass", "EditableDataJobPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]]) -> None: + def aspects(self, value: List[Union["DataJobKeyClass", "DataJobInfoClass", "DataJobInputOutputClass", "EditableDataJobPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]]) -> None: """Setter: The list of metadata aspects associated with the data job. Depending on the use case, this can either be all, or a selection, of supported aspects.""" self._inner_dict['aspects'] = value diff --git a/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeEvent.avsc b/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeEvent.avsc index 7f402f51a7..adacd5c624 100644 --- a/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeEvent.avsc +++ b/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeEvent.avsc @@ -675,6 +675,92 @@ "Aspect": { "name": "browsePaths" } + }, + { + "type": "record", + "name": "GlossaryTerms", + "namespace": "com.linkedin.pegasus2avro.common", + "doc": "Related business terms information", + "fields": [ + { + "name": "terms", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "GlossaryTermAssociation", + "doc": "Properties of an applied glossary term.", + "fields": [ + { + "name": "urn", + "type": "string", + "doc": "Urn of the applied glossary term", + "Searchable": { + "fieldName": "glossaryTerms", + "fieldType": "URN_PARTIAL" + }, + "java": { + "class": "com.linkedin.pegasus2avro.common.urn.GlossaryTermUrn" + } + } + ] + } + }, + "doc": "The related business terms" + }, + { + "name": "auditStamp", + "type": "AuditStamp", + "doc": "Audit stamp containing who reported the related business term" + } + ], + "Aspect": { + "name": "glossaryTerms" + } + }, + { + "type": "record", + "name": "InstitutionalMemory", + "namespace": "com.linkedin.pegasus2avro.common", + "doc": "Institutional memory of an entity. This is a way to link to relevant documentation and provide description of the documentation. Institutional or tribal knowledge is very important for users to leverage the entity.", + "fields": [ + { + "name": "elements", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "InstitutionalMemoryMetadata", + "doc": "Metadata corresponding to a record of institutional memory.", + "fields": [ + { + "name": "url", + "type": "string", + "doc": "Link to an engineering design document or a wiki page.", + "java": { + "class": "com.linkedin.pegasus2avro.common.url.Url", + "coercerClass": "com.linkedin.pegasus2avro.common.url.UrlCoercer" + } + }, + { + "name": "description", + "type": "string", + "doc": "Description of the link." + }, + { + "name": "createStamp", + "type": "AuditStamp", + "doc": "Audit stamp associated with creation of this record" + } + ] + } + }, + "doc": "List of records that represent institutional memory of an entity. Each record consists of a link, description, creator and timestamps associated with that record." + } + ], + "Aspect": { + "name": "institutionalMemory" + } } ] }, @@ -1329,7 +1415,9 @@ "com.linkedin.pegasus2avro.common.Ownership", "com.linkedin.pegasus2avro.common.Status", "com.linkedin.pegasus2avro.common.GlobalTags", - "com.linkedin.pegasus2avro.common.BrowsePaths" + "com.linkedin.pegasus2avro.common.BrowsePaths", + "com.linkedin.pegasus2avro.common.GlossaryTerms", + "com.linkedin.pegasus2avro.common.InstitutionalMemory" ] }, "doc": "The list of metadata aspects associated with the dashboard. Depending on the use case, this can either be all, or a selection, of supported aspects." @@ -1525,7 +1613,9 @@ "com.linkedin.pegasus2avro.common.Ownership", "com.linkedin.pegasus2avro.common.Status", "com.linkedin.pegasus2avro.common.GlobalTags", - "com.linkedin.pegasus2avro.common.BrowsePaths" + "com.linkedin.pegasus2avro.common.BrowsePaths", + "com.linkedin.pegasus2avro.common.GlossaryTerms", + "com.linkedin.pegasus2avro.common.InstitutionalMemory" ] }, "doc": "The list of metadata aspects associated with the data flow. Depending on the use case, this can either be all, or a selection, of supported aspects." @@ -1870,7 +1960,9 @@ "com.linkedin.pegasus2avro.common.Ownership", "com.linkedin.pegasus2avro.common.Status", "com.linkedin.pegasus2avro.common.GlobalTags", - "com.linkedin.pegasus2avro.common.BrowsePaths" + "com.linkedin.pegasus2avro.common.BrowsePaths", + "com.linkedin.pegasus2avro.common.GlossaryTerms", + "com.linkedin.pegasus2avro.common.InstitutionalMemory" ] }, "doc": "The list of metadata aspects associated with the data job. Depending on the use case, this can either be all, or a selection, of supported aspects." @@ -2288,50 +2380,7 @@ "name": "upstreamLineage" } }, - { - "type": "record", - "name": "InstitutionalMemory", - "namespace": "com.linkedin.pegasus2avro.common", - "doc": "Institutional memory of an entity. This is a way to link to relevant documentation and provide description of the documentation. Institutional or tribal knowledge is very important for users to leverage the entity.", - "fields": [ - { - "name": "elements", - "type": { - "type": "array", - "items": { - "type": "record", - "name": "InstitutionalMemoryMetadata", - "doc": "Metadata corresponding to a record of institutional memory.", - "fields": [ - { - "name": "url", - "type": "string", - "doc": "Link to an engineering design document or a wiki page.", - "java": { - "class": "com.linkedin.pegasus2avro.common.url.Url", - "coercerClass": "com.linkedin.pegasus2avro.common.url.UrlCoercer" - } - }, - { - "name": "description", - "type": "string", - "doc": "Description of the link." - }, - { - "name": "createStamp", - "type": "AuditStamp", - "doc": "Audit stamp associated with creation of this record" - } - ] - } - }, - "doc": "List of records that represent institutional memory of an entity. Each record consists of a link, description, creator and timestamps associated with that record." - } - ], - "Aspect": { - "name": "institutionalMemory" - } - }, + "com.linkedin.pegasus2avro.common.InstitutionalMemory", "com.linkedin.pegasus2avro.common.Ownership", "com.linkedin.pegasus2avro.common.Status", { @@ -2776,48 +2825,7 @@ "name": "glossaryTerms", "type": [ "null", - { - "type": "record", - "name": "GlossaryTerms", - "namespace": "com.linkedin.pegasus2avro.common", - "doc": "Related business terms information", - "fields": [ - { - "name": "terms", - "type": { - "type": "array", - "items": { - "type": "record", - "name": "GlossaryTermAssociation", - "doc": "Properties of an applied glossary term.", - "fields": [ - { - "name": "urn", - "type": "string", - "doc": "Urn of the applied glossary term", - "Searchable": { - "fieldName": "glossaryTerms", - "fieldType": "URN_PARTIAL" - }, - "java": { - "class": "com.linkedin.pegasus2avro.common.urn.GlossaryTermUrn" - } - } - ] - } - }, - "doc": "The related business terms" - }, - { - "name": "auditStamp", - "type": "AuditStamp", - "doc": "Audit stamp containing who reported the related business term" - } - ], - "Aspect": { - "name": "glossaryTerms" - } - } + "com.linkedin.pegasus2avro.common.GlossaryTerms" ], "doc": "Glossary terms associated with the field", "default": null, @@ -3236,7 +3244,8 @@ "Entity": { "keyAspect": "dataProcessKey", "name": "dataProcess" - } + }, + "deprecated": "Use DataJob instead." }, { "type": "record", diff --git a/metadata-ingestion/tests/integration/hive/hive_mces_golden.json b/metadata-ingestion/tests/integration/hive/hive_mces_golden.json index 7a0b1d3620..7acb76939c 100644 --- a/metadata-ingestion/tests/integration/hive/hive_mces_golden.json +++ b/metadata-ingestion/tests/integration/hive/hive_mces_golden.json @@ -10,7 +10,7 @@ "customProperties": { "Database:": "db1", "Owner:": "root", - "CreateTime:": "Mon Aug 09 22:20:40 UTC 2021", + "CreateTime:": "Fri Oct 01 05:17:59 UTC 2021", "LastAccessTime:": "UNKNOWN", "Retention:": "0", "Location:": "hdfs://namenode:8020/user/hive/warehouse/db1.db/_test_table_underscore", @@ -20,7 +20,7 @@ "Table Parameters: numRows": "0", "Table Parameters: rawDataSize": "0", "Table Parameters: totalSize": "0", - "Table Parameters: transient_lastDdlTime": "1628547640", + "Table Parameters: transient_lastDdlTime": "1633065479", "SerDe Library:": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", "InputFormat:": "org.apache.hadoop.mapred.TextInputFormat", "OutputFormat:": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", @@ -120,7 +120,7 @@ "customProperties": { "Database:": "db1", "Owner:": "root", - "CreateTime:": "Mon Aug 09 22:20:41 UTC 2021", + "CreateTime:": "Fri Oct 01 05:17:59 UTC 2021", "LastAccessTime:": "UNKNOWN", "Retention:": "0", "Location:": "hdfs://namenode:8020/user/hive/warehouse/db1.db/array_struct_test", @@ -130,7 +130,7 @@ "Table Parameters: numRows": "1", "Table Parameters: rawDataSize": "32", "Table Parameters: totalSize": "33", - "Table Parameters: transient_lastDdlTime": "1628547648", + "Table Parameters: transient_lastDdlTime": "1633065482", "SerDe Library:": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", "InputFormat:": "org.apache.hadoop.mapred.TextInputFormat", "OutputFormat:": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", @@ -230,7 +230,7 @@ "customProperties": { "Database:": "db1", "Owner:": "root", - "CreateTime:": "Mon Aug 09 22:20:35 UTC 2021", + "CreateTime:": "Fri Oct 01 05:17:56 UTC 2021", "LastAccessTime:": "UNKNOWN", "Retention:": "0", "Location:": "hdfs://namenode:8020/user/hive/warehouse/db1.db/pokes", @@ -239,7 +239,7 @@ "Table Parameters: numRows": "0", "Table Parameters: rawDataSize": "0", "Table Parameters: totalSize": "5812", - "Table Parameters: transient_lastDdlTime": "1628547638", + "Table Parameters: transient_lastDdlTime": "1633065477", "SerDe Library:": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", "InputFormat:": "org.apache.hadoop.mapred.TextInputFormat", "OutputFormat:": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", @@ -339,7 +339,7 @@ "customProperties": { "Database:": "db1", "Owner:": "root", - "CreateTime:": "Mon Aug 09 22:20:40 UTC 2021", + "CreateTime:": "Fri Oct 01 05:17:59 UTC 2021", "LastAccessTime:": "UNKNOWN", "Retention:": "0", "Location:": "hdfs://namenode:8020/user/hive/warehouse/db1.db/struct_test", @@ -349,7 +349,7 @@ "Table Parameters: numRows": "0", "Table Parameters: rawDataSize": "0", "Table Parameters: totalSize": "0", - "Table Parameters: transient_lastDdlTime": "1628547640", + "Table Parameters: transient_lastDdlTime": "1633065479", "SerDe Library:": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", "InputFormat:": "org.apache.hadoop.mapred.TextInputFormat", "OutputFormat:": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", @@ -438,4 +438,4 @@ "properties": null } } -] +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/hive/hive_setup.sql b/metadata-ingestion/tests/integration/hive/hive_setup.sql index 41713b8500..465aa543e5 100644 --- a/metadata-ingestion/tests/integration/hive/hive_setup.sql +++ b/metadata-ingestion/tests/integration/hive/hive_setup.sql @@ -4,7 +4,7 @@ CREATE DATABASE IF NOT EXISTS db2; CREATE TABLE IF NOT EXISTS db1.pokes (foo INT, bar STRING); LOAD DATA LOCAL INPATH '/opt/hive/examples/files/kv1.txt' OVERWRITE INTO TABLE db1.pokes; -CREATE TABLE IF NOT EXISTS db2.pokes (foo INT, bar STRING); +CREATE TABLE IF NOT EXISTS db2.pokes (foo INT, bar STRING, primary key(foo) DISABLE NOVALIDATE NORELY); LOAD DATA LOCAL INPATH '/opt/hive/examples/files/kv1.txt' OVERWRITE INTO TABLE db2.pokes; -- Setup a table with a special character. diff --git a/metadata-ingestion/tests/integration/mysql/mysql_mces_golden.json b/metadata-ingestion/tests/integration/mysql/mysql_mces_golden.json index c862219da3..bff7c36b06 100644 --- a/metadata-ingestion/tests/integration/mysql/mysql_mces_golden.json +++ b/metadata-ingestion/tests/integration/mysql/mysql_mces_golden.json @@ -44,7 +44,7 @@ "recursive": false, "globalTags": null, "glossaryTerms": null, - "isPartOfKey": false + "isPartOfKey": true }, { "fieldPath": "aspect", @@ -60,7 +60,7 @@ "recursive": false, "globalTags": null, "glossaryTerms": null, - "isPartOfKey": false + "isPartOfKey": true }, { "fieldPath": "version", @@ -76,7 +76,7 @@ "recursive": false, "globalTags": null, "glossaryTerms": null, - "isPartOfKey": false + "isPartOfKey": true }, { "fieldPath": "metadata", @@ -212,7 +212,7 @@ "recursive": false, "globalTags": null, "glossaryTerms": null, - "isPartOfKey": false + "isPartOfKey": true }, { "fieldPath": "urn", @@ -494,7 +494,7 @@ "recursive": false, "globalTags": null, "glossaryTerms": null, - "isPartOfKey": false + "isPartOfKey": true }, { "fieldPath": "company", @@ -576,6 +576,112 @@ "properties": null } }, +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.orders,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "northwind.orders", + "platform": "urn:li:dataPlatform:mysql", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + }, + "deleted": null, + "dataset": null, + "cluster": null, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [ + { + "fieldPath": "id", + "jsonPath": null, + "nullable": false, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "INTEGER()", + "recursive": false, + "globalTags": null, + "glossaryTerms": null, + "isPartOfKey": true + }, + { + "fieldPath": "description", + "jsonPath": null, + "nullable": true, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "VARCHAR(length=50)", + "recursive": false, + "globalTags": null, + "glossaryTerms": null, + "isPartOfKey": false + }, + { + "fieldPath": "customer_id", + "jsonPath": null, + "nullable": false, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "INTEGER()", + "recursive": false, + "globalTags": null, + "glossaryTerms": null, + "isPartOfKey": false + } + ], + "primaryKeys": null, + "foreignKeysSpecs": null, + "foreignKeys": [ + { + "name": "fk_order_customer", + "foreignFields": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.customers,PROD), id)" + ], + "sourceFields": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.orders,PROD), customer_id)" + ], + "foreignDataset": "urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.customers,PROD)" + } + ] + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "mysql-test", + "properties": null + } +}, { "auditHeader": null, "entityType": "dataset", @@ -589,6 +695,19 @@ }, "systemMetadata": null }, +{ + "auditHeader": null, + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.orders,PROD)", + "entityKeyAspect": null, + "changeType": "UPSERT", + "aspectName": "datasetProfile", + "aspect": { + "value": "{\"timestampMillis\": 1586847600000, \"rowCount\": 0, \"columnCount\": 3, \"fieldProfiles\": [{\"fieldPath\": \"id\", \"uniqueCount\": 0, \"nullCount\": 0, \"sampleValues\": []}, {\"fieldPath\": \"description\", \"uniqueCount\": 0, \"nullCount\": 0, \"sampleValues\": []}, {\"fieldPath\": \"customer_id\", \"uniqueCount\": 0, \"nullCount\": 0, \"sampleValues\": []}]}", + "contentType": "application/json" + }, + "systemMetadata": null +}, { "auditHeader": null, "proposedSnapshot": { @@ -665,4 +784,4 @@ }, "systemMetadata": null } -] +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/mysql/setup/setup.sql b/metadata-ingestion/tests/integration/mysql/setup/setup.sql index f86c002e27..2885a66909 100644 --- a/metadata-ingestion/tests/integration/mysql/setup/setup.sql +++ b/metadata-ingestion/tests/integration/mysql/setup/setup.sql @@ -74,6 +74,20 @@ CREATE TABLE IF NOT EXISTS `northwind`.`customers` ( ENGINE = InnoDB DEFAULT CHARACTER SET = utf8; +CREATE TABLE IF NOT EXISTS `northwind`.`orders` ( + `id` INT(11) NOT NULL AUTO_INCREMENT, + `description` VARCHAR(50) NULL DEFAULT NULL, + `customer_id` INT(11) NOT NULL, + PRIMARY KEY (`id`), + CONSTRAINT `fk_order_customer` + FOREIGN KEY (`customer_id`) + REFERENCES `northwind`.`customers`(`id`) + ON DELETE NO ACTION + ON UPDATE NO ACTION +) +ENGINE = InnoDB +DEFAULT CHARACTER SET = utf8; + -- Now, the actual sample data. USE `northwind`; @@ -105,5 +119,6 @@ ENGINE = InnoDB DEFAULT CHARACTER SET = utf8; + SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS; SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS; diff --git a/metadata-ingestion/tests/integration/sql_server/mssql_mces_golden.json b/metadata-ingestion/tests/integration/sql_server/mssql_mces_golden.json index b1a3b10a98..13ddf47006 100644 --- a/metadata-ingestion/tests/integration/sql_server/mssql_mces_golden.json +++ b/metadata-ingestion/tests/integration/sql_server/mssql_mces_golden.json @@ -156,5 +156,206 @@ "runId": "mssql-test", "properties": null } +}, +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:mssql,DemoData.Foo.Persons,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "DemoData.Foo.Persons", + "platform": "urn:li:dataPlatform:mssql", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + }, + "deleted": null, + "dataset": null, + "cluster": null, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [ + { + "fieldPath": "ID", + "jsonPath": null, + "nullable": false, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "INTEGER()", + "recursive": false, + "globalTags": null, + "glossaryTerms": null, + "isPartOfKey": true + }, + { + "fieldPath": "LastName", + "jsonPath": null, + "nullable": false, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "VARCHAR(length=255, collation='SQL_Latin1_General_CP1_CI_AS')", + "recursive": false, + "globalTags": null, + "glossaryTerms": null, + "isPartOfKey": false + }, + { + "fieldPath": "FirstName", + "jsonPath": null, + "nullable": true, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "VARCHAR(length=255, collation='SQL_Latin1_General_CP1_CI_AS')", + "recursive": false, + "globalTags": null, + "glossaryTerms": null, + "isPartOfKey": false + }, + { + "fieldPath": "Age", + "jsonPath": null, + "nullable": true, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "INTEGER()", + "recursive": false, + "globalTags": null, + "glossaryTerms": null, + "isPartOfKey": false + } + ], + "primaryKeys": null, + "foreignKeysSpecs": null, + "foreignKeys": null + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mssql-test", + "properties": null + } +}, +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:mssql,DemoData.Foo.SalesReason,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "DemoData.Foo.SalesReason", + "platform": "urn:li:dataPlatform:mssql", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown", + "impersonator": null + }, + "deleted": null, + "dataset": null, + "cluster": null, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [ + { + "fieldPath": "TempID", + "jsonPath": null, + "nullable": false, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "INTEGER()", + "recursive": false, + "globalTags": null, + "glossaryTerms": null, + "isPartOfKey": true + }, + { + "fieldPath": "Name", + "jsonPath": null, + "nullable": true, + "description": null, + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "NVARCHAR(length=50)", + "recursive": false, + "globalTags": null, + "glossaryTerms": null, + "isPartOfKey": false + } + ], + "primaryKeys": null, + "foreignKeysSpecs": null, + "foreignKeys": [ + { + "name": "FK_TempSales_SalesReason", + "foreignFields": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mssql,DemoData.Foo.Persons,PROD), ID)" + ], + "sourceFields": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mssql,DemoData.Foo.SalesReason,PROD), TempID)" + ], + "foreignDataset": "urn:li:dataset:(urn:li:dataPlatform:mssql,DemoData.Foo.Persons,PROD)" + } + ] + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mssql-test", + "properties": null + } } -] +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/sql_server/setup/setup.sql b/metadata-ingestion/tests/integration/sql_server/setup/setup.sql index 3b2cd2bda3..d37c2d6475 100644 --- a/metadata-ingestion/tests/integration/sql_server/setup/setup.sql +++ b/metadata-ingestion/tests/integration/sql_server/setup/setup.sql @@ -8,3 +8,22 @@ CREATE SCHEMA Foo; GO CREATE TABLE Foo.Items (ID int, ItemName nvarchar(max)); GO +CREATE TABLE Foo.Persons ( + ID int NOT NULL PRIMARY KEY, + LastName varchar(255) NOT NULL, + FirstName varchar(255), + Age int +); +GO +CREATE TABLE Foo.SalesReason + ( + TempID int NOT NULL, + Name nvarchar(50) + , CONSTRAINT PK_TempSales PRIMARY KEY NONCLUSTERED (TempID) + , CONSTRAINT FK_TempSales_SalesReason FOREIGN KEY (TempID) + REFERENCES Foo.Persons (ID) + ON DELETE CASCADE + ON UPDATE CASCADE + ) +; +GO