Fix for profiler: modified filter patterns and added error handling (#6608)

This commit is contained in:
Ayush Shah 2022-08-08 10:43:17 +05:30 committed by GitHub
parent a65998c7bc
commit a6db2e8a84
27 changed files with 189 additions and 121 deletions

View File

@ -17,8 +17,16 @@
"$ref": "#/definitions/profilerConfigType",
"default": "Profiler"
},
"fqnFilterPattern": {
"description": "Regex to only fetch tables with FQN matching the pattern.",
"schemaFilterPattern": {
"description": "Regex to only fetch tables or databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"tableFilterPattern": {
"description": "Regex exclude tables or databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"databaseFilterPattern": {
"description": "Regex to only fetch databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"generateSampleData": {

View File

@ -122,7 +122,7 @@ services:
ingestion:
build:
context: ../../.
dockerfile: ingestion/Dockerfile
dockerfile: ingestion/Dockerfile_local
args:
INGESTION_DEPENDENCY: ${INGESTION_DEPENDENCY:-all}
container_name: openmetadata_ingestion

View File

@ -25,13 +25,12 @@ COPY openmetadata-airflow-apis /openmetadata-airflow-apis
RUN pip install "."
FROM apis as ingestion
WORKDIR /ingestion
COPY ingestion /ingestion
ARG INGESTION_DEPENDENCY=all
RUN pip install ".[${INGESTION_DEPENDENCY}]"
RUN pip install --upgrade ".[${INGESTION_DEPENDENCY}]"
RUN airflow db init
RUN cp -r /ingestion/airflow.cfg /airflow/airflow.cfg

View File

@ -0,0 +1,40 @@
FROM python:3.9-slim as base
ENV AIRFLOW_HOME=/airflow
RUN apt-get update && \
apt-get install -y gcc libsasl2-modules libsasl2-dev build-essential libssl-dev libffi-dev librdkafka-dev unixodbc-dev python3.9-dev openjdk-11-jre unixodbc freetds-dev freetds-bin tdsodbc libevent-dev wget openssl --no-install-recommends && \
rm -rf /var/lib/apt/lists/*
# Manually fix security vulnerability from curl
# - https://security.snyk.io/vuln/SNYK-DEBIAN11-CURL-2936229
# Add it back to the usual apt-get install once a fix for Debian is released
RUN wget https://curl.se/download/curl-7.84.0.tar.gz && \
tar -xvf curl-7.84.0.tar.gz && cd curl-7.84.0 && \
./configure --with-openssl && make && make install
FROM base as airflow
ENV AIRFLOW_VERSION=2.3.3
ENV CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-3.9.txt"
# Add docker provider for the DockerOperator
RUN pip install "apache-airflow[docker]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
FROM airflow as apis
WORKDIR /openmetadata-airflow-apis
COPY openmetadata-airflow-apis /openmetadata-airflow-apis
RUN pip install "." "openmetadata-ingestion[all]"
RUN pip uninstall openmetadata-ingestion -y
FROM apis as ingestion
WORKDIR /ingestion
COPY ingestion /ingestion
ARG INGESTION_DEPENDENCY=all
RUN pip install --upgrade ".[${INGESTION_DEPENDENCY}]"
RUN airflow db init
RUN cp -r /ingestion/airflow.cfg /airflow/airflow.cfg
RUN chmod 755 ingestion_dependency.sh
EXPOSE 8080
CMD [ "./ingestion_dependency.sh" ]

View File

@ -13,9 +13,9 @@ source:
config:
type: Profiler
generateSampleData: true
fqnFilterPattern:
schemaFilterPattern:
includes:
- local_mysql.openmetadata_db*
- openmetadata_db*
processor:
type: "orm-profiler"

View File

@ -16,6 +16,7 @@ Workflow definition for the ORM Profiler.
- How to specify the entities to run
- How to define metrics & tests
"""
import traceback
from copy import deepcopy
from typing import Iterable, List
@ -50,7 +51,7 @@ from metadata.utils.class_helper import (
get_service_type_from_source_type,
)
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_fqn
from metadata.utils.filters import filter_by_fqn, filter_by_schema, filter_by_table
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
@ -123,18 +124,31 @@ class ProfilerWorkflow:
We will update the status on the SQLSource Status.
"""
for table in tables:
try:
if filter_by_schema(
self.source_config.schemaFilterPattern,
table.databaseSchema.name,
):
self.source_status.filter(
table.databaseSchema.fullyQualifiedName,
"Schema pattern not allowed",
)
continue
if filter_by_table(
self.source_config.tableFilterPattern,
table.name.__root__,
):
self.source_status.filter(
table.fullyQualifiedName.__root__, "Table pattern not allowed"
)
continue
if filter_by_fqn(
fqn_filter_pattern=self.source_config.fqnFilterPattern,
fqn=table.fullyQualifiedName.__root__,
):
self.source_status.filter(
table.fullyQualifiedName.__root__, "Schema pattern not allowed"
)
continue
self.source_status.scanned(table.fullyQualifiedName.__root__)
yield table
self.source_status.scanned(table.fullyQualifiedName.__root__)
yield table
except Exception as err: # pylint: disable=broad-except
self.source_status.filter(table.fullyQualifiedName.__root__, f"{err}")
logger.error(err)
logger.debug(traceback.format_exc())
def create_processor(self, service_connection_config):
self.processor_interface: InterfaceProtocol = SQAProfilerInterface(
@ -201,38 +215,48 @@ class ProfilerWorkflow:
yield from self.filter_entities(all_tables)
def copy_service_config(self, database) -> None:
copy_service_connection_config = deepcopy(
self.config.source.serviceConnection.__root__.config
)
if hasattr(
self.config.source.serviceConnection.__root__.config,
"supportsDatabase",
):
if hasattr(
self.config.source.serviceConnection.__root__.config, "database"
):
copy_service_connection_config.database = database.name.__root__
if hasattr(self.config.source.serviceConnection.__root__.config, "catalog"):
copy_service_connection_config.catalog = database.name.__root__
self.create_processor(copy_service_connection_config)
def execute(self):
"""
Run the profiling and tests
"""
copy_service_connection_config = deepcopy(
self.config.source.serviceConnection.__root__.config
)
for database in self.get_database_entities():
if hasattr(
self.config.source.serviceConnection.__root__.config, "supportsDatabase"
):
if hasattr(
self.config.source.serviceConnection.__root__.config, "database"
):
copy_service_connection_config.database = database.name.__root__
if hasattr(
self.config.source.serviceConnection.__root__.config, "catalog"
):
copy_service_connection_config.catalog = database.name.__root__
try:
self.copy_service_config(database)
self.create_processor(copy_service_connection_config)
for entity in self.get_table_entities(database=database):
try:
profile_and_tests: ProfilerResponse = self.processor.process(
record=entity,
generate_sample_data=self.source_config.generateSampleData,
)
for entity in self.get_table_entities(database=database):
profile_and_tests: ProfilerResponse = self.processor.process(
record=entity,
generate_sample_data=self.source_config.generateSampleData,
)
if hasattr(self, "sink"):
self.sink.write_record(profile_and_tests)
self.processor_interface.session.close()
if hasattr(self, "sink"):
self.sink.write_record(profile_and_tests)
except Exception as err: # pylint: disable=broad-except
logger.error(err)
logger.debug(traceback.format_exc())
self.processor_interface.session.close()
except Exception as err: # pylint: disable=broad-except
logger.error(err)
logger.debug(traceback.format_exc())
def print_status(self) -> int:
"""

View File

@ -163,7 +163,7 @@ class ProfilerWorkflowTest(TestCase):
# workflow_config["source"]["sourceConfig"]["config"].update(
# {
# "type": "Profiler",
# "fqnFilterPattern": {"includes": ["test_sqlite.main.main.users"]},
# "tableFilterPattern": {"includes": ["users"]},
# }
# )
# workflow_config["processor"] = {
@ -233,7 +233,7 @@ class ProfilerWorkflowTest(TestCase):
{
"type": "Profiler",
"profileSample": 50,
"fqnFilterPattern": {"includes": ["test_sqlite.main.main.new_users"]},
"tableFilterPattern": {"includes": ["new_users"]},
}
)
workflow_config["processor"] = {"type": "orm-profiler", "config": {}}

View File

@ -128,17 +128,17 @@ def test_filter_entities(mocked_method):
# We can exclude based on the schema name
exclude_config = deepcopy(config)
exclude_config["source"]["sourceConfig"]["config"]["fqnFilterPattern"] = {
"excludes": ["service*"]
exclude_config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = {
"excludes": ["another_schema"]
}
exclude_workflow = ProfilerWorkflow.create(exclude_config)
mocked_method.assert_called()
assert len(list(exclude_workflow.filter_entities(all_tables))) == 0
assert len(list(exclude_workflow.filter_entities(all_tables))) == 2
exclude_config = deepcopy(config)
exclude_config["source"]["sourceConfig"]["config"]["fqnFilterPattern"] = {
"excludes": ["service.db.another*"]
exclude_config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = {
"excludes": ["another*"]
}
exclude_workflow = ProfilerWorkflow.create(exclude_config)
@ -146,8 +146,8 @@ def test_filter_entities(mocked_method):
assert len(list(exclude_workflow.filter_entities(all_tables))) == 2
include_config = deepcopy(config)
include_config["source"]["sourceConfig"]["config"]["fqnFilterPattern"] = {
"includes": ["service*"]
include_config["source"]["sourceConfig"]["config"]["databaseFilterPattern"] = {
"includes": ["db*"]
}
include_workflow = ProfilerWorkflow.create(include_config)

View File

@ -17,8 +17,16 @@
"$ref": "#/definitions/profilerConfigType",
"default": "Profiler"
},
"fqnFilterPattern": {
"description": "Regex to only fetch tables with FQN matching the pattern.",
"schemaFilterPattern": {
"description": "Regex to only fetch tables or databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"tableFilterPattern": {
"description": "Regex exclude tables or databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"databaseFilterPattern": {
"description": "Regex to only fetch databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"generateSampleData": {

View File

@ -10,7 +10,9 @@ slug: /main-concepts/metadata-standard/schemas/metadataingestion/databaseservice
## Properties
- **`type`**: Pipeline type. Refer to *#/definitions/profilerConfigType*. Default: `Profiler`.
- **`fqnFilterPattern`**: Regex to only fetch tables with FQN matching the pattern. Refer to *../type/filterPattern.json#/definitions/filterPattern*.
- **`databaseFilterPattern`**: Regex to only fetch databases with database name matching the pattern. Refer to *../type/filterPattern.json#/definitions/filterPattern*.
- **`schemaFilterPattern`**: Regex to only fetch schemas with schema name matching the pattern. Refer to *../type/filterPattern.json#/definitions/filterPattern*.
- **`tableFilterPattern`**: Regex to only fetch tables with table name matching the pattern. Refer to *../type/filterPattern.json#/definitions/filterPattern*.
- **`generateSampleData`** *(boolean)*: Option to turn on/off generating sample data. Default: `True`.
## Definitions

View File

@ -13,7 +13,9 @@ source:
sourceConfig:
config:
type: Profiler
fqnFilterPattern: <table FQN filtering regex>
databaseFilterPattern: <database name filtering regex>
schemaFilterPattern: <schema name filtering regex>
tableFilterPattern: <table name filtering regex>
processor:
type: orm-profiler
config: {}

View File

@ -12,7 +12,9 @@ source:
sourceConfig:
config:
type: Profiler
fqnFilterPattern: <table FQN filtering regex>
databaseFilterPattern: <database name filtering regex>
schemaFilterPattern: <schema name filtering regex>
tableFilterPattern: <table name filtering regex>
processor:
type: orm-profiler
config: {}

View File

@ -18,7 +18,9 @@ source:
clientX509CertUrl: <client certificate URL>
sourceConfig:
config:
fqnFilterPattern: <table FQN filtering regex>
databaseFilterPattern: <database name filtering regex>
schemaFilterPattern: <schema name filtering regex>
tableFilterPattern: <table name filtering regex>
processor:
type: orm-profiler
config: {}

View File

@ -10,7 +10,9 @@ source:
sourceConfig:
config:
type: Profiler
fqnFilterPattern: <table FQN filtering regex>
databaseFilterPattern: <database name filtering regex>
schemaFilterPattern: <schema name filtering regex>
tableFilterPattern: <table name filtering regex>
processor:
type: orm-profiler
config: {}

View File

@ -11,7 +11,9 @@ source:
sourceConfig:
config:
type: Profiler
fqnFilterPattern: <table FQN filtering regex>
databaseFilterPattern: <database name filtering regex>
schemaFilterPattern: <schema name filtering regex>
tableFilterPattern: <table name filtering regex>
processor:
type: orm-profiler
config: {}

View File

@ -10,7 +10,9 @@ source:
sourceConfig:
config:
type: Profiler
fqnFilterPattern: <table FQN filtering regex>
databaseFilterPattern: <database name filtering regex>
schemaFilterPattern: <schema name filtering regex>
tableFilterPattern: <table name filtering regex>
processor:
type: orm-profiler
config: {}

View File

@ -10,7 +10,9 @@ source:
sourceConfig:
config:
type: Profiler
fqnFilterPattern: <table FQN filtering regex>
databaseFilterPattern: <database name filtering regex>
schemaFilterPattern: <schema name filtering regex>
tableFilterPattern: <table name filtering regex>
processor:
type: orm-profiler
config: {}

View File

@ -12,7 +12,9 @@ source:
sourceConfig:
config:
type: Profiler
fqnFilterPattern: <table FQN filtering regex>
databaseFilterPattern: <database name filtering regex>
schemaFilterPattern: <schema name filtering regex>
tableFilterPattern: <table name filtering regex>
processor:
type: orm-profiler
config: {}

View File

@ -11,7 +11,9 @@ source:
sourceConfig:
config:
type: Profiler
fqnFilterPattern: <table FQN filtering regex>
databaseFilterPattern: <database name filtering regex>
schemaFilterPattern: <schema name filtering regex>
tableFilterPattern: <table name filtering regex>
processor:
type: orm-profiler
config: {}

View File

@ -12,7 +12,9 @@ source:
sourceConfig:
config:
type: Profiler
fqnFilterPattern: <table FQN filtering regex>
databaseFilterPattern: <database name filtering regex>
schemaFilterPattern: <schema name filtering regex>
tableFilterPattern: <table name filtering regex>
processor:
type: orm-profiler
config: {}

View File

@ -10,7 +10,9 @@ source:
sourceConfig:
config:
type: Profiler
fqnFilterPattern: <table FQN filtering regex>
databaseFilterPattern: <database name filtering regex>
schemaFilterPattern: <schema name filtering regex>
tableFilterPattern: <table name filtering regex>
processor:
type: orm-profiler
config: {}

View File

@ -15,7 +15,9 @@ source:
sourceConfig:
config:
type: Profiler
fqnFilterPattern: <table FQN filtering regex>
databaseFilterPattern: <database name filtering regex>
schemaFilterPattern: <schema name filtering regex>
tableFilterPattern: <table name filtering regex>
processor:
type: orm-profiler
config: {}

View File

@ -10,7 +10,9 @@ source:
sourceConfig:
config:
type: Profiler
fqnFilterPattern: <table FQN filtering regex>
databaseFilterPattern: <database name filtering regex>
schemaFilterPattern: <schema name filtering regex>
tableFilterPattern: <table name filtering regex>
processor:
type: orm-profiler
config: {}

View File

@ -127,9 +127,6 @@ const AddIngestion = ({
(data?.sourceConfig.config as ConfigClass)?.pipelineFilterPattern
)
);
const [showFqnFilter, setShowFqnFilter] = useState(
!isUndefined((data?.sourceConfig.config as ConfigClass)?.fqnFilterPattern)
);
const configData = useMemo(
() =>
(data?.sourceConfig.config as DatabaseServiceMetadataPipelineClass)
@ -206,10 +203,6 @@ const AddIngestion = ({
(data?.sourceConfig.config as ConfigClass)?.pipelineFilterPattern ??
INITIAL_FILTER_PATTERN
);
const [fqnFilterPattern, setFqnFilterPattern] = useState<FilterPattern>(
(data?.sourceConfig.config as ConfigClass)?.fqnFilterPattern ??
INITIAL_FILTER_PATTERN
);
const [queryLogDuration, setQueryLogDuration] = useState<number>(
(data?.sourceConfig.config as ConfigClass)?.queryLogDuration ?? 1
@ -261,10 +254,6 @@ const AddIngestion = ({
case FilterPatternEnum.CHART:
setChartFilterPattern({ ...chartFilterPattern, includes: value });
break;
case FilterPatternEnum.FQN:
setFqnFilterPattern({ ...fqnFilterPattern, includes: value });
break;
case FilterPatternEnum.PIPELINE:
setPipelineFilterPattern({ ...pipelineFilterPattern, includes: value });
@ -300,10 +289,6 @@ const AddIngestion = ({
case FilterPatternEnum.CHART:
setChartFilterPattern({ ...chartFilterPattern, excludes: value });
break;
case FilterPatternEnum.FQN:
setFqnFilterPattern({ ...fqnFilterPattern, excludes: value });
break;
case FilterPatternEnum.PIPELINE:
setPipelineFilterPattern({ ...pipelineFilterPattern, excludes: value });
@ -337,10 +322,6 @@ const AddIngestion = ({
case FilterPatternEnum.CHART:
setShowChartFilter(value);
break;
case FilterPatternEnum.FQN:
setShowFqnFilter(value);
break;
case FilterPatternEnum.PIPELINE:
setShowPipelineFilter(value);
@ -469,10 +450,19 @@ const AddIngestion = ({
}
case PipelineType.Profiler: {
return {
fqnFilterPattern: getFilterPatternData(
fqnFilterPattern,
showFqnFilter
databaseFilterPattern: getFilterPatternData(
databaseFilterPattern,
showDatabaseFilter
),
schemaFilterPattern: getFilterPatternData(
schemaFilterPattern,
showSchemaFilter
),
tableFilterPattern: getFilterPatternData(
tableFilterPattern,
showTableFilter
),
type: profilerIngestionType,
generateSampleData: ingestSampleData,
profileSample: profileSample,
@ -624,7 +614,6 @@ const AddIngestion = ({
databaseServiceName={databaseServiceName}
description={description}
enableDebugLog={enableDebugLog}
fqnFilterPattern={fqnFilterPattern}
getExcludeValue={getExcludeValue}
getIncludeValue={getIncludeValue}
handleDatasetServiceName={(val) => setDatabaseServiceName(val)}
@ -658,7 +647,6 @@ const AddIngestion = ({
showChartFilter={showChartFilter}
showDashboardFilter={showDashboardFilter}
showDatabaseFilter={showDatabaseFilter}
showFqnFilter={showFqnFilter}
showPipelineFilter={showPipelineFilter}
showSchemaFilter={showSchemaFilter}
showTableFilter={showTableFilter}

View File

@ -58,10 +58,6 @@ const mockConfigureIngestion: ConfigureIngestionProps = {
includes: [],
excludes: [],
},
fqnFilterPattern: {
includes: [],
excludes: [],
},
includeLineage: false,
includeView: false,
includeTags: false,
@ -77,7 +73,6 @@ const mockConfigureIngestion: ConfigureIngestionProps = {
showTopicFilter: false,
showChartFilter: false,
showPipelineFilter: false,
showFqnFilter: false,
handleIncludeLineage: jest.fn(),
handleIncludeView: jest.fn(),
handleIncludeTags: jest.fn(),

View File

@ -36,7 +36,6 @@ const ConfigureIngestion = ({
topicFilterPattern,
chartFilterPattern,
pipelineFilterPattern,
fqnFilterPattern,
includeLineage,
includeView,
includeTags,
@ -51,7 +50,6 @@ const ConfigureIngestion = ({
showTopicFilter,
showChartFilter,
showPipelineFilter,
showFqnFilter,
queryLogDuration,
stageFileLocation,
threadCount,
@ -402,24 +400,6 @@ const ConfigureIngestion = ({
}
};
const getProfilerFilterPatternField = () => {
return (
<Fragment>
<FilterPattern
checked={showFqnFilter}
excludePattern={fqnFilterPattern?.excludes ?? []}
getExcludeValue={getExcludeValue}
getIncludeValue={getIncludeValue}
handleChecked={(value) =>
handleShowFilter(value, FilterPatternEnum.FQN)
}
includePattern={fqnFilterPattern?.includes ?? []}
type={FilterPatternEnum.FQN}
/>
</Fragment>
);
};
const getMetadataFields = () => {
return (
<>
@ -521,7 +501,7 @@ const ConfigureIngestion = ({
{getSeparator('')}
</Field>
</div>
<div>{getProfilerFilterPatternField()}</div>
<div>{getMetadataFilterPatternField()}</div>
{getSeparator('')}
{getProfileSample()}
{getSeparator('')}

View File

@ -66,7 +66,6 @@ export interface ConfigureIngestionProps {
topicFilterPattern: FilterPattern;
chartFilterPattern: FilterPattern;
pipelineFilterPattern: FilterPattern;
fqnFilterPattern: FilterPattern;
includeLineage: boolean;
includeView: boolean;
includeTags: boolean;
@ -82,7 +81,6 @@ export interface ConfigureIngestionProps {
showTopicFilter: boolean;
showChartFilter: boolean;
showPipelineFilter: boolean;
showFqnFilter: boolean;
threadCount: number;
queryLogDuration: number;
stageFileLocation: string;