diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py b/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py index 30893fd03b..84c1d3844a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/clickhouse.py @@ -501,15 +501,17 @@ class ClickHouseSource(TwoTierSQLAlchemySource): try: for db_row in engine.execute(text(query)): - if not self.config.schema_pattern.allowed( + dataset_name = f'{db_row["target_schema"]}.{db_row["target_table"]}' + if not self.config.database_pattern.allowed( db_row["target_schema"] - ) or not self.config.table_pattern.allowed(db_row["target_table"]): + ) or not self.config.table_pattern.allowed(dataset_name): + self.report.report_dropped(dataset_name) continue # Target target_path = ( f'{self.config.platform_instance+"." if self.config.platform_instance else ""}' - f'{db_row["target_schema"]}.{db_row["target_table"]}' + f"{dataset_name}" ) target = LineageItem( dataset=LineageDataset( diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py b/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py index efb1d3ffe1..98ad2f6027 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py @@ -31,6 +31,7 @@ class TwoTierSQLAlchemyConfig(BasicSQLAlchemyConfig): # The superclass contains a `schema_pattern` field, so we need this here # to override the documentation. default=AllowDenyPattern.allow_all(), + hidden_from_docs=True, description="Deprecated in favour of database_pattern.", ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/clickhouse_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/clickhouse_usage.py index f659ea0c1c..50cbbded69 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/usage/clickhouse_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/usage/clickhouse_usage.py @@ -34,10 +34,11 @@ logger = logging.getLogger(__name__) clickhouse_datetime_format = "%Y-%m-%d %H:%M:%S" clickhouse_usage_sql_comment = """\ -SELECT user AS usename +SELECT user AS username , query - , substring(full_table_name, 1, position(full_table_name, '.') - 1) AS schema_ + , substring(full_table_name, 1, position(full_table_name, '.') - 1) AS database , substring(full_table_name, position(full_table_name, '.') + 1) AS table + , full_table_name , arrayMap(x -> substr(x, length(full_table_name) + 2), arrayFilter(x -> startsWith(x, full_table_name || '.'), columns)) AS columns , query_start_time AS starttime @@ -59,9 +60,9 @@ AggregatedDataset = GenericAggregatedDataset[ClickHouseTableRef] class ClickHouseJoinedAccessEvent(BaseModel): - usename: str = None # type:ignore + username: str = None # type:ignore query: str = None # type: ignore - schema_: str = None # type:ignore + database: str = None # type:ignore table: str = None # type:ignore columns: List[str] starttime: datetime @@ -150,9 +151,15 @@ class ClickHouseUsageSource(Source): if isinstance(v, str): event_dict[k] = v.strip() - if not self.config.schema_pattern.allowed( - event_dict.get("schema_") - ) or not self.config.table_pattern.allowed(event_dict.get("table")): + if not self.config.database_pattern.allowed( + event_dict.get("database") + ) or not ( + self.config.table_pattern.allowed(event_dict.get("full_table_name")) + or self.config.view_pattern.allowed(event_dict.get("full_table_name")) + ): + logger.debug( + f"Dropping usage event for {event_dict.get('full_table_name')}" + ) continue if event_dict.get("starttime", None): @@ -191,11 +198,11 @@ class ClickHouseUsageSource(Source): event_dict.get("endtime") ) - if not (event_dict.get("schema_", None) and event_dict.get("table", None)): + if not (event_dict.get("database", None) and event_dict.get("table", None)): logging.info("An access event parameter(s) is missing. Skipping ....") continue - if not event_dict.get("usename") or event_dict["usename"] == "": + if not event_dict.get("username") or event_dict["username"] == "": logging.info("The username parameter is missing. Skipping ....") continue @@ -215,7 +222,7 @@ class ClickHouseUsageSource(Source): resource = ( f'{self.config.platform_instance+"." if self.config.platform_instance else ""}' - f"{event.schema_}.{event.table}" + f"{event.database}.{event.table}" ) agg_bucket = datasets[floored_ts].setdefault( @@ -224,7 +231,7 @@ class ClickHouseUsageSource(Source): ) # current limitation in user stats UI, we need to provide email to show users - user_email = f"{event.usename if event.usename else 'unknown'}" + user_email = f"{event.username if event.username else 'unknown'}" if "@" not in user_email: user_email += f"@{self.config.email_domain}" logger.info(f"user_email: {user_email}")