From 19e5f737ffc7ff7442fea6471bf5eb744365e641 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Tue, 4 Jan 2022 17:12:10 +0530 Subject: [PATCH] Usage fix - removed service_name from bulk_sink (#1956) * Query Usage fixed * Usage updated * Reformatted change * Usage files reformatted --- ingestion/examples/airflow/dags/airflow_sample_usage.py | 3 +-- ingestion/examples/workflows/bigquery_usage.json | 3 +-- ingestion/examples/workflows/redshift_usage.json | 3 +-- ingestion/examples/workflows/snowflake_usage.json | 3 +-- ingestion/pipelines/sample_usage.json | 3 +-- ingestion/src/metadata/ingestion/bulksink/metadata_usage.py | 5 +++-- ingestion/src/metadata/ingestion/models/table_queries.py | 4 ++++ ingestion/src/metadata/ingestion/processor/query_parser.py | 1 + ingestion/src/metadata/ingestion/source/bigquery_usage.py | 1 + ingestion/src/metadata/ingestion/source/redshift_usage.py | 2 ++ ingestion/src/metadata/ingestion/source/sample_usage.py | 1 + ingestion/src/metadata/ingestion/source/snowflake_usage.py | 4 ++-- ingestion/src/metadata/ingestion/stage/table_usage.py | 1 + 13 files changed, 20 insertions(+), 14 deletions(-) diff --git a/ingestion/examples/airflow/dags/airflow_sample_usage.py b/ingestion/examples/airflow/dags/airflow_sample_usage.py index e49c05a0233..204fefde91d 100644 --- a/ingestion/examples/airflow/dags/airflow_sample_usage.py +++ b/ingestion/examples/airflow/dags/airflow_sample_usage.py @@ -59,8 +59,7 @@ config = """ "bulk_sink": { "type": "metadata-usage", "config": { - "filename": "/tmp/sample_usage", - "service_name": "bigquery_gcp" + "filename": "/tmp/sample_usage" } }, "metadata_server": { diff --git a/ingestion/examples/workflows/bigquery_usage.json b/ingestion/examples/workflows/bigquery_usage.json index 951aeecbeea..f439a23e448 100644 --- a/ingestion/examples/workflows/bigquery_usage.json +++ b/ingestion/examples/workflows/bigquery_usage.json @@ -27,8 +27,7 @@ "bulk_sink": { "type": "metadata-usage", "config": { - "filename": "/tmp/bigquery_usage", - "service_name": "gcp_bigquery" + "filename": "/tmp/bigquery_usage" } }, "metadata_server": { diff --git a/ingestion/examples/workflows/redshift_usage.json b/ingestion/examples/workflows/redshift_usage.json index 467e0101dae..e2b33d915b3 100644 --- a/ingestion/examples/workflows/redshift_usage.json +++ b/ingestion/examples/workflows/redshift_usage.json @@ -26,8 +26,7 @@ "bulk_sink": { "type": "metadata-usage", "config": { - "filename": "/tmp/redshift_usage", - "service_name": "aws_redshift" + "filename": "/tmp/redshift_usage" } }, "metadata_server": { diff --git a/ingestion/examples/workflows/snowflake_usage.json b/ingestion/examples/workflows/snowflake_usage.json index 6be03e1b98e..285df234e5e 100644 --- a/ingestion/examples/workflows/snowflake_usage.json +++ b/ingestion/examples/workflows/snowflake_usage.json @@ -26,8 +26,7 @@ "bulk_sink": { "type": "metadata-usage", "config": { - "filename": "/tmp/snowflake_usage", - "service_name": "snowflake" + "filename": "/tmp/snowflake_usage" } }, "metadata_server": { diff --git a/ingestion/pipelines/sample_usage.json b/ingestion/pipelines/sample_usage.json index 53c5192b671..d6b1d815c7f 100644 --- a/ingestion/pipelines/sample_usage.json +++ b/ingestion/pipelines/sample_usage.json @@ -22,8 +22,7 @@ "bulk_sink": { "type": "metadata-usage", "config": { - "filename": "/tmp/sample_usage", - "service_name": "bigquery_gcp" + "filename": "/tmp/sample_usage" } }, "metadata_server": { diff --git a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py index 637c3d6cc24..efab30f3ad3 100644 --- a/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py +++ b/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py @@ -33,7 +33,6 @@ logger = logging.getLogger(__name__) class MetadataUsageSinkConfig(ConfigModel): filename: str - service_name: str class MetadataUsageBulkSink(BulkSink): @@ -48,6 +47,7 @@ class MetadataUsageBulkSink(BulkSink): super().__init__(ctx) self.config = config self.metadata_config = metadata_config + self.service_name = None self.wrote_something = False self.file_handler = open(self.config.filename, "r") self.metadata = OpenMetadata(self.metadata_config) @@ -75,6 +75,7 @@ class MetadataUsageBulkSink(BulkSink): table_usage = TableUsageCount(**json.loads(record)) if "." in table_usage.table: table_usage.table = table_usage.table.split(".")[1] + self.service_name = table_usage.service_name table_entity = self.__get_table_entity( table_usage.database, table_usage.table ) @@ -168,7 +169,7 @@ class MetadataUsageBulkSink(BulkSink): return tbl_column.fullyQualifiedName.__root__ def __get_table_entity(self, database_name: str, table_name: str) -> Table: - table_fqn = f"{self.config.service_name}.{database_name}.{table_name}" + table_fqn = f"{self.service_name}.{database_name}.{table_name}" table_entity = self.metadata.get_by_name(Table, fqdn=table_fqn) return table_entity diff --git a/ingestion/src/metadata/ingestion/models/table_queries.py b/ingestion/src/metadata/ingestion/models/table_queries.py index 7e10c14e8df..f8e00c51da8 100644 --- a/ingestion/src/metadata/ingestion/models/table_queries.py +++ b/ingestion/src/metadata/ingestion/models/table_queries.py @@ -29,6 +29,7 @@ class TableQuery(JsonSerializable): database: str, aborted: bool, sql: str, + service_name: str, ) -> None: """ """ self.query = query @@ -39,6 +40,7 @@ class TableQuery(JsonSerializable): self.database = database self.aborted = aborted self.sql = sql + self.service_name = service_name class TableColumn(BaseModel): @@ -60,6 +62,7 @@ class TableUsageCount(BaseModel): database: str count: int = 1 joins: TableColumnJoins + service_name: str class QueryParserData(BaseModel): @@ -69,6 +72,7 @@ class QueryParserData(BaseModel): date: str database: str sql: str + service_name: str class Config: arbitrary_types_allowed = True diff --git a/ingestion/src/metadata/ingestion/processor/query_parser.py b/ingestion/src/metadata/ingestion/processor/query_parser.py index 2b5cb274a10..aac52a2dc5d 100644 --- a/ingestion/src/metadata/ingestion/processor/query_parser.py +++ b/ingestion/src/metadata/ingestion/processor/query_parser.py @@ -66,6 +66,7 @@ class QueryParserProcessor(Processor): database=record.database, sql=record.sql, date=start_date.strftime("%Y-%m-%d"), + service_name=record.service_name, ) except Exception as err: logger.debug(record.sql) diff --git a/ingestion/src/metadata/ingestion/source/bigquery_usage.py b/ingestion/src/metadata/ingestion/source/bigquery_usage.py index 1e8f85680a2..de9e9c853ba 100644 --- a/ingestion/src/metadata/ingestion/source/bigquery_usage.py +++ b/ingestion/src/metadata/ingestion/source/bigquery_usage.py @@ -106,6 +106,7 @@ class BigqueryUsageSource(Source[TableQuery]): aborted=0, database=str(database), sql=queryConfig["query"], + service_name=self.config.service_name, ) yield tq except Exception as err: diff --git a/ingestion/src/metadata/ingestion/source/redshift_usage.py b/ingestion/src/metadata/ingestion/source/redshift_usage.py index 20a4d6572df..f6a417f097f 100644 --- a/ingestion/src/metadata/ingestion/source/redshift_usage.py +++ b/ingestion/src/metadata/ingestion/source/redshift_usage.py @@ -60,6 +60,7 @@ class RedshiftUsageSource(Source[TableQuery]): def __init__(self, config, metadata_config, ctx): super().__init__(ctx) + self.config = config start, end = get_start_and_end(config.duration) self.sql_stmt = RedshiftUsageSource.SQL_STATEMENT.format( start_time=start, end_time=end @@ -105,6 +106,7 @@ class RedshiftUsageSource(Source[TableQuery]): database=row["database"], aborted=row["aborted"], sql=row["querytxt"], + service_name=self.config.service_name, ) yield tq diff --git a/ingestion/src/metadata/ingestion/source/sample_usage.py b/ingestion/src/metadata/ingestion/source/sample_usage.py index 3b7907cc1d7..e6ccd2ba1ab 100644 --- a/ingestion/src/metadata/ingestion/source/sample_usage.py +++ b/ingestion/src/metadata/ingestion/source/sample_usage.py @@ -63,6 +63,7 @@ class SampleUsageSource(Source[TableQuery]): database="shopify", aborted=False, sql=row["query"], + service_name=self.config.service_name, ) yield tq diff --git a/ingestion/src/metadata/ingestion/source/snowflake_usage.py b/ingestion/src/metadata/ingestion/source/snowflake_usage.py index 7893c906ce0..eee5a1068de 100644 --- a/ingestion/src/metadata/ingestion/source/snowflake_usage.py +++ b/ingestion/src/metadata/ingestion/source/snowflake_usage.py @@ -45,10 +45,9 @@ class SnowflakeUsageSource(Source[TableQuery]): def __init__(self, config, metadata_config, ctx): super().__init__(ctx) + self.config = config start, end = get_start_and_end(config.duration) self.analysis_date = start - print(start) - print(end) self.sql_stmt = SnowflakeUsageSource.SQL_STATEMENT.format( start_date=start, end_date=end ) @@ -92,6 +91,7 @@ class SnowflakeUsageSource(Source[TableQuery]): aborted=True if "1969" in str(row["end_time"]) else False, database=row["database_name"], sql=row["query_text"], + service_name=self.config.service_name, ) if row["schema_name"] is not None: self.report.scanned(f"{row['database_name']}.{row['schema_name']}") diff --git a/ingestion/src/metadata/ingestion/stage/table_usage.py b/ingestion/src/metadata/ingestion/stage/table_usage.py index 544c87c8eaf..0adb9e61d28 100644 --- a/ingestion/src/metadata/ingestion/stage/table_usage.py +++ b/ingestion/src/metadata/ingestion/stage/table_usage.py @@ -112,6 +112,7 @@ class TableUsageStage(Stage[QueryParserData]): database=record.database, date=record.date, joins=joins, + service_name=record.service_name, ) self.table_usage[table] = table_usage_count