Messaging & MlModel Source Lint (#8008)

* Messaging Source Lint

* MlModel Source Lint
This commit is contained in:
Mayur Singal 2022-10-10 10:16:24 +05:30 committed by GitHub
parent 69ddc7081c
commit 4e51c70dcc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 42 additions and 23 deletions

View File

@ -9,6 +9,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Common Broker for fetching metadata
"""
import concurrent.futures
import traceback
from abc import ABC
@ -45,6 +49,11 @@ class BrokerTopicDetails(BaseModel):
class CommonBrokerSource(MessagingServiceSource, ABC):
"""
Common Broker Source Class
to fetch topics from Broker based sources
"""
def __init__(
self,
config: WorkflowSource,
@ -74,9 +83,9 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
self, topic_details: BrokerTopicDetails
) -> Iterable[CreateTopicRequest]:
try:
logger.info("Fetching topic schema {}".format(topic_details.topic_name))
logger.info(f"Fetching topic schema {topic_details.topic_name}")
topic_schema = self._parse_topic_metadata(topic_details.topic_name)
logger.info("Fetching topic config {}".format(topic_details.topic_name))
logger.info(f"Fetching topic config {topic_details.topic_name}")
topic = CreateTopicRequest(
name=topic_details.topic_name,
service=EntityReference(
@ -121,7 +130,7 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
f"Unexpected exception to yield topic [{topic_details.topic_name}]: {exc}"
)
self.status.failures.append(
"{}.{}".format(self.config.serviceName, topic_details.topic_name)
f"{self.config.serviceName}.{topic_details.topic_name}"
)
@staticmethod
@ -171,14 +180,13 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
topic_name + "-value"
)
return registered_schema.schema
return None
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Failed to get schema for topic [{topic_name}]: {exc}")
self.status.warning(
topic_name, f"failed to get schema: {exc} for topic {topic_name}"
)
return None
def _get_sample_data(self, topic_name):
sample_data = []
@ -198,7 +206,7 @@ class CommonBrokerSource(MessagingServiceSource, ABC):
for message in messages:
sample_data.append(
str(
self.consumer_client._serializer.decode_message(
self.consumer_client._serializer.decode_message( # pylint: disable=protected-access
message.value()
)
)

View File

@ -8,7 +8,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Kafka source ingestion
"""
from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
KafkaConnection,
)

View File

@ -39,7 +39,6 @@ from metadata.ingestion.models.topology import (
create_source_context,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_topic
@ -84,8 +83,8 @@ class MessagingSourceStatus(SourceStatus):
Reports the source status after ingestion
"""
topics_scanned: List[str] = list()
filtered: List[str] = list()
topics_scanned: List[str] = []
filtered: List[str] = []
def topic_scanned(self, topic: str) -> None:
self.topics_scanned.append(topic)
@ -101,7 +100,7 @@ class MessagingServiceSource(TopologyRunnerMixin, Source, ABC):
"""
@abstractmethod
def yield_topic(self, messaging_details: Any) -> Iterable[CreateTopicRequest]:
def yield_topic(self, topic_details: Any) -> Iterable[CreateTopicRequest]:
"""
Method to Get Messaging Entity
"""

View File

@ -8,7 +8,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
RedPanda source ingestion
"""
from metadata.generated.schema.entity.services.connections.messaging.redpandaConnection import (
RedpandaConnection,
)

View File

@ -24,7 +24,6 @@ from metadata.generated.schema.entity.data.mlmodel import (
FeatureType,
MlFeature,
MlHyperParameter,
MlModel,
MlStore,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
@ -39,7 +38,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.source.mlmodel.mlmodel_service import MlModelServiceSource
from metadata.utils import fqn
from metadata.utils.filters import filter_by_mlmodel
from metadata.utils.logger import ingestion_logger
@ -64,7 +62,11 @@ class MlflowSource(MlModelServiceSource):
)
return cls(config, metadata_config)
def get_mlmodels(self) -> Iterable[Tuple[RegisteredModel, ModelVersion]]:
def get_mlmodels(
self,
) -> Iterable[
Tuple[RegisteredModel, ModelVersion]
]: # pylint: disable=arguments-differ
"""
List and filters models from the registry
"""
@ -93,11 +95,11 @@ class MlflowSource(MlModelServiceSource):
yield model, latest_version
def _get_algorithm(self) -> str:
def _get_algorithm(self) -> str: # pylint: disable=arguments-differ
logger.info("Setting algorithm with default value `mlmodel` for Mlflow")
return "mlmodel"
def yield_mlmodel(
def yield_mlmodel( # pylint: disable=arguments-differ
self, model_and_version: Tuple[RegisteredModel, ModelVersion]
) -> Iterable[CreateMlModelRequest]:
"""
@ -123,7 +125,11 @@ class MlflowSource(MlModelServiceSource):
)
@staticmethod
def _get_hyper_params(data: RunData) -> Optional[List[MlHyperParameter]]:
def _get_hyper_params(
data: RunData,
) -> Optional[
List[MlHyperParameter]
]: # pylint: disable=arguments-differ,arguments-renamed
"""
Get the hyper parameters from the parameters
logged in the run data object.
@ -148,7 +154,9 @@ class MlflowSource(MlModelServiceSource):
return None
@staticmethod
def _get_ml_store(version: ModelVersion) -> Optional[MlStore]:
def _get_ml_store(
version: ModelVersion,
) -> Optional[MlStore]: # pylint: disable=arguments-differ, arguments-renamed
"""
Get the Ml Store from the model version object
"""
@ -167,7 +175,7 @@ class MlflowSource(MlModelServiceSource):
)
return None
def _get_ml_features(
def _get_ml_features( # pylint: disable=arguments-differ
self, data: RunData, run_id: str, model_name: str
) -> Optional[List[MlFeature]]:
"""

View File

@ -89,9 +89,9 @@ class MlModelSourceStatus(SourceStatus):
ML Model specific Status
"""
success: List[str] = list()
failures: List[str] = list()
warnings: List[str] = list()
success: List[str] = []
failures: List[str] = []
warnings: List[str] = []
def scanned(self, record: str) -> None:
"""