feat(ingestion): improve logging, docs for bigquery, snowflake, redshift (#4344)

Aseem Bansal 2022-03-14 21:20:29 +05:30 committed by GitHub
parent bb413beb24
commit 4bcc2b3d12
40 changed files with 953 additions and 457 deletions

View File

@ -211,6 +211,8 @@ mypy_stubs = {
"types-click==0.1.12",
"boto3-stubs[s3,glue,sagemaker]",
"types-tabulate",
# avrogen package requires this
"types-pytz",
}
base_dev_requirements = {
@ -223,7 +225,7 @@ base_dev_requirements = {
"flake8>=3.8.3",
"flake8-tidy-imports>=4.3.0",
"isort>=5.7.0",
"mypy>=0.920",
"mypy>=0.920,<0.940",
# pydantic 1.8.2 is incompatible with mypy 0.910.
# See https://github.com/samuelcolvin/pydantic/pull/3175#issuecomment-995382910.
"pydantic>=1.9.0",

View File

@ -1,14 +1,17 @@
# BigQuery
To get all metadata from BigQuery you need to use two plugins: `bigquery` and `bigquery-usage`. Both are described on this page and require two separate recipes. We understand this is not ideal and plan to make this easier in the future.
For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).
## Setup
## `bigquery`
### Setup
To install this plugin, run `pip install 'acryl-datahub[bigquery]'`.
## Prerequisites
### Create a datahub profile in GCP:
1. Create a custom role for datahub (https://cloud.google.com/iam/docs/creating-custom-roles#creating_a_custom_role)
### Prerequisites
#### Create a datahub profile in GCP
1. Create a custom role for datahub as per the [GCP docs](https://cloud.google.com/iam/docs/creating-custom-roles#creating_a_custom_role)
2. Grant the following permissions to this role:
```
bigquery.datasets.get
@ -27,9 +30,9 @@ To install this plugin, run `pip install 'acryl-datahub[bigquery]'`.
logging.logEntries.list # Needed for lineage generation
resourcemanager.projects.get
```
### Create a service account:
#### Create a service account
1. Setup a ServiceAccount (https://cloud.google.com/iam/docs/creating-managing-service-accounts#iam-service-accounts-create-console)
1. Set up a service account as per the [GCP docs](https://cloud.google.com/iam/docs/creating-managing-service-accounts#iam-service-accounts-create-console)
and assign the previously created role to this service account.
2. Download a service account JSON keyfile.
Example credential file:
@ -64,7 +67,7 @@ and assign the previously created role to this service account.
client_id: "123456678890"
```
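One common way to hand this keyfile to the source (a sketch assuming the standard Google application-default credentials flow; recipes may also reference the credential in other ways) is to export `GOOGLE_APPLICATION_CREDENTIALS` before running ingestion:
```python
import os

# Point the standard Google auth environment variable at the downloaded keyfile
# (the path below is illustrative).
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/datahub-sa-keyfile.json"
```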
## Capabilities
### Capabilities
This plugin extracts the following:
@ -81,11 +84,11 @@ This plugin extracts the following:
:::tip
You can also get fine-grained usage statistics for BigQuery using the `bigquery-usage` source described below.
You can also get fine-grained usage statistics for BigQuery using the `bigquery-usage` source described [below](#bigquery-usage-plugin).
:::
## Quickstart recipe
### Quickstart recipe
Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options.
@ -102,7 +105,7 @@ sink:
# sink configs
```
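Besides `datahub ingest -c <recipe>`, a recipe can also be run from Python. A minimal sketch (project id and server URL are illustrative):
```python
from datahub.ingestion.run.pipeline import Pipeline

# Equivalent of the YAML recipe above, expressed as a config dict.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "bigquery",
            "config": {"project_id": "my-gcp-project"},
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```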
## Config details
### Config details
Note that a `.` is used to denote nested fields in the YAML recipe.
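For example (a sketch; `table_pattern.deny` is a standard SQL-source option), a dotted option name corresponds to a nested block rather than a literal dotted key:
```python
# Dotted option names nest in the recipe config; `table_pattern.deny` becomes:
source_config = {
    "project_id": "my-gcp-project",                 # illustrative
    "table_pattern": {"deny": ["^.*\\.temp_.*$"]},
}
```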
@ -155,7 +158,7 @@ Note: the bigquery_audit_metadata_datasets parameter receives a list of datasets
Note: since the bigquery source also supports dataset-level lineage, the auth client will require additional permissions to access the Google audit logs. Refer to the permissions section in the bigquery-usage section below, which also accesses the audit logs.
## Profiling
### Profiling
Profiling supports normal, partitioned, and sharded tables, but for performance reasons we only profile the latest partition of partitioned tables and the latest shard of sharded tables.
If the limit/offset parameter is set, or when profiling a partitioned or sharded table, Great Expectations (the profiling framework we use) needs to create temporary
@ -175,11 +178,11 @@ Due to performance reasons, we only profile the latest partition for Partitioned
You can set the partition explicitly with the `partition.partition_datetime` property if you want (the partition will be applied to all partitioned tables).
:::
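For example, a hedged sketch of pinning the profiled partition (the option path follows the `partition.partition_datetime` name above; the datetime format is an assumption):
```python
# Sketch: profile a specific partition instead of the latest one.
source_config = {
    "project_id": "my-gcp-project",                               # illustrative
    "profiling": {"enabled": True},
    "partition": {"partition_datetime": "2022-03-01 00:00:00"},   # assumed format
}
```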
# BigQuery Usage Stats
## `bigquery-usage`
For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).
## Setup
### Setup
To install this plugin, run `pip install 'acryl-datahub[bigquery-usage]'`.
@ -194,7 +197,7 @@ The Google Identity must have one of the following OAuth scopes granted to it:
And should be authorized on all projects you'd like to ingest usage stats from.
## Capabilities
### Capabilities
This plugin extracts the following:
@ -208,7 +211,7 @@ This plugin extracts the following:
:::
## Quickstart recipe
### Quickstart recipe
Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options.
@ -230,7 +233,7 @@ sink:
# sink configs
```
## Config details
### Config details
Note that a `.` is used to denote nested fields in the YAML recipe.

View File

@ -1,8 +1,12 @@
# Redshift
To get all metadata from Redshift you need to use two plugins: `redshift` and `redshift-usage`. Both are described on this page and require two separate recipes. We understand this is not ideal and plan to make this easier in the future.
For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).
## Setup
## `redshift`
### Setup
To install this plugin, run `pip install 'acryl-datahub[redshift]'`.
@ -19,7 +23,7 @@ Giving a user unrestricted access to system tables gives the user visibility to
:::
## Capabilities
### Capabilities
This plugin extracts the following:
@ -41,7 +45,7 @@ You can also get fine-grained usage statistics for Redshift using the `redshift-
| Data Containers | ✔️ | |
| Data Domains | ✔️ | [link](../../docs/domains.md) |
## Quickstart recipe
### Quickstart recipe
Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options.
@ -93,7 +97,7 @@ sink:
</details>
## Config details
### Config details
Like all SQL-based sources, the Redshift integration supports:
- Stale Metadata Deletion: See [here](./stateful_ingestion.md) for more details on configuration.
@ -130,11 +134,11 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| `domain.domain_key.deny` | | | List of regex patterns for tables/schemas that should not be assigned domain_key. There can be multiple domain keys specified. |
| `domain.domain_key.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. There can be multiple domain keys specified. |
## Lineage
### Lineage
There are multiple lineage collector implementations as Redshift does not support table lineage out of the box.
### stl_scan_based
#### stl_scan_based
The stl_scan based collector uses Redshift's [stl_insert](https://docs.aws.amazon.com/redshift/latest/dg/r_STL_INSERT.html) and [stl_scan](https://docs.aws.amazon.com/redshift/latest/dg/r_STL_SCAN.html) system tables to
discover lineage between tables.
Pros:
@ -145,7 +149,7 @@ Cons:
- Does not work with Spectrum/external tables because those scans do not show up in the stl_scan table.
- If a table depends on a view, the view won't be listed as a dependency; instead, the table will be connected to the view's dependencies.
### sql_based
#### sql_based
The sql_based collector uses Redshift's [stl_insert](https://docs.aws.amazon.com/redshift/latest/dg/r_STL_INSERT.html) to discover all the insert queries
and uses SQL parsing to discover the dependencies (see the sketch after the pros and cons below).
@ -157,7 +161,7 @@ Cons:
- Slow.
- Less reliable as the query parser can fail on certain queries
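To illustrate the idea behind SQL-based lineage (this is not necessarily the parser the source uses internally), here is a minimal sketch using the `sqllineage` library on an insert statement of the kind found in `stl_insert`:
```python
from sqllineage.runner import LineageRunner

ddl = (
    "INSERT INTO analytics.daily_sales "
    "SELECT o.day, sum(o.amount) FROM raw.orders o "
    "JOIN raw.customers c ON o.cust_id = c.id GROUP BY o.day"
)

lineage = LineageRunner(ddl)
# The upstream tables of the insert target become lineage edges.
print(lineage.source_tables())  # e.g. [raw.customers, raw.orders]
print(lineage.target_tables())  # e.g. [analytics.daily_sales]
```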
### mixed
#### mixed
Uses both collectors above: first the sql_based collector is applied, then the stl_scan based one.
Pros:
@ -169,10 +173,13 @@ Cons:
- Slow
- May be incorrect at times as the query parser can fail on certain queries
# Note
- The Redshift stl tables used for getting data lineage only retain approximately two to five days of log history. This means you cannot extract lineage from queries issued outside that window.
:::note
# Redshift Usage Stats
The Redshift stl tables used for getting data lineage only retain approximately two to five days of log history. This means you cannot extract lineage from queries issued outside that window.
:::
## `redshift-usage`
This plugin extracts usage statistics for datasets in Amazon Redshift. For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).
@ -187,10 +194,10 @@ To grant access this plugin for all system tables, please alter your datahub Red
ALTER USER datahub_user WITH SYSLOG ACCESS UNRESTRICTED;
```
## Setup
### Setup
To install this plugin, run `pip install 'acryl-datahub[redshift-usage]'`.
## Capabilities
### Capabilities
| Capability | Status | Details |
| -----------| ------ | ---- |
@ -210,7 +217,7 @@ This source only does usage statistics. To get the tables, views, and schemas in
:::
## Quickstart recipe
### Quickstart recipe
Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options.
@ -233,7 +240,7 @@ sink:
# sink configs
```
## Config details
### Config details
Note that a `.` is used to denote nested fields in the YAML recipe.
By default, we extract usage stats for the last day, with the recommendation that this source is executed every day.
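For a one-off backfill of a specific window instead of the default last-day bucket, the time-window options can be set explicitly. A hedged sketch (timestamp format is an assumption):
```python
# Sketch: override the default "last full day" usage window.
usage_config = {
    # ...connection settings as in the recipe above...
    "start_time": "2022-03-01T00:00:00Z",   # assumed ISO-8601 strings
    "end_time": "2022-03-02T00:00:00Z",
    "bucket_duration": "DAY",
}
```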

View File

@ -1,16 +1,22 @@
# Snowflake
To get all metadata from Snowflake you need to use two plugins: `snowflake` and `snowflake-usage`. Both are described on this page and require two separate recipes. We understand this is not ideal and plan to make this easier in the future.
For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).
## Setup
## `snowflake`
### Setup
To install this plugin, run `pip install 'acryl-datahub[snowflake]'`.
### Prerequisites
In order to execute this source, your Snowflake user will need to have specific privileges granted to it for reading metadata
from your warehouse. You can create a DataHub-specific role, assign it the required privileges, and assign it to a new DataHub user
by executing the following Snowflake commands from a user with the `ACCOUNTADMIN` role:
from your warehouse.
You can use the `provision_role` block in the recipe to grant the required privileges.
If your system admins prefer running the commands themselves, they can follow this guide to create a DataHub-specific role, assign it the required privileges, and assign it to a new DataHub user by executing the following Snowflake commands from a user with the `ACCOUNTADMIN` role:
```sql
create or replace role datahub_role;
@ -36,25 +42,21 @@ grant role datahub_role to user datahub_user;
This represents the bare minimum set of privileges required to extract databases, schemas, views, and tables from Snowflake.
If you plan to enable extraction of table lineage, via the `include_table_lineage` config flag, you'll also need to grant privileges
to access the Snowflake Account Usage views. You can execute the following using the `ACCOUNTADMIN` role to do so:
If you plan to enable extraction of table lineage, via the `include_table_lineage` config flag, you'll need to grant additional privileges. See [snowflake usage prerequisites](#prerequisites-1) as the same privilege is required for this purpose too.
```sql
grant imported privileges on database snowflake to role datahub_role;
```
## Capabilities
### Capabilities
This plugin extracts the following:
- Metadata for databases, schemas, views and tables
- Column types associated with each table
- Table, row, and column statistics via optional [SQL profiling](./sql_profiles.md)
- Table lineage.
- Table lineage
:::tip
You can also get fine-grained usage statistics for Snowflake using the `snowflake-usage` source described below.
You can also get fine-grained usage statistics for Snowflake using the `snowflake-usage` source described [below](#snowflake-usage-plugin).
:::
@ -64,7 +66,7 @@ You can also get fine-grained usage statistics for Snowflake using the `snowflak
| Data Containers | ✔️ | |
| Data Domains | ✔️ | [link](../../docs/domains.md) |
## Quickstart recipe
### Quickstart recipe
Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options.
@ -74,20 +76,28 @@ For general pointers on writing and running a recipe, see our [main recipe guide
source:
type: snowflake
config:
provision_role: # Optional
enabled: false
dry_run: true
run_ingestion: false
admin_username: "${SNOWFLAKE_ADMIN_USER}"
admin_password: "${SNOWFLAKE_ADMIN_PASS}"
# Coordinates
host_port: account_name
warehouse: "COMPUTE_WH"
# Credentials
username: user
password: pass
role: "accountadmin"
username: "${SNOWFLAKE_USER}"
password: "${SNOWFLAKE_PASS}"
role: "datahub_role"
sink:
# sink configs
```
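The `${SNOWFLAKE_USER}`-style placeholders are expanded from environment variables when the recipe is loaded, which keeps credentials out of the YAML. A minimal sketch of supplying them (values and file name are illustrative):
```python
import os
import subprocess

os.environ["SNOWFLAKE_USER"] = "datahub_user"   # illustrative credentials only
os.environ["SNOWFLAKE_PASS"] = "change-me"
# provision_role is disabled in the recipe above, so the admin variables are optional here.

subprocess.run(["datahub", "ingest", "-c", "snowflake.dhub.yaml"], check=True)
```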
## Config details
### Config details
Like all SQL-based sources, the Snowflake integration supports:
- Stale Metadata Deletion: See [here](./stateful_ingestion.md) for more details on configuration.
@ -122,8 +132,8 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| `view_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `include_tables` | | `True` | Whether tables should be ingested. |
| `include_views` | | `True` | Whether views should be ingested. |
| `include_table_lineage` | | `True` | If enabled, populates the snowflake table-to-table and s3-to-snowflake table lineage. Requires role to be `accountadmin` |
| `include_view_lineage` | | `True` | If enabled, populates the snowflake view->table and table->view lineages (no view->view lineage yet). Requires role to be `accountadmin`, and `include_table_lineage` to be `True`. |
| `include_table_lineage` | | `True` | If enabled, populates the snowflake table-to-table and s3-to-snowflake table lineage. Requires appropriate grants given to the role. |
| `include_view_lineage` | | `True` | If enabled, populates the snowflake view->table and table->view lineages (no view->view lineage yet). Requires appropriate grants given to the role, and `include_table_lineage` to be `True`. |
| `bucket_duration` | | `"DAY"` | Duration to bucket lineage data extraction by. Can be `"DAY"` or `"HOUR"`. |
| `start_time` | | Start of last full day in UTC (or hour, depending on `bucket_duration`) | Earliest time of lineage data to consider. For the bootstrap run, set it as far back in time as possible. |
| `end_time` | | End of last full day in UTC (or hour, depending on `bucket_duration`) | Latest time of lineage data to consider. |
@ -131,41 +141,46 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| `domain.domain_key.allow` | | | List of regex patterns for tables/schemas to assign domain_key to (domain_key can be any string, like `sales`). There can be multiple domain keys specified. |
| `domain.domain_key.deny` | | | List of regex patterns for tables/schemas that should not be assigned domain_key. There can be multiple domain keys specified. |
| `domain.domain_key.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. There can be multiple domain keys specified. |
| `provision_role.enabled` | | `False` | Whether provisioning of the Snowflake role (used for ingestion) is enabled. |
| `provision_role.dry_run` | | `False` | If `provision_role` is enabled, whether to dry-run the SQL commands so system admins can see which grant statements would be run without actually executing them. |
| `provision_role.drop_role_if_exists` | | `False` | Useful during testing to ensure you start from a clean-slate role. Not recommended for production use cases. |
| `provision_role.run_ingestion` | | `False` | Whether to run actual metadata ingestion after provisioning. Keep this `False` if system admins only want to test provisioning of `role` without ingesting metadata. |
| `provision_role.admin_role` | | `accountadmin` | The Snowflake role of the admin user used for provisioning the role specified by the `role` config. System admins can audit the open source code and decide to use a different role. |
| `provision_role.admin_username` | ✅ | | The username used for provisioning the role. |
| `provision_role.admin_password` | ✅ | | The password used for provisioning the role. |
## Compatibility
Table lineage requires Snowflake's [Access History](https://docs.snowflake.com/en/user-guide/access-history.html) feature.
# Snowflake Usage Stats
## `snowflake-usage`
For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).
## Setup
### Setup
To install this plugin, run `pip install 'acryl-datahub[snowflake-usage]'`.
### Prerequisites
In order to execute the snowflake-usage source, your Snowflake user will need to have specific privileges granted to it. Specifically,
you'll need to grant access to the [Account Usage](https://docs.snowflake.com/en/sql-reference/account-usage.html) system tables, using which the DataHub source extracts information. Assuming
you've followed the steps outlined above to create a DataHub-specific User & Role, you'll simply need to execute the following commands
in Snowflake from a user with the `ACCOUNTADMIN` role:
:::note
Table lineage requires Snowflake's [Access History](https://docs.snowflake.com/en/user-guide/access-history.html) feature. The "accountadmin" role has this by default.
The underlying access history views that we use are only available in Snowflake's enterprise edition or higher.
:::
In order to execute the snowflake-usage source, your Snowflake user will need to have specific privileges granted to it. Specifically, you'll need to grant access to the [Account Usage](https://docs.snowflake.com/en/sql-reference/account-usage.html) system tables, from which the DataHub source extracts information. Assuming you've followed the steps outlined in the `snowflake` plugin section to create a DataHub-specific user and role, you'll simply need to execute the following commands in Snowflake. This requires a user with the `ACCOUNTADMIN` role (or a role granted the IMPORT SHARES global privilege); see the [Snowflake docs](https://docs.snowflake.com/en/user-guide/data-share-consumers.html) for more details.
```sql
grant imported privileges on database snowflake to role datahub_role;
```
## Capabilities
### Capabilities
This plugin extracts the following:
- Statistics on queries issued and tables and columns accessed (excludes views)
- Aggregation of these statistics into buckets, by day or hour granularity
Note: the user/role must have access to the account usage table. The "accountadmin" role has this by default, and other roles can be [granted this permission](https://docs.snowflake.com/en/sql-reference/account-usage.html#enabling-account-usage-for-other-roles).
Note: the underlying access history views that we use are only available in Snowflake's enterprise edition or higher.
:::note
@ -173,7 +188,7 @@ This source only does usage statistics. To get the tables, views, and schemas in
:::
## Quickstart recipe
### Quickstart recipe
Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options.
@ -188,9 +203,9 @@ source:
warehouse: "COMPUTE_WH"
# Credentials
username: user
password: pass
role: "sysadmin"
username: "${SNOWFLAKE_USER}"
password: "${SNOWFLAKE_PASS}"
role: "datahub_role"
# Options
top_n_queries: 10
@ -199,7 +214,7 @@ sink:
# sink configs
```
## Config details
### Config details
Snowflake integration also supports prevention of redundant reruns for the same data. See [here](./stateful_ingestion.md) for more details on configuration.
@ -235,7 +250,7 @@ User's without email address will be ignored from usage if you don't set `email_
# Compatibility
## Compatibility
Coming soon!

View File

@ -136,16 +136,12 @@ class DatahubRestEmitter:
self._session.mount("http://", adapter)
self._session.mount("https://", adapter)
def test_connection(self) -> str:
def test_connection(self) -> dict:
response = self._session.get(f"{self._gms_server}/config")
if response.status_code == 200:
config: dict = response.json()
if config.get("noCode") == "true":
return (
config.get("versions", {})
.get("linkedin/datahub", {})
.get("version", "")
)
return config
else:
# Looks like we either connected to an old GMS or to some other service. Let's see if we can determine which before raising an error
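`test_connection` now returns the raw `/config` payload instead of a pre-extracted version string; callers that only need the version pull it out themselves (mirroring the sink change further down). A quick sketch:
```python
from datahub.emitter.rest_emitter import DatahubRestEmitter

emitter = DatahubRestEmitter("http://localhost:8080")
config = emitter.test_connection()  # full /config payload as a dict
version = config.get("versions", {}).get("linkedin/datahub", {}).get("version", "")
```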

View File

@ -1,5 +1,6 @@
import logging
import os
import platform
import sys
import click
@ -18,6 +19,7 @@ from datahub.cli.telemetry import telemetry as telemetry_cli
from datahub.cli.timeline_cli import timeline
from datahub.configuration import SensitiveError
from datahub.telemetry import telemetry
from datahub.utilities.server_config_util import get_gms_config
logger = logging.getLogger(__name__)
@ -156,4 +158,11 @@ def main(**kwargs):
**kwargs,
)
)
logger.info(
f"DataHub CLI version: {datahub_package.__version__} at {datahub_package.__file__}"
)
logger.info(
f"Python version: {sys.version} at {sys.executable} on {platform.platform()}"
)
logger.info(f"GMS config {get_gms_config()}")
sys.exit(1)

View File

@ -1,7 +1,10 @@
import platform
import sys
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, Generic, Iterable, List, TypeVar
import datahub
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
from datahub.ingestion.api.report import Report
@ -14,7 +17,11 @@ class SourceReport(Report):
warnings: Dict[str, List[str]] = field(default_factory=dict)
failures: Dict[str, List[str]] = field(default_factory=dict)
cli_version: str = ""
cli_version: str = datahub.nice_version_name()
cli_entry_location: str = datahub.__file__
py_version: str = sys.version
py_exec_path: str = sys.executable
os_details: str = platform.platform()
def report_workunit(self, wu: WorkUnit) -> None:
self.workunits_produced += 1
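With these defaults, every source report carries CLI, Python, and OS details without each source setting them. A quick sketch of what surfaces:
```python
from datahub.ingestion.api.source import SourceReport

report = SourceReport()
# cli_version, py_version, os_details etc. are populated at construction time,
# so they show up in the rendered report alongside the usual counters.
print(report.as_string())
```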

View File

@ -8,7 +8,6 @@ from typing import Any, Dict, Iterable, List, Optional
import click
from pydantic import validator
import datahub
from datahub.configuration.common import (
ConfigModel,
DynamicTypedConfig,
@ -117,6 +116,12 @@ class Pipeline:
preview_mode=preview_mode,
)
sink_type = self.config.sink.type
sink_class = sink_registry.get(sink_type)
sink_config = self.config.sink.dict().get("config", {})
self.sink: Sink = sink_class.create(sink_config, self.ctx)
logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured")
source_type = self.config.source.type
source_class = source_registry.get(source_type)
self.source: Source = source_class.create(
@ -124,12 +129,6 @@ class Pipeline:
)
logger.debug(f"Source type:{source_type},{source_class} configured")
sink_type = self.config.sink.type
sink_class = sink_registry.get(sink_type)
sink_config = self.config.sink.dict().get("config", {})
self.sink: Sink = sink_class.create(sink_config, self.ctx)
logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured")
self.extractor_class = extractor_registry.get(self.config.source.extractor)
self._configure_transforms()
@ -179,7 +178,6 @@ class Pipeline:
callback = LoggingCallback()
extractor: Extractor = self.extractor_class()
self.source.get_report().cli_version = datahub.nice_version_name()
for wu in itertools.islice(
self.source.get_workunits(), 10 if self.preview_mode else None
):

View File

@ -16,6 +16,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeProposal,
)
from datahub.metadata.com.linkedin.pegasus2avro.usage import UsageAggregation
from datahub.utilities.server_config_util import set_gms_config
logger = logging.getLogger(__name__)
@ -50,7 +51,14 @@ class DatahubRestSink(Sink):
extra_headers=self.config.extra_headers,
ca_certificate_path=self.config.ca_certificate_path,
)
self.report.gms_version = self.emitter.test_connection()
gms_config = self.emitter.test_connection()
self.report.gms_version = (
gms_config.get("versions", {})
.get("linkedin/datahub", {})
.get("version", "")
)
logger.info("Setting gms config")
set_gms_config(gms_config)
self.executor = concurrent.futures.ThreadPoolExecutor(
max_workers=self.config.max_threads
)
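The new `datahub/utilities/server_config_util.py` referenced here and by the CLI entrypoint is not included in these hunks; a plausible minimal shape, assuming it is just a module-level holder (the actual implementation may differ):
```python
# Hypothetical sketch of datahub/utilities/server_config_util.py
from typing import Any, Dict, Optional

_gms_config: Optional[Dict[str, Any]] = None


def set_gms_config(config: Dict[str, Any]) -> None:
    # Stored by the REST sink once test_connection() succeeds.
    global _gms_config
    _gms_config = config


def get_gms_config() -> Optional[Dict[str, Any]]:
    # Read back by the CLI entrypoint when logging failure context.
    return _gms_config
```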

View File

@ -20,7 +20,6 @@ from datahub.emitter.mce_builder import (
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import add_domain_to_entity_wu
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.extractor import schema_util
from datahub.ingestion.source.state.checkpoint import Checkpoint
@ -29,6 +28,7 @@ from datahub.ingestion.source.state.stateful_ingestion_base import (
JobId,
StatefulIngestionConfig,
StatefulIngestionConfigBase,
StatefulIngestionReport,
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import Status
@ -71,7 +71,7 @@ class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
@dataclass
class KafkaSourceReport(SourceReport):
class KafkaSourceReport(StatefulIngestionReport):
topics_scanned: int = 0
filtered: List[str] = field(default_factory=list)
soft_deleted_stale_entities: List[str] = field(default_factory=list)

View File

@ -30,6 +30,7 @@ from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.sql_common import (
SQLAlchemyConfig,
SQLAlchemySource,
SQLSourceReport,
SqlWorkUnit,
make_sqlalchemy_type,
register_custom_type,
@ -284,13 +285,18 @@ class BigQueryDatasetKey(ProjectIdKey):
dataset_id: str
class BigQuerySource(SQLAlchemySource):
config: BigQueryConfig
maximum_shard_ids: Dict[str, str] = dict()
lineage_metadata: Optional[Dict[str, Set[str]]] = None
@dataclass
class BigQueryReport(SQLSourceReport):
pass
class BigQuerySource(SQLAlchemySource):
def __init__(self, config, ctx):
super().__init__(config, ctx, "bigquery")
self.config: BigQueryConfig = config
self.report: BigQueryReport = BigQueryReport()
self.lineage_metadata: Optional[Dict[str, Set[str]]] = None
self.maximum_shard_ids: Dict[str, str] = dict()
def get_db_name(self, inspector: Inspector = None) -> str:
if self.config.project_id:
@ -772,7 +778,6 @@ WHERE
partition: Optional[str],
custom_sql: Optional[str] = None,
) -> dict:
self.config: BigQueryConfig
return dict(
schema=self.config.project_id,
table=f"{schema}.{table}",

View File

@ -1,12 +1,14 @@
import logging
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Iterable, List, Optional, Set, Tuple, Union
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
from urllib.parse import urlparse
# These imports verify that the dependencies are available.
import psycopg2 # noqa: F401
import pydantic # noqa: F401
import sqlalchemy
import sqlalchemy_redshift # noqa: F401
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine import Connection, reflection
@ -24,8 +26,8 @@ from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.postgres import PostgresConfig
from datahub.ingestion.source.sql.sql_common import (
SQLAlchemySource,
SQLSourceReport,
SqlWorkUnit,
logger,
)
# TRICKY: it's necessary to import the Postgres source because
@ -41,6 +43,8 @@ from datahub.metadata.schema_classes import (
UpstreamClass,
)
logger: logging.Logger = logging.getLogger(__name__)
class LineageMode(Enum):
SQL_BASED = "sql_based"
@ -336,18 +340,11 @@ def _get_schema_column_info(self, connection, schema=None, **kw):
def _get_external_db_mapping(connection):
# SQL query to get mapping of external schemas in redshift to its external database.
try:
result = connection.execute(
"""
select * from svv_external_schemas
"""
)
return result
except Exception as e:
logger.error(
"Error querying svv_external_schemas to get external database mapping.", e
)
return None
return connection.execute(
"""
select * from svv_external_schemas
"""
)
# This monkey-patching enables us to batch fetch the table descriptions, rather than
@ -360,15 +357,23 @@ RedshiftDialect._get_schema_column_info = _get_schema_column_info
redshift_datetime_format = "%Y-%m-%d %H:%M:%S"
@dataclass
class RedshiftReport(SQLSourceReport):
# https://forums.aws.amazon.com/ann.jspa?annID=9105
saas_version: str = ""
upstream_lineage: Dict[str, List[str]] = field(default_factory=dict)
class RedshiftSource(SQLAlchemySource):
config: RedshiftConfig
catalog_metadata: Dict = {}
eskind_to_platform = {1: "glue", 2: "hive", 3: "postgres", 4: "redshift"}
def __init__(self, config: RedshiftConfig, ctx: PipelineContext):
super().__init__(config, ctx, "redshift")
self.catalog_metadata: Dict = {}
self.config: RedshiftConfig = config
self._lineage_map: Optional[Dict[str, LineageItem]] = None
self._all_tables_set: Optional[Set[str]] = None
self.report: RedshiftReport = RedshiftReport()
@classmethod
def create(cls, config_dict, ctx):
@ -376,9 +381,12 @@ class RedshiftSource(SQLAlchemySource):
return cls(config, ctx)
def get_catalog_metadata(self, conn: Connection) -> None:
catalog_metadata = _get_external_db_mapping(conn)
if catalog_metadata is None:
try:
catalog_metadata = _get_external_db_mapping(conn)
except Exception as e:
self.error(logger, "external-svv_external_schemas", f"Error was {e}")
return
db_name = self.get_db_name()
external_schema_mapping = {}
@ -401,15 +409,30 @@ class RedshiftSource(SQLAlchemySource):
def get_inspectors(self) -> Iterable[Inspector]:
# This method can be overridden in the case that you want to dynamically
# run on multiple databases.
url = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(url, **self.config.options)
engine = self.get_metadata_engine()
with engine.connect() as conn:
self.get_catalog_metadata(conn)
inspector = inspect(conn)
yield inspector
def get_metadata_engine(self) -> sqlalchemy.engine.Engine:
url = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}")
return create_engine(url, **self.config.options)
def inspect_version(self) -> Any:
db_engine = self.get_metadata_engine()
logger.info("Checking current version")
for db_row in db_engine.execute("select version()"):
self.report.saas_version = db_row[0]
def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
try:
self.inspect_version()
except Exception as e:
self.report.report_failure("version", f"Error: {e}")
return
for wu in super().get_workunits():
yield wu
if (
@ -498,9 +521,7 @@ class RedshiftSource(SQLAlchemySource):
db_name = self.get_db_name()
all_tables_set = set()
url = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(url, **self.config.options)
engine = self.get_metadata_engine()
for db_row in engine.execute(all_tables_query):
all_tables_set.add(
f'{db_name}.{db_row["schemaname"]}.{db_row["tablename"]}'
@ -554,9 +575,7 @@ class RedshiftSource(SQLAlchemySource):
if not self._all_tables_set:
self._all_tables_set = self._get_all_tables()
url = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(url, **self.config.options)
engine = self.get_metadata_engine()
db_name = self.get_db_name()
@ -593,17 +612,21 @@ class RedshiftSource(SQLAlchemySource):
)
except Exception as e:
target.query_parser_failed_sqls.append(db_row["ddl"])
logger.warning(
self.warn(
logger,
"parsing-query",
f'Error parsing query {db_row["ddl"]} for getting lineage .'
f"\nError was {e}."
f"\nError was {e}.",
)
else:
if lineage_type == lineage_type.COPY:
platform = LineageDatasetPlatform.S3
path = db_row["filename"].strip()
if urlparse(path).scheme != "s3":
logger.warning(
f"Only s3 source supported with copy. The source was: {path}. ."
self.warn(
logger,
"non-s3-lineage",
f"Only s3 source supported with copy. The source was: {path}.",
)
continue
else:
@ -624,7 +647,9 @@ class RedshiftSource(SQLAlchemySource):
source.platform == LineageDatasetPlatform.REDSHIFT
and source.path not in self._all_tables_set
):
logger.warning(f"{source.path} missing table")
self.warn(
logger, "missing-table", f"{source.path} missing table"
)
continue
target.upstreams.add(source)
@ -648,10 +673,7 @@ class RedshiftSource(SQLAlchemySource):
)
except Exception as e:
logger.warning(
f"Extracting {lineage_type.name} lineage from Redshift failed."
f"Continuing...\nError was {e}."
)
self.warn(logger, f"extract-{lineage_type.name}", f"Error was {e}")
def _populate_lineage(self) -> None:
@ -957,7 +979,11 @@ class RedshiftSource(SQLAlchemySource):
if custom_properties:
properties = DatasetPropertiesClass(customProperties=custom_properties)
if not upstream_lineage:
if upstream_lineage:
self.report.upstream_lineage[dataset_urn] = [
u.dataset for u in upstream_lineage
]
else:
return None, properties
mcp = MetadataChangeProposalWrapper(

View File

@ -1,7 +1,6 @@
import json
import logging
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
import pydantic
@ -9,35 +8,25 @@ import pydantic
# This import verifies that the dependencies are available.
import snowflake.sqlalchemy # noqa: F401
import sqlalchemy.engine
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from snowflake.connector.network import (
DEFAULT_AUTHENTICATOR,
EXTERNAL_BROWSER_AUTHENTICATOR,
KEY_PAIR_AUTHENTICATOR,
)
from snowflake.sqlalchemy import custom_types, snowdialect
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.sql import sqltypes, text
import datahub.emitter.mce_builder as builder
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.s3_util import make_s3_urn
from datahub.ingestion.source.sql.sql_common import (
RecordTypeClass,
SQLAlchemyConfig,
SQLAlchemySource,
SQLSourceReport,
SqlWorkUnit,
TimeTypeClass,
make_sqlalchemy_uri,
register_custom_type,
)
from datahub.ingestion.source_config.sql.snowflake import SnowflakeConfig
from datahub.ingestion.source_report.sql.snowflake import SnowflakeReport
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageTypeClass,
UpstreamClass,
@ -54,146 +43,9 @@ register_custom_type(custom_types.VARIANT, RecordTypeClass)
logger: logging.Logger = logging.getLogger(__name__)
APPLICATION_NAME = "acryl_datahub"
snowdialect.ischema_names["GEOGRAPHY"] = sqltypes.NullType
@dataclass
class SnowflakeReport(SQLSourceReport):
num_table_to_table_edges_scanned: int = 0
num_table_to_view_edges_scanned: int = 0
num_view_to_table_edges_scanned: int = 0
num_external_table_edges_scanned: int = 0
upstream_lineage: Dict[str, List[str]] = field(default_factory=dict)
# https://community.snowflake.com/s/topic/0TO0Z000000Unu5WAC/releases
saas_version: str = ""
role: str = ""
role_grants: List[str] = field(default_factory=list)
class BaseSnowflakeConfig(BaseTimeWindowConfig):
# Note: this config model is also used by the snowflake-usage source.
scheme = "snowflake"
username: Optional[str] = None
password: Optional[pydantic.SecretStr] = pydantic.Field(default=None, exclude=True)
private_key_path: Optional[str]
private_key_password: Optional[pydantic.SecretStr] = pydantic.Field(
default=None, exclude=True
)
authentication_type: Optional[str] = "DEFAULT_AUTHENTICATOR"
host_port: str
warehouse: Optional[str]
role: Optional[str]
include_table_lineage: Optional[bool] = True
include_view_lineage: Optional[bool] = True
connect_args: Optional[dict]
@pydantic.validator("authentication_type", always=True)
def authenticator_type_is_valid(cls, v, values, **kwargs):
valid_auth_types = {
"DEFAULT_AUTHENTICATOR": DEFAULT_AUTHENTICATOR,
"EXTERNAL_BROWSER_AUTHENTICATOR": EXTERNAL_BROWSER_AUTHENTICATOR,
"KEY_PAIR_AUTHENTICATOR": KEY_PAIR_AUTHENTICATOR,
}
if v not in valid_auth_types.keys():
raise ValueError(
f"unsupported authenticator type '{v}' was provided,"
f" use one of {list(valid_auth_types.keys())}"
)
else:
if v == "KEY_PAIR_AUTHENTICATOR":
# If we are using key pair auth, we need the private key path and password to be set
if values.get("private_key_path") is None:
raise ValueError(
f"'private_key_path' was none "
f"but should be set when using {v} authentication"
)
if values.get("private_key_password") is None:
raise ValueError(
f"'private_key_password' was none "
f"but should be set when using {v} authentication"
)
logger.info(f"using authenticator type '{v}'")
return valid_auth_types.get(v)
@pydantic.validator("include_view_lineage")
def validate_include_view_lineage(cls, v, values):
if not values.get("include_table_lineage") and v:
raise ValueError(
"include_table_lineage must be True for include_view_lineage to be set."
)
return v
def get_sql_alchemy_url(self, database=None):
return make_sqlalchemy_uri(
self.scheme,
self.username,
self.password.get_secret_value() if self.password else None,
self.host_port,
f'"{database}"' if database is not None else database,
uri_opts={
# Drop the options if value is None.
key: value
for (key, value) in {
"authenticator": self.authentication_type,
"warehouse": self.warehouse,
"role": self.role,
"application": APPLICATION_NAME,
}.items()
if value
},
)
def get_sql_alchemy_connect_args(self) -> dict:
if self.authentication_type != KEY_PAIR_AUTHENTICATOR:
return {}
if self.connect_args is None:
if self.private_key_path is None:
raise ValueError("missing required private key path to read key from")
if self.private_key_password is None:
raise ValueError("missing required private key password")
with open(self.private_key_path, "rb") as key:
p_key = serialization.load_pem_private_key(
key.read(),
password=self.private_key_password.get_secret_value().encode(),
backend=default_backend(),
)
pkb = p_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
self.connect_args = {"private_key": pkb}
return self.connect_args
class SnowflakeConfig(BaseSnowflakeConfig, SQLAlchemyConfig):
database_pattern: AllowDenyPattern = AllowDenyPattern(
deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"]
)
database: Optional[str] # deprecated
@pydantic.validator("database")
def note_database_opt_deprecation(cls, v, values, **kwargs):
logger.warning(
"snowflake's `database` option has been deprecated; use database_pattern instead"
)
values["database_pattern"].allow = f"^{v}$"
return None
def get_sql_alchemy_url(self, database: str = None) -> str:
return super().get_sql_alchemy_url(database=database)
def get_sql_alchemy_connect_args(self) -> dict:
return super().get_sql_alchemy_connect_args()
class SnowflakeSource(SQLAlchemySource):
def __init__(self, config: SnowflakeConfig, ctx: PipelineContext):
super().__init__(config, ctx, "snowflake")
@ -201,6 +53,7 @@ class SnowflakeSource(SQLAlchemySource):
self._external_lineage_map: Optional[Dict[str, Set[str]]] = None
self.report: SnowflakeReport = SnowflakeReport()
self.config: SnowflakeConfig = config
self.provision_role_in_progress: bool = False
@classmethod
def create(cls, config_dict, ctx):
@ -210,7 +63,20 @@ class SnowflakeSource(SQLAlchemySource):
def get_metadata_engine(
self, database: Optional[str] = None
) -> sqlalchemy.engine.Engine:
url = self.config.get_sql_alchemy_url(database=database)
if self.provision_role_in_progress and self.config.provision_role is not None:
username: Optional[str] = self.config.provision_role.admin_username
password: Optional[
pydantic.SecretStr
] = self.config.provision_role.admin_password
role: Optional[str] = self.config.provision_role.admin_role
else:
username = self.config.username
password = self.config.password
role = self.config.role
url = self.config.get_sql_alchemy_url(
database=database, username=username, password=password, role=role
)
logger.debug(f"sql_alchemy_url={url}")
return create_engine(
url,
@ -643,8 +509,104 @@ QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name, upstream_table_na
return UpstreamLineage(upstreams=upstream_tables), column_lineage
return None
def add_config_to_report(self):
self.report.cleaned_host_port = self.config.host_port
if self.config.provision_role is not None:
self.report.run_ingestion = self.config.provision_role.run_ingestion
def do_provision_role_internal(self):
provision_role_block = self.config.provision_role
if provision_role_block is None:
return
self.report.provision_role_done = not provision_role_block.dry_run
role = self.config.role
if role is None:
role = "datahub_role"
self.warn(
logger,
"role-grant",
f"role not specified during provision role using {role} as default",
)
self.report.role = role
warehouse = self.config.warehouse
logger.info("Creating connection for provision_role")
engine = self.get_metadata_engine(database=None)
sqls: List[str] = []
if provision_role_block.drop_role_if_exists:
sqls.append(f"DROP ROLE IF EXISTS {role}")
sqls.append(f"CREATE ROLE IF NOT EXISTS {role}")
if warehouse is None:
self.warn(
logger, "role-grant", "warehouse not specified during provision role"
)
else:
sqls.append(f"grant operate, usage on warehouse {warehouse} to role {role}")
for inspector in self.get_inspectors():
db_name = self.get_db_name(inspector)
sqls.extend(
[
f"grant usage on DATABASE {db_name} to role {role}",
f"grant usage on all schemas in database {db_name} to role {role}",
f"grant select on all tables in database {db_name} to role {role}",
f"grant select on all external tables in database {db_name} to role {role}",
f"grant select on all views in database {db_name} to role {role}",
f"grant usage on future schemas in database {db_name} to role {role}",
f"grant select on future tables in database {db_name} to role {role}",
]
)
if self.config.username is not None:
sqls.append(f"grant role {role} to user {self.config.username}")
sqls.append(f"grant imported privileges on database snowflake to role {role}")
dry_run_str = "[DRY RUN] " if provision_role_block.dry_run else ""
for sql in sqls:
logger.info(f"{dry_run_str} Attempting to run sql {sql}")
if provision_role_block.dry_run:
continue
try:
engine.execute(sql)
except Exception as e:
self.error(logger, "role-grant", f"Exception: {e}")
self.report.provision_role_success = not provision_role_block.dry_run
def do_provision_role(self):
if (
self.config.provision_role is None
or self.config.provision_role.enabled is False
):
return
try:
self.provision_role_in_progress = True
self.do_provision_role_internal()
finally:
self.provision_role_in_progress = False
def should_run_ingestion(self) -> bool:
return (
self.config.provision_role is None
or self.config.provision_role.enabled is False
or self.config.provision_role.run_ingestion
)
# Override the base class method.
def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
self.add_config_to_report()
self.do_provision_role()
if not self.should_run_ingestion():
return
try:
self.inspect_version()
except Exception as e:

View File

@ -42,7 +42,6 @@ from datahub.emitter.mcp_builder import (
gen_containers,
)
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.sql_common_state import (
@ -52,6 +51,7 @@ from datahub.ingestion.source.state.stateful_ingestion_base import (
JobId,
StatefulIngestionConfig,
StatefulIngestionConfigBase,
StatefulIngestionReport,
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import StatusClass
@ -161,7 +161,7 @@ class SqlContainerSubTypes(str, Enum):
@dataclass
class SQLSourceReport(SourceReport):
class SQLSourceReport(StatefulIngestionReport):
tables_scanned: int = 0
views_scanned: int = 0
entities_profiled: int = 0
@ -413,7 +413,7 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
super(SQLAlchemySource, self).__init__(config, ctx)
self.config = config
self.platform = platform
self.report = SQLSourceReport()
self.report: SQLSourceReport = SQLSourceReport()
config_report = {
config_option: config.dict().get(config_option)
@ -447,7 +447,7 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
def error(self, log: logging.Logger, key: str, reason: str) -> Any:
self.report.report_failure(key, reason)
log.error(reason)
log.error(f"{key} => {reason}")
def get_inspectors(self) -> Iterable[Inspector]:
# This method can be overridden in the case that you want to dynamically

View File

@ -1,5 +1,6 @@
import logging
import platform
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional, Type, cast
@ -20,7 +21,7 @@ from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import (
from datahub.ingestion.api.ingestion_job_reporting_provider_base import (
IngestionReportingProviderBase,
)
from datahub.ingestion.api.source import Source
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.source.state.checkpoint import Checkpoint, CheckpointStateBase
from datahub.ingestion.source.state_provider.state_provider_registry import (
ingestion_checkpoint_provider_registry,
@ -65,6 +66,11 @@ class StatefulIngestionConfigBase(DatasetSourceConfigBase):
stateful_ingestion: Optional[StatefulIngestionConfig] = None
@dataclass
class StatefulIngestionReport(SourceReport):
pass
class StatefulIngestionSourceBase(Source):
"""
Defines the base class for all stateful sources.
@ -80,6 +86,15 @@ class StatefulIngestionSourceBase(Source):
self.cur_checkpoints: Dict[JobId, Optional[Checkpoint]] = {}
self.run_summaries_to_report: Dict[JobId, DatahubIngestionRunSummaryClass] = {}
self._initialize_checkpointing_state_provider()
self.report: StatefulIngestionReport = StatefulIngestionReport()
def warn(self, log: logging.Logger, key: str, reason: str) -> Any:
self.report.report_warning(key, reason)
log.warning(reason)
def error(self, log: logging.Logger, key: str, reason: str) -> Any:
self.report.report_failure(key, reason)
log.error(f"{key} => {reason}")
#
# Checkpointing specific support.

View File

@ -572,13 +572,10 @@ class BigQueryUsageSourceReport(SourceReport):
class BigQueryUsageSource(Source):
config: BigQueryUsageConfig
report: BigQueryUsageSourceReport
def __init__(self, config: BigQueryUsageConfig, ctx: PipelineContext):
super().__init__(ctx)
self.config = config
self.report = BigQueryUsageSourceReport()
self.config: BigQueryUsageConfig = config
self.report: BigQueryUsageSourceReport = BigQueryUsageSourceReport()
@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "BigQueryUsageSource":

View File

@ -1,6 +1,7 @@
import collections
import dataclasses
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Union
@ -13,6 +14,7 @@ from sqlalchemy.engine import Engine
import datahub.emitter.mce_builder as builder
from datahub.configuration.time_window_config import get_time_bucket
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.redshift import RedshiftConfig
@ -87,15 +89,21 @@ class RedshiftUsageConfig(RedshiftConfig, BaseUsageConfig):
return super().get_sql_alchemy_url()
@dataclass
class RedshiftUsageReport(SourceReport):
pass
@dataclasses.dataclass
class RedshiftUsageSource(Source):
config: RedshiftUsageConfig
report: SourceReport = dataclasses.field(default_factory=SourceReport)
def __init__(self, config: RedshiftUsageConfig, ctx: PipelineContext):
self.config: RedshiftUsageConfig = config
self.report: RedshiftUsageReport = RedshiftUsageReport()
@classmethod
def create(cls, config_dict, ctx):
config = RedshiftUsageConfig.parse_obj(config_dict)
return cls(ctx, config)
return cls(config, ctx)
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
"""Gets Redshift usage stats as work units"""

View File

@ -15,21 +15,16 @@ from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import get_time_bucket
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.snowflake import BaseSnowflakeConfig
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.stateful_ingestion_base import (
JobId,
StatefulIngestionConfig,
StatefulIngestionConfigBase,
StatefulIngestionSourceBase,
)
from datahub.ingestion.source.state.usage_common_state import BaseUsageCheckpointState
from datahub.ingestion.source.usage.usage_common import (
BaseUsageConfig,
GenericAggregatedDataset,
)
from datahub.ingestion.source.usage.usage_common import GenericAggregatedDataset
from datahub.ingestion.source_config.usage.snowflake_usage import SnowflakeUsageConfig
from datahub.ingestion.source_report.usage.snowflake_usage import SnowflakeUsageReport
from datahub.metadata.schema_classes import (
ChangeTypeClass,
JobStatusClass,
@ -37,6 +32,7 @@ from datahub.metadata.schema_classes import (
OperationTypeClass,
TimeWindowSizeClass,
)
from datahub.utilities.perf_timer import PerfTimer
logger = logging.getLogger(__name__)
@ -126,50 +122,11 @@ class SnowflakeJoinedAccessEvent(PermissiveModel):
role_name: str
class SnowflakeStatefulIngestionConfig(StatefulIngestionConfig):
"""
Specialization of basic StatefulIngestionConfig to adding custom config.
This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
in the SnowflakeUsageConfig.
"""
ignore_old_state = pydantic.Field(False, alias="force_rerun")
class SnowflakeUsageConfig(
BaseSnowflakeConfig, BaseUsageConfig, StatefulIngestionConfigBase
):
options: dict = {}
database_pattern: AllowDenyPattern = AllowDenyPattern(
deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"]
)
email_domain: Optional[str]
schema_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
view_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
apply_view_usage_to_tables: bool = False
stateful_ingestion: Optional[SnowflakeStatefulIngestionConfig] = None
@pydantic.validator("role", always=True)
def role_accountadmin(cls, v):
if not v or v.lower() != "accountadmin":
# This isn't an error, since the privileges can be delegated to other
# roles as well: https://docs.snowflake.com/en/sql-reference/account-usage.html#enabling-account-usage-for-other-roles
logger.info(
'snowflake usage tables are only accessible by role "accountadmin" by default; you set %s',
v,
)
return v
def get_sql_alchemy_url(self):
return super().get_sql_alchemy_url(database="snowflake")
class SnowflakeUsageSource(StatefulIngestionSourceBase):
def __init__(self, config: SnowflakeUsageConfig, ctx: PipelineContext):
super(SnowflakeUsageSource, self).__init__(config, ctx)
self.config: SnowflakeUsageConfig = config
self.report: SourceReport = SourceReport()
self.report: SnowflakeUsageReport = SnowflakeUsageReport()
self.should_skip_this_run = self._should_skip_this_run()
@classmethod
@ -275,7 +232,23 @@ class SnowflakeUsageSource(StatefulIngestionSourceBase):
summary.config = self.config.json()
summary.custom_summary = self.report.as_string()
def check_email_domain_missing(self) -> Any:
if self.config.email_domain is not None and self.config.email_domain != "":
return
self.warn(
logger,
"missing-email-domain",
"User's without email address will be ignored from usage if you don't set email_domain property",
)
def add_config_to_report(self):
self.report.start_time = self.config.start_time
self.report.end_time = self.config.end_time
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
self.add_config_to_report()
self.check_email_domain_missing()
if not self.should_skip_this_run:
# Initialize the checkpoints
self._init_checkpoints()
@ -316,107 +289,143 @@ class SnowflakeUsageSource(StatefulIngestionSourceBase):
)
return engine
def _check_usage_date_ranges(self, engine: Engine) -> Any:
query = """
select
min(query_start_time) as min_time,
max(query_start_time) as max_time
from snowflake.account_usage.access_history
where ARRAY_SIZE(base_objects_accessed) > 0
"""
with PerfTimer() as timer:
for db_row in engine.execute(query):
self.report.min_access_history_time = db_row[0].astimezone(
tz=timezone.utc
)
self.report.max_access_history_time = db_row[1].astimezone(
tz=timezone.utc
)
self.report.access_history_range_query_secs = round(
timer.elapsed_seconds(), 2
)
break
def _is_unsupported_object_accessed(self, obj: Dict[str, Any]) -> bool:
unsupported_keys = ["locations"]
return any([obj.get(key) is not None for key in unsupported_keys])
def _is_object_valid(self, obj: Dict[str, Any]) -> bool:
if self._is_unsupported_object_accessed(
obj
) or not self._is_dataset_pattern_allowed(
obj.get("objectName"), obj.get("objectDomain")
):
return False
return True
def _is_dataset_pattern_allowed(
self, dataset_name: Optional[Any], dataset_type: Optional[Any]
) -> bool:
# TODO: support table/view patterns for usage logs by pulling that information as well from the usage query
if not dataset_type or not dataset_name:
return True
table_or_view_pattern: Optional[AllowDenyPattern] = AllowDenyPattern.allow_all()
# Test domain type = external_table and then add it
table_or_view_pattern = (
self.config.table_pattern
if dataset_type.lower() in {"table"}
else (
self.config.view_pattern
if dataset_type.lower() in {"view", "materialized_view"}
else None
)
)
if table_or_view_pattern is None:
return True
dataset_params = dataset_name.split(".")
assert len(dataset_params) == 3
if (
not self.config.database_pattern.allowed(dataset_params[0])
or not self.config.schema_pattern.allowed(dataset_params[1])
or not table_or_view_pattern.allowed(dataset_params[2])
):
return False
return True
def _process_snowflake_history_row(
self, row: Any
) -> Iterable[SnowflakeJoinedAccessEvent]:
self.report.rows_processed += 1
# Make some minor type conversions.
if hasattr(row, "_asdict"):
# Compat with SQLAlchemy 1.3 and 1.4
# See https://docs.sqlalchemy.org/en/14/changelog/migration_14.html#rowproxy-is-no-longer-a-proxy-is-now-called-row-and-behaves-like-an-enhanced-named-tuple.
event_dict = row._asdict()
else:
event_dict = dict(row)
# no use processing events that don't have a query text
if not event_dict["query_text"]:
self.report.rows_missing_query_text += 1
return
event_dict["base_objects_accessed"] = [
obj
for obj in json.loads(event_dict["base_objects_accessed"])
if self._is_object_valid(obj)
]
if len(event_dict["base_objects_accessed"]) == 0:
self.report.rows_zero_base_objects_accessed += 1
event_dict["direct_objects_accessed"] = [
obj
for obj in json.loads(event_dict["direct_objects_accessed"])
if self._is_object_valid(obj)
]
if len(event_dict["direct_objects_accessed"]) == 0:
self.report.rows_zero_direct_objects_accessed += 1
event_dict["query_start_time"] = (event_dict["query_start_time"]).astimezone(
tz=timezone.utc
)
if not event_dict["email"] and self.config.email_domain:
if not event_dict["user_name"]:
self.report.report_warning("user-name-miss", f"Missing in {event_dict}")
logger.warning(
f"The user_name is missing from {event_dict}. Skipping ...."
)
self.report.rows_missing_email += 1
return
event_dict[
"email"
] = f'{event_dict["user_name"]}@{self.config.email_domain}'.lower()
try: # big hammer try block to ensure we don't fail on parsing events
event = SnowflakeJoinedAccessEvent(**event_dict)
yield event
except Exception as e:
self.report.rows_parsing_error += 1
self.warn(logger, "usage", f"Failed to parse usage line {event_dict}, {e}")
def _get_snowflake_history(self) -> Iterable[SnowflakeJoinedAccessEvent]:
    engine = self._make_sql_engine()
    logger.info("Checking usage date ranges")
    self._check_usage_date_ranges(engine)
    logger.info("Getting usage history")
    with PerfTimer() as timer:
        query = self._make_usage_query()
        results = engine.execute(query)
        self.report.access_history_query_secs = round(timer.elapsed_seconds(), 2)
    for row in results:
        yield from self._process_snowflake_history_row(row)
def _get_operation_aspect_work_unit(
self, event: SnowflakeJoinedAccessEvent

View File

@ -0,0 +1,210 @@
import logging
from typing import Optional
import pydantic
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from snowflake.connector.network import (
DEFAULT_AUTHENTICATOR,
EXTERNAL_BROWSER_AUTHENTICATOR,
KEY_PAIR_AUTHENTICATOR,
)
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.ingestion.source.sql.sql_common import (
SQLAlchemyConfig,
make_sqlalchemy_uri,
)
from datahub.utilities.config_clean import (
remove_protocol,
remove_suffix,
remove_trailing_slashes,
)
APPLICATION_NAME = "acryl_datahub"
logger: logging.Logger = logging.getLogger(__name__)
class SnowflakeProvisionRoleConfig(ConfigModel):
enabled: bool = False
# Can be used by account admin to test what sql statements will be run
dry_run: bool = False
# Setting this to True is helpful in case you want a clean role without any extra privileges
# Not set to True by default because multiple parallel
# Snowflake ingestions can depend on a single role
drop_role_if_exists: bool = False
# When the account admin is testing, they might not want to actually run the ingestion.
# Keep this False when the account admin only wants to
# create the role and
# grant it to the user in the main config;
# the ingestion itself is then run separately as the user in the main config.
run_ingestion: bool = False
admin_role: Optional[str] = "accountadmin"
admin_username: str
admin_password: pydantic.SecretStr = pydantic.Field(default=None, exclude=True)
@pydantic.validator("admin_username", always=True)
def username_not_empty(cls, v, values, **kwargs):
v_str: str = str(v)
if v_str.strip() == "":
raise ValueError("username is empty")
return v
class BaseSnowflakeConfig(BaseTimeWindowConfig):
# Note: this config model is also used by the snowflake-usage source.
scheme = "snowflake"
username: Optional[str] = None
password: Optional[pydantic.SecretStr] = pydantic.Field(default=None, exclude=True)
private_key_path: Optional[str]
private_key_password: Optional[pydantic.SecretStr] = pydantic.Field(
default=None, exclude=True
)
authentication_type: Optional[str] = "DEFAULT_AUTHENTICATOR"
host_port: str
warehouse: Optional[str]
role: Optional[str]
include_table_lineage: Optional[bool] = True
include_view_lineage: Optional[bool] = True
connect_args: Optional[dict]
@pydantic.validator("host_port", always=True)
def host_port_is_valid(cls, v, values, **kwargs):
v = remove_protocol(v)
v = remove_trailing_slashes(v)
v = remove_suffix(v, ".snowflakecomputing.com")
logger.info(f"Cleaned Host port is {v}")
return v
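# Illustrative sketch, not part of this commit: what the host_port validator above does
# to a value pasted straight from the browser (the account locator is hypothetical).
from datahub.utilities.config_clean import (
    remove_protocol,
    remove_suffix,
    remove_trailing_slashes,
)

raw = "https://ab12345.us-east-1.snowflakecomputing.com/"
cleaned = remove_suffix(
    remove_trailing_slashes(remove_protocol(raw)), ".snowflakecomputing.com"
)
assert cleaned == "ab12345.us-east-1"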
@pydantic.validator("authentication_type", always=True)
def authenticator_type_is_valid(cls, v, values, **kwargs):
valid_auth_types = {
"DEFAULT_AUTHENTICATOR": DEFAULT_AUTHENTICATOR,
"EXTERNAL_BROWSER_AUTHENTICATOR": EXTERNAL_BROWSER_AUTHENTICATOR,
"KEY_PAIR_AUTHENTICATOR": KEY_PAIR_AUTHENTICATOR,
}
if v not in valid_auth_types.keys():
raise ValueError(
f"unsupported authenticator type '{v}' was provided,"
f" use one of {list(valid_auth_types.keys())}"
)
else:
if v == "KEY_PAIR_AUTHENTICATOR":
# If we are using key pair auth, we need the private key path and password to be set
if values.get("private_key_path") is None:
raise ValueError(
f"'private_key_path' was none "
f"but should be set when using {v} authentication"
)
if values.get("private_key_password") is None:
raise ValueError(
f"'private_key_password' was none "
f"but should be set when using {v} authentication"
)
logger.info(f"using authenticator type '{v}'")
return valid_auth_types.get(v)
@pydantic.validator("include_view_lineage")
def validate_include_view_lineage(cls, v, values):
if not values.get("include_table_lineage") and v:
raise ValueError(
"include_table_lineage must be True for include_view_lineage to be set."
)
return v
def get_sql_alchemy_url(
self,
database: Optional[str] = None,
username: Optional[str] = None,
password: Optional[pydantic.SecretStr] = None,
role: Optional[str] = None,
) -> str:
if username is None:
username = self.username
if password is None:
password = self.password
if role is None:
role = self.role
return make_sqlalchemy_uri(
self.scheme,
username,
password.get_secret_value() if password else None,
self.host_port,
f'"{database}"' if database is not None else database,
uri_opts={
# Drop the options if value is None.
key: value
for (key, value) in {
"authenticator": self.authentication_type,
"warehouse": self.warehouse,
"role": role,
"application": APPLICATION_NAME,
}.items()
if value
},
)
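# Illustrative sketch, not part of this commit: building a connection URI from the
# config class above with hypothetical credentials; time-window fields inherited from
# BaseTimeWindowConfig are assumed to keep their defaults.
from datahub.ingestion.source_config.sql.snowflake import BaseSnowflakeConfig

config = BaseSnowflakeConfig(
    host_port="ab12345.us-east-1.snowflakecomputing.com",
    username="datahub_user",
    password="my_password",
    warehouse="COMPUTE_WH",
    role="datahub_role",
)
# Produces a snowflake:// SQLAlchemy URI carrying the authenticator, warehouse, role,
# and application=acryl_datahub as query options.
print(config.get_sql_alchemy_url())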
def get_sql_alchemy_connect_args(self) -> dict:
if self.authentication_type != KEY_PAIR_AUTHENTICATOR:
return {}
if self.connect_args is None:
if self.private_key_path is None:
raise ValueError("missing required private key path to read key from")
if self.private_key_password is None:
raise ValueError("missing required private key password")
with open(self.private_key_path, "rb") as key:
p_key = serialization.load_pem_private_key(
key.read(),
password=self.private_key_password.get_secret_value().encode(),
backend=default_backend(),
)
pkb = p_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
self.connect_args = {"private_key": pkb}
return self.connect_args
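# Illustrative sketch, not part of this commit: with KEY_PAIR_AUTHENTICATOR, the
# DER-encoded key produced above is handed to SQLAlchemy via connect_args.
# The key path and passphrase below are hypothetical placeholders.
import sqlalchemy

from datahub.ingestion.source_config.sql.snowflake import BaseSnowflakeConfig

key_pair_config = BaseSnowflakeConfig(
    host_port="ab12345.us-east-1",
    username="datahub_user",
    authentication_type="KEY_PAIR_AUTHENTICATOR",
    private_key_path="/secrets/rsa_key.p8",
    private_key_password="key_passphrase",
)
engine = sqlalchemy.create_engine(
    key_pair_config.get_sql_alchemy_url(),
    connect_args=key_pair_config.get_sql_alchemy_connect_args(),
)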
class SnowflakeConfig(BaseSnowflakeConfig, SQLAlchemyConfig):
database_pattern: AllowDenyPattern = AllowDenyPattern(
deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"]
)
database: Optional[str] # deprecated
provision_role: Optional[SnowflakeProvisionRoleConfig] = None
@pydantic.validator("database")
def note_database_opt_deprecation(cls, v, values, **kwargs):
logger.warning(
"snowflake's `database` option has been deprecated; use database_pattern instead"
)
values["database_pattern"].allow = f"^{v}$"
return None
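# Illustrative sketch, not part of this commit: the deprecation shim above turns a
# legacy `database: "MY_DB"` setting into a pattern that only lets that one database
# through (the database name is hypothetical).
import re

legacy_database = "MY_DB"
allow_regex = f"^{legacy_database}$"
assert re.match(allow_regex, "MY_DB")
assert not re.match(allow_regex, "MY_DB_STAGING")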
def get_sql_alchemy_url(
self,
database: str = None,
username: Optional[str] = None,
password: Optional[pydantic.SecretStr] = None,
role: Optional[str] = None,
) -> str:
return super().get_sql_alchemy_url(
database=database, username=username, password=password, role=role
)
def get_sql_alchemy_connect_args(self) -> dict:
return super().get_sql_alchemy_connect_args()

View File

@ -0,0 +1,58 @@
import logging
from typing import Optional
import pydantic
from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfig,
StatefulIngestionConfigBase,
)
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.ingestion.source_config.sql.snowflake import BaseSnowflakeConfig
logger = logging.getLogger(__name__)
class SnowflakeStatefulIngestionConfig(StatefulIngestionConfig):
"""
Specialization of the basic StatefulIngestionConfig to add custom config.
This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
in the SnowflakeUsageConfig.
"""
ignore_old_state = pydantic.Field(False, alias="force_rerun")
class SnowflakeUsageConfig(
BaseSnowflakeConfig, BaseUsageConfig, StatefulIngestionConfigBase
):
options: dict = {}
database_pattern: AllowDenyPattern = AllowDenyPattern(
deny=[r"^UTIL_DB$", r"^SNOWFLAKE$", r"^SNOWFLAKE_SAMPLE_DATA$"]
)
email_domain: Optional[str]
schema_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
view_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
apply_view_usage_to_tables: bool = False
stateful_ingestion: Optional[SnowflakeStatefulIngestionConfig] = None
@pydantic.validator("role", always=True)
def role_accountadmin(cls, v):
if not v or v.lower() != "accountadmin":
# This isn't an error, since the privileges can be delegated to other
# roles as well: https://docs.snowflake.com/en/sql-reference/account-usage.html#enabling-account-usage-for-other-roles
logger.info(
'snowflake usage tables are only accessible by role "accountadmin" by default; you set %s',
v,
)
return v
def get_sql_alchemy_url(self):
return super().get_sql_alchemy_url(
database="snowflake",
username=self.username,
password=self.password,
role=self.role,
)

View File

@ -0,0 +1,29 @@
from dataclasses import dataclass, field
from typing import Dict, List
from datahub.ingestion.source.sql.sql_common import SQLSourceReport
from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
@dataclass
class BaseSnowflakeReport(BaseTimeWindowReport):
pass
@dataclass
class SnowflakeReport(BaseSnowflakeReport, SQLSourceReport):
num_table_to_table_edges_scanned: int = 0
num_table_to_view_edges_scanned: int = 0
num_view_to_table_edges_scanned: int = 0
num_external_table_edges_scanned: int = 0
upstream_lineage: Dict[str, List[str]] = field(default_factory=dict)
cleaned_host_port: str = ""
run_ingestion: bool = False
provision_role_done: bool = False
provision_role_success: bool = False
# https://community.snowflake.com/s/topic/0TO0Z000000Unu5WAC/releases
saas_version: str = ""
role: str = ""
role_grants: List[str] = field(default_factory=list)

View File

@ -0,0 +1,9 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class BaseTimeWindowReport:
end_time: Optional[datetime] = None
start_time: Optional[datetime] = None

View File

@ -0,0 +1,23 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionReport,
)
from datahub.ingestion.source_report.sql.snowflake import BaseSnowflakeReport
@dataclass
class SnowflakeUsageReport(BaseSnowflakeReport, StatefulIngestionReport):
min_access_history_time: Optional[datetime] = None
max_access_history_time: Optional[datetime] = None
access_history_range_query_secs: float = -1
access_history_query_secs: float = -1
rows_processed: int = 0
rows_missing_query_text: int = 0
rows_zero_base_objects_accessed: int = 0
rows_zero_direct_objects_accessed: int = 0
rows_missing_email: int = 0
rows_parsing_error: int = 0

View File

@ -29,6 +29,7 @@ class Telemetry:
client_id: str
enabled: bool = True
tracking_init: bool = False
def __init__(self):
@ -47,14 +48,6 @@ class Telemetry:
self.mp = Mixpanel(
MIXPANEL_TOKEN, consumer=Consumer(request_timeout=int(TIMEOUT))
)
self.mp.people_set(
self.client_id,
{
"datahub_version": datahub_package.nice_version_name(),
"os": platform.system(),
"python_version": platform.python_version(),
},
)
except Exception as e:
logger.debug(f"Error connecting to mixpanel: {e}")
@ -62,6 +55,7 @@ class Telemetry:
"""
Update the config file with the current client ID and enabled status.
"""
logger.info("Updating telemetry config")
if not DATAHUB_FOLDER.exists():
os.makedirs(DATAHUB_FOLDER)
@ -124,6 +118,21 @@ class Telemetry:
f"{CONFIG_FILE} had an IOError, please inspect this file for issues."
)
def init_tracking(self) -> None:
if not self.enabled or self.mp is None or self.tracking_init is True:
return
logger.info("Sending init Telemetry")
self.mp.people_set(
self.client_id,
{
"datahub_version": datahub_package.nice_version_name(),
"os": platform.system(),
"python_version": platform.python_version(),
},
)
self.tracking_init = True
def ping(
self,
action: str,
@ -144,6 +153,7 @@ class Telemetry:
# send event
try:
logger.info("Sending Telemetry")
self.mp.track(self.client_id, action, properties)
except Exception as e:
@ -155,6 +165,13 @@ telemetry_instance = Telemetry()
T = TypeVar("T")
def set_telemetry_enable(enable: bool) -> Any:
telemetry_instance.enabled = enable
if not enable:
logger.info("Disabling Telemetry locally due to server config")
telemetry_instance.update_config()
def get_full_class_name(obj):
module = obj.__class__.__module__
if module is None or module == str.__class__.__module__:
@ -168,6 +185,7 @@ def with_telemetry(func: Callable[..., T]) -> Callable[..., T]:
action = f"function:{func.__module__}.{func.__name__}"
telemetry_instance.init_tracking()
telemetry_instance.ping(action)
try:
res = func(*args, **kwargs)

View File

@ -1,10 +1,16 @@
import re
def remove_suffix(inp: str, suffix: str, remove_all: bool = False) -> str:
while suffix and inp.endswith(suffix):
inp = inp[: -len(suffix)]
if not remove_all:
break
return inp
def remove_trailing_slashes(url: str) -> str:
    return remove_suffix(url, "/", remove_all=True)
def remove_protocol(url: str) -> str:

View File

@ -0,0 +1,22 @@
from typing import Any, Dict, Optional
from datahub.telemetry.telemetry import set_telemetry_enable
# Only to be written to for logging server-related information
global_debug: Dict[str, Any] = dict()
def set_gms_config(config: Dict) -> Any:
global_debug["gms_config"] = config
cli_telemetry_enabled = is_cli_telemetry_enabled()
if cli_telemetry_enabled is not None:
set_telemetry_enable(cli_telemetry_enabled)
def get_gms_config() -> Dict:
return global_debug.get("gms_config", {})
def is_cli_telemetry_enabled() -> Optional[bool]:
return get_gms_config().get("telemetry", {}).get("enabledCli", None)
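# Illustrative sketch, not part of this commit: how a GMS /config payload feeds the
# helpers above. The import path is assumed to be datahub.utilities.server_config_util
# (the file name is not visible in this view), and the payload values are hypothetical;
# the payload shape mirrors what the Config servlet below returns.
from datahub.utilities.server_config_util import (
    is_cli_telemetry_enabled,
    set_gms_config,
)

gms_payload = {
    "telemetry": {"enabledCli": False, "enabledIngestion": False},
    "managedIngestion": {"enabled": True, "defaultCliVersion": "0.8.26.6"},
}
set_gms_config(gms_payload)  # stores the payload and disables CLI telemetry locally
assert is_cli_telemetry_enabled() is False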

View File

@ -1,6 +1,20 @@
from datahub.utilities import config_clean
def test_remove_suffix():
assert (
config_clean.remove_suffix(
"xaaabcdef.snowflakecomputing.com", ".snowflakecomputing.com"
)
== "xaaabcdef"
)
assert (
config_clean.remove_suffix("xaaabcdef", ".snowflakecomputing.com")
== "xaaabcdef"
)
def test_url_without_slash_suffix():
assert (
config_clean.remove_trailing_slashes("http://example.com")

View File

@ -1,7 +1,3 @@
import pytest
@pytest.mark.integration
def test_snowflake_uri_default_authentication():
from datahub.ingestion.source.sql.snowflake import SnowflakeConfig
@ -23,7 +19,6 @@ def test_snowflake_uri_default_authentication():
)
@pytest.mark.integration
def test_snowflake_uri_external_browser_authentication():
from datahub.ingestion.source.sql.snowflake import SnowflakeConfig
@ -45,7 +40,6 @@ def test_snowflake_uri_external_browser_authentication():
)
@pytest.mark.integration
def test_snowflake_uri_key_pair_authentication():
from datahub.ingestion.source.sql.snowflake import SnowflakeConfig

View File

@ -10,10 +10,10 @@ public class IngestionConfiguration {
/**
* Whether managed ingestion is enabled
*/
public boolean enabled;
/**
* The default CLI version to use in managed ingestion
*/
public String defaultCliVersion;
}

View File

@ -22,4 +22,9 @@ public class ConfigurationProvider {
* Ingestion related configs
*/
private IngestionConfiguration ingestion;
/**
* Telemetry related configs
*/
private TelemetryConfiguration telemetry;
}

View File

@ -0,0 +1,17 @@
package com.linkedin.gms.factory.config;
import lombok.Data;
/**
* POJO representing the "telemetry" configuration block in application.yml.
*/
@Data
public class TelemetryConfiguration {
/**
* Whether CLI telemetry is enabled
*/
public boolean enabledCli;
/**
* Whether reporting telemetry is enabled
*/
public boolean enabledIngestion;
}

View File

@ -28,6 +28,10 @@ ingestion:
enabled: ${UI_INGESTION_ENABLED:true}
defaultCliVersion: '${UI_INGESTION_DEFAULT_CLI_VERSION:0.8.26.6}'
telemetry:
enabledCli: ${CLI_TELEMETRY_ENABLED:true}
enabledIngestion: ${INGESTION_REPORTING_ENABLED:false}
secretService:
encryptionKey: ${SECRET_SERVICE_ENCRYPTION_KEY:ENCRYPTION_KEY}

View File

@ -9,4 +9,5 @@ dependencies {
compile externalDependency.springWebMVC
annotationProcessor externalDependency.lombok
compile project(':entity-registry')
compile project(':metadata-service:factories')
}

View File

@ -20,6 +20,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.maven.artifact.versioning.ComparableVersion;
import org.springframework.web.context.WebApplicationContext;
import org.springframework.web.context.support.WebApplicationContextUtils;
import com.linkedin.gms.factory.config.ConfigurationProvider;
// Return a 200 for health checks
@ -50,6 +51,10 @@ public class Config extends HttpServlet {
return patchDiagnostics;
}
private ConfigurationProvider getConfigProvider(WebApplicationContext ctx) {
return (ConfigurationProvider) ctx.getBean("configurationProvider");
}
private GitVersion getGitVersion(WebApplicationContext ctx) {
return (GitVersion) ctx.getBean("gitVersion");
}
@ -71,6 +76,20 @@ public class Config extends HttpServlet {
versionConfig.put("linkedin/datahub", version.toConfig());
config.put("versions", versionConfig);
ConfigurationProvider configProvider = getConfigProvider(ctx);
Map<String, Object> telemetryConfig = new HashMap<String, Object>() {{
put("enabledCli", configProvider.getTelemetry().enabledCli);
put("enabledIngestion", configProvider.getTelemetry().enabledIngestion);
}};
config.put("telemetry", telemetryConfig);
Map<String, Object> ingestionConfig = new HashMap<String, Object>() {{
put("enabled", configProvider.getIngestion().enabled);
put("defaultCliVersion", configProvider.getIngestion().defaultCliVersion);
}};
config.put("managedIngestion", ingestionConfig);
resp.setContentType("application/json");
PrintWriter out = resp.getWriter();