Fix #5459 - Remove sql-metadata in favor of sqllineage (#5494)

Pere Miquel Brull 2022-06-21 18:02:50 +02:00 committed by GitHub
parent ba1b099301
commit 0ecc9f0da6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 2301 additions and 2523 deletions

View File

@ -118,7 +118,7 @@ public class ElasticSearchIndexDefinition {
if (!exists) {
String elasticSearchIndexMapping = getIndexMapping(elasticSearchIndexType);
CreateIndexRequest request = new CreateIndexRequest(elasticSearchIndexType.indexName);
request.mapping(elasticSearchIndexMapping, XContentType.JSON);
request.source(elasticSearchIndexMapping, XContentType.JSON);
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
LOG.info("{} Created {}", elasticSearchIndexType.indexName, createIndexResponse.isAcknowledged());
}
@ -144,7 +144,7 @@ public class ElasticSearchIndexDefinition {
LOG.info("{} Updated {}", elasticSearchIndexType.indexName, putMappingResponse.isAcknowledged());
} else {
CreateIndexRequest request = new CreateIndexRequest(elasticSearchIndexType.indexName);
request.mapping(elasticSearchIndexMapping, XContentType.JSON);
request.source(elasticSearchIndexMapping, XContentType.JSON);
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
LOG.info("{} Created {}", elasticSearchIndexType.indexName, createIndexResponse.isAcknowledged());
}

View File

@ -1,4 +1,18 @@
{
"settings": {
"analysis": {
"normalizer": {
"lowercase_normalizer": {
"type": "custom",
"char_filter": [],
"filter": [
"lowercase"
]
}
}
}
},
"mappings": {
"properties": {
"name": {
"type":"text"
@ -44,7 +58,8 @@
}
},
"fqdn": {
"type": "keyword"
"type": "keyword",
"normalizer": "lowercase_normalizer"
},
"deleted": {
"type": "boolean"
@ -142,4 +157,5 @@
"type": "long"
}
}
}
}
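
The index definitions above now bundle index settings (the lowercase_normalizer) with the mappings, which is also why the Java client calls earlier in this diff switched from request.mapping() to request.source(): the JSON is the whole index source, not just a mapping. Below is a minimal sketch of the effect, using Python elasticsearch-py 7.x-style calls with an assumed local host and illustrative document values; the normalizer lowercases the keyword at both index and query time, so term lookups on fqdn become case-insensitive.

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")  # assumed local instance

index_body = {
    # same shape as the JSON files in this diff: settings + mappings together
    "settings": {
        "analysis": {
            "normalizer": {
                "lowercase_normalizer": {
                    "type": "custom",
                    "char_filter": [],
                    "filter": ["lowercase"],
                }
            }
        }
    },
    "mappings": {
        "properties": {
            "fqdn": {"type": "keyword", "normalizer": "lowercase_normalizer"}
        }
    },
}

es.indices.create(index="table_search_index", body=index_body)
es.index(
    index="table_search_index",
    body={"fqdn": "MyService.MyDB.MySchema.MyTable"},  # illustrative FQN
    refresh=True,
)

# A lowercase term query still matches the mixed-case FQN.
hits = es.search(
    index="table_search_index",
    body={"query": {"term": {"fqdn": "myservice.mydb.myschema.mytable"}}},
)
print(hits["hits"]["total"])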

View File

@ -1,78 +1,94 @@
{
"properties": {
"name": {
"type": "text"
},
"display_name": {
"type": "text"
},
"owner": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 128
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
"settings": {
"analysis": {
"normalizer": {
"lowercase_normalizer": {
"type": "custom",
"char_filter": [],
"filter": [
"lowercase"
]
}
}
},
"fqdn": {
"type": "keyword"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"description": {
"type": "text"
},
"glossary_name": {
"type": "keyword"
},
"glossary_id": {
"type": "keyword"
},
"status": {
"type": "keyword"
},
"tags": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},
"suggest": {
"type": "completion"
},
"deleted": {
"type": "boolean"
}
},
"mappings": {
"properties": {
"name": {
"type": "text"
},
"display_name": {
"type": "text"
},
"owner": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 128
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"fqdn": {
"type": "keyword",
"normalizer": "lowercase_normalizer"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"description": {
"type": "text"
},
"glossary_name": {
"type": "keyword"
},
"glossary_id": {
"type": "keyword"
},
"status": {
"type": "keyword"
},
"tags": {
"type": "keyword"
},
"entity_type": {
"type": "keyword"
},
"suggest": {
"type": "completion"
},
"deleted": {
"type": "boolean"
}
}
}
}

View File

@ -1,13 +1,28 @@
{
"settings": {
"analysis": {
"normalizer": {
"lowercase_normalizer": {
"type": "custom",
"char_filter": [],
"filter": [
"lowercase"
]
}
}
}
},
"mappings": {
"properties": {
"name": {
"type":"text"
"type": "text"
},
"display_name": {
"type": "text"
},
"fqdn": {
"type": "keyword"
"type": "keyword",
"normalizer": "lowercase_normalizer"
},
"algorithm": {
"type": "keyword"
@ -123,16 +138,16 @@
"service_type": {
"type": "keyword"
},
"monthly_stats":{
"monthly_stats": {
"type": "long"
},
"monthly_percentile_rank":{
"monthly_percentile_rank": {
"type": "long"
},
"weekly_stats":{
"weekly_stats": {
"type": "long"
},
"weekly_percentile_rank":{
"weekly_percentile_rank": {
"type": "long"
},
"daily_percentile_rank": {
@ -142,4 +157,5 @@
"type": "long"
}
}
}
}

View File

@ -1,7 +1,21 @@
{
"settings": {
"analysis": {
"normalizer": {
"lowercase_normalizer": {
"type": "custom",
"char_filter": [],
"filter": [
"lowercase"
]
}
}
}
},
"mappings": {
"properties": {
"name": {
"type":"text"
"type": "text"
},
"display_name": {
"type": "text"
@ -57,10 +71,11 @@
"type": "text"
},
"fqdn": {
"type": "keyword"
"type": "keyword",
"normalizer": "lowercase_normalizer"
},
"task_names": {
"type":"text"
"type": "text"
},
"task_descriptions": {
"type": "text"
@ -124,4 +139,5 @@
"type": "completion"
}
}
}
}

View File

@ -1,7 +1,21 @@
{
"settings": {
"analysis": {
"normalizer": {
"lowercase_normalizer": {
"type": "custom",
"char_filter": [],
"filter": [
"lowercase"
]
}
}
}
},
"mappings": {
"properties": {
"name": {
"type":"text"
"type": "text"
},
"display_name": {
"type": "text"
@ -44,7 +58,8 @@
}
},
"fqdn": {
"type": "keyword"
"type": "keyword",
"normalizer": "lowercase_normalizer"
},
"deleted": {
"type": "boolean"
@ -63,7 +78,7 @@
"type": "keyword"
},
"column_names": {
"type":"text"
"type": "text"
},
"column_descriptions": {
"type": "text"
@ -135,16 +150,16 @@
"service_suggest": {
"type": "completion"
},
"monthly_stats":{
"monthly_stats": {
"type": "long"
},
"monthly_percentile_rank":{
"monthly_percentile_rank": {
"type": "long"
},
"weekly_stats":{
"weekly_stats": {
"type": "long"
},
"weekly_percentile_rank":{
"weekly_percentile_rank": {
"type": "long"
},
"daily_percentile_rank": {
@ -154,4 +169,5 @@
"type": "long"
}
}
}
}

View File

@ -1,100 +1,102 @@
{
"properties": {
"name": {
"type":"text"
},
"display_name": {
"type": "text"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"entity_type": {
"type": "keyword"
},
"deleted": {
"type": "boolean"
},
"users": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
"mappings": {
"properties": {
"name": {
"type": "text"
},
"display_name": {
"type": "text"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"entity_type": {
"type": "keyword"
},
"deleted": {
"type": "boolean"
},
"users": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"owns": {
"type": "keyword"
},
"default_roles": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
},
"owns": {
"type": "keyword"
},
"default_roles": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
},
"suggest": {
"type": "completion"
}
},
"suggest": {
"type": "completion"
}
}
}

View File

@ -1,7 +1,21 @@
{
"settings": {
"analysis": {
"normalizer": {
"lowercase_normalizer": {
"type": "custom",
"char_filter": [],
"filter": [
"lowercase"
]
}
}
}
},
"mappings": {
"properties": {
"name": {
"type":"text"
"type": "text"
},
"display_name": {
"type": "text"
@ -44,7 +58,8 @@
}
},
"fqdn": {
"type": "keyword"
"type": "keyword",
"normalizer": "lowercase_normalizer"
},
"deleted": {
"type": "boolean"
@ -115,4 +130,5 @@
"type": "completion"
}
}
}
}

View File

@ -1,100 +1,102 @@
{
"properties": {
"name": {
"type":"text"
},
"display_name": {
"type": "text"
},
"email": {
"type": "text"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"entity_type": {
"type": "keyword"
},
"teams": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
"mappings": {
"properties": {
"name": {
"type": "text"
},
"display_name": {
"type": "text"
},
"email": {
"type": "text"
},
"last_updated_timestamp": {
"type": "date",
"format": "epoch_second"
},
"entity_type": {
"type": "keyword"
},
"teams": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"roles": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
},
"roles": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
}
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
},
"type": {
"type": "text"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
},
"deleted": {
"type": "boolean"
},
"suggest": {
"type": "completion"
}
},
"deleted": {
"type": "boolean"
},
"suggest": {
"type": "completion"
}
}
}

View File

@ -12,18 +12,22 @@
"type": "string"
}
},
"tableAliases": {
"description": "Table names mapped with alias used in query",
"type": "object"
},
"columns": {
"description": "Table columns used in query",
"type": "object"
},
"database": {
"description": "Database of the associated with query",
"databaseName": {
"description": "Database associated with the table in the query",
"type": "string"
},
"joins": {
"description": "Maps each parsed table name of a query to the join information",
"type": "object",
"additionalProperties": {
".{1,}": {
"type": "array",
"items": {
"$ref": "tableUsageCount.json#/definitions/tableColumnJoin"
}
}
}
},
"sql": {
"description": "SQL query",
"type": "string"
@ -41,6 +45,6 @@
"type": "string"
}
},
"required": ["sql", "serviceName", "tables", "database"],
"required": ["sql", "serviceName", "tables", "databaseName"],
"additionalProperties": false
}
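
For reference, a hypothetical payload matching the reworked queryParserData.json above (all values illustrative), written as a Python dict: tableAliases and columns are gone, database becomes databaseName, and joins maps each parsed table to a list of tableColumnJoin entries.

query_parser_data = {
    "tables": ["dim_address", "dim_customer"],
    "joins": {
        "dim_address": [
            {
                "tableColumn": {"table": "dim_address", "column": "customer_id"},
                "joinedWith": [
                    {"table": "dim_customer", "column": "customer_id"}
                ],
            }
        ]
    },
    "databaseName": "ecommerce_db",
    "databaseSchema": "shopify",
    "sql": (
        "SELECT a.address_id, c.customer_id "
        "FROM dim_address a JOIN dim_customer c ON a.customer_id = c.customer_id"
    ),
    "date": "2022-06-21",
    "serviceName": "sample_data",
}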

View File

@ -33,8 +33,8 @@
"description": "Name that identifies this database service.",
"type": "string"
},
"database": {
"description": "Database of the associated with query",
"databaseName": {
"description": "Database associated with the table in the query",
"type": "string"
},
"databaseSchema": {

View File

@ -16,7 +16,8 @@
"description": "Name of the column",
"type": "string"
}
}
},
"additionalProperties": false
},
"tableColumnJoin": {
"type": "object",
@ -32,7 +33,8 @@
"$ref": "#/definitions/tableColumn"
}
}
}
},
"additionalProperties": false
}
},
"properties": {
@ -44,8 +46,8 @@
"description": "Date of execution of SQL query",
"type": "string"
},
"database": {
"description": "Database of the associated with table",
"databaseName": {
"description": "Database associated with the table in the query",
"type": "string"
},
"count": {
@ -76,6 +78,6 @@
"type": "string"
}
},
"required": ["tableName", "date", "database", "serviceName"],
"required": ["tableName", "date", "databaseName", "serviceName"],
"additionalProperties": false
}

File diff suppressed because it is too large

View File

@ -37,7 +37,6 @@ base_requirements = {
"wheel~=0.36.2",
"python-jose==3.3.0",
"sqlalchemy>=1.4.0",
"sql-metadata~=2.0.0",
"requests>=2.23",
"cryptography",
"Jinja2>=2.11.3",
@ -51,12 +50,6 @@ base_requirements = {
}
base_plugins = {
"query-parser",
"metadata-usage",
"file-stage",
"sql-metadata~=2.5.0",
}
plugins: Dict[str, Set[str]] = {
"airflow": {
"apache-airflow==2.1.4"

View File

@ -30,13 +30,11 @@ from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.api.bulk_sink import BulkSink, BulkSinkStatus
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.helpers import get_formatted_entity_name
from metadata.utils.logger import ingestion_logger
from metadata.utils.sql_lineage import (
get_column_fqn,
ingest_lineage_by_query,
search_table_entities,
get_lineage_by_query,
get_table_entities_from_query,
)
logger = ingestion_logger()
@ -72,18 +70,23 @@ class MetadataUsageBulkSink(BulkSink):
return cls(config, metadata_config)
def ingest_sql_queries_lineage(
self, queries: List[SqlQuery], database: str
self, queries: List[SqlQuery], database_name: str, schema_name: str
) -> None:
"""
Method to ingest lineage by sql queries
"""
for query in queries:
ingest_lineage_by_query(
lineages = get_lineage_by_query(
self.metadata,
query=query.query,
service_name=self.service_name,
database=database,
database_name=database_name,
schema_name=schema_name,
)
for lineage in lineages or []:
created_lineage = self.metadata.add_lineage(lineage)
logger.info(f"Successfully added Lineage {created_lineage}")
def __populate_table_usage_map(
self, table_entity: Table, table_usage: TableUsageCount
@ -98,7 +101,7 @@ class MetadataUsageBulkSink(BulkSink):
"usage_count": table_usage.count,
"sql_queries": table_usage.sqlQueries,
"usage_date": table_usage.date,
"database": table_usage.database,
"database": table_usage.databaseName,
"database_schema": table_usage.databaseSchema,
}
else:
@ -119,7 +122,9 @@ class MetadataUsageBulkSink(BulkSink):
table_queries=value_dict["sql_queries"],
)
self.ingest_sql_queries_lineage(
value_dict["sql_queries"], value_dict["database"]
queries=value_dict["sql_queries"],
database_name=value_dict["database"],
schema_name=value_dict["database_schema"],
)
table_usage_request = UsageRequest(
date=self.today, count=value_dict["usage_count"]
@ -147,34 +152,36 @@ class MetadataUsageBulkSink(BulkSink):
"Table: {}".format(value_dict["table_entity"].name.__root__)
)
# Check here how to properly pick up ES and/or table query data
def write_records(self) -> None:
for usage_record in self.file_handler.readlines():
record = json.loads(usage_record)
table_usage = TableUsageCount(**json.loads(record))
self.service_name = table_usage.serviceName
if "." in table_usage.table:
databaseSchema, table = fqn.split(table_usage.table)[-2:]
table_usage.table = table
if not table_usage.databaseSchema:
table_usage.databaseSchema = databaseSchema
table_usage.database = get_formatted_entity_name(table_usage.database)
table_usage.databaseSchema = get_formatted_entity_name(
table_usage.databaseSchema
table_entities = get_table_entities_from_query(
metadata=self.metadata,
service_name=self.service_name,
database_name=table_usage.databaseName,
database_schema=table_usage.databaseSchema,
table_name=table_usage.table,
)
table_usage.table = get_formatted_entity_name(table_usage.table)
table_entities = search_table_entities(
self.metadata,
table_usage.serviceName,
table_usage.database,
table_usage.databaseSchema,
table_usage.table,
)
for table_entity in table_entities or []:
if not table_entities:
logger.warning(
f"Could not fetch table {table_usage.databaseName}.{table_usage.databaseSchema}.{table_usage.table}"
)
continue
for table_entity in table_entities:
if table_entity is not None:
self.__populate_table_usage_map(
table_usage=table_usage, table_entity=table_entity
)
table_join_request = self.__get_table_joins(table_usage)
table_join_request = self.__get_table_joins(
table_entity=table_entity, table_usage=table_usage
)
logger.debug("table join request {}".format(table_join_request))
try:
if (
@ -194,7 +201,7 @@ class MetadataUsageBulkSink(BulkSink):
else:
logger.warning(
"Table does not exist, skipping usage publish {}, {}".format(
table_usage.table, table_usage.database
table_usage.table, table_usage.databaseName
)
)
self.status.warnings.append(f"Table: {table_usage.table}")
@ -205,7 +212,9 @@ class MetadataUsageBulkSink(BulkSink):
except APIError:
logger.error("Failed to publish compute.percentile")
def __get_table_joins(self, table_usage: TableUsageCount) -> TableJoins:
def __get_table_joins(
self, table_entity: Table, table_usage: TableUsageCount
) -> TableJoins:
table_joins: TableJoins = TableJoins(
columnJoins=[], directTableJoins=[], startDate=table_usage.date
)
@ -225,7 +234,7 @@ class MetadataUsageBulkSink(BulkSink):
for column in column_join.joinedWith:
joined_column_fqn = self.__get_column_fqn(
table_usage.database, table_usage.databaseSchema, column
table_usage.databaseName, table_usage.databaseSchema, column
)
if str(joined_column_fqn) in joined_with.keys():
column_joined_with = joined_with[str(joined_column_fqn)]
@ -242,8 +251,11 @@ class MetadataUsageBulkSink(BulkSink):
column_joins_dict[column_join.tableColumn.column] = joined_with
for key, value in column_joins_dict.items():
key_name = get_column_fqn(table_entity=table_entity, column=key).split(".")[
-1
]
table_joins.columnJoins.append(
ColumnJoins(columnName=key, joinedWith=list(value.values()))
ColumnJoins(columnName=key_name, joinedWith=list(value.values()))
)
return table_joins
@ -253,18 +265,18 @@ class MetadataUsageBulkSink(BulkSink):
"""
Method to get column fqn
"""
table_entities = search_table_entities(
self.metadata,
self.service_name,
database,
database_schema,
table_column.table,
table_entities = get_table_entities_from_query(
metadata=self.metadata,
service_name=self.service_name,
database_name=database,
database_schema=database_schema,
table_name=table_column.table,
)
if not table_entities:
return None
for table_entity in table_entities:
return get_column_fqn(table_entity, table_column.column)
return get_column_fqn(table_entity=table_entity, column=table_column.column)
def get_status(self):
return self.status

View File

@ -58,7 +58,7 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.bulk_sink import BulkSink, BulkSinkStatus
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import EmptyPayloadException, OpenMetadata
from metadata.utils.elasticsearch import ESIndex
from metadata.utils import fqn
logger = logging.getLogger(__name__)
@ -170,10 +170,19 @@ class MigrateBulkSink(BulkSink):
for table in file.readlines():
table = json.loads(table)
try:
table_entities = self.metadata.es_search_from_service(
filters = self._separate_fqn(table.get("fullyQualifiedName"))
fqn_search_string = fqn._build(
table.get("service").get("name"),
filters.get("database", "*"),
filters.get("database_schema", "*"),
filters.get("name"),
)
table_entities = self.metadata.es_search_from_fqn(
entity_type=Table,
filters=self._separate_fqn(table.get("fullyQualifiedName")),
service_name=table.get("service").get("name"),
fqn_search_string=fqn_search_string,
)
if len(table_entities) < 1:
continue

View File

@ -14,13 +14,13 @@ Mixin class containing Lineage specific methods
To be used by OpenMetadata class
"""
import time
from typing import Dict, Generic, List, Optional, Type, TypeVar
from typing import Generic, List, Optional, Type, TypeVar
from pydantic import BaseModel
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.utils import ometa_logger
from metadata.utils.elasticsearch import ES_INDEX_MAP, get_query_from_dict
from metadata.utils.elasticsearch import ES_INDEX_MAP
logger = ometa_logger()
@ -36,7 +36,7 @@ class ESMixin(Generic[T]):
client: REST
search_from_service_url = "/search/query?q=service.name:{service} AND {filters}&from={from_}&size={size}&index={index}"
fqdn_search = "/search/query?q=fqdn:{fqn}&from={from_}&size={size}&index={index}"
def _search_es_entity(
self, entity_type: Type[T], query_string: str
@ -89,11 +89,10 @@ class ESMixin(Generic[T]):
return None
def es_search_from_service(
def es_search_from_fqn(
self,
entity_type: Type[T],
service_name: str,
filters: Dict[str, Optional[str]],
fqn_search_string: str,
from_count: int = 0,
size: int = 10,
retries: int = 3,
@ -102,23 +101,21 @@ class ESMixin(Generic[T]):
Given a service_name and some filters, search for entities using ES
:param entity_type: Entity to look for
:param service_name: Filter by service_name
:param filters: Set of extra filters to apply. It should at least be {"name": <entity name>}
:param fqn_search_string: string used to search by FQN. E.g., service.*.schema.table
:param from_count: Records to expect
:param size: Number of records
:param retries: Number of retries for the ES query
:return: List of entities
"""
try:
filter_query = get_query_from_dict(filters)
query_string = self.search_from_service_url.format(
service=service_name,
filters=filter_query,
from_=from_count,
size=size,
index=ES_INDEX_MAP[entity_type.__name__], # Fail if not exists
)
query_string = self.fqdn_search.format(
fqn=fqn_search_string,
from_=from_count,
size=size,
index=ES_INDEX_MAP[entity_type.__name__], # Fail if not exists
)
try:
return self._search_es_entity_retry(
entity_type=entity_type, query_string=query_string, retries=retries
)
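
A hypothetical illustration of the new search flow: when the database or schema is unknown it is replaced by a wildcard (see the fqn.py change later in this diff), and the resulting string is dropped into the fqdn_search template above. The FQN value and index name below are assumptions for illustration.

fqn_search_string = "my_service.*.my_schema.customers"  # e.g. what fqn._build would produce with a missing database
fqdn_search = "/search/query?q=fqdn:{fqn}&from={from_}&size={size}&index={index}"
print(fqdn_search.format(fqn=fqn_search_string, from_=0, size=10, index="table_search_index"))
# /search/query?q=fqdn:my_service.*.my_schema.customers&from=0&size=10&index=table_search_index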

View File

@ -19,6 +19,7 @@ from typing import Any, Dict, Generic, Optional, Type, TypeVar, Union
from pydantic import BaseModel
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.ometa.client import REST, APIError
from metadata.ingestion.ometa.utils import get_entity_type, ometa_logger

View File

@ -137,6 +137,7 @@ class OMetaTableMixin:
:param table: Table Entity to update
:param table_join_request: Join data to add
"""
logger.info("table join request %s", table_join_request.json())
resp = self.client.put(
f"{self.get_suffix(Table)}/{table.id.__root__}/joins",

View File

@ -489,7 +489,7 @@ class OpenMetadata(
return entity(**resp)
except APIError as err:
if err.status_code == 404:
logger.info(
logger.debug(
"GET %s for %s. HTTP %s - %s",
entity.__name__,
path,

View File

@ -14,9 +14,12 @@ Query parser implementation
import datetime
import traceback
from typing import Optional
from collections import defaultdict
from logging.config import DictConfigurator
from typing import Dict, List, Optional, Tuple
from sql_metadata import Parser
from sqllineage.exceptions import SQLLineageException
from sqlparse.sql import Comparison, Identifier, Statement
from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
@ -24,12 +27,265 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
)
from metadata.generated.schema.type.queryParserData import QueryParserData
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.generated.schema.type.tableUsageCount import TableColumn, TableColumnJoin
from metadata.ingestion.api.processor import Processor, ProcessorStatus
from metadata.utils.helpers import find_in_list, get_formatted_entity_name
from metadata.utils.logger import ingestion_logger
# Prevent sqllineage from modifying the logger config
# Disable the DictConfigurator.configure method while importing LineageRunner
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.core import models
from sqllineage.runner import LineageRunner
# Reverting changes after import is done
DictConfigurator.configure = configure
logger = ingestion_logger()
def get_involved_tables_from_parser(parser: LineageRunner) -> List[models.Table]:
"""
Use the LineageRunner parser and combine
source and intermediate tables into
a single set.
:param parser: LineageRunner
:return: List of involved tables
"""
try:
# These are @lazy_property, not properly being picked up by IDEs. Ignore the warning
return list(set(parser.source_tables).union(set(parser.intermediate_tables)))
except SQLLineageException:
logger.debug(
f"Cannot extract source table information from query: {parser._sql}" # pylint: disable=protected-access
)
def get_clean_parser_table_list(tables: List[models.Table]) -> List[str]:
"""
Clean the table name if it has <default>.
:param tables: involved tables
:return: clean table names
"""
return [get_formatted_entity_name(str(table)) for table in tables]
def get_parser_table_aliases(tables: List[models.Table]) -> Dict[str, str]:
"""
Prepare a dictionary in the shape of {alias: table_name} from
the parser tables
:param tables: parser tables
:return: alias dict
"""
return {table.alias: str(table).replace("<default>.", "") for table in tables}
def get_table_name_from_list(
database_name: Optional[str],
schema_name: Optional[str],
table_name: str,
tables: List[str],
) -> Optional[str]:
"""
Find the table name (in whatever format it may come)
from the list using the given ingredients.
:param database_name: db name
:param schema_name: schema name
:param table_name: table name
:param tables: Contains all involved tables
:return: table name from parser info
"""
table = find_in_list(element=table_name, container=tables)
if table:
return table
schema_table = find_in_list(element=f"{schema_name}.{table_name}", container=tables)
if schema_table:
return schema_table
db_schema_table = find_in_list(
element=f"{database_name}.{schema_name}.{table_name}", container=tables
)
if db_schema_table:
return db_schema_table
logger.debug(f"Cannot find table {db_schema_table} in involved tables")
return None
def get_comparison_elements(
identifier: Identifier, tables: List[str], aliases: Dict[str, str]
) -> Optional[Tuple[str, str]]:
"""
Return the tuple table_name, column_name from each comparison element
:param identifier: comparison identifier
:param tables: involved tables
:param aliases: table aliases
:return: table name and column name from the identifier
"""
values = identifier.value.split(".")
database_name, schema_name, table_or_alias, column_name = (
[None] * (4 - len(values))
) + values
if not table_or_alias or not column_name:
logger.debug(f"Cannot obtain comparison elements from identifier {identifier}")
return None
alias_to_table = aliases.get(table_or_alias)
if alias_to_table:
return alias_to_table, column_name
table_from_list = get_table_name_from_list(
database_name=database_name,
schema_name=schema_name,
table_name=table_or_alias,
tables=tables,
)
if not table_from_list:
logger.debug(f"Cannot find {table_or_alias} in comparison elements")
return None
return table_from_list, column_name
def stateful_add_table_joins(
statement_joins: Dict[str, List[TableColumnJoin]],
source: TableColumn,
target: TableColumn,
) -> None:
"""
Update the statement_joins dict with the new table information
:param statement_joins: dict with state info
:param source: source TableColumn
:param target: target TableColumn
"""
if source.table not in statement_joins:
statement_joins[source.table].append(
TableColumnJoin(tableColumn=source, joinedWith=[target])
)
else:
# check if new column from same table
table_columns = [
join_info.tableColumn for join_info in statement_joins[source.table]
]
existing_table_column = find_in_list(element=source, container=table_columns)
if existing_table_column:
existing_join_info = [
join_info
for join_info in statement_joins[source.table]
if join_info.tableColumn == existing_table_column
][0]
existing_join_info.joinedWith.append(target)
# processing now join column from source table
else:
statement_joins[source.table].append(
TableColumnJoin(tableColumn=source, joinedWith=[target])
)
def stateful_add_joins_from_statement(
join_data: Dict[str, List[TableColumnJoin]],
statement: Statement,
tables: List[str],
aliases: Dict[str, str],
) -> None:
"""
Parse a single statement to pick up join information
:param join_data: join data from previous statements
:param statement: Parsed sql statement to process
:param tables: involved tables in the query
:param aliases: table aliases dict
:return: for each table name, list all joins against other tables
"""
# Here we want to get tokens such as `tableA.col1 = tableB.col2`
comparisons = [
sub for sub in statement.get_sublists() if isinstance(sub, Comparison)
]
for comparison in comparisons:
if "." not in comparison.left.value or "." not in comparison.right.value:
logger.debug(f"Ignoring comparison {comparison}")
continue
table_left, column_left = get_comparison_elements(
identifier=comparison.left, tables=tables, aliases=aliases
)
table_right, column_right = get_comparison_elements(
identifier=comparison.right, tables=tables, aliases=aliases
)
if not table_left or not table_right:
logger.error(f"Cannot find ingredients from {comparison}")
continue
left_table_column = TableColumn(table=table_left, column=column_left)
right_table_column = TableColumn(table=table_right, column=column_right)
# We just send the info once, from Left -> Right.
# The backend will prepare the symmetric information.
stateful_add_table_joins(join_data, left_table_column, right_table_column)
def get_table_joins(
parser: LineageRunner, tables: List[str], aliases: Dict[str, str]
) -> Dict[str, List[TableColumnJoin]]:
"""
For each table involved in the query, find its joins against any
other table.
:param parser: LineageRunner parser
:param tables: involved tables in the query
:param aliases: table aliases dict
:return: for each table name, list all joins against other tables
"""
join_data = defaultdict(list)
for statement in parser.statements_parsed:
stateful_add_joins_from_statement(
join_data, statement=statement, tables=tables, aliases=aliases
)
return join_data
def parse_sql_statement(record: TableQuery) -> Optional[QueryParserData]:
"""
Use the lineage parser and work with the tokens
to convert a RAW SQL statement into
QueryParserData.
:param record: TableQuery from usage
:return: QueryParserData
"""
start_date = record.analysisDate
if isinstance(record.analysisDate, str):
start_date = datetime.datetime.strptime(
str(record.analysisDate), "%Y-%m-%d %H:%M:%S"
).date()
parser = LineageRunner(record.query)
tables = get_involved_tables_from_parser(parser)
if not tables:
return None
clean_tables = get_clean_parser_table_list(tables)
aliases = get_parser_table_aliases(tables)
return QueryParserData(
tables=clean_tables,
joins=get_table_joins(parser=parser, tables=clean_tables, aliases=aliases),
databaseName=record.databaseName,
databaseSchema=record.databaseSchema,
sql=record.query,
date=start_date.__root__.strftime("%Y-%m-%d"),
serviceName=record.serviceName,
)
class QueryParserProcessor(Processor):
"""
Extension of the `Processor` class
@ -65,34 +321,13 @@ class QueryParserProcessor(Processor):
return cls(config, metadata_config)
def process(self, record: TableQuery) -> Optional[QueryParserData]:
query_parser_data = None
try:
if not record.query:
return
start_date = record.analysisDate.__root__
if isinstance(record.analysisDate, str):
start_date = datetime.datetime.strptime(
str(record.analysisDate), "%Y-%m-%d %H:%M:%S"
).date()
parser = Parser(record.query)
columns_dict = {} if parser.columns_dict is None else parser.columns_dict
query_parser_data = QueryParserData(
tables=parser.tables,
tableAliases=parser.tables_aliases,
columns=columns_dict,
database=record.database,
databaseSchema=record.databaseSchema,
sql=record.query,
date=start_date.strftime("%Y-%m-%d"),
serviceName=record.serviceName,
)
# pylint: disable=broad-except
except Exception as err:
return parse_sql_statement(record)
except Exception as err: # pylint: disable=broad-except
if hasattr(record, "sql"):
logger.debug(record.sql)
logger.debug(traceback.format_exc())
logger.error(err)
return query_parser_data
def close(self):
pass
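
A minimal, self-contained sketch of the parsing flow this module now relies on, assuming the sqllineage and sqlparse versions pinned by this PR: guard the import so sqllineage cannot reconfigure logging, run LineageRunner over a query, and collect the involved tables and aliases with the same attribute-style access used above (the PR notes these are lazy properties).

from logging.config import DictConfigurator

# Prevent sqllineage from modifying the logger config while importing LineageRunner
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.runner import LineageRunner

DictConfigurator.configure = configure

query = (
    "INSERT INTO myschema.orders_clean "
    "SELECT o.id, c.name FROM myschema.orders o "
    "JOIN myschema.customers c ON o.customer_id = c.id"
)

parser = LineageRunner(query)
involved = set(parser.source_tables).union(set(parser.intermediate_tables))
print(involved)                               # source + intermediate tables
print(parser.target_tables)                   # tables written to
print({t.alias: str(t) for t in involved})    # alias -> table, as in get_parser_table_aliases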

View File

@ -11,10 +11,10 @@
import logging
import traceback
from logging.config import DictConfigurator
from typing import TypeVar
from pydantic import BaseModel, ValidationError
from sql_metadata import Parser
from metadata.config.common import ConfigModel
from metadata.generated.schema.api.data.createChart import CreateChartRequest
@ -58,9 +58,18 @@ from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.utils.logger import ingestion_logger
from metadata.utils.sql_lineage import (
_create_lineage_by_table_name,
ingest_lineage_by_query,
get_lineage_by_query,
)
# Prevent sqllineage from modifying the logger config
# Disable the DictConfigurator.configure method while importing LineageRunner
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.runner import LineageRunner
# Reverting changes after import is done
DictConfigurator.configure = configure
logger = ingestion_logger()
# Allow types from the generated pydantic models
@ -253,12 +262,16 @@ class MetadataRestSink(Sink[Entity]):
table_queries=db_schema_and_table.table.tableQueries,
)
if db_schema_and_table.table.viewDefinition is not None:
lineage_status = ingest_lineage_by_query(
if (
db_schema_and_table.table.viewDefinition is not None
and db_schema_and_table.table.viewDefinition.__root__
):
lineage_status = get_lineage_by_query(
self.metadata,
query=db_schema_and_table.table.viewDefinition.__root__,
service_name=db.service.name,
database=db_schema_and_table.database.name.__root__,
database_name=db_schema_and_table.database.name.__root__,
schema_name=db_schema_and_table.database_schema.name.__root__,
)
if not lineage_status:
self.create_lineage_via_es(db_schema_and_table, db_schema, db)
@ -577,20 +590,21 @@ class MetadataRestSink(Sink[Entity]):
def create_lineage_via_es(self, db_schema_and_table, db_schema, db):
try:
parser = Parser(db_schema_and_table.table.viewDefinition.__root__)
parser = LineageRunner(db_schema_and_table.table.viewDefinition.__root__)
to_table_name = db_schema_and_table.table.name.__root__
for from_table_name in parser.tables:
if "." not in from_table_name:
from_table_name = f"{db_schema.name.__root__}.{from_table_name}"
for from_table_name in parser.source_tables:
_create_lineage_by_table_name(
self.metadata,
from_table_name,
f"{db_schema.name.__root__}.{to_table_name}",
db.service.name,
db_schema_and_table.database.name.__root__,
db_schema_and_table.table.viewDefinition.__root__,
metadata=self.metadata,
from_table=str(from_table_name),
to_table=f"{db_schema.name.__root__}.{to_table_name}",
service_name=db.service.name,
database_name=db_schema_and_table.database.name.__root__,
schema_name=db_schema_and_table.table.viewDefinition.__root__,
query=db_schema_and_table.table.viewDefinition.__root__,
)
except Exception as e:
logger.error("Failed to create view lineage")
logger.debug(f"Query : {db_schema_and_table.table.viewDefinition.__root__}")

View File

@ -8,10 +8,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from logging.config import DictConfigurator
from typing import Iterable, List, Optional
from sql_metadata import Parser
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
@ -36,6 +35,16 @@ from metadata.utils.helpers import get_standard_chart_type
from metadata.utils.logger import ingestion_logger
from metadata.utils.sql_lineage import search_table_entities
# Prevent sqllineage from modifying the logger config
# Disable the DictConfigurator.configure method while importing LineageRunner
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.runner import LineageRunner
# Reverting changes after import is done
DictConfigurator.configure = configure
logger = ingestion_logger()
@ -115,19 +124,19 @@ class RedashSource(DashboardServiceSource):
visualization = widgets.get("visualization")
if not visualization.get("query"):
continue
table_list = []
if visualization.get("query", {}).get("query"):
table_list = Parser(visualization["query"]["query"])
for table in table_list.tables:
dataabase_schema = None
parser = LineageRunner(visualization["query"]["query"])
for table in parser.source_tables:
table_name = str(table)
database_schema = None
if "." in table:
dataabase_schema, table = fqn.split(table)[-2:]
database_schema, table = fqn.split(table_name)[-2:]
table_entities = search_table_entities(
metadata=self.metadata,
database=None,
service_name=self.source_config.dbServiceName,
database_schema=dataabase_schema,
table=table,
database_schema=database_schema,
table=table_name,
)
for from_entity in table_entities:
to_entity = self.metadata.get_by_name(

View File

@ -111,7 +111,8 @@ class BigquerySource(CommonDbSourceService):
raise ValueError(f"schema {schema} does not match table {table}")
return segments[1]
def set_project_id(self):
@staticmethod
def set_project_id():
_, project_id = auth.default()
return project_id

View File

@ -17,6 +17,7 @@ import os
from datetime import datetime
from typing import Any, Dict, Iterable, Optional
from google import auth
from google.cloud import logging
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
@ -46,10 +47,9 @@ class BigqueryUsageSource(UsageSource):
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
self.temp_credentials = None
# Used as db
self.project_id = (
self.connection.projectId or self.connection.credentials.gcsConfig.projectId
)
self.project_id = self.set_project_id()
self.logger_name = "cloudaudit.googleapis.com%2Fdata_access"
self.logging_client = logging.Client()
self.usage_logger = self.logging_client.logger(self.logger_name)
@ -70,6 +70,11 @@ class BigqueryUsageSource(UsageSource):
return cls(config, metadata_config)
@staticmethod
def set_project_id():
_, project_id = auth.default()
return project_id
def _get_raw_extract_iter(self) -> Optional[Iterable[Dict[str, Any]]]:
entries = self.usage_logger.list_entries()
for entry in entries:
@ -106,7 +111,7 @@ class BigqueryUsageSource(UsageSource):
endTime=str(jobStats["endTime"]),
analysisDate=analysis_date,
aborted=0,
database=str(database),
databaseName=str(database),
serviceName=self.config.serviceName,
databaseSchema=None,
)

View File

@ -15,6 +15,7 @@ Generic source to build SQL connectors.
import traceback
from abc import ABC
from copy import deepcopy
from logging.config import DictConfigurator
from typing import Iterable, Optional, Tuple
from sqlalchemy.engine import Connection
@ -27,7 +28,8 @@ from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.entity.data.table import TableType
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import Table, TableType
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
@ -52,6 +54,10 @@ from metadata.utils import fqn
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
from metadata.utils.logger import ingestion_logger
from metadata.utils.sql_lineage import (
get_lineage_by_query,
get_lineage_via_table_entity,
)
logger = ingestion_logger()
@ -227,7 +233,7 @@ class CommonDbSourceService(
self, table_type: str, table_name: str, schema_name: str, inspector: Inspector
) -> Optional[str]:
if table_type == "View":
if table_type == TableType.View:
try:
view_definition = inspector.get_view_definition(table_name, schema_name)
view_definition = (
@ -301,6 +307,54 @@ class CommonDbSourceService(
"{}.{}".format(self.config.serviceName, table_name)
)
def yield_view_lineage(
self, table_name_and_type: Tuple[str, str]
) -> Optional[Iterable[AddLineageRequest]]:
table_name, table_type = table_name_and_type
table_entity: Table = self.context.table
schema_name = self.context.database_schema.name.__root__
db_name = self.context.database.name.__root__
view_definition = self.get_view_definition(
table_type=table_type,
table_name=table_name,
schema_name=schema_name,
inspector=self.inspector,
)
if table_type != TableType.View or not view_definition:
return
# Prevent sqllineage from modifying the logger config
# Disable the DictConfigurator.configure method while importing LineageRunner
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.exceptions import SQLLineageException
from sqllineage.runner import LineageRunner
# Reverting changes after import is done
DictConfigurator.configure = configure
try:
result = LineageRunner(view_definition)
if result.source_tables and result.target_tables:
yield from get_lineage_by_query(
self.metadata,
query=view_definition,
service_name=self.context.database_service.name.__root__,
database_name=db_name,
schema_name=schema_name,
) or []
else:
yield from get_lineage_via_table_entity(
self.metadata,
table_entity=table_entity,
service_name=self.context.database_service.name.__root__,
database_name=db_name,
schema_name=schema_name,
query=view_definition,
) or []
except SQLLineageException:
logger.error("Could not parse query: Ingesting lineage failed")
def test_connection(self) -> None:
"""
Used a timed-bound function to test that the engine

View File

@ -23,6 +23,7 @@ from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Column, DataModel, Table
@ -131,6 +132,13 @@ class DatabaseServiceTopology(ServiceTopology):
processor="yield_table",
consumer=["database_service", "database", "database_schema"],
),
NodeStage(
type_=AddLineageRequest,
context="view_lineage",
processor="yield_view_lineage",
ack_sink=False,
nullable=True,
),
NodeStage(
type_=DataModelLink,
processor="yield_datamodel",
@ -247,6 +255,15 @@ class DatabaseServiceSource(DBTMixin, TopologyRunnerMixin, Source, ABC):
From topology. To be run for each schema
"""
@abstractmethod
def yield_view_lineage(
self, table_name_and_type: Tuple[str, str]
) -> Optional[Iterable[AddLineageRequest]]:
"""
From topology.
Parses view definition to get lineage information
"""
@abstractmethod
def yield_table(
self, table_name_and_type: Tuple[str, str]

View File

@ -83,7 +83,7 @@ class SampleUsageSource(UsageSource):
endTime="",
analysisDate=self.analysis_date,
aborted=False,
database="ecommerce_db",
databaseName="ecommerce_db",
serviceName=self.config.serviceName,
databaseSchema="shopify",
)

View File

@ -12,7 +12,6 @@
Snowflake usage module
"""
from datetime import timedelta
from typing import Iterable, Iterator, Union
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
@ -34,6 +33,7 @@ from metadata.ingestion.api.source import InvalidSourceException
# This import verifies that the dependencies are available.
from metadata.ingestion.source.database.usage_source import UsageSource
from metadata.utils.connections import get_connection
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger
from metadata.utils.sql_queries import SNOWFLAKE_SQL_STATEMENT
@ -57,7 +57,11 @@ class SnowflakeUsageSource(UsageSource):
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
self.end = self.end + timedelta(days=1)
# Snowflake does not allow retrieval of data older than 7 days
duration = min(self.source_config.queryLogDuration, 7)
self.start, self.end = get_start_and_end(duration)
self.sql_stmt = SnowflakeUsageSource.SQL_STATEMENT.format(
start_date=self.start,
end_date=self.end,
@ -67,7 +71,7 @@ class SnowflakeUsageSource(UsageSource):
self._database = "Snowflake"
@classmethod
def create(cls, config_dict, metadata_config: WorkflowConfig):
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: SnowflakeConnection = config.serviceConnection.__root__.config
if not isinstance(connection, SnowflakeConnection):
@ -98,7 +102,7 @@ class SnowflakeUsageSource(UsageSource):
endTime=str(row["end_time"]),
analysisDate=self.analysis_date,
aborted=self.get_aborted_status(row),
database=self.get_database_name(row),
databaseName=self.get_database_name(row),
serviceName=self.config.serviceName,
databaseSchema=row["schema_name"],
)

View File

@ -13,6 +13,7 @@ Usage Souce Module
"""
import csv
import traceback
from abc import ABC
from typing import Iterable, Optional
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
@ -34,7 +35,10 @@ from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class UsageSource(Source[TableQuery]):
class UsageSource(Source[TableQuery], ABC):
sql_stmt: str
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__()
self.config = config
@ -77,7 +81,7 @@ class UsageSource(Source[TableQuery]):
endTime=query_dict.get("end_time", ""),
analysisDate=self.analysis_date,
aborted=self.get_aborted_status(query_dict),
database=self.get_database_name(query_dict),
databaseName=self.get_database_name(query_dict),
serviceName=self.config.serviceName,
databaseSchema=query_dict.get("schema_name"),
)
@ -92,7 +96,7 @@ class UsageSource(Source[TableQuery]):
endTime=str(row["end_time"]),
analysisDate=self.analysis_date,
aborted=self.get_aborted_status(row),
database=self.get_database_name(row),
databaseName=self.get_database_name(row),
serviceName=self.config.serviceName,
databaseSchema=row["schema_name"],
)
@ -107,7 +111,7 @@ class UsageSource(Source[TableQuery]):
if table_query:
if filter_by_database(
self.source_config.databaseFilterPattern,
database_name=table_query.database,
database_name=table_query.databaseName,
):
continue
if filter_by_schema(
@ -121,11 +125,10 @@ class UsageSource(Source[TableQuery]):
logger.debug(f"Parsed Query: {table_query.query}")
if not table_query.databaseSchema:
self.report.scanned(
f"{table_query.database}.{table_query.databaseSchema}"
f"{table_query.databaseName}.{table_query.databaseSchema}"
)
else:
self.report.scanned(f"{table_query.database}")
yield table_query
self.report.scanned(f"{table_query.databaseName}")
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(str(err))

View File

@ -11,6 +11,7 @@
import json
import pathlib
import traceback
from metadata.generated.schema.entity.data.table import SqlQuery
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
@ -95,35 +96,22 @@ class TableUsageStage(Stage[QueryParserData]):
if record is None:
return None
for table in record.tables:
table_joins = record.joins.get(table)
try:
self._add_sql_query(record=record, table=table)
table_usage_count = self.table_usage.get(table)
if table_usage_count is not None:
table_usage_count.count = table_usage_count.count + 1
if record.columns.get("join") is not None:
table_usage_count.joins.append(
get_table_column_join(
table,
record.tableAliases,
record.columns["join"],
record.database,
)
)
if table_joins:
table_usage_count.joins.extend(table_joins)
else:
joins = []
if record.columns.get("join") is not None:
tbl_column_join = get_table_column_join(
table,
record.tableAliases,
record.columns["join"],
record.database,
)
if tbl_column_join is not None:
joins.append(tbl_column_join)
if table_joins:
joins.extend(table_joins)
table_usage_count = TableUsageCount(
table=table,
database=record.database,
databaseName=record.databaseName,
date=record.date,
joins=joins,
serviceName=record.serviceName,
@ -132,7 +120,9 @@ class TableUsageStage(Stage[QueryParserData]):
)
except Exception as exc:
logger.error("Error in staging record {}".format(exc))
logger.error("Error in staging record - {}".format(exc))
logger.error(traceback.format_exc())
self.table_usage[table] = table_usage_count
logger.info(f"Successfully record staged for {table}")
@ -141,8 +131,9 @@ class TableUsageStage(Stage[QueryParserData]):
def close(self):
for key, value in self.table_usage.items():
value.sqlQueries = self.table_queries.get(key, [])
data = value.json()
self.file.write(json.dumps(data))
self.file.write("\n")
if value:
value.sqlQueries = self.table_queries.get(key, [])
data = value.json()
self.file.write(json.dumps(data))
self.file.write("\n")
self.file.close()

View File

@ -12,7 +12,7 @@
Helper methods for ES
"""
from typing import Dict, List, Optional, TypeVar
from typing import List, Optional, TypeVar
from pydantic import BaseModel
@ -39,18 +39,6 @@ ES_INDEX_MAP = {
}
def get_query_from_dict(data: Dict[str, Optional[str]]) -> str:
"""
Prepare a query to be passed to ES based on incoming
key-value pairs in a dict
:param data: key-value pairs to use for searching in ES
:return: query string
"""
return " AND ".join(
[f"{key}:{value}" for key, value in data.items() if value is not None]
)
def get_entity_from_es_result(
entity_list: Optional[List[T]], fetch_multiple_entities: bool = False
) -> Optional[T]:

View File

@ -126,7 +126,7 @@ def _(
table_name: str,
retries: int = 3,
fetch_multiple_entities: bool = False,
) -> Optional[str]:
) -> Union[Optional[str], Optional[List[str]]]:
"""
Building logic for tables
:param metadata: OMeta client
@ -143,14 +143,14 @@ def _(
)
if not database_name or not schema_name:
es_result = metadata.es_search_from_service(
fqn_search_string = _build(
service_name, database_name or "*", schema_name or "*", table_name
)
es_result = metadata.es_search_from_fqn(
entity_type=Table,
service_name=service_name,
filters={
"database": database_name,
"database_schema": schema_name,
"name": table_name,
},
fqn_search_string=fqn_search_string,
retries=retries,
)
entity: Optional[Union[Table, List[Table]]] = get_entity_from_es_result(

View File

@ -67,7 +67,8 @@ def get_start_and_end(duration):
start = (today + timedelta(0 - duration)).replace(
hour=0, minute=0, second=0, microsecond=0
)
end = today.replace(hour=0, minute=0, second=0, microsecond=0)
# Add one day to make sure we are handling today's queries
end = (today + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
return start, end
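
A quick illustration of the widened window (import path taken from this repo; the date is assumed): with duration=1 on 2022-06-21, the window is now [2022-06-20 00:00, 2022-06-22 00:00), so queries executed today are included.

from metadata.utils.helpers import get_start_and_end

start, end = get_start_and_end(1)
print(start, end)  # e.g. 2022-06-20 00:00:00  2022-06-22 00:00:00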
@ -207,8 +208,11 @@ def datetime_to_ts(date: datetime) -> int:
def get_formatted_entity_name(name: str) -> Optional[str]:
if name:
return name.replace("[", "").replace("]", "")
return (
name.replace("[", "").replace("]", "").replace("<default>.", "")
if name
else None
)
def get_raw_extract_iter(alchemy_helper) -> Iterable[Dict[str, Any]]:
@ -255,3 +259,14 @@ def get_chart_entities_from_id(
entity = EntityReference(id=chart.id, type="chart")
entities.append(entity)
return entities
def find_in_list(element: Any, container: Iterable[Any]) -> Optional[Any]:
"""
If the element is in the container, return it.
Otherwise, return None
:param element: to find
:param container: container with element
:return: element or None
"""
return next(iter([elem for elem in container if elem == element]), None)
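
A small usage sketch for the new helper (values illustrative):

from metadata.utils.helpers import find_in_list

print(find_in_list(element="shopify.orders", container=["other.table", "shopify.orders"]))  # shopify.orders
print(find_in_list(element="missing", container=["a", "b"]))                                # None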

View File

@ -13,7 +13,7 @@ Helper functions to handle SQL lineage operations
"""
import traceback
from logging.config import DictConfigurator
from typing import List, Optional
from typing import Any, Iterable, List, Optional
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import Table
@ -56,12 +56,14 @@ def get_column_fqn(table_entity: Table, column: str) -> Optional[str]:
def search_table_entities(
metadata: OpenMetadata,
service_name: str,
database: str,
database: Optional[str],
database_schema: Optional[str],
table: str,
) -> Optional[List[Table]]:
"""
Method to get table entity from database, database_schema & table name
Method to get table entity from database, database_schema & table name.
It uses ES to build the FQN if we miss some info and will run
a request against the API to find the Entity.
"""
try:
table_fqns = fqn.build(
@ -73,19 +75,71 @@ def search_table_entities(
table_name=table,
fetch_multiple_entities=True,
)
table_entities = []
table_entities: Optional[List[Table]] = []
for table_fqn in table_fqns or []:
try:
table_entity = metadata.get_by_name(Table, fqn=table_fqn)
table_entity: Table = metadata.get_by_name(Table, fqn=table_fqn)
if table_entity:
table_entities.append(table_entity)
except APIError:
logger.debug(f"Table not found for fqn: {fqn}")
return table_entities
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(err)
def get_table_entities_from_query(
metadata: OpenMetadata,
service_name: str,
database_name: str,
database_schema: str,
table_name: str,
) -> List[Table]:
"""
Fetch data from API and ES with a fallback strategy.
If the sys data is incorrect, use the table name ingredients.
:param metadata: OpenMetadata client
:param service_name: Service being ingested.
:param database_name: Name of the database informed on db sys results
:param database_schema: Name of the schema informed on db sys results
:param table_name: Table name extracted from query. Can be `table`, `schema.table` or `db.schema.table`
:return: List of tables matching the criteria
"""
# First try to find the data from the given db and schema
# Otherwise, pick it up from the table_name str
# Finally, try with upper case
split_table = table_name.split(".")
empty_list: List[Any] = [None] # Otherwise, there's a typing error in the concat
database_query, schema_query, table = (
empty_list * (3 - len(split_table))
) + split_table
table_entities = search_table_entities(
metadata=metadata,
service_name=service_name,
database=database_name,
database_schema=database_schema,
table=table,
)
if table_entities:
return table_entities
table_entities = search_table_entities(
metadata=metadata,
service_name=service_name,
database=database_query,
database_schema=schema_query,
table=table,
)
if table_entities:
return table_entities
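
The left-padding with None above is what lets any of the three table-name formats unpack into the same three slots; a short sketch with illustrative names:

for table_name in ("orders", "shopify.orders", "ecommerce_db.shopify.orders"):
    split_table = table_name.split(".")
    database_query, schema_query, table = ([None] * (3 - len(split_table))) + split_table
    print(database_query, schema_query, table)
# None None orders
# None shopify orders
# ecommerce_db shopify orders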
def get_column_lineage(
to_entity: Table,
from_entity: Table,
@ -113,36 +167,34 @@ def _create_lineage_by_table_name(
from_table: str,
to_table: str,
service_name: str,
database: str,
database_name: Optional[str],
schema_name: Optional[str],
query: str,
):
) -> Optional[Iterable[AddLineageRequest]]:
"""
This method is to create a lineage between two tables
"""
try:
from_raw_name = get_formatted_entity_name(str(from_table))
from_table_obj = split_raw_table_name(database=database, raw_name=from_raw_name)
from_entities = search_table_entities(
table=from_table_obj.get("table"),
database_schema=from_table_obj.get("database_schema"),
database=from_table_obj.get("database"),
from_table_entities = get_table_entities_from_query(
metadata=metadata,
service_name=service_name,
database_name=database_name,
database_schema=schema_name,
table_name=from_table,
)
to_raw_name = get_formatted_entity_name(str(to_table))
to_table_obj = split_raw_table_name(database=database, raw_name=to_raw_name)
to_entities = search_table_entities(
table=to_table_obj.get("table"),
database_schema=to_table_obj.get("database_schema"),
database=to_table_obj.get("database"),
to_table_entities = get_table_entities_from_query(
metadata=metadata,
service_name=service_name,
database_name=database_name,
database_schema=schema_name,
table_name=to_table,
)
if not to_entities or not from_entities:
return None
for from_entity in from_entities:
for to_entity in to_entities:
for from_entity in from_table_entities or []:
for to_entity in to_table_entities or []:
col_lineage = get_column_lineage(
to_entity=to_entity,
to_table_raw_name=str(to_table),
@ -169,8 +221,7 @@ def _create_lineage_by_table_name(
)
if lineage_details:
lineage.edge.lineageDetails = lineage_details
created_lineage = metadata.add_lineage(lineage)
logger.info(f"Successfully added Lineage {created_lineage}")
yield lineage
except Exception as err:
logger.debug(traceback.format_exc())
@ -180,7 +231,7 @@ def _create_lineage_by_table_name(
def populate_column_lineage_map(raw_column_lineage):
lineage_map = {}
if not raw_column_lineage or len(raw_column_lineage[0]) != 2:
return
return lineage_map
for source, target in raw_column_lineage:
if lineage_map.get(str(target.parent)):
ele = lineage_map.get(str(target.parent))
@ -200,9 +251,13 @@ def populate_column_lineage_map(raw_column_lineage):
return lineage_map
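A rough sketch of how this map is fed, assuming sqllineage's column lineage pairs; the query, table and column names are illustrative:
from sqllineage.runner import LineageRunner
runner = LineageRunner(
    "INSERT INTO db.target_tbl SELECT col1, col2 FROM db.source_tbl"
)
# Each entry links a source column to a target column; .parent is the owning table.
raw_column_lineage = runner.get_column_lineage()
lineage_map = populate_column_lineage_map(raw_column_lineage)
print(list(lineage_map.keys()))  # e.g. ['db.target_tbl']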
def ingest_lineage_by_query(
metadata: OpenMetadata, query: str, database: str, service_name: str
) -> bool:
def get_lineage_by_query(
metadata: OpenMetadata,
service_name: str,
database_name: Optional[str],
schema_name: Optional[str],
query: str,
) -> Optional[Iterable[AddLineageRequest]]:
"""
This method parses the query to get the source, target and intermediate table names,
and yields an AddLineageRequest for every lineage edge that can be built between them.
@ -219,42 +274,80 @@ def ingest_lineage_by_query(
try:
result = LineageRunner(query)
if not result.target_tables:
return False
raw_column_lineage = result.get_column_lineage()
column_lineage_map.update(populate_column_lineage_map(raw_column_lineage))
for intermediate_table in result.intermediate_tables:
for source_table in result.source_tables:
_create_lineage_by_table_name(
yield from _create_lineage_by_table_name(
metadata,
from_table=source_table,
to_table=intermediate_table,
from_table=str(source_table),
to_table=str(intermediate_table),
service_name=service_name,
database=database,
database_name=database_name,
schema_name=schema_name,
query=query,
)
for target_table in result.target_tables:
_create_lineage_by_table_name(
yield from _create_lineage_by_table_name(
metadata,
from_table=intermediate_table,
to_table=target_table,
from_table=str(intermediate_table),
to_table=str(target_table),
service_name=service_name,
database=database,
database_name=database_name,
schema_name=schema_name,
query=query,
)
if not result.intermediate_tables:
for target_table in result.target_tables:
for source_table in result.source_tables:
_create_lineage_by_table_name(
yield from _create_lineage_by_table_name(
metadata,
from_table=source_table,
to_table=target_table,
from_table=str(source_table),
to_table=str(target_table),
service_name=service_name,
database=database,
database_name=database_name,
schema_name=schema_name,
query=query,
)
return True
except Exception as err:
logger.debug(str(err))
logger.warning(f"Ingesting lineage failed")
return False
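A hedged example of consuming the generator above; the client, service name and query values are placeholders, and each yielded request goes through the same add_lineage call the previous implementation used internally:
from metadata.ingestion.ometa.ometa_api import OpenMetadata
def ingest_query_lineage(metadata: OpenMetadata, query: str) -> None:
    # Hypothetical driver: send every lineage edge parsed from the query to the API.
    for lineage_request in get_lineage_by_query(
        metadata=metadata,
        service_name="snowflake_prod",
        database_name="SHOP",
        schema_name="PUBLIC",
        query=query,
    ) or []:
        metadata.add_lineage(lineage_request)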
def get_lineage_via_table_entity(
metadata: OpenMetadata,
table_entity: Table,
database_name: str,
schema_name: str,
service_name: str,
query: str,
) -> Optional[Iterable[AddLineageRequest]]:
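"""
Parse the query and yield lineage edges from every source table
that sqllineage finds into the given table entity (e.g. a view
and its defining SELECT).
"""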
# Prevent sqllineage from modifying the logger config
# Disable the DictConfigurator.configure method while importing LineageRunner
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.runner import LineageRunner
# Reverting changes after import is done
DictConfigurator.configure = configure
column_lineage_map.clear()
try:
parser = LineageRunner(query)
to_table_name = table_entity.name.__root__
for from_table_name in parser.source_tables:
yield from _create_lineage_by_table_name(
metadata,
from_table=str(from_table_name),
to_table=f"{schema_name}.{to_table_name}",
service_name=service_name,
database_name=database_name,
schema_name=schema_name,
query=query,
) or []
except Exception as e:
logger.error("Failed to create view lineage")
logger.debug(f"Query : {query}")
logger.debug(traceback.format_exc())

View File

@ -135,11 +135,21 @@ REDSHIFT_GET_SCHEMA_COLUMN_INFO = """
"""
SNOWFLAKE_SQL_STATEMENT = """
select query_type,query_text,user_name,database_name,
schema_name,start_time,end_time
from table(information_schema.query_history(
end_time_range_start=>to_timestamp_ltz('{start_date}'),
end_time_range_end=>to_timestamp_ltz('{end_date}'),RESULT_LIMIT=>{result_limit}))
SELECT
query_type,
query_text,
user_name,
database_name,
schema_name,
start_time,
end_time
FROM table(
information_schema.query_history(
end_time_range_start => to_timestamp_ltz('{start_date}'),
end_time_range_end => to_timestamp_ltz('{end_date}'),
RESULT_LIMIT => {result_limit}
)
)
WHERE QUERY_TYPE NOT IN ('ROLLBACK','CREATE_USER','CREATE_ROLE','CREATE_NETWORK_POLICY','ALTER_ROLE','ALTER_NETWORK_POLICY','ALTER_ACCOUNT','DROP_SEQUENCE','DROP_USER','DROP_ROLE','DROP_NETWORK_POLICY','REVOKE','UNLOAD','USE','DELETE','DROP','TRUNCATE_TABLE','ALTER_SESSION','COPY','UPDATE','COMMIT','SHOW','ALTER','DESCRIBE','CREATE_TABLE','PUT_FILES','GET_FILES');
"""
SNOWFLAKE_SESSION_TAG_QUERY = 'ALTER SESSION SET QUERY_TAG="{query_tag}"'
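For context, the statement above is a str.format template; a hedged sketch of how it might be rendered (the timestamp strings and limit are illustrative):
query = SNOWFLAKE_SQL_STATEMENT.format(
    start_date="2022-06-20 00:00:00",
    end_date="2022-06-21 00:00:00",
    result_limit=1000,
)
session_tag = SNOWFLAKE_SESSION_TAG_QUERY.format(query_tag="openmetadata")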

View File

@ -35,6 +35,7 @@ from metadata.generated.schema.entity.services.databaseService import (
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
class OMetaESTest(TestCase):
@ -121,10 +122,14 @@ class OMetaESTest(TestCase):
"""
We can fetch tables from a service
"""
res = self.metadata.es_search_from_service(
fqn_search_string = fqn._build(
self.service.name.__root__, "*", "*", self.entity.name.__root__
)
res = self.metadata.es_search_from_fqn(
entity_type=Table,
service_name=self.service.name.__root__,
filters={"name": self.entity.name.__root__},
fqn_search_string=fqn_search_string,
size=100,
retries=10,
)
@ -133,13 +138,16 @@ class OMetaESTest(TestCase):
self.assertIsNotNone(res)
self.assertIn(self.entity, res)
res = self.metadata.es_search_from_service(
fqn_search_string = fqn._build(
self.service.name.__root__,
self.db_reference.name,
"*",
self.entity.name.__root__,
)
res = self.metadata.es_search_from_fqn(
entity_type=Table,
service_name=self.service.name.__root__,
filters={
"name": self.entity.name.__root__,
"database": self.db_reference.name,
},
fqn_search_string=fqn_search_string,
size=100,
retries=10,
)
@ -147,14 +155,16 @@ class OMetaESTest(TestCase):
self.assertIsNotNone(res)
self.assertIn(self.entity, res)
res = self.metadata.es_search_from_service(
fqn_search_string = fqn._build(
self.service.name.__root__,
self.db_reference.name,
self.schema_reference.name,
self.entity.name.__root__,
)
res = self.metadata.es_search_from_fqn(
entity_type=Table,
service_name=self.service.name.__root__,
filters={
"name": self.entity.name.__root__,
"database": self.db_reference.name,
"database_schema": self.schema_reference.name,
},
fqn_search_string=fqn_search_string,
size=100,
retries=10,
)
@ -166,10 +176,9 @@ class OMetaESTest(TestCase):
"""
A wrong search string returns None
"""
res = self.metadata.es_search_from_service(
res = self.metadata.es_search_from_fqn(
entity_type=Table,
service_name=self.service.name.__root__,
filters={"name": "random"},
fqn_search_string="random",
retries=1,
)

View File

@ -0,0 +1,151 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Validate query parser logic
"""
# Prevent sqllineage from modifying the logger config
# Disable the DictConfigurator.configure method while importing LineageRunner
from logging.config import DictConfigurator
from unittest import TestCase
from metadata.generated.schema.type.tableUsageCount import TableColumn, TableColumnJoin
from metadata.ingestion.processor.query_parser import (
get_clean_parser_table_list,
get_involved_tables_from_parser,
get_parser_table_aliases,
get_table_joins,
)
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.runner import LineageRunner
# Reverting changes after import is done
DictConfigurator.configure = configure
class QueryParserTests(TestCase):
"""
Check methods from query_parser.py
"""
query = col_lineage = """
SELECT
a.col1,
a.col2 + b.col2 AS col2,
case
when col1 = 3 then 'hello'
else 'bye'
end as new_col
FROM foo a
JOIN db.grault b
ON a.col1 = b.col1
JOIN db.holis c
ON a.col1 = c.abc
JOIN db.random d
ON a.col2 = d.col2
WHERE a.col3 = 'abc'
"""
parser = LineageRunner(col_lineage)
def test_involved_tables(self):
tables = {str(table) for table in get_involved_tables_from_parser(self.parser)}
self.assertEqual(
tables, {"db.grault", "db.holis", "<default>.foo", "db.random"}
)
def test_clean_parser_table_list(self):
tables = get_involved_tables_from_parser(self.parser)
clean_tables = set(get_clean_parser_table_list(tables))
self.assertEqual(clean_tables, {"db.grault", "db.holis", "foo", "db.random"})
def test_parser_table_aliases(self):
tables = get_involved_tables_from_parser(self.parser)
aliases = get_parser_table_aliases(tables)
self.assertEqual(
aliases, {"b": "db.grault", "c": "db.holis", "a": "foo", "d": "db.random"}
)
def test_get_table_joins(self):
"""
Validate the main join-extraction logic
"""
tables = get_involved_tables_from_parser(self.parser)
clean_tables = get_clean_parser_table_list(tables)
aliases = get_parser_table_aliases(tables)
joins = get_table_joins(
parser=self.parser, tables=clean_tables, aliases=aliases
)
self.assertEqual(
joins["foo"],
[
TableColumnJoin(
tableColumn=TableColumn(table="foo", column="col1"),
joinedWith=[
TableColumn(table="db.grault", column="col1"),
TableColumn(table="db.holis", column="abc"),
],
),
TableColumnJoin(
tableColumn=TableColumn(table="foo", column="col2"),
joinedWith=[
TableColumn(table="db.random", column="col2"),
],
),
],
)
def test_capitals(self):
"""
Example of how LineageRunner preserves the capitalization of column
names while lowercasing table identifiers
"""
query = """
SELECT
USERS.ID,
li.id
FROM TESTDB.PUBLIC.USERS
JOIN testdb.PUBLIC."lowercase_users" li
ON USERS.id = li.ID
;
"""
parser = LineageRunner(query)
tables = get_involved_tables_from_parser(parser)
clean_tables = get_clean_parser_table_list(tables)
aliases = get_parser_table_aliases(tables)
joins = get_table_joins(parser=parser, tables=clean_tables, aliases=aliases)
self.assertEqual(
joins["testdb.public.users"],
[
TableColumnJoin(
tableColumn=TableColumn(
table="testdb.public.users", column="id"
), # lowercase col
joinedWith=[
TableColumn(
table="testdb.public.lowercase_users", column="ID"
), # uppercase col
],
),
],
)
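The practical consequence is that lookups into the join map must use a lowercase table key, while column names keep their original case; a tiny illustration (helper name is hypothetical):
def join_map_key(table_fqn: str) -> str:
    # sqllineage lowercases table identifiers, so normalize before reading joins.
    return table_fqn.lower()
assert join_map_key("TESTDB.PUBLIC.USERS") == "testdb.public.users"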

View File

@ -32,13 +32,11 @@ config = """
"config":{
"type": "DatabaseUsage"
}
}
},
"processor": {
"type": "query-parser",
"config": {
}
"config": {}
},
"stage": {
"type": "table-usage",
@ -68,31 +66,11 @@ class QueryParserTest(TestCase):
Check the join count
"""
expected_result = {
"shopify.dim_address": 200,
"shopify.shop": 300,
"shopify.dim_customer": 250,
"dim_customer": 76,
"shopify.dim_location": 150,
"dim_location.shop_id": 50,
"shop": 56,
"shop_id": 50,
"shopify.dim_staff": 150,
"shopify.fact_line_item": 200,
"shopify.fact_order": 310,
"shopify.product": 10,
"shopify.fact_sale": 520,
"dim_address": 24,
"api": 4,
"dim_location": 8,
"product": 32,
"dim_staff": 10,
"fact_line_item": 34,
"fact_order": 30,
"fact_sale": 54,
"fact_session": 62,
"raw_customer": 20,
"raw_order": 26,
"raw_product_catalog": 12,
"shopify.raw_product_catalog": 2,
"dim_customer": 2,
"fact_order": 2,
"shopify.fact_sale": 3,
"shopify.raw_customer": 10,
}
workflow = Workflow.create(json.loads(config))
workflow.execute()