Part of #12998 - Query Service & Lineage filter processed queries (#13215)

* Update mapping

* Prep

* Prep

* Prep query migration

* Add query index deletion fix

* Docs and Maven CI fix

* Fix tests

* Add service filter

* Add query entity FQN col migration

* Fix lint

* Support serviceFQN in the Query API

* Prep repo

* Prep ES query search

* Do not recompute lineage

* Format

* Fix test

---------

Co-authored-by: Ashish Gupta <ashish@getcollate.io>
Pere Miquel Brull 2023-09-19 07:37:47 +02:00 committed by GitHub
parent 1895fbde5c
commit 22b0f44e38
25 changed files with 502 additions and 40 deletions

View File

@@ -93,3 +93,6 @@ SET json = JSON_INSERT(
JSON_EXTRACT(json, '$.sourceConfig.config.viewParsingTimeoutLimit')
)
WHERE JSON_EXTRACT(json, '$.pipelineType') = 'metadata';
-- Query Entity supports service, which requires FQN for name
ALTER TABLE query_entity CHANGE COLUMN nameHash fqnHash VARCHAR(256);

View File

@@ -101,3 +101,6 @@ SET json = jsonb_set(
true
)
WHERE json #>> '{pipelineType}' = 'metadata';
-- Query Entity supports service, which requires FQN for name
ALTER TABLE query_entity RENAME COLUMN nameHash TO fqnHash;

View File

@@ -14,14 +14,17 @@ Mixin class containing Lineage specific methods
To be used by OpenMetadata class
"""
import functools
import json
import traceback
from typing import Generic, List, Optional, Type, TypeVar
from typing import Generic, List, Optional, Set, Type, TypeVar
from pydantic import BaseModel
from requests.utils import quote
from metadata.generated.schema.api.createEventPublisherJob import (
CreateEventPublisherJob,
)
from metadata.generated.schema.entity.data.query import Query
from metadata.generated.schema.system.eventPublisherJob import EventPublisherResult
from metadata.ingestion.ometa.client import REST, APIError
from metadata.utils.elasticsearch import ES_INDEX_MAP
@@ -139,3 +142,38 @@ class ESMixin(Generic[T]):
logger.debug(traceback.format_exc())
logger.debug(f"Failed to fetch reindex job status due to {err}")
return None
@staticmethod
def get_query_with_lineage_filter(service_name: str) -> str:
query_lineage_filter = {
"query": {
"bool": {
"must": [
{"term": {"processedLineage": True}},
{"term": {"service.name.keyword": service_name}},
]
}
}
}
return quote(json.dumps(query_lineage_filter))
@functools.lru_cache(maxsize=12)
def es_get_queries_with_lineage(self, service_name: str) -> Optional[Set[str]]:
"""Get a set of query checksums that have already been processed for lineage"""
try:
resp = self.client.get(
f"/search/query?q=&index={ES_INDEX_MAP[Query.__name__]}"
"&include_source_fields=checksum&include_source_fields="
f"processedLineage&query_filter={self.get_query_with_lineage_filter(service_name)}"
)
return {elem["_source"]["checksum"] for elem in resp["hits"]["hits"]}
except APIError as err:
logger.debug(traceback.format_exc())
logger.warning(f"Could not get queries from ES due to [{err}]")
return None
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(f"Unknown error extracting results from ES query [{err}]")
return None
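Taken together, a minimal sketch of how a caller might combine these helpers to skip a query that was already processed for lineage (the `metadata` client and the service name are assumptions; `fqn.get_query_checksum` is introduced later in this change):

```python
from metadata.utils import fqn

# `metadata` is an already-configured OpenMetadata client exposing this mixin;
# "my_service" is a placeholder Database Service name.
checksums = metadata.es_get_queries_with_lineage(service_name="my_service")

# The helper returns None when ES is unreachable, hence the `or {}` guard.
if fqn.get_query_checksum("select * from awesome") in (checksums or {}):
    print("Query already processed for lineage; skipping")
```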

View File

@@ -254,8 +254,8 @@ class CommonDbSourceService(
:return: tables or views, depending on config
"""
schema_name = self.context.database_schema.name.__root__
try:
schema_name = self.context.database_schema.name.__root__
if self.source_config.includeTables:
for table_and_type in self.query_table_names_and_types(schema_name):
table_name = self.standardize_table_name(

View File

@@ -14,14 +14,17 @@ Lineage Source Module
import csv
import traceback
from abc import ABC
from typing import Iterable, Iterator
from typing import Iterable, Iterator, Union
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.basic import FullyQualifiedEntityName, SqlQuery
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.api.models import Either
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@@ -99,7 +102,19 @@ class LineageSource(QueryParserSource, ABC):
logger.debug(traceback.format_exc())
logger.warning(f"Error processing query_dict {query_dict}: {exc}")
def _iter(self, *_, **__) -> Iterable[Either[AddLineageRequest]]:
def _query_already_processed(self, table_query: TableQuery) -> bool:
"""
Check if a query has already been processed by validating whether its
checksum exists in ES with `processedLineage` set to True
"""
checksums = self.metadata.es_get_queries_with_lineage(
service_name=table_query.serviceName,
)
return fqn.get_query_checksum(table_query.query) in (checksums or {})
def _iter(
self, *_, **__
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
"""
Based on the query logs, prepare the lineage
and send it to the sink
@@ -107,15 +122,30 @@
connection_type = str(self.service_connection.type.value)
dialect = ConnectionTypeDialectMapper.dialect_of(connection_type)
for table_query in self.get_table_query():
lineages: Iterable[Either[AddLineageRequest]] = get_lineage_by_query(
self.metadata,
query=table_query.query,
service_name=table_query.serviceName,
database_name=table_query.databaseName,
schema_name=table_query.databaseSchema,
dialect=dialect,
timeout_seconds=self.source_config.parsingTimeoutLimit,
)
if not self._query_already_processed(table_query):
lineages: Iterable[Either[AddLineageRequest]] = get_lineage_by_query(
self.metadata,
query=table_query.query,
service_name=table_query.serviceName,
database_name=table_query.databaseName,
schema_name=table_query.databaseSchema,
dialect=dialect,
timeout_seconds=self.source_config.parsingTimeoutLimit,
)
for lineage_request in lineages or []:
yield lineage_request
for lineage_request in lineages or []:
yield lineage_request
# If we identified lineage properly, ingest the original query
if lineage_request.right:
yield Either(
right=CreateQueryRequest(
query=SqlQuery(__root__=table_query.query),
query_type=table_query.query_type,
duration=table_query.duration,
processedLineage=True,
service=FullyQualifiedEntityName(
__root__=self.config.serviceName
),
)
)

View File

@@ -179,5 +179,6 @@ class StoredProcedureMixin:
type="storedProcedure",
),
processedLineage=bool(self.context.stored_procedure_query_lineage),
service=self.context.database_service.name.__root__,
)
)

View File

@@ -101,6 +101,7 @@ class TableUsageStage(Stage):
queryDate=record.date,
usedBy=used_by,
duration=record.duration,
service=record.serviceName,
)
)
else:
@@ -113,6 +114,7 @@
queryDate=record.date,
usedBy=used_by,
duration=record.duration,
service=record.serviceName,
)
]

View File

@@ -13,6 +13,7 @@ Handle FQN building and splitting logic.
Filter information has been taken from the
ES indexes definitions
"""
import hashlib
import re
from typing import Dict, List, Optional, Type, TypeVar, Union
@@ -33,6 +34,7 @@ from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.mlmodel import MlModel
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.query import Query
from metadata.generated.schema.entity.data.searchIndex import SearchIndex
from metadata.generated.schema.entity.data.table import Column, DataModel, Table
from metadata.generated.schema.entity.data.topic import Topic
@@ -433,6 +435,20 @@ def _(
return _build(service_name, "model", data_model_name)
@fqn_build_registry.add(Query)
def _(
_: Optional[OpenMetadata], # ES Index not necessary for Query FQN building
*,
service_name: str,
query_checksum: str,
) -> str:
if not service_name or not query_checksum:
raise FQNBuildingException(
f"Args should be informed, but got service=`{service_name}`, query_checksum=`{query_checksum}``"
)
return _build(service_name, query_checksum)
def split_table_name(table_name: str) -> Dict[str, Optional[str]]:
"""
Given a table name, try to extract database, schema and
@@ -531,3 +547,11 @@ def search_table_from_es(
return get_entity_from_es_result(
entity_list=es_result, fetch_multiple_entities=fetch_multiple_entities
)
def get_query_checksum(query: str) -> str:
"""
Prepare the query checksum from its string representation.
The checksum is used as the query's name.
"""
return hashlib.md5(query.encode()).hexdigest()
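Putting the registry entry and the checksum helper together, a Query FQN can be built as in this small sketch (`my_service` is a placeholder; the same pattern is used by the integration test below):

```python
from metadata.generated.schema.entity.data.query import Query
from metadata.utils import fqn

checksum = fqn.get_query_checksum("select * from awesome")  # md5 hex digest
query_fqn = fqn.build(
    metadata=None,  # no ES lookup is needed to build a Query FQN
    entity_type=Query,
    service_name="my_service",
    query_checksum=checksum,
)
print(query_fqn)  # e.g. "my_service.<md5-checksum>"
```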

View File

@@ -15,14 +15,18 @@ import logging
import time
from unittest import TestCase
from requests.utils import quote
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.data.query import Query
from metadata.generated.schema.entity.data.table import Column, DataType, Table
from metadata.generated.schema.entity.services.connections.database.common.basicAuth import (
BasicAuth,
@@ -41,9 +45,12 @@ from metadata.generated.schema.entity.services.databaseService import (
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.generated.schema.type.basic import SqlQuery
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
QUERY_CHECKSUM = fqn.get_query_checksum("select * from awesome")
class OMetaESTest(TestCase):
"""
@@ -75,6 +82,19 @@
)
),
)
another_service = CreateDatabaseServiceRequest(
name="another-test-service-es",
serviceType=DatabaseServiceType.Mysql,
connection=DatabaseConnection(
config=MysqlConnection(
username="username",
authType=BasicAuth(
password="password",
),
hostPort="http://localhost:1234",
)
),
)
service_type = "databaseService"
@classmethod
@@ -85,13 +105,23 @@
logging.info("Checking ES index status...")
tries = 0
res = None
while not res and tries <= 5: # Kill in 5 seconds
res = cls.metadata.es_search_from_fqn(
table_res = None
query_res = None
while not (table_res and query_res) and tries <= 5: # Kill in 5 seconds
table_res = cls.metadata.es_search_from_fqn(
entity_type=Table,
fqn_search_string="test-service-es.test-db-es.test-schema-es.test-es",
)
if not res:
query_res = cls.metadata.es_search_from_fqn(
entity_type=Query,
fqn_search_string=fqn.build(
metadata=None,
entity_type=Query,
service_name="test-service-es",
query_checksum=QUERY_CHECKSUM,
),
)
if not table_res or not query_res:
tries += 1
time.sleep(1)
@@ -125,6 +155,32 @@
cls.entity = cls.metadata.create_or_update(create)
# Create queries for the given service
query = CreateQueryRequest(
query=SqlQuery(__root__="select * from awesome"),
service=cls.service_entity.fullyQualifiedName,
processedLineage=True, # Only 1 with processed lineage
)
cls.metadata.create_or_update(query)
query2 = CreateQueryRequest(
query=SqlQuery(__root__="select * from another_awesome"),
service=cls.service_entity.fullyQualifiedName,
)
cls.metadata.create_or_update(query2)
# Create queries for another service
cls.another_service_entity = cls.metadata.create_or_update(
data=cls.another_service
)
another_query = CreateQueryRequest(
query=SqlQuery(__root__="select * from awesome"),
service=cls.another_service_entity.fullyQualifiedName,
processedLineage=True,
)
cls.metadata.create_or_update(another_query)
# Leave some time for indexes to get updated, otherwise this happens too fast
cls.check_es_index()
@@ -147,6 +203,19 @@
hard_delete=True,
)
another_service_id = str(
cls.metadata.get_by_name(
entity=DatabaseService, fqn=cls.another_service.name.__root__
).id.__root__
)
cls.metadata.delete(
entity=DatabaseService,
entity_id=another_service_id,
recursive=True,
hard_delete=True,
)
# Disabling this test because it fails with
# this PR: https://github.com/open-metadata/OpenMetadata/pull/11879
# and the failure is reproducible only with the docker deployment
@@ -211,3 +280,17 @@
)
self.assertIsNone(res)
def test_get_query_with_lineage_filter(self):
"""Check we are building the proper filter"""
res = self.metadata.get_query_with_lineage_filter("my_service")
expected = (
'{"query": {"bool": {"must": [{"term": {"processedLineage": true}},'
' {"term": {"service.name.keyword": "my_service"}}]}}}'
)
self.assertEqual(res, quote(expected))
def test_get_queries_with_lineage(self):
"""Check the payload from ES"""
res = self.metadata.es_get_queries_with_lineage(self.service.name.__root__)
self.assertIn(QUERY_CHECKSUM, res)

View File

@@ -63,6 +63,7 @@ from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
OpenMetadataJWTClientConfig,
)
from metadata.generated.schema.type.basic import FullyQualifiedEntityName, SqlQuery
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.ometa.ometa_api import OpenMetadata
@@ -453,7 +454,10 @@ class OMetaTableTest(TestCase):
entity=Table, fqn=self.entity.fullyQualifiedName
)
query_no_user = CreateQueryRequest(query="select * from awesome")
query_no_user = CreateQueryRequest(
query=SqlQuery(__root__="select * from awesome"),
service=FullyQualifiedEntityName(__root__=self.service.name.__root__),
)
self.metadata.ingest_entity_queries_data(entity=res, queries=[query_no_user])
table_with_query: List[Query] = self.metadata.get_entity_queries(
@@ -466,7 +470,9 @@
# Validate that we can properly add user information
query_with_user = CreateQueryRequest(
query="select * from awesome", users=[self.owner.fullyQualifiedName]
query="select * from awesome",
users=[self.owner.fullyQualifiedName],
service=FullyQualifiedEntityName(__root__=self.service.name.__root__),
)
self.metadata.ingest_entity_queries_data(entity=res, queries=[query_with_user])

View File

@@ -90,16 +90,25 @@ pip install openmetadata-ingestion[<plugin>]==x.y.z
The `plugin` parameter is a list of the sources that we want to ingest. An example would look like this `openmetadata-ingestion[mysql,snowflake,s3]==1.1.1`.
You will find specific instructions for each connector [here](/connectors).
## 1.1.1 - Stable Release 🎉
## 1.2 - Stable Release 🎉
OpenMetadata 1.1 is a stable release. Please check the [release notes](/releases/latest-release).
OpenMetadata 1.2 is a stable release. Please check the [release notes](/releases/latest-release).
If you are upgrading production, this is the recommended version to upgrade to.
## Deprecation Notice
- OpenMetadata only supports Python versions 3.8 to 3.10.
## Breaking Changes for 1.1 Stable Release
## Breaking Changes for 1.2 Stable Release
### Query Entity
The Query Entity now has a `service` property, linking the Query to the Database Service it belongs to. Note
that `service` is a required property for both the Query Entity and the Create Query Entity.
During the migration, we pick up the service from the tables referenced in `queryUsedIn`. If this information is not
available, there is no way to link a query to a service, and the query will be removed.
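As an illustration, creating a query against the new schema with the Python SDK might look like the following minimal sketch. It mirrors the integration tests in this change; the `metadata` client and the `my_service` name are placeholders, not part of the original docs.

```python
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.type.basic import FullyQualifiedEntityName, SqlQuery

# `metadata` is an already-configured OpenMetadata client and `my_service`
# is the fully qualified name of an existing Database Service.
create_query = CreateQueryRequest(
    query=SqlQuery(__root__="select * from awesome"),
    service=FullyQualifiedEntityName(__root__="my_service"),
)
metadata.create_or_update(create_query)
```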
### Service Connection Changes

View File

@@ -1893,7 +1893,7 @@ public interface CollectionDAO {
@Override
default String getNameHashColumn() {
return "nameHash";
return "fqnHash";
}
@Override

View File

@@ -2,6 +2,7 @@ package org.openmetadata.service.jdbi3;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.DATABASE_SERVICE;
import static org.openmetadata.service.Entity.USER;
import java.util.*;
@@ -9,6 +10,7 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import lombok.SneakyThrows;
import org.openmetadata.schema.entity.data.Query;
import org.openmetadata.schema.entity.services.DatabaseService;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.type.ChangeDescription;
import org.openmetadata.schema.type.ChangeEvent;
@@ -20,6 +22,7 @@ import org.openmetadata.schema.type.Relationship;
import org.openmetadata.service.Entity;
import org.openmetadata.service.resources.query.QueryResource;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.RestUtil;
@@ -28,8 +31,8 @@ public class QueryRepository extends EntityRepository<Query> {
private static final String QUERY_USERS_FIELD = "users";
private static final String QUERY_USED_BY_FIELD = "usedBy";
private static final String QUERY_PATCH_FIELDS = "users,query,queryUsedIn";
private static final String QUERY_UPDATE_FIELDS = "users,queryUsedIn";
private static final String QUERY_PATCH_FIELDS = "users,query,queryUsedIn,processedLineage";
private static final String QUERY_UPDATE_FIELDS = "users,queryUsedIn,processedLineage";
public QueryRepository(CollectionDAO dao) {
super(
@@ -43,6 +46,11 @@ public class QueryRepository extends EntityRepository<Query> {
supportsSearchIndex = true;
}
@Override
public void setFullyQualifiedName(Query query) {
query.setFullyQualifiedName(FullyQualifiedName.add(query.getService().getFullyQualifiedName(), query.getName()));
}
@Override
public Query setFields(Query entity, EntityUtil.Fields fields) {
entity.setQueryUsedIn(fields.contains(QUERY_USED_IN_FIELD) ? getQueryUsage(entity) : entity.getQueryUsedIn());
@@ -76,6 +84,8 @@ public class QueryRepository extends EntityRepository<Query> {
entity.setName(checkSum);
}
entity.setUsers(EntityUtil.populateEntityReferences(entity.getUsers()));
DatabaseService service = Entity.getEntity(entity.getService(), "", Include.ALL);
entity.setService(service.getEntityReference());
}
@Override
@@ -100,6 +110,9 @@
// Store Query Used in Relation
storeQueryUsedIn(queryEntity.getId(), queryEntity.getQueryUsedIn(), null);
// The service contains the query
addRelationship(
queryEntity.getService().getId(), queryEntity.getId(), DATABASE_SERVICE, Entity.QUERY, Relationship.CONTAINS);
}
@Override
@@ -217,6 +230,8 @@
added,
deleted,
EntityUtil.entityReferenceMatch);
// Store processed Lineage
recordChange("processedLineage", original.getProcessedLineage(), updated.getProcessedLineage());
// Store Query Used in Relation
recordChange("usedBy", original.getUsedBy(), updated.getUsedBy(), true);
storeQueryUsedIn(updated.getId(), added, deleted);

View File

@@ -0,0 +1,32 @@
package org.openmetadata.service.migration.mysql.v120;
import static org.openmetadata.service.migration.utils.v120.MigrationUtil.addQueryService;
import lombok.SneakyThrows;
import org.jdbi.v3.core.Handle;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
import org.openmetadata.service.migration.utils.MigrationFile;
public class Migration extends MigrationProcessImpl {
private CollectionDAO collectionDAO;
private Handle handle;
public Migration(MigrationFile migrationFile) {
super(migrationFile);
}
@Override
public void initialize(Handle handle) {
super.initialize(handle);
this.handle = handle;
this.collectionDAO = handle.attach(CollectionDAO.class);
}
@Override
@SneakyThrows
public void runDataMigration() {
addQueryService(handle, collectionDAO);
}
}

View File

@@ -0,0 +1,32 @@
package org.openmetadata.service.migration.postgres.v120;
import static org.openmetadata.service.migration.utils.v120.MigrationUtil.addQueryService;
import lombok.SneakyThrows;
import org.jdbi.v3.core.Handle;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
import org.openmetadata.service.migration.utils.MigrationFile;
public class Migration extends MigrationProcessImpl {
private CollectionDAO collectionDAO;
private Handle handle;
public Migration(MigrationFile migrationFile) {
super(migrationFile);
}
@Override
public void initialize(Handle handle) {
super.initialize(handle);
this.handle = handle;
this.collectionDAO = handle.attach(CollectionDAO.class);
}
@Override
@SneakyThrows
public void runDataMigration() {
addQueryService(handle, collectionDAO);
}
}

View File

@@ -0,0 +1,112 @@
package org.openmetadata.service.migration.utils.v120;
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.Handle;
import org.openmetadata.schema.entity.data.Query;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.QueryRepository;
import org.openmetadata.service.util.JsonUtils;
@Slf4j
public class MigrationUtil {
private MigrationUtil() {
/* Utility class; not meant to be instantiated */
}
// Try to find the service for each of the queries by going through
// the table service hierarchy relationships
private static final String QUERY_LIST_SERVICE =
"SELECT "
+ " q.id AS query_id, "
+ " q.json AS query_json, "
+ " er_table_query.fromId AS table_id, "
+ " er_schema_table.fromId AS schema_id, "
+ " er_database_schema.fromId AS database_id, "
+ " er_service_database.fromId AS service_id, "
+ " db_service.name AS service_name "
+ "FROM query_entity q "
+ "LEFT JOIN entity_relationship er_table_query "
+ " ON er_table_query.fromEntity = 'table' "
+ " AND er_table_query.toEntity = 'query' "
+ " AND er_table_query.toId = q.id "
+ "LEFT JOIN entity_relationship er_schema_table "
+ " ON er_schema_table.fromEntity = 'databaseSchema' "
+ " AND er_schema_table.toEntity = 'table' "
+ " AND er_table_query.fromId = er_schema_table.toId "
+ "LEFT JOIN entity_relationship er_database_schema "
+ " ON er_database_schema.fromEntity = 'database' "
+ " AND er_database_schema.toEntity = 'databaseSchema' "
+ " AND er_schema_table.fromId = er_database_schema.toId "
+ "LEFT JOIN entity_relationship er_service_database "
+ " ON er_service_database.fromEntity = 'databaseService' "
+ " AND er_service_database.toEntity = 'database' "
+ " AND er_database_schema.fromId = er_service_database.toId "
+ "LEFT JOIN dbservice_entity db_service "
+ " ON db_service.id = er_service_database.fromId";
private static final String DELETE_QUERY = "DELETE FROM query_entity WHERE id = :id";
private static final String DELETE_RELATIONSHIP = "DELETE FROM entity_relationship WHERE fromId = :id or toId = :id";
/**
* Queries have a `queryUsedIn` field as a list of EntityRef. We pick up the first element of the list, since the
* tables should normally be in the same service, and then: 1. get the table from the ID, 2. identify the service,
* 3. update the Query.service EntityRef.
*/
public static void addQueryService(Handle handle, CollectionDAO collectionDAO) {
QueryRepository queryRepository = new QueryRepository(collectionDAO);
try (handle) {
handle
.createQuery(QUERY_LIST_SERVICE)
.mapToMap()
.forEach(
row -> {
try {
JsonObject queryJson = JsonUtils.readJson((String) row.get("query_json")).asJsonObject();
String serviceName = (String) row.get("service_name");
String serviceId = (String) row.get("service_id");
if (serviceId == null) {
LOG.warn(
String.format(
"Query [%s] cannot be linked to a service. Deleting...", queryJson.getString("id")));
// We cannot directly call the queryRepository for deletion, since the Query object is missing
// the new `service` property we introduced and the `delete` operation would fail.
// We need to delete the query entry and the relationships from/to this ID by hand.
// It should be OK since queries are simple structures without any children. We should only
// have relationship table <> query & user <> query
handle.createUpdate(DELETE_QUERY).bind("id", queryJson.getString("id")).execute();
handle.createUpdate(DELETE_RELATIONSHIP).bind("id", queryJson.getString("id")).execute();
} else {
// Since the query does not have the service yet, it cannot be cast to the Query class.
JsonObject serviceJson =
Json.createObjectBuilder()
.add("id", serviceId)
.add("name", serviceName)
.add("fullyQualifiedName", serviceName)
.add("type", "databaseService")
.build();
JsonObjectBuilder queryWithService = Json.createObjectBuilder();
queryJson.forEach(queryWithService::add);
queryWithService.add("service", serviceJson);
Query query = JsonUtils.readValue(queryWithService.build().toString(), Query.class);
queryRepository.setFullyQualifiedName(query);
collectionDAO.queryDAO().update(query);
}
} catch (Exception ex) {
LOG.warn(String.format("Error updating query [%s] due to [%s]", row, ex));
}
});
} catch (Exception ex) {
LOG.warn("Error running the query migration ", ex);
}
}
}

View File

@@ -116,6 +116,9 @@ public class QueryResource extends EntityResource<Query, QueryRepository> {
@Parameter(description = "UUID of the entity for which to list the Queries", schema = @Schema(type = "UUID"))
@QueryParam("entityId")
UUID entityId,
@Parameter(description = "Filter Queries by service Fully Qualified Name", schema = @Schema(type = "string"))
@QueryParam("service")
String service,
@Parameter(description = "Limit the number queries returned. " + "(1 to 1000000, default = 10)")
@DefaultValue("10")
@Min(0)
@@ -132,6 +135,7 @@
if (!CommonUtil.nullOrEmpty(entityId)) {
filter.addQueryParam("entityId", entityId.toString());
}
filter.addQueryParam("service", service);
ResultList<Query> queries =
super.listInternal(uriInfo, securityContext, fieldsParam, filter, limitParam, before, after);
return PIIMasker.getQueries(queries, authorizer, securityContext);
@@ -509,6 +513,7 @@ public class QueryResource extends EntityResource<Query, QueryRepository> {
return copy(new Query(), create, user)
.withTags(create.getTags())
.withQuery(create.getQuery())
.withService(getEntityReference(Entity.DATABASE_SERVICE, create.getService()))
.withDuration(create.getDuration())
.withVotes(new Votes().withUpVotes(0).withDownVotes(0))
.withUsers(getEntityReferences(USER, create.getUsers()))

View File

@@ -343,7 +343,8 @@ public class ElasticSearchClientImpl implements SearchClient {
/* For backward-compatibility we continue supporting the deleted argument; this should be removed in future versions */
if (request.getIndex().equalsIgnoreCase("domain_search_index")
|| request.getIndex().equalsIgnoreCase("data_products_search_index")) {
|| request.getIndex().equalsIgnoreCase("data_products_search_index")
|| request.getIndex().equalsIgnoreCase("query_search_index")) {
searchSourceBuilder.query(QueryBuilders.boolQuery().must(searchSourceBuilder.query()));
} else {
searchSourceBuilder.query(

View File

@@ -284,8 +284,6 @@ public class OpenSearchClientImpl implements SearchClient {
searchSourceBuilder = buildTableSearchBuilder(request.getQuery(), request.getFrom(), request.getSize());
break;
case "user_search_index":
searchSourceBuilder = buildUserOrTeamSearchBuilder(request.getQuery(), request.getFrom(), request.getSize());
break;
case "team_search_index":
searchSourceBuilder = buildUserOrTeamSearchBuilder(request.getQuery(), request.getFrom(), request.getSize());
break;
@@ -337,7 +335,8 @@
/* For backward-compatibility we continue supporting the deleted argument; this should be removed in future versions */
if (request.getIndex().equalsIgnoreCase("domain_search_index")
|| request.getIndex().equalsIgnoreCase("data_products_search_index")) {
|| request.getIndex().equalsIgnoreCase("data_products_search_index")
|| request.getIndex().equalsIgnoreCase("query_search_index")) {
searchSourceBuilder.query(QueryBuilders.boolQuery().must(searchSourceBuilder.query()));
} else {
searchSourceBuilder.query(

View File

@@ -40,6 +40,18 @@
"id": {
"type": "keyword"
},
"checksum": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"processedLineage": {
"type": "boolean"
},
"name": {
"type": "text",
"fields": {
@@ -53,6 +65,43 @@
"type": "keyword",
"normalizer": "lowercase_normalizer"
},
"service": {
"properties": {
"id": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 36
}
}
},
"type": {
"type": "keyword"
},
"name": {
"type": "keyword",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"fullyQualifiedName": {
"type": "text"
},
"description": {
"type": "text"
},
"deleted": {
"type": "text"
},
"href": {
"type": "text"
}
}
},
"displayName": {
"type": "text",
"analyzer": "om_analyzer",

View File

@@ -76,7 +76,8 @@ public class QueryResourceTest extends EntityResourceTest<Query, CreateQuery> {
.withQueryUsedIn(List.of(TABLE_REF))
.withQuery(QUERY)
.withDuration(0.0)
.withQueryDate(1673857635064L);
.withQueryDate(1673857635064L)
.withService(SNOWFLAKE_REFERENCE.getFullyQualifiedName());
}
@Override

View File

@@ -81,8 +81,12 @@
"description": "Flag if this query has already been successfully processed for lineage",
"type": "boolean",
"default": false
},
"service": {
"description": "Link to the database service fully qualified name where this query has been run",
"$ref": "../../type/basic.json#/definitions/fullyQualifiedEntityName"
}
},
"required": ["query"],
"required": ["query", "service"],
"additionalProperties": false
}

View File

@@ -119,8 +119,12 @@
"description": "Flag if this query has already been successfully processed for lineage",
"type": "boolean",
"default": false
},
"service": {
"description": "Link to the service this query belongs to.",
"$ref": "../../type/entityReference.json"
}
},
"required": ["name","query"],
"required": ["name", "query", "service"],
"additionalProperties": false
}

View File

@@ -33,6 +33,11 @@ export const MOCK_QUERIES = [
],
previousVersion: 0.2,
},
service: {
id: '51286e5d-0590-457b-a1ec-bc53c1effa1ee',
type: 'databaseService',
fullyQualifiedName: 'redshift',
},
owner: {
id: '471353cb-f925-4c4e-be6c-14da2c0b00ce',
type: 'user',
@@ -194,6 +199,11 @@
fieldsDeleted: [],
previousVersion: 0.1,
},
service: {
id: '44c71a8d-130a-4857-aa88-23bf7e371d5ee',
type: 'databaseService',
fullyQualifiedName: 'redshift',
},
votes: {
upVotes: 1,
downVotes: 1,

View File

@@ -40,7 +40,7 @@ import { useParams } from 'react-router-dom';
import { searchData } from 'rest/miscAPI';
import { postQuery } from 'rest/queryAPI';
import { getTableDetailsByFQN } from 'rest/tableAPI';
import { getCurrentUserId } from 'utils/CommonUtils';
import { getCurrentUserId, getPartialNameFromFQN } from 'utils/CommonUtils';
import { getCurrentMillis } from 'utils/date-time/DateTimeUtils';
import { getEntityBreadcrumbs, getEntityName } from 'utils/EntityUtils';
import { showErrorToast, showSuccessToast } from 'utils/ToastUtils';
@@ -54,7 +54,6 @@ const AddQueryPage = () => {
TitleBreadcrumbProps['titleLinks']
>([]);
const [description, setDescription] = useState<string>('');
const [sqlQuery, setSqlQuery] = useState<string>('');
const [table, setTable] = useState<Table>();
const [initialOptions, setInitialOptions] = useState<DefaultOptionType[]>();
const [isSaving, setIsSaving] = useState(false);
@@ -133,7 +132,7 @@ const AddQueryPage = () => {
history.back();
};
const handleSubmit: FormProps['onFinish'] = async (values) => {
const handleSubmit: FormProps['onFinish'] = async (values): Promise<void> => {
const updatedValues: CreateQuery = {
...values,
description: isEmpty(description) ? undefined : description,
@@ -152,6 +151,7 @@
})),
],
queryDate: getCurrentMillis(),
service: getPartialNameFromFQN(datasetFQN, ['service']),
};
try {
@@ -214,15 +214,14 @@
field: t('label.sql-uppercase-query'),
}),
},
]}>
]}
trigger="onChange">
<SchemaEditor
className="custom-query-editor query-editor-h-200 custom-code-mirror-theme"
mode={{ name: CSMode.SQL }}
options={{
readOnly: false,
}}
value={sqlQuery}
onChange={(value) => setSqlQuery(value)}
/>
</Form.Item>
<Form.Item