diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java
index 1889d705e23..297747854c6 100644
--- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java
+++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java
@@ -237,6 +237,22 @@ public class TableRepository extends EntityRepository<Table> {
public Table addDataModel(UUID tableId, DataModel dataModel) throws IOException, ParseException {
Table table = dao.tableDAO().findEntityById(tableId);
table.withDataModel(dataModel);
+
+    // Carry forward the table description from the data model to the table entity, if empty
+ if (table.getDescription() == null || table.getDescription().isEmpty()) {
+ table.setDescription(dataModel.getDescription());
+ }
+    // Carry forward column descriptions from the data model to the table columns, if empty
+ for (Column modelColumn : Optional.ofNullable(dataModel.getColumns()).orElse(Collections.emptyList())) {
+ Column stored = table.getColumns().stream().filter(c ->
+ EntityUtil.columnNameMatch.test(c, modelColumn)).findAny().orElse(null);
+ if (stored == null) {
+ continue;
+ }
+ if (stored.getDescription() == null || stored.getDescription().isEmpty()) {
+ stored.setDescription(modelColumn.getDescription());
+ }
+ }
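+    // Persist the updated table so the carried-forward descriptions are stored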
dao.tableDAO().update(table.getId(), JsonUtils.pojoToJson(table));
setFields(table, Fields.EMPTY_FIELDS);
return table;
@@ -785,8 +801,7 @@ public class TableRepository extends EntityRepository<Table> {
// Carry forward the user generated metadata from existing columns to new columns
for (Column updated : updatedColumns) {
// Find stored column matching name, data type and ordinal position
- Column stored = origColumns.stream().filter(c ->
- EntityUtil.columnMatch.test(c, updated)).findAny().orElse(null);
+ Column stored = origColumns.stream().filter(c -> columnMatch.test(c, updated)).findAny().orElse(null);
if (stored == null) { // New column added
continue;
}
diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/databases/TableResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/databases/TableResource.java
index a5599c503aa..89e62f829a9 100644
--- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/databases/TableResource.java
+++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/databases/TableResource.java
@@ -420,7 +420,7 @@ public class TableResource {
@Path("/{id}/dataModel")
@Operation(summary = "Add data modeling information to a table", tags = "tables",
description = "Add data modeling (such as DBT model) information on how the table was created to the table.")
- public Table addQuery(@Context UriInfo uriInfo,
+ public Table addDataModel(@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Id of the table", schema = @Schema(type = "string"))
@PathParam("id") String id, DataModel dataModel) throws IOException, ParseException {
diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java
index c428f218602..ade0ee286ea 100644
--- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java
+++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java
@@ -100,6 +100,9 @@ public final class EntityUtil {
column1.getArrayDataType() == column2.getArrayDataType() &&
Objects.equals(column1.getOrdinalPosition(), column2.getOrdinalPosition());
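+  // Match columns by name only; unlike columnMatch above, data type and ordinal position are ignored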
+  public static BiPredicate<Column, Column> columnNameMatch = (column1, column2) ->
+      column1.getName().equals(column2.getName());
+
public static BiPredicate<TableConstraint, TableConstraint> tableConstraintMatch = (constraint1, constraint2) ->
constraint1.getConstraintType() == constraint2.getConstraintType() &&
constraint1.getColumns().equals(constraint2.getColumns());
diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/data/table.json b/catalog-rest-service/src/main/resources/json/schema/entity/data/table.json
index 71c553ee1f6..812b1b8ad0a 100644
--- a/catalog-rest-service/src/main/resources/json/schema/entity/data/table.json
+++ b/catalog-rest-service/src/main/resources/json/schema/entity/data/table.json
@@ -395,6 +395,10 @@
"modelType" : {
"$ref" : "#/definitions/modelType"
},
+    "description" : {
+      "description": "Description of the table from the data model.",
+      "type" : "string"
+    },
"path" : {
"description": "Path to sql definition file.",
"type" : "string"
diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/databases/TableResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/databases/TableResourceTest.java
index 8daced63e51..74011a3f722 100644
--- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/databases/TableResourceTest.java
+++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/databases/TableResourceTest.java
@@ -811,19 +811,30 @@ public class TableResourceTest extends EntityResourceTest<Table> {
@Test
public void put_tableDataModel(TestInfo test) throws IOException {
- Table table = createAndCheckEntity(create(test), adminAuthHeaders());
+    List<Column> columns = Arrays.asList(
+ getColumn("c1", BIGINT, USER_ADDRESS_TAG_LABEL).withDescription(null),
+ getColumn("c2", ColumnDataType.VARCHAR, USER_ADDRESS_TAG_LABEL)
+ .withDataLength(10).withDescription(null));
+ Table table = createAndCheckEntity(create(test).withColumns(columns).withDescription(null), adminAuthHeaders());
//
- // Update the data model and validate the response
+ // Update the data model and validate the response.
+    // Make sure the table and column descriptions are carried forward if the original entity had them as null
//
+ columns.get(0).setDescription("updatedDescription");
+ columns.get(1).setDescription("updatedDescription");
String query = "select * from test;";
- DataModel dataModel = new DataModel().withModelType(ModelType.DBT).withSql(query).withGeneratedAt(new Date());
+ DataModel dataModel = new DataModel().withDescription("updatedTableDescription").withModelType(ModelType.DBT)
+ .withSql(query).withGeneratedAt(new Date()).withColumns(columns);
Table putResponse = putTableDataModel(table.getId(), dataModel, adminAuthHeaders());
assertDataModel(dataModel, putResponse.getDataModel());
+ assertEquals("updatedTableDescription", putResponse.getDescription()); // Table description updated
// Get the table and validate the data model
- Table getResponse = getEntity(table.getId(), "dataModel", adminAuthHeaders());
+ Table getResponse = getEntity(table.getId(), "dataModel,columns,tags", adminAuthHeaders());
assertDataModel(dataModel, getResponse.getDataModel());
+ assertEquals("updatedTableDescription", getResponse.getDescription()); // Table description updated
+ assertColumns(columns, getResponse.getColumns()); // Column description updated
//
// Update again
diff --git a/ingestion/examples/workflows/redshift_dbt.json b/ingestion/examples/workflows/redshift_dbt.json
new file mode 100644
index 00000000000..3c125507a08
--- /dev/null
+++ b/ingestion/examples/workflows/redshift_dbt.json
@@ -0,0 +1,31 @@
+{
+ "source": {
+ "type": "redshift",
+ "config": {
+ "host_port": "cluster.name.region.redshift.amazonaws.com:5439",
+ "username": "username",
+ "password": "strong_password",
+ "database": "warehouse",
+ "service_name": "aws_redshift",
+ "filter_pattern": {
+ "excludes": [
+ "information_schema.*",
+ "[\\w]*event_vw.*"
+ ]
+ },
+ "dbt_manifest_file": "./examples/sample_data/dbt/manifest.json",
+ "dbt_catalog_file": "./examples/sample_data/dbt/catalog.json"
+ }
+ },
+ "sink": {
+ "type": "metadata-rest",
+ "config": {}
+ },
+ "metadata_server": {
+ "type": "metadata-server",
+ "config": {
+ "api_endpoint": "http://localhost:8585/api",
+ "auth_provider_type": "no-auth"
+ }
+ }
+}
\ No newline at end of file
diff --git a/ingestion/src/metadata/ingestion/models/ometa_table_db.py b/ingestion/src/metadata/ingestion/models/ometa_table_db.py
index bcc688d0d1b..781b300f5a7 100644
--- a/ingestion/src/metadata/ingestion/models/ometa_table_db.py
+++ b/ingestion/src/metadata/ingestion/models/ometa_table_db.py
@@ -13,7 +13,6 @@ from typing import Optional
from pydantic import BaseModel
from metadata.generated.schema.entity.data.database import Database
-from metadata.generated.schema.entity.data.dbtmodel import DbtModel
from metadata.generated.schema.entity.data.location import Location
from metadata.generated.schema.entity.data.table import Table
@@ -22,8 +21,3 @@ class OMetaDatabaseAndTable(BaseModel):
database: Database
table: Table
location: Optional[Location]
-
-
-class OMetaDatabaseAndModel(BaseModel):
- model: DbtModel
- database: Database
diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/tableMixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/tableMixin.py
index ce81cf925d3..9188eea7696 100644
--- a/ingestion/src/metadata/ingestion/ometa/mixins/tableMixin.py
+++ b/ingestion/src/metadata/ingestion/ometa/mixins/tableMixin.py
@@ -8,6 +8,7 @@ from typing import List
from metadata.generated.schema.entity.data.location import Location
from metadata.generated.schema.entity.data.table import (
+ DataModel,
Table,
TableData,
TableJoins,
@@ -71,6 +72,19 @@ class OMetaTableMixin:
)
return [TableProfile(**t) for t in resp["tableProfile"]]
+ def ingest_table_data_model(self, table: Table, data_model: DataModel) -> Table:
+ """
+ PUT data model for a table
+
+ :param table: Table Entity to update
+ :param data_model: Model to add
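+        :return: the updated Table entity returned by the API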
+ """
+ resp = self.client.put(
+ f"{self.get_suffix(Table)}/{table.id.__root__}/dataModel",
+ data=data_model.json(),
+ )
+ return Table(**resp)
+
def publish_table_usage(
self, table: Table, table_usage_request: TableUsageRequest
) -> None:
diff --git a/ingestion/src/metadata/ingestion/ometa/ometa_api.py b/ingestion/src/metadata/ingestion/ometa/ometa_api.py
index e2fc58022bf..d88f77ea05d 100644
--- a/ingestion/src/metadata/ingestion/ometa/ometa_api.py
+++ b/ingestion/src/metadata/ingestion/ometa/ometa_api.py
@@ -18,7 +18,6 @@ from metadata.generated.schema.api.lineage.addLineage import AddLineage
from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.database import Database
-from metadata.generated.schema.entity.data.dbtmodel import DbtModel
from metadata.generated.schema.entity.data.location import Location
from metadata.generated.schema.entity.data.metrics import Metrics
from metadata.generated.schema.entity.data.mlmodel import MlModel
@@ -171,11 +170,6 @@ class OpenMetadata(OMetaLineageMixin, OMetaTableMixin, Generic[T, C]):
):
return "/topics"
- if issubclass(
- entity, get_args(Union[DbtModel, self.get_create_entity_type(DbtModel)])
- ):
- return "/dbtmodels"
-
if issubclass(entity, Metrics):
return "/metrics"
diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py
index 43dd3d0f684..dbdd204dea9 100644
--- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py
+++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py
@@ -23,7 +23,6 @@ from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.database import Database
-from metadata.generated.schema.entity.data.dbtmodel import DbtModel
from metadata.generated.schema.entity.data.pipeline import Pipeline, Task
from metadata.generated.schema.entity.data.table import Column, Table
from metadata.generated.schema.entity.data.topic import Topic
@@ -209,14 +208,6 @@ class ElasticsearchSink(Sink[Entity]):
body=pipeline_doc.json(),
request_timeout=self.config.timeout,
)
- if isinstance(record, DbtModel):
- dbt_model_doc = self._create_dbt_model_es_doc(record)
- self.elasticsearch_client.index(
- index=self.config.dbt_index_name,
- id=str(dbt_model_doc.dbt_model_id),
- body=dbt_model_doc.json(),
- request_timeout=self.config.timeout,
- )
if hasattr(record.name, "__root__"):
self.status.records_written(record.name.__root__)
@@ -450,71 +441,6 @@ class ElasticsearchSink(Sink[Entity]):
return pipeline_doc
- def _create_dbt_model_es_doc(self, dbt_model: DbtModel):
- fqdn = dbt_model.fullyQualifiedName
- database = dbt_model.database.name
- dbt_model_name = dbt_model.name
- suggest = [
- {"input": [fqdn], "weight": 5},
- {"input": [dbt_model_name], "weight": 10},
- ]
- column_names = []
- column_descriptions = []
- tags = set()
-
- timestamp = time.time()
- tier = None
- for dbt_model_tag in dbt_model.tags:
- if "Tier" in dbt_model_tag.tagFQN:
- tier = dbt_model_tag.tagFQN
- else:
- tags.add(dbt_model_tag.tagFQN)
- self._parse_columns(
- dbt_model.columns, None, column_names, column_descriptions, tags
- )
-
- database_entity = self.metadata.get_by_id(
- entity=Database, entity_id=str(dbt_model.database.id.__root__)
- )
- service_entity = self.metadata.get_by_id(
- entity=DatabaseService, entity_id=str(database_entity.service.id.__root__)
- )
- dbt_model_owner = (
- str(dbt_model.owner.id.__root__) if dbt_model.owner is not None else ""
- )
- dbt_model_followers = []
- if dbt_model.followers:
- for follower in dbt_model.followers.__root__:
- dbt_model_followers.append(str(follower.id.__root__))
- dbt_node_type = None
- if hasattr(dbt_model.dbtNodeType, "name"):
- dbt_node_type = dbt_model.dbtNodeType.name
- change_descriptions = self._get_change_descriptions(
- DbtModel, dbt_model.id.__root__
- )
- dbt_model_doc = DbtModelESDocument(
- dbt_model_id=str(dbt_model.id.__root__),
- database=str(database_entity.name.__root__),
- service=service_entity.name,
- service_type=service_entity.serviceType.name,
- service_category="databaseService",
- name=dbt_model.name.__root__,
- suggest=suggest,
- description=dbt_model.description,
- dbt_model_type=dbt_node_type,
- last_updated_timestamp=timestamp,
- column_names=column_names,
- column_descriptions=column_descriptions,
- tier=tier,
- tags=list(tags),
- fqdn=fqdn,
- schema_description=None,
- owner=dbt_model_owner,
- followers=dbt_model_followers,
- change_descriptions=change_descriptions,
- )
- return dbt_model_doc
-
def _get_charts(self, chart_refs: Optional[List[entityReference.EntityReference]]):
charts = []
if chart_refs:
diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py
index 3a08b13eff2..c1f645e6a3e 100644
--- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py
+++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py
@@ -23,9 +23,6 @@ from metadata.generated.schema.api.data.createDashboard import (
from metadata.generated.schema.api.data.createDatabase import (
CreateDatabaseEntityRequest,
)
-from metadata.generated.schema.api.data.createDbtModel import (
- CreateDbtModelEntityRequest,
-)
from metadata.generated.schema.api.data.createLocation import (
CreateLocationEntityRequest,
)
@@ -46,10 +43,7 @@ from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity, WorkflowContext
from metadata.ingestion.api.sink import Sink, SinkStatus
-from metadata.ingestion.models.ometa_table_db import (
- OMetaDatabaseAndModel,
- OMetaDatabaseAndTable,
-)
+from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
@@ -129,8 +123,6 @@ class MetadataRestSink(Sink[Entity]):
self.write_users(record)
elif isinstance(record, MlModel):
self.write_ml_model(record)
- elif isinstance(record, OMetaDatabaseAndModel):
- self.write_dbt_models(record)
else:
logging.info(
f"Ignoring the record due to unknown Record type {type(record)}"
@@ -188,6 +180,11 @@ class MetadataRestSink(Sink[Entity]):
table_profile=db_and_table.table.tableProfile,
)
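+            # If the source attached a data model (e.g. one parsed from dbt), push it to the table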
+ if db_and_table.table.dataModel is not None:
+ self.metadata.ingest_table_data_model(
+ table=created_table, data_model=db_and_table.table.dataModel
+ )
+
logger.info(
"Successfully ingested table {}.{}".format(
db_and_table.database.name.__root__,
@@ -207,44 +204,6 @@ class MetadataRestSink(Sink[Entity]):
logger.error(err)
self.status.failure(f"Table: {db_and_table.table.name.__root__}")
- def write_dbt_models(self, model_and_db: OMetaDatabaseAndModel):
- try:
- db_request = CreateDatabaseEntityRequest(
- name=model_and_db.database.name,
- description=model_and_db.database.description,
- service=EntityReference(
- id=model_and_db.database.service.id, type="databaseService"
- ),
- )
- db = self.metadata.create_or_update(db_request)
- model = model_and_db.model
- model_request = CreateDbtModelEntityRequest(
- name=model.name,
- description=model.description,
- viewDefinition=model.viewDefinition,
- database=db.id,
- dbtNodeType=model.dbtNodeType,
- columns=model.columns,
- )
- created_model = self.metadata.create_or_update(model_request)
- logger.info(
- "Successfully ingested model {}.{}".format(
- db.name.__root__, created_model.name.__root__
- )
- )
- self.status.records_written(
- f"Model: {db.name.__root__}.{created_model.name.__root__}"
- )
- except (APIError, ValidationError) as err:
- logger.error(
- "Failed to ingest model {} in database {} ".format(
- model_and_db.model.name.__root__,
- model_and_db.database.name.__root__,
- )
- )
- logger.error(err)
- self.status.failure(f"Model: {model_and_db.model.name.__root__}")
-
def write_topics(self, topic: CreateTopicEntityRequest) -> None:
try:
created_topic = self.metadata.create_or_update(topic)
diff --git a/ingestion/src/metadata/ingestion/source/metadata.py b/ingestion/src/metadata/ingestion/source/metadata.py
index fdcfd57a5f2..779367760d4 100644
--- a/ingestion/src/metadata/ingestion/source/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/metadata.py
@@ -14,16 +14,14 @@ from dataclasses import dataclass, field
from typing import Iterable, List, Optional
from metadata.config.common import ConfigModel
+from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.pipeline import Pipeline
+from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.entity.data.topic import Topic
from metadata.ingestion.api.common import Entity, WorkflowContext
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
-
-from ...generated.schema.entity.data.dashboard import Dashboard
-from ...generated.schema.entity.data.dbtmodel import DbtModel
-from ...generated.schema.entity.data.table import Table
-from ...generated.schema.entity.data.topic import Topic
-from ..ometa.openmetadata_rest import MetadataServerConfig
+from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
logger = logging.getLogger(__name__)
@@ -97,7 +95,6 @@ class MetadataSource(Source[Entity]):
yield from self.fetch_topic()
yield from self.fetch_dashboard()
yield from self.fetch_pipeline()
- yield from self.fetch_dbt_models()
def fetch_table(self) -> Table:
if self.config.include_tables:
@@ -180,29 +177,6 @@ class MetadataSource(Source[Entity]):
break
after = pipeline_entities.after
- def fetch_dbt_models(self) -> Pipeline:
- after = None
- while True:
- dbt_model_entities = self.metadata.list_entities(
- entity=DbtModel,
- fields=[
- "columns",
- "owner",
- "database",
- "tags",
- "followers",
- "viewDefinition",
- ],
- after=after,
- limit=self.config.limit_records,
- )
- for dbt_model in dbt_model_entities.entities:
- self.status.scanned_dashboard(dbt_model.name)
- yield dbt_model
- if dbt_model_entities.after is None:
- break
- after = dbt_model_entities.after
-
def get_status(self) -> SourceStatus:
return self.status
diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py
index 09a7f8e6545..363d8195f44 100644
--- a/ingestion/src/metadata/ingestion/source/sql_source.py
+++ b/ingestion/src/metadata/ingestion/source/sql_source.py
@@ -8,7 +8,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import json
import logging
import re
import traceback
@@ -28,6 +28,8 @@ from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import (
Column,
Constraint,
+ DataModel,
+ ModelType,
Table,
TableData,
TableProfile,
@@ -87,6 +89,8 @@ class SQLConnectionConfig(ConfigModel):
data_profiler_offset: Optional[int] = 0
data_profiler_limit: Optional[int] = 50000
filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
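+    # Optional dbt artifact files; both must be provided for data models to be parsed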
+ dbt_manifest_file: Optional[str] = None
+ dbt_catalog_file: Optional[str] = None
@abstractmethod
def get_connection_url(self):
@@ -136,6 +140,11 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
self.engine = create_engine(self.connection_string, **self.sql_config.options)
self.connection = self.engine.connect()
self.data_profiler = None
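+        # Load the dbt artifacts up front; prepare() parses them into data models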
+        self.data_models = {}
+        if self.config.dbt_catalog_file is not None:
+            with open(self.config.dbt_catalog_file, "r") as catalog:
+                self.dbt_catalog = json.load(catalog)
+        if self.config.dbt_manifest_file is not None:
+            with open(self.config.dbt_manifest_file, "r") as manifest:
+                self.dbt_manifest = json.load(manifest)
def _instantiate_profiler(self):
try:
@@ -155,7 +164,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
)
def prepare(self):
- pass
+ self._parse_data_model()
@classmethod
def create(
@@ -244,6 +253,11 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
table_entity.tableProfile = (
[profile] if profile is not None else None
)
+                    # Check if we have a parsed data model to associate with this table
+ table_fqn = f"{schema}.{table_name}"
+ if table_fqn in self.data_models:
+ model = self.data_models[table_fqn]
+ table_entity.dataModel = model
table_and_db = OMetaDatabaseAndTable(
table=table_entity, database=self._get_database(schema)
@@ -314,6 +328,85 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
)
continue
+    def _parse_data_model(self) -> None:
+ logger.info("Parsing Data Models")
+ if (
+ self.config.dbt_manifest_file is not None
+ and self.config.dbt_catalog_file is not None
+ ):
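+            # dbt keeps models and seeds under "nodes" and sources under "sources";
+            # merge both so entries can be looked up by unique id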
+ manifest_nodes = self.dbt_manifest["nodes"]
+ manifest_sources = self.dbt_manifest["sources"]
+ manifest_entities = {**manifest_nodes, **manifest_sources}
+ catalog_nodes = self.dbt_catalog["nodes"]
+ catalog_sources = self.dbt_catalog["sources"]
+ catalog_entities = {**catalog_nodes, **catalog_sources}
+
+            for key, mnode in manifest_entities.items():
+                if mnode["resource_type"] == "test":
+                    continue
+                model_name = mnode["alias"] if "alias" in mnode else mnode["name"]
+                cnode = catalog_entities.get(key)
+                if cnode is not None:
+                    columns = self._parse_data_model_columns(model_name, mnode, cnode)
+                else:
+                    columns = []
+                upstream_nodes = self._parse_data_model_upstream(mnode)
+                description = mnode.get("description", "")
+                schema = mnode["schema"]
+                path = f"{mnode['root_path']}/{mnode['original_file_path']}"
+ model = DataModel(
+ modelType=ModelType.DBT,
+ description=description,
+ path=path,
+ rawSql=mnode["raw_sql"] if "raw_sql" in mnode else None,
+ sql=mnode["compiled_sql"] if "compiled_sql" in mnode else None,
+ columns=columns,
+ upstream=upstream_nodes,
+ )
+ model_fqdn = f"{schema}.{model_name}"
+ self.data_models[model_fqdn] = model
+
+ def _parse_data_model_upstream(self, mnode):
+ upstream_nodes = []
+ if "depends_on" in mnode and "nodes" in mnode["depends_on"]:
+ for node in mnode["depends_on"]["nodes"]:
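+                # dbt node ids look like "<resource_type>.<package>.<name>"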
+ node_type, database, table = node.split(".")
+ table_fqn = f"{self.config.service_name}.{database}.{table}"
+ upstream_nodes.append(table_fqn)
+ return upstream_nodes
+
+ def _parse_data_model_columns(
+ self, model_name: str, mnode: Dict, cnode: Dict
+    ) -> List[Column]:
+ columns = []
+ ccolumns = cnode.get("columns")
+ manifest_columns = mnode.get("columns", {})
+ for key in ccolumns:
+ ccolumn = ccolumns[key]
+ try:
+ ctype = ccolumn["type"]
+ col_type = get_column_type(self.status, model_name, ctype)
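+                # Prefer the column description from the manifest; fall back to the catalog comment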
+ description = manifest_columns.get(key.lower(), {}).get(
+ "description", None
+ )
+ if description is None:
+ description = ccolumn.get("comment", None)
+ col = Column(
+ name=ccolumn["name"].lower(),
+ description=description,
+ dataType=col_type,
+ dataLength=1,
+ ordinalPosition=ccolumn["index"],
+ )
+ columns.append(col)
+ except Exception as e:
+                logger.error(f"Failed to parse column {key} due to {e}")
+
+ return columns
+
def _get_database(self, schema: str) -> Database:
return Database(
name=schema,