Fix #11659: Add support for filter patterns in dbt workflow (#12063)

Mayur Singal 2023-06-26 11:30:35 +05:30 committed by GitHub
parent 18c8eb318e
commit a3fd6e9522
14 changed files with 276 additions and 50 deletions

View File

@@ -58,6 +58,22 @@ source:
# dbtUpdateDescriptions: true or false
# includeTags: true or false
# dbtClassificationName: dbtTags
# databaseFilterPattern:
# includes:
# - .*db.*
# excludes:
# - .*demo.*
# schemaFilterPattern:
# includes:
# - .*schema.*
# excludes:
# - .*demo.*
# tableFilterPattern:
# includes:
# - .*table.*
# excludes:
# - .*demo.*
sink:
type: metadata-rest
config: {}

View File

@@ -14,10 +14,9 @@ Hosts the singledispatch to get DBT files
import json
import traceback
from functools import singledispatch
from typing import Any, Optional, Tuple
from typing import Optional, Tuple
import requests
from pydantic import BaseModel
from metadata.generated.schema.metadataIngestion.dbtconfig.dbtAzureConfig import (
DbtAzureConfig,
@@ -37,27 +36,17 @@ from metadata.generated.schema.metadataIngestion.dbtconfig.dbtLocalConfig import
from metadata.generated.schema.metadataIngestion.dbtconfig.dbtS3Config import (
DbtS3Config,
)
from metadata.ingestion.source.database.dbt.constants import (
DBT_CATALOG_FILE_NAME,
DBT_MANIFEST_FILE_NAME,
DBT_RUN_RESULTS_FILE_NAME,
)
from metadata.ingestion.source.database.dbt.models import DbtFiles
from metadata.utils.credentials import set_google_credentials
from metadata.utils.logger import ometa_logger
logger = ometa_logger()
DBT_CATALOG_FILE_NAME = "catalog.json"
DBT_MANIFEST_FILE_NAME = "manifest.json"
DBT_RUN_RESULTS_FILE_NAME = "run_results.json"
class DbtFiles(BaseModel):
dbt_catalog: Optional[dict]
dbt_manifest: dict
dbt_run_results: Optional[dict]
class DbtObjects(BaseModel):
dbt_catalog: Optional[Any]
dbt_manifest: Any
dbt_run_results: Optional[Any]
class DBTConfigException(Exception):
"""

View File

@@ -22,6 +22,7 @@ from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseReq
from metadata.generated.schema.api.tests.createTestDefinition import (
CreateTestDefinitionRequest,
)
from metadata.generated.schema.metadataIngestion.dbtPipeline import DbtPipeline
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.ingestion.api.source import Source
from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
@@ -33,7 +34,14 @@ from metadata.ingestion.models.topology import (
create_source_context,
)
from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.utils.dbt_config import DbtFiles, DbtObjects, get_dbt_details
from metadata.ingestion.source.database.dbt.dbt_config import get_dbt_details
from metadata.ingestion.source.database.dbt.models import (
DbtFiles,
DbtFilteredModel,
DbtObjects,
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@@ -131,6 +139,7 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC):
topology = DbtServiceTopology()
context = create_source_context(topology)
source_config: DbtPipeline
def remove_manifest_non_required_keys(self, manifest_dict: dict):
"""
@@ -152,9 +161,7 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC):
)
def get_dbt_files(self) -> DbtFiles:
dbt_files = get_dbt_details(
self.source_config.dbtConfigSource # pylint: disable=no-member
)
dbt_files = get_dbt_details(self.source_config.dbtConfigSource)
self.context.dbt_files = dbt_files
yield dbt_files
@@ -246,3 +253,30 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC):
"""
After test cases have been processed, add the test results info
"""
def is_filtered(
self, database_name: str, schema_name: str, table_name: str
) -> DbtFilteredModel:
"""
Check whether a dbt model is filtered out by the database, schema, or table filter patterns
"""
# pylint: disable=protected-access
model_fqn = fqn._build(str(database_name), str(schema_name), str(table_name))
is_filtered = False
reason = None
message = None
if filter_by_table(self.source_config.tableFilterPattern, table_name):
reason = "table"
is_filtered = True
if filter_by_schema(self.source_config.schemaFilterPattern, schema_name):
reason = "schema"
is_filtered = True
if filter_by_database(self.source_config.databaseFilterPattern, database_name):
reason = "database"
is_filtered = True
if is_filtered:
message = f"Model Filtered due to {reason} filter pattern"
return DbtFilteredModel(
is_filtered=is_filtered, message=message, model_fqn=model_fqn
)
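For reference, a minimal sketch of how the new filter patterns are meant to drive `is_filtered` above. The `filter_by_table(pattern, name)` call mirrors the usage in this commit; the `FilterPattern` import path and the exclude-over-include semantics are assumptions for illustration, not confirmed by the diff.

```python
# Illustrative sketch only -- assumes the generated FilterPattern model lives at
# metadata.generated.schema.type.filterPattern and that filter_by_table returns
# a truthy value when the model should be dropped (as is_filtered above implies).
from metadata.generated.schema.type.filterPattern import FilterPattern
from metadata.utils.filters import filter_by_table

# Mirrors the commented YAML example: include ".*table.*", exclude ".*demo.*"
table_pattern = FilterPattern(includes=[".*table.*"], excludes=[".*demo.*"])

# Expected outcome, assuming excludes take precedence over includes:
print(bool(filter_by_table(table_pattern, "demo_table")))      # True  -> filtered out
print(bool(filter_by_table(table_pattern, "customer_table")))  # False -> kept
```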

View File

@@ -312,6 +312,7 @@ class DbtSource(DbtServiceSource):
None,
)
# pylint: disable=too-many-locals
def yield_data_models(self, dbt_objects: DbtObjects) -> Iterable[DataModelLink]:
"""
Yield the data models
@@ -359,6 +360,17 @@ class DbtSource(DbtServiceSource):
continue
model_name = get_dbt_model_name(manifest_node)
# Filter the dbt models based on filter patterns
filter_model = self.is_filtered(
database_name=get_corrected_name(manifest_node.database),
schema_name=get_corrected_name(manifest_node.schema_),
table_name=model_name,
)
if filter_model.is_filtered:
self.status.filter(filter_model.model_fqn, filter_model.message)
continue
logger.debug(f"Processing DBT node: {model_name}")
catalog_node = None
@@ -387,6 +399,7 @@ class DbtSource(DbtServiceSource):
schema_name=get_corrected_name(manifest_node.schema_),
table_name=model_name,
)
table_entity: Optional[
Union[Table, List[Table]]
] = get_entity_from_es_result(
@@ -448,6 +461,15 @@ class DbtSource(DbtServiceSource):
for node in dbt_node.depends_on.nodes:
try:
parent_node = manifest_entities[node]
table_name = get_dbt_model_name(parent_node)
filter_model = self.is_filtered(
database_name=get_corrected_name(parent_node.database),
schema_name=get_corrected_name(parent_node.schema_),
table_name=table_name,
)
if filter_model.is_filtered:
continue
# check if the node is an ephemeral node
# Recursively store the upstream of the ephemeral node in the upstream list
@@ -462,7 +484,7 @@ class DbtSource(DbtServiceSource):
service_name=self.config.serviceName,
database_name=get_corrected_name(parent_node.database),
schema_name=get_corrected_name(parent_node.schema_),
table_name=get_dbt_model_name(parent_node),
table_name=table_name,
)
# check if the parent table exists in OM before adding it to the upstream list

View File

@@ -0,0 +1,35 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Models required for dbt
"""
from typing import Any, Optional
from pydantic import BaseModel
class DbtFiles(BaseModel):
dbt_catalog: Optional[dict]
dbt_manifest: dict
dbt_run_results: Optional[dict]
class DbtObjects(BaseModel):
dbt_catalog: Optional[Any]
dbt_manifest: Any
dbt_run_results: Optional[Any]
class DbtFilteredModel(BaseModel):
is_filtered: Optional[bool] = False
message: Optional[str]
model_fqn: Optional[str]

View File

@@ -32,7 +32,7 @@ from metadata.ingestion.source.database.dbt.dbt_utils import (
get_dbt_raw_query,
)
from metadata.ingestion.source.database.dbt.metadata import DbtSource
from metadata.utils.dbt_config import DbtFiles, DbtObjects
from metadata.ingestion.source.database.dbt.models import DbtFiles, DbtObjects
from metadata.utils.tag_utils import get_tag_labels
mock_dbt_config = {

View File

@@ -79,6 +79,21 @@ source:
# dbtPrefixConfig:
# dbtBucketName: bucket
# dbtObjectPrefix: "dbt/"
# databaseFilterPattern:
# includes:
# - .*db.*
# excludes:
# - .*demo.*
# schemaFilterPattern:
# includes:
# - .*schema.*
# excludes:
# - .*demo.*
# tableFilterPattern:
# includes:
# - .*table.*
# excludes:
# - .*demo.*
sink:
type: metadata-rest
config: {}

View File

@@ -52,7 +52,7 @@ Add the details of the AWS s3 bucket in the above config:
The `get_dbt_details` method takes the source config provided in the JSON and detects the source type (GCP, S3, local, or file server) based on the fields provided in the config.
```python
from metadata.utils.dbt_config import get_dbt_details
from metadata.ingestion.source.database.dbt.dbt_config import get_dbt_details
dbt_details = get_dbt_details(self.source_config.dbtConfigSource)
self.dbt_catalog = dbt_details[0] if dbt_details else None
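# A hedged follow-up sketch (not in the original snippet): in this commit,
# get_dbt_files() in dbt_service.py treats the result of get_dbt_details() as the
# DbtFiles model added in metadata/ingestion/source/database/dbt/models.py.
# If so, the individual artifacts can be read as attributes rather than by index:
dbt_files = get_dbt_details(self.source_config.dbtConfigSource)
dbt_catalog = dbt_files.dbt_catalog            # Optional[dict]
dbt_manifest = dbt_files.dbt_manifest          # dict
dbt_run_results = dbt_files.dbt_run_results    # Optional[dict]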

View File

@@ -57,6 +57,18 @@
"description": "Custom OpenMetadata Classification name for dbt tags.",
"type": "string",
"default": "dbtTags"
},
"schemaFilterPattern": {
"description": "Regex to only fetch tables or databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"tableFilterPattern": {
"description": "Regex exclude tables or databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"databaseFilterPattern": {
"description": "Regex to only fetch databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
}
},
"additionalProperties": false

View File

@@ -569,6 +569,9 @@ const AddIngestion = ({
dbtUpdateDescriptions: dbtConfigSource?.dbtUpdateDescriptions,
includeTags: dbtConfigSource?.includeTags,
dbtClassificationName: dbtConfigSource?.dbtClassificationName,
databaseFilterPattern: databaseFilterPattern,
schemaFilterPattern: schemaFilterPattern,
tableFilterPattern: tableFilterPattern,
};
}
@@ -770,6 +773,9 @@ const AddIngestion = ({
cancelText={t('label.cancel')}
data={state}
formType={status}
getExcludeValue={getExcludeValue}
getIncludeValue={getIncludeValue}
handleShowFilter={handleShowFilter}
okText={t('label.next')}
onCancel={handleCancelClick}
onChange={handleStateChange}

View File

@@ -27,7 +27,6 @@ import {
PipelineType,
} from '../../generated/entity/services/ingestionPipelines/ingestionPipeline';
import { DbtPipelineClass } from '../../generated/metadataIngestion/dbtPipeline';
import {
DBT_SOURCES,
GCS_CONFIG,
@@ -92,7 +91,12 @@ export type ScheduleIntervalProps = {
export type ModifiedDbtConfig = DbtConfig &
Pick<
DbtPipelineClass,
'dbtUpdateDescriptions' | 'dbtClassificationName' | 'includeTags'
| 'dbtUpdateDescriptions'
| 'dbtClassificationName'
| 'includeTags'
| 'databaseFilterPattern'
| 'schemaFilterPattern'
| 'tableFilterPattern'
>;
export interface AddIngestionState {

View File

@@ -11,6 +11,7 @@
* limitations under the License.
*/
import { FilterPatternEnum } from 'enums/filterPattern.enum';
import { FormSubmitType } from '../../../enums/form.enum';
import {
Credentials,
@@ -33,9 +34,11 @@ export interface DBTFormCommonProps {
export interface DBTConfigFormProps extends DBTFormCommonProps {
formType: FormSubmitType;
data: AddIngestionState;
onChange: (newState: Partial<AddIngestionState>) => void;
onFocus: (fieldName: string) => void;
getExcludeValue: (value: string[], type: FilterPatternEnum) => void;
getIncludeValue: (value: string[], type: FilterPatternEnum) => void;
handleShowFilter: (value: boolean, type: string) => void;
}
export type DbtConfigCloud = Pick<

View File

@@ -21,6 +21,9 @@ const handleSubmit = jest.fn();
const handleFocus = jest.fn();
const handleCancel = jest.fn();
const handleChange = jest.fn();
const mockGetExcludeValue = jest.fn();
const mockGetIncludeValue = jest.fn();
const mockHandleShowFilter = jest.fn();
describe('DBTConfigFormBuilder', () => {
it('renders the DBTCloudConfig form when dbtConfigSourceType is "cloud"', async () => {
@@ -32,6 +35,9 @@ describe('DBTConfigFormBuilder', () => {
cancelText="Cancel"
data={data as AddIngestionState}
formType={FormSubmitType.ADD}
getExcludeValue={mockGetExcludeValue}
getIncludeValue={mockGetIncludeValue}
handleShowFilter={mockHandleShowFilter}
okText="Ok"
onCancel={handleCancel}
onChange={handleChange}

View File

@@ -12,7 +12,9 @@
*/
import { Button, Form, FormProps, Space } from 'antd';
import { ShowFilter } from 'components/AddIngestion/addIngestion.interface';
import { ENTITY_NAME_REGEX } from 'constants/regex.constants';
import { FilterPatternEnum } from 'enums/filterPattern.enum';
import { FieldProp, FieldTypes } from 'interface/FormUtils.interface';
import React, { FunctionComponent, useMemo } from 'react';
import { useTranslation } from 'react-i18next';
@@ -36,34 +38,59 @@ const DBTConfigFormBuilder: FunctionComponent<DBTConfigFormProps> = ({
onChange,
onSubmit,
onFocus,
getExcludeValue,
getIncludeValue,
handleShowFilter,
}: DBTConfigFormProps) => {
const { t } = useTranslation();
const [form] = Form.useForm();
const currentDbtConfigSourceType = Form.useWatch('dbtConfigSource', form);
const currentGcsConfigType = Form.useWatch('gcsConfig', form);
const { dbtConfigSource, gcsConfigType, ingestionName, dbtConfigSourceType } =
useMemo(
() => ({
ingestionName: data.ingestionName,
gcsConfigType: data.gcsConfigType ?? currentGcsConfigType,
dbtConfigSourceType: data.dbtConfigSourceType,
dbtConfigSource: {
...data.dbtConfigSource,
dbtClassificationName: data.dbtClassificationName,
dbtUpdateDescriptions: data.dbtUpdateDescriptions,
includeTags: data.includeTags,
},
}),
[
data.ingestionName,
data.gcsConfigType,
data.dbtConfigSourceType,
data.dbtConfigSource,
data.includeTags,
currentGcsConfigType,
]
);
const {
dbtConfigSource,
gcsConfigType,
ingestionName,
dbtConfigSourceType,
databaseFilterPattern,
schemaFilterPattern,
tableFilterPattern,
showDatabaseFilter,
showSchemaFilter,
showTableFilter,
} = useMemo(
() => ({
ingestionName: data.ingestionName,
gcsConfigType: data.gcsConfigType ?? currentGcsConfigType,
dbtConfigSourceType: data.dbtConfigSourceType,
dbtConfigSource: {
...data.dbtConfigSource,
dbtClassificationName: data.dbtClassificationName,
dbtUpdateDescriptions: data.dbtUpdateDescriptions,
includeTags: data.includeTags,
},
databaseFilterPattern: data.databaseFilterPattern,
schemaFilterPattern: data.schemaFilterPattern,
tableFilterPattern: data.tableFilterPattern,
showDatabaseFilter: data.showDatabaseFilter,
showSchemaFilter: data.showSchemaFilter,
showTableFilter: data.showTableFilter,
}),
[
data.ingestionName,
data.gcsConfigType,
data.dbtConfigSourceType,
data.dbtConfigSource,
data.includeTags,
currentGcsConfigType,
data.databaseFilterPattern,
data.schemaFilterPattern,
data.tableFilterPattern,
data.showDatabaseFilter,
data.showSchemaFilter,
data.showTableFilter,
]
);
const getFields = () => {
switch (currentDbtConfigSourceType) {
@@ -165,6 +192,63 @@ const DBTConfigFormBuilder: FunctionComponent<DBTConfigFormProps> = ({
},
],
},
{
name: 'databaseFilterPattern',
label: null,
type: FieldTypes.FILTER_PATTERN,
required: false,
props: {
checked: showDatabaseFilter,
excludePattern: databaseFilterPattern?.excludes ?? [],
getExcludeValue: getExcludeValue,
getIncludeValue: getIncludeValue,
handleChecked: (value: boolean) =>
handleShowFilter(value, ShowFilter.showDatabaseFilter),
includePattern: databaseFilterPattern?.includes ?? [],
includePatternExtraInfo: data.database
? t('message.include-database-filter-extra-information')
: undefined,
isDisabled: data.isDatabaseFilterDisabled,
type: FilterPatternEnum.DATABASE,
},
id: 'root/databaseFilterPattern',
},
{
name: 'schemaFilterPattern',
label: null,
type: FieldTypes.FILTER_PATTERN,
required: false,
props: {
checked: showSchemaFilter,
excludePattern: schemaFilterPattern?.excludes ?? [],
getExcludeValue: getExcludeValue,
getIncludeValue: getIncludeValue,
handleChecked: (value: boolean) =>
handleShowFilter(value, ShowFilter.showSchemaFilter),
includePattern: schemaFilterPattern?.includes ?? [],
type: FilterPatternEnum.SCHEMA,
},
id: 'root/schemaFilterPattern',
},
{
name: 'tableFilterPattern',
label: null,
type: FieldTypes.FILTER_PATTERN,
required: false,
props: {
checked: showTableFilter,
excludePattern: tableFilterPattern?.excludes ?? [],
getExcludeValue: getExcludeValue,
getIncludeValue: getIncludeValue,
handleChecked: (value: boolean) =>
handleShowFilter(value, ShowFilter.showTableFilter),
includePattern: tableFilterPattern?.includes ?? [],
type: FilterPatternEnum.TABLE,
showSeparator: false,
},
id: 'root/tableFilterPattern',
hasSeparator: true,
},
{
name: 'dbtConfigSource',
id: 'root/dbtConfigSource',