mirror of https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-22 07:58:06 +00:00

Issue 1658: Ingestion changes to add dbtModel as part of Table entity (#1659)

* Fix #1658: Ingestion changes to add dbtModel as part of Table entity
* Fixes #1652: Remove DbtModel as a top-level entity and capture DBT information in the existing Table entity

Co-authored-by: sureshms <suresh@getcollate.io>

parent 5d74a8d4f4
commit c8c57de37e
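
In short, the commit drops the standalone DbtModel entity and attaches a dataModel field to Table, served by a PUT endpoint on the tables resource (see the TableResource hunk below). A minimal sketch of calling that endpoint, assuming a local server with the no-auth provider as in the example workflow further down; the table UUID and payload values are hypothetical:

import requests

table_id = "00000000-0000-0000-0000-000000000000"  # hypothetical table UUID
data_model = {
    "modelType": "DBT",
    "description": "Orders table materialized by dbt",  # carried to the table if its description is empty
    "sql": "select * from test;",
}

# PUT /v1/tables/{id}/dataModel attaches the model and returns the updated table
resp = requests.put(
    f"http://localhost:8585/api/v1/tables/{table_id}/dataModel",
    json=data_model,
)
resp.raise_for_status()
print(resp.json()["dataModel"]["modelType"])  # expected: "DBT"
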
@@ -237,6 +237,22 @@ public class TableRepository extends EntityRepository<Table> {
   public Table addDataModel(UUID tableId, DataModel dataModel) throws IOException, ParseException {
     Table table = dao.tableDAO().findEntityById(tableId);
     table.withDataModel(dataModel);
+
+    // Carry forward the table description from the model to table entity, if empty
+    if (table.getDescription() == null || table.getDescription().isEmpty()) {
+      table.setDescription(dataModel.getDescription());
+    }
+    // Carry forward the column description from the model to table columns, if empty
+    for (Column modelColumn : Optional.ofNullable(dataModel.getColumns()).orElse(Collections.emptyList())) {
+      Column stored = table.getColumns().stream().filter(c ->
+              EntityUtil.columnNameMatch.test(c, modelColumn)).findAny().orElse(null);
+      if (stored == null) {
+        continue;
+      }
+      if (stored.getDescription() == null || stored.getDescription().isEmpty()) {
+        stored.setDescription(modelColumn.getDescription());
+      }
+    }
     dao.tableDAO().update(table.getId(), JsonUtils.pojoToJson(table));
     setFields(table, Fields.EMPTY_FIELDS);
     return table;
@@ -785,8 +801,7 @@ public class TableRepository extends EntityRepository<Table> {
     // Carry forward the user generated metadata from existing columns to new columns
     for (Column updated : updatedColumns) {
       // Find stored column matching name, data type and ordinal position
-      Column stored = origColumns.stream().filter(c ->
-              EntityUtil.columnMatch.test(c, updated)).findAny().orElse(null);
+      Column stored = origColumns.stream().filter(c -> columnMatch.test(c, updated)).findAny().orElse(null);
       if (stored == null) { // New column added
         continue;
       }
@@ -420,7 +420,7 @@ public class TableResource {
   @Path("/{id}/dataModel")
   @Operation(summary = "Add data modeling information to a table", tags = "tables",
           description = "Add data modeling (such as DBT model) information on how the table was created to the table.")
-  public Table addQuery(@Context UriInfo uriInfo,
+  public Table addDataModel(@Context UriInfo uriInfo,
                         @Context SecurityContext securityContext,
                         @Parameter(description = "Id of the table", schema = @Schema(type = "string"))
                         @PathParam("id") String id, DataModel dataModel) throws IOException, ParseException {
@@ -100,6 +100,9 @@ public final class EntityUtil {
           column1.getArrayDataType() == column2.getArrayDataType() &&
           Objects.equals(column1.getOrdinalPosition(), column2.getOrdinalPosition());

+  public static BiPredicate<Column, Column> columnNameMatch = (column1, column2) ->
+          column1.getName().equals(column2.getName());
+
   public static BiPredicate<TableConstraint, TableConstraint> tableConstraintMatch = (constraint1, constraint2) ->
           constraint1.getConstraintType() == constraint2.getConstraintType() &&
           constraint1.getColumns().equals(constraint2.getColumns());
@@ -395,6 +395,10 @@
       "modelType" : {
         "$ref" : "#/definitions/modelType"
       },
+      "description" : {
+        "description": "Description of the Table from the model",
+        "type" : "string"
+      },
       "path" : {
         "description": "Path to sql definition file.",
         "type" : "string"
@@ -811,19 +811,30 @@ public class TableResourceTest extends EntityResourceTest<Table> {

   @Test
   public void put_tableDataModel(TestInfo test) throws IOException {
-    Table table = createAndCheckEntity(create(test), adminAuthHeaders());
+    List<Column> columns = Arrays.asList(
+            getColumn("c1", BIGINT, USER_ADDRESS_TAG_LABEL).withDescription(null),
+            getColumn("c2", ColumnDataType.VARCHAR, USER_ADDRESS_TAG_LABEL)
+                    .withDataLength(10).withDescription(null));
+    Table table = createAndCheckEntity(create(test).withColumns(columns).withDescription(null), adminAuthHeaders());

     //
-    // Update the data model and validate the response
+    // Update the data model and validate the response.
+    // Make sure table and column description is carried forward if the original entity had them as null
     //
+    columns.get(0).setDescription("updatedDescription");
+    columns.get(1).setDescription("updatedDescription");
     String query = "select * from test;";
-    DataModel dataModel = new DataModel().withModelType(ModelType.DBT).withSql(query).withGeneratedAt(new Date());
+    DataModel dataModel = new DataModel().withDescription("updatedTableDescription").withModelType(ModelType.DBT)
+            .withSql(query).withGeneratedAt(new Date()).withColumns(columns);
     Table putResponse = putTableDataModel(table.getId(), dataModel, adminAuthHeaders());
     assertDataModel(dataModel, putResponse.getDataModel());
+    assertEquals("updatedTableDescription", putResponse.getDescription()); // Table description updated

     // Get the table and validate the data model
-    Table getResponse = getEntity(table.getId(), "dataModel", adminAuthHeaders());
+    Table getResponse = getEntity(table.getId(), "dataModel,columns,tags", adminAuthHeaders());
     assertDataModel(dataModel, getResponse.getDataModel());
+    assertEquals("updatedTableDescription", getResponse.getDescription()); // Table description updated
+    assertColumns(columns, getResponse.getColumns()); // Column description updated

     //
     // Update again
ingestion/examples/workflows/redshift_dbt.json (new file, 31 lines)
@@ -0,0 +1,31 @@
+{
+  "source": {
+    "type": "redshift",
+    "config": {
+      "host_port": "cluster.name.region.redshift.amazonaws.com:5439",
+      "username": "username",
+      "password": "strong_password",
+      "database": "warehouse",
+      "service_name": "aws_redshift",
+      "filter_pattern": {
+        "excludes": [
+          "information_schema.*",
+          "[\\w]*event_vw.*"
+        ]
+      },
+      "dbt_manifest_file": "./examples/sample_data/dbt/manifest.json",
+      "dbt_catalog_file": "./examples/sample_data/dbt/catalog.json"
+    }
+  },
+  "sink": {
+    "type": "metadata-rest",
+    "config": {}
+  },
+  "metadata_server": {
+    "type": "metadata-server",
+    "config": {
+      "api_endpoint": "http://localhost:8585/api",
+      "auth_provider_type": "no-auth"
+    }
+  }
+}
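
This new example workflow wires the dbt manifest and catalog files into the Redshift source. A sketch of running such a config file with the ingestion framework, assuming the Workflow API of the same era (metadata.ingestion.api.workflow):

import json

from metadata.ingestion.api.workflow import Workflow

# Load the workflow config and run source -> sink end to end
with open("ingestion/examples/workflows/redshift_dbt.json") as f:
    config = json.load(f)

workflow = Workflow.create(config)
workflow.execute()
workflow.raise_from_status()
workflow.stop()
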
@@ -13,7 +13,6 @@ from typing import Optional
 from pydantic import BaseModel

 from metadata.generated.schema.entity.data.database import Database
-from metadata.generated.schema.entity.data.dbtmodel import DbtModel
 from metadata.generated.schema.entity.data.location import Location
 from metadata.generated.schema.entity.data.table import Table

@@ -22,8 +21,3 @@ class OMetaDatabaseAndTable(BaseModel):
     database: Database
     table: Table
     location: Optional[Location]
-
-
-class OMetaDatabaseAndModel(BaseModel):
-    model: DbtModel
-    database: Database
@@ -8,6 +8,7 @@ from typing import List

 from metadata.generated.schema.entity.data.location import Location
 from metadata.generated.schema.entity.data.table import (
+    DataModel,
     Table,
     TableData,
     TableJoins,
@@ -71,6 +72,19 @@ class OMetaTableMixin:
         )
         return [TableProfile(**t) for t in resp["tableProfile"]]

+    def ingest_table_data_model(self, table: Table, data_model: DataModel) -> Table:
+        """
+        PUT data model for a table
+
+        :param table: Table Entity to update
+        :param data_model: Model to add
+        """
+        resp = self.client.put(
+            f"{self.get_suffix(Table)}/{table.id.__root__}/dataModel",
+            data=data_model.json(),
+        )
+        return Table(**resp)
+
     def publish_table_usage(
         self, table: Table, table_usage_request: TableUsageRequest
     ) -> None:
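
A minimal usage sketch of the new mixin method, assuming an already-configured OpenMetadata client and an existing Table entity; the DataModel field values are made up, but the field names mirror those used elsewhere in this diff:

from metadata.generated.schema.entity.data.table import DataModel, ModelType, Table
from metadata.ingestion.ometa.ometa_api import OpenMetadata


def attach_dbt_model(metadata: OpenMetadata, table: Table) -> Table:
    model = DataModel(
        modelType=ModelType.DBT,
        sql="select * from test;",
        description="Table materialized by dbt",  # hypothetical description
    )
    # Issues PUT {suffix}/{table_id}/dataModel, i.e. /v1/tables/{id}/dataModel
    return metadata.ingest_table_data_model(table=table, data_model=model)
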
@@ -18,7 +18,6 @@ from metadata.generated.schema.api.lineage.addLineage import AddLineage
 from metadata.generated.schema.entity.data.chart import Chart
 from metadata.generated.schema.entity.data.dashboard import Dashboard
 from metadata.generated.schema.entity.data.database import Database
-from metadata.generated.schema.entity.data.dbtmodel import DbtModel
 from metadata.generated.schema.entity.data.location import Location
 from metadata.generated.schema.entity.data.metrics import Metrics
 from metadata.generated.schema.entity.data.mlmodel import MlModel
@@ -171,11 +170,6 @@ class OpenMetadata(OMetaLineageMixin, OMetaTableMixin, Generic[T, C]):
         ):
             return "/topics"

-        if issubclass(
-            entity, get_args(Union[DbtModel, self.get_create_entity_type(DbtModel)])
-        ):
-            return "/dbtmodels"
-
         if issubclass(entity, Metrics):
             return "/metrics"

@@ -23,7 +23,6 @@ from metadata.config.common import ConfigModel
 from metadata.generated.schema.entity.data.chart import Chart
 from metadata.generated.schema.entity.data.dashboard import Dashboard
 from metadata.generated.schema.entity.data.database import Database
-from metadata.generated.schema.entity.data.dbtmodel import DbtModel
 from metadata.generated.schema.entity.data.pipeline import Pipeline, Task
 from metadata.generated.schema.entity.data.table import Column, Table
 from metadata.generated.schema.entity.data.topic import Topic
@@ -209,14 +208,6 @@ class ElasticsearchSink(Sink[Entity]):
                 body=pipeline_doc.json(),
                 request_timeout=self.config.timeout,
             )
-        if isinstance(record, DbtModel):
-            dbt_model_doc = self._create_dbt_model_es_doc(record)
-            self.elasticsearch_client.index(
-                index=self.config.dbt_index_name,
-                id=str(dbt_model_doc.dbt_model_id),
-                body=dbt_model_doc.json(),
-                request_timeout=self.config.timeout,
-            )

         if hasattr(record.name, "__root__"):
             self.status.records_written(record.name.__root__)
@@ -450,71 +441,6 @@ class ElasticsearchSink(Sink[Entity]):

         return pipeline_doc

-    def _create_dbt_model_es_doc(self, dbt_model: DbtModel):
-        fqdn = dbt_model.fullyQualifiedName
-        database = dbt_model.database.name
-        dbt_model_name = dbt_model.name
-        suggest = [
-            {"input": [fqdn], "weight": 5},
-            {"input": [dbt_model_name], "weight": 10},
-        ]
-        column_names = []
-        column_descriptions = []
-        tags = set()
-
-        timestamp = time.time()
-        tier = None
-        for dbt_model_tag in dbt_model.tags:
-            if "Tier" in dbt_model_tag.tagFQN:
-                tier = dbt_model_tag.tagFQN
-            else:
-                tags.add(dbt_model_tag.tagFQN)
-        self._parse_columns(
-            dbt_model.columns, None, column_names, column_descriptions, tags
-        )
-
-        database_entity = self.metadata.get_by_id(
-            entity=Database, entity_id=str(dbt_model.database.id.__root__)
-        )
-        service_entity = self.metadata.get_by_id(
-            entity=DatabaseService, entity_id=str(database_entity.service.id.__root__)
-        )
-        dbt_model_owner = (
-            str(dbt_model.owner.id.__root__) if dbt_model.owner is not None else ""
-        )
-        dbt_model_followers = []
-        if dbt_model.followers:
-            for follower in dbt_model.followers.__root__:
-                dbt_model_followers.append(str(follower.id.__root__))
-        dbt_node_type = None
-        if hasattr(dbt_model.dbtNodeType, "name"):
-            dbt_node_type = dbt_model.dbtNodeType.name
-        change_descriptions = self._get_change_descriptions(
-            DbtModel, dbt_model.id.__root__
-        )
-        dbt_model_doc = DbtModelESDocument(
-            dbt_model_id=str(dbt_model.id.__root__),
-            database=str(database_entity.name.__root__),
-            service=service_entity.name,
-            service_type=service_entity.serviceType.name,
-            service_category="databaseService",
-            name=dbt_model.name.__root__,
-            suggest=suggest,
-            description=dbt_model.description,
-            dbt_model_type=dbt_node_type,
-            last_updated_timestamp=timestamp,
-            column_names=column_names,
-            column_descriptions=column_descriptions,
-            tier=tier,
-            tags=list(tags),
-            fqdn=fqdn,
-            schema_description=None,
-            owner=dbt_model_owner,
-            followers=dbt_model_followers,
-            change_descriptions=change_descriptions,
-        )
-        return dbt_model_doc
-
     def _get_charts(self, chart_refs: Optional[List[entityReference.EntityReference]]):
         charts = []
         if chart_refs:
@@ -23,9 +23,6 @@ from metadata.generated.schema.api.data.createDashboard import (
 from metadata.generated.schema.api.data.createDatabase import (
     CreateDatabaseEntityRequest,
 )
-from metadata.generated.schema.api.data.createDbtModel import (
-    CreateDbtModelEntityRequest,
-)
 from metadata.generated.schema.api.data.createLocation import (
     CreateLocationEntityRequest,
 )
@@ -46,10 +43,7 @@ from metadata.generated.schema.entity.teams.user import User
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.api.common import Entity, WorkflowContext
 from metadata.ingestion.api.sink import Sink, SinkStatus
-from metadata.ingestion.models.ometa_table_db import (
-    OMetaDatabaseAndModel,
-    OMetaDatabaseAndTable,
-)
+from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
 from metadata.ingestion.models.table_metadata import Chart, Dashboard
 from metadata.ingestion.ometa.client import APIError
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
@@ -129,8 +123,6 @@ class MetadataRestSink(Sink[Entity]):
             self.write_users(record)
         elif isinstance(record, MlModel):
             self.write_ml_model(record)
-        elif isinstance(record, OMetaDatabaseAndModel):
-            self.write_dbt_models(record)
         else:
             logging.info(
                 f"Ignoring the record due to unknown Record type {type(record)}"
@@ -188,6 +180,11 @@ class MetadataRestSink(Sink[Entity]):
                 table_profile=db_and_table.table.tableProfile,
             )

+            if db_and_table.table.dataModel is not None:
+                self.metadata.ingest_table_data_model(
+                    table=created_table, data_model=db_and_table.table.dataModel
+                )
+
             logger.info(
                 "Successfully ingested table {}.{}".format(
                     db_and_table.database.name.__root__,
@@ -207,44 +204,6 @@ class MetadataRestSink(Sink[Entity]):
             logger.error(err)
             self.status.failure(f"Table: {db_and_table.table.name.__root__}")

-    def write_dbt_models(self, model_and_db: OMetaDatabaseAndModel):
-        try:
-            db_request = CreateDatabaseEntityRequest(
-                name=model_and_db.database.name,
-                description=model_and_db.database.description,
-                service=EntityReference(
-                    id=model_and_db.database.service.id, type="databaseService"
-                ),
-            )
-            db = self.metadata.create_or_update(db_request)
-            model = model_and_db.model
-            model_request = CreateDbtModelEntityRequest(
-                name=model.name,
-                description=model.description,
-                viewDefinition=model.viewDefinition,
-                database=db.id,
-                dbtNodeType=model.dbtNodeType,
-                columns=model.columns,
-            )
-            created_model = self.metadata.create_or_update(model_request)
-            logger.info(
-                "Successfully ingested model {}.{}".format(
-                    db.name.__root__, created_model.name.__root__
-                )
-            )
-            self.status.records_written(
-                f"Model: {db.name.__root__}.{created_model.name.__root__}"
-            )
-        except (APIError, ValidationError) as err:
-            logger.error(
-                "Failed to ingest model {} in database {} ".format(
-                    model_and_db.model.name.__root__,
-                    model_and_db.database.name.__root__,
-                )
-            )
-            logger.error(err)
-            self.status.failure(f"Model: {model_and_db.model.name.__root__}")
-
     def write_topics(self, topic: CreateTopicEntityRequest) -> None:
         try:
             created_topic = self.metadata.create_or_update(topic)
@@ -14,16 +14,14 @@ from dataclasses import dataclass, field
 from typing import Iterable, List, Optional

 from metadata.config.common import ConfigModel
+from metadata.generated.schema.entity.data.dashboard import Dashboard
 from metadata.generated.schema.entity.data.pipeline import Pipeline
+from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.entity.data.topic import Topic
 from metadata.ingestion.api.common import Entity, WorkflowContext
 from metadata.ingestion.api.source import Source, SourceStatus
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
-from ...generated.schema.entity.data.dashboard import Dashboard
-from ...generated.schema.entity.data.dbtmodel import DbtModel
-from ...generated.schema.entity.data.table import Table
-from ...generated.schema.entity.data.topic import Topic
-from ..ometa.openmetadata_rest import MetadataServerConfig
+from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig

 logger = logging.getLogger(__name__)

@@ -97,7 +95,6 @@ class MetadataSource(Source[Entity]):
         yield from self.fetch_topic()
         yield from self.fetch_dashboard()
         yield from self.fetch_pipeline()
-        yield from self.fetch_dbt_models()

     def fetch_table(self) -> Table:
         if self.config.include_tables:
@@ -180,29 +177,6 @@ class MetadataSource(Source[Entity]):
                 break
             after = pipeline_entities.after

-    def fetch_dbt_models(self) -> Pipeline:
-        after = None
-        while True:
-            dbt_model_entities = self.metadata.list_entities(
-                entity=DbtModel,
-                fields=[
-                    "columns",
-                    "owner",
-                    "database",
-                    "tags",
-                    "followers",
-                    "viewDefinition",
-                ],
-                after=after,
-                limit=self.config.limit_records,
-            )
-            for dbt_model in dbt_model_entities.entities:
-                self.status.scanned_dashboard(dbt_model.name)
-                yield dbt_model
-            if dbt_model_entities.after is None:
-                break
-            after = dbt_model_entities.after
-
     def get_status(self) -> SourceStatus:
         return self.status

@@ -8,7 +8,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import json
 import logging
 import re
 import traceback
@@ -28,6 +28,8 @@ from metadata.generated.schema.entity.data.database import Database
 from metadata.generated.schema.entity.data.table import (
     Column,
     Constraint,
+    DataModel,
+    ModelType,
     Table,
     TableData,
     TableProfile,
@@ -87,6 +89,8 @@ class SQLConnectionConfig(ConfigModel):
     data_profiler_offset: Optional[int] = 0
     data_profiler_limit: Optional[int] = 50000
     filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
+    dbt_manifest_file: Optional[str] = None
+    dbt_catalog_file: Optional[str] = None

     @abstractmethod
     def get_connection_url(self):
@@ -136,6 +140,11 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
         self.engine = create_engine(self.connection_string, **self.sql_config.options)
         self.connection = self.engine.connect()
         self.data_profiler = None
+        self.data_models = {}
+        if self.config.dbt_catalog_file is not None:
+            self.dbt_catalog = json.load(open(self.config.dbt_catalog_file, "r"))
+        if self.config.dbt_manifest_file is not None:
+            self.dbt_manifest = json.load(open(self.config.dbt_manifest_file, "r"))

     def _instantiate_profiler(self):
         try:
@@ -155,7 +164,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
         )

     def prepare(self):
-        pass
+        self._parse_data_model()

     @classmethod
     def create(
@@ -244,6 +253,11 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                     table_entity.tableProfile = (
                         [profile] if profile is not None else None
                     )
+                # check if we have any model to associate with
+                table_fqn = f"{schema}.{table_name}"
+                if table_fqn in self.data_models:
+                    model = self.data_models[table_fqn]
+                    table_entity.dataModel = model

                 table_and_db = OMetaDatabaseAndTable(
                     table=table_entity, database=self._get_database(schema)
@@ -314,6 +328,85 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
                 )
                 continue

+    def _parse_data_model(self) -> DataModel:
+        logger.info("Parsing Data Models")
+        if (
+            self.config.dbt_manifest_file is not None
+            and self.config.dbt_catalog_file is not None
+        ):
+            manifest_nodes = self.dbt_manifest["nodes"]
+            manifest_sources = self.dbt_manifest["sources"]
+            manifest_entities = {**manifest_nodes, **manifest_sources}
+            catalog_nodes = self.dbt_catalog["nodes"]
+            catalog_sources = self.dbt_catalog["sources"]
+            catalog_entities = {**catalog_nodes, **catalog_sources}
+
+            for key, mnode in manifest_entities.items():
+                name = mnode["alias"] if "alias" in mnode.keys() else mnode["name"]
+                cnode = catalog_entities.get(key)
+                if cnode is not None:
+                    columns = self._parse_data_model_columns(name, mnode, cnode)
+                else:
+                    columns = []
+                if mnode["resource_type"] == "test":
+                    continue
+                upstream_nodes = self._parse_data_model_upstream(mnode)
+                model_name = (
+                    mnode["alias"] if "alias" in mnode.keys() else mnode["name"]
+                )
+                description = mnode.get("description", "")
+                schema = mnode["schema"]
+                path = f"{mnode['root_path']}/{mnode['original_file_path']}"
+                model = DataModel(
+                    modelType=ModelType.DBT,
+                    description=description,
+                    path=path,
+                    rawSql=mnode["raw_sql"] if "raw_sql" in mnode else None,
+                    sql=mnode["compiled_sql"] if "compiled_sql" in mnode else None,
+                    columns=columns,
+                    upstream=upstream_nodes,
+                )
+                model_fqdn = f"{schema}.{model_name}"
+                self.data_models[model_fqdn] = model
+
+    def _parse_data_model_upstream(self, mnode):
+        upstream_nodes = []
+        if "depends_on" in mnode and "nodes" in mnode["depends_on"]:
+            for node in mnode["depends_on"]["nodes"]:
+                node_type, database, table = node.split(".")
+                table_fqn = f"{self.config.service_name}.{database}.{table}"
+                upstream_nodes.append(table_fqn)
+        return upstream_nodes
+
+    def _parse_data_model_columns(
+        self, model_name: str, mnode: Dict, cnode: Dict
+    ) -> [Column]:
+        columns = []
+        ccolumns = cnode.get("columns")
+        manifest_columns = mnode.get("columns", {})
+        for key in ccolumns:
+            ccolumn = ccolumns[key]
+            try:
+                ctype = ccolumn["type"]
+                col_type = get_column_type(self.status, model_name, ctype)
+                description = manifest_columns.get(key.lower(), {}).get(
+                    "description", None
+                )
+                if description is None:
+                    description = ccolumn.get("comment", None)
+                col = Column(
+                    name=ccolumn["name"].lower(),
+                    description=description,
+                    dataType=col_type,
+                    dataLength=1,
+                    ordinalPosition=ccolumn["index"],
+                )
+                columns.append(col)
+            except Exception as e:
+                logger.error(f"Failed to parse column type due to {e}")
+
+        return columns
+
     def _get_database(self, schema: str) -> Database:
         return Database(
             name=schema,
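
For reference, a hedged illustration of the manifest and catalog shape that _parse_data_model expects; the key names come straight from the parsing code above, while the node IDs and values are made-up examples:

# Hypothetical dbt artifacts; only the keys read by _parse_data_model are shown.
manifest = {
    "nodes": {
        "model.jaffle_shop.orders": {
            "name": "orders",                # "alias" takes precedence when present
            "resource_type": "model",        # nodes of type "test" are skipped
            "schema": "dbt_jaffle",
            "root_path": "/workspace/jaffle_shop",
            "original_file_path": "models/orders.sql",
            "raw_sql": "select * from {{ ref('stg_orders') }}",
            "compiled_sql": "select * from dbt_jaffle.stg_orders",
            "depends_on": {"nodes": ["model.jaffle_shop.stg_orders"]},
            "columns": {"order_id": {"description": "Primary key"}},
        }
    },
    "sources": {},
}

catalog = {
    "nodes": {
        "model.jaffle_shop.orders": {
            "columns": {
                "order_id": {"name": "ORDER_ID", "type": "BIGINT", "index": 1, "comment": None}
            }
        }
    },
    "sources": {},
}

# Each parsed model lands in self.data_models keyed as "<schema>.<model name>"
# ("dbt_jaffle.orders" here) and is attached to the matching table entity when
# the source emits tables, as shown in the hunk above.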