Fix#6212: Retrieve connection params from secret manager in CLI commands (#6441)

* Retrieve connection params from secret manager for database connectors

* Retrieve connection params from secret manager for all services except database connectors

* Stop retrieving connection from SM in Airflow rest plugin

* Retrieve connection params from secret manager for dashboard services

* Retrieve connection params when initializing Workflow/ProfilerWorkflow objects

* Align services topologies + comment changes in topology runner

* Address SonarCloud bug detected

* Update database service topology

* Address PR comments

* Address PR comments

* Address PR comments
This commit is contained in:
Nahuel 2022-08-02 09:13:46 +02:00 committed by GitHub
parent 157fcd8dcf
commit a878aa911c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
78 changed files with 344 additions and 416 deletions

View File

@ -0,0 +1,17 @@
{
"$id": "https://open-metadata.org/schema/entity/services/serviceType.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Service Type",
"description": "This schema defines the service types entities which requires a connection.",
"type": "string",
"javaType": "org.openmetadata.catalog.entity.services.ServiceType",
"enum": [
"Dashboard",
"Database",
"Messaging",
"Metadata",
"MlModel",
"Pipeline"
],
"additionalProperties": false
}

View File

@ -66,7 +66,7 @@
}
},
"additionalProperties": false,
"required": ["type", "serviceName", "serviceConnection", "sourceConfig"]
"required": ["type", "serviceName", "sourceConfig"]
},
"processor": {
"description": "Configuration for Processor Component in the OpenMetadata Ingestion Framework.",

View File

@ -68,50 +68,6 @@ def get_class(key: str) -> Type[T]:
return my_class
def get_source_dir(connection_type: type) -> str:
    """
    Map a service connection class to the name of the ingestion
    source subpackage that hosts its connectors.
    :param connection_type: one of the `*Connection` union classes
    :return: subpackage name, e.g. "database"; None for unknown types
    """
    dir_by_connection = {
        DatabaseConnection: "database",
        MessagingConnection: "messaging",
        MetadataConnection: "metadata",
        DashboardConnection: "dashboard",
        PipelineConnection: "pipeline",
        MlModelConnection: "mlmodel",
    }
    return dir_by_connection.get(connection_type)
def get_ingestion_source(
    source_type: str,
    source_config: WorkflowSource,
    metadata_config: OpenMetadataConnection,
) -> Source:
    """
    Import the required source class and configure it.
    :param source_type: Type specified in the config, e.g., redshift
    :param source_config: Specific source configurations, such as the host
    :param metadata_config: Metadata server configurations
    """
    # Resolve the subpackage from the connection union type, then build
    # the fully qualified class path to import dynamically.
    connection_dir = get_source_dir(type(source_config.serviceConnection.__root__))
    class_path = (
        f"metadata.ingestion.source.{connection_dir}."
        f"{fetch_type_class(source_type, is_file=True)}."
        f"{fetch_type_class(source_type, is_file=False)}Source"
    )
    source_class = get_class(class_path)
    source: Source = source_class.create(source_config.dict(), metadata_config)
    logger.debug(f"Source type:{source_type},{source_class} configured")
    source.prepare()
    logger.debug(f"Source type:{source_type},{source_class} prepared")
    return source
def get_sink(
sink_type: str,
sink_config: WorkflowSink,

View File

@ -159,22 +159,31 @@ class TopologyRunnerMixin(Generic[C]):
if entity is not None:
if stage.ack_sink:
tries = 3
entity = None
entity_fqn = self.fqn_from_context(
stage=stage, entity_request=entity_request
)
while not entity and tries > 0:
yield entity_request
# Improve validation logic
# we get entity from OM if we do not want to overwrite existing data in OM
if not stage.overwrite:
entity = self.metadata.get_by_name(
entity=stage.type_,
fqn=entity_fqn,
fields=["*"], # Get all the available data from the Entity
)
tries -= 1
# if entity does not exist in OM, or we want to overwrite, we will yield the entity_request
if entity is None:
tries = 3
while not entity and tries > 0:
yield entity_request
# Improve validation logic
entity = self.metadata.get_by_name(
entity=stage.type_,
fqn=entity_fqn,
fields=["*"], # Get all the available data from the Entity
)
tries -= 1
else:
yield entity

View File

@ -15,10 +15,10 @@ from typing import Type, TypeVar
import click
from metadata.config.common import WorkflowExecutionError
from metadata.config.workflow import get_source_dir
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
@ -28,6 +28,11 @@ from metadata.ingestion.api.processor import Processor
from metadata.ingestion.api.sink import Sink
from metadata.ingestion.api.source import Source
from metadata.ingestion.api.stage import Stage
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.class_helper import (
get_service_class_from_service_type,
get_service_type_from_source_type,
)
from metadata.utils.logger import ingestion_logger, set_loggers_level
logger = ingestion_logger()
@ -56,9 +61,14 @@ class Workflow:
set_loggers_level(config.workflowConfig.loggerLevel.value)
source_type = self.config.source.type.lower()
service_type: ServiceType = get_service_type_from_source_type(
self.config.source.type
)
source_class = self.get(
"metadata.ingestion.source.{}.{}.{}Source".format(
get_source_dir(type(self.config.source.serviceConnection.__root__)),
service_type.name.lower(),
self.typeClassFetch(source_type, True),
self.typeClassFetch(source_type, False),
)
@ -67,6 +77,8 @@ class Workflow:
self.config.workflowConfig.openMetadataServerConfig
)
self._retrieve_service_connection_if_needed(metadata_config, service_type)
self.source: Source = source_class.create(
self.config.source.dict(), metadata_config
)
@ -228,3 +240,35 @@ class Workflow:
else:
click.secho("Workflow finished successfully", fg="green", bold=True)
return 0
def _retrieve_service_connection_if_needed(
    self, metadata_config: OpenMetadataConnection, service_type: ServiceType
) -> None:
    """
    Override the in-memory `serviceConnection` of the source config when the
    workflow's service already exists in OpenMetadata. When a secrets manager
    is configured, the connection is retrieved from it; otherwise the default
    `SecretsManager` reads it from the service object itself.
    :param metadata_config: OpenMetadata connection config
    :param service_type: source workflow service type
    """
    # Sample connectors carry their connection inline; nothing to retrieve.
    if self._is_sample_source(self.config.source.type):
        return
    metadata = OpenMetadata(config=metadata_config)
    service = metadata.get_by_name(
        get_service_class_from_service_type(service_type),
        self.config.source.serviceName,
    )
    if not service:
        return
    self.config.source.serviceConnection = (
        metadata.secrets_manager_client.retrieve_service_connection(
            service, service_type.name.lower()
        )
    )
@staticmethod
def _is_sample_source(service_type):
return service_type == "sample-data" or service_type == "sample-usage"

View File

@ -8,6 +8,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import shutil

View File

@ -8,6 +8,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import shutil

View File

@ -8,6 +8,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
from pydantic import BaseModel

View File

@ -8,6 +8,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
from pydantic import BaseModel

View File

@ -9,7 +9,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Optional
from pydantic import BaseModel

View File

@ -39,6 +39,7 @@ class NodeStage(BaseModel, Generic[T]):
False # If we need to cache all values being yielded in the context
)
clear_cache: bool = False # If we need to clean cache values in the context for each produced element
overwrite: bool = True # If we want to overwrite existing data from OM
consumer: Optional[
List[str]
] = None # keys in the source context to fetch state from the parent's context

View File

@ -8,6 +8,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List, Optional
from pydantic.main import BaseModel

View File

@ -79,7 +79,8 @@ class DashboardServiceTopology(ServiceTopology):
NodeStage(
type_=DashboardService,
context="dashboard_service",
processor="yield_dashboard_service",
processor="yield_create_request_dashboard_service",
overwrite=False,
),
NodeStage(
type_=OMetaTagAndCategory,
@ -267,7 +268,7 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
def get_services(self) -> Iterable[WorkflowSource]:
yield self.config
def yield_dashboard_service(self, config: WorkflowSource):
def yield_create_request_dashboard_service(self, config: WorkflowSource):
yield self.metadata.get_create_service_from_source(
entity=DashboardService, config=config
)

View File

@ -8,6 +8,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import traceback
from datetime import datetime
from typing import Iterable, List, Optional, Set, cast

View File

@ -56,20 +56,6 @@ logger = ingestion_logger()
class MetabaseSource(DashboardServiceSource):
"""Metabase entity class
Args:
config:
metadata_config:
Attributes:
config:
metadata_config:
status:
metabase_session:
dashboard_service:
charts:
metric_charts:
"""
config: WorkflowSource
metadata_config: OpenMetadataConnection
@ -81,19 +67,10 @@ class MetabaseSource(DashboardServiceSource):
metadata_config: OpenMetadataConnection,
):
super().__init__(config, metadata_config)
self.connection = get_connection(self.service_connection)
self.metabase_session = self.connection.client["metabase_session"]
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
"""Instantiate object
Args:
config_dict:
metadata_config:
Returns:
MetabaseSource
"""
config = WorkflowSource.parse_obj(config_dict)
connection: MetabaseConnection = config.serviceConnection.__root__.config
if not isinstance(connection, MetabaseConnection):

View File

@ -52,16 +52,6 @@ logger = ingestion_logger()
class ModeSource(DashboardServiceSource):
"""Mode entity class
Args:
config:
metadata_config:
Attributes:
config:
metadata_config:
charts:
"""
def __init__(
self,
config: WorkflowSource,
@ -73,13 +63,6 @@ class ModeSource(DashboardServiceSource):
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
"""Instantiate object
Args:
config_dict:
metadata_config:
Returns:
ModeSource
"""
config = WorkflowSource.parse_obj(config_dict)
connection: ModeConnection = config.serviceConnection.__root__.config
if not isinstance(connection, ModeConnection):

View File

@ -40,16 +40,6 @@ logger = ingestion_logger()
class PowerbiSource(DashboardServiceSource):
"""PowerBi entity class
Args:
config:
metadata_config:
Attributes:
config:
metadata_config:
charts:
"""
def __init__(
self,
config: WorkflowSource,
@ -59,13 +49,6 @@ class PowerbiSource(DashboardServiceSource):
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
"""Instantiate object
Args:
config_dict:
metadata_config:
Returns:
PowerBiSource
"""
config = WorkflowSource.parse_obj(config_dict)
connection: PowerBIConnection = config.serviceConnection.__root__.config
if not isinstance(connection, PowerBIConnection):

View File

@ -8,6 +8,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from logging.config import DictConfigurator
from typing import Iterable, List, Optional

View File

@ -16,8 +16,6 @@ import json
import traceback
from typing import Iterable, List, Optional
import dateutil.parser as dateparser
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
@ -87,22 +85,6 @@ def get_filter_name(filter_obj):
class SupersetSource(DashboardServiceSource):
"""
Superset source class
Args:
config:
metadata_config:
Attributes:
config:
metadata_config:
status:
platform:
service_type:
service:
"""
config: WorkflowSource
metadata_config: OpenMetadataConnection

View File

@ -59,17 +59,6 @@ TABLEAU_TAG_CATEGORY = "TableauTags"
class TableauSource(DashboardServiceSource):
"""Tableau source entity class
Args:
config:
metadata_config:
Attributes:
config:
metadata_config:
all_dashboard_details:
"""
config: WorkflowSource
metadata_config: OpenMetadataConnection

View File

@ -8,6 +8,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterable, Optional, Tuple
from pyathena.sqlalchemy_athena import AthenaDialect
@ -104,7 +105,6 @@ class AthenaSource(CommonDbSourceService):
raise InvalidSourceException(
f"Expected AthenaConnection, but got {connection}"
)
return cls(config, metadata_config)
def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:

View File

@ -24,13 +24,6 @@ from metadata.ingestion.source.database.common_db_source import CommonDbSourceSe
class AzuresqlSource(CommonDbSourceService):
"""Azure SQL Source class
Args:
config:
metadata_config:
"""
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)

View File

@ -101,7 +101,6 @@ class BigquerySource(CommonDbSourceService):
raise InvalidSourceException(
f"Expected BigQueryConnection, but got {connection}"
)
return cls(config, metadata_config)
def standardize_table_name(self, schema: str, table: str) -> str:

View File

@ -29,7 +29,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.usage_source import UsageSource
from metadata.utils.credentials import set_google_credentials
from metadata.utils.logger import ingestion_logger
from metadata.utils.sql_queries import BIGQUERY_USAGE_STATEMENT
@ -56,11 +55,6 @@ class BigqueryUsageSource(UsageSource):
raise InvalidSourceException(
f"Expected BigQueryConnection, but got {connection}"
)
set_google_credentials(
gcs_credentials=config.serviceConnection.__root__.config.credentials
)
return cls(config, metadata_config)
def get_sql_statement(self, start_time: datetime, end_time: datetime) -> str:

View File

@ -172,5 +172,4 @@ class ClickhouseSource(CommonDbSourceService):
raise InvalidSourceException(
f"Expected ClickhouseConnection, but got {connection}"
)
return cls(config, metadata_config)

View File

@ -17,31 +17,32 @@ import ast
from metadata.generated.schema.entity.services.connections.database.clickhouseConnection import (
ClickhouseConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.usage_source import UsageSource
from metadata.utils.sql_queries import CLICKHOUSE_SQL_USAGE_STATEMENT
class ClickhouseUsageSource(UsageSource):
def __init__(self, config: WorkflowSource, metadata_config: WorkflowConfig):
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
self.sql_stmt = CLICKHOUSE_SQL_USAGE_STATEMENT.format(
start_time=self.start, end_time=self.end
)
@classmethod
def create(cls, config_dict, metadata_config: WorkflowConfig):
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: ClickhouseConnection = config.serviceConnection.__root__.config
if not isinstance(connection, ClickhouseConnection):
raise InvalidSourceException(
f"Expected ClickhouseConnection, but got {connection}"
)
return cls(config, metadata_config)
def get_schema_name(self, data: dict) -> str:

View File

@ -48,7 +48,6 @@ from metadata.ingestion.source.database.database_service import (
)
from metadata.ingestion.source.database.sql_column_handler import SqlColumnHandlerMixin
from metadata.ingestion.source.database.sqlalchemy_source import SqlAlchemySource
from metadata.utils import fqn
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_schema, filter_by_table
from metadata.utils.logger import ingestion_logger
@ -76,13 +75,14 @@ class CommonDbSourceService(
self.source_config: DatabaseServiceMetadataPipeline = (
self.config.sourceConfig.config
)
self.metadata_config = metadata_config
self.metadata = OpenMetadata(self.metadata_config)
# It will be one of the Unions. We don't know the specific type here.
self.service_connection = self.config.serviceConnection.__root__.config
self.status = SQLSourceStatus()
self.metadata_config = metadata_config
self.metadata = OpenMetadata(metadata_config)
self.engine: Engine = get_connection(self.service_connection)
self.test_connection()

View File

@ -109,7 +109,8 @@ class DatabaseServiceTopology(ServiceTopology):
NodeStage(
type_=DatabaseService,
context="database_service",
processor="yield_database_service",
processor="yield_create_request_database_service",
overwrite=False,
),
NodeStage(
type_=StorageService,
@ -247,7 +248,7 @@ class DatabaseServiceSource(DBTMixin, TopologyRunnerMixin, Source, ABC):
def get_services(self) -> Iterable[WorkflowSource]:
yield self.config
def yield_database_service(self, config: WorkflowSource):
def yield_create_request_database_service(self, config: WorkflowSource):
yield self.metadata.get_create_service_from_source(
entity=DatabaseService, config=config
)

View File

@ -130,5 +130,4 @@ class DatabricksSource(CommonDbSourceService):
raise InvalidSourceException(
f"Expected DatabricksConnection, but got {connection}"
)
return cls(config, metadata_config)

View File

@ -35,7 +35,6 @@ from metadata.generated.schema.entity.services.connections.database.datalakeConn
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
)
@ -76,7 +75,6 @@ DATALAKE_SUPPORTED_FILE_TYPES = (".csv", ".tsv", ".json", ".parquet")
class DatalakeSource(DatabaseServiceSource):
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
self.status = SQLSourceStatus()
self.config = config
self.source_config: DatabaseServiceMetadataPipeline = (
self.config.sourceConfig.config

View File

@ -11,7 +11,6 @@
from ibm_db_sa.base import DB2Dialect
from sqlalchemy.engine import reflection
from sqlalchemy.engine.reflection import Inspector
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,

View File

@ -19,7 +19,6 @@ from metadata.generated.schema.entity.services.connections.database.deltaLakeCon
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
)

View File

@ -9,7 +9,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from metadata.generated.schema.entity.services.connections.database.druidConnection import (
DruidConnection,
)

View File

@ -14,7 +14,6 @@ from metadata.generated.schema.entity.services.connections.database.dynamoDBConn
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
)

View File

@ -27,7 +27,6 @@ from metadata.generated.schema.entity.services.connections.database.glueConnecti
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
)
@ -35,7 +34,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory
from metadata.ingestion.ometa.ometa_api import OpenMetadata

View File

@ -124,9 +124,6 @@ HiveDialect.get_view_names = get_view_names
class HiveSource(CommonDbSourceService):
def prepare(self):
return super().prepare()
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config = WorkflowSource.parse_obj(config_dict)

View File

@ -9,9 +9,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sqlalchemy.engine.reflection import Inspector
from metadata.generated.schema.entity.services.connections.database.mariaDBConnection import (
MariaDBConnection,
)
@ -37,5 +34,4 @@ class MariadbSource(CommonDbSourceService):
raise InvalidSourceException(
f"Expected MariaDBConnection, but got {connection}"
)
return cls(config, metadata_config)

View File

@ -11,9 +11,6 @@
"""MSSQL source module"""
from typing import Iterable
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.inspection import inspect
from metadata.generated.schema.entity.services.connections.database.mssqlConnection import (
MssqlConnection,
)
@ -32,14 +29,6 @@ logger = ingestion_logger()
class MssqlSource(CommonDbSourceService):
"""MSSQL Source class
Args:
config:
metadata_config:
ctx
"""
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
"""Create class instance"""

View File

@ -34,5 +34,4 @@ class MysqlSource(CommonDbSourceService):
raise InvalidSourceException(
f"Expected MysqlConnection, but got {connection}"
)
return cls(config, metadata_config)

View File

@ -9,7 +9,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from metadata.generated.schema.entity.services.connections.database.pinotDBConnection import (
PinotDBConnection,
)
@ -35,5 +34,4 @@ class PinotdbSource(CommonDbSourceService):
raise InvalidSourceException(
f"Expected PinotdbConnection, but got {connection}"
)
return cls(config, metadata_config)

View File

@ -44,7 +44,6 @@ class PostgresSource(CommonDbSourceService):
raise InvalidSourceException(
f"Expected PostgresConnection, but got {connection}"
)
return cls(config, metadata_config)
def get_database_names(self) -> Iterable[str]:

View File

@ -86,9 +86,6 @@ class PrestoSource(CommonDbSourceService):
"""
def __init__(self, config, metadata_config):
self.presto_connection: PrestoConnection = (
config.serviceConnection.__root__.config
)
super().__init__(config, metadata_config)
@classmethod
@ -103,4 +100,4 @@ class PrestoSource(CommonDbSourceService):
def get_database_names(self) -> Iterable[str]:
self.inspector = inspect(self.engine)
yield self.presto_connection.catalog
yield self.service_connection.catalog

View File

@ -30,6 +30,5 @@ class QueryLogUsageSource(UsageSource):
@classmethod
def create(cls, config_dict, metadata_config: WorkflowConfig):
"""Create class instance"""
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
return cls(config, metadata_config)

View File

@ -441,27 +441,11 @@ STANDARD_TABLE_TYPES = {
# pylint: disable=useless-super-delegation
class RedshiftSource(CommonDbSourceService):
"""
Redshift source class
Args:
config:
metadata_config:
"""
def __init__(self, config, metadata_config):
super().__init__(config, metadata_config)
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
"""
Create source
Args:
config_dict:
metadata_config:
Returns:
"""
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: RedshiftConnection = config.serviceConnection.__root__.config
if not isinstance(connection, RedshiftConnection):

View File

@ -16,14 +16,14 @@ from typing import Iterator, Union
from metadata.generated.schema.entity.services.connections.database.redshiftConnection import (
RedshiftConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.database.usage_source import UsageSource
# pylint: disable=useless-super-delegation
from metadata.utils.logger import ingestion_logger
from metadata.utils.sql_queries import REDSHIFT_SQL_STATEMENT
@ -34,7 +34,7 @@ class RedshiftUsageSource(UsageSource):
SQL_STATEMENT = REDSHIFT_SQL_STATEMENT
def __init__(self, config: WorkflowSource, metadata_config: WorkflowConfig):
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
super().__init__(config, metadata_config)
self.sql_stmt = RedshiftUsageSource.SQL_STATEMENT.format(
start_time=self.start, end_time=self.end
@ -43,7 +43,7 @@ class RedshiftUsageSource(UsageSource):
self._database = "redshift"
@classmethod
def create(cls, config_dict, metadata_config: WorkflowConfig):
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: RedshiftConnection = config.serviceConnection.__root__.config
if not isinstance(connection, RedshiftConnection):

View File

@ -25,7 +25,6 @@ from metadata.generated.schema.entity.services.connections.database.salesforceCo
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
)
@ -71,7 +70,6 @@ class SalesforceSource(DatabaseServiceSource):
raise InvalidSourceException(
f"Expected SalesforceConnection, but got {connection}"
)
return cls(config, metadata_config)
def get_database_names(self) -> Iterable[str]:

View File

@ -70,7 +70,7 @@ class SampleUsageSource(UsageSource):
connection: SampleDataConnection = config.serviceConnection.__root__.config
if not isinstance(connection, SampleDataConnection):
raise InvalidSourceException(
f"Expected MssqlConnection, but got {connection}"
f"Expected SampleDataConnection, but got {connection}"
)
return cls(config, metadata_config)

View File

@ -9,7 +9,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from metadata.generated.schema.entity.services.connections.database.singleStoreConnection import (
SingleStoreConnection,
)
@ -35,5 +34,4 @@ class SinglestoreSource(CommonDbSourceService):
raise InvalidSourceException(
f"Expected SingleStoreConnection, but got {connection}"
)
return cls(config, metadata_config)

View File

@ -8,6 +8,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Iterable
from snowflake.sqlalchemy.custom_types import VARIANT

View File

@ -39,5 +39,4 @@ class SqliteSource(CommonDbSourceService):
raise InvalidSourceException(
f"Expected SQLiteConnection, but got {connection}"
)
return cls(config, metadata_config)

View File

@ -25,6 +25,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.generated.schema.type.tableQuery import TableQueries, TableQuery
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_database, filter_by_schema
@ -42,7 +43,8 @@ class UsageSource(Source[TableQuery], ABC):
super().__init__()
self.config = config
self.metadata_config = metadata_config
self.connection = config.serviceConnection.__root__.config
self.metadata = OpenMetadata(metadata_config)
self.connection = self.config.serviceConnection.__root__.config
self.source_config = self.config.sourceConfig.config
self.start, self.end = get_start_and_end(self.source_config.queryLogDuration)
self.analysis_date = self.end

View File

@ -230,5 +230,4 @@ class VerticaSource(CommonDbSourceService):
raise InvalidSourceException(
f"Expected VerticaConnection, but got {connection}"
)
return cls(config, metadata_config)

View File

@ -16,8 +16,6 @@ from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Iterable, List, Optional
from confluent_kafka.admin import AdminClient, ConfigResource
from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
@ -61,7 +59,8 @@ class MessagingServiceTopology(ServiceTopology):
NodeStage(
type_=MessagingService,
context="messaging_service",
processor="yield_messaging_service",
processor="yield_create_request_messaging_service",
overwrite=False,
)
],
children=["topic"],
@ -139,10 +138,10 @@ class MessagingServiceSource(TopologyRunnerMixin, Source, ABC):
self.config = config
self.metadata_config = metadata_config
self.metadata = OpenMetadata(metadata_config)
self.service_connection = self.config.serviceConnection.__root__.config
self.source_config: MessagingServiceMetadataPipeline = (
self.config.sourceConfig.config
)
self.service_connection = self.config.serviceConnection.__root__.config
self.connection = get_connection(self.service_connection)
self.test_connection()
self.status = MessagingSourceStatus()
@ -160,7 +159,7 @@ class MessagingServiceSource(TopologyRunnerMixin, Source, ABC):
continue
yield topic_details
def yield_messaging_service(self, config: WorkflowSource):
def yield_create_request_messaging_service(self, config: WorkflowSource):
yield self.metadata.get_create_service_from_source(
entity=MessagingService, config=config
)

View File

@ -39,6 +39,7 @@ from metadata.generated.schema.entity.services.databaseService import (
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.entity.services.metadataService import MetadataService
from metadata.generated.schema.entity.tags.tagCategory import Tag
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
@ -114,9 +115,8 @@ class AmundsenSource(Source[Entity]):
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
self.config = config
self.metadata_config = metadata_config
self.service_connection = config.serviceConnection.__root__.config
self.metadata = OpenMetadata(self.metadata_config)
self.service_connection = self.config.serviceConnection.__root__.config
neo4j_config = Neo4JConfig(
username=self.service_connection.username,
password=self.service_connection.password.get_secret_value(),

View File

@ -1,12 +1,10 @@
import traceback
import uuid
from dataclasses import dataclass, field
from distutils.command.config import config
from pathlib import Path
from typing import Any, Dict, Iterable, List
import yaml
from importlib_metadata import SelectableGroups
from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
@ -22,6 +20,7 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.entity.services.metadataService import MetadataService
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
@ -52,7 +51,7 @@ class AtlasSourceStatus(SourceStatus):
@dataclass
class AtlasSource(Source):
config: AtlasConnection
config: WorkflowSource
atlas_client: AtlasClient
status: AtlasSourceStatus
tables: Dict[str, Any]
@ -60,15 +59,14 @@ class AtlasSource(Source):
def __init__(
self,
config: AtlasConnection,
config: WorkflowSource,
metadata_config: OpenMetadataConnection,
):
super().__init__()
self.config = config
self.metadata_config = metadata_config
self.service_connection = config.serviceConnection.__root__.config
self.metadata = OpenMetadata(metadata_config)
self.service_connection = self.config.serviceConnection.__root__.config
self.status = AtlasSourceStatus()
self.schema_registry_url = "http://localhost:8081"

View File

@ -35,6 +35,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@ -42,24 +43,12 @@ logger = ingestion_logger()
@dataclass
class MetadataSourceStatus(SourceStatus):
"""Metadata Source class -- extends SourceStatus class
Attributes:
success:
failures:
warnings:
"""
success: List[str] = field(default_factory=list)
failures: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
def scanned_entity(self, entity_class_name: str, entity_name: str) -> None:
"""scanned entity method
Args:
entity_name (str):
"""
self.success.append(entity_name)
logger.info("%s Scanned: %s", entity_class_name, entity_name)
@ -67,33 +56,11 @@ class MetadataSourceStatus(SourceStatus):
def filtered(
self, table_name: str, err: str, dataset_name: str = None, col_type: str = None
) -> None:
"""filtered methods
Args:
table_name (str):
err (str):
"""
self.warnings.append(table_name)
logger.warning("Dropped Entity %s due to %s", table_name, err)
class MetadataSource(Source[Entity]):
"""Metadata source class
Args:
config:
metadata_config:
Attributes:
config:
report:
metadata_config:
status:
wrote_something:
metadata:
tables:
topics:
"""
config: WorkflowSource
report: SourceStatus
@ -106,10 +73,9 @@ class MetadataSource(Source[Entity]):
super().__init__()
self.config = config
self.metadata_config = metadata_config
self.service_connection = config.serviceConnection.__root__.config
self.metadata = OpenMetadata(metadata_config)
self.status = MetadataSourceStatus()
self.wrote_something = False
self.metadata = None
self.tables = None
self.topics = None

View File

@ -10,7 +10,6 @@
# limitations under the License.
"""Metadata source module"""
from metadata.generated.schema.entity.services.connections.metadata.metadataESConnection import (
MetadataESConnection,
)
@ -21,7 +20,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.metadata.metadata import MetadataSource
from metadata.utils.logger import ingestion_logger
@ -29,22 +27,6 @@ logger = ingestion_logger()
class MetadataElasticsearchSource(MetadataSource):
"""MetadataElasticsearch class
Args:
config:
metadata_config:
Attributes:
config:
report:
metadata_config:
status:
wrote_something:
metadata:
tables:
topics:
"""
config: WorkflowSource
report: SourceStatus
@ -55,7 +37,6 @@ class MetadataElasticsearchSource(MetadataSource):
metadata_config: OpenMetadataConnection,
):
super().__init__(config=config, metadata_config=metadata_config)
self.metadata = OpenMetadata(self.metadata_config)
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):

View File

@ -29,7 +29,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.metadata import MetadataSource
logger = logging.getLogger(__name__)
@ -71,22 +70,6 @@ class DatabaseServiceWrapper:
class MigrateSource(MetadataSource):
"""OpenmetadataSource class
Args:
config:
metadata_config:
Attributes:
config:
report:
metadata_config:
status:
wrote_something:
metadata:
tables:
topics:
"""
config: WorkflowSource
report: SourceStatus
@ -97,9 +80,6 @@ class MigrateSource(MetadataSource):
metadata_config: OpenMetadataConnection,
):
super().__init__(config, metadata_config)
self.metadata = OpenMetadata(
OpenMetadataConnection.parse_obj(self.service_connection)
)
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):

View File

@ -18,7 +18,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException, SourceStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.metadata.metadata import MetadataSource
from metadata.utils.logger import ingestion_logger
@ -26,22 +25,6 @@ logger = ingestion_logger()
class OpenmetadataSource(MetadataSource):
"""OpenmetadataSource class
Args:
config:
metadata_config:
Attributes:
config:
report:
metadata_config:
status:
wrote_something:
metadata:
tables:
topics:
"""
config: WorkflowSource
report: SourceStatus
@ -52,9 +35,6 @@ class OpenmetadataSource(MetadataSource):
metadata_config: OpenMetadataConnection,
):
super().__init__(config, metadata_config)
self.metadata = OpenMetadata(
OpenMetadataConnection.parse_obj(self.service_connection)
)
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):

View File

@ -59,7 +59,6 @@ class MlflowSource(MlModelServiceSource):
raise InvalidSourceException(
f"Expected MysqlConnection, but got {connection}"
)
return cls(config, metadata_config)
def get_mlmodels(self) -> Iterable[Tuple[RegisteredModel, ModelVersion]]:

View File

@ -65,7 +65,8 @@ class MlModelServiceTopology(ServiceTopology):
NodeStage(
type_=MlModelService,
context="mlmodel_service",
processor="yield_mlmodel_service",
processor="yield_create_request_mlmodel_service",
overwrite=False,
),
],
children=["mlmodel"],
@ -153,7 +154,7 @@ class MlModelServiceSource(TopologyRunnerMixin, Source, ABC):
def get_services(self) -> Iterable[WorkflowSource]:
yield self.config
def yield_mlmodel_service(self, config: WorkflowSource):
def yield_create_request_mlmodel_service(self, config: WorkflowSource):
yield self.metadata.get_create_service_from_source(
entity=MlModelService, config=config
)

View File

@ -17,7 +17,6 @@ from typing import Any, Iterable, List, Optional, cast
from airflow.models import BaseOperator, DagRun
from airflow.models.serialized_dag import SerializedDagModel
from airflow.serialization.serialized_objects import SerializedDAG
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
@ -43,11 +42,7 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils.connections import (
create_and_bind_session,
get_connection,
test_connection,
)
from metadata.utils.connections import create_and_bind_session, test_connection
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.logger import ingestion_logger
@ -73,11 +68,8 @@ class AirflowSource(PipelineServiceSource):
config: WorkflowSource,
metadata_config: OpenMetadataConnection,
):
self._session = None
self.service_connection = config.serviceConnection.__root__.config
self.engine: Engine = get_connection(self.service_connection.connection)
super().__init__(config, metadata_config)
# Create the connection to the database
self._session = None
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):

View File

@ -12,7 +12,6 @@
Airbyte source to extract metadata
"""
from tokenize import group
from typing import Iterable, Optional
from pydantic import BaseModel

View File

@ -8,8 +8,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import traceback
from typing import Any, Iterable, Optional
from typing import Any, Iterable, List, Optional
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
@ -97,7 +98,7 @@ class GlueSource(PipelineServiceSource):
)
yield pipeline_ev
def get_tasks(self, pipeline_details: Any) -> Task:
def get_tasks(self, pipeline_details: Any) -> List[Task]:
task_list = []
for task in pipeline_details["Graph"]["Nodes"]:
self.task_id_mapping[task["UniqueId"]] = task["Name"][:128]

View File

@ -62,7 +62,8 @@ class PipelineServiceTopology(ServiceTopology):
NodeStage(
type_=PipelineService,
context="pipeline_service",
processor="yield_pipeline_service",
processor="yield_create_request_pipeline_service",
overwrite=False,
),
],
children=["pipeline"],
@ -171,7 +172,6 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC):
topology = PipelineServiceTopology()
context = create_source_context(topology)
@abstractmethod
def __init__(
self,
config: WorkflowSource,
@ -200,7 +200,7 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC):
def get_services(self) -> Iterable[WorkflowSource]:
yield self.config
def yield_pipeline_service(self, config: WorkflowSource):
def yield_create_request_pipeline_service(self, config: WorkflowSource):
yield self.metadata.get_create_service_from_source(
entity=PipelineService, config=config
)

View File

@ -30,6 +30,7 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
@ -44,6 +45,10 @@ from metadata.orm_profiler.api.models import ProfilerProcessorConfig, ProfilerRe
from metadata.orm_profiler.interfaces.interface_protocol import InterfaceProtocol
from metadata.orm_profiler.interfaces.sqa_profiler_interface import SQAProfilerInterface
from metadata.utils import fqn
from metadata.utils.class_helper import (
get_service_class_from_service_type,
get_service_type_from_source_type,
)
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_fqn
from metadata.utils.logger import profiler_logger
@ -67,6 +72,11 @@ class ProfilerWorkflow:
self.config.workflowConfig.openMetadataServerConfig
)
# OpenMetadata client to fetch tables
self.metadata = OpenMetadata(self.metadata_config)
self._retrieve_service_connection_if_needed()
# Prepare the connection to the source service
# We don't need the whole Source class, as it is the OM Server
@ -86,9 +96,6 @@ class ProfilerWorkflow:
_from="orm_profiler",
)
# OpenMetadata client to fetch tables
self.metadata = OpenMetadata(self.metadata_config)
if not self._validate_service_name():
raise ValueError(
f"Service name `{self.config.source.serviceName}` does not exist. "
@ -302,3 +309,30 @@ class ProfilerWorkflow:
"""
self.metadata.close()
self.processor.close()
def _retrieve_service_connection_if_needed(self) -> None:
    """
    Override the current `serviceConnection` source config object if the source workflow
    service already exists in OpenMetadata. When it does, the service connection is
    retrieved through the configured secrets manager; with the default
    `LocalSecretsManager` it is simply read from the service object itself.

    Sample sources ("sample-data" / "sample-usage") have no backing service and are skipped.
    :return: None
    """
    if not self._is_sample_source(self.config.source.type):
        # Map the raw source type (e.g. "mysql") to its service category (e.g. Database)
        service_type: ServiceType = get_service_type_from_source_type(
            self.config.source.type
        )
        # Look up the service entity by name on the OpenMetadata server
        service = self.metadata.get_by_name(
            get_service_class_from_service_type(service_type),
            self.config.source.serviceName,
        )
        if service:
            # Only overwrite when the service exists server-side; otherwise keep
            # whatever connection was provided in the workflow config
            self.config.source.serviceConnection = (
                self.metadata.secrets_manager_client.retrieve_service_connection(
                    service, service_type.name.lower()
                )
            )
@staticmethod
def _is_sample_source(service_type):
return service_type == "sample-data" or service_type == "sample-usage"

View File

@ -0,0 +1,38 @@
from pydoc import locate
from typing import Type
from pydantic import BaseModel
from metadata.generated.schema.entity.services.serviceType import ServiceType
def _clean(source_type: str):
source_type = source_type.replace("-", "_")
source_type = source_type.replace("_usage", "")
source_type = source_type.replace("_", "")
if source_type == "sample":
source_type = "sampledata"
return source_type
def _get_service_type_from(service_subtype: str) -> ServiceType:
    """Find the ServiceType whose generated subtype enum contains the given subtype.

    Matching is case-insensitive; returns None implicitly when nothing matches.
    """
    wanted = service_subtype.lower()
    for service_type in ServiceType:
        # Locate the generated `<X>ServiceType` enum for this service category;
        # it may be absent, in which case there is nothing to match against.
        subtype_enum = locate(
            f"metadata.generated.schema.entity.services.{service_type.name.lower()}Service.{service_type.name}ServiceType"
        )
        known_subtypes = (
            [member.value.lower() for member in subtype_enum] if subtype_enum else []
        )
        if wanted in known_subtypes:
            return service_type
def get_service_type_from_source_type(source_type: str) -> ServiceType:
    """Return the ServiceType (e.g. Database) for a raw source type string (e.g. "redshift-usage")."""
    return _get_service_type_from(_clean(source_type))
def get_service_class_from_service_type(service_type: ServiceType) -> Type[BaseModel]:
    """Locate and return the generated service entity class (e.g. DatabaseService) for a ServiceType."""
    return locate(
        f"metadata.generated.schema.entity.services.{service_type.name.lower()}Service.{service_type.name}Service"
    )

View File

@ -8,6 +8,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from datetime import datetime, timedelta
from typing import Any, Dict, Iterable, List, Optional

View File

@ -8,9 +8,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
from itertools import islice
from typing import Any
import pandas as pd

View File

@ -8,6 +8,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import json
from abc import abstractmethod
@ -25,6 +26,12 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
from metadata.generated.schema.entity.services.connections.serviceConnection import (
ServiceConnection,
)
from metadata.generated.schema.entity.services.dashboardService import DashboardService
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.entity.services.metadataService import MetadataService
from metadata.generated.schema.entity.services.mlmodelService import MlModelService
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.security.client import (
auth0SSOClientConfig,
azureSSOClientConfig,
@ -42,8 +49,16 @@ logger = ingestion_logger()
SECRET_MANAGER_AIRFLOW_CONF = "openmetadata_secrets_manager"
# new typing type wrapping types from the '__root__' field of 'ServiceConnection' class
ServiceConnectionType = NewType(
"ServiceConnectionType", ServiceConnection.__fields__["__root__"].type_
ServiceWithConnectionType = NewType(
"ServiceWithConnectionType",
Union[
DashboardService,
DatabaseService,
MessagingService,
MetadataService,
MlModelService,
PipelineService,
],
)
# new typing type wrapping types from the 'securityConfig' field of 'OpenMetadataConnection' class
@ -70,13 +85,13 @@ class SecretsManager(metaclass=Singleton):
"""
@abstractmethod
def add_service_config_connection(
def retrieve_service_connection(
self,
service: ServiceConnectionType,
service: ServiceWithConnectionType,
service_type: str,
) -> None:
) -> ServiceConnection:
"""
Add the service connection config from the secret manager to a given service connection object.
Retrieve the service connection from the secret manager to a given service connection object.
:param service: Service connection object e.g. DatabaseConnection
:param service_type: Service type e.g. databaseService
"""
@ -90,15 +105,6 @@ class SecretsManager(metaclass=Singleton):
"""
pass
@staticmethod
def to_service_simple(service_type: str) -> str:
"""
Return the service simple name.
:param service_type: Service type e.g. databaseService
:return:
"""
return service_type.replace("Service", "").lower()
@staticmethod
def build_secret_id(*args: str) -> str:
"""
@ -111,7 +117,8 @@ class SecretsManager(metaclass=Singleton):
secret_suffix = "-".join([arg.lower() for arg in args])
return f"openmetadata-{secret_suffix}"
def get_service_connection_class(self, service_type: str) -> object:
@staticmethod
def get_service_connection_class(service_type: str) -> object:
"""
Returns the located service object by dotted path, importing as necessary.
:param service_type: Service type e.g. databaseService
@ -121,20 +128,20 @@ class SecretsManager(metaclass=Singleton):
(
clazz[1]
for clazz in inspect.getmembers(
locate(f"metadata.generated.schema.entity.services.{service_type}"),
locate(
f"metadata.generated.schema.entity.services.{service_type}Service"
),
inspect.isclass,
)
if clazz[0].lower()
== f"{self.to_service_simple(service_type)}connection"
if clazz[0].lower() == f"{service_type}connection"
)
).__name__
return locate(
f"metadata.generated.schema.entity.services.{service_type}.{service_conn_name}"
f"metadata.generated.schema.entity.services.{service_type}Service.{service_conn_name}"
)
def get_connection_class(
self, service_type: str, service_connection_type: str
) -> object:
@staticmethod
def get_connection_class(service_type: str, service_connection_type: str) -> object:
"""
Returns the located connection object by dotted path, importing as necessary.
:param service_type: Service type e.g. databaseService
@ -145,7 +152,7 @@ class SecretsManager(metaclass=Singleton):
service_connection_type[0].lower() + service_connection_type[1:]
)
return locate(
f"metadata.generated.schema.entity.services.connections.{self.to_service_simple(service_type)}.{connection_py_file}Connection.{service_connection_type}Connection"
f"metadata.generated.schema.entity.services.connections.{service_type}.{connection_py_file}Connection.{service_connection_type}Connection"
)
@ -162,15 +169,15 @@ class LocalSecretsManager(SecretsManager):
"""
pass
def add_service_config_connection(
def retrieve_service_connection(
self,
service: ServiceConnectionType,
service: ServiceWithConnectionType,
service_type: str,
) -> None:
) -> ServiceConnection:
"""
The LocalSecretsManager does not modify the ServiceConnection object
"""
pass
return ServiceConnection(__root__=service.connection)
class AWSSecretsManager(SecretsManager):
@ -182,25 +189,26 @@ class AWSSecretsManager(SecretsManager):
)
self.secretsmanager_client = session.client("secretsmanager")
def add_service_config_connection(
def retrieve_service_connection(
self,
service: ServiceConnectionType,
service: ServiceWithConnectionType,
service_type: str,
) -> None:
) -> ServiceConnection:
service_connection_type = service.serviceType.value
service_name = service.name.__root__
secret_id = self.build_secret_id(
self.to_service_simple(service_type), service_connection_type, service_name
service_type, service_connection_type, service_name
)
connection_class = self.get_connection_class(
service_type, service_connection_type
)
service_conn_class = self.get_service_connection_class(service_type)
service.connection = service_conn_class(
service_connection = service_conn_class(
config=connection_class.parse_obj(
json.loads(self._get_string_value(secret_id))
)
)
return ServiceConnection(__root__=service_connection)
def add_auth_provider_security_config(self, config: OpenMetadataConnection) -> None:
if config.authProvider == AuthProvider.no_auth:
@ -231,7 +239,7 @@ class AWSSecretsManager(SecretsManager):
try:
kwargs = {"SecretId": name}
response = self.secretsmanager_client.get_secret_value(**kwargs)
logger.info("Got value for secret %s.", name)
logger.debug("Got value for secret %s.", name)
except ClientError:
logger.exception("Couldn't get value for secret %s.", name)
raise

View File

@ -9,7 +9,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Hosts the singledispatch to build source URLs
"""

View File

@ -8,6 +8,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from copy import copy
from unittest import TestCase
from unittest.mock import Mock, patch

View File

@ -0,0 +1,52 @@
import pytest
from metadata.generated.schema.entity.services.dashboardService import DashboardService
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.entity.services.metadataService import MetadataService
from metadata.generated.schema.entity.services.mlmodelService import MlModelService
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.utils.class_helper import (
get_service_class_from_service_type,
get_service_type_from_source_type,
)
@pytest.mark.parametrize(
    ("source_type", "expected_service_type"),
    [
        ("looker", ServiceType.Dashboard),
        ("mysql", ServiceType.Database),
        ("kafka", ServiceType.Messaging),
        ("amundsen", ServiceType.Metadata),
        ("mlflow", ServiceType.MlModel),
        ("airflow", ServiceType.Pipeline),
        # usage/sample variants must still resolve to the Database service type
        ("clickhouse_usage", ServiceType.Database),
        ("sample-data", ServiceType.Database),
        ("redshift-usage", ServiceType.Database),
    ],
)
def test_get_service_type_from_source_type(
    source_type: str, expected_service_type: ServiceType
):
    """Each raw source type string maps to the expected ServiceType category."""
    actual_service_type = get_service_type_from_source_type(source_type)
    assert actual_service_type == expected_service_type
@pytest.mark.parametrize(
    ("service_type", "expected_service_class"),
    [
        (ServiceType.Dashboard, DashboardService),
        (ServiceType.Database, DatabaseService),
        (ServiceType.Messaging, MessagingService),
        (ServiceType.Metadata, MetadataService),
        (ServiceType.MlModel, MlModelService),
        (ServiceType.Pipeline, PipelineService),
    ],
)
def test_get_service_class_from_service_type(
    service_type: ServiceType, expected_service_class: object
):
    """Each ServiceType resolves to its generated service entity class."""
    actual_service_class = get_service_class_from_service_type(service_type)
    assert actual_service_class == expected_service_class

View File

@ -27,6 +27,9 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
OpenMetadataConnection,
SecretsManagerProvider,
)
from metadata.generated.schema.entity.services.connections.serviceConnection import (
ServiceConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
@ -55,8 +58,9 @@ AUTH_PROVIDER_CONFIG = {"secretKey": "/fake/path"}
class TestSecretsManager(TestCase):
service_type: str = "databaseService"
service_type: str = "database"
service: DatabaseService
service_connection: ServiceConnection
database_connection = MysqlConnection(**DATABASE_CONNECTION)
auth_provider_config = GoogleSSOClientConfig(**AUTH_PROVIDER_CONFIG)
om_connection: OpenMetadataConnection
@ -64,6 +68,8 @@ class TestSecretsManager(TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.service = DatabaseService(**DATABASE_SERVICE)
cls.service.connection = DatabaseConnection(config=cls.database_connection)
cls.service_connection = ServiceConnection(__root__=cls.service.connection)
cls.om_connection = OpenMetadataConnection(
authProvider=AuthProvider.google,
hostPort="http://localhost:8585/api",
@ -75,13 +81,16 @@ class TestSecretsManager(TestCase):
def test_local_manager_add_service_config_connection(self):
local_manager = get_secrets_manager(SecretsManagerProvider.local, None)
self.service.connection.config = self.database_connection
expected_service = deepcopy(self.service)
expected_service_connection = self.service_connection
local_manager.add_service_config_connection(self.service, self.service_type)
actual_service_connection: ServiceConnection = (
local_manager.retrieve_service_connection(self.service, self.service_type)
)
self.assertEqual(expected_service, self.service)
assert id(self.database_connection) == id(self.service.connection.config)
self.assertEqual(actual_service_connection, expected_service_connection)
assert id(actual_service_connection.__root__.config) == id(
expected_service_connection.__root__.config
)
def test_local_manager_add_auth_provider_security_config(self):
local_manager = get_secrets_manager(SecretsManagerProvider.local, None)
@ -98,21 +107,23 @@ class TestSecretsManager(TestCase):
aws_manager = self._build_secret_manager(
boto3_mock, {"SecretString": json.dumps(DATABASE_CONNECTION)}
)
expected_service = deepcopy(self.service)
expected_service.connection.config = self.database_connection
self.service.connection = None
expected_service_connection = self.service_connection
aws_manager.add_service_config_connection(self.service, self.service_type)
actual_service_connection: ServiceConnection = (
aws_manager.retrieve_service_connection(self.service, self.service_type)
)
self.assertEqual(expected_service, self.service)
assert id(self.database_connection) != id(self.service.connection.config)
self.assertEqual(expected_service_connection, actual_service_connection)
assert id(actual_service_connection.__root__.config) != id(
expected_service_connection.__root__.config
)
@patch("metadata.utils.secrets_manager.boto3")
def test_aws_manager_fails_add_auth_provider_security_config(self, mocked_boto3):
aws_manager = self._build_secret_manager(mocked_boto3, {})
with self.assertRaises(ValueError) as value_error:
aws_manager.add_service_config_connection(self.service, self.service_type)
aws_manager.retrieve_service_connection(self.service, self.service_type)
self.assertEqual(
"[SecretString] not present in the response.", value_error.exception
)
@ -137,7 +148,7 @@ class TestSecretsManager(TestCase):
aws_manager = self._build_secret_manager(mocked_boto3, {})
with self.assertRaises(ValueError) as value_error:
aws_manager.add_service_config_connection(self.service, self.service_type)
aws_manager.retrieve_service_connection(self.service, self.service_type)
self.assertEqual(
"[SecretString] not present in the response.", value_error.exception
)

View File

@ -9,7 +9,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import TestCase
from metadata.generated.schema.entity.services.connections.database.athenaConnection import (

View File

@ -99,8 +99,6 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource:
if not service:
raise ValueError(f"Could not get service from type {service_type}")
metadata.secrets_manager_client.add_service_config_connection(service, service_type)
return WorkflowSource(
type=service.serviceType.value.lower(),
serviceName=service.name.__root__,

View File

@ -0,0 +1,10 @@
{
"$id": "https://open-metadata.org/schema/entity/services/serviceType.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Service Type",
"description": "This schema defines the service types entities which requires a connection.",
"type": "string",
"javaType": "org.openmetadata.catalog.entity.services.ServiceType",
"enum": ["Dashboard", "Database", "Messaging", "Metadata", "MlModel", "Pipeline"],
"additionalProperties": false
}

View File

@ -66,7 +66,7 @@
}
},
"additionalProperties": false,
"required": ["type", "serviceName", "serviceConnection", "sourceConfig"]
"required": ["type", "serviceName", "sourceConfig"]
},
"processor": {
"description": "Configuration for Processor Component in the OpenMetadata Ingestion Framework.",