diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py index ca63257820f..1c81b8f9be6 100644 --- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py +++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py @@ -80,11 +80,13 @@ class MetadataUsageBulkSink(BulkSink): Method to ingest lineage by sql queries """ - create_queries = [ - query.query for query in queries if "create" in query.query.lower() + create_or_insert_queries = [ + query.query + for query in queries + if "create" in query.query.lower() or "insert" in query.query.lower() ] - for query in create_queries: + for query in create_or_insert_queries: lineages = get_lineage_by_query( self.metadata, query=query, diff --git a/ingestion/src/metadata/ingestion/source/database/clickhouse_usage.py b/ingestion/src/metadata/ingestion/source/database/clickhouse_usage.py index 17f4e8cc289..dadf321e949 100644 --- a/ingestion/src/metadata/ingestion/source/database/clickhouse_usage.py +++ b/ingestion/src/metadata/ingestion/source/database/clickhouse_usage.py @@ -44,12 +44,12 @@ class ClickhouseUsageSource(UsageSource): return cls(config, metadata_config) - def get_database_name(self, data: dict) -> str: + def get_schema_name(self, data: dict) -> str: """ - Method to fetch database name from row data + Method to fetch schema name from row data """ - database = "default" - if data["database_name"]: - database_list = ast.literal_eval(data["database_name"]) - database = database_list[0] if len(database_list) == 1 else "default" - return database + schema = None + if data.get("schema_name"): + schema_list = ast.literal_eval(data["schema_name"]) + schema = schema_list[0] if len(schema_list) == 1 else None + return schema diff --git a/ingestion/src/metadata/ingestion/source/database/usage_source.py b/ingestion/src/metadata/ingestion/source/database/usage_source.py index 057021c34a7..602552f1fa6 100644 --- a/ingestion/src/metadata/ingestion/source/database/usage_source.py +++ b/ingestion/src/metadata/ingestion/source/database/usage_source.py @@ -60,6 +60,12 @@ class UsageSource(Source[TableQuery], ABC): """ return data.get("database_name") + def get_schema_name(self, data: dict) -> str: + """ + Method to get schema name + """ + return data.get("schema_name") + def get_aborted_status(self, data: dict) -> bool: """ Method to get aborted status of query @@ -119,10 +125,11 @@ class UsageSource(Source[TableQuery], ABC): ) queries = [] for row in rows: + row = dict(row) try: if filter_by_database( self.source_config.databaseFilterPattern, - self.get_database_name(dict(row)), + self.get_database_name(row), ) or filter_by_schema( self.source_config.schemaFilterPattern, schema_name=row["schema_name"], @@ -135,10 +142,10 @@ class UsageSource(Source[TableQuery], ABC): startTime=str(row["start_time"]), endTime=str(row["end_time"]), analysisDate=row["start_time"], - aborted=self.get_aborted_status(dict(row)), - databaseName=self.get_database_name(dict(row)), + aborted=self.get_aborted_status(row), + databaseName=self.get_database_name(row), serviceName=self.config.serviceName, - databaseSchema=row["schema_name"], + databaseSchema=self.get_schema_name(row), ) ) except Exception as err: diff --git a/ingestion/src/metadata/utils/sql_queries.py b/ingestion/src/metadata/utils/sql_queries.py index 0dac3cf0acf..739cd2967f5 100644 --- a/ingestion/src/metadata/utils/sql_queries.py +++ b/ingestion/src/metadata/utils/sql_queries.py @@ -304,17 +304,18 @@ CLICKHOUSE_SQL_USAGE_STATEMENT = """ Select query_start_time start_time, DATEADD(query_duration_ms, query_start_time) end_time, - databases database_name, + 'default' database_name, user user_name, FALSE aborted, query_id query_id, query query_text, - NULL schema_name, + databases schema_name, tables tables From system.query_log Where start_time between '{start_time}' and '{end_time}' and CAST(type,'Int8') <> 3 and CAST(type,'Int8') <> 4 + and (`type`='QueryFinish' or `type`='QueryStart') """