Fix #1690: Ingestion: Apply DBT models to views (#1691)

* Fix #1690: Ingestion: Apply DBT models to views

* Fix #1690: Ingestion: Apply DBT models to views
This commit is contained in:
Sriharsha Chintalapani 2021-12-12 07:23:08 -08:00 committed by GitHub
parent a7c8056bf5
commit a9198c72f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 16 additions and 15 deletions

View File

@ -2,16 +2,14 @@
"source": {
"type": "redshift",
"config": {
"host_port": "cluster.name.region.redshift.amazonaws.com:5439",
"host_port": "cluster.name.region.redshift.amazonaws.com:5439",
"username": "username",
"password": "strong_password",
"database": "warehouse",
"service_name": "aws_redshift",
"generate_sample_data": "false",
"filter_pattern": {
"excludes": [
"information_schema.*",
"[\\w]*event_vw.*"
]
"excludes": ["information_schema.*", "[\\w]*event_vw.*"]
},
"dbt_manifest_file": "./examples/sample_data/dbt/manifest.json",
"dbt_catalog_file": "./examples/sample_data/dbt/catalog.json"

View File

@ -32,10 +32,10 @@ from metadata.generated.schema.api.data.createPipeline import (
)
from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest
from metadata.generated.schema.api.data.createTopic import CreateTopicEntityRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineage
from metadata.generated.schema.api.policies.createPolicy import (
CreatePolicyEntityRequest,
)
from metadata.generated.schema.api.lineage.addLineage import AddLineage
from metadata.generated.schema.api.teams.createTeam import CreateTeamEntityRequest
from metadata.generated.schema.api.teams.createUser import CreateUserEntityRequest
from metadata.generated.schema.entity.data.chart import ChartType
@ -149,10 +149,9 @@ class MetadataRestSink(Sink[Entity]):
name=db_and_table.table.name,
tableType=db_and_table.table.tableType,
columns=db_and_table.table.columns,
description=db_and_table.table.description,
description=db_and_table.table.description.strip(),
database=db.id,
)
if db_and_table.table.viewDefinition:
table_request.viewDefinition = (
db_and_table.table.viewDefinition.__root__
@ -162,7 +161,7 @@ class MetadataRestSink(Sink[Entity]):
if db_and_table.location is not None:
location_request = CreateLocationEntityRequest(
name=db_and_table.location.name,
description=db_and_table.location.description,
description=db_and_table.location.description.strip(),
service=EntityReference(
id=db_and_table.location.service.id,
type="storageService",

View File

@ -254,10 +254,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
[profile] if profile is not None else None
)
# check if we have any model to associate with
table_fqn = f"{schema}.{table_name}"
if table_fqn in self.data_models:
model = self.data_models[table_fqn]
table_entity.dataModel = model
table_entity.dataModel = self._get_data_model(schema, table_name)
table_and_db = OMetaDatabaseAndTable(
table=table_entity, database=self._get_database(schema)
@ -316,7 +313,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
if self.sql_config.generate_sample_data:
table_data = self.fetch_sample_data(schema, view_name)
table.sampleData = table_data
table.dataModel = self._get_data_model(schema, view_name)
table_and_db = OMetaDatabaseAndTable(
table=table, database=self._get_database(schema)
)
@ -328,7 +325,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
)
continue
def _parse_data_model(self) -> DataModel:
def _parse_data_model(self):
logger.info("Parsing Data Models")
if (
self.config.dbt_manifest_file is not None
@ -378,6 +375,13 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
upstream_nodes.append(table_fqn)
return upstream_nodes
def _get_data_model(self, schema, table_name):
    """Look up the data model registered for ``schema.table_name``.

    The lookup key is the schema and table (or view) name joined by a dot.
    Returns the matching entry from ``self.data_models``, or ``None`` when
    no entry exists for that key.
    """
    # A single mapping lookup; a missing key yields None.
    return self.data_models.get(f"{schema}.{table_name}")
def _parse_data_model_columns(
self, model_name: str, mnode: Dict, cnode: Dict
) -> [Column]: