diff --git a/ingestion/src/metadata/profiler/metrics/system/queries/redshift.py b/ingestion/src/metadata/profiler/metrics/system/queries/redshift.py index c6a3d61fa4d..1e76ab80b98 100644 --- a/ingestion/src/metadata/profiler/metrics/system/queries/redshift.py +++ b/ingestion/src/metadata/profiler/metrics/system/queries/redshift.py @@ -35,19 +35,17 @@ STL_QUERY = """ sti."database", sti."schema", sti."table", - sq.text, DATE_TRUNC('second', data.starttime) AS starttime FROM data INNER JOIN pg_catalog.svv_table_info sti ON data.tbl = sti.table_id - INNER JOIN pg_catalog.stl_querytext sq ON data.query = sq.query where sti."database" = '{database}' AND sti."schema" = '{schema}' AND "rows" != 0 AND DATE(data.starttime) >= CURRENT_DATE - 1 - GROUP BY 2,3,4,5,6 - ORDER BY 6 DESC + GROUP BY 2,3,4,5 + ORDER BY 5 DESC """ @@ -73,7 +71,7 @@ def get_query_results( database_name=row.database, schema_name=row.schema, table_name=row.table, - query_text=row.text, + query_text=None, query_type=operation, timestamp=row.starttime, rows=row.rows, diff --git a/ingestion/src/metadata/profiler/metrics/system/system.py b/ingestion/src/metadata/profiler/metrics/system/system.py index 5f3b7608c31..b57a4ae3857 100644 --- a/ingestion/src/metadata/profiler/metrics/system/system.py +++ b/ingestion/src/metadata/profiler/metrics/system/system.py @@ -113,10 +113,6 @@ def _( dataset_id = table.__table_args__["schema"] # type: ignore metric_results: List[Dict] = [] - # QueryResult = namedtuple( - # "QueryResult", - # "query_type,timestamp,destination_table,dml_statistics", - # ) jobs = get_value_from_cache( SYSTEM_QUERY_RESULT_CACHE, f"{Dialects.BigQuery}.{project_id}.{dataset_id}.jobs" diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityTimeSeriesDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityTimeSeriesDAO.java index 07155108c3d..0ae1150be04 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityTimeSeriesDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityTimeSeriesDAO.java @@ -393,6 +393,21 @@ public interface EntityTimeSeriesDAO { return getLatestExtensionByKeyInternal(getTimeSeriesTableName(), value, entityFQN, extension, mysqlCond, psqlCond); } + default void storeTimeSeriesWithOperation( + String fqn, + String extension, + String jsonSchema, + String entityJson, + Long timestamp, + String operation, + boolean update) { + if (update) { + updateExtensionByOperation(fqn, extension, entityJson, timestamp, operation); + } else { + insert(fqn, extension, jsonSchema, entityJson); + } + } + /** @deprecated */ @SqlQuery("SELECT DISTINCT entityFQN FROM WHERE entityFQNHash = '' or entityFQNHash is null LIMIT :limit") @Deprecated(since = "1.1.1") diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java index a00f4191714..1ff2ce0c1eb 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/TableRepository.java @@ -379,13 +379,26 @@ public class TableRepository extends EntityRepository
{ List systemProfiles = createTableProfile.getSystemProfile(); if (systemProfiles != null && !systemProfiles.isEmpty()) { for (SystemProfile systemProfile : createTableProfile.getSystemProfile()) { + // system metrics timestamp is the one of the operation. We'll need to + // update the entry if it already exists in the database + String storedSystemProfile = + daoCollection + .profilerDataTimeSeriesDao() + .getExtensionAtTimestampWithOperation( + table.getFullyQualifiedName(), + SYSTEM_PROFILE_EXTENSION, + systemProfile.getTimestamp(), + systemProfile.getOperation().value()); daoCollection .profilerDataTimeSeriesDao() - .insert( + .storeTimeSeriesWithOperation( table.getFullyQualifiedName(), SYSTEM_PROFILE_EXTENSION, "systemProfile", - JsonUtils.pojoToJson(systemProfile)); + JsonUtils.pojoToJson(systemProfile), + systemProfile.getTimestamp(), + systemProfile.getOperation().value(), + storedSystemProfile != null); } }