Fix #3529 - Profiler is not working on views (#3617)

* tmp pass dialect when building ORM

* Format
This commit is contained in:
Pere Miquel Brull 2022-03-23 18:38:59 +01:00 committed by GitHub
parent 49c089e4a3
commit 792be37cb2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 20 additions and 18 deletions

View File

@ -16,9 +16,6 @@ from sqlalchemy import column
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.functions import FunctionElement
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.orm_profiler.metrics.core import CACHE, StaticMetric, _label
from metadata.orm_profiler.orm.registry import Dialects, is_quantifiable
from metadata.orm_profiler.utils import logger

View File

@ -14,14 +14,14 @@ Converter logic to transform an OpenMetadata Table Entity
to an SQLAlchemy ORM class.
"""
from functools import singledispatch
from typing import Union
from typing import Optional, Union
import sqlalchemy
from sqlalchemy.orm import DeclarativeMeta, declarative_base
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.orm_profiler.orm.registry import CustomTypes
from metadata.orm_profiler.orm.registry import CustomTypes, Dialects
Base = declarative_base()
@ -84,7 +84,9 @@ def build_orm_col(idx: int, col: Column) -> sqlalchemy.Column:
)
def ometa_to_orm(table: Table, database: Union[Database, str]) -> DeclarativeMeta:
def ometa_to_orm(
table: Table, database: Union[Database, str], dialect: Optional[str] = None
) -> DeclarativeMeta:
"""
Given an OpenMetadata instance, prepare
the SQLAlchemy DeclarativeMeta class
@ -93,6 +95,7 @@ def ometa_to_orm(table: Table, database: Union[Database, str]) -> DeclarativeMet
We are building the class dynamically using
`type` and passing SQLAlchemy `Base` class
as the bases tuple for inheritance.
TODO: Remove the dialect once we solve the hierarchy service.db.schema.table. Check #3529
"""
cols = {
@ -107,7 +110,7 @@ def ometa_to_orm(table: Table, database: Union[Database, str]) -> DeclarativeMet
{
"__tablename__": str(table.name.__root__),
"__table_args__": {
"schema": get_db_name(database),
"schema": get_db_name(database, dialect),
"extend_existing": True, # Recreates the table ORM object if it already exists. Useful for testing
},
**cols,
@ -121,7 +124,7 @@ def ometa_to_orm(table: Table, database: Union[Database, str]) -> DeclarativeMet
@singledispatch
def get_db_name(arg) -> str:
def get_db_name(arg, *_) -> str:
"""
Return the database name to pass the table schema info
to the ORM object.
@ -133,7 +136,7 @@ def get_db_name(arg) -> str:
@get_db_name.register
def _(arg: str) -> str:
def _(arg: str, *_) -> str:
"""
Return string as is
@ -144,11 +147,17 @@ def _(arg: str) -> str:
@get_db_name.register
def _(arg: Database) -> str:
def _(arg: Database, dialect: Optional[str] = None) -> str:
"""
Get the db name from the database entity
:param arg: database
:param dialect: Database dialect. Just for snowflake cleaning
:return: db name
"""
return str(arg.name.__root__)
name = str(arg.name.__root__)
if dialect == Dialects.Snowflake:
return "_".join(name.split("_")[1:])
return name

View File

@ -15,9 +15,6 @@ Define Concat function
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.functions import FunctionElement
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.orm_profiler.metrics.core import CACHE
from metadata.orm_profiler.orm.registry import Dialects
from metadata.orm_profiler.utils import logger

View File

@ -19,9 +19,6 @@ data.
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.functions import FunctionElement
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.orm_profiler.metrics.core import CACHE
from metadata.orm_profiler.orm.registry import Dialects
from metadata.orm_profiler.utils import logger

View File

@ -466,7 +466,9 @@ class OrmProfilerProcessor(Processor[Table]):
database = self.metadata.get_by_id(
entity=Database, entity_id=record.database.id
)
orm_table = ometa_to_orm(table=record, database=database)
orm_table = ometa_to_orm(
table=record, database=database, dialect=self.session.bind.dialect.name
)
entity_profile = self.profile_entity(orm_table, record)