fix: added upsert logic back for system metrics (#13092)

This commit is contained in:
Teddy 2023-09-06 08:12:38 +02:00 committed by GitHub
parent 9e9319096a
commit c52af7eba0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 33 additions and 11 deletions

View File

@ -35,19 +35,17 @@ STL_QUERY = """
sti."database",
sti."schema",
sti."table",
sq.text,
DATE_TRUNC('second', data.starttime) AS starttime
FROM
data
INNER JOIN pg_catalog.svv_table_info sti ON data.tbl = sti.table_id
INNER JOIN pg_catalog.stl_querytext sq ON data.query = sq.query
where
sti."database" = '{database}' AND
sti."schema" = '{schema}' AND
"rows" != 0 AND
DATE(data.starttime) >= CURRENT_DATE - 1
GROUP BY 2,3,4,5,6
ORDER BY 6 DESC
GROUP BY 2,3,4,5
ORDER BY 5 DESC
"""
@ -73,7 +71,7 @@ def get_query_results(
database_name=row.database,
schema_name=row.schema,
table_name=row.table,
query_text=row.text,
query_text=None,
query_type=operation,
timestamp=row.starttime,
rows=row.rows,

View File

@ -113,10 +113,6 @@ def _(
dataset_id = table.__table_args__["schema"] # type: ignore
metric_results: List[Dict] = []
# QueryResult = namedtuple(
# "QueryResult",
# "query_type,timestamp,destination_table,dml_statistics",
# )
jobs = get_value_from_cache(
SYSTEM_QUERY_RESULT_CACHE, f"{Dialects.BigQuery}.{project_id}.{dataset_id}.jobs"

View File

@ -393,6 +393,21 @@ public interface EntityTimeSeriesDAO {
return getLatestExtensionByKeyInternal(getTimeSeriesTableName(), value, entityFQN, extension, mysqlCond, psqlCond);
}
default void storeTimeSeriesWithOperation(
String fqn,
String extension,
String jsonSchema,
String entityJson,
Long timestamp,
String operation,
boolean update) {
if (update) {
updateExtensionByOperation(fqn, extension, entityJson, timestamp, operation);
} else {
insert(fqn, extension, jsonSchema, entityJson);
}
}
/** @deprecated */
@SqlQuery("SELECT DISTINCT entityFQN FROM <table> WHERE entityFQNHash = '' or entityFQNHash is null LIMIT :limit")
@Deprecated(since = "1.1.1")

View File

@ -379,13 +379,26 @@ public class TableRepository extends EntityRepository<Table> {
List<SystemProfile> systemProfiles = createTableProfile.getSystemProfile();
if (systemProfiles != null && !systemProfiles.isEmpty()) {
for (SystemProfile systemProfile : createTableProfile.getSystemProfile()) {
// system metrics timestamp is the one of the operation. We'll need to
// update the entry if it already exists in the database
String storedSystemProfile =
daoCollection
.profilerDataTimeSeriesDao()
.getExtensionAtTimestampWithOperation(
table.getFullyQualifiedName(),
SYSTEM_PROFILE_EXTENSION,
systemProfile.getTimestamp(),
systemProfile.getOperation().value());
daoCollection
.profilerDataTimeSeriesDao()
.insert(
.storeTimeSeriesWithOperation(
table.getFullyQualifiedName(),
SYSTEM_PROFILE_EXTENSION,
"systemProfile",
JsonUtils.pojoToJson(systemProfile));
JsonUtils.pojoToJson(systemProfile),
systemProfile.getTimestamp(),
systemProfile.getOperation().value(),
storedSystemProfile != null);
}
}