Dbt workflow (#9302)

Ashish Gupta 2022-12-16 17:00:28 +05:30 committed by GitHub
parent 4e783e7f9c
commit 7c181a5a89
64 changed files with 1557 additions and 876 deletions

View File

@ -43,3 +43,7 @@ SET json = JSON_INSERT(
'$.connection.config.database',
JSON_EXTRACT(json, '$.connection.config.databaseSchema')
) where serviceType in ('Db2');
-- Remove DBT source config
UPDATE ingestion_pipeline_entity
SET json = JSON_REMOVE(json ,'$.sourceConfig.config.dbtConfigSource');

View File

@ -38,4 +38,9 @@ where serviceType in ('Db2')
UPDATE dbservice_entity
SET json = json::jsonb #- '{connection,config,databaseSchema}'
where serviceType in ('Db2');
-- Remove DBT source config
UPDATE ingestion_pipeline_entity
SET json = json::jsonb #- '{sourceConfig,config,dbtConfigSource}';

View File

@ -0,0 +1,52 @@
source:
type: dbt
serviceName: service_name
sourceConfig:
config:
type: DBT
# For DBT, choose one of Cloud, Local, HTTP, S3 or GCS configurations
# dbtConfigSource:
# # For cloud
# dbtCloudAuthToken: token
# dbtCloudAccountId: ID
# # For Local
# dbtCatalogFilePath: path-to-catalog.json
# dbtManifestFilePath: path-to-manifest.json
# dbtRunResultsFilePath: path-to-run_results.json
# # For HTTP
# dbtCatalogHttpPath: http://path-to-catalog.json
# dbtManifestHttpPath: http://path-to-manifest.json
# dbtRunResultsHttpPath: http://path-to-run_results.json
# # For S3
# dbtSecurityConfig: # These are modeled after all AWS credentials
# awsAccessKeyId: KEY
# awsSecretAccessKey: SECRET
# awsRegion: us-east-2
# dbtPrefixConfig:
# dbtBucketName: bucket
# dbtObjectPrefix: "dbt/"
# # For GCS
# dbtSecurityConfig: # These are modeled after all GCS credentials
# type: My Type
# projectId: project ID
# privateKeyId: us-east-2
# privateKey: |
# -----BEGIN PRIVATE KEY-----
# Super secret key
# -----END PRIVATE KEY-----
# clientEmail: client@mail.com
# clientId: 1234
# authUri: https://accounts.google.com/o/oauth2/auth (default)
# tokenUri: https://oauth2.googleapis.com/token (default)
# authProviderX509CertUrl: https://www.googleapis.com/oauth2/v1/certs (default)
# clientX509CertUrl: https://cert.url (URI)
# dbtPrefixConfig:
# dbtBucketName: bucket
# dbtObjectPrefix: "dbt/"
sink:
type: metadata-rest
config: {}
workflowConfig:
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: no-auth
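
For reference, a workflow YAML like the one above is normally handed to the OpenMetadata ingestion framework to run. Below is a minimal sketch, assuming the file is saved locally (the name dbt_workflow.yaml is only illustrative) and using the standard Workflow runner pattern from the ingestion framework:

import yaml

from metadata.ingestion.api.workflow import Workflow


def run_dbt_workflow(config_path: str = "dbt_workflow.yaml") -> None:
    # Load the workflow configuration from YAML into a plain dict
    with open(config_path, "r", encoding="utf-8") as config_file:
        workflow_config = yaml.safe_load(config_file)

    # Create the workflow, execute it, and surface any failures
    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


if __name__ == "__main__":
    run_dbt_workflow()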

View File

@ -1,22 +0,0 @@
source:
type: redshift
config:
host_port: cluster.name.region.redshift.amazonaws.com:5439
username: username
password: strong_password
database: warehouse
service_name: aws_redshift
generate_sample_data: 'false'
schema_filter_pattern:
excludes:
- information_schema.*
dbt_manifest_file: ./examples/sample_data/dbt/manifest_1.0.json
dbt_catalog_file: ./examples/sample_data/dbt/catalog_1.0.json
sink:
type: metadata-rest
config: {}
metadata_server:
type: metadata-server
config:
api_endpoint: http://localhost:8585/api
auth_provider_type: no-auth

View File

@ -177,6 +177,9 @@ class MetadataRestSink(Sink[Entity]):
self.metadata.ingest_table_data_model(
table=table, data_model=datamodel_link.datamodel
)
logger.debug(
f"Successfully ingested DataModel for {table.fullyQualifiedName.__root__}"
)
else:
logger.warning(
f"Could not find any entity by Table FQN [{datamodel_link.fqn}] when adding DBT models."

View File

@ -27,10 +27,6 @@ from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.services.createStorageService import (
CreateStorageServiceRequest,
)
from metadata.generated.schema.api.tags.createTag import CreateTagRequest
from metadata.generated.schema.api.tags.createTagCategory import (
CreateTagCategoryRequest,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.location import Location
@ -71,9 +67,7 @@ from metadata.ingestion.models.topology import (
create_source_context,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.dbt_source import DBTMixin
from metadata.utils import fqn
from metadata.utils.dbt_config import get_dbt_details
from metadata.utils.filters import filter_by_schema
from metadata.utils.logger import ingestion_logger
@ -126,9 +120,6 @@ class DatabaseServiceTopology(ServiceTopology):
],
children=["database"],
post_process=[
"process_dbt_lineage_and_descriptions",
"create_dbt_tests_suite_definition",
"create_dbt_test_cases",
"yield_view_lineage",
],
)
@ -181,11 +172,6 @@ class DatabaseServiceTopology(ServiceTopology):
consumer=["storage_service"],
nullable=True,
),
NodeStage(
type_=DataModelLink,
processor="yield_datamodel",
ack_sink=False,
),
NodeStage(
type_=TableLocationLink,
processor="yield_table_location_link",
@ -216,7 +202,7 @@ class SQLSourceStatus(SourceStatus):
class DatabaseServiceSource(
DBTMixin, TopologyRunnerMixin, Source, ABC
TopologyRunnerMixin, Source, ABC
): # pylint: disable=too-many-public-methods
"""
Base class for Database Services.
@ -237,31 +223,8 @@ class DatabaseServiceSource(
topology = DatabaseServiceTopology()
context = create_source_context(topology)
# Initialize DBT structures for all Databases
data_models = {}
dbt_tests = {}
def __init__(self):
if (
hasattr(self.source_config.dbtConfigSource, "dbtSecurityConfig")
and self.source_config.dbtConfigSource.dbtSecurityConfig is None
):
logger.info("dbtConfigSource is not configured")
self.dbt_catalog = None
self.dbt_manifest = None
self.dbt_run_results = None
self.data_models = {}
else:
dbt_details = get_dbt_details(self.source_config.dbtConfigSource)
if dbt_details:
self.dbt_catalog = dbt_details[0] if len(dbt_details) == 3 else None
self.dbt_manifest = dbt_details[1] if len(dbt_details) == 3 else None
self.dbt_run_results = dbt_details[2] if len(dbt_details) == 3 else None
self.data_models = {}
def prepare(self):
self._parse_data_model()
pass
def get_status(self) -> SourceStatus:
return self.status
@ -373,52 +336,6 @@ class DatabaseServiceSource(
"""
yield from self.get_database_schema_names()
def yield_datamodel(
self, table_name_and_type: Tuple[str, TableType]
) -> Iterable[DataModelLink]:
"""
Gets the current table being processed, fetches its data model
and sends it to the sink
"""
table_name, _ = table_name_and_type
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.context.database_service.name.__root__,
database_name=self.context.database.name.__root__,
schema_name=self.context.database_schema.name.__root__,
table_name=table_name,
)
datamodel = self.get_data_model(table_fqn)
dbt_tag_labels = None
if datamodel:
logger.info("Processing DBT Tags")
dbt_tag_labels = datamodel.tags
if not dbt_tag_labels:
dbt_tag_labels = []
for column in datamodel.columns:
if column.tags:
dbt_tag_labels.extend(column.tags)
if dbt_tag_labels:
for tag_label in dbt_tag_labels:
yield OMetaTagAndCategory(
category_name=CreateTagCategoryRequest(
name="DBTTags",
description="",
),
category_details=CreateTagRequest(
name=tag_label.tagFQN.__root__.split(".")[1],
description="DBT Tags",
),
)
yield DataModelLink(
fqn=table_fqn,
datamodel=datamodel,
)
def yield_table_location_link(
self,
table_name_and_type: Tuple[str, TableType], # pylint: disable=unused-argument

View File

@ -0,0 +1,780 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
DBT source methods.
"""
import traceback
from datetime import datetime
from typing import Iterable, List, Optional, Union
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.tags.createTag import CreateTagRequest
from metadata.generated.schema.api.tags.createTagCategory import (
CreateTagCategoryRequest,
)
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
from metadata.generated.schema.api.tests.createTestDefinition import (
CreateTestDefinitionRequest,
)
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
from metadata.generated.schema.entity.data.table import (
Column,
DataModel,
ModelType,
Table,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.tags.tagCategory import Tag
from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.tests.basic import (
TestCaseResult,
TestCaseStatus,
TestResultValue,
)
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testDefinition import (
EntityType,
TestDefinition,
TestPlatform,
)
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import (
LabelType,
State,
TagLabel,
TagSource,
)
from metadata.ingestion.api.source import SourceStatus
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.ingestion.source.database.dbt_service import DbtFiles, DbtServiceSource
from metadata.utils import fqn
from metadata.utils.elasticsearch import get_entity_from_es_result
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class DbtSource(DbtServiceSource): # pylint: disable=too-many-public-methods
"""
Class defines method to extract metadata from DBT
"""
def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
self.config = config
self.source_config = self.config.sourceConfig.config
self.metadata_config = metadata_config
self.metadata = OpenMetadata(metadata_config)
self.report = SQLSourceStatus()
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
return cls(config, metadata_config)
def get_status(self) -> SourceStatus:
return self.report
def test_connection(self) -> None:
# DBT does not need to connect to any source to process information
# so the connection test is a no-op here
pass
def prepare(self):
"""
By default, there's nothing to prepare
"""
# By default for DBT nothing is required to be prepared
def get_dbt_owner(self, manifest_node: dict, catalog_node: dict) -> Optional[str]:
"""
Returns dbt owner
"""
owner = None
dbt_owner = None
if catalog_node:
dbt_owner = catalog_node["metadata"].get("owner")
if manifest_node:
dbt_owner = manifest_node["meta"].get("owner")
if dbt_owner:
owner_name = dbt_owner
user_owner_fqn = fqn.build(
self.metadata, entity_type=User, user_name=owner_name
)
if user_owner_fqn:
owner = self.metadata.get_entity_reference(
entity=User, fqn=user_owner_fqn
)
else:
team_owner_fqn = fqn.build(
self.metadata, entity_type=Team, team_name=owner_name
)
if team_owner_fqn:
owner = self.metadata.get_entity_reference(
entity=Team, fqn=team_owner_fqn
)
else:
logger.warning(
"Unable to ingest owner from DBT since no user or"
f" team was found with name {dbt_owner}"
)
return owner
def get_dbt_tag_labels(self, dbt_tags_list):
return [
TagLabel(
tagFQN=fqn.build(
self.metadata,
entity_type=Tag,
tag_category_name="DBTTags",
tag_name=tag.replace(".", ""),
),
labelType=LabelType.Automated,
state=State.Confirmed,
source=TagSource.Tag,
)
for tag in dbt_tags_list
] or None
def validate_dbt_files(self, dbt_files: DbtFiles):
"""
Method to validate DBT files
"""
# Validate the Manifest File
logger.info("Validating Manifest File")
required_manifest_keys = [
"alias",
"name",
"schema",
"database",
"resource_type",
"description",
]
required_catalog_keys = ["name", "type", "index", "comment"]
if self.source_config.dbtConfigSource and dbt_files.dbt_manifest:
manifest_entities = {
**dbt_files.dbt_manifest["nodes"],
**dbt_files.dbt_manifest["sources"],
}
if dbt_files.dbt_catalog:
catalog_entities = {
**dbt_files.dbt_catalog["nodes"],
**dbt_files.dbt_catalog["sources"],
}
for key, manifest_node in manifest_entities.items():
if manifest_node["resource_type"] in ["analysis", "test"]:
continue
# Validate if all the required keys are present in the manifest nodes
if all(
required_key in manifest_node
for required_key in required_manifest_keys
):
logger.info(f"Successfully Validated DBT Node: {key}")
else:
logger.warning(
f"Error validating DBT Node: {key}\n"
f"Please check if following keys exist for the node: {required_manifest_keys}"
)
# Validate the catalog file if it is passed
if dbt_files.dbt_catalog:
catalog_node = catalog_entities.get(key)
for catalog_key, catalog_column in catalog_node.get(
"columns"
).items():
if all(
required_catalog_key in catalog_column
for required_catalog_key in required_catalog_keys
):
logger.info(
f"Successfully Validated DBT Column: {catalog_key}"
)
else:
logger.warning(
f"Error validating DBT Column: {catalog_key}\n"
f"Please check if following keys exist for the column node: {required_catalog_keys}"
)
def yield_dbt_tags(self, dbt_files: DbtFiles) -> Iterable[OMetaTagAndCategory]:
"""
Create and yield tags from DBT
"""
if self.source_config.dbtConfigSource and dbt_files.dbt_manifest:
manifest_entities = {
**dbt_files.dbt_manifest["nodes"],
**dbt_files.dbt_manifest["sources"],
}
logger.info("Processing DBT Tags")
dbt_tags_list = []
for key, manifest_node in manifest_entities.items():
try:
if manifest_node["resource_type"] in ["analysis", "test"]:
continue
# Add the tags from the model
model_tags = manifest_node.get("tags")
if model_tags:
dbt_tags_list.extend(model_tags)
# Add the tags from the columns
for _, column in manifest_node["columns"].items():
column_tags = column.get("tags")
if column_tags:
dbt_tags_list.extend(column_tags)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Unable to process DBT tags for node: f{key} - {exc}"
)
try:
# Create all the tags added
dbt_tag_labels = self.get_dbt_tag_labels(dbt_tags_list)
for tag_label in dbt_tag_labels:
yield OMetaTagAndCategory(
category_name=CreateTagCategoryRequest(
name="DBTTags",
description="",
),
category_details=CreateTagRequest(
name=tag_label.tagFQN.__root__.split(".")[1],
description="DBT Tags",
),
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unexpected exception creating DBT tags: {exc}")
def yield_data_models(self, dbt_files: DbtFiles) -> Iterable[DataModelLink]:
"""
Yield the data models
"""
if self.source_config.dbtConfigSource and dbt_files.dbt_manifest:
logger.info("Parsing DBT Data Models")
manifest_entities = {
**dbt_files.dbt_manifest["nodes"],
**dbt_files.dbt_manifest["sources"],
}
if dbt_files.dbt_catalog:
catalog_entities = {
**dbt_files.dbt_catalog["nodes"],
**dbt_files.dbt_catalog["sources"],
}
self.context.data_model_links = []
self.context.dbt_tests = {}
for key, manifest_node in manifest_entities.items():
try:
# Skip the analysis node since it does not contain relevant metadata
if manifest_node["resource_type"] in ["analysis"]:
continue
# If the run_results file is passed then only DBT tests will be processed
if dbt_files.dbt_run_results:
# Test nodes will be processed further in the topology
if manifest_node["resource_type"] == "test":
self.context.dbt_tests[key] = manifest_node
self.context.dbt_tests[key][
"upstream"
] = self.parse_upstream_nodes(
manifest_entities, manifest_node
)
self.context.dbt_tests[key][
"results"
] = next( # pylint: disable=stop-iteration-return
item
for item in dbt_files.dbt_run_results.get("results")
if item["unique_id"] == key
)
continue
model_name = (
manifest_node["alias"]
if "alias" in manifest_node.keys()
else manifest_node["name"]
)
logger.info(f"Processing DBT node: {model_name}")
catalog_node = None
if dbt_files.dbt_catalog:
catalog_node = catalog_entities.get(key)
dbt_table_tags_list = None
dbt_model_tag_labels = manifest_node.get("tags")
if dbt_model_tag_labels:
dbt_table_tags_list = self.get_dbt_tag_labels(
dbt_model_tag_labels
)
dbt_compiled_query = self.get_dbt_compiled_query(manifest_node)
dbt_raw_query = self.get_dbt_raw_query(manifest_node)
data_model_link = DataModelLink(
fqn=fqn.build(
self.metadata,
entity_type=Table,
service_name=self.config.serviceName,
database_name=(
manifest_node["database"]
if manifest_node["database"]
else "default"
),
schema_name=(
manifest_node["schema"]
if manifest_node["schema"]
else "default"
),
table_name=model_name,
),
datamodel=DataModel(
modelType=ModelType.DBT,
description=manifest_node.get("description")
if manifest_node.get("description")
else None,
path=f"{manifest_node['root_path']}/{manifest_node['original_file_path']}",
rawSql=dbt_raw_query if dbt_raw_query else "",
sql=dbt_compiled_query if dbt_compiled_query else "",
columns=self.parse_data_model_columns(
manifest_node, catalog_node
),
upstream=self.parse_upstream_nodes(
manifest_entities, manifest_node
),
owner=self.get_dbt_owner(
manifest_node=manifest_node, catalog_node=catalog_node
),
tags=dbt_table_tags_list,
),
)
yield data_model_link
self.context.data_model_links.append(data_model_link)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Unexpected exception parsing DBT node:{model_name} - {exc}"
)
def parse_upstream_nodes(self, manifest_entities, dbt_node):
"""
Method to fetch the upstream nodes
"""
upstream_nodes = []
if "depends_on" in dbt_node and "nodes" in dbt_node["depends_on"]:
for node in dbt_node["depends_on"]["nodes"]:
try:
parent_node = manifest_entities[node]
parent_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.config.serviceName,
database_name=parent_node["database"]
if parent_node["database"]
else "default",
schema_name=parent_node["schema"]
if parent_node["schema"]
else "default",
table_name=parent_node["name"],
).lower()
if parent_fqn:
upstream_nodes.append(parent_fqn)
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to parse the DBT node {node} to get upstream nodes: {exc}"
)
continue
return upstream_nodes
def parse_data_model_columns(
self, manifest_node: dict, catalog_node: dict
) -> List[Column]:
"""
Method to parse the DBT columns
"""
columns = []
manifest_columns = manifest_node.get("columns", {})
for key, manifest_column in manifest_columns.items():
try:
logger.info(f"Processing DBT column: {key}")
column_name = manifest_column.get("name")
column_type = manifest_column.get("data_type")
column_description = manifest_columns.get(key.lower(), {}).get(
"description"
)
dbt_column_tags = manifest_columns.get(key.lower(), {}).get("tags")
dbt_column_tags_list = self.get_dbt_tag_labels(dbt_column_tags)
# If catalog file is passed pass the column information from catalog file
column_index = None
if catalog_node:
catalog_column = catalog_node["columns"].get(key)
if catalog_column:
column_name = catalog_column.get("name")
column_type = catalog_column.get("type")
column_index = catalog_column.get("index")
if column_description is None:
column_description = catalog_column.get("comment")
columns.append(
Column(
name=column_name,
description=column_description,
dataType=ColumnTypeParser.get_column_type(column_type),
dataLength=1,
ordinalPosition=column_index,
tags=dbt_column_tags_list,
)
)
logger.info(f"Successfully processed DBT column: {key}")
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(f"Failed to parse DBT column {column_name}: {exc}")
return columns
def create_dbt_lineage(
self, data_model_link: DataModelLink
) -> Iterable[AddLineageRequest]:
"""
Method to process DBT lineage from upstream nodes
"""
logger.info(f"Processing DBT lineage for: {data_model_link.fqn.__root__}")
# Get the table entity from ES
to_es_result = self.metadata.es_search_from_fqn(
entity_type=Table,
fqn_search_string=data_model_link.fqn.__root__,
)
to_entity: Optional[Union[Table, List[Table]]] = get_entity_from_es_result(
entity_list=to_es_result, fetch_multiple_entities=False
)
for upstream_node in data_model_link.datamodel.upstream:
try:
from_es_result = self.metadata.es_search_from_fqn(
entity_type=Table,
fqn_search_string=upstream_node,
)
from_entity: Optional[
Union[Table, List[Table]]
] = get_entity_from_es_result(
entity_list=from_es_result, fetch_multiple_entities=False
)
if from_entity and to_entity:
yield AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=from_entity.id.__root__,
type="table",
),
toEntity=EntityReference(
id=to_entity.id.__root__,
type="table",
),
)
)
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to parse the node {upstream_node} to capture lineage: {exc}"
)
def create_dbt_query_lineage(
self, data_model_link: DataModelLink
) -> Iterable[AddLineageRequest]:
"""
Method to process DBT lineage from queries
"""
table_fqn = data_model_link.fqn.__root__
logger.info(f"Processing DBT Query lineage for: {table_fqn}")
try:
source_elements = table_fqn.split(".")
query = (
f"create table {table_fqn} as {data_model_link.datamodel.sql.__root__}"
)
lineages = get_lineage_by_query(
self.metadata,
query=query,
service_name=source_elements[1],
database_name=source_elements[2],
schema_name=source_elements[3],
)
for lineage_request in lineages or []:
yield lineage_request
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to parse the query {data_model_link.datamodel.sql.__root__} to capture lineage: {exc}"
)
def process_dbt_descriptions(self, data_model_link: DataModelLink):
"""
Method to process DBT descriptions using patch APIs
"""
logger.info(f"Processing DBT Descriptions for: {data_model_link.fqn.__root__}")
# Get the table entity from ES
to_es_result = self.metadata.es_search_from_fqn(
entity_type=Table,
fqn_search_string=data_model_link.fqn.__root__,
)
to_entity: Optional[Union[Table, List[Table]]] = get_entity_from_es_result(
entity_list=to_es_result, fetch_multiple_entities=False
)
if to_entity:
try:
data_model = data_model_link.datamodel
# Patch table descriptions from DBT
if data_model.description:
self.metadata.patch_description(
entity=Table,
entity_id=to_entity.id,
description=data_model.description.__root__,
force=self.source_config.dbtConfigSource.dbtUpdateDescriptions,
)
# Patch column descriptions from DBT
for column in data_model.columns:
if column.description:
self.metadata.patch_column_description(
entity_id=to_entity.id,
column_name=column.name.__root__,
description=column.description.__root__,
force=self.source_config.dbtConfigSource.dbtUpdateDescriptions,
)
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to parse the node {data_model_link.fqn.__root__} to update dbt desctiption: {exc}"
)
def create_dbt_tests_suite(
self, dbt_test: dict
) -> Iterable[CreateTestSuiteRequest]:
"""
Method to add the DBT test suites
"""
try:
test_name = dbt_test.get("name")
logger.info(f"Processing DBT Tests Suite for node: {test_name}")
test_suite_name = dbt_test["meta"].get("test_suite_name", "DBT_TEST_SUITE")
test_suite_desciption = dbt_test["meta"].get("test_suite_desciption", "")
check_test_suite_exists = self.metadata.get_by_name(
fqn=test_suite_name, entity=TestSuite
)
if not check_test_suite_exists:
yield CreateTestSuiteRequest(
name=test_suite_name,
description=test_suite_desciption,
)
except Exception as err: # pylint: disable=broad-except
logger.error(f"Failed to parse the node to capture tests {err}")
def create_dbt_tests_suite_definition(
self, dbt_test: dict
) -> Iterable[CreateTestDefinitionRequest]:
"""
Method to add DBT test definitions
"""
try:
test_name = dbt_test.get("name")
logger.info(f"Processing DBT Tests Suite Definition for node: {test_name}")
check_test_definition_exists = self.metadata.get_by_name(
fqn=dbt_test["name"],
entity=TestDefinition,
)
if not check_test_definition_exists:
column_name = dbt_test.get("column_name")
if column_name:
entity_type = EntityType.COLUMN
else:
entity_type = EntityType.TABLE
yield CreateTestDefinitionRequest(
name=dbt_test["name"],
description=dbt_test["description"],
entityType=entity_type,
testPlatforms=[TestPlatform.DBT],
parameterDefinition=self.create_test_case_parameter_definitions(
dbt_test
),
)
except Exception as err: # pylint: disable=broad-except
logger.error(f"Failed to parse the node to capture tests {err}")
def create_dbt_test_case(self, dbt_test: dict) -> Iterable[CreateTestCaseRequest]:
"""
After test suite and test definitions have been processed, add the test cases info
"""
test_name = dbt_test.get("name")
logger.info(f"Processing DBT Test Case Definition for node: {test_name}")
try:
entity_link_list = self.generate_entity_link(dbt_test)
for entity_link in entity_link_list:
test_suite_name = dbt_test["meta"].get(
"test_suite_name", "DBT_TEST_SUITE"
)
yield CreateTestCaseRequest(
name=test_name,
description=dbt_test["description"],
testDefinition=EntityReference(
id=self.metadata.get_by_name(
fqn=test_name,
entity=TestDefinition,
).id.__root__,
type="testDefinition",
),
entityLink=entity_link,
testSuite=EntityReference(
id=self.metadata.get_by_name(
fqn=test_suite_name, entity=TestSuite
).id.__root__,
type="testSuite",
),
parameterValues=self.create_test_case_parameter_values(dbt_test),
)
except Exception as err: # pylint: disable=broad-except
logger.error(f"Failed to parse the node {test_name} to capture tests {err}")
def update_dbt_test_result(self, dbt_test: dict):
"""
After test cases have been processed, add the test results info
"""
test_name = dbt_test.get("name")
logger.info(f"Processing DBT Test Case Results for node: {test_name}")
try:
# Process the Test Status
dbt_test_result = dbt_test.get("results")
test_case_status = TestCaseStatus.Aborted
test_result_value = 0
if dbt_test_result.get("status") in {"success", "pass"}:
test_case_status = TestCaseStatus.Success
test_result_value = 1
elif dbt_test_result.get("status") in {"failure", "fail"}:
test_case_status = TestCaseStatus.Failed
test_result_value = 0
# Process the Test Timings
dbt_test_timings = dbt_test_result["timing"]
dbt_test_completed_at = None
for dbt_test_timing in dbt_test_timings:
if dbt_test_timing.get("name", "") == "execute":
dbt_test_completed_at = dbt_test_timing.get("completed_at")
dbt_timestamp = None
if dbt_test_completed_at:
dbt_timestamp = datetime.strptime(
dbt_test_completed_at, "%Y-%m-%dT%H:%M:%S.%fZ"
).replace(microsecond=0)
dbt_timestamp = dbt_timestamp.timestamp()
# Create the test case result object
test_case_result = TestCaseResult(
timestamp=dbt_timestamp,
testCaseStatus=test_case_status,
testResultValue=[
TestResultValue(
name=dbt_test_result.get("unique_id"),
value=str(test_result_value),
)
],
)
# Create the test case fqns and add the results
for table_fqn in dbt_test.get("upstream"):
source_elements = table_fqn.split(".")
test_case_fqn = fqn.build(
self.metadata,
entity_type=TestCase,
service_name=self.config.serviceName,
database_name=source_elements[1],
schema_name=source_elements[2],
table_name=source_elements[3],
column_name=dbt_test.get("column_name"),
test_case_name=test_name,
)
self.metadata.add_test_case_results(
test_results=test_case_result,
test_case_fqn=test_case_fqn,
)
except Exception as err: # pylint: disable=broad-except
logger.error(f"Failed capture tests results for node: {test_name} {err}")
def create_test_case_parameter_definitions(self, dbt_test):
test_case_param_definition = [
{
"name": dbt_test["test_metadata"]["name"],
"displayName": dbt_test["test_metadata"]["name"],
"required": False,
}
]
return test_case_param_definition
def create_test_case_parameter_values(self, dbt_test):
values = dbt_test["test_metadata"]["kwargs"].get("values")
dbt_test_values = ""
if values:
dbt_test_values = ",".join(values)
test_case_param_values = [
{"name": dbt_test["test_metadata"]["name"], "value": dbt_test_values}
]
return test_case_param_values
def generate_entity_link(self, dbt_test):
"""
Method returns entity link
"""
entity_link_list = []
for table_fqn in dbt_test["upstream"]:
column_name = dbt_test.get("column_name")
if column_name:
entity_link = (
f"<#E::table::" f"{table_fqn}" f"::columns::" f"{column_name}>"
)
else:
entity_link = f"<#E::table::" f"{table_fqn}>"
entity_link_list.append(entity_link)
return entity_link_list
def get_dbt_compiled_query(self, mnode) -> Optional[str]:
dbt_query_key_names = ["compiled_sql", "compiled_code"]
for key_name in dbt_query_key_names:
query = mnode.get(key_name)
if query:
return query
logger.debug(
f"Unable to get DBT compiled query for node - {mnode.get('name','unknown')}"
)
return None
def get_dbt_raw_query(self, mnode) -> Optional[str]:
dbt_query_key_names = ["raw_sql", "raw_code"]
for key_name in dbt_query_key_names:
query = mnode.get(key_name)
if query:
return query
logger.debug(
f"Unable to get DBT raw query for node - {mnode.get('name','unknown')}"
)
return None

View File

@ -0,0 +1,228 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
DBT service Topology.
"""
from abc import ABC, abstractmethod
from typing import Iterable, Optional
from pydantic import BaseModel
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
from metadata.generated.schema.api.tests.createTestDefinition import (
CreateTestDefinitionRequest,
)
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.ingestion.api.source import Source
from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
from metadata.ingestion.models.ometa_tag_category import OMetaTagAndCategory
from metadata.ingestion.models.topology import (
NodeStage,
ServiceTopology,
TopologyNode,
create_source_context,
)
from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.utils.dbt_config import get_dbt_details
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class DbtFiles(BaseModel):
dbt_catalog: Optional[dict]
dbt_manifest: Optional[dict]
dbt_run_results: Optional[dict]
class DbtServiceTopology(ServiceTopology):
"""
Defines the hierarchy of the DBT source topology:
dbt files -> tags & data models -> lineage, descriptions and tests.
We could have a topology validator. We can only consume
data that has been produced by any parent node.
"""
root = TopologyNode(
producer="get_dbt_files",
stages=[
NodeStage(
type_=OMetaTagAndCategory,
processor="validate_dbt_files",
ack_sink=False,
nullable=True,
),
NodeStage(
type_=OMetaTagAndCategory,
context="tags",
processor="yield_dbt_tags",
ack_sink=False,
nullable=True,
cache_all=True,
),
NodeStage(
type_=DataModelLink,
processor="yield_data_models",
ack_sink=False,
nullable=True,
),
],
children=["process_dbt_entities", "process_dbt_tests"],
)
process_dbt_entities = TopologyNode(
producer="get_data_model",
stages=[
NodeStage(
type_=AddLineageRequest,
processor="create_dbt_lineage",
ack_sink=False,
),
NodeStage(
type_=AddLineageRequest,
processor="create_dbt_query_lineage",
ack_sink=False,
),
NodeStage(
type_=DataModelLink,
processor="process_dbt_descriptions",
ack_sink=False,
nullable=True,
),
],
)
process_dbt_tests = TopologyNode(
producer="get_dbt_tests",
stages=[
NodeStage(
type_=CreateTestSuiteRequest,
processor="create_dbt_tests_suite",
ack_sink=False,
),
NodeStage(
type_=CreateTestDefinitionRequest,
processor="create_dbt_tests_suite_definition",
ack_sink=False,
),
NodeStage(
type_=CreateTestCaseRequest,
processor="create_dbt_test_case",
ack_sink=False,
),
NodeStage(
type_=TestCaseResult,
processor="update_dbt_test_result",
ack_sink=False,
nullable=True,
),
],
)
class DbtServiceSource(TopologyRunnerMixin, Source, ABC):
"""
Class for defining the topology of the DBT source
"""
topology = DbtServiceTopology()
context = create_source_context(topology)
def get_dbt_files(self) -> DbtFiles:
dbt_details = get_dbt_details(
self.source_config.dbtConfigSource # pylint: disable=no-member
)
dbt_files = DbtFiles(
dbt_catalog=dbt_details[0],
dbt_manifest=dbt_details[1],
dbt_run_results=dbt_details[2],
)
yield dbt_files
@abstractmethod
def validate_dbt_files(self, dbt_files: DbtFiles):
"""
Method to validate DBT files
"""
@abstractmethod
def yield_dbt_tags(self, dbt_files: DbtFiles) -> Iterable[OMetaTagAndCategory]:
"""
Create and yield tags from DBT
"""
@abstractmethod
def yield_data_models(self, dbt_files: DbtFiles) -> DataModelLink:
"""
Yield the data models
"""
def get_data_model(self) -> DataModelLink:
"""
Prepare the data models
"""
for data_model_link in self.context.data_model_links:
yield data_model_link
@abstractmethod
def create_dbt_lineage(self, data_model_link: DataModelLink) -> AddLineageRequest:
"""
Method to process DBT lineage from upstream nodes
"""
@abstractmethod
def create_dbt_query_lineage(
self, data_model_link: DataModelLink
) -> AddLineageRequest:
"""
Method to process DBT lineage from queries
"""
@abstractmethod
def process_dbt_descriptions(self, data_model_link: DataModelLink):
"""
Method to process DBT descriptions using patch APIs
"""
def get_dbt_tests(self) -> dict:
"""
Prepare the DBT tests
"""
for _, dbt_test in self.context.dbt_tests.items():
yield dbt_test
@abstractmethod
def create_dbt_tests_suite(self, dbt_test: dict) -> CreateTestSuiteRequest:
"""
Method to add the DBT test suites
"""
@abstractmethod
def create_dbt_tests_suite_definition(
self, dbt_test: dict
) -> CreateTestDefinitionRequest:
"""
Method to add DBT test definitions
"""
@abstractmethod
def create_dbt_test_case(self, dbt_test: dict) -> CreateTestCaseRequest:
"""
After test suite and test definitions have been processed, add the test cases info
"""
@abstractmethod
def update_dbt_test_result(self, dbt_test: dict):
"""
After test cases have been processed, add the test results info
"""

View File

@ -1,589 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
DBT source methods.
"""
import traceback
from datetime import datetime
from typing import Dict, Iterable, List, Optional, Union
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
from metadata.generated.schema.api.tests.createTestDefinition import (
CreateTestDefinitionRequest,
)
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
from metadata.generated.schema.entity.data.table import (
Column,
DataModel,
ModelType,
Table,
)
from metadata.generated.schema.entity.tags.tagCategory import Tag
from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.tests.basic import (
TestCaseResult,
TestCaseStatus,
TestResultValue,
)
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testDefinition import (
EntityType,
TestDefinition,
TestPlatform,
)
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import (
LabelType,
State,
TagLabel,
TagSource,
)
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.utils import fqn
from metadata.utils.elasticsearch import get_entity_from_es_result
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class DBTMixin:
"""
Class defines method to extract metadata from DBT
"""
metadata: OpenMetadata
def get_data_model(self, table_fqn: str) -> Optional[DataModel]:
return self.data_models.get(table_fqn.lower())
def get_dbt_owner(self, mnode: dict, cnode: dict) -> Optional[str]:
"""
Returns dbt owner
"""
owner = None
if mnode and cnode:
dbt_owner = mnode["meta"].get("owner") or cnode["metadata"].get("owner")
if dbt_owner:
owner_name = f"*{dbt_owner}*"
user_owner_fqn = fqn.build(
self.metadata, entity_type=User, user_name=owner_name
)
if user_owner_fqn:
owner = self.metadata.get_entity_reference(
entity=User, fqn=user_owner_fqn
)
else:
team_owner_fqn = fqn.build(
self.metadata, entity_type=Team, team_name=owner_name
)
if team_owner_fqn:
owner = self.metadata.get_entity_reference(
entity=Team, fqn=team_owner_fqn
)
else:
logger.warning(
"Unable to ingest owner from DBT since no user or"
f" team was found with name {dbt_owner}"
)
return owner
def _parse_data_model(self):
"""
Get all the DBT information and feed it to the Table Entity
"""
if (
self.source_config.dbtConfigSource
and self.dbt_manifest
and self.dbt_catalog
):
logger.info("Parsing Data Models")
self.manifest_entities = {
**self.dbt_manifest["nodes"],
**self.dbt_manifest["sources"],
}
self.catalog_entities = {
**self.dbt_catalog["nodes"],
**self.dbt_catalog["sources"],
}
for key, mnode in self.manifest_entities.items():
try:
model_name = (
mnode["alias"] if "alias" in mnode.keys() else mnode["name"]
)
cnode = self.catalog_entities.get(key)
columns = (
self._parse_data_model_columns(model_name, mnode, cnode)
if cnode
else []
)
if mnode["resource_type"] == "test":
self.dbt_tests[key] = mnode
continue
if mnode["resource_type"] == "analysis":
continue
upstream_nodes = self._parse_data_model_upstream(mnode)
database = mnode["database"] if mnode["database"] else "default"
schema = mnode["schema"] if mnode["schema"] else "default"
dbt_table_tags_list = None
if mnode.get("tags"):
dbt_table_tags_list = [
TagLabel(
tagFQN=fqn.build(
self.metadata,
entity_type=Tag,
tag_category_name="DBTTags",
tag_name=tag,
),
labelType=LabelType.Automated,
state=State.Confirmed,
source=TagSource.Tag,
)
for tag in mnode.get("tags")
] or None
dbt_compiled_query = self.get_dbt_compiled_query(mnode)
dbt_raw_query = self.get_dbt_raw_query(mnode)
model = DataModel(
modelType=ModelType.DBT,
description=mnode.get("description")
if mnode.get("description")
else None,
path=f"{mnode['root_path']}/{mnode['original_file_path']}",
rawSql=dbt_raw_query if dbt_raw_query else "",
sql=dbt_compiled_query if dbt_compiled_query else "",
columns=columns,
upstream=upstream_nodes,
owner=self.get_dbt_owner(mnode=mnode, cnode=cnode),
tags=dbt_table_tags_list,
)
model_fqn = fqn.build(
self.metadata,
entity_type=DataModel,
service_name=self.config.serviceName,
database_name=database,
schema_name=schema,
model_name=model_name,
).lower()
self.data_models[model_fqn] = model
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unexpected exception parsing data model: {exc}")
def _parse_data_model_upstream(self, mnode):
upstream_nodes = []
if "depends_on" in mnode and "nodes" in mnode["depends_on"]:
for node in mnode["depends_on"]["nodes"]:
try:
parent_node = self.manifest_entities[node]
parent_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.config.serviceName,
database_name=parent_node["database"]
if parent_node["database"]
else "default",
schema_name=parent_node["schema"]
if parent_node["schema"]
else "default",
table_name=parent_node["name"],
)
if parent_fqn:
upstream_nodes.append(parent_fqn)
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to parse the node {node} to capture lineage: {exc}"
)
continue
return upstream_nodes
def _parse_data_model_columns(
self, _: str, mnode: Dict, cnode: Dict
) -> List[Column]:
columns = []
catalogue_columns = cnode.get("columns", {})
manifest_columns = mnode.get("columns", {})
for key in catalogue_columns:
ccolumn = catalogue_columns[key]
try:
ctype = ccolumn["type"]
description = manifest_columns.get(key.lower(), {}).get("description")
if description is None:
description = ccolumn.get("comment")
dbt_column_tags = manifest_columns.get(key.lower(), {}).get("tags")
dbt_column_tags_list = None
if dbt_column_tags:
dbt_column_tags_list = [
TagLabel(
tagFQN=fqn.build(
self.metadata,
entity_type=Tag,
tag_category_name="DBTTags",
tag_name=tag,
),
labelType=LabelType.Automated,
state=State.Confirmed,
source=TagSource.Tag,
)
for tag in dbt_column_tags
] or None
col = Column(
name=ccolumn["name"].lower(),
description=description if description else None,
dataType=ColumnTypeParser.get_column_type(ctype),
dataLength=1,
ordinalPosition=ccolumn["index"],
tags=dbt_column_tags_list,
)
columns.append(col)
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(f"Failed to parse column {ccolumn['name']}: {exc}")
return columns
def process_dbt_lineage_and_descriptions(
self,
) -> Iterable[AddLineageRequest]:
"""
After everything has been processed, add the lineage info
"""
logger.info("Processing DBT lineage and Descriptions")
for data_model_name, data_model in self.data_models.items():
to_es_result = self.metadata.es_search_from_fqn(
entity_type=Table,
fqn_search_string=data_model_name,
)
to_entity: Optional[Union[Table, List[Table]]] = get_entity_from_es_result(
entity_list=to_es_result, fetch_multiple_entities=False
)
if to_entity:
try:
# Patch table descriptions from DBT
if data_model.description:
self.metadata.patch_description(
entity=Table,
entity_id=to_entity.id,
description=data_model.description.__root__,
force=self.source_config.dbtConfigSource.dbtUpdateDescriptions,
)
# Patch column descriptions from DBT
for column in data_model.columns:
if column.description:
self.metadata.patch_column_description(
entity_id=to_entity.id,
column_name=column.name.__root__,
description=column.description.__root__,
force=self.source_config.dbtConfigSource.dbtUpdateDescriptions,
)
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to parse the node {upstream_node} to update dbt desctiption: {exc}"
)
# Create Lineage from DBT Files
for upstream_node in data_model.upstream:
try:
from_es_result = self.metadata.es_search_from_fqn(
entity_type=Table,
fqn_search_string=upstream_node,
)
from_entity: Optional[
Union[Table, List[Table]]
] = get_entity_from_es_result(
entity_list=from_es_result, fetch_multiple_entities=False
)
if from_entity and to_entity:
yield AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=from_entity.id.__root__,
type="table",
),
toEntity=EntityReference(
id=to_entity.id.__root__,
type="table",
),
)
)
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to parse the node {upstream_node} to capture lineage: {exc}"
)
# Create Lineage from DBT Queries
try:
service_database_schema_table = fqn.split(data_model_name)
target_table_fqn = ".".join(service_database_schema_table[1:])
query = f"create table {target_table_fqn} as {data_model.sql.__root__}"
lineages = get_lineage_by_query(
self.metadata,
query=query,
service_name=service_database_schema_table[0],
database_name=service_database_schema_table[1],
schema_name=service_database_schema_table[2],
)
for lineage_request in lineages or []:
yield lineage_request
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to parse the query {data_model.sql.__root__} to capture lineage: {exc}"
)
def create_dbt_tests_suite_definition(self):
"""
After everything has been processed, add the test suites and test definitions
"""
try:
if (
self.source_config.dbtConfigSource
and self.dbt_manifest
and self.dbt_catalog
and self.dbt_run_results
):
logger.info("Processing DBT Tests Suites and Test Definitions")
for _, dbt_test in self.dbt_tests.items():
test_suite_name = dbt_test["meta"].get(
"test_suite_name", "DBT_TEST_SUITE"
)
test_suite_desciption = dbt_test["meta"].get(
"test_suite_desciption", ""
)
check_test_suite_exists = self.metadata.get_by_name(
fqn=test_suite_name, entity=TestSuite
)
if not check_test_suite_exists:
test_suite = CreateTestSuiteRequest(
name=test_suite_name,
description=test_suite_desciption,
)
yield test_suite
check_test_definition_exists = self.metadata.get_by_name(
fqn=dbt_test["name"],
entity=TestDefinition,
)
if not check_test_definition_exists:
column_name = dbt_test.get("column_name")
if column_name:
entity_type = EntityType.COLUMN
else:
entity_type = EntityType.TABLE
test_definition = CreateTestDefinitionRequest(
name=dbt_test["name"],
description=dbt_test["description"],
entityType=entity_type,
testPlatforms=[TestPlatform.DBT],
parameterDefinition=self.create_test_case_parameter_definitions(
dbt_test
),
)
yield test_definition
except Exception as err: # pylint: disable=broad-except
logger.error(f"Failed to parse the node to capture tests {err}")
def create_dbt_test_cases(self):
"""
After test suite and test definitions have been processed, add the test cases info
"""
if (
self.source_config.dbtConfigSource
and self.dbt_manifest
and self.dbt_catalog
and self.dbt_run_results
):
logger.info("Processing DBT Tests Cases")
for key, dbt_test in self.dbt_tests.items():
try:
entity_link_list = self.generate_entity_link(dbt_test)
for entity_link in entity_link_list:
test_suite_name = dbt_test["meta"].get(
"test_suite_name", "DBT_TEST_SUITE"
)
test_case = CreateTestCaseRequest(
name=dbt_test["name"],
description=dbt_test["description"],
testDefinition=EntityReference(
id=self.metadata.get_by_name(
fqn=dbt_test["name"],
entity=TestDefinition,
).id.__root__,
type="testDefinition",
),
entityLink=entity_link,
testSuite=EntityReference(
id=self.metadata.get_by_name(
fqn=test_suite_name, entity=TestSuite
).id.__root__,
type="testSuite",
),
parameterValues=self.create_test_case_parameter_values(
dbt_test
),
)
yield test_case
except Exception as err: # pylint: disable=broad-except
logger.error(
f"Failed to parse the node {key} to capture tests {err}"
)
self.update_dbt_test_result()
def update_dbt_test_result(self):
"""
After test cases have been processed, add the test results info
"""
if self.dbt_run_results:
logger.info("Processing DBT Tests Results")
for dbt_test_result in self.dbt_run_results.get("results"):
try:
# Process the Test Status
test_case_status = TestCaseStatus.Aborted
test_result_value = 0
if dbt_test_result.get("status") in {"success", "pass"}:
test_case_status = TestCaseStatus.Success
test_result_value = 1
elif dbt_test_result.get("status") in {"failure", "fail"}:
test_case_status = TestCaseStatus.Failed
test_result_value = 0
# Process the Test Timings
dbt_test_timings = dbt_test_result["timing"]
dbt_test_completed_at = None
for dbt_test_timing in dbt_test_timings:
if dbt_test_timing.get("name", "") == "execute":
dbt_test_completed_at = dbt_test_timing.get("completed_at")
dbt_timestamp = None
if dbt_test_completed_at:
dbt_timestamp = datetime.strptime(
dbt_test_completed_at, "%Y-%m-%dT%H:%M:%S.%fZ"
).replace(microsecond=0)
dbt_timestamp = dbt_timestamp.timestamp()
test_case_result = TestCaseResult(
timestamp=dbt_timestamp,
testCaseStatus=test_case_status,
testResultValue=[
TestResultValue(
name=dbt_test_result.get("unique_id"),
value=str(test_result_value),
)
],
)
dbt_test_node = self.dbt_tests.get(dbt_test_result["unique_id"])
if dbt_test_node:
nodes = dbt_test_node["depends_on"]["nodes"]
for node in nodes:
model = self.manifest_entities.get(node)
test_case_fqn = fqn.build(
self.metadata,
entity_type=TestCase,
service_name=self.config.serviceName,
database_name=model["database"]
if model["database"]
else "default",
schema_name=model["schema"]
if model["schema"]
else "default",
table_name=model.get("name"),
column_name=dbt_test_node.get("column_name"),
test_case_name=self.dbt_tests.get(
dbt_test_result["unique_id"]
)["name"],
)
self.metadata.add_test_case_results(
test_results=test_case_result,
test_case_fqn=test_case_fqn,
)
except Exception as err: # pylint: disable=broad-except
logger.error(f"Failed capture tests results {err}")
def create_test_case_parameter_definitions(self, dbt_test):
test_case_param_definition = [
{
"name": dbt_test["test_metadata"]["name"],
"displayName": dbt_test["test_metadata"]["name"],
"required": False,
}
]
return test_case_param_definition
def create_test_case_parameter_values(self, dbt_test):
values = dbt_test["test_metadata"]["kwargs"].get("values")
dbt_test_values = ""
if values:
dbt_test_values = ",".join(values)
test_case_param_values = [
{"name": dbt_test["test_metadata"]["name"], "value": dbt_test_values}
]
return test_case_param_values
def generate_entity_link(self, dbt_test):
"""
Method returns entity link
"""
nodes = dbt_test["depends_on"]["nodes"]
entity_link_list = []
for node in nodes:
model = self.manifest_entities.get(node)
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.config.serviceName,
database_name=model["database"] if model["database"] else "default",
schema_name=model["schema"] if model["schema"] else "default",
table_name=model.get("name"),
)
column_name = dbt_test.get("column_name")
if column_name:
entity_link = (
f"<#E::table::" f"{table_fqn}" f"::columns::" f"{column_name}>"
)
else:
entity_link = f"<#E::table::" f"{table_fqn}>"
entity_link_list.append(entity_link)
return entity_link_list
def get_dbt_compiled_query(self, mnode) -> Optional[str]:
dbt_query_key_names = ["compiled_sql", "compiled_code"]
for key_name in dbt_query_key_names:
query = mnode.get(key_name)
if query:
return query
logger.debug(
f"Unable to get DBT compiled query for node - {mnode.get('name','unknown')}"
)
return None
def get_dbt_raw_query(self, mnode) -> Optional[str]:
dbt_query_key_names = ["raw_sql", "raw_code"]
for key_name in dbt_query_key_names:
query = mnode.get(key_name)
if query:
return query
logger.debug(
f"Unable to get DBT raw query for node - {mnode.get('name','unknown')}"
)
return None

View File

@ -17,7 +17,7 @@ from typing import List, Optional, Set, Tuple
from sqlalchemy.engine import Engine
from sqlalchemy.engine.reflection import Inspector
from metadata.generated.schema.entity.data.table import Column, DataModel
from metadata.generated.schema.entity.data.table import Column
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
)
@ -67,14 +67,6 @@ class SqlAlchemySource(ABC):
Method returns the table level comment
"""
@abstractmethod
def get_data_model(
self, database: str, schema_name: str, table_name: str
) -> DataModel:
"""
Method to fetch data models
"""
@abstractmethod
def get_columns_and_constraints(
self, schema_name: str, table_name: str, inspector: Inspector

View File

@ -59,22 +59,14 @@ def get_dbt_details(config):
def _(config: DbtLocalConfig):
try:
dbt_run_results = None
if config.dbtCatalogFilePath is not None:
logger.debug(
f"Reading [dbtCatalogFilePath] from: {config.dbtCatalogFilePath}"
)
with open(config.dbtCatalogFilePath, "r", encoding="utf-8") as catalog:
dbt_catalog = catalog.read()
dbt_catalog = None
if config.dbtManifestFilePath is not None:
logger.debug(
f"Reading [dbtManifestFilePath] from: {config.dbtCatalogFilePath}"
)
with open(config.dbtManifestFilePath, "r", encoding="utf-8") as manifest:
dbt_manifest = manifest.read()
if (
config.dbtRunResultsFilePath is not None
and config.dbtRunResultsFilePath != ""
):
if config.dbtRunResultsFilePath is not None:
logger.debug(
f"Reading [dbtRunResultsFilePath] from: {config.dbtRunResultsFilePath}"
)
@ -82,8 +74,14 @@ def _(config: DbtLocalConfig):
config.dbtRunResultsFilePath, "r", encoding="utf-8"
) as run_results:
dbt_run_results = run_results.read()
if config.dbtCatalogFilePath is not None:
logger.debug(
f"Reading [dbtCatalogFilePath] from: {config.dbtCatalogFilePath}"
)
with open(config.dbtCatalogFilePath, "r", encoding="utf-8") as catalog:
dbt_catalog = catalog.read()
return (
json.loads(dbt_catalog),
json.loads(dbt_catalog) if dbt_catalog else None,
json.loads(dbt_manifest),
json.loads(dbt_run_results) if dbt_run_results else None,
)
@ -96,27 +94,31 @@ def _(config: DbtLocalConfig):
@get_dbt_details.register
def _(config: DbtHttpConfig):
try:
logger.debug(f"Requesting [dbtCatalogHttpPath] to: {config.dbtCatalogHttpPath}")
dbt_catalog = requests.get( # pylint: disable=missing-timeout
config.dbtCatalogHttpPath
logger.debug(
f"Requesting [dbtManifestHttpPath] to: {config.dbtManifestHttpPath}"
)
logger.debug(f"Requesting [dbtCatalogHttpPath] to: {config.dbtCatalogHttpPath}")
dbt_manifest = requests.get( # pylint: disable=missing-timeout
config.dbtManifestHttpPath
)
dbt_run_results = None
if (
config.dbtRunResultsHttpPath is not None
and config.dbtRunResultsHttpPath != ""
):
if config.dbtRunResultsHttpPath is not None:
logger.debug(
f"Requesting [dbtRunResultsHttpPath] to: {config.dbtRunResultsHttpPath}"
)
dbt_run_results = requests.get( # pylint: disable=missing-timeout
config.dbtRunResultsHttpPath
)
dbt_catalog = None
if config.dbtCatalogHttpPath is not None:
logger.debug(
f"Requesting [dbtCatalogHttpPath] to: {config.dbtCatalogHttpPath}"
)
dbt_catalog = requests.get( # pylint: disable=missing-timeout
config.dbtCatalogHttpPath
)
return (
json.loads(dbt_catalog.text),
json.loads(dbt_catalog.text) if dbt_catalog else None,
json.loads(dbt_manifest.text),
json.loads(dbt_run_results.text) if dbt_run_results else None,
)
@ -210,7 +212,7 @@ def _(config: DbtS3Config):
logger.debug(f"{DBT_RUN_RESULTS_FILE_NAME} found")
dbt_run_results = bucket_object.get()["Body"].read().decode()
return (
json.loads(dbt_catalog),
json.loads(dbt_catalog) if dbt_catalog else None,
json.loads(dbt_manifest),
json.loads(dbt_run_results) if dbt_run_results else None,
)
@ -251,7 +253,7 @@ def _(config: DbtGcsConfig):
logger.debug(f"{DBT_RUN_RESULTS_FILE_NAME} found")
dbt_run_results = blob.download_as_string().decode()
return (
json.loads(dbt_catalog),
json.loads(dbt_catalog) if dbt_catalog else None,
json.loads(dbt_manifest),
json.loads(dbt_run_results) if dbt_run_results else None,
)

View File

@ -0,0 +1,72 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Metadata DAG function builder
"""
from airflow import DAG
from openmetadata_managed_apis.workflows.ingestion.common import (
build_dag,
build_source,
build_workflow_config_property,
metadata_ingestion_workflow,
)
try:
from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
from airflow.operators.python_operator import PythonOperator
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
Sink,
)
def build_dbt_workflow_config(
ingestion_pipeline: IngestionPipeline,
) -> OpenMetadataWorkflowConfig:
"""
Given an ingestion_pipeline, prepare the workflow config JSON
"""
source = build_source(ingestion_pipeline)
source.type = f"dbt" # Mark the source as dbt
workflow_config = OpenMetadataWorkflowConfig(
source=source,
sink=Sink(
type="metadata-rest",
config={},
),
workflowConfig=build_workflow_config_property(ingestion_pipeline),
ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.__root__,
)
return workflow_config
def build_dbt_dag(ingestion_pipeline: IngestionPipeline) -> DAG:
"""
Build a simple DBT workflow DAG
"""
workflow_config = build_dbt_workflow_config(ingestion_pipeline)
dag = build_dag(
task_name="dbt_task",
ingestion_pipeline=ingestion_pipeline,
workflow_config=workflow_config,
workflow_fn=metadata_ingestion_workflow,
)
return dag
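For orientation, a hedged sketch of the object that build_dbt_workflow_config assembles, written out as a plain dict. Every value is a placeholder; only the keys follow the builder above, and the workflowConfig contents are an assumption about what build_workflow_config_property returns:

# Placeholder values for illustration only; structure mirrors the builder above.
workflow_config_dict = {
    "source": {
        "type": "dbt",  # forced by build_dbt_workflow_config
        "serviceName": "my_database_service",
        "sourceConfig": {
            "config": {
                "type": "DBT",
                "dbtConfigSource": {"dbtManifestFilePath": "/path/to/manifest.json"},
            }
        },
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {"hostPort": "https://my-openmetadata:8585/api"}
    },
    "ingestionPipelineFQN": "my_database_service.my_dbt_pipeline",
}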

View File

@ -16,6 +16,7 @@ Add a function for each type from PipelineType
from openmetadata_managed_apis.workflows.ingestion.data_insight import (
build_data_insight_dag,
)
from openmetadata_managed_apis.workflows.ingestion.dbt import build_dbt_dag
from openmetadata_managed_apis.workflows.ingestion.es_reindex import (
build_es_reindex_dag,
)
@ -37,6 +38,7 @@ build_registry = enum_register()
build_registry.add(PipelineType.metadata.value)(build_metadata_dag)
build_registry.add(PipelineType.usage.value)(build_usage_dag)
build_registry.add(PipelineType.lineage.value)(build_lineage_dag)
build_registry.add(PipelineType.dbt.value)(build_dbt_dag)
build_registry.add(PipelineType.profiler.value)(build_profiler_dag)
build_registry.add(PipelineType.TestSuite.value)(build_test_suite_dag)
build_registry.add(PipelineType.dataInsight.value)(build_data_insight_dag)

View File

@ -46,6 +46,8 @@ public class AirflowRESTClient extends PipelineServiceClient {
"profiler_task",
PipelineType.LINEAGE.toString(),
"lineage_task",
PipelineType.DBT.toString(),
"dbt_task",
PipelineType.USAGE.toString(),
"usage_task",
PipelineType.TEST_SUITE.toString(),

View File

@ -13,12 +13,12 @@
package org.openmetadata.service.util;
import static org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType.METADATA;
import static org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType.DBT;
import java.util.List;
import org.jetbrains.annotations.Nullable;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.metadataIngestion.DatabaseServiceMetadataPipeline;
import org.openmetadata.schema.metadataIngestion.DbtPipeline;
import org.openmetadata.schema.metadataIngestion.dbtconfig.DbtCloudConfig;
import org.openmetadata.schema.metadataIngestion.dbtconfig.DbtGCSConfig;
import org.openmetadata.schema.metadataIngestion.dbtconfig.DbtHttpConfig;
@ -30,7 +30,6 @@ import org.openmetadata.schema.security.client.CustomOIDCSSOClientConfig;
import org.openmetadata.schema.security.client.GoogleSSOClientConfig;
import org.openmetadata.schema.security.client.OktaSSOClientConfig;
import org.openmetadata.schema.security.client.OpenMetadataJWTClientConfig;
import org.openmetadata.service.Entity;
public class IngestionPipelineBuilder {
@ -53,17 +52,12 @@ public class IngestionPipelineBuilder {
* @return ingestion pipeline with concrete classes
*/
public static IngestionPipeline build(IngestionPipeline ingestionPipeline) {
if (METADATA.equals(ingestionPipeline.getPipelineType())
&& ingestionPipeline.getService().getType().equals(Entity.DATABASE_SERVICE)
&& ingestionPipeline.getSourceConfig() != null) {
DatabaseServiceMetadataPipeline databaseServiceMetadataPipeline =
JsonUtils.convertValue(
ingestionPipeline.getSourceConfig().getConfig(), DatabaseServiceMetadataPipeline.class);
if (DBT.equals(ingestionPipeline.getPipelineType())) {
DbtPipeline dbtPipeline =
JsonUtils.convertValue(ingestionPipeline.getSourceConfig().getConfig(), DbtPipeline.class);
ingestionPipeline
.getSourceConfig()
.setConfig(
databaseServiceMetadataPipeline.withDbtConfigSource(
buildDbtConfigSource(databaseServiceMetadataPipeline.getDbtConfigSource())));
.setConfig(dbtPipeline.withDbtConfigSource(buildDbtConfigSource(dbtPipeline.getDbtConfigSource())));
}
if (ingestionPipeline.getOpenMetadataServerConnection() != null) {
ingestionPipeline

View File

@ -17,7 +17,6 @@ import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.OK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.openmetadata.service.Entity.FIELD_OWNER;
import static org.openmetadata.service.util.EntityUtil.fieldAdded;
@ -54,10 +53,13 @@ import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.schema.metadataIngestion.DashboardServiceMetadataPipeline;
import org.openmetadata.schema.metadataIngestion.DatabaseServiceMetadataPipeline;
import org.openmetadata.schema.metadataIngestion.DatabaseServiceQueryUsagePipeline;
import org.openmetadata.schema.metadataIngestion.DbtPipeline;
import org.openmetadata.schema.metadataIngestion.FilterPattern;
import org.openmetadata.schema.metadataIngestion.LogLevels;
import org.openmetadata.schema.metadataIngestion.MessagingServiceMetadataPipeline;
import org.openmetadata.schema.metadataIngestion.SourceConfig;
import org.openmetadata.schema.metadataIngestion.dbtconfig.DbtS3Config;
import org.openmetadata.schema.security.credentials.AWSCredentials;
import org.openmetadata.schema.services.connections.database.BigQueryConnection;
import org.openmetadata.schema.services.connections.database.ConnectionArguments;
import org.openmetadata.schema.services.connections.database.ConnectionOptions;
@ -155,11 +157,6 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
IngestionPipeline expected, IngestionPipeline updated, Map<String, String> authHeaders) {
assertEquals(expected.getDisplayName(), updated.getDisplayName());
assertReference(expected.getService(), updated.getService());
if (Entity.DATABASE_SERVICE.equals(updated.getService().getType())) {
assertNull(
JsonUtils.convertValue(updated.getSourceConfig().getConfig(), DatabaseServiceMetadataPipeline.class)
.getDbtConfigSource());
}
}
@Override
@ -596,6 +593,33 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
assertNotNull(change);
}
@Test
void post_dbtPipeline_configIsEncrypted(TestInfo test) throws IOException {
AWSCredentials awsCredentials =
new AWSCredentials()
.withAwsAccessKeyId("123456789")
.withAwsSecretAccessKey("asdfqwer1234")
.withAwsRegion("eu-west-2");
DbtPipeline dbtPipeline =
new DbtPipeline().withDbtConfigSource(new DbtS3Config().withDbtSecurityConfig(awsCredentials));
CreateIngestionPipeline request =
createRequest(test)
.withPipelineType(PipelineType.DBT)
.withSourceConfig(new SourceConfig().withConfig(dbtPipeline))
.withService(BIGQUERY_REFERENCE)
.withDescription(null)
.withOwner(null);
IngestionPipeline ingestion = createAndCheckEntity(request, ADMIN_AUTH_HEADERS);
DbtPipeline actualDbtPipeline = JsonUtils.convertValue(ingestion.getSourceConfig().getConfig(), DbtPipeline.class);
DbtS3Config actualDbtS3Config = JsonUtils.convertValue(actualDbtPipeline.getDbtConfigSource(), DbtS3Config.class);
assertEquals(actualDbtS3Config.getDbtSecurityConfig().getAwsAccessKeyId(), awsCredentials.getAwsAccessKeyId());
assertEquals(actualDbtS3Config.getDbtSecurityConfig().getAwsRegion(), awsCredentials.getAwsRegion());
assertEquals(
actualDbtS3Config.getDbtSecurityConfig().getAwsSecretAccessKey(),
"secret:/openmetadata/pipeline/ingestionpipeline_post_dbtpipeline_configisencrypted/sourceconfig/config/dbtconfigsource/dbtsecurityconfig/awssecretaccesskey");
}
private IngestionPipeline updateIngestionPipeline(
CreateIngestionPipeline create, Status status, Map<String, String> authHeaders) throws HttpResponseException {
return TestUtils.put(getCollection(), create, IngestionPipeline.class, status, authHeaders);

View File

@ -27,7 +27,7 @@ import org.openmetadata.schema.entity.services.ServiceType;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.schema.entity.teams.AuthenticationMechanism;
import org.openmetadata.schema.metadataIngestion.DatabaseServiceMetadataPipeline;
import org.openmetadata.schema.metadataIngestion.DbtPipeline;
import org.openmetadata.schema.metadataIngestion.SourceConfig;
import org.openmetadata.schema.metadataIngestion.dbtconfig.DbtS3Config;
import org.openmetadata.schema.security.client.OktaSSOClientConfig;
@ -143,12 +143,12 @@ public abstract class ExternalSecretsManagerTest {
IngestionPipeline expectedIngestionPipeline =
new IngestionPipeline()
.withName("my-pipeline")
.withPipelineType(PipelineType.METADATA)
.withPipelineType(PipelineType.DBT)
.withService(new DatabaseService().getEntityReference().withType(Entity.DATABASE_SERVICE))
.withSourceConfig(
new SourceConfig()
.withConfig(
new DatabaseServiceMetadataPipeline()
new DbtPipeline()
.withDbtConfigSource(
new DbtS3Config()
.withDbtSecurityConfig(
@ -159,7 +159,7 @@ public abstract class ExternalSecretsManagerTest {
IngestionPipeline ingestionPipeline =
new IngestionPipeline()
.withName("my-pipeline")
.withPipelineType(PipelineType.METADATA)
.withPipelineType(PipelineType.DBT)
.withService(new DatabaseService().getEntityReference().withType(Entity.DATABASE_SERVICE))
.withSourceConfig(
new SourceConfig()
@ -176,20 +176,18 @@ public abstract class ExternalSecretsManagerTest {
secretsManager.encryptOrDecryptIngestionPipeline(ingestionPipeline, decrypt);
if (decrypt) {
DatabaseServiceMetadataPipeline expectedDbPipeline =
((DatabaseServiceMetadataPipeline) expectedIngestionPipeline.getSourceConfig().getConfig());
DatabaseServiceMetadataPipeline actualDbPipeline =
((DatabaseServiceMetadataPipeline) actualIngestionPipeline.getSourceConfig().getConfig());
((DbtS3Config) expectedDbPipeline.getDbtConfigSource())
DbtPipeline expectedDbtPipeline = ((DbtPipeline) expectedIngestionPipeline.getSourceConfig().getConfig());
DbtPipeline actualDbtPipeline = ((DbtPipeline) actualIngestionPipeline.getSourceConfig().getConfig());
((DbtS3Config) expectedDbtPipeline.getDbtConfigSource())
.getDbtSecurityConfig()
.setAwsSecretAccessKey(
"secret:/openmetadata/pipeline/my-pipeline/sourceconfig/config/dbtconfigsource/dbtsecurityconfig/awssecretaccesskey");
((DbtS3Config) actualDbPipeline.getDbtConfigSource())
((DbtS3Config) actualDbtPipeline.getDbtConfigSource())
.getDbtSecurityConfig()
.setAwsSecretAccessKey(
Fernet.getInstance()
.decrypt(
((DbtS3Config) actualDbPipeline.getDbtConfigSource())
((DbtS3Config) actualDbtPipeline.getDbtConfigSource())
.getDbtSecurityConfig()
.getAwsSecretAccessKey()));
}

View File

@ -59,6 +59,11 @@
"description": "Support Elastic Search Reindexing",
"type": "boolean",
"default": true
},
"supportsDBTExtraction": {
"description": "Supports DBT Extraction.",
"type": "boolean",
"default": true
}
}
}

View File

@ -59,6 +59,9 @@
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"

View File

@ -71,6 +71,9 @@
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"

View File

@ -110,6 +110,9 @@
"supportsLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsLineageExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"

View File

@ -76,6 +76,9 @@
"supportsLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsLineageExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"

View File

@ -72,6 +72,9 @@
"supportsLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsLineageExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
},
"supportsMetadataExtraction": {
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"

View File

@ -65,6 +65,9 @@
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"

View File

@ -40,6 +40,9 @@
"supportsMetadataExtraction": {
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
}
},
"additionalProperties": false,

View File

@ -70,6 +70,9 @@
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"

View File

@ -65,6 +65,9 @@
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"

View File

@ -95,6 +95,9 @@
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"

View File

@ -71,6 +71,9 @@
"supportsLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsLineageExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"

View File

@ -77,6 +77,9 @@
"supportsLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsLineageExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"

View File

@ -65,6 +65,9 @@
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"

View File

@ -98,6 +98,9 @@
"supportsLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsLineageExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"

View File

@ -71,6 +71,9 @@
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"

View File

@ -86,6 +86,9 @@
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"

View File

@ -68,6 +68,9 @@
"supportsUsageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsUsageExtraction"
},
"supportsDBTExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"

View File

@ -44,7 +44,8 @@
"Datalake",
"DomoDatabase",
"QueryLog",
"CustomDatabase"
"CustomDatabase",
"Dbt"
],
"javaEnums": [
{
@ -136,6 +137,9 @@
},
{
"name": "CustomDatabase"
},
{
"name": "Dbt"
}
]
},

View File

@ -11,7 +11,7 @@
"description": "Type of Pipeline - metadata, usage",
"type": "string",
"javaType": "org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType",
"enum": ["metadata", "usage", "lineage", "profiler", "TestSuite", "dataInsight", "elasticSearchReindex"]
"enum": ["metadata", "usage", "lineage", "profiler", "TestSuite", "dataInsight", "elasticSearchReindex", "dbt"]
},
"pipelineStatus": {
"type": "object",

View File

@ -59,28 +59,6 @@
"databaseFilterPattern": {
"description": "Regex to only fetch databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern"
},
"dbtConfigSource": {
"mask": true,
"title": "DBT Configuration Source",
"description": "Available sources to fetch DBT catalog and manifest files.",
"oneOf": [
{
"$ref": "./dbtconfig/dbtCloudConfig.json"
},
{
"$ref": "./dbtconfig/dbtLocalConfig.json"
},
{
"$ref": "./dbtconfig/dbtHttpConfig.json"
},
{
"$ref": "./dbtconfig/dbtS3Config.json"
},
{
"$ref": "./dbtconfig/dbtGCSConfig.json"
}
]
}
},
"additionalProperties": false

View File

@ -0,0 +1,44 @@
{
"$id": "https://open-metadata.org/schema/metadataIngestion/dbtPipeline.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "dbtPipeline",
"description": "DBT Pipeline Configuration.",
"definitions": {
"dbtConfigType": {
"description": "DBT Config Pipeline type",
"type": "string",
"enum": ["DBT"],
"default": "DBT"
}
},
"properties": {
"type": {
"description": "Pipeline type",
"$ref": "#/definitions/dbtConfigType",
"default": "DBT"
},
"dbtConfigSource": {
"mask": true,
"title": "DBT Configuration Source",
"description": "Available sources to fetch DBT catalog and manifest files.",
"oneOf": [
{
"$ref": "./dbtconfig/dbtCloudConfig.json"
},
{
"$ref": "./dbtconfig/dbtLocalConfig.json"
},
{
"$ref": "./dbtconfig/dbtHttpConfig.json"
},
{
"$ref": "./dbtconfig/dbtS3Config.json"
},
{
"$ref": "./dbtconfig/dbtGCSConfig.json"
}
]
}
},
"additionalProperties": false
}
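As a companion to the schema, a hedged example of a sourceConfig.config payload intended to validate against dbtPipeline.json. The values reuse the placeholders from the DbtS3Config encryption test added later in this commit; the field names inside dbtSecurityConfig come from the AWS credentials schema, not from this file:

# Illustrative payload only; keys follow dbtPipeline.json with an S3 source.
dbt_source_config = {
    "type": "DBT",
    "dbtConfigSource": {
        "dbtSecurityConfig": {
            "awsAccessKeyId": "123456789",
            "awsSecretAccessKey": "asdfqwer1234",
            "awsRegion": "eu-west-2",
        }
    },
}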

View File

@ -28,5 +28,5 @@
}
},
"additionalProperties": false,
"required": ["dbtCatalogHttpPath", "dbtManifestHttpPath"]
"required": ["dbtManifestHttpPath"]
}

View File

@ -28,5 +28,5 @@
}
},
"additionalProperties": false,
"required": ["dbtCatalogFilePath", "dbtManifestFilePath"]
"required": ["dbtManifestFilePath"]
}

View File

@ -44,6 +44,9 @@
},
{
"$ref": "dataInsightPipeline.json"
},
{
"$ref": "dbtPipeline.json"
}
]
}

View File

@ -85,12 +85,17 @@ export const handleIngestionRetry = (
}
});
cy.get(`[data-row-key*="${ingestionType}"]`).find('[data-testid="pipeline-status"]').as('checkRun');
// the latest run should be success
cy.get(`.ant-table-tbody > :nth-child(${rowIndex}) > :nth-child(4)`).then(
cy.get('@checkRun').then(
($ingestionStatus) => {
if (
$ingestionStatus.text() !== 'Success' &&
$ingestionStatus.text() !== 'Success' && $ingestionStatus.text() !== 'Failed' &&
retryCount <= RETRY_TIMES
) {
// retry after waiting with log1 method [20s,40s,80s,160s,320s]
@ -101,7 +106,7 @@ export const handleIngestionRetry = (
checkSuccessState();
} else {
cy.get(
`.ant-table-tbody > :nth-child(${rowIndex}) > :nth-child(4)`
'@checkRun'
).should('have.text', 'Success');
}
}
@ -135,7 +140,6 @@ export const testServiceCreationAndIngestion = (
serviceName,
type = 'database',
testIngestionButton = true,
configureDBT
) => {
//Storing the created service name and the type of service
// Select Service in step 1
@ -200,13 +204,6 @@ export const testServiceCreationAndIngestion = (
cy.get('[data-testid="next-button"]').should('exist').click();
// Configure DBT Model
if (isDatabaseService(type)) {
cy.contains('Configure DBT Model').should('be.visible');
configureDBT && configureDBT();
cy.get('[data-testid="submit-btn"]').should('be.visible').click();
}
scheduleIngestion();
cy.contains(`${serviceName}_metadata`).should('be.visible');
@ -962,7 +959,7 @@ export const updateDescriptionForIngestedTables = (
'/api/v1/services/ingestionPipelines/trigger/*',
'checkRun'
);
cy.get('[data-testid="run"]').should('be.visible').click();
cy.get(`[data-row-key*="${serviceName}_metadata"] [data-testid="run"]`).should('be.visible').click();
verifyResponseStatusCode('@checkRun', 200);
//Close the toast message

View File

@ -3,7 +3,7 @@ const uniqueID = uuid();
export const REDSHIFT = {
serviceType: 'Redshift',
serviceName: `Redshift-ct-test-${uniqueID}`,
serviceName: `redshift-ct-test-${uniqueID}`,
tableName: 'boolean_test',
DBTTable: 'customers',
description: `This is Redshift-ct-test-${uniqueID} description`,

View File

@ -14,16 +14,13 @@
import {
deleteCreatedService,
editOwnerforCreatedService,
goToAddNewServicePage,
interceptURL,
testServiceCreationAndIngestion,
goToAddNewServicePage, handleIngestionRetry, interceptURL, scheduleIngestion, testServiceCreationAndIngestion,
updateDescriptionForIngestedTables,
verifyResponseStatusCode,
visitEntityDetailsPage
} from '../../common/common';
import {
API_SERVICE, DBT, HTTP_CONFIG_SOURCE,
SERVICE_TYPE
API_SERVICE, DBT, HTTP_CONFIG_SOURCE, SERVICE_TYPE
} from '../../constants/constants';
import { REDSHIFT } from '../../constants/service.constants';
@ -57,8 +54,59 @@ describe('RedShift Ingestion', () => {
.click();
};
const configureDBT = () => {
cy.contains('Configure DBT Model').should('be.visible');
testServiceCreationAndIngestion(
REDSHIFT.serviceType,
connectionInput,
addIngestionInput,
REDSHIFT.serviceName,
'database',
true,
);
});
it('Update table description and verify description after re-run', () => {
updateDescriptionForIngestedTables(
REDSHIFT.serviceName,
REDSHIFT.tableName,
REDSHIFT.description,
SERVICE_TYPE.Database,
'tables'
);
});
it('Add DBT ingestion', () => {
interceptURL('GET', 'api/v1/teams/name/Organization?fields=*', 'getSettingsPage');
interceptURL('POST', '/api/v1/services/ingestionPipelines/deploy/*', 'deployIngestion');
cy.get('[data-testid="appbar-item-settings"]').should('be.visible').click({ force: true });
verifyResponseStatusCode('@getSettingsPage', 200);
// Services page
interceptURL('GET', '/api/v1/services/*', 'getServices');
cy.get('[data-testid="settings-left-panel"]')
.contains(SERVICE_TYPE.Database)
.should('be.visible')
.click();
verifyResponseStatusCode('@getServices', 200);
cy.intercept('/api/v1/services/ingestionPipelines?*').as('ingestionData');
cy.get(`[data-testid="service-name-${REDSHIFT.serviceName}"]`)
.should('exist')
.click();
cy.get('[data-testid="tabs"]').should('exist');
cy.wait('@ingestionData');
cy.get('[data-testid="Ingestions"]')
.scrollIntoView()
.should('be.visible')
.click();
cy.get('[data-testid="ingestion-details-container"]').should('exist');
cy.get('[data-testid="add-new-ingestion-button"]')
.should('be.visible')
.click();
cy.get('[data-testid="list-item"]')
.contains('Add Dbt Ingestion')
.click();
//Add DBT ingestion
cy.contains('Add Dbt Ingestion').should('be.visible');
cy.get('[data-testid="dbt-source"]')
.should('be.visible')
.select('HTTP Config Source');
@ -74,18 +122,20 @@ describe('RedShift Ingestion', () => {
.scrollIntoView()
.should('be.visible')
.type(HTTP_CONFIG_SOURCE.DBT_RUN_RESTLTS_FILE_PATH);
};
cy.get('[data-testid="submit-btn"]').should('be.visible').click();
testServiceCreationAndIngestion(
REDSHIFT.serviceType,
connectionInput,
addIngestionInput,
REDSHIFT.serviceName,
'database',
true,
configureDBT
);
});
scheduleIngestion();
cy.wait("@deployIngestion").then(() => {
cy.get('[data-testid="view-service-button"]')
.scrollIntoView()
.should('be.visible')
.click();
handleIngestionRetry('database', true, 0, 'dbt');
});
});
it('Validate DBT is ingested properly', () => {
//Verify DBT tags
@ -143,16 +193,6 @@ describe('RedShift Ingestion', () => {
.should('contain', DBT.dataQualityTest2);
});
it('Update table description and verify description after re-run', () => {
updateDescriptionForIngestedTables(
REDSHIFT.serviceName,
REDSHIFT.tableName,
REDSHIFT.description,
SERVICE_TYPE.Database,
'tables'
);
});
it('Edit and validate owner', () => {
editOwnerforCreatedService(
SERVICE_TYPE.Database,

View File

@ -14,6 +14,7 @@
import { isEmpty, isUndefined, trim } from 'lodash';
import { LoadingState } from 'Models';
import React, { useMemo, useState } from 'react';
import { useTranslation } from 'react-i18next';
import {
INITIAL_FILTER_PATTERN,
STEPS_FOR_ADD_INGESTION,
@ -34,9 +35,9 @@ import {
IngestionPipeline,
} from '../../generated/entity/services/ingestionPipelines/ingestionPipeline';
import {
DatabaseServiceMetadataPipelineClass,
DbtConfig,
} from '../../generated/metadataIngestion/databaseServiceMetadataPipeline';
DbtPipelineClass,
} from '../../generated/metadataIngestion/dbtPipeline';
import {
getCurrentUserId,
getIngestionFrequency,
@ -81,6 +82,7 @@ const AddIngestion = ({
handleCancelClick,
handleViewServiceClick,
}: AddIngestionProps) => {
const { t } = useTranslation();
const isDatabaseService = useMemo(() => {
return serviceCategory === ServiceCategory.DATABASE_SERVICES;
}, [serviceCategory]);
@ -88,7 +90,7 @@ const AddIngestion = ({
return serviceData.serviceType === MetadataServiceType.OpenMetadata;
}, [serviceCategory]);
const showDBTConfig = useMemo(() => {
return isDatabaseService && pipelineType === PipelineType.Metadata;
return isDatabaseService && pipelineType === PipelineType.Dbt;
}, [isDatabaseService, pipelineType]);
const [saveState, setSaveState] = useState<LoadingState>('initial');
@ -144,22 +146,21 @@ const AddIngestion = ({
)
);
const configData = useMemo(
() =>
(data?.sourceConfig.config as DatabaseServiceMetadataPipelineClass)
?.dbtConfigSource,
() => (data?.sourceConfig.config as DbtPipelineClass)?.dbtConfigSource,
[data]
);
const [dbtConfigSource, setDbtConfigSource] = useState<DbtConfig | undefined>(
showDBTConfig ? (configData as DbtConfig) : undefined
configData as DbtConfig
);
const sourceTypeData = useMemo(
() => getSourceTypeFromConfig(configData as DbtConfig | undefined),
[configData]
);
const [dbtConfigSourceType, setDbtConfigSourceType] = useState<
DBT_SOURCES | undefined
>(showDBTConfig ? sourceTypeData.sourceType : undefined);
const [dbtConfigSourceType, setDbtConfigSourceType] = useState<DBT_SOURCES>(
sourceTypeData.sourceType
);
const [gcsConfigType, setGcsConfigType] = useState<GCS_CONFIG | undefined>(
showDBTConfig ? sourceTypeData.gcsType : undefined
);
@ -442,12 +443,6 @@ const AddIngestion = ({
const getMetadataIngestionFields = () => {
switch (serviceCategory) {
case ServiceCategory.DATABASE_SERVICES: {
const DatabaseConfigData = {
...(showDBTConfig
? escapeBackwardSlashChar({ dbtConfigSource } as ConfigClass)
: undefined),
};
return {
useFqnForFiltering: useFqnFilter,
includeViews: includeView,
@ -466,7 +461,6 @@ const AddIngestion = ({
),
markDeletedTables,
markAllDeletedTables,
...DatabaseConfigData,
type: ConfigType.DatabaseMetadata,
};
}
@ -557,6 +551,16 @@ const AddIngestion = ({
threadCount: threadCount,
};
}
case PipelineType.Dbt: {
return {
...escapeBackwardSlashChar({
dbtConfigSource,
} as ConfigClass),
type: ConfigType.Dbt,
};
}
case PipelineType.ElasticSearchReindex:
case PipelineType.DataInsight: {
return metadataToESConfig
@ -693,7 +697,9 @@ const AddIngestion = ({
};
const getExcludedSteps = () => {
const excludedSteps = [];
if (!showDBTConfig) {
if (showDBTConfig) {
excludedSteps.push(1);
} else {
excludedSteps.push(2);
}
if (!isServiceTypeOpenMetadata) {
@ -779,14 +785,17 @@ const AddIngestion = ({
{activeIngestionStep === 2 && (
<DBTConfigFormBuilder
cancelText="Back"
cancelText={t('label.cancel')}
data={dbtConfigSource || {}}
formType={status}
gcsType={gcsConfigType}
handleGcsTypeChange={(type) => setGcsConfigType(type)}
handleIngestionName={(val) => setIngestionName(val)}
handleSourceChange={(src) => setDbtConfigSourceType(src)}
okText="Next"
ingestionName={ingestionName}
okText={t('label.next')}
source={dbtConfigSourceType}
onCancel={handlePrev}
onCancel={handleCancelClick}
onSubmit={(dbtConfigData) => {
setDbtConfigSource(dbtConfigData);
handleNext();
@ -812,7 +821,9 @@ const AddIngestion = ({
}
repeatFrequency={repeatFrequency}
status={saveState}
submitButtonLabel={isUndefined(data) ? 'Add & Deploy' : 'Submit'}
submitButtonLabel={
isUndefined(data) ? t('label.add-deploy') : t('label.submit')
}
onBack={handlePrev}
onDeploy={handleScheduleIntervalDeployClick}
/>

View File

@ -138,6 +138,7 @@ const Ingestion: React.FC<IngestionProps> = ({
config.supportsUsageExtraction && pipelineType.push(PipelineType.Usage);
config.supportsUsageExtraction && pipelineType.push(PipelineType.Lineage);
config.supportsProfiler && pipelineType.push(PipelineType.Profiler);
config.supportsDBTExtraction && pipelineType.push(PipelineType.Dbt);
(config as MetadataConnection).supportsDataInsightExtraction &&
pipelineType.push(PipelineType.DataInsight);
(config as MetadataConnection)
@ -149,6 +150,7 @@ const Ingestion: React.FC<IngestionProps> = ({
PipelineType.Usage,
PipelineType.Lineage,
PipelineType.Profiler,
PipelineType.Dbt,
];
}
@ -176,6 +178,7 @@ const Ingestion: React.FC<IngestionProps> = ({
PipelineType.Usage,
PipelineType.Lineage,
PipelineType.Profiler,
PipelineType.Dbt,
];
};

View File

@ -13,7 +13,7 @@
import { Input } from 'antd';
import React, { Fragment, FunctionComponent, useState } from 'react';
import { DbtConfig } from '../../../generated/metadataIngestion/databaseServiceMetadataPipeline';
import { DbtConfig } from '../../../generated/metadataIngestion/dbtPipeline';
import {
errorMsg,
getSeparator,

View File

@ -11,11 +11,12 @@
* limitations under the License.
*/
import { FormSubmitType } from '../../../enums/form.enum';
import {
DbtConfig,
GCSCredentialsValues,
SCredentials,
} from '../../../generated/metadataIngestion/databaseServiceMetadataPipeline';
} from '../../../generated/metadataIngestion/dbtPipeline';
import { DBT_SOURCES, GCS_CONFIG } from './DBTFormEnum';
export interface DBTFormCommonProps {
@ -26,11 +27,14 @@ export interface DBTFormCommonProps {
}
export interface DBTConfigFormProps extends DBTFormCommonProps {
formType: FormSubmitType;
data: DbtConfig;
gcsType?: GCS_CONFIG;
source?: DBT_SOURCES;
handleGcsTypeChange?: (type: GCS_CONFIG) => void;
handleSourceChange?: (src: DBT_SOURCES) => void;
ingestionName: string;
handleIngestionName: (value: string) => void;
}
export type DbtConfigCloud = Pick<

View File

@ -12,7 +12,9 @@
*/
import { fireEvent, getByTestId, render } from '@testing-library/react';
import i18n from 'i18next';
import React from 'react';
import { FormSubmitType } from '../../../enums/form.enum';
import DBTConfigFormBuilder from './DBTConfigFormBuilder';
import { DBT_SOURCES, GCS_CONFIG } from './DBTFormEnum';
@ -148,16 +150,20 @@ const mockCancel = jest.fn();
const mockSubmit = jest.fn();
const mockCatalogChange = jest.fn();
const mockManifestChange = jest.fn();
const mockIngestionName = jest.fn();
const mockProps = {
data: mockData,
okText: 'Next',
cancelText: 'Back',
okText: i18n.t('label.next'),
cancelText: i18n.t('label.cancel'),
gcsType: GCS_CONFIG.GCSValues,
handleGcsTypeChange: mockCatalogChange,
handleSourceChange: mockManifestChange,
onCancel: mockCancel,
onSubmit: mockSubmit,
formType: FormSubmitType.ADD,
handleIngestionName: mockIngestionName,
ingestionName: i18n.t('label.dbt'),
};
describe('Test DBT Config Form Builder', () => {

View File

@ -11,14 +11,16 @@
* limitations under the License.
*/
import { Button } from 'antd';
import React, { Fragment, FunctionComponent, useEffect, useState } from 'react';
import { useTranslation } from 'react-i18next';
import { FormSubmitType } from '../../../enums/form.enum';
import {
DBTBucketDetails,
DbtConfig,
SCredentials,
} from '../../../generated/metadataIngestion/databaseServiceMetadataPipeline';
} from '../../../generated/metadataIngestion/dbtPipeline';
import { getSeparator } from '../../../utils/CommonUtils';
import { Button } from '../../buttons/Button/Button';
import { Field } from '../../Field/Field';
import { DBTCloudConfig } from './DBTCloudConfig';
import { DBTConfigFormProps } from './DBTConfigForm.interface';
@ -39,7 +41,11 @@ const DBTConfigFormBuilder: FunctionComponent<DBTConfigFormProps> = ({
handleSourceChange,
onCancel,
onSubmit,
formType,
ingestionName,
handleIngestionName,
}: DBTConfigFormProps) => {
const { t } = useTranslation();
const [dbtConfig, setDbtConfig] = useState<DbtConfig>(data);
const updateDbtConfig = (
@ -203,27 +209,24 @@ const DBTConfigFormBuilder: FunctionComponent<DBTConfigFormProps> = ({
return (
<Fragment>
<span data-testid="dbt-source-none">
No source selected for DBT Configuration.
{t('label.no-selected-dbt')}
</span>
{getSeparator('')}
<Field className="tw-flex tw-justify-end">
<Button
className="tw-mr-2"
className="m-r-xs"
data-testid="back-button"
size="regular"
theme="primary"
variant="text"
type="link"
onClick={onCancel}>
<span>{cancelText}</span>
{cancelText}
</Button>
<Button
className="font-medium p-x-md p-y-xxs h-auto rounded-6"
data-testid="submit-btn"
size="regular"
theme="primary"
variant="contained"
type="primary"
onClick={() => onSubmit()}>
<span>{okText}</span>
{okText}
</Button>
</Field>
</Fragment>
@ -239,18 +242,37 @@ const DBTConfigFormBuilder: FunctionComponent<DBTConfigFormProps> = ({
return (
<Fragment>
<Field>
<label className="tw-block tw-form-label tw-mb-1" htmlFor="dbt-source">
DBT Configuration Source
<label className="tw-block tw-form-label tw-mb-1" htmlFor="name">
{t('label.name')}
</label>
<p className="tw-text-grey-muted tw-mt-1 tw-mb-2 tw-text-sm">
Available sources to fetch DBT catalog and manifest files.
{t('message.instance-identifier')}
</p>
<input
className="tw-form-inputs tw-form-inputs-padding"
data-testid="name"
disabled={formType === FormSubmitType.EDIT}
id="name"
name="name"
type="text"
value={ingestionName}
onChange={(e) => handleIngestionName(e.target.value)}
/>
{getSeparator('')}
</Field>
<Field>
<label className="tw-block tw-form-label tw-mb-1" htmlFor="dbt-source">
{t('label.dbt-Configuration-source')}
</label>
<p className="tw-text-grey-muted tw-mt-1 tw-mb-2 tw-text-sm">
{t('message.fetch-dbt-files')}
</p>
<select
className="tw-form-inputs tw-form-inputs-padding"
data-testid="dbt-source"
id="dbt-source"
name="dbt-source"
placeholder="Select DBT Source"
placeholder={t('label.select-dbt-source')}
value={source}
onChange={(e) => {
handleSourceChange &&

View File

@ -25,7 +25,7 @@ import {
DbtConfig,
GCSCredentialsValues,
SCredentials,
} from '../../../generated/metadataIngestion/databaseServiceMetadataPipeline';
} from '../../../generated/metadataIngestion/dbtPipeline';
import jsonData from '../../../jsons/en';
import {
errorMsg,

View File

@ -12,7 +12,7 @@
*/
import React, { Fragment, FunctionComponent, useState } from 'react';
import { DbtConfig } from '../../../generated/metadataIngestion/databaseServiceMetadataPipeline';
import { DbtConfig } from '../../../generated/metadataIngestion/dbtPipeline';
import {
errorMsg,
getSeparator,

View File

@ -12,7 +12,7 @@
*/
import React, { Fragment, FunctionComponent, useState } from 'react';
import { DbtConfig } from '../../../generated/metadataIngestion/databaseServiceMetadataPipeline';
import { DbtConfig } from '../../../generated/metadataIngestion/dbtPipeline';
import {
errorMsg,
getSeparator,

View File

@ -17,7 +17,7 @@ import {
DBTBucketDetails,
DbtConfig,
SCredentials,
} from '../../../generated/metadataIngestion/databaseServiceMetadataPipeline';
} from '../../../generated/metadataIngestion/dbtPipeline';
import {
errorMsg,
getSeparator,

View File

@ -218,6 +218,7 @@ export const DEF_UI_SCHEMA = {
supportsProfiler: { 'ui:widget': 'hidden', 'ui:hideError': true },
supportsDatabase: { 'ui:widget': 'hidden', 'ui:hideError': true },
supportsQueryComment: { 'ui:widget': 'hidden', 'ui:hideError': true },
supportsDBTExtraction: { 'ui:widget': 'hidden', 'ui:hideError': true },
type: { 'ui:widget': 'hidden' },
};

View File

@ -118,3 +118,19 @@ export const addProfilerIngestionGuide = [
'You are all set! The <Ingestion Pipeline Name> has been successfully deployed. The profiler will run at a regular interval as per the schedule.',
},
];
export const addDBTIngestionGuide = [
{
step: 2,
title: 'Add DBT Ingestion',
description: `A DBT workflow can be configured and deployed after a metadata ingestion has been set up.
The pipeline fetches the DBT catalog, manifest, and run results files from the configured source and attaches DBT models, descriptions, and tests to the ingested tables. Add a Name and select a DBT Configuration Source to start.`,
},
{ ...schedulingIngestionGuide },
{
step: 5,
title: 'DBT Ingestion Added Successfully',
description:
'You are all set! The <Ingestion Pipeline Name> has been successfully deployed. The DBT ingestion will run at a regular interval as per the schedule.',
},
];

View File

@ -545,7 +545,12 @@
"insert": "Insert",
"table-profile": "Table Profile",
"column-profile": "Column Profile",
"applied-advanced-search": "Applied advanced search"
"applied-advanced-search": "Applied advanced search",
"add-deploy": "Add & Deploy",
"dbt-Configuration-source": "DBT Configuration Source",
"select-dbt-source": "Select DBT Source",
"no-selected-dbt": "No source selected for DBT Configuration.",
"dbt": "DBT"
},
"message": {
"service-email-required": "Service account Email is required",
@ -662,7 +667,9 @@
"permanently-delete-metadata": "Permanently deleting this {{entityName}} will remove its metadata from OpenMetadata permanently.",
"announcement-created-successfully": "Announcement created successfully!",
"no-profiler-message": "Data Profiler is an optional configuration in Ingestion. Please enable the data profiler by following the documentation",
"advanced-search-message": "Discover the right data assets using the syntax editor with and/or conditions."
"advanced-search-message": "Discover the right data assets using the syntax editor with and/or conditions.",
"instance-identifier": "Name that identifies this configuration instance uniquely.",
"fetch-dbt-files": "Available sources to fetch DBT catalog and manifest files."
},
"server": {
"you-have-not-action-anything-yet": "You have not {{action}} anything yet.",

View File

@ -240,6 +240,12 @@ const AddIngestionPage = () => {
]);
}, [serviceCategory, ingestionType, serviceData]);
useEffect(() => {
if (ingestionType === PipelineType.Dbt) {
setActiveIngestionStep(2);
}
}, [ingestionType]);
const renderAddIngestionPage = () => {
if (isLoading) {
return <Loader />;

View File

@ -274,6 +274,12 @@ const EditIngestionPage = () => {
]);
}, [serviceCategory, ingestionType, serviceData]);
useEffect(() => {
if (ingestionType === PipelineType.Dbt) {
setActiveIngestionStep(2);
}
}, [ingestionType]);
const renderEditIngestionPage = () => {
if (isLoading) {
return <Loader />;

View File

@ -84,6 +84,10 @@ const LogsViewer = () => {
case PipelineType.Lineage:
setLogs(logs.concat(res.data?.lineage_task || ''));
break;
case PipelineType.Dbt:
setLogs(logs.concat(res.data?.dbt_task || ''));
break;
case PipelineType.TestSuite:
setLogs(logs.concat(res.data?.test_suite_task || ''));

View File

@ -17,6 +17,7 @@ export interface IngestionPipelineLogByIdInterface {
lineage_task?: string;
test_suite_task?: string;
data_insight_task?: string;
dbt_task?: string;
total?: string;
after?: string;
}

View File

@ -44,7 +44,7 @@ import {
DbtConfig,
GCSCredentialsValues,
SCredentials,
} from '../generated/metadataIngestion/databaseServiceMetadataPipeline';
} from '../generated/metadataIngestion/dbtPipeline';
import { FormValidationRules } from '../interface/genericForm.interface';
import jsonData from '../jsons/en';
import { isValidEmail, isValidUrl } from './CommonUtils';

View File

@ -29,6 +29,7 @@ import {
} from '../components/PermissionProvider/PermissionProvider.interface';
import { GlobalSettingOptions } from '../constants/GlobalSettings.constants';
import {
addDBTIngestionGuide,
addLineageIngestionGuide,
addMetadataIngestionGuide,
addProfilerIngestionGuide,
@ -509,6 +510,11 @@ export const getServiceIngestionStepGuide = (
break;
}
case IngestionPipelineType.Dbt: {
guide = addDBTIngestionGuide.find((item) => item.step === step);
break;
}
case IngestionPipelineType.Metadata:
default: {
guide = addMetadataIngestionGuide.find((item) => item.step === step);
@ -569,9 +575,11 @@ export const getIngestionName = (
type: IngestionPipelineType
) => {
if (
[IngestionPipelineType.Profiler, IngestionPipelineType.Metadata].includes(
type
)
[
IngestionPipelineType.Profiler,
IngestionPipelineType.Metadata,
IngestionPipelineType.Dbt,
].includes(type)
) {
return `${serviceName}_${type}_${cryptoRandomString({
length: 8,