Mirror of https://github.com/open-metadata/OpenMetadata.git, synced 2026-01-07 13:07:22 +00:00

Pylint api (#8040)

parent 13f6f79f30
commit ab2fabee63

@@ -8,7 +8,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Abstract BulkSink definition to build a Workflow
"""
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass, field
from typing import Any, List

@@ -8,7 +8,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Closeable abstract class to be extended by Workflow pieces
"""
from abc import abstractmethod

@@ -8,7 +8,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Common definitions for configuration management
"""
from typing import Any, Optional, TypeVar

from pydantic import BaseModel

@@ -8,7 +8,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Helper to parse workflow configurations
"""

@@ -85,6 +84,26 @@ T = TypeVar("T", bound=BaseModel)
# Sources which contain inner connections to validate
HAS_INNER_CONNECTION = {"Airflow"}

+# Build a service type map dynamically from JSON Schema covered types
+SERVICE_TYPE_MAP = {
+    **{service: DatabaseConnection for service in DatabaseServiceType.__members__},
+    **{service: DashboardConnection for service in DashboardServiceType.__members__},
+    **{service: MessagingConnection for service in MessagingServiceType.__members__},
+    **{service: MetadataConnection for service in MetadataServiceType.__members__},
+    **{service: PipelineConnection for service in PipelineServiceType.__members__},
+    **{service: MlModelConnection for service in MlModelServiceType.__members__},
+}
+
+SOURCE_CONFIG_CLASS_MAP = {
+    DashboardMetadataConfigType.DashboardMetadata.value: DashboardServiceMetadataPipeline,
+    ProfilerConfigType.Profiler.value: DatabaseServiceProfilerPipeline,
+    DatabaseUsageConfigType.DatabaseUsage.value: DatabaseServiceQueryUsagePipeline,
+    MessagingMetadataConfigType.MessagingMetadata.value: MessagingServiceMetadataPipeline,
+    PipelineMetadataConfigType.PipelineMetadata.value: PipelineServiceMetadataPipeline,
+    MlModelMetadataConfigType.MlModelMetadata.value: MlModelServiceMetadataPipeline,
+    DatabaseMetadataConfigType.DatabaseMetadata.value: DatabaseServiceMetadataPipeline,
+}


class ParsingConfigurationError(Exception):
    """A parsing configuration error has happened"""

@@ -111,18 +130,10 @@ def get_service_type(
    :param source_type: source string
    :return: service connection type
    """
-    if source_type in DatabaseServiceType.__members__:
-        return DatabaseConnection
-    if source_type in DashboardServiceType.__members__:
-        return DashboardConnection
-    if source_type in MessagingServiceType.__members__:
-        return MessagingConnection
-    if source_type in MetadataServiceType.__members__:
-        return MetadataConnection
-    if source_type in PipelineServiceType.__members__:
-        return PipelineConnection
-    if source_type in MlModelServiceType.__members__:
-        return MlModelConnection
+    service_type = SERVICE_TYPE_MAP.get(source_type)
+
+    if service_type:
+        return service_type
+
    raise ValueError(f"Cannot find the service type of {source_type}")

@@ -143,20 +154,11 @@ def get_source_config_class(
    :param source_config_type: source config type string
    :return: source config class
    """
-    if source_config_type == DashboardMetadataConfigType.DashboardMetadata.value:
-        return DashboardServiceMetadataPipeline
-    if source_config_type == ProfilerConfigType.Profiler.value:
-        return DatabaseServiceProfilerPipeline
-    if source_config_type == DatabaseUsageConfigType.DatabaseUsage.value:
-        return DatabaseServiceQueryUsagePipeline
-    if source_config_type == MessagingMetadataConfigType.MessagingMetadata.value:
-        return MessagingServiceMetadataPipeline
-    if source_config_type == PipelineMetadataConfigType.PipelineMetadata.value:
-        return PipelineServiceMetadataPipeline
-    if source_config_type == MlModelMetadataConfigType.MlModelMetadata.value:
-        return MlModelServiceMetadataPipeline
-    if source_config_type == DatabaseMetadataConfigType.DatabaseMetadata.value:
-        return DatabaseServiceMetadataPipeline
+    source_config_class = SOURCE_CONFIG_CLASS_MAP.get(source_config_type)
+
+    if source_config_class:
+        return source_config_class
+
    raise ValueError(f"Cannot find the source config type of {source_config_type}")

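The two hunks above replace hand-written if/return ladders with lookups into the module-level maps built from the generated enums. A minimal, self-contained sketch of the same dispatch pattern (the enum and connection classes here are illustrative stand-ins, not the generated OpenMetadata models):

from enum import Enum


class DatabaseServiceType(Enum):  # stand-in for the generated enum
    Mysql = "Mysql"
    Postgres = "Postgres"


class DatabaseConnection:
    """Stand-in for the generated connection model."""


# One dict comprehension per service family replaces one if/return pair per member
SERVICE_TYPE_MAP = {
    **{service: DatabaseConnection for service in DatabaseServiceType.__members__},
}


def get_service_type(source_type: str):
    service_type = SERVICE_TYPE_MAP.get(source_type)
    if service_type:
        return service_type
    raise ValueError(f"Cannot find the service type of {source_type}")


assert get_service_type("Mysql") is DatabaseConnection

The dict form keeps the dispatch table next to the enums it is derived from, so adding a new service type no longer requires touching the lookup functions.
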
@@ -379,8 +381,11 @@ def parse_workflow_config_gracefully(
        parse_server_config(config_dict)
    except (ValidationError, InvalidWorkflowException) as scoped_error:
        if isinstance(scoped_error, ValidationError):
            # Let's catch validations of internal Workflow models, not the Workflow itself
            object_error = (
-                scoped_error.model.__name__ if scoped_error.model else "workflow"
+                scoped_error.model.__name__
+                if scoped_error.model is not None
+                else "workflow"
            )
            raise ParsingConfigurationError(
                f"We encountered an error parsing the configuration of your {object_error}.\n"

@@ -388,16 +393,15 @@ def parse_workflow_config_gracefully(
                f"{_parse_validation_err(scoped_error)}"
            )
        raise scoped_error
-    except Exception as runtime_error:
-        runtime_error = (
-            runtime_error.model.__name__ if runtime_error.model else "workflow"
-        )
+    except Exception:  # Let's just raise the original error if any internal logic fails
        raise ParsingConfigurationError(
            f"We encountered an error parsing the configuration of your workflow.\n"
            "You might need to review your config based on the original cause of this failure:\n"
            f"{_parse_validation_err(original_error)}"
        )

    raise ParsingConfigurationError("Uncaught error when parsing the workflow!")


def parse_test_connection_request_gracefully(
    config_dict: dict,

@@ -431,3 +435,5 @@ def parse_test_connection_request_gracefully(
        cls=connection_class,
        message="Error parsing the connection config",
    )
+
+    raise ParsingConfigurationError("Uncaught error when parsing the workflow!")

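Taken together, these hunks narrow the error handling in parse_workflow_config_gracefully: validation failures surface the offending model's name, and any other internal failure re-raises as a ParsingConfigurationError built from the original error. A rough sketch of that shape, assuming a hypothetical minimal pydantic model in place of the generated workflow config:

from pydantic import BaseModel, ValidationError


class ParsingConfigurationError(Exception):
    """A parsing configuration error has happened"""


class WorkflowConfig(BaseModel):
    # Hypothetical stand-in for the generated OpenMetadataWorkflowConfig
    loggerLevel: str


def parse_gracefully(config_dict: dict) -> WorkflowConfig:
    try:
        return WorkflowConfig(**config_dict)
    except ValidationError as original_error:
        # Point users at the failing model instead of a bare stack trace
        raise ParsingConfigurationError(
            f"We encountered an error parsing the configuration of your {WorkflowConfig.__name__}.\n"
            f"Original cause:\n{original_error}"
        ) from original_error
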
@@ -8,7 +8,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Abstract Processor definition to build a Workflow
"""
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Generic, List

@@ -19,16 +21,20 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
from metadata.ingestion.api.closeable import Closeable
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.status import Status
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


+# pylint: disable=duplicate-code
@dataclass
class ProcessorStatus(Status):
-    records = 0
+    records: List[str] = field(default_factory=list)
    warnings: List[Any] = field(default_factory=list)
    failures: List[Any] = field(default_factory=list)

    def processed(self, record: Any):
-        self.records += 1
+        self.records.append(record)

    def warning(self, info: Any) -> None:
        self.warnings.append(info)

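ProcessorStatus now tracks the processed records themselves rather than a bare counter. Because it is a dataclass, the mutable default has to go through field(default_factory=list) so each instance gets its own list; a quick illustration of why:

from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class ProcessorStatus:
    # default_factory builds a fresh list per instance; a bare [] default
    # is rejected by dataclasses precisely because it would be shared
    records: List[str] = field(default_factory=list)

    def processed(self, record: Any) -> None:
        self.records.append(record)


first, second = ProcessorStatus(), ProcessorStatus()
first.processed("table_a")
assert second.records == []  # no state leaks between instances
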
@@ -41,7 +47,10 @@ class ProcessorStatus(Status):
class ProfilerProcessorStatus(ProcessorStatus):
    entity: str = None

-    def failure(self, column: str, metric: str, reason: str) -> None:
+    # Disabling linting here until we find a better way to handle workflow statuses
+    def failure(  # pylint: disable=arguments-differ
+        self, column: str, metric: str, reason: str
+    ) -> None:
        self.failures.append({self.entity: {column: {metric: {reason}}}})

@@ -8,7 +8,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Abstract Sink definition to build a Workflow
"""
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Generic, List

@@ -21,6 +23,7 @@ from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.status import Status


+# pylint: disable=duplicate-code
@dataclass
class SinkStatus(Status):
    records: List[str] = field(default_factory=list)

@@ -8,10 +8,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Abstract Source definition to build a Workflow
"""
import time
from abc import ABCMeta, abstractmethod
-from typing import Any, Generic, Iterable, List
+from typing import Any, Dict, Generic, Iterable, List

from pydantic import BaseModel

@@ -31,16 +33,22 @@ class InvalidSourceException(Exception):
class SourceStatus(BaseModel, Status):
    """
    Class to handle processed records
    and success %
    """

    records = 0
    source_start_time = time.time()

-    success: List[str] = list()
-    failures: List[str] = list()
-    warnings: List[str] = list()
-    filtered: List[str] = list()
+    success: List[Any] = []
+    failures: List[Dict[str, str]] = []
+    warnings: List[Dict[str, str]] = []
+    filtered: List[Dict[str, str]] = []

    def scanned(self, record: Any) -> None:
        self.records += 1
        self.success.append(record)

    def warning(self, key: str, reason: str) -> None:
        self.warnings.append({key: reason})

@@ -60,6 +68,11 @@ class SourceStatus(BaseModel, Status):
class Source(Closeable, Generic[Entity], metaclass=ABCMeta):
    """
    Abstract source implementation. The workflow will run
    its next_record and pass them to the next step.
    """

    @classmethod
    @abstractmethod
    def create(

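SourceStatus keeps plain [] defaults because it is a pydantic model rather than a dataclass: pydantic (v1, which the ingestion package used at the time) copies field defaults for every new instance, so the mutable default is safe here. A small check of that behavior, with an illustrative model:

from typing import Any, Dict, List

from pydantic import BaseModel


class SourceStatus(BaseModel):
    success: List[Any] = []  # pydantic copies this default per instance
    failures: List[Dict[str, str]] = []


first, second = SourceStatus(), SourceStatus()
first.success.append("users_table")
assert second.success == []  # each instance got its own list
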
@@ -8,7 +8,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Abstract Stage definition to build a Workflow
"""
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, Generic, List

@@ -22,10 +24,12 @@ from metadata.ingestion.api.status import Status
class StageStatus(Status):
    records_produced = 0

    records: List[str] = field(default_factory=list)
    warnings: Dict[str, List[str]] = field(default_factory=dict)
    failures: Dict[str, List[str]] = field(default_factory=dict)

    def records_status(self, record: Any) -> None:
        self.records.append(record)
        self.records_produced += 1

    def warning_status(self, key: str, reason: str) -> None:

@@ -8,7 +8,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Status output utilities
"""
import json
import pprint

@@ -167,7 +167,9 @@ class TopologyRunnerMixin(Generic[C]):
            self.context.__dict__[dependency].name.__root__
            for dependency in stage.consumer or []  # root nodes do not have consumers
        ]
-        return fqn._build(*context_names, entity_request.name.__root__)
+        return fqn._build(  # pylint: disable=protected-access
+            *context_names, entity_request.name.__root__
+        )

    def sink_request(self, stage: NodeStage, entity_request: C) -> Iterable[Entity]:
        """

@@ -8,10 +8,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Workflow definition for metadata related ingestions: metadata, lineage and usage.
"""
+# module building strings read better with .format instead of f-strings
+# pylint: disable=consider-using-f-string
import importlib
import traceback
-from typing import Type, TypeVar
+from typing import Optional, Type, TypeVar

from metadata.config.common import WorkflowExecutionError
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (

@@ -53,6 +57,13 @@ class InvalidWorkflowJSONException(Exception):


class Workflow:
+    """
+    Ingestion workflow implementation.
+
+    It gets the data from the sources and passes it to
+    the processor + sink or stage + bulk sink.
+    """
+
    config: OpenMetadataWorkflowConfig
    source: Source
    processor: Processor

@@ -61,7 +72,12 @@ class Workflow:
    bulk_sink: BulkSink
    report = {}

-    def __init__(self, config: OpenMetadataWorkflowConfig):
+    def __init__(
+        self, config: OpenMetadataWorkflowConfig
+    ):  # pylint: disable=too-many-locals
+        """
+        Disabling pylint to wait for workflow reimplementation as a topology
+        """
        self.config = config

        set_loggers_level(config.workflowConfig.loggerLevel.value)

@@ -87,8 +103,8 @@ class Workflow:
        if source_type.startswith("custom")
        else "metadata.ingestion.source.{}.{}.{}Source".format(
            service_type.name.lower(),
-            self.typeClassFetch(source_type, True),
-            self.typeClassFetch(source_type, False),
+            self.type_class_fetch(source_type, True),
+            self.type_class_fetch(source_type, False),
        )
    )

@@ -103,8 +119,8 @@ class Workflow:
        processor_type = self.config.processor.type
        processor_class = self.get(
            "metadata.ingestion.processor.{}.{}Processor".format(
-                self.typeClassFetch(processor_type, True),
-                self.typeClassFetch(processor_type, False),
+                self.type_class_fetch(processor_type, True),
+                self.type_class_fetch(processor_type, False),
            )
        )
        processor_config = self.config.processor.dict().get("config", {})

@@ -119,8 +135,8 @@ class Workflow:
        stage_type = self.config.stage.type
        stage_class = self.get(
            "metadata.ingestion.stage.{}.{}Stage".format(
-                self.typeClassFetch(stage_type, True),
-                self.typeClassFetch(stage_type, False),
+                self.type_class_fetch(stage_type, True),
+                self.type_class_fetch(stage_type, False),
            )
        )
        stage_config = self.config.stage.dict().get("config", {})

@@ -131,8 +147,8 @@ class Workflow:
        sink_type = self.config.sink.type
        sink_class = self.get(
            "metadata.ingestion.sink.{}.{}Sink".format(
-                self.typeClassFetch(sink_type, True),
-                self.typeClassFetch(sink_type, False),
+                self.type_class_fetch(sink_type, True),
+                self.type_class_fetch(sink_type, False),
            )
        )
        sink_config = self.config.sink.dict().get("config", {})

@@ -143,8 +159,8 @@ class Workflow:
        bulk_sink_type = self.config.bulkSink.type
        bulk_sink_class = self.get(
            "metadata.ingestion.bulksink.{}.{}BulkSink".format(
-                self.typeClassFetch(bulk_sink_type, True),
-                self.typeClassFetch(bulk_sink_type, False),
+                self.type_class_fetch(bulk_sink_type, True),
+                self.type_class_fetch(bulk_sink_type, False),
            )
        )
        bulk_sink_config = self.config.bulkSink.dict().get("config", {})

@@ -155,19 +171,21 @@ class Workflow:
            f"BulkSink type:{self.config.bulkSink.type},{bulk_sink_class} configured"
        )

-    def typeClassFetch(self, type: str, isFile: bool):
-        if isFile:
-            return type.replace("-", "_")
-        else:
-            return "".join([i.title() for i in type.replace("-", "_").split("_")])
+    def type_class_fetch(self, type_: str, is_file: bool):
+        if is_file:
+            return type_.replace("-", "_")
+
+        return "".join([i.title() for i in type_.replace("-", "_").split("_")])

-    def get(self, key: str) -> Type[T]:
+    def get(self, key: str) -> Optional[Type[T]]:
        if key.find(".") >= 0:
-            # If the key contains a dot, we treat it as a import path and attempt
+            # If the key contains a dot, we treat it as an import path and attempt
            # to load it dynamically.
            module_name, class_name = key.rsplit(".", 1)
-            MyClass = getattr(importlib.import_module(module_name), class_name)
-            return MyClass
+            my_class = getattr(importlib.import_module(module_name), class_name)
+            return my_class
+
+        return None

    @classmethod
    def create(cls, config_dict: dict) -> "Workflow":

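The renamed helpers drive the dynamic loading in the call sites above: type_class_fetch turns a source type string into a module or class name fragment, and get resolves a dotted path via importlib. A standalone sketch with an illustrative source type:

import importlib
from typing import Optional, Type


def type_class_fetch(type_: str, is_file: bool) -> str:
    """Map 'bigquery-usage' to 'bigquery_usage' (module) or 'BigqueryUsage' (class)."""
    if is_file:
        return type_.replace("-", "_")
    return "".join(i.title() for i in type_.replace("-", "_").split("_"))


def get(key: str) -> Optional[Type]:
    """Resolve a dotted import path such as 'json.JSONDecoder' to its class, else None."""
    if key.find(".") >= 0:
        module_name, class_name = key.rsplit(".", 1)
        return getattr(importlib.import_module(module_name), class_name)
    return None


source_type = "bigquery-usage"  # illustrative value
path = "metadata.ingestion.source.{}.{}.{}Source".format(
    "database", type_class_fetch(source_type, True), type_class_fetch(source_type, False)
)
assert path == "metadata.ingestion.source.database.bigquery_usage.BigqueryUsageSource"
assert get("json.JSONDecoder") is not None
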
@@ -237,8 +255,8 @@ class Workflow:
            hasattr(self, "sink") and self.sink.get_status().failures
        ):
            return 1
-        else:
-            return 0
+
+        return 0

    def _retrieve_service_connection_if_needed(
        self, metadata_config: OpenMetadataConnection, service_type: ServiceType

@@ -271,7 +289,8 @@ class Workflow:
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(
-                f"Error getting dbtConfigSource for service name [{service_name}] using the secrets manager provider [{metadata.config.secretsManagerProvider}]: {exc}"
+                f"Error getting dbtConfigSource for service name [{service_name}]"
+                f" using the secrets manager provider [{metadata.config.secretsManagerProvider}]: {exc}"
            )

    def _retrieve_dbt_config_source_if_needed(

@@ -307,7 +326,8 @@ class Workflow:
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.error(
-                f"Error getting dbtConfigSource for config [{config}] using the secrets manager provider [{metadata.config.secretsManagerProvider}]: {exc}"
+                f"Error getting dbtConfigSource for config [{config}] using the secrets manager"
+                f" provider [{metadata.config.secretsManagerProvider}]: {exc}"
            )

    @staticmethod

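Both long log messages are split with Python's implicit concatenation of adjacent string literals, which satisfies the line-length check without changing the rendered text:

service_name = "my_service"  # illustrative values
provider = "managed-aws"

# Adjacent f-string literals inside parentheses are joined at compile time
message = (
    f"Error getting dbtConfigSource for service name [{service_name}]"
    f" using the secrets manager provider [{provider}]"
)
assert message == (
    "Error getting dbtConfigSource for service name [my_service]"
    " using the secrets manager provider [managed-aws]"
)
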