Fix Top Level Imports (#11075)

Author: Mayur Singal
Date: 2023-04-14 22:48:38 +05:30 (committed by GitHub)
parent ae984d1808
commit 199fe8753a
2 changed files with 21 additions and 17 deletions

File 1 of 2 (the Elasticsearch sink module):

@@ -19,11 +19,9 @@ import json
 import ssl
 import traceback
 from functools import singledispatch
-from typing import Any, List, Optional
+from typing import Any, List, Optional, Tuple

 import boto3
-from elasticsearch import Elasticsearch, RequestsHttpConnection
-from elasticsearch.connection import create_ssl_context
 from requests_aws4auth import AWS4Auth

 from metadata.config.common import ConfigModel
@ -169,6 +167,10 @@ class ElasticsearchSink(Sink[Entity]):
config: ElasticSearchConfig, config: ElasticSearchConfig,
metadata_config: OpenMetadataConnection, metadata_config: OpenMetadataConnection,
) -> None: ) -> None:
# pylint: disable=import-outside-toplevel
from elasticsearch import Elasticsearch, RequestsHttpConnection
from elasticsearch.connection import create_ssl_context
super().__init__() super().__init__()
self.config = config self.config = config
self.metadata_config = metadata_config self.metadata_config = metadata_config
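This hunk is the core of the fix: the elasticsearch package moves from a hard, import-time dependency of the module to a lazy one, resolved only when an ElasticsearchSink is actually constructed. A minimal sketch of the deferred-import pattern (the class and host below are illustrative, not the real sink):

class LazyElasticsearchClient:
    """Requires the elasticsearch package only when instantiated."""

    def __init__(self, host: str) -> None:
        # Deferred import: modules that merely import this file no
        # longer need the elasticsearch package installed at all.
        # pylint: disable=import-outside-toplevel
        from elasticsearch import Elasticsearch

        self.client = Elasticsearch([host])

The trade-off is that a missing dependency now surfaces as an ImportError at instantiation time rather than at import time, hence the explicit pylint suppression acknowledging the deliberate style violation.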
@@ -453,7 +455,7 @@ def _parse_columns(
 )

-def get_es_tag_list_and_tier(record: Entity) -> (List[dict], Optional[str]):
+def get_es_tag_list_and_tier(record: Entity) -> Tuple[List[dict], Optional[str]]:
     """
     Build ES tag list from any Entity
     """

File 2 of 2 (the datalake Avro/JSON reader utilities):

@@ -19,7 +19,6 @@ import json
 import zipfile
 from typing import List, Union

-import pandas as pd
 from avro.datafile import DataFileReader
 from avro.errors import InvalidAvroBinaryEncoding
 from avro.io import DatumReader
@ -49,10 +48,13 @@ COMPLEX_COLUMN_SEPARATOR = "_##"
def read_from_avro( def read_from_avro(
avro_text: bytes, avro_text: bytes,
) -> Union[DatalakeColumnWrapper, List[pd.DataFrame]]: ) -> Union[DatalakeColumnWrapper, List]:
""" """
Method to parse the avro data from storage sources Method to parse the avro data from storage sources
""" """
# pylint: disable=import-outside-toplevel
from pandas import DataFrame, Series
try: try:
elements = DataFileReader(io.BytesIO(avro_text), DatumReader()) elements = DataFileReader(io.BytesIO(avro_text), DatumReader())
if elements.meta.get(AVRO_SCHEMA): if elements.meta.get(AVRO_SCHEMA):
@@ -60,20 +62,16 @@ def read_from_avro(
                 columns=parse_avro_schema(
                     schema=elements.meta.get(AVRO_SCHEMA).decode(UTF_8), cls=Column
                 ),
-                dataframes=[pd.DataFrame.from_records(elements)],
+                dataframes=[DataFrame.from_records(elements)],
             )
-        return [pd.DataFrame.from_records(elements)]
+        return [DataFrame.from_records(elements)]
     except (AssertionError, InvalidAvroBinaryEncoding):
         columns = parse_avro_schema(schema=avro_text, cls=Column)
         field_map = {
-            col.name.__root__: pd.Series(
-                PD_AVRO_FIELD_MAP.get(col.dataType.value, "str")
-            )
+            col.name.__root__: Series(PD_AVRO_FIELD_MAP.get(col.dataType.value, "str"))
             for col in columns
         }
-        return DatalakeColumnWrapper(
-            columns=columns, dataframes=[pd.DataFrame(field_map)]
-        )
+        return DatalakeColumnWrapper(columns=columns, dataframes=[DataFrame(field_map)])


 def _get_json_text(key: str, text: bytes, decode: bool) -> str:
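Note the annotation loosening that accompanies this refactor: with `import pandas as pd` gone from module scope, `pd.DataFrame` can no longer appear in the signatures, so `List[pd.DataFrame]` degrades to a bare `List`. If keeping the precise type were worth it, one alternative (not what this commit does) is a TYPE_CHECKING-guarded import plus string annotations; the `load_frames` helper below is hypothetical:

from typing import TYPE_CHECKING, List

if TYPE_CHECKING:
    # Evaluated only by static type checkers, never at runtime,
    # so pandas remains a lazily imported optional dependency.
    import pandas as pd


def load_frames(records: list) -> "List[pd.DataFrame]":
    # pylint: disable=import-outside-toplevel
    from pandas import DataFrame

    return [DataFrame.from_records(records)]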
@@ -89,10 +87,14 @@ def _get_json_text(key: str, text: bytes, decode: bool) -> str:

 def read_from_json(
     key: str, json_text: str, sample_size: int = 100, decode: bool = False
-) -> List[pd.DataFrame]:
+) -> List:
     """
     Read the json file from the azure container and return a dataframe
     """
+    # pylint: disable=import-outside-toplevel
+    from pandas import json_normalize
+
     json_text = _get_json_text(key, json_text, decode)
     try:
         data = json.loads(json_text)
@@ -104,5 +106,5 @@ def read_from_json(
         ]
     if isinstance(data, list):
-        return [pd.json_normalize(data[:sample_size], sep=COMPLEX_COLUMN_SEPARATOR)]
-    return [pd.json_normalize(data, sep=COMPLEX_COLUMN_SEPARATOR)]
+        return [json_normalize(data[:sample_size], sep=COMPLEX_COLUMN_SEPARATOR)]
+    return [json_normalize(data, sep=COMPLEX_COLUMN_SEPARATOR)]
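For context on the `sep` argument passed at both call sites: `json_normalize` flattens nested JSON objects into columns, joining nested keys with the separator, which is how COMPLEX_COLUMN_SEPARATOR ("_##") encodes column hierarchy in the resulting dataframe. A small self-contained illustration (the sample record is made up):

from pandas import json_normalize

COMPLEX_COLUMN_SEPARATOR = "_##"

data = [{"id": 1, "address": {"city": "Pune", "zip": "411001"}}]
df = json_normalize(data, sep=COMPLEX_COLUMN_SEPARATOR)

# Nested keys are joined with the separator, yielding the columns
# ['id', 'address_##city', 'address_##zip']
print(list(df.columns))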