Fix #8794 - Separate DL requirements and lazy imports (#8806)

Pere Miquel Brull 2022-11-17 10:11:54 +01:00 committed by GitHub
parent 722d7ef473
commit fe16dea584
12 changed files with 110 additions and 49 deletions

View File

@@ -53,6 +53,11 @@ base_requirements = {
 }

+datalake_common = {
+    "pandas==1.3.5",
+    "pyarrow==6.0.1",
+}
+
 plugins: Dict[str, Set[str]] = {
     "airflow": {
         "apache-airflow==2.3.3"
@@ -71,13 +76,15 @@ plugins: Dict[str, Set[str]] = {
     "docker": {"python_on_whales==0.34.0"},
     "backup": {"boto3~=1.19.12", "azure-identity", "azure-storage-blob"},
     "dagster": {"pymysql>=1.0.2", "psycopg2-binary", "GeoAlchemy2", "dagster_graphql"},
-    "datalake": {
-        "google-cloud-storage==1.43.0",
-        "pandas==1.3.5",
-        "gcsfs==2022.5.0",
+    "datalake-s3": {
         "s3fs==0.4.2",
-        "pyarrow==6.0.1",
         "boto3~=1.19.12",
+        *datalake_common,
+    },
+    "datalake-gcs": {
+        "google-cloud-storage==1.43.0",
+        "gcsfs==2022.5.0",
+        *datalake_common,
     },
     "dbt": {"google-cloud", "boto3", "google-cloud-storage==1.43.0"},
     "druid": {"pydruid>=0.6.2"},

View File

@@ -15,8 +15,6 @@ DataLake connector to fetch metadata from a files stored s3, gcs and Hdfs
 import traceback
 from typing import Iterable, Optional, Tuple

-from pandas import DataFrame
-
 from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
 from metadata.generated.schema.api.data.createDatabaseSchema import (
     CreateDatabaseSchemaRequest,
@@ -55,19 +53,7 @@ from metadata.ingestion.source.database.database_service import (
 from metadata.utils import fqn
 from metadata.utils.connections import get_connection, test_connection
 from metadata.utils.filters import filter_by_schema, filter_by_table
-from metadata.utils.gcs_utils import (
-    read_csv_from_gcs,
-    read_json_from_gcs,
-    read_parquet_from_gcs,
-    read_tsv_from_gcs,
-)
 from metadata.utils.logger import ingestion_logger
-from metadata.utils.s3_utils import (
-    read_csv_from_s3,
-    read_json_from_s3,
-    read_parquet_from_s3,
-    read_tsv_from_s3,
-)

 logger = ingestion_logger()
@@ -294,6 +280,8 @@ class DatalakeSource(DatabaseServiceSource):
         From topology.
         Prepare a table request and pass it to the sink
         """
+        from pandas import DataFrame  # pylint: disable=import-outside-toplevel
+
         table_name, table_type = table_name_and_type
         schema_name = self.context.database_schema.name.__root__
         try:
@@ -336,6 +324,13 @@ class DatalakeSource(DatabaseServiceSource):
         """
         Fetch GCS Bucket files
         """
+        from metadata.utils.gcs_utils import (  # pylint: disable=import-outside-toplevel
+            read_csv_from_gcs,
+            read_json_from_gcs,
+            read_parquet_from_gcs,
+            read_tsv_from_gcs,
+        )
+
         try:
             if key.endswith(".csv"):
                 return read_csv_from_gcs(key, bucket_name)
@@ -361,6 +356,13 @@ class DatalakeSource(DatabaseServiceSource):
         """
         Fetch S3 Bucket files
         """
+        from metadata.utils.s3_utils import (  # pylint: disable=import-outside-toplevel
+            read_csv_from_s3,
+            read_json_from_s3,
+            read_parquet_from_s3,
+            read_tsv_from_s3,
+        )
+
         try:
             if key.endswith(".csv"):
                 return read_csv_from_s3(client, key, bucket_name)
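The two hunks above follow the same recipe: the `gcs_utils`/`s3_utils` readers, and with them gcsfs, s3fs and pandas, are imported inside the fetch methods, so importing the connector module no longer requires either datalake extra to be installed. A standalone sketch of the idea using plain pandas readers and local paths; `read_table_file` and its extension dispatch are illustrative, not the connector's helpers:

```python
# The module itself imports nothing heavy; pandas is only loaded when a file is read.
def read_table_file(path: str):
    """Dispatch on file extension; defer the pandas import to call time."""
    import pandas as pd  # pylint: disable=import-outside-toplevel

    if path.endswith(".csv"):
        return pd.read_csv(path)
    if path.endswith(".tsv"):
        return pd.read_csv(path, sep="\t")
    if path.endswith(".json"):
        return pd.read_json(path)
    if path.endswith(".parquet"):
        return pd.read_parquet(path)  # needs pyarrow, mirroring the datalake extras
    return None
```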
@@ -386,30 +388,35 @@ class DatalakeSource(DatabaseServiceSource):
         """
         method to process column details
         """
-        try:
-            cols = []
-            if hasattr(data_frame, "columns"):
-                df_columns = list(data_frame.columns)
-                for column in df_columns:
+        cols = []
+        if hasattr(data_frame, "columns"):
+            df_columns = list(data_frame.columns)
+            for column in df_columns:
+                # use String by default
+                data_type = DataType.STRING.value
+                try:
                     if (
                         hasattr(data_frame[column], "dtypes")
                         and data_frame[column].dtypes.name in DATALAKE_INT_TYPES
+                        and data_frame[column].dtypes.name == "int64"
                     ):
-                        if data_frame[column].dtypes.name == "int64":
-                            data_type = DataType.INT.value
-                    else:
-                        data_type = DataType.STRING.value
-                    parsed_string = {}
-                    parsed_string["dataTypeDisplay"] = data_type
-                    parsed_string["dataType"] = data_type
-                    parsed_string["name"] = column[:64]
+                        data_type = DataType.INT.value
+
+                    parsed_string = {
+                        "dataTypeDisplay": data_type,
+                        "dataType": data_type,
+                        "name": column[:64],
+                    }
                     parsed_string["dataLength"] = parsed_string.get("dataLength", 1)
                     cols.append(Column(**parsed_string))
-            return cols
-        except Exception as exc:
-            logger.debug(traceback.format_exc())
-            logger.warning(f"Unexpected exception parsing column [{column}]: {exc}")
-        return None
+                except Exception as exc:
+                    logger.debug(traceback.format_exc())
+                    logger.warning(
+                        f"Unexpected exception parsing column [{column}]: {exc}"
+                    )
+        return cols

     def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
         yield from []
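The last hunk also reworks `get_columns`: the type defaults to STRING, the int64 check folds into the main condition, and the try/except moves inside the loop, so one problematic column no longer makes the whole table return `None`. A simplified, self-contained stand-in for that shape, using plain dicts instead of OpenMetadata's `Column`/`DataType` models and a hypothetical `INT_DTYPES` set:

```python
import logging
import traceback
from typing import Any, Dict, List

logger = logging.getLogger(__name__)

INT_DTYPES = {"int64", "Int64"}  # assumption: the integer dtypes we care about


def parse_columns(data_frame: Any) -> List[Dict[str, Any]]:
    cols: List[Dict[str, Any]] = []
    if hasattr(data_frame, "columns"):
        for column in list(data_frame.columns):
            data_type = "STRING"  # use String by default
            try:
                dtypes = getattr(data_frame[column], "dtypes", None)
                if dtypes is not None and dtypes.name in INT_DTYPES:
                    data_type = "INT"
                cols.append(
                    {"name": str(column)[:64], "dataType": data_type, "dataLength": 1}
                )
            except Exception as exc:  # pylint: disable=broad-except
                # One bad column is logged and skipped instead of dropping everything.
                logger.debug(traceback.format_exc())
                logger.warning("Unexpected exception parsing column [%s]: %s", column, exc)
    return cols


if __name__ == "__main__":
    import pandas as pd  # only needed for the demo

    print(parse_columns(pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})))
```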

View File

@@ -143,7 +143,8 @@ class DataLakeProfilerInterface(ProfilerProtocol):
         Args:
             column: the column to compute the metrics against
-            metrics: list of metrics to compute
+            metric: list of metrics to compute
+            column_results: computed values for the column
         Returns:
             dictionary of results
         """

View File

@@ -20,7 +20,6 @@ from enum import Enum
 from functools import wraps
 from typing import Any, Dict, Optional, Tuple, TypeVar

-import pandas as pd
 from sqlalchemy import Column
 from sqlalchemy.orm import DeclarativeMeta, Session
@@ -44,6 +43,8 @@ def _label(_fn):
     @wraps(_fn)
     def inner(self, *args, **kwargs):
+        import pandas as pd  # pylint: disable=import-outside-toplevel
+
         res = _fn(self, *args, **kwargs)
         # If the metric computation returns some value
         if res is not None:
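Here the module-level `import pandas as pd` moves inside the `inner` wrapper produced by `_label`, so the metrics core can be imported without pandas installed and the cost is only paid when a metric actually runs. The sketch below shows that deferred-import-in-a-decorator shape; `lazy_pandas`, its NaN-to-None handling and `mean_of` are illustrative, not the project's `_label` implementation:

```python
from functools import wraps


def lazy_pandas(_fn):
    """Defer the pandas import until the wrapped metric function is called."""

    @wraps(_fn)
    def inner(*args, **kwargs):
        import pandas as pd  # pylint: disable=import-outside-toplevel

        res = _fn(*args, **kwargs)
        # Surface a pandas NaN result as plain None for downstream consumers.
        if isinstance(res, float) and pd.isna(res):
            return None
        return res

    return inner


@lazy_pandas
def mean_of(values):
    import pandas as pd  # pylint: disable=import-outside-toplevel

    return pd.Series(values, dtype="float64").mean()


print(mean_of([1, 2, 3]))  # 2.0
print(mean_of([]))         # None: the empty mean is NaN, mapped to None
```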

View File

@@ -23,8 +23,6 @@ having the verbosely pass .value all the time...
 import traceback
 from typing import Dict, List, Optional, Union

-import pandas as pd
-
 from metadata.ingestion.api.processor import ProfilerProcessorStatus
 from metadata.orm_profiler.metrics.registry import Metrics
 from metadata.utils.dispatch import enum_register
@@ -47,6 +45,8 @@ def get_table_metrics(
     Returns:
         dictionnary of results
     """
+    import pandas as pd  # pylint: disable=import-outside-toplevel
+
     try:
         row = []
         for metric in metrics:
@@ -88,6 +88,8 @@ def get_static_metrics(
     Returns:
         dictionnary of results
     """
+    import pandas as pd  # pylint: disable=import-outside-toplevel
+
     try:
         row = []
         for metric in metrics:

View File

@@ -14,7 +14,6 @@ MAX_LENGTH Metric definition
 """
 # pylint: disable=duplicate-code

-import pandas as pd
 from sqlalchemy import column, func

 from metadata.orm_profiler.metrics.core import StaticMetric, _label
@@ -55,6 +54,8 @@ class MaxLength(StaticMetric):
     @_label
     def dl_fn(self, data_frame=None):
+        import pandas as pd  # pylint: disable=import-outside-toplevel
+
         if is_concatenable(self.col.datatype):
             return (
                 pd.DataFrame(

View File

@@ -16,7 +16,6 @@ AVG Metric definition

 import traceback

-import pandas as pd
 from sqlalchemy import column, func
 from sqlalchemy.ext.compiler import compiles
 from sqlalchemy.sql.functions import GenericFunction
@@ -81,6 +80,8 @@ class Mean(StaticMetric):
         """
         Data lake function to calculate mean
         """
+        import pandas as pd  # pylint: disable=import-outside-toplevel
+
         try:
             if is_quantifiable(self.col.datatype):
                 return data_frame[self.col.name].mean()

View File

@@ -14,7 +14,6 @@ MIN_LENGTH Metric definition
 """
 # pylint: disable=duplicate-code

-import pandas as pd
 from sqlalchemy import column, func

 from metadata.orm_profiler.metrics.core import StaticMetric, _label
@@ -55,6 +54,8 @@ class MinLength(StaticMetric):
     @_label
     def dl_fn(self, data_frame=None):
+        import pandas as pd  # pylint: disable=import-outside-toplevel
+
         if is_concatenable(self.col.datatype):
             return (
                 pd.DataFrame(

View File

@@ -14,7 +14,6 @@ Custom types' registry for easy access
 without having an import mess
 """
 import sqlalchemy
-from pandas.core.dtypes.common import is_numeric_dtype, is_string_dtype
 from sqlalchemy import Date, DateTime, Integer, Numeric, Time
 from sqlalchemy.sql.sqltypes import Concatenable, Enum
@@ -124,6 +123,9 @@ def is_quantifiable(_type) -> bool:
     """
     Check if sqlalchemy _type is either integer or numeric
     """
+    from pandas.core.dtypes.common import (  # pylint: disable=import-outside-toplevel
+        is_numeric_dtype,
+    )
     return is_numeric(_type) or is_integer(_type) or is_numeric_dtype(_type)
@@ -133,4 +135,8 @@ def is_concatenable(_type) -> bool:
     Check if sqlalchemy _type is derived from Concatenable
     e.g., strings or text
     """
+    from pandas.core.dtypes.common import (  # pylint: disable=import-outside-toplevel
+        is_string_dtype,
+    )
+
     return issubclass(_type.__class__, Concatenable) or is_string_dtype(_type)
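These predicates are called with SQLAlchemy types by the ORM profiler and with pandas dtypes by the datalake profiler, so the pandas dtype checks are now imported only when the function runs. A condensed sketch of such a dual-purpose check; the `isinstance` branch stands in for the registry's real `is_numeric`/`is_integer` helpers:

```python
from sqlalchemy import Integer, Numeric, String


def is_quantifiable(_type) -> bool:
    """True for SQLAlchemy numeric types and for numeric pandas dtypes."""
    from pandas.core.dtypes.common import (  # pylint: disable=import-outside-toplevel
        is_numeric_dtype,
    )

    if isinstance(_type, (Integer, Numeric)):
        return True
    # is_numeric_dtype returns False (rather than raising) for non-pandas objects.
    return bool(is_numeric_dtype(_type))


if __name__ == "__main__":
    import pandas as pd

    print(is_quantifiable(Integer()))                   # True: SQLAlchemy type
    print(is_quantifiable(pd.Series([1, 2, 3]).dtype))  # True: pandas dtype
    print(is_quantifiable(String()))                    # False
```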

View File

@@ -14,8 +14,6 @@ for the profiler
 """
 from typing import Any, Dict, Optional

-from pandas import DataFrame, notnull
-
 from metadata.generated.schema.entity.data.table import TableData
 from metadata.ingestion.source.database.datalake import DatalakeSource
@@ -45,6 +43,8 @@ class DatalakeSampler:
         self._sample_rows = None

     def get_col_row(self, data_frame):
+        from pandas import DataFrame, notnull  # pylint: disable=import-outside-toplevel
+
         cols = []
         chunk = None
         if isinstance(data_frame, DataFrame):
@@ -63,6 +63,8 @@ class DatalakeSampler:
         return cols, rows, chunk

     def fetch_dl_sample_data(self) -> TableData:
+        from pandas import DataFrame  # pylint: disable=import-outside-toplevel
+
         cols, rows = self.get_col_row(
             data_frame=self.table[0]
             if not isinstance(self.table, DataFrame)
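The sampler change only defers its pandas imports; the hunk does not show how `notnull` is used, so the sketch below is just one common reason to need it in sampling code: mapping NaN cells to None so sampled rows serialize cleanly. `sample_rows` and its `limit` parameter are hypothetical, not `DatalakeSampler` methods:

```python
def sample_rows(data_frame, limit: int = 50):
    from pandas import notnull  # pylint: disable=import-outside-toplevel

    chunk = data_frame.head(limit)
    # NaN is not JSON-friendly; replace missing cells with None before emitting rows.
    clean = chunk.astype(object).where(notnull(chunk), None)
    return list(clean.columns), clean.values.tolist()


if __name__ == "__main__":
    import pandas as pd

    df = pd.DataFrame({"id": [1, 2], "name": ["a", None]})
    cols, rows = sample_rows(df)
    print(cols, rows)  # ['id', 'name'] [[1, 'a'], [2, None]]
```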

View File

@@ -46,7 +46,23 @@ custom Airflow plugins to handle the workflow deployment.

 ### Python Requirements

-To run the Datalake ingestion, you will need to install:
+If running OpenMetadata version greater than 0.13, you will need to install the Datalake ingestion for GCS or S3:
+
+#### S3 installation
+
+```bash
+pip3 install "openmetadata-ingestion[datalake-s3]"
+```
+
+#### GCS installation
+
+```bash
+pip3 install "openmetadata-ingestion[datalake-gcs]"
+```
+
+#### If version <0.13
+
+You will be installing the requirements together for S3 and GCS

 ```bash
 pip3 install "openmetadata-ingestion[datalake]"

View File

@@ -46,7 +46,23 @@ custom Airflow plugins to handle the workflow deployment.

 ### Python Requirements

-To run the Datalake ingestion, you will need to install:
+If running OpenMetadata version greater than 0.13, you will need to install the Datalake ingestion for GCS or S3:
+
+#### S3 installation
+
+```bash
+pip3 install "openmetadata-ingestion[datalake-s3]"
+```
+
+#### GCS installation
+
+```bash
+pip3 install "openmetadata-ingestion[datalake-gcs]"
+```
+
+#### If version <0.13
+
+You will be installing the requirements together for S3 and GCS

 ```bash
 pip3 install "openmetadata-ingestion[datalake]"