Fixed MSSQL & Clickhouse Usage (#3976)

This commit is contained in:
Mayur Singal 2022-04-09 13:50:24 +05:30 committed by GitHub
parent d77c60b816
commit 53e4403ccd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 125 additions and 70 deletions

View File

@ -1,15 +1,20 @@
{ {
"source": { "source": {
"type": "clickhouse-usage", "type": "clickhouse_usage",
"serviceName": "local_clickhouse",
"serviceConnection": {
"config": { "config": {
"type": "ClickHouse",
"username":"default", "username":"default",
"password":"", "password":"",
"database": "default", "hostPort": "localhost:8123",
"service_name": "local_clickhouse", "database": "default"
"schema_filter_pattern": { }
"excludes": ["system.*","information_schema.*","INFORMATION_SCHEMA.*"]
}, },
"duration":2 "sourceConfig": {
"config": {
"queryLogDuration": "1"
}
} }
}, },
"processor": { "processor": {
@ -24,17 +29,16 @@
"filename": "/tmp/clickhouse_usage" "filename": "/tmp/clickhouse_usage"
} }
}, },
"bulk_sink": { "bulkSink": {
"type": "metadata-usage", "type": "metadata-usage",
"config": { "config": {
"filename": "/tmp/clickhouse_usage" "filename": "/tmp/clickhouse_usage"
} }
}, },
"metadata_server": { "workflowConfig": {
"type": "metadata-server", "openMetadataServerConfig": {
"config": { "hostPort": "http://localhost:8585/api",
"api_endpoint": "http://localhost:8585/api", "authProvider": "no-auth"
"auth_provider_type": "no-auth"
} }
} }
} }

View File

@ -1,16 +1,19 @@
{ {
"source": { "source": {
"type": "mssql-usage", "type": "mssql-usage",
"serviceName": "local_mssql",
"serviceConnection": {
"config": { "config": {
"host_port": "localhost:1433", "type": "MSSQL",
"service_name": "local_mssql",
"database": "catalog_test", "database": "catalog_test",
"query": "select top 50 * from [{}].[{}]",
"username": "sa", "username": "sa",
"password": "test!Password", "password": "test!Password",
"duration":2, "hostPort": "localhost:1433"
"table_filter_pattern": { }
"excludes": ["catalog_test.*"] },
"sourceConfig": {
"config": {
"queryLogDuration": "1"
} }
} }
}, },
@ -26,17 +29,16 @@
"filename": "/tmp/mssql_usage" "filename": "/tmp/mssql_usage"
} }
}, },
"bulk_sink": { "bulkSink": {
"type": "metadata-usage", "type": "metadata-usage",
"config": { "config": {
"filename": "/tmp/mssql_usage" "filename": "/tmp/mssql_usage"
} }
}, },
"metadata_server": { "workflowConfig": {
"type": "metadata-server", "openMetadataServerConfig": {
"config": { "hostPort": "http://localhost:8585/api",
"api_endpoint": "http://localhost:8585/api", "authProvider": "no-auth"
"auth_provider_type": "no-auth"
} }
} }
} }

View File

@ -18,7 +18,7 @@ from typing import IO, Any, Optional
from pydantic import BaseModel from pydantic import BaseModel
FQDN_SEPARATOR: str = ":" FQDN_SEPARATOR: str = "."
class ConfigModel(BaseModel): class ConfigModel(BaseModel):

View File

@ -28,6 +28,7 @@ from metadata.ingestion.models.table_queries import (
) )
from metadata.ingestion.ometa.client import APIError from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.helpers import _get_formmated_table_name
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -72,12 +73,13 @@ class MetadataUsageBulkSink(BulkSink):
for record in usage_records: for record in usage_records:
table_usage = TableUsageCount(**json.loads(record)) table_usage = TableUsageCount(**json.loads(record))
if "." in table_usage.table: if "." in table_usage.table:
table_usage.database, table_usage.table = table_usage.table.split(".")[ (
-2: table_usage.database_schema,
] table_usage.table,
) = table_usage.table.split(".")[-2:]
self.service_name = table_usage.service_name self.service_name = table_usage.service_name
table_entity = self.__get_table_entity( table_entity = self.__get_table_entity(
table_usage.database, table_usage.table table_usage.database, table_usage.database_schema, table_usage.table
) )
if table_entity is not None: if table_entity is not None:
if not table_usage_map.get(table_entity.id.__root__): if not table_usage_map.get(table_entity.id.__root__):
@ -168,11 +170,13 @@ class MetadataUsageBulkSink(BulkSink):
else: else:
column_joins_dict[column_join.table_column.column] = {} column_joins_dict[column_join.table_column.column] = {}
main_column_fqdn = self.__get_column_fqdn( main_column_fqdn = self.__get_column_fqdn(
table_usage.database, column_join.table_column table_usage.database,
table_usage.database_schema,
column_join.table_column,
) )
for column in column_join.joined_with: for column in column_join.joined_with:
joined_column_fqdn = self.__get_column_fqdn( joined_column_fqdn = self.__get_column_fqdn(
table_usage.database, column table_usage.database, table_usage.database_schema, column
) )
if str(joined_column_fqdn) in joined_with.keys(): if str(joined_column_fqdn) in joined_with.keys():
column_joined_with = joined_with[str(joined_column_fqdn)] column_joined_with = joined_with[str(joined_column_fqdn)]
@ -194,16 +198,23 @@ class MetadataUsageBulkSink(BulkSink):
) )
return table_joins return table_joins
def __get_column_fqdn(self, database: str, table_column: TableColumn): def __get_column_fqdn(
table_entity = self.__get_table_entity(database, table_column.table) self, database: str, database_schema: str, table_column: TableColumn
):
table_entity = self.__get_table_entity(
database, database_schema, table_column.table
)
if table_entity is None: if table_entity is None:
return None return None
for tbl_column in table_entity.columns: for tbl_column in table_entity.columns:
if table_column.column.lower() == tbl_column.name.__root__.lower(): if table_column.column.lower() == tbl_column.name.__root__.lower():
return tbl_column.fullyQualifiedName.__root__ return tbl_column.fullyQualifiedName.__root__.__root__
def __get_table_entity(self, database_name: str, table_name: str) -> Table: def __get_table_entity(
table_fqn = f"{self.service_name}{FQDN_SEPARATOR}{database_name}{FQDN_SEPARATOR}{table_name}" self, database_name: str, database_schema: str, table_name: str
) -> Table:
table_fqn = f"{self.service_name}{FQDN_SEPARATOR}{database_name}{FQDN_SEPARATOR}{database_schema}{FQDN_SEPARATOR}{table_name}"
table_fqn = _get_formmated_table_name(table_fqn)
table_entity = self.metadata.get_by_name(Table, fqdn=table_fqn) table_entity = self.metadata.get_by_name(Table, fqdn=table_fqn)
return table_entity return table_entity

View File

@ -61,6 +61,7 @@ class TableUsageCount(BaseModel):
table: str table: str
date: str date: str
database: str database: str
database_schema: Optional[str]
sql_queries: List[SqlQuery] sql_queries: List[SqlQuery]
count: int = 1 count: int = 1
joins: TableColumnJoins joins: TableColumnJoins

View File

@ -12,19 +12,25 @@
Clickhouse usage module Clickhouse usage module
""" """
import ast
from typing import Any, Dict, Iterable from typing import Any, Dict, Iterable
from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import (
ClickhouseConnection,
)
from metadata.generated.schema.entity.services.databaseService import ( from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType, DatabaseServiceType,
) )
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataServerConfig, OpenMetadataServerConfig,
) )
from metadata.ingestion.api.source import Source, SourceStatus from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
# This import verifies that the dependencies are available. # This import verifies that the dependencies are available.
from metadata.ingestion.models.table_queries import TableQuery from metadata.ingestion.models.table_queries import TableQuery
from metadata.ingestion.source.clickhouse import ClickhouseConfig
from metadata.ingestion.source.sql_alchemy_helper import ( from metadata.ingestion.source.sql_alchemy_helper import (
SQLAlchemyHelper, SQLAlchemyHelper,
SQLSourceStatus, SQLSourceStatus,
@ -50,16 +56,19 @@ class ClickhouseUsageSource(Source[TableQuery]):
report: report:
""" """
def __init__(self, config, metadata_config): def __init__(
self, config: WorkflowSource, metadata_config: OpenMetadataServerConfig
):
super().__init__() super().__init__()
self.config = config self.config = config
start, end = get_start_and_end(config.duration) self.connection = config.serviceConnection.__root__.config
start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
self.analysis_date = start self.analysis_date = start
self.sql_stmt = CLICKHOUSE_SQL_USAGE_STATEMENT.format( self.sql_stmt = CLICKHOUSE_SQL_USAGE_STATEMENT.format(
start_time=start, end_time=end start_time=start, end_time=end
) )
self.alchemy_helper = SQLAlchemyHelper( self.alchemy_helper = SQLAlchemyHelper(
config, self.connection,
metadata_config, metadata_config,
DatabaseServiceType.ClickHouse.value, DatabaseServiceType.ClickHouse.value,
self.sql_stmt, self.sql_stmt,
@ -68,7 +77,13 @@ class ClickhouseUsageSource(Source[TableQuery]):
@classmethod @classmethod
def create(cls, config_dict, metadata_config: OpenMetadataServerConfig): def create(cls, config_dict, metadata_config: OpenMetadataServerConfig):
config = ClickhouseConfig.parse_obj(config_dict) config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: ClickhouseConnection = config.serviceConnection.__root__.config
if not isinstance(connection, ClickhouseConnection):
raise InvalidSourceException(
f"Expected ClickhouseConnection, but got {connection}"
)
return cls(config, metadata_config) return cls(config, metadata_config)
def prepare(self): def prepare(self):
@ -90,6 +105,11 @@ class ClickhouseUsageSource(Source[TableQuery]):
:return: :return:
""" """
for row in get_raw_extract_iter(self.alchemy_helper): for row in get_raw_extract_iter(self.alchemy_helper):
database = "default"
if row["database_name"]:
database_list = ast.literal_eval(row["database_name"])
database = database_list[0] if len(database_list) == 1 else "default"
table_query = TableQuery( table_query = TableQuery(
query=row["query_id"], query=row["query_id"],
user_name=row["user_name"], user_name=row["user_name"],
@ -97,11 +117,9 @@ class ClickhouseUsageSource(Source[TableQuery]):
endtime=str(row["end_time"]), endtime=str(row["end_time"]),
analysis_date=self.analysis_date, analysis_date=self.analysis_date,
aborted=row["aborted"], aborted=row["aborted"],
database=row["database_name"][0] database=database,
if len(row["database_name"]) >= 1
else "default",
sql=row["query_text"], sql=row["query_text"],
service_name=self.config.service_name, service_name=self.config.serviceName,
) )
yield table_query yield table_query

View File

@ -14,17 +14,22 @@ MSSQL usage module
from typing import Any, Dict, Iterable from typing import Any, Dict, Iterable
from metadata.generated.schema.entity.services.connections.database.mssqlConnection import (
MssqlConnection,
)
from metadata.generated.schema.entity.services.databaseService import ( from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType, DatabaseServiceType,
) )
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataServerConfig, OpenMetadataServerConfig,
) )
from metadata.ingestion.api.source import Source, SourceStatus from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
# This import verifies that the dependencies are available. # This import verifies that the dependencies are available.
from metadata.ingestion.models.table_queries import TableQuery from metadata.ingestion.models.table_queries import TableQuery
from metadata.ingestion.source.mssql import MssqlConfig
from metadata.ingestion.source.sql_alchemy_helper import ( from metadata.ingestion.source.sql_alchemy_helper import (
SQLAlchemyHelper, SQLAlchemyHelper,
SQLSourceStatus, SQLSourceStatus,
@ -49,20 +54,32 @@ class MssqlUsageSource(Source[TableQuery]):
report: report:
""" """
def __init__(self, config, metadata_config): def __init__(
self, config: WorkflowSource, metadata_config: OpenMetadataServerConfig
):
super().__init__() super().__init__()
self.config = config self.config = config
start, end = get_start_and_end(config.duration) self.connection = config.serviceConnection.__root__.config
start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
self.analysis_date = start self.analysis_date = start
self.sql_stmt = MSSQL_SQL_USAGE_STATEMENT.format(start_date=start, end_date=end) self.sql_stmt = MSSQL_SQL_USAGE_STATEMENT.format(start_date=start, end_date=end)
self.alchemy_helper = SQLAlchemyHelper( self.alchemy_helper = SQLAlchemyHelper(
config, metadata_config, DatabaseServiceType.MSSQL.value, self.sql_stmt self.connection,
metadata_config,
DatabaseServiceType.MSSQL.value,
self.sql_stmt,
) )
self.report = SQLSourceStatus() self.report = SQLSourceStatus()
@classmethod @classmethod
def create(cls, config_dict, metadata_config: OpenMetadataServerConfig): def create(cls, config_dict, metadata_config: OpenMetadataServerConfig):
config = MssqlConfig.parse_obj(config_dict) """Create class instance"""
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: MssqlConnection = config.serviceConnection.__root__.config
if not isinstance(connection, MssqlConnection):
raise InvalidSourceException(
f"Expected MssqlConnection, but got {connection}"
)
return cls(config, metadata_config) return cls(config, metadata_config)
def prepare(self): def prepare(self):
@ -93,7 +110,7 @@ class MssqlUsageSource(Source[TableQuery]):
aborted=row["aborted"], aborted=row["aborted"],
database=row["database_name"], database=row["database_name"],
sql=row["query_text"], sql=row["query_text"],
service_name=self.config.service_name, service_name=self.config.serviceName,
) )
if row["schema_name"] is not None: if row["schema_name"] is not None:
self.report.scanned(f"{row['database_name']}.{row['schema_name']}") self.report.scanned(f"{row['database_name']}.{row['schema_name']}")

View File

@ -15,7 +15,8 @@ from typing import Any, Iterable
from sqlalchemy import create_engine from sqlalchemy import create_engine
from ...generated.schema.metadataIngestion.workflow import OpenMetadataServerConfig from ...generated.schema.metadataIngestion.workflow import OpenMetadataServerConfig
from .sql_source import SQLConnectionConfig, SQLSourceStatus from ...utils.source_connections import get_connection_url
from .sql_source import SQLSourceStatus
class SQLAlchemyHelper: class SQLAlchemyHelper:
@ -23,7 +24,7 @@ class SQLAlchemyHelper:
def __init__( def __init__(
self, self,
config: SQLConnectionConfig, config,
metadata_config: OpenMetadataServerConfig, metadata_config: OpenMetadataServerConfig,
platform: str, platform: str,
query: str, query: str,
@ -39,9 +40,11 @@ class SQLAlchemyHelper:
Create a SQLAlchemy connection to Database Create a SQLAlchemy connection to Database
""" """
engine = create_engine( engine = create_engine(
self.config.get_connection_url(), get_connection_url(self.config),
**self.config.options, **self.config.connectionOptions if self.config.connectionOptions else {},
connect_args=self.config.connect_args connect_args=self.config.connectionArguments
if self.config.connectionArguments
else {}
) )
conn = engine.connect() conn = engine.connect()
return conn return conn

View File

@ -62,7 +62,6 @@ def get_connection_url_common(connection):
if connection.username: if connection.username:
url += f"{connection.username}" url += f"{connection.username}"
if connection.password:
url += ( url += (
f":{quote_plus(connection.password.get_secret_value())}" f":{quote_plus(connection.password.get_secret_value())}"
if connection if connection