Fix #4767: Refactor Usage & Lineage (#5190)

Mayur Singal 2022-06-03 13:42:28 +05:30 committed by GitHub
parent 37057c79a2
commit b3428771a3
31 changed files with 786 additions and 629 deletions

View File

@ -35,6 +35,18 @@
"queryLogFilePath": {
"description": "Configuration to set the file path for query logs",
"type": "string"
},
"schemaFilterPattern": {
"description": "Regex to only fetch tables or databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"tableFilterPattern": {
"description": "Regex exclude tables or databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"databaseFilterPattern": {
"description": "Regex to only fetch databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
}
},
"additionalProperties": false

View File

@ -0,0 +1,46 @@
{
"$id": "https://open-metadata.org/schema/entity/data/queryParserData.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Query Parser Data",
"description": "This schema defines type of query parser data",
"type": "object",
"properties": {
"tables": {
"description": "List of tables used in query",
"type": "array",
"items": {
"type": "string"
}
},
"tableAliases": {
"description": "Table names mapped with alias used in query",
"type": "object"
},
"columns": {
"description": "Table columns used in query",
"type": "object"
},
"database": {
"description": "Database of the associated with query",
"type": "string"
},
"sql": {
"description": "SQL query",
"type": "string"
},
"serviceName": {
"description": "Name that identifies this database service.",
"type": "string"
},
"date": {
"description": "Date of execution of SQL query",
"type": "string"
},
"databaseSchema": {
"description": "Database schema of the associated with query",
"type": "string"
}
},
"required": ["sql", "serviceName", "tables", "database"],
"additionalProperties": false
}
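For orientation, here is a minimal sketch of a record matching this schema, built through the pydantic model generated from it (the values are illustrative only; the field shapes mirror how the query parser processor later in this diff fills them):

from metadata.generated.schema.type.queryParserData import QueryParserData

# Illustrative values only; `columns` mirrors the dict produced by sql-metadata's columns_dict.
parsed = QueryParserData(
    tables=["dim_address", "shop"],
    tableAliases={"a": "dim_address", "s": "shop"},
    columns={"join": ["dim_address.shop_id", "shop.shop_id"]},
    database="ecommerce_db",
    sql="SELECT * FROM dim_address a JOIN shop s ON a.shop_id = s.shop_id",
    serviceName="sample_data",
    date="2022-06-03",
    databaseSchema="shopify",
)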

View File

@ -0,0 +1,47 @@
{
"$id": "https://open-metadata.org/schema/entity/data/tableQuery.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Table Query",
"description": "This schema defines structure of table query",
"type": "object",
"properties": {
"query": {
"description": "SQL query",
"type": "string"
},
"userName": {
"description": "Name of the user that executed the SQL query",
"type": "string"
},
"startTime": {
"description": "Start time of execution of SQL query",
"type": "string"
},
"endTime": {
"description": "End time of execution of SQL query",
"type": "string"
},
"analysisDate": {
"description": "Date of execution of SQL query",
"$ref": "./basic.json#/definitions/dateTime"
},
"aborted": {
"description": "Flag to check if query was aborted during execution",
"type": "boolean"
},
"serviceName": {
"description": "Name that identifies this database service.",
"type": "string"
},
"database": {
"description": "Database of the associated with query",
"type": "string"
},
"databaseSchema": {
"description": "Database schema of the associated with query",
"type": "string"
}
},
"required": ["query", "sql", "serviceName"],
"additionalProperties": false
}
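Similarly, a hedged sketch of a TableQuery instance as the refactored usage sources below construct it (all values are made up; analysisDate accepts a date-time string per the $ref above):

from metadata.generated.schema.type.tableQuery import TableQuery

query_record = TableQuery(
    query="SELECT * FROM shopify.dim_address",
    userName="etl_user",
    startTime="2022-06-03 10:00:00",
    endTime="2022-06-03 10:00:05",
    analysisDate="2022-06-03 10:00:00",
    aborted=False,
    serviceName="sample_data",
    database="ecommerce_db",
    databaseSchema="shopify",
)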

View File

@ -0,0 +1,81 @@
{
"$id": "https://open-metadata.org/schema/entity/data/tableUsageCount.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Table Usage Count",
"description": "This model is the linking between the usage stage and bulksink steps",
"type": "object",
"definitions": {
"tableColumn": {
"type": "object",
"properties": {
"table": {
"description": "Name of the table",
"type": "string"
},
"column": {
"description": "Name of the column",
"type": "string"
}
}
},
"tableColumnJoin": {
"type": "object",
"properties": {
"tableColumn": {
"description": "Source table column",
"$ref": "#/definitions/tableColumn"
},
"joinedWith": {
"description": "List of table columns with which the table is joined with",
"type": "array",
"items": {
"$ref": "#/definitions/tableColumn"
}
}
}
}
},
"properties": {
"table": {
"description": "Name of the table",
"type": "string"
},
"date": {
"description": "Date of execution of SQL query",
"type": "string"
},
"database": {
"description": "Database of the associated with table",
"type": "string"
},
"count": {
"description": "Usage count of table",
"type": "integer",
"default": 1
},
"databaseSchema": {
"description": "Database schema of the associated with table",
"type": "string"
},
"sqlQueries": {
"description": "List of SQL Queries associated with table",
"type": "array",
"items": {
"$ref": "../entity/data/table.json#/definitions/sqlQuery"
}
},
"joins": {
"description": "List of joins associated with table",
"type": "array",
"items": {
"$ref": "#/definitions/tableColumnJoin"
}
},
"serviceName": {
"description": "Name that identifies this database service.",
"type": "string"
}
},
"required": ["tableName", "date", "database", "serviceName"],
"additionalProperties": false
}
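A hedged example of the payload this model carries from the usage stage to the bulk sink (values are illustrative; TableColumn and TableColumnJoin come from the definitions above):

from metadata.generated.schema.type.tableUsageCount import (
    TableColumn,
    TableColumnJoin,
    TableUsageCount,
)

usage = TableUsageCount(
    table="dim_address",
    date="2022-06-03",
    database="ecommerce_db",
    databaseSchema="shopify",
    count=2,
    sqlQueries=[],
    joins=[
        TableColumnJoin(
            tableColumn=TableColumn(table="dim_address", column="shop_id"),
            joinedWith=[TableColumn(table="shop", column="shop_id")],
        )
    ],
    serviceName="sample_data",
)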

View File

@ -0,0 +1,20 @@
{
"$id": "https://open-metadata.org/schema/entity/data/usageRequest.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Usage Request",
"description": "This schema defines type of table usage request used to publish the usage count on a perticular date",
"javaType": "org.openmetadata.catalog.type.UsageRequest",
"type": "object",
"properties": {
"date": {
"description": "Date of execution of SQL query",
"type": "string"
},
"count": {
"description": "Usage count of table",
"type": "integer"
}
},
"required": ["date", "count"],
"additionalProperties": false
}
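This is the type that publish_table_usage now accepts in place of the old TableUsageRequest model (see the OMeta mixin change further down); a minimal, illustrative sketch:

from metadata.generated.schema.type.usageRequest import UsageRequest

# Publish two accesses of a table on a given day; the table entity lookup is assumed to exist already.
usage = UsageRequest(date="2022-06-03", count=2)
# metadata.publish_table_usage(table_entity, usage)   # table_entity: a Table fetched via OpenMetadata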

View File

@ -4,7 +4,6 @@ source:
serviceConnection:
config:
type: BigQuery
enablePolicyTagImport: true
projectId: project_id
credentials:
gcsConfig:
@ -23,8 +22,7 @@ source:
queryLogDuration: '1'
processor:
type: query-parser
config:
filter: ''
config: {}
stage:
type: table-usage
config:

View File

@ -13,8 +13,7 @@ source:
queryLogDuration: '1'
processor:
type: query-parser
config:
filter: ''
config: {}
stage:
type: table-usage
config:

View File

@ -11,11 +11,9 @@ source:
sourceConfig:
config:
queryLogDuration: '1'
queryLogFilePath: <path to query log file>
processor:
type: query-parser
config:
filter: ''
config: {}
stage:
type: table-usage
config:

View File

@ -13,8 +13,7 @@ source:
queryLogDuration: '1'
processor:
type: query-parser
config:
filter: ''
config: {}
stage:
type: table-usage
config:

View File

@ -14,8 +14,7 @@ source:
resultLimit: 1000
processor:
type: query-parser
config:
filter: ''
config: {}
stage:
type: table-usage
config:

View File

@ -8,12 +8,15 @@
"sampleDataFolder": "./examples/sample_data"
}
},
"sourceConfig": {}
"sourceConfig": {
"config":{
"type":"DatabaseUsage"
}
}
},
"processor": {
"type": "query-parser",
"config": {
"filter": ""
}
},
"stage": {

View File

@ -43,7 +43,7 @@ base_requirements = {
"Jinja2>=2.11.3",
"PyYAML",
"jsonschema",
"sqllineage==1.3.3",
"sqllineage==1.3.5",
"antlr4-python3-runtime==4.9.2",
# compatibility requirements for 3.7
"typing-compat~=0.1.0",
@ -61,7 +61,7 @@ base_plugins = {
"query-parser",
"metadata-usage",
"file-stage",
"sql-metadata~=2.0.0",
"sql-metadata~=2.5.0",
}
plugins: Dict[str, Set[str]] = {
"airflow": {

View File

@ -15,23 +15,29 @@ from typing import List, Optional
from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import ColumnJoins, Table, TableJoins
from metadata.generated.schema.entity.data.table import (
ColumnJoins,
JoinedWith,
SqlQuery,
Table,
TableJoins,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.type.tableUsageCount import TableColumn, TableUsageCount
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.api.bulk_sink import BulkSink, BulkSinkStatus
from metadata.ingestion.models.table_queries import (
ColumnJoinedWith,
TableColumn,
TableUsageCount,
TableUsageRequest,
)
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.helpers import _get_formmated_table_name
from metadata.utils.helpers import get_formatted_entity_name
from metadata.utils.logger import ingestion_logger
from metadata.utils.sql_lineage import ingest_lineage_by_query
from metadata.utils.sql_lineage import (
get_column_fqn,
ingest_lineage_by_query,
search_table_entities,
)
logger = ingestion_logger()
@ -57,6 +63,7 @@ class MetadataUsageBulkSink(BulkSink):
self.metadata = OpenMetadata(self.metadata_config)
self.status = BulkSinkStatus()
self.table_join_dict = {}
self.table_usage_map = {}
self.today = datetime.today().strftime("%Y-%m-%d")
@classmethod
@ -64,13 +71,12 @@ class MetadataUsageBulkSink(BulkSink):
config = MetadataUsageSinkConfig.parse_obj(config_dict)
return cls(config, metadata_config)
def handle_work_unit_start(self, wu):
pass
def handle_work_unit_end(self, wu):
pass
def ingest_sql_queries_lineage(self, queries, database):
def ingest_sql_queries_lineage(
self, queries: List[SqlQuery], database: str
) -> None:
"""
Method to ingest lineage by sql queries
"""
for query in queries:
ingest_lineage_by_query(
self.metadata,
@ -79,74 +85,35 @@ class MetadataUsageBulkSink(BulkSink):
database=database,
)
def write_records(self) -> None:
usage_records = [json.loads(l) for l in self.file_handler.readlines()]
table_usage_map = {}
for record in usage_records:
table_usage = TableUsageCount(**json.loads(record))
self.service_name = table_usage.service_name
if "." in table_usage.table:
(
table_usage.database_schema,
table_usage.table,
) = table_usage.table.split(".")[-2:]
table_entities = self.__get_table_entity(
table_usage.database, table_usage.database_schema, table_usage.table
)
else:
table_entities = self.metadata.es_search_from_service(
entity_type=Table,
service_name=self.service_name,
filters={
"database": table_usage.database,
"database_schema": None,
"name": table_usage.table,
},
)
for table_entity in table_entities or []:
if table_entity is not None:
if not table_usage_map.get(table_entity.id.__root__):
table_usage_map[table_entity.id.__root__] = {
def __populate_table_usage_map(
self, table_entity: Table, table_usage: TableUsageCount
) -> None:
"""
Method to either initialise the map data or
update existing data with information from new queries on the same table
"""
if not self.table_usage_map.get(table_entity.id.__root__):
self.table_usage_map[table_entity.id.__root__] = {
"table_entity": table_entity,
"usage_count": table_usage.count,
"sql_queries": table_usage.sql_queries,
"sql_queries": table_usage.sqlQueries,
"usage_date": table_usage.date,
"database": table_usage.database,
"database_schema": table_usage.databaseSchema,
}
else:
table_usage_map[table_entity.id.__root__][
self.table_usage_map[table_entity.id.__root__][
"usage_count"
] += table_usage.count
table_usage_map[table_entity.id.__root__]["sql_queries"].extend(
table_usage.sql_queries
)
table_join_request = self.__get_table_joins(table_usage)
logger.debug("table join request {}".format(table_join_request))
try:
if (
table_join_request is not None
and len(table_join_request.columnJoins) > 0
):
self.metadata.publish_frequently_joined_with(
table_entity, table_join_request
)
except APIError as err:
self.status.failures.append(table_join_request)
logger.error(
"Failed to update query join for {}, {}".format(
table_usage.table, err
)
self.table_usage_map[table_entity.id.__root__]["sql_queries"].extend(
table_usage.sqlQueries
)
else:
logger.warning(
"Table does not exist, skipping usage publish {}, {}".format(
table_usage.table, table_usage.database
)
)
self.status.warnings.append(f"Table: {table_usage.table}")
for table_id, value_dict in table_usage_map.items():
def __publish_usage_records(self) -> None:
"""
Method to publish SQL Queries, Table Usage & Lineage
"""
for _, value_dict in self.table_usage_map.items():
self.metadata.ingest_table_queries_data(
table=value_dict["table_entity"],
table_queries=value_dict["sql_queries"],
@ -154,8 +121,8 @@ class MetadataUsageBulkSink(BulkSink):
self.ingest_sql_queries_lineage(
value_dict["sql_queries"], value_dict["database"]
)
table_usage_request = TableUsageRequest(
date=value_dict["usage_date"], count=value_dict["usage_count"]
table_usage_request = UsageRequest(
date=self.today, count=value_dict["usage_count"]
)
try:
self.metadata.publish_table_usage(
@ -179,47 +146,100 @@ class MetadataUsageBulkSink(BulkSink):
self.status.failures.append(
"Table: {}".format(value_dict["table_entity"].name.__root__)
)
def write_records(self) -> None:
for usage_record in self.file_handler.readlines():
record = json.loads(usage_record)
table_usage = TableUsageCount(**json.loads(record))
self.service_name = table_usage.serviceName
if "." in table_usage.table:
databaseSchema, table = fqn.split(table_usage.table)[-2:]
table_usage.table = table
if not table_usage.databaseSchema:
table_usage.databaseSchema = databaseSchema
table_usage.database = get_formatted_entity_name(table_usage.database)
table_usage.databaseSchema = get_formatted_entity_name(
table_usage.databaseSchema
)
table_usage.table = get_formatted_entity_name(table_usage.table)
table_entities = search_table_entities(
self.metadata,
table_usage.serviceName,
table_usage.database,
table_usage.databaseSchema,
table_usage.table,
)
for table_entity in table_entities or []:
if table_entity is not None:
self.__populate_table_usage_map(
table_usage=table_usage, table_entity=table_entity
)
table_join_request = self.__get_table_joins(table_usage)
logger.debug("table join request {}".format(table_join_request))
try:
if (
table_join_request is not None
and len(table_join_request.columnJoins) > 0
):
self.metadata.publish_frequently_joined_with(
table_entity, table_join_request
)
except APIError as err:
self.status.failures.append(table_join_request)
logger.error(
"Failed to update query join for {}, {}".format(
table_usage.table, err
)
)
else:
logger.warning(
"Table does not exist, skipping usage publish {}, {}".format(
table_usage.table, table_usage.database
)
)
self.status.warnings.append(f"Table: {table_usage.table}")
self.__publish_usage_records()
try:
self.metadata.compute_percentile(Table, self.today)
self.metadata.compute_percentile(Database, self.today)
except APIError:
logger.error("Failed to publish compute.percentile")
def __get_table_joins(self, table_usage):
def __get_table_joins(self, table_usage: TableUsageCount) -> TableJoins:
table_joins: TableJoins = TableJoins(
columnJoins=[], directTableJoins=[], startDate=table_usage.date
)
"""
Method to get Table Joins
"""
column_joins_dict = {}
for column_join in table_usage.joins:
joined_with = {}
if column_join.table_column is None or len(column_join.joined_with) == 0:
if column_join.tableColumn is None or len(column_join.joinedWith) == 0:
continue
if column_join.table_column.column in column_joins_dict.keys():
joined_with = column_joins_dict[column_join.table_column.column]
if column_join.tableColumn.column in column_joins_dict.keys():
joined_with = column_joins_dict[column_join.tableColumn.column]
else:
column_joins_dict[column_join.table_column.column] = {}
main_column_fqdn = self.__get_column_fqdn(
table_usage.database,
table_usage.database_schema,
column_join.table_column,
column_joins_dict[column_join.tableColumn.column] = {}
for column in column_join.joinedWith:
joined_column_fqn = self.__get_column_fqn(
table_usage.database, table_usage.databaseSchema, column
)
for column in column_join.joined_with:
joined_column_fqdn = self.__get_column_fqdn(
table_usage.database, table_usage.database_schema, column
)
if str(joined_column_fqdn) in joined_with.keys():
column_joined_with = joined_with[str(joined_column_fqdn)]
if str(joined_column_fqn) in joined_with.keys():
column_joined_with = joined_with[str(joined_column_fqn)]
column_joined_with.joinCount += 1
joined_with[str(joined_column_fqdn)] = column_joined_with
elif joined_column_fqdn is not None:
joined_with[str(joined_column_fqdn)] = ColumnJoinedWith(
fullyQualifiedName=str(joined_column_fqdn), joinCount=1
joined_with[str(joined_column_fqn)] = column_joined_with
elif joined_column_fqn is not None:
joined_with[str(joined_column_fqn)] = JoinedWith(
fullyQualifiedName=str(joined_column_fqn), joinCount=1
)
else:
logger.info(
f"Skipping join columns for {column} {joined_column_fqdn}"
logger.debug(
f"Skipping join columns for {column} {joined_column_fqn}"
)
column_joins_dict[column_join.table_column.column] = joined_with
column_joins_dict[column_join.tableColumn.column] = joined_with
for key, value in column_joins_dict.items():
table_joins.columnJoins.append(
@ -227,44 +247,24 @@ class MetadataUsageBulkSink(BulkSink):
)
return table_joins
def __get_column_fqdn(
def __get_column_fqn(
self, database: str, database_schema: str, table_column: TableColumn
):
table_entities = self.__get_table_entity(
database, database_schema, table_column.table
) -> Optional[str]:
"""
Method to get column fqn
"""
table_entities = search_table_entities(
self.metadata,
self.service_name,
database,
database_schema,
table_column.table,
)
if not table_entities:
return None
for table_entity in table_entities:
for tbl_column in table_entity.columns:
if table_column.column.lower() == tbl_column.name.__root__.lower():
return tbl_column.fullyQualifiedName.__root__
def __get_table_entity(
self, database_name: str, database_schema: str, table_name: str
) -> Optional[List[Table]]:
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.service_name,
database_name=database_name,
schema_name=database_schema,
table_name=table_name,
)
table_fqn = _get_formmated_table_name(table_fqn)
table_entity = self.metadata.get_by_name(Table, fqn=table_fqn)
if table_entity:
return [table_entity]
es_result = self.metadata.es_search_from_service(
entity_type=Table,
service_name=self.service_name,
filters={
"database": database_name,
"database_schema": database_schema,
"name": table_name,
},
)
return es_result
for table_entity in table_entities:
return get_column_fqn(table_entity, table_column.column)
def get_status(self):
return self.status
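A hedged, standalone sketch of the per-table aggregation that __populate_table_usage_map performs, with plain dicts standing in for the Table entities and generated models:

table_usage_map: dict = {}

def populate(table_id: str, count: int, queries: list) -> None:
    # The first record for a table initialises its entry; later records on the same
    # table add to the usage count and extend the list of SQL queries.
    if table_id not in table_usage_map:
        table_usage_map[table_id] = {"usage_count": count, "sql_queries": list(queries)}
    else:
        table_usage_map[table_id]["usage_count"] += count
        table_usage_map[table_id]["sql_queries"].extend(queries)

populate("table-uuid-1", 2, ["SELECT 1"])
populate("table-uuid-1", 3, ["SELECT 2"])
assert table_usage_map["table-uuid-1"]["usage_count"] == 5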

View File

@ -1,98 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, List, Optional
from pydantic import BaseModel
from metadata.generated.schema.entity.data.table import ColumnJoins, SqlQuery
from metadata.ingestion.models.json_serializable import JsonSerializable
class TableQuery(JsonSerializable):
""" """
def __init__(
self,
query: str,
user_name: str,
starttime: str,
endtime: str,
analysis_date: str,
database: str,
aborted: bool,
sql: str,
service_name: str,
) -> None:
""" """
self.query = query
self.user_name = user_name
self.starttime = starttime
self.endtime = endtime
self.analysis_date = analysis_date
self.database = database
self.aborted = aborted
self.sql = sql
self.service_name = service_name
class TableColumn(BaseModel):
table: str
column: str
class TableColumnJoin(BaseModel):
table_column: Optional[TableColumn] = None
joined_with: Optional[List[TableColumn]] = None
TableColumnJoins = List[TableColumnJoin]
class TableUsageCount(BaseModel):
table: str
date: str
database: str
database_schema: Optional[str]
sql_queries: List[SqlQuery]
count: int = 1
joins: TableColumnJoins
service_name: str
class QueryParserData(BaseModel):
tables: List[str]
tables_aliases: Dict[str, str]
columns: Dict[str, List[object]]
date: str
database: str
sql: str
service_name: str
class Config:
arbitrary_types_allowed = True
class TableUsageRequest(BaseModel):
date: str
count: int
class ColumnJoinsList(BaseModel):
__root__: List[ColumnJoins]
class ColumnJoinedWith(BaseModel):
fullyQualifiedName: str
joinCount: int
TablesUsage = List[TableUsageCount]

View File

@ -27,7 +27,7 @@ from metadata.generated.schema.entity.data.table import (
TableJoins,
TableProfile,
)
from metadata.ingestion.models.table_queries import TableUsageRequest
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.utils import ometa_logger
@ -115,7 +115,7 @@ class OMetaTableMixin:
)
def publish_table_usage(
self, table: Table, table_usage_request: TableUsageRequest
self, table: Table, table_usage_request: UsageRequest
) -> None:
"""
POST usage details for a Table

View File

@ -76,7 +76,3 @@ def _(arg) -> str:
Models with __root__
"""
return str(arg.__root__)
def _get_formmated_table_name(table_name):
return table_name.replace("[", "").replace("]", "")

View File

@ -22,19 +22,11 @@ from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.type.queryParserData import QueryParserData
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.api.processor import Processor, ProcessorStatus
from metadata.ingestion.models.table_queries import QueryParserData, TableQuery
from metadata.utils.logger import ingestion_logger
class QueryParserProcessorConfig(ConfigModel):
"""
Query parser pydantic configuration model
"""
filter: Optional[str] = None
logger = ingestion_logger()
@ -52,12 +44,12 @@ class QueryParserProcessor(Processor):
status (ProcessorStatus):
"""
config: QueryParserProcessorConfig
config: ConfigModel
status: ProcessorStatus
def __init__(
self,
config: QueryParserProcessorConfig,
config: ConfigModel,
metadata_config: OpenMetadataConnection,
):
@ -69,26 +61,30 @@ class QueryParserProcessor(Processor):
def create(
cls, config_dict: dict, metadata_config: OpenMetadataConnection, **kwargs
):
config = QueryParserProcessorConfig.parse_obj(config_dict)
config = ConfigModel.parse_obj(config_dict)
return cls(config, metadata_config)
def process(self, record: TableQuery) -> QueryParserData:
def process(self, record: TableQuery) -> Optional[QueryParserData]:
query_parser_data = None
try:
start_date = record.analysis_date
if isinstance(record.analysis_date, str):
if not record.query:
return
start_date = record.analysisDate.__root__
if isinstance(record.analysisDate, str):
start_date = datetime.datetime.strptime(
str(record.analysis_date), "%Y-%m-%d %H:%M:%S"
str(record.analysisDate), "%Y-%m-%d %H:%M:%S"
).date()
parser = Parser(record.sql)
parser = Parser(record.query)
columns_dict = {} if parser.columns_dict is None else parser.columns_dict
query_parser_data = QueryParserData(
tables=parser.tables,
tables_aliases=parser.tables_aliases,
tableAliases=parser.tables_aliases,
columns=columns_dict,
database=record.database,
sql=record.sql,
databaseSchema=record.databaseSchema,
sql=record.query,
date=start_date.strftime("%Y-%m-%d"),
service_name=record.service_name,
serviceName=record.serviceName,
)
# pylint: disable=broad-except
except Exception as err:
@ -96,8 +92,6 @@ class QueryParserProcessor(Processor):
logger.debug(record.sql)
logger.debug(traceback.format_exc())
logger.error(err)
query_parser_data = None
return query_parser_data
def close(self):
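The processor now reads record.query and relies on sql-metadata's Parser for tables, aliases and columns; a short illustration of those attributes (the exact output shapes depend on the sql-metadata version pinned in setup.py above):

from sql_metadata import Parser

parser = Parser(
    "SELECT a.address FROM shopify.dim_address a "
    "JOIN shopify.shop s ON a.shop_id = s.shop_id"
)
parser.tables          # ['shopify.dim_address', 'shopify.shop']
parser.tables_aliases  # {'a': 'shopify.dim_address', 's': 'shopify.shop'}
parser.columns_dict    # roughly {'select': [...], 'join': ['shopify.dim_address.shop_id', ...]}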

View File

@ -15,7 +15,7 @@ import collections
import logging as log
import os
from datetime import datetime
from typing import Iterable
from typing import Any, Dict, Iterable, Optional
from google.cloud import logging
@ -31,34 +31,29 @@ from metadata.generated.schema.entity.services.databaseService import (
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.table_queries import TableQuery
from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.usage_source import UsageSource
from metadata.utils.credentials import set_google_credentials
from metadata.utils.helpers import get_start_and_end
logger = log.getLogger(__name__)
class BigqueryUsageSource(Source[TableQuery]):
class BigqueryUsageSource(UsageSource):
SERVICE_TYPE = DatabaseServiceType.BigQuery.value
scheme = "bigquery"
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__()
super().__init__(config, metadata_config)
self.temp_credentials = None
self.metadata_config = metadata_config
self.config = config
self.service_connection = config.serviceConnection.__root__.config
# Used as db
self.project_id = (
self.service_connection.projectId
or self.service_connection.credentials.gcsConfig.projectId
self.connection.projectId or self.connection.credentials.gcsConfig.projectId
)
self.logger_name = "cloudaudit.googleapis.com%2Fdata_access"
self.status = SQLSourceStatus()
self.logging_client = logging.Client()
self.usage_logger = self.logging_client.logger(self.logger_name)
logger.debug("Listing entries for logger {}:".format(self.usage_logger.name))
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
@ -75,30 +70,19 @@ class BigqueryUsageSource(Source[TableQuery]):
return cls(config, metadata_config)
def prepare(self):
pass
def next_record(self) -> Iterable[TableQuery]:
logging_client = logging.Client()
usage_logger = logging_client.logger(self.logger_name)
logger.debug("Listing entries for logger {}:".format(usage_logger.name))
start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
try:
entries = usage_logger.list_entries()
def _get_raw_extract_iter(self) -> Optional[Iterable[Dict[str, Any]]]:
entries = self.usage_logger.list_entries()
for entry in entries:
timestamp = entry.timestamp.isoformat()
timestamp = datetime.strptime(timestamp[0:10], "%Y-%m-%d")
if timestamp >= start and timestamp <= end:
if timestamp >= self.start and timestamp <= self.end:
if ("query" in str(entry.payload)) and type(
entry.payload
) == collections.OrderedDict:
payload = list(entry.payload.items())[-1][1]
if "jobChange" in payload:
logger.debug(f"\nEntries: {payload}")
if (
"queryConfig"
in payload["jobChange"]["job"]["jobConfig"]
):
if "queryConfig" in payload["jobChange"]["job"]["jobConfig"]:
queryConfig = payload["jobChange"]["job"]["jobConfig"][
"queryConfig"
]
@ -114,31 +98,20 @@ class BigqueryUsageSource(Source[TableQuery]):
jobStats["startTime"][0:19], "%Y-%m-%dT%H:%M:%S"
).strftime("%Y-%m-%d %H:%M:%S")
)
logger.debug(
f"Query :{statementType}:{queryConfig['query']}"
)
logger.debug(f"Query :{statementType}:{queryConfig['query']}")
tq = TableQuery(
query=statementType,
user_name=entry.resource.labels["project_id"],
starttime=str(jobStats["startTime"]),
endtime=str(jobStats["endTime"]),
analysis_date=analysis_date,
query=queryConfig["query"],
userName=entry.resource.labels["project_id"],
startTime=str(jobStats["startTime"]),
endTime=str(jobStats["endTime"]),
analysisDate=analysis_date,
aborted=0,
database=str(database),
sql=queryConfig["query"],
service_name=self.config.serviceName,
serviceName=self.config.serviceName,
databaseSchema=None,
)
yield tq
except Exception as err:
logger.error(repr(err))
def get_status(self) -> SourceStatus:
return self.status
def test_connection(self) -> SourceStatus:
pass
def close(self):
super().close()
if self.temp_credentials:

View File

@ -13,7 +13,6 @@ Clickhouse usage module
"""
import ast
from typing import Iterable
from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import (
ClickhouseConnection,
@ -22,29 +21,17 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
# This import verifies that the dependencies are available.
from metadata.ingestion.models.table_queries import TableQuery
from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.usage_source import UsageSource
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.helpers import get_start_and_end
from metadata.utils.sql_queries import CLICKHOUSE_SQL_USAGE_STATEMENT
class ClickhouseUsageSource(UsageSource):
def __init__(self, config: WorkflowSource, metadata_config: WorkflowConfig):
super().__init__(config, metadata_config)
self.config = config
self.connection = config.serviceConnection.__root__.config
start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
self.analysis_date = start
self.sql_stmt = CLICKHOUSE_SQL_USAGE_STATEMENT.format(
start_time=start, end_time=end
start_time=self.start, end_time=self.end
)
self.report = SQLSourceStatus()
self.engine = get_connection(self.connection)
@classmethod
def create(cls, config_dict, metadata_config: WorkflowConfig):
@ -57,27 +44,12 @@ class ClickhouseUsageSource(UsageSource):
return cls(config, metadata_config)
def next_record(self) -> Iterable[TableQuery]:
def get_database_name(self, data: dict) -> str:
"""
Using itertools.groupby and raw level iterator,
it groups to table and yields TableMetadata
:return:
Method to fetch database name from row data
"""
for row in self._get_raw_extract_iter():
database = "default"
if row["database_name"]:
database_list = ast.literal_eval(row["database_name"])
if data["database_name"]:
database_list = ast.literal_eval(data["database_name"])
database = database_list[0] if len(database_list) == 1 else "default"
table_query = TableQuery(
query=row["query_id"],
user_name=row["user_name"],
starttime=str(row["start_time"]),
endtime=str(row["end_time"]),
analysis_date=self.analysis_date,
aborted=row["aborted"],
database=database,
sql=row["query_text"],
service_name=self.config.serviceName,
)
yield table_query
return database
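For context, the Clickhouse usage query returns the databases column as a stringified Python list, which is why ast.literal_eval is used; a small illustration with a made-up value:

import ast

raw_value = "['ecommerce_db']"                 # as returned in data["database_name"]
database_list = ast.literal_eval(raw_value)
database = database_list[0] if len(database_list) == 1 else "default"   # -> "ecommerce_db"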

View File

@ -23,18 +23,15 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.usage_source import UsageSource
# This import verifies that the dependencies are available.
from metadata.utils.helpers import get_start_and_end
from metadata.utils.sql_queries import MSSQL_SQL_USAGE_STATEMENT
class MssqlUsageSource(UsageSource):
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
start, end = get_start_and_end(config.sourceConfig.config.queryLogDuration)
self.analysis_date = start
self.sql_stmt = MSSQL_SQL_USAGE_STATEMENT.format(start_date=start, end_date=end)
self.sql_stmt = MSSQL_SQL_USAGE_STATEMENT.format(
start_date=self.start, end_date=self.end
)
@classmethod
def create(cls, config_dict, metadata_config: WorkflowConfig):

View File

@ -27,7 +27,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.usage_source import UsageSource
from metadata.utils.helpers import get_start_and_end
# pylint: disable=useless-super-delegation
from metadata.utils.logger import ingestion_logger
@ -37,24 +36,14 @@ logger = ingestion_logger()
class RedshiftUsageSource(UsageSource):
# SELECT statement from mysql information_schema to extract table and column metadata
SQL_STATEMENT = REDSHIFT_SQL_STATEMENT
# CONFIG KEYS
WHERE_CLAUSE_SUFFIX_KEY = "where_clause"
CLUSTER_SOURCE = "cluster_source"
CLUSTER_KEY = "cluster_key"
USE_CATALOG_AS_CLUSTER_NAME = "use_catalog_as_cluster_name"
DATABASE_KEY = "database_key"
SERVICE_TYPE = DatabaseServiceType.Redshift.value
DEFAULT_CLUSTER_SOURCE = "CURRENT_DATABASE()"
def __init__(self, config: WorkflowSource, metadata_config: WorkflowConfig):
super().__init__(config, metadata_config)
start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
self.sql_stmt = RedshiftUsageSource.SQL_STATEMENT.format(
start_time=start, end_time=end
start_time=self.start, end_time=self.end
)
self.analysis_date = start
self._extract_iter: Union[None, Iterator] = None
self._database = "redshift"

View File

@ -12,7 +12,7 @@
import csv
import json
from datetime import datetime
from typing import Iterable
from typing import Dict, Iterable, Optional
from metadata.generated.schema.entity.services.connections.database.sampleDataConnection import (
SampleDataConnection,
@ -27,23 +27,27 @@ from metadata.generated.schema.entity.services.databaseService import (
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException, Source
from metadata.ingestion.models.table_queries import TableQuery
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
from metadata.ingestion.source.database.sample_data import SampleDataSourceStatus
from metadata.ingestion.source.database.usage_source import UsageSource
class SampleUsageSource(Source[TableQuery]):
class SampleUsageSource(UsageSource):
service_type = DatabaseServiceType.BigQuery.value
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__()
self.status = SampleDataSourceStatus()
self.config = config
self.service_connection = config.serviceConnection.__root__.config
self.source_config = config.sourceConfig.config
self.metadata_config = metadata_config
self.report = SQLSourceStatus()
self.metadata = OpenMetadata(metadata_config)
self.analysis_date = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
self.service_json = json.load(
open(
@ -70,29 +74,16 @@ class SampleUsageSource(Source[TableQuery]):
)
return cls(config, metadata_config)
def prepare(self):
pass
def next_record(self) -> Iterable[TableQuery]:
def _get_raw_extract_iter(self) -> Optional[Iterable[Dict[str, str]]]:
for row in self.query_logs:
tq = TableQuery(
yield TableQuery(
query=row["query"],
user_name="",
starttime="",
endtime="",
analysis_date=datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
database="ecommerce_db",
userName="",
startTime="",
endTime="",
analysisDate=self.analysis_date,
aborted=False,
sql=row["query"],
service_name=self.config.serviceName,
database="ecommerce_db",
serviceName=self.config.serviceName,
databaseSchema="shopify",
)
yield tq
def close(self):
pass
def get_status(self):
return self.status
def test_connection(self) -> None:
pass

View File

@ -12,11 +12,8 @@
Snowflake usage module
"""
import traceback
from datetime import timedelta
from typing import Any, Dict, Iterable, Iterator, Union
from sqlalchemy import inspect
from typing import Iterable, Iterator, Union
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
SnowflakeConnection,
@ -31,17 +28,17 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.api.source import InvalidSourceException
# This import verifies that the dependencies are available.
from metadata.ingestion.models.table_queries import TableQuery
from metadata.ingestion.source.database.usage_source import UsageSource
from metadata.utils.connections import get_connection
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger
from metadata.utils.sql_queries import SNOWFLAKE_SQL_STATEMENT
logger = ingestion_logger()
SNOWFLAKE_ABORTED_CODE = "1969"
class SnowflakeUsageSource(UsageSource):
@ -60,12 +57,10 @@ class SnowflakeUsageSource(UsageSource):
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
end = end + timedelta(days=1)
self.analysis_date = start
self.end = self.end + timedelta(days=1)
self.sql_stmt = SnowflakeUsageSource.SQL_STATEMENT.format(
start_date=start,
end_date=end,
start_date=self.start,
end_date=self.end,
result_limit=self.config.sourceConfig.config.resultLimit,
)
self._extract_iter: Union[None, Iterator] = None
@ -81,9 +76,9 @@ class SnowflakeUsageSource(UsageSource):
)
return cls(config, metadata_config)
def _get_raw_extract_iter(self) -> Iterable[Dict[str, Any]]:
def _get_raw_extract_iter(self) -> Iterable[TableQuery]:
if self.config.serviceConnection.__root__.config.database:
yield from super(SnowflakeUsageSource, self)._get_raw_extract_iter()
yield from super()._get_raw_extract_iter()
else:
query = "SHOW DATABASES"
results = self.engine.execute(query)
@ -96,35 +91,28 @@ class SnowflakeUsageSource(UsageSource):
self.engine = get_connection(self.connection)
rows = self.engine.execute(self.sql_stmt)
for row in rows:
yield row
def next_record(self) -> Iterable[TableQuery]:
"""
Using itertools.groupby and raw level iterator,
it groups to table and yields TableMetadata
:return:
"""
for row in self._get_raw_extract_iter():
try:
table_query = TableQuery(
query=row["query_type"],
user_name=row["user_name"],
starttime=str(row["start_time"]),
endtime=str(row["end_time"]),
analysis_date=self.analysis_date,
aborted="1969" in str(row["end_time"]),
database=row["database_name"],
sql=row["query_text"],
service_name=self.config.serviceName,
yield TableQuery(
query=row["query_text"],
userName=row["user_name"],
startTime=str(row["start_time"]),
endTime=str(row["end_time"]),
analysisDate=self.analysis_date,
aborted=self.get_aborted_status(row),
database=self.get_database_name(row),
serviceName=self.config.serviceName,
databaseSchema=row["schema_name"],
)
if not row["database_name"] and self.connection.database:
TableQuery.database = self.connection.database
logger.debug(f"Parsed Query: {row['query_text']}")
if row["schema_name"] is not None:
self.report.scanned(f"{row['database_name']}.{row['schema_name']}")
else:
self.report.scanned(f"{row['database_name']}")
yield table_query
except Exception as err:
logger.debug(traceback.format_exc())
logger.debug(repr(err))
def get_database_name(self, data: dict) -> str:
"""
Method to get database name
"""
if not data["database_name"] and self.connection.database:
return self.connection.database
return data["database_name"]
def get_aborted_status(self, data: dict) -> bool:
"""
Method to get aborted status of query
"""
return SNOWFLAKE_ABORTED_CODE in str(data["end_time"])
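A tiny illustration of the two new Snowflake hooks with made-up row data (aborted queries surface the 1969 code in their end_time, and the configured database is used as a fallback):

SNOWFLAKE_ABORTED_CODE = "1969"   # as defined above in this module

row = {"database_name": None, "end_time": "1969-01-01 00:00:00.000"}
aborted = SNOWFLAKE_ABORTED_CODE in str(row["end_time"])   # True -> the query is treated as aborted
database = row["database_name"] or "snowflake_db"          # mirrors get_database_name's fallback to the connection database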

View File

@ -12,7 +12,8 @@
Usage Source Module
"""
import csv
from typing import Any, Dict, Iterable
import traceback
from typing import Iterable, Optional
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
@ -20,13 +21,17 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
# This import verifies that the dependencies are available.
from metadata.ingestion.models.table_queries import TableQuery
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_database, filter_by_schema
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class UsageSource(Source[TableQuery]):
@ -35,39 +40,63 @@ class UsageSource(Source[TableQuery]):
self.config = config
self.metadata_config = metadata_config
self.connection = config.serviceConnection.__root__.config
start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
self.analysis_date = start
self.source_config = self.config.sourceConfig.config
self.start, self.end = get_start_and_end(self.source_config.queryLogDuration)
self.analysis_date = self.start
self.report = SQLSourceStatus()
self.engine = get_connection(self.connection)
def prepare(self):
return super().prepare()
def _get_raw_extract_iter(self) -> Iterable[Dict[str, Any]]:
def get_database_name(self, data: dict) -> str:
"""
Method to get database name
"""
return data.get("database_name")
def get_aborted_status(self, data: dict) -> bool:
"""
Method to get aborted status of query
"""
return data.get("aborted", False)
def _get_raw_extract_iter(self) -> Optional[Iterable[TableQuery]]:
"""
If queryLogFilePath available in config iterate through log file
otherwise execute the sql query to fetch TableQuery data
"""
if self.config.sourceConfig.config.queryLogFilePath:
with open(self.config.sourceConfig.config.queryLogFilePath, "r") as fin:
for i in csv.DictReader(fin):
query_dict = dict(i)
row = {
"query_type": query_dict.get("query"),
"user_name": query_dict.get("user_name", ""),
"start_time": query_dict.get("start_time", ""),
"end_time": query_dict.get("end_time", ""),
"aborted": query_dict.get("aborted", False),
"database_name": query_dict.get(
"database_name",
self.connection.database
if self.connection.database
else "default",
),
"query_text": query_dict.get("query"),
"schema_name": query_dict.get("schema_name"),
}
yield row
yield TableQuery(
query=query_dict["query_text"],
userName=query_dict.get("user_name", ""),
startTime=query_dict.get("start_time", ""),
endTime=query_dict.get("end_time", ""),
analysisDate=self.analysis_date,
aborted=self.get_aborted_status(query_dict),
database=self.get_database_name(query_dict),
serviceName=self.config.serviceName,
databaseSchema=query_dict.get("schema_name"),
)
else:
rows = self.engine.execute(self.sql_stmt)
for row in rows:
yield row
row = dict(row)
yield TableQuery(
query=row["query_text"],
userName=row["user_name"],
startTime=str(row["start_time"]),
endTime=str(row["end_time"]),
analysisDate=self.analysis_date,
aborted=self.get_aborted_status(row),
database=self.get_database_name(row),
serviceName=self.config.serviceName,
databaseSchema=row["schema_name"],
)
def next_record(self) -> Iterable[TableQuery]:
"""
@ -75,24 +104,32 @@ class UsageSource(Source[TableQuery]):
it groups to table and yields TableMetadata
:return:
"""
for table_query in self._get_raw_extract_iter():
if table_query:
if filter_by_database(
self.source_config.databaseFilterPattern,
database_name=table_query.database,
):
continue
if filter_by_schema(
self.source_config.schemaFilterPattern,
schema_name=table_query.databaseSchema,
):
continue
for row in self._get_raw_extract_iter():
table_query = TableQuery(
query=row["query_type"],
user_name=row["user_name"],
starttime=str(row["start_time"]),
endtime=str(row["end_time"]),
analysis_date=self.analysis_date,
aborted=row["aborted"],
database=row["database_name"],
sql=row["query_text"],
service_name=self.config.serviceName,
)
if not row["schema_name"]:
self.report.scanned(f"{row['database_name']}.{row['schema_name']}")
else:
self.report.scanned(f"{row['database_name']}")
try:
yield table_query
logger.debug(f"Parsed Query: {table_query.query}")
if not table_query.databaseSchema:
self.report.scanned(
f"{table_query.database}.{table_query.databaseSchema}"
)
else:
self.report.scanned(f"{table_query.database}")
yield table_query
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(str(err))
def get_report(self):
"""

View File

@ -16,13 +16,13 @@ from metadata.generated.schema.entity.data.table import SqlQuery
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.ingestion.api.stage import Stage, StageStatus
from metadata.ingestion.models.table_queries import (
QueryParserData,
from metadata.generated.schema.type.queryParserData import QueryParserData
from metadata.generated.schema.type.tableUsageCount import (
TableColumn,
TableColumnJoin,
TableUsageCount,
)
from metadata.ingestion.api.stage import Stage, StageStatus
from metadata.ingestion.stage.file import FileStageConfig
from metadata.utils.logger import ingestion_logger
@ -58,7 +58,7 @@ def get_table_column_join(table, table_aliases, joins, database):
)
except ValueError as err:
logger.error("Error in parsing sql query joins {}".format(err))
return TableColumnJoin(table_column=table_column, joined_with=joined_with)
return TableColumnJoin(tableColumn=table_column, joinedWith=joined_with)
class TableUsageStage(Stage[QueryParserData]):
@ -104,7 +104,7 @@ class TableUsageStage(Stage[QueryParserData]):
table_usage_count.joins.append(
get_table_column_join(
table,
record.tables_aliases,
record.tableAliases,
record.columns["join"],
record.database,
)
@ -114,7 +114,7 @@ class TableUsageStage(Stage[QueryParserData]):
if record.columns.get("join") is not None:
tbl_column_join = get_table_column_join(
table,
record.tables_aliases,
record.tableAliases,
record.columns["join"],
record.database,
)
@ -126,8 +126,9 @@ class TableUsageStage(Stage[QueryParserData]):
database=record.database,
date=record.date,
joins=joins,
service_name=record.service_name,
sql_queries=[],
serviceName=record.serviceName,
sqlQueries=[],
databaseSchema=record.databaseSchema,
)
except Exception as exc:
@ -140,7 +141,7 @@ class TableUsageStage(Stage[QueryParserData]):
def close(self):
for key, value in self.table_usage.items():
value.sql_queries = self.table_queries.get(key, [])
value.sqlQueries = self.table_queries.get(key, [])
data = value.json()
self.file.write(json.dumps(data))
self.file.write("\n")

View File

@ -51,7 +51,9 @@ def get_query_from_dict(data: Dict[str, Optional[str]]) -> str:
)
def get_entity_from_es_result(entity_list: Optional[List[T]]) -> Optional[T]:
def get_entity_from_es_result(
entity_list: Optional[List[T]], fetch_multiple_entities: bool = False
) -> Optional[T]:
"""
Return a single element from an entity list obtained
from an ES query
@ -59,6 +61,8 @@ def get_entity_from_es_result(entity_list: Optional[List[T]]) -> Optional[T]:
:return: single entity
"""
if entity_list and len(entity_list):
if fetch_multiple_entities:
return entity_list
return entity_list[0]
logger.warning("ES Query was empty")

View File

@ -14,9 +14,12 @@ Filter information has been taken from the
ES indexes definitions
"""
import re
from typing import List, Optional, Type, TypeVar
from typing import List, Optional, Type, TypeVar, Union
from antlr4 import *
from antlr4.CommonTokenStream import CommonTokenStream
from antlr4.error.ErrorStrategy import BailErrorStrategy
from antlr4.InputStream import InputStream
from antlr4.tree.Tree import ParseTreeWalker
from pydantic import BaseModel
from metadata.antlr.split_listener import SplitListener
@ -121,6 +124,7 @@ def _(
schema_name: Optional[str],
table_name: str,
retries: int = 3,
fetch_multiple_entities: bool = False,
) -> Optional[str]:
"""
Building logic for tables
@ -148,10 +152,16 @@ def _(
},
retries=retries,
)
entity: Optional[Table] = get_entity_from_es_result(entity_list=es_result)
return str(entity.fullyQualifiedName.__root__) if entity else None
return _build(service_name, database_name, schema_name, table_name)
entity: Optional[Union[Table, List[Table]]] = get_entity_from_es_result(
entity_list=es_result, fetch_multiple_entities=fetch_multiple_entities
)
if not entity:
return None
if fetch_multiple_entities:
return [str(table.fullyQualifiedName.__root__) for table in entity]
return str(entity.fullyQualifiedName.__root__)
fqn = _build(service_name, database_name, schema_name, table_name)
return [fqn] if fetch_multiple_entities else fqn
@fqn_build_registry.add(DatabaseSchema)
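A hedged usage sketch of the new flag (service and entity names are placeholders): when the schema is unknown the Elasticsearch lookup can match several tables, and the builder then returns all of their FQNs instead of only the first.

table_fqns = fqn.build(
    metadata,
    entity_type=Table,
    service_name="my_service",
    database_name="ecommerce_db",
    schema_name=None,              # unknown schema -> ES may return several candidates
    table_name="dim_address",
    fetch_multiple_entities=True,  # returns a list of FQN strings instead of a single one
)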

View File

@ -10,7 +10,7 @@
# limitations under the License.
from datetime import datetime, timedelta
from typing import Any, Dict, Iterable
from typing import Any, Dict, Iterable, Optional
from metadata.generated.schema.api.services.createDashboardService import (
CreateDashboardServiceRequest,
@ -46,7 +46,7 @@ def get_start_and_end(duration):
start = (today + timedelta(0 - duration)).replace(
hour=0, minute=0, second=0, microsecond=0
)
end = (today + timedelta(3)).replace(hour=0, minute=0, second=0, microsecond=0)
end = today.replace(hour=0, minute=0, second=0, microsecond=0)
return start, end
@ -185,8 +185,9 @@ def datetime_to_ts(date: datetime) -> int:
return int(date.timestamp() * 1_000)
def _get_formmated_table_name(table_name):
return table_name.replace("[", "").replace("]", "")
def get_formatted_entity_name(name: str) -> Optional[str]:
if name:
return name.replace("[", "").replace("]", "")
def get_raw_extract_iter(alchemy_helper) -> Iterable[Dict[str, Any]]:
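Two small illustrations of the helper changes: the usage window now ends at today's midnight instead of three days ahead, and the renamed formatter strips square brackets from entity names.

# Assuming queryLogDuration=1 and a run happening on 2022-06-03:
start, end = get_start_and_end(1)
# start -> 2022-06-02 00:00:00, end -> 2022-06-03 00:00:00 (previously end was today + 3 days)

get_formatted_entity_name("[dbo].[my_table]")   # -> "dbo.my_table"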

View File

@ -13,33 +13,99 @@ Helper functions to handle SQL lineage operations
"""
import traceback
from logging.config import DictConfigurator
from typing import List, Optional
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.helpers import _get_formmated_table_name
from metadata.utils.helpers import get_formatted_entity_name
from metadata.utils.logger import utils_logger
# Prevent sqllineage from modifying the logger config
def configure(self):
pass
DictConfigurator.configure = configure
logger = utils_logger()
column_lineage_map = {}
def _separate_fqn(database, fqn):
database_schema, table = fqn.split(".")[-2:]
if not database_schema:
def split_raw_table_name(database: str, raw_name: str) -> dict:
database_schema = None
return {"database": database, "database_schema": database_schema, "name": table}
if "." in raw_name:
database_schema, table = fqn.split(raw_name)[-2:]
if database_schema == "<default>":
database_schema = None
return {"database": database, "database_schema": database_schema, "table": table}
def get_column_fqn(table_entity: Table, column: str) -> Optional[str]:
"""
Get fqn of column if exist in table entity
"""
if not table_entity:
return
for tbl_column in table_entity.columns:
if column.lower() == tbl_column.name.__root__.lower():
return tbl_column.fullyQualifiedName.__root__
def search_table_entities(
metadata: OpenMetadata,
service_name: str,
database: str,
database_schema: Optional[str],
table: str,
) -> Optional[List[Table]]:
"""
Method to get table entity from database, database_schema & table name
"""
try:
table_fqns = fqn.build(
metadata,
entity_type=Table,
service_name=service_name,
database_name=database,
schema_name=database_schema,
table_name=table,
fetch_multiple_entities=True,
)
table_entities = []
for table_fqn in table_fqns or []:
try:
table_entity = metadata.get_by_name(Table, fqn=table_fqn)
table_entities.append(table_entity)
except APIError:
logger.debug(f"Table not found for fqn: {fqn}")
return table_entities
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(err)
def get_column_lineage(
to_entity: Table,
from_entity: Table,
to_table_raw_name: str,
from_table_raw_name: str,
) -> List[ColumnLineage]:
column_lineage = []
if column_lineage_map.get(to_table_raw_name) and column_lineage_map.get(
to_table_raw_name
).get(from_table_raw_name):
for to_col, from_col in column_lineage_map.get(to_table_raw_name).get(
from_table_raw_name
):
to_col_fqn = get_column_fqn(to_entity, to_col)
from_col_fqn = get_column_fqn(from_entity, from_col)
if to_col_fqn and from_col_fqn:
column_lineage.append(
ColumnLineage(fromColumns=[from_col_fqn], toColumn=to_col_fqn)
)
return column_lineage
def _create_lineage_by_table_name(
@ -48,53 +114,46 @@ def _create_lineage_by_table_name(
to_table: str,
service_name: str,
database: str,
query: str,
):
"""
This method is to create a lineage between two tables
"""
try:
from_table = str(from_table).replace("<default>", "")
to_table = str(to_table).replace("<default>", "")
from_fqn = fqn.build(
metadata,
entity_type=Table,
from_raw_name = get_formatted_entity_name(str(from_table))
from_table_obj = split_raw_table_name(database=database, raw_name=from_raw_name)
from_entities = search_table_entities(
table=from_table_obj.get("table"),
database_schema=from_table_obj.get("database_schema"),
database=from_table_obj.get("database"),
metadata=metadata,
service_name=service_name,
database_name=database,
schema_name=None, # TODO: Split table name
table_name=_get_formmated_table_name(str(from_table)),
)
from_entity: Table = metadata.get_by_name(entity=Table, fqn=from_fqn)
if not from_entity:
table_obj = _separate_fqn(database=database, fqn=from_fqn)
multiple_from_fqns = metadata.es_search_from_service(
entity_type=Table,
to_raw_name = get_formatted_entity_name(str(to_table))
to_table_obj = split_raw_table_name(database=database, raw_name=to_raw_name)
to_entities = search_table_entities(
table=to_table_obj.get("table"),
database_schema=to_table_obj.get("database_schema"),
database=to_table_obj.get("database"),
metadata=metadata,
service_name=service_name,
filters=table_obj,
)
else:
multiple_from_fqns = [from_entity]
to_fqn = fqn.build(
metadata,
entity_type=Table,
service_name=service_name,
database_name=database,
schema_name=None, # TODO: Split table name
table_name=_get_formmated_table_name(str(to_table)),
)
to_entity: Table = metadata.get_by_name(entity=Table, fqn=to_fqn)
if not to_entity:
table_obj = _separate_fqn(database=database, fqn=to_fqn)
multiple_to_fqns = metadata.es_search_from_service(
entity_type=Table,
service_name=service_name,
filters=table_obj,
)
else:
multiple_to_fqns = [to_entity]
if not multiple_to_fqns or not multiple_from_fqns:
if not to_entities or not from_entities:
return None
for from_entity in multiple_from_fqns:
for to_entity in multiple_to_fqns:
for from_entity in from_entities:
for to_entity in to_entities:
col_lineage = get_column_lineage(
to_entity=to_entity,
to_table_raw_name=str(to_table),
from_entity=from_entity,
from_table_raw_name=str(from_table),
)
lineage_details = None
if col_lineage:
lineage_details = LineageDetails(
sqlQuery=query, columnsLineage=col_lineage
)
lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
@ -107,13 +166,37 @@ def _create_lineage_by_table_name(
),
)
)
if lineage_details:
lineage.edge.lineageDetails = lineage_details
created_lineage = metadata.add_lineage(lineage)
logger.info(f"Successfully added Lineage {created_lineage}")
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(err)
logger.error(traceback.format_exc())
def populate_column_lineage_map(raw_column_lineage):
lineage_map = {}
if not raw_column_lineage or len(raw_column_lineage[0]) != 2:
return
for source, target in raw_column_lineage:
if lineage_map.get(str(target.parent)):
ele = lineage_map.get(str(target.parent))
if ele.get(str(source.parent)):
ele[str(source.parent)].append(
(
target.raw_name,
source.raw_name,
)
)
else:
ele[str(source.parent)] = [(target.raw_name, source.raw_name)]
else:
lineage_map[str(target.parent)] = {
str(source.parent): [(target.raw_name, source.raw_name)]
}
return lineage_map
def ingest_lineage_by_query(
@ -123,12 +206,22 @@ def ingest_lineage_by_query(
This method parses the query to get source, target and intermediate table names to create lineage,
and returns True if target table is found to create lineage otherwise returns False.
"""
# Prevent sqllineage from modifying the logger config
# Disable the DictConfigurator.configure method while importing LineageRunner
configure = DictConfigurator.configure
DictConfigurator.configure = lambda _: None
from sqllineage.runner import LineageRunner
# Reverting changes after import is done
DictConfigurator.configure = configure
column_lineage_map.clear()
try:
result = LineageRunner(query)
if not result.target_tables:
return False
raw_column_lineage = result.get_column_lineage()
column_lineage_map.update(populate_column_lineage_map(raw_column_lineage))
for intermediate_table in result.intermediate_tables:
for source_table in result.source_tables:
_create_lineage_by_table_name(
@ -137,6 +230,7 @@ def ingest_lineage_by_query(
to_table=intermediate_table,
service_name=service_name,
database=database,
query=query,
)
for target_table in result.target_tables:
_create_lineage_by_table_name(
@ -145,6 +239,7 @@ def ingest_lineage_by_query(
to_table=target_table,
service_name=service_name,
database=database,
query=query,
)
if not result.intermediate_tables:
for target_table in result.target_tables:
@ -155,6 +250,7 @@ def ingest_lineage_by_query(
to_table=target_table,
service_name=service_name,
database=database,
query=query,
)
return True
except Exception as err:
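A hedged sketch of how the new column-lineage map is derived from sqllineage output (table and column names are illustrative; the exact tuples depend on the sqllineage version pinned in setup.py):

from sqllineage.runner import LineageRunner

from metadata.utils.sql_lineage import populate_column_lineage_map

query = "INSERT INTO target_tbl SELECT id, name FROM source_tbl"
raw_column_lineage = LineageRunner(query).get_column_lineage()   # [(source Column, target Column), ...]
lineage_map = populate_column_lineage_map(raw_column_lineage)
# Roughly: {'<default>.target_tbl': {'<default>.source_tbl': [('id', 'id'), ('name', 'name')]}}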

View File

@ -62,7 +62,7 @@ from metadata.generated.schema.tests.table.tableRowCountToEqual import (
)
from metadata.generated.schema.tests.tableTest import TableTestCase, TableTestType
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.models.table_queries import TableUsageRequest
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
@ -340,7 +340,7 @@ class OMetaTableTest(TestCase):
entity=Table, fqn=self.entity.fullyQualifiedName
)
usage = TableUsageRequest(date="2021-10-20", count=10)
usage = UsageRequest(date="2021-10-20", count=10)
self.metadata.publish_table_usage(res, usage)

View File

@ -28,12 +28,16 @@ config = """
"sampleDataFolder": "ingestion/examples/sample_data"
}
},
"sourceConfig": {}
"sourceConfig": {
"config":{
"type": "DatabaseUsage"
}
}
},
"processor": {
"type": "query-parser",
"config": {
"filter": ""
}
},
"stage": {
@ -64,31 +68,31 @@ class QueryParserTest(TestCase):
Check the join count
"""
expected_result = {
"shopify.dim_address": 100,
"shopify.shop": 150,
"shopify.dim_customer": 125,
"dim_customer": 38,
"shopify.dim_location": 75,
"dim_location.shop_id": 25,
"shop": 28,
"shop_id": 25,
"shopify.dim_staff": 75,
"shopify.fact_line_item": 100,
"shopify.fact_order": 155,
"shopify.product": 5,
"shopify.fact_sale": 260,
"dim_address": 12,
"api": 2,
"dim_location": 4,
"product": 16,
"dim_staff": 5,
"fact_line_item": 17,
"fact_order": 15,
"fact_sale": 27,
"fact_session": 31,
"raw_customer": 10,
"raw_order": 13,
"raw_product_catalog": 6,
"shopify.dim_address": 200,
"shopify.shop": 300,
"shopify.dim_customer": 250,
"dim_customer": 76,
"shopify.dim_location": 150,
"dim_location.shop_id": 50,
"shop": 56,
"shop_id": 50,
"shopify.dim_staff": 150,
"shopify.fact_line_item": 200,
"shopify.fact_order": 310,
"shopify.product": 10,
"shopify.fact_sale": 520,
"dim_address": 24,
"api": 4,
"dim_location": 8,
"product": 32,
"dim_staff": 10,
"fact_line_item": 34,
"fact_order": 30,
"fact_sale": 54,
"fact_session": 62,
"raw_customer": 20,
"raw_order": 26,
"raw_product_catalog": 12,
}
workflow = Workflow.create(json.loads(config))
workflow.execute()