From c8c57de37e66f19718a06e842e607e6d8e08b3a4 Mon Sep 17 00:00:00 2001
From: Sriharsha Chintalapani
Date: Thu, 9 Dec 2021 20:32:17 -0800
Subject: [PATCH] Issue 1658: Ingestion changes to add dbtModel as part of
 Table entity (#1659)

* Fix #1658: Ingestion changes to add dbtModel as part of Table entity

* Fixes #1652: Remove DBTModel as a top-level entity and capture the
  information from DBT in the existing Table entity

Co-authored-by: sureshms
---
 .../catalog/jdbi3/TableRepository.java        | 19 +++-
 .../resources/databases/TableResource.java    |  2 +-
 .../openmetadata/catalog/util/EntityUtil.java |  3 +
 .../json/schema/entity/data/table.json        |  4 +
 .../databases/TableResourceTest.java          | 19 +++-
 .../examples/workflows/redshift_dbt.json      | 31 ++++++
 .../ingestion/models/ometa_table_db.py        |  6 --
 .../ingestion/ometa/mixins/tableMixin.py      | 14 +++
 .../src/metadata/ingestion/ometa/ometa_api.py |  6 --
 .../metadata/ingestion/sink/elasticsearch.py  | 74 --------------
 .../metadata/ingestion/sink/metadata_rest.py  | 53 ++--------
 .../src/metadata/ingestion/source/metadata.py | 34 +------
 .../metadata/ingestion/source/sql_source.py   | 97 ++++++++++++++++++-
 13 files changed, 190 insertions(+), 172 deletions(-)
 create mode 100644 ingestion/examples/workflows/redshift_dbt.json

diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java
index 1889d705e23..297747854c6 100644
--- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java
+++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/jdbi3/TableRepository.java
@@ -237,6 +237,22 @@ public class TableRepository extends EntityRepository<Table> {
   public Table addDataModel(UUID tableId, DataModel dataModel) throws IOException, ParseException {
     Table table = dao.tableDAO().findEntityById(tableId);
     table.withDataModel(dataModel);
+
+    // Carry forward the table description from the model to table entity, if empty
+    if (table.getDescription() == null || table.getDescription().isEmpty()) {
+      table.setDescription(dataModel.getDescription());
+    }
+    // Carry forward the column description from the model to table columns, if empty
+    for (Column modelColumn : Optional.ofNullable(dataModel.getColumns()).orElse(Collections.emptyList())) {
+      Column stored = table.getColumns().stream().filter(c ->
+              EntityUtil.columnNameMatch.test(c, modelColumn)).findAny().orElse(null);
+      if (stored == null) {
+        continue;
+      }
+      if (stored.getDescription() == null || stored.getDescription().isEmpty()) {
+        stored.setDescription(modelColumn.getDescription());
+      }
+    }
     dao.tableDAO().update(table.getId(), JsonUtils.pojoToJson(table));
     setFields(table, Fields.EMPTY_FIELDS);
     return table;
@@ -785,8 +801,7 @@ public class TableRepository extends EntityRepository<Table> {
     // Carry forward the user generated metadata from existing columns to new columns
     for (Column updated : updatedColumns) {
       // Find stored column matching name, data type and ordinal position
-      Column stored = origColumns.stream().filter(c ->
-              EntityUtil.columnMatch.test(c, updated)).findAny().orElse(null);
+      Column stored = origColumns.stream().filter(c -> columnMatch.test(c, updated)).findAny().orElse(null);
       if (stored == null) { // New column added
         continue;
       }
diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/databases/TableResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/databases/TableResource.java
index a5599c503aa..89e62f829a9 100644
--- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/databases/TableResource.java
+++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/databases/TableResource.java
@@ -420,7 +420,7 @@ public class TableResource {
   @Path("/{id}/dataModel")
   @Operation(summary = "Add data modeling information to a table", tags = "tables",
           description = "Add data modeling (such as DBT model) information on how the table was created to the table.")
-  public Table addQuery(@Context UriInfo uriInfo,
+  public Table addDataModel(@Context UriInfo uriInfo,
                         @Context SecurityContext securityContext,
                         @Parameter(description = "Id of the table", schema = @Schema(type = "string")) @PathParam("id") String id,
                         DataModel dataModel) throws IOException, ParseException {
diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java
index c428f218602..ade0ee286ea 100644
--- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java
+++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/util/EntityUtil.java
@@ -100,6 +100,9 @@ public final class EntityUtil {
           column1.getArrayDataType() == column2.getArrayDataType() &&
           Objects.equals(column1.getOrdinalPosition(), column2.getOrdinalPosition());

+  public static BiPredicate<Column, Column> columnNameMatch = (column1, column2) ->
+          column1.getName().equals(column2.getName());
+
   public static BiPredicate<TableConstraint, TableConstraint> tableConstraintMatch = (constraint1, constraint2) ->
           constraint1.getConstraintType() == constraint2.getConstraintType() &&
           constraint1.getColumns().equals(constraint2.getColumns());
diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/data/table.json b/catalog-rest-service/src/main/resources/json/schema/entity/data/table.json
index 71c553ee1f6..812b1b8ad0a 100644
--- a/catalog-rest-service/src/main/resources/json/schema/entity/data/table.json
+++ b/catalog-rest-service/src/main/resources/json/schema/entity/data/table.json
@@ -395,6 +395,10 @@
       "modelType" : {
         "$ref" : "#/definitions/modelType"
       },
+      "description" : {
+        "description": "Description of the Table from the model",
+        "type" : "string"
+      },
       "path" : {
         "description": "Path to sql definition file.",
         "type" : "string"
diff --git a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/databases/TableResourceTest.java b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/databases/TableResourceTest.java
index 8daced63e51..74011a3f722 100644
--- a/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/databases/TableResourceTest.java
+++ b/catalog-rest-service/src/test/java/org/openmetadata/catalog/resources/databases/TableResourceTest.java
@@ -811,19 +811,30 @@ public class TableResourceTest extends EntityResourceTest<Table> {
   @Test
   public void put_tableDataModel(TestInfo test) throws IOException {
-    Table table = createAndCheckEntity(create(test), adminAuthHeaders());
+    List<Column> columns = Arrays.asList(
+            getColumn("c1", BIGINT, USER_ADDRESS_TAG_LABEL).withDescription(null),
+            getColumn("c2", ColumnDataType.VARCHAR, USER_ADDRESS_TAG_LABEL)
+                    .withDataLength(10).withDescription(null));
+    Table table = createAndCheckEntity(create(test).withColumns(columns).withDescription(null), adminAuthHeaders());

     //
-    // Update the data model and validate the response
+    // Update the data model and validate the response.
+    // Make sure table and column description is carried forward if the original entity had them as null
     //
+    columns.get(0).setDescription("updatedDescription");
+    columns.get(1).setDescription("updatedDescription");
     String query = "select * from test;";
-    DataModel dataModel = new DataModel().withModelType(ModelType.DBT).withSql(query).withGeneratedAt(new Date());
+    DataModel dataModel = new DataModel().withDescription("updatedTableDescription").withModelType(ModelType.DBT)
+            .withSql(query).withGeneratedAt(new Date()).withColumns(columns);
     Table putResponse = putTableDataModel(table.getId(), dataModel, adminAuthHeaders());
     assertDataModel(dataModel, putResponse.getDataModel());
+    assertEquals("updatedTableDescription", putResponse.getDescription()); // Table description updated

     // Get the table and validate the data model
-    Table getResponse = getEntity(table.getId(), "dataModel", adminAuthHeaders());
+    Table getResponse = getEntity(table.getId(), "dataModel,columns,tags", adminAuthHeaders());
     assertDataModel(dataModel, getResponse.getDataModel());
+    assertEquals("updatedTableDescription", getResponse.getDescription()); // Table description updated
+    assertColumns(columns, getResponse.getColumns()); // Column description updated

     //
     // Update again
diff --git a/ingestion/examples/workflows/redshift_dbt.json b/ingestion/examples/workflows/redshift_dbt.json
new file mode 100644
index 00000000000..3c125507a08
--- /dev/null
+++ b/ingestion/examples/workflows/redshift_dbt.json
@@ -0,0 +1,31 @@
+{
+  "source": {
+    "type": "redshift",
+    "config": {
+      "host_port": "cluster.name.region.redshift.amazonaws.com:5439",
+      "username": "username",
+      "password": "strong_password",
+      "database": "warehouse",
+      "service_name": "aws_redshift",
+      "filter_pattern": {
+        "excludes": [
+          "information_schema.*",
+          "[\w]*event_vw.*"
+        ]
+      },
+      "dbt_manifest_file": "./examples/sample_data/dbt/manifest.json",
+      "dbt_catalog_file": "./examples/sample_data/dbt/catalog.json"
+    }
+  },
+  "sink": {
+    "type": "metadata-rest",
+    "config": {}
+  },
+  "metadata_server": {
+    "type": "metadata-server",
+    "config": {
+      "api_endpoint": "http://localhost:8585/api",
+      "auth_provider_type": "no-auth"
+    }
+  }
+}
\ No newline at end of file
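The example workflow above introduces two new source config keys, dbt_manifest_file and dbt_catalog_file, pointing at the artifacts dbt writes to its target/ directory. A minimal sketch of how the ingestion side consumes them (plain json.load, mirroring the SQLSource.__init__ change further down; the paths are the placeholders from the example):

import json

# Paths are the placeholders from the example workflow above.
dbt_manifest_file = "./examples/sample_data/dbt/manifest.json"
dbt_catalog_file = "./examples/sample_data/dbt/catalog.json"

with open(dbt_manifest_file, "r") as manifest:
    dbt_manifest = json.load(manifest)  # node/source definitions, SQL, lineage
with open(dbt_catalog_file, "r") as catalog:
    dbt_catalog = json.load(catalog)  # column names, types, ordinal positions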
diff --git a/ingestion/src/metadata/ingestion/models/ometa_table_db.py b/ingestion/src/metadata/ingestion/models/ometa_table_db.py
index bcc688d0d1b..781b300f5a7 100644
--- a/ingestion/src/metadata/ingestion/models/ometa_table_db.py
+++ b/ingestion/src/metadata/ingestion/models/ometa_table_db.py
@@ -13,7 +13,6 @@ from typing import Optional
 from pydantic import BaseModel

 from metadata.generated.schema.entity.data.database import Database
-from metadata.generated.schema.entity.data.dbtmodel import DbtModel
 from metadata.generated.schema.entity.data.location import Location
 from metadata.generated.schema.entity.data.table import Table

@@ -22,8 +21,3 @@ class OMetaDatabaseAndTable(BaseModel):
     database: Database
     table: Table
     location: Optional[Location]
-
-
-class OMetaDatabaseAndModel(BaseModel):
-    model: DbtModel
-    database: Database
diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/tableMixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/tableMixin.py
index ce81cf925d3..9188eea7696 100644
--- a/ingestion/src/metadata/ingestion/ometa/mixins/tableMixin.py
+++ b/ingestion/src/metadata/ingestion/ometa/mixins/tableMixin.py
@@ -8,6 +8,7 @@ from typing import List

 from metadata.generated.schema.entity.data.location import Location
 from metadata.generated.schema.entity.data.table import (
+    DataModel,
     Table,
     TableData,
     TableJoins,
@@ -71,6 +72,19 @@ class OMetaTableMixin:
         )
         return [TableProfile(**t) for t in resp["tableProfile"]]

+    def ingest_table_data_model(self, table: Table, data_model: DataModel) -> Table:
+        """
+        PUT data model for a table
+
+        :param table: Table Entity to update
+        :param data_model: Model to add
+        """
+        resp = self.client.put(
+            f"{self.get_suffix(Table)}/{table.id.__root__}/dataModel",
+            data=data_model.json(),
+        )
+        return Table(**resp)
+
     def publish_table_usage(
         self, table: Table, table_usage_request: TableUsageRequest
     ) -> None:
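The new ingest_table_data_model mixin method gives API clients a single call for attaching a model to an existing table. A minimal usage sketch, assuming the local no-auth server from the example workflow above and a hypothetical table FQN:

from metadata.generated.schema.entity.data.table import DataModel, ModelType, Table
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig

# Server settings borrowed from the example workflow above.
metadata = OpenMetadata(MetadataServerConfig(api_endpoint="http://localhost:8585/api"))

# Hypothetical fully qualified name; the table must already have been ingested.
table = metadata.get_by_name(entity=Table, fqdn="aws_redshift.warehouse.dim_customer")

data_model = DataModel(modelType=ModelType.DBT, sql="select * from test;")

# PUTs to {tables}/{id}/dataModel and returns the updated Table entity.
updated = metadata.ingest_table_data_model(table=table, data_model=data_model)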
diff --git a/ingestion/src/metadata/ingestion/ometa/ometa_api.py b/ingestion/src/metadata/ingestion/ometa/ometa_api.py
index e2fc58022bf..d88f77ea05d 100644
--- a/ingestion/src/metadata/ingestion/ometa/ometa_api.py
+++ b/ingestion/src/metadata/ingestion/ometa/ometa_api.py
@@ -18,7 +18,6 @@ from metadata.generated.schema.api.lineage.addLineage import AddLineage
 from metadata.generated.schema.entity.data.chart import Chart
 from metadata.generated.schema.entity.data.dashboard import Dashboard
 from metadata.generated.schema.entity.data.database import Database
-from metadata.generated.schema.entity.data.dbtmodel import DbtModel
 from metadata.generated.schema.entity.data.location import Location
 from metadata.generated.schema.entity.data.metrics import Metrics
 from metadata.generated.schema.entity.data.mlmodel import MlModel
@@ -171,11 +170,6 @@ class OpenMetadata(OMetaLineageMixin, OMetaTableMixin, Generic[T, C]):
         ):
             return "/topics"

-        if issubclass(
-            entity, get_args(Union[DbtModel, self.get_create_entity_type(DbtModel)])
-        ):
-            return "/dbtmodels"
-
         if issubclass(entity, Metrics):
             return "/metrics"
diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py
index 43dd3d0f684..dbdd204dea9 100644
--- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py
+++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py
@@ -23,7 +23,6 @@ from metadata.config.common import ConfigModel
 from metadata.generated.schema.entity.data.chart import Chart
 from metadata.generated.schema.entity.data.dashboard import Dashboard
 from metadata.generated.schema.entity.data.database import Database
-from metadata.generated.schema.entity.data.dbtmodel import DbtModel
 from metadata.generated.schema.entity.data.pipeline import Pipeline, Task
 from metadata.generated.schema.entity.data.table import Column, Table
 from metadata.generated.schema.entity.data.topic import Topic
@@ -209,14 +208,6 @@ class ElasticsearchSink(Sink[Entity]):
                 body=pipeline_doc.json(),
                 request_timeout=self.config.timeout,
             )
-        if isinstance(record, DbtModel):
-            dbt_model_doc = self._create_dbt_model_es_doc(record)
-            self.elasticsearch_client.index(
-                index=self.config.dbt_index_name,
-                id=str(dbt_model_doc.dbt_model_id),
-                body=dbt_model_doc.json(),
-                request_timeout=self.config.timeout,
-            )

         if hasattr(record.name, "__root__"):
             self.status.records_written(record.name.__root__)
@@ -450,71 +441,6 @@ class ElasticsearchSink(Sink[Entity]):

         return pipeline_doc

-    def _create_dbt_model_es_doc(self, dbt_model: DbtModel):
-        fqdn = dbt_model.fullyQualifiedName
-        database = dbt_model.database.name
-        dbt_model_name = dbt_model.name
-        suggest = [
-            {"input": [fqdn], "weight": 5},
-            {"input": [dbt_model_name], "weight": 10},
-        ]
-        column_names = []
-        column_descriptions = []
-        tags = set()
-
-        timestamp = time.time()
-        tier = None
-        for dbt_model_tag in dbt_model.tags:
-            if "Tier" in dbt_model_tag.tagFQN:
-                tier = dbt_model_tag.tagFQN
-            else:
-                tags.add(dbt_model_tag.tagFQN)
-        self._parse_columns(
-            dbt_model.columns, None, column_names, column_descriptions, tags
-        )
-
-        database_entity = self.metadata.get_by_id(
-            entity=Database, entity_id=str(dbt_model.database.id.__root__)
-        )
-        service_entity = self.metadata.get_by_id(
-            entity=DatabaseService, entity_id=str(database_entity.service.id.__root__)
-        )
-        dbt_model_owner = (
-            str(dbt_model.owner.id.__root__) if dbt_model.owner is not None else ""
-        )
-        dbt_model_followers = []
-        if dbt_model.followers:
-            for follower in dbt_model.followers.__root__:
-                dbt_model_followers.append(str(follower.id.__root__))
-        dbt_node_type = None
-        if hasattr(dbt_model.dbtNodeType, "name"):
-            dbt_node_type = dbt_model.dbtNodeType.name
-        change_descriptions = self._get_change_descriptions(
-            DbtModel, dbt_model.id.__root__
-        )
-        dbt_model_doc = DbtModelESDocument(
-            dbt_model_id=str(dbt_model.id.__root__),
-            database=str(database_entity.name.__root__),
-            service=service_entity.name,
-            service_type=service_entity.serviceType.name,
-            service_category="databaseService",
-            name=dbt_model.name.__root__,
-            suggest=suggest,
-            description=dbt_model.description,
-            dbt_model_type=dbt_node_type,
-            last_updated_timestamp=timestamp,
-            column_names=column_names,
-            column_descriptions=column_descriptions,
-            tier=tier,
-            tags=list(tags),
-            fqdn=fqdn,
-            schema_description=None,
-            owner=dbt_model_owner,
-            followers=dbt_model_followers,
-            change_descriptions=change_descriptions,
-        )
-        return dbt_model_doc
-
     def _get_charts(self, chart_refs: Optional[List[entityReference.EntityReference]]):
         charts = []
         if chart_refs:
diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py
index 3a08b13eff2..c1f645e6a3e 100644
--- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py
+++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py
@@ -23,9 +23,6 @@ from metadata.generated.schema.api.data.createDashboard import (
 from metadata.generated.schema.api.data.createDatabase import (
     CreateDatabaseEntityRequest,
 )
-from metadata.generated.schema.api.data.createDbtModel import (
-    CreateDbtModelEntityRequest,
-)
 from metadata.generated.schema.api.data.createLocation import (
     CreateLocationEntityRequest,
 )
@@ -46,10 +43,7 @@ from metadata.generated.schema.entity.teams.user import User
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.api.common import Entity, WorkflowContext
 from metadata.ingestion.api.sink import Sink, SinkStatus
-from metadata.ingestion.models.ometa_table_db import (
-    OMetaDatabaseAndModel,
-    OMetaDatabaseAndTable,
-)
+from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
 from metadata.ingestion.models.table_metadata import Chart, Dashboard
 from metadata.ingestion.ometa.client import APIError
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
@@ -129,8 +123,6 @@ class MetadataRestSink(Sink[Entity]):
             self.write_users(record)
         elif isinstance(record, MlModel):
             self.write_ml_model(record)
-        elif isinstance(record, OMetaDatabaseAndModel):
-            self.write_dbt_models(record)
         else:
             logging.info(
                 f"Ignoring the record due to unknown Record type {type(record)}"
@@ -188,6 +180,11 @@ class MetadataRestSink(Sink[Entity]):
                 table_profile=db_and_table.table.tableProfile,
             )

+            if db_and_table.table.dataModel is not None:
+                self.metadata.ingest_table_data_model(
+                    table=created_table, data_model=db_and_table.table.dataModel
+                )
+
             logger.info(
                 "Successfully ingested table {}.{}".format(
                     db_and_table.database.name.__root__,
@@ -207,44 +204,6 @@ class MetadataRestSink(Sink[Entity]):
             logger.error(err)
             self.status.failure(f"Table: {db_and_table.table.name.__root__}")

-    def write_dbt_models(self, model_and_db: OMetaDatabaseAndModel):
-        try:
-            db_request = CreateDatabaseEntityRequest(
-                name=model_and_db.database.name,
-                description=model_and_db.database.description,
-                service=EntityReference(
-                    id=model_and_db.database.service.id, type="databaseService"
-                ),
-            )
-            db = self.metadata.create_or_update(db_request)
-            model = model_and_db.model
-            model_request = CreateDbtModelEntityRequest(
-                name=model.name,
-                description=model.description,
-                viewDefinition=model.viewDefinition,
-                database=db.id,
-                dbtNodeType=model.dbtNodeType,
-                columns=model.columns,
-            )
-            created_model = self.metadata.create_or_update(model_request)
-            logger.info(
-                "Successfully ingested model {}.{}".format(
-                    db.name.__root__, created_model.name.__root__
-                )
-            )
-            self.status.records_written(
-                f"Model: {db.name.__root__}.{created_model.name.__root__}"
-            )
-        except (APIError, ValidationError) as err:
-            logger.error(
-                "Failed to ingest model {} in database {} ".format(
-                    model_and_db.model.name.__root__,
-                    model_and_db.database.name.__root__,
-                )
-            )
-            logger.error(err)
-            self.status.failure(f"Model: {model_and_db.model.name.__root__}")
-
     def write_topics(self, topic: CreateTopicEntityRequest) -> None:
         try:
             created_topic = self.metadata.create_or_update(topic)
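With write_dbt_models gone, write_tables now covers dbt as well: after create_or_update of the table, any dataModel carried on the record is PUT through the mixin shown earlier. For reference, a sketch of a fully populated payload; the field names follow the dataModel definition added to table.json above, while the values are illustrative:

from metadata.generated.schema.entity.data.table import Column, DataModel, ModelType

data_model = DataModel(
    modelType=ModelType.DBT,
    description="Table description carried over from the dbt model",
    path="models/marts/dim_customer.sql",  # illustrative path within the dbt project
    rawSql="select * from {{ ref('stg_customer') }}",
    sql="select * from warehouse.stg_customer",
    upstream=["aws_redshift.warehouse.stg_customer"],
    columns=[Column(name="customer_id", dataType="BIGINT", dataLength=1)],
)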
diff --git a/ingestion/src/metadata/ingestion/source/metadata.py b/ingestion/src/metadata/ingestion/source/metadata.py
index fdcfd57a5f2..779367760d4 100644
--- a/ingestion/src/metadata/ingestion/source/metadata.py
+++ b/ingestion/src/metadata/ingestion/source/metadata.py
@@ -14,16 +14,14 @@ from dataclasses import dataclass, field
 from typing import Iterable, List, Optional

 from metadata.config.common import ConfigModel
+from metadata.generated.schema.entity.data.dashboard import Dashboard
 from metadata.generated.schema.entity.data.pipeline import Pipeline
+from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.entity.data.topic import Topic
 from metadata.ingestion.api.common import Entity, WorkflowContext
 from metadata.ingestion.api.source import Source, SourceStatus
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
-
-from ...generated.schema.entity.data.dashboard import Dashboard
-from ...generated.schema.entity.data.dbtmodel import DbtModel
-from ...generated.schema.entity.data.table import Table
-from ...generated.schema.entity.data.topic import Topic
-from ..ometa.openmetadata_rest import MetadataServerConfig
+from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig

 logger = logging.getLogger(__name__)

@@ -97,7 +95,6 @@ class MetadataSource(Source[Entity]):
         yield from self.fetch_topic()
         yield from self.fetch_dashboard()
         yield from self.fetch_pipeline()
-        yield from self.fetch_dbt_models()

     def fetch_table(self) -> Table:
         if self.config.include_tables:
@@ -180,29 +177,6 @@ class MetadataSource(Source[Entity]):
                     break
                 after = pipeline_entities.after

-    def fetch_dbt_models(self) -> Pipeline:
-        after = None
-        while True:
-            dbt_model_entities = self.metadata.list_entities(
-                entity=DbtModel,
-                fields=[
-                    "columns",
-                    "owner",
-                    "database",
-                    "tags",
-                    "followers",
-                    "viewDefinition",
-                ],
-                after=after,
-                limit=self.config.limit_records,
-            )
-            for dbt_model in dbt_model_entities.entities:
-                self.status.scanned_dashboard(dbt_model.name)
-                yield dbt_model
-            if dbt_model_entities.after is None:
-                break
-            after = dbt_model_entities.after
-
     def get_status(self) -> SourceStatus:
         return self.status
diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py
index 09a7f8e6545..363d8195f44 100644
--- a/ingestion/src/metadata/ingestion/source/sql_source.py
+++ b/ingestion/src/metadata/ingestion/source/sql_source.py
@@ -8,7 +8,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import json
 import logging
 import re
 import traceback
@@ -28,6 +28,8 @@ from metadata.generated.schema.entity.data.database import Database
 from metadata.generated.schema.entity.data.table import (
     Column,
     Constraint,
+    DataModel,
+    ModelType,
     Table,
     TableData,
     TableProfile,
@@ -87,6 +89,8 @@ class SQLConnectionConfig(ConfigModel):
     data_profiler_offset: Optional[int] = 0
     data_profiler_limit: Optional[int] = 50000
     filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
+    dbt_manifest_file: Optional[str] = None
+    dbt_catalog_file: Optional[str] = None

     @abstractmethod
     def get_connection_url(self):
@@ -136,6 +140,11 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
         self.engine = create_engine(self.connection_string, **self.sql_config.options)
         self.connection = self.engine.connect()
         self.data_profiler = None
+        self.data_models = {}
+        if self.config.dbt_catalog_file is not None:
+            self.dbt_catalog = json.load(open(self.config.dbt_catalog_file, "r"))
+        if self.config.dbt_manifest_file is not None:
+            self.dbt_manifest = json.load(open(self.config.dbt_manifest_file, "r"))

     def _instantiate_profiler(self):
         try:
@@ -155,7 +164,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
         )

     def prepare(self):
-        pass
+        self._parse_data_model()

     @classmethod
     def create(
@@ -244,6 +253,11 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                     table_entity.tableProfile = (
                         [profile] if profile is not None else None
                     )
+                # check if we have any model to associate with
+                table_fqn = f"{schema}.{table_name}"
+                if table_fqn in self.data_models:
+                    model = self.data_models[table_fqn]
+                    table_entity.dataModel = model

                 table_and_db = OMetaDatabaseAndTable(
                     table=table_entity, database=self._get_database(schema)
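The parser in the next hunk keys both dbt artifacts by dbt's unique node id and merges nodes with sources; test nodes are skipped. A trimmed sketch of the shapes _parse_data_model expects, with the keys taken from the parsing code and the project and values illustrative:

# Trimmed, illustrative shapes of the two dbt artifacts loaded in __init__ above.
dbt_manifest = {
    "nodes": {
        "model.jaffle_shop.dim_customer": {
            "name": "dim_customer",
            "alias": "dim_customer",  # preferred over "name" when present
            "resource_type": "model",  # "test" entries are skipped
            "schema": "warehouse",
            "root_path": "/repo/jaffle_shop",
            "original_file_path": "models/dim_customer.sql",
            "raw_sql": "select * from {{ ref('stg_customer') }}",
            "compiled_sql": "select * from warehouse.stg_customer",
            # Node ids split as node_type.database.table to build upstream FQNs
            "depends_on": {"nodes": ["model.jaffle_shop.stg_customer"]},
            "columns": {"customer_id": {"description": "Primary key"}},
        }
    },
    "sources": {},
}
dbt_catalog = {
    "nodes": {
        "model.jaffle_shop.dim_customer": {
            "columns": {
                "customer_id": {"name": "CUSTOMER_ID", "type": "BIGINT", "index": 1}
            }
        }
    },
    "sources": {},
}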
@@ -314,6 +328,85 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                 )
                 continue

+    def _parse_data_model(self) -> DataModel:
+        logger.info("Parsing Data Models")
+        if (
+            self.config.dbt_manifest_file is not None
+            and self.config.dbt_catalog_file is not None
+        ):
+            manifest_nodes = self.dbt_manifest["nodes"]
+            manifest_sources = self.dbt_manifest["sources"]
+            manifest_entities = {**manifest_nodes, **manifest_sources}
+            catalog_nodes = self.dbt_catalog["nodes"]
+            catalog_sources = self.dbt_catalog["sources"]
+            catalog_entities = {**catalog_nodes, **catalog_sources}
+
+            for key, mnode in manifest_entities.items():
+                name = mnode["alias"] if "alias" in mnode.keys() else mnode["name"]
+                cnode = catalog_entities.get(key)
+                if cnode is not None:
+                    columns = self._parse_data_model_columns(name, mnode, cnode)
+                else:
+                    columns = []
+                if mnode["resource_type"] == "test":
+                    continue
+                upstream_nodes = self._parse_data_model_upstream(mnode)
+                model_name = (
+                    mnode["alias"] if "alias" in mnode.keys() else mnode["name"]
+                )
+                description = mnode.get("description", "")
+                schema = mnode["schema"]
+                path = f"{mnode['root_path']}/{mnode['original_file_path']}"
+                model = DataModel(
+                    modelType=ModelType.DBT,
+                    description=description,
+                    path=path,
+                    rawSql=mnode["raw_sql"] if "raw_sql" in mnode else None,
+                    sql=mnode["compiled_sql"] if "compiled_sql" in mnode else None,
+                    columns=columns,
+                    upstream=upstream_nodes,
+                )
+                model_fqdn = f"{schema}.{model_name}"
+                self.data_models[model_fqdn] = model
+
+    def _parse_data_model_upstream(self, mnode):
+        upstream_nodes = []
+        if "depends_on" in mnode and "nodes" in mnode["depends_on"]:
+            for node in mnode["depends_on"]["nodes"]:
+                node_type, database, table = node.split(".")
+                table_fqn = f"{self.config.service_name}.{database}.{table}"
+                upstream_nodes.append(table_fqn)
+        return upstream_nodes
+
+    def _parse_data_model_columns(
+        self, model_name: str, mnode: Dict, cnode: Dict
+    ) -> [Column]:
+        columns = []
+        ccolumns = cnode.get("columns")
+        manifest_columns = mnode.get("columns", {})
+        for key in ccolumns:
+            ccolumn = ccolumns[key]
+            try:
+                ctype = ccolumn["type"]
+                col_type = get_column_type(self.status, model_name, ctype)
+                description = manifest_columns.get(key.lower(), {}).get(
+                    "description", None
+                )
+                if description is None:
+                    description = ccolumn.get("comment", None)
+                col = Column(
+                    name=ccolumn["name"].lower(),
+                    description=description,
+                    dataType=col_type,
+                    dataLength=1,
+                    ordinalPosition=ccolumn["index"],
+                )
+                columns.append(col)
+            except Exception as e:
+                logger.error(f"Failed to parse column type due to {e}")
+
+        return columns
+
     def _get_database(self, schema: str) -> Database:
         return Database(
             name=schema,