From 199fe8753ad3af9e62545d127fd504f38a04091e Mon Sep 17 00:00:00 2001
From: Mayur Singal <39544459+ulixius9@users.noreply.github.com>
Date: Fri, 14 Apr 2023 22:48:38 +0530
Subject: [PATCH] Fix Top Level Imports (#11075)

---
 .../metadata/ingestion/sink/elasticsearch.py  | 10 ++++---
 .../source/database/datalake/utils.py         | 28 ++++++++++---------
 2 files changed, 21 insertions(+), 17 deletions(-)

diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py
index 0dfc81c5974..c378a33e20b 100644
--- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py
+++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py
@@ -19,11 +19,9 @@ import json
 import ssl
 import traceback
 from functools import singledispatch
-from typing import Any, List, Optional
+from typing import Any, List, Optional, Tuple
 
 import boto3
-from elasticsearch import Elasticsearch, RequestsHttpConnection
-from elasticsearch.connection import create_ssl_context
 from requests_aws4auth import AWS4Auth
 
 from metadata.config.common import ConfigModel
@@ -169,6 +167,10 @@ class ElasticsearchSink(Sink[Entity]):
         config: ElasticSearchConfig,
         metadata_config: OpenMetadataConnection,
     ) -> None:
+        # pylint: disable=import-outside-toplevel
+        from elasticsearch import Elasticsearch, RequestsHttpConnection
+        from elasticsearch.connection import create_ssl_context
+
         super().__init__()
         self.config = config
         self.metadata_config = metadata_config
@@ -453,7 +455,7 @@ def _parse_columns(
     )
 
 
-def get_es_tag_list_and_tier(record: Entity) -> (List[dict], Optional[str]):
+def get_es_tag_list_and_tier(record: Entity) -> Tuple[List[dict], Optional[str]]:
     """
     Build ES tag list from any Entity
     """
diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/utils.py b/ingestion/src/metadata/ingestion/source/database/datalake/utils.py
index f90c21439a5..d36e3b227d5 100644
--- a/ingestion/src/metadata/ingestion/source/database/datalake/utils.py
+++ b/ingestion/src/metadata/ingestion/source/database/datalake/utils.py
@@ -19,7 +19,6 @@ import json
 import zipfile
 from typing import List, Union
 
-import pandas as pd
 from avro.datafile import DataFileReader
 from avro.errors import InvalidAvroBinaryEncoding
 from avro.io import DatumReader
@@ -49,10 +48,13 @@ COMPLEX_COLUMN_SEPARATOR = "_##"
 
 def read_from_avro(
     avro_text: bytes,
-) -> Union[DatalakeColumnWrapper, List[pd.DataFrame]]:
+) -> Union[DatalakeColumnWrapper, List]:
     """
     Method to parse the avro data from storage sources
     """
+    # pylint: disable=import-outside-toplevel
+    from pandas import DataFrame, Series
+
     try:
         elements = DataFileReader(io.BytesIO(avro_text), DatumReader())
         if elements.meta.get(AVRO_SCHEMA):
@@ -60,20 +62,16 @@
             columns=parse_avro_schema(
                 schema=elements.meta.get(AVRO_SCHEMA).decode(UTF_8), cls=Column
             ),
-            dataframes=[pd.DataFrame.from_records(elements)],
+            dataframes=[DataFrame.from_records(elements)],
         )
-        return [pd.DataFrame.from_records(elements)]
+        return [DataFrame.from_records(elements)]
     except (AssertionError, InvalidAvroBinaryEncoding):
         columns = parse_avro_schema(schema=avro_text, cls=Column)
         field_map = {
-            col.name.__root__: pd.Series(
-                PD_AVRO_FIELD_MAP.get(col.dataType.value, "str")
-            )
+            col.name.__root__: Series(PD_AVRO_FIELD_MAP.get(col.dataType.value, "str"))
             for col in columns
         }
-        return DatalakeColumnWrapper(
-            columns=columns, dataframes=[pd.DataFrame(field_map)]
-        )
+        return DatalakeColumnWrapper(columns=columns, dataframes=[DataFrame(field_map)])
 
 
 def _get_json_text(key: str, text: bytes, decode: bool) -> str:
@@ -89,10 +87,14 @@ def _get_json_text(key: str, text: bytes, decode: bool) -> str:
 
 def read_from_json(
     key: str, json_text: str, sample_size: int = 100, decode: bool = False
-) -> List[pd.DataFrame]:
+) -> List:
     """
     Read the json file from the azure container and return a dataframe
     """
+
+    # pylint: disable=import-outside-toplevel
+    from pandas import json_normalize
+
     json_text = _get_json_text(key, json_text, decode)
     try:
         data = json.loads(json_text)
@@ -104,5 +106,5 @@
         ]
 
     if isinstance(data, list):
-        return [pd.json_normalize(data[:sample_size], sep=COMPLEX_COLUMN_SEPARATOR)]
-    return [pd.json_normalize(data, sep=COMPLEX_COLUMN_SEPARATOR)]
+        return [json_normalize(data[:sample_size], sep=COMPLEX_COLUMN_SEPARATOR)]
+    return [json_normalize(data, sep=COMPLEX_COLUMN_SEPARATOR)]
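
The pattern applied in both files is deferred (lazy) importing: heavy optional
dependencies such as elasticsearch and pandas move from module scope into the
function or constructor that actually uses them, so merely importing the module
no longer requires those packages to be installed and module import time drops.
A visible side effect is that the deferred names can no longer appear in
module-level annotations, hence List[pd.DataFrame] relaxing to a bare List in
the signatures above. (The Tuple change in elasticsearch.py is a separate fix:
"-> (List[dict], Optional[str])" annotates the return as a literal tuple of
type objects rather than the Tuple[...] type that checkers expect.)

Below is a minimal, self-contained sketch of the deferred-import pattern under
the same assumptions the patch makes; the module and function names here are
illustrative and not part of this PR:

    # lazy_pandas.py -- hypothetical example, not a file in this PR
    from typing import Any, List


    def records_to_dataframe(records: List[dict]) -> Any:
        # Importing pandas here instead of at module scope means
        # "import lazy_pandas" succeeds even when pandas is absent;
        # the dependency is only needed once this function is called.
        # pylint: disable=import-outside-toplevel
        from pandas import DataFrame

        return DataFrame.from_records(records)


    if __name__ == "__main__":
        print(records_to_dataframe([{"a": 1}, {"a": 2}]))

The cost moves rather than disappears: the first call pays for the import, and
each deferred import site needs its own pylint disable for
import-outside-toplevel, as the hunks above show.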