diff --git a/ingestion/examples/sample_data/mysql/database.json b/ingestion/examples/sample_data/mysql/database.json new file mode 100644 index 00000000000..0b4869f0cf4 --- /dev/null +++ b/ingestion/examples/sample_data/mysql/database.json @@ -0,0 +1,8 @@ +{ + "id": null, + "name": "default", + "service": { + "id": "b946d870-03b2-4d33-a075-13665a7a76b9", + "type": "MYSQL" + } +} \ No newline at end of file diff --git a/ingestion/examples/sample_data/mysql/database_schema.json b/ingestion/examples/sample_data/mysql/database_schema.json new file mode 100644 index 00000000000..6c0b28d304f --- /dev/null +++ b/ingestion/examples/sample_data/mysql/database_schema.json @@ -0,0 +1,8 @@ +{ + "id": null, + "name": "posts_db", + "service": { + "id": "b946d870-03b2-4d33-a075-13665a7a76b9", + "type": "MYSQL" + } +} \ No newline at end of file diff --git a/ingestion/examples/sample_data/mysql/database_service.json b/ingestion/examples/sample_data/mysql/database_service.json new file mode 100644 index 00000000000..56126dbf150 --- /dev/null +++ b/ingestion/examples/sample_data/mysql/database_service.json @@ -0,0 +1,20 @@ +{ + "type": "mysql", + "serviceName": "mysql_sample", + "serviceConnection": { + "config": { + "type": "Mysql", + "hostPort": "localhost:3306", + "username": "openmetadata_user", + "authType": { + "password": "openmetadata_password" + }, + "databaseSchema": "posts_db" + } + }, + "sourceConfig": { + "config": { + "type": "DatabaseMetadata" + } + } +} \ No newline at end of file diff --git a/ingestion/examples/sample_data/mysql/tables.json b/ingestion/examples/sample_data/mysql/tables.json new file mode 100644 index 00000000000..b93b3995310 --- /dev/null +++ b/ingestion/examples/sample_data/mysql/tables.json @@ -0,0 +1,573 @@ +{ + "tables": [ + { + "name": "Tags", + "displayName": null, + "fullyQualifiedName": "mysql_sample.default.new_er_database.Tags", + "description": null, + "tableType": "Regular", + "columns": [ + { + "name": "tag_id", + "dataType": "INT", + "dataLength": 1, + "description": null, + "constraint": "PRIMARY_KEY", + "ordinalPosition": 1 + }, + { + "name": "name", + "dataType": "VARCHAR", + "dataLength": 100, + "dataTypeDisplay": "varchar(100)", + "description": null, + "constraint": "NOT_NULL", + "ordinalPosition": 2 + } + ], + "databaseSchema": { + "id": "5f40fbdc-7652-4bb5-8dd8-5834c382b8cf", + "type": "databaseSchema", + "name": "new_er_database", + "fullyQualifiedName": "mysql_sample.default.new_er_database", + "description": null, + "displayName": "new_er_database", + "deleted": false, + "inherited": null + }, + "database": { + "id": "9ec40d31-2cc3-434b-b79e-93a22ffb695b", + "type": "database", + "name": "default", + "fullyQualifiedName": "mysql_sample.default", + "description": null, + "displayName": "default", + "deleted": false, + "inherited": null + }, + "service": { + "id": "93fd8fbb-cecd-46b5-ae9a-0f6cda13a923", + "type": "databaseService", + "name": "mysql_sample", + "fullyQualifiedName": "mysql_sample", + "description": null, + "displayName": "mysql_sample", + "deleted": false, + "inherited": null + } + }, + { + "name": "Users", + "displayName": null, + "fullyQualifiedName": "mysql_sample.default.new_er_database.Users", + "description": null, + "tableType": "Regular", + "columns": [ + { + "name": "user_id", + "dataType": "INT", + "dataLength": 1, + "dataTypeDisplay": "int", + "constraint": "PRIMARY_KEY", + "ordinalPosition": 1 + }, + { + "name": "username", + "dataType": "VARCHAR", + "dataLength": 50, + "dataTypeDisplay": "varchar(50)", + "constraint": "NOT_NULL", + "ordinalPosition": 2 + }, + { + "name": "email", + "dataType": "VARCHAR", + "dataLength": 100, + "dataTypeDisplay": "varchar(100)", + "constraint": "NOT_NULL", + "ordinalPosition": 3 + }, + { + "name": "created_at", + "dataType": "TIMESTAMP", + "dataLength": 1, + "dataTypeDisplay": "timestamp", + "constraint": "NULL", + "ordinalPosition": 4 + } + ], + "databaseSchema": { + "id": "5f40fbdc-7652-4bb5-8dd8-5834c382b8cf", + "type": "databaseSchema", + "name": "new_er_database", + "fullyQualifiedName": "mysql_sample.default.new_er_database", + "description": null, + "displayName": "new_er_database", + "deleted": false, + "inherited": null + }, + "database": { + "id": "9ec40d31-2cc3-434b-b79e-93a22ffb695b", + "type": "database", + "name": "default", + "fullyQualifiedName": "mysql_sample.default", + "description": null, + "displayName": "default", + "deleted": false, + "inherited": null + }, + "service": { + "id": "93fd8fbb-cecd-46b5-ae9a-0f6cda13a923", + "type": "databaseService", + "name": "mysql_sample", + "fullyQualifiedName": "mysql_sample", + "description": null, + "displayName": "mysql_sample", + "deleted": false, + "inherited": null + } + }, + { + "name": "Categories", + "displayName": null, + "fullyQualifiedName": "mysql_sample.default.posts_db.Categories", + "description": null, + "tableType": "Regular", + "columns": [ + { + "name": "category_id", + "displayName": null, + "dataType": "INT", + "dataLength": 100, + "dataTypeDisplay": "int", + "description": null, + "constraint": "PRIMARY_KEY", + "ordinalPosition": 1 + }, + { + "name": "name", + "dataType": "VARCHAR", + "dataLength": 100, + "dataTypeDisplay": "varchar(100)", + "description": null, + "constraint": "NOT_NULL", + "ordinalPosition": 2 + } + ], + "databaseSchema": { + "type": "databaseSchema", + "name": "posts_db", + "fullyQualifiedName": "mysql_sample.default.posts_db", + "description": null, + "displayName": "posts_db", + "deleted": false, + "inherited": null + }, + "database": { + "type": "database", + "name": "default", + "fullyQualifiedName": "mysql_sample.default", + "description": null, + "displayName": "default", + "deleted": false, + "inherited": null + }, + "service": { + "type": "databaseService", + "name": "mysql_sample", + "fullyQualifiedName": "mysql_sample", + "description": null, + "displayName": "mysql_sample", + "deleted": false, + "inherited": null + } + }, + { + "name": "Comments", + "displayName": null, + "fullyQualifiedName": "mysql_sample.default.posts_db.Comments", + "description": null, + "tableType": "Regular", + "columns": [ + { + "name": "comment_id", + "displayName": null, + "dataType": "INT", + "dataTypeDisplay": "int", + "description": null, + "constraint": "PRIMARY_KEY", + "ordinalPosition": 1 + }, + { + "name": "post_id", + "displayName": null, + "dataType": "INT", + "dataTypeDisplay": "int", + "description": null, + "constraint": "NULL", + "ordinalPosition": 2 + }, + { + "name": "user_id", + "displayName": null, + "dataType": "INT", + "dataTypeDisplay": "int", + "description": null, + "constraint": "NULL", + "ordinalPosition": 3 + }, + { + "name": "comment", + "displayName": null, + "dataType": "TEXT", + "dataTypeDisplay": "text", + "description": null, + "constraint": "NOT_NULL", + "ordinalPosition": 4 + }, + { + "name": "created_at", + "displayName": null, + "dataType": "TIMESTAMP", + "dataTypeDisplay": "timestamp", + "description": null, + "constraint": "NULL", + "ordinalPosition": 5 + } + ], + "tableConstraints": [ + { + "constraintType": "FOREIGN_KEY", + "columns": [ + "post_id" + ], + "referredColumns": [ + "mysql_sample.default.posts_db.Posts.post_id" + ], + "relationshipType": "MANY_TO_ONE" + }, + { + "constraintType": "FOREIGN_KEY", + "columns": [ + "user_id" + ], + "referredColumns": [ + "mysql_sample.default.posts_db.Users.user_id" + ], + "relationshipType": "MANY_TO_ONE" + } + ], + "databaseSchema": { + "type": "databaseSchema", + "name": "posts_db", + "fullyQualifiedName": "mysql_sample.default.posts_db", + "description": null, + "displayName": "posts_db", + "deleted": false, + "inherited": null + }, + "database": { + "type": "database", + "name": "default", + "fullyQualifiedName": "mysql_sample.default", + "description": null, + "displayName": "default", + "deleted": false, + "inherited": null + }, + "service": { + "type": "databaseService", + "name": "mysql_sample", + "fullyQualifiedName": "mysql_sample", + "description": null, + "displayName": "mysql_sample", + "deleted": false, + "inherited": null + } + }, + { + "name": "Posts", + "displayName": null, + "fullyQualifiedName": "mysql_sample.default.posts_db.Posts", + "description": null, + "tableType": "Regular", + "columns": [ + { + "name": "post_id", + "displayName": null, + "dataType": "INT", + "dataTypeDisplay": "int", + "description": null, + "constraint": "PRIMARY_KEY", + "ordinalPosition": 1 + }, + { + "name": "user_id", + "displayName": null, + "dataType": "INT", + "dataTypeDisplay": "int", + "description": null, + "constraint": "NULL", + "ordinalPosition": 2 + }, + { + "name": "category_id", + "displayName": null, + "dataType": "INT", + "dataTypeDisplay": "int", + "description": null, + "constraint": "NULL", + "ordinalPosition": 3 + }, + { + "name": "title", + "displayName": null, + "dataType": "VARCHAR", + "arrayDataType": null, + "dataLength": 255, + "precision": null, + "scale": null, + "dataTypeDisplay": "varchar(255)", + "description": null, + "constraint": "NOT_NULL", + "ordinalPosition": 4 + }, + { + "name": "content", + "displayName": null, + "dataType": "TEXT", + "dataTypeDisplay": "text", + "description": null, + "constraint": "NOT_NULL", + "ordinalPosition": 5 + }, + { + "name": "created_at", + "displayName": null, + "dataType": "TIMESTAMP", + "dataTypeDisplay": "timestamp", + "description": null, + "constraint": "NULL", + "ordinalPosition": 6 + } + ], + "tableConstraints": [ + { + "constraintType": "FOREIGN_KEY", + "columns": [ + "user_id" + ], + "referredColumns": [ + "mysql_sample.default.posts_db.Users.user_id" + ], + "relationshipType": "MANY_TO_ONE" + }, + { + "constraintType": "FOREIGN_KEY", + "columns": [ + "category_id" + ], + "referredColumns": [ + "mysql_sample.default.posts_db.Categories.category_id" + ], + "relationshipType": "MANY_TO_ONE" + } + ], + "databaseSchema": { + "id": "3b2c045a-03ea-4303-abf3-082ac4e73804", + "type": "databaseSchema", + "name": "posts_db", + "fullyQualifiedName": "mysql_sample.default.posts_db", + "description": null, + "displayName": "posts_db", + "deleted": false, + "inherited": null + }, + "database": { + "id": "c1a9f3bf-8bb8-43e3-beeb-e4c2293b977a", + "type": "database", + "name": "default", + "fullyQualifiedName": "mysql_sample.default", + "description": null, + "displayName": "default", + "deleted": false, + "inherited": null + }, + "service": { + "id": "c0382692-7cf3-40b7-9aa7-b14bf2cbecdd", + "type": "databaseService", + "name": "mysql_sample", + "fullyQualifiedName": "mysql_sample", + "description": null, + "displayName": "mysql_sample", + "deleted": false, + "inherited": null + } + }, + { + "name": "PostTags", + "displayName": null, + "fullyQualifiedName": "mysql_sample.default.posts_db.PostTags", + "description": "testdesc2", + "tableType": "Regular", + "columns": [ + { + "name": "post_id", + "displayName": null, + "dataType": "INT", + "dataTypeDisplay": "int", + "description": null, + "constraint": null, + "ordinalPosition": 1 + }, + { + "name": "tag_id", + "displayName": null, + "dataType": "INT", + "dataTypeDisplay": "int", + "description": null, + "constraint": null, + "ordinalPosition": 2 + } + ], + "tableConstraints": [ + { + "constraintType": "PRIMARY_KEY", + "columns": [ + "post_id", + "tag_id" + ], + "referredColumns": null, + "relationshipType": null + }, + { + "constraintType": "FOREIGN_KEY", + "columns": [ + "post_id" + ], + "referredColumns": [ + "mysql_sample.default.posts_db.Posts.post_id" + ], + "relationshipType": "MANY_TO_ONE" + }, + { + "constraintType": "FOREIGN_KEY", + "columns": [ + "tag_id" + ], + "referredColumns": [ + "mysql_sample.default.posts_db.Tags.tag_id" + ], + "relationshipType": "MANY_TO_ONE" + } + ], + "databaseSchema": { + "type": "databaseSchema", + "name": "posts_db", + "fullyQualifiedName": "mysql_sample.default.posts_db", + "description": null, + "displayName": "posts_db", + "deleted": false, + "inherited": null + }, + "database": { + "id": "c1a9f3bf-8bb8-43e3-beeb-e4c2293b977a", + "type": "database", + "name": "default", + "fullyQualifiedName": "mysql_sample.default", + "description": null, + "displayName": "default", + "deleted": false, + "inherited": null + }, + "service": { + "id": "c0382692-7cf3-40b7-9aa7-b14bf2cbecdd", + "type": "databaseService", + "name": "mysql_sample", + "fullyQualifiedName": "mysql_sample", + "description": null, + "displayName": "mysql_sample", + "deleted": false, + "inherited": null + } + }, + { + "name": "Profiles", + "displayName": null, + "fullyQualifiedName": "mysql_sample.default.posts_db.Profiles", + "description": null, + "tableType": "Regular", + "columns": [ + { + "name": "profile_id", + "displayName": null, + "dataType": "INT", + "dataTypeDisplay": "int", + "description": null, + "constraint": "PRIMARY_KEY", + "ordinalPosition": 1 + }, + { + "name": "user_id", + "displayName": null, + "dataType": "INT", + "dataTypeDisplay": "int", + "description": null, + "constraint": "UNIQUE", + "ordinalPosition": 2 + }, + { + "name": "bio", + "displayName": null, + "dataType": "TEXT", + "dataTypeDisplay": "text", + "description": null, + "constraint": "NULL", + "ordinalPosition": 3 + } + ], + "tableConstraints": [ + { + "constraintType": "FOREIGN_KEY", + "columns": [ + "user_id" + ], + "referredColumns": [ + "mysql_sample.default.posts_db.Users.user_id" + ], + "relationshipType": "ONE_TO_ONE" + } + ], + "databaseSchema": { + "id": "3b2c045a-03ea-4303-abf3-082ac4e73804", + "type": "databaseSchema", + "name": "posts_db", + "fullyQualifiedName": "mysql_sample.default.posts_db", + "description": null, + "displayName": "posts_db", + "deleted": false, + "inherited": null + }, + "database": { + "id": "c1a9f3bf-8bb8-43e3-beeb-e4c2293b977a", + "type": "database", + "name": "default", + "fullyQualifiedName": "mysql_sample.default", + "description": null, + "displayName": "default", + "deleted": false, + "inherited": null + }, + "service": { + "id": "c0382692-7cf3-40b7-9aa7-b14bf2cbecdd", + "type": "databaseService", + "name": "mysql_sample", + "fullyQualifiedName": "mysql_sample", + "description": null, + "displayName": "mysql_sample", + "deleted": false, + "inherited": null + } + } + ] +} \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/source/database/common_db_source.py b/ingestion/src/metadata/ingestion/source/database/common_db_source.py index 024c59b9efd..b5ba49bc94f 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_db_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_db_source.py @@ -11,13 +11,14 @@ """ Generic source to build SQL connectors. """ +import copy import math import time import traceback from abc import ABC from concurrent.futures import ThreadPoolExecutor from copy import deepcopy -from typing import Any, Iterable, List, Optional, Tuple, Union, cast +from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, cast from pydantic import BaseModel from sqlalchemy.engine import Connection @@ -38,6 +39,7 @@ from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import ( + Column, ConstraintType, Table, TableConstraint, @@ -63,6 +65,7 @@ from metadata.ingestion.api.models import Either from metadata.ingestion.connections.session import create_and_bind_thread_safe_session from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest +from metadata.ingestion.models.patch_request import PatchedEntity, PatchRequest from metadata.ingestion.models.topology import Queue from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import ( @@ -75,6 +78,7 @@ from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySourc from metadata.ingestion.source.database.stored_procedures_mixin import QueryByProcedure from metadata.ingestion.source.models import TableView from metadata.utils import fqn +from metadata.utils.constraints import get_relationship_type from metadata.utils.db_utils import get_view_lineage from metadata.utils.execution_time_tracker import ( ExecutionTimeTrackerContextMap, @@ -88,6 +92,13 @@ from metadata.utils.ssl_manager import SSLManager, check_ssl_and_init logger = ingestion_logger() +class ColumnAndReferredColumn(BaseModel): + table_name: str + schema_name: str + db_name: Optional[str] + column: Dict + + class TableNameAndType(BaseModel): """ Helper model for passing down @@ -141,6 +152,7 @@ class CommonDbSourceService( self.database_source_state = set() self.context.get_global().table_views = [] self.context.get_global().table_constrains = [] + self.context.get_global().foreign_tables = [] self.context.set_threads(self.source_config.threads) super().__init__() @@ -518,7 +530,12 @@ class CommonDbSourceService( ) table_constraints = self.update_table_constraints( - table_constraints, foreign_columns + schema_name=schema_name, + table_name=table_name, + db_name=self.context.get().database, + table_constraints=table_constraints, + foreign_columns=foreign_columns, + columns=columns, ) description = ( @@ -697,7 +714,74 @@ class CommonDbSourceService( else: yield from self._process_view_def_serial() - def _get_foreign_constraints(self, foreign_columns) -> List[TableConstraint]: + def _prepare_foreign_constraints( # pylint: disable=too-many-arguments, too-many-locals + self, + supports_database: bool, + column: Dict, + table_name: str, + schema_name: str, + db_name: str, + columns: List[Column], + add_to_global: bool = True, + ): + """ + Method to prepare the foreign constraints + """ + referred_column_fqns = [] + if supports_database: + database_name = column.get("referred_database") + else: + database_name = self.context.get().database + referred_table_fqn = fqn.build( + metadata=self.metadata, + entity_type=Table, + table_name=column.get("referred_table"), + schema_name=column.get("referred_schema"), + database_name=database_name, + service_name=self.context.get().database_service, + ) + referred_table = self.metadata.get_by_name(entity=Table, fqn=referred_table_fqn) + if referred_table: + for referred_column in column.get("referred_columns"): + col_fqn = fqn._build( # pylint: disable=protected-access + referred_table_fqn, referred_column, quote=False + ) + if col_fqn: + referred_column_fqns.append(FullyQualifiedEntityName(col_fqn)) + else: + if add_to_global: + column_and_referred_columns = ColumnAndReferredColumn( + table_name=table_name, + schema_name=schema_name, + db_name=db_name, + column=column, + ) + self.context.get_global().foreign_tables.append( + column_and_referred_columns + ) + return None + relationship_type = None + if referred_table: + relationship_type = get_relationship_type( + column, # sqlalchemy foreign column + referred_table.columns, # referred table columns + columns, # current table om columns + ) + return TableConstraint( + constraintType=ConstraintType.FOREIGN_KEY, + columns=column.get("constrained_columns"), + referredColumns=referred_column_fqns, + relationshipType=relationship_type, + ) + + def _get_foreign_constraints( + self, + table_name, + schema_name, + db_name, + foreign_columns: List[Dict], + columns: List[Column], + ) -> List[TableConstraint]: """ Search the referred table for foreign constraints and get referred column fqn @@ -706,48 +790,31 @@ class CommonDbSourceService( foreign_constraints = [] for column in foreign_columns: - referred_column_fqns = [] - if supports_database: - database_name = column.get("referred_database") - else: - database_name = self.context.get().database - referred_table_fqn = fqn.build( - metadata=self.metadata, - entity_type=Table, - table_name=column.get("referred_table"), - schema_name=column.get("referred_schema"), - database_name=database_name, - service_name=self.context.get().database_service, - ) - if referred_table_fqn: - for referred_column in column.get("referred_columns"): - col_fqn = fqn._build( - referred_table_fqn, referred_column, quote=False - ) - if col_fqn: - referred_column_fqns.append(FullyQualifiedEntityName(col_fqn)) - else: - # do not build partial foreign constraint. It will updated in next run. - continue - foreign_constraints.append( - TableConstraint( - constraintType=ConstraintType.FOREIGN_KEY, - columns=column.get("constrained_columns"), - referredColumns=referred_column_fqns, - ) + foreign_constraint = self._prepare_foreign_constraints( + supports_database, column, table_name, schema_name, db_name, columns ) + if foreign_constraint: + foreign_constraints.append(foreign_constraint) return foreign_constraints @calculate_execution_time() def update_table_constraints( - self, table_constraints, foreign_columns + self, + table_name, + schema_name, + db_name, + table_constraints, + foreign_columns, + columns, ) -> List[TableConstraint]: """ From topology. process the table constraints of all tables """ - foreign_table_constraints = self._get_foreign_constraints(foreign_columns) + foreign_table_constraints = self._get_foreign_constraints( + table_name, schema_name, db_name, foreign_columns, columns + ) if foreign_table_constraints: if table_constraints: table_constraints.extend(foreign_table_constraints) @@ -816,3 +883,55 @@ class CommonDbSourceService( """ By default the source url is not supported for """ + + def yield_table_constraints(self) -> Iterable[Either[PatchedEntity]]: + """ + Process remaining table constraints by patching the table + """ + supports_database = hasattr(self.service_connection, "supportsDatabase") + + for foreign_table in self.context.get_global().foreign_tables or []: + try: + foreign_constraints = [] + table_fqn = fqn.build( + metadata=self.metadata, + entity_type=Table, + service_name=self.context.get().database_service, + database_name=foreign_table.db_name, + schema_name=foreign_table.schema_name, + table_name=foreign_table.table_name, + ) + table = self.metadata.get_by_name(entity=Table, fqn=table_fqn) + if table: + foreign_constraint = self._prepare_foreign_constraints( + supports_database, + foreign_table.column, + foreign_table.table_name, + foreign_table.schema_name, + foreign_table.db_name, + table.columns, + False, + ) + if foreign_constraint: + foreign_constraints.append(foreign_constraint) + + # send the patch request + if foreign_constraints: + new_entity = copy.deepcopy(table) + new_entity.tableConstraints = ( + new_entity.tableConstraints or [] + ) + foreign_constraints + patch_request = PatchRequest( + original_entity=table, + new_entity=new_entity, + override_metadata=True, + ) + yield Either(right=patch_request) + except Exception as exc: + yield Either( + left=StackTraceError( + name=str(foreign_table.table_name), + error=f"Error to yield tableConstraints for {str(foreign_table.table_name)}: {exc}", + stackTrace=traceback.format_exc(), + ) + ) diff --git a/ingestion/src/metadata/ingestion/source/database/database_service.py b/ingestion/src/metadata/ingestion/source/database/database_service.py index 3a24806054f..c2dd0368b3b 100644 --- a/ingestion/src/metadata/ingestion/source/database/database_service.py +++ b/ingestion/src/metadata/ingestion/source/database/database_service.py @@ -118,6 +118,7 @@ class DatabaseServiceTopology(ServiceTopology): post_process=[ "yield_view_lineage", "yield_external_table_lineage", + "yield_table_constraints", ], ) database: Annotated[ @@ -352,7 +353,13 @@ class DatabaseServiceSource( """ def update_table_constraints( - self, table_constraints: List[TableConstraint], foreign_columns: [] + self, + table_name, + schema_name, + db_name, + table_constraints: List[TableConstraint], + foreign_columns: [], + columns, ) -> List[TableConstraint]: """ process the table constraints of all tables @@ -538,7 +545,7 @@ class DatabaseServiceSource( self.inspector, "get_table_owner" ): owner_name = self.inspector.get_table_owner( - connection=self.connection, # pylint: disable=no-member.fetchall() + connection=self.connection, # pylint: disable=no-member table_name=table_name, schema=self.context.get().database_schema, ) @@ -609,6 +616,11 @@ class DatabaseServiceSource( Process external table lineage """ + def yield_table_constraints(self) -> Iterable[Either[AddLineageRequest]]: + """ + Process remaining table constraints by patching the table + """ + def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection) result = test_connection_fn( diff --git a/ingestion/src/metadata/ingestion/source/database/sample_data.py b/ingestion/src/metadata/ingestion/source/database/sample_data.py index 013f8af36c1..1a833cba569 100644 --- a/ingestion/src/metadata/ingestion/source/database/sample_data.py +++ b/ingestion/src/metadata/ingestion/source/database/sample_data.py @@ -242,6 +242,41 @@ class SampleDataSource( entity=DatabaseService, config=WorkflowSource(**self.glue_database_service_json), ) + + # MYSQL service for er diagrams + self.mysql_database_service_json = json.load( + open( # pylint: disable=consider-using-with + sample_data_folder + "/mysql/database_service.json", + "r", + encoding=UTF_8, + ) + ) + self.mysql_database = json.load( + open( # pylint: disable=consider-using-with + sample_data_folder + "/mysql/database.json", + "r", + encoding=UTF_8, + ) + ) + self.mysql_database_schema = json.load( + open( # pylint: disable=consider-using-with + sample_data_folder + "/mysql/database_schema.json", + "r", + encoding=UTF_8, + ) + ) + self.mysql_tables = json.load( + open( # pylint: disable=consider-using-with + sample_data_folder + "/mysql/tables.json", + "r", + encoding=UTF_8, + ) + ) + self.mysql_database_service = self.metadata.get_service_or_create( + entity=DatabaseService, + config=WorkflowSource(**self.mysql_database_service_json), + ) + self.database_service_json = json.load( open( # pylint: disable=consider-using-with sample_data_folder + "/datasets/service.json", @@ -615,6 +650,7 @@ class SampleDataSource( yield from self.ingest_users() yield from self.ingest_tables() yield from self.ingest_glue() + yield from self.ingest_mysql() yield from self.ingest_stored_procedures() yield from self.ingest_topics() yield from self.ingest_charts() @@ -666,6 +702,55 @@ class SampleDataSource( yield Either(right=team_to_ingest) + def ingest_mysql(self) -> Iterable[Either[Entity]]: + """Ingest Sample Data for mysql database source including ER diagrams metadata""" + + db = CreateDatabaseRequest( + name=self.mysql_database["name"], + service=self.mysql_database_service.fullyQualifiedName, + ) + + yield Either(right=db) + + database_entity = fqn.build( + self.metadata, + entity_type=Database, + service_name=self.mysql_database_service.fullyQualifiedName.root, + database_name=db.name.root, + ) + + database_object = self.metadata.get_by_name( + entity=Database, fqn=database_entity + ) + schema = CreateDatabaseSchemaRequest( + name=self.mysql_database_schema["name"], + database=database_object.fullyQualifiedName, + ) + yield Either(right=schema) + + database_schema_entity = fqn.build( + self.metadata, + entity_type=DatabaseSchema, + service_name=self.mysql_database_service.fullyQualifiedName.root, + database_name=db.name.root, + schema_name=schema.name.root, + ) + + database_schema_object = self.metadata.get_by_name( + entity=DatabaseSchema, fqn=database_schema_entity + ) + + for table in self.mysql_tables["tables"]: + table_request = CreateTableRequest( + name=table["name"], + description=table["description"], + columns=table["columns"], + databaseSchema=database_schema_object.fullyQualifiedName, + tableConstraints=table.get("tableConstraints"), + tableType=table["tableType"], + ) + yield Either(right=table_request) + def ingest_glue(self) -> Iterable[Either[Entity]]: """Ingest Sample Data for glue database source""" diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py index e3ed35bd9b7..f738e3c883a 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/metadata.py @@ -325,7 +325,7 @@ class UnitycatalogSource( ) = self.get_table_constraints(table.table_constraints) table_constraints = self.update_table_constraints( - primary_constraints, foreign_constraints + primary_constraints, foreign_constraints, columns ) table_request = CreateTableRequest( @@ -436,7 +436,7 @@ class UnitycatalogSource( return table_constraints def update_table_constraints( - self, table_constraints, foreign_columns + self, table_constraints, foreign_columns, columns ) -> List[TableConstraint]: """ From topology. diff --git a/ingestion/src/metadata/utils/constraints.py b/ingestion/src/metadata/utils/constraints.py new file mode 100644 index 00000000000..a45cc11e8cc --- /dev/null +++ b/ingestion/src/metadata/utils/constraints.py @@ -0,0 +1,65 @@ +# Copyright 2024 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Define constraints helper methods useful for the metadata ingestion +""" + +from typing import Dict, List + +from metadata.generated.schema.entity.data.table import ( + Column, + ConstraintType, + RelationshipType, +) +from metadata.ingestion.ometa.utils import model_str + + +def _is_column_unique(column: Dict, columns: List[Column]) -> bool: + """ + Method to check if the column in unique in the table + """ + if column and len(column) > 0: + constrained_column = column[0] + for col in columns or []: + if model_str(col.name) == constrained_column: + if col.constraint and col.constraint.value in { + ConstraintType.UNIQUE.value, + ConstraintType.PRIMARY_KEY.value, + }: + return True + break + return False + + +def get_relationship_type( + column: Dict, referred_table_columns: List[Column], columns: List[Column] +) -> str: + """ + Determine the type of relationship (one-to-one, one-to-many, etc.) + """ + # Check if the column is unique in the current table + is_unique_in_current_table = _is_column_unique( + column.get("constrained_columns"), columns + ) + + # Check if the referred column is unique in the referred table + is_unique_in_referred_table = _is_column_unique( + column.get("referred_columns"), referred_table_columns + ) + + if is_unique_in_current_table and is_unique_in_referred_table: + return RelationshipType.ONE_TO_ONE + if is_unique_in_current_table: + return RelationshipType.ONE_TO_MANY + if is_unique_in_referred_table: + return RelationshipType.MANY_TO_ONE + return RelationshipType.MANY_TO_MANY diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/DatabaseSchemaResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/DatabaseSchemaResource.java index fbfdf611f78..67fb0c540bd 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/DatabaseSchemaResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/DatabaseSchemaResource.java @@ -15,6 +15,7 @@ package org.openmetadata.service.resources.databases; import static org.openmetadata.common.utils.CommonUtil.listOf; +import es.org.elasticsearch.action.search.SearchResponse; import io.swagger.v3.oas.annotations.ExternalDocumentation; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -644,6 +645,42 @@ public class DatabaseSchemaResource return addHref(uriInfo, databaseSchema); } + @GET + @Path("/entityRelationship") + @Operation( + operationId = "searchSchemaEntityRelationship", + summary = "Search Schema Entity Relationship", + responses = { + @ApiResponse( + responseCode = "200", + description = "search response", + content = + @Content( + mediaType = "application/json", + schema = @Schema(implementation = SearchResponse.class))) + }) + public Response searchSchemaEntityRelationship( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "fqn") @QueryParam("fqn") String fqn, + @Parameter(description = "upstreamDepth") @QueryParam("upstreamDepth") int upstreamDepth, + @Parameter(description = "downstreamDepth") @QueryParam("downstreamDepth") + int downstreamDepth, + @Parameter( + description = + "Elasticsearch query that will be combined with the query_string query generator from the `query` argument") + @QueryParam("query_filter") + String queryFilter, + @Parameter(description = "Filter documents by deleted param. By default deleted is false") + @QueryParam("includeDeleted") + @DefaultValue("false") + boolean deleted) + throws IOException { + + return Entity.getSearchRepository() + .searchSchemaEntityRelationship(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted); + } + private DatabaseSchema getDatabaseSchema(CreateDatabaseSchema create, String user) { return repository .copy(new DatabaseSchema(), create, user) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/TableResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/TableResource.java index d213a49df1d..25ccb933bf3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/TableResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/TableResource.java @@ -15,6 +15,7 @@ package org.openmetadata.service.resources.databases; import static org.openmetadata.common.utils.CommonUtil.listOf; +import es.org.elasticsearch.action.search.SearchResponse; import io.swagger.v3.oas.annotations.ExternalDocumentation; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; @@ -127,7 +128,8 @@ public class TableResource extends EntityResource { MetadataOperation.EDIT_QUERIES, MetadataOperation.EDIT_DATA_PROFILE, MetadataOperation.EDIT_SAMPLE_DATA, - MetadataOperation.EDIT_LINEAGE); + MetadataOperation.EDIT_LINEAGE, + MetadataOperation.EDIT_ENTITY_RELATIONSHIP); } public static class TableList extends ResultList { @@ -1219,6 +1221,42 @@ public class TableResource extends EntityResource { .toResponse(); } + @GET + @Path("/entityRelationship") + @Operation( + operationId = "searchEntityRelationship", + summary = "Search Entity Relationship", + responses = { + @ApiResponse( + responseCode = "200", + description = "search response", + content = + @Content( + mediaType = "application/json", + schema = @Schema(implementation = SearchResponse.class))) + }) + public Response searchEntityRelationship( + @Context UriInfo uriInfo, + @Context SecurityContext securityContext, + @Parameter(description = "fqn") @QueryParam("fqn") String fqn, + @Parameter(description = "upstreamDepth") @QueryParam("upstreamDepth") int upstreamDepth, + @Parameter(description = "downstreamDepth") @QueryParam("downstreamDepth") + int downstreamDepth, + @Parameter( + description = + "Elasticsearch query that will be combined with the query_string query generator from the `query` argument") + @QueryParam("query_filter") + String queryFilter, + @Parameter(description = "Filter documents by deleted param. By default deleted is false") + @QueryParam("includeDeleted") + @DefaultValue("false") + boolean deleted) + throws IOException { + + return Entity.getSearchRepository() + .searchEntityRelationship(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted); + } + public static Table validateNewTable(Table table) { table.setId(UUID.randomUUID()); DatabaseUtil.validateConstraints(table.getColumns(), table.getTableConstraints()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java index a30209d0037..7971e837d34 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java @@ -6,6 +6,7 @@ import java.io.IOException; import java.security.KeyStoreException; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.json.JsonArray; @@ -77,6 +78,12 @@ public interface SearchClient { String ADD_UPDATE_LINEAGE = "boolean docIdExists = false; for (int i = 0; i < ctx._source.lineage.size(); i++) { if (ctx._source.lineage[i].doc_id.equalsIgnoreCase(params.lineageData.doc_id)) { ctx._source.lineage[i] = params.lineageData; docIdExists = true; break;}}if (!docIdExists) {ctx._source.lineage.add(params.lineageData);}"; + + // The script is used for updating the entityRelationship attribute of the entity in ES + // It checks if any duplicate entry is present based on the doc_id and updates only if it is not + // present + String ADD_UPDATE_ENTITY_RELATIONSHIP = + "boolean docIdExists = false; for (int i = 0; i < ctx._source.entityRelationship.size(); i++) { if (ctx._source.entityRelationship[i].doc_id.equalsIgnoreCase(params.entityRelationshipData.doc_id)) { ctx._source.entityRelationship[i] = params.entityRelationshipData; docIdExists = true; break;}}if (!docIdExists) {ctx._source.entityRelationship.add(params.entityRelationshipData);}"; String UPDATE_ADDED_DELETE_GLOSSARY_TAGS = "if (ctx._source.tags != null) { for (int i = ctx._source.tags.size() - 1; i >= 0; i--) { if (params.tagDeleted != null) { for (int j = 0; j < params.tagDeleted.size(); j++) { if (ctx._source.tags[i].tagFQN.equalsIgnoreCase(params.tagDeleted[j].tagFQN)) { ctx._source.tags.remove(i); } } } } } if (ctx._source.tags == null) { ctx._source.tags = []; } if (params.tagAdded != null) { ctx._source.tags.addAll(params.tagAdded); } ctx._source.tags = ctx._source.tags .stream() .distinct() .sorted((o1, o2) -> o1.tagFQN.compareTo(o2.tagFQN)) .collect(Collectors.toList());"; String REMOVE_TEST_SUITE_CHILDREN_SCRIPT = @@ -98,6 +105,37 @@ public interface SearchClient { String NOT_IMPLEMENTED_ERROR_TYPE = "NOT_IMPLEMENTED"; + String ENTITY_RELATIONSHIP_DIRECTION_ENTITY = "entityRelationship.entity.fqnHash.keyword"; + + String ENTITY_RELATIONSHIP_DIRECTION_RELATED_ENTITY = + "entityRelationship.relatedEntity.fqnHash.keyword"; + + Set FIELDS_TO_REMOVE_ENTITY_RELATIONSHIP = + Set.of( + "suggest", + "service_suggest", + "column_suggest", + "schema_suggest", + "database_suggest", + "lifeCycle", + "fqnParts", + "chart_suggest", + "field_suggest", + "lineage", + "entityRelationship", + "customMetrics", + "descriptionStatus", + "columnNames", + "totalVotes", + "usageSummary", + "dataProducts", + "tags", + "followers", + "domain", + "votes", + "tier", + "changeDescription"); + boolean isClientAvailable(); ElasticSearchConfiguration.SearchType getSearchType(); @@ -142,9 +180,17 @@ public interface SearchClient { String entityType) throws IOException; + Response searchEntityRelationship( + String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) + throws IOException; + Response searchDataQualityLineage( String fqn, int upstreamDepth, String queryFilter, boolean deleted) throws IOException; + Response searchSchemaEntityRelationship( + String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) + throws IOException; + /* Used for listing knowledge page hierarchy for a given parent and page type, used in Elastic/Open SearchClientExtension */ @@ -211,6 +257,11 @@ public interface SearchClient { void updateLineage( String indexName, Pair fieldAndValue, Map lineagaData); + void updateEntityRelationship( + String indexName, + Pair fieldAndValue, + Map entityRelationshipData); + Response listDataInsightChartResult( Long startTs, Long endTs, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index 283f829d427..1a39cdc71b6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -837,11 +837,25 @@ public class SearchRepository { fqn, upstreamDepth, downstreamDepth, queryFilter, deleted, entityType); } + public Response searchEntityRelationship( + String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) + throws IOException { + return searchClient.searchEntityRelationship( + fqn, upstreamDepth, downstreamDepth, queryFilter, deleted); + } + public Response searchDataQualityLineage( String fqn, int upstreamDepth, String queryFilter, boolean deleted) throws IOException { return searchClient.searchDataQualityLineage(fqn, upstreamDepth, queryFilter, deleted); } + public Response searchSchemaEntityRelationship( + String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) + throws IOException { + return searchClient.searchSchemaEntityRelationship( + fqn, upstreamDepth, downstreamDepth, queryFilter, deleted); + } + public Map searchLineageForExport( String fqn, int upstreamDepth, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java index 75766cde51c..59f13145be8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java @@ -12,6 +12,7 @@ import static org.openmetadata.service.Entity.FIELD_NAME; import static org.openmetadata.service.Entity.GLOSSARY_TERM; import static org.openmetadata.service.Entity.QUERY; import static org.openmetadata.service.Entity.RAW_COST_ANALYSIS_REPORT_DATA; +import static org.openmetadata.service.Entity.TABLE; import static org.openmetadata.service.exception.CatalogGenericExceptionMapper.getResponse; import static org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD; import static org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD_KEYWORD; @@ -146,6 +147,7 @@ import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChartResultList; import org.openmetadata.schema.dataInsight.custom.FormulaHolder; import org.openmetadata.schema.entity.data.EntityHierarchy__1; +import org.openmetadata.schema.entity.data.Table; import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration; import org.openmetadata.schema.tests.DataQualityReport; import org.openmetadata.schema.type.EntityReference; @@ -156,6 +158,8 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.dataInsight.DataInsightAggregatorInterface; import org.openmetadata.service.jdbi3.DataInsightChartRepository; import org.openmetadata.service.jdbi3.DataInsightSystemChartRepository; +import org.openmetadata.service.jdbi3.ListFilter; +import org.openmetadata.service.jdbi3.TableRepository; import org.openmetadata.service.jdbi3.TestCaseResultRepository; import org.openmetadata.service.search.SearchAggregation; import org.openmetadata.service.search.SearchClient; @@ -749,6 +753,123 @@ public class ElasticSearchClient implements SearchClient { return Response.status(OK).entity(responseMap).build(); } + private void getEntityRelationship( + String fqn, + int depth, + Set> edges, + Set> nodes, + String queryFilter, + String direction, + boolean deleted) + throws IOException { + if (depth <= 0) { + return; + } + es.org.elasticsearch.action.search.SearchRequest searchRequest = + new es.org.elasticsearch.action.search.SearchRequest( + Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query( + QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn)))); + if (CommonUtil.nullOrEmpty(deleted)) { + searchSourceBuilder.query( + QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn))) + .must(QueryBuilders.termQuery("deleted", deleted))); + } + if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) { + try { + XContentParser filterParser = + XContentType.JSON + .xContent() + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, queryFilter); + es.org.elasticsearch.index.query.QueryBuilder filter = + SearchSourceBuilder.fromXContent(filterParser).query(); + es.org.elasticsearch.index.query.BoolQueryBuilder newQuery = + QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter); + searchSourceBuilder.query(newQuery); + } catch (Exception ex) { + LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex); + } + } + searchRequest.source(searchSourceBuilder.size(1000)); + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + for (var hit : searchResponse.getHits().getHits()) { + List> entityRelationship = + (List>) hit.getSourceAsMap().get("entityRelationship"); + HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); + tempMap.keySet().removeAll(FIELDS_TO_REMOVE_ENTITY_RELATIONSHIP); + nodes.add(tempMap); + for (Map er : entityRelationship) { + Map entity = (HashMap) er.get("entity"); + Map relatedEntity = (HashMap) er.get("relatedEntity"); + if (direction.equalsIgnoreCase(ENTITY_RELATIONSHIP_DIRECTION_ENTITY)) { + if (!edges.contains(er) && entity.get("fqn").equals(fqn)) { + edges.add(er); + getEntityRelationship( + relatedEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted); + } + } else { + if (!edges.contains(er) && relatedEntity.get("fqn").equals(fqn)) { + edges.add(er); + getEntityRelationship( + entity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted); + } + } + } + } + } + + public Map searchEntityRelationshipInternal( + String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) + throws IOException { + Map responseMap = new HashMap<>(); + Set> edges = new HashSet<>(); + Set> nodes = new HashSet<>(); + es.org.elasticsearch.action.search.SearchRequest searchRequest = + new es.org.elasticsearch.action.search.SearchRequest( + Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query( + QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn))); + searchRequest.source(searchSourceBuilder.size(1000)); + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + for (var hit : searchResponse.getHits().getHits()) { + Map tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); + tempMap.keySet().removeAll(FIELDS_TO_REMOVE); + responseMap.put("entity", tempMap); + } + getEntityRelationship( + fqn, + downstreamDepth, + edges, + nodes, + queryFilter, + ENTITY_RELATIONSHIP_DIRECTION_ENTITY, + deleted); + getEntityRelationship( + fqn, + upstreamDepth, + edges, + nodes, + queryFilter, + ENTITY_RELATIONSHIP_DIRECTION_RELATED_ENTITY, + deleted); + responseMap.put("edges", edges); + responseMap.put("nodes", nodes); + return responseMap; + } + + @Override + public Response searchEntityRelationship( + String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) + throws IOException { + Map responseMap = + searchEntityRelationshipInternal(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted); + return Response.status(OK).entity(responseMap).build(); + } + @Override public Response searchDataQualityLineage( String fqn, int upstreamDepth, String queryFilter, boolean deleted) throws IOException { @@ -761,6 +882,80 @@ public class ElasticSearchClient implements SearchClient { return Response.status(OK).entity(responseMap).build(); } + public Map searchSchemaEntityRelationshipInternal( + String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) + throws IOException { + Map responseMap = new HashMap<>(); + Set> edges = new HashSet<>(); + Set> nodes = new HashSet<>(); + es.org.elasticsearch.action.search.SearchRequest searchRequest = + new es.org.elasticsearch.action.search.SearchRequest( + Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query( + QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn))); + searchRequest.source(searchSourceBuilder.size(1000)); + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + for (var hit : searchResponse.getHits().getHits()) { + Map tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); + tempMap.keySet().removeAll(FIELDS_TO_REMOVE); + responseMap.put("entity", tempMap); + } + TableRepository repository = (TableRepository) Entity.getEntityRepository(TABLE); + ListFilter filter = new ListFilter(Include.NON_DELETED).addQueryParam("databaseSchema", fqn); + List
tables = + repository.listAll(repository.getFields("tableConstraints, displayName, owners"), filter); + for (Table table : tables) { + getEntityRelationship( + table.getFullyQualifiedName(), + downstreamDepth, + edges, + nodes, + queryFilter, + ENTITY_RELATIONSHIP_DIRECTION_ENTITY, + deleted); + getEntityRelationship( + table.getFullyQualifiedName(), + upstreamDepth, + edges, + nodes, + queryFilter, + ENTITY_RELATIONSHIP_DIRECTION_RELATED_ENTITY, + deleted); + } + // Add the remaining tables from the list into the nodes + // These will the one's that do not have any entity relationship + for (Table table : tables) { + boolean tablePresent = false; + for (Map node : nodes) { + if (table.getId().toString().equals(node.get("id"))) { + tablePresent = true; + break; + } + } + if (!tablePresent) { + HashMap tableMap = new HashMap<>(JsonUtils.getMap(table)); + tableMap.keySet().removeAll(FIELDS_TO_REMOVE_ENTITY_RELATIONSHIP); + tableMap.put("entityType", "table"); + nodes.add(tableMap); + } + } + + responseMap.put("edges", edges); + responseMap.put("nodes", nodes); + return responseMap; + } + + @Override + public Response searchSchemaEntityRelationship( + String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) + throws IOException { + Map responseMap = + searchSchemaEntityRelationshipInternal( + fqn, upstreamDepth, downstreamDepth, queryFilter, deleted); + return Response.status(OK).entity(responseMap).build(); + } + private void getLineage( String fqn, int depth, @@ -1850,6 +2045,28 @@ public class ElasticSearchClient implements SearchClient { } } + public void updateEntityRelationship( + String indexName, + Pair fieldAndValue, + Map entityRelationshipData) { + if (isClientAvailable) { + UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName); + updateByQueryRequest.setQuery( + new MatchQueryBuilder(fieldAndValue.getKey(), fieldAndValue.getValue()) + .operator(Operator.AND)); + Map params = + Collections.singletonMap("entityRelationshipData", entityRelationshipData); + Script script = + new Script( + ScriptType.INLINE, + Script.DEFAULT_SCRIPT_LANG, + ADD_UPDATE_ENTITY_RELATIONSHIP, + params); + updateByQueryRequest.setScript(script); + updateElasticSearchByQuery(updateByQueryRequest); + } + } + @Override public void updateLineage( String indexName, Pair fieldAndValue, Map lineageData) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java index 3f7dcd01435..05931f4ddaa 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/SearchIndex.java @@ -1,13 +1,16 @@ package org.openmetadata.service.search.indexes; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; +import static org.openmetadata.schema.type.Include.NON_DELETED; import static org.openmetadata.service.Entity.FIELD_DESCRIPTION; import static org.openmetadata.service.Entity.FIELD_DISPLAY_NAME; +import static org.openmetadata.service.Entity.getEntityByName; import static org.openmetadata.service.jdbi3.LineageRepository.buildRelationshipDetailsMap; import static org.openmetadata.service.search.EntityBuilderConstant.DISPLAY_NAME_KEYWORD; import static org.openmetadata.service.search.EntityBuilderConstant.FIELD_DISPLAY_NAME_NGRAM; import static org.openmetadata.service.search.EntityBuilderConstant.FULLY_QUALIFIED_NAME; import static org.openmetadata.service.search.EntityBuilderConstant.FULLY_QUALIFIED_NAME_PARTS; +import static org.openmetadata.service.util.FullyQualifiedName.getParentFQN; import java.util.ArrayList; import java.util.Collections; @@ -16,14 +19,22 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.openmetadata.common.utils.CommonUtil; import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.entity.data.Table; import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.LineageDetails; import org.openmetadata.schema.type.Relationship; +import org.openmetadata.schema.type.TableConstraint; import org.openmetadata.service.Entity; +import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.jdbi3.CollectionDAO; +import org.openmetadata.service.search.SearchClient; import org.openmetadata.service.search.SearchIndexUtils; +import org.openmetadata.service.search.models.IndexMapping; import org.openmetadata.service.search.models.SearchSuggest; import org.openmetadata.service.util.FullyQualifiedName; import org.openmetadata.service.util.JsonUtils; @@ -31,6 +42,7 @@ import org.openmetadata.service.util.JsonUtils; public interface SearchIndex { Set DEFAULT_EXCLUDED_FIELDS = Set.of("changeDescription", "lineage.pipeline.changeDescription", "connection"); + public static final SearchClient searchClient = Entity.getSearchRepository().getSearchClient(); default Map buildSearchIndexDoc() { // Build Index Doc @@ -162,6 +174,146 @@ public interface SearchIndex { return data; } + static List> populateEntityRelationshipData(Table entity) { + List> constraints = new ArrayList<>(); + if (CommonUtil.nullOrEmpty(entity.getTableConstraints())) { + return constraints; + } + for (TableConstraint tableConstraint : entity.getTableConstraints()) { + if (!tableConstraint + .getConstraintType() + .value() + .equalsIgnoreCase(TableConstraint.ConstraintType.FOREIGN_KEY.value())) { + continue; + } + for (String referredColumn : tableConstraint.getReferredColumns()) { + String relatedEntityFQN = getParentFQN(referredColumn); + Table relatedEntity; + try { + relatedEntity = getEntityByName(Entity.TABLE, relatedEntityFQN, "*", NON_DELETED); + IndexMapping destinationIndexMapping = + Entity.getSearchRepository() + .getIndexMapping(relatedEntity.getEntityReference().getType()); + String destinationIndexName = + destinationIndexMapping.getIndexName(Entity.getSearchRepository().getClusterAlias()); + Map relationshipsMap = buildRelationshipsMap(entity, relatedEntity); + int relatedEntityIndex = + checkRelatedEntity(relatedEntity.getFullyQualifiedName(), constraints); + if (relatedEntityIndex >= 0) { + updateExistingConstraint( + entity, + tableConstraint, + constraints.get(relatedEntityIndex), + destinationIndexName, + relatedEntity, + referredColumn); + } else { + addNewConstraint( + entity, + tableConstraint, + constraints, + relationshipsMap, + destinationIndexName, + relatedEntity, + referredColumn); + } + } catch (EntityNotFoundException ex) { + } + } + } + return constraints; + } + + static int checkRelatedEntity(String relatedEntityFQN, List> constraints) { + int index = 0; + for (Map constraint : constraints) { + Map relatedConstraintEntity = + (Map) constraint.get("relatedEntity"); + if (relatedConstraintEntity.get("fqn").equals(relatedEntityFQN)) { + return index; + } + index++; + } + return -1; + } + + private static Map buildRelationshipsMap( + EntityInterface entity, Table relatedEntity) { + Map relationshipsMap = new HashMap<>(); + relationshipsMap.put("entity", buildEntityRefMap(entity.getEntityReference())); + relationshipsMap.put("relatedEntity", buildEntityRefMap(relatedEntity.getEntityReference())); + relationshipsMap.put( + "doc_id", entity.getId().toString() + "-" + relatedEntity.getId().toString()); + return relationshipsMap; + } + + private static void updateRelatedEntityIndex( + String destinationIndexName, Table relatedEntity, Map constraint) { + Pair to = new ImmutablePair<>("_id", relatedEntity.getId().toString()); + searchClient.updateEntityRelationship(destinationIndexName, to, constraint); + } + + private static void updateExistingConstraint( + EntityInterface entity, + TableConstraint tableConstraint, + Map presentConstraint, + String destinationIndexName, + Table relatedEntity, + String referredColumn) { + for (String currentColumn : tableConstraint.getColumns()) { + if (currentColumn.equals(FullyQualifiedName.getColumnName(referredColumn))) { + String columnFQN = FullyQualifiedName.add(entity.getFullyQualifiedName(), currentColumn); + + Map columnMap = new HashMap<>(); + columnMap.put("columnFQN", columnFQN); + columnMap.put("relatedColumnFQN", referredColumn); + columnMap.put("relationshipType", tableConstraint.getRelationshipType()); + + List> presentColumns = + (List>) presentConstraint.get("columns"); + presentColumns.add(columnMap); + + updateRelatedEntityIndex(destinationIndexName, relatedEntity, presentConstraint); + break; + } + } + } + + private static void addNewConstraint( + EntityInterface entity, + TableConstraint tableConstraint, + List> constraints, + Map relationshipsMap, + String destinationIndexName, + Table relatedEntity, + String referredColumn) { + for (String currentColumn : tableConstraint.getColumns()) { + if (currentColumn.equals(FullyQualifiedName.getColumnName(referredColumn))) { + List> columns = new ArrayList<>(); + String columnFQN = FullyQualifiedName.add(entity.getFullyQualifiedName(), currentColumn); + + Map columnMap = new HashMap<>(); + columnMap.put("columnFQN", columnFQN); + columnMap.put("relatedColumnFQN", referredColumn); + columnMap.put("relationshipType", tableConstraint.getRelationshipType()); + columns.add(columnMap); + relationshipsMap.put("columns", columns); + constraints.add(JsonUtils.getMap(relationshipsMap)); + + updateRelatedEntityIndex(destinationIndexName, relatedEntity, relationshipsMap); + } + } + } + + static Map buildEntityRefMap(EntityReference entityRef) { + Map details = new HashMap<>(); + details.put("id", entityRef.getId().toString()); + details.put("type", entityRef.getType()); + details.put("fqn", entityRef.getFullyQualifiedName()); + details.put("fqnHash", FullyQualifiedName.buildHash(entityRef.getFullyQualifiedName())); + return details; + } + static Map getDefaultFields() { Map fields = new HashMap<>(); fields.put(DISPLAY_NAME_KEYWORD, 10.0f); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TableIndex.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TableIndex.java index d3565fdc88b..9fa515d7c6d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TableIndex.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/indexes/TableIndex.java @@ -103,6 +103,7 @@ public record TableIndex(Table table) implements ColumnIndex { doc.put("service", getEntityWithDisplayName(table.getService())); doc.put("database", getEntityWithDisplayName(table.getDatabase())); doc.put("lineage", SearchIndex.getLineageData(table.getEntityReference())); + doc.put("entityRelationship", SearchIndex.populateEntityRelationshipData(table)); doc.put("databaseSchema", getEntityWithDisplayName(table.getDatabaseSchema())); return doc; } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java index b026e6d5cc8..537e47c01e2 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java @@ -11,6 +11,7 @@ import static org.openmetadata.service.Entity.FIELD_DISPLAY_NAME; import static org.openmetadata.service.Entity.GLOSSARY_TERM; import static org.openmetadata.service.Entity.QUERY; import static org.openmetadata.service.Entity.RAW_COST_ANALYSIS_REPORT_DATA; +import static org.openmetadata.service.Entity.TABLE; import static org.openmetadata.service.exception.CatalogGenericExceptionMapper.getResponse; import static org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD; import static org.openmetadata.service.search.EntityBuilderConstant.API_RESPONSE_SCHEMA_FIELD_KEYWORD; @@ -66,6 +67,7 @@ import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChart; import org.openmetadata.schema.dataInsight.custom.DataInsightCustomChartResultList; import org.openmetadata.schema.dataInsight.custom.FormulaHolder; import org.openmetadata.schema.entity.data.EntityHierarchy__1; +import org.openmetadata.schema.entity.data.Table; import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration; import org.openmetadata.schema.tests.DataQualityReport; import org.openmetadata.schema.type.EntityReference; @@ -76,6 +78,8 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.dataInsight.DataInsightAggregatorInterface; import org.openmetadata.service.jdbi3.DataInsightChartRepository; import org.openmetadata.service.jdbi3.DataInsightSystemChartRepository; +import org.openmetadata.service.jdbi3.ListFilter; +import org.openmetadata.service.jdbi3.TableRepository; import org.openmetadata.service.jdbi3.TestCaseResultRepository; import org.openmetadata.service.search.SearchAggregation; import org.openmetadata.service.search.SearchClient; @@ -227,6 +231,7 @@ public class OpenSearchClient implements SearchClient { "fqnParts", "chart_suggest", "field_suggest"); + private static final List SOURCE_FIELDS_TO_EXCLUDE = Stream.concat(FIELDS_TO_REMOVE.stream(), Stream.of("schemaDefinition", "customMetrics")) .toList(); @@ -752,6 +757,123 @@ public class OpenSearchClient implements SearchClient { return Response.status(OK).entity(responseMap).build(); } + private void getEntityRelationship( + String fqn, + int depth, + Set> edges, + Set> nodes, + String queryFilter, + String direction, + boolean deleted) + throws IOException { + if (depth <= 0) { + return; + } + os.org.opensearch.action.search.SearchRequest searchRequest = + new os.org.opensearch.action.search.SearchRequest( + Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query( + QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn)))); + if (CommonUtil.nullOrEmpty(deleted)) { + searchSourceBuilder.query( + QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery(direction, FullyQualifiedName.buildHash(fqn))) + .must(QueryBuilders.termQuery("deleted", deleted))); + } + if (!nullOrEmpty(queryFilter) && !queryFilter.equals("{}")) { + try { + XContentParser filterParser = + XContentType.JSON + .xContent() + .createParser(X_CONTENT_REGISTRY, LoggingDeprecationHandler.INSTANCE, queryFilter); + QueryBuilder filter = SearchSourceBuilder.fromXContent(filterParser).query(); + BoolQueryBuilder newQuery = + QueryBuilders.boolQuery().must(searchSourceBuilder.query()).filter(filter); + searchSourceBuilder.query(newQuery); + } catch (Exception ex) { + LOG.warn("Error parsing query_filter from query parameters, ignoring filter", ex); + } + } + searchRequest.source(searchSourceBuilder.size(1000)); + os.org.opensearch.action.search.SearchResponse searchResponse = + client.search(searchRequest, RequestOptions.DEFAULT); + for (var hit : searchResponse.getHits().getHits()) { + List> entityRelationship = + (List>) hit.getSourceAsMap().get("entityRelationship"); + HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); + tempMap.keySet().removeAll(FIELDS_TO_REMOVE_ENTITY_RELATIONSHIP); + nodes.add(tempMap); + for (Map er : entityRelationship) { + Map entity = (HashMap) er.get("entity"); + Map relatedEntity = (HashMap) er.get("relatedEntity"); + if (direction.equalsIgnoreCase(ENTITY_RELATIONSHIP_DIRECTION_ENTITY)) { + if (!edges.contains(er) && entity.get("fqn").equals(fqn)) { + edges.add(er); + getEntityRelationship( + relatedEntity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted); + } + } else { + if (!edges.contains(er) && relatedEntity.get("fqn").equals(fqn)) { + edges.add(er); + getEntityRelationship( + entity.get("fqn"), depth - 1, edges, nodes, queryFilter, direction, deleted); + } + } + } + } + } + + public Map searchEntityRelationshipInternal( + String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) + throws IOException { + Map responseMap = new HashMap<>(); + Set> edges = new HashSet<>(); + Set> nodes = new HashSet<>(); + os.org.opensearch.action.search.SearchRequest searchRequest = + new os.org.opensearch.action.search.SearchRequest( + Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query( + QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn))); + searchRequest.source(searchSourceBuilder.size(1000)); + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + for (var hit : searchResponse.getHits().getHits()) { + HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); + tempMap.keySet().removeAll(FIELDS_TO_REMOVE); + responseMap.put("entity", tempMap); + } + getEntityRelationship( + fqn, + downstreamDepth, + edges, + nodes, + queryFilter, + ENTITY_RELATIONSHIP_DIRECTION_ENTITY, + deleted); + getEntityRelationship( + fqn, + upstreamDepth, + edges, + nodes, + queryFilter, + ENTITY_RELATIONSHIP_DIRECTION_RELATED_ENTITY, + deleted); + responseMap.put("edges", edges); + responseMap.put("nodes", nodes); + return responseMap; + } + + @Override + public Response searchEntityRelationship( + String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) + throws IOException { + Map responseMap = + searchEntityRelationshipInternal(fqn, upstreamDepth, downstreamDepth, queryFilter, deleted); + return Response.status(OK).entity(responseMap).build(); + } + @Override public Response searchDataQualityLineage( String fqn, int upstreamDepth, String queryFilter, boolean deleted) throws IOException { @@ -764,6 +886,78 @@ public class OpenSearchClient implements SearchClient { return Response.status(OK).entity(responseMap).build(); } + public Map searchSchemaEntityRelationshipInternal( + String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) + throws IOException { + Map responseMap = new HashMap<>(); + Set> edges = new HashSet<>(); + Set> nodes = new HashSet<>(); + os.org.opensearch.action.search.SearchRequest searchRequest = + new os.org.opensearch.action.search.SearchRequest( + Entity.getSearchRepository().getIndexOrAliasName(GLOBAL_SEARCH_ALIAS)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query( + QueryBuilders.boolQuery().must(QueryBuilders.termQuery("fullyQualifiedName", fqn))); + searchRequest.source(searchSourceBuilder.size(1000)); + SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); + for (var hit : searchResponse.getHits().getHits()) { + HashMap tempMap = new HashMap<>(JsonUtils.getMap(hit.getSourceAsMap())); + tempMap.keySet().removeAll(FIELDS_TO_REMOVE); + responseMap.put("entity", tempMap); + } + TableRepository repository = (TableRepository) Entity.getEntityRepository(TABLE); + ListFilter filter = new ListFilter(Include.NON_DELETED).addQueryParam("databaseSchema", fqn); + List
tables = + repository.listAll(repository.getFields("tableConstraints, displayName, owners"), filter); + for (Table table : tables) { + getEntityRelationship( + table.getFullyQualifiedName(), + downstreamDepth, + edges, + nodes, + queryFilter, + ENTITY_RELATIONSHIP_DIRECTION_ENTITY, + deleted); + getEntityRelationship( + table.getFullyQualifiedName(), + upstreamDepth, + edges, + nodes, + queryFilter, + ENTITY_RELATIONSHIP_DIRECTION_RELATED_ENTITY, + deleted); + } + // Add the remaining tables from the list into the nodes + // These will the one's that do not have any entity relationship + for (Table table : tables) { + boolean tablePresent = false; + for (Map node : nodes) { + if (table.getId().toString().equals(node.get("id"))) { + tablePresent = true; + break; + } + } + if (!tablePresent) { + HashMap tableMap = new HashMap<>(JsonUtils.getMap(table)); + tableMap.keySet().removeAll(FIELDS_TO_REMOVE_ENTITY_RELATIONSHIP); + tableMap.put("entityType", "table"); + nodes.add(tableMap); + } + } + responseMap.put("edges", edges); + responseMap.put("nodes", nodes); + return responseMap; + } + + public Response searchSchemaEntityRelationship( + String fqn, int upstreamDepth, int downstreamDepth, String queryFilter, boolean deleted) + throws IOException { + Map responseMap = + searchSchemaEntityRelationshipInternal( + fqn, upstreamDepth, downstreamDepth, queryFilter, deleted); + return Response.status(OK).entity(responseMap).build(); + } + private void getLineage( String fqn, int depth, @@ -1839,6 +2033,29 @@ public class OpenSearchClient implements SearchClient { } } + @Override + public void updateEntityRelationship( + String indexName, + Pair fieldAndValue, + Map entityRelationshipData) { + if (isClientAvailable) { + UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName); + updateByQueryRequest.setQuery( + new MatchQueryBuilder(fieldAndValue.getKey(), fieldAndValue.getValue()) + .operator(Operator.AND)); + Map params = + Collections.singletonMap("entityRelationshipData", entityRelationshipData); + Script script = + new Script( + ScriptType.INLINE, + Script.DEFAULT_SCRIPT_LANG, + ADD_UPDATE_ENTITY_RELATIONSHIP, + params); + updateByQueryRequest.setScript(script); + updateOpenSearchByQuery(updateByQueryRequest); + } + } + @SneakyThrows private void updateOpenSearchByQuery(UpdateByQueryRequest updateByQueryRequest) { if (updateByQueryRequest != null && isClientAvailable) { diff --git a/openmetadata-service/src/main/resources/elasticsearch/en/table_index_mapping.json b/openmetadata-service/src/main/resources/elasticsearch/en/table_index_mapping.json index ff54f3fc718..403a21d1fdb 100644 --- a/openmetadata-service/src/main/resources/elasticsearch/en/table_index_mapping.json +++ b/openmetadata-service/src/main/resources/elasticsearch/en/table_index_mapping.json @@ -587,6 +587,9 @@ "lineage": { "type" : "object" }, + "entityRelationship": { + "type" : "object" + }, "serviceType": { "type": "keyword", "normalizer": "lowercase_normalizer" diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json index cef16bc5a84..3d20fcda1a3 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json @@ -210,6 +210,15 @@ "$ref": "../../type/basic.json#/definitions/fullyQualifiedEntityName" }, "default": null + }, + "relationshipType": { + "type": "string", + "enum": [ + "ONE_TO_ONE", + "ONE_TO_MANY", + "MANY_TO_ONE", + "MANY_TO_MANY" + ] } }, "additionalProperties": false diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/policies/accessControl/resourceDescriptor.json b/openmetadata-spec/src/main/resources/json/schema/entity/policies/accessControl/resourceDescriptor.json index 315348886ee..5504785527e 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/policies/accessControl/resourceDescriptor.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/policies/accessControl/resourceDescriptor.json @@ -30,6 +30,7 @@ "EditDescription", "EditDisplayName", "EditLineage", + "EditEntityRelationship", "EditPolicy", "EditOwners", "EditQueries", diff --git a/openmetadata-ui/src/main/resources/ui/src/assets/svg/table-grey.svg b/openmetadata-ui/src/main/resources/ui/src/assets/svg/table-grey.svg index 1207d1ca6c5..73ee44b15a0 100644 --- a/openmetadata-ui/src/main/resources/ui/src/assets/svg/table-grey.svg +++ b/openmetadata-ui/src/main/resources/ui/src/assets/svg/table-grey.svg @@ -1 +1 @@ - \ No newline at end of file + \ No newline at end of file diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Database/SchemaTab/SchemaTab.interfaces.ts b/openmetadata-ui/src/main/resources/ui/src/components/Database/SchemaTab/SchemaTab.interfaces.ts index beb6665b05a..f4c3bcc2d27 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Database/SchemaTab/SchemaTab.interfaces.ts +++ b/openmetadata-ui/src/main/resources/ui/src/components/Database/SchemaTab/SchemaTab.interfaces.ts @@ -20,7 +20,7 @@ export type Props = { hasDescriptionEditAccess: boolean; hasTagEditAccess: boolean; isReadOnly?: boolean; + testCaseSummary?: TestSummary; onThreadLinkSelect: (value: string, threadType?: ThreadType) => void; onUpdate: (columns: Table['columns']) => Promise; - testCaseSummary?: TestSummary; }; diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Database/SchemaTable/SchemaTable.component.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Database/SchemaTable/SchemaTable.component.tsx index 8cec144a7bd..00da063ecf3 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Database/SchemaTable/SchemaTable.component.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Database/SchemaTable/SchemaTable.component.tsx @@ -12,7 +12,7 @@ */ import { FilterOutlined } from '@ant-design/icons'; -import { Button, Tooltip, Typography } from 'antd'; +import { Button, Form, Select, Tooltip, Typography } from 'antd'; import { ColumnsType } from 'antd/lib/table'; import { ExpandableConfig } from 'antd/lib/table/interface'; import { @@ -36,7 +36,10 @@ import { ICON_DIMENSION, NO_DATA_PLACEHOLDER, } from '../../../constants/constants'; -import { TABLE_SCROLL_VALUE } from '../../../constants/Table.constants'; +import { + COLUMN_CONSTRAINT_TYPE_OPTIONS, + TABLE_SCROLL_VALUE, +} from '../../../constants/Table.constants'; import { usePermissionProvider } from '../../../context/PermissionProvider/PermissionProvider'; import { OperationPermission, @@ -71,22 +74,29 @@ import FilterTablePlaceHolder from '../../common/ErrorWithPlaceholder/FilterTabl import Table from '../../common/Table/Table'; import TestCaseStatusSummaryIndicator from '../../common/TestCaseStatusSummaryIndicator/TestCaseStatusSummaryIndicator.component'; import EntityNameModal from '../../Modals/EntityNameModal/EntityNameModal.component'; -import { EntityName } from '../../Modals/EntityNameModal/EntityNameModal.interface'; +import { + EntityName, + EntityNameWithAdditionFields, +} from '../../Modals/EntityNameModal/EntityNameModal.interface'; import { ModalWithMarkdownEditor } from '../../Modals/ModalWithMarkdownEditor/ModalWithMarkdownEditor'; import { ColumnFilter } from '../ColumnFilter/ColumnFilter.component'; import TableDescription from '../TableDescription/TableDescription.component'; import TableTags from '../TableTags/TableTags.component'; -import { SchemaTableProps, TableCellRendered } from './SchemaTable.interface'; +import { + SchemaTableProps, + TableCellRendered, + UpdatedColumnFieldData, +} from './SchemaTable.interface'; const SchemaTable = ({ searchText, - onUpdate, hasDescriptionEditAccess, hasTagEditAccess, isReadOnly = false, - onThreadLinkSelect, table, testCaseSummary, + onUpdate, + onThreadLinkSelect, }: SchemaTableProps) => { const { theme } = useApplicationStore(); const { t } = useTranslation(); @@ -171,12 +181,7 @@ const SchemaTable = ({ field, value, columns, - }: { - fqn: string; - field: keyof Column; - value?: string; - columns: Column[]; - }) => { + }: UpdatedColumnFieldData) => { columns?.forEach((col) => { if (col.fullyQualifiedName === fqn) { set(col, field, value); @@ -310,18 +315,27 @@ const SchemaTable = ({ setEditColumnDisplayName(record); }; - const handleEditDisplayName = async ({ displayName }: EntityName) => { + const handleEditColumnData = async (data: EntityName) => { + const { displayName, constraint } = data as EntityNameWithAdditionFields; if ( !isUndefined(editColumnDisplayName) && editColumnDisplayName.fullyQualifiedName ) { const tableCols = cloneDeep(tableColumns); + updateColumnFields({ fqn: editColumnDisplayName.fullyQualifiedName, value: isEmpty(displayName) ? undefined : displayName, field: 'displayName', columns: tableCols, }); + + updateColumnFields({ + fqn: editColumnDisplayName.fullyQualifiedName, + value: isEmpty(constraint) ? undefined : constraint, + field: 'constraint', + columns: tableCols, + }); await onUpdate(tableCols); setEditColumnDisplayName(undefined); } else { @@ -372,13 +386,14 @@ const SchemaTable = ({ {getEntityName(record)} ) : null} + {(tablePermissions?.EditAll || tablePermissions?.EditDisplayName) && !isReadOnly && ( - - - - ), - ...COMMON_RESIZABLE_PANEL_CONFIG.LEFT_PANEL, - }} - secondPanel={{ - children: ( -
- - customProperties={databaseSchema} - dataProducts={databaseSchema?.dataProducts ?? []} - domain={databaseSchema?.domain} - editCustomAttributePermission={ - editCustomAttributePermission - } - editTagPermission={editTagsPermission} - entityFQN={decodedDatabaseSchemaFQN} - entityId={databaseSchema?.id ?? ''} - entityType={EntityType.DATABASE_SCHEMA} - selectedTags={tags} - viewAllPermission={viewAllPermission} - onExtensionUpdate={handleExtensionUpdate} - onTagSelectionChange={handleTagSelection} - onThreadLinkSelect={onThreadLinkSelect} - /> -
- ), - ...COMMON_RESIZABLE_PANEL_CONFIG.RIGHT_PANEL, - className: - 'entity-resizable-right-panel-container entity-resizable-panel-container', - }} - /> - - - ), - }, - { - label: ( - - ), - key: EntityTabs.STORED_PROCEDURE, - children: , - }, - { - label: ( - - ), - key: EntityTabs.ACTIVITY_FEED, - children: ( - - - - ), - }, - { - label: ( - - ), - key: EntityTabs.CUSTOM_PROPERTIES, - children: databaseSchema && ( -
- - className="" - entityDetails={databaseSchema} - entityType={EntityType.DATABASE_SCHEMA} - handleExtensionUpdate={handleExtensionUpdate} - hasEditAccess={editCustomAttributePermission} - hasPermission={viewAllPermission} - isVersionView={false} - /> -
- ), - }, - ]; + const tabs: TabsProps['items'] = useMemo( + () => + databaseSchemaClassBase.getDatabaseSchemaPageTabs({ + feedCount, + tableData, + activeTab, + currentTablesPage, + databaseSchema, + description, + editDescriptionPermission, + isEdit, + showDeletedTables, + tableDataLoading, + editCustomAttributePermission, + editTagsPermission, + decodedDatabaseSchemaFQN, + tags, + viewAllPermission, + storedProcedureCount, + onEditCancel, + handleExtensionUpdate, + handleTagSelection, + onThreadLinkSelect, + tablePaginationHandler, + onDescriptionEdit, + onDescriptionUpdate, + handleShowDeletedTables, + getEntityFeedCount, + fetchDatabaseSchemaDetails, + handleFeedCount, + }), + [ + feedCount, + tableData, + activeTab, + currentTablesPage, + databaseSchema, + description, + editDescriptionPermission, + isEdit, + showDeletedTables, + tableDataLoading, + editCustomAttributePermission, + editTagsPermission, + decodedDatabaseSchemaFQN, + tags, + viewAllPermission, + storedProcedureCount, + handleExtensionUpdate, + handleTagSelection, + onThreadLinkSelect, + tablePaginationHandler, + onEditCancel, + onDescriptionEdit, + onDescriptionUpdate, + handleShowDeletedTables, + getEntityFeedCount, + fetchDatabaseSchemaDetails, + handleFeedCount, + ] + ); const updateVote = async (data: QueryVote, id: string) => { try { diff --git a/openmetadata-ui/src/main/resources/ui/src/pages/TableDetailsPageV1/TableConstraints/TableConstraints.tsx b/openmetadata-ui/src/main/resources/ui/src/pages/TableDetailsPageV1/TableConstraints/TableConstraints.tsx index 8a5fdd99d29..92cf2c0d6e8 100644 --- a/openmetadata-ui/src/main/resources/ui/src/pages/TableDetailsPageV1/TableConstraints/TableConstraints.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/pages/TableDetailsPageV1/TableConstraints/TableConstraints.tsx @@ -10,122 +10,191 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { Space, Tooltip, Typography } from 'antd'; +import { Button, Space, Tooltip, Typography } from 'antd'; import { isEmpty, map } from 'lodash'; -import React, { FC, Fragment, useMemo } from 'react'; +import React, { FC, Fragment, useCallback, useMemo, useState } from 'react'; import { useTranslation } from 'react-i18next'; import { Link } from 'react-router-dom'; +import { ReactComponent as IconEdit } from '../../../assets/svg/edit-new.svg'; +import { ReactComponent as PlusIcon } from '../../../assets/svg/plus-primary.svg'; +import TagButton from '../../../components/common/TagButton/TagButton.component'; import { FQN_SEPARATOR_CHAR } from '../../../constants/char.constants'; +import { DE_ACTIVE_COLOR, ICON_DIMENSION } from '../../../constants/constants'; import { SUPPORTED_TABLE_CONSTRAINTS } from '../../../constants/Table.constants'; import { EntityType, FqnPart } from '../../../enums/entity.enum'; import { ConstraintType, Table } from '../../../generated/entity/data/table'; import { getPartialNameFromTableFQN } from '../../../utils/CommonUtils'; - import entityUtilClassBase from '../../../utils/EntityUtilClassBase'; import ForeignKeyConstraint from './ForeignKeyConstraint'; import PrimaryKeyConstraint from './PrimaryKeyConstraint'; import './table-constraints.less'; +import TableConstraintsModal from './TableConstraintsModal/TableConstraintsModal.component'; interface TableConstraintsProps { - constraints: Table['tableConstraints']; + hasPermission: boolean; + tableDetails?: Table; + onUpdate: (updateData: Table['tableConstraints']) => Promise; } -const TableConstraints: FC = ({ constraints }) => { +const TableConstraints: FC = ({ + tableDetails, + hasPermission, + onUpdate, +}) => { const { t } = useTranslation(); + const [isModalOpen, setIsModalOpen] = useState(false); const supportedConstraints = useMemo( () => - constraints?.filter((constraint) => + tableDetails?.tableConstraints?.filter((constraint) => SUPPORTED_TABLE_CONSTRAINTS.includes( constraint.constraintType as ConstraintType ) ) ?? [], - [constraints] + [tableDetails?.tableConstraints] ); - if (isEmpty(supportedConstraints)) { - return null; - } + const handleOpenEditConstraintModal = useCallback( + () => setIsModalOpen(true), + [] + ); + const handleCloseEditConstraintModal = useCallback( + () => setIsModalOpen(false), + [] + ); + + const handleSubmit = async (values: Table['tableConstraints']) => { + await onUpdate(values); + setIsModalOpen(false); + }; return ( - - - {t('label.table-constraints')} - - {supportedConstraints.map( - ({ constraintType, columns, referredColumns }, index) => { - if (constraintType === ConstraintType.PrimaryKey) { - return ( -
- - {columns?.map((column, index) => ( - - {(columns?.length ?? 0) - 1 !== index ? ( - - ) : null} - - ))} - + <> + + + + {t('label.table-constraints')} + - - {columns?.map((column) => ( - - {column} - - ))} - -
- ); - } - if (constraintType === ConstraintType.ForeignKey) { - return ( - - - - {columns?.join(', ')} -
- {map(referredColumns, (referredColumn) => ( - - - - {referredColumn} - - - + {hasPermission && !isEmpty(supportedConstraints) && ( + + + + )} + + + {hasPermission && supportedConstraints.length === 0 && ( + } + label={t('label.add')} + tooltip="" + onClick={handleOpenEditConstraintModal} + /> + )} + + {supportedConstraints.map( + ({ constraintType, columns, referredColumns }, index) => { + if (constraintType === ConstraintType.PrimaryKey) { + return ( +
+ + {columns?.map((column, index) => ( + + {(columns?.length ?? 0) - 1 !== index ? ( + + ) : null} + ))} -
- - - ); - } + - return null; - } + + {columns?.map((column) => ( + + {column} + + ))} + +
+ ); + } + if (constraintType === ConstraintType.ForeignKey) { + return ( + + + + {columns?.join(', ')} +
+ {map(referredColumns, (referredColumn) => ( + + + + {referredColumn} + + + + ))} +
+
+
+ ); + } + + return null; + } + )} +
+ + {isModalOpen && ( + )} -
+ ); }; diff --git a/openmetadata-ui/src/main/resources/ui/src/pages/TableDetailsPageV1/TableConstraints/TableConstraintsModal/TableConstraintsModal.component.tsx b/openmetadata-ui/src/main/resources/ui/src/pages/TableDetailsPageV1/TableConstraints/TableConstraintsModal/TableConstraintsModal.component.tsx new file mode 100644 index 00000000000..91d19607a56 --- /dev/null +++ b/openmetadata-ui/src/main/resources/ui/src/pages/TableDetailsPageV1/TableConstraints/TableConstraintsModal/TableConstraintsModal.component.tsx @@ -0,0 +1,293 @@ +/* + * Copyright 2024 Collate. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Icon from '@ant-design/icons/lib/components/Icon'; +import { Button, Col, Form, Modal, Row, Select } from 'antd'; +import { AxiosError } from 'axios'; +import { debounce, isEmpty } from 'lodash'; +import React, { useCallback, useEffect, useMemo, useState } from 'react'; +import { useTranslation } from 'react-i18next'; +import { ReactComponent as IconDelete } from '../../../../assets/svg/ic-delete.svg'; +import { ReactComponent as PlusIcon } from '../../../../assets/svg/plus-primary.svg'; +import { PAGE_SIZE } from '../../../../constants/constants'; +import { RELATIONSHIP_TYPE_OPTION } from '../../../../constants/Table.constants'; +import { SearchIndex } from '../../../../enums/search.enum'; +import { ConstraintType, Table } from '../../../../generated/entity/data/table'; +import { searchQuery } from '../../../../rest/searchAPI'; +import { getServiceNameQueryFilter } from '../../../../utils/ServiceUtils'; +import { showErrorToast } from '../../../../utils/ToastUtils'; +import { + SelectOptions, + TableConstraintForm, + TableConstraintModalProps, +} from './TableConstraintsModal.interface'; + +const TableConstraintsModal = ({ + tableDetails, + constraint, + onSave, + onClose, +}: TableConstraintModalProps) => { + const { t } = useTranslation(); + const [form] = Form.useForm<{ constraint: TableConstraintForm[] }>(); + const [isLoading, setIsLoading] = useState(false); + const [isRelatedColumnLoading, setIsRelatedColumnLoading] = + useState(false); + const [searchValue, setSearchValue] = useState(''); + const [relatedColumns, setRelatedColumns] = useState([]); + + const tableColumnNameOptions = useMemo( + () => + tableDetails?.columns.map((item) => ({ + label: item.name, + value: item.name, + })) ?? [], + [tableDetails?.columns] + ); + + const getSearchResults = async (value: string) => { + setIsRelatedColumnLoading(true); + try { + const data = await searchQuery({ + query: value, + searchIndex: SearchIndex.TABLE, + queryFilter: getServiceNameQueryFilter( + tableDetails?.service?.name ?? '' + ), + pageNumber: 1, + pageSize: PAGE_SIZE, + includeDeleted: false, + }); + const sources = data.hits.hits.map((hit) => hit._source); + + const allColumns = sources.reduce((acc: SelectOptions[], cv: Table) => { + const columnOption = cv.columns + .map((item) => ({ + label: item.fullyQualifiedName ?? '', + value: item.fullyQualifiedName ?? '', + })) + .filter(Boolean); + + return [...acc, ...columnOption]; + }, []); + + setRelatedColumns(allColumns); + } catch (error) { + showErrorToast( + error as AxiosError, + t('server.entity-fetch-error', { + entity: t('label.suggestion-lowercase-plural'), + }) + ); + } finally { + setIsRelatedColumnLoading(false); + } + }; + + const debounceOnSearch = useCallback(debounce(getSearchResults, 300), []); + + const handleSearch = (value: string): void => { + setSearchValue(value); + debounceOnSearch(value); + }; + + const handleSubmit = async (obj: { constraint: TableConstraintForm[] }) => { + try { + setIsLoading(true); + await form.validateFields(); + const constraintData = obj.constraint.map((item) => ({ + ...item, + columns: [item.columns], + referredColumns: [item.referredColumns], + constraintType: ConstraintType.ForeignKey, + })); + + await onSave(constraintData); + } catch (_) { + // Nothing here + } finally { + setIsLoading(false); + } + }; + + useEffect(() => { + const filteredConstraints = !isEmpty(constraint) + ? constraint + ?.filter((item) => item.constraintType !== ConstraintType.PrimaryKey) + .map((item) => ({ + columns: item.columns?.[0], + relationshipType: item.relationshipType, + referredColumns: item.referredColumns?.[0], + })) + : [ + { + columns: '', + relationshipType: '', + referredColumns: '', + }, + ]; + + form.setFieldValue('constraint', filteredConstraints); + }, [constraint]); + + useEffect(() => { + getSearchResults(searchValue); + }, []); + + return ( + + {t('label.cancel')} + , + , + ]} + maskClosable={false} + title={t(`label.${isEmpty(constraint) ? 'add' : 'update'}-entity`, { + entity: t('label.table-constraint-plural'), + })} + width={600} + onCancel={onClose}> +
+ + {(fields, { add, remove }) => ( + <> + {fields.map(({ key, name, ...restField }) => ( + +
+ + + + + + + +