diff --git a/docker/development/docker-compose.yml b/docker/development/docker-compose.yml index 5f5ef67c19a..a281ceac954 100644 --- a/docker/development/docker-compose.yml +++ b/docker/development/docker-compose.yml @@ -15,6 +15,7 @@ volumes: ingestion-volume-dags: ingestion-volume-tmp: es-data: + db-data: services: mysql: build: @@ -39,7 +40,7 @@ services: timeout: 10s retries: 10 volumes: - - ./docker-volume/db-data:/var/lib/mysql + - db-data:/var/lib/mysql elasticsearch: image: docker.elastic.co/elasticsearch/elasticsearch:8.10.2 diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py index fdf8fa10c5c..776e753ed08 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py @@ -215,8 +215,8 @@ class LookerSource(DashboardServiceSource): file_path = f"{self._main_lookml_repo.path}/{path}" if not os.path.isfile(file_path): return None - with open(file_path, "r", encoding="utf-8") as f: - manifest = LookMLManifest.parse_obj(lkml.load(f)) + with open(file_path, "r", encoding="utf-8") as fle: + manifest = LookMLManifest.parse_obj(lkml.load(fle)) if manifest and manifest.remote_dependency: remote_name = manifest.remote_dependency["name"] remote_git_url = manifest.remote_dependency["url"] diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/utils.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/utils.py index 0b09968284f..a2e1656992d 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker/utils.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/utils.py @@ -64,5 +64,5 @@ def _clone_repo( Repo.clone_from(url, path) logger.info(f"repo {repo_name} cloned to {path}") - except Exception as e: - logger.error(f"GitHubCloneReader::_clone: ERROR {e} ") + except Exception as exc: + logger.error(f"GitHubCloneReader::_clone: ERROR {exc} ") diff --git a/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py b/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py index 1076857e431..ff4e3b1b3c6 100644 --- a/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py +++ b/ingestion/src/metadata/ingestion/source/database/common_nosql_source.py @@ -16,8 +16,6 @@ import traceback from abc import ABC, abstractmethod from typing import Any, Dict, Iterable, List, Optional, Tuple, Union -from pandas import json_normalize - from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest from metadata.generated.schema.api.data.createDatabaseSchema import ( CreateDatabaseSchemaRequest, @@ -45,7 +43,7 @@ from metadata.ingestion.source.database.database_service import ( QueryByProcedure, ) from metadata.utils import fqn -from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR, DEFAULT_DATABASE +from metadata.utils.constants import DEFAULT_DATABASE from metadata.utils.datalake.datalake_utils import get_columns from metadata.utils.filters import filter_by_schema, filter_by_table from metadata.utils.logger import ingestion_logger @@ -205,11 +203,13 @@ class CommonNoSQLSource(DatabaseServiceSource, ABC): From topology. 
Prepare a table request and pass it to the sink """ + import pandas as pd # pylint: disable=import-outside-toplevel + table_name, table_type = table_name_and_type schema_name = self.context.database_schema.name.__root__ try: data = self.get_table_columns_dict(schema_name, table_name) - df = json_normalize(list(data), sep=COMPLEX_COLUMN_SEPARATOR) + df = pd.DataFrame.from_records(list(data)) columns = get_columns(df) table_request = CreateTableRequest( name=table_name, diff --git a/ingestion/src/metadata/ingestion/source/database/databricks/unity_catalog/metadata.py b/ingestion/src/metadata/ingestion/source/database/databricks/unity_catalog/metadata.py index b0e5feceec5..1091d0a04bb 100644 --- a/ingestion/src/metadata/ingestion/source/database/databricks/unity_catalog/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/databricks/unity_catalog/metadata.py @@ -75,9 +75,11 @@ logger = ingestion_logger() # pylint: disable=not-callable @classmethod -def from_dict(cls, d: Dict[str, any]) -> "TableConstraintList": +def from_dict(cls, dct: Dict[str, Any]) -> "TableConstraintList": return cls( - table_constraints=[DBTableConstraint.from_dict(constraint) for constraint in d] + table_constraints=[ + DBTableConstraint.from_dict(constraint) for constraint in dct + ] ) diff --git a/ingestion/src/metadata/ingestion/source/database/mssql/utils.py b/ingestion/src/metadata/ingestion/source/database/mssql/utils.py index 1d0b24df9c1..3664126425f 100644 --- a/ingestion/src/metadata/ingestion/source/database/mssql/utils.py +++ b/ingestion/src/metadata/ingestion/source/database/mssql/utils.py @@ -158,7 +158,7 @@ def get_columns( # tds_version 4.2 does not support NVARCHAR(MAX) computed_definition = sql.cast(computed_cols.c.definition, NVARCHAR(4000)) - s = ( + sql_qry = ( sql.select( columns, computed_definition, @@ -173,10 +173,10 @@ def get_columns( .order_by(columns.c.ordinal_position) ) - c = connection.execution_options(future_result=True).execute(s) + cursr = connection.execution_options(future_result=True).execute(sql_qry) cols = [] - for row in c.mappings(): + for row in cursr.mappings(): name = row[columns.c.column_name] type_ = row[columns.c.data_type] nullable = row[columns.c.is_nullable] == "YES" diff --git a/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py b/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py index c36f7ab8fb0..8f879cbcdbc 100644 --- a/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py @@ -262,7 +262,7 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin): self.status.scanned(table.name.__root__) return row, column, metric_type.value - def fetch_sample_data(self, table) -> TableData: + def fetch_sample_data(self, table, columns: SQALikeColumn) -> TableData: """Fetch sample data from database Args: @@ -271,7 +271,8 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin): Returns: TableData: sample table data """ - return self.sampler.fetch_sample_data() + sampler = self._get_sampler() + return sampler.fetch_sample_data(columns) def get_composed_metrics( self, column: Column, metric: Metrics, column_results: Dict diff --git a/ingestion/src/metadata/profiler/interface/profiler_interface.py b/ingestion/src/metadata/profiler/interface/profiler_interface.py index b7c1755c98e..30be9a4277b 100644 --- a/ingestion/src/metadata/profiler/interface/profiler_interface.py +++ 
b/ingestion/src/metadata/profiler/interface/profiler_interface.py @@ -325,7 +325,7 @@ class ProfilerInterface(ABC): raise NotImplementedError @abstractmethod - def fetch_sample_data(self, table) -> TableData: + def fetch_sample_data(self, table, columns: List[Column]) -> TableData: """run profiler metrics""" raise NotImplementedError @@ -333,3 +333,8 @@ class ProfilerInterface(ABC): def close(self): """Clean up profiler interface""" raise NotImplementedError + + @abstractmethod + def get_columns(self): + """get columns""" + raise NotImplementedError diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py index 5fe9fb442e4..feff51c9cb2 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py @@ -22,7 +22,7 @@ from collections import defaultdict from datetime import datetime, timezone from typing import Dict, List -from sqlalchemy import Column +from sqlalchemy import Column, inspect from sqlalchemy.exc import ProgrammingError from sqlalchemy.orm import scoped_session @@ -454,7 +454,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): return profile_results - def fetch_sample_data(self, table) -> TableData: + def fetch_sample_data(self, table, columns) -> TableData: """Fetch sample data from database Args: @@ -467,7 +467,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): table=table, ) - return sampler.fetch_sample_data() + return sampler.fetch_sample_data(columns) def get_composed_metrics( self, column: Column, metric: Metrics, column_results: Dict @@ -519,6 +519,10 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): f"Skipping metrics due to {exc} for {runner.table.__tablename__}.{column.name}" ) + def get_columns(self): + """get columns from entity""" + return list(inspect(self.table).c) + def close(self): """Clean up session""" self.session.close() diff --git a/ingestion/src/metadata/profiler/metrics/static/distinct_count.py b/ingestion/src/metadata/profiler/metrics/static/distinct_count.py index dae0d737c8d..fb896f8d119 100644 --- a/ingestion/src/metadata/profiler/metrics/static/distinct_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/distinct_count.py @@ -19,7 +19,6 @@ from sqlalchemy import column, distinct, func from metadata.profiler.metrics.core import StaticMetric, _label from metadata.profiler.orm.functions.count import CountFn -from metadata.profiler.orm.registry import is_quantifiable from metadata.utils.logger import profiler_logger logger = profiler_logger() @@ -54,17 +53,10 @@ class DistinctCount(StaticMetric): # pylint: disable=import-outside-toplevel from collections import Counter - import pandas as pd - try: counter = Counter() for df in dfs: - df_col = df[self.col.name].dropna() - df_col_value = ( - pd.to_numeric(df_col).to_list() - if is_quantifiable(self.col.type) - else df_col.to_list() - ) + df_col_value = df[self.col.name].dropna().to_list() counter.update(df_col_value) return len(counter.keys()) except Exception as err: diff --git a/ingestion/src/metadata/profiler/metrics/static/stddev.py b/ingestion/src/metadata/profiler/metrics/static/stddev.py index 0687345032f..e95fe218574 100644 --- a/ingestion/src/metadata/profiler/metrics/static/stddev.py +++ b/ingestion/src/metadata/profiler/metrics/static/stddev.py @@ -106,10 +106,10 @@ class StdDev(StaticMetric): if 
is_quantifiable(self.col.type): try: - merged_df = pd.to_numeric(pd.concat(df[self.col.name] for df in dfs)) - if len(merged_df) > 1: - return merged_df.std() - return 0 + df = pd.to_numeric(pd.concat(df[self.col.name] for df in dfs)) + if not df.empty: + return df.std() + return None except MemoryError: logger.error( f"Unable to compute Standard Deviation for {self.col.name} due to memory constraints." diff --git a/ingestion/src/metadata/profiler/metrics/static/sum.py b/ingestion/src/metadata/profiler/metrics/static/sum.py index ad2bf42b730..dec3bbbb4b9 100644 --- a/ingestion/src/metadata/profiler/metrics/static/sum.py +++ b/ingestion/src/metadata/profiler/metrics/static/sum.py @@ -48,9 +48,7 @@ class Sum(StaticMetric): def df_fn(self, dfs=None): """pandas function""" - # pylint: disable=import-outside-toplevel - import pandas as pd if is_quantifiable(self.col.type): - return sum(pd.to_numeric(df[self.col.name]).sum() for df in dfs) + return sum(df[self.col.name].sum() for df in dfs) return None diff --git a/ingestion/src/metadata/profiler/metrics/window/first_quartile.py b/ingestion/src/metadata/profiler/metrics/window/first_quartile.py index cdf3b364a70..b4be3fc11e6 100644 --- a/ingestion/src/metadata/profiler/metrics/window/first_quartile.py +++ b/ingestion/src/metadata/profiler/metrics/window/first_quartile.py @@ -83,7 +83,7 @@ class FirstQuartile(StaticMetric, PercentilMixin): # the entire set. Median of Medians could be used # though it would required set to be sorted before hand try: - df = pd.to_numeric(pd.concat([df[self.col.name] for df in dfs])) + df = pd.concat([df[self.col.name] for df in dfs]) except MemoryError: logger.error( f"Unable to compute Median for {self.col.name} due to memory constraints." diff --git a/ingestion/src/metadata/profiler/metrics/window/median.py b/ingestion/src/metadata/profiler/metrics/window/median.py index 63ad9ab2e3a..ece4f4811dc 100644 --- a/ingestion/src/metadata/profiler/metrics/window/median.py +++ b/ingestion/src/metadata/profiler/metrics/window/median.py @@ -82,7 +82,7 @@ class Median(StaticMetric, PercentilMixin): # the entire set. Median of Medians could be used # though it would required set to be sorted before hand try: - df = pd.to_numeric(pd.concat([df[self.col.name] for df in dfs])) + df = pd.concat([df[self.col.name] for df in dfs]) except MemoryError: logger.error( f"Unable to compute Median for {self.col.name} due to memory constraints." diff --git a/ingestion/src/metadata/profiler/metrics/window/third_quartile.py b/ingestion/src/metadata/profiler/metrics/window/third_quartile.py index 8f341e371d4..4ee305b62eb 100644 --- a/ingestion/src/metadata/profiler/metrics/window/third_quartile.py +++ b/ingestion/src/metadata/profiler/metrics/window/third_quartile.py @@ -83,7 +83,7 @@ class ThirdQuartile(StaticMetric, PercentilMixin): # the entire set. Median of Medians could be used # though it would required set to be sorted before hand try: - df = pd.to_numeric(pd.concat([df[self.col.name] for df in dfs])) + df = pd.concat([df[self.col.name] for df in dfs]) except MemoryError: logger.error( f"Unable to compute Median for {self.col.name} due to memory constraints." 
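For context, a minimal sketch (not part of the patch) of how the chunk-based pandas metrics behave after dropping the `pd.to_numeric` coercion: values are used with their native dtypes, `StdDev` returns `None` rather than `0` when nothing can be computed, and `Sum` simply adds per-chunk sums. The column name and chunk contents below are illustrative only.

```python
import pandas as pd

# Two illustrative chunks of the same table, as the datalake profiler receives them
dfs = [
    pd.DataFrame({"age": [40, 39, None]}),
    pd.DataFrame({"age": [28, 35, 41]}),
]

# Median / quartiles: concatenate the column across chunks, no pd.to_numeric
merged = pd.concat([df["age"] for df in dfs])
median = merged.median()            # 39.0 -- NaNs are skipped by default
first_quartile = merged.quantile(0.25)
third_quartile = merged.quantile(0.75)

# StdDev: an empty series now yields None instead of 0
std = merged.std() if not merged.empty else None

# Sum: per-chunk sums added together, relying on the column's native dtype
total = sum(df["age"].sum() for df in dfs)
```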
diff --git a/ingestion/src/metadata/profiler/processor/core.py b/ingestion/src/metadata/profiler/processor/core.py index 7eee38b7db4..6f7d5026da9 100644 --- a/ingestion/src/metadata/profiler/processor/core.py +++ b/ingestion/src/metadata/profiler/processor/core.py @@ -494,7 +494,7 @@ class Profiler(Generic[TMetric]): "Fetching sample data for " f"{self.profiler_interface.table_entity.fullyQualifiedName.__root__}..." # type: ignore ) - return self.profiler_interface.fetch_sample_data(self.table) + return self.profiler_interface.fetch_sample_data(self.table, self.columns) except Exception as err: logger.debug(traceback.format_exc()) logger.warning(f"Error fetching sample data: {err}") diff --git a/ingestion/src/metadata/profiler/processor/sampler/pandas/sampler.py b/ingestion/src/metadata/profiler/processor/sampler/pandas/sampler.py index 87d2a469cec..6de115ced1f 100644 --- a/ingestion/src/metadata/profiler/processor/sampler/pandas/sampler.py +++ b/ingestion/src/metadata/profiler/processor/sampler/pandas/sampler.py @@ -12,10 +12,9 @@ Helper module to handle data sampling for the profiler """ -import json import math import random -from typing import cast +from typing import List, Optional, cast from metadata.data_quality.validations.table.pandas.tableRowInsertedCountToBeBetween import ( TableRowInsertedCountToBeBetweenValidator, @@ -26,9 +25,8 @@ from metadata.generated.schema.entity.data.table import ( ProfileSampleType, TableData, ) -from metadata.ingestion.source.database.datalake.columns import _get_root_col from metadata.profiler.processor.sampler.sampler_interface import SamplerInterface -from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR +from metadata.utils.sqa_like_column import SQALikeColumn class DatalakeSampler(SamplerInterface): @@ -121,6 +119,23 @@ class DatalakeSampler(SamplerInterface): for df in self.table ] + def get_col_row(self, data_frame, columns: Optional[List[SQALikeColumn]] = None): + """ + Fetches columns and rows from the data_frame + """ + if columns: + cols = [col.name for col in columns] + else: + # we'll use the first dataframe to get the columns + cols = data_frame[0].columns.tolist() + rows = [] + # Sample Data should not exceed sample limit + for chunk in data_frame: + rows.extend(self._fetch_rows(chunk[cols])[: self.sample_limit]) + if len(rows) >= self.sample_limit: + break + return cols, rows + def random_sample(self): """Generate random sample from the table @@ -138,68 +153,12 @@ class DatalakeSampler(SamplerInterface): return self._get_sampled_dataframe() - def get_col_row(self, data_frame): - """ - Fetches columns and rows from the data_frame - """ - result_rows = [] + def _fetch_rows(self, data_frame): + return data_frame.dropna().values.tolist() - for chunk in data_frame: - row_df = self._fetch_rows_df(chunk) - result_rows.extend(row_df.values.tolist()[: self.sample_limit]) - if len(result_rows) >= self.sample_limit: - break - cols = row_df.columns.tolist() - return cols, result_rows - - @staticmethod - def unflatten_dict(flat_dict): - unflattened_dict = {} - for key, value in flat_dict.items(): - keys = key.split(".") - current_dict = unflattened_dict - - for key in keys[:-1]: - current_dict = current_dict.setdefault(key, {}) - - current_dict[keys[-1]] = value - - return unflattened_dict - - def _fetch_rows_df(self, data_frame): - # pylint: disable=import-outside-toplevel - import numpy as np - import pandas as pd - - complex_columns = list( - set( - _get_root_col(col) - for col in data_frame.columns - if COMPLEX_COLUMN_SEPARATOR in col - 
) - ) - for complex_col in complex_columns or []: - for df_col in data_frame.columns: - if complex_col in df_col: - complex_col_name = ".".join( - df_col.split(COMPLEX_COLUMN_SEPARATOR)[1:] - ) - if complex_col_name: - data_frame.rename( - columns={df_col: complex_col_name}, - inplace=True, - ) - return pd.json_normalize( - [ - self.unflatten_dict(json.loads(row_values)) - for row_values in data_frame.apply( - lambda row: row.to_json(), axis=1 - ).values - ], - max_level=0, - ).replace(np.nan, None) - - def fetch_sample_data(self) -> TableData: + def fetch_sample_data( + self, columns: Optional[List[SQALikeColumn]] = None + ) -> TableData: """Fetch sample data from the table Returns: @@ -208,5 +167,5 @@ class DatalakeSampler(SamplerInterface): if self._profile_sample_query: return self._fetch_sample_data_from_user_query() - cols, rows = self.get_col_row(data_frame=self.table) + cols, rows = self.get_col_row(data_frame=self.table, columns=columns) return TableData(columns=cols, rows=rows) diff --git a/ingestion/src/metadata/profiler/processor/sampler/sampler_interface.py b/ingestion/src/metadata/profiler/processor/sampler/sampler_interface.py index 4e1c2ce4409..6f9549b514f 100644 --- a/ingestion/src/metadata/profiler/processor/sampler/sampler_interface.py +++ b/ingestion/src/metadata/profiler/processor/sampler/sampler_interface.py @@ -13,10 +13,13 @@ Interface for sampler """ from abc import ABC, abstractmethod -from typing import Dict, Optional +from typing import Dict, List, Optional, Union + +from sqlalchemy import Column from metadata.generated.schema.entity.data.table import TableData from metadata.profiler.api.models import ProfileSampleConfig +from metadata.utils.sqa_like_column import SQALikeColumn class SamplerInterface(ABC): @@ -58,6 +61,12 @@ class SamplerInterface(ABC): raise NotImplementedError @abstractmethod - def fetch_sample_data(self) -> TableData: - """Fetch sample data""" + def fetch_sample_data( + self, columns: Optional[Union[List[Column], List[SQALikeColumn]]] + ) -> TableData: + """Fetch sample data + + Args: + columns (Optional[List]): List of columns to fetch + """ raise NotImplementedError diff --git a/ingestion/src/metadata/profiler/processor/sampler/sqlalchemy/sampler.py b/ingestion/src/metadata/profiler/processor/sampler/sqlalchemy/sampler.py index ccaeebcf259..8c86ce06ecc 100644 --- a/ingestion/src/metadata/profiler/processor/sampler/sqlalchemy/sampler.py +++ b/ingestion/src/metadata/profiler/processor/sampler/sqlalchemy/sampler.py @@ -12,7 +12,7 @@ Helper module to handle data sampling for the profiler """ -from typing import Union, cast +from typing import List, Optional, Union, cast from sqlalchemy import Column, inspect, text from sqlalchemy.orm import DeclarativeMeta, Query, aliased @@ -117,17 +117,24 @@ class SQASampler(SamplerInterface): # Assign as an alias return aliased(self.table, sampled) - def fetch_sample_data(self) -> TableData: + def fetch_sample_data(self, columns: Optional[List[Column]] = None) -> TableData: """ Use the sampler to retrieve sample data rows as per limit given by user - :return: TableData to be added to the Table Entity + + Args: + columns (Optional[List]): List of columns to fetch + Retunrs: + TableData to be added to the Table Entity """ if self._profile_sample_query: return self._fetch_sample_data_from_user_query() # Add new RandomNumFn column rnd = self.get_sample_query() - sqa_columns = [col for col in inspect(rnd).c if col.name != RANDOM_LABEL] + if not columns: + sqa_columns = [col for col in inspect(rnd).c if col.name 
!= RANDOM_LABEL] + else: + sqa_columns = list(columns) # copy columns sqa_sample = ( self.client.query(*sqa_columns) diff --git a/ingestion/src/metadata/readers/dataframe/json.py b/ingestion/src/metadata/readers/dataframe/json.py index 707502b96f1..77f80d2a8fe 100644 --- a/ingestion/src/metadata/readers/dataframe/json.py +++ b/ingestion/src/metadata/readers/dataframe/json.py @@ -21,7 +21,7 @@ from typing import List, Union from metadata.readers.dataframe.base import DataFrameReader from metadata.readers.dataframe.common import dataframe_to_chunks from metadata.readers.dataframe.models import DatalakeColumnWrapper -from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR, UTF_8 +from metadata.utils.constants import UTF_8 from metadata.utils.logger import ingestion_logger logger = ingestion_logger() @@ -56,7 +56,7 @@ class JSONDataFrameReader(DataFrameReader): correct column name to match with the metadata description. """ # pylint: disable=import-outside-toplevel - from pandas import json_normalize + import pandas as pd json_text = _get_json_text(key=key, text=json_text, decode=decode) try: @@ -65,7 +65,9 @@ class JSONDataFrameReader(DataFrameReader): logger.debug("Failed to read as JSON object. Trying to read as JSON Lines") data = [json.loads(json_obj) for json_obj in json_text.strip().split("\n")] - return dataframe_to_chunks(json_normalize(data, sep=COMPLEX_COLUMN_SEPARATOR)) + # if we get a scalar value (e.g. {"a":"b"}) then we need to specify the index + data = data if not isinstance(data, dict) else [data] + return dataframe_to_chunks(pd.DataFrame.from_records(data)) def _read(self, *, key: str, bucket_name: str, **kwargs) -> DatalakeColumnWrapper: text = self.reader.read(key, bucket_name=bucket_name) diff --git a/ingestion/src/metadata/readers/file/local.py b/ingestion/src/metadata/readers/file/local.py index 1709d4bf121..ec4dc71e4d2 100644 --- a/ingestion/src/metadata/readers/file/local.py +++ b/ingestion/src/metadata/readers/file/local.py @@ -73,8 +73,8 @@ class LocalReader(Reader): file_paths = [] for root, _, file in os.walk(self.base_path): - for f in file: - if search_key in f and f not in excluded_files: - file_paths.append(f"{root}/{f}") + for fle in file: + if search_key in fle and fle not in excluded_files: + file_paths.append(f"{root}/{fle}") return file_paths diff --git a/ingestion/src/metadata/utils/datalake/datalake_utils.py b/ingestion/src/metadata/utils/datalake/datalake_utils.py index 107fc6a0e34..c49193ce4e2 100644 --- a/ingestion/src/metadata/utils/datalake/datalake_utils.py +++ b/ingestion/src/metadata/utils/datalake/datalake_utils.py @@ -16,8 +16,7 @@ from different auths and different file systems. 
import ast import json import traceback -from functools import reduce -from typing import List, Optional +from typing import Dict, List, Optional, cast from metadata.generated.schema.entity.data.table import Column, DataType from metadata.ingestion.source.database.column_helpers import truncate_column_name @@ -26,7 +25,6 @@ from metadata.readers.dataframe.models import ( DatalakeTableSchemaWrapper, ) from metadata.readers.dataframe.reader_factory import SupportedTypes, get_df_reader -from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR from metadata.utils.logger import utils_logger logger = utils_logger() @@ -101,42 +99,70 @@ def get_file_format_type(key_name, metadata_entry=None): return False -def get_parent_col(data_frame, complex_cols, parent_col_fqn=""): - """Get Complex Column Objects""" - cols = [] - parent_cols = [top_level[0] for top_level in complex_cols if len(top_level) > 0] - filter_unique = ( - lambda l, x: l # pylint: disable=unnecessary-lambda-assignment - if x in l - else l + [x] - ) - parent_cols = reduce(filter_unique, parent_cols, []) - for top_level in parent_cols: - if parent_col_fqn.startswith(COMPLEX_COLUMN_SEPARATOR) or not parent_col_fqn: - col_fqn = COMPLEX_COLUMN_SEPARATOR.join([parent_col_fqn, top_level]) - else: - col_fqn = COMPLEX_COLUMN_SEPARATOR.join(["", parent_col_fqn, top_level]) - col_obj = { - "name": truncate_column_name(top_level), - "displayName": top_level, - } - leaf_node = [ - leaf_parse[1:] for leaf_parse in complex_cols if top_level == leaf_parse[0] - ] - if any(leaf_node): - col_obj["children"] = [] - col_obj["dataTypeDisplay"] = DataType.RECORD.value - col_obj["dataType"] = DataType.RECORD - col_obj["children"].extend(get_parent_col(data_frame, leaf_node, col_fqn)) - else: - col_type = fetch_col_types(data_frame, col_fqn) - col_obj["dataTypeDisplay"] = col_type.value - col_obj["dataType"] = col_type - col_obj["arrayDataType"] = ( - DataType.UNKNOWN if col_type == DataType.ARRAY else None - ) - cols.append(Column(**col_obj)) - return cols +def unique_json_structure(dicts: List[Dict]) -> Dict: + """Given a sample of `n` json objects, return a json object that represents the unique structure of all `n` objects. + Note that the type of the key will be that of the last object seen in the sample. + + Args: + dicts: list of json objects + """ + result = {} + for dict_ in dicts: + for key, value in dict_.items(): + if isinstance(value, dict): + nested_json = result.get(key, {}) + # `isinstance(nested_json, dict)` if for a key we first see a non dict value + # but then see a dict value later, we will consider the key to be a dict. + result[key] = unique_json_structure( + [nested_json if isinstance(nested_json, dict) else {}, value] + ) + else: + result[key] = value + return result + + +def construct_json_column_children(json_column: Dict) -> List[Dict]: + """Construt a dict representation of a Column object + + Args: + json_column: unique json structure of a column + """ + children = [] + for key, value in json_column.items(): + column = {} + type_ = type(value).__name__.lower() + column["dataTypeDisplay"] = DATALAKE_DATA_TYPES.get( + type_, DataType.UNKNOWN + ).value + column["dataType"] = DATALAKE_DATA_TYPES.get(type_, DataType.UNKNOWN).value + column["name"] = truncate_column_name(key) + column["displayName"] = key + if isinstance(value, dict): + column["children"] = construct_json_column_children(value) + children.append(column) + + return children + + +def get_children(json_column) -> List[Dict]: + """Get children of json column. 
+ + Args: + json_column (pandas.Series): column with 100 sample rows. + Sample rows will be used to infer children. + """ + from pandas import Series # pylint: disable=import-outside-toplevel + + json_column = cast(Series, json_column) + try: + json_column = json_column.apply(json.loads) + except TypeError: + # if values are not strings, we will assume they are already json objects + # based on the read class logic + pass + json_structure = unique_json_structure(json_column.values.tolist()) + + return construct_json_column_children(json_structure) def get_columns(data_frame: "DataFrame"): @@ -147,36 +173,30 @@ def get_columns(data_frame: "DataFrame"): if hasattr(data_frame, "columns"): df_columns = list(data_frame.columns) for column in df_columns: - if COMPLEX_COLUMN_SEPARATOR not in column: - # use String by default - data_type = DataType.STRING - try: - if hasattr(data_frame[column], "dtypes"): - data_type = fetch_col_types(data_frame, column_name=column) + # use String by default + data_type = DataType.STRING + try: + if hasattr(data_frame[column], "dtypes"): + data_type = fetch_col_types(data_frame, column_name=column) - parsed_string = { - "dataTypeDisplay": data_type.value, - "dataType": data_type, - "name": truncate_column_name(column), - "displayName": column, - } - if data_type == DataType.ARRAY: - parsed_string["arrayDataType"] = DataType.UNKNOWN + parsed_string = { + "dataTypeDisplay": data_type.value, + "dataType": data_type, + "name": truncate_column_name(column), + "displayName": column, + } + if data_type == DataType.ARRAY: + parsed_string["arrayDataType"] = DataType.UNKNOWN - cols.append(Column(**parsed_string)) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning( - f"Unexpected exception parsing column [{column}]: {exc}" + if data_type == DataType.JSON: + parsed_string["children"] = get_children( + data_frame[column].dropna()[:100] ) - complex_cols = [ - complex_col.split(COMPLEX_COLUMN_SEPARATOR)[1:] - for complex_col in json.loads( - data_frame.apply(lambda row: row.to_json(), axis=1).values[0] - ).keys() - if COMPLEX_COLUMN_SEPARATOR in complex_col - ] - cols.extend(get_parent_col(data_frame, complex_cols)) + + cols.append(Column(**parsed_string)) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Unexpected exception parsing column [{column}]: {exc}") return cols @@ -187,8 +207,8 @@ def fetch_col_types(data_frame, column_name): data_frame (DataFrame) column_name (string) """ + data_type = None try: - data_type = None if data_frame[column_name].dtypes.name == "object" and any( data_frame[column_name].dropna().values ): diff --git a/ingestion/src/metadata/utils/entity_link.py b/ingestion/src/metadata/utils/entity_link.py index fe86b9b310a..6ad0c2f0826 100644 --- a/ingestion/src/metadata/utils/entity_link.py +++ b/ingestion/src/metadata/utils/entity_link.py @@ -32,12 +32,12 @@ class EntityLinkBuildingException(Exception): """ -def split(s: str) -> List[str]: +def split(str_: str) -> List[str]: """ Method to handle the splitting logic """ - lexer = EntityLinkLexer(InputStream(s)) + lexer = EntityLinkLexer(InputStream(str_)) stream = CommonTokenStream(lexer) parser = EntityLinkParser(stream) parser._errHandler = BailErrorStrategy() # pylint: disable=protected-access diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index 304e13d1b08..2caeddaef9a 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -67,11 +67,11 @@ class 
SplitTestCaseFqn(BaseModel): test_case: Optional[str] -def split(s: str) -> List[str]: +def split(str_: str) -> List[str]: """ Equivalent of Java's FullyQualifiedName#split """ - lexer = FqnLexer(InputStream(s)) + lexer = FqnLexer(InputStream(str_)) stream = CommonTokenStream(lexer) parser = FqnParser(stream) parser._errHandler = BailErrorStrategy() # pylint: disable=protected-access diff --git a/ingestion/tests/integration/datalake/__init__.py b/ingestion/tests/integration/datalake/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/tests/integration/datalake/resources/names.json b/ingestion/tests/integration/datalake/resources/names.json new file mode 100644 index 00000000000..37fa3a64f10 --- /dev/null +++ b/ingestion/tests/integration/datalake/resources/names.json @@ -0,0 +1,30 @@ +[ + { + "id": "1", + "first_name": "John", + "last_name": "Doe", + "city": "Los Angeles", + "country": "US", + "birthdate": "1980-01-01", + "age": "40", + "json_data": { + "foo": { + "bar": "baz" + } + } + }, + { + "id": "2", + "first_name": "James", + "last_name": "Doe", + "city": "Los Angeles", + "country": "US", + "birthdate": "1980-01-01", + "age": "40", + "json_data": { + "foo": { + "bar": "baz" + } + } + } +] diff --git a/ingestion/tests/integration/datalake/resources/new_users.parquet b/ingestion/tests/integration/datalake/resources/new_users.parquet new file mode 100644 index 00000000000..a8cfcd0de14 Binary files /dev/null and b/ingestion/tests/integration/datalake/resources/new_users.parquet differ diff --git a/ingestion/tests/integration/datalake/resources/users.csv b/ingestion/tests/integration/datalake/resources/users.csv new file mode 100644 index 00000000000..2611c2934b7 --- /dev/null +++ b/ingestion/tests/integration/datalake/resources/users.csv @@ -0,0 +1,4 @@ +id,first_name,last_name,city,country,birthdate,age,json_data +1,John,Doe,Los Angeles,US,1980-01-01,40,{"foo": {"bar": "baz"}} +2,Jane,Doe,Los Angeles,US,2000-12-31,39,{"foo": {"bar": "baz"}} +3,Jane,Smith,Paris,FR,2001-11-11,28,{"foo": {"bar": "baz"}} diff --git a/ingestion/tests/integration/datalake/test_ingestion.py b/ingestion/tests/integration/datalake/test_ingestion.py new file mode 100644 index 00000000000..45048692ab3 --- /dev/null +++ b/ingestion/tests/integration/datalake/test_ingestion.py @@ -0,0 +1,194 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Datalake ingestion integration tests""" + +import os +from copy import deepcopy +from unittest import TestCase + +import boto3 +import botocore +import pytest +from moto import mock_s3 + +from metadata.generated.schema.entity.data.table import DataType, Table +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( + OpenMetadataJWTClientConfig, +) +from metadata.ingestion.ometa.models import EntityList +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.workflow.metadata import MetadataWorkflow +from metadata.workflow.profiler import ProfilerWorkflow + +BUCKET_NAME = "MyBucket" + +INGESTION_CONFIG = { + "source": { + "type": "datalake", + "serviceName": "datalake_for_integration_tests", + "serviceConnection": { + "config": { + "type": "Datalake", + "configSource": { + "securityConfig": { + "awsAccessKeyId": "fake_access_key", + "awsSecretAccessKey": "fake_secret_key", + "awsRegion": "us-weat-1", + } + }, + "bucketName": f"{BUCKET_NAME}", + } + }, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, + }, + "sink": {"type": "metadata-rest", "config": {}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": { + "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + }, + } + }, +} + + +@mock_s3 +class DatalakeTestE2E(TestCase): + """datalake profiler E2E test""" + + @classmethod + def setUpClass(cls) -> None: + server_config = OpenMetadataConnection( + hostPort="http://localhost:8585/api", + authProvider="openmetadata", + securityConfig=OpenMetadataJWTClientConfig( + jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + ), + ) # type: ignore + cls.metadata = OpenMetadata(server_config) + + def setUp(self) -> None: + # Mock our S3 bucket and ingest a file + boto3.DEFAULT_SESSION = None + self.client = boto3.client( + "s3", + region_name="us-weat-1", + ) + + # check that we are not running our test against a real bucket + try: + s3 = boto3.resource( + "s3", + region_name="us-west-1", + aws_access_key_id="fake_access_key", + aws_secret_access_key="fake_secret_key", + ) + s3.meta.client.head_bucket(Bucket=BUCKET_NAME) + except botocore.exceptions.ClientError: + pass + else: + err = f"{BUCKET_NAME} should not exist." 
+ raise EnvironmentError(err) + self.client.create_bucket( + Bucket=BUCKET_NAME, + CreateBucketConfiguration={"LocationConstraint": "us-west-1"}, + ) + current_dir = os.path.dirname(__file__) + resources_dir = os.path.join(current_dir, "resources") + + resources_paths = [ + os.path.join(path, filename) + for path, _, files in os.walk(resources_dir) + for filename in files + ] + + self.s3_keys = [] + + for path in resources_paths: + key = os.path.relpath(path, resources_dir) + self.s3_keys.append(key) + self.client.upload_file(Filename=path, Bucket=BUCKET_NAME, Key=key) + + @pytest.mark.order(10000) + def test_ingestion(self): + """test ingestion of datalake data""" + # Ingest our S3 data + ingestion_workflow = MetadataWorkflow.create(INGESTION_CONFIG) + ingestion_workflow.execute() + ingestion_workflow.raise_from_status() + ingestion_workflow.stop() + + resp: EntityList[Table] = self.metadata.list_entities( + entity=Table, params={"database": "datalake_for_integration_tests.default"} + ) # type: ignore + + entities = resp.entities + self.assertEqual(len(entities), 3) + names = [entity.name.__root__ for entity in entities] + self.assertListEqual( + sorted(["names.json", "new_users.parquet", "users.csv"]), sorted(names) + ) + + for entity in entities: + columns = entity.columns + for column in columns: + if column.dataType == DataType.JSON: + assert column.children + + @pytest.mark.order(10001) + def test_profiler(self): + """Test profiler ingestion""" + workflow_config = deepcopy(INGESTION_CONFIG) + workflow_config["source"]["sourceConfig"]["config"].update( + { + "type": "Profiler", + } + ) + workflow_config["processor"] = { + "type": "orm-profiler", + "config": {}, + } + + profiler_workflow = ProfilerWorkflow.create(workflow_config) + profiler_workflow.execute() + status = profiler_workflow.result_status() + profiler_workflow.stop() + + assert status == 0 + + csv_ = self.metadata.get_by_name( + entity=Table, + fqn='datalake_for_integration_tests.default.MyBucket."users.csv"', + fields=["tableProfilerConfig"], + ) + parquet_ = self.metadata.get_by_name( + entity=Table, + fqn='datalake_for_integration_tests.default.MyBucket."new_users.parquet"', + fields=["tableProfilerConfig"], + ) + json_ = self.metadata.get_by_name( + entity=Table, + fqn='datalake_for_integration_tests.default.MyBucket."names.json"', + fields=["tableProfilerConfig"], + ) + csv_sample_data = self.metadata.get_sample_data(csv_) + parquet_sample_data = self.metadata.get_sample_data(parquet_) + json_sample_data = self.metadata.get_sample_data(json_) + + assert csv_sample_data.sampleData.rows + assert parquet_sample_data.sampleData.rows + assert json_sample_data.sampleData.rows diff --git a/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py b/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py index 83a5257637c..d6e6799507a 100644 --- a/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py +++ b/ingestion/tests/integration/orm_profiler/test_datalake_profiler_e2e.py @@ -309,6 +309,114 @@ class DatalakeProfilerTestE2E(TestCase): assert profile.rowCount == 2.0 + def test_datalake_profiler_workflow_with_custom_profiler_config(self): + """Test custom profiler config return expected sample and metric computation""" + profiler_metrics = [ + "MIN", + "MAX", + "MEAN", + "MEDIAN", + ] + id_metrics = ["MIN", "MAX"] + non_metric_values = ["name", "timestamp"] + + workflow_config = deepcopy(INGESTION_CONFIG) + workflow_config["source"]["sourceConfig"]["config"].update( + { + "type": 
"Profiler", + } + ) + workflow_config["processor"] = { + "type": "orm-profiler", + "config": { + "profiler": { + "name": "ingestion_profiler", + "metrics": profiler_metrics, + }, + "tableConfig": [ + { + "fullyQualifiedName": 'datalake_for_integration_tests.default.MyBucket."profiler_test_.csv"', + "columnConfig": { + "includeColumns": [ + {"columnName": "id", "metrics": id_metrics}, + {"columnName": "age"}, + ] + }, + } + ], + }, + } + + profiler_workflow = ProfilerWorkflow.create(workflow_config) + profiler_workflow.execute() + status = profiler_workflow.result_status() + profiler_workflow.stop() + + assert status == 0 + + table = self.metadata.get_by_name( + entity=Table, + fqn='datalake_for_integration_tests.default.MyBucket."profiler_test_.csv"', + fields=["tableProfilerConfig"], + ) + + id_profile = self.metadata.get_profile_data( + 'datalake_for_integration_tests.default.MyBucket."profiler_test_.csv".id', + get_beginning_of_day_timestamp_mill(), + get_end_of_day_timestamp_mill(), + profile_type=ColumnProfile, + ).entities + + latest_id_profile = max(id_profile, key=lambda o: o.timestamp.__root__) + + id_metric_ln = 0 + for metric_name, metric in latest_id_profile: + if metric_name.upper() in id_metrics: + assert metric is not None + id_metric_ln += 1 + else: + assert metric is None if metric_name not in non_metric_values else True + + assert id_metric_ln == len(id_metrics) + + age_profile = self.metadata.get_profile_data( + 'datalake_for_integration_tests.default.MyBucket."profiler_test_.csv".age', + get_beginning_of_day_timestamp_mill(), + get_end_of_day_timestamp_mill(), + profile_type=ColumnProfile, + ).entities + + latest_age_profile = max(age_profile, key=lambda o: o.timestamp.__root__) + + age_metric_ln = 0 + for metric_name, metric in latest_age_profile: + if metric_name.upper() in profiler_metrics: + assert metric is not None + age_metric_ln += 1 + else: + assert metric is None if metric_name not in non_metric_values else True + + assert age_metric_ln == len(profiler_metrics) + + latest_exc_timestamp = latest_age_profile.timestamp.__root__ + first_name_profile = self.metadata.get_profile_data( + 'datalake_for_integration_tests.default.MyBucket."profiler_test_.csv".first_name_profile', + get_beginning_of_day_timestamp_mill(), + get_end_of_day_timestamp_mill(), + profile_type=ColumnProfile, + ).entities + + assert not [ + p + for p in first_name_profile + if p.timestamp.__root__ == latest_exc_timestamp + ] + + sample_data = self.metadata.get_sample_data(table) + assert sorted([c.__root__ for c in sample_data.sampleData.columns]) == sorted( + ["id", "age"] + ) + def tearDown(self): s3 = boto3.resource( "s3", diff --git a/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py b/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py index 7514fc48185..53558f1532f 100644 --- a/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py +++ b/ingestion/tests/integration/orm_profiler/test_orm_profiler_e2e.py @@ -24,7 +24,11 @@ from unittest import TestCase from sqlalchemy import Column, DateTime, Integer, String, create_engine from sqlalchemy.orm import declarative_base -from metadata.generated.schema.entity.data.table import ProfileSampleType, Table +from metadata.generated.schema.entity.data.table import ( + ColumnProfile, + ProfileSampleType, + Table, +) from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) @@ -34,6 +38,10 @@ from 
metadata.generated.schema.security.client.openMetadataJWTClientConfig impor ) from metadata.ingestion.connections.session import create_and_bind_session from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils.time_utils import ( + get_beginning_of_day_timestamp_mill, + get_end_of_day_timestamp_mill, +) from metadata.workflow.metadata import MetadataWorkflow from metadata.workflow.profiler import ProfilerWorkflow from metadata.workflow.workflow_output_handler import print_status @@ -576,3 +584,109 @@ class ProfilerWorkflowTest(TestCase): ).profile assert profile.rowCount == 4.0 + + def test_datalake_profiler_workflow_with_custom_profiler_config(self): + """Test custom profiler config return expected sample and metric computation""" + profiler_metrics = [ + "MIN", + "MAX", + "MEAN", + "MEDIAN", + ] + id_metrics = ["MIN", "MAX"] + non_metric_values = ["name", "timestamp"] + + workflow_config = deepcopy(ingestion_config) + workflow_config["source"]["sourceConfig"]["config"].update( + { + "type": "Profiler", + } + ) + workflow_config["processor"] = { + "type": "orm-profiler", + "config": { + "profiler": { + "name": "ingestion_profiler", + "metrics": profiler_metrics, + }, + "tableConfig": [ + { + "fullyQualifiedName": "test_sqlite.main.main.users", + "columnConfig": { + "includeColumns": [ + {"columnName": "id", "metrics": id_metrics}, + {"columnName": "age"}, + ] + }, + } + ], + }, + } + + profiler_workflow = ProfilerWorkflow.create(workflow_config) + profiler_workflow.execute() + status = profiler_workflow.result_status() + profiler_workflow.stop() + + assert status == 0 + + table = self.metadata.get_by_name( + entity=Table, + fqn="test_sqlite.main.main.users", + fields=["tableProfilerConfig"], + ) + + id_profile = self.metadata.get_profile_data( + "test_sqlite.main.main.users.id", + get_beginning_of_day_timestamp_mill(), + get_end_of_day_timestamp_mill(), + profile_type=ColumnProfile, + ).entities + + latest_id_profile = max(id_profile, key=lambda o: o.timestamp.__root__) + + id_metric_ln = 0 + for metric_name, metric in latest_id_profile: + if metric_name.upper() in id_metrics: + assert metric is not None + id_metric_ln += 1 + else: + assert metric is None if metric_name not in non_metric_values else True + + assert id_metric_ln == len(id_metrics) + + age_profile = self.metadata.get_profile_data( + "test_sqlite.main.main.users.age", + get_beginning_of_day_timestamp_mill(), + get_end_of_day_timestamp_mill(), + profile_type=ColumnProfile, + ).entities + + latest_age_profile = max(age_profile, key=lambda o: o.timestamp.__root__) + + age_metric_ln = 0 + for metric_name, metric in latest_age_profile: + if metric_name.upper() in profiler_metrics: + assert metric is not None + age_metric_ln += 1 + else: + assert metric is None if metric_name not in non_metric_values else True + + assert age_metric_ln == len(profiler_metrics) + + latest_exc_timestamp = latest_age_profile.timestamp.__root__ + fullname_profile = self.metadata.get_profile_data( + "test_sqlite.main.main.users.fullname", + get_beginning_of_day_timestamp_mill(), + get_end_of_day_timestamp_mill(), + profile_type=ColumnProfile, + ).entities + + assert not [ + p for p in fullname_profile if p.timestamp.__root__ == latest_exc_timestamp + ] + + sample_data = self.metadata.get_sample_data(table) + assert sorted([c.__root__ for c in sample_data.sampleData.columns]) == sorted( + ["id", "age"] + ) diff --git a/ingestion/tests/unit/profiler/pandas/test_sample.py b/ingestion/tests/unit/profiler/pandas/test_sample.py index 
0bcbe7bcf39..3ced96ca1f5 100644 --- a/ingestion/tests/unit/profiler/pandas/test_sample.py +++ b/ingestion/tests/unit/profiler/pandas/test_sample.py @@ -16,7 +16,6 @@ import os from unittest import TestCase, mock from uuid import uuid4 -import pandas as pd import pytest from sqlalchemy import TEXT, Column, Integer, String from sqlalchemy.orm import declarative_base @@ -222,7 +221,7 @@ class DatalakeSampleTest(TestCase): """ sampler = DatalakeSampler( client=FakeConnection().client(), - table=[pd.concat([self.df1, self.df2])], + table=[self.df1, self.df2], ) sample_data = sampler.fetch_sample_data() diff --git a/ingestion/tests/unit/topology/database/test_couchbase.py b/ingestion/tests/unit/topology/database/test_couchbase.py index 3ccbf9e57eb..944564eaeca 100644 --- a/ingestion/tests/unit/topology/database/test_couchbase.py +++ b/ingestion/tests/unit/topology/database/test_couchbase.py @@ -130,8 +130,8 @@ MOCK_CREATE_TABLE = [ Column( name="address", displayName="address", - dataType=DataType.RECORD, - dataTypeDisplay=DataType.RECORD.value, + dataType=DataType.JSON, + dataTypeDisplay=DataType.JSON.value, children=[ Column( name="line", diff --git a/ingestion/tests/unit/topology/database/test_datalake.py b/ingestion/tests/unit/topology/database/test_datalake.py index f6fa1889385..712d0341452 100644 --- a/ingestion/tests/unit/topology/database/test_datalake.py +++ b/ingestion/tests/unit/topology/database/test_datalake.py @@ -182,8 +182,8 @@ EXAMPLE_JSON_COL_3 = [ ), Column( name="address", - dataType="RECORD", - dataTypeDisplay="RECORD", + dataType="JSON", + dataTypeDisplay="JSON", displayName="address", children=[ Column( @@ -206,8 +206,8 @@ EXAMPLE_JSON_COL_3 = [ ), Column( name="coordinates", - dataType="RECORD", - dataTypeDisplay="RECORD", + dataType="JSON", + dataTypeDisplay="JSON", displayName="coordinates", children=[ Column( @@ -233,13 +233,15 @@ EXAMPLE_JSON_COL_4 = deepcopy(EXAMPLE_JSON_COL_3) EXAMPLE_JSON_COL_4[3].children[3].children = [ Column( name="lat", - dataType="FLOAT", - dataTypeDisplay="FLOAT", + dataType="INT", + dataTypeDisplay="INT", + displayName="lat", ), Column( name="lon", - dataType="FLOAT", - dataTypeDisplay="FLOAT", + dataType="INT", + dataTypeDisplay="INT", + displayName="lon", ), ] @@ -437,13 +439,13 @@ class DatalakeUnitTest(TestCase): sample_dict = {"name": "John", "age": 16, "sex": "M"} - exp_df_list = pd.json_normalize( + exp_df_list = pd.DataFrame.from_records( [ {"name": "John", "age": 16, "sex": "M"}, {"name": "Milan", "age": 19, "sex": "M"}, ] ) - exp_df_obj = pd.json_normalize(sample_dict) + exp_df_obj = pd.DataFrame.from_records([sample_dict]) actual_df_1 = JSONDataFrameReader.read_from_json( key="file.json", json_text=EXAMPLE_JSON_TEST_1, decode=True @@ -467,7 +469,7 @@ class DatalakeUnitTest(TestCase): key="file.json", json_text=EXAMPLE_JSON_TEST_4, decode=True )[0] actual_cols_4 = get_columns(actual_df_4) - self.assertFalse(actual_cols_4 == EXAMPLE_JSON_COL_4) + assert actual_cols_4 == EXAMPLE_JSON_COL_4 def test_avro_file_parse(self): columns = AvroDataFrameReader.read_from_avro(AVRO_SCHEMA_FILE) diff --git a/ingestion/tests/unit/topology/database/test_mongodb.py b/ingestion/tests/unit/topology/database/test_mongodb.py index 13ef37566d3..701f7044dde 100644 --- a/ingestion/tests/unit/topology/database/test_mongodb.py +++ b/ingestion/tests/unit/topology/database/test_mongodb.py @@ -142,8 +142,8 @@ MOCK_CREATE_TABLE = CreateTableRequest( Column( name="address", displayName="address", - dataType=DataType.RECORD, - 
dataTypeDisplay=DataType.RECORD.value, + dataType=DataType.JSON, + dataTypeDisplay=DataType.JSON.value, children=[ Column( name="line", diff --git a/ingestion/tests/unit/utils/test_datalake.py b/ingestion/tests/unit/utils/test_datalake.py new file mode 100644 index 00000000000..428857ec3d9 --- /dev/null +++ b/ingestion/tests/unit/utils/test_datalake.py @@ -0,0 +1,172 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Test datalake utils +""" + +from unittest import TestCase + +from metadata.generated.schema.entity.data.table import Column +from metadata.utils.datalake.datalake_utils import ( + construct_json_column_children, + unique_json_structure, +) + +STRUCTURE = { + "a": "w", + "b": 4, + "c": { + "d": 2, + "e": 4, + "f": { + "g": 9, + "h": {"i": 6}, + "n": { + "o": 10, + "p": 11, + }, + }, + "j": 7, + "k": 8, + }, +} + + +class TestDatalakeUtils(TestCase): + """class for datalake utils test""" + + def test_unique_json_structure(self): + """test unique json structure fn""" + sample_data = [ + {"a": "x", "b": 1, "c": {"d": 2}}, + {"a": "y", "b": 2, "c": {"e": 4, "f": {"g": 5, "h": {"i": 6}, "n": 5}}}, + {"a": "z", "b": 3, "c": {"j": 7}}, + {"a": "w", "b": 4, "c": {"k": 8, "f": {"g": 9, "n": {"o": 10, "p": 11}}}}, + ] + expected = STRUCTURE + + actual = unique_json_structure(sample_data) + + self.assertDictEqual(expected, actual) + + def test_construct_column(self): + """test construct column fn""" + expected = [ + { + "dataTypeDisplay": "STRING", + "dataType": "STRING", + "name": "a", + "displayName": "a", + }, + { + "dataTypeDisplay": "INT", + "dataType": "INT", + "name": "b", + "displayName": "b", + }, + { + "dataTypeDisplay": "JSON", + "dataType": "JSON", + "name": "c", + "displayName": "c", + "children": [ + { + "dataTypeDisplay": "INT", + "dataType": "INT", + "name": "d", + "displayName": "d", + }, + { + "dataTypeDisplay": "INT", + "dataType": "INT", + "name": "e", + "displayName": "e", + }, + { + "dataTypeDisplay": "JSON", + "dataType": "JSON", + "name": "f", + "displayName": "f", + "children": [ + { + "dataTypeDisplay": "INT", + "dataType": "INT", + "name": "g", + "displayName": "g", + }, + { + "dataTypeDisplay": "JSON", + "dataType": "JSON", + "name": "h", + "displayName": "h", + "children": [ + { + "dataTypeDisplay": "INT", + "dataType": "INT", + "name": "i", + "displayName": "i", + } + ], + }, + { + "dataTypeDisplay": "JSON", + "dataType": "JSON", + "name": "n", + "displayName": "n", + "children": [ + { + "dataTypeDisplay": "INT", + "dataType": "INT", + "name": "o", + "displayName": "o", + }, + { + "dataTypeDisplay": "INT", + "dataType": "INT", + "name": "p", + "displayName": "p", + }, + ], + }, + ], + }, + { + "dataTypeDisplay": "INT", + "dataType": "INT", + "name": "j", + "displayName": "j", + }, + { + "dataTypeDisplay": "INT", + "dataType": "INT", + "name": "k", + "displayName": "k", + }, + ], + }, + ] + actual = construct_json_column_children(STRUCTURE) + + for el in zip(expected, actual): + self.assertDictEqual(el[0], el[1]) + + def 
test_create_column_object(self): + """test create column object fn""" + formatted_column = construct_json_column_children(STRUCTURE) + column = { + "dataTypeDisplay": "STRING", + "dataType": "STRING", + "name": "a", + "displayName": "a", + "children": formatted_column, + } + column_obj = Column(**column) + assert len(column_obj.children) == 3
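To close, a short usage sketch (not part of the patch) of the two helpers added in `datalake_utils.py`, using the import path from the new unit test above; the sample rows are made up:

```python
from metadata.utils.datalake.datalake_utils import (
    construct_json_column_children,
    unique_json_structure,
)

# Two sampled rows of a JSON column: keys are merged and nested dicts are recursed into
rows = [
    {"id": 1, "json_data": {"foo": {"bar": "baz"}}},
    {"id": 2, "json_data": {"foo": {"qux": 3}}},
]

structure = unique_json_structure(rows)
# {'id': 2, 'json_data': {'foo': {'bar': 'baz', 'qux': 3}}}

children = construct_json_column_children(structure)
# -> [{'dataType': 'INT', 'name': 'id', ...},
#     {'dataType': 'JSON', 'name': 'json_data', ..., 'children': [...]}]
```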