feat: usage stats (part 2) (#2762)

Co-authored-by: Gabe Lyons <itsgabelyons@gmail.com>
Harshal Sheth 2021-06-24 19:44:59 -07:00 committed by GitHub
parent 937f02c6bc
commit 19b2a42a00
29 changed files with 9190 additions and 173 deletions

View File

@ -0,0 +1,24 @@
package com.linkedin.datahub.graphql.types.usage;
import com.linkedin.datahub.graphql.generated.FieldUsageCounts;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import javax.annotation.Nonnull;
public class FieldUsageCountsMapper implements ModelMapper<com.linkedin.usage.FieldUsageCounts, FieldUsageCounts> {
public static final FieldUsageCountsMapper INSTANCE = new FieldUsageCountsMapper();
public static FieldUsageCounts map(@Nonnull final com.linkedin.usage.FieldUsageCounts usageCounts) {
return INSTANCE.apply(usageCounts);
}
@Override
public FieldUsageCounts apply(@Nonnull final com.linkedin.usage.FieldUsageCounts usageCounts) {
FieldUsageCounts result = new FieldUsageCounts();
result.setCount(usageCounts.getCount());
result.setFieldName(usageCounts.getFieldName());
return result;
}
}

View File

@ -21,6 +21,10 @@ public class UsageAggregationMetricsMapper implements
result.setTotalSqlQueries(usageAggregationMetrics.getTotalSqlQueries());
result.setUniqueUserCount(usageAggregationMetrics.getUniqueUserCount());
result.setTopSqlQueries(usageAggregationMetrics.getTopSqlQueries());
if (usageAggregationMetrics.hasFields()) {
result.setFields(
usageAggregationMetrics.getFields().stream().map(FieldUsageCountsMapper::map).collect(Collectors.toList()));
}
if (usageAggregationMetrics.hasUsers()) {
result.setUsers(usageAggregationMetrics.getUsers()
.stream()

View File

@ -20,6 +20,10 @@ public class UsageQueryResultAggregationMapper implements
UsageQueryResultAggregations result = new UsageQueryResultAggregations();
result.setTotalSqlQueries(pdlUsageResultAggregations.getTotalSqlQueries());
result.setUniqueUserCount(pdlUsageResultAggregations.getUniqueUserCount());
if (pdlUsageResultAggregations.hasFields()) {
result.setFields(
pdlUsageResultAggregations.getFields().stream().map(FieldUsageCountsMapper::map).collect(Collectors.toList()));
}
if (pdlUsageResultAggregations.hasUsers()) {
result.setUsers(pdlUsageResultAggregations.getUsers()
.stream()

View File

@ -2618,6 +2618,7 @@ type UsageQueryResult {
type UsageQueryResultAggregations {
uniqueUserCount: Int
users: [UserUsageCounts]
fields: [FieldUsageCounts]
totalSqlQueries: Int
}
@ -2626,6 +2627,7 @@ type UsageAggregationMetrics {
users: [UserUsageCounts]
totalSqlQueries: Int
topSqlQueries: [String]
fields: [FieldUsageCounts]
}
type UsageAggregation {
@ -2635,6 +2637,11 @@ type UsageAggregation {
metrics: UsageAggregationMetrics
}
type FieldUsageCounts {
fieldName: String
count: Int
}
enum WindowDuration {
DAY
WEEK

View File

@ -82,6 +82,7 @@ export const DatasetProfile = ({ urn }: { urn: string }): JSX.Element => {
<SchemaView
urn={urn}
schema={schema}
usageStats={dataset.usageStats}
editableSchemaMetadata={editableSchemaMetadata}
updateEditableSchema={(update) => {
analytics.event({

View File

@ -37,7 +37,6 @@ function getTopNQueries(responseSize: number, buckets?: Maybe<UsageAggregation>[
}
export default function QueriesTab({ dataset }: Props) {
console.log(dataset.usageStats);
const topQueries = getTopNQueries(5, dataset.usageStats?.buckets);
if (topQueries.length === 0) {

View File

@ -1,5 +1,5 @@
import React, { useMemo } from 'react';
import { Avatar, Tooltip } from 'antd';
import { UserUsageCounts } from '../../../../types.generated';
export type Props = {
@ -7,10 +7,14 @@ export type Props = {
};
export default function UsageFacepile({ users }: Props) {
const sortedUsers = useMemo(() => users?.slice().sort((a, b) => (b?.count || 0) - (a?.count || 0)), [users]);
return (
<Avatar.Group maxCount={2}>
{sortedUsers?.map((user) => (
<Tooltip title={user?.userEmail}>
<Avatar>{user?.userEmail?.charAt(0).toUpperCase()}</Avatar>
</Tooltip>
))}
</Avatar.Group>
);

View File

@ -1,9 +1,12 @@
import React, { useState, useEffect, useMemo } from 'react';
import { geekblue } from '@ant-design/colors';
import { Button, Table, Tooltip, Typography } from 'antd';
import { AlignType } from 'rc-table/lib/interface';
import styled from 'styled-components';
import { FetchResult } from '@apollo/client';
import { ColumnsType } from 'antd/lib/table';
import TypeIcon from './TypeIcon';
import {
Schema,
@ -17,6 +20,7 @@ import {
EditableSchemaFieldInfoUpdate,
EntityType,
GlossaryTerms,
UsageQueryResult,
} from '../../../../../types.generated';
import TagTermGroup from '../../../../shared/tags/TagTermGroup';
import { UpdateDatasetMutation } from '../../../../../graphql/dataset.generated';
@ -38,8 +42,21 @@ const LighterText = styled(Typography.Text)`
color: rgba(0, 0, 0, 0.45);
`;
const UsageBar = styled.div<{ width: number }>`
width: ${(props) => props.width}px;
height: 10px;
background-color: ${geekblue[3]};
border-radius: 2px;
`;
const UsageBarContainer = styled.div`
width: 100%;
height: 100%;
`;
export type Props = {
urn: string;
usageStats?: UsageQueryResult | null;
schema?: Schema | null;
editableSchemaMetadata?: EditableSchemaMetadata | null;
updateEditableSchema: (
@ -109,10 +126,17 @@ function convertEditableSchemaMetadataForUpdate(
};
}
const USAGE_BAR_MAX_WIDTH = 50;
export default function SchemaView({ urn, schema, editableSchemaMetadata, updateEditableSchema, usageStats }: Props) {
const [tagHoveredIndex, setTagHoveredIndex] = useState<string | undefined>(undefined);
const [showRaw, setShowRaw] = useState(false);
const [rows, setRows] = useState<Array<ExtendedSchemaFields>>([]);
const hasUsageStats = useMemo(() => (usageStats?.aggregations?.fields?.length || 0) > 0, [usageStats]);
const maxFieldUsageCount = useMemo(
() => Math.max(...(usageStats?.aggregations?.fields?.map((field) => field?.count || 0) || [])),
[usageStats],
);
useEffect(() => {
const fields = [...(schema?.fields || [])] as Array<ExtendedSchemaFields>;
@ -229,6 +253,24 @@ export default function SchemaView({ urn, schema, editableSchemaMetadata, update
);
};
const usageStatsRenderer = (fieldPath: string) => {
const relevantUsageStats = usageStats?.aggregations?.fields?.find(
(fieldStats) => fieldStats?.fieldName === fieldPath,
);
if (!relevantUsageStats) {
return null;
}
return (
<Tooltip placement="topLeft" title={`${relevantUsageStats.count} queries / month`}>
<UsageBarContainer>
<UsageBar width={((relevantUsageStats.count || 0) / maxFieldUsageCount) * USAGE_BAR_MAX_WIDTH} />
</UsageBarContainer>
</Tooltip>
);
};
const descriptionColumn = {
title: 'Description',
dataIndex: 'description',
@ -253,6 +295,20 @@ export default function SchemaView({ urn, schema, editableSchemaMetadata, update
}),
};
const usageColumn = {
width: 50,
title: 'Usage',
dataIndex: 'fieldPath',
key: 'usage',
render: usageStatsRenderer,
};
let allColumns: ColumnsType<SchemaField> = [...defaultColumns, descriptionColumn, tagAndTermColumn];
if (hasUsageStats) {
allColumns = [...allColumns, usageColumn];
}
const getRawSchema = (schemaValue) => {
try {
return JSON.stringify(JSON.parse(schemaValue), null, 2);
@ -280,7 +336,7 @@ export default function SchemaView({ urn, schema, editableSchemaMetadata, update
) : (
rows.length > 0 && (
<Table
columns={allColumns}
dataSource={rows}
rowKey="fieldPath"
expandable={{ defaultExpandAllRows: true, expandRowByClick: true }}

View File

@ -131,6 +131,10 @@ query getDataset($urn: String!) {
count
userEmail
}
fields {
fieldName
count
}
}
}
}

View File

@ -13,6 +13,18 @@
"namespace" : "com.linkedin.common", "namespace" : "com.linkedin.common",
"doc" : "Enum to define the length of a bucket when doing aggregations", "doc" : "Enum to define the length of a bucket when doing aggregations",
"symbols" : [ "YEAR", "MONTH", "WEEK", "DAY", "HOUR" ] "symbols" : [ "YEAR", "MONTH", "WEEK", "DAY", "HOUR" ]
}, {
"type" : "record",
"name" : "FieldUsageCounts",
"namespace" : "com.linkedin.usage",
"doc" : " Records field-level usage counts for a given resource ",
"fields" : [ {
"name" : "fieldName",
"type" : "string"
}, {
"name" : "count",
"type" : "int"
} ]
}, {
"type" : "record",
"name" : "UsageAggregation",
@ -79,6 +91,14 @@
},
"doc" : " Frequent SQL queries; mostly makes sense for datasets in SQL databases ",
"optional" : true
}, {
"name" : "fields",
"type" : {
"type" : "array",
"items" : "FieldUsageCounts"
},
"doc" : " Field-level usage stats ",
"optional" : true
} ]
},
"doc" : " Metrics associated with this bucket " "doc" : " Metrics associated with this bucket "
@ -116,6 +136,13 @@
"items" : "UserUsageCounts" "items" : "UserUsageCounts"
}, },
"optional" : true "optional" : true
}, {
"name" : "fields",
"type" : {
"type" : "array",
"items" : "FieldUsageCounts"
},
"optional" : true
} ]
},
"doc" : "Aggregated metrics. All fields are optional here, since they will be populated\nonly if the underlying buckets contain the data required to generate that aggregation." "doc" : "Aggregated metrics. All fields are optional here, since they will be populated\nonly if the underlying buckets contain the data required to generate that aggregation."

View File

@ -10,6 +10,8 @@ import com.linkedin.restli.server.annotations.Action;
import com.linkedin.restli.server.annotations.ActionParam;
import com.linkedin.restli.server.annotations.RestLiSimpleResource;
import com.linkedin.restli.server.resources.SimpleResourceTemplate;
import com.linkedin.usage.FieldUsageCounts;
import com.linkedin.usage.FieldUsageCountsArray;
import com.linkedin.usage.UsageAggregation;
import com.linkedin.usage.UsageAggregationArray;
import com.linkedin.usage.UsageQueryResult;
@ -124,6 +126,29 @@ public class UsageStats extends SimpleResourceTemplate<UsageAggregation> {
}
}
// Compute aggregations for field usage counts.
{
Map<String, Integer> fieldAgg = new HashMap<>();
buckets.forEach((bucket) -> {
Optional.ofNullable(bucket.getMetrics().getFields()).ifPresent(fieldUsageCounts -> {
fieldUsageCounts.forEach((fieldCount -> {
String key = fieldCount.getFieldName();
int count = fieldAgg.getOrDefault(key, 0);
count += fieldCount.getCount();
fieldAgg.put(key, count);
}));
});
});
if (!fieldAgg.isEmpty()) {
FieldUsageCountsArray fields = new FieldUsageCountsArray();
fields.addAll(fieldAgg.entrySet().stream().map((mapping) -> new FieldUsageCounts()
.setFieldName(mapping.getKey())
.setCount(mapping.getValue())).collect(Collectors.toList()));
aggregations.setFields(fields);
}
}
return new UsageQueryResult()
.setBuckets(buckets)
.setAggregations(aggregations);

View File

@ -32,11 +32,12 @@ If you run into an error, try checking the [_common setup issues_](./developing.
We use a plugin architecture so that you can install only the dependencies you actually need.
| Plugin Name | Install Command | Provides |
| --------------- | ---------------------------------------------------------- | ----------------------------------- |
| file | _included by default_ | File source and sink |
| console | _included by default_ | Console sink |
| athena | `pip install 'acryl-datahub[athena]'` | AWS Athena source |
| bigquery | `pip install 'acryl-datahub[bigquery]'` | BigQuery source |
| bigquery-usage | `pip install 'acryl-datahub[bigquery-usage]'` | BigQuery usage statistics source |
| feast | `pip install 'acryl-datahub[feast]'` | Feast source |
| glue | `pip install 'acryl-datahub[glue]'` | AWS Glue source |
| hive | `pip install 'acryl-datahub[hive]'` | Hive source |
@ -47,6 +48,7 @@ We use a plugin architecture so that you can install only the dependencies you a
| redshift | `pip install 'acryl-datahub[redshift]'` | Redshift source |
| sqlalchemy | `pip install 'acryl-datahub[sqlalchemy]'` | Generic SQLAlchemy source |
| snowflake | `pip install 'acryl-datahub[snowflake]'` | Snowflake source |
| snowflake-usage | `pip install 'acryl-datahub[snowflake-usage]'` | Snowflake usage statistics source |
| superset | `pip install 'acryl-datahub[superset]'` | Superset source |
| mongodb | `pip install 'acryl-datahub[mongodb]'` | MongoDB source |
| ldap | `pip install 'acryl-datahub[ldap]'` ([extra requirements]) | LDAP source |
@ -451,6 +453,12 @@ source:
# table_pattern/schema_pattern is same as above
```
:::tip
You can also get fine-grained usage statistics for BigQuery using the `bigquery-usage` source.
:::
### AWS Athena `athena`
Extracts:
@ -766,6 +774,75 @@ sink:
schema_registry_config: {} # passed to https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.schema_registry.SchemaRegistryClient
```
### Google BigQuery Usage Stats `bigquery-usage`
- Fetch a list of queries issued
- Fetch a list of tables and columns accessed
- Aggregate these statistics into buckets, by day or hour granularity
Note: the client must have one of the following OAuth scopes:
- https://www.googleapis.com/auth/logging.read
- https://www.googleapis.com/auth/logging.admin
- https://www.googleapis.com/auth/cloud-platform.read-only
- https://www.googleapis.com/auth/cloud-platform
```yml
source:
type: bigquery-usage
config:
project_id: project # optional - can autodetect from environment
options:
# See https://googleapis.dev/python/logging/latest/client.html for details.
credentials: ~ # optional - see docs
env: PROD
bucket_duration: "DAY"
start_time: ~ # defaults to the last full day in UTC (or hour)
end_time: ~ # defaults to the last full day in UTC (or hour)
top_n_queries: 10 # number of queries to save for each table
```
:::tip
This source only does usage statistics. To get the tables, views, and schemas in your BigQuery project, use the `bigquery` source.
:::
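The recipe above can also be run programmatically, the same way the smoke tests in this commit drive ingestion. A minimal sketch using the `Pipeline` API; the GMS address, the `server` sink option, and the `project_id` value below are placeholders/assumptions, not values taken from this commit:

```python
# Hedged sketch: run a bigquery-usage -> datahub-rest pipeline in code.
# The GMS address (http://localhost:8080) and the "server" option are assumed defaults.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "bigquery-usage",
            "config": {"project_id": "my-project", "bucket_duration": "DAY"},
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
pipeline.pretty_print_summary()
```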
### Snowflake Usage Stats `snowflake-usage`
- Fetch a list of queries issued
- Fetch a list of tables and columns accessed (excludes views)
- Aggregate these statistics into buckets, by day or hour granularity
Note: the user/role must have access to the account usage table. The "accountadmin" role has this by default, and other roles can be granted this permission: https://docs.snowflake.com/en/sql-reference/account-usage.html#enabling-account-usage-for-other-roles.
Note: the underlying access history views that we use are only available in Snowflake's enterprise edition or higher.
```yml
source:
type: snowflake-usage
config:
username: user
password: pass
host_port: account_name
role: ACCOUNTADMIN
env: PROD
bucket_duration: "DAY"
start_time: ~ # defaults to the last full day in UTC (or hour)
end_time: ~ # defaults to the last full day in UTC (or hour)
top_n_queries: 10 # number of queries to save for each table
```
:::tip
This source only does usage statistics. To get the tables, views, and schemas in your Snowflake warehouse, ingest using the `snowflake` source.
:::
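For intuition on the `bucket_duration`, `start_time`, and `end_time` options used by both usage sources: by default the window covers the last complete UTC day (or hour), and events are grouped by truncating their timestamps to the start of a bucket. A rough illustration with hypothetical helper names, not the library's actual implementation:

```python
from datetime import datetime, timedelta, timezone

# Hypothetical helpers that illustrate the documented defaults; the real logic
# lives in datahub's usage_common module and may differ in detail.
def last_full_day_window_utc(now: datetime):
    """Default window: the most recent complete UTC day."""
    end = now.astimezone(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    return end - timedelta(days=1), end

def floor_to_day_bucket(ts: datetime) -> datetime:
    """Bucketing: truncate an event timestamp to the start of its UTC day."""
    return ts.astimezone(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)

start, end = last_full_day_window_utc(datetime.now(timezone.utc))
print(start.isoformat(), end.isoformat())
```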
### Console `console`
Simply prints each metadata event to stdout. Useful for experimentation and debugging purposes.

View File

@ -91,6 +91,7 @@ plugins: Dict[str, Set[str]] = {
"postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"}, "postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"},
"redshift": sql_common | {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"}, "redshift": sql_common | {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
"snowflake": sql_common | {"snowflake-sqlalchemy"}, "snowflake": sql_common | {"snowflake-sqlalchemy"},
"snowflake-usage": sql_common | {"snowflake-sqlalchemy"},
"superset": {"requests"}, "superset": {"requests"},
} }
@ -200,6 +201,7 @@ entry_points = {
"postgres = datahub.ingestion.source.postgres:PostgresSource", "postgres = datahub.ingestion.source.postgres:PostgresSource",
"redshift = datahub.ingestion.source.redshift:RedshiftSource", "redshift = datahub.ingestion.source.redshift:RedshiftSource",
"snowflake = datahub.ingestion.source.snowflake:SnowflakeSource", "snowflake = datahub.ingestion.source.snowflake:SnowflakeSource",
"snowflake-usage = datahub.ingestion.source.snowflake_usage:SnowflakeUsageSource",
"superset = datahub.ingestion.source.superset:SupersetSource", "superset = datahub.ingestion.source.superset:SupersetSource",
], ],
"datahub.ingestion.sink.plugins": [ "datahub.ingestion.sink.plugins": [

View File

@ -146,9 +146,9 @@ class Pipeline:
def pretty_print_summary(self) -> int:
click.echo()
click.secho(f"Source ({self.config.source.type}) report:", bold=True)
click.echo(self.source.get_report().as_string())
click.secho(f"Sink ({self.config.sink.type}) report:", bold=True)
click.echo(self.sink.get_report().as_string())
click.echo()
if self.source.get_report().failures or self.sink.get_report().failures:

View File

@ -14,11 +14,10 @@ import datahub.emitter.mce_builder as builder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import UsageStatsWorkUnit
from datahub.ingestion.source.usage_common import (
BaseUsageConfig,
GenericAggregatedDataset,
get_time_bucket,
)
from datahub.utilities.delayed_iter import delayed_iter
@ -101,6 +100,9 @@ class BigQueryTableRef:
return f"projects/{self.project}/datasets/{self.dataset}/tables/{self.table}" return f"projects/{self.project}/datasets/{self.dataset}/tables/{self.table}"
AggregatedDataset = GenericAggregatedDataset[BigQueryTableRef]
def _table_ref_to_urn(ref: BigQueryTableRef, env: str) -> str: def _table_ref_to_urn(ref: BigQueryTableRef, env: str) -> str:
return builder.make_dataset_urn( return builder.make_dataset_urn(
"bigquery", f"{ref.project}.{ref.dataset}.{ref.table}", env "bigquery", f"{ref.project}.{ref.dataset}.{ref.table}", env
@ -213,18 +215,6 @@ class QueryEvent:
return queryEvent
@dataclass
class AggregatedDataset:
bucket_start_time: datetime
resource: BigQueryTableRef
readCount: int = 0
queryCount: int = 0
queryFreq: Counter[str] = dataclasses.field(default_factory=collections.Counter)
userFreq: Counter[str] = dataclasses.field(default_factory=collections.Counter)
columnFreq: Counter[str] = dataclasses.field(default_factory=collections.Counter)
class BigQueryUsageConfig(BaseUsageConfig):
project_id: Optional[str] = None
extra_client_options: dict = {}
@ -373,43 +363,15 @@ class BigQueryUsageSource(Source):
resource,
AggregatedDataset(bucket_start_time=floored_ts, resource=resource),
)
agg_bucket.add_read_entry(event.actor_email, event.query, event.fieldsRead)
agg_bucket.readCount += 1
agg_bucket.userFreq[event.actor_email] += 1
if event.query:
agg_bucket.queryCount += 1
agg_bucket.queryFreq[event.query] += 1
for column in event.fieldsRead:
agg_bucket.columnFreq[column] += 1
return datasets
def _make_usage_stat(self, agg: AggregatedDataset) -> UsageStatsWorkUnit:
return agg.make_usage_workunit(
self.config.bucket_duration,
lambda resource: _table_ref_to_urn(resource, self.config.env),
self.config.top_n_queries,
)
def get_report(self) -> SourceReport:

View File

@ -0,0 +1,199 @@
import collections
import dataclasses
import json
import logging
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional
import pydantic
import pydantic.dataclasses
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
import datahub.emitter.mce_builder as builder
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import UsageStatsWorkUnit
from datahub.ingestion.source.snowflake import SnowflakeConfig
from datahub.ingestion.source.usage_common import (
BaseUsageConfig,
GenericAggregatedDataset,
get_time_bucket,
)
logger = logging.getLogger(__name__)
SnowflakeTableRef = str
AggregatedDataset = GenericAggregatedDataset[SnowflakeTableRef]
SNOWFLAKE_USAGE_SQL_TEMPLATE = """
SELECT
-- access_history.query_id, -- only for debugging purposes
access_history.query_start_time,
query_history.query_text,
query_history.query_type,
access_history.base_objects_accessed,
-- access_history.direct_objects_accessed, -- might be useful in the future
-- query_history.execution_status, -- not really necessary, but should equal "SUCCESS"
-- query_history.warehouse_name,
access_history.user_name,
users.first_name,
users.last_name,
users.display_name,
users.email,
query_history.role_name
FROM
snowflake.account_usage.access_history access_history
LEFT JOIN
snowflake.account_usage.query_history query_history
ON access_history.query_id = query_history.query_id
LEFT JOIN
snowflake.account_usage.users users
ON access_history.user_name = users.name
WHERE ARRAY_SIZE(base_objects_accessed) > 0
AND query_start_time >= to_timestamp_ltz({start_time_millis}, 3)
AND query_start_time < to_timestamp_ltz({end_time_millis}, 3)
ORDER BY query_start_time DESC
;
""".strip()
@pydantic.dataclasses.dataclass
class SnowflakeColumnReference:
columnId: int
columnName: str
@pydantic.dataclasses.dataclass
class SnowflakeObjectAccessEntry:
columns: List[SnowflakeColumnReference]
objectDomain: str
objectId: int
objectName: str
@pydantic.dataclasses.dataclass
class SnowflakeJoinedAccessEvent:
query_start_time: datetime
query_text: str
query_type: str
base_objects_accessed: List[SnowflakeObjectAccessEntry]
user_name: str
first_name: Optional[str]
last_name: Optional[str]
display_name: Optional[str]
email: str
role_name: str
class SnowflakeUsageConfig(SnowflakeConfig, BaseUsageConfig):
database: str = "snowflake"
@pydantic.validator("role", always=True)
def role_accountadmin(cls, v):
if not v or v.lower() != "accountadmin":
# This isn't an error, since the privileges can be delegated to other
# roles as well: https://docs.snowflake.com/en/sql-reference/account-usage.html#enabling-account-usage-for-other-roles
logger.info(
'snowflake usage tables are only accessible by role "accountadmin" by default; you set %s',
v,
)
return v
@dataclasses.dataclass
class SnowflakeUsageSource(Source):
config: SnowflakeUsageConfig
report: SourceReport = dataclasses.field(default_factory=SourceReport)
@classmethod
def create(cls, config_dict, ctx):
config = SnowflakeUsageConfig.parse_obj(config_dict)
return cls(ctx, config)
def get_workunits(self) -> Iterable[UsageStatsWorkUnit]:
access_events = self._get_snowflake_history()
aggregated_info = self._aggregate_access_events(access_events)
for time_bucket in aggregated_info.values():
for aggregate in time_bucket.values():
wu = self._make_usage_stat(aggregate)
self.report.report_workunit(wu)
yield wu
def _make_usage_query(self) -> str:
return SNOWFLAKE_USAGE_SQL_TEMPLATE.format(
start_time_millis=int(self.config.start_time.timestamp() * 1000),
end_time_millis=int(self.config.end_time.timestamp() * 1000),
)
def _make_sql_engine(self) -> Engine:
url = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(url, **self.config.options)
return engine
def _get_snowflake_history(self) -> Iterable[SnowflakeJoinedAccessEvent]:
query = self._make_usage_query()
engine = self._make_sql_engine()
results = engine.execute(query)
for row in results:
# Make some minor type conversions.
if hasattr(row, "_asdict"):
# Compat with SQLAlchemy 1.3 and 1.4
# See https://docs.sqlalchemy.org/en/14/changelog/migration_14.html#rowproxy-is-no-longer-a-proxy-is-now-called-row-and-behaves-like-an-enhanced-named-tuple.
event_dict = row._asdict()
else:
event_dict = dict(row)
event_dict["base_objects_accessed"] = json.loads(
event_dict["base_objects_accessed"]
)
event_dict["query_start_time"] = (
event_dict["query_start_time"]
).astimezone(tz=timezone.utc)
event = SnowflakeJoinedAccessEvent(**event_dict)
yield event
def _aggregate_access_events(
self, events: Iterable[SnowflakeJoinedAccessEvent]
) -> Dict[datetime, Dict[SnowflakeTableRef, AggregatedDataset]]:
datasets: Dict[
datetime, Dict[SnowflakeTableRef, AggregatedDataset]
] = collections.defaultdict(dict)
for event in events:
floored_ts = get_time_bucket(
event.query_start_time, self.config.bucket_duration
)
for object in event.base_objects_accessed:
resource = object.objectName
agg_bucket = datasets[floored_ts].setdefault(
resource,
AggregatedDataset(bucket_start_time=floored_ts, resource=resource),
)
agg_bucket.add_read_entry(
event.email,
event.query_text,
[colRef.columnName.lower() for colRef in object.columns],
)
return datasets
def _make_usage_stat(self, agg: AggregatedDataset) -> UsageStatsWorkUnit:
return agg.make_usage_workunit(
self.config.bucket_duration,
lambda resource: builder.make_dataset_urn(
"snowflake", resource.lower(), self.config.env
),
self.config.top_n_queries,
)
def get_report(self):
return self.report
def close(self):
pass

View File

@ -1,11 +1,21 @@
import collections
import dataclasses
import enum
from datetime import datetime, timedelta, timezone
from typing import Callable, Counter, Generic, List, Optional, TypeVar
import pydantic
import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.workunit import UsageStatsWorkUnit
from datahub.metadata.schema_classes import (
FieldUsageCountsClass,
UsageAggregationClass,
UsageAggregationMetricsClass,
UserUsageCountsClass,
WindowDurationClass,
)
@enum.unique
@ -30,6 +40,69 @@ def get_bucket_duration_delta(bucketing: BucketDuration) -> timedelta:
return timedelta(days=1)
ResourceType = TypeVar("ResourceType")
@dataclasses.dataclass
class GenericAggregatedDataset(Generic[ResourceType]):
bucket_start_time: datetime
resource: ResourceType
readCount: int = 0
queryCount: int = 0
queryFreq: Counter[str] = dataclasses.field(default_factory=collections.Counter)
userFreq: Counter[str] = dataclasses.field(default_factory=collections.Counter)
columnFreq: Counter[str] = dataclasses.field(default_factory=collections.Counter)
def add_read_entry(
self, user: str, query: Optional[str], fields: List[str]
) -> None:
self.readCount += 1
self.userFreq[user] += 1
if query:
self.queryCount += 1
self.queryFreq[query] += 1
for column in fields:
self.columnFreq[column] += 1
def make_usage_workunit(
self,
bucket_duration: BucketDuration,
urn_builder: Callable[[ResourceType], str],
top_n_queries: Optional[int],
) -> UsageStatsWorkUnit:
return UsageStatsWorkUnit(
id=f"{self.bucket_start_time.isoformat()}-{self.resource}",
usageStats=UsageAggregationClass(
bucket=int(self.bucket_start_time.timestamp() * 1000),
duration=bucket_duration,
resource=urn_builder(self.resource),
metrics=UsageAggregationMetricsClass(
uniqueUserCount=len(self.userFreq),
users=[
UserUsageCountsClass(
user=builder.UNKNOWN_USER,
count=count,
userEmail=user_email,
)
for user_email, count in self.userFreq.most_common()
],
totalSqlQueries=self.queryCount,
topSqlQueries=[
query for query, _ in self.queryFreq.most_common(top_n_queries)
],
fields=[
FieldUsageCountsClass(
fieldName=column,
count=count,
)
for column, count in self.columnFreq.most_common()
],
),
),
)
class BaseUsageConfig(ConfigModel):
# start_time and end_time will be populated by the validators.
bucket_duration: BucketDuration = BucketDuration.DAY

View File

@ -4,11 +4,13 @@
# Do not modify manually!
# fmt: off
from .....schema_classes import FieldUsageCountsClass
from .....schema_classes import UsageAggregationClass
from .....schema_classes import UsageAggregationMetricsClass
from .....schema_classes import UserUsageCountsClass
FieldUsageCounts = FieldUsageCountsClass
UsageAggregation = UsageAggregationClass
UsageAggregationMetrics = UsageAggregationMetricsClass
UserUsageCounts = UserUsageCountsClass

View File

@ -4731,6 +4731,33 @@
"name": "topSqlQueries", "name": "topSqlQueries",
"default": null, "default": null,
"doc": " Frequent SQL queries; mostly makes sense for datasets in SQL databases " "doc": " Frequent SQL queries; mostly makes sense for datasets in SQL databases "
},
{
"type": [
"null",
{
"type": "array",
"items": {
"type": "record",
"name": "FieldUsageCounts",
"namespace": "com.linkedin.pegasus2avro.usage",
"fields": [
{
"type": "string",
"name": "fieldName"
},
{
"type": "int",
"name": "count"
}
],
"doc": " Records field-level usage counts for a given resource "
}
}
],
"name": "fields",
"default": null,
"doc": " Field-level usage stats "
}
],
"doc": "Metrics for usage data for a given resource and bucket. Not all fields\nmake sense for all buckets, so every field is optional." "doc": "Metrics for usage data for a given resource and bucket. Not all fields\nmake sense for all buckets, so every field is optional."

View File

@ -7606,6 +7606,55 @@ class TagPropertiesClass(DictWrapper):
self._inner_dict['description'] = value
class FieldUsageCountsClass(DictWrapper):
""" Records field-level usage counts for a given resource """
RECORD_SCHEMA = get_schema_type("com.linkedin.pegasus2avro.usage.FieldUsageCounts")
def __init__(self,
fieldName: str,
count: int,
):
super().__init__()
self.fieldName = fieldName
self.count = count
@classmethod
def construct_with_defaults(cls) -> "FieldUsageCountsClass":
self = cls.construct({})
self._restore_defaults()
return self
def _restore_defaults(self) -> None:
self.fieldName = str()
self.count = int()
@property
def fieldName(self) -> str:
# No docs available.
return self._inner_dict.get('fieldName') # type: ignore
@fieldName.setter
def fieldName(self, value: str) -> None:
# No docs available.
self._inner_dict['fieldName'] = value
@property
def count(self) -> int:
# No docs available.
return self._inner_dict.get('count') # type: ignore
@count.setter
def count(self, value: int) -> None:
# No docs available.
self._inner_dict['count'] = value
class UsageAggregationClass(DictWrapper):
"""Usage data for a given resource, rolled up into a bucket."""
@ -7695,6 +7744,7 @@ class UsageAggregationMetricsClass(DictWrapper):
users: Union[None, List["UserUsageCountsClass"]]=None,
totalSqlQueries: Union[None, int]=None,
topSqlQueries: Union[None, List[str]]=None,
fields: Union[None, List["FieldUsageCountsClass"]]=None,
):
super().__init__()
@ -7702,6 +7752,7 @@ class UsageAggregationMetricsClass(DictWrapper):
self.users = users
self.totalSqlQueries = totalSqlQueries
self.topSqlQueries = topSqlQueries
self.fields = fields
@classmethod
def construct_with_defaults(cls) -> "UsageAggregationMetricsClass":
@ -7715,6 +7766,7 @@ class UsageAggregationMetricsClass(DictWrapper):
self.users = self.RECORD_SCHEMA.field_map["users"].default
self.totalSqlQueries = self.RECORD_SCHEMA.field_map["totalSqlQueries"].default
self.topSqlQueries = self.RECORD_SCHEMA.field_map["topSqlQueries"].default
self.fields = self.RECORD_SCHEMA.field_map["fields"].default
@property
@ -7765,6 +7817,18 @@ class UsageAggregationMetricsClass(DictWrapper):
self._inner_dict['topSqlQueries'] = value
@property
def fields(self) -> Union[None, List["FieldUsageCountsClass"]]:
"""Getter: Field-level usage stats """
return self._inner_dict.get('fields') # type: ignore
@fields.setter
def fields(self, value: Union[None, List["FieldUsageCountsClass"]]) -> None:
"""Setter: Field-level usage stats """
self._inner_dict['fields'] = value
class UserUsageCountsClass(DictWrapper):
""" Records a single user's usage counts for a given resource """
@ -7972,6 +8036,7 @@ __SCHEMA_TYPES = {
'com.linkedin.pegasus2avro.schema.UnionType': UnionTypeClass,
'com.linkedin.pegasus2avro.schema.UrnForeignKey': UrnForeignKeyClass,
'com.linkedin.pegasus2avro.tag.TagProperties': TagPropertiesClass,
'com.linkedin.pegasus2avro.usage.FieldUsageCounts': FieldUsageCountsClass,
'com.linkedin.pegasus2avro.usage.UsageAggregation': UsageAggregationClass,
'com.linkedin.pegasus2avro.usage.UsageAggregationMetrics': UsageAggregationMetricsClass,
'com.linkedin.pegasus2avro.usage.UserUsageCounts': UserUsageCountsClass,
@ -8117,6 +8182,7 @@ __SCHEMA_TYPES = {
'UnionType': UnionTypeClass,
'UrnForeignKey': UrnForeignKeyClass,
'TagProperties': TagPropertiesClass,
'FieldUsageCounts': FieldUsageCountsClass,
'UsageAggregation': UsageAggregationClass,
'UsageAggregationMetrics': UsageAggregationMetricsClass,
'UserUsageCounts': UserUsageCountsClass,

View File

@ -112,6 +112,32 @@
],
"doc": " Frequent SQL queries; mostly makes sense for datasets in SQL databases ",
"default": null
},
{
"name": "fields",
"type": [
"null",
{
"type": "array",
"items": {
"type": "record",
"name": "FieldUsageCounts",
"doc": " Records field-level usage counts for a given resource ",
"fields": [
{
"name": "fieldName",
"type": "string"
},
{
"name": "count",
"type": "int"
}
]
}
}
],
"doc": " Field-level usage stats ",
"default": null
}
]
},

View File

@ -23,4 +23,4 @@ def assert_mces_equal(output: object, golden: object) -> None:
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['createStamp'\]\['time'\]", r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['createStamp'\]\['time'\]",
} }
diff = deepdiff.DeepDiff(golden, output, exclude_regex_paths=ignore_paths) diff = deepdiff.DeepDiff(golden, output, exclude_regex_paths=ignore_paths)
assert not diff assert not diff, str(diff)

View File

@ -13,7 +13,105 @@
}
],
"totalSqlQueries": 0,
"topSqlQueries": [],
"fields": [
{
"fieldName": "unique_key",
"count": 1
},
{
"fieldName": "complaint_type",
"count": 1
},
{
"fieldName": "complaint_description",
"count": 1
},
{
"fieldName": "owning_department",
"count": 1
},
{
"fieldName": "source",
"count": 1
},
{
"fieldName": "status",
"count": 1
},
{
"fieldName": "status_change_date",
"count": 1
},
{
"fieldName": "created_date",
"count": 1
},
{
"fieldName": "last_update_date",
"count": 1
},
{
"fieldName": "close_date",
"count": 1
},
{
"fieldName": "incident_address",
"count": 1
},
{
"fieldName": "street_number",
"count": 1
},
{
"fieldName": "street_name",
"count": 1
},
{
"fieldName": "city",
"count": 1
},
{
"fieldName": "incident_zip",
"count": 1
},
{
"fieldName": "county",
"count": 1
},
{
"fieldName": "state_plane_x_coordinate",
"count": 1
},
{
"fieldName": "state_plane_y_coordinate",
"count": 1
},
{
"fieldName": "latitude",
"count": 1
},
{
"fieldName": "longitude",
"count": 1
},
{
"fieldName": "location",
"count": 1
},
{
"fieldName": "council_district_code",
"count": 1
},
{
"fieldName": "map_page",
"count": 1
},
{
"fieldName": "map_tile",
"count": 1
}
]
}
},
{
@ -33,84 +131,56 @@
"topSqlQueries": [ "topSqlQueries": [
"\nSELECT * FROM `harshal-playground-306419.test_schema.excess_deaths_derived`;\n\n", "\nSELECT * FROM `harshal-playground-306419.test_schema.excess_deaths_derived`;\n\n",
"SELECT * FROM `harshal-playground-306419.test_schema.excess_deaths_derived`" "SELECT * FROM `harshal-playground-306419.test_schema.excess_deaths_derived`"
] ],
} "fields": [
{
"fieldName": "placename",
"count": 4
}, },
{ {
"bucket": 1622160000000, "fieldName": "excess_deaths",
"duration": "DAY", "count": 4
"resource": "urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.excess_deaths_derived,PROD)",
"metrics": {
"uniqueUserCount": 1,
"users": [
{
"user": "urn:li:corpuser:unknown",
"count": 2,
"userEmail": "harshal@acryl.io"
}
],
"totalSqlQueries": 2,
"topSqlQueries": [
"SELECT * FROM `harshal-playground-306419.test_schema.excess_deaths_derived`",
"# CREATE OR REPLACE TABLE test_schema.excess_deaths_derived AS (SELECT * FROM `bigquery-public-data.covid19_nyt.excess_deaths` LIMIT 10);\n\nSELECT * FROM `harshal-playground-306419.test_schema.excess_deaths_derived`;\n"
]
}
}, },
{ {
"bucket": 1622505600000, "fieldName": "deaths",
"duration": "DAY", "count": 4
"resource": "urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.excess_deaths_derived,PROD)",
"metrics": {
"uniqueUserCount": 1,
"users": [
{
"user": "urn:li:corpuser:unknown",
"count": 1,
"userEmail": "harshal@acryl.io"
}
],
"totalSqlQueries": 1,
"topSqlQueries": [
"# CREATE OR REPLACE TABLE test_schema.excess_deaths_derived AS (SELECT * FROM `bigquery-public-data.covid19_nyt.excess_deaths` LIMIT 10);\n\nSELECT * FROM `harshal-playground-306419.test_schema.excess_deaths_derived`;\n"
]
}
}, },
{ {
"bucket": 1622505600000, "fieldName": "end_date",
"duration": "DAY", "count": 4
"resource": "urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.austin311_derived,PROD)",
"metrics": {
"uniqueUserCount": 1,
"users": [
{
"user": "urn:li:corpuser:unknown",
"count": 1,
"userEmail": "harshal@acryl.io"
}
],
"totalSqlQueries": 1,
"topSqlQueries": [
"# CREATE OR REPLACE TABLE test_schema.excess_deaths_derived AS (SELECT * FROM `bigquery-public-data.covid19_nyt.excess_deaths` LIMIT 10);\n\nSELECT * FROM `harshal-playground-306419.test_schema.austin311_derived`;\n"
]
}
}, },
{ {
"bucket": 1623888000000, "fieldName": "frequency",
"duration": "DAY", "count": 4
"resource": "urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.austin311_derived,PROD)", },
"metrics": {
"uniqueUserCount": 1,
"users": [
{ {
"user": "urn:li:corpuser:unknown", "fieldName": "expected_deaths",
"count": 2, "count": 4
"userEmail": "harshal@acryl.io" },
{
"fieldName": "start_date",
"count": 4
},
{
"fieldName": "baseline",
"count": 4
},
{
"fieldName": "year",
"count": 4
},
{
"fieldName": "month",
"count": 4
},
{
"fieldName": "week",
"count": 4
},
{
"fieldName": "country",
"count": 4
} }
],
"totalSqlQueries": 2,
"topSqlQueries": [
"select * from `harshal-playground-306419.test_schema.austin311_derived`",
"select complaint_description, complaint_type, unique_key, last_update_date from `harshal-playground-306419.test_schema.austin311_derived`"
] ]
} }
} }

View File

@ -14,6 +14,8 @@ import com.linkedin.metadata.search.elasticsearch.update.BulkListener;
import com.linkedin.metadata.usage.UsageService;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.common.WindowDuration;
import com.linkedin.usage.FieldUsageCounts;
import com.linkedin.usage.FieldUsageCountsArray;
import com.linkedin.usage.UsageAggregation;
import com.linkedin.usage.UsageAggregationMetrics;
import com.linkedin.usage.UserUsageCounts;
@ -139,6 +141,16 @@ public class ElasticUsageService implements UsageService {
document.set("metrics.top_sql_queries", sqlQueriesDocument);
});
Optional.ofNullable(bucket.getMetrics().getFields()).ifPresent(fields -> {
ArrayNode fieldsDocument = JsonNodeFactory.instance.arrayNode();
fields.forEach(fieldUsage -> {
ObjectNode fieldDocument = JsonNodeFactory.instance.objectNode();
fieldDocument.set("field_name", JsonNodeFactory.instance.textNode(fieldUsage.getFieldName()));
fieldDocument.set("count", JsonNodeFactory.instance.numberNode(fieldUsage.getCount()));
fieldsDocument.add(fieldDocument);
});
document.set("metrics.fields", fieldsDocument);
});
return document.toString();
}
@ -167,7 +179,6 @@ public class ElasticUsageService implements UsageService {
if (endTime != null) {
finalQuery.must(QueryBuilders.rangeQuery(ES_KEY_BUCKET_END).lte(endTime));
}
// TODO handle "latest N buckets" style queries
final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(finalQuery);
@ -231,6 +242,18 @@ public class ElasticUsageService implements UsageService {
metrics.setTopSqlQueries(queries);
}
if (docFields.containsKey("metrics.fields")) {
FieldUsageCountsArray fields = new FieldUsageCountsArray();
List<Map<String, Object>> docUsers = (List<Map<String, Object>>) docFields.get("metrics.fields");
for (Map<String, Object> map : docUsers) {
FieldUsageCounts fieldUsage = new FieldUsageCounts();
fieldUsage.setFieldName((String) map.get("field_name"));
fieldUsage.setCount((Integer) map.get("count"));
fields.add(fieldUsage);
}
metrics.setFields(fields);
}
return agg;
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);

View File

@ -0,0 +1,7 @@
namespace com.linkedin.usage
/** Records field-level usage counts for a given resource */
record FieldUsageCounts {
fieldName: string
count: int
}

View File

@ -16,4 +16,7 @@ record UsageAggregationMetrics {
/** Frequent SQL queries; mostly makes sense for datasets in SQL databases */
topSqlQueries: optional array[string]
/** Field-level usage stats */
fields: optional array[FieldUsageCounts]
}

View File

@ -17,5 +17,7 @@ record UsageQueryResult {
totalSqlQueries: optional int
users: optional array[UserUsageCounts]
fields: optional array[FieldUsageCounts]
}
}

View File

@ -10,6 +10,9 @@ FRONTEND_ENDPOINT = "http://localhost:9002"
KAFKA_BROKER = "localhost:9092"
bootstrap_sample_data = "../metadata-ingestion/examples/mce_files/bootstrap_mce.json"
usage_sample_data = (
"../metadata-ingestion/tests/integration/bigquery-usage/bigquery_usages_golden.json"
)
bq_sample_data = "./sample_bq_data.json"
restli_default_headers = {
"X-RestLi-Protocol-Version": "2.0.0",
@ -30,13 +33,12 @@ def test_healthchecks(wait_for_healthchecks):
pass
def ingest_file(filename: str):
pipeline = Pipeline.create(
{
"source": {
"type": "file",
"config": {"filename": filename},
},
"sink": {
"type": "datahub-rest",
@ -48,6 +50,16 @@ def test_ingestion_via_rest(wait_for_healthchecks):
pipeline.raise_from_status()
@pytest.mark.dependency(depends=["test_healthchecks"])
def test_ingestion_via_rest(wait_for_healthchecks):
ingest_file(bootstrap_sample_data)
@pytest.mark.dependency(depends=["test_healthchecks"])
def test_ingestion_usage_via_rest(wait_for_healthchecks):
ingest_file(usage_sample_data)
@pytest.mark.dependency(depends=["test_healthchecks"])
def test_ingestion_via_kafka(wait_for_healthchecks):
pipeline = Pipeline.create(
@ -74,7 +86,13 @@ def test_ingestion_via_kafka(wait_for_healthchecks):
time.sleep(kafka_post_ingestion_wait_sec)
@pytest.mark.dependency(
depends=[
"test_ingestion_via_rest",
"test_ingestion_via_kafka",
"test_ingestion_usage_via_rest",
]
)
def test_run_ingestion(wait_for_healthchecks):
# Dummy test so that future ones can just depend on this one.
pass
@ -193,6 +211,39 @@ def test_gms_search_dataset(query, min_expected_results):
assert data["elements"][0]["urn"]
@pytest.mark.dependency(depends=["test_healthchecks", "test_run_ingestion"])
def test_gms_usage_fetch():
response = requests.post(
f"{GMS_ENDPOINT}/usageStats?action=queryRange",
headers=restli_default_headers,
json={
"resource": "urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.excess_deaths_derived,PROD)",
"duration": "DAY",
"rangeFromEnd": "ALL",
},
)
response.raise_for_status()
data = response.json()["value"]
assert len(data["buckets"]) == 3
assert data["buckets"][0]["metrics"]["topSqlQueries"]
fields = data["aggregations"].pop("fields")
assert len(fields) == 12
assert fields[0]["count"] == 7
users = data["aggregations"].pop("users")
assert len(users) == 1
assert users[0]["count"] == 7
assert data["aggregations"] == {
# "fields" and "users" already popped out
"totalSqlQueries": 7,
"uniqueUserCount": 1,
}
@pytest.fixture(scope="session")
def frontend_session(wait_for_healthchecks):
session = requests.Session()