Mirror of https://github.com/datahub-project/datahub.git (synced 2025-09-07 08:13:06 +00:00)
feat(bigquery): Ingest lineage metadata from GCP logs. (#3389)
commit e717d6a937 (parent aa52d98994)
@@ -13,6 +13,7 @@ This plugin extracts the following:
 - Metadata for databases, schemas, and tables
 - Column types associated with each table
 - Table, row, and column statistics via optional [SQL profiling](./sql_profiles.md)
+- Table level lineage.

 :::tip

@@ -44,7 +45,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
 As a SQL-based service, the BigQuery integration is also supported by our SQL profiler. See [here](./sql_profiles.md) for more details on configuration.

 | Field | Required | Default | Description |
-| --------------------------- | -------- | ------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| --------------------------- | -------- | ------------------------------------------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
 | `project_id` | | Autodetected | Project ID to ingest from. If not specified, will infer from environment. |
 | `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
 | `options.<option>` | | | Any options specified here will be passed to SQLAlchemy's `create_engine` as kwargs.<br />See https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine for details. |
@@ -59,6 +60,20 @@ As a SQL-based service, the BigQuery integration is also supported by our SQL profiler.
 | `view_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
 | `include_tables` | | `True` | Whether tables should be ingested. |
 | `include_views` | | `True` | Whether views should be ingested. |
+| `include_table_lineage` | | `True` | Whether table level lineage should be ingested and processed. |
+| `max_query_duration` | | `15` | A time buffer in minutes used to adjust `start_time` and `end_time` while querying the BigQuery audit logs. |
+| `start_time` | | Start of last full day in UTC (or hour, depending on `bucket_duration`) | Earliest time of lineage data to consider. |
+| `end_time` | | End of last full day in UTC (or hour, depending on `bucket_duration`) | Latest time of lineage data to consider. |
+| `extra_client_options` | | | Additional options to pass to `google.cloud.logging_v2.client.Client`. |
+
+The following parameters are only relevant if `include_table_lineage` is set to `true`:
+
+- `max_query_duration`
+- `start_time`
+- `end_time`
+- `extra_client_options`
+
+Note: since the BigQuery source also supports table level lineage, the auth client requires additional permissions to access the Google audit logs. Refer to the permissions section under `bigquery-usage` below, which also accesses the audit logs.

 ## Compatibility

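For orientation, the lineage-related options above map directly onto the `BigQueryConfig` class added later in this commit. Below is a minimal sketch of constructing that config programmatically with `BigQueryConfig.parse_obj` (pydantic); the import path, project name, and time window are assumptions for illustration only and are not part of this change.

```python
from datetime import datetime, timedelta, timezone

# Assumed import path for the class added in this commit; it may differ between releases.
from datahub.ingestion.source.sql.bigquery import BigQueryConfig

# All values below are hypothetical and only illustrate the lineage-related fields.
config = BigQueryConfig.parse_obj(
    {
        "project_id": "my-gcp-project",  # optional; autodetected from the environment if omitted
        "env": "PROD",
        "include_table_lineage": True,  # enables the audit-log based lineage described above
        "max_query_duration": timedelta(minutes=30),  # buffer applied around the time window
        "start_time": datetime(2021, 10, 18, tzinfo=timezone.utc),
        "end_time": datetime(2021, 10, 19, tzinfo=timezone.utc),
        "extra_client_options": {},  # forwarded to google.cloud.logging_v2.client.Client
    }
)
```

In a YAML recipe, the same keys go under `source.config`.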
@@ -88,7 +103,8 @@ Note: the client must have one of the following OAuth scopes, and should be authorized

 :::note

-This source only does usage statistics. To get the tables, views, and schemas in your BigQuery project, use the `bigquery` source described above.
+1. This source only does usage statistics. To get the tables, views, and schemas in your BigQuery project, use the `bigquery` source described above.
+2. Depending on the compliance policies set up for the BigQuery instance, the `logging.read` permission is sometimes not sufficient. In that case, use either the admin or the private log viewer permission.

 :::

@@ -1,18 +1,65 @@
+import collections
 import functools
-from typing import Any, Optional, Tuple
+import logging
+from datetime import timedelta
+from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
 from unittest.mock import patch

 # This import verifies that the dependencies are available.
 import pybigquery  # noqa: F401
 import pybigquery.sqlalchemy_bigquery
+from google.cloud.logging_v2.client import Client as GCPLoggingClient
 from sqlalchemy.engine.reflection import Inspector

+from datahub.configuration.time_window_config import BaseTimeWindowConfig
+from datahub.emitter import mce_builder
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.sql.sql_common import (
     SQLAlchemyConfig,
     SQLAlchemySource,
+    SqlWorkUnit,
     make_sqlalchemy_type,
     register_custom_type,
 )
+from datahub.ingestion.source.usage.bigquery_usage import (
+    BQ_DATETIME_FORMAT,
+    GCP_LOGGING_PAGE_SIZE,
+    AuditLogEntry,
+    BigQueryTableRef,
+    QueryEvent,
+)
+from datahub.metadata.com.linkedin.pegasus2avro.metadata.key import DatasetKey
+from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
+from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
+from datahub.metadata.schema_classes import (
+    ChangeTypeClass,
+    DatasetLineageTypeClass,
+    UpstreamClass,
+    UpstreamLineageClass,
+)
+
+logger = logging.getLogger(__name__)
+
+BQ_FILTER_RULE_TEMPLATE = """
+protoPayload.serviceName="bigquery.googleapis.com"
+AND
+(
+    (
+        protoPayload.methodName="jobservice.jobcompleted"
+        AND
+        protoPayload.serviceData.jobCompletedEvent.eventName="query_job_completed"
+        AND
+        protoPayload.serviceData.jobCompletedEvent.job.jobStatus.state="DONE"
+        AND NOT
+        protoPayload.serviceData.jobCompletedEvent.job.jobStatus.error.code:*
+    )
+)
+AND
+timestamp >= "{start_time}"
+AND
+timestamp < "{end_time}"
+""".strip()

 # The existing implementation of this method can be found here:
 # https://github.com/googleapis/python-bigquery-sqlalchemy/blob/e0f1496c99dd627e0ed04a0c4e89ca5b14611be2/pybigquery/sqlalchemy_bigquery.py#L967-L974.
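The template above is a Cloud Logging filter that selects successful `query_job_completed` audit entries within a time window. As a rough sketch of how `_get_bigquery_log_entries` (added further down) fills it in with the `max_query_duration` buffer applied: the template here is trimmed, the timestamps are hypothetical, and the exact `BQ_DATETIME_FORMAT` pattern (imported from `bigquery_usage`) is assumed.

```python
from datetime import datetime, timedelta, timezone

# Assumed ISO-8601-style pattern; the real constant lives in bigquery_usage.
BQ_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

# Trimmed-down stand-in for BQ_FILTER_RULE_TEMPLATE, just to show the substitution;
# the full template above additionally restricts to successful query_job_completed entries.
TEMPLATE = """
protoPayload.serviceName="bigquery.googleapis.com"
AND
timestamp >= "{start_time}"
AND
timestamp < "{end_time}"
""".strip()

# Hypothetical ingestion window; max_query_duration pads both ends so that
# long-running jobs near the window edges are still captured.
start_time = datetime(2021, 10, 18, tzinfo=timezone.utc)
end_time = datetime(2021, 10, 19, tzinfo=timezone.utc)
max_query_duration = timedelta(minutes=15)  # mirrors the config default

filter_rule = TEMPLATE.format(
    start_time=(start_time - max_query_duration).strftime(BQ_DATETIME_FORMAT),
    end_time=(end_time + max_query_duration).strftime(BQ_DATETIME_FORMAT),
)
print(filter_rule)
# ... timestamp >= "2021-10-17T23:45:00Z" AND timestamp < "2021-10-19T00:15:00Z"
```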
@@ -35,9 +82,13 @@ register_custom_type(GEOGRAPHY)
 assert pybigquery.sqlalchemy_bigquery._type_map


-class BigQueryConfig(SQLAlchemyConfig):
-    scheme = "bigquery"
+class BigQueryConfig(BaseTimeWindowConfig, SQLAlchemyConfig):
+    scheme: str = "bigquery"
     project_id: Optional[str] = None
+    # extra_client_options, include_table_lineage and max_query_duration are relevant only when computing the lineage.
+    extra_client_options: Dict[str, Any] = {}
+    include_table_lineage: Optional[bool] = True
+    max_query_duration: timedelta = timedelta(minutes=15)

     def get_sql_alchemy_url(self):
         if self.project_id:
@@ -49,21 +100,172 @@ class BigQueryConfig(SQLAlchemyConfig):


 class BigQuerySource(SQLAlchemySource):
+    config: BigQueryConfig
+    lineage_metadata: Optional[Dict[str, Set[str]]] = None
+
     def __init__(self, config, ctx):
         super().__init__(config, ctx, "bigquery")

+    def _compute_big_query_lineage(self) -> None:
+        if self.config.include_table_lineage:
+            try:
+                _clients: List[GCPLoggingClient] = self._make_bigquery_client()
+                log_entries: Iterable[AuditLogEntry] = self._get_bigquery_log_entries(
+                    _clients
+                )
+                parsed_entries: Iterable[QueryEvent] = self._parse_bigquery_log_entries(
+                    log_entries
+                )
+                self.lineage_metadata = self._create_lineage_map(parsed_entries)
+            except Exception as e:
+                logger.error(
+                    "Error computing lineage information using GCP logs.",
+                    e,
+                )
+
+    def _make_bigquery_client(self) -> List[GCPLoggingClient]:
+        # See https://github.com/googleapis/google-cloud-python/issues/2674 for
+        # why we disable gRPC here.
+        client_options = self.config.extra_client_options.copy()
+        client_options["_use_grpc"] = False
+        project_id = self.config.project_id
+        if project_id is not None:
+            return [GCPLoggingClient(**client_options, project=project_id)]
+        else:
+            return [GCPLoggingClient(**client_options)]
+
+    def _get_bigquery_log_entries(
+        self, clients: List[GCPLoggingClient]
+    ) -> Iterable[AuditLogEntry]:
+        # Add a buffer to the start and end time to account for delays in logging events.
+        filter = BQ_FILTER_RULE_TEMPLATE.format(
+            start_time=(
+                self.config.start_time - self.config.max_query_duration
+            ).strftime(BQ_DATETIME_FORMAT),
+            end_time=(self.config.end_time + self.config.max_query_duration).strftime(
+                BQ_DATETIME_FORMAT
+            ),
+        )
+
+        logger.debug("Start loading log entries from BigQuery")
+        for client in clients:
+            yield from client.list_entries(
+                filter_=filter, page_size=GCP_LOGGING_PAGE_SIZE
+            )
+        logger.debug("finished loading log entries from BigQuery")
+
+    # Currently we only parse JobCompleted events, but in the future we may want to parse other
+    # events to also create field-level lineage.
+    def _parse_bigquery_log_entries(
+        self, entries: Iterable[AuditLogEntry]
+    ) -> Iterable[QueryEvent]:
+        for entry in entries:
+            event: Optional[QueryEvent] = None
+            try:
+                if QueryEvent.can_parse_entry(entry):
+                    event = QueryEvent.from_entry(entry)
+                else:
+                    raise RuntimeError("Unable to parse log entry as QueryEvent.")
+            except Exception as e:
+                self.report.report_failure(
+                    f"{entry.log_name}-{entry.insert_id}",
+                    f"unable to parse log entry: {entry!r}",
+                )
+                logger.error("Unable to parse GCP log entry.", e)
+            if event is not None:
+                yield event
+
+    def _create_lineage_map(self, entries: Iterable[QueryEvent]) -> Dict[str, Set[str]]:
+        lineage_map: Dict[str, Set[str]] = collections.defaultdict(set)
+        for e in entries:
+            if (
+                e.destinationTable is None
+                or e.destinationTable.is_anonymous()
+                or not e.referencedTables
+            ):
+                continue
+            for ref_table in e.referencedTables:
+                destination_table_str = str(e.destinationTable)
+                ref_table_str = str(ref_table)
+                if ref_table_str != destination_table_str:
+                    lineage_map[destination_table_str].add(ref_table_str)
+        return lineage_map
+
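`_create_lineage_map` above only relies on `destinationTable`, `referencedTables`, `is_anonymous()`, and the string form of a table reference, so its behaviour can be sketched with simple stand-ins for `QueryEvent` and `BigQueryTableRef` (the classes below are hypothetical stand-ins, not the real ones): anonymous destinations are skipped and self-references are dropped.

```python
import collections
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional, Set


@dataclass(frozen=True)
class FakeTableRef:
    """Stand-in for BigQueryTableRef: only str() and is_anonymous() are used here."""

    name: str

    def is_anonymous(self) -> bool:
        return self.name.startswith("_")  # toy rule for this sketch

    def __str__(self) -> str:
        return self.name


@dataclass
class FakeQueryEvent:
    """Stand-in for QueryEvent: only destinationTable and referencedTables are used."""

    destinationTable: Optional[FakeTableRef]
    referencedTables: List[FakeTableRef]


def create_lineage_map(entries: Iterable[FakeQueryEvent]) -> Dict[str, Set[str]]:
    # Same shape as BigQuerySource._create_lineage_map: destination -> set of upstream tables.
    lineage: Dict[str, Set[str]] = collections.defaultdict(set)
    for e in entries:
        if e.destinationTable is None or e.destinationTable.is_anonymous() or not e.referencedTables:
            continue
        for ref in e.referencedTables:
            if str(ref) != str(e.destinationTable):  # drop self-references
                lineage[str(e.destinationTable)].add(str(ref))
    return lineage


events = [
    FakeQueryEvent(
        FakeTableRef("proj.ds.daily_summary"),
        [FakeTableRef("proj.ds.raw_events"), FakeTableRef("proj.ds.daily_summary")],
    ),
    FakeQueryEvent(FakeTableRef("_anon123"), [FakeTableRef("proj.ds.raw_events")]),  # anonymous: skipped
]
print(dict(create_lineage_map(events)))
# {'proj.ds.daily_summary': {'proj.ds.raw_events'}}
```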
     @classmethod
     def create(cls, config_dict, ctx):
         config = BigQueryConfig.parse_obj(config_dict)
         return cls(config, ctx)

-    def get_workunits(self):
+    # Overriding the get_workunits method to first compute the workunits using the base SQLAlchemySource
+    # and then compute lineage information only for those datasets that were ingested. This helps us
+    # maintain a clear separation between SQLAlchemySource and BigQuerySource, and it means that flags
+    # like schema and table patterns are honored for lineage computation as well.
+    def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
+        # Only compute the lineage if it has not been computed yet. This is a safety check in case a
+        # future refactoring ends up computing lineage multiple times.
+        if self.lineage_metadata is None:
+            self._compute_big_query_lineage()
         with patch.dict(
             "pybigquery.sqlalchemy_bigquery._type_map",
             {"GEOGRAPHY": GEOGRAPHY},
             clear=False,
         ):
-            return super().get_workunits()
+            for wu in super().get_workunits():
+                yield wu
+                if (
+                    isinstance(wu, SqlWorkUnit)
+                    and isinstance(wu.metadata, MetadataChangeEvent)
+                    and isinstance(wu.metadata.proposedSnapshot, DatasetSnapshot)
+                ):
+                    lineage_mcp = self.get_lineage_mcp(wu.metadata.proposedSnapshot.urn)
+                    if lineage_mcp is not None:
+                        lineage_wu = MetadataWorkUnit(
+                            id=f"{self.platform}-{lineage_mcp.entityUrn}-{lineage_mcp.aspectName}",
+                            mcp=lineage_mcp,
+                        )
+                        yield lineage_wu
+                        self.report.report_workunit(lineage_wu)
+
+    def get_lineage_mcp(
+        self, dataset_urn: str
+    ) -> Optional[MetadataChangeProposalWrapper]:
+        dataset_key: Optional[DatasetKey] = mce_builder.dataset_urn_to_key(dataset_urn)
+        if dataset_key is None:
+            return None
+        project_id, dataset_name, tablename = dataset_key.name.split(".")
+        bq_table = BigQueryTableRef(project_id, dataset_name, tablename)
+        assert self.lineage_metadata is not None
+        if str(bq_table) in self.lineage_metadata:
+            upstream_list: List[UpstreamClass] = []
+            # Sort the list of upstream tables to avoid creating multiple aspects in the backend
+            # when the lineage is the same but the order differs.
+            for ref_table in sorted(self.lineage_metadata[str(bq_table)]):
+                upstream_table = BigQueryTableRef.from_string_name(ref_table)
+                upstream_table_class = UpstreamClass(
+                    mce_builder.make_dataset_urn(
+                        self.platform,
+                        "{project}.{database}.{table}".format(
+                            project=upstream_table.project,
+                            database=upstream_table.dataset,
+                            table=upstream_table.table,
+                        ),
+                        self.config.env,
+                    ),
+                    DatasetLineageTypeClass.TRANSFORMED,
+                )
+                upstream_list.append(upstream_table_class)
+
+            if upstream_list:
+                upstream_lineage = UpstreamLineageClass(upstreams=upstream_list)
+                mcp = MetadataChangeProposalWrapper(
+                    entityType="dataset",
+                    changeType=ChangeTypeClass.UPSERT,
+                    entityUrn=dataset_urn,
+                    aspectName="upstreamLineage",
+                    aspect=upstream_lineage,
+                )
+                return mcp
+        return None

     def prepare_profiler_args(self, schema: str, table: str) -> dict:
         self.config: BigQueryConfig
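Taken together, the new code means that enabling `include_table_lineage` makes the source emit an extra `upstreamLineage` work unit for each ingested dataset that has upstreams in the lineage map. A rough sketch of running it programmatically with DataHub's `Pipeline` API is shown below; the project ID and sink path are hypothetical, and the exact recipe keys accepted can vary by release.

```python
# Sketch only: assumes `acryl-datahub[bigquery]` is installed and GCP credentials are
# available (e.g. via GOOGLE_APPLICATION_CREDENTIALS or ambient credentials).
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "bigquery",
            "config": {
                "project_id": "my-gcp-project",  # hypothetical project
                "include_table_lineage": True,
            },
        },
        "sink": {
            "type": "file",
            "config": {"filename": "./bigquery_mces.json"},  # hypothetical output path
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```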
@@ -417,7 +417,6 @@ class BigQueryUsageSource(Source):
                         str(event.resource),
                         "failed to match table read event with job; try increasing `query_log_delay` or `max_query_duration`",
                     )
-
             yield event

     def _aggregate_enriched_read_events(