fix(snowflake): remove extra lineage edges in reports, change badly named config variable (#4595)

* fix(snowflake): remove extra lineage edges in reports
This commit is contained in:
Aseem Bansal 2022-04-20 19:33:54 +05:30 committed by GitHub
parent f659cc8938
commit bb0a87ae74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 26 additions and 10 deletions

View File

@ -5,6 +5,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
## Next
### Breaking Changes
- In the `snowflake` connector, the confusingly named config option `report_upstream_lineage` (introduced in `0.8.32`) has been renamed to `upstream_lineage_in_report`. Recipes that set the old name must be updated to use the new one.
### Potential Downtime

View File

@ -189,7 +189,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| `include_table_lineage` | | `True` | If enabled, populates the snowflake table-to-table and s3-to-snowflake table lineage. Requires appropriate grants given to the role. |
| `include_view_lineage` | | `True` | If enabled, populates the snowflake view->table and table->view lineages (no view->view lineage yet). Requires appropriate grants given to the role, and `include_table_lineage` to be `True`. |
| `bucket_duration` | | `"DAY"` | Duration to bucket lineage data extraction by. Can be `"DAY"` or `"HOUR"`. |
| `report_upstream_lineage` | | `False` | Whether to report upstream lineage in the report. This should be marked as `True` in case someone is debugging lineage ingestion issues |
| `upstream_lineage_in_report` | | `False` | Whether to include each dataset's upstream lineage in the ingestion report. Set this to `True` when debugging lineage ingestion issues. |
| `ignore_start_time_lineage` | | `False` | Whether to ignore `start_time` and read all data for lineage. It is meant to be used for initial ingestion |
| `start_time` | | Start of last full day in UTC (or hour, depending on `bucket_duration`) | Earliest time of lineage data to consider. For the bootstrap run, set it as far back in time as possible. |
| `end_time` | | End of last full day in UTC (or hour, depending on `bucket_duration`) | Latest time of lineage data to consider. |

View File

@ -165,6 +165,8 @@ WHERE
# Process UpstreamTable/View/ExternalTable/Materialized View->View edge.
view_upstream: str = db_row["view_upstream"].lower()
view_name: str = db_row["downstream_view"].lower()
if not self._is_dataset_allowed(dataset_name=view_name, is_view=True):
continue
# key is the downstream view name
self._lineage_map[view_name].append(
# (<upstream_table_name>, <empty_json_list_of_upstream_table_columns>, <empty_json_list_of_downstream_view_columns>)
@ -254,6 +256,8 @@ WHERE
else:
for db_row in db_rows:
view_name: str = db_row["view_name"].lower().replace('"', "")
if not self._is_dataset_allowed(dataset_name=view_name, is_view=True):
continue
downstream_table: str = (
db_row["downstream_table_name"].lower().replace('"', "")
)
@ -319,6 +323,8 @@ WHERE
for db_row in engine.execute(query):
# key is the down-stream table name
key: str = db_row[1].lower().replace('"', "")
if not self._is_dataset_allowed(key):
continue
self._external_lineage_map[key] |= {*json.loads(db_row[0])}
logger.debug(
f"ExternalLineage[Table(Down)={key}]:External(Up)={self._external_lineage_map[key]}"
@ -336,6 +342,8 @@ WHERE
key = (
f"{db_row.database_name}.{db_row.schema_name}.{db_row.name}".lower()
)
if not self._is_dataset_allowed(dataset_name=key):
continue
self._external_lineage_map[key].add(db_row.location)
logger.debug(
f"ExternalLineage[Table(Down)={key}]:External(Up)={self._external_lineage_map[key]}"
@ -388,9 +396,15 @@ QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name, upstream_table_na
for db_row in engine.execute(query):
# key is the down-stream table name
key: str = db_row[1].lower().replace('"', "")
upstream_table_name = db_row[0].lower().replace('"', "")
if not (
self._is_dataset_allowed(key)
or self._is_dataset_allowed(upstream_table_name)
):
continue
self._lineage_map[key].append(
# (<upstream_table_name>, <json_list_of_upstream_columns>, <json_list_of_downstream_columns>)
(db_row[0].lower().replace('"', ""), db_row[2], db_row[3])
(upstream_table_name, db_row[2], db_row[3])
)
num_edges += 1
logger.debug(
@ -481,7 +495,7 @@ QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name, upstream_table_na
logger.debug(
f"Upstream lineage of '{dataset_name}': {[u.dataset for u in upstream_tables]}"
)
if self.config.report_upstream_lineage:
if self.config.upstream_lineage_in_report:
self.report.upstream_lineage[dataset_name] = [
u.dataset for u in upstream_tables
]
@ -491,7 +505,7 @@ QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name, upstream_table_na
def add_config_to_report(self):
self.report.cleaned_host_port = self.config.host_port
self.report.ignore_start_time_lineage = self.config.ignore_start_time_lineage
self.report.report_upstream_lineage = self.config.report_upstream_lineage
self.report.upstream_lineage_in_report = self.config.upstream_lineage_in_report
if not self.report.ignore_start_time_lineage:
self.report.lineage_start_time = self.config.start_time
self.report.lineage_end_time = self.config.end_time
@ -674,7 +688,9 @@ QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name, upstream_table_na
# Emit the work unit from super.
yield wu
def _is_dataset_allowed(self, dataset_name: Optional[str]) -> bool:
def _is_dataset_allowed(
self, dataset_name: Optional[str], is_view: bool = False
) -> bool:
# View lineages is not supported. Add the allow/deny pattern for that when it is supported.
if dataset_name is None:
return True
@ -684,11 +700,10 @@ QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name, upstream_table_na
if (
not self.config.database_pattern.allowed(dataset_params[0])
or not self.config.schema_pattern.allowed(dataset_params[1])
or not self.config.table_pattern.allowed(dataset_params[2])
or (
self.config.include_view_lineage
and not self.config.view_pattern.allowed(dataset_params[2])
not is_view and not self.config.table_pattern.allowed(dataset_params[2])
)
or (is_view and not self.config.view_pattern.allowed(dataset_params[2]))
):
return False
return True

View File

@ -180,7 +180,7 @@ class SnowflakeConfig(BaseSnowflakeConfig, SQLAlchemyConfig):
provision_role: Optional[SnowflakeProvisionRoleConfig] = None
ignore_start_time_lineage: bool = False
report_upstream_lineage: bool = False
upstream_lineage_in_report: bool = False
@pydantic.validator("database")
def note_database_opt_deprecation(cls, v, values, **kwargs):

View File

@ -18,7 +18,7 @@ class SnowflakeReport(BaseSnowflakeReport, SQLSourceReport):
num_view_to_table_edges_scanned: int = 0
num_external_table_edges_scanned: int = 0
ignore_start_time_lineage: Optional[bool] = None
report_upstream_lineage: Optional[bool] = None
upstream_lineage_in_report: Optional[bool] = None
upstream_lineage: Dict[str, List[str]] = field(default_factory=dict)
lineage_start_time: Optional[datetime] = None
lineage_end_time: Optional[datetime] = None