diff --git a/ingestion/examples/sample_data/datasets/tables.json b/ingestion/examples/sample_data/datasets/tables.json index 7814de8cc3f..ee3f4080b74 100644 --- a/ingestion/examples/sample_data/datasets/tables.json +++ b/ingestion/examples/sample_data/datasets/tables.json @@ -18,7 +18,6 @@ "description": "Unique identifier for the address.", "fullyQualifiedName": "bigquery_gcp.shopify.dim_address.address_id", "tags": [], - "constraint": "PRIMARY_KEY", "ordinalPosition": 1 }, { @@ -131,6 +130,15 @@ "ordinalPosition": 12 } ], + "tableConstraints": [ + { + "constraintType": "PRIMARY_KEY", + "columns": [ + "address_id", + "shop_id" + ] + } + ], "database": { "id": "50da1ff8-4e1d-4967-8931-45edbf4dd908", "type": "database", diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index ab39df85a61..907ebe6d9ff 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -147,6 +147,7 @@ class MetadataRestSink(Sink[Entity]): columns=db_and_table.table.columns, description=db_and_table.table.description, database=db_ref, + tableConstraints=db_and_table.table.tableConstraints, ) if db_and_table.table.viewDefinition: table_request.viewDefinition = ( @@ -219,6 +220,7 @@ class MetadataRestSink(Sink[Entity]): db_and_table.database.name.__root__, ) ) + logger.debug(traceback.print_exc()) logger.error(err) self.status.failure(f"Table: {db_and_table.table.name.__root__}") diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index a9488f5cca5..060c71a9f49 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -26,9 +26,11 @@ from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import ( Column, Constraint, + ConstraintType, DataModel, ModelType, Table, + TableConstraint, TableData, TableProfile, ) @@ -93,6 +95,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]): self.connection = self.engine.connect() self.data_profiler = None self.data_models = {} + self.table_constraints = None self.database_source_state = set() if self.config.dbt_catalog_file is not None: with open(self.config.dbt_catalog_file, "r", encoding="utf-8") as catalog: @@ -212,6 +215,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]): description = _get_table_description(schema, table_name, inspector) fqn = f"{self.config.service_name}.{schema}.{table_name}" self.database_source_state.add(fqn) + self.table_constraints = None table_columns = self._get_columns(schema, table_name, inspector) table_entity = Table( id=uuid.uuid4(), @@ -221,6 +225,8 @@ class SQLSource(Source[OMetaDatabaseAndTable]): fullyQualifiedName=fqn, columns=table_columns, ) + if self.table_constraints: + table_entity.tableConstraints = self.table_constraints try: if self.sql_config.generate_sample_data: table_data = self.fetch_sample_data(schema, table_name) @@ -371,7 +377,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]): ) model_fqdn = f"{schema}.{model_name}" except Exception as err: - print(err) + logger.error(err) self.data_models[model_fqdn] = model def _parse_data_model_upstream(self, mnode): @@ -441,17 +447,17 @@ class SQLSource(Source[OMetaDatabaseAndTable]): Prepare column constraints for the Table Entity """ constraint = None - if column["nullable"]: constraint = Constraint.NULL elif not column["nullable"]: constraint = Constraint.NOT_NULL if column["name"] in pk_columns: + if len(pk_columns) > 1: + return None constraint = Constraint.PRIMARY_KEY elif column["name"] in unique_columns: constraint = Constraint.UNIQUE - return constraint def _get_columns( @@ -521,9 +527,17 @@ class SQLSource(Source[OMetaDatabaseAndTable]): data_type_display = column["type"] if parsed_string is None: col_type = ColumnTypeParser.get_column_type(column["type"]) + col_constraint = self._get_column_constraints( column, pk_columns, unique_columns ) + if not col_constraint and len(pk_columns) > 1: + self.table_constraints = [ + TableConstraint( + constraintType=ConstraintType.PRIMARY_KEY, + columns=pk_columns, + ) + ] col_data_length = self._check_col_length( col_type, column["type"] )