diff --git a/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py b/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py index d82768d825e..19f9e162ffb 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py +++ b/ingestion/src/metadata/ingestion/source/messaging/common_broker_source.py @@ -9,6 +9,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" +Common Broker for fetching metadata +""" + import concurrent.futures import traceback from abc import ABC @@ -45,6 +49,11 @@ class BrokerTopicDetails(BaseModel): class CommonBrokerSource(MessagingServiceSource, ABC): + """ + Common Broker Source Class + to fetch topics from Broker based sources + """ + def __init__( self, config: WorkflowSource, @@ -74,9 +83,9 @@ class CommonBrokerSource(MessagingServiceSource, ABC): self, topic_details: BrokerTopicDetails ) -> Iterable[CreateTopicRequest]: try: - logger.info("Fetching topic schema {}".format(topic_details.topic_name)) + logger.info(f"Fetching topic schema {topic_details.topic_name}") topic_schema = self._parse_topic_metadata(topic_details.topic_name) - logger.info("Fetching topic config {}".format(topic_details.topic_name)) + logger.info(f"Fetching topic config {topic_details.topic_name}") topic = CreateTopicRequest( name=topic_details.topic_name, service=EntityReference( @@ -121,7 +130,7 @@ class CommonBrokerSource(MessagingServiceSource, ABC): f"Unexpected exception to yield topic [{topic_details.topic_name}]: {exc}" ) self.status.failures.append( - "{}.{}".format(self.config.serviceName, topic_details.topic_name) + f"{self.config.serviceName}.{topic_details.topic_name}" ) @staticmethod @@ -171,14 +180,13 @@ class CommonBrokerSource(MessagingServiceSource, ABC): topic_name + "-value" ) return registered_schema.schema - - return None except Exception as exc: logger.debug(traceback.format_exc()) logger.warning(f"Failed to get schema for topic [{topic_name}]: {exc}") self.status.warning( topic_name, f"failed to get schema: {exc} for topic {topic_name}" ) + return None def _get_sample_data(self, topic_name): sample_data = [] @@ -198,7 +206,7 @@ class CommonBrokerSource(MessagingServiceSource, ABC): for message in messages: sample_data.append( str( - self.consumer_client._serializer.decode_message( + self.consumer_client._serializer.decode_message( # pylint: disable=protected-access message.value() ) ) diff --git a/ingestion/src/metadata/ingestion/source/messaging/kafka.py b/ingestion/src/metadata/ingestion/source/messaging/kafka.py index 0054047e87a..f76ef0158aa 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/kafka.py +++ b/ingestion/src/metadata/ingestion/source/messaging/kafka.py @@ -8,7 +8,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +""" +Kafka source ingestion +""" from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import ( KafkaConnection, ) diff --git a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py index eb92640cae9..71e15b2f291 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py +++ b/ingestion/src/metadata/ingestion/source/messaging/messaging_service.py @@ -39,7 +39,6 @@ from metadata.ingestion.models.topology import ( create_source_context, ) from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.utils import fqn from metadata.utils.connections import get_connection, test_connection from metadata.utils.filters import filter_by_topic @@ -84,8 +83,8 @@ class MessagingSourceStatus(SourceStatus): Reports the source status after ingestion """ - topics_scanned: List[str] = list() - filtered: List[str] = list() + topics_scanned: List[str] = [] + filtered: List[str] = [] def topic_scanned(self, topic: str) -> None: self.topics_scanned.append(topic) @@ -101,7 +100,7 @@ class MessagingServiceSource(TopologyRunnerMixin, Source, ABC): """ @abstractmethod - def yield_topic(self, messaging_details: Any) -> Iterable[CreateTopicRequest]: + def yield_topic(self, topic_details: Any) -> Iterable[CreateTopicRequest]: """ Method to Get Messaging Entity """ diff --git a/ingestion/src/metadata/ingestion/source/messaging/redpanda.py b/ingestion/src/metadata/ingestion/source/messaging/redpanda.py index 1fe012b830f..7fd71c0cf0c 100644 --- a/ingestion/src/metadata/ingestion/source/messaging/redpanda.py +++ b/ingestion/src/metadata/ingestion/source/messaging/redpanda.py @@ -8,7 +8,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +""" +RedPanda source ingestion +""" from metadata.generated.schema.entity.services.connections.messaging.redpandaConnection import ( RedpandaConnection, ) diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/mlflow.py b/ingestion/src/metadata/ingestion/source/mlmodel/mlflow.py index ed6463284ad..a64ab2497e6 100644 --- a/ingestion/src/metadata/ingestion/source/mlmodel/mlflow.py +++ b/ingestion/src/metadata/ingestion/source/mlmodel/mlflow.py @@ -24,7 +24,6 @@ from metadata.generated.schema.entity.data.mlmodel import ( FeatureType, MlFeature, MlHyperParameter, - MlModel, MlStore, ) from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( @@ -39,7 +38,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.source import InvalidSourceException from metadata.ingestion.source.mlmodel.mlmodel_service import MlModelServiceSource -from metadata.utils import fqn from metadata.utils.filters import filter_by_mlmodel from metadata.utils.logger import ingestion_logger @@ -64,7 +62,11 @@ class MlflowSource(MlModelServiceSource): ) return cls(config, metadata_config) - def get_mlmodels(self) -> Iterable[Tuple[RegisteredModel, ModelVersion]]: + def get_mlmodels( + self, + ) -> Iterable[ + Tuple[RegisteredModel, ModelVersion] + ]: # pylint: disable=arguments-differ """ List and filters models from the registry """ @@ -93,11 +95,11 @@ class MlflowSource(MlModelServiceSource): yield model, latest_version - def _get_algorithm(self) -> str: + def _get_algorithm(self) -> str: # pylint: disable=arguments-differ logger.info("Setting algorithm with default value `mlmodel` for Mlflow") return "mlmodel" - def yield_mlmodel( + def yield_mlmodel( # pylint: disable=arguments-differ self, model_and_version: Tuple[RegisteredModel, ModelVersion] ) -> Iterable[CreateMlModelRequest]: """ @@ -123,7 +125,11 @@ class MlflowSource(MlModelServiceSource): ) @staticmethod - def _get_hyper_params(data: RunData) -> Optional[List[MlHyperParameter]]: + def _get_hyper_params( + data: RunData, + ) -> Optional[ + List[MlHyperParameter] + ]: # pylint: disable=arguments-differ,arguments-renamed """ Get the hyper parameters from the parameters logged in the run data object. @@ -148,7 +154,9 @@ class MlflowSource(MlModelServiceSource): return None @staticmethod - def _get_ml_store(version: ModelVersion) -> Optional[MlStore]: + def _get_ml_store( + version: ModelVersion, + ) -> Optional[MlStore]: # pylint: disable=arguments-differ, arguments-renamed """ Get the Ml Store from the model version object """ @@ -167,7 +175,7 @@ class MlflowSource(MlModelServiceSource): ) return None - def _get_ml_features( + def _get_ml_features( # pylint: disable=arguments-differ self, data: RunData, run_id: str, model_name: str ) -> Optional[List[MlFeature]]: """ diff --git a/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py b/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py index aa4f78124f3..a209d9de447 100644 --- a/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py +++ b/ingestion/src/metadata/ingestion/source/mlmodel/mlmodel_service.py @@ -89,9 +89,9 @@ class MlModelSourceStatus(SourceStatus): ML Model specific Status """ - success: List[str] = list() - failures: List[str] = list() - warnings: List[str] = list() + success: List[str] = [] + failures: List[str] = [] + warnings: List[str] = [] def scanned(self, record: str) -> None: """