feat(ingest): support for primary and foreign key extraction from sql sources (#3316)

This commit is contained in:
Swaroop Jagadish 2021-09-30 23:04:47 -07:00 committed by GitHub
parent a7dbf07484
commit df0b4f258b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 643 additions and 216 deletions

View File

@ -39,6 +39,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.schema import (
BytesTypeClass,
DateTypeClass,
EnumTypeClass,
ForeignKeyConstraint,
MySqlDDL,
NullTypeClass,
NumberTypeClass,
@ -239,9 +240,15 @@ def get_column_type(
def get_schema_metadata(
sql_report: SQLSourceReport, dataset_name: str, platform: str, columns: List[dict]
sql_report: SQLSourceReport,
dataset_name: str,
platform: str,
columns: List[dict],
pk_constraints: dict = None,
foreign_keys: List[ForeignKeyConstraint] = None,
) -> SchemaMetadata:
canonical_schema: List[SchemaField] = []
for column in columns:
field = SchemaField(
fieldPath=column["name"],
@ -251,6 +258,12 @@ def get_schema_metadata(
nullable=column["nullable"],
recursive=False,
)
if (
pk_constraints is not None
and isinstance(pk_constraints, dict) # some dialects (hive) return list
and column["name"] in pk_constraints.get("constrained_columns", [])
):
field.isPartOfKey = True
canonical_schema.append(field)
schema_metadata = SchemaMetadata(
@ -261,6 +274,9 @@ def get_schema_metadata(
platformSchema=MySqlDDL(tableSchema=""),
fields=canonical_schema,
)
if foreign_keys is not None and foreign_keys != []:
schema_metadata.foreignKeys = foreign_keys
return schema_metadata
@ -337,6 +353,27 @@ class SQLAlchemySource(Source):
else:
return f"{schema}.{entity}"
def get_foreign_key_metadata(self, datasetUrn, fk_dict, inspector):
referred_dataset_name = self.get_identifier(
schema=fk_dict["referred_schema"],
entity=fk_dict["referred_table"],
inspector=inspector,
)
source_fields = [
f"urn:li:schemaField:({datasetUrn}, {f})"
for f in fk_dict["constrained_columns"]
]
foreign_dataset = f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{referred_dataset_name},{self.config.env})"
foreign_fields = [
f"urn:li:schemaField:({foreign_dataset}, {f})"
for f in fk_dict["referred_columns"]
]
return ForeignKeyConstraint(
fk_dict["name"], foreign_fields, source_fields, foreign_dataset
)
def loop_tables(
self,
inspector: Inspector,
@ -373,21 +410,39 @@ class SQLAlchemySource(Source):
# The "properties" field is a non-standard addition to SQLAlchemy's interface.
properties = table_info.get("properties", {})
# TODO: capture inspector.get_pk_constraint
# TODO: capture inspector.get_sorted_table_and_fkc_names
datasetUrn = f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})"
dataset_snapshot = DatasetSnapshot(
urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{dataset_name},{self.config.env})",
urn=datasetUrn,
aspects=[],
)
if description is not None or properties:
dataset_properties = DatasetPropertiesClass(
description=description,
customProperties=properties,
)
dataset_snapshot.aspects.append(dataset_properties)
pk_constraints: dict = inspector.get_pk_constraint(table, schema)
try:
foreign_keys = [
self.get_foreign_key_metadata(datasetUrn, fk_rec, inspector)
for fk_rec in inspector.get_foreign_keys(table, schema)
]
except KeyError:
# certain databases like MySQL cause issues due to lower-case/upper-case irregularities
logger.debug(
f"{datasetUrn}: failure in foreign key extraction... skipping"
)
foreign_keys = []
schema_metadata = get_schema_metadata(
self.report, dataset_name, self.platform, columns
self.report,
dataset_name,
self.platform,
columns,
pk_constraints,
foreign_keys,
)
dataset_snapshot.aspects.append(schema_metadata)

View File

@ -686,6 +686,94 @@
}
],
"doc": "Shared aspect containing Browse Paths to be indexed for an entity."
},
{
"type": "record",
"Aspect": {
"name": "glossaryTerms"
},
"name": "GlossaryTerms",
"namespace": "com.linkedin.pegasus2avro.common",
"fields": [
{
"type": {
"type": "array",
"items": {
"type": "record",
"name": "GlossaryTermAssociation",
"namespace": "com.linkedin.pegasus2avro.common",
"fields": [
{
"Searchable": {
"fieldName": "glossaryTerms",
"fieldType": "URN_PARTIAL"
},
"java": {
"class": "com.linkedin.pegasus2avro.common.urn.GlossaryTermUrn"
},
"type": "string",
"name": "urn",
"doc": "Urn of the applied glossary term"
}
],
"doc": "Properties of an applied glossary term."
}
},
"name": "terms",
"doc": "The related business terms"
},
{
"type": "com.linkedin.pegasus2avro.common.AuditStamp",
"name": "auditStamp",
"doc": "Audit stamp containing who reported the related business term"
}
],
"doc": "Related business terms information"
},
{
"type": "record",
"Aspect": {
"name": "institutionalMemory"
},
"name": "InstitutionalMemory",
"namespace": "com.linkedin.pegasus2avro.common",
"fields": [
{
"type": {
"type": "array",
"items": {
"type": "record",
"name": "InstitutionalMemoryMetadata",
"namespace": "com.linkedin.pegasus2avro.common",
"fields": [
{
"java": {
"class": "com.linkedin.pegasus2avro.common.url.Url",
"coercerClass": "com.linkedin.pegasus2avro.common.url.UrlCoercer"
},
"type": "string",
"name": "url",
"doc": "Link to an engineering design document or a wiki page."
},
{
"type": "string",
"name": "description",
"doc": "Description of the link."
},
{
"type": "com.linkedin.pegasus2avro.common.AuditStamp",
"name": "createStamp",
"doc": "Audit stamp associated with creation of this record"
}
],
"doc": "Metadata corresponding to a record of institutional memory."
}
},
"name": "elements",
"doc": "List of records that represent institutional memory of an entity. Each record consists of a link, description, creator and timestamps associated with that record."
}
],
"doc": "Institutional memory of an entity. This is a way to link to relevant documentation and provide description of the documentation. Institutional or tribal knowledge is very important for users to leverage the entity."
}
]
},
@ -1340,7 +1428,9 @@
"com.linkedin.pegasus2avro.common.Ownership",
"com.linkedin.pegasus2avro.common.Status",
"com.linkedin.pegasus2avro.common.GlobalTags",
"com.linkedin.pegasus2avro.common.BrowsePaths"
"com.linkedin.pegasus2avro.common.BrowsePaths",
"com.linkedin.pegasus2avro.common.GlossaryTerms",
"com.linkedin.pegasus2avro.common.InstitutionalMemory"
]
},
"name": "aspects",
@ -1536,7 +1626,9 @@
"com.linkedin.pegasus2avro.common.Ownership",
"com.linkedin.pegasus2avro.common.Status",
"com.linkedin.pegasus2avro.common.GlobalTags",
"com.linkedin.pegasus2avro.common.BrowsePaths"
"com.linkedin.pegasus2avro.common.BrowsePaths",
"com.linkedin.pegasus2avro.common.GlossaryTerms",
"com.linkedin.pegasus2avro.common.InstitutionalMemory"
]
},
"name": "aspects",
@ -1882,7 +1974,9 @@
"com.linkedin.pegasus2avro.common.Ownership",
"com.linkedin.pegasus2avro.common.Status",
"com.linkedin.pegasus2avro.common.GlobalTags",
"com.linkedin.pegasus2avro.common.BrowsePaths"
"com.linkedin.pegasus2avro.common.BrowsePaths",
"com.linkedin.pegasus2avro.common.GlossaryTerms",
"com.linkedin.pegasus2avro.common.InstitutionalMemory"
]
},
"name": "aspects",
@ -2303,51 +2397,7 @@
],
"doc": "Upstream lineage of a dataset"
},
{
"type": "record",
"Aspect": {
"name": "institutionalMemory"
},
"name": "InstitutionalMemory",
"namespace": "com.linkedin.pegasus2avro.common",
"fields": [
{
"type": {
"type": "array",
"items": {
"type": "record",
"name": "InstitutionalMemoryMetadata",
"namespace": "com.linkedin.pegasus2avro.common",
"fields": [
{
"java": {
"class": "com.linkedin.pegasus2avro.common.url.Url",
"coercerClass": "com.linkedin.pegasus2avro.common.url.UrlCoercer"
},
"type": "string",
"name": "url",
"doc": "Link to an engineering design document or a wiki page."
},
{
"type": "string",
"name": "description",
"doc": "Description of the link."
},
{
"type": "com.linkedin.pegasus2avro.common.AuditStamp",
"name": "createStamp",
"doc": "Audit stamp associated with creation of this record"
}
],
"doc": "Metadata corresponding to a record of institutional memory."
}
},
"name": "elements",
"doc": "List of records that represent institutional memory of an entity. Each record consists of a link, description, creator and timestamps associated with that record."
}
],
"doc": "Institutional memory of an entity. This is a way to link to relevant documentation and provide description of the documentation. Institutional or tribal knowledge is very important for users to leverage the entity."
},
"com.linkedin.pegasus2avro.common.InstitutionalMemory",
"com.linkedin.pegasus2avro.common.Ownership",
"com.linkedin.pegasus2avro.common.Status",
{
@ -2823,49 +2873,7 @@
},
"type": [
"null",
{
"type": "record",
"Aspect": {
"name": "glossaryTerms"
},
"name": "GlossaryTerms",
"namespace": "com.linkedin.pegasus2avro.common",
"fields": [
{
"type": {
"type": "array",
"items": {
"type": "record",
"name": "GlossaryTermAssociation",
"namespace": "com.linkedin.pegasus2avro.common",
"fields": [
{
"Searchable": {
"fieldName": "glossaryTerms",
"fieldType": "URN_PARTIAL"
},
"java": {
"class": "com.linkedin.pegasus2avro.common.urn.GlossaryTermUrn"
},
"type": "string",
"name": "urn",
"doc": "Urn of the applied glossary term"
}
],
"doc": "Properties of an applied glossary term."
}
},
"name": "terms",
"doc": "The related business terms"
},
{
"type": "com.linkedin.pegasus2avro.common.AuditStamp",
"name": "auditStamp",
"doc": "Audit stamp containing who reported the related business term"
}
],
"doc": "Related business terms information"
}
"com.linkedin.pegasus2avro.common.GlossaryTerms"
],
"name": "glossaryTerms",
"default": null,
@ -3152,6 +3160,7 @@
"keyAspect": "dataProcessKey",
"name": "dataProcess"
},
"deprecated": "Use DataJob instead.",
"name": "DataProcessSnapshot",
"namespace": "com.linkedin.pegasus2avro.metadata.snapshot",
"fields": [

View File

@ -5188,7 +5188,7 @@ class ChartSnapshotClass(DictWrapper):
RECORD_SCHEMA = get_schema_type("com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot")
def __init__(self,
urn: str,
aspects: List[Union["ChartKeyClass", "ChartInfoClass", "ChartQueryClass", "EditableChartPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]],
aspects: List[Union["ChartKeyClass", "ChartInfoClass", "ChartQueryClass", "EditableChartPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]],
):
super().__init__()
@ -5219,12 +5219,12 @@ class ChartSnapshotClass(DictWrapper):
@property
def aspects(self) -> List[Union["ChartKeyClass", "ChartInfoClass", "ChartQueryClass", "EditableChartPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]]:
def aspects(self) -> List[Union["ChartKeyClass", "ChartInfoClass", "ChartQueryClass", "EditableChartPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]]:
"""Getter: The list of metadata aspects associated with the chart. Depending on the use case, this can either be all, or a selection, of supported aspects."""
return self._inner_dict.get('aspects') # type: ignore
@aspects.setter
def aspects(self, value: List[Union["ChartKeyClass", "ChartInfoClass", "ChartQueryClass", "EditableChartPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]]) -> None:
def aspects(self, value: List[Union["ChartKeyClass", "ChartInfoClass", "ChartQueryClass", "EditableChartPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]]) -> None:
"""Setter: The list of metadata aspects associated with the chart. Depending on the use case, this can either be all, or a selection, of supported aspects."""
self._inner_dict['aspects'] = value
@ -5329,7 +5329,7 @@ class DashboardSnapshotClass(DictWrapper):
RECORD_SCHEMA = get_schema_type("com.linkedin.pegasus2avro.metadata.snapshot.DashboardSnapshot")
def __init__(self,
urn: str,
aspects: List[Union["DashboardKeyClass", "DashboardInfoClass", "EditableDashboardPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]],
aspects: List[Union["DashboardKeyClass", "DashboardInfoClass", "EditableDashboardPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]],
):
super().__init__()
@ -5360,12 +5360,12 @@ class DashboardSnapshotClass(DictWrapper):
@property
def aspects(self) -> List[Union["DashboardKeyClass", "DashboardInfoClass", "EditableDashboardPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]]:
def aspects(self) -> List[Union["DashboardKeyClass", "DashboardInfoClass", "EditableDashboardPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]]:
"""Getter: The list of metadata aspects associated with the dashboard. Depending on the use case, this can either be all, or a selection, of supported aspects."""
return self._inner_dict.get('aspects') # type: ignore
@aspects.setter
def aspects(self, value: List[Union["DashboardKeyClass", "DashboardInfoClass", "EditableDashboardPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]]) -> None:
def aspects(self, value: List[Union["DashboardKeyClass", "DashboardInfoClass", "EditableDashboardPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]]) -> None:
"""Setter: The list of metadata aspects associated with the dashboard. Depending on the use case, this can either be all, or a selection, of supported aspects."""
self._inner_dict['aspects'] = value
@ -5376,7 +5376,7 @@ class DataFlowSnapshotClass(DictWrapper):
RECORD_SCHEMA = get_schema_type("com.linkedin.pegasus2avro.metadata.snapshot.DataFlowSnapshot")
def __init__(self,
urn: str,
aspects: List[Union["DataFlowKeyClass", "DataFlowInfoClass", "EditableDataFlowPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]],
aspects: List[Union["DataFlowKeyClass", "DataFlowInfoClass", "EditableDataFlowPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]],
):
super().__init__()
@ -5407,12 +5407,12 @@ class DataFlowSnapshotClass(DictWrapper):
@property
def aspects(self) -> List[Union["DataFlowKeyClass", "DataFlowInfoClass", "EditableDataFlowPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]]:
def aspects(self) -> List[Union["DataFlowKeyClass", "DataFlowInfoClass", "EditableDataFlowPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]]:
"""Getter: The list of metadata aspects associated with the data flow. Depending on the use case, this can either be all, or a selection, of supported aspects."""
return self._inner_dict.get('aspects') # type: ignore
@aspects.setter
def aspects(self, value: List[Union["DataFlowKeyClass", "DataFlowInfoClass", "EditableDataFlowPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]]) -> None:
def aspects(self, value: List[Union["DataFlowKeyClass", "DataFlowInfoClass", "EditableDataFlowPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]]) -> None:
"""Setter: The list of metadata aspects associated with the data flow. Depending on the use case, this can either be all, or a selection, of supported aspects."""
self._inner_dict['aspects'] = value
@ -5470,7 +5470,7 @@ class DataJobSnapshotClass(DictWrapper):
RECORD_SCHEMA = get_schema_type("com.linkedin.pegasus2avro.metadata.snapshot.DataJobSnapshot")
def __init__(self,
urn: str,
aspects: List[Union["DataJobKeyClass", "DataJobInfoClass", "DataJobInputOutputClass", "EditableDataJobPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]],
aspects: List[Union["DataJobKeyClass", "DataJobInfoClass", "DataJobInputOutputClass", "EditableDataJobPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]],
):
super().__init__()
@ -5501,12 +5501,12 @@ class DataJobSnapshotClass(DictWrapper):
@property
def aspects(self) -> List[Union["DataJobKeyClass", "DataJobInfoClass", "DataJobInputOutputClass", "EditableDataJobPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]]:
def aspects(self) -> List[Union["DataJobKeyClass", "DataJobInfoClass", "DataJobInputOutputClass", "EditableDataJobPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]]:
"""Getter: The list of metadata aspects associated with the data job. Depending on the use case, this can either be all, or a selection, of supported aspects."""
return self._inner_dict.get('aspects') # type: ignore
@aspects.setter
def aspects(self, value: List[Union["DataJobKeyClass", "DataJobInfoClass", "DataJobInputOutputClass", "EditableDataJobPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass"]]) -> None:
def aspects(self, value: List[Union["DataJobKeyClass", "DataJobInfoClass", "DataJobInputOutputClass", "EditableDataJobPropertiesClass", "OwnershipClass", "StatusClass", "GlobalTagsClass", "BrowsePathsClass", "GlossaryTermsClass", "InstitutionalMemoryClass"]]) -> None:
"""Setter: The list of metadata aspects associated with the data job. Depending on the use case, this can either be all, or a selection, of supported aspects."""
self._inner_dict['aspects'] = value

View File

@ -675,6 +675,92 @@
"Aspect": {
"name": "browsePaths"
}
},
{
"type": "record",
"name": "GlossaryTerms",
"namespace": "com.linkedin.pegasus2avro.common",
"doc": "Related business terms information",
"fields": [
{
"name": "terms",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "GlossaryTermAssociation",
"doc": "Properties of an applied glossary term.",
"fields": [
{
"name": "urn",
"type": "string",
"doc": "Urn of the applied glossary term",
"Searchable": {
"fieldName": "glossaryTerms",
"fieldType": "URN_PARTIAL"
},
"java": {
"class": "com.linkedin.pegasus2avro.common.urn.GlossaryTermUrn"
}
}
]
}
},
"doc": "The related business terms"
},
{
"name": "auditStamp",
"type": "AuditStamp",
"doc": "Audit stamp containing who reported the related business term"
}
],
"Aspect": {
"name": "glossaryTerms"
}
},
{
"type": "record",
"name": "InstitutionalMemory",
"namespace": "com.linkedin.pegasus2avro.common",
"doc": "Institutional memory of an entity. This is a way to link to relevant documentation and provide description of the documentation. Institutional or tribal knowledge is very important for users to leverage the entity.",
"fields": [
{
"name": "elements",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "InstitutionalMemoryMetadata",
"doc": "Metadata corresponding to a record of institutional memory.",
"fields": [
{
"name": "url",
"type": "string",
"doc": "Link to an engineering design document or a wiki page.",
"java": {
"class": "com.linkedin.pegasus2avro.common.url.Url",
"coercerClass": "com.linkedin.pegasus2avro.common.url.UrlCoercer"
}
},
{
"name": "description",
"type": "string",
"doc": "Description of the link."
},
{
"name": "createStamp",
"type": "AuditStamp",
"doc": "Audit stamp associated with creation of this record"
}
]
}
},
"doc": "List of records that represent institutional memory of an entity. Each record consists of a link, description, creator and timestamps associated with that record."
}
],
"Aspect": {
"name": "institutionalMemory"
}
}
]
},
@ -1329,7 +1415,9 @@
"com.linkedin.pegasus2avro.common.Ownership",
"com.linkedin.pegasus2avro.common.Status",
"com.linkedin.pegasus2avro.common.GlobalTags",
"com.linkedin.pegasus2avro.common.BrowsePaths"
"com.linkedin.pegasus2avro.common.BrowsePaths",
"com.linkedin.pegasus2avro.common.GlossaryTerms",
"com.linkedin.pegasus2avro.common.InstitutionalMemory"
]
},
"doc": "The list of metadata aspects associated with the dashboard. Depending on the use case, this can either be all, or a selection, of supported aspects."
@ -1525,7 +1613,9 @@
"com.linkedin.pegasus2avro.common.Ownership",
"com.linkedin.pegasus2avro.common.Status",
"com.linkedin.pegasus2avro.common.GlobalTags",
"com.linkedin.pegasus2avro.common.BrowsePaths"
"com.linkedin.pegasus2avro.common.BrowsePaths",
"com.linkedin.pegasus2avro.common.GlossaryTerms",
"com.linkedin.pegasus2avro.common.InstitutionalMemory"
]
},
"doc": "The list of metadata aspects associated with the data flow. Depending on the use case, this can either be all, or a selection, of supported aspects."
@ -1870,7 +1960,9 @@
"com.linkedin.pegasus2avro.common.Ownership",
"com.linkedin.pegasus2avro.common.Status",
"com.linkedin.pegasus2avro.common.GlobalTags",
"com.linkedin.pegasus2avro.common.BrowsePaths"
"com.linkedin.pegasus2avro.common.BrowsePaths",
"com.linkedin.pegasus2avro.common.GlossaryTerms",
"com.linkedin.pegasus2avro.common.InstitutionalMemory"
]
},
"doc": "The list of metadata aspects associated with the data job. Depending on the use case, this can either be all, or a selection, of supported aspects."
@ -2288,50 +2380,7 @@
"name": "upstreamLineage"
}
},
{
"type": "record",
"name": "InstitutionalMemory",
"namespace": "com.linkedin.pegasus2avro.common",
"doc": "Institutional memory of an entity. This is a way to link to relevant documentation and provide description of the documentation. Institutional or tribal knowledge is very important for users to leverage the entity.",
"fields": [
{
"name": "elements",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "InstitutionalMemoryMetadata",
"doc": "Metadata corresponding to a record of institutional memory.",
"fields": [
{
"name": "url",
"type": "string",
"doc": "Link to an engineering design document or a wiki page.",
"java": {
"class": "com.linkedin.pegasus2avro.common.url.Url",
"coercerClass": "com.linkedin.pegasus2avro.common.url.UrlCoercer"
}
},
{
"name": "description",
"type": "string",
"doc": "Description of the link."
},
{
"name": "createStamp",
"type": "AuditStamp",
"doc": "Audit stamp associated with creation of this record"
}
]
}
},
"doc": "List of records that represent institutional memory of an entity. Each record consists of a link, description, creator and timestamps associated with that record."
}
],
"Aspect": {
"name": "institutionalMemory"
}
},
"com.linkedin.pegasus2avro.common.InstitutionalMemory",
"com.linkedin.pegasus2avro.common.Ownership",
"com.linkedin.pegasus2avro.common.Status",
{
@ -2776,48 +2825,7 @@
"name": "glossaryTerms",
"type": [
"null",
{
"type": "record",
"name": "GlossaryTerms",
"namespace": "com.linkedin.pegasus2avro.common",
"doc": "Related business terms information",
"fields": [
{
"name": "terms",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "GlossaryTermAssociation",
"doc": "Properties of an applied glossary term.",
"fields": [
{
"name": "urn",
"type": "string",
"doc": "Urn of the applied glossary term",
"Searchable": {
"fieldName": "glossaryTerms",
"fieldType": "URN_PARTIAL"
},
"java": {
"class": "com.linkedin.pegasus2avro.common.urn.GlossaryTermUrn"
}
}
]
}
},
"doc": "The related business terms"
},
{
"name": "auditStamp",
"type": "AuditStamp",
"doc": "Audit stamp containing who reported the related business term"
}
],
"Aspect": {
"name": "glossaryTerms"
}
}
"com.linkedin.pegasus2avro.common.GlossaryTerms"
],
"doc": "Glossary terms associated with the field",
"default": null,
@ -3236,7 +3244,8 @@
"Entity": {
"keyAspect": "dataProcessKey",
"name": "dataProcess"
}
},
"deprecated": "Use DataJob instead."
},
{
"type": "record",

View File

@ -10,7 +10,7 @@
"customProperties": {
"Database:": "db1",
"Owner:": "root",
"CreateTime:": "Mon Aug 09 22:20:40 UTC 2021",
"CreateTime:": "Fri Oct 01 05:17:59 UTC 2021",
"LastAccessTime:": "UNKNOWN",
"Retention:": "0",
"Location:": "hdfs://namenode:8020/user/hive/warehouse/db1.db/_test_table_underscore",
@ -20,7 +20,7 @@
"Table Parameters: numRows": "0",
"Table Parameters: rawDataSize": "0",
"Table Parameters: totalSize": "0",
"Table Parameters: transient_lastDdlTime": "1628547640",
"Table Parameters: transient_lastDdlTime": "1633065479",
"SerDe Library:": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"InputFormat:": "org.apache.hadoop.mapred.TextInputFormat",
"OutputFormat:": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
@ -120,7 +120,7 @@
"customProperties": {
"Database:": "db1",
"Owner:": "root",
"CreateTime:": "Mon Aug 09 22:20:41 UTC 2021",
"CreateTime:": "Fri Oct 01 05:17:59 UTC 2021",
"LastAccessTime:": "UNKNOWN",
"Retention:": "0",
"Location:": "hdfs://namenode:8020/user/hive/warehouse/db1.db/array_struct_test",
@ -130,7 +130,7 @@
"Table Parameters: numRows": "1",
"Table Parameters: rawDataSize": "32",
"Table Parameters: totalSize": "33",
"Table Parameters: transient_lastDdlTime": "1628547648",
"Table Parameters: transient_lastDdlTime": "1633065482",
"SerDe Library:": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"InputFormat:": "org.apache.hadoop.mapred.TextInputFormat",
"OutputFormat:": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
@ -230,7 +230,7 @@
"customProperties": {
"Database:": "db1",
"Owner:": "root",
"CreateTime:": "Mon Aug 09 22:20:35 UTC 2021",
"CreateTime:": "Fri Oct 01 05:17:56 UTC 2021",
"LastAccessTime:": "UNKNOWN",
"Retention:": "0",
"Location:": "hdfs://namenode:8020/user/hive/warehouse/db1.db/pokes",
@ -239,7 +239,7 @@
"Table Parameters: numRows": "0",
"Table Parameters: rawDataSize": "0",
"Table Parameters: totalSize": "5812",
"Table Parameters: transient_lastDdlTime": "1628547638",
"Table Parameters: transient_lastDdlTime": "1633065477",
"SerDe Library:": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"InputFormat:": "org.apache.hadoop.mapred.TextInputFormat",
"OutputFormat:": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
@ -339,7 +339,7 @@
"customProperties": {
"Database:": "db1",
"Owner:": "root",
"CreateTime:": "Mon Aug 09 22:20:40 UTC 2021",
"CreateTime:": "Fri Oct 01 05:17:59 UTC 2021",
"LastAccessTime:": "UNKNOWN",
"Retention:": "0",
"Location:": "hdfs://namenode:8020/user/hive/warehouse/db1.db/struct_test",
@ -349,7 +349,7 @@
"Table Parameters: numRows": "0",
"Table Parameters: rawDataSize": "0",
"Table Parameters: totalSize": "0",
"Table Parameters: transient_lastDdlTime": "1628547640",
"Table Parameters: transient_lastDdlTime": "1633065479",
"SerDe Library:": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"InputFormat:": "org.apache.hadoop.mapred.TextInputFormat",
"OutputFormat:": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
@ -438,4 +438,4 @@
"properties": null
}
}
]
]

View File

@ -4,7 +4,7 @@ CREATE DATABASE IF NOT EXISTS db2;
CREATE TABLE IF NOT EXISTS db1.pokes (foo INT, bar STRING);
LOAD DATA LOCAL INPATH '/opt/hive/examples/files/kv1.txt' OVERWRITE INTO TABLE db1.pokes;
CREATE TABLE IF NOT EXISTS db2.pokes (foo INT, bar STRING);
CREATE TABLE IF NOT EXISTS db2.pokes (foo INT, bar STRING, primary key(foo) DISABLE NOVALIDATE NORELY);
LOAD DATA LOCAL INPATH '/opt/hive/examples/files/kv1.txt' OVERWRITE INTO TABLE db2.pokes;
-- Setup a table with a special character.

View File

@ -44,7 +44,7 @@
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false
"isPartOfKey": true
},
{
"fieldPath": "aspect",
@ -60,7 +60,7 @@
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false
"isPartOfKey": true
},
{
"fieldPath": "version",
@ -76,7 +76,7 @@
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false
"isPartOfKey": true
},
{
"fieldPath": "metadata",
@ -212,7 +212,7 @@
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false
"isPartOfKey": true
},
{
"fieldPath": "urn",
@ -494,7 +494,7 @@
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false
"isPartOfKey": true
},
{
"fieldPath": "company",
@ -576,6 +576,112 @@
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.orders,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "northwind.orders",
"platform": "urn:li:dataPlatform:mysql",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"deleted": null,
"dataset": null,
"cluster": null,
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.MySqlDDL": {
"tableSchema": ""
}
},
"fields": [
{
"fieldPath": "id",
"jsonPath": null,
"nullable": false,
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "INTEGER()",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": true
},
{
"fieldPath": "description",
"jsonPath": null,
"nullable": true,
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(length=50)",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false
},
{
"fieldPath": "customer_id",
"jsonPath": null,
"nullable": false,
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "INTEGER()",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false
}
],
"primaryKeys": null,
"foreignKeysSpecs": null,
"foreignKeys": [
{
"name": "fk_order_customer",
"foreignFields": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.customers,PROD), id)"
],
"sourceFields": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.orders,PROD), customer_id)"
],
"foreignDataset": "urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.customers,PROD)"
}
]
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test",
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
@ -589,6 +695,19 @@
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.orders,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "datasetProfile",
"aspect": {
"value": "{\"timestampMillis\": 1586847600000, \"rowCount\": 0, \"columnCount\": 3, \"fieldProfiles\": [{\"fieldPath\": \"id\", \"uniqueCount\": 0, \"nullCount\": 0, \"sampleValues\": []}, {\"fieldPath\": \"description\", \"uniqueCount\": 0, \"nullCount\": 0, \"sampleValues\": []}, {\"fieldPath\": \"customer_id\", \"uniqueCount\": 0, \"nullCount\": 0, \"sampleValues\": []}]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"proposedSnapshot": {
@ -665,4 +784,4 @@
},
"systemMetadata": null
}
]
]

View File

@ -74,6 +74,20 @@ CREATE TABLE IF NOT EXISTS `northwind`.`customers` (
ENGINE = InnoDB
DEFAULT CHARACTER SET = utf8;
CREATE TABLE IF NOT EXISTS `northwind`.`orders` (
`id` INT(11) NOT NULL AUTO_INCREMENT,
`description` VARCHAR(50) NULL DEFAULT NULL,
`customer_id` INT(11) NOT NULL,
PRIMARY KEY (`id`),
CONSTRAINT `fk_order_customer`
FOREIGN KEY (`customer_id`)
REFERENCES `northwind`.`customers`(`id`)
ON DELETE NO ACTION
ON UPDATE NO ACTION
)
ENGINE = InnoDB
DEFAULT CHARACTER SET = utf8;
-- Now, the actual sample data.
USE `northwind`;
@ -105,5 +119,6 @@ ENGINE = InnoDB
DEFAULT CHARACTER SET = utf8;
SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS;
SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS;

View File

@ -156,5 +156,206 @@
"runId": "mssql-test",
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:mssql,DemoData.Foo.Persons,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "DemoData.Foo.Persons",
"platform": "urn:li:dataPlatform:mssql",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"deleted": null,
"dataset": null,
"cluster": null,
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.MySqlDDL": {
"tableSchema": ""
}
},
"fields": [
{
"fieldPath": "ID",
"jsonPath": null,
"nullable": false,
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "INTEGER()",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": true
},
{
"fieldPath": "LastName",
"jsonPath": null,
"nullable": false,
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(length=255, collation='SQL_Latin1_General_CP1_CI_AS')",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false
},
{
"fieldPath": "FirstName",
"jsonPath": null,
"nullable": true,
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(length=255, collation='SQL_Latin1_General_CP1_CI_AS')",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false
},
{
"fieldPath": "Age",
"jsonPath": null,
"nullable": true,
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "INTEGER()",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false
}
],
"primaryKeys": null,
"foreignKeysSpecs": null,
"foreignKeys": null
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:mssql,DemoData.Foo.SalesReason,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "DemoData.Foo.SalesReason",
"platform": "urn:li:dataPlatform:mssql",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null
},
"deleted": null,
"dataset": null,
"cluster": null,
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.MySqlDDL": {
"tableSchema": ""
}
},
"fields": [
{
"fieldPath": "TempID",
"jsonPath": null,
"nullable": false,
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "INTEGER()",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": true
},
{
"fieldPath": "Name",
"jsonPath": null,
"nullable": true,
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "NVARCHAR(length=50)",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false
}
],
"primaryKeys": null,
"foreignKeysSpecs": null,
"foreignKeys": [
{
"name": "FK_TempSales_SalesReason",
"foreignFields": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mssql,DemoData.Foo.Persons,PROD), ID)"
],
"sourceFields": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mssql,DemoData.Foo.SalesReason,PROD), TempID)"
],
"foreignDataset": "urn:li:dataset:(urn:li:dataPlatform:mssql,DemoData.Foo.Persons,PROD)"
}
]
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"properties": null
}
}
]
]

View File

@ -8,3 +8,22 @@ CREATE SCHEMA Foo;
GO
CREATE TABLE Foo.Items (ID int, ItemName nvarchar(max));
GO
CREATE TABLE Foo.Persons (
ID int NOT NULL PRIMARY KEY,
LastName varchar(255) NOT NULL,
FirstName varchar(255),
Age int
);
GO
CREATE TABLE Foo.SalesReason
(
TempID int NOT NULL,
Name nvarchar(50)
, CONSTRAINT PK_TempSales PRIMARY KEY NONCLUSTERED (TempID)
, CONSTRAINT FK_TempSales_SalesReason FOREIGN KEY (TempID)
REFERENCES Foo.Persons (ID)
ON DELETE CASCADE
ON UPDATE CASCADE
)
;
GO