Add support for external profiler workflow (#13887)

* Add support for external profiler workflow

* pylint

* resolve comments

* resolve comments

* pyformat

* fix code smell
Mayur Singal 2023-11-09 18:50:47 +05:30, committed by GitHub
parent a8145a82fa
commit 4c19bb5a1d
17 changed files with 504 additions and 51 deletions

ingestion/src/metadata/ingestion/source/database/azuresql/metadata.py

@@ -11,7 +11,7 @@
 """Azure SQL source module"""
 import traceback
-from typing import Iterable
+from typing import Iterable, Optional
 
 from sqlalchemy.dialects.mssql.base import MSDialect, ischema_names
@@ -24,12 +24,14 @@ from metadata.generated.schema.metadataIngestion.workflow import (
 )
 from metadata.ingestion.api.steps import InvalidSourceException
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.source.database.azuresql.queries import AZURE_SQL_GET_DATABASES
 from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
 from metadata.ingestion.source.database.mssql.utils import (
     get_columns,
     get_table_comment,
     get_view_definition,
 )
+from metadata.ingestion.source.database.multi_db_source import MultiDBSource
 from metadata.utils import fqn
 from metadata.utils.filters import filter_by_database
 from metadata.utils.logger import ingestion_logger
@@ -50,7 +52,7 @@ MSDialect.get_all_table_comments = get_all_table_comments
 MSDialect.get_columns = get_columns
 
-class AzuresqlSource(CommonDbSourceService):
+class AzuresqlSource(CommonDbSourceService, MultiDBSource):
     """
     Implements the necessary methods to extract
     Database metadata from Azuresql Source
@@ -66,6 +68,14 @@ class AzuresqlSource(CommonDbSourceService):
         )
         return cls(config, metadata)
 
+    def get_configured_database(self) -> Optional[str]:
+        if not self.service_connection.ingestAllDatabases:
+            return self.service_connection.database
+        return None
+
+    def get_database_names_raw(self) -> Iterable[str]:
+        yield from self._execute_database_query(AZURE_SQL_GET_DATABASES)
+
     def get_database_names(self) -> Iterable[str]:
         if not self.config.serviceConnection.__root__.config.ingestAllDatabases:
@@ -73,12 +83,7 @@ class AzuresqlSource(CommonDbSourceService):
             self.set_inspector(database_name=configured_db)
             yield configured_db
         else:
-            results = self.connection.execute(
-                "SELECT name FROM master.sys.databases order by name"
-            )
-            for res in results:
-                row = list(res)
-                new_database = row[0]
+            for new_database in self.get_database_names_raw():
                 database_fqn = fqn.build(
                     self.metadata,
                     entity_type=Database,

ingestion/src/metadata/ingestion/source/database/azuresql/queries.py

@@ -0,0 +1,15 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+SQL Queries used during ingestion
+"""
+
+AZURE_SQL_GET_DATABASES = "SELECT name FROM master.sys.databases order by name"

ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py

@@ -68,6 +68,7 @@ from metadata.ingestion.source.database.common_db_source import (
     CommonDbSourceService,
     TableNameAndType,
 )
+from metadata.ingestion.source.database.multi_db_source import MultiDBSource
 from metadata.ingestion.source.database.stored_procedures_mixin import (
     QueryByProcedure,
     StoredProcedureMixin,
@@ -190,7 +191,7 @@ BigQueryDialect._build_formatted_table_id = (  # pylint: disable=protected-access
 )
 
-class BigquerySource(StoredProcedureMixin, CommonDbSourceService):
+class BigquerySource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource):
     """
     Implements the necessary methods to extract
     Database metadata from Bigquery Source
@@ -421,6 +422,12 @@ class BigquerySource(StoredProcedureMixin, CommonDbSourceService):
         self.engine = inspector_details.engine
         self.inspector = inspector_details.inspector
 
+    def get_configured_database(self) -> Optional[str]:
+        return None
+
+    def get_database_names_raw(self) -> Iterable[str]:
+        yield from self.project_ids
+
     def get_database_names(self) -> Iterable[str]:
         for project_id in self.project_ids:
             database_fqn = fqn.build(

ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py

@@ -13,7 +13,7 @@ Greenplum source module
 """
 import traceback
 from collections import namedtuple
-from typing import Iterable, Tuple
+from typing import Iterable, Optional, Tuple
 
 from sqlalchemy import sql
 from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names
@@ -51,6 +51,7 @@ from metadata.ingestion.source.database.greenplum.utils import (
     get_table_comment,
     get_view_definition,
 )
+from metadata.ingestion.source.database.multi_db_source import MultiDBSource
 from metadata.utils import fqn
 from metadata.utils.filters import filter_by_database
 from metadata.utils.logger import ingestion_logger
@@ -109,7 +110,7 @@ PGDialect.get_all_view_definitions = get_all_view_definitions
 PGDialect.ischema_names = ischema_names
 
-class GreenplumSource(CommonDbSourceService):
+class GreenplumSource(CommonDbSourceService, MultiDBSource):
     """
     Implements the necessary methods to extract
     Database metadata from Greenplum Source
@@ -144,16 +145,21 @@ class GreenplumSource(CommonDbSourceService):
             for name, relkind in result
         ]
 
+    def get_configured_database(self) -> Optional[str]:
+        if not self.service_connection.ingestAllDatabases:
+            return self.service_connection.database
+        return None
+
+    def get_database_names_raw(self) -> Iterable[str]:
+        yield from self._execute_database_query(GREENPLUM_GET_DB_NAMES)
+
     def get_database_names(self) -> Iterable[str]:
         if not self.config.serviceConnection.__root__.config.ingestAllDatabases:
             configured_db = self.config.serviceConnection.__root__.config.database
             self.set_inspector(database_name=configured_db)
             yield configured_db
         else:
-            results = self.connection.execute(GREENPLUM_GET_DB_NAMES)
-            for res in results:
-                row = list(res)
-                new_database = row[0]
+            for new_database in self.get_database_names_raw():
                 database_fqn = fqn.build(
                     self.metadata,
                     entity_type=Database,

ingestion/src/metadata/ingestion/source/database/mssql/metadata.py

@@ -10,7 +10,7 @@
 # limitations under the License.
 """MSSQL source module"""
 import traceback
-from typing import Iterable
+from typing import Iterable, Optional
 
 from sqlalchemy.dialects.mssql.base import MSDialect, ischema_names
@@ -24,11 +24,13 @@ from metadata.generated.schema.metadataIngestion.workflow import (
 from metadata.ingestion.api.steps import InvalidSourceException
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
+from metadata.ingestion.source.database.mssql.queries import MSSQL_GET_DATABASE
 from metadata.ingestion.source.database.mssql.utils import (
     get_columns,
     get_table_comment,
     get_view_definition,
 )
+from metadata.ingestion.source.database.multi_db_source import MultiDBSource
 from metadata.utils import fqn
 from metadata.utils.filters import filter_by_database
 from metadata.utils.logger import ingestion_logger
@@ -53,7 +55,7 @@ MSDialect.get_all_table_comments = get_all_table_comments
 MSDialect.get_columns = get_columns
 
-class MssqlSource(CommonDbSourceService):
+class MssqlSource(CommonDbSourceService, MultiDBSource):
     """
     Implements the necessary methods to extract
     Database metadata from MSSQL Source
@@ -70,6 +72,14 @@ class MssqlSource(CommonDbSourceService):
         )
         return cls(config, metadata)
 
+    def get_configured_database(self) -> Optional[str]:
+        if not self.service_connection.ingestAllDatabases:
+            return self.service_connection.database
+        return None
+
+    def get_database_names_raw(self) -> Iterable[str]:
+        yield from self._execute_database_query(MSSQL_GET_DATABASE)
+
     def get_database_names(self) -> Iterable[str]:
         if not self.config.serviceConnection.__root__.config.ingestAllDatabases:
@@ -77,12 +87,7 @@ class MssqlSource(CommonDbSourceService):
             self.set_inspector(database_name=configured_db)
             yield configured_db
         else:
-            results = self.connection.execute(
-                "SELECT name FROM master.sys.databases order by name"
-            )
-            for res in results:
-                row = list(res)
-                new_database = row[0]
+            for new_database in self.get_database_names_raw():
                 database_fqn = fqn.build(
                     self.metadata,
                     entity_type=Database,

ingestion/src/metadata/ingestion/source/database/multi_db_source.py

@@ -0,0 +1,37 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Multi DB Source Abstract class
+"""
+from abc import ABC, abstractmethod
+from typing import Iterable, Optional
+
+
+class MultiDBSource(ABC):
+    @abstractmethod
+    def get_configured_database(self) -> Optional[str]:
+        """
+        Method to return the name of default configured database if available
+        """
+
+    @abstractmethod
+    def get_database_names_raw(self) -> Iterable[str]:
+        """
+        Method to return the name of all databases.
+        """
+
+    def _execute_database_query(self, query: str) -> Iterable[str]:
+        results = self.connection.execute(query)  # pylint: disable=no-member
+        for res in results:
+            row = list(res)
+            yield row[0]
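
For context, a minimal sketch of how a connector plugs into this contract; the connector class and its query constant below are hypothetical, for illustration only:

from typing import Iterable, Optional

from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.multi_db_source import MultiDBSource

# Hypothetical query constant, not part of this change.
MYDB_GET_DATABASES = "SELECT name FROM sys.databases ORDER BY name"


class MyDbSource(CommonDbSourceService, MultiDBSource):
    """Hypothetical connector implementing the MultiDBSource contract."""

    def get_configured_database(self) -> Optional[str]:
        # Pin ingestion to the single configured database unless the
        # user asked to ingest all databases.
        if not self.service_connection.ingestAllDatabases:
            return self.service_connection.database
        return None

    def get_database_names_raw(self) -> Iterable[str]:
        # _execute_database_query runs the query on self.connection and
        # yields the first column of every row.
        yield from self._execute_database_query(MYDB_GET_DATABASES)

The external profiler source introduced below relies on exactly these two hooks to decide whether to profile a single database or iterate over all of them.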

ingestion/src/metadata/ingestion/source/database/postgres/metadata.py

@@ -13,7 +13,7 @@ Postgres source module
 """
 import traceback
 from collections import namedtuple
-from typing import Iterable, Tuple
+from typing import Iterable, Optional, Tuple
 
 from sqlalchemy import sql
 from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names
@@ -40,6 +40,7 @@ from metadata.ingestion.source.database.common_db_source import (
     CommonDbSourceService,
     TableNameAndType,
 )
+from metadata.ingestion.source.database.multi_db_source import MultiDBSource
 from metadata.ingestion.source.database.postgres.queries import (
     POSTGRES_GET_ALL_TABLE_PG_POLICY,
     POSTGRES_GET_DB_NAMES,
@@ -111,7 +112,7 @@ PGDialect.get_all_view_definitions = get_all_view_definitions
 PGDialect.ischema_names = ischema_names
 
-class PostgresSource(CommonDbSourceService):
+class PostgresSource(CommonDbSourceService, MultiDBSource):
     """
     Implements the necessary methods to extract
     Database metadata from Postgres Source
@@ -146,16 +147,21 @@ class PostgresSource(CommonDbSourceService):
             for name, relkind in result
         ]
 
+    def get_configured_database(self) -> Optional[str]:
+        if not self.service_connection.ingestAllDatabases:
+            return self.service_connection.database
+        return None
+
+    def get_database_names_raw(self) -> Iterable[str]:
+        yield from self._execute_database_query(POSTGRES_GET_DB_NAMES)
+
     def get_database_names(self) -> Iterable[str]:
         if not self.config.serviceConnection.__root__.config.ingestAllDatabases:
             configured_db = self.config.serviceConnection.__root__.config.database
             self.set_inspector(database_name=configured_db)
             yield configured_db
         else:
-            results = self.connection.execute(POSTGRES_GET_DB_NAMES)
-            for res in results:
-                row = list(res)
-                new_database = row[0]
+            for new_database in self.get_database_names_raw():
                 database_fqn = fqn.build(
                     self.metadata,
                     entity_type=Database,

ingestion/src/metadata/ingestion/source/database/redshift/metadata.py

@@ -50,6 +50,7 @@ from metadata.ingestion.source.database.common_db_source import (
     CommonDbSourceService,
     TableNameAndType,
 )
+from metadata.ingestion.source.database.multi_db_source import MultiDBSource
 from metadata.ingestion.source.database.redshift.models import RedshiftStoredProcedure
 from metadata.ingestion.source.database.redshift.queries import (
     REDSHIFT_GET_ALL_RELATION_INFO,
@@ -101,7 +102,7 @@ RedshiftDialect._get_all_relation_info = (  # pylint: disable=protected-access
 )
 
-class RedshiftSource(StoredProcedureMixin, CommonDbSourceService):
+class RedshiftSource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource):
     """
     Implements the necessary methods to extract
     Database metadata from Redshift Source
@@ -153,16 +154,21 @@ class RedshiftSource(StoredProcedureMixin, CommonDbSourceService):
             for name, relkind in result
         ]
 
+    def get_configured_database(self) -> Optional[str]:
+        if not self.service_connection.ingestAllDatabases:
+            return self.service_connection.database
+        return None
+
+    def get_database_names_raw(self) -> Iterable[str]:
+        yield from self._execute_database_query(REDSHIFT_GET_DATABASE_NAMES)
+
     def get_database_names(self) -> Iterable[str]:
         if not self.config.serviceConnection.__root__.config.ingestAllDatabases:
             self.inspector = inspect(self.engine)
             self.get_partition_details()
             yield self.config.serviceConnection.__root__.config.database
         else:
-            results = self.connection.execute(REDSHIFT_GET_DATABASE_NAMES)
-            for res in results:
-                row = list(res)
-                new_database = row[0]
+            for new_database in self.get_database_names_raw():
                 database_fqn = fqn.build(
                     self.metadata,
                     entity_type=Database,

ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py

@@ -52,6 +52,7 @@ from metadata.ingestion.source.database.common_db_source import (
 from metadata.ingestion.source.database.life_cycle_query_mixin import (
     LifeCycleQueryMixin,
 )
+from metadata.ingestion.source.database.multi_db_source import MultiDBSource
 from metadata.ingestion.source.database.snowflake.models import (
     STORED_PROC_LANGUAGE_MAP,
     SnowflakeStoredProcedure,
@@ -122,7 +123,9 @@ SnowflakeDialect.get_foreign_keys = get_foreign_keys
 SnowflakeDialect.get_columns = get_columns
 
-class SnowflakeSource(LifeCycleQueryMixin, StoredProcedureMixin, CommonDbSourceService):
+class SnowflakeSource(
+    LifeCycleQueryMixin, StoredProcedureMixin, CommonDbSourceService, MultiDBSource
+):
     """
     Implements the necessary methods to extract
     Database metadata from Snowflake Source
@@ -214,6 +217,15 @@ class SnowflakeSource(LifeCycleQueryMixin, StoredProcedureMixin, CommonDbSourceService):
         """
         return self.database_desc_map.get(database_name)
 
+    def get_configured_database(self) -> Optional[str]:
+        return self.service_connection.database
+
+    def get_database_names_raw(self) -> Iterable[str]:
+        results = self.connection.execute(SNOWFLAKE_GET_DATABASES)
+        for res in results:
+            row = list(res)
+            yield row[1]
+
     def get_database_names(self) -> Iterable[str]:
         configured_db = self.config.serviceConnection.__root__.config.database
         if configured_db:
@@ -224,10 +236,7 @@ class SnowflakeSource(LifeCycleQueryMixin, StoredProcedureMixin, CommonDbSourceService):
             self.set_database_description_map()
             yield configured_db
         else:
-            results = self.connection.execute(SNOWFLAKE_GET_DATABASES)
-            for res in results:
-                row = list(res)
-                new_database = row[1]
+            for new_database in self.get_database_names_raw():
                 database_fqn = fqn.build(
                     self.metadata,
                     entity_type=Database,

ingestion/src/metadata/ingestion/source/database/vertica/metadata.py

@@ -32,6 +32,7 @@ from metadata.ingestion.api.steps import InvalidSourceException
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type
 from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
+from metadata.ingestion.source.database.multi_db_source import MultiDBSource
 from metadata.ingestion.source.database.vertica.queries import (
     VERTICA_GET_COLUMNS,
     VERTICA_GET_PRIMARY_KEYS,
@@ -262,7 +263,7 @@ VerticaDialect.get_all_table_comments = get_all_table_comments
 VerticaDialect.get_table_comment = get_table_comment
 
-class VerticaSource(CommonDbSourceService):
+class VerticaSource(CommonDbSourceService, MultiDBSource):
     """
     Implements the necessary methods to extract
     Database metadata from Vertica Source
@@ -293,6 +294,12 @@ class VerticaSource(CommonDbSourceService):
             self.engine, VERTICA_SCHEMA_COMMENTS
         )
 
+    def get_configured_database(self) -> Optional[str]:
+        return self.service_connection.database
+
+    def get_database_names_raw(self) -> Iterable[str]:
+        yield from self._execute_database_query(VERTICA_LIST_DATABASES)
+
     def get_database_names(self) -> Iterable[str]:
         configured_db = self.config.serviceConnection.__root__.config.database
         if configured_db:
@@ -300,10 +307,7 @@ class VerticaSource(CommonDbSourceService):
             self.set_schema_description_map()
             yield configured_db
         else:
-            results = self.connection.execute(VERTICA_LIST_DATABASES)
-            for res in results:
-                row = list(res)
-                new_database = row[0]
+            for new_database in self.get_database_names_raw():
                 database_fqn = fqn.build(
                     self.metadata,
                     entity_type=Database,

ingestion/src/metadata/profiler/source/metadata.py

@@ -64,13 +64,16 @@ class OpenMetadataSource(Source):
     We do this here as well.
     """
 
+    def init_steps(self):
+        super().__init__()
+
+    # pylint: disable=super-init-not-called
     def __init__(
         self,
         config: OpenMetadataWorkflowConfig,
        metadata: OpenMetadata,
     ):
-        super().__init__()
+        self.init_steps()
         self.config = config
         self.metadata = metadata

ingestion/src/metadata/profiler/source/metadata_ext.py

@@ -0,0 +1,293 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+OpenMetadataExt source for the profiler
+
+This source is used in cases where the service name
+is not provided for the profiler workflow.
+In such situations, the profiler will perform a thorough scan
+of the entire data source to locate the
+corresponding table entity in OpenMetadata.
+Subsequently, it will proceed to ingest relevant metrics
+and sample data for that identified entity.
+"""
+import traceback
+from copy import deepcopy
+from typing import Iterable, cast
+
+from sqlalchemy.inspection import inspect
+
+from metadata.generated.schema.entity.data.database import Database
+from metadata.generated.schema.entity.data.table import Table, TableType
+from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
+    DatabaseServiceMetadataPipeline,
+)
+from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
+    DatabaseServiceProfilerPipeline,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataWorkflowConfig,
+)
+from metadata.ingestion.api.models import Either, StackTraceError
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.source.connections import get_connection
+from metadata.profiler.source.metadata import (
+    OpenMetadataSource,
+    ProfilerSourceAndEntity,
+)
+from metadata.profiler.source.profiler_source_factory import profiler_source_factory
+from metadata.utils import fqn
+from metadata.utils.class_helper import get_service_type_from_source_type
+from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
+from metadata.utils.importer import import_source_class
+from metadata.utils.logger import profiler_logger
+
+logger = profiler_logger()
+
+
+class OpenMetadataSourceExt(OpenMetadataSource):
+    """
+    This source lists and filters the entities that need
+    to be processed by the profiler workflow.
+
+    Note that in order to manage the following steps we need
+    to test the connection against the Database Service Source.
+    We do this here as well.
+    """
+
+    # pylint: disable=super-init-not-called
+    def __init__(
+        self,
+        config: OpenMetadataWorkflowConfig,
+        metadata: OpenMetadata,
+    ):
+        self.init_steps()
+
+        self.config = config
+        self.metadata = metadata
+        self.test_connection()
+
+        # Init and type the source config
+        self.service_connection = self.config.source.serviceConnection.__root__.config
+        self.source_config: DatabaseServiceProfilerPipeline = cast(
+            DatabaseServiceProfilerPipeline, self.config.source.sourceConfig.config
+        )  # Used to satisfy type checked
+        source_type = self.config.source.type.lower()
+        service_type = get_service_type_from_source_type(self.config.source.type)
+        source_class = import_source_class(
+            service_type=service_type, source_type=source_type
+        )
+        database_source_config = DatabaseServiceMetadataPipeline()
+        new_config = deepcopy(self.config.source)
+        new_config.sourceConfig.config = database_source_config
+        self.source = source_class.create(new_config.dict(), self.metadata)
+        self.engine = None
+        self.inspector = None
+        self._connection = None
+        self.set_inspector()
+
+        logger.info(
+            f"Starting profiler for service {self.config.source.type}"
+            f":{self.config.source.type.lower()}"
+        )
+
+    def set_inspector(self, database_name: str = None) -> None:
+        """
+        When sources override `get_database_names`, they will need
+        to setup multiple inspectors. They can use this function.
+        :param database_name: new database to set
+        """
+        new_service_connection = deepcopy(self.service_connection)
+        if database_name:
+            logger.info(f"Ingesting from database: {database_name}")
+            new_service_connection.database = database_name
+        self.engine = get_connection(new_service_connection)
+        self.inspector = inspect(self.engine)
+        self._connection = None  # Lazy init as well
+
+    def _iter(self, *_, **__) -> Iterable[Either[ProfilerSourceAndEntity]]:
+        for database_name in self.get_database_names():
+            try:
+                database_entity = fqn.search_database_from_es(
+                    database_name=database_name,
+                    metadata=self.metadata,
+                    service_name=None,
+                )
+                if not database_entity:
+                    logger.debug(
+                        f"Database Entity for database `{database_name}` not found"
+                    )
+                    continue
+                for schema_name in self.get_schema_names():
+                    for table_name in self.get_table_names(schema_name):
+                        table_entity = fqn.search_table_from_es(
+                            metadata=self.metadata,
+                            database_name=database_name,
+                            service_name=None,
+                            schema_name=schema_name,
+                            table_name=table_name,
+                            fields="tableProfilerConfig",
+                        )
+                        if not table_entity:
+                            logger.debug(
+                                f"Table Entity for table `{database_name}.{schema_name}.{table_name}` not found"
+                            )
+                            continue
+
+                        profiler_source = profiler_source_factory.create(
+                            self.config.source.type.lower(),
+                            self.config,
+                            database_entity,
+                            self.metadata,
+                        )
+                        yield Either(
+                            right=ProfilerSourceAndEntity(
+                                profiler_source=profiler_source,
+                                entity=table_entity,
+                            )
+                        )
+            except Exception as exc:
+                yield Either(
+                    left=StackTraceError(
+                        name=database_name,
+                        error=f"Error listing source and entities for database due to [{exc}]",
+                        stack_trace=traceback.format_exc(),
+                    )
+                )
+
+    def get_table_names(self, schema_name: str) -> Iterable[str]:
+        for table_name in self.inspector.get_table_names(schema_name) or []:
+            if filter_by_table(self.source_config.tableFilterPattern, table_name):
+                self.status.filter(table_name, "Table pattern not allowed")
+                continue
+            yield table_name
+
+    def get_schema_names(self) -> Iterable[str]:
+        if self.service_connection.__dict__.get("databaseSchema"):
+            yield self.service_connection.databaseSchema
+        else:
+            for schema_name in self.inspector.get_schema_names():
+                if filter_by_schema(
+                    self.source_config.schemaFilterPattern, schema_name
+                ):
+                    self.status.filter(schema_name, "Schema pattern not allowed")
+                    continue
+                yield schema_name
+
+    def get_database_names(self) -> Iterable[str]:
+        """
+        Method to fetch database names from source
+        """
+        try:
+            if hasattr(self.service_connection, "supportsDatabase"):
+                configured_db = self.source.get_configured_database()
+                if configured_db:
+                    yield configured_db
+                else:
+                    database_names = self.source.get_database_names_raw()
+                    for database in database_names:
+                        if filter_by_database(
+                            self.source_config.databaseFilterPattern, database
+                        ):
+                            self.status.filter(database, "Database pattern not allowed")
+                            continue
+                        self.set_inspector(database_name=database)
+                        yield database
+            else:
+                custom_database_name = self.service_connection.__dict__.get(
+                    "databaseName"
+                )
+                database_name = self.service_connection.__dict__.get(
+                    "database", custom_database_name or "default"
+                )
+                yield database_name
+        except Exception as exc:
+            logger.debug(f"Failed to fetch database names {exc}")
+            logger.debug(traceback.format_exc())
+
+    def filter_entities(self, tables: Iterable[Table]) -> Iterable[Table]:
+        """
+        From a list of tables, apply the SQLSourceConfig
+        filter patterns.
+
+        We will update the status on the SQLSource Status.
+        """
+        for table in tables:
+            try:
+                if filter_by_schema(
+                    self.source_config.schemaFilterPattern,
+                    table.databaseSchema.name,  # type: ignore
+                ):
+                    self.status.filter(
+                        f"Schema pattern not allowed: {table.fullyQualifiedName.__root__}",
+                        "Schema pattern not allowed",
+                    )
+                    continue
+                if filter_by_table(
+                    self.source_config.tableFilterPattern,
+                    table.name.__root__,
+                ):
+                    self.status.filter(
+                        f"Table pattern not allowed: {table.fullyQualifiedName.__root__}",
+                        "Table pattern not allowed",
+                    )
+                    continue
+                if (
+                    table.tableType == TableType.View
+                    and not self.source_config.includeViews
+                ):
+                    self.status.filter(
+                        table.fullyQualifiedName.__root__,
+                        "View filtered out",
+                    )
+                    continue
+                yield table
+            except Exception as exc:
+                self.status.failed(
+                    StackTraceError(
+                        name=table.fullyQualifiedName.__root__,
+                        error=f"Unexpected error filtering entities for table [{table}]: {exc}",
+                        stack_trace=traceback.format_exc(),
+                    )
+                )
+
+    def get_table_entities(self, database):
+        """
+        List and filter OpenMetadata tables based on the
+        source configuration.
+
+        The listing will be based on the entities from the
+        informed service name in the source configuration.
+
+        Note that users can specify `table_filter_pattern` to
+        either be `includes` or `excludes`. This means
+        that we will either what is specified in `includes`
+        or we will use everything but the tables excluded.
+
+        Same with `schema_filter_pattern`.
+        """
+        tables = self.metadata.list_all_entities(
+            entity=Table,
+            fields=[
+                "tableProfilerConfig",
+            ],
+            params={
+                "service": self.config.source.serviceName,
+                "database": fqn.build(
+                    self.metadata,
+                    entity_type=Database,
+                    service_name=self.config.source.serviceName,
+                    database_name=database.name.__root__,
+                ),
+            },  # type: ignore
+        )
+
+        yield from self.filter_entities(tables)
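
To show when this source is selected, here is a sketch of a profiler workflow configuration that omits source.serviceName; the connection fields are placeholders and vary per connector:

# Hypothetical profiler config without `serviceName` under `source`.
# With no service name, the profiler scans the source itself and looks
# up the matching entities in OpenMetadata through Elasticsearch.
workflow_config = {
    "source": {
        "type": "postgres",
        "serviceConnection": {
            "config": {
                "type": "Postgres",
                "username": "profiler_user",  # placeholder
                "authType": {"password": "secret"},  # placeholder
                "hostPort": "localhost:5432",  # placeholder
                "database": "postgres",  # placeholder
            }
        },
        "sourceConfig": {"config": {"type": "Profiler"}},
    },
    "processor": {"type": "orm-profiler", "config": {}},
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {"jwtToken": "..."},  # placeholder
        }
    },
}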

ingestion/src/metadata/utils/fqn.py

@@ -531,12 +531,12 @@ def build_es_fqn_search_string(
     Returns:
         FQN search string
     """
-    if not service_name or not table_name:
+    if not table_name:
         raise FQNBuildingException(
-            f"Service Name and Table Name should be informed, but got service=`{service_name}`, table=`{table_name}`"
+            f"Table Name should be informed, but got table=`{table_name}`"
         )
     fqn_search_string = _build(
-        service_name, database_name or "*", schema_name or "*", table_name
+        service_name or "*", database_name or "*", schema_name or "*", table_name
     )
     return fqn_search_string
@@ -548,6 +548,7 @@ def search_table_from_es(
     service_name: str,
     table_name: str,
     fetch_multiple_entities: bool = False,
+    fields: Optional[str] = None,
 ):
     fqn_search_string = build_es_fqn_search_string(
         database_name, schema_name, service_name, table_name
@@ -556,6 +557,36 @@ def search_table_from_es(
     es_result = metadata.es_search_from_fqn(
         entity_type=Table,
         fqn_search_string=fqn_search_string,
+        fields=fields,
     )
 
     return get_entity_from_es_result(
         entity_list=es_result, fetch_multiple_entities=fetch_multiple_entities
     )
+
+
+def search_database_from_es(
+    metadata: OpenMetadata,
+    database_name: str,
+    service_name: Optional[str],
+    fetch_multiple_entities: Optional[bool] = False,
+    fields: Optional[str] = None,
+):
+    """
+    Search Database entity from ES
+    """
+    if not database_name:
+        raise FQNBuildingException(
+            f"Database Name should be informed, but got database=`{database_name}`"
+        )
+
+    fqn_search_string = _build(service_name or "*", database_name)
+
+    es_result = metadata.es_search_from_fqn(
+        entity_type=Database,
+        fqn_search_string=fqn_search_string,
+        fields=fields,
+    )
+
+    return get_entity_from_es_result(
+        entity_list=es_result, fetch_multiple_entities=fetch_multiple_entities
+    )
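
For illustration, a sketch of what the relaxed builder returns when the service is unknown (placeholder names, keyword arguments as shown in the diff above):

# service_name=None now wildcards the service segment instead of
# raising FQNBuildingException, so an ES lookup can match the table
# across all services.
search_string = build_es_fqn_search_string(
    database_name="my_db",
    schema_name="my_schema",
    service_name=None,
    table_name="my_table",
)
print(search_string)  # "*.my_db.my_schema.my_table"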

ingestion/src/metadata/workflow/metadata.py

@@ -12,6 +12,7 @@
 Workflow definition for metadata related ingestions: metadata and lineage.
 """
+from metadata.config.common import WorkflowExecutionError
 from metadata.ingestion.api.steps import Sink, Source
 from metadata.utils.importer import (
     import_from_module,
@@ -40,6 +41,12 @@ class MetadataWorkflow(BaseWorkflow):
     def _get_source(self) -> Source:
         # Source that we are ingesting, e.g., mysql, looker or kafka
         source_type = self.config.source.type.lower()
+        if not self.config.source.serviceName:
+            raise WorkflowExecutionError(
+                "serviceName is required field for executing the Metadata Workflow. "
+                "You can find more information on how to build the YAML "
+                "configuration here: https://docs.open-metadata.org/connectors"
+            )
         source_class = (
             import_from_module(

ingestion/src/metadata/workflow/profiler.py

@@ -19,6 +19,7 @@ from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
 from metadata.pii.processor import PIIProcessor
 from metadata.profiler.processor.processor import ProfilerProcessor
 from metadata.profiler.source.metadata import OpenMetadataSource
+from metadata.profiler.source.metadata_ext import OpenMetadataSourceExt
 from metadata.utils.importer import import_sink_class
 from metadata.utils.logger import profiler_logger
 from metadata.workflow.base import BaseWorkflow
@@ -40,8 +41,19 @@ class ProfilerWorkflow(BaseWorkflow):
         # Validate that we can properly reach the source database
         self.test_connection()
 
+    def _get_source_class(self):
+        if self.config.source.serviceName:
+            return OpenMetadataSource
+        logger.info(
+            "Database Service name not provided, we will scan all the tables"
+            "available within data source and locate table entity in OpenMetadata"
+            "to ingest profiler data."
+        )
+        return OpenMetadataSourceExt
+
     def set_steps(self):
-        self.source = OpenMetadataSource.create(self.config.dict(), self.metadata)
+        source_class = self._get_source_class()
+        self.source = source_class.create(self.config.dict(), self.metadata)
 
         profiler_processor = self._get_profiler_processor()
         pii_processor = self._get_pii_processor()
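
Putting it together, a sketch of running the profiler end to end; workflow_config is assumed to be a dict shaped like the example after metadata_ext.py above, with no source.serviceName:

from metadata.workflow.profiler import ProfilerWorkflow

# Because serviceName is absent, _get_source_class() picks
# OpenMetadataSourceExt, and the profiler discovers entities by
# scanning the data source and searching OpenMetadata.
workflow = ProfilerWorkflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
workflow.stop()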

ingestion/src/metadata/workflow/usage.py

@@ -12,6 +12,7 @@
 Usage Workflow Definition
 """
+from metadata.config.common import WorkflowExecutionError
 from metadata.ingestion.api.steps import BulkSink, Processor, Source, Stage
 from metadata.utils.importer import (
     import_bulk_sink_type,
@@ -44,6 +45,12 @@ class UsageWorkflow(BaseWorkflow):
     def _get_source(self) -> Source:
         # Source that we are ingesting, e.g., mysql, looker or kafka
         source_type = self.config.source.type.lower()
+        if not self.config.source.serviceName:
+            raise WorkflowExecutionError(
+                "ServiceName is required field for executing the Usage Workflow. "
+                "You can find more information on how to build the YAML "
+                "configuration here: https://docs.open-metadata.org/connectors"
+            )
         source_class = (
             import_from_module(

openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json

@@ -83,7 +83,7 @@
         }
       },
       "additionalProperties": false,
-      "required": ["type", "serviceName", "sourceConfig"]
+      "required": ["type", "sourceConfig"]
     },
     "processor": {
       "description": "Configuration for Processor Component in the OpenMetadata Ingestion Framework.",