Table Constraints Added - Ingestion (#2854)

This commit is contained in:
Ayush Shah 2022-02-19 22:46:15 +05:30 committed by GitHub
parent 0e34028d05
commit d2c64007cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 28 additions and 4 deletions

View File

@ -18,7 +18,6 @@
"description": "Unique identifier for the address.", "description": "Unique identifier for the address.",
"fullyQualifiedName": "bigquery_gcp.shopify.dim_address.address_id", "fullyQualifiedName": "bigquery_gcp.shopify.dim_address.address_id",
"tags": [], "tags": [],
"constraint": "PRIMARY_KEY",
"ordinalPosition": 1 "ordinalPosition": 1
}, },
{ {
@ -131,6 +130,15 @@
"ordinalPosition": 12 "ordinalPosition": 12
} }
], ],
"tableConstraints": [
{
"constraintType": "PRIMARY_KEY",
"columns": [
"address_id",
"shop_id"
]
}
],
"database": { "database": {
"id": "50da1ff8-4e1d-4967-8931-45edbf4dd908", "id": "50da1ff8-4e1d-4967-8931-45edbf4dd908",
"type": "database", "type": "database",

View File

@ -147,6 +147,7 @@ class MetadataRestSink(Sink[Entity]):
columns=db_and_table.table.columns, columns=db_and_table.table.columns,
description=db_and_table.table.description, description=db_and_table.table.description,
database=db_ref, database=db_ref,
tableConstraints=db_and_table.table.tableConstraints,
) )
if db_and_table.table.viewDefinition: if db_and_table.table.viewDefinition:
table_request.viewDefinition = ( table_request.viewDefinition = (
@ -219,6 +220,7 @@ class MetadataRestSink(Sink[Entity]):
db_and_table.database.name.__root__, db_and_table.database.name.__root__,
) )
) )
logger.debug(traceback.print_exc())
logger.error(err) logger.error(err)
self.status.failure(f"Table: {db_and_table.table.name.__root__}") self.status.failure(f"Table: {db_and_table.table.name.__root__}")

View File

@ -26,9 +26,11 @@ from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import ( from metadata.generated.schema.entity.data.table import (
Column, Column,
Constraint, Constraint,
ConstraintType,
DataModel, DataModel,
ModelType, ModelType,
Table, Table,
TableConstraint,
TableData, TableData,
TableProfile, TableProfile,
) )
@ -93,6 +95,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
self.connection = self.engine.connect() self.connection = self.engine.connect()
self.data_profiler = None self.data_profiler = None
self.data_models = {} self.data_models = {}
self.table_constraints = None
self.database_source_state = set() self.database_source_state = set()
if self.config.dbt_catalog_file is not None: if self.config.dbt_catalog_file is not None:
with open(self.config.dbt_catalog_file, "r", encoding="utf-8") as catalog: with open(self.config.dbt_catalog_file, "r", encoding="utf-8") as catalog:
@ -212,6 +215,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
description = _get_table_description(schema, table_name, inspector) description = _get_table_description(schema, table_name, inspector)
fqn = f"{self.config.service_name}.{schema}.{table_name}" fqn = f"{self.config.service_name}.{schema}.{table_name}"
self.database_source_state.add(fqn) self.database_source_state.add(fqn)
self.table_constraints = None
table_columns = self._get_columns(schema, table_name, inspector) table_columns = self._get_columns(schema, table_name, inspector)
table_entity = Table( table_entity = Table(
id=uuid.uuid4(), id=uuid.uuid4(),
@ -221,6 +225,8 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
fullyQualifiedName=fqn, fullyQualifiedName=fqn,
columns=table_columns, columns=table_columns,
) )
if self.table_constraints:
table_entity.tableConstraints = self.table_constraints
try: try:
if self.sql_config.generate_sample_data: if self.sql_config.generate_sample_data:
table_data = self.fetch_sample_data(schema, table_name) table_data = self.fetch_sample_data(schema, table_name)
@ -371,7 +377,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
) )
model_fqdn = f"{schema}.{model_name}" model_fqdn = f"{schema}.{model_name}"
except Exception as err: except Exception as err:
print(err) logger.error(err)
self.data_models[model_fqdn] = model self.data_models[model_fqdn] = model
def _parse_data_model_upstream(self, mnode): def _parse_data_model_upstream(self, mnode):
@ -441,17 +447,17 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
Prepare column constraints for the Table Entity Prepare column constraints for the Table Entity
""" """
constraint = None constraint = None
if column["nullable"]: if column["nullable"]:
constraint = Constraint.NULL constraint = Constraint.NULL
elif not column["nullable"]: elif not column["nullable"]:
constraint = Constraint.NOT_NULL constraint = Constraint.NOT_NULL
if column["name"] in pk_columns: if column["name"] in pk_columns:
if len(pk_columns) > 1:
return None
constraint = Constraint.PRIMARY_KEY constraint = Constraint.PRIMARY_KEY
elif column["name"] in unique_columns: elif column["name"] in unique_columns:
constraint = Constraint.UNIQUE constraint = Constraint.UNIQUE
return constraint return constraint
def _get_columns( def _get_columns(
@ -521,9 +527,17 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
data_type_display = column["type"] data_type_display = column["type"]
if parsed_string is None: if parsed_string is None:
col_type = ColumnTypeParser.get_column_type(column["type"]) col_type = ColumnTypeParser.get_column_type(column["type"])
col_constraint = self._get_column_constraints( col_constraint = self._get_column_constraints(
column, pk_columns, unique_columns column, pk_columns, unique_columns
) )
if not col_constraint and len(pk_columns) > 1:
self.table_constraints = [
TableConstraint(
constraintType=ConstraintType.PRIMARY_KEY,
columns=pk_columns,
)
]
col_data_length = self._check_col_length( col_data_length = self._check_col_length(
col_type, column["type"] col_type, column["type"]
) )