Add used by field in query entity (#13096)

This commit is contained in:
Mayur Singal 2023-09-11 17:30:34 +05:30 committed by GitHub
parent d0125ae296
commit 2e3e50f0cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 144 additions and 57 deletions

View File

@ -81,6 +81,14 @@ class OMetaQueryMixin:
),
)
# Add Query used by
user_list = create_query.usedBy
if user_list:
self.client.put(
f"{self.get_suffix(Query)}/{model_str(query.id)}/usedBy",
data=json.dumps(user_list),
)
def get_entity_queries(
self, entity_id: Union[Uuid, str], fields: Optional[List[str]] = None
) -> Optional[List[Query]]:

View File

@ -18,7 +18,7 @@ import os
import shutil
import traceback
from pathlib import Path
from typing import Iterable
from typing import Iterable, List, Tuple
from metadata.config.common import ConfigModel
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
@ -26,7 +26,7 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
OpenMetadataConnection,
)
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.type.queryParserData import QueryParserData
from metadata.generated.schema.type.queryParserData import ParsedData, QueryParserData
from metadata.generated.schema.type.tableUsageCount import TableUsageCount
from metadata.ingestion.api.models import Either, StackTraceError
from metadata.ingestion.api.steps import Stage
@ -82,22 +82,24 @@ class TableUsageStage(Stage):
logger.info(f"Creating the directory to store staging data in {location}")
location.mkdir(parents=True, exist_ok=True)
def _get_user_entity(self, username: str):
def _get_user_entity(self, username: str) -> Tuple[List[str], List[str]]:
if username:
user = self.metadata.get_by_name(entity=User, fqn=username)
if user:
return [user.fullyQualifiedName.__root__]
return []
return [user.fullyQualifiedName.__root__], []
return [], [username]
def _add_sql_query(self, record, table):
users, used_by = self._get_user_entity(record.userName)
if self.table_queries.get((table, record.date)):
self.table_queries[(table, record.date)].append(
CreateQueryRequest(
query=record.sql,
query_type=record.query_type,
exclude_usage=record.exclude_usage,
users=self._get_user_entity(record.userName),
users=users,
queryDate=record.date,
usedBy=used_by,
duration=record.duration,
)
)
@ -107,12 +109,50 @@ class TableUsageStage(Stage):
query=record.sql,
query_type=record.query_type,
exclude_usage=record.exclude_usage,
users=self._get_user_entity(record.userName),
users=users,
queryDate=record.date,
usedBy=used_by,
duration=record.duration,
)
]
def _handle_table_usage(
self, parsed_data: ParsedData, table: str
) -> Iterable[Either[str]]:
table_joins = parsed_data.joins.get(table)
try:
self._add_sql_query(record=parsed_data, table=table)
table_usage_count = self.table_usage.get((table, parsed_data.date))
if table_usage_count is not None:
table_usage_count.count = table_usage_count.count + 1
if table_joins:
table_usage_count.joins.extend(table_joins)
else:
joins = []
if table_joins:
joins.extend(table_joins)
table_usage_count = TableUsageCount(
table=table,
databaseName=parsed_data.databaseName,
date=parsed_data.date,
joins=joins,
serviceName=parsed_data.serviceName,
sqlQueries=[],
databaseSchema=parsed_data.databaseSchema,
)
except Exception as exc:
yield Either(
left=StackTraceError(
name=table,
error=f"Error in staging record [{exc}]",
stack_trace=traceback.format_exc(),
)
)
self.table_usage[(table, parsed_data.date)] = table_usage_count
yield Either(right=table)
def _run(self, record: QueryParserData) -> Iterable[Either[str]]:
"""
Process the parsed data and store it in a file
@ -125,40 +165,9 @@ class TableUsageStage(Stage):
if parsed_data is None:
continue
for table in parsed_data.tables:
table_joins = parsed_data.joins.get(table)
try:
self._add_sql_query(record=parsed_data, table=table)
table_usage_count = self.table_usage.get((table, parsed_data.date))
if table_usage_count is not None:
table_usage_count.count = table_usage_count.count + 1
if table_joins:
table_usage_count.joins.extend(table_joins)
else:
joins = []
if table_joins:
joins.extend(table_joins)
table_usage_count = TableUsageCount(
table=table,
databaseName=parsed_data.databaseName,
date=parsed_data.date,
joins=joins,
serviceName=parsed_data.serviceName,
sqlQueries=[],
databaseSchema=parsed_data.databaseSchema,
)
except Exception as exc:
yield Either(
left=StackTraceError(
name=table,
error=f"Error in staging record [{exc}]",
stack_trace=traceback.format_exc(),
)
)
self.table_usage[(table, parsed_data.date)] = table_usage_count
yield Either(right=table)
yield from self._handle_table_usage(
parsed_data=parsed_data, table=table
)
self.dump_data_to_file()
def dump_data_to_file(self):

View File

@ -4,10 +4,7 @@ import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import static org.openmetadata.service.Entity.USER;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.*;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import lombok.SneakyThrows;
@ -23,11 +20,14 @@ import org.openmetadata.schema.type.Relationship;
import org.openmetadata.service.Entity;
import org.openmetadata.service.resources.query.QueryResource;
import org.openmetadata.service.util.EntityUtil;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.RestUtil;
public class QueryRepository extends EntityRepository<Query> {
private static final String QUERY_USED_IN_FIELD = "queryUsedIn";
private static final String QUERY_USERS_FIELD = "users";
private static final String QUERY_USED_BY_FIELD = "usedBy";
private static final String QUERY_PATCH_FIELDS = "users,query,queryUsedIn";
private static final String QUERY_UPDATE_FIELDS = "users,queryUsedIn";
@ -135,6 +135,18 @@ public class QueryRepository extends EntityRepository<Query> {
return new RestUtil.PutResponse<>(Response.Status.CREATED, changeEvent, RestUtil.ENTITY_FIELDS_CHANGED);
}
public RestUtil.PutResponse<?> AddQueryUsedBy(
UriInfo uriInfo, String updatedBy, UUID queryId, List<String> userList) {
Query query = Entity.getEntity(Entity.QUERY, queryId, QUERY_UPDATE_FIELDS, Include.NON_DELETED);
Query oldQuery = JsonUtils.readValue(JsonUtils.pojoToJson(query), Query.class);
query.getUsedBy().addAll(userList);
ChangeEvent changeEvent =
getQueryChangeEvent(
updatedBy, QUERY_USERS_FIELD, oldQuery.getUsedBy(), query.getUsers(), withHref(uriInfo, query));
update(uriInfo, oldQuery, query);
return new RestUtil.PutResponse<>(Response.Status.CREATED, changeEvent, RestUtil.ENTITY_FIELDS_CHANGED);
}
public RestUtil.PutResponse<?> addQueryUsage(
UriInfo uriInfo, String updatedBy, UUID queryId, List<EntityReference> entityIds) {
Query query = Entity.getEntity(Entity.QUERY, queryId, QUERY_USED_IN_FIELD, Include.NON_DELETED);
@ -206,6 +218,7 @@ public class QueryRepository extends EntityRepository<Query> {
deleted,
EntityUtil.entityReferenceMatch);
// Store Query Used in Relation
recordChange("usedBy", original.getUsedBy(), updated.getUsedBy(), true);
storeQueryUsedIn(updated.getId(), added, deleted);
String originalChecksum = EntityUtil.hash(original.getQuery());
String updatedChecksum = EntityUtil.hash(updated.getQuery());

View File

@ -405,6 +405,29 @@ public class QueryResource extends EntityResource<Query, QueryRepository> {
return repository.AddQueryUser(uriInfo, securityContext.getUserPrincipal().getName(), id, userFqnList).toResponse();
}
@PUT
@Path("/{id}/usedBy")
@Operation(
operationId = "addQueryUsedBy",
summary = "Populate Used By Field",
description = "Add query users",
responses = {
@ApiResponse(
responseCode = "200",
description = "OK",
content = @Content(mediaType = "application/json", schema = @Schema(implementation = Query.class)))
})
public Response addQueryUsedBy(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Id of the query", schema = @Schema(type = "UUID")) @PathParam("id") UUID id,
@Valid List<String> usedByList) {
OperationContext operationContext = new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextById(id));
return repository.AddQueryUsedBy(uriInfo, securityContext.getUserPrincipal().getName(), id, usedByList)
.toResponse();
}
@DELETE
@Path("/{id}/usage")
@Operation(

View File

@ -57,6 +57,14 @@
"$ref" : "../../type/basic.json#/definitions/fullyQualifiedEntityName"
}
},
"usedBy" : {
"description": "List of users who ran the query but does not exist in OpenMetadata.",
"type": "array",
"items": {
"type": "string"
},
"uniqueItems": true
},
"queryDate": {
"description": "Date on which the query ran.",
"$ref": "../../type/basic.json#/definitions/timestamp"

View File

@ -91,6 +91,14 @@
"description": "Date on which the query ran.",
"$ref": "../../type/basic.json#/definitions/timestamp"
},
"usedBy" : {
"description": "List of users who ran the query but does not exist in OpenMetadata.",
"type": "array",
"items": {
"type": "string"
},
"uniqueItems": true
},
"tags": {
"description": "Tags for this SQL query.",
"type": "array",

View File

@ -14,7 +14,7 @@
import { Button, Card, Col, Row, Space, Typography } from 'antd';
import { DefaultOptionType } from 'antd/lib/select';
import classNames from 'classnames';
import { getTableTabPath, getUserPath, PIPE_SYMBOL } from 'constants/constants';
import { getTableTabPath, PIPE_SYMBOL } from 'constants/constants';
import { QUERY_DATE_FORMAT, QUERY_LINE_HEIGHT } from 'constants/Query.constant';
import { EntityType } from 'enums/entity.enum';
import { useClipboard } from 'hooks/useClipBoard';
@ -23,7 +23,7 @@ import { Duration } from 'luxon';
import Qs from 'qs';
import React, { FC, useMemo, useState } from 'react';
import { useTranslation } from 'react-i18next';
import { Link, useHistory, useLocation, useParams } from 'react-router-dom';
import { useHistory, useLocation, useParams } from 'react-router-dom';
import { customFormatDateTime } from 'utils/date-time/DateTimeUtils';
import { parseSearchParams } from 'utils/Query/QueryUtils';
import { getQueryPath } from 'utils/RouterUtils';
@ -177,17 +177,8 @@ const QueryCard: FC<QueryCardProp> = ({
{duration && (
<>
<Text>{duration}</Text>
<Text className="text-gray-400">{PIPE_SYMBOL}</Text>
</>
)}
{query.updatedBy && (
<Text>
{`${t('label.by-lowercase')} `}
<Link to={getUserPath(query.updatedBy)}>
{query.updatedBy}
</Link>
</Text>
)}
</Space>
}
onClick={handleCardClick}>

View File

@ -11,7 +11,9 @@
* limitations under the License.
*/
import Icon from '@ant-design/icons';
import { Button, Col, Drawer, Row, Space, Typography } from 'antd';
import { ReactComponent as IconUser } from 'assets/svg/user.svg';
import Description from 'components/common/description/Description';
import ProfilePicture from 'components/common/ProfilePicture/ProfilePicture';
import { UserTeamSelectableList } from 'components/common/UserTeamSelectableList/UserTeamSelectableList.component';
@ -175,7 +177,7 @@ const TableQueryRightPanel = ({
<Typography.Text
className="right-panel-label"
data-testid="users">
{t('label.used-by')}
{t('label.user-plural')}
</Typography.Text>
{query.users && query.users.length ? (
<Space wrap size={6}>
@ -203,6 +205,31 @@ const TableQueryRightPanel = ({
)}
</Space>
</Col>
<Col span={24}>
<Space className="m-b-md" direction="vertical" size={4}>
<Typography.Text
className="right-panel-label"
data-testid="used-by">
{t('label.used-by')}
</Typography.Text>
{query.usedBy && query.usedBy.length ? (
<Space wrap size={6}>
{query.usedBy.map((user) => (
<Space className="m-r-xss" key={user} size={4}>
<Icon component={IconUser} />
{user}
</Space>
))}
</Space>
) : (
<Typography.Paragraph className="m-b-0 text-grey-muted">
{t('label.no-entity', {
entity: t('label.used-by'),
})}
</Typography.Paragraph>
)}
</Space>
</Col>
</Row>
)}
</Drawer>