Usage fix - removed service_name from the bulk_sink config; it is now carried on each usage record from the source (#1956)

* Query Usage fixed

* Usage updated

* Reformatted change

* Usage files reformatted
This commit is contained in:
Ayush Shah 2022-01-04 17:12:10 +05:30 committed by GitHub
parent bcbd3aef22
commit 19e5f737ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 20 additions and 14 deletions

View File

@ -59,8 +59,7 @@ config = """
"bulk_sink": {
"type": "metadata-usage",
"config": {
"filename": "/tmp/sample_usage",
"service_name": "bigquery_gcp"
"filename": "/tmp/sample_usage"
}
},
"metadata_server": {

View File

@ -27,8 +27,7 @@
"bulk_sink": {
"type": "metadata-usage",
"config": {
"filename": "/tmp/bigquery_usage",
"service_name": "gcp_bigquery"
"filename": "/tmp/bigquery_usage"
}
},
"metadata_server": {

View File

@ -26,8 +26,7 @@
"bulk_sink": {
"type": "metadata-usage",
"config": {
"filename": "/tmp/redshift_usage",
"service_name": "aws_redshift"
"filename": "/tmp/redshift_usage"
}
},
"metadata_server": {

View File

@ -26,8 +26,7 @@
"bulk_sink": {
"type": "metadata-usage",
"config": {
"filename": "/tmp/snowflake_usage",
"service_name": "snowflake"
"filename": "/tmp/snowflake_usage"
}
},
"metadata_server": {

View File

@ -22,8 +22,7 @@
"bulk_sink": {
"type": "metadata-usage",
"config": {
"filename": "/tmp/sample_usage",
"service_name": "bigquery_gcp"
"filename": "/tmp/sample_usage"
}
},
"metadata_server": {

View File

@ -33,7 +33,6 @@ logger = logging.getLogger(__name__)
class MetadataUsageSinkConfig(ConfigModel):
filename: str
service_name: str
class MetadataUsageBulkSink(BulkSink):
@ -48,6 +47,7 @@ class MetadataUsageBulkSink(BulkSink):
super().__init__(ctx)
self.config = config
self.metadata_config = metadata_config
self.service_name = None
self.wrote_something = False
self.file_handler = open(self.config.filename, "r")
self.metadata = OpenMetadata(self.metadata_config)
@ -75,6 +75,7 @@ class MetadataUsageBulkSink(BulkSink):
table_usage = TableUsageCount(**json.loads(record))
if "." in table_usage.table:
table_usage.table = table_usage.table.split(".")[1]
self.service_name = table_usage.service_name
table_entity = self.__get_table_entity(
table_usage.database, table_usage.table
)
@ -168,7 +169,7 @@ class MetadataUsageBulkSink(BulkSink):
return tbl_column.fullyQualifiedName.__root__
def __get_table_entity(self, database_name: str, table_name: str) -> Table:
table_fqn = f"{self.config.service_name}.{database_name}.{table_name}"
table_fqn = f"{self.service_name}.{database_name}.{table_name}"
table_entity = self.metadata.get_by_name(Table, fqdn=table_fqn)
return table_entity

View File

@ -29,6 +29,7 @@ class TableQuery(JsonSerializable):
database: str,
aborted: bool,
sql: str,
service_name: str,
) -> None:
""" """
self.query = query
@ -39,6 +40,7 @@ class TableQuery(JsonSerializable):
self.database = database
self.aborted = aborted
self.sql = sql
self.service_name = service_name
class TableColumn(BaseModel):
@ -60,6 +62,7 @@ class TableUsageCount(BaseModel):
database: str
count: int = 1
joins: TableColumnJoins
service_name: str
class QueryParserData(BaseModel):
@ -69,6 +72,7 @@ class QueryParserData(BaseModel):
date: str
database: str
sql: str
service_name: str
class Config:
arbitrary_types_allowed = True

View File

@ -66,6 +66,7 @@ class QueryParserProcessor(Processor):
database=record.database,
sql=record.sql,
date=start_date.strftime("%Y-%m-%d"),
service_name=record.service_name,
)
except Exception as err:
logger.debug(record.sql)

View File

@ -106,6 +106,7 @@ class BigqueryUsageSource(Source[TableQuery]):
aborted=0,
database=str(database),
sql=queryConfig["query"],
service_name=self.config.service_name,
)
yield tq
except Exception as err:

View File

@ -60,6 +60,7 @@ class RedshiftUsageSource(Source[TableQuery]):
def __init__(self, config, metadata_config, ctx):
super().__init__(ctx)
self.config = config
start, end = get_start_and_end(config.duration)
self.sql_stmt = RedshiftUsageSource.SQL_STATEMENT.format(
start_time=start, end_time=end
@ -105,6 +106,7 @@ class RedshiftUsageSource(Source[TableQuery]):
database=row["database"],
aborted=row["aborted"],
sql=row["querytxt"],
service_name=self.config.service_name,
)
yield tq

View File

@ -63,6 +63,7 @@ class SampleUsageSource(Source[TableQuery]):
database="shopify",
aborted=False,
sql=row["query"],
service_name=self.config.service_name,
)
yield tq

View File

@ -45,10 +45,9 @@ class SnowflakeUsageSource(Source[TableQuery]):
def __init__(self, config, metadata_config, ctx):
super().__init__(ctx)
self.config = config
start, end = get_start_and_end(config.duration)
self.analysis_date = start
print(start)
print(end)
self.sql_stmt = SnowflakeUsageSource.SQL_STATEMENT.format(
start_date=start, end_date=end
)
@ -92,6 +91,7 @@ class SnowflakeUsageSource(Source[TableQuery]):
aborted=True if "1969" in str(row["end_time"]) else False,
database=row["database_name"],
sql=row["query_text"],
service_name=self.config.service_name,
)
if row["schema_name"] is not None:
self.report.scanned(f"{row['database_name']}.{row['schema_name']}")

View File

@ -112,6 +112,7 @@ class TableUsageStage(Stage[QueryParserData]):
database=record.database,
date=record.date,
joins=joins,
service_name=record.service_name,
)
self.table_usage[table] = table_usage_count