Capture Query Cost (#19991)

This commit is contained in:
Mayur Singal 2025-03-06 19:49:59 +05:30 committed by GitHub
parent 242e85a797
commit c1592b54c5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
39 changed files with 2277 additions and 13 deletions

View File

@ -1,3 +1,18 @@
UPDATE workflow_definition_entity UPDATE workflow_definition_entity
SET json = JSON_REMOVE(json, '$.type') SET json = JSON_REMOVE(json, '$.type')
WHERE JSON_EXTRACT(json, '$.type') IS NOT NULL; WHERE JSON_EXTRACT(json, '$.type') IS NOT NULL;
-- Query Cost History Time Series
CREATE TABLE query_cost_time_series (
id varchar(36) GENERATED ALWAYS AS (json_unquote(json_extract(json,'$.id'))) VIRTUAL NOT NULL,
cost float GENERATED ALWAYS AS (json_unquote(json_extract(json,'$.cost'))) VIRTUAL NOT NULL,
count int GENERATED ALWAYS AS (json_unquote(json_extract(json,'$.count'))) VIRTUAL NULL,
timestamp bigint unsigned GENERATED ALWAYS AS (json_unquote(json_extract(json,'$.timestamp'))) VIRTUAL NOT NULL,
jsonSchema varchar(256) NOT NULL,
json json NOT NULL,
entityFQNHash varchar(768) CHARACTER SET ascii COLLATE ascii_bin DEFAULT NULL,
CONSTRAINT query_cost_unique_constraint UNIQUE (timestamp,entityFQNHash),
INDEX (id),
INDEX (id, timestamp)
) DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

View File

@ -1,3 +1,17 @@
UPDATE workflow_definition_entity UPDATE workflow_definition_entity
SET json = json - 'type' SET json = json - 'type'
WHERE json->>'type' IS NOT NULL; WHERE json->>'type' IS NOT NULL;
-- Query Cost History Time Series
CREATE TABLE query_cost_time_series (
id varchar(36) GENERATED ALWAYS AS (json ->> 'id') STORED NOT NULL,
cost real GENERATED ALWAYS AS (json ->> 'cost') STORED NOT NULL,
count int GENERATED ALWAYS AS (json ->> 'count') STORED NOT NULL,
timestamp bigint GENERATED ALWAYS AS ((json ->> 'timestamp')::bigint) STORED NOT NULL,
jsonSchema varchar(256) NOT NULL,
json jsonb NOT NULL,
entityFQNHash varchar(768) COLLATE "C" DEFAULT NULL,
CONSTRAINT query_cost_unique_constraint UNIQUE (id, timestamp, entityFQNHash)
);
CREATE INDEX IF NOT EXISTS query_cost_time_series_id on query_cost_time_series (id);
CREATE INDEX IF NOT EXISTS query_cost_time_series_id_timestamp on test_case_resolution_status_time_series (id, timestamp);

View File

@ -41,7 +41,11 @@ from metadata.generated.schema.entity.services.ingestionPipelines.status import
from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.type.basic import Timestamp from metadata.generated.schema.type.basic import Timestamp
from metadata.generated.schema.type.lifeCycle import AccessDetails, LifeCycle from metadata.generated.schema.type.lifeCycle import AccessDetails, LifeCycle
from metadata.generated.schema.type.tableUsageCount import TableColumn, TableUsageCount from metadata.generated.schema.type.tableUsageCount import (
QueryCostWrapper,
TableColumn,
TableUsageCount,
)
from metadata.generated.schema.type.usageRequest import UsageRequest from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.api.steps import BulkSink from metadata.ingestion.api.steps import BulkSink
from metadata.ingestion.lineage.sql_lineage import ( from metadata.ingestion.lineage.sql_lineage import (
@ -163,7 +167,7 @@ class MetadataUsageBulkSink(BulkSink):
) )
) )
def iterate_files(self): def iterate_files(self, usage_files: bool = True):
""" """
Iterate through files in the given directory Iterate through files in the given directory
""" """
@ -173,11 +177,16 @@ class MetadataUsageBulkSink(BulkSink):
full_file_name = os.path.join(self.config.filename, filename) full_file_name = os.path.join(self.config.filename, filename)
if not os.path.isfile(full_file_name): if not os.path.isfile(full_file_name):
continue continue
with open(full_file_name, encoding=UTF_8) as file: # if usage_files is True, then we want to iterate through files does not end with query
yield file # if usage_files is False, then we want to iterate through files that end with query
if filename.endswith("query") ^ usage_files:
with open(full_file_name, encoding=UTF_8) as file:
yield file
# Check here how to properly pick up ES and/or table query data def handle_table_usage(self) -> None:
def run(self) -> None: """
Handle table usage.
"""
for file_handler in self.iterate_files(): for file_handler in self.iterate_files():
self.table_usage_map = {} self.table_usage_map = {}
for usage_record in file_handler.readlines(): for usage_record in file_handler.readlines():
@ -210,6 +219,18 @@ class MetadataUsageBulkSink(BulkSink):
self.__publish_usage_records() self.__publish_usage_records()
def handle_query_cost(self) -> None:
for file_handler in self.iterate_files(usage_files=False):
for usage_record in file_handler.readlines():
record = json.loads(usage_record)
cost_record = QueryCostWrapper(**record)
self.metadata.publish_query_cost(cost_record, self.service_name)
# Check here how to properly pick up ES and/or table query data
def run(self) -> None:
self.handle_table_usage()
self.handle_query_cost()
def get_table_usage_and_joins( def get_table_usage_and_joins(
self, table_entities: List[Table], table_usage: TableUsageCount self, table_entities: List[Table], table_usage: TableUsageCount
): ):

View File

@ -14,6 +14,7 @@ Query masking utilities
import traceback import traceback
from cachetools import LRUCache
from collate_sqllineage.runner import SQLPARSE_DIALECT, LineageRunner from collate_sqllineage.runner import SQLPARSE_DIALECT, LineageRunner
from sqlparse.sql import Comparison from sqlparse.sql import Comparison
from sqlparse.tokens import Literal, Number, String from sqlparse.tokens import Literal, Number, String
@ -22,6 +23,8 @@ from metadata.ingestion.lineage.models import Dialect
MASK_TOKEN = "?" MASK_TOKEN = "?"
# Cache size is 128 to avoid memory issues
masked_query_cache = LRUCache(maxsize=128)
# pylint: disable=protected-access # pylint: disable=protected-access
def get_logger(): def get_logger():
@ -112,8 +115,13 @@ def mask_literals_with_sqlfluff(query: str, parser: LineageRunner) -> str:
def mask_query( def mask_query(
query: str, dialect: str = Dialect.ANSI.value, parser: LineageRunner = None query: str, dialect: str = Dialect.ANSI.value, parser: LineageRunner = None
) -> str: ) -> str:
"""
Mask a query using sqlparse or sqlfluff.
"""
logger = get_logger() logger = get_logger()
try: try:
if masked_query_cache.get((query, dialect)):
return masked_query_cache.get((query, dialect))
if not parser: if not parser:
try: try:
parser = LineageRunner(query, dialect=dialect) parser = LineageRunner(query, dialect=dialect)
@ -122,8 +130,11 @@ def mask_query(
parser = LineageRunner(query) parser = LineageRunner(query)
len(parser.source_tables) len(parser.source_tables)
if parser._dialect == SQLPARSE_DIALECT: if parser._dialect == SQLPARSE_DIALECT:
return mask_literals_with_sqlparse(query, parser) masked_query = mask_literals_with_sqlparse(query, parser)
return mask_literals_with_sqlfluff(query, parser) else:
masked_query = mask_literals_with_sqlfluff(query, parser)
masked_query_cache[(query, dialect)] = masked_query
return masked_query
except Exception as exc: except Exception as exc:
logger.debug(f"Failed to mask query with sqlfluff: {exc}") logger.debug(f"Failed to mask query with sqlfluff: {exc}")
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())

View File

@ -15,14 +15,20 @@ To be used by OpenMetadata class
""" """
import hashlib import hashlib
import json import json
from functools import lru_cache
from typing import List, Optional, Union from typing import List, Optional, Union
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createQueryCostRecord import (
CreateQueryCostRecordRequest,
)
from metadata.generated.schema.entity.data.dashboard import Dashboard from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.query import Query from metadata.generated.schema.entity.data.query import Query
from metadata.generated.schema.entity.data.queryCostRecord import QueryCostRecord
from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.basic import Uuid from metadata.generated.schema.type.basic import Uuid
from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tableUsageCount import QueryCostWrapper
from metadata.ingestion.lineage.masker import mask_query from metadata.ingestion.lineage.masker import mask_query
from metadata.ingestion.ometa.client import REST from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.utils import model_str from metadata.ingestion.ometa.utils import model_str
@ -115,3 +121,40 @@ class OMetaQueryMixin:
if res and res.get("data"): if res and res.get("data"):
return [Query(**query) for query in res.get("data")] return [Query(**query) for query in res.get("data")]
return None return None
@lru_cache(maxsize=5000)
def __get_query_by_hash(
self, query_hash: str, service_name: str
) -> Optional[Query]:
return self.get_by_name(entity=Query, fqn=f"{service_name}.{query_hash}")
def publish_query_cost(self, query_cost_data: QueryCostWrapper, service_name: str):
"""
Create Query Cost Record
Args:
query_cost_record: QueryCostWrapper
"""
masked_query = mask_query(query_cost_data.query, query_cost_data.dialect)
query_hash = self._get_query_hash(masked_query)
query = self.__get_query_by_hash(
query_hash=query_hash, service_name=service_name
)
if not query:
return None
create_request = CreateQueryCostRecordRequest(
timestamp=int(query_cost_data.date),
jsonSchema="queryCostRecord",
queryReference=EntityReference(id=query.id.root, type="query"),
cost=query_cost_data.cost,
count=query_cost_data.count,
totalDuration=query_cost_data.totalDuration,
)
return self.client.post(
self.get_suffix(QueryCostRecord), data=create_request.model_dump_json()
)

View File

@ -46,6 +46,9 @@ from metadata.generated.schema.api.data.createMetric import CreateMetricRequest
from metadata.generated.schema.api.data.createMlModel import CreateMlModelRequest from metadata.generated.schema.api.data.createMlModel import CreateMlModelRequest
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createQueryCostRecord import (
CreateQueryCostRecordRequest,
)
from metadata.generated.schema.api.data.createSearchIndex import ( from metadata.generated.schema.api.data.createSearchIndex import (
CreateSearchIndexRequest, CreateSearchIndexRequest,
) )
@ -133,6 +136,7 @@ from metadata.generated.schema.entity.data.metric import Metric
from metadata.generated.schema.entity.data.mlmodel import MlModel from metadata.generated.schema.entity.data.mlmodel import MlModel
from metadata.generated.schema.entity.data.pipeline import Pipeline from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.query import Query from metadata.generated.schema.entity.data.query import Query
from metadata.generated.schema.entity.data.queryCostRecord import QueryCostRecord
from metadata.generated.schema.entity.data.report import Report from metadata.generated.schema.entity.data.report import Report
from metadata.generated.schema.entity.data.searchIndex import SearchIndex from metadata.generated.schema.entity.data.searchIndex import SearchIndex
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure from metadata.generated.schema.entity.data.storedProcedure import StoredProcedure
@ -280,4 +284,7 @@ ROUTES = {
CreateAppMarketPlaceDefinitionRequest.__name__: "/apps/marketplace", CreateAppMarketPlaceDefinitionRequest.__name__: "/apps/marketplace",
# Settings # Settings
Settings.__name__: "/system/settings", Settings.__name__: "/system/settings",
# Query Cost
QueryCostRecord.__name__: "/queryCostRecord",
CreateQueryCostRecordRequest.__name__: "/queryCostRecord",
} }

View File

@ -66,6 +66,7 @@ def parse_sql_statement(record: TableQuery, dialect: Dialect) -> Optional[Parsed
date=str(start_time), date=str(start_time),
serviceName=record.serviceName, serviceName=record.serviceName,
duration=record.duration, duration=record.duration,
cost=record.cost,
) )

View File

@ -23,6 +23,7 @@ from metadata.ingestion.api.steps import Source
from metadata.ingestion.connections.test_connections import ( from metadata.ingestion.connections.test_connections import (
raise_test_connection_exception, raise_test_connection_exception,
) )
from metadata.ingestion.lineage.masker import masked_query_cache
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_test_connection_fn from metadata.ingestion.source.connections import get_test_connection_fn
@ -129,7 +130,8 @@ class QueryParserSource(Source, ABC):
yield self.engine yield self.engine
def close(self): def close(self):
"""By default, there is nothing to close""" # Clear the cache
masked_query_cache.clear()
def test_connection(self) -> None: def test_connection(self) -> None:
test_connection_fn = get_test_connection_fn(self.service_connection) test_connection_fn = get_test_connection_fn(self.service_connection)

View File

@ -24,7 +24,8 @@ SNOWFLAKE_SQL_STATEMENT = textwrap.dedent(
schema_name "schema_name", schema_name "schema_name",
start_time "start_time", start_time "start_time",
end_time "end_time", end_time "end_time",
total_elapsed_time "duration" total_elapsed_time "duration",
CREDITS_USED_CLOUD_SERVICES * {credit_cost} as "cost"
from {account_usage}.query_history from {account_usage}.query_history
WHERE query_text NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%' WHERE query_text NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%'
AND query_text NOT LIKE '/* {{"app": "dbt", %%}} */%%' AND query_text NOT LIKE '/* {{"app": "dbt", %%}} */%%'

View File

@ -61,6 +61,8 @@ class SnowflakeQueryParserSource(QueryParserSource, ABC):
result_limit=self.config.sourceConfig.config.resultLimit, result_limit=self.config.sourceConfig.config.resultLimit,
filters=self.get_filters(), filters=self.get_filters(),
account_usage=self.service_connection.accountUsageSchema, account_usage=self.service_connection.accountUsageSchema,
credit_cost=self.service_connection.creditCost
* self.service_connection.creditCost,
) )
def check_life_cycle_query( def check_life_cycle_query(

View File

@ -72,6 +72,7 @@ class UsageSource(QueryParserSource, ABC):
startTime=query_dict.get("start_time", ""), startTime=query_dict.get("start_time", ""),
endTime=query_dict.get("end_time", ""), endTime=query_dict.get("end_time", ""),
duration=query_dict.get("duration"), duration=query_dict.get("duration"),
cost=query_dict.get("cost"),
analysisDate=DateTime(analysis_date), analysisDate=DateTime(analysis_date),
aborted=self.get_aborted_status(query_dict), aborted=self.get_aborted_status(query_dict),
databaseName=self.get_database_name(query_dict), databaseName=self.get_database_name(query_dict),
@ -142,6 +143,7 @@ class UsageSource(QueryParserSource, ABC):
duration=row.get("duration"), duration=row.get("duration"),
serviceName=self.config.serviceName, serviceName=self.config.serviceName,
databaseSchema=self.get_schema_name(row), databaseSchema=self.get_schema_name(row),
cost=row.get("cost"),
) )
) )
except Exception as exc: except Exception as exc:

View File

@ -32,7 +32,7 @@ from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import Stage from metadata.ingestion.api.steps import Stage
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.constants import UTF_8 from metadata.utils.constants import UTF_8
from metadata.utils.helpers import init_staging_dir from metadata.utils.helpers import get_query_hash, init_staging_dir
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
logger = ingestion_logger() logger = ingestion_logger()
@ -62,8 +62,10 @@ class TableUsageStage(Stage):
self.metadata = metadata self.metadata = metadata
self.table_usage = {} self.table_usage = {}
self.table_queries = {} self.table_queries = {}
self.query_cost = {}
init_staging_dir(self.config.filename) init_staging_dir(self.config.filename)
self.wrote_something = False self.wrote_something = False
self.service_name = ""
@property @property
def name(self) -> str: def name(self) -> str:
@ -107,6 +109,7 @@ class TableUsageStage(Stage):
def _add_sql_query(self, record, table): def _add_sql_query(self, record, table):
users, used_by = self._get_user_entity(record.userName) users, used_by = self._get_user_entity(record.userName)
if self.table_queries.get((table, record.date)): if self.table_queries.get((table, record.date)):
self.service_name = record.serviceName
self.table_queries[(table, record.date)].append( self.table_queries[(table, record.date)].append(
CreateQueryRequest( CreateQueryRequest(
query=record.sql, query=record.sql,
@ -172,6 +175,30 @@ class TableUsageStage(Stage):
) )
yield Either(right=table) yield Either(right=table)
def _handle_query_cost(self, parsed_data: ParsedData):
query_hash = get_query_hash(parsed_data.sql)
if (query_hash, parsed_data.date) in self.query_cost:
self.query_cost[(query_hash, parsed_data.date)].update(
{
"cost": self.query_cost[(query_hash, parsed_data.date)]["cost"]
+ (parsed_data.cost or 0),
"count": self.query_cost[(query_hash, parsed_data.date)]["count"]
+ 1,
"totalDuration": self.query_cost[(query_hash, parsed_data.date)][
"totalDuration"
]
+ (parsed_data.duration or 0),
}
)
else:
self.query_cost[(query_hash, parsed_data.date)] = {
"cost": parsed_data.cost or 0,
"count": 1,
"query": parsed_data.sql,
"dialect": parsed_data.dialect,
"totalDuration": parsed_data.duration or 0,
}
def _run(self, record: QueryParserData) -> Iterable[Either[str]]: def _run(self, record: QueryParserData) -> Iterable[Either[str]]:
""" """
Process the parsed data and store it in a file Process the parsed data and store it in a file
@ -187,9 +214,13 @@ class TableUsageStage(Stage):
yield from self._handle_table_usage( yield from self._handle_table_usage(
parsed_data=parsed_data, table=table parsed_data=parsed_data, table=table
) )
self._handle_query_cost(parsed_data)
self.dump_data_to_file() self.dump_data_to_file()
def dump_data_to_file(self): def dump_data_to_file(self):
"""
Dump the table usage data to a file.
"""
for key, value in self.table_usage.items(): for key, value in self.table_usage.items():
if value: if value:
value.sqlQueries = self.table_queries.get(key, []) value.sqlQueries = self.table_queries.get(key, [])
@ -202,6 +233,27 @@ class TableUsageStage(Stage):
file.write(json.dumps(data)) file.write(json.dumps(data))
file.write("\n") file.write("\n")
for key, value in self.query_cost.items():
if value:
data = {
"queryHash": key[0],
"date": key[1],
"cost": value["cost"],
"count": value["count"],
"query": value["query"],
"dialect": value["dialect"],
"totalDuration": value["totalDuration"],
}
with open(
os.path.join(
self.config.filename, f"{self.service_name}_{key[1]}_query"
),
"a+",
encoding=UTF_8,
) as file:
file.write(json.dumps(data))
file.write("\n")
def close(self) -> None: def close(self) -> None:
""" """
Nothing to close. Data is being dumped inside a context manager Nothing to close. Data is being dumped inside a context manager

View File

@ -15,6 +15,7 @@ Helpers module for ingestion related methods
from __future__ import annotations from __future__ import annotations
import hashlib
import itertools import itertools
import re import re
import shutil import shutil
@ -521,3 +522,8 @@ def retry_with_docker_host(config: Optional[WorkflowSource] = None):
return wrapper return wrapper
return decorator return decorator
def get_query_hash(query: str) -> str:
result = hashlib.md5(query.encode())
return str(result.hexdigest())

View File

@ -175,6 +175,7 @@ public final class Entity {
public static final String MLMODEL = "mlmodel"; public static final String MLMODEL = "mlmodel";
public static final String CONTAINER = "container"; public static final String CONTAINER = "container";
public static final String QUERY = "query"; public static final String QUERY = "query";
public static final String QUERY_COST_RECORD = "queryCostRecord";
public static final String GLOSSARY = "glossary"; public static final String GLOSSARY = "glossary";
public static final String GLOSSARY_TERM = "glossaryTerm"; public static final String GLOSSARY_TERM = "glossaryTerm";

View File

@ -1,6 +1,7 @@
package org.openmetadata.service.apps.bundles.searchIndex; package org.openmetadata.service.apps.bundles.searchIndex;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.QUERY_COST_RECORD;
import static org.openmetadata.service.Entity.TEST_CASE_RESOLUTION_STATUS; import static org.openmetadata.service.Entity.TEST_CASE_RESOLUTION_STATUS;
import static org.openmetadata.service.Entity.TEST_CASE_RESULT; import static org.openmetadata.service.Entity.TEST_CASE_RESULT;
import static org.openmetadata.service.apps.scheduler.AppScheduler.ON_DEMAND_JOB; import static org.openmetadata.service.apps.scheduler.AppScheduler.ON_DEMAND_JOB;
@ -70,7 +71,8 @@ public class SearchIndexApp extends AbstractNativeApplication {
ReportData.ReportDataType.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA.value(), ReportData.ReportDataType.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA.value(),
ReportData.ReportDataType.AGGREGATED_COST_ANALYSIS_REPORT_DATA.value(), ReportData.ReportDataType.AGGREGATED_COST_ANALYSIS_REPORT_DATA.value(),
TEST_CASE_RESOLUTION_STATUS, TEST_CASE_RESOLUTION_STATUS,
TEST_CASE_RESULT); TEST_CASE_RESULT,
QUERY_COST_RECORD);
// Constants to replace magic numbers // Constants to replace magic numbers
private BulkSink searchIndexSink; private BulkSink searchIndexSink;

View File

@ -195,6 +195,9 @@ public interface CollectionDAO {
@CreateSqlObject @CreateSqlObject
TestCaseResolutionStatusTimeSeriesDAO testCaseResolutionStatusTimeSeriesDao(); TestCaseResolutionStatusTimeSeriesDAO testCaseResolutionStatusTimeSeriesDao();
@CreateSqlObject
QueryCostTimeSeriesDAO queryCostRecordTimeSeriesDAO();
@CreateSqlObject @CreateSqlObject
TestCaseResultTimeSeriesDAO testCaseResultTimeSeriesDao(); TestCaseResultTimeSeriesDAO testCaseResultTimeSeriesDao();
@ -5166,6 +5169,34 @@ public interface CollectionDAO {
} }
} }
interface QueryCostTimeSeriesDAO extends EntityTimeSeriesDAO {
@Override
default String getTimeSeriesTableName() {
return "query_cost_time_series";
}
// TODO: Do not change id on override... updating json changed the id as well
@ConnectionAwareSqlUpdate(
value =
"INSERT INTO <table>(entityFQNHash, jsonSchema, json) "
+ "VALUES (:entityFQNHash, :jsonSchema, :json) ON DUPLICATE KEY UPDATE"
+ " json = VALUES(json);",
connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
value =
"INSERT INTO <table>(entityFQNHash, jsonSchema, json) "
+ "VALUES (:entityFQNHash, :jsonSchema, (:json :: jsonb)) "
+ "ON CONFLICT (entityFQNHash, timestamp)"
+ "DO UPDATE SET"
+ "json = EXCLUDED.json",
connectionType = POSTGRES)
void insertWithoutExtension(
@Define("table") String table,
@BindFQN("entityFQNHash") String entityFQNHash,
@Bind("jsonSchema") String jsonSchema,
@Bind("json") String json);
}
interface TestCaseResolutionStatusTimeSeriesDAO extends EntityTimeSeriesDAO { interface TestCaseResolutionStatusTimeSeriesDAO extends EntityTimeSeriesDAO {
@Override @Override
default String getTimeSeriesTableName() { default String getTimeSeriesTableName() {

View File

@ -0,0 +1,16 @@
package org.openmetadata.service.jdbi3;
import org.openmetadata.schema.entity.data.QueryCostRecord;
import org.openmetadata.service.Entity;
import org.openmetadata.service.resources.query.QueryCostResource;
public class QueryCostRepository extends EntityTimeSeriesRepository<QueryCostRecord> {
public QueryCostRepository() {
super(
QueryCostResource.COLLECTION_PATH,
Entity.getCollectionDAO().queryCostRecordTimeSeriesDAO(),
QueryCostRecord.class,
Entity.QUERY_COST_RECORD);
}
}

View File

@ -0,0 +1,28 @@
package org.openmetadata.service.resources.query;
import org.openmetadata.schema.entity.data.CreateQueryCostRecord;
import org.openmetadata.schema.entity.data.Query;
import org.openmetadata.schema.entity.data.QueryCostRecord;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
import org.openmetadata.service.mapper.EntityTimeSeriesMapper;
public class QueryCostRecordMapper
implements EntityTimeSeriesMapper<QueryCostRecord, CreateQueryCostRecord> {
@Override
public QueryCostRecord createToEntity(CreateQueryCostRecord create, String user) {
Query query =
Entity.getEntity(Entity.QUERY, create.getQueryReference().getId(), null, Include.ALL);
User userEntity = Entity.getEntityByName(Entity.USER, user, null, Include.ALL);
return new QueryCostRecord()
.withTimestamp(create.getTimestamp())
.withCost(create.getCost())
.withCount(create.getCount())
.withTotalDuration(create.getTotalDuration())
.withUpdatedAt(System.currentTimeMillis())
.withUpdatedBy(userEntity.getEntityReference())
.withQueryReference(query.getEntityReference());
}
}

View File

@ -0,0 +1,109 @@
package org.openmetadata.service.resources.query;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.util.UUID;
import javax.validation.Valid;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.SecurityContext;
import javax.ws.rs.core.UriInfo;
import org.openmetadata.schema.entity.data.CreateQueryCostRecord;
import org.openmetadata.schema.entity.data.QueryCostRecord;
import org.openmetadata.schema.type.MetadataOperation;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.QueryCostRepository;
import org.openmetadata.service.resources.Collection;
import org.openmetadata.service.resources.EntityTimeSeriesResource;
import org.openmetadata.service.security.Authorizer;
import org.openmetadata.service.security.policyevaluator.OperationContext;
import org.openmetadata.service.security.policyevaluator.ResourceContext;
import org.openmetadata.service.security.policyevaluator.ResourceContextInterface;
@Path("/v1/queryCostRecord")
@Tag(
name = "Query Cost Record Manager",
description = "APIs to query cost records from usage workflow.")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "QueriesCost")
public class QueryCostResource
extends EntityTimeSeriesResource<QueryCostRecord, QueryCostRepository> {
public static final String COLLECTION_PATH = "v1/queryCostRecord";
private final QueryCostRecordMapper mapper = new QueryCostRecordMapper();
public QueryCostResource(Authorizer authorizer) {
super(Entity.QUERY_COST_RECORD, authorizer);
}
@GET
@Path("/{id}")
@Operation(
operationId = "getQueryCostRecord",
summary = "Get query cost record by id",
description = "Get query cost record by id",
responses = {
@ApiResponse(
responseCode = "200",
description = "The query cost record",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = QueryCostRecord.class)))
})
public QueryCostRecord get(
@Context SecurityContext securityContext,
@Parameter(description = "Get query cost record by id", schema = @Schema(type = "UUID"))
@PathParam("id")
UUID testCaseResolutionStatusId) {
QueryCostRecord costRecord = repository.getById(testCaseResolutionStatusId);
OperationContext queryCostOperationContext =
new OperationContext(Entity.QUERY, MetadataOperation.VIEW_ALL);
ResourceContextInterface queryResourceContext =
new ResourceContext<>(Entity.QUERY, costRecord.getQueryReference().getId(), null);
authorizer.authorize(securityContext, queryCostOperationContext, queryResourceContext);
return costRecord;
}
@POST
@Operation(
operationId = "createQueryCostRecord",
summary = "Create query cost record",
description = "Create query cost record",
responses = {
@ApiResponse(
responseCode = "200",
description = "Create query cost record",
content =
@Content(
mediaType = "application/json",
schema = @Schema(implementation = CreateQueryCostRecord.class)))
})
public Response create(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Valid CreateQueryCostRecord createQueryCostRecord) {
OperationContext operationContext =
new OperationContext(Entity.QUERY, MetadataOperation.EDIT_QUERIES);
ResourceContextInterface queryResourceContext =
new ResourceContext<>(
Entity.QUERY, createQueryCostRecord.getQueryReference().getId(), null);
authorizer.authorize(securityContext, operationContext, queryResourceContext);
QueryCostRecord queryCostRecord =
mapper.createToEntity(createQueryCostRecord, securityContext.getUserPrincipal().getName());
return create(queryCostRecord, queryCostRecord.getQueryReference().getFullyQualifiedName());
}
}

View File

@ -18,6 +18,7 @@ import org.openmetadata.schema.entity.data.Metric;
import org.openmetadata.schema.entity.data.MlModel; import org.openmetadata.schema.entity.data.MlModel;
import org.openmetadata.schema.entity.data.Pipeline; import org.openmetadata.schema.entity.data.Pipeline;
import org.openmetadata.schema.entity.data.Query; import org.openmetadata.schema.entity.data.Query;
import org.openmetadata.schema.entity.data.QueryCostRecord;
import org.openmetadata.schema.entity.data.StoredProcedure; import org.openmetadata.schema.entity.data.StoredProcedure;
import org.openmetadata.schema.entity.data.Table; import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.data.Topic; import org.openmetadata.schema.entity.data.Topic;
@ -59,6 +60,7 @@ import org.openmetadata.service.search.indexes.MlModelIndex;
import org.openmetadata.service.search.indexes.MlModelServiceIndex; import org.openmetadata.service.search.indexes.MlModelServiceIndex;
import org.openmetadata.service.search.indexes.PipelineIndex; import org.openmetadata.service.search.indexes.PipelineIndex;
import org.openmetadata.service.search.indexes.PipelineServiceIndex; import org.openmetadata.service.search.indexes.PipelineServiceIndex;
import org.openmetadata.service.search.indexes.QueryCostRecordIndex;
import org.openmetadata.service.search.indexes.QueryIndex; import org.openmetadata.service.search.indexes.QueryIndex;
import org.openmetadata.service.search.indexes.RawCostAnalysisReportDataIndex; import org.openmetadata.service.search.indexes.RawCostAnalysisReportDataIndex;
import org.openmetadata.service.search.indexes.SearchEntityIndex; import org.openmetadata.service.search.indexes.SearchEntityIndex;
@ -97,6 +99,7 @@ public class SearchIndexFactory {
case Entity.TAG -> new TagIndex((Tag) entity); case Entity.TAG -> new TagIndex((Tag) entity);
case Entity.CLASSIFICATION -> new ClassificationIndex((Classification) entity); case Entity.CLASSIFICATION -> new ClassificationIndex((Classification) entity);
case Entity.QUERY -> new QueryIndex((Query) entity); case Entity.QUERY -> new QueryIndex((Query) entity);
case Entity.QUERY_COST_RECORD -> new QueryCostRecordIndex((QueryCostRecord) entity);
case Entity.CONTAINER -> new ContainerIndex((Container) entity); case Entity.CONTAINER -> new ContainerIndex((Container) entity);
case Entity.DATABASE -> new DatabaseIndex((Database) entity); case Entity.DATABASE -> new DatabaseIndex((Database) entity);
case Entity.DATABASE_SCHEMA -> new DatabaseSchemaIndex((DatabaseSchema) entity); case Entity.DATABASE_SCHEMA -> new DatabaseSchemaIndex((DatabaseSchema) entity);

View File

@ -0,0 +1,34 @@
package org.openmetadata.service.search.indexes;
import java.util.Map;
import org.openmetadata.schema.entity.data.Query;
import org.openmetadata.schema.entity.data.QueryCostRecord;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity;
public class QueryCostRecordIndex implements SearchIndex {
final QueryCostRecord queryCostRecord;
public QueryCostRecordIndex(QueryCostRecord queryCostRecord) {
this.queryCostRecord = queryCostRecord;
}
public Map<String, Object> buildSearchIndexDocInternal(Map<String, Object> doc) {
EntityReference queryReference = queryCostRecord.getQueryReference();
Query query = Entity.getEntity(queryReference, "*", Include.NON_DELETED);
doc.put("query", query);
doc.put("service", query.getService());
doc.put("cost", queryCostRecord.getCost());
doc.put("count", queryCostRecord.getCount());
doc.put("timestamp", queryCostRecord.getTimestamp());
doc.put("@timestamp", queryCostRecord.getTimestamp());
doc.put("totalDuration", queryCostRecord.getTotalDuration());
return doc;
}
@Override
public Object getEntity() {
return queryCostRecord;
}
}

View File

@ -0,0 +1,509 @@
{
"settings": {
"index": {
"max_ngram_diff": 17
},
"analysis": {
"tokenizer": {
"n_gram_tokenizer": {
"type": "ngram",
"min_gram": 3,
"max_gram": 20,
"token_chars": [
"letter",
"digit"
]
}
},
"normalizer": {
"lowercase_normalizer": {
"type": "custom",
"char_filter": [],
"filter": [
"lowercase"
]
}
},
"analyzer": {
"om_analyzer": {
"tokenizer": "standard",
"filter": [
"lowercase",
"word_delimiter_filter",
"om_stemmer"
]
},
"om_ngram": {
"type": "custom",
"tokenizer": "n_gram_tokenizer",
"filter": [
"lowercase"
]
}
},
"filter": {
"om_stemmer": {
"type": "stemmer",
"name": "kstem"
},
"word_delimiter_filter": {
"type": "word_delimiter",
"preserve_original": true
}
}
}
},
"mappings": {
"properties": {
"query": {
"properties": {
"id": {
"type": "keyword"
},
"checksum": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"processedLineage": {
"type": "boolean"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "keyword",
"normalizer": "lowercase_normalizer"
},
"fqnParts": {
"type": "keyword"
},
"service": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"displayName": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"displayName": {
"type": "text",
"analyzer": "om_analyzer"
},
"description": {
"type": "text",
"analyzer": "om_analyzer",
"index_options": "docs"
},
"version": {
"type": "float"
},
"updatedAt": {
"type": "date",
"format": "epoch_second"
},
"updatedBy": {
"type": "text"
},
"href": {
"type": "text"
},
"duration": {
"type": "long"
},
"users": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"query": {
"type": "keyword"
},
"queryDate": {
"type": "long"
},
"owners": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"displayName": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"followers": {
"type": "keyword"
},
"tags": {
"properties": {
"tagFQN": {
"type": "keyword",
"normalizer": "lowercase_normalizer"
},
"labelType": {
"type": "keyword"
},
"description": {
"type": "text"
},
"source": {
"type": "keyword"
},
"state": {
"type": "keyword"
}
}
},
"queryUsedIn": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"displayName": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"entityType": {
"type": "keyword"
},
"suggest": {
"type": "completion"
},
"column_suggest": {
"type": "completion"
},
"service_suggest": {
"type": "completion"
},
"domain": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"displayName": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "keyword"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"tier": {
"properties": {
"description": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"labelType": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"source": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"state": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"tagFQN": {
"type": "keyword",
"normalizer": "lowercase_normalizer"
}
}
},
"totalVotes": {
"type": "long",
"null_value": 0
},
"votes": {
"type": "object"
}
}
},
"service": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"displayName": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"cost": {
"type": "float"
},
"timestamp": {
"type": "date"
},
"@timestamp": {
"type": "date"
},
"totalDuration": {
"type": "long"
},
"count": {
"type": "long"
}
}
}
}

View File

@ -299,5 +299,12 @@
"alias": "ingestionPipeline", "alias": "ingestionPipeline",
"parentAliases": [], "parentAliases": [],
"childAliases": [] "childAliases": []
},
"queryCostRecord": {
"indexName": "query_cost_record_search_index",
"indexMappingFile": "/elasticsearch/%s/query_cost_record_index_mapping.json",
"alias": "queryCostRecord",
"parentAliases": [],
"childAliases": []
} }
} }

View File

@ -0,0 +1,486 @@
{
"settings": {
"analysis": {
"normalizer": {
"lowercase_normalizer": {
"type": "custom",
"char_filter": [],
"filter": [
"lowercase"
]
}
},
"analyzer": {
"om_analyzer_jp": {
"tokenizer": "kuromoji_tokenizer",
"type": "custom",
"filter": [
"kuromoji_baseform",
"kuromoji_part_of_speech",
"kuromoji_number",
"kuromoji_stemmer"
]
}
},
"filter": {
"om_stemmer": {
"type": "stemmer",
"name": "english"
}
}
}
},
"mappings": {
"properties": {
"query": {
"properties": {
"id": {
"type": "keyword"
},
"checksum": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"processedLineage": {
"type": "boolean"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "keyword",
"normalizer": "lowercase_normalizer"
},
"fqnParts": {
"type": "keyword"
},
"service": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"displayName": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"displayName": {
"type": "text",
"analyzer": "om_analyzer"
},
"description": {
"type": "text",
"analyzer": "om_analyzer",
"index_options": "docs"
},
"version": {
"type": "float"
},
"updatedAt": {
"type": "date",
"format": "epoch_second"
},
"updatedBy": {
"type": "text"
},
"href": {
"type": "text"
},
"duration": {
"type": "long"
},
"users": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"query": {
"type": "keyword"
},
"queryDate": {
"type": "long"
},
"owners": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"displayName": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"followers": {
"type": "keyword"
},
"tags": {
"properties": {
"tagFQN": {
"type": "keyword",
"normalizer": "lowercase_normalizer"
},
"labelType": {
"type": "keyword"
},
"description": {
"type": "text"
},
"source": {
"type": "keyword"
},
"state": {
"type": "keyword"
}
}
},
"queryUsedIn": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"displayName": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"entityType": {
"type": "keyword"
},
"suggest": {
"type": "completion"
},
"column_suggest": {
"type": "completion"
},
"service_suggest": {
"type": "completion"
},
"domain": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"displayName": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "keyword"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"tier": {
"properties": {
"description": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"labelType": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"source": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"state": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"tagFQN": {
"type": "keyword",
"normalizer": "lowercase_normalizer"
}
}
},
"totalVotes": {
"type": "long",
"null_value": 0
},
"votes": {
"type": "object"
}
}
},
"service": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"displayName": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"cost": {
"type": "float"
},
"timestamp": {
"type": "date"
},
"@timestamp": {
"type": "date"
},
"count": {
"type": "long"
},
"totalDuration": {
"type": "long"
}
}
}
}

View File

@ -0,0 +1,491 @@
{
"settings": {
"analysis": {
"normalizer": {
"lowercase_normalizer": {
"type": "custom",
"char_filter": [],
"filter": [
"lowercase"
]
}
},
"analyzer": {
"om_analyzer": {
"tokenizer": "letter",
"filter": [
"lowercase",
"om_stemmer"
]
},
"om_ngram": {
"tokenizer": "ngram",
"min_gram": 2,
"max_gram": 3,
"filter": [
"lowercase"
]
}
},
"filter": {
"om_stemmer": {
"type": "stemmer",
"name": "english"
}
}
}
},
"mappings": {
"properties": {
"query": {
"properties": {
"id": {
"type": "keyword"
},
"checksum": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"processedLineage": {
"type": "boolean"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "keyword",
"normalizer": "lowercase_normalizer"
},
"fqnParts": {
"type": "keyword"
},
"service": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"displayName": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"displayName": {
"type": "text",
"analyzer": "om_analyzer"
},
"description": {
"type": "text",
"analyzer": "om_analyzer",
"index_options": "docs"
},
"version": {
"type": "float"
},
"updatedAt": {
"type": "date",
"format": "epoch_second"
},
"updatedBy": {
"type": "text"
},
"href": {
"type": "text"
},
"duration": {
"type": "long"
},
"users": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"query": {
"type": "keyword"
},
"queryDate": {
"type": "long"
},
"owners": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"displayName": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"followers": {
"type": "keyword"
},
"tags": {
"properties": {
"tagFQN": {
"type": "keyword",
"normalizer": "lowercase_normalizer"
},
"labelType": {
"type": "keyword"
},
"description": {
"type": "text"
},
"source": {
"type": "keyword"
},
"state": {
"type": "keyword"
}
}
},
"queryUsedIn": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"displayName": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"entityType": {
"type": "keyword"
},
"suggest": {
"type": "completion"
},
"column_suggest": {
"type": "completion"
},
"service_suggest": {
"type": "completion"
},
"domain": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"displayName": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "keyword"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"tier": {
"properties": {
"description": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"labelType": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"source": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"state": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"tagFQN": {
"type": "keyword",
"normalizer": "lowercase_normalizer"
}
}
},
"totalVotes": {
"type": "long",
"null_value": 0
},
"votes": {
"type": "object"
}
}
},
"service": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"displayName": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"normalizer": "lowercase_normalizer",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "boolean"
},
"href": {
"type": "text"
}
}
},
"cost": {
"type": "float"
},
"timestamp": {
"type": "date"
},
"@timestamp": {
"type": "date"
},
"count": {
"type": "long"
},
"totalDuration": {
"type": "long"
}
}
}
}

View File

@ -0,0 +1,42 @@
{
"$id": "https://open-metadata.org/schema/api/data/createQueryCostRecord.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "CreateQueryCostRecordRequest",
"description": "CreateQuery Cost Record",
"javaType": "org.openmetadata.schema.entity.data.CreateQueryCostRecord",
"type": "object",
"properties": {
"timestamp": {
"description": "Timestamp on which the failure was created.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"jsonSchema": {
"description": "Json schema of the query",
"type": "string"
},
"queryReference": {
"description": "Query entity reference",
"$ref": "../../type/entityReference.json"
},
"cost": {
"description": "Avg query cost per execution",
"type": "number"
},
"count": {
"description": "Number of times the query was executed",
"type": "number"
},
"totalDuration": {
"description": "Total duration of the query",
"type": "number",
"default": 0
}
},
"required": [
"timestamp",
"queryReference",
"cost",
"count"
],
"additionalProperties": false
}

View File

@ -0,0 +1,58 @@
{
"$id": "https://open-metadata.org/schema/entity/data/queryCostRecord.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "QueryCostRecord",
"description": "Query Cost Record",
"javaType": "org.openmetadata.schema.entity.data.QueryCostRecord",
"javaInterfaces": [
"org.openmetadata.schema.EntityTimeSeriesInterface"
],
"type": "object",
"properties": {
"id": {
"description": "Unique identifier of this failure instance",
"$ref": "../../type/basic.json#/definitions/uuid"
},
"timestamp": {
"description": "Timestamp on which the failure was created.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"jsonSchema": {
"description": "Json schema of the query",
"type": "string"
},
"queryReference": {
"description": "Query entity reference",
"$ref": "../../type/entityReference.json"
},
"cost": {
"description": "Avg query cost per execution",
"type": "number"
},
"updatedBy": {
"description": "User who updated the query cost record.",
"$ref": "../../type/entityReference.json",
"default": null
},
"updatedAt": {
"description": "Time when query cost record was updated.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"count": {
"description": "Number of times the query was executed",
"type": "number"
},
"totalDuration": {
"description": "Total duration of the query",
"type": "number",
"default": 0
}
},
"required": [
"timestamp",
"queryReference",
"cost",
"count"
],
"additionalProperties": false
}

View File

@ -146,6 +146,12 @@
"supportsDataDiff": { "supportsDataDiff": {
"title": "Supports Data Diff Extraction.", "title": "Supports Data Diff Extraction.",
"$ref": "../connectionBasicType.json#/definitions/supportsDataDiff" "$ref": "../connectionBasicType.json#/definitions/supportsDataDiff"
},
"creditCost": {
"title": "Cost of Credit",
"description": "Cost of credit for the Snowflake account.",
"type": "number",
"default": 3.30
} }
}, },
"additionalProperties": false, "additionalProperties": false,

View File

@ -45,6 +45,12 @@
"description": "Configuration to set the file path for query logs", "description": "Configuration to set the file path for query logs",
"type": "string", "type": "string",
"title": "Query Log File Path" "title": "Query Log File Path"
},
"processQueryCostAnalysis": {
"description": "Configuration to process query cost",
"type": "boolean",
"default": true,
"title": "Process Query Cost"
} }
}, },
"additionalProperties": false "additionalProperties": false

View File

@ -65,6 +65,10 @@
"duration": { "duration": {
"description": "How long did the query took to run in milliseconds.", "description": "How long did the query took to run in milliseconds.",
"type": "number" "type": "number"
},
"cost": {
"description": "Cost of the query execution",
"type": "number"
} }
}, },
"required": ["sql", "serviceName", "tables"] "required": ["sql", "serviceName", "tables"]

View File

@ -58,6 +58,10 @@
"duration": { "duration": {
"description": "How long did the query took to run in milliseconds.", "description": "How long did the query took to run in milliseconds.",
"type": "number" "type": "number"
},
"cost": {
"description": "Cost of the query execution",
"type": "number"
} }
}, },
"required": ["query", "serviceName"] "required": ["query", "serviceName"]

View File

@ -35,6 +35,41 @@
} }
}, },
"additionalProperties": false "additionalProperties": false
},
"queryCostWrapper": {
"type": "object",
"properties": {
"cost": {
"description": "Avg query cost per execution",
"type": "number"
},
"count": {
"description": "Number of times the query was executed",
"type": "number"
},
"date": {
"description": "Date of execution of SQL query",
"type": "string"
},
"queryHash": {
"description": "Hash of the query",
"type": "string"
},
"query": {
"description": "SQL query",
"type": "string"
},
"dialect": {
"description": "SQL dialect",
"type": "string"
},
"totalDuration": {
"description": "Total duration of the query",
"type": "number"
}
},
"required": ["cost", "count"],
"additionalProperties": false
} }
}, },
"properties": { "properties": {

View File

@ -0,0 +1,92 @@
/*
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* CreateQuery Cost Record
*/
export interface CreateQueryCostRecord {
/**
* Avg query cost per execution
*/
cost: number;
/**
* Number of times the query was executed
*/
count: number;
/**
* Json schema of the query
*/
jsonSchema?: string;
/**
* Query entity reference
*/
queryReference: EntityReference;
/**
* Timestamp on which the failure was created.
*/
timestamp: number;
/**
* Total duration of the query
*/
totalDuration?: number;
}
/**
* Query entity reference
*
* This schema defines the EntityReference type used for referencing an entity.
* EntityReference is used for capturing relationships from one entity to another. For
* example, a table has an attribute called database of type EntityReference that captures
* the relationship of a table `belongs to a` database.
*/
export interface EntityReference {
/**
* If true the entity referred to has been soft-deleted.
*/
deleted?: boolean;
/**
* Optional description of entity.
*/
description?: string;
/**
* Display Name that identifies this entity.
*/
displayName?: string;
/**
* Fully qualified name of the entity instance. For entities such as tables, databases
* fullyQualifiedName is returned in this field. For entities that don't have name hierarchy
* such as `user` and `team` this will be same as the `name` field.
*/
fullyQualifiedName?: string;
/**
* Link to the entity resource.
*/
href?: string;
/**
* Unique identifier that identifies an entity instance.
*/
id: string;
/**
* If true the relationship indicated by this entity reference is inherited from the parent
* entity.
*/
inherited?: boolean;
/**
* Name of the entity instance.
*/
name?: string;
/**
* Entity type/class name - Examples: `database`, `table`, `metrics`, `databaseService`,
* `dashboardService`...
*/
type: string;
}

View File

@ -0,0 +1,106 @@
/*
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Query Cost Record
*/
export interface QueryCostRecord {
/**
* Avg query cost per execution
*/
cost: number;
/**
* Number of times the query was executed
*/
count: number;
/**
* Unique identifier of this failure instance
*/
id?: string;
/**
* Json schema of the query
*/
jsonSchema?: string;
/**
* Query entity reference
*/
queryReference: EntityReference;
/**
* Timestamp on which the failure was created.
*/
timestamp: number;
/**
* Total duration of the query
*/
totalDuration?: number;
/**
* Time when query cost record was updated.
*/
updatedAt?: number;
/**
* User who updated the query cost record.
*/
updatedBy?: EntityReference;
}
/**
* Query entity reference
*
* This schema defines the EntityReference type used for referencing an entity.
* EntityReference is used for capturing relationships from one entity to another. For
* example, a table has an attribute called database of type EntityReference that captures
* the relationship of a table `belongs to a` database.
*
* User who updated the query cost record.
*/
export interface EntityReference {
/**
* If true the entity referred to has been soft-deleted.
*/
deleted?: boolean;
/**
* Optional description of entity.
*/
description?: string;
/**
* Display Name that identifies this entity.
*/
displayName?: string;
/**
* Fully qualified name of the entity instance. For entities such as tables, databases
* fullyQualifiedName is returned in this field. For entities that don't have name hierarchy
* such as `user` and `team` this will be same as the `name` field.
*/
fullyQualifiedName?: string;
/**
* Link to the entity resource.
*/
href?: string;
/**
* Unique identifier that identifies an entity instance.
*/
id: string;
/**
* If true the relationship indicated by this entity reference is inherited from the parent
* entity.
*/
inherited?: boolean;
/**
* Name of the entity instance.
*/
name?: string;
/**
* Entity type/class name - Examples: `database`, `table`, `metrics`, `databaseService`,
* `dashboardService`...
*/
type: string;
}

View File

@ -30,6 +30,10 @@ export interface SnowflakeConnection {
clientSessionKeepAlive?: boolean; clientSessionKeepAlive?: boolean;
connectionArguments?: { [key: string]: any }; connectionArguments?: { [key: string]: any };
connectionOptions?: { [key: string]: string }; connectionOptions?: { [key: string]: string };
/**
* Cost of credit for the Snowflake account.
*/
creditCost?: number;
/** /**
* Database of the data source. This is optional parameter, if you would like to restrict * Database of the data source. This is optional parameter, if you would like to restrict
* the metadata reading to a single database. When left blank, OpenMetadata Ingestion * the metadata reading to a single database. When left blank, OpenMetadata Ingestion

View File

@ -18,6 +18,10 @@ export interface DatabaseServiceQueryUsagePipeline {
* Configuration the condition to filter the query history. * Configuration the condition to filter the query history.
*/ */
filterCondition?: string; filterCondition?: string;
/**
* Configuration to process query cost
*/
processQueryCostAnalysis?: boolean;
/** /**
* Configuration to tune how far we want to look back in query logs to process usage data. * Configuration to tune how far we want to look back in query logs to process usage data.
*/ */

View File

@ -18,6 +18,10 @@ export interface QueryParserData {
} }
export interface ParsedDataObject { export interface ParsedDataObject {
/**
* Cost of the query execution
*/
cost?: number;
/** /**
* Database associated with the table in the query * Database associated with the table in the query
*/ */

View File

@ -29,6 +29,10 @@ export interface TableQueryObject {
* Date of execution of SQL query * Date of execution of SQL query
*/ */
analysisDate?: Date; analysisDate?: Date;
/**
* Cost of the query execution
*/
cost?: number;
/** /**
* Database associated with the table in the query * Database associated with the table in the query
*/ */

View File

@ -87,6 +87,7 @@
"databaseService", "databaseService",
"dashboardService", "dashboardService",
"query", "query",
"queryCostResult",
"apiService", "apiService",
"searchIndex", "searchIndex",
"testCaseResult", "testCaseResult",