diff --git a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json index e572d9ee2a2..5be5c2b354c 100644 --- a/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json +++ b/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json @@ -17,8 +17,16 @@ "$ref": "#/definitions/profilerConfigType", "default": "Profiler" }, - "fqnFilterPattern": { - "description": "Regex to only fetch tables with FQN matching the pattern.", + "schemaFilterPattern": { + "description": "Regex to only fetch tables or databases that matches the pattern.", + "$ref": "../type/filterPattern.json#/definitions/filterPattern" + }, + "tableFilterPattern": { + "description": "Regex exclude tables or databases that matches the pattern.", + "$ref": "../type/filterPattern.json#/definitions/filterPattern" + }, + "databaseFilterPattern": { + "description": "Regex to only fetch databases that matches the pattern.", "$ref": "../type/filterPattern.json#/definitions/filterPattern" }, "generateSampleData": { diff --git a/docker/local-metadata/docker-compose.yml b/docker/local-metadata/docker-compose.yml index 544af38e91a..a6a3355fb55 100644 --- a/docker/local-metadata/docker-compose.yml +++ b/docker/local-metadata/docker-compose.yml @@ -122,7 +122,7 @@ services: ingestion: build: context: ../../. - dockerfile: ingestion/Dockerfile + dockerfile: ingestion/Dockerfile_local args: INGESTION_DEPENDENCY: ${INGESTION_DEPENDENCY:-all} container_name: openmetadata_ingestion diff --git a/ingestion/Dockerfile b/ingestion/Dockerfile index f9ad38cdb74..3162d4d8d7d 100644 --- a/ingestion/Dockerfile +++ b/ingestion/Dockerfile @@ -25,13 +25,12 @@ COPY openmetadata-airflow-apis /openmetadata-airflow-apis RUN pip install "." - FROM apis as ingestion WORKDIR /ingestion COPY ingestion /ingestion ARG INGESTION_DEPENDENCY=all -RUN pip install ".[${INGESTION_DEPENDENCY}]" +RUN pip install --upgrade ".[${INGESTION_DEPENDENCY}]" RUN airflow db init RUN cp -r /ingestion/airflow.cfg /airflow/airflow.cfg diff --git a/ingestion/Dockerfile_local b/ingestion/Dockerfile_local new file mode 100644 index 00000000000..355eac2c43b --- /dev/null +++ b/ingestion/Dockerfile_local @@ -0,0 +1,40 @@ +FROM python:3.9-slim as base +ENV AIRFLOW_HOME=/airflow +RUN apt-get update && \ + apt-get install -y gcc libsasl2-modules libsasl2-dev build-essential libssl-dev libffi-dev librdkafka-dev unixodbc-dev python3.9-dev openjdk-11-jre unixodbc freetds-dev freetds-bin tdsodbc libevent-dev wget openssl --no-install-recommends && \ + rm -rf /var/lib/apt/lists/* + +# Manually fix security vulnerability from curl +# - https://security.snyk.io/vuln/SNYK-DEBIAN11-CURL-2936229 +# Add it back to the usual apt-get install once a fix for Debian is released +RUN wget https://curl.se/download/curl-7.84.0.tar.gz && \ + tar -xvf curl-7.84.0.tar.gz && cd curl-7.84.0 && \ + ./configure --with-openssl && make && make install + + +FROM base as airflow +ENV AIRFLOW_VERSION=2.3.3 +ENV CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-3.9.txt" +# Add docker provider for the DockerOperator +RUN pip install "apache-airflow[docker]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}" + + +FROM airflow as apis +WORKDIR /openmetadata-airflow-apis +COPY openmetadata-airflow-apis /openmetadata-airflow-apis + +RUN pip install "." "openmetadata-ingestion[all]" +RUN pip uninstall openmetadata-ingestion -y + +FROM apis as ingestion +WORKDIR /ingestion +COPY ingestion /ingestion + +ARG INGESTION_DEPENDENCY=all +RUN pip install --upgrade ".[${INGESTION_DEPENDENCY}]" + +RUN airflow db init +RUN cp -r /ingestion/airflow.cfg /airflow/airflow.cfg +RUN chmod 755 ingestion_dependency.sh +EXPOSE 8080 +CMD [ "./ingestion_dependency.sh" ] diff --git a/ingestion/examples/workflows/mysql_profiler.yaml b/ingestion/examples/workflows/mysql_profiler.yaml index d209706e3ef..861527cfb4e 100644 --- a/ingestion/examples/workflows/mysql_profiler.yaml +++ b/ingestion/examples/workflows/mysql_profiler.yaml @@ -13,9 +13,9 @@ source: config: type: Profiler generateSampleData: true - fqnFilterPattern: + schemaFilterPattern: includes: - - local_mysql.openmetadata_db* + - openmetadata_db* processor: type: "orm-profiler" diff --git a/ingestion/src/metadata/orm_profiler/api/workflow.py b/ingestion/src/metadata/orm_profiler/api/workflow.py index de2b4033a82..69506adec35 100644 --- a/ingestion/src/metadata/orm_profiler/api/workflow.py +++ b/ingestion/src/metadata/orm_profiler/api/workflow.py @@ -16,6 +16,7 @@ Workflow definition for the ORM Profiler. - How to specify the entities to run - How to define metrics & tests """ +import traceback from copy import deepcopy from typing import Iterable, List @@ -50,7 +51,7 @@ from metadata.utils.class_helper import ( get_service_type_from_source_type, ) from metadata.utils.connections import get_connection, test_connection -from metadata.utils.filters import filter_by_fqn +from metadata.utils.filters import filter_by_fqn, filter_by_schema, filter_by_table from metadata.utils.logger import profiler_logger logger = profiler_logger() @@ -123,18 +124,31 @@ class ProfilerWorkflow: We will update the status on the SQLSource Status. """ for table in tables: + try: + if filter_by_schema( + self.source_config.schemaFilterPattern, + table.databaseSchema.name, + ): + self.source_status.filter( + table.databaseSchema.fullyQualifiedName, + "Schema pattern not allowed", + ) + continue + if filter_by_table( + self.source_config.tableFilterPattern, + table.name.__root__, + ): + self.source_status.filter( + table.fullyQualifiedName.__root__, "Table pattern not allowed" + ) + continue - if filter_by_fqn( - fqn_filter_pattern=self.source_config.fqnFilterPattern, - fqn=table.fullyQualifiedName.__root__, - ): - self.source_status.filter( - table.fullyQualifiedName.__root__, "Schema pattern not allowed" - ) - continue - - self.source_status.scanned(table.fullyQualifiedName.__root__) - yield table + self.source_status.scanned(table.fullyQualifiedName.__root__) + yield table + except Exception as err: # pylint: disable=broad-except + self.source_status.filter(table.fullyQualifiedName.__root__, f"{err}") + logger.error(err) + logger.debug(traceback.format_exc()) def create_processor(self, service_connection_config): self.processor_interface: InterfaceProtocol = SQAProfilerInterface( @@ -201,38 +215,48 @@ class ProfilerWorkflow: yield from self.filter_entities(all_tables) + def copy_service_config(self, database) -> None: + copy_service_connection_config = deepcopy( + self.config.source.serviceConnection.__root__.config + ) + if hasattr( + self.config.source.serviceConnection.__root__.config, + "supportsDatabase", + ): + if hasattr( + self.config.source.serviceConnection.__root__.config, "database" + ): + copy_service_connection_config.database = database.name.__root__ + if hasattr(self.config.source.serviceConnection.__root__.config, "catalog"): + copy_service_connection_config.catalog = database.name.__root__ + + self.create_processor(copy_service_connection_config) + def execute(self): """ Run the profiling and tests """ - copy_service_connection_config = deepcopy( - self.config.source.serviceConnection.__root__.config - ) + for database in self.get_database_entities(): - if hasattr( - self.config.source.serviceConnection.__root__.config, "supportsDatabase" - ): - if hasattr( - self.config.source.serviceConnection.__root__.config, "database" - ): - copy_service_connection_config.database = database.name.__root__ - if hasattr( - self.config.source.serviceConnection.__root__.config, "catalog" - ): - copy_service_connection_config.catalog = database.name.__root__ + try: + self.copy_service_config(database) - self.create_processor(copy_service_connection_config) + for entity in self.get_table_entities(database=database): + try: + profile_and_tests: ProfilerResponse = self.processor.process( + record=entity, + generate_sample_data=self.source_config.generateSampleData, + ) - for entity in self.get_table_entities(database=database): - profile_and_tests: ProfilerResponse = self.processor.process( - record=entity, - generate_sample_data=self.source_config.generateSampleData, - ) - - if hasattr(self, "sink"): - self.sink.write_record(profile_and_tests) - - self.processor_interface.session.close() + if hasattr(self, "sink"): + self.sink.write_record(profile_and_tests) + except Exception as err: # pylint: disable=broad-except + logger.error(err) + logger.debug(traceback.format_exc()) + self.processor_interface.session.close() + except Exception as err: # pylint: disable=broad-except + logger.error(err) + logger.debug(traceback.format_exc()) def print_status(self) -> int: """ diff --git a/ingestion/tests/integration/orm_profiler/test_orm_profiler.py b/ingestion/tests/integration/orm_profiler/test_orm_profiler.py index 661c79aaa9f..041b3032914 100644 --- a/ingestion/tests/integration/orm_profiler/test_orm_profiler.py +++ b/ingestion/tests/integration/orm_profiler/test_orm_profiler.py @@ -163,7 +163,7 @@ class ProfilerWorkflowTest(TestCase): # workflow_config["source"]["sourceConfig"]["config"].update( # { # "type": "Profiler", - # "fqnFilterPattern": {"includes": ["test_sqlite.main.main.users"]}, + # "tableFilterPattern": {"includes": ["users"]}, # } # ) # workflow_config["processor"] = { @@ -233,7 +233,7 @@ class ProfilerWorkflowTest(TestCase): { "type": "Profiler", "profileSample": 50, - "fqnFilterPattern": {"includes": ["test_sqlite.main.main.new_users"]}, + "tableFilterPattern": {"includes": ["new_users"]}, } ) workflow_config["processor"] = {"type": "orm-profiler", "config": {}} diff --git a/ingestion/tests/unit/profiler/test_workflow.py b/ingestion/tests/unit/profiler/test_workflow.py index 883df17a66f..4961a4279e8 100644 --- a/ingestion/tests/unit/profiler/test_workflow.py +++ b/ingestion/tests/unit/profiler/test_workflow.py @@ -128,17 +128,17 @@ def test_filter_entities(mocked_method): # We can exclude based on the schema name exclude_config = deepcopy(config) - exclude_config["source"]["sourceConfig"]["config"]["fqnFilterPattern"] = { - "excludes": ["service*"] + exclude_config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = { + "excludes": ["another_schema"] } exclude_workflow = ProfilerWorkflow.create(exclude_config) mocked_method.assert_called() - assert len(list(exclude_workflow.filter_entities(all_tables))) == 0 + assert len(list(exclude_workflow.filter_entities(all_tables))) == 2 exclude_config = deepcopy(config) - exclude_config["source"]["sourceConfig"]["config"]["fqnFilterPattern"] = { - "excludes": ["service.db.another*"] + exclude_config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = { + "excludes": ["another*"] } exclude_workflow = ProfilerWorkflow.create(exclude_config) @@ -146,8 +146,8 @@ def test_filter_entities(mocked_method): assert len(list(exclude_workflow.filter_entities(all_tables))) == 2 include_config = deepcopy(config) - include_config["source"]["sourceConfig"]["config"]["fqnFilterPattern"] = { - "includes": ["service*"] + include_config["source"]["sourceConfig"]["config"]["databaseFilterPattern"] = { + "includes": ["db*"] } include_workflow = ProfilerWorkflow.create(include_config) diff --git a/openmetadata-core/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json b/openmetadata-core/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json index 559fcf36b08..76091f0c937 100644 --- a/openmetadata-core/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json +++ b/openmetadata-core/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json @@ -17,8 +17,16 @@ "$ref": "#/definitions/profilerConfigType", "default": "Profiler" }, - "fqnFilterPattern": { - "description": "Regex to only fetch tables with FQN matching the pattern.", + "schemaFilterPattern": { + "description": "Regex to only fetch tables or databases that matches the pattern.", + "$ref": "../type/filterPattern.json#/definitions/filterPattern" + }, + "tableFilterPattern": { + "description": "Regex exclude tables or databases that matches the pattern.", + "$ref": "../type/filterPattern.json#/definitions/filterPattern" + }, + "databaseFilterPattern": { + "description": "Regex to only fetch databases that matches the pattern.", "$ref": "../type/filterPattern.json#/definitions/filterPattern" }, "generateSampleData": { diff --git a/openmetadata-docs/content/main-concepts/metadata-standard/schemas/metadataIngestion/databaseServiceProfilerPipeline.md b/openmetadata-docs/content/main-concepts/metadata-standard/schemas/metadataIngestion/databaseServiceProfilerPipeline.md index fbedb3e9cb1..7508f372662 100644 --- a/openmetadata-docs/content/main-concepts/metadata-standard/schemas/metadataIngestion/databaseServiceProfilerPipeline.md +++ b/openmetadata-docs/content/main-concepts/metadata-standard/schemas/metadataIngestion/databaseServiceProfilerPipeline.md @@ -10,7 +10,9 @@ slug: /main-concepts/metadata-standard/schemas/metadataingestion/databaseservice ## Properties - **`type`**: Pipeline type. Refer to *#/definitions/profilerConfigType*. Default: `Profiler`. -- **`fqnFilterPattern`**: Regex to only fetch tables with FQN matching the pattern. Refer to *../type/filterPattern.json#/definitions/filterPattern*. +- **`databaseFilterPattern`**: Regex to only fetch database with database name matching the pattern. Refer to *../type/filterPattern.json#/definitions/filterPattern*. +- **`schemaFilterPattern`**: Regex to only fetch schema with schema name matching the pattern. Refer to *../type/filterPattern.json#/definitions/filterPattern*. +- **`tableFilterPattern`**: Regex to only fetch tables with table name matching the pattern. Refer to *../type/filterPattern.json#/definitions/filterPattern*. - **`generateSampleData`** *(boolean)*: Option to turn on/off generating sample data. Default: `True`. ## Definitions diff --git a/openmetadata-docs/ingestion/connectors/athena/profiler.yaml b/openmetadata-docs/ingestion/connectors/athena/profiler.yaml index 1679dad73ef..40e4db30d57 100644 --- a/openmetadata-docs/ingestion/connectors/athena/profiler.yaml +++ b/openmetadata-docs/ingestion/connectors/athena/profiler.yaml @@ -13,7 +13,9 @@ source: sourceConfig: config: type: Profiler - fqnFilterPattern: