fix(ingest): support database_pattern in clickhouse and clickhouse-usage (#9482)

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
This commit is contained in:
Gerrit-K 2024-01-11 02:24:44 +01:00 committed by GitHub
parent 47e6a04fb9
commit 8cb6d8845b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 14 deletions

View File

@ -501,15 +501,17 @@ class ClickHouseSource(TwoTierSQLAlchemySource):
try:
for db_row in engine.execute(text(query)):
if not self.config.schema_pattern.allowed(
dataset_name = f'{db_row["target_schema"]}.{db_row["target_table"]}'
if not self.config.database_pattern.allowed(
db_row["target_schema"]
) or not self.config.table_pattern.allowed(db_row["target_table"]):
) or not self.config.table_pattern.allowed(dataset_name):
self.report.report_dropped(dataset_name)
continue
# Target
target_path = (
f'{self.config.platform_instance+"." if self.config.platform_instance else ""}'
f'{db_row["target_schema"]}.{db_row["target_table"]}'
f"{dataset_name}"
)
target = LineageItem(
dataset=LineageDataset(

View File

@ -31,6 +31,7 @@ class TwoTierSQLAlchemyConfig(BasicSQLAlchemyConfig):
# The superclass contains a `schema_pattern` field, so we need this here
# to override the documentation.
default=AllowDenyPattern.allow_all(),
hidden_from_docs=True,
description="Deprecated in favour of database_pattern.",
)

View File

@ -34,10 +34,11 @@ logger = logging.getLogger(__name__)
clickhouse_datetime_format = "%Y-%m-%d %H:%M:%S"
clickhouse_usage_sql_comment = """\
SELECT user AS usename
SELECT user AS username
, query
, substring(full_table_name, 1, position(full_table_name, '.') - 1) AS schema_
, substring(full_table_name, 1, position(full_table_name, '.') - 1) AS database
, substring(full_table_name, position(full_table_name, '.') + 1) AS table
, full_table_name
, arrayMap(x -> substr(x, length(full_table_name) + 2),
arrayFilter(x -> startsWith(x, full_table_name || '.'), columns)) AS columns
, query_start_time AS starttime
@ -59,9 +60,9 @@ AggregatedDataset = GenericAggregatedDataset[ClickHouseTableRef]
class ClickHouseJoinedAccessEvent(BaseModel):
usename: str = None # type:ignore
username: str = None # type:ignore
query: str = None # type: ignore
schema_: str = None # type:ignore
database: str = None # type:ignore
table: str = None # type:ignore
columns: List[str]
starttime: datetime
@ -150,9 +151,15 @@ class ClickHouseUsageSource(Source):
if isinstance(v, str):
event_dict[k] = v.strip()
if not self.config.schema_pattern.allowed(
event_dict.get("schema_")
) or not self.config.table_pattern.allowed(event_dict.get("table")):
if not self.config.database_pattern.allowed(
event_dict.get("database")
) or not (
self.config.table_pattern.allowed(event_dict.get("full_table_name"))
or self.config.view_pattern.allowed(event_dict.get("full_table_name"))
):
logger.debug(
f"Dropping usage event for {event_dict.get('full_table_name')}"
)
continue
if event_dict.get("starttime", None):
@ -191,11 +198,11 @@ class ClickHouseUsageSource(Source):
event_dict.get("endtime")
)
if not (event_dict.get("schema_", None) and event_dict.get("table", None)):
if not (event_dict.get("database", None) and event_dict.get("table", None)):
logging.info("An access event parameter(s) is missing. Skipping ....")
continue
if not event_dict.get("usename") or event_dict["usename"] == "":
if not event_dict.get("username") or event_dict["username"] == "":
logging.info("The username parameter is missing. Skipping ....")
continue
@ -215,7 +222,7 @@ class ClickHouseUsageSource(Source):
resource = (
f'{self.config.platform_instance+"." if self.config.platform_instance else ""}'
f"{event.schema_}.{event.table}"
f"{event.database}.{event.table}"
)
agg_bucket = datasets[floored_ts].setdefault(
@ -224,7 +231,7 @@ class ClickHouseUsageSource(Source):
)
# current limitation in user stats UI, we need to provide email to show users
user_email = f"{event.usename if event.usename else 'unknown'}"
user_email = f"{event.username if event.username else 'unknown'}"
if "@" not in user_email:
user_email += f"@{self.config.email_domain}"
logger.info(f"user_email: {user_email}")