Fix #4590: Usage through query log file (#4719)

* Usage through query log file

* Pick fields if available in file

* Created Usage Source
Mayur Singal 2022-05-09 15:07:52 +05:30 committed by GitHub
parent 400818d04d
commit fd521763d4
10 changed files with 232 additions and 195 deletions

View File

@@ -7,9 +7,7 @@
     "databaseUsageConfigType": {
       "description": "Database Source Config Usage Pipeline type",
       "type": "string",
-      "enum": [
-        "DatabaseUsage"
-      ],
+      "enum": ["DatabaseUsage"],
       "default": "DatabaseUsage"
     }
   },
@@ -33,6 +31,10 @@
       "description": "Configuration to set the limit for query logs",
       "type": "integer",
       "default": "100"
+    },
+    "queryLogFilePath": {
+      "description": "Configuration to set the file path for query logs",
+      "type": "string"
     }
   },
   "additionalProperties": false

View File

@@ -13,7 +13,8 @@
     },
     "sourceConfig": {
       "config": {
-        "queryLogDuration": "1"
+        "queryLogDuration": "1",
+        "queryLogFilePath": "<path to query log file>"
       }
     }
   },

View File

@@ -0,0 +1,46 @@
{
  "source": {
    "type": "query-log-usage",
    "serviceName": "local_mysql",
    "serviceConnection": {
      "config": {
        "type": "Mysql",
        "username": "openmetadata_user",
        "password": "openmetadata_password",
        "hostPort": "localhost:3306",
        "connectionOptions": {},
        "connectionArguments": {}
      }
    },
    "sourceConfig": {
      "config": {
        "queryLogDuration": "1",
        "queryLogFilePath": "<path to query log file>"
      }
    }
  },
  "processor": {
    "type": "query-parser",
    "config": {
      "filter": ""
    }
  },
  "stage": {
    "type": "table-usage",
    "config": {
      "filename": "/tmp/query_log_usage"
    }
  },
  "bulkSink": {
    "type": "metadata-usage",
    "config": {
      "filename": "/tmp/query_log_usage"
    }
  },
  "workflowConfig": {
    "openMetadataServerConfig": {
      "hostPort": "http://localhost:8585/api",
      "authProvider": "no-auth"
    }
  }
}
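
For context (not part of the diff): a workflow file like the one above is handed to the OpenMetadata ingestion CLI, e.g.

    metadata ingest -c /path/to/query_log_usage.json

The <path to query log file> placeholder must point at a real CSV before running; the query-log-usage source defines no sql_stmt, so omitting queryLogFilePath would send the shared UsageSource down its SQL branch with nothing to execute.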

View File

@@ -13,7 +13,7 @@ Clickhouse usage module
 """
 import ast
-from typing import Any, Dict, Iterable
+from typing import Iterable

 from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import (
     ClickhouseConnection,
@@ -27,14 +27,15 @@ from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
 # This import verifies that the dependencies are available.
 from metadata.ingestion.models.table_queries import TableQuery
 from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus
+from metadata.ingestion.source.usage_source import UsageSource
 from metadata.utils.connections import get_connection, test_connection
 from metadata.utils.helpers import get_start_and_end
 from metadata.utils.sql_queries import CLICKHOUSE_SQL_USAGE_STATEMENT


-class ClickhouseUsageSource(Source[TableQuery]):
+class ClickhouseUsageSource(UsageSource):
     def __init__(self, config: WorkflowSource, metadata_config: WorkflowConfig):
-        super().__init__()
+        super().__init__(config, metadata_config)
         self.config = config
         self.connection = config.serviceConnection.__root__.config
         start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
@@ -56,15 +57,6 @@ class ClickhouseUsageSource(Source[TableQuery]):
         return cls(config, metadata_config)

-    def prepare(self):
-        return super().prepare()
-
-    def _get_raw_extract_iter(self) -> Iterable[Dict[str, Any]]:
-        rows = self.engine.execute(self.sql_stmt)
-        for row in rows:
-            yield row
-
     def next_record(self) -> Iterable[TableQuery]:
         """
         Using itertools.groupby and raw level iterator,
@@ -89,20 +81,3 @@ class ClickhouseUsageSource(Source[TableQuery]):
                 service_name=self.config.serviceName,
             )
             yield table_query
-
-    def get_report(self):
-        """
-        get report
-        Returns:
-        """
-        return self.report
-
-    def close(self):
-        pass
-
-    def get_status(self) -> SourceStatus:
-        return self.report
-
-    def test_connection(self) -> None:
-        test_connection(self.engine)

View File

@@ -11,36 +11,30 @@
 """
 MSSQL usage module
 """
-from typing import Any, Dict, Iterable
-
 from metadata.generated.schema.entity.services.connections.database.mssqlConnection import (
     MssqlConnection,
 )
+from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
+    OpenMetadataConnection,
+)
 from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
 from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
-from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
+from metadata.ingestion.api.source import InvalidSourceException
+from metadata.ingestion.source.usage_source import UsageSource

 # This import verifies that the dependencies are available.
-from metadata.ingestion.models.table_queries import TableQuery
-from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus
-from metadata.utils.connections import get_connection, test_connection
 from metadata.utils.helpers import get_start_and_end
 from metadata.utils.sql_queries import MSSQL_SQL_USAGE_STATEMENT


-class MssqlUsageSource(Source[TableQuery]):
-    def __init__(self, config: WorkflowSource, metadata_config: WorkflowConfig):
-        super().__init__()
-        self.config = config
-        self.connection = config.serviceConnection.__root__.config
-        start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
+class MssqlUsageSource(UsageSource):
+    def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
+        super().__init__(config, metadata_config)
+        start, end = get_start_and_end(config.sourceConfig.config.queryLogDuration)
         self.analysis_date = start
         self.sql_stmt = MSSQL_SQL_USAGE_STATEMENT.format(start_date=start, end_date=end)
-        self.report = SQLSourceStatus()
-        self.engine = get_connection(self.connection)

     @classmethod
     def create(cls, config_dict, metadata_config: WorkflowConfig):
@@ -52,53 +46,3 @@ class MssqlUsageSource(Source[TableQuery]):
                 f"Expected MssqlConnection, but got {connection}"
             )
         return cls(config, metadata_config)
-
-    def prepare(self):
-        return super().prepare()
-
-    def _get_raw_extract_iter(self) -> Iterable[Dict[str, Any]]:
-        rows = self.engine.execute(self.sql_stmt)
-        for row in rows:
-            yield row
-
-    def next_record(self) -> Iterable[TableQuery]:
-        """
-        Using itertools.groupby and raw level iterator,
-        it groups to table and yields TableMetadata
-        :return:
-        """
-        for row in self._get_raw_extract_iter():
-            table_query = TableQuery(
-                query=row["query_type"],
-                user_name=row["user_name"],
-                starttime=str(row["start_time"]),
-                endtime=str(row["end_time"]),
-                analysis_date=self.analysis_date,
-                aborted=row["aborted"],
-                database=row["database_name"],
-                sql=row["query_text"],
-                service_name=self.config.serviceName,
-            )
-            if row["schema_name"] is not None:
-                self.report.scanned(f"{row['database_name']}.{row['schema_name']}")
-            else:
-                self.report.scanned(f"{row['database_name']}")
-            yield table_query
-
-    def get_report(self):
-        """
-        get report
-        Returns:
-        """
-        return self.report
-
-    def close(self):
-        pass
-
-    def get_status(self) -> SourceStatus:
-        return self.report
-
-    def test_connection(self) -> None:
-        test_connection(self.engine)

View File

@@ -0,0 +1,35 @@
#  Copyright 2021 Collate
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#  http://www.apache.org/licenses/LICENSE-2.0
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""
Common Query Log Connector
"""
from datetime import datetime

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
from metadata.ingestion.source.usage_source import UsageSource


class QueryLogUsageSource(UsageSource):
    def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
        super().__init__(config, metadata_config)
        self.analysis_date = datetime.today().strftime("%Y-%m-%d %H:%M:%S")

    @classmethod
    def create(cls, config_dict, metadata_config: WorkflowConfig):
        """Create class instance"""
        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
        return cls(config, metadata_config)
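
The same base class also carries the engine-backed connectors: as the Clickhouse and MSSQL diffs in this commit show, a connector now reduces to a constructor that sets its analysis window and SQL, with extraction, parsing, and reporting inherited. A schematic sketch of that pattern (MyDbUsageSource and MYDB_SQL_USAGE_STATEMENT are made-up placeholders, not real modules):

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.ingestion.source.usage_source import UsageSource
from metadata.utils.helpers import get_start_and_end

# Placeholder SQL template; a real connector imports its statement
# from metadata.utils.sql_queries instead.
MYDB_SQL_USAGE_STATEMENT = (
    "SELECT ... WHERE start_time BETWEEN '{start_date}' AND '{end_date}'"
)


class MyDbUsageSource(UsageSource):
    """Hypothetical engine-backed connector: only the analysis window and
    SQL differ; row extraction and reporting come from UsageSource."""

    def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
        super().__init__(config, metadata_config)
        start, end = get_start_and_end(config.sourceConfig.config.queryLogDuration)
        self.analysis_date = start
        self.sql_stmt = MYDB_SQL_USAGE_STATEMENT.format(start_date=start, end_date=end)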

View File

@@ -13,7 +13,7 @@ Redshift usage module
 """

 # This import verifies that the dependencies are available.
-from typing import Any, Dict, Iterable, Iterator, Union
+from typing import Iterator, Union

 from metadata.generated.schema.entity.services.connections.database.redshiftConnection import (
     RedshiftConnection,
@@ -25,11 +25,8 @@ from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
 from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
-from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
-from metadata.ingestion.models.table_queries import TableQuery
-from metadata.ingestion.ometa.ometa_api import OpenMetadata
-from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus
-from metadata.utils.connections import get_connection, test_connection
+from metadata.ingestion.api.source import InvalidSourceException
+from metadata.ingestion.source.usage_source import UsageSource
 from metadata.utils.helpers import get_start_and_end

 # pylint: disable=useless-super-delegation
@@ -39,7 +36,7 @@ from metadata.utils.sql_queries import REDSHIFT_SQL_STATEMENT
 logger = ingestion_logger()


-class RedshiftUsageSource(Source[TableQuery]):
+class RedshiftUsageSource(UsageSource):
     # SELECT statement from mysql information_schema to extract table and column metadata
     SQL_STATEMENT = REDSHIFT_SQL_STATEMENT
     # CONFIG KEYS
@@ -52,11 +49,7 @@ class RedshiftUsageSource(Source[TableQuery]):
     DEFAULT_CLUSTER_SOURCE = "CURRENT_DATABASE()"

     def __init__(self, config: WorkflowSource, metadata_config: WorkflowConfig):
-        super().__init__()
-        self.config = config
-        self.service_connection = config.serviceConnection.__root__.config
-        self.metadata_config = metadata_config
-        self.metadata = OpenMetadata(metadata_config)
+        super().__init__(config, metadata_config)
         start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
         self.sql_stmt = RedshiftUsageSource.SQL_STATEMENT.format(
             start_time=start, end_time=end
@@ -64,8 +57,6 @@ class RedshiftUsageSource(Source[TableQuery]):
         self.analysis_date = start
         self._extract_iter: Union[None, Iterator] = None
         self._database = "redshift"
-        self.status = SQLSourceStatus()
-        self.engine = get_connection(self.service_connection)

     @classmethod
     def create(cls, config_dict, metadata_config: WorkflowConfig):
@@ -76,40 +67,3 @@ class RedshiftUsageSource(Source[TableQuery]):
                 f"Expected RedshiftConnection, but got {connection}"
             )
         return cls(config, metadata_config)
-
-    def prepare(self):
-        pass
-
-    def _get_raw_extract_iter(self) -> Iterable[Dict[str, Any]]:
-        rows = self.engine.execute(self.sql_stmt)
-        for row in rows:
-            yield row
-
-    def next_record(self) -> Iterable[TableQuery]:
-        """
-        Using itertools.groupby and raw level iterator, it groups to table and yields TableMetadata
-        :return:
-        """
-        for row in self._get_raw_extract_iter():
-            tq = TableQuery(
-                query=row["query"],
-                user_name=row["usename"],
-                starttime=str(row["starttime"]),
-                endtime=str(row["endtime"]),
-                analysis_date=str(self.analysis_date),
-                database=self.service_connection.database,
-                aborted=row["aborted"],
-                sql=row["querytxt"],
-                service_name=self.config.serviceName,
-            )
-            yield tq
-
-    def close(self):
-        pass
-
-    def get_status(self) -> SourceStatus:
-        return self.status
-
-    def test_connection(self) -> None:
-        test_connection(self.engine)

View File

@@ -14,7 +14,7 @@ Snowflake usage module
 import traceback
 from datetime import timedelta
-from typing import Any, Dict, Iterable, Iterator, Union
+from typing import Iterable, Iterator, Union

 from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
     SnowflakeConnection,
@@ -26,12 +26,11 @@ from metadata.generated.schema.metadataIngestion.workflow import (
     Source as WorkflowSource,
 )
 from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
-from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
+from metadata.ingestion.api.source import InvalidSourceException

 # This import verifies that the dependencies are available.
 from metadata.ingestion.models.table_queries import TableQuery
-from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus
-from metadata.utils.connections import get_connection, test_connection
+from metadata.ingestion.source.usage_source import UsageSource
 from metadata.utils.helpers import get_start_and_end
 from metadata.utils.logger import ingestion_logger
 from metadata.utils.sql_queries import SNOWFLAKE_SQL_STATEMENT
@@ -39,8 +38,7 @@ from metadata.utils.sql_queries import SNOWFLAKE_SQL_STATEMENT
 logger = ingestion_logger()


-class SnowflakeUsageSource(Source[TableQuery]):
+class SnowflakeUsageSource(UsageSource):
     # SELECT statement from mysql information_schema
     # to extract table and column metadata
     SQL_STATEMENT = SNOWFLAKE_SQL_STATEMENT
@@ -55,13 +53,10 @@ class SnowflakeUsageSource(Source[TableQuery]):
     DEFAULT_CLUSTER_SOURCE = "CURRENT_DATABASE()"

     def __init__(self, config: WorkflowSource, metadata_config: WorkflowConfig):
-        super().__init__()
-        self.config = config
-        self.service_connection = config.serviceConnection.__root__.config
+        super().__init__(config, metadata_config)
         start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
         end = end + timedelta(days=1)
         self.analysis_date = start
-        self.metadata_config = metadata_config
         self.sql_stmt = SnowflakeUsageSource.SQL_STATEMENT.format(
             start_date=start,
             end_date=end,
@@ -69,8 +64,6 @@ class SnowflakeUsageSource(Source[TableQuery]):
         )
         self._extract_iter: Union[None, Iterator] = None
         self._database = "Snowflake"
-        self.report = SQLSourceStatus()
-        self.engine = get_connection(self.service_connection)

     @classmethod
     def create(cls, config_dict, metadata_config: WorkflowConfig):
@@ -82,15 +75,6 @@ class SnowflakeUsageSource(Source[TableQuery]):
             )
         return cls(config, metadata_config)
-
-    def prepare(self):
-        pass
-
-    def _get_raw_extract_iter(self) -> Iterable[Dict[str, Any]]:
-        rows = self.engine.execute(self.sql_stmt)
-        for row in rows:
-            yield row
-
     def next_record(self) -> Iterable[TableQuery]:
         """
         Using itertools.groupby and raw level iterator,
@@ -121,20 +105,3 @@ class SnowflakeUsageSource(Source[TableQuery]):
             except Exception as err:
                 logger.debug(traceback.format_exc())
                 logger.debug(repr(err))
-
-    def get_report(self):
-        """
-        get report
-        Returns:
-        """
-        return self.report
-
-    def test_connection(self) -> None:
-        test_connection(self.engine)
-
-    def close(self):
-        pass
-
-    def get_status(self) -> SourceStatus:
-        return self.report

View File

@@ -0,0 +1,113 @@
#  Copyright 2021 Collate
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#  http://www.apache.org/licenses/LICENSE-2.0
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""
Usage Source Module
"""
import csv
from typing import Any, Dict, Iterable

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus

# This import verifies that the dependencies are available.
from metadata.ingestion.models.table_queries import TableQuery
from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.helpers import get_start_and_end


class UsageSource(Source[TableQuery]):
    def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
        super().__init__()
        self.config = config
        self.metadata_config = metadata_config
        self.connection = config.serviceConnection.__root__.config
        start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
        self.analysis_date = start
        self.report = SQLSourceStatus()
        self.engine = get_connection(self.connection)

    def prepare(self):
        return super().prepare()

    def _get_raw_extract_iter(self) -> Iterable[Dict[str, Any]]:
        if self.config.sourceConfig.config.queryLogFilePath:
            with open(self.config.sourceConfig.config.queryLogFilePath, "r") as fin:
                for i in csv.DictReader(fin):
                    query_dict = dict(i)
                    row = {
                        "query_type": query_dict.get("query"),
                        "user_name": query_dict.get("user_name", ""),
                        "start_time": query_dict.get("start_time", ""),
                        "end_time": query_dict.get("end_time", ""),
                        "aborted": query_dict.get("aborted", False),
                        "database_name": query_dict.get(
                            "database_name",
                            self.connection.database
                            if self.connection.database
                            else "default",
                        ),
                        "query_text": query_dict.get("query"),
                        "schema_name": query_dict.get("schema_name"),
                    }
                    yield row
        else:
            rows = self.engine.execute(self.sql_stmt)
            for row in rows:
                yield row

    def next_record(self) -> Iterable[TableQuery]:
        """
        Using itertools.groupby and raw level iterator,
        it groups to table and yields TableMetadata
        :return:
        """
        for row in self._get_raw_extract_iter():
            table_query = TableQuery(
                query=row["query_type"],
                user_name=row["user_name"],
                starttime=str(row["start_time"]),
                endtime=str(row["end_time"]),
                analysis_date=self.analysis_date,
                aborted=row["aborted"],
                database=row["database_name"],
                sql=row["query_text"],
                service_name=self.config.serviceName,
            )
            if row["schema_name"]:
                self.report.scanned(f"{row['database_name']}.{row['schema_name']}")
            else:
                self.report.scanned(f"{row['database_name']}")
            yield table_query

    def get_report(self):
        """
        get report
        Returns:
        """
        return self.report

    def close(self):
        pass

    def get_status(self) -> SourceStatus:
        return self.report

    def test_connection(self) -> None:
        test_connection(self.engine)
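
To make the file branch above concrete: a query log is a plain CSV whose header row supplies the keys that csv.DictReader hands to _get_raw_extract_iter(); only query has no usable default. A minimal, self-contained sketch (the path, names, and query text are invented for illustration):

import csv

# Header names mirror the query_dict.get(...) keys in _get_raw_extract_iter().
sample_rows = [
    {
        "query": "SELECT * FROM orders",
        "user_name": "etl_user",
        "start_time": "2022-05-01 10:00:00",
        "end_time": "2022-05-01 10:00:03",
        "aborted": "false",
        "database_name": "shop",
        "schema_name": "public",
    },
]

with open("/tmp/query_log.csv", "w", newline="") as fout:
    writer = csv.DictWriter(fout, fieldnames=list(sample_rows[0]))
    writer.writeheader()
    writer.writerows(sample_rows)

# Reading it back yields exactly the dicts next_record() turns into TableQuery:
with open("/tmp/query_log.csv") as fin:
    for row in csv.DictReader(fin):
        print(row["query"], row["database_name"], row["schema_name"])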

View File

@@ -2,16 +2,16 @@ import textwrap

 REDSHIFT_SQL_STATEMENT = """
   SELECT DISTINCT ss.userid,
-                  ss.query,
-                  sui.usename,
+                  ss.query query_type,
+                  sui.usename user_name,
                   ss.tbl,
-                  sq.querytxt,
-                  sti.database,
-                  sti.schema,
+                  sq.querytxt query_text,
+                  sti.database database_name,
+                  sti.schema schema_name,
                   sti.table,
-                  sq.starttime,
-                  sq.endtime,
-                  sq.aborted
+                  sq.starttime start_time,
+                  sq.endtime end_time,
+                  sq.aborted aborted
   FROM stl_scan ss
   JOIN svv_table_info sti ON ss.tbl = sti.table_id
   JOIN stl_query sq ON ss.query = sq.query
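
Why the aliases matter: UsageSource.next_record() reads every raw row by fixed dict keys, so each backend's SQL has to expose its columns under those names; this is what lets the same row-handling code serve Redshift, MSSQL, and plain CSV logs alike. An illustrative helper (not in the codebase) that spells out the contract:

# Keys UsageSource.next_record() expects on every raw row, whether it
# comes from engine.execute(...) or from csv.DictReader on a log file.
EXPECTED_KEYS = {
    "query_type",
    "user_name",
    "start_time",
    "end_time",
    "aborted",
    "database_name",
    "query_text",
    "schema_name",
}


def validate_usage_row(row: dict) -> None:
    """Hypothetical fail-fast check for a backend whose SQL misses an alias."""
    missing = EXPECTED_KEYS - row.keys()
    if missing:
        raise KeyError(f"usage row is missing keys: {sorted(missing)}")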