diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py index 72c35f4da7c..77a15804845 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py @@ -97,7 +97,7 @@ class ESMixin(Generic[T]): fqdn_search = ( "/search/fieldQuery?fieldName={field_name}&fieldValue={field_value}&from={from_}" - "&size={size}&index={index}" + "&size={size}&index={index}&deleted=false" ) # sort_field needs to be unique for the pagination to work, so we can use the FQN @@ -125,14 +125,20 @@ class ESMixin(Generic[T]): if response: if fields: fields = fields.split(",") - return [ - self.get_by_name( + + entities = [] + for hit in response["hits"]["hits"]: + entity = self.get_by_name( entity=entity_type, fqn=hit["_source"]["fullyQualifiedName"], fields=fields, ) - for hit in response["hits"]["hits"] - ] or None + if entity is None: + continue + + entities.append(entity) + + return entities or None return None diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airbyte/constants.py b/ingestion/src/metadata/ingestion/source/pipeline/airbyte/constants.py new file mode 100644 index 00000000000..0c136dc20a3 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/airbyte/constants.py @@ -0,0 +1,29 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Constants for Airbyte +""" + +from enum import Enum + + +class AirbyteSource(Enum): + MYSQL = "MySQL" + POSTGRES = "Postgres" + MSSQL = "Microsoft SQL Server (MSSQL)" + MONGODB = "MongoDb" + + +class AirbyteDestination(Enum): + MYSQL = "MySQL" + POSTGRES = "Postgres" + MSSQL = "MS SQL Server" diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airbyte/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/airbyte/metadata.py index a7278aa6063..e577f648b7f 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airbyte/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airbyte/metadata.py @@ -29,7 +29,6 @@ from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.pipeline.airbyteConnection import ( AirbyteConnection, ) -from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.metadataIngestion.workflow import ( Source as WorkflowSource, ) @@ -46,12 +45,16 @@ from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.pipeline.openlineage.models import TableDetails +from metadata.ingestion.source.pipeline.openlineage.utils import FQNNotFoundException from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource from metadata.utils import fqn from metadata.utils.helpers import clean_uri from metadata.utils.logger import ingestion_logger from metadata.utils.time_utils import convert_timestamp_to_milliseconds +from .utils import get_destination_table_details, get_source_table_details + logger = ingestion_logger() @@ -189,6 +192,26 @@ class AirbyteSource(PipelineServiceSource): ) ) + def _get_table_fqn(self, table_details: TableDetails) -> Optional[str]: + """ + Get the FQN of the table + """ + try: + if self.get_db_service_names(): + return self._get_table_fqn_from_om(table_details) + + return fqn.build( + metadata=self.metadata, + entity_type=Table, + service_name="*", + database_name=table_details.database, + schema_name=table_details.schema, + table_name=table_details.name, + ) + except FQNNotFoundException: + return None + + # pylint: disable=too-many-locals def yield_pipeline_lineage_details( self, pipeline_details: AirbytePipelineDetails ) -> Iterable[Either[AddLineageRequest]]: @@ -197,47 +220,65 @@ class AirbyteSource(PipelineServiceSource): :param pipeline_details: pipeline_details object from airbyte :return: Lineage from inlets and outlets """ + pipeline_name = pipeline_details.connection.get("name") source_connection = self.client.get_source( pipeline_details.connection.get("sourceId") ) destination_connection = self.client.get_destination( pipeline_details.connection.get("destinationId") ) - source_service = self.metadata.get_by_name( - entity=DatabaseService, fqn=source_connection.get("name") - ) - destination_service = self.metadata.get_by_name( - entity=DatabaseService, fqn=destination_connection.get("name") - ) - if not source_service or not destination_service: - return + source_name = source_connection.get("sourceName") + destination_name = destination_connection.get("destinationName") for task in ( pipeline_details.connection.get("syncCatalog", {}).get("streams") or [] ): stream = task.get("stream") - from_fqn = fqn.build( - self.metadata, - Table, - table_name=stream.get("name"), - database_name=None, - schema_name=stream.get("namespace"), - service_name=source_connection.get("name"), + + source_table_details = get_source_table_details(stream, source_connection) + destination_table_details = get_destination_table_details( + stream, destination_connection ) - to_fqn = fqn.build( - self.metadata, - Table, - table_name=stream.get("name"), - database_name=None, - schema_name=stream.get("namespace"), - service_name=destination_connection.get("name"), - ) + if not source_table_details or not destination_table_details: + continue + + from_fqn = self._get_table_fqn(source_table_details) + to_fqn = self._get_table_fqn(destination_table_details) + + if not from_fqn: + logger.warning( + f"While extracting lineage: [{pipeline_name}]," + f" source table: [{source_table_details.database or '*'}]" + f".[{source_table_details.schema}].[{source_table_details.name}]" + f" (type: {source_name}) not found in openmetadata" + ) + continue + if not to_fqn: + logger.warning( + f"While extracting lineage: [{pipeline_name}]," + f" destination table: [{destination_table_details.database or '*'}]" + f".[{destination_table_details.schema}].[{destination_table_details.name}]" + f" (type: {destination_name}) not found in openmetadata" + ) + continue from_entity = self.metadata.get_by_name(entity=Table, fqn=from_fqn) to_entity = self.metadata.get_by_name(entity=Table, fqn=to_fqn) - if not from_entity and not to_entity: + if not from_entity: + logger.warning( + f"While extracting lineage: [{pipeline_name}]," + f" source table (fqn: [{from_fqn}], type: {source_name}) not found" + " in openmetadata" + ) + continue + if not to_entity: + logger.warning( + f"While extracting lineage: [{pipeline_name}]," + f" destination table (fqn: [{to_fqn}], type: {destination_name}) not found" + " in openmetadata" + ) continue pipeline_fqn = fqn.build( @@ -279,4 +320,4 @@ class AirbyteSource(PipelineServiceSource): """ Get Pipeline Name """ - return pipeline_details.connection.get("connectionId") + return pipeline_details.connection.get("name") diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airbyte/utils.py b/ingestion/src/metadata/ingestion/source/pipeline/airbyte/utils.py new file mode 100644 index 00000000000..624abb05060 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/pipeline/airbyte/utils.py @@ -0,0 +1,99 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Utils for Airbyte +""" + +from metadata.ingestion.source.pipeline.openlineage.models import TableDetails +from metadata.utils.logger import ingestion_logger + +from .constants import AirbyteDestination, AirbyteSource + +logger = ingestion_logger() + + +def get_source_table_details(stream: dict, source_connection: dict) -> TableDetails: + """ + Get the source table details + """ + source_name = source_connection.get("sourceName") + source_database = source_connection.get("connectionConfiguration", {}).get( + "database" + ) + source_schema = stream.get("namespace") + + # Check if source is supported + if source_name not in [ + AirbyteSource.POSTGRES.value, + AirbyteSource.MSSQL.value, + AirbyteSource.MYSQL.value, + AirbyteSource.MONGODB.value, + ]: + logger.warning( + f"Lineage of airbyte pipeline with source [{source_name}] is not supported yet" + ) + return None + + # Handle specific database configurations + if source_name == AirbyteSource.MYSQL.value: + source_schema = source_database + source_database = None + elif source_name == AirbyteSource.MONGODB.value: + source_schema = ( + source_connection.get("connectionConfiguration", {}) + .get("database_config", {}) + .get("database") + ) + source_database = None + + return TableDetails( + name=stream["name"], + schema=source_schema, + database=source_database, + ) + + +def get_destination_table_details( + stream: dict, destination_connection: dict +) -> TableDetails: + """ + Get the destination table details + """ + destination_name = destination_connection.get("destinationName") + destination_database = destination_connection.get( + "connectionConfiguration", {} + ).get("database") + destination_schema = destination_connection.get("connectionConfiguration", {}).get( + "schema" + ) + + # Check if destination is supported + if destination_name not in [ + AirbyteDestination.POSTGRES.value, + AirbyteDestination.MSSQL.value, + AirbyteDestination.MYSQL.value, + ]: + logger.warning( + f"Lineage of airbyte pipeline with destination [{destination_name}] is not supported yet" + ) + return None + + # Handle specific database configurations + if destination_name == AirbyteDestination.MYSQL.value: + destination_schema = destination_database + destination_database = None + + return TableDetails( + name=stream["name"], + schema=destination_schema, + database=destination_database, + ) diff --git a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/models.py b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/models.py index dd7ca7d89bd..dfa6904f6db 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/openlineage/models.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/openlineage/models.py @@ -14,7 +14,7 @@ Openlineage Source Model module from dataclasses import dataclass from enum import Enum -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional @dataclass @@ -76,8 +76,9 @@ class TableDetails: Minimal table information. """ - schema: str name: str + schema: str + database: Optional[str] = None class EventType(str, Enum): diff --git a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py index 5912494d037..e2701e9cf9d 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py @@ -319,8 +319,7 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC): def _get_table_fqn_from_om(self, table_details: TableDetails) -> Optional[str]: """ Based on partial schema and table names look for matching table object in open metadata. - :param schema: schema name - :param table: table name + :param table_details: TableDetails object containing table name, schema, database information :return: fully qualified name of a Table in Open Metadata """ result = None @@ -330,7 +329,7 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC): metadata=self.metadata, entity_type=Table, service_name=db_service, - database_name=None, + database_name=table_details.database, schema_name=table_details.schema, table_name=table_details.name, ) diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index 04ac3b74f00..911ab206c26 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -151,7 +151,7 @@ def build( def _( metadata: Optional[OpenMetadata], *, - service_name: str, + service_name: Optional[str], database_name: Optional[str], schema_name: Optional[str], table_name: str, @@ -161,7 +161,7 @@ def _( """ Building logic for tables :param metadata: OMeta client - :param service_name: Service Name to filter + :param service_name: Service Name to filter or None :param database_name: DB name or None :param schema_name: Schema name or None :param table_name: Table name @@ -180,8 +180,8 @@ def _( service_name=service_name, ) - # if entity not found in ES proceed to build FQN with database_name and schema_name - if not entity and database_name and schema_name: + # if entity not found in ES proceed to build FQN with service_name, database_name and schema_name + if not entity and service_name and database_name and schema_name: fqn = _build(service_name, database_name, schema_name, table_name) return [fqn] if fetch_multiple_entities else fqn if entity and fetch_multiple_entities: diff --git a/ingestion/tests/unit/topology/pipeline/test_airbyte.py b/ingestion/tests/unit/topology/pipeline/test_airbyte.py index 41878ff0a14..522a69236a8 100644 --- a/ingestion/tests/unit/topology/pipeline/test_airbyte.py +++ b/ingestion/tests/unit/topology/pipeline/test_airbyte.py @@ -18,6 +18,7 @@ from unittest import TestCase from unittest.mock import patch from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest +from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.pipeline import ( Pipeline, PipelineStatus, @@ -25,6 +26,7 @@ from metadata.generated.schema.entity.data.pipeline import ( Task, TaskStatus, ) +from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.pipelineService import ( PipelineConnection, PipelineService, @@ -34,6 +36,8 @@ from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) from metadata.generated.schema.type.basic import FullyQualifiedEntityName +from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails +from metadata.generated.schema.type.entityLineage import Source as LineageSource from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus from metadata.ingestion.source.pipeline.airbyte.metadata import ( @@ -70,7 +74,7 @@ mock_airbyte_config = { } -EXPECTED_ARIBYTE_DETAILS = AirbytePipelineDetails( +EXPECTED_AIRBYTE_DETAILS = AirbytePipelineDetails( workspace=mock_data["workspace"][0], connection=mock_data["connection"][0] ) @@ -156,8 +160,79 @@ MOCK_PIPELINE = Pipeline( ), ) +# Mock data for lineage testing +MOCK_POSTGRES_SOURCE_TABLE = Table( + id="69fc8906-4a4a-45ab-9a54-9cc2d399e10e", + name="mock_table_name", + fullyQualifiedName="mock_source_service.mock_source_db.mock_source_schema.mock_table_name", + columns=[{"name": "id", "dataType": "INT"}, {"name": "name", "dataType": "STRING"}], +) + +MOCK_POSTGRES_DESTINATION_TABLE = Table( + id="59fc8906-4a4a-45ab-9a54-9cc2d399e10e", + name="mock_table_name", + fullyQualifiedName=( + "mock_destination_service.mock_destination_db" + ".mock_destination_schema.mock_table_name" + ), + columns=[{"name": "id", "dataType": "INT"}, {"name": "name", "dataType": "STRING"}], +) + +EXPECTED_LINEAGE = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id="69fc8906-4a4a-45ab-9a54-9cc2d399e10e", type="table" + ), + toEntity=EntityReference( + id="59fc8906-4a4a-45ab-9a54-9cc2d399e10e", type="table" + ), + lineageDetails=LineageDetails( + pipeline=EntityReference( + id="2aaa012e-099a-11ed-861d-0242ac120002", type="pipeline" + ), + source=LineageSource.PipelineLineage, + ), + ) +) + +MOCK_SOURCE_TABLE_FQN = ( + "mock_source_service.mock_source_db.mock_source_schema.mock_table_name" +) +MOCK_DESTINATION_TABLE_FQN = "mock_destination_service.mock_destination_db.mock_destination_schema.mock_table_name" + + +# Configure mock for _get_table_fqn to return FQNs for source and destination tables +def mock_get_table_fqn(self, table_details): # pylint: disable=unused-argument + if table_details.name != "mock_table_name": + return None + + if table_details.schema == "mock_source_schema": + return MOCK_SOURCE_TABLE_FQN + if table_details.schema == "mock_destination_schema": + return MOCK_DESTINATION_TABLE_FQN + if table_details.schema == "mock_source_db": + return MOCK_SOURCE_TABLE_FQN + if table_details.schema == "mock_destination_db": + return MOCK_DESTINATION_TABLE_FQN + + return None + + +# Configure the mock to return our test tables and pipeline +def mock_get_by_name(entity, fqn): + if entity == Table: + if fqn == MOCK_SOURCE_TABLE_FQN: + return MOCK_POSTGRES_SOURCE_TABLE + if fqn == MOCK_DESTINATION_TABLE_FQN: + return MOCK_POSTGRES_DESTINATION_TABLE + if entity == Pipeline: + return MOCK_PIPELINE + return None + class AirbyteUnitTest(TestCase): + """Test class for Airbyte source module.""" + @patch( "metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.test_connection" ) @@ -180,20 +255,86 @@ class AirbyteUnitTest(TestCase): self.client.list_connections.return_value = mock_data.get("connection") def test_pipeline_list(self): - assert list(self.airbyte.get_pipelines_list())[0] == EXPECTED_ARIBYTE_DETAILS + assert list(self.airbyte.get_pipelines_list())[0] == EXPECTED_AIRBYTE_DETAILS def test_pipeline_name(self): assert self.airbyte.get_pipeline_name( - EXPECTED_ARIBYTE_DETAILS - ) == mock_data.get("connection")[0].get("connectionId") + EXPECTED_AIRBYTE_DETAILS + ) == mock_data.get("connection")[0].get("name") def test_pipelines(self): - pipline = list(self.airbyte.yield_pipeline(EXPECTED_ARIBYTE_DETAILS))[0].right - assert pipline == EXPECTED_CREATED_PIPELINES + pipeline = list(self.airbyte.yield_pipeline(EXPECTED_AIRBYTE_DETAILS))[0].right + assert pipeline == EXPECTED_CREATED_PIPELINES def test_pipeline_status(self): status = [ either.right - for either in self.airbyte.yield_pipeline_status(EXPECTED_ARIBYTE_DETAILS) + for either in self.airbyte.yield_pipeline_status(EXPECTED_AIRBYTE_DETAILS) ] assert status == EXPECTED_PIPELINE_STATUS + + @patch.object(AirbyteSource, "_get_table_fqn", mock_get_table_fqn) + def test_yield_pipeline_lineage_details(self): + """Test the Airbyte lineage generation functionality.""" + # Mock the client methods needed for lineage with supported source and destination types + self.client.get_source.return_value = { + "sourceName": "Postgres", + "connectionConfiguration": { + "database": "mock_source_db", + "schema": "mock_source_schema", + }, + } + + self.client.get_destination.return_value = { + "destinationName": "Postgres", + "connectionConfiguration": { + "database": "mock_destination_db", + "schema": "mock_destination_schema", + }, + } + + # Mock connection with stream data for lineage test + test_connection = { + "connectionId": "test-connection-id", + "sourceId": "test-source-id", + "destinationId": "test-destination-id", + "name": "Test Connection", + "syncCatalog": { + "streams": [ + { + "stream": { + "name": "mock_table_name", + "namespace": "mock_source_schema", + "jsonSchema": {}, + } + } + ] + }, + } + + test_workspace = {"workspaceId": "test-workspace-id"} + test_pipeline_details = AirbytePipelineDetails( + workspace=test_workspace, connection=test_connection + ) + + # Mock the metadata object directly in the Airbyte source + with patch.object(self.airbyte, "metadata") as mock_metadata: + mock_metadata.get_by_name.side_effect = mock_get_by_name + + # Test yield_pipeline_lineage_details + lineage_results = list( + self.airbyte.yield_pipeline_lineage_details(test_pipeline_details) + ) + + # Check that we get at least one lineage result + assert len(lineage_results) > 0 + + # Extract the lineage details + lineage = lineage_results[0].right + + # Verify the lineage structure + assert lineage.edge.fromEntity.id == MOCK_POSTGRES_SOURCE_TABLE.id + assert lineage.edge.toEntity.id == MOCK_POSTGRES_DESTINATION_TABLE.id + # Compare just the UUID string value from both sides + assert lineage.edge.lineageDetails.pipeline.id.root == MOCK_PIPELINE.id.root + assert lineage.edge.lineageDetails.source == LineageSource.PipelineLineage diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java index a904fc7deb1..9c2c055db8e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java @@ -409,10 +409,14 @@ public class SearchResource { @Parameter(description = "Search Index name, defaults to table_search_index") @DefaultValue("table_search_index") @QueryParam("index") - String index) + String index, + @Parameter(description = "Filter documents by deleted param. By default deleted is false") + @DefaultValue("false") + @QueryParam("deleted") + boolean deleted) throws IOException { - return searchRepository.searchByField(fieldName, fieldValue, index); + return searchRepository.searchByField(fieldName, fieldValue, index, deleted); } @GET diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java index 708196f5a3d..ffb99825501 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchClient.java @@ -257,7 +257,8 @@ public interface SearchClient { Response.Status.NOT_IMPLEMENTED, NOT_IMPLEMENTED_ERROR_TYPE, NOT_IMPLEMENTED_METHOD); } - Response searchByField(String fieldName, String fieldValue, String index) throws IOException; + Response searchByField(String fieldName, String fieldValue, String index, Boolean deleted) + throws IOException; Response aggregate(AggregationRequest request) throws IOException; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java index e3ac17968ad..0a0bd739cd1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/SearchRepository.java @@ -1160,9 +1160,9 @@ public class SearchRepository { .withIsConnectedVia(isConnectedVia(entityType))); } - public Response searchByField(String fieldName, String fieldValue, String index) + public Response searchByField(String fieldName, String fieldValue, String index, Boolean deleted) throws IOException { - return searchClient.searchByField(fieldName, fieldValue, index); + return searchClient.searchByField(fieldName, fieldValue, index, deleted); } public Response aggregate(AggregationRequest request) throws IOException { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java index 5b151ceb6d4..8f4ad507840 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchClient.java @@ -1310,13 +1310,17 @@ public class ElasticSearchClient implements SearchClient { } @Override - public Response searchByField(String fieldName, String fieldValue, String index) + public Response searchByField(String fieldName, String fieldValue, String index, Boolean deleted) throws IOException { es.org.elasticsearch.action.search.SearchRequest searchRequest = new es.org.elasticsearch.action.search.SearchRequest( Entity.getSearchRepository().getIndexOrAliasName(index)); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(QueryBuilders.wildcardQuery(fieldName, fieldValue)); + BoolQueryBuilder query = + QueryBuilders.boolQuery() + .must(QueryBuilders.wildcardQuery(fieldName, fieldValue)) + .filter(QueryBuilders.termQuery("deleted", deleted)); + searchSourceBuilder.query(query); searchRequest.source(searchSourceBuilder); String response = client.search(searchRequest, RequestOptions.DEFAULT).toString(); return Response.status(OK).entity(response).build(); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java index a5587735240..3e16a96fe1c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchClient.java @@ -1381,13 +1381,17 @@ public class OpenSearchClient implements SearchClient { } @Override - public Response searchByField(String fieldName, String fieldValue, String index) + public Response searchByField(String fieldName, String fieldValue, String index, Boolean deleted) throws IOException { os.org.opensearch.action.search.SearchRequest searchRequest = new os.org.opensearch.action.search.SearchRequest( Entity.getSearchRepository().getIndexOrAliasName(index)); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(QueryBuilders.wildcardQuery(fieldName, fieldValue)); + BoolQueryBuilder query = + QueryBuilders.boolQuery() + .must(QueryBuilders.wildcardQuery(fieldName, fieldValue)) + .filter(QueryBuilders.termQuery("deleted", deleted)); + searchSourceBuilder.query(query); searchRequest.source(searchSourceBuilder); String response = client.search(searchRequest, RequestOptions.DEFAULT).toString(); return Response.status(OK).entity(response).build();