diff --git a/ingestion/src/metadata/ingestion/source/database/azuresql/metadata.py b/ingestion/src/metadata/ingestion/source/database/azuresql/metadata.py
index fce9d3dd59b..58b8fb96fa8 100644
--- a/ingestion/src/metadata/ingestion/source/database/azuresql/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/database/azuresql/metadata.py
@@ -11,7 +11,7 @@
 """Azure SQL source module"""
 import traceback
-from typing import Iterable
+from typing import Iterable, Optional
 
 from sqlalchemy.dialects.mssql.base import MSDialect, ischema_names
 
@@ -24,12 +24,14 @@ from metadata.generated.schema.metadataIngestion.workflow import (
 )
 from metadata.ingestion.api.steps import InvalidSourceException
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.source.database.azuresql.queries import AZURE_SQL_GET_DATABASES
 from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
 from metadata.ingestion.source.database.mssql.utils import (
     get_columns,
     get_table_comment,
     get_view_definition,
 )
+from metadata.ingestion.source.database.multi_db_source import MultiDBSource
 from metadata.utils import fqn
 from metadata.utils.filters import filter_by_database
 from metadata.utils.logger import ingestion_logger
@@ -50,7 +52,7 @@ MSDialect.get_all_table_comments = get_all_table_comments
 MSDialect.get_columns = get_columns
 
 
-class AzuresqlSource(CommonDbSourceService):
+class AzuresqlSource(CommonDbSourceService, MultiDBSource):
     """
     Implements the necessary methods to extract
     Database metadata from Azuresql Source
@@ -66,6 +68,14 @@ class AzuresqlSource(CommonDbSourceService):
         )
         return cls(config, metadata)
 
+    def get_configured_database(self) -> Optional[str]:
+        if not self.service_connection.ingestAllDatabases:
+            return self.service_connection.database
+        return None
+
+    def get_database_names_raw(self) -> Iterable[str]:
+        yield from self._execute_database_query(AZURE_SQL_GET_DATABASES)
+
     def get_database_names(self) -> Iterable[str]:
 
         if not self.config.serviceConnection.__root__.config.ingestAllDatabases:
@@ -73,12 +83,7 @@ class AzuresqlSource(CommonDbSourceService):
             self.set_inspector(database_name=configured_db)
             yield configured_db
         else:
-            results = self.connection.execute(
-                "SELECT name FROM master.sys.databases order by name"
-            )
-            for res in results:
-                row = list(res)
-                new_database = row[0]
+            for new_database in self.get_database_names_raw():
                 database_fqn = fqn.build(
                     self.metadata,
                     entity_type=Database,
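The split introduced here, and repeated for each connector below, separates "which single database was configured" from "enumerate everything the engine can see". A minimal sketch of the contract (not part of the patch; `source` stands for any connector mixing in MultiDBSource, which this diff adds further down):

    from typing import Iterable, Optional

    def iter_candidate_databases(source) -> Iterable[str]:
        # Fast path: a single configured database short-circuits discovery.
        configured: Optional[str] = source.get_configured_database()
        if configured:
            yield configured
            return
        # Otherwise enumerate every database name the engine reports;
        # callers still apply databaseFilterPattern on top of this.
        yield from source.get_database_names_raw()
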
+""" +SQL Queries used during ingestion +""" + +AZURE_SQL_GET_DATABASES = "SELECT name FROM master.sys.databases order by name" diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py index 2ab083e12a9..051919dac8d 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/metadata.py @@ -68,6 +68,7 @@ from metadata.ingestion.source.database.common_db_source import ( CommonDbSourceService, TableNameAndType, ) +from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.ingestion.source.database.stored_procedures_mixin import ( QueryByProcedure, StoredProcedureMixin, @@ -190,7 +191,7 @@ BigQueryDialect._build_formatted_table_id = ( # pylint: disable=protected-acces ) -class BigquerySource(StoredProcedureMixin, CommonDbSourceService): +class BigquerySource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource): """ Implements the necessary methods to extract Database metadata from Bigquery Source @@ -421,6 +422,12 @@ class BigquerySource(StoredProcedureMixin, CommonDbSourceService): self.engine = inspector_details.engine self.inspector = inspector_details.inspector + def get_configured_database(self) -> Optional[str]: + return None + + def get_database_names_raw(self) -> Iterable[str]: + yield from self.project_ids + def get_database_names(self) -> Iterable[str]: for project_id in self.project_ids: database_fqn = fqn.build( diff --git a/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py b/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py index bcc15a4b88f..a277252c44d 100644 --- a/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/greenplum/metadata.py @@ -13,7 +13,7 @@ Greenplum source module """ import traceback from collections import namedtuple -from typing import Iterable, Tuple +from typing import Iterable, Optional, Tuple from sqlalchemy import sql from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names @@ -51,6 +51,7 @@ from metadata.ingestion.source.database.greenplum.utils import ( get_table_comment, get_view_definition, ) +from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.utils import fqn from metadata.utils.filters import filter_by_database from metadata.utils.logger import ingestion_logger @@ -109,7 +110,7 @@ PGDialect.get_all_view_definitions = get_all_view_definitions PGDialect.ischema_names = ischema_names -class GreenplumSource(CommonDbSourceService): +class GreenplumSource(CommonDbSourceService, MultiDBSource): """ Implements the necessary methods to extract Database metadata from Greenplum Source @@ -144,16 +145,21 @@ class GreenplumSource(CommonDbSourceService): for name, relkind in result ] + def get_configured_database(self) -> Optional[str]: + if not self.service_connection.ingestAllDatabases: + return self.service_connection.database + return None + + def get_database_names_raw(self) -> Iterable[str]: + yield from self._execute_database_query(GREENPLUM_GET_DB_NAMES) + def get_database_names(self) -> Iterable[str]: if not self.config.serviceConnection.__root__.config.ingestAllDatabases: configured_db = self.config.serviceConnection.__root__.config.database self.set_inspector(database_name=configured_db) yield configured_db else: - results = 
self.connection.execute(GREENPLUM_GET_DB_NAMES) - for res in results: - row = list(res) - new_database = row[0] + for new_database in self.get_database_names_raw(): database_fqn = fqn.build( self.metadata, entity_type=Database, diff --git a/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py b/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py index c505dc2e4c9..6aac7b9122f 100644 --- a/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/mssql/metadata.py @@ -10,7 +10,7 @@ # limitations under the License. """MSSQL source module""" import traceback -from typing import Iterable +from typing import Iterable, Optional from sqlalchemy.dialects.mssql.base import MSDialect, ischema_names @@ -24,11 +24,13 @@ from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.database.common_db_source import CommonDbSourceService +from metadata.ingestion.source.database.mssql.queries import MSSQL_GET_DATABASE from metadata.ingestion.source.database.mssql.utils import ( get_columns, get_table_comment, get_view_definition, ) +from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.utils import fqn from metadata.utils.filters import filter_by_database from metadata.utils.logger import ingestion_logger @@ -53,7 +55,7 @@ MSDialect.get_all_table_comments = get_all_table_comments MSDialect.get_columns = get_columns -class MssqlSource(CommonDbSourceService): +class MssqlSource(CommonDbSourceService, MultiDBSource): """ Implements the necessary methods to extract Database metadata from MSSQL Source @@ -70,6 +72,14 @@ class MssqlSource(CommonDbSourceService): ) return cls(config, metadata) + def get_configured_database(self) -> Optional[str]: + if not self.service_connection.ingestAllDatabases: + return self.service_connection.database + return None + + def get_database_names_raw(self) -> Iterable[str]: + yield from self._execute_database_query(MSSQL_GET_DATABASE) + def get_database_names(self) -> Iterable[str]: if not self.config.serviceConnection.__root__.config.ingestAllDatabases: @@ -77,12 +87,7 @@ class MssqlSource(CommonDbSourceService): self.set_inspector(database_name=configured_db) yield configured_db else: - results = self.connection.execute( - "SELECT name FROM master.sys.databases order by name" - ) - for res in results: - row = list(res) - new_database = row[0] + for new_database in self.get_database_names_raw(): database_fqn = fqn.build( self.metadata, entity_type=Database, diff --git a/ingestion/src/metadata/ingestion/source/database/multi_db_source.py b/ingestion/src/metadata/ingestion/source/database/multi_db_source.py new file mode 100644 index 00000000000..394bdd8b2ed --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/database/multi_db_source.py @@ -0,0 +1,37 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
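A usage sketch of the new mixin (hypothetical `MySource`; it assumes the concrete base class exposes a SQLAlchemy-style `self.connection` and a `self.service_connection` carrying `ingestAllDatabases` and `database`, as the CommonDbSourceService subclasses in this patch do):

    from typing import Iterable, Optional

    class MySource(CommonDbSourceService, MultiDBSource):  # hypothetical connector
        def get_configured_database(self) -> Optional[str]:
            # Honor a single configured database unless the user asked for all.
            if not self.service_connection.ingestAllDatabases:
                return self.service_connection.database
            return None

        def get_database_names_raw(self) -> Iterable[str]:
            # _execute_database_query yields the first column of each row,
            # so the query must return the database name first.
            yield from self._execute_database_query(
                "SELECT name FROM my_catalog"  # hypothetical SQL
            )
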
diff --git a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py
index 7ddde065ee2..6494c83878d 100644
--- a/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/database/postgres/metadata.py
@@ -13,7 +13,7 @@ Postgres source module
 """
 import traceback
 from collections import namedtuple
-from typing import Iterable, Tuple
+from typing import Iterable, Optional, Tuple
 
 from sqlalchemy import sql
 from sqlalchemy.dialects.postgresql.base import PGDialect, ischema_names
@@ -40,6 +40,7 @@ from metadata.ingestion.source.database.common_db_source import (
     CommonDbSourceService,
     TableNameAndType,
 )
+from metadata.ingestion.source.database.multi_db_source import MultiDBSource
 from metadata.ingestion.source.database.postgres.queries import (
     POSTGRES_GET_ALL_TABLE_PG_POLICY,
     POSTGRES_GET_DB_NAMES,
@@ -111,7 +112,7 @@ PGDialect.get_all_view_definitions = get_all_view_definitions
 PGDialect.ischema_names = ischema_names
 
 
-class PostgresSource(CommonDbSourceService):
+class PostgresSource(CommonDbSourceService, MultiDBSource):
     """
     Implements the necessary methods to extract
     Database metadata from Postgres Source
@@ -146,16 +147,21 @@ class PostgresSource(CommonDbSourceService):
             for name, relkind in result
         ]
 
+    def get_configured_database(self) -> Optional[str]:
+        if not self.service_connection.ingestAllDatabases:
+            return self.service_connection.database
+        return None
+
+    def get_database_names_raw(self) -> Iterable[str]:
+        yield from self._execute_database_query(POSTGRES_GET_DB_NAMES)
+
     def get_database_names(self) -> Iterable[str]:
         if not self.config.serviceConnection.__root__.config.ingestAllDatabases:
             configured_db = self.config.serviceConnection.__root__.config.database
             self.set_inspector(database_name=configured_db)
             yield configured_db
         else:
-            results = self.connection.execute(POSTGRES_GET_DB_NAMES)
-            for res in results:
-                row = list(res)
-                new_database = row[0]
+            for new_database in self.get_database_names_raw():
                 database_fqn = fqn.build(
                     self.metadata,
                     entity_type=Database,
diff --git a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py
index 403378c952f..9109721c1da 100644
--- a/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/database/redshift/metadata.py
@@ -50,6 +50,7 @@ from metadata.ingestion.source.database.common_db_source import (
     CommonDbSourceService,
     TableNameAndType,
 )
+from metadata.ingestion.source.database.multi_db_source import MultiDBSource
 from metadata.ingestion.source.database.redshift.models import RedshiftStoredProcedure
 from metadata.ingestion.source.database.redshift.queries import (
     REDSHIFT_GET_ALL_RELATION_INFO,
@@ -101,7 +102,7 @@ RedshiftDialect._get_all_relation_info = (  # pylint: disable=protected-access
 )
 
 
-class RedshiftSource(StoredProcedureMixin, CommonDbSourceService):
+class RedshiftSource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource):
     """
     Implements the necessary methods to extract
     Database metadata from Redshift Source
@@ -153,16 +154,21 @@ class RedshiftSource(StoredProcedureMixin, CommonDbSourceService):
             for name, relkind in result
         ]
 
+    def get_configured_database(self) -> Optional[str]:
+        if not self.service_connection.ingestAllDatabases:
+            return self.service_connection.database
+        return None
+
+    def get_database_names_raw(self) -> Iterable[str]:
+        yield from self._execute_database_query(REDSHIFT_GET_DATABASE_NAMES)
+
     def get_database_names(self) -> Iterable[str]:
         if not self.config.serviceConnection.__root__.config.ingestAllDatabases:
             self.inspector = inspect(self.engine)
             self.get_partition_details()
             yield self.config.serviceConnection.__root__.config.database
         else:
-            results = self.connection.execute(REDSHIFT_GET_DATABASE_NAMES)
-            for res in results:
-                row = list(res)
-                new_database = row[0]
+            for new_database in self.get_database_names_raw():
                 database_fqn = fqn.build(
                     self.metadata,
                     entity_type=Database,
diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py
index d72252d4b08..901b86c3b97 100644
--- a/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py
@@ -52,6 +52,7 @@ from metadata.ingestion.source.database.common_db_source import (
 from metadata.ingestion.source.database.life_cycle_query_mixin import (
     LifeCycleQueryMixin,
 )
+from metadata.ingestion.source.database.multi_db_source import MultiDBSource
 from metadata.ingestion.source.database.snowflake.models import (
     STORED_PROC_LANGUAGE_MAP,
     SnowflakeStoredProcedure,
@@ -122,7 +123,9 @@ SnowflakeDialect.get_foreign_keys = get_foreign_keys
 SnowflakeDialect.get_columns = get_columns
 
 
-class SnowflakeSource(LifeCycleQueryMixin, StoredProcedureMixin, CommonDbSourceService):
+class SnowflakeSource(
+    LifeCycleQueryMixin, StoredProcedureMixin, CommonDbSourceService, MultiDBSource
+):
     """
     Implements the necessary methods to extract
     Database metadata from Snowflake Source
@@ -214,6 +217,15 @@ class SnowflakeSource(LifeCycleQueryMixin, StoredProcedureMixin, CommonDbSourceS
         """
         return self.database_desc_map.get(database_name)
 
+    def get_configured_database(self) -> Optional[str]:
+        return self.service_connection.database
+
+    def get_database_names_raw(self) -> Iterable[str]:
+        results = self.connection.execute(SNOWFLAKE_GET_DATABASES)
+        for res in results:
+            row = list(res)
+            yield row[1]
+
     def get_database_names(self) -> Iterable[str]:
         configured_db = self.config.serviceConnection.__root__.config.database
         if configured_db:
@@ -224,10 +236,7 @@ class SnowflakeSource(LifeCycleQueryMixin, StoredProcedureMixin, CommonDbSourceS
             self.set_database_description_map()
             yield configured_db
         else:
-            results = self.connection.execute(SNOWFLAKE_GET_DATABASES)
-            for res in results:
-                row = list(res)
-                new_database = row[1]
+            for new_database in self.get_database_names_raw():
                 database_fqn = fqn.build(
                     self.metadata,
                     entity_type=Database,
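Worth noting: Snowflake overrides get_database_names_raw directly instead of reusing `_execute_database_query`, because the shared helper yields `row[0]` while `SHOW DATABASES` returns multi-column rows with the name at index 1 (the `row[1]` kept from the old code). Restated with comments (a sketch, not part of the patch):

    def get_database_names_raw(self) -> Iterable[str]:
        # SHOW DATABASES rows are multi-column (e.g. created_on first, then name),
        # so the generic first-column helper from MultiDBSource does not fit here.
        for res in self.connection.execute(SNOWFLAKE_GET_DATABASES):
            yield list(res)[1]  # database name sits in the second column
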
diff --git a/ingestion/src/metadata/ingestion/source/database/vertica/metadata.py b/ingestion/src/metadata/ingestion/source/database/vertica/metadata.py
index f87f6f3667a..c53c3f3709c 100644
--- a/ingestion/src/metadata/ingestion/source/database/vertica/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/database/vertica/metadata.py
@@ -32,6 +32,7 @@ from metadata.ingestion.api.steps import InvalidSourceException
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.source.database.column_type_parser import create_sqlalchemy_type
 from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
+from metadata.ingestion.source.database.multi_db_source import MultiDBSource
 from metadata.ingestion.source.database.vertica.queries import (
     VERTICA_GET_COLUMNS,
     VERTICA_GET_PRIMARY_KEYS,
@@ -262,7 +263,7 @@ VerticaDialect.get_all_table_comments = get_all_table_comments
 VerticaDialect.get_table_comment = get_table_comment
 
 
-class VerticaSource(CommonDbSourceService):
+class VerticaSource(CommonDbSourceService, MultiDBSource):
     """
     Implements the necessary methods to extract
     Database metadata from Vertica Source
@@ -293,6 +294,12 @@ class VerticaSource(CommonDbSourceService):
             self.engine, VERTICA_SCHEMA_COMMENTS
         )
 
+    def get_configured_database(self) -> Optional[str]:
+        return self.service_connection.database
+
+    def get_database_names_raw(self) -> Iterable[str]:
+        yield from self._execute_database_query(VERTICA_LIST_DATABASES)
+
     def get_database_names(self) -> Iterable[str]:
         configured_db = self.config.serviceConnection.__root__.config.database
         if configured_db:
@@ -300,10 +307,7 @@ class VerticaSource(CommonDbSourceService):
             self.set_schema_description_map()
             yield configured_db
         else:
-            results = self.connection.execute(VERTICA_LIST_DATABASES)
-            for res in results:
-                row = list(res)
-                new_database = row[0]
+            for new_database in self.get_database_names_raw():
                 database_fqn = fqn.build(
                     self.metadata,
                     entity_type=Database,
diff --git a/ingestion/src/metadata/profiler/source/metadata.py b/ingestion/src/metadata/profiler/source/metadata.py
index f44d49ed121..a9a16f67771 100644
--- a/ingestion/src/metadata/profiler/source/metadata.py
+++ b/ingestion/src/metadata/profiler/source/metadata.py
@@ -64,13 +64,16 @@ class OpenMetadataSource(Source):
     We do this here as well.
     """
 
+    def init_steps(self):
+        super().__init__()
+
+    # pylint: disable=super-init-not-called
     def __init__(
         self,
         config: OpenMetadataWorkflowConfig,
         metadata: OpenMetadata,
     ):
-
-        super().__init__()
+        self.init_steps()
 
         self.config = config
         self.metadata = metadata
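Pulling `super().__init__()` out into `init_steps()` is what lets OpenMetadataSourceExt (added next) reuse the step initialization without running this constructor's service-name-dependent body. The pattern in isolation (hypothetical names):

    class Step:
        def __init__(self):
            ...  # generic step wiring

    class SourceA(Step):
        def init_steps(self):
            super().__init__()  # only the Step wiring

        # pylint: disable=super-init-not-called
        def __init__(self):
            self.init_steps()
            ...  # setup that assumes a serviceName is present

    class SourceB(SourceA):
        # pylint: disable=super-init-not-called
        def __init__(self):
            self.init_steps()  # reuse the wiring, skip SourceA's assumptions
            ...  # serviceName-free setup
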
diff --git a/ingestion/src/metadata/profiler/source/metadata_ext.py b/ingestion/src/metadata/profiler/source/metadata_ext.py
new file mode 100644
index 00000000000..67ce7cd1475
--- /dev/null
+++ b/ingestion/src/metadata/profiler/source/metadata_ext.py
@@ -0,0 +1,293 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+OpenMetadataExt source for the profiler
+
+This source is used in cases where the service name
+is not provided for the profiler workflow.
+In such situations, the profiler will perform a thorough scan
+of the entire data source to locate the
+corresponding table entity in OpenMetadata.
+Subsequently, it will proceed to ingest relevant metrics
+and sample data for that identified entity.
+"""
+import traceback
+from copy import deepcopy
+from typing import Iterable, Optional, cast
+
+from sqlalchemy.inspection import inspect
+
+from metadata.generated.schema.entity.data.database import Database
+from metadata.generated.schema.entity.data.table import Table, TableType
+from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
+    DatabaseServiceMetadataPipeline,
+)
+from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
+    DatabaseServiceProfilerPipeline,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataWorkflowConfig,
+)
+from metadata.ingestion.api.models import Either, StackTraceError
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.source.connections import get_connection
+from metadata.profiler.source.metadata import (
+    OpenMetadataSource,
+    ProfilerSourceAndEntity,
+)
+from metadata.profiler.source.profiler_source_factory import profiler_source_factory
+from metadata.utils import fqn
+from metadata.utils.class_helper import get_service_type_from_source_type
+from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
+from metadata.utils.importer import import_source_class
+from metadata.utils.logger import profiler_logger
+
+logger = profiler_logger()
+
+
+class OpenMetadataSourceExt(OpenMetadataSource):
+    """
+    This source lists and filters the entities that need
+    to be processed by the profiler workflow.
+
+    Note that in order to manage the following steps we need
+    to test the connection against the Database Service Source.
+    We do this here as well.
+    """
+
+    # pylint: disable=super-init-not-called
+    def __init__(
+        self,
+        config: OpenMetadataWorkflowConfig,
+        metadata: OpenMetadata,
+    ):
+        self.init_steps()
+
+        self.config = config
+        self.metadata = metadata
+        self.test_connection()
+
+        # Init and type the source config
+        self.service_connection = self.config.source.serviceConnection.__root__.config
+        self.source_config: DatabaseServiceProfilerPipeline = cast(
+            DatabaseServiceProfilerPipeline, self.config.source.sourceConfig.config
+        )  # Used to satisfy the type checker
+        source_type = self.config.source.type.lower()
+        service_type = get_service_type_from_source_type(self.config.source.type)
+        source_class = import_source_class(
+            service_type=service_type, source_type=source_type
+        )
+        database_source_config = DatabaseServiceMetadataPipeline()
+        new_config = deepcopy(self.config.source)
+        new_config.sourceConfig.config = database_source_config
+        self.source = source_class.create(new_config.dict(), self.metadata)
+        self.engine = None
+        self.inspector = None
+        self._connection = None
+        self.set_inspector()
+
+        logger.info(
+            f"Starting profiler for service {self.config.source.type}"
+            f":{self.config.source.type.lower()}"
+        )
+
+    def set_inspector(self, database_name: Optional[str] = None) -> None:
+        """
+        When sources override `get_database_names`, they will need
+        to setup multiple inspectors. They can use this function.
+        :param database_name: new database to set
+        """
+        new_service_connection = deepcopy(self.service_connection)
+        if database_name:
+            logger.info(f"Ingesting from database: {database_name}")
+            new_service_connection.database = database_name
+        self.engine = get_connection(new_service_connection)
+        self.inspector = inspect(self.engine)
+        self._connection = None  # Lazy init as well
+
+    def _iter(self, *_, **__) -> Iterable[Either[ProfilerSourceAndEntity]]:
+        for database_name in self.get_database_names():
+            try:
+                database_entity = fqn.search_database_from_es(
+                    database_name=database_name,
+                    metadata=self.metadata,
+                    service_name=None,
+                )
+                if not database_entity:
+                    logger.debug(
+                        f"Database Entity for database `{database_name}` not found"
+                    )
+                    continue
+                for schema_name in self.get_schema_names():
+                    for table_name in self.get_table_names(schema_name):
+                        table_entity = fqn.search_table_from_es(
+                            metadata=self.metadata,
+                            database_name=database_name,
+                            service_name=None,
+                            schema_name=schema_name,
+                            table_name=table_name,
+                            fields="tableProfilerConfig",
+                        )
+                        if not table_entity:
+                            logger.debug(
+                                f"Table Entity for table `{database_name}.{schema_name}.{table_name}` not found"
+                            )
+                            continue
+
+                        profiler_source = profiler_source_factory.create(
+                            self.config.source.type.lower(),
+                            self.config,
+                            database_entity,
+                            self.metadata,
+                        )
+                        yield Either(
+                            right=ProfilerSourceAndEntity(
+                                profiler_source=profiler_source,
+                                entity=table_entity,
+                            )
+                        )
+            except Exception as exc:
+                yield Either(
+                    left=StackTraceError(
+                        name=database_name,
+                        error=f"Error listing source and entities for database due to [{exc}]",
+                        stack_trace=traceback.format_exc(),
+                    )
+                )
+
+    def get_table_names(self, schema_name: str) -> Iterable[str]:
+        for table_name in self.inspector.get_table_names(schema_name) or []:
+            if filter_by_table(self.source_config.tableFilterPattern, table_name):
+                self.status.filter(table_name, "Table pattern not allowed")
+                continue
+            yield table_name
+
+    def get_schema_names(self) -> Iterable[str]:
+        if self.service_connection.__dict__.get("databaseSchema"):
+            yield self.service_connection.databaseSchema
+        else:
+            for schema_name in self.inspector.get_schema_names():
+                if filter_by_schema(
+                    self.source_config.schemaFilterPattern, schema_name
+                ):
+                    self.status.filter(schema_name, "Schema pattern not allowed")
+                    continue
+                yield schema_name
+
+    def get_database_names(self) -> Iterable[str]:
+        """
+        Method to fetch database names from source
+        """
+        try:
+            if hasattr(self.service_connection, "supportsDatabase"):
+                configured_db = self.source.get_configured_database()
+                if configured_db:
+                    yield configured_db
+                else:
+                    database_names = self.source.get_database_names_raw()
+                    for database in database_names:
+                        if filter_by_database(
+                            self.source_config.databaseFilterPattern, database
+                        ):
+                            self.status.filter(database, "Database pattern not allowed")
+                            continue
+                        self.set_inspector(database_name=database)
+                        yield database
+            else:
+                custom_database_name = self.service_connection.__dict__.get(
+                    "databaseName"
+                )
+                database_name = self.service_connection.__dict__.get(
+                    "database", custom_database_name or "default"
+                )
+                yield database_name
+        except Exception as exc:
+            logger.debug(f"Failed to fetch database names: {exc}")
+            logger.debug(traceback.format_exc())
+
+    def filter_entities(self, tables: Iterable[Table]) -> Iterable[Table]:
+        """
+        From a list of tables, apply the SQLSourceConfig
+        filter patterns.
+
+        We will update the status on the SQLSource Status.
+        """
+        for table in tables:
+            try:
+                if filter_by_schema(
+                    self.source_config.schemaFilterPattern,
+                    table.databaseSchema.name,  # type: ignore
+                ):
+                    self.status.filter(
+                        f"Schema pattern not allowed: {table.fullyQualifiedName.__root__}",
+                        "Schema pattern not allowed",
+                    )
+                    continue
+                if filter_by_table(
+                    self.source_config.tableFilterPattern,
+                    table.name.__root__,
+                ):
+                    self.status.filter(
+                        f"Table pattern not allowed: {table.fullyQualifiedName.__root__}",
+                        "Table pattern not allowed",
+                    )
+                    continue
+                if (
+                    table.tableType == TableType.View
+                    and not self.source_config.includeViews
+                ):
+                    self.status.filter(
+                        table.fullyQualifiedName.__root__,
+                        "View filtered out",
+                    )
+                    continue
+                yield table
+            except Exception as exc:
+                self.status.failed(
+                    StackTraceError(
+                        name=table.fullyQualifiedName.__root__,
+                        error=f"Unexpected error filtering entities for table [{table}]: {exc}",
+                        stack_trace=traceback.format_exc(),
+                    )
+                )
+
+    def get_table_entities(self, database):
+        """
+        List and filter OpenMetadata tables based on the
+        source configuration.
+
+        The listing will be based on the entities from the
+        informed service name in the source configuration.
+
+        Note that users can specify `table_filter_pattern` to
+        either be `includes` or `excludes`. This means
+        that we will either use what is specified in `includes`
+        or we will use everything but the tables excluded.
+
+        Same with `schema_filter_pattern`.
+        """
+        tables = self.metadata.list_all_entities(
+            entity=Table,
+            fields=[
+                "tableProfilerConfig",
+            ],
+            params={
+                "service": self.config.source.serviceName,
+                "database": fqn.build(
+                    self.metadata,
+                    entity_type=Database,
+                    service_name=self.config.source.serviceName,
+                    database_name=database.name.__root__,
+                ),
+            },  # type: ignore
+        )
+
+        yield from self.filter_entities(tables)
diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py
index 2caeddaef9a..a40187042f5 100644
--- a/ingestion/src/metadata/utils/fqn.py
+++ b/ingestion/src/metadata/utils/fqn.py
@@ -531,12 +531,12 @@ def build_es_fqn_search_string(
     Returns:
         FQN search string
     """
-    if not service_name or not table_name:
+    if not table_name:
         raise FQNBuildingException(
-            f"Service Name and Table Name should be informed, but got service=`{service_name}`, table=`{table_name}`"
+            f"Table Name should be informed, but got table=`{table_name}`"
         )
     fqn_search_string = _build(
-        service_name, database_name or "*", schema_name or "*", table_name
+        service_name or "*", database_name or "*", schema_name or "*", table_name
     )
     return fqn_search_string
 
@@ -548,6 +548,7 @@ def search_table_from_es(
     service_name: str,
     table_name: str,
     fetch_multiple_entities: bool = False,
+    fields: Optional[str] = None,
 ):
     fqn_search_string = build_es_fqn_search_string(
         database_name, schema_name, service_name, table_name
@@ -556,6 +557,36 @@ def search_table_from_es(
     es_result = metadata.es_search_from_fqn(
         entity_type=Table,
         fqn_search_string=fqn_search_string,
+        fields=fields,
+    )
+
+    return get_entity_from_es_result(
+        entity_list=es_result, fetch_multiple_entities=fetch_multiple_entities
+    )
+
+
+def search_database_from_es(
+    metadata: OpenMetadata,
+    database_name: str,
+    service_name: Optional[str],
+    fetch_multiple_entities: Optional[bool] = False,
+    fields: Optional[str] = None,
+):
+    """
+    Search Database entity from ES
+    """
+
+    if not database_name:
+        raise FQNBuildingException(
+            f"Database Name should be informed, but got database=`{database_name}`"
+        )
+
+    fqn_search_string = _build(service_name or "*", database_name)
+
+    es_result = metadata.es_search_from_fqn(
+        entity_type=Database,
+        fqn_search_string=fqn_search_string,
+        fields=fields,
     )
 
     return get_entity_from_es_result(
diff --git a/ingestion/src/metadata/workflow/metadata.py b/ingestion/src/metadata/workflow/metadata.py
index 97a7faa71a9..631b48f21da 100644
--- a/ingestion/src/metadata/workflow/metadata.py
+++ b/ingestion/src/metadata/workflow/metadata.py
@@ -12,6 +12,7 @@
 Workflow definition for metadata related ingestions: metadata and lineage.
 """
 
+from metadata.config.common import WorkflowExecutionError
 from metadata.ingestion.api.steps import Sink, Source
 from metadata.utils.importer import (
     import_from_module,
@@ -40,6 +41,12 @@ class MetadataWorkflow(BaseWorkflow):
     def _get_source(self) -> Source:
         # Source that we are ingesting, e.g., mysql, looker or kafka
         source_type = self.config.source.type.lower()
+        if not self.config.source.serviceName:
+            raise WorkflowExecutionError(
+                "serviceName is a required field for executing the Metadata Workflow. "
+                "You can find more information on how to build the YAML "
+                "configuration here: https://docs.open-metadata.org/connectors"
+            )
 
         source_class = (
             import_from_module(
diff --git a/ingestion/src/metadata/workflow/profiler.py b/ingestion/src/metadata/workflow/profiler.py
index 54dea4b86e2..d72ea384a37 100644
--- a/ingestion/src/metadata/workflow/profiler.py
+++ b/ingestion/src/metadata/workflow/profiler.py
@@ -19,6 +19,7 @@ from metadata.ingestion.source.connections import get_connection, get_test_conne
 from metadata.pii.processor import PIIProcessor
 from metadata.profiler.processor.processor import ProfilerProcessor
 from metadata.profiler.source.metadata import OpenMetadataSource
+from metadata.profiler.source.metadata_ext import OpenMetadataSourceExt
 from metadata.utils.importer import import_sink_class
 from metadata.utils.logger import profiler_logger
 from metadata.workflow.base import BaseWorkflow
@@ -40,8 +41,19 @@ class ProfilerWorkflow(BaseWorkflow):
         # Validate that we can properly reach the source database
         self.test_connection()
 
+    def _get_source_class(self):
+        if self.config.source.serviceName:
+            return OpenMetadataSource
+        logger.info(
+            "Database Service name not provided, we will scan all the tables "
+            "available within the data source and locate the table entity in "
+            "OpenMetadata to ingest profiler data."
+        )
+        return OpenMetadataSourceExt
+
     def set_steps(self):
-        self.source = OpenMetadataSource.create(self.config.dict(), self.metadata)
+        source_class = self._get_source_class()
+        self.source = source_class.create(self.config.dict(), self.metadata)
 
         profiler_processor = self._get_profiler_processor()
         pii_processor = self._get_pii_processor()
diff --git a/ingestion/src/metadata/workflow/usage.py b/ingestion/src/metadata/workflow/usage.py
index 7504d2b40a0..bee82843b34 100644
--- a/ingestion/src/metadata/workflow/usage.py
+++ b/ingestion/src/metadata/workflow/usage.py
@@ -12,6 +12,7 @@
 Usage Workflow Definition
 """
 
+from metadata.config.common import WorkflowExecutionError
 from metadata.ingestion.api.steps import BulkSink, Processor, Source, Stage
 from metadata.utils.importer import (
     import_bulk_sink_type,
@@ -44,6 +45,12 @@ class UsageWorkflow(BaseWorkflow):
     def _get_source(self) -> Source:
         # Source that we are ingesting, e.g., mysql, looker or kafka
         source_type = self.config.source.type.lower()
+        if not self.config.source.serviceName:
+            raise WorkflowExecutionError(
+                "serviceName is a required field for executing the Usage Workflow. "
+                "You can find more information on how to build the YAML "
+                "configuration here: https://docs.open-metadata.org/connectors"
+            )
 
         source_class = (
             import_from_module(
diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json
index 84a6e86bbc7..78b67926eee 100644
--- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json
+++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json
@@ -83,7 +83,7 @@
       }
     },
     "additionalProperties": false,
-    "required": ["type", "serviceName", "sourceConfig"]
+    "required": ["type", "sourceConfig"]
   },
   "processor": {
     "description": "Configuration for Processor Component in the OpenMetadata Ingestion Framework.",