Added database in fqdn for dbt (#4479)

* added database in fqdn for dbt

* used fqdn generator function

* lower method added

Co-authored-by: Onkar Ravgan <onkarravgan@Onkars-MacBook-Pro.local>
This commit is contained in:
Onkar Ravgan 2022-05-02 17:58:58 +05:30 committed by GitHub
parent dba6bf2adf
commit b299c0683a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 8 deletions

View File

@ -346,9 +346,11 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
except Exception as err:
logger.error(err)
# check if we have any model to associate with
table_entity.dataModel = self._get_data_model(schema, table_name)
database = self._get_database(self.service_connection.database)
# check if we have any model to associate with
table_entity.dataModel = self._get_data_model(
database.name.__root__, schema, table_name
)
table_schema_and_db = OMetaDatabaseAndTable(
table=table_entity,
database=database,
@ -425,7 +427,6 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
except Exception as err:
logger.error(err)
# table.dataModel = self._get_data_model(schema, view_name)
database = self._get_database(self.service_connection.database)
table_schema_and_db = OMetaDatabaseAndTable(
table=table,
@ -487,6 +488,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
model_name = (
mnode["alias"] if "alias" in mnode.keys() else mnode["name"]
)
database = mnode["database"]
schema = mnode["schema"]
raw_sql = mnode.get("raw_sql", "")
model = DataModel(
@ -498,7 +500,9 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
columns=columns,
upstream=upstream_nodes,
)
model_fqdn = f"{schema}.{model_name}".lower()
model_fqdn = get_fqdn(
DataModel, database, schema, model_name
).lower()
self.data_models[model_fqdn] = model
except Exception as err:
logger.debug(traceback.format_exc())
@ -523,8 +527,8 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
continue
return upstream_nodes
def _get_data_model(self, schema, table_name):
table_fqn = f"{schema}{FQDN_SEPARATOR}{table_name}".lower()
def _get_data_model(self, database, schema, table_name):
table_fqn = get_fqdn(DataModel, database, schema, table_name).lower()
if table_fqn in self.data_models:
model = self.data_models[table_fqn]
return model

View File

@ -64,8 +64,8 @@ def _(tag_category="", tag_name=""):
@fqdn_build_registry.add(DataModel)
def _(schema_name="", model_name=""):
return FQDN_SEPARATOR.join(replace_none(schema_name, model_name))
def _(database_name="", schema_name="", model_name=""):
return FQDN_SEPARATOR.join(replace_none(database_name, schema_name, model_name))
@fqdn_build_registry.add(AddLineageRequest)