Fixes #12601 - column filter for profiler workflow (#13535)

* fix: sample data ingestion to match entity profiler column setting

* fix: python linting

* fix: updated fn call

* fix: added logic to handle json fields in datalake connector

* fix: handle NA values in parsing

* fix: reverted sampler changes from #13338

* fix: reverted metric changes from #13338

* fix: added datalake profiler ingestion test

* fix: python linting

* fix: removed normalization of json blob in NoSQL db
Teddy 2023-10-12 14:51:38 +02:00 committed by GitHub
parent 3178637b53
commit 1cbdfb3ae7
36 changed files with 831 additions and 208 deletions
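
Before the per-file diffs, a note on the shape of the column filter this change wires through: the profiler workflow's processor config selects columns per table, and the sampler now restricts sample data to that selection. A minimal, illustrative sketch of such a processor block (the FQN and column names are placeholders; the real shape appears in the tests added below):

# Illustrative profiler processor config; the sampler now limits sample data
# to the columns listed under "includeColumns" (placeholder names/FQN).
processor = {
    "type": "orm-profiler",
    "config": {
        "tableConfig": [
            {
                "fullyQualifiedName": 'my_service.my_db.my_schema."my_table.csv"',
                "columnConfig": {
                    "includeColumns": [
                        {"columnName": "id", "metrics": ["MIN", "MAX"]},
                        {"columnName": "age"},
                    ]
                },
            }
        ],
    },
}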

View File

@ -15,6 +15,7 @@ volumes:
ingestion-volume-dags:
ingestion-volume-tmp:
es-data:
db-data:
services:
mysql:
build:
@ -39,7 +40,7 @@ services:
timeout: 10s
retries: 10
volumes:
- ./docker-volume/db-data:/var/lib/mysql
- db-data:/var/lib/mysql
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.10.2

View File

@ -215,8 +215,8 @@ class LookerSource(DashboardServiceSource):
file_path = f"{self._main_lookml_repo.path}/{path}"
if not os.path.isfile(file_path):
return None
with open(file_path, "r", encoding="utf-8") as f:
manifest = LookMLManifest.parse_obj(lkml.load(f))
with open(file_path, "r", encoding="utf-8") as fle:
manifest = LookMLManifest.parse_obj(lkml.load(fle))
if manifest and manifest.remote_dependency:
remote_name = manifest.remote_dependency["name"]
remote_git_url = manifest.remote_dependency["url"]

View File

@ -64,5 +64,5 @@ def _clone_repo(
Repo.clone_from(url, path)
logger.info(f"repo {repo_name} cloned to {path}")
except Exception as e:
logger.error(f"GitHubCloneReader::_clone: ERROR {e} ")
except Exception as exc:
logger.error(f"GitHubCloneReader::_clone: ERROR {exc} ")

View File

@ -16,8 +16,6 @@ import traceback
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
from pandas import json_normalize
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
@ -45,7 +43,7 @@ from metadata.ingestion.source.database.database_service import (
QueryByProcedure,
)
from metadata.utils import fqn
from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR, DEFAULT_DATABASE
from metadata.utils.constants import DEFAULT_DATABASE
from metadata.utils.datalake.datalake_utils import get_columns
from metadata.utils.filters import filter_by_schema, filter_by_table
from metadata.utils.logger import ingestion_logger
@ -205,11 +203,13 @@ class CommonNoSQLSource(DatabaseServiceSource, ABC):
From topology.
Prepare a table request and pass it to the sink
"""
import pandas as pd # pylint: disable=import-outside-toplevel
table_name, table_type = table_name_and_type
schema_name = self.context.database_schema.name.__root__
try:
data = self.get_table_columns_dict(schema_name, table_name)
df = json_normalize(list(data), sep=COMPLEX_COLUMN_SEPARATOR)
df = pd.DataFrame.from_records(list(data))
columns = get_columns(df)
table_request = CreateTableRequest(
name=table_name,

View File

@ -75,9 +75,11 @@ logger = ingestion_logger()
# pylint: disable=not-callable
@classmethod
def from_dict(cls, d: Dict[str, any]) -> "TableConstraintList":
def from_dict(cls, dct: Dict[str, Any]) -> "TableConstraintList":
return cls(
table_constraints=[DBTableConstraint.from_dict(constraint) for constraint in d]
table_constraints=[
DBTableConstraint.from_dict(constraint) for constraint in dct
]
)

View File

@ -158,7 +158,7 @@ def get_columns(
# tds_version 4.2 does not support NVARCHAR(MAX)
computed_definition = sql.cast(computed_cols.c.definition, NVARCHAR(4000))
s = (
sql_qry = (
sql.select(
columns,
computed_definition,
@ -173,10 +173,10 @@ def get_columns(
.order_by(columns.c.ordinal_position)
)
c = connection.execution_options(future_result=True).execute(s)
cursr = connection.execution_options(future_result=True).execute(sql_qry)
cols = []
for row in c.mappings():
for row in cursr.mappings():
name = row[columns.c.column_name]
type_ = row[columns.c.data_type]
nullable = row[columns.c.is_nullable] == "YES"

View File

@ -262,7 +262,7 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin):
self.status.scanned(table.name.__root__)
return row, column, metric_type.value
def fetch_sample_data(self, table) -> TableData:
def fetch_sample_data(self, table, columns: SQALikeColumn) -> TableData:
"""Fetch sample data from database
Args:
@ -271,7 +271,8 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin):
Returns:
TableData: sample table data
"""
return self.sampler.fetch_sample_data()
sampler = self._get_sampler()
return sampler.fetch_sample_data(columns)
def get_composed_metrics(
self, column: Column, metric: Metrics, column_results: Dict

View File

@ -325,7 +325,7 @@ class ProfilerInterface(ABC):
raise NotImplementedError
@abstractmethod
def fetch_sample_data(self, table) -> TableData:
def fetch_sample_data(self, table, columns: List[Column]) -> TableData:
"""run profiler metrics"""
raise NotImplementedError
@ -333,3 +333,8 @@ class ProfilerInterface(ABC):
def close(self):
"""Clean up profiler interface"""
raise NotImplementedError
@abstractmethod
def get_columns(self):
"""get columns"""
raise NotImplementedError

View File

@ -22,7 +22,7 @@ from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, List
from sqlalchemy import Column
from sqlalchemy import Column, inspect
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.orm import scoped_session
@ -454,7 +454,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
return profile_results
def fetch_sample_data(self, table) -> TableData:
def fetch_sample_data(self, table, columns) -> TableData:
"""Fetch sample data from database
Args:
@ -467,7 +467,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
table=table,
)
return sampler.fetch_sample_data()
return sampler.fetch_sample_data(columns)
def get_composed_metrics(
self, column: Column, metric: Metrics, column_results: Dict
@ -519,6 +519,10 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
f"Skipping metrics due to {exc} for {runner.table.__tablename__}.{column.name}"
)
def get_columns(self):
"""get columns from entity"""
return list(inspect(self.table).c)
def close(self):
"""Clean up session"""
self.session.close()
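
Taken together, the interface changes above mean the profiler resolves its column objects once and hands them to the sampler, so the stored sample data matches the entity's profiler column settings. A hedged sketch of the call flow (names simplified; each ProfilerInterface implementation supplies its own get_columns):

# Simplified sketch of the new flow; `interface` is any ProfilerInterface
# implementation from the diffs above (SQA or pandas based).
def sample_for_entity(interface, table):
    # New abstract hook: e.g. the SQA interface returns list(inspect(self.table).c).
    columns = interface.get_columns()
    # Columns are forwarded to the sampler, so the returned TableData only
    # contains the columns the profiler is configured to process.
    return interface.fetch_sample_data(table, columns)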

View File

@ -19,7 +19,6 @@ from sqlalchemy import column, distinct, func
from metadata.profiler.metrics.core import StaticMetric, _label
from metadata.profiler.orm.functions.count import CountFn
from metadata.profiler.orm.registry import is_quantifiable
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
@ -54,17 +53,10 @@ class DistinctCount(StaticMetric):
# pylint: disable=import-outside-toplevel
from collections import Counter
import pandas as pd
try:
counter = Counter()
for df in dfs:
df_col = df[self.col.name].dropna()
df_col_value = (
pd.to_numeric(df_col).to_list()
if is_quantifiable(self.col.type)
else df_col.to_list()
)
df_col_value = df[self.col.name].dropna().to_list()
counter.update(df_col_value)
return len(counter.keys())
except Exception as err:
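
The simplified metric above now counts distinct raw values across dataframe chunks without coercing them to numeric first. A standalone illustration of the same pattern (toy data, pandas assumed):

# Chunked distinct count, as in DistinctCount.df_fn above (illustrative data).
from collections import Counter

import pandas as pd

chunks = [
    pd.DataFrame({"city": ["Paris", "Lyon", None]}),
    pd.DataFrame({"city": ["Paris", "Nice"]}),
]

counter = Counter()
for df in chunks:
    counter.update(df["city"].dropna().to_list())

distinct_count = len(counter.keys())  # 3 -> Paris, Lyon, Nice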

View File

@ -106,10 +106,10 @@ class StdDev(StaticMetric):
if is_quantifiable(self.col.type):
try:
merged_df = pd.to_numeric(pd.concat(df[self.col.name] for df in dfs))
if len(merged_df) > 1:
return merged_df.std()
return 0
df = pd.to_numeric(pd.concat(df[self.col.name] for df in dfs))
if not df.empty:
return df.std()
return None
except MemoryError:
logger.error(
f"Unable to compute Standard Deviation for {self.col.name} due to memory constraints."

View File

@ -48,9 +48,7 @@ class Sum(StaticMetric):
def df_fn(self, dfs=None):
"""pandas function"""
# pylint: disable=import-outside-toplevel
import pandas as pd
if is_quantifiable(self.col.type):
return sum(pd.to_numeric(df[self.col.name]).sum() for df in dfs)
return sum(df[self.col.name].sum() for df in dfs)
return None

View File

@ -83,7 +83,7 @@ class FirstQuartile(StaticMetric, PercentilMixin):
# the entire set. Median of Medians could be used
# though it would require the set to be sorted beforehand
try:
df = pd.to_numeric(pd.concat([df[self.col.name] for df in dfs]))
df = pd.concat([df[self.col.name] for df in dfs])
except MemoryError:
logger.error(
f"Unable to compute Median for {self.col.name} due to memory constraints."

View File

@ -82,7 +82,7 @@ class Median(StaticMetric, PercentilMixin):
# the entire set. Median of Medians could be used
# though it would require the set to be sorted beforehand
try:
df = pd.to_numeric(pd.concat([df[self.col.name] for df in dfs]))
df = pd.concat([df[self.col.name] for df in dfs])
except MemoryError:
logger.error(
f"Unable to compute Median for {self.col.name} due to memory constraints."

View File

@ -83,7 +83,7 @@ class ThirdQuartile(StaticMetric, PercentilMixin):
# the entire set. Median of Medians could be used
# though it would require the set to be sorted beforehand
try:
df = pd.to_numeric(pd.concat([df[self.col.name] for df in dfs]))
df = pd.concat([df[self.col.name] for df in dfs])
except MemoryError:
logger.error(
f"Unable to compute Median for {self.col.name} due to memory constraints."

View File

@ -494,7 +494,7 @@ class Profiler(Generic[TMetric]):
"Fetching sample data for "
f"{self.profiler_interface.table_entity.fullyQualifiedName.__root__}..." # type: ignore
)
return self.profiler_interface.fetch_sample_data(self.table)
return self.profiler_interface.fetch_sample_data(self.table, self.columns)
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(f"Error fetching sample data: {err}")

View File

@ -12,10 +12,9 @@
Helper module to handle data sampling
for the profiler
"""
import json
import math
import random
from typing import cast
from typing import List, Optional, cast
from metadata.data_quality.validations.table.pandas.tableRowInsertedCountToBeBetween import (
TableRowInsertedCountToBeBetweenValidator,
@ -26,9 +25,8 @@ from metadata.generated.schema.entity.data.table import (
ProfileSampleType,
TableData,
)
from metadata.ingestion.source.database.datalake.columns import _get_root_col
from metadata.profiler.processor.sampler.sampler_interface import SamplerInterface
from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR
from metadata.utils.sqa_like_column import SQALikeColumn
class DatalakeSampler(SamplerInterface):
@ -121,6 +119,23 @@ class DatalakeSampler(SamplerInterface):
for df in self.table
]
def get_col_row(self, data_frame, columns: Optional[List[SQALikeColumn]] = None):
"""
Fetches columns and rows from the data_frame
"""
if columns:
cols = [col.name for col in columns]
else:
# we'll use the first dataframe to get the columns
cols = data_frame[0].columns.tolist()
rows = []
# Sample Data should not exceed sample limit
for chunk in data_frame:
rows.extend(self._fetch_rows(chunk[cols])[: self.sample_limit])
if len(rows) >= self.sample_limit:
break
return cols, rows
def random_sample(self):
"""Generate random sample from the table
@ -138,68 +153,12 @@ class DatalakeSampler(SamplerInterface):
return self._get_sampled_dataframe()
def get_col_row(self, data_frame):
"""
Fetches columns and rows from the data_frame
"""
result_rows = []
def _fetch_rows(self, data_frame):
return data_frame.dropna().values.tolist()
for chunk in data_frame:
row_df = self._fetch_rows_df(chunk)
result_rows.extend(row_df.values.tolist()[: self.sample_limit])
if len(result_rows) >= self.sample_limit:
break
cols = row_df.columns.tolist()
return cols, result_rows
@staticmethod
def unflatten_dict(flat_dict):
unflattened_dict = {}
for key, value in flat_dict.items():
keys = key.split(".")
current_dict = unflattened_dict
for key in keys[:-1]:
current_dict = current_dict.setdefault(key, {})
current_dict[keys[-1]] = value
return unflattened_dict
def _fetch_rows_df(self, data_frame):
# pylint: disable=import-outside-toplevel
import numpy as np
import pandas as pd
complex_columns = list(
set(
_get_root_col(col)
for col in data_frame.columns
if COMPLEX_COLUMN_SEPARATOR in col
)
)
for complex_col in complex_columns or []:
for df_col in data_frame.columns:
if complex_col in df_col:
complex_col_name = ".".join(
df_col.split(COMPLEX_COLUMN_SEPARATOR)[1:]
)
if complex_col_name:
data_frame.rename(
columns={df_col: complex_col_name},
inplace=True,
)
return pd.json_normalize(
[
self.unflatten_dict(json.loads(row_values))
for row_values in data_frame.apply(
lambda row: row.to_json(), axis=1
).values
],
max_level=0,
).replace(np.nan, None)
def fetch_sample_data(self) -> TableData:
def fetch_sample_data(
self, columns: Optional[List[SQALikeColumn]] = None
) -> TableData:
"""Fetch sample data from the table
Returns:
@ -208,5 +167,5 @@ class DatalakeSampler(SamplerInterface):
if self._profile_sample_query:
return self._fetch_sample_data_from_user_query()
cols, rows = self.get_col_row(data_frame=self.table)
cols, rows = self.get_col_row(data_frame=self.table, columns=columns)
return TableData(columns=cols, rows=rows)
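
The new get_col_row path is essentially a dataframe projection onto the configured columns before rows are sampled; roughly (a hedged sketch with toy data, where the selected names would come from SQALikeColumn objects in the real code):

# Hedged sketch of column-filtered sampling over chunked dataframes.
import pandas as pd

chunks = [pd.DataFrame({"id": [1, 2], "age": [40, 39], "json_data": ["{}", "{}"]})]
selected = ["id", "age"]  # names taken from the profiler's column selection
sample_limit = 50

rows = []
for chunk in chunks:
    # Keep only the selected columns, drop NA rows, cap at the sample limit.
    rows.extend(chunk[selected].dropna().values.tolist()[:sample_limit])
    if len(rows) >= sample_limit:
        break

cols, sample_rows = selected, rows  # -> TableData(columns=cols, rows=sample_rows)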

View File

@ -13,10 +13,13 @@ Interface for sampler
"""
from abc import ABC, abstractmethod
from typing import Dict, Optional
from typing import Dict, List, Optional, Union
from sqlalchemy import Column
from metadata.generated.schema.entity.data.table import TableData
from metadata.profiler.api.models import ProfileSampleConfig
from metadata.utils.sqa_like_column import SQALikeColumn
class SamplerInterface(ABC):
@ -58,6 +61,12 @@ class SamplerInterface(ABC):
raise NotImplementedError
@abstractmethod
def fetch_sample_data(self) -> TableData:
"""Fetch sample data"""
def fetch_sample_data(
self, columns: Optional[Union[List[Column], List[SQALikeColumn]]]
) -> TableData:
"""Fetch sample data
Args:
columns (Optional[List]): List of columns to fetch
"""
raise NotImplementedError

View File

@ -12,7 +12,7 @@
Helper module to handle data sampling
for the profiler
"""
from typing import Union, cast
from typing import List, Optional, Union, cast
from sqlalchemy import Column, inspect, text
from sqlalchemy.orm import DeclarativeMeta, Query, aliased
@ -117,17 +117,24 @@ class SQASampler(SamplerInterface):
# Assign as an alias
return aliased(self.table, sampled)
def fetch_sample_data(self) -> TableData:
def fetch_sample_data(self, columns: Optional[List[Column]] = None) -> TableData:
"""
Use the sampler to retrieve sample data rows as per limit given by user
:return: TableData to be added to the Table Entity
Args:
columns (Optional[List]): List of columns to fetch
Returns:
TableData to be added to the Table Entity
"""
if self._profile_sample_query:
return self._fetch_sample_data_from_user_query()
# Add new RandomNumFn column
rnd = self.get_sample_query()
sqa_columns = [col for col in inspect(rnd).c if col.name != RANDOM_LABEL]
if not columns:
sqa_columns = [col for col in inspect(rnd).c if col.name != RANDOM_LABEL]
else:
sqa_columns = list(columns) # copy columns
sqa_sample = (
self.client.query(*sqa_columns)

View File

@ -21,7 +21,7 @@ from typing import List, Union
from metadata.readers.dataframe.base import DataFrameReader
from metadata.readers.dataframe.common import dataframe_to_chunks
from metadata.readers.dataframe.models import DatalakeColumnWrapper
from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR, UTF_8
from metadata.utils.constants import UTF_8
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@ -56,7 +56,7 @@ class JSONDataFrameReader(DataFrameReader):
correct column name to match with the metadata description.
"""
# pylint: disable=import-outside-toplevel
from pandas import json_normalize
import pandas as pd
json_text = _get_json_text(key=key, text=json_text, decode=decode)
try:
@ -65,7 +65,9 @@ class JSONDataFrameReader(DataFrameReader):
logger.debug("Failed to read as JSON object. Trying to read as JSON Lines")
data = [json.loads(json_obj) for json_obj in json_text.strip().split("\n")]
return dataframe_to_chunks(json_normalize(data, sep=COMPLEX_COLUMN_SEPARATOR))
# if we get a scalar value (e.g. {"a":"b"}) then we need to specify the index
data = data if not isinstance(data, dict) else [data]
return dataframe_to_chunks(pd.DataFrame.from_records(data))
def _read(self, *, key: str, bucket_name: str, **kwargs) -> DatalakeColumnWrapper:
text = self.reader.read(key, bucket_name=bucket_name)
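
The switch from json_normalize to DataFrame.from_records keeps nested objects in a single JSON column instead of exploding them into flattened column names; a quick comparison (illustrative record):

import pandas as pd

records = [{"id": "1", "json_data": {"foo": {"bar": "baz"}}}]

# Old behaviour: nested keys were flattened into separate columns.
flat = pd.json_normalize(records, sep="_")   # columns: id, json_data_foo_bar

# New behaviour: the nested object stays in one column, to be typed as JSON later.
raw = pd.DataFrame.from_records(records)     # columns: id, json_data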

View File

@ -73,8 +73,8 @@ class LocalReader(Reader):
file_paths = []
for root, _, file in os.walk(self.base_path):
for f in file:
if search_key in f and f not in excluded_files:
file_paths.append(f"{root}/{f}")
for fle in file:
if search_key in fle and fle not in excluded_files:
file_paths.append(f"{root}/{fle}")
return file_paths

View File

@ -16,8 +16,7 @@ from different auths and different file systems.
import ast
import json
import traceback
from functools import reduce
from typing import List, Optional
from typing import Dict, List, Optional, cast
from metadata.generated.schema.entity.data.table import Column, DataType
from metadata.ingestion.source.database.column_helpers import truncate_column_name
@ -26,7 +25,6 @@ from metadata.readers.dataframe.models import (
DatalakeTableSchemaWrapper,
)
from metadata.readers.dataframe.reader_factory import SupportedTypes, get_df_reader
from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR
from metadata.utils.logger import utils_logger
logger = utils_logger()
@ -101,42 +99,70 @@ def get_file_format_type(key_name, metadata_entry=None):
return False
def get_parent_col(data_frame, complex_cols, parent_col_fqn=""):
"""Get Complex Column Objects"""
cols = []
parent_cols = [top_level[0] for top_level in complex_cols if len(top_level) > 0]
filter_unique = (
lambda l, x: l # pylint: disable=unnecessary-lambda-assignment
if x in l
else l + [x]
)
parent_cols = reduce(filter_unique, parent_cols, [])
for top_level in parent_cols:
if parent_col_fqn.startswith(COMPLEX_COLUMN_SEPARATOR) or not parent_col_fqn:
col_fqn = COMPLEX_COLUMN_SEPARATOR.join([parent_col_fqn, top_level])
else:
col_fqn = COMPLEX_COLUMN_SEPARATOR.join(["", parent_col_fqn, top_level])
col_obj = {
"name": truncate_column_name(top_level),
"displayName": top_level,
}
leaf_node = [
leaf_parse[1:] for leaf_parse in complex_cols if top_level == leaf_parse[0]
]
if any(leaf_node):
col_obj["children"] = []
col_obj["dataTypeDisplay"] = DataType.RECORD.value
col_obj["dataType"] = DataType.RECORD
col_obj["children"].extend(get_parent_col(data_frame, leaf_node, col_fqn))
else:
col_type = fetch_col_types(data_frame, col_fqn)
col_obj["dataTypeDisplay"] = col_type.value
col_obj["dataType"] = col_type
col_obj["arrayDataType"] = (
DataType.UNKNOWN if col_type == DataType.ARRAY else None
)
cols.append(Column(**col_obj))
return cols
def unique_json_structure(dicts: List[Dict]) -> Dict:
"""Given a sample of `n` json objects, return a json object that represents the unique structure of all `n` objects.
Note that the type of the key will be that of the last object seen in the sample.
Args:
dicts: list of json objects
"""
result = {}
for dict_ in dicts:
for key, value in dict_.items():
if isinstance(value, dict):
nested_json = result.get(key, {})
# the `isinstance(nested_json, dict)` check covers the case where a key is first
# seen with a non-dict value and later with a dict value: the key is then treated as a dict.
result[key] = unique_json_structure(
[nested_json if isinstance(nested_json, dict) else {}, value]
)
else:
result[key] = value
return result
def construct_json_column_children(json_column: Dict) -> List[Dict]:
"""Construt a dict representation of a Column object
Args:
json_column: unique json structure of a column
"""
children = []
for key, value in json_column.items():
column = {}
type_ = type(value).__name__.lower()
column["dataTypeDisplay"] = DATALAKE_DATA_TYPES.get(
type_, DataType.UNKNOWN
).value
column["dataType"] = DATALAKE_DATA_TYPES.get(type_, DataType.UNKNOWN).value
column["name"] = truncate_column_name(key)
column["displayName"] = key
if isinstance(value, dict):
column["children"] = construct_json_column_children(value)
children.append(column)
return children
def get_children(json_column) -> List[Dict]:
"""Get children of json column.
Args:
json_column (pandas.Series): column with 100 sample rows.
Sample rows will be used to infer children.
"""
from pandas import Series # pylint: disable=import-outside-toplevel
json_column = cast(Series, json_column)
try:
json_column = json_column.apply(json.loads)
except TypeError:
# if values are not strings, we will assume they are already json objects
# based on the read class logic
pass
json_structure = unique_json_structure(json_column.values.tolist())
return construct_json_column_children(json_structure)
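
A short worked example of the two helpers above (illustrative values):

# Usage example for unique_json_structure / construct_json_column_children.
sample = [
    {"a": "x", "c": {"d": 2}},
    {"a": "y", "c": {"e": 4}},
]
structure = unique_json_structure(sample)
# -> {"a": "y", "c": {"d": 2, "e": 4}}: keys are merged, the type comes from the
#    last value seen for each key.
children = construct_json_column_children(structure)
# -> [{"dataType": "STRING", "name": "a", ...},
#     {"dataType": "JSON", "name": "c", "children": [...]}]
# These children dicts are attached to JSON-typed columns in get_columns below.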
def get_columns(data_frame: "DataFrame"):
@ -147,36 +173,30 @@ def get_columns(data_frame: "DataFrame"):
if hasattr(data_frame, "columns"):
df_columns = list(data_frame.columns)
for column in df_columns:
if COMPLEX_COLUMN_SEPARATOR not in column:
# use String by default
data_type = DataType.STRING
try:
if hasattr(data_frame[column], "dtypes"):
data_type = fetch_col_types(data_frame, column_name=column)
# use String by default
data_type = DataType.STRING
try:
if hasattr(data_frame[column], "dtypes"):
data_type = fetch_col_types(data_frame, column_name=column)
parsed_string = {
"dataTypeDisplay": data_type.value,
"dataType": data_type,
"name": truncate_column_name(column),
"displayName": column,
}
if data_type == DataType.ARRAY:
parsed_string["arrayDataType"] = DataType.UNKNOWN
parsed_string = {
"dataTypeDisplay": data_type.value,
"dataType": data_type,
"name": truncate_column_name(column),
"displayName": column,
}
if data_type == DataType.ARRAY:
parsed_string["arrayDataType"] = DataType.UNKNOWN
cols.append(Column(**parsed_string))
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Unexpected exception parsing column [{column}]: {exc}"
if data_type == DataType.JSON:
parsed_string["children"] = get_children(
data_frame[column].dropna()[:100]
)
complex_cols = [
complex_col.split(COMPLEX_COLUMN_SEPARATOR)[1:]
for complex_col in json.loads(
data_frame.apply(lambda row: row.to_json(), axis=1).values[0]
).keys()
if COMPLEX_COLUMN_SEPARATOR in complex_col
]
cols.extend(get_parent_col(data_frame, complex_cols))
cols.append(Column(**parsed_string))
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unexpected exception parsing column [{column}]: {exc}")
return cols
@ -187,8 +207,8 @@ def fetch_col_types(data_frame, column_name):
data_frame (DataFrame)
column_name (string)
"""
data_type = None
try:
data_type = None
if data_frame[column_name].dtypes.name == "object" and any(
data_frame[column_name].dropna().values
):

View File

@ -32,12 +32,12 @@ class EntityLinkBuildingException(Exception):
"""
def split(s: str) -> List[str]:
def split(str_: str) -> List[str]:
"""
Method to handle the splitting logic
"""
lexer = EntityLinkLexer(InputStream(s))
lexer = EntityLinkLexer(InputStream(str_))
stream = CommonTokenStream(lexer)
parser = EntityLinkParser(stream)
parser._errHandler = BailErrorStrategy() # pylint: disable=protected-access

View File

@ -67,11 +67,11 @@ class SplitTestCaseFqn(BaseModel):
test_case: Optional[str]
def split(s: str) -> List[str]:
def split(str_: str) -> List[str]:
"""
Equivalent of Java's FullyQualifiedName#split
"""
lexer = FqnLexer(InputStream(s))
lexer = FqnLexer(InputStream(str_))
stream = CommonTokenStream(lexer)
parser = FqnParser(stream)
parser._errHandler = BailErrorStrategy() # pylint: disable=protected-access

View File

@ -0,0 +1,30 @@
[
{
"id": "1",
"first_name": "John",
"last_name": "Doe",
"city": "Los Angeles",
"country": "US",
"birthdate": "1980-01-01",
"age": "40",
"json_data": {
"foo": {
"bar": "baz"
}
}
},
{
"id": "2",
"first_name": "James",
"last_name": "Doe",
"city": "Los Angeles",
"country": "US",
"birthdate": "1980-01-01",
"age": "40",
"json_data": {
"foo": {
"bar": "baz"
}
}
}
]

View File

@ -0,0 +1,4 @@
id,first_name,last_name,city,country,birthdate,age,json_data
1,John,Doe,Los Angeles,US,1980-01-01,40,{"foo": {"bar": "baz"}}
2,Jane,Doe,Los Angeles,US,2000-12-31,39,{"foo": {"bar": "baz"}}
3,Jane,Smith,Paris,FR,2001-11-11,28,{"foo": {"bar": "baz"}}

View File

@ -0,0 +1,194 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Datalake ingestion integration tests"""
import os
from copy import deepcopy
from unittest import TestCase
import boto3
import botocore
import pytest
from moto import mock_s3
from metadata.generated.schema.entity.data.table import DataType, Table
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.models import EntityList
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
BUCKET_NAME = "MyBucket"
INGESTION_CONFIG = {
"source": {
"type": "datalake",
"serviceName": "datalake_for_integration_tests",
"serviceConnection": {
"config": {
"type": "Datalake",
"configSource": {
"securityConfig": {
"awsAccessKeyId": "fake_access_key",
"awsSecretAccessKey": "fake_secret_key",
"awsRegion": "us-weat-1",
}
},
"bucketName": f"{BUCKET_NAME}",
}
},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
}
},
}
@mock_s3
class DatalakeTestE2E(TestCase):
"""datalake profiler E2E test"""
@classmethod
def setUpClass(cls) -> None:
server_config = OpenMetadataConnection(
hostPort="http://localhost:8585/api",
authProvider="openmetadata",
securityConfig=OpenMetadataJWTClientConfig(
jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
),
) # type: ignore
cls.metadata = OpenMetadata(server_config)
def setUp(self) -> None:
# Mock our S3 bucket and ingest a file
boto3.DEFAULT_SESSION = None
self.client = boto3.client(
"s3",
region_name="us-weat-1",
)
# check that we are not running our test against a real bucket
try:
s3 = boto3.resource(
"s3",
region_name="us-west-1",
aws_access_key_id="fake_access_key",
aws_secret_access_key="fake_secret_key",
)
s3.meta.client.head_bucket(Bucket=BUCKET_NAME)
except botocore.exceptions.ClientError:
pass
else:
err = f"{BUCKET_NAME} should not exist."
raise EnvironmentError(err)
self.client.create_bucket(
Bucket=BUCKET_NAME,
CreateBucketConfiguration={"LocationConstraint": "us-west-1"},
)
current_dir = os.path.dirname(__file__)
resources_dir = os.path.join(current_dir, "resources")
resources_paths = [
os.path.join(path, filename)
for path, _, files in os.walk(resources_dir)
for filename in files
]
self.s3_keys = []
for path in resources_paths:
key = os.path.relpath(path, resources_dir)
self.s3_keys.append(key)
self.client.upload_file(Filename=path, Bucket=BUCKET_NAME, Key=key)
@pytest.mark.order(10000)
def test_ingestion(self):
"""test ingestion of datalake data"""
# Ingest our S3 data
ingestion_workflow = MetadataWorkflow.create(INGESTION_CONFIG)
ingestion_workflow.execute()
ingestion_workflow.raise_from_status()
ingestion_workflow.stop()
resp: EntityList[Table] = self.metadata.list_entities(
entity=Table, params={"database": "datalake_for_integration_tests.default"}
) # type: ignore
entities = resp.entities
self.assertEqual(len(entities), 3)
names = [entity.name.__root__ for entity in entities]
self.assertListEqual(
sorted(["names.json", "new_users.parquet", "users.csv"]), sorted(names)
)
for entity in entities:
columns = entity.columns
for column in columns:
if column.dataType == DataType.JSON:
assert column.children
@pytest.mark.order(10001)
def test_profiler(self):
"""Test profiler ingestion"""
workflow_config = deepcopy(INGESTION_CONFIG)
workflow_config["source"]["sourceConfig"]["config"].update(
{
"type": "Profiler",
}
)
workflow_config["processor"] = {
"type": "orm-profiler",
"config": {},
}
profiler_workflow = ProfilerWorkflow.create(workflow_config)
profiler_workflow.execute()
status = profiler_workflow.result_status()
profiler_workflow.stop()
assert status == 0
csv_ = self.metadata.get_by_name(
entity=Table,
fqn='datalake_for_integration_tests.default.MyBucket."users.csv"',
fields=["tableProfilerConfig"],
)
parquet_ = self.metadata.get_by_name(
entity=Table,
fqn='datalake_for_integration_tests.default.MyBucket."new_users.parquet"',
fields=["tableProfilerConfig"],
)
json_ = self.metadata.get_by_name(
entity=Table,
fqn='datalake_for_integration_tests.default.MyBucket."names.json"',
fields=["tableProfilerConfig"],
)
csv_sample_data = self.metadata.get_sample_data(csv_)
parquet_sample_data = self.metadata.get_sample_data(parquet_)
json_sample_data = self.metadata.get_sample_data(json_)
assert csv_sample_data.sampleData.rows
assert parquet_sample_data.sampleData.rows
assert json_sample_data.sampleData.rows

View File

@ -309,6 +309,114 @@ class DatalakeProfilerTestE2E(TestCase):
assert profile.rowCount == 2.0
def test_datalake_profiler_workflow_with_custom_profiler_config(self):
"""Test custom profiler config return expected sample and metric computation"""
profiler_metrics = [
"MIN",
"MAX",
"MEAN",
"MEDIAN",
]
id_metrics = ["MIN", "MAX"]
non_metric_values = ["name", "timestamp"]
workflow_config = deepcopy(INGESTION_CONFIG)
workflow_config["source"]["sourceConfig"]["config"].update(
{
"type": "Profiler",
}
)
workflow_config["processor"] = {
"type": "orm-profiler",
"config": {
"profiler": {
"name": "ingestion_profiler",
"metrics": profiler_metrics,
},
"tableConfig": [
{
"fullyQualifiedName": 'datalake_for_integration_tests.default.MyBucket."profiler_test_.csv"',
"columnConfig": {
"includeColumns": [
{"columnName": "id", "metrics": id_metrics},
{"columnName": "age"},
]
},
}
],
},
}
profiler_workflow = ProfilerWorkflow.create(workflow_config)
profiler_workflow.execute()
status = profiler_workflow.result_status()
profiler_workflow.stop()
assert status == 0
table = self.metadata.get_by_name(
entity=Table,
fqn='datalake_for_integration_tests.default.MyBucket."profiler_test_.csv"',
fields=["tableProfilerConfig"],
)
id_profile = self.metadata.get_profile_data(
'datalake_for_integration_tests.default.MyBucket."profiler_test_.csv".id',
get_beginning_of_day_timestamp_mill(),
get_end_of_day_timestamp_mill(),
profile_type=ColumnProfile,
).entities
latest_id_profile = max(id_profile, key=lambda o: o.timestamp.__root__)
id_metric_ln = 0
for metric_name, metric in latest_id_profile:
if metric_name.upper() in id_metrics:
assert metric is not None
id_metric_ln += 1
else:
assert metric is None if metric_name not in non_metric_values else True
assert id_metric_ln == len(id_metrics)
age_profile = self.metadata.get_profile_data(
'datalake_for_integration_tests.default.MyBucket."profiler_test_.csv".age',
get_beginning_of_day_timestamp_mill(),
get_end_of_day_timestamp_mill(),
profile_type=ColumnProfile,
).entities
latest_age_profile = max(age_profile, key=lambda o: o.timestamp.__root__)
age_metric_ln = 0
for metric_name, metric in latest_age_profile:
if metric_name.upper() in profiler_metrics:
assert metric is not None
age_metric_ln += 1
else:
assert metric is None if metric_name not in non_metric_values else True
assert age_metric_ln == len(profiler_metrics)
latest_exc_timestamp = latest_age_profile.timestamp.__root__
first_name_profile = self.metadata.get_profile_data(
'datalake_for_integration_tests.default.MyBucket."profiler_test_.csv".first_name',
get_beginning_of_day_timestamp_mill(),
get_end_of_day_timestamp_mill(),
profile_type=ColumnProfile,
).entities
assert not [
p
for p in first_name_profile
if p.timestamp.__root__ == latest_exc_timestamp
]
sample_data = self.metadata.get_sample_data(table)
assert sorted([c.__root__ for c in sample_data.sampleData.columns]) == sorted(
["id", "age"]
)
def tearDown(self):
s3 = boto3.resource(
"s3",

View File

@ -24,7 +24,11 @@ from unittest import TestCase
from sqlalchemy import Column, DateTime, Integer, String, create_engine
from sqlalchemy.orm import declarative_base
from metadata.generated.schema.entity.data.table import ProfileSampleType, Table
from metadata.generated.schema.entity.data.table import (
ColumnProfile,
ProfileSampleType,
Table,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
@ -34,6 +38,10 @@ from metadata.generated.schema.security.client.openMetadataJWTClientConfig impor
)
from metadata.ingestion.connections.session import create_and_bind_session
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.time_utils import (
get_beginning_of_day_timestamp_mill,
get_end_of_day_timestamp_mill,
)
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status
@ -576,3 +584,109 @@ class ProfilerWorkflowTest(TestCase):
).profile
assert profile.rowCount == 4.0
def test_datalake_profiler_workflow_with_custom_profiler_config(self):
"""Test custom profiler config return expected sample and metric computation"""
profiler_metrics = [
"MIN",
"MAX",
"MEAN",
"MEDIAN",
]
id_metrics = ["MIN", "MAX"]
non_metric_values = ["name", "timestamp"]
workflow_config = deepcopy(ingestion_config)
workflow_config["source"]["sourceConfig"]["config"].update(
{
"type": "Profiler",
}
)
workflow_config["processor"] = {
"type": "orm-profiler",
"config": {
"profiler": {
"name": "ingestion_profiler",
"metrics": profiler_metrics,
},
"tableConfig": [
{
"fullyQualifiedName": "test_sqlite.main.main.users",
"columnConfig": {
"includeColumns": [
{"columnName": "id", "metrics": id_metrics},
{"columnName": "age"},
]
},
}
],
},
}
profiler_workflow = ProfilerWorkflow.create(workflow_config)
profiler_workflow.execute()
status = profiler_workflow.result_status()
profiler_workflow.stop()
assert status == 0
table = self.metadata.get_by_name(
entity=Table,
fqn="test_sqlite.main.main.users",
fields=["tableProfilerConfig"],
)
id_profile = self.metadata.get_profile_data(
"test_sqlite.main.main.users.id",
get_beginning_of_day_timestamp_mill(),
get_end_of_day_timestamp_mill(),
profile_type=ColumnProfile,
).entities
latest_id_profile = max(id_profile, key=lambda o: o.timestamp.__root__)
id_metric_ln = 0
for metric_name, metric in latest_id_profile:
if metric_name.upper() in id_metrics:
assert metric is not None
id_metric_ln += 1
else:
assert metric is None if metric_name not in non_metric_values else True
assert id_metric_ln == len(id_metrics)
age_profile = self.metadata.get_profile_data(
"test_sqlite.main.main.users.age",
get_beginning_of_day_timestamp_mill(),
get_end_of_day_timestamp_mill(),
profile_type=ColumnProfile,
).entities
latest_age_profile = max(age_profile, key=lambda o: o.timestamp.__root__)
age_metric_ln = 0
for metric_name, metric in latest_age_profile:
if metric_name.upper() in profiler_metrics:
assert metric is not None
age_metric_ln += 1
else:
assert metric is None if metric_name not in non_metric_values else True
assert age_metric_ln == len(profiler_metrics)
latest_exc_timestamp = latest_age_profile.timestamp.__root__
fullname_profile = self.metadata.get_profile_data(
"test_sqlite.main.main.users.fullname",
get_beginning_of_day_timestamp_mill(),
get_end_of_day_timestamp_mill(),
profile_type=ColumnProfile,
).entities
assert not [
p for p in fullname_profile if p.timestamp.__root__ == latest_exc_timestamp
]
sample_data = self.metadata.get_sample_data(table)
assert sorted([c.__root__ for c in sample_data.sampleData.columns]) == sorted(
["id", "age"]
)

View File

@ -16,7 +16,6 @@ import os
from unittest import TestCase, mock
from uuid import uuid4
import pandas as pd
import pytest
from sqlalchemy import TEXT, Column, Integer, String
from sqlalchemy.orm import declarative_base
@ -222,7 +221,7 @@ class DatalakeSampleTest(TestCase):
"""
sampler = DatalakeSampler(
client=FakeConnection().client(),
table=[pd.concat([self.df1, self.df2])],
table=[self.df1, self.df2],
)
sample_data = sampler.fetch_sample_data()

View File

@ -130,8 +130,8 @@ MOCK_CREATE_TABLE = [
Column(
name="address",
displayName="address",
dataType=DataType.RECORD,
dataTypeDisplay=DataType.RECORD.value,
dataType=DataType.JSON,
dataTypeDisplay=DataType.JSON.value,
children=[
Column(
name="line",

View File

@ -182,8 +182,8 @@ EXAMPLE_JSON_COL_3 = [
),
Column(
name="address",
dataType="RECORD",
dataTypeDisplay="RECORD",
dataType="JSON",
dataTypeDisplay="JSON",
displayName="address",
children=[
Column(
@ -206,8 +206,8 @@ EXAMPLE_JSON_COL_3 = [
),
Column(
name="coordinates",
dataType="RECORD",
dataTypeDisplay="RECORD",
dataType="JSON",
dataTypeDisplay="JSON",
displayName="coordinates",
children=[
Column(
@ -233,13 +233,15 @@ EXAMPLE_JSON_COL_4 = deepcopy(EXAMPLE_JSON_COL_3)
EXAMPLE_JSON_COL_4[3].children[3].children = [
Column(
name="lat",
dataType="FLOAT",
dataTypeDisplay="FLOAT",
dataType="INT",
dataTypeDisplay="INT",
displayName="lat",
),
Column(
name="lon",
dataType="FLOAT",
dataTypeDisplay="FLOAT",
dataType="INT",
dataTypeDisplay="INT",
displayName="lon",
),
]
@ -437,13 +439,13 @@ class DatalakeUnitTest(TestCase):
sample_dict = {"name": "John", "age": 16, "sex": "M"}
exp_df_list = pd.json_normalize(
exp_df_list = pd.DataFrame.from_records(
[
{"name": "John", "age": 16, "sex": "M"},
{"name": "Milan", "age": 19, "sex": "M"},
]
)
exp_df_obj = pd.json_normalize(sample_dict)
exp_df_obj = pd.DataFrame.from_records([sample_dict])
actual_df_1 = JSONDataFrameReader.read_from_json(
key="file.json", json_text=EXAMPLE_JSON_TEST_1, decode=True
@ -467,7 +469,7 @@ class DatalakeUnitTest(TestCase):
key="file.json", json_text=EXAMPLE_JSON_TEST_4, decode=True
)[0]
actual_cols_4 = get_columns(actual_df_4)
self.assertFalse(actual_cols_4 == EXAMPLE_JSON_COL_4)
assert actual_cols_4 == EXAMPLE_JSON_COL_4
def test_avro_file_parse(self):
columns = AvroDataFrameReader.read_from_avro(AVRO_SCHEMA_FILE)

View File

@ -142,8 +142,8 @@ MOCK_CREATE_TABLE = CreateTableRequest(
Column(
name="address",
displayName="address",
dataType=DataType.RECORD,
dataTypeDisplay=DataType.RECORD.value,
dataType=DataType.JSON,
dataTypeDisplay=DataType.JSON.value,
children=[
Column(
name="line",

View File

@ -0,0 +1,172 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test datalake utils
"""
from unittest import TestCase
from metadata.generated.schema.entity.data.table import Column
from metadata.utils.datalake.datalake_utils import (
construct_json_column_children,
unique_json_structure,
)
STRUCTURE = {
"a": "w",
"b": 4,
"c": {
"d": 2,
"e": 4,
"f": {
"g": 9,
"h": {"i": 6},
"n": {
"o": 10,
"p": 11,
},
},
"j": 7,
"k": 8,
},
}
class TestDatalakeUtils(TestCase):
"""class for datalake utils test"""
def test_unique_json_structure(self):
"""test unique json structure fn"""
sample_data = [
{"a": "x", "b": 1, "c": {"d": 2}},
{"a": "y", "b": 2, "c": {"e": 4, "f": {"g": 5, "h": {"i": 6}, "n": 5}}},
{"a": "z", "b": 3, "c": {"j": 7}},
{"a": "w", "b": 4, "c": {"k": 8, "f": {"g": 9, "n": {"o": 10, "p": 11}}}},
]
expected = STRUCTURE
actual = unique_json_structure(sample_data)
self.assertDictEqual(expected, actual)
def test_construct_column(self):
"""test construct column fn"""
expected = [
{
"dataTypeDisplay": "STRING",
"dataType": "STRING",
"name": "a",
"displayName": "a",
},
{
"dataTypeDisplay": "INT",
"dataType": "INT",
"name": "b",
"displayName": "b",
},
{
"dataTypeDisplay": "JSON",
"dataType": "JSON",
"name": "c",
"displayName": "c",
"children": [
{
"dataTypeDisplay": "INT",
"dataType": "INT",
"name": "d",
"displayName": "d",
},
{
"dataTypeDisplay": "INT",
"dataType": "INT",
"name": "e",
"displayName": "e",
},
{
"dataTypeDisplay": "JSON",
"dataType": "JSON",
"name": "f",
"displayName": "f",
"children": [
{
"dataTypeDisplay": "INT",
"dataType": "INT",
"name": "g",
"displayName": "g",
},
{
"dataTypeDisplay": "JSON",
"dataType": "JSON",
"name": "h",
"displayName": "h",
"children": [
{
"dataTypeDisplay": "INT",
"dataType": "INT",
"name": "i",
"displayName": "i",
}
],
},
{
"dataTypeDisplay": "JSON",
"dataType": "JSON",
"name": "n",
"displayName": "n",
"children": [
{
"dataTypeDisplay": "INT",
"dataType": "INT",
"name": "o",
"displayName": "o",
},
{
"dataTypeDisplay": "INT",
"dataType": "INT",
"name": "p",
"displayName": "p",
},
],
},
],
},
{
"dataTypeDisplay": "INT",
"dataType": "INT",
"name": "j",
"displayName": "j",
},
{
"dataTypeDisplay": "INT",
"dataType": "INT",
"name": "k",
"displayName": "k",
},
],
},
]
actual = construct_json_column_children(STRUCTURE)
for el in zip(expected, actual):
self.assertDictEqual(el[0], el[1])
def test_create_column_object(self):
"""test create column object fn"""
formatted_column = construct_json_column_children(STRUCTURE)
column = {
"dataTypeDisplay": "STRING",
"dataType": "STRING",
"name": "a",
"displayName": "a",
"children": formatted_column,
}
column_obj = Column(**column)
assert len(column_obj.children) == 3