feat(ingest): add escape hatch methods to SqlParsingAggregator (#9860)

This commit is contained in:
Harshal Sheth 2024-02-21 11:33:05 -08:00 committed by GitHub
parent ac1ee6cd12
commit f13ae77966
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 764 additions and 169 deletions

View File

@ -5,15 +5,18 @@ import json
import logging
import pathlib
import tempfile
import uuid
from collections import defaultdict
from datetime import datetime, timezone
from typing import Callable, Dict, Iterable, List, Optional, Set, cast
from typing import Callable, Dict, Iterable, List, Optional, Set, Union, cast
import datahub.emitter.mce_builder as builder
import datahub.metadata.schema_classes as models
from datahub.emitter.mce_builder import get_sys_time, make_ts_millis
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.sql_parsing_builder import compute_upstream_fields
from datahub.ingestion.api.report import Report
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig, UsageAggregator
from datahub.metadata.urns import (
@ -32,7 +35,7 @@ from datahub.sql_parsing.sqlglot_lineage import (
infer_output_schema,
sqlglot_lineage,
)
from datahub.sql_parsing.sqlglot_utils import generate_hash
from datahub.sql_parsing.sqlglot_utils import generate_hash, get_query_fingerprint
from datahub.utilities.file_backed_collections import (
ConnectionWrapper,
FileBackedDict,
@ -57,8 +60,6 @@ class QueryLogSetting(enum.Enum):
@dataclasses.dataclass
class ViewDefinition:
# TODO view urn?
view_definition: str
default_db: Optional[str] = None
default_schema: Optional[str] = None
@ -95,6 +96,18 @@ class QueryMetadata:
)
@dataclasses.dataclass
class KnownQueryLineageInfo:
query_text: str
downstream: UrnStr
upstreams: List[UrnStr]
column_lineage: Optional[List[ColumnLineageInfo]] = None
timestamp: Optional[datetime] = None
query_type: QueryType = QueryType.UNKNOWN
@dataclasses.dataclass
class SqlAggregatorReport(Report):
_aggregator: "SqlParsingAggregator"
@ -103,12 +116,16 @@ class SqlAggregatorReport(Report):
num_observed_queries: int = 0
num_observed_queries_failed: int = 0
num_observed_queries_column_failed: int = 0
observed_query_parse_failures = LossyList[str]()
observed_query_parse_failures: LossyList[str] = dataclasses.field(
default_factory=LossyList
)
num_view_definitions: int = 0
num_views_failed: int = 0
num_views_column_failed: int = 0
views_parse_failures = LossyDict[UrnStr, str]()
views_parse_failures: LossyDict[UrnStr, str] = dataclasses.field(
default_factory=LossyDict
)
num_queries_with_temp_tables_in_session: int = 0
@ -142,8 +159,8 @@ class SqlParsingAggregator:
self,
*,
platform: str,
platform_instance: Optional[str],
env: str,
platform_instance: Optional[str] = None,
env: str = builder.DEFAULT_ENV,
graph: Optional[DataHubGraph] = None,
generate_lineage: bool = True,
generate_queries: bool = True,
@ -246,7 +263,7 @@ class SqlParsingAggregator:
return self.generate_lineage or self.generate_usage_statistics
def register_schema(
self, urn: DatasetUrn, schema: models.SchemaMetadataClass
self, urn: Union[str, DatasetUrn], schema: models.SchemaMetadataClass
) -> None:
# If lineage or usage is enabled, adds the schema to the schema resolver
# by putting the condition in here, we can avoid all the conditional
@ -255,6 +272,16 @@ class SqlParsingAggregator:
if self._need_schemas:
self._schema_resolver.add_schema_metadata(str(urn), schema)
def register_schemas_from_stream(
self, stream: Iterable[MetadataWorkUnit]
) -> Iterable[MetadataWorkUnit]:
for wu in stream:
schema_metadata = wu.get_aspect_of_type(models.SchemaMetadataClass)
if schema_metadata:
self.register_schema(wu.get_urn(), schema_metadata)
yield wu
def _initialize_schema_resolver_from_graph(self, graph: DataHubGraph) -> None:
# requires a graph instance
# if no schemas are currently registered in the schema resolver
@ -284,6 +311,96 @@ class SqlParsingAggregator:
env=self.env,
)
def add_known_query_lineage(
self, known_query_lineage: KnownQueryLineageInfo
) -> None:
"""Add a query and it's precomputed lineage to the aggregator.
This is useful for cases where we have lineage information that was
computed outside of the SQL parsing aggregator, e.g. from a data
warehouse's system tables.
This will also generate an operation aspect for the query if there is
a timestamp and the query type field is set to a mutation type.
Args:
known_query_lineage: The known query lineage information.
"""
# Generate a fingerprint for the query.
query_fingerprint = get_query_fingerprint(
known_query_lineage.query_text, self.platform.platform_name
)
# TODO format the query text?
# Register the query.
self._add_to_query_map(
QueryMetadata(
query_id=query_fingerprint,
formatted_query_string=known_query_lineage.query_text,
session_id=_MISSING_SESSION_ID,
query_type=known_query_lineage.query_type,
lineage_type=models.DatasetLineageTypeClass.TRANSFORMED,
latest_timestamp=known_query_lineage.timestamp,
actor=None,
upstreams=known_query_lineage.upstreams,
column_lineage=known_query_lineage.column_lineage or [],
confidence_score=1.0,
)
)
# Register the lineage.
self._lineage_map.for_mutation(
known_query_lineage.downstream, OrderedSet()
).add(query_fingerprint)
def add_known_lineage_mapping(
self,
upstream_urn: UrnStr,
downstream_urn: UrnStr,
lineage_type: str = models.DatasetLineageTypeClass.COPY,
) -> None:
"""Add a known lineage mapping to the aggregator.
By mapping, we mean that the downstream is effectively a copy or
alias of the upstream. This is useful for things like external tables
(e.g. Redshift Spectrum, Redshift UNLOADs, Snowflake external tables).
Because this method takes in urns, it does not require that the urns
are part of the platform that the aggregator is configured for.
TODO: In the future, this method will also generate CLL if we have
schemas for either the upstream or downstream.
The known lineage mapping does not contribute to usage statistics or operations.
Args:
upstream_urn: The upstream dataset URN.
downstream_urn: The downstream dataset URN.
"""
# We generate a fake "query" object to hold the lineage.
query_id = self._known_lineage_query_id()
# Register the query.
self._add_to_query_map(
QueryMetadata(
query_id=query_id,
formatted_query_string="-skip-",
session_id=_MISSING_SESSION_ID,
query_type=QueryType.UNKNOWN,
lineage_type=lineage_type,
latest_timestamp=None,
actor=None,
upstreams=[upstream_urn],
column_lineage=[],
confidence_score=1.0,
)
)
# Register the lineage.
self._lineage_map.for_mutation(downstream_urn, OrderedSet()).add(query_id)
def add_view_definition(
self,
view_urn: DatasetUrn,
@ -449,6 +566,10 @@ class SqlParsingAggregator:
def _process_view_definition(
self, view_urn: UrnStr, view_definition: ViewDefinition
) -> None:
# Note that in some cases, the view definition will be a SELECT statement
# instead of a CREATE VIEW ... AS SELECT statement. In those cases, we can't
# trust the parsed query type or downstream urn.
# Run the SQL parser.
parsed = self._run_sql_parser(
view_definition.view_definition,
@ -464,10 +585,6 @@ class SqlParsingAggregator:
elif parsed.debug_info.error:
self.report.num_views_column_failed += 1
# Note that in some cases, the view definition will be a SELECT statement
# instead of a CREATE VIEW ... AS SELECT statement. In those cases, we can't
# trust the parsed query type or downstream urn.
query_fingerprint = self._view_query_id(view_urn)
# Register the query.
@ -540,15 +657,6 @@ class SqlParsingAggregator:
else:
self._query_map[query_fingerprint] = new
"""
def add_lineage(self) -> None:
# A secondary mechanism for adding non-SQL-based lineage
# e.g. redshift external tables might use this when pointing at s3
# TODO Add this once we have a use case for it
pass
"""
def gen_metadata(self) -> Iterable[MetadataChangeProposalWrapper]:
# diff from v1 - we generate operations here, and it also
# generates MCPWs instead of workunits
@ -569,7 +677,7 @@ class SqlParsingAggregator:
# Generate lineage and queries.
queries_generated: Set[QueryId] = set()
for downstream_urn in self._lineage_map:
for downstream_urn in sorted(self._lineage_map):
yield from self._gen_lineage_for_downstream(
downstream_urn, queries_generated=queries_generated
)
@ -640,7 +748,9 @@ class SqlParsingAggregator:
dataset=upstream_urn,
type=queries_map[query_id].lineage_type,
query=(
self._query_urn(query_id) if self.generate_queries else None
self._query_urn(query_id)
if self.can_generate_query(query_id)
else None
),
created=query.make_created_audit_stamp(),
auditStamp=models.AuditStampClass(
@ -671,7 +781,9 @@ class SqlParsingAggregator:
SchemaFieldUrn(downstream_urn, downstream_column).urn()
],
query=(
self._query_urn(query_id) if self.generate_queries else None
self._query_urn(query_id)
if self.can_generate_query(query_id)
else None
),
confidenceScore=queries_map[query_id].confidence_score,
)
@ -682,9 +794,10 @@ class SqlParsingAggregator:
aspect=upstream_aspect,
)
if not self.generate_queries:
return
for query_id in required_queries:
if not self.can_generate_query(query_id):
continue
# Avoid generating the same query twice.
if query_id in queries_generated:
continue
@ -696,6 +809,7 @@ class SqlParsingAggregator:
entityUrn=self._query_urn(query_id),
aspects=[
models.QueryPropertiesClass(
dataPlatform=self.platform.urn(),
statement=models.QueryStatementClass(
value=query.formatted_query_string,
language=models.QueryLanguageClass.SQL,
@ -729,6 +843,19 @@ class SqlParsingAggregator:
def _view_query_id(cls, view_urn: UrnStr) -> str:
return f"view_{DatasetUrn.url_encode(view_urn)}"
@classmethod
def _known_lineage_query_id(cls) -> str:
return f"known_{uuid.uuid4()}"
@classmethod
def _is_known_lineage_query_id(cls, query_id: QueryId) -> bool:
# Our query fingerprints are hex and won't have underscores, so this will
# never conflict with a real query fingerprint.
return query_id.startswith("known_")
def can_generate_query(self, query_id: QueryId) -> bool:
return self.generate_queries and not self._is_known_lineage_query_id(query_id)
def _resolve_query_with_temp_tables(
self,
base_query: QueryMetadata,
@ -895,8 +1022,10 @@ class SqlParsingAggregator:
operationType=operation_type,
lastUpdatedTimestamp=make_ts_millis(query.latest_timestamp),
actor=query.actor.urn() if query.actor else None,
customProperties={
"query_urn": self._query_urn(query_id),
},
customProperties=(
{"query_urn": self._query_urn(query_id)}
if self.can_generate_query(query_id)
else None
),
)
yield MetadataChangeProposalWrapper(entityUrn=downstream_urn, aspect=aspect)

View File

@ -1,8 +1,11 @@
import hashlib
import logging
from typing import Dict, Iterable, Optional, Union
import sqlglot
import sqlglot.errors
logger = logging.getLogger(__name__)
DialectOrStr = Union[sqlglot.Dialect, str]
@ -139,10 +142,17 @@ def get_query_fingerprint(
The fingerprint for the SQL query.
"""
dialect = get_dialect(dialect)
expression_sql = generalize_query(expression, dialect=dialect)
fingerprint = generate_hash(expression_sql)
try:
dialect = get_dialect(dialect)
expression_sql = generalize_query(expression, dialect=dialect)
except (ValueError, sqlglot.errors.SqlglotError) as e:
if not isinstance(expression, str):
raise
logger.debug("Failed to generalize query for fingerprinting: %s", e)
expression_sql = expression
fingerprint = generate_hash(expression_sql)
return fingerprint

View File

@ -0,0 +1,127 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 20000,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
"type": "TRANSFORMED",
"query": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae"
}
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),a)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),a)"
],
"confidenceScore": 1.0,
"query": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),b)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),b)"
],
"confidenceScore": 1.0,
"query": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),c)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),c)"
],
"confidenceScore": 1.0,
"query": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae"
}
]
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae",
"changeType": "UPSERT",
"aspectName": "queryProperties",
"aspect": {
"json": {
"statement": {
"value": "insert into foo (a, b, c) select a, b, c from bar",
"language": "SQL"
},
"source": "SYSTEM",
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"dataPlatform": "urn:li:dataPlatform:redshift"
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae",
"changeType": "UPSERT",
"aspectName": "querySubjects",
"aspect": {
"json": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)"
}
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"json": {
"timestampMillis": 1707182625000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"operationType": "INSERT",
"customProperties": {
"query_urn": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae"
},
"lastUpdatedTimestamp": 20000
}
}
}
]

View File

@ -13,9 +13,6 @@
},
"actor": "urn:li:corpuser:user2",
"operationType": "CREATE",
"customProperties": {
"query_urn": "urn:li:query:cbdb3e148ea7fdae81815da4dd64f57873fb9c3d7d4bfad4e83b3d1ebd3c45c2"
},
"lastUpdatedTimestamp": 25000
}
}
@ -34,9 +31,6 @@
},
"actor": "urn:li:corpuser:user3",
"operationType": "CREATE",
"customProperties": {
"query_urn": "urn:li:query:7fd78ed5f3d60f7f91206f5e0fea6851a2afe940944455fd292267613b7ee1e6"
},
"lastUpdatedTimestamp": 26000
}
}

View File

@ -67,9 +67,10 @@
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 0,
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
}
},
"dataPlatform": "urn:li:dataPlatform:redshift"
}
}
},

View File

@ -0,0 +1,149 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
"type": "TRANSFORMED",
"query": "urn:li:query:fc48287a96588c73bcbdc1400f0c036b8d81196135618fb09a097459d54bd970"
}
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),a)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),a)"
],
"confidenceScore": 0.2,
"query": "urn:li:query:fc48287a96588c73bcbdc1400f0c036b8d81196135618fb09a097459d54bd970"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),b)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),b)"
],
"confidenceScore": 0.2,
"query": "urn:li:query:fc48287a96588c73bcbdc1400f0c036b8d81196135618fb09a097459d54bd970"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),c)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),c)"
],
"confidenceScore": 0.2,
"query": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae"
}
]
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:fc48287a96588c73bcbdc1400f0c036b8d81196135618fb09a097459d54bd970",
"changeType": "UPSERT",
"aspectName": "queryProperties",
"aspect": {
"json": {
"statement": {
"value": "/* query 2 */ insert into foo (a, b) select a, b from bar",
"language": "SQL"
},
"source": "SYSTEM",
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"dataPlatform": "urn:li:dataPlatform:redshift"
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:fc48287a96588c73bcbdc1400f0c036b8d81196135618fb09a097459d54bd970",
"changeType": "UPSERT",
"aspectName": "querySubjects",
"aspect": {
"json": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)"
}
]
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae",
"changeType": "UPSERT",
"aspectName": "queryProperties",
"aspect": {
"json": {
"statement": {
"value": "/* query 1 */ insert into foo (a, b, c) select a, b, c from bar",
"language": "SQL"
},
"source": "SYSTEM",
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"dataPlatform": "urn:li:dataPlatform:redshift"
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae",
"changeType": "UPSERT",
"aspectName": "querySubjects",
"aspect": {
"json": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)"
}
]
}
}
}
]

View File

@ -0,0 +1,77 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:s3,bucket1/key1,PROD)",
"type": "COPY"
}
],
"fineGrainedLineages": []
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
"type": "COPY"
}
],
"fineGrainedLineages": []
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,bucket2/key2,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
"type": "COPY"
}
],
"fineGrainedLineages": []
}
}
}
]

View File

@ -87,47 +87,6 @@
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:377a73bbf094c8b176b15157c24242cdfc7a0f407d78e52e63ded08c913468f1",
"changeType": "UPSERT",
"aspectName": "queryProperties",
"aspect": {
"json": {
"statement": {
"value": "insert into downstream (a, b) select a, b from upstream1",
"language": "SQL"
},
"source": "SYSTEM",
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
}
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:377a73bbf094c8b176b15157c24242cdfc7a0f407d78e52e63ded08c913468f1",
"changeType": "UPSERT",
"aspectName": "querySubjects",
"aspect": {
"json": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.downstream,PROD)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream1,PROD)"
}
]
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:c4b3a21ef8c262ebbe99a5bdb6c29cb0be646392bb4af10b6f4a758af881470e",
@ -141,13 +100,14 @@
},
"source": "SYSTEM",
"created": {
"time": 0,
"time": 25000,
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 0,
"time": 25000,
"actor": "urn:li:corpuser:_ingestion"
}
},
"dataPlatform": "urn:li:dataPlatform:redshift"
}
}
},
@ -168,5 +128,47 @@
]
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:377a73bbf094c8b176b15157c24242cdfc7a0f407d78e52e63ded08c913468f1",
"changeType": "UPSERT",
"aspectName": "queryProperties",
"aspect": {
"json": {
"statement": {
"value": "insert into downstream (a, b) select a, b from upstream1",
"language": "SQL"
},
"source": "SYSTEM",
"created": {
"time": 20000,
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 20000,
"actor": "urn:li:corpuser:_ingestion"
},
"dataPlatform": "urn:li:dataPlatform:redshift"
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:377a73bbf094c8b176b15157c24242cdfc7a0f407d78e52e63ded08c913468f1",
"changeType": "UPSERT",
"aspectName": "querySubjects",
"aspect": {
"json": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.downstream,PROD)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream1,PROD)"
}
]
}
}
}
]

View File

@ -1,7 +1,7 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_session3,PROD)",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
@ -16,24 +16,49 @@
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
"type": "TRANSFORMED",
"query": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71"
"query": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1"
}
],
"fineGrainedLineages": []
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),a)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),a)"
],
"confidenceScore": 0.35,
"query": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),b)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),b)"
],
"confidenceScore": 0.35,
"query": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1"
}
]
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71",
"entityUrn": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1",
"changeType": "UPSERT",
"aspectName": "queryProperties",
"aspect": {
"json": {
"statement": {
"value": "create table foo_session3 as select * from foo",
"value": "create table foo as select a, 2*b as b from bar",
"language": "SQL"
},
"source": "SYSTEM",
@ -42,25 +67,26 @@
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 0,
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
}
},
"dataPlatform": "urn:li:dataPlatform:redshift"
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71",
"entityUrn": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1",
"changeType": "UPSERT",
"aspectName": "querySubjects",
"aspect": {
"json": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_session3,PROD)"
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)"
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)"
}
]
}
@ -135,60 +161,10 @@
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1707251710392,
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
}
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
"type": "TRANSFORMED",
"query": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1"
}
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),a)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),a)"
],
"confidenceScore": 0.35,
"query": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1"
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),b)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),b)"
],
"confidenceScore": 0.35,
"query": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1"
}
]
},
"dataPlatform": "urn:li:dataPlatform:redshift"
}
}
},
@ -210,15 +186,41 @@
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_session3,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
},
"created": {
"time": 0,
"actor": "urn:li:corpuser:_ingestion"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
"type": "TRANSFORMED",
"query": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71"
}
],
"fineGrainedLineages": []
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1",
"entityUrn": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71",
"changeType": "UPSERT",
"aspectName": "queryProperties",
"aspect": {
"json": {
"statement": {
"value": "create table foo as select a, 2*b as b from bar",
"value": "create table foo_session3 as select * from foo",
"language": "SQL"
},
"source": "SYSTEM",
@ -227,25 +229,26 @@
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 0,
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
}
},
"dataPlatform": "urn:li:dataPlatform:redshift"
}
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1",
"entityUrn": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71",
"changeType": "UPSERT",
"aspectName": "querySubjects",
"aspect": {
"json": {
"subjects": [
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)"
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_session3,PROD)"
},
{
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)"
"entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)"
}
]
}

View File

@ -69,7 +69,8 @@
"lastModified": {
"time": 1707182625000,
"actor": "urn:li:corpuser:_ingestion"
}
},
"dataPlatform": "urn:li:dataPlatform:redshift"
}
}
},

View File

@ -4,12 +4,14 @@ from datetime import datetime, timezone
import pytest
from freezegun import freeze_time
import datahub.emitter.mce_builder as builder
from datahub.metadata.urns import CorpUserUrn, DatasetUrn
from datahub.sql_parsing.sql_parsing_aggregator_v2 import (
from datahub.sql_parsing.sql_parsing_aggregator import (
KnownQueryLineageInfo,
QueryLogSetting,
SqlParsingAggregator,
)
from datahub.sql_parsing.sql_parsing_common import QueryType
from datahub.sql_parsing.sqlglot_lineage import ColumnLineageInfo, ColumnRef
from tests.test_helpers import mce_helpers
RESOURCE_DIR = pathlib.Path(__file__).parent / "aggregator_goldens"
@ -24,8 +26,6 @@ def _ts(ts: int) -> datetime:
def test_basic_lineage(pytestconfig: pytest.Config) -> None:
aggregator = SqlParsingAggregator(
platform="redshift",
platform_instance=None,
env=builder.DEFAULT_ENV,
generate_lineage=True,
generate_usage_statistics=False,
generate_operations=False,
@ -50,8 +50,6 @@ def test_basic_lineage(pytestconfig: pytest.Config) -> None:
def test_overlapping_inserts(pytestconfig: pytest.Config) -> None:
aggregator = SqlParsingAggregator(
platform="redshift",
platform_instance=None,
env=builder.DEFAULT_ENV,
generate_lineage=True,
generate_usage_statistics=False,
generate_operations=False,
@ -83,8 +81,6 @@ def test_overlapping_inserts(pytestconfig: pytest.Config) -> None:
def test_temp_table(pytestconfig: pytest.Config) -> None:
aggregator = SqlParsingAggregator(
platform="redshift",
platform_instance=None,
env=builder.DEFAULT_ENV,
generate_lineage=True,
generate_usage_statistics=False,
generate_operations=False,
@ -136,8 +132,6 @@ def test_temp_table(pytestconfig: pytest.Config) -> None:
def test_aggregate_operations(pytestconfig: pytest.Config) -> None:
aggregator = SqlParsingAggregator(
platform="redshift",
platform_instance=None,
env=builder.DEFAULT_ENV,
generate_lineage=False,
generate_queries=False,
generate_usage_statistics=False,
@ -181,8 +175,6 @@ def test_aggregate_operations(pytestconfig: pytest.Config) -> None:
def test_view_lineage(pytestconfig: pytest.Config) -> None:
aggregator = SqlParsingAggregator(
platform="redshift",
platform_instance=None,
env=builder.DEFAULT_ENV,
generate_lineage=True,
generate_usage_statistics=False,
generate_operations=False,
@ -215,3 +207,113 @@ def test_view_lineage(pytestconfig: pytest.Config) -> None:
outputs=mcps,
golden_path=RESOURCE_DIR / "test_view_lineage.json",
)
@freeze_time(FROZEN_TIME)
def test_known_lineage_mapping(pytestconfig: pytest.Config) -> None:
aggregator = SqlParsingAggregator(
platform="redshift",
generate_lineage=True,
generate_usage_statistics=False,
generate_operations=False,
)
aggregator.add_known_lineage_mapping(
upstream_urn=DatasetUrn("redshift", "dev.public.bar").urn(),
downstream_urn=DatasetUrn("redshift", "dev.public.foo").urn(),
)
aggregator.add_known_lineage_mapping(
upstream_urn=DatasetUrn("s3", "bucket1/key1").urn(),
downstream_urn=DatasetUrn("redshift", "dev.public.bar").urn(),
)
aggregator.add_known_lineage_mapping(
upstream_urn=DatasetUrn("redshift", "dev.public.foo").urn(),
downstream_urn=DatasetUrn("s3", "bucket2/key2").urn(),
)
mcps = list(aggregator.gen_metadata())
mce_helpers.check_goldens_stream(
pytestconfig,
outputs=mcps,
golden_path=RESOURCE_DIR / "test_known_lineage_mapping.json",
)
@freeze_time(FROZEN_TIME)
def test_column_lineage_deduplication(pytestconfig: pytest.Config) -> None:
aggregator = SqlParsingAggregator(
platform="redshift",
generate_lineage=True,
generate_usage_statistics=False,
generate_operations=False,
)
aggregator.add_observed_query(
query="/* query 1 */ insert into foo (a, b, c) select a, b, c from bar",
default_db="dev",
default_schema="public",
)
aggregator.add_observed_query(
query="/* query 2 */ insert into foo (a, b) select a, b from bar",
default_db="dev",
default_schema="public",
)
mcps = list(aggregator.gen_metadata())
# In this case, the lineage for a and b is attributed to query 2, and
# the lineage for c is attributed to query 1. Note that query 1 does
# not get any credit for a and b, as they are already covered by query 2,
# which came later and hence has higher precedence.
mce_helpers.check_goldens_stream(
pytestconfig,
outputs=mcps,
golden_path=RESOURCE_DIR / "test_column_lineage_deduplication.json",
)
@freeze_time(FROZEN_TIME)
def test_add_known_query_lineage(pytestconfig: pytest.Config) -> None:
aggregator = SqlParsingAggregator(
platform="redshift",
generate_lineage=True,
generate_usage_statistics=False,
generate_operations=True,
)
downstream_urn = DatasetUrn("redshift", "dev.public.foo").urn()
upstream_urn = DatasetUrn("redshift", "dev.public.bar").urn()
known_query_lineage = KnownQueryLineageInfo(
query_text="insert into foo (a, b, c) select a, b, c from bar",
downstream=downstream_urn,
upstreams=[upstream_urn],
column_lineage=[
ColumnLineageInfo(
downstream=ColumnRef(table=downstream_urn, column="a"),
upstreams=[ColumnRef(table=upstream_urn, column="a")],
),
ColumnLineageInfo(
downstream=ColumnRef(table=downstream_urn, column="b"),
upstreams=[ColumnRef(table=upstream_urn, column="b")],
),
ColumnLineageInfo(
downstream=ColumnRef(table=downstream_urn, column="c"),
upstreams=[ColumnRef(table=upstream_urn, column="c")],
),
],
timestamp=_ts(20),
query_type=QueryType.INSERT,
)
aggregator.add_known_query_lineage(known_query_lineage)
mcps = list(aggregator.gen_metadata())
mce_helpers.check_goldens_stream(
pytestconfig,
outputs=mcps,
golden_path=RESOURCE_DIR / "test_add_known_query_lineage.json",
)