cleaned dbt code after pydantic update (#16605)

This commit is contained in:
Onkar Ravgan 2024-06-12 11:24:29 +05:30 committed by GitHub
parent 3db41f08e2
commit 4f50e0b6a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 52 additions and 51 deletions

View File

@@ -183,7 +183,8 @@ def _(config: DbtCloudConfig): # pylint: disable=too-many-locals
last_run = runs_data[0]
run_id = last_run["id"]
logger.info(
f"Retrieved last successful run [{run_id}] finished {last_run['finished_at_humanized']} (duration: {last_run['duration_humanized']})"
f"Retrieved last successful run [{str(run_id)}]: "
f"Finished {str(last_run['finished_at_humanized'])} (duration: {str(last_run['duration_humanized'])})"
)
try:
logger.debug("Requesting [dbt_catalog]")

View File

@@ -216,8 +216,7 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC):
"""
Prepare the data models
"""
for data_model_link in self.context.get().data_model_links:
yield data_model_link
yield from self.context.get().data_model_links
@abstractmethod
def create_dbt_lineage(self, data_model_link: DataModelLink) -> AddLineageRequest:

View File

@@ -13,7 +13,7 @@ DBT source methods.
"""
import traceback
from datetime import datetime
from typing import Iterable, List, Optional, Union
from typing import Any, Iterable, List, Optional, Union
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
@@ -136,10 +136,9 @@ class DbtSource(DbtServiceSource):
"""
By default for DBT nothing is required to be prepared
"""
pass
def get_dbt_owner(
self, manifest_node: dict, catalog_node: Optional[dict]
self, manifest_node: Any, catalog_node: Optional[Any]
) -> Optional[EntityReference]:
"""
Returns dbt owner
@@ -411,7 +410,6 @@ class DbtSource(DbtServiceSource):
dbt_raw_query = get_dbt_raw_query(manifest_node)
# Get the table entity from ES
# TODO: Change to get_by_name once the postgres case sensitive calls is fixed
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
@@ -445,8 +443,9 @@
rawSql=SqlQuery(dbt_raw_query)
if dbt_raw_query
else None,
# SQL Is a required param for the DataModel
sql=SqlQuery(dbt_compiled_query or dbt_raw_query or ""),
sql=SqlQuery(dbt_compiled_query)
if dbt_compiled_query
else None,
columns=self.parse_data_model_columns(
manifest_node, catalog_node
),
@@ -519,7 +518,6 @@
)
# check if the parent table exists in OM before adding it to the upstream list
# TODO: Change to get_by_name once the postgres case sensitive calls is fixed
parent_table_entity: Optional[
Union[Table, List[Table]]
] = get_entity_from_es_result(
@@ -539,7 +537,7 @@
return upstream_nodes
def parse_data_model_columns(
self, manifest_node: dict, catalog_node: dict
self, manifest_node: Any, catalog_node: Any
) -> List[Column]:
"""
Method to parse the DBT columns
@@ -549,7 +547,7 @@
for key, manifest_column in manifest_columns.items():
try:
logger.debug(f"Processing DBT column: {key}")
# If catalog file is passed pass the column information from catalog file
# If catalog file is passed, pass the column information from catalog file
catalog_column = None
if catalog_node and catalog_node.columns:
catalog_column = catalog_node.columns.get(key)
@@ -665,44 +663,48 @@
"""
Method to process DBT lineage from queries
"""
to_entity: Table = data_model_link.table_entity
logger.debug(
f"Processing DBT Query lineage for: {to_entity.fullyQualifiedName.root}"
)
try:
source_elements = fqn.split(to_entity.fullyQualifiedName.root)
# remove service name from fqn to make it parseable in format db.schema.table
query_fqn = fqn._build( # pylint: disable=protected-access
*source_elements[-3:]
if data_model_link.datamodel.sql:
to_entity: Table = data_model_link.table_entity
logger.debug(
f"Processing DBT Query lineage for: {to_entity.fullyQualifiedName.root}"
)
query = f"create table {query_fqn} as {data_model_link.datamodel.sql.root}"
connection_type = str(self.config.serviceConnection.root.config.type.value)
dialect = ConnectionTypeDialectMapper.dialect_of(connection_type)
lineages = get_lineage_by_query(
self.metadata,
query=query,
service_name=source_elements[0],
database_name=source_elements[1],
schema_name=source_elements[2],
dialect=dialect,
timeout_seconds=self.source_config.parsingTimeoutLimit,
lineage_source=LineageSource.DbtLineage,
)
for lineage_request in lineages or []:
yield lineage_request
except Exception as exc: # pylint: disable=broad-except
yield Either(
left=StackTraceError(
name=data_model_link.datamodel.sql.root,
error=(
f"Failed to parse the query {data_model_link.datamodel.sql.root}"
f" to capture lineage: {exc}"
),
stackTrace=traceback.format_exc(),
try:
source_elements = fqn.split(to_entity.fullyQualifiedName.root)
# remove service name from fqn to make it parseable in format db.schema.table
query_fqn = fqn._build( # pylint: disable=protected-access
*source_elements[-3:]
)
query = (
f"create table {query_fqn} as {data_model_link.datamodel.sql.root}"
)
connection_type = str(
self.config.serviceConnection.root.config.type.value
)
dialect = ConnectionTypeDialectMapper.dialect_of(connection_type)
lineages = get_lineage_by_query(
self.metadata,
query=query,
service_name=source_elements[0],
database_name=source_elements[1],
schema_name=source_elements[2],
dialect=dialect,
timeout_seconds=self.source_config.parsingTimeoutLimit,
lineage_source=LineageSource.DbtLineage,
)
yield from lineages or []
except Exception as exc: # pylint: disable=broad-except
yield Either(
left=StackTraceError(
name=data_model_link.datamodel.sql.root,
error=(
f"Failed to parse the query {data_model_link.datamodel.sql.root}"
f" to capture lineage: {exc}"
),
stackTrace=traceback.format_exc(),
)
)
)
def process_dbt_meta(self, manifest_meta):
"""

View File

@@ -123,4 +123,4 @@ def convert_timestamp_to_milliseconds(timestamp: Union[int, float]) -> int:
"""
if len(str(round(timestamp))) == 13:
return timestamp
return timestamp * 1000
return round(timestamp * 1000)

View File

@@ -612,7 +612,7 @@ public class TableRepository extends EntityRepository<Table> {
if (dataModel.getSql() == null || dataModel.getSql().isBlank()) {
if (table.getDataModel() != null
&& (table.getDataModel().getSql() != null || !table.getDataModel().getSql().isBlank())) {
&& (table.getDataModel().getSql() != null && !table.getDataModel().getSql().isBlank())) {
dataModel.setSql(table.getDataModel().getSql());
}
}

View File

@@ -899,8 +899,7 @@
}
},
"required": [
"modelType",
"sql"
"modelType"
],
"additionalProperties": false
},