feat(ingest): operational stats - show last updated for sql usage sources (#3845)

Aditya Radhakrishnan 2022-01-07 15:27:09 -10:00 committed by GitHub
parent 0a6ec819cf
commit c3e98645a2
25 changed files with 837 additions and 64 deletions


@ -96,6 +96,7 @@ import com.linkedin.datahub.graphql.resolvers.search.AutoCompleteForMultipleReso
import com.linkedin.datahub.graphql.resolvers.search.SearchResolver;
import com.linkedin.datahub.graphql.resolvers.type.EntityInterfaceTypeResolver;
import com.linkedin.datahub.graphql.resolvers.type.PlatformSchemaUnionTypeResolver;
import com.linkedin.datahub.graphql.types.common.mappers.OperationMapper;
import com.linkedin.datahub.graphql.types.dataset.mappers.DatasetProfileMapper;
import com.linkedin.datahub.graphql.types.mlmodel.MLFeatureTableType;
import com.linkedin.datahub.graphql.types.mlmodel.MLFeatureType;
@ -561,6 +562,14 @@ public class GmsGraphQLEngine {
DatasetProfileMapper::map
)
))
.dataFetcher("operations", new AuthenticatedResolver<>(
new TimeSeriesAspectResolver(
this.entityClient,
"dataset",
"operation",
OperationMapper::map
)
))
.dataFetcher("usageStats", new AuthenticatedResolver<>(new UsageTypeResolver()))
.dataFetcher("schemaMetadata", new AuthenticatedResolver<>(
new AspectResolver())


@ -0,0 +1,43 @@
package com.linkedin.datahub.graphql.types.common.mappers;
import com.linkedin.common.Operation;
import com.linkedin.datahub.graphql.generated.OperationType;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.types.mappers.TimeSeriesAspectMapper;
import com.linkedin.metadata.aspect.EnvelopedAspect;
import com.linkedin.metadata.utils.GenericAspectUtils;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
public class OperationMapper implements TimeSeriesAspectMapper<com.linkedin.datahub.graphql.generated.Operation> {
public static final OperationMapper INSTANCE = new OperationMapper();
public static com.linkedin.datahub.graphql.generated.Operation map(@Nonnull final EnvelopedAspect envelopedAspect) {
return INSTANCE.apply(envelopedAspect);
}
@Override
public com.linkedin.datahub.graphql.generated.Operation apply(@Nonnull final EnvelopedAspect envelopedAspect) {
Operation gmsProfile = GenericAspectUtils
.deserializeAspect(
envelopedAspect.getAspect().getValue(),
envelopedAspect.getAspect().getContentType(),
Operation.class);
final com.linkedin.datahub.graphql.generated.Operation result =
new com.linkedin.datahub.graphql.generated.Operation();
result.setTimestampMillis(gmsProfile.getTimestampMillis());
result.setLastUpdatedTimestamp(gmsProfile.getLastUpdatedTimestamp());
result.setActor(gmsProfile.getActor().toString());
result.setOperationType(OperationType.valueOf(OperationType.class, gmsProfile.getOperationType().toString()));
result.setNumAffectedRows(gmsProfile.getNumAffectedRows());
if (gmsProfile.hasAffectedDatasets()) {
result.setAffectedDatasets(gmsProfile.getAffectedDatasets().stream().map(Urn::toString).collect(Collectors.toList()));
}
return result;
}
}


@ -553,6 +553,11 @@ type Dataset implements EntityWithRelationships & Entity {
If no start or end time are provided, the most recent events will be returned
"""
datasetProfiles(startTimeMillis: Long, endTimeMillis: Long, limit: Int): [DatasetProfile!]
"""
Operational events for an entity.
"""
operations(startTimeMillis: Long, endTimeMillis: Long, limit: Int): [Operation!]
"""
Edges extending from this entity
@ -3691,6 +3696,64 @@ type TimeWindow {
durationMillis: Long!
}
"""
Operational info for an entity.
"""
type Operation implements TimeSeriesAspect {
"""
The time at which the operation was reported
"""
timestampMillis: Long!
"""
When the entity was last updated.
"""
lastUpdatedTimestamp: Long!
"""
Actor who issued this operation.
"""
actor: String
"""
Operation type of change.
"""
operationType: OperationType!
"""
How many rows were affected by this operation.
"""
numAffectedRows: Long
"""
Which other datasets were affected by this operation.
"""
affectedDatasets: [String!]
}
"""
Enum to define the operation type when an entity changes.
"""
enum OperationType {
"""
When data is inserted.
"""
INSERT
"""
When data is updated.
"""
UPDATE
"""
When data is deleted.
"""
DELETE
"""
When data is created.
"""
CREATE
}
"""
Information about Metadata Entity deprecation status
"""
@ -5043,4 +5106,4 @@ type SubTypes {
The sub-types that this entity implements. e.g. Datasets that are views will implement the "view" subtype
"""
typeNames: [String!]
}
}
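
The new `operations` field and `Operation` type above can be exercised directly against the GraphQL API. Below is a minimal sketch in Python; the endpoint URL, access token, and dataset URN are illustrative assumptions, not values from this change.

```python
# Minimal sketch: fetch the latest operation event for a dataset through the
# GraphQL API added above. Endpoint URL, token, and URN are placeholders.
import requests

GRAPHQL_URL = "http://localhost:9002/api/graphql"  # assumed DataHub frontend endpoint
TOKEN = "<personal-access-token>"                  # assumed auth token

QUERY = """
query latestOperation($urn: String!) {
  dataset(urn: $urn) {
    operations(limit: 1) {
      timestampMillis
      lastUpdatedTimestamp
      actor
      operationType
      numAffectedRows
      affectedDatasets
    }
  }
}
"""

variables = {
    "urn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.my_schema.my_table,PROD)"
}

response = requests.post(
    GRAPHQL_URL,
    json={"query": QUERY, "variables": variables},
    headers={"Authorization": f"Bearer {TOKEN}"},
)
response.raise_for_status()
print(response.json()["data"]["dataset"]["operations"])
```

The resolver returns the most recent events first, so `operations(limit: 1)` yields the event whose `timestampMillis` backs the "Last Updated" value shown in the UI.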


@ -396,6 +396,7 @@ export const dataset3 = {
editableSchemaMetadata: null,
deprecation: null,
usageStats: null,
operations: null,
datasetProfiles: [
{
rowCount: 10,


@ -130,7 +130,8 @@ export class DatasetEntity implements Entity<Dataset> {
visible: (_, _1) => true,
enabled: (_, dataset: GetDatasetQuery) =>
(dataset?.dataset?.datasetProfiles?.length || 0) > 0 ||
(dataset?.dataset?.usageStats?.buckets?.length || 0) > 0,
(dataset?.dataset?.usageStats?.buckets?.length || 0) > 0 ||
(dataset?.dataset?.operations?.length || 0) > 0,
},
},
]}


@ -2,7 +2,7 @@ import { Button, Typography } from 'antd';
import React from 'react';
import styled from 'styled-components';
import { GetDatasetQuery } from '../../../../../../../graphql/dataset.generated';
import { DatasetProfile, UsageQueryResult } from '../../../../../../../types.generated';
import { DatasetProfile, Operation, UsageQueryResult } from '../../../../../../../types.generated';
import UsageFacepile from '../../../../../dataset/profile/UsageFacepile';
import { ANTD_GRAY } from '../../../../constants';
import { useBaseEntity, useRouteToTab } from '../../../../EntityContext';
@ -29,17 +29,35 @@ const StatsRow = styled.div`
`;
const INFO_ITEM_WIDTH_PX = '150px';
const LAST_UPDATED_WIDTH_PX = '220px';
export const SidebarStatsSection = () => {
const baseEntity = useBaseEntity<GetDatasetQuery>();
const toLocalDateTimeString = (time: number) => {
const date = new Date(time);
return date.toLocaleString([], {
year: 'numeric',
month: 'numeric',
day: 'numeric',
hour: '2-digit',
minute: '2-digit',
timeZoneName: 'short',
});
};
const hasUsageStats = baseEntity?.dataset?.usageStats !== undefined;
const hasDatasetProfiles = baseEntity?.dataset?.datasetProfiles !== undefined;
const hasOperations = (baseEntity?.dataset?.operations?.length || 0) > 0;
const usageStats = (hasUsageStats && (baseEntity?.dataset?.usageStats as UsageQueryResult)) || undefined;
const datasetProfiles =
(hasDatasetProfiles && (baseEntity?.dataset?.datasetProfiles as Array<DatasetProfile>)) || undefined;
const latestProfile = datasetProfiles && datasetProfiles[0];
const operations = (hasOperations && (baseEntity?.dataset?.operations as Array<Operation>)) || undefined;
const latestOperation = operations && operations[0];
const lastUpdated = latestOperation && toLocalDateTimeString(latestOperation?.timestampMillis);
const routeToTab = useRouteToTab();
@ -85,6 +103,18 @@ export const SidebarStatsSection = () => {
</InfoItem>
) : null}
</StatsRow>
{/* Operation Entry */}
<StatsRow>
{latestOperation?.timestampMillis ? (
<InfoItem
title="Last Updated"
onClick={() => routeToTab({ tabName: 'Queries' })}
width={LAST_UPDATED_WIDTH_PX}
>
<HeaderInfoBody>{lastUpdated}</HeaderInfoBody>
</InfoItem>
) : null}
</StatsRow>
</div>
);
};


@ -1,7 +1,13 @@
import React, { useState } from 'react';
import { GetDatasetQuery } from '../../../../../../graphql/dataset.generated';
import { DatasetProfile, UsageQueryResult } from '../../../../../../types.generated';
import { DatasetProfile, Operation, UsageQueryResult } from '../../../../../../types.generated';
import { useBaseEntity } from '../../../EntityContext';
import {
toLocalDateString,
toLocalTimeString,
toLocalDateTimeString,
toUTCDateTimeString,
} from '../../../../../shared/time/timeUtils';
import HistoricalStats from './historical/HistoricalStats';
import { LOOKBACK_WINDOWS } from './lookbackWindows';
import ColumnStats from './snapshot/ColumnStats';
@ -9,16 +15,6 @@ import TableStats from './snapshot/TableStats';
import StatsHeader from './StatsHeader';
import { ViewType } from './viewType';
const toLocalDateString = (time: number) => {
const date = new Date(time);
return date.toLocaleDateString();
};
const toLocalTimeString = (time: number) => {
const date = new Date(time);
return date.toLocaleTimeString();
};
export default function StatsTab() {
const baseEntity = useBaseEntity<GetDatasetQuery>();
@ -27,6 +23,7 @@ export default function StatsTab() {
const hasUsageStats = baseEntity?.dataset?.usageStats !== undefined;
const hasDatasetProfiles = baseEntity?.dataset?.datasetProfiles !== undefined;
const hasOperations = baseEntity?.dataset?.operations !== undefined;
const usageStats = (hasUsageStats && (baseEntity?.dataset?.usageStats as UsageQueryResult)) || undefined;
const datasetProfiles =
@ -36,6 +33,11 @@ export default function StatsTab() {
const latestProfile = datasetProfiles && datasetProfiles[0]; // This is required for showing latest stats.
const urn = baseEntity && baseEntity.dataset && baseEntity.dataset?.urn;
// Used for rendering operation info.
const operations = (hasOperations && (baseEntity?.dataset?.operations as Array<Operation>)) || undefined;
const latestOperation = operations && operations[0];
const lastUpdated = latestOperation && toLocalDateTimeString(latestOperation?.timestampMillis);
const lastUpdatedUTC = latestOperation && toUTCDateTimeString(latestOperation?.timestampMillis);
// Okay so if we are disabled, we don't have both or the other. Let's render
// const emptyView = <Empty description="TODO: Stats!" image={Empty.PRESENTED_IMAGE_SIMPLE} />;
@ -66,6 +68,8 @@ export default function StatsTab() {
columnCount={latestProfile?.columnCount || undefined}
queryCount={usageStats?.aggregations?.totalSqlQueries || undefined}
users={usageStats?.aggregations?.users || undefined}
lastUpdated={lastUpdated || undefined}
lastUpdatedUTC={lastUpdatedUTC || undefined}
/>
<ColumnStats columnStats={(latestProfile && latestProfile.fieldProfiles) || []} />
</>


@ -1,4 +1,4 @@
import { Typography } from 'antd';
import { Tooltip, Typography } from 'antd';
import React from 'react';
import styled from 'styled-components';
import { Maybe, UserUsageCounts } from '../../../../../../../types.generated';
@ -11,6 +11,8 @@ type Props = {
columnCount?: number;
queryCount?: number;
users?: Array<Maybe<UserUsageCounts>>;
lastUpdated?: string;
lastUpdatedUTC?: string;
};
const StatSection = styled.div`
@ -26,7 +28,7 @@ const StatContainer = styled.div<{ justifyContent }>`
padding: 12px 2px;
`;
export default function TableStats({ rowCount, columnCount, queryCount, users }: Props) {
export default function TableStats({ rowCount, columnCount, queryCount, users, lastUpdated, lastUpdatedUTC }: Props) {
// If there are less than 4 items, simply stack the stat views.
const justifyContent = !queryCount && !users ? 'default' : 'space-between';
@ -62,6 +64,15 @@ export default function TableStats({ rowCount, columnCount, queryCount, users }:
</div>
</InfoItem>
)}
{lastUpdated && (
<InfoItem title="Last Updated" width="220px">
<Tooltip title={lastUpdatedUTC}>
<Typography.Text strong style={{ fontSize: 16 }}>
{lastUpdated}
</Typography.Text>
</Tooltip>
</InfoItem>
)}
</StatContainer>
</StatSection>
);


@ -57,3 +57,38 @@ export const getFixedLookbackWindow = (windowSize: TimeWindowSize): TimeWindow =
endTime,
};
};
export const toLocalDateString = (timeMs: number) => {
const date = new Date(timeMs);
return date.toLocaleDateString();
};
export const toLocalTimeString = (timeMs: number) => {
const date = new Date(timeMs);
return date.toLocaleTimeString();
};
export const toLocalDateTimeString = (timeMs: number) => {
const date = new Date(timeMs);
return date.toLocaleString([], {
year: 'numeric',
month: 'numeric',
day: 'numeric',
hour: '2-digit',
minute: '2-digit',
timeZoneName: 'short',
});
};
export const toUTCDateTimeString = (timeMs: number) => {
const date = new Date(timeMs);
return date.toLocaleString([], {
year: 'numeric',
month: 'numeric',
day: 'numeric',
hour: '2-digit',
minute: '2-digit',
timeZone: 'UTC',
timeZoneName: 'short',
});
};


@ -104,6 +104,9 @@ query getDataset($urn: String!) {
sampleValues
}
}
operations(limit: 1) {
timestampMillis
}
incoming: relationships(
input: { types: ["DownstreamOf", "Consumes", "Produces"], direction: INCOMING, start: 0, count: 100 }
) {


@ -95,7 +95,7 @@ plugins: Dict[str, Set[str]] = {
"athena": sql_common | {"PyAthena[SQLAlchemy]"},
"azure-ad": set(),
"bigquery": sql_common | bigquery_common | {"pybigquery >= 0.6.0"},
"bigquery-usage": bigquery_common | {"cachetools"},
"bigquery-usage": bigquery_common | {"cachetools", "more-itertools>=8.12.0"},
"datahub-business-glossary": set(),
"dbt": {"requests"},
"druid": sql_common | {"pydruid>=0.6.2"},


@ -211,19 +211,20 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
By default, we extract usage stats for the last day; we recommend running this source every day.
| Field | Required | Default | Description |
| ---------------------- | -------- | -------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `projects` | | | |
| `extra_client_options` | | | |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
| `start_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Earliest date of usage logs to consider. |
| `end_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Latest date of usage logs to consider. |
| `top_n_queries` | | `10` | Number of top queries to save to each table. |
| `extra_client_options` | | | Additional options to pass to `google.cloud.logging_v2.client.Client`. |
| `query_log_delay` | | | To account for the possibility that the query event arrives after the read event in the audit logs, we wait for at least `query_log_delay` additional events to be processed before attempting to resolve BigQuery job information from the logs. If `query_log_delay` is `None`, it gets treated as an unlimited delay, which prioritizes correctness at the expense of memory usage. |
| `max_query_duration` | | `15` | Correction to pad `start_time` and `end_time` with. For handling the case where the read happens within our time range but the query completion event is delayed and happens after the configured end time. |
| `table_pattern.allow` | | | List of regex patterns for tables to include in ingestion. |
| `table_pattern.deny` | | | List of regex patterns for tables to exclude in ingestion. |
| Field | Required | Default | Description |
|-----------------------------|----------|----------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `projects` | | | |
| `extra_client_options` | | | |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
| `start_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Earliest date of usage logs to consider. |
| `end_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Latest date of usage logs to consider. |
| `top_n_queries` | | `10` | Number of top queries to save to each table. |
| `include_operational_stats` | | `true` | Whether to ingest operational stats. |
| `extra_client_options` | | | Additional options to pass to `google.cloud.logging_v2.client.Client`. |
| `query_log_delay` | | | To account for the possibility that the query event arrives after the read event in the audit logs, we wait for at least `query_log_delay` additional events to be processed before attempting to resolve BigQuery job information from the logs. If `query_log_delay` is `None`, it gets treated as an unlimited delay, which prioritizes correctness at the expense of memory usage. |
| `max_query_duration` | | `15` | Correction to pad `start_time` and `end_time` with. For handling the case where the read happens within our time range but the query completion event is delayed and happens after the configured end time. |
| `table_pattern.allow` | | | List of regex patterns for tables to include in ingestion. |
| `table_pattern.deny` | | | List of regex patterns for tables to exclude in ingestion. |
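
For reference, operational stats are controlled by the new `include_operational_stats` flag (on by default). A minimal programmatic recipe sketch is shown below; the project id and sink address are assumptions, not part of this change.

```python
# Sketch: run the bigquery-usage source with operational stats enabled.
# Project id and sink server are illustrative placeholders.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "bigquery-usage",
            "config": {
                "projects": ["my-gcp-project"],
                "top_n_queries": 10,
                # Default is true; set to False to skip emitting operation aspects.
                "include_operational_stats": True,
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```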
## Compatibility


@ -223,17 +223,18 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
By default, we extract usage stats for the last day; we recommend running this source every day.
| Field | Required | Default | Description |
| --------------------------- | -------- | ---------------------------------------------------------------| --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|-----------------------------|----------|----------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `username` | | | Redshift username. |
| `password` | | | Redshift password. |
| `host_port` | ✅ | | Redshift host URL. |
| `host_port` | ✅ | | Redshift host URL. |
| `database` | | | Redshift database. |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
| `options.<option>` | | | Any options specified here will be passed to SQLAlchemy's `create_engine` as kwargs.<br />See https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine for details. |
| `email_domain` | ✅ | | Email domain of your organisation so users can be displayed on UI appropriately. |
| `email_domain` | ✅ | | Email domain of your organisation so users can be displayed on UI appropriately. |
| `start_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Earliest date of usage to consider. |
| `end_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Latest date of usage to consider. |
| `top_n_queries` | | `10` | Number of top queries to save to each table. |
| `include_operational_stats` | | `true` | Whether to ingest operational stats. |
| `bucket_duration` | | `"DAY"` | Size of the time window to aggregate usage stats. |
## Questions


@ -193,23 +193,24 @@ Snowflake integration also supports prevention of redundant reruns for the same
Note that a `.` is used to denote nested fields in the YAML recipe.
| Field | Required | Default | Description |
|--------------------|----------|---------------------------------------------------------------------|----------------------------------------------------------------------------------|
| `username` | | | Snowflake username. |
| `password` | | | Snowflake password. |
| `host_port` | ✅ | | Snowflake host URL. |
| `warehouse` | | | Snowflake warehouse. |
| `role` | | | Snowflake role. |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
| `bucket_duration` | | `"DAY"` | Duration to bucket usage events by. Can be `"DAY"` or `"HOUR"`. |
| `email_domain` | | | Email domain of your organisation so users can be displayed on UI appropriately. |
| `start_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Earliest date of usage logs to consider. |
| `end_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Latest date of usage logs to consider. |
| `top_n_queries` | | `10` | Number of top queries to save to each table. |
| `database_pattern` | | `"^UTIL_DB$" `<br />`"^SNOWFLAKE$"`<br />`"^SNOWFLAKE_SAMPLE_DATA$" | Allow/deny patterns for db in snowflake dataset names. |
| `schema_pattern` | | | Allow/deny patterns for schema in snowflake dataset names. |
| `view_pattern` | | | Allow/deny patterns for views in snowflake dataset names. |
| `table_pattern` | | | Allow/deny patterns for tables in snowflake dataset names. |
| Field | Required | Default | Description |
|-----------------------------|----------|---------------------------------------------------------------------|----------------------------------------------------------------------------------|
| `username` | | | Snowflake username. |
| `password` | | | Snowflake password. |
| `host_port` | ✅ | | Snowflake host URL. |
| `warehouse` | | | Snowflake warehouse. |
| `role` | | | Snowflake role. |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
| `bucket_duration` | | `"DAY"` | Duration to bucket usage events by. Can be `"DAY"` or `"HOUR"`. |
| `email_domain` | | | Email domain of your organisation so users can be displayed on UI appropriately. |
| `start_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Earliest date of usage logs to consider. |
| `end_time` | | Last full day in UTC (or hour, depending on `bucket_duration`) | Latest date of usage logs to consider. |
| `top_n_queries` | | `10` | Number of top queries to save to each table. |
| `include_operational_stats` | | `true` | Whether to ingest operational stats. |
| `database_pattern` | | `"^UTIL_DB$" `<br />`"^SNOWFLAKE$"`<br />`"^SNOWFLAKE_SAMPLE_DATA$" | Allow/deny patterns for db in snowflake dataset names. |
| `schema_pattern` | | | Allow/deny patterns for schema in snowflake dataset names. |
| `view_pattern` | | | Allow/deny patterns for views in snowflake dataset names. |
| `table_pattern` | | | Allow/deny patterns for tables in snowflake dataset names. |
:::caution


@ -6,15 +6,27 @@ import logging
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Counter, Dict, Iterable, List, MutableMapping, Optional, Union
from typing import (
Any,
Counter,
Dict,
Iterable,
List,
MutableMapping,
Optional,
Union,
cast,
)
import cachetools
import pydantic
from google.cloud.logging_v2.client import Client as GCPLoggingClient
from more_itertools import partition
import datahub.emitter.mce_builder as builder
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import get_time_bucket
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
@ -22,6 +34,11 @@ from datahub.ingestion.source.usage.usage_common import (
BaseUsageConfig,
GenericAggregatedDataset,
)
from datahub.metadata.schema_classes import (
ChangeTypeClass,
OperationClass,
OperationTypeClass,
)
from datahub.utilities.delayed_iter import delayed_iter
logger = logging.getLogger(__name__)
@ -88,6 +105,16 @@ timestamp >= "{start_time}"
AND
timestamp < "{end_time}"
""".strip()
OPERATION_STATEMENT_TYPES = {
"INSERT": OperationTypeClass.INSERT,
"UPDATE": OperationTypeClass.UPDATE,
"DELETE": OperationTypeClass.DELETE,
"MERGE": OperationTypeClass.UPDATE,
"CREATE": OperationTypeClass.CREATE,
"CREATE_TABLE_AS_SELECT": OperationTypeClass.CREATE,
"CREATE_SCHEMA": OperationTypeClass.CREATE,
"DROP_TABLE": OperationTypeClass.DROP,
}
@dataclass(frozen=True, order=True)
@ -192,7 +219,6 @@ class ReadEvent:
@classmethod
def from_entry(cls, entry: AuditLogEntry) -> "ReadEvent":
user = entry.payload["authenticationInfo"]["principalEmail"]
resourceName = entry.payload["resourceName"]
readInfo = entry.payload["metadata"]["tableDataRead"]
@ -233,6 +259,7 @@ class QueryEvent:
actor_email: str
query: str
statementType: Optional[str]
destinationTable: Optional[BigQueryTableRef]
referencedTables: Optional[List[BigQueryTableRef]]
jobName: Optional[str]
@ -262,6 +289,11 @@ class QueryEvent:
if rawDestTable:
destinationTable = BigQueryTableRef.from_spec_obj(rawDestTable)
try:
statementType = job["jobConfiguration"]["query"]["statementType"]
except KeyError:
statementType = None
rawRefTables = job["jobStatistics"].get("referencedTables")
referencedTables = None
if rawRefTables:
@ -273,6 +305,7 @@ class QueryEvent:
timestamp=entry.timestamp,
actor_email=user,
query=rawQuery,
statementType=statementType,
destinationTable=destinationTable,
referencedTables=referencedTables,
jobName=jobName,
@ -325,10 +358,16 @@ class QueryEvent:
BigQueryTableRef.from_string_name(spec) for spec in raw_ref_tables
]
try:
statementType = job["jobConfiguration"]["query"]["statementType"]
except KeyError:
statementType = None
query_event = QueryEvent(
timestamp=timestamp,
actor_email=user,
query=raw_query,
statementType=statementType,
destinationTable=destination_table,
referencedTables=referenced_tables,
jobName=job_name,
@ -394,7 +433,26 @@ class BigQueryUsageSource(Source):
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
clients = self._make_bigquery_clients()
bigquery_log_entries = self._get_bigquery_log_entries(clients)
parsed_events = self._parse_bigquery_log_entries(bigquery_log_entries)
parsed_bigquery_log_events = self._parse_bigquery_log_entries(
bigquery_log_entries
)
parsed_events_uncasted: Iterable[Union[ReadEvent, QueryEvent, MetadataWorkUnit]]
last_updated_work_units_uncasted: Iterable[
Union[ReadEvent, QueryEvent, MetadataWorkUnit]
]
parsed_events_uncasted, last_updated_work_units_uncasted = partition(
lambda x: isinstance(x, MetadataWorkUnit), parsed_bigquery_log_events
)
parsed_events: Iterable[Union[ReadEvent, QueryEvent]] = cast(
Iterable[Union[ReadEvent, QueryEvent]], parsed_events_uncasted
)
last_updated_work_units: Iterable[MetadataWorkUnit] = cast(
Iterable[MetadataWorkUnit], last_updated_work_units_uncasted
)
if self.config.include_operational_stats:
for wu in last_updated_work_units:
self.report.report_workunit(wu)
yield wu
hydrated_read_events = self._join_events_by_job_id(parsed_events)
aggregated_info = self._aggregate_enriched_read_events(hydrated_read_events)
@ -497,9 +555,55 @@ class BigQueryUsageSource(Source):
yield entry
logger.info(f"Finished loading {i} log entries from BigQuery")
def _create_operation_aspect_work_unit(
self, event: QueryEvent
) -> Optional[MetadataWorkUnit]:
if event.statementType in OPERATION_STATEMENT_TYPES and event.destinationTable:
destination_table: BigQueryTableRef
try:
destination_table = event.destinationTable.remove_extras()
except Exception as e:
self.report.report_warning(
str(event.destinationTable),
f"Failed to clean up destination table, {e}",
)
return None
last_updated_timestamp: int = int(event.timestamp.timestamp() * 1000)
affected_datasets = []
if event.referencedTables:
for table in event.referencedTables:
try:
affected_datasets.append(
_table_ref_to_urn(table.remove_extras(), self.config.env)
)
except Exception as e:
self.report.report_warning(
str(table),
f"Failed to clean up table, {e}",
)
operation_aspect = OperationClass(
timestampMillis=last_updated_timestamp,
lastUpdatedTimestamp=last_updated_timestamp,
actor=builder.make_user_urn(event.actor_email.split("@")[0]),
operationType=OPERATION_STATEMENT_TYPES[event.statementType],
affectedDatasets=affected_datasets,
)
mcp = MetadataChangeProposalWrapper(
entityType="dataset",
aspectName="operation",
changeType=ChangeTypeClass.UPSERT,
entityUrn=_table_ref_to_urn(destination_table, self.config.env),
aspect=operation_aspect,
)
return MetadataWorkUnit(
id=f"operation-aspect-{destination_table}-{event.timestamp.isoformat()}",
mcp=mcp,
)
return None
def _parse_bigquery_log_entries(
self, entries: Iterable[AuditLogEntry]
) -> Iterable[Union[ReadEvent, QueryEvent]]:
) -> Iterable[Union[ReadEvent, QueryEvent, MetadataWorkUnit]]:
num_read_events: int = 0
num_query_events: int = 0
for entry in entries:
@ -511,6 +615,9 @@ class BigQueryUsageSource(Source):
elif QueryEvent.can_parse_entry(entry):
event = QueryEvent.from_entry(entry)
num_query_events += 1
wu = self._create_operation_aspect_work_unit(event)
if wu:
yield wu
else:
self.report.report_warning(
f"{entry.log_name}-{entry.insert_id}",


@ -2,7 +2,7 @@ import collections
import dataclasses
import logging
from datetime import datetime
from typing import Dict, Iterable, List
from typing import Any, Dict, Iterable, List, Optional, Union
from dateutil import parser
from pydantic import Field
@ -12,6 +12,7 @@ from sqlalchemy.engine import Engine
import datahub.emitter.mce_builder as builder
from datahub.configuration.time_window_config import get_time_bucket
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.redshift import RedshiftConfig
@ -19,6 +20,11 @@ from datahub.ingestion.source.usage.usage_common import (
BaseUsageConfig,
GenericAggregatedDataset,
)
from datahub.metadata.schema_classes import (
ChangeTypeClass,
OperationClass,
OperationTypeClass,
)
logger = logging.getLogger(__name__)
@ -93,7 +99,18 @@ class RedshiftUsageSource(Source):
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
"""Gets Redshift usage stats as work units"""
access_events = self._get_redshift_history()
engine = self._make_sql_engine()
if self.config.include_operational_stats:
operation_aspect_work_units = (
self._get_all_operation_aspect_work_units_by_type(engine)
)
for operation_aspect_work_unit in operation_aspect_work_units:
yield operation_aspect_work_unit
access_events = self._get_redshift_history(
self._make_usage_query(redshift_usage_sql_comment), engine
)
# If the query results are empty, we don't want to proceed
if not access_events:
return []
@ -107,21 +124,85 @@ class RedshiftUsageSource(Source):
self.report.report_workunit(wu)
yield wu
def _make_usage_query(self) -> str:
return redshift_usage_sql_comment.format(
def _get_operation_aspect_work_units_by_type(
self, operation_type: Union[str, "OperationTypeClass"], engine: Engine
) -> Iterable[MetadataWorkUnit]:
if operation_type == OperationTypeClass.INSERT:
table_name = "stl_insert"
elif operation_type == OperationTypeClass.DELETE:
table_name = "stl_delete"
else:
return []
events = self._get_redshift_history(
self._make_redshift_operation_aspect_query(table_name), engine
)
if not events:
return []
access_events = self._get_joined_access_event(events)
work_units = self._aggregate_operation_aspect_events(
access_events, operation_type
)
for wu in work_units:
self.report.report_workunit(wu)
yield wu
def _get_all_operation_aspect_work_units_by_type(
self, engine: Engine
) -> Iterable[MetadataWorkUnit]:
insert_work_units = self._get_operation_aspect_work_units_by_type(
OperationTypeClass.INSERT, engine
)
for insert_work_unit in insert_work_units:
self.report.report_workunit(insert_work_unit)
yield insert_work_unit
delete_work_units = self._get_operation_aspect_work_units_by_type(
OperationTypeClass.DELETE, engine
)
for delete_work_unit in delete_work_units:
self.report.report_workunit(delete_work_unit)
yield delete_work_unit
def _make_usage_query(self, query: str) -> str:
return query.format(
start_time=self.config.start_time.strftime(redshift_datetime_format),
end_time=self.config.end_time.strftime(redshift_datetime_format),
)
def _make_redshift_operation_aspect_query(self, table_name: str) -> str:
return f"""
SELECT DISTINCT ss.userid,
ss.query,
ss.rows,
sui.usename,
ss.tbl,
sq.querytxt,
sti.database,
sti.schema,
sti.table,
sq.starttime,
sq.endtime,
sq.aborted
FROM {table_name} ss
JOIN svv_table_info sti ON ss.tbl = sti.table_id
JOIN stl_query sq ON ss.query = sq.query
JOIN svl_user_info sui ON sq.userid = sui.usesysid
WHERE ss.starttime >= '{self.config.start_time.strftime(redshift_datetime_format)}'
AND ss.starttime < '{self.config.end_time.strftime(redshift_datetime_format)}'
AND ss.rows > 0
AND sq.aborted = 0
ORDER BY ss.endtime DESC;
""".strip()
def _make_sql_engine(self) -> Engine:
url = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url = {url}")
engine = create_engine(url, **self.config.options)
return engine
def _get_redshift_history(self):
query = self._make_usage_query()
engine = self._make_sql_engine()
def _get_redshift_history(
self, query: str, engine: Engine
) -> Optional[Iterable[Any]]:
results = engine.execute(query)
events = []
for row in results:
@ -183,6 +264,44 @@ class RedshiftUsageSource(Source):
joined_access_events.append(RedshiftJoinedAccessEvent(**event_dict))
return joined_access_events
def _aggregate_operation_aspect_events(
self,
events: List[RedshiftJoinedAccessEvent],
operation_type: Union[str, "OperationTypeClass"],
) -> Iterable[MetadataWorkUnit]:
for event in events:
if (
event.database
and event.usename
and event.schema_
and event.table
and event.endtime
):
resource = f"{event.database}.{event.schema_}.{event.table}"
last_updated_timestamp: int = int(event.endtime.timestamp() * 1000)
user_email = event.usename
operation_aspect = OperationClass(
timestampMillis=last_updated_timestamp,
lastUpdatedTimestamp=last_updated_timestamp,
actor=builder.make_user_urn(user_email.split("@")[0]),
operationType=operation_type,
)
mcp = MetadataChangeProposalWrapper(
entityType="dataset",
aspectName="operation",
changeType=ChangeTypeClass.UPSERT,
entityUrn=builder.make_dataset_urn(
"redshift", resource.lower(), self.config.env
),
aspect=operation_aspect,
)
wu = MetadataWorkUnit(
id=f"operation-aspect-{event.table}-{event.endtime.isoformat()}",
mcp=mcp,
)
yield wu
def _aggregate_access_events(
self, events: List[RedshiftJoinedAccessEvent]
) -> Dict[datetime, Dict[RedshiftTableRef, AggregatedDataset]]:


@ -12,6 +12,7 @@ from sqlalchemy.engine import Engine
import datahub.emitter.mce_builder as builder
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import get_time_bucket
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
@ -28,6 +29,11 @@ from datahub.ingestion.source.usage.usage_common import (
BaseUsageConfig,
GenericAggregatedDataset,
)
from datahub.metadata.schema_classes import (
ChangeTypeClass,
OperationClass,
OperationTypeClass,
)
logger = logging.getLogger(__name__)
@ -40,6 +46,9 @@ SELECT
access_history.query_start_time,
query_history.query_text,
query_history.query_type,
query_history.rows_inserted,
query_history.rows_updated,
query_history.rows_deleted,
access_history.base_objects_accessed,
access_history.direct_objects_accessed, -- when dealing with views, direct objects will show the view while base will show the underlying table
-- query_history.execution_status, -- not really necessary, but should equal "SUCCESS"
@ -65,6 +74,16 @@ ORDER BY query_start_time DESC
;
""".strip()
OPERATION_STATEMENT_TYPES = {
"INSERT": OperationTypeClass.INSERT,
"UPDATE": OperationTypeClass.UPDATE,
"DELETE": OperationTypeClass.DELETE,
"CREATE": OperationTypeClass.CREATE,
"CREATE_TABLE": OperationTypeClass.CREATE,
"CREATE_TABLE_AS_SELECT": OperationTypeClass.CREATE,
"CREATE_SCHEMA": OperationTypeClass.CREATE,
}
@pydantic.dataclasses.dataclass
class SnowflakeColumnReference:
@ -89,6 +108,9 @@ class SnowflakeJoinedAccessEvent(PermissiveModel):
query_start_time: datetime
query_text: str
query_type: str
rows_inserted: Optional[int]
rows_updated: Optional[int]
rows_deleted: Optional[int]
base_objects_accessed: List[SnowflakeObjectAccessEntry]
direct_objects_accessed: List[SnowflakeObjectAccessEntry]
@ -236,6 +258,13 @@ class SnowflakeUsageSource(StatefulIngestionSourceBase):
self._init_checkpoints()
# Generate the workunits.
access_events = self._get_snowflake_history()
if self.config.include_operational_stats:
operation_aspect_work_units = self._get_operation_aspect_work_units(
access_events
)
for wu in operation_aspect_work_units:
self.report.report_workunit(wu)
yield wu
aggregated_info = self._aggregate_access_events(access_events)
for time_bucket in aggregated_info.values():
@ -273,7 +302,7 @@ class SnowflakeUsageSource(StatefulIngestionSourceBase):
event_dict = dict(row)
# no use processing events that don't have a query text
if event_dict["query_text"] is None:
if not event_dict["query_text"]:
continue
def is_unsupported_object_accessed(obj: Dict[str, Any]) -> bool:
@ -356,6 +385,41 @@ class SnowflakeUsageSource(StatefulIngestionSourceBase):
"usage", f"Failed to parse usage line {event_dict}"
)
def _get_operation_aspect_work_units(
self, events: Iterable[SnowflakeJoinedAccessEvent]
) -> Iterable[MetadataWorkUnit]:
for event in events:
if event.query_start_time and event.query_type in OPERATION_STATEMENT_TYPES:
start_time = event.query_start_time
query_type = event.query_type
user_email = event.email
operation_type = OPERATION_STATEMENT_TYPES[query_type]
last_updated_timestamp: int = int(start_time.timestamp() * 1000)
user_urn = builder.make_user_urn(user_email.split("@")[0])
for obj in event.base_objects_accessed:
resource = obj.objectName
dataset_urn = builder.make_dataset_urn(
"snowflake", resource.lower(), self.config.env
)
operation_aspect = OperationClass(
timestampMillis=last_updated_timestamp,
lastUpdatedTimestamp=last_updated_timestamp,
actor=user_urn,
operationType=operation_type,
)
mcp = MetadataChangeProposalWrapper(
entityType="dataset",
aspectName="operation",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspect=operation_aspect,
)
wu = MetadataWorkUnit(
id=f"operation-aspect-{resource}-{start_time.isoformat()}",
mcp=mcp,
)
yield wu
def _aggregate_access_events(
self, events: Iterable[SnowflakeJoinedAccessEvent]
) -> Dict[datetime, Dict[SnowflakeTableRef, AggregatedDataset]]:


@ -40,7 +40,10 @@ class GenericAggregatedDataset(Generic[ResourceType]):
query_trimmer_string: str = " ..."
def add_read_entry(
self, user_email: str, query: Optional[str], fields: List[str]
self,
user_email: str,
query: Optional[str],
fields: List[str],
) -> None:
self.readCount += 1
self.userFreq[user_email] += 1
@ -112,6 +115,7 @@ class GenericAggregatedDataset(Generic[ResourceType]):
class BaseUsageConfig(BaseTimeWindowConfig):
top_n_queries: pydantic.PositiveInt = 10
include_operational_stats: bool = True
@pydantic.validator("top_n_queries")
def ensure_top_n_queries_is_not_too_big(cls, v: int) -> int:


@ -1,4 +1,147 @@
[
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.austin311_derived,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1622073693816, \"lastUpdatedTimestamp\": 1622073693816, \"actor\": \"urn:li:corpuser:harshal\", \"operationType\": \"CREATE\", \"affectedDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.austin_311.311_service_requests,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.austin311_derived,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1622074056868, \"lastUpdatedTimestamp\": 1622074056868, \"actor\": \"urn:li:corpuser:harshal\", \"operationType\": \"CREATE\", \"affectedDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.austin_311.311_service_requests,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.austin311_derived,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1622075993220, \"lastUpdatedTimestamp\": 1622075993220, \"actor\": \"urn:li:corpuser:harshal\", \"operationType\": \"CREATE\", \"affectedDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.austin_311.311_service_requests,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.excess_deaths_derived,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1622076153701, \"lastUpdatedTimestamp\": 1622076153701, \"actor\": \"urn:li:corpuser:harshal\", \"operationType\": \"CREATE\", \"affectedDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_nyt.excess_deaths,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.excess_deaths_derived,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1622076935475, \"lastUpdatedTimestamp\": 1622076935475, \"actor\": \"urn:li:corpuser:harshal\", \"operationType\": \"CREATE\", \"affectedDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_nyt.excess_deaths,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.excess_deaths_derived,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1622091452779, \"lastUpdatedTimestamp\": 1622091452779, \"actor\": \"urn:li:corpuser:harshal\", \"operationType\": \"CREATE\", \"affectedDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_nyt.excess_deaths,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.excess_deaths_derived,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1622098252897, \"lastUpdatedTimestamp\": 1622098252897, \"actor\": \"urn:li:corpuser:harshal\", \"operationType\": \"CREATE\", \"affectedDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_nyt.excess_deaths,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.excess_deaths_derived,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1622148197222, \"lastUpdatedTimestamp\": 1622148197222, \"actor\": \"urn:li:corpuser:harshal\", \"operationType\": \"CREATE\", \"affectedDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_nyt.excess_deaths,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.excess_deaths_derived,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1622148734552, \"lastUpdatedTimestamp\": 1622148734552, \"actor\": \"urn:li:corpuser:harshal\", \"operationType\": \"CREATE\", \"affectedDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_nyt.excess_deaths,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.excess_deaths_derived,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1622161940000, \"lastUpdatedTimestamp\": 1622161940000, \"actor\": \"urn:li:corpuser:harshal\", \"operationType\": \"CREATE\", \"affectedDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_nyt.excess_deaths,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.excess_deaths_derived,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1622243780153, \"lastUpdatedTimestamp\": 1622243780153, \"actor\": \"urn:li:corpuser:harshal\", \"operationType\": \"CREATE\", \"affectedDatasets\": [\"urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_nyt.excess_deaths,PROD)\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",


@ -1,4 +1,56 @@
[
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1631664000000, \"lastUpdatedTimestamp\": 1631664000000, \"actor\": \"urn:li:corpuser:test-name\", \"operationType\": \"INSERT\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,db1.schema1.category,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1631664000000, \"lastUpdatedTimestamp\": 1631664000000, \"actor\": \"urn:li:corpuser:real_shirshanka\", \"operationType\": \"INSERT\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1631664000000, \"lastUpdatedTimestamp\": 1631664000000, \"actor\": \"urn:li:corpuser:test-name\", \"operationType\": \"DELETE\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,db1.schema1.category,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1631664000000, \"lastUpdatedTimestamp\": 1631664000000, \"actor\": \"urn:li:corpuser:real_shirshanka\", \"operationType\": \"DELETE\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",


@ -0,0 +1,43 @@
namespace com.linkedin.common
import com.linkedin.timeseries.TimeseriesAspectBase
/**
* Operational info for an entity.
*/
@Aspect = {
"name": "operation",
"type": "timeseries"
}
record Operation includes TimeseriesAspectBase {
/**
* When the entity was last updated.
*/
@TimeseriesField = {}
lastUpdatedTimestamp: long
/**
* Actor who issued this operation.
*/
@TimeseriesField = {}
actor: optional Urn
/**
* Operation type of change.
*/
@TimeseriesField = {}
operationType: OperationType
/**
* How many rows were affected by this operation.
*/
@TimeseriesField = {}
numAffectedRows: optional long
/**
* Which other datasets were affected by this operation.
*/
@TimeseriesFieldCollection = {"key":"datasetName"}
affectedDatasets: optional array[Urn]
}
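
Since `operation` is registered as a timeseries aspect, it can also be emitted outside of the SQL usage sources, for example from a custom job that knows when a table was written. A sketch using the Python REST emitter follows; the GMS address, actor, and dataset URN are placeholders.

```python
# Sketch: emit an Operation aspect for a dataset directly via the REST emitter.
# Server URL, actor, and dataset URN are illustrative placeholders.
import time

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
    OperationClass,
    OperationTypeClass,
)

now_ms = int(time.time() * 1000)

operation_aspect = OperationClass(
    timestampMillis=now_ms,
    lastUpdatedTimestamp=now_ms,
    actor="urn:li:corpuser:etl_bot",
    operationType=OperationTypeClass.INSERT,
)

mcp = MetadataChangeProposalWrapper(
    entityType="dataset",
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.users,PROD)",
    aspectName="operation",
    changeType=ChangeTypeClass.UPSERT,
    aspect=operation_aspect,
)

emitter = DatahubRestEmitter("http://localhost:8080")  # assumed GMS endpoint
emitter.emit_mcp(mcp)
```

This mirrors the `MetadataChangeProposalWrapper` construction used by the BigQuery, Redshift, and Snowflake usage sources in this change.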


@ -0,0 +1,13 @@
namespace com.linkedin.common
/**
* Enum to define the operation type when an entity changes.
*/
enum OperationType {
INSERT
UPDATE
DELETE
CREATE
DROP
UNKNOWN
}


@ -7,6 +7,7 @@ entities:
- subTypes
- datasetProfile
- datasetUsageStatistics
- operation
- name: dataHubPolicy
doc: DataHub Policies represent access policies granted to users or groups on metadata operations like edit, view etc.
keyAspect: dataHubPolicyKey


@ -0,0 +1,11 @@
describe('operations', () => {
it('can visit dataset with operation aspect and verify last updated is present', () => {
cy.login();
cy.visit('/dataset/urn:li:dataset:(urn:li:dataPlatform:bigquery,test-project.bigquery_usage_logs.cypress_logging_events,PROD)/Stats?is_lineage_mode=false');
cy.contains('test-project.bigquery_usage_logs.cypress_logging_events');
// Last updated text is present
cy.contains('Last Updated')
});
})


@ -1556,5 +1556,18 @@
}
},
"proposedDelta": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,test-project.bigquery_usage_logs.cypress_logging_events,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"value": "{\"timestampMillis\": 1622243780153, \"lastUpdatedTimestamp\": 1622243780153, \"actor\": \"urn:li:corpuser:harshal\", \"operationType\": \"CREATE\", \"affectedDatasets\": []}",
"contentType": "application/json"
},
"systemMetadata": null
}
]