diff --git a/ingestion/setup.py b/ingestion/setup.py index ace9aa4633b..864f327214e 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -46,6 +46,7 @@ base_requirements = { "PyYAML", "jsonschema", "sqllineage==1.3.3", + "MarkupSafe>=2.0", } report_requirements = { diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py new file mode 100644 index 00000000000..e0401e231b6 --- /dev/null +++ b/ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py @@ -0,0 +1,64 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Mixin class containing Lineage specific methods + +To be used by OpenMetadata class +""" +from logging.config import DictConfigurator +from typing import Generic, TypeVar + +from pydantic import BaseModel + +from metadata.generated.schema.entity.data.table import Table +from metadata.ingestion.ometa.client import REST +from metadata.ingestion.ometa.utils import ometa_logger + +logger = ometa_logger() + + +# Prevent sqllineage from modifying the logger config +def configure(self): + pass + + +DictConfigurator.configure = configure + +T = TypeVar("T", bound=BaseModel) # pylint: disable=invalid-name + + +class ESMixin(Generic[T]): + client: REST + + es_url: str = "/search/query?q=service:{} {}&from={}&size={}&index={}" + + def search_entities_using_es( + self, service_name, table_obj, search_index, from_count: int = 0, size: int = 10 + ): + generate_es_string = " AND ".join( + [ + "%s:%s" % (key, value) + for (key, value) in table_obj.items() + if value is not None + ] + ) + resp_es = self.client.get( + self.es_url.format( + service_name, generate_es_string, from_count, size, search_index + ) + ) + multiple_entities = [] + if resp_es: + for table_hit in resp_es["hits"]["hits"]: + multiple_entities.append( + self.get_by_name(entity=Table, fqdn=table_hit["fqdn"]) + ) + return multiple_entities diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py index 7e9d30c9f0d..64707fbd88a 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py @@ -142,6 +142,10 @@ class OMetaLineageMixin(Generic[T]): ) return None + def _separate_fqn(self, database, fqn): + database_schema, table = fqn.split(".")[-2:] + return {"database": database, "database_schema": database_schema, "name": table} + def _create_lineage_by_table_name( self, from_table: str, to_table: str, service_name: str, database: str ): @@ -156,7 +160,15 @@ class OMetaLineageMixin(Generic[T]): _get_formmated_table_name(str(from_table)), ) from_entity: Table = self.get_by_name(entity=Table, fqdn=from_fqdn) - + if not from_entity: + table_obj = self._separate_fqn(database=database, fqn=from_fqdn) + multiple_from_fqns = self.search_entities_using_es( + service_name=service_name, + table_obj=table_obj, + search_index="table_search_index", + ) + else: + multiple_from_fqns = [from_entity] to_fqdn = get_fqdn( AddLineageRequest, service_name, @@ -164,23 +176,33 @@ class OMetaLineageMixin(Generic[T]): _get_formmated_table_name(str(to_table)), ) to_entity: Table = self.get_by_name(entity=Table, fqdn=to_fqdn) + if not to_entity: + table_obj = self._separate_fqn(database=database, fqn=to_fqdn) + multiple_to_fqns = self.search_entities_using_es( + service_name=service_name, + table_obj=table_obj, + search_index="table_search_index", + ) + else: + multiple_to_fqns = [to_entity] if not from_entity or not to_entity: return None - - lineage = AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference( - id=from_entity.id.__root__, - type="table", - ), - toEntity=EntityReference( - id=to_entity.id.__root__, - type="table", - ), - ) - ) - created_lineage = self.add_lineage(lineage) - logger.info(f"Successfully added Lineage {created_lineage}") + for from_entity in multiple_from_fqns: + for to_entity in multiple_to_fqns: + lineage = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference( + id=from_entity.id.__root__, + type="table", + ), + toEntity=EntityReference( + id=to_entity.id.__root__, + type="table", + ), + ) + ) + created_lineage = self.add_lineage(lineage) + logger.info(f"Successfully added Lineage {created_lineage}") except Exception as err: logger.debug(traceback.print_exc()) diff --git a/ingestion/src/metadata/ingestion/ometa/ometa_api.py b/ingestion/src/metadata/ingestion/ometa/ometa_api.py index b29abcc486d..e91057b5536 100644 --- a/ingestion/src/metadata/ingestion/ometa/ometa_api.py +++ b/ingestion/src/metadata/ingestion/ometa/ometa_api.py @@ -53,6 +53,7 @@ from metadata.generated.schema.type.entityHistory import EntityVersionHistory from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.ometa.auth_provider import AuthenticationProvider from metadata.ingestion.ometa.client import REST, APIError, ClientConfig +from metadata.ingestion.ometa.mixins.es_mixin import ESMixin from metadata.ingestion.ometa.mixins.glossary_mixin import GlossaryMixin from metadata.ingestion.ometa.mixins.mlmodel_mixin import OMetaMlModelMixin from metadata.ingestion.ometa.mixins.pipeline_mixin import OMetaPipelineMixin @@ -116,6 +117,7 @@ class OpenMetadata( OMetaTagMixin, GlossaryMixin, OMetaServiceMixin, + ESMixin, Generic[T, C], ): """ diff --git a/ingestion/src/metadata/ingestion/source/snowflake_usage.py b/ingestion/src/metadata/ingestion/source/snowflake_usage.py index fae510c069e..04149626ff3 100644 --- a/ingestion/src/metadata/ingestion/source/snowflake_usage.py +++ b/ingestion/src/metadata/ingestion/source/snowflake_usage.py @@ -141,6 +141,8 @@ class SnowflakeUsageSource(Source[TableQuery]): sql=row["query_text"], service_name=self.config.serviceName, ) + if not row["database_name"] and self.service_connection.database: + TableQuery.database = self.service_connection.database logger.debug(f"Parsed Query: {row['query_text']}") if row["schema_name"] is not None: self.report.scanned(f"{row['database_name']}.{row['schema_name']}") @@ -159,6 +161,9 @@ class SnowflakeUsageSource(Source[TableQuery]): """ return self.report + def test_connection(self) -> None: + pass + def close(self): self.alchemy_helper.close() diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index c0674b2d829..ee23f7ebf70 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -492,10 +492,10 @@ class SQLSource(Source[OMetaDatabaseAndTable]): upstream=upstream_nodes, ) model_fqdn = f"{schema}.{model_name}".lower() + self.data_models[model_fqdn] = model except Exception as err: logger.debug(traceback.print_exc()) logger.error(err) - self.data_models[model_fqdn] = model def _parse_data_model_upstream(self, mnode): upstream_nodes = []