mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-25 17:04:54 +00:00
parent
f0926d9b75
commit
9da62be6f4
@ -80,11 +80,13 @@ class MetadataUsageBulkSink(BulkSink):
|
||||
Method to ingest lineage by sql queries
|
||||
"""
|
||||
|
||||
create_queries = [
|
||||
query.query for query in queries if "create" in query.query.lower()
|
||||
create_or_insert_queries = [
|
||||
query.query
|
||||
for query in queries
|
||||
if "create" in query.query.lower() or "insert" in query.query.lower()
|
||||
]
|
||||
|
||||
for query in create_queries:
|
||||
for query in create_or_insert_queries:
|
||||
lineages = get_lineage_by_query(
|
||||
self.metadata,
|
||||
query=query,
|
||||
|
@ -44,12 +44,12 @@ class ClickhouseUsageSource(UsageSource):
|
||||
|
||||
return cls(config, metadata_config)
|
||||
|
||||
def get_database_name(self, data: dict) -> str:
|
||||
def get_schema_name(self, data: dict) -> str:
|
||||
"""
|
||||
Method to fetch database name from row data
|
||||
Method to fetch schema name from row data
|
||||
"""
|
||||
database = "default"
|
||||
if data["database_name"]:
|
||||
database_list = ast.literal_eval(data["database_name"])
|
||||
database = database_list[0] if len(database_list) == 1 else "default"
|
||||
return database
|
||||
schema = None
|
||||
if data.get("schema_name"):
|
||||
schema_list = ast.literal_eval(data["schema_name"])
|
||||
schema = schema_list[0] if len(schema_list) == 1 else None
|
||||
return schema
|
||||
|
@ -60,6 +60,12 @@ class UsageSource(Source[TableQuery], ABC):
|
||||
"""
|
||||
return data.get("database_name")
|
||||
|
||||
def get_schema_name(self, data: dict) -> str:
|
||||
"""
|
||||
Method to get schema name
|
||||
"""
|
||||
return data.get("schema_name")
|
||||
|
||||
def get_aborted_status(self, data: dict) -> bool:
|
||||
"""
|
||||
Method to get aborted status of query
|
||||
@ -119,10 +125,11 @@ class UsageSource(Source[TableQuery], ABC):
|
||||
)
|
||||
queries = []
|
||||
for row in rows:
|
||||
row = dict(row)
|
||||
try:
|
||||
if filter_by_database(
|
||||
self.source_config.databaseFilterPattern,
|
||||
self.get_database_name(dict(row)),
|
||||
self.get_database_name(row),
|
||||
) or filter_by_schema(
|
||||
self.source_config.schemaFilterPattern,
|
||||
schema_name=row["schema_name"],
|
||||
@ -135,10 +142,10 @@ class UsageSource(Source[TableQuery], ABC):
|
||||
startTime=str(row["start_time"]),
|
||||
endTime=str(row["end_time"]),
|
||||
analysisDate=row["start_time"],
|
||||
aborted=self.get_aborted_status(dict(row)),
|
||||
databaseName=self.get_database_name(dict(row)),
|
||||
aborted=self.get_aborted_status(row),
|
||||
databaseName=self.get_database_name(row),
|
||||
serviceName=self.config.serviceName,
|
||||
databaseSchema=row["schema_name"],
|
||||
databaseSchema=self.get_schema_name(row),
|
||||
)
|
||||
)
|
||||
except Exception as err:
|
||||
|
@ -304,17 +304,18 @@ CLICKHOUSE_SQL_USAGE_STATEMENT = """
|
||||
Select
|
||||
query_start_time start_time,
|
||||
DATEADD(query_duration_ms, query_start_time) end_time,
|
||||
databases database_name,
|
||||
'default' database_name,
|
||||
user user_name,
|
||||
FALSE aborted,
|
||||
query_id query_id,
|
||||
query query_text,
|
||||
NULL schema_name,
|
||||
databases schema_name,
|
||||
tables tables
|
||||
From system.query_log
|
||||
Where start_time between '{start_time}' and '{end_time}'
|
||||
and CAST(type,'Int8') <> 3
|
||||
and CAST(type,'Int8') <> 4
|
||||
and (`type`='QueryFinish' or `type`='QueryStart')
|
||||
"""
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user