Added ES mixin - supports Lineage creation (#4163)

This commit is contained in:
Ayush Shah 2022-04-16 11:46:19 +05:30 committed by GitHub
parent 076921cce2
commit 6cea695e23
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 111 additions and 17 deletions

View File

@ -46,6 +46,7 @@ base_requirements = {
"PyYAML",
"jsonschema",
"sqllineage==1.3.3",
"MarkupSafe>=2.0",
}
report_requirements = {

View File

@ -0,0 +1,64 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Mixin class containing Lineage specific methods
To be used by OpenMetadata class
"""
from logging.config import DictConfigurator
from typing import Generic, TypeVar
from pydantic import BaseModel
from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.utils import ometa_logger
logger = ometa_logger()
# Prevent sqllineage from modifying the logger config
def configure(self):
pass
DictConfigurator.configure = configure
T = TypeVar("T", bound=BaseModel) # pylint: disable=invalid-name
class ESMixin(Generic[T]):
client: REST
es_url: str = "/search/query?q=service:{} {}&from={}&size={}&index={}"
def search_entities_using_es(
self, service_name, table_obj, search_index, from_count: int = 0, size: int = 10
):
generate_es_string = " AND ".join(
[
"%s:%s" % (key, value)
for (key, value) in table_obj.items()
if value is not None
]
)
resp_es = self.client.get(
self.es_url.format(
service_name, generate_es_string, from_count, size, search_index
)
)
multiple_entities = []
if resp_es:
for table_hit in resp_es["hits"]["hits"]:
multiple_entities.append(
self.get_by_name(entity=Table, fqdn=table_hit["fqdn"])
)
return multiple_entities

View File

@ -142,6 +142,10 @@ class OMetaLineageMixin(Generic[T]):
)
return None
def _separate_fqn(self, database, fqn):
database_schema, table = fqn.split(".")[-2:]
return {"database": database, "database_schema": database_schema, "name": table}
def _create_lineage_by_table_name(
self, from_table: str, to_table: str, service_name: str, database: str
):
@ -156,7 +160,15 @@ class OMetaLineageMixin(Generic[T]):
_get_formmated_table_name(str(from_table)),
)
from_entity: Table = self.get_by_name(entity=Table, fqdn=from_fqdn)
if not from_entity:
table_obj = self._separate_fqn(database=database, fqn=from_fqdn)
multiple_from_fqns = self.search_entities_using_es(
service_name=service_name,
table_obj=table_obj,
search_index="table_search_index",
)
else:
multiple_from_fqns = [from_entity]
to_fqdn = get_fqdn(
AddLineageRequest,
service_name,
@ -164,23 +176,33 @@ class OMetaLineageMixin(Generic[T]):
_get_formmated_table_name(str(to_table)),
)
to_entity: Table = self.get_by_name(entity=Table, fqdn=to_fqdn)
if not to_entity:
table_obj = self._separate_fqn(database=database, fqn=to_fqdn)
multiple_to_fqns = self.search_entities_using_es(
service_name=service_name,
table_obj=table_obj,
search_index="table_search_index",
)
else:
multiple_to_fqns = [to_entity]
if not from_entity or not to_entity:
return None
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=from_entity.id.__root__,
type="table",
),
toEntity=EntityReference(
id=to_entity.id.__root__,
type="table",
),
)
)
created_lineage = self.add_lineage(lineage)
logger.info(f"Successfully added Lineage {created_lineage}")
for from_entity in multiple_from_fqns:
for to_entity in multiple_to_fqns:
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=from_entity.id.__root__,
type="table",
),
toEntity=EntityReference(
id=to_entity.id.__root__,
type="table",
),
)
)
created_lineage = self.add_lineage(lineage)
logger.info(f"Successfully added Lineage {created_lineage}")
except Exception as err:
logger.debug(traceback.print_exc())

View File

@ -53,6 +53,7 @@ from metadata.generated.schema.type.entityHistory import EntityVersionHistory
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.auth_provider import AuthenticationProvider
from metadata.ingestion.ometa.client import REST, APIError, ClientConfig
from metadata.ingestion.ometa.mixins.es_mixin import ESMixin
from metadata.ingestion.ometa.mixins.glossary_mixin import GlossaryMixin
from metadata.ingestion.ometa.mixins.mlmodel_mixin import OMetaMlModelMixin
from metadata.ingestion.ometa.mixins.pipeline_mixin import OMetaPipelineMixin
@ -116,6 +117,7 @@ class OpenMetadata(
OMetaTagMixin,
GlossaryMixin,
OMetaServiceMixin,
ESMixin,
Generic[T, C],
):
"""

View File

@ -141,6 +141,8 @@ class SnowflakeUsageSource(Source[TableQuery]):
sql=row["query_text"],
service_name=self.config.serviceName,
)
if not row["database_name"] and self.service_connection.database:
TableQuery.database = self.service_connection.database
logger.debug(f"Parsed Query: {row['query_text']}")
if row["schema_name"] is not None:
self.report.scanned(f"{row['database_name']}.{row['schema_name']}")
@ -159,6 +161,9 @@ class SnowflakeUsageSource(Source[TableQuery]):
"""
return self.report
def test_connection(self) -> None:
pass
def close(self):
self.alchemy_helper.close()

View File

@ -492,10 +492,10 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
upstream=upstream_nodes,
)
model_fqdn = f"{schema}.{model_name}".lower()
self.data_models[model_fqdn] = model
except Exception as err:
logger.debug(traceback.print_exc())
logger.error(err)
self.data_models[model_fqdn] = model
def _parse_data_model_upstream(self, mnode):
upstream_nodes = []