From fe16dea584dafc639fc527f9f1f9b463e2eeeaf7 Mon Sep 17 00:00:00 2001
From: Pere Miquel Brull
Date: Thu, 17 Nov 2022 10:11:54 +0100
Subject: [PATCH] Fix #8794 - Separate DL requirements and lazy imports (#8806)

---
 ingestion/setup.py                             | 17 +++--
 .../ingestion/source/database/datalake.py      | 71 ++++++++++---------
 .../datalake/datalake_profiler_interface.py    |  3 +-
 .../src/metadata/orm_profiler/metrics/core.py  |  3 +-
 .../datalake_metrics_computation_registry.py   |  6 +-
 .../orm_profiler/metrics/static/max_length.py  |  3 +-
 .../orm_profiler/metrics/static/mean.py        |  3 +-
 .../orm_profiler/metrics/static/min_length.py  |  3 +-
 .../src/metadata/orm_profiler/orm/registry.py  |  8 ++-
 .../orm_profiler/profiler/datalake_sampler.py  |  6 +-
 .../connectors/database/datalake/airflow.md    | 18 ++++-
 .../connectors/database/datalake/cli.md        | 18 ++++-
 12 files changed, 110 insertions(+), 49 deletions(-)

diff --git a/ingestion/setup.py b/ingestion/setup.py
index 530e8709c3b..eab33e582f1 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -53,6 +53,11 @@ base_requirements = {
 }
 
+datalake_common = {
+    "pandas==1.3.5",
+    "pyarrow==6.0.1",
+}
+
 plugins: Dict[str, Set[str]] = {
     "airflow": {
         "apache-airflow==2.3.3"
@@ -71,13 +76,15 @@ plugins: Dict[str, Set[str]] = {
     "docker": {"python_on_whales==0.34.0"},
     "backup": {"boto3~=1.19.12", "azure-identity", "azure-storage-blob"},
     "dagster": {"pymysql>=1.0.2", "psycopg2-binary", "GeoAlchemy2", "dagster_graphql"},
-    "datalake": {
-        "google-cloud-storage==1.43.0",
-        "pandas==1.3.5",
-        "gcsfs==2022.5.0",
+    "datalake-s3": {
         "s3fs==0.4.2",
-        "pyarrow==6.0.1",
         "boto3~=1.19.12",
+        *datalake_common,
+    },
+    "datalake-gcs": {
+        "google-cloud-storage==1.43.0",
+        "gcsfs==2022.5.0",
+        *datalake_common,
     },
     "dbt": {"google-cloud", "boto3", "google-cloud-storage==1.43.0"},
     "druid": {"pydruid>=0.6.2"},
diff --git a/ingestion/src/metadata/ingestion/source/database/datalake.py b/ingestion/src/metadata/ingestion/source/database/datalake.py
index a64a313b60c..898cda37f1f 100644
--- a/ingestion/src/metadata/ingestion/source/database/datalake.py
+++ b/ingestion/src/metadata/ingestion/source/database/datalake.py
@@ -15,8 +15,6 @@ DataLake connector to fetch metadata from a files stored s3, gcs and Hdfs
 import traceback
 from typing import Iterable, Optional, Tuple
 
-from pandas import DataFrame
-
 from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
 from metadata.generated.schema.api.data.createDatabaseSchema import (
     CreateDatabaseSchemaRequest,
@@ -55,19 +53,7 @@ from metadata.ingestion.source.database.database_service import (
 from metadata.utils import fqn
 from metadata.utils.connections import get_connection, test_connection
 from metadata.utils.filters import filter_by_schema, filter_by_table
-from metadata.utils.gcs_utils import (
-    read_csv_from_gcs,
-    read_json_from_gcs,
-    read_parquet_from_gcs,
-    read_tsv_from_gcs,
-)
 from metadata.utils.logger import ingestion_logger
-from metadata.utils.s3_utils import (
-    read_csv_from_s3,
-    read_json_from_s3,
-    read_parquet_from_s3,
-    read_tsv_from_s3,
-)
 
 logger = ingestion_logger()
 
@@ -294,6 +280,8 @@ class DatalakeSource(DatabaseServiceSource):
         From topology.
         Prepare a table request and pass it to the sink
         """
+        from pandas import DataFrame  # pylint: disable=import-outside-toplevel
+
         table_name, table_type = table_name_and_type
         schema_name = self.context.database_schema.name.__root__
         try:
@@ -336,6 +324,13 @@ class DatalakeSource(DatabaseServiceSource):
         """
         Fetch GCS Bucket files
         """
+        from metadata.utils.gcs_utils import (  # pylint: disable=import-outside-toplevel
+            read_csv_from_gcs,
+            read_json_from_gcs,
+            read_parquet_from_gcs,
+            read_tsv_from_gcs,
+        )
+
         try:
             if key.endswith(".csv"):
                 return read_csv_from_gcs(key, bucket_name)
@@ -361,6 +356,13 @@ class DatalakeSource(DatabaseServiceSource):
         """
         Fetch S3 Bucket files
         """
+        from metadata.utils.s3_utils import (  # pylint: disable=import-outside-toplevel
+            read_csv_from_s3,
+            read_json_from_s3,
+            read_parquet_from_s3,
+            read_tsv_from_s3,
+        )
+
         try:
             if key.endswith(".csv"):
                 return read_csv_from_s3(client, key, bucket_name)
@@ -386,30 +388,35 @@ class DatalakeSource(DatabaseServiceSource):
         """
         method to process column details
         """
-        try:
-            cols = []
-            if hasattr(data_frame, "columns"):
-                df_columns = list(data_frame.columns)
-                for column in df_columns:
+        cols = []
+        if hasattr(data_frame, "columns"):
+            df_columns = list(data_frame.columns)
+            for column in df_columns:
+                # use String by default
+                data_type = DataType.STRING.value
+
+                try:
                     if (
                         hasattr(data_frame[column], "dtypes")
                         and data_frame[column].dtypes.name in DATALAKE_INT_TYPES
+                        and data_frame[column].dtypes.name == "int64"
                     ):
-                        if data_frame[column].dtypes.name == "int64":
-                            data_type = DataType.INT.value
-                        else:
-                            data_type = DataType.STRING.value
-                    parsed_string = {}
-                    parsed_string["dataTypeDisplay"] = data_type
-                    parsed_string["dataType"] = data_type
-                    parsed_string["name"] = column[:64]
+                        data_type = DataType.INT.value
+
+                    parsed_string = {
+                        "dataTypeDisplay": data_type,
+                        "dataType": data_type,
+                        "name": column[:64],
+                    }
                     parsed_string["dataLength"] = parsed_string.get("dataLength", 1)
                     cols.append(Column(**parsed_string))
-            return cols
-        except Exception as exc:
-            logger.debug(traceback.format_exc())
-            logger.warning(f"Unexpected exception parsing column [{column}]: {exc}")
-            return None
+                except Exception as exc:
+                    logger.debug(traceback.format_exc())
+                    logger.warning(
+                        f"Unexpected exception parsing column [{column}]: {exc}"
+                    )
+
+        return cols
 
     def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
         yield from []
diff --git a/ingestion/src/metadata/interfaces/datalake/datalake_profiler_interface.py b/ingestion/src/metadata/interfaces/datalake/datalake_profiler_interface.py
index 8e676bdbcee..aa1d2e9b23b 100644
--- a/ingestion/src/metadata/interfaces/datalake/datalake_profiler_interface.py
+++ b/ingestion/src/metadata/interfaces/datalake/datalake_profiler_interface.py
@@ -143,7 +143,8 @@ class DataLakeProfilerInterface(ProfilerProtocol):
 
         Args:
             column: the column to compute the metrics against
-            metrics: list of metrics to compute
+            metric: list of metrics to compute
+            column_results: computed values for the column
         Returns:
             dictionary of results
         """
diff --git a/ingestion/src/metadata/orm_profiler/metrics/core.py b/ingestion/src/metadata/orm_profiler/metrics/core.py
index f3f2e0cf93a..acc23ac593c 100644
--- a/ingestion/src/metadata/orm_profiler/metrics/core.py
+++ b/ingestion/src/metadata/orm_profiler/metrics/core.py
@@ -20,7 +20,6 @@
 from enum import Enum
 from functools import wraps
 from typing import Any, Dict, Optional, Tuple, TypeVar
 
-import pandas as pd
 from sqlalchemy import Column
 from sqlalchemy.orm import DeclarativeMeta, Session
@@ -44,6 +43,8 @@ def _label(_fn):
 
     @wraps(_fn)
     def inner(self, *args, **kwargs):
+        import pandas as pd  # pylint: disable=import-outside-toplevel
+
         res = _fn(self, *args, **kwargs)
         # If the metric computation returns some value
         if res is not None:
diff --git a/ingestion/src/metadata/orm_profiler/metrics/datalake_metrics_computation_registry.py b/ingestion/src/metadata/orm_profiler/metrics/datalake_metrics_computation_registry.py
index a7e37767521..e40fb53277e 100644
--- a/ingestion/src/metadata/orm_profiler/metrics/datalake_metrics_computation_registry.py
+++ b/ingestion/src/metadata/orm_profiler/metrics/datalake_metrics_computation_registry.py
@@ -23,8 +23,6 @@ having the verbosely pass .value all the time...
 import traceback
 from typing import Dict, List, Optional, Union
 
-import pandas as pd
-
 from metadata.ingestion.api.processor import ProfilerProcessorStatus
 from metadata.orm_profiler.metrics.registry import Metrics
 from metadata.utils.dispatch import enum_register
@@ -47,6 +45,8 @@ def get_table_metrics(
     Returns:
         dictionnary of results
     """
+    import pandas as pd  # pylint: disable=import-outside-toplevel
+
     try:
         row = []
         for metric in metrics:
@@ -88,6 +88,8 @@ def get_static_metrics(
     Returns:
         dictionnary of results
     """
+    import pandas as pd  # pylint: disable=import-outside-toplevel
+
     try:
         row = []
         for metric in metrics:
diff --git a/ingestion/src/metadata/orm_profiler/metrics/static/max_length.py b/ingestion/src/metadata/orm_profiler/metrics/static/max_length.py
index 30d0da3a44f..ad3e16aae6f 100644
--- a/ingestion/src/metadata/orm_profiler/metrics/static/max_length.py
+++ b/ingestion/src/metadata/orm_profiler/metrics/static/max_length.py
@@ -14,7 +14,6 @@ MAX_LENGTH Metric definition
 """
 # pylint: disable=duplicate-code
 
-import pandas as pd
 from sqlalchemy import column, func
 
 from metadata.orm_profiler.metrics.core import StaticMetric, _label
@@ -55,6 +54,8 @@ class MaxLength(StaticMetric):
 
     @_label
     def dl_fn(self, data_frame=None):
+        import pandas as pd  # pylint: disable=import-outside-toplevel
+
         if is_concatenable(self.col.datatype):
             return (
                 pd.DataFrame(
diff --git a/ingestion/src/metadata/orm_profiler/metrics/static/mean.py b/ingestion/src/metadata/orm_profiler/metrics/static/mean.py
index a6e5a9ca8cd..75672ea7065 100644
--- a/ingestion/src/metadata/orm_profiler/metrics/static/mean.py
+++ b/ingestion/src/metadata/orm_profiler/metrics/static/mean.py
@@ -16,7 +16,6 @@ AVG Metric definition
 
 import traceback
 
-import pandas as pd
 from sqlalchemy import column, func
 from sqlalchemy.ext.compiler import compiles
 from sqlalchemy.sql.functions import GenericFunction
@@ -81,6 +80,8 @@ class Mean(StaticMetric):
         """
         Data lake function to calculate mean
         """
+        import pandas as pd  # pylint: disable=import-outside-toplevel
+
         try:
             if is_quantifiable(self.col.datatype):
                 return data_frame[self.col.name].mean()
diff --git a/ingestion/src/metadata/orm_profiler/metrics/static/min_length.py b/ingestion/src/metadata/orm_profiler/metrics/static/min_length.py
index b0d105207a6..625c60dd7e2 100644
--- a/ingestion/src/metadata/orm_profiler/metrics/static/min_length.py
+++ b/ingestion/src/metadata/orm_profiler/metrics/static/min_length.py
@@ -14,7 +14,6 @@ MIN_LENGTH Metric definition
 """
 # pylint: disable=duplicate-code
 
-import pandas as pd
 from sqlalchemy import column, func
 
 from metadata.orm_profiler.metrics.core import StaticMetric, _label
@@ -55,6 +54,8 @@ class MinLength(StaticMetric):
 
     @_label
     def dl_fn(self, data_frame=None):
+        import pandas as pd  # pylint: disable=import-outside-toplevel
+
         if is_concatenable(self.col.datatype):
             return (
                 pd.DataFrame(
diff --git a/ingestion/src/metadata/orm_profiler/orm/registry.py b/ingestion/src/metadata/orm_profiler/orm/registry.py
index 976db6382b0..a2f239cbce5 100644
--- a/ingestion/src/metadata/orm_profiler/orm/registry.py
+++ b/ingestion/src/metadata/orm_profiler/orm/registry.py
@@ -14,7 +14,6 @@ Custom types' registry for easy access without having an import mess
 """
 
 import sqlalchemy
-from pandas.core.dtypes.common import is_numeric_dtype, is_string_dtype
 from sqlalchemy import Date, DateTime, Integer, Numeric, Time
 from sqlalchemy.sql.sqltypes import Concatenable, Enum
 
@@ -124,6 +123,9 @@ def is_quantifiable(_type) -> bool:
     """
     Check if sqlalchemy _type is either integer or numeric
     """
+    from pandas.core.dtypes.common import (  # pylint: disable=import-outside-toplevel
+        is_numeric_dtype,
+    )
     return is_numeric(_type) or is_integer(_type) or is_numeric_dtype(_type)
 
 
@@ -133,4 +135,8 @@ def is_concatenable(_type) -> bool:
     Check if sqlalchemy _type is derived from Concatenable
     e.g., strings or text
     """
+    from pandas.core.dtypes.common import (  # pylint: disable=import-outside-toplevel
+        is_string_dtype,
+    )
+
     return issubclass(_type.__class__, Concatenable) or is_string_dtype(_type)
diff --git a/ingestion/src/metadata/orm_profiler/profiler/datalake_sampler.py b/ingestion/src/metadata/orm_profiler/profiler/datalake_sampler.py
index d87de031fa2..cf3d31551bb 100644
--- a/ingestion/src/metadata/orm_profiler/profiler/datalake_sampler.py
+++ b/ingestion/src/metadata/orm_profiler/profiler/datalake_sampler.py
@@ -14,8 +14,6 @@ for the profiler
 """
 from typing import Any, Dict, Optional
 
-from pandas import DataFrame, notnull
-
 from metadata.generated.schema.entity.data.table import TableData
 from metadata.ingestion.source.database.datalake import DatalakeSource
 
@@ -45,6 +43,8 @@ class DatalakeSampler:
         self._sample_rows = None
 
     def get_col_row(self, data_frame):
+        from pandas import DataFrame, notnull  # pylint: disable=import-outside-toplevel
+
         cols = []
         chunk = None
         if isinstance(data_frame, DataFrame):
@@ -63,6 +63,8 @@ class DatalakeSampler:
         return cols, rows, chunk
 
     def fetch_dl_sample_data(self) -> TableData:
+        from pandas import DataFrame  # pylint: disable=import-outside-toplevel
+
         cols, rows = self.get_col_row(
             data_frame=self.table[0]
             if not isinstance(self.table, DataFrame)
diff --git a/openmetadata-docs/content/connectors/database/datalake/airflow.md b/openmetadata-docs/content/connectors/database/datalake/airflow.md
index d8a8949b9ec..af0feeacdd6 100644
--- a/openmetadata-docs/content/connectors/database/datalake/airflow.md
+++ b/openmetadata-docs/content/connectors/database/datalake/airflow.md
@@ -46,7 +46,23 @@ custom Airflow plugins to handle the workflow deployment.
 
 ### Python Requirements
 
-To run the Datalake ingestion, you will need to install:
+If you are running OpenMetadata 0.13 or later, you will need to install the Datalake ingestion extras for S3 or GCS:
+
+#### S3 installation
+
+```bash
+pip3 install "openmetadata-ingestion[datalake-s3]"
+```
+
+#### GCS installation
+
+```bash
+pip3 install "openmetadata-ingestion[datalake-gcs]"
+```
+
+#### If version < 0.13
+
+You will need to install the combined requirements for S3 and GCS:
 
 ```bash
 pip3 install "openmetadata-ingestion[datalake]"
diff --git a/openmetadata-docs/content/connectors/database/datalake/cli.md b/openmetadata-docs/content/connectors/database/datalake/cli.md
index c7b4cf553f7..543b6df5915 100644
--- a/openmetadata-docs/content/connectors/database/datalake/cli.md
+++ b/openmetadata-docs/content/connectors/database/datalake/cli.md
@@ -46,7 +46,23 @@ custom Airflow plugins to handle the workflow deployment.
 
 ### Python Requirements
 
-To run the Datalake ingestion, you will need to install:
+If you are running OpenMetadata 0.13 or later, you will need to install the Datalake ingestion extras for S3 or GCS:
+
+#### S3 installation
+
+```bash
+pip3 install "openmetadata-ingestion[datalake-s3]"
+```
+
+#### GCS installation
+
+```bash
+pip3 install "openmetadata-ingestion[datalake-gcs]"
+```
+
+#### If version < 0.13
+
+You will need to install the combined requirements for S3 and GCS:
 
 ```bash
 pip3 install "openmetadata-ingestion[datalake]"
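
Below is a short, self-contained sketch of the lazy-import pattern that this patch applies across the Datalake source and profiler modules: heavy optional dependencies such as pandas are imported inside the functions that need them, so importing the modules themselves does not require the `datalake-s3` or `datalake-gcs` extras to be installed. The helper name is hypothetical and the `ImportError` fallback is an extra safeguard added for illustration; the patch itself only defers the imports.

```python
from typing import List, Optional


def dataframe_column_names(data_frame) -> Optional[List[str]]:
    """Return the column names of a pandas DataFrame, or None when pandas
    is unavailable or the object is not a DataFrame."""
    try:
        # Deferred import: only evaluated when the function is called,
        # mirroring the `# pylint: disable=import-outside-toplevel` imports
        # added throughout this patch.
        from pandas import DataFrame
    except ImportError:
        # pandas is an optional dependency pulled in by the datalake extras.
        return None

    if isinstance(data_frame, DataFrame):
        return list(data_frame.columns)
    return None
```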