feat(ingestion) dbt: Fixing issue with strip_user_ids_from_email and adding owner_extraction_pattern (#4587)

* Fixing issue with strip_user_ids_from_email and adding owner_extraction_pattern
Co-authored-by: BZ <93607724+BoyuanZhangDE@users.noreply.github.com>
Co-authored-by: Ravindra Lanka <rlanka@acryl.io>
This commit is contained in:
Arun Vasudevan 2022-04-13 18:58:36 -05:00 committed by GitHub
parent 4d7684bae7
commit 5aa3da5c9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 24764 additions and 17515 deletions

View File

@ -18,7 +18,7 @@ This plugin pulls metadata from dbt's artifact files and generates:
Note:
1. It also generates lineage between the `dbt` nodes (e.g. ephemeral nodes that depend on other dbt sources) as well as lineage between the `dbt` nodes and the underlying (target) platform nodes (e.g. BigQuery Table -> dbt Source, dbt View -> BigQuery View).
2. The previous version of this source (`acryl_datahub<=0.8.16.2`) did not generate `dbt` entities and lineage between `dbt` entities and platform entities. For backwards compatibility with the previous version of this source, there is a config flag `disable_dbt_node_creation` that falls back to the old behavior.
2. The previous version of this source (`acryl_datahub<=0.8.16.2`) did not generate `dbt` entities and lineage between `dbt` entities and platform entities. For backwards compatibility with the previous version of this source, there is a config flag `disable_dbt_node_creation` that falls back to the old behavior.
3. We also support automated actions (like add a tag, term or owner) based on properties defined in dbt meta.
The artifacts used by this source are:
@ -59,33 +59,34 @@ sink:
Note that a `.` is used to denote nested fields in the YAML recipe.
| Field | Required | Default | Description |
| ------------------------- | -------- | -------- | ----------------------------------------------------------------------------------------------------------------------------------------------------- |
| `manifest_path` | ✅ | | Path to dbt manifest JSON. See https://docs.getdbt.com/reference/artifacts/manifest-json Note this can be a local file or a URI. |
| `catalog_path` | ✅ | | Path to dbt catalog JSON. See https://docs.getdbt.com/reference/artifacts/catalog-json Note this can be a local file or a URI. |
| `sources_path` | | | Path to dbt sources JSON. See https://docs.getdbt.com/reference/artifacts/sources-json. If not specified, last-modified fields will not be populated. Note this can be a local file or a URI. |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
| `target_platform` | ✅ | | The platform that dbt is loading onto. (e.g. bigquery / redshift / postgres etc.) |
| `use_identifiers` | | `False` | Use model [identifier](https://docs.getdbt.com/reference/resource-properties/identifier) instead of model name if defined (if not, default to model name). |
| `tag_prefix` | | `dbt:` | Prefix added to tags during ingestion. |
| `node_type_pattern.allow` | | | List of regex patterns for dbt nodes to include in ingestion. |
| `node_type_pattern.deny` | | | List of regex patterns for dbt nodes to exclude from ingestion. |
| `node_type_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `node_name_pattern.allow` | | | List of regex patterns for dbt model names to include in ingestion. |
| `node_name_pattern.deny` | | | List of regex patterns for dbt model names to exclude from ingestion. |
| `node_name_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `disable_dbt_node_creation` | | `False` | Whether to suppress `dbt` dataset metadata creation. When set to `True`, this flag applies the dbt metadata to the `target_platform` entities (e.g. populating schema and column descriptions from dbt into the postgres / bigquery table metadata in DataHub) and generates lineage between the platform entities. |
| `load_schemas` | | `True` | This flag is only consulted when `disable_dbt_node_creation` is set to `True`. Load schemas for `target_platform` entities from dbt catalog file, not necessary when you are already ingesting this metadata from the data platform directly. If set to `False`, table schema details (e.g. columns) will not be ingested. |
| `meta_mapping` | | | mapping rules that will be executed against dbt meta properties. Refer to the section below on dbt meta automated mappings. |
| `enable_meta_mapping` | | `False` | When enabled, applies the mappings that are defined through the `meta_mapping` directives. |
| `write_semantics` | | `PATCH` | Whether the new tags, terms and owners to be added will override the existing ones added only by this source or not. Value for this config can be "PATCH" or "OVERRIDE" |
| `strip_user_ids_from_email` | | `FALSE` | Whether or not to strip email id while adding owners using dbt meta actions. |
| Field | Required | Default | Description |
|--------------------------------| -------- |----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `manifest_path` | ✅ | | Path to dbt manifest JSON. See https://docs.getdbt.com/reference/artifacts/manifest-json Note this can be a local file or a URI. |
| `catalog_path` | ✅ | | Path to dbt catalog JSON. See https://docs.getdbt.com/reference/artifacts/catalog-json Note this can be a local file or a URI. |
| `sources_path` | | | Path to dbt sources JSON. See https://docs.getdbt.com/reference/artifacts/sources-json. If not specified, last-modified fields will not be populated. Note this can be a local file or a URI. |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
| `target_platform` | ✅ | | The platform that dbt is loading onto. (e.g. bigquery / redshift / postgres etc.) |
| `use_identifiers` | | `False` | Use model [identifier](https://docs.getdbt.com/reference/resource-properties/identifier) instead of model name if defined (if not, default to model name). |
| `tag_prefix` | | `dbt:` | Prefix added to tags during ingestion. |
| `node_type_pattern.allow` | | | List of regex patterns for dbt nodes to include in ingestion. |
| `node_type_pattern.deny` | | | List of regex patterns for dbt nodes to exclude from ingestion. |
| `node_type_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `node_name_pattern.allow` | | | List of regex patterns for dbt model names to include in ingestion. |
| `node_name_pattern.deny` | | | List of regex patterns for dbt model names to exclude from ingestion. |
| `node_name_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `disable_dbt_node_creation` | | `False` | Whether to suppress `dbt` dataset metadata creation. When set to `True`, this flag applies the dbt metadata to the `target_platform` entities (e.g. populating schema and column descriptions from dbt into the postgres / bigquery table metadata in DataHub) and generates lineage between the platform entities. |
| `load_schemas` | | `True` | This flag is only consulted when `disable_dbt_node_creation` is set to `True`. Load schemas for `target_platform` entities from dbt catalog file, not necessary when you are already ingesting this metadata from the data platform directly. If set to `False`, table schema details (e.g. columns) will not be ingested. |
| `meta_mapping` | | | mapping rules that will be executed against dbt meta properties. Refer to the section below on dbt meta automated mappings. |
| `enable_meta_mapping` | | `False` | When enabled, applies the mappings that are defined through the `meta_mapping` directives. |
| `write_semantics` | | `PATCH` | Whether the new tags, terms and owners to be added will override the existing ones added only by this source or not. Value for this config can be "PATCH" or "OVERRIDE" |
| `strip_user_ids_from_email` | | `FALSE` | Whether or not to strip email id while adding owners using dbt meta actions. |
| `owner_extraction_pattern` | | `None` | Regex string to extract owner from the dbt node using the `(?P<name>...) syntax` of the [match object](https://docs.python.org/3/library/re.html#match-objects), where the group name must be `owner`. Examples: (1)`r"(?P<owner>(.*)): (\w+) (\w+)"` will extract `jdoe` as the owner from `"jdoe: John Doe"` (2) `r"@(?P<owner>(.*))"` will extract `alice` as the owner from `"@alice"`. |
### dbt meta automated mappings
dbt allows authors to define meta properties for datasets. Checkout this link to know more - [dbt meta](https://docs.getdbt.com/reference/resource-configs/meta). Our dbt source allows users to define
actions such as add a tag, term or owner. For example if a dbt model has a meta config ```"has_pii": True```, we can define an action
actions such as add a tag, term or owner. For example if a dbt model has a meta config ```"has_pii": True```, we can define an action
that evaluates if the property is set to true and add, lets say, a ```pii``` tag.
To leverage this feature we require users to define mappings as part of the recipe. Following is how mappings can be defined -
To leverage this feature we require users to define mappings as part of the recipe. Following is how mappings can be defined -
```json
"meta_mapping": {
"business_owner": {
@ -115,7 +116,7 @@ To leverage this feature we require users to define mappings as part of the reci
},
}
```
We support the below actions -
We support the below actions -
1. add_tag - Requires ```tag``` property in config.
2. add_term - Requires ```term``` property in config.
3. add_owner - Requires ```owner_type``` property in config which can be either user or group.

View File

@ -101,7 +101,7 @@ Note that a `.` is used to denote nested fields in the YAML recipe.
| `domain.domain_key.allow` | | | List of regex patterns for tables to set domain_key domain key (domain_key can be any string like `sales`. There can be multiple domain key specified. |
| `domain.domain_key.deny` | | | List of regex patterns for tables to not assign domain_key. There can be multiple domain key specified. |
| `domain.domain_key.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching.There can be multiple domain key specified. |
| `catalog_id` | | None | The aws account id where the target glue catalog lives. If None, datahub will ingest glue in aws caller's account. |
| `catalog_id` | | | The aws account id where the target glue catalog lives. If None, datahub will ingest glue in aws caller's account. |
## Compatibility

View File

@ -10,7 +10,6 @@ from pydantic import validator
from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_state_provider import JobId
@ -99,7 +98,7 @@ class DBTConfig(StatefulIngestionConfigBase):
manifest_path: str
catalog_path: str
sources_path: Optional[str]
env: str = DEFAULT_ENV
env: str = mce_builder.DEFAULT_ENV
target_platform: str
load_schemas: bool = True
use_identifiers: bool = False
@ -111,6 +110,7 @@ class DBTConfig(StatefulIngestionConfigBase):
enable_meta_mapping = True
write_semantics: str = "PATCH"
strip_user_ids_from_email: bool = False
owner_extraction_pattern: Optional[str]
# Custom Stateful Ingestion settings
stateful_ingestion: Optional[DBTStatefulIngestionConfig] = None
@ -406,7 +406,7 @@ def get_urn_from_dbtNode(
database: Optional[str], schema: str, name: str, target_platform: str, env: str
) -> str:
db_fqn = get_db_fqn(database, schema, name)
return f"urn:li:dataset:(urn:li:dataPlatform:{target_platform},{db_fqn},{env})"
return mce_builder.make_dataset_urn(target_platform, db_fqn, env)
def get_custom_properties(node: DBTNode) -> Dict[str, str]:
@ -560,7 +560,10 @@ def get_schema_metadata(
globalTags = None
if column.tags:
globalTags = GlobalTagsClass(
tags=[TagAssociationClass(f"urn:li:tag:{tag}") for tag in column.tags]
tags=[
TagAssociationClass(mce_builder.make_tag_urn(tag))
for tag in column.tags
]
)
field = SchemaField(
@ -577,7 +580,7 @@ def get_schema_metadata(
last_modified = None
if node.max_loaded_at is not None:
actor = "urn:li:corpuser:dbt_executor"
actor = mce_builder.make_user_urn("dbt_executor")
last_modified = AuditStamp(
time=int(dateutil.parser.parse(node.max_loaded_at).timestamp() * 1000),
actor=actor,
@ -585,7 +588,7 @@ def get_schema_metadata(
return SchemaMetadata(
schemaName=node.dbt_name,
platform=f"urn:li:dataPlatform:{platform}",
platform=mce_builder.make_data_platform_urn(platform),
version=0,
hash="",
platformSchema=MySqlDDL(tableSchema=""),
@ -607,6 +610,11 @@ class DBTSource(StatefulIngestionSourceBase):
self.config: DBTConfig = config
self.platform: str = platform
self.report: DBTSourceReport = DBTSourceReport()
self.compiled_owner_extraction_pattern: Optional[Any] = None
if self.config.owner_extraction_pattern:
self.compiled_owner_extraction_pattern = re.compile(
self.config.owner_extraction_pattern
)
# TODO: Consider refactoring this logic out for use across sources as it is leading to a significant amount of
# code duplication.
@ -900,7 +908,7 @@ class DBTSource(StatefulIngestionSourceBase):
transformed_tag_list = self.get_transformed_tags_by_prefix(
tag_aspect.tags,
mce.proposedSnapshot.urn,
f"urn:li:tag:{self.config.tag_prefix}",
mce_builder.make_tag_urn(self.config.tag_prefix),
)
tag_aspect.tags = transformed_tag_list
@ -940,17 +948,6 @@ class DBTSource(StatefulIngestionSourceBase):
)
return dbt_properties
def _get_owners_aspect(self, node: DBTNode) -> OwnershipClass:
owners = [
OwnerClass(
owner=f"urn:li:corpuser:{node.owner}",
type=OwnershipTypeClass.DATAOWNER,
)
]
return OwnershipClass(
owners=owners,
)
def _create_view_properties_aspect(self, node: DBTNode) -> ViewPropertiesClass:
materialized = node.materialization in {"table", "incremental"}
# this function is only called when raw sql is present. assert is added to satisfy lint checks
@ -1026,14 +1023,29 @@ class DBTSource(StatefulIngestionSourceBase):
) -> List[OwnerClass]:
owner_list: List[OwnerClass] = []
if node.owner:
owner: str = node.owner
if self.compiled_owner_extraction_pattern:
match: Optional[Any] = re.match(
self.compiled_owner_extraction_pattern, owner
)
if match:
owner = match.group("owner")
logger.debug(
f"Owner after applying owner extraction pattern:'{self.config.owner_extraction_pattern}' is '{owner}'."
)
if self.config.strip_user_ids_from_email:
owner = owner.split("@")[0]
logger.debug(f"Owner (after stripping email):{owner}")
owner_list.append(
OwnerClass(
owner=f"urn:li:corpuser:{node.owner}",
owner=mce_builder.make_user_urn(owner),
type=OwnershipTypeClass.DATAOWNER,
)
)
if meta_owner_aspects and self.config.enable_meta_mapping:
owner_list += meta_owner_aspects.owners
owner_list.extend(meta_owner_aspects.owners)
owner_list = sorted(owner_list, key=lambda x: x.owner)
return owner_list

View File

@ -1389,7 +1389,7 @@
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:@bob",
"owner": "urn:li:corpuser:bob",
"type": "DATAOWNER",
"source": null
}
@ -2056,7 +2056,7 @@
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:@charles",
"owner": "urn:li:corpuser:charles",
"type": "DATAOWNER",
"source": null
}

File diff suppressed because it is too large Load Diff

View File

@ -32,10 +32,10 @@ class DbtTestConfig:
tmp_path: Union[str, PathLike],
output_file: Union[str, PathLike],
golden_file: Union[str, PathLike],
manifest_file: str = "dbt_manifest.json",
source_config_modifiers: Optional[Dict[str, Any]] = None,
sink_config_modifiers: Optional[Dict[str, Any]] = None,
):
if source_config_modifiers is None:
source_config_modifiers = {}
@ -44,7 +44,7 @@ class DbtTestConfig:
self.run_id = run_id
self.manifest_path = f"{dbt_metadata_uri_prefix}/dbt_manifest.json"
self.manifest_path = f"{dbt_metadata_uri_prefix}/{manifest_file}"
self.catalog_path = f"{dbt_metadata_uri_prefix}/dbt_catalog.json"
self.sources_path = f"{dbt_metadata_uri_prefix}/dbt_sources.json"
self.target_platform = "postgres"
@ -131,6 +131,7 @@ def test_dbt_ingest(pytestconfig, tmp_path, mock_time, **kwargs):
"load_schemas": True,
"disable_dbt_node_creation": True,
"enable_meta_mapping": True,
"owner_extraction_pattern": r"^@(?P<owner>(.*))",
},
),
DbtTestConfig(
@ -143,6 +144,7 @@ def test_dbt_ingest(pytestconfig, tmp_path, mock_time, **kwargs):
source_config_modifiers={
"load_schemas": True,
"disable_dbt_node_creation": True,
"owner_extraction_pattern": r"^@(?P<owner>(.*))",
},
),
DbtTestConfig(
@ -155,6 +157,7 @@ def test_dbt_ingest(pytestconfig, tmp_path, mock_time, **kwargs):
source_config_modifiers={
"load_schemas": False,
"disable_dbt_node_creation": True,
"owner_extraction_pattern": r"^@(?P<owner>(.*))",
},
),
DbtTestConfig(
@ -170,6 +173,7 @@ def test_dbt_ingest(pytestconfig, tmp_path, mock_time, **kwargs):
"deny": ["source.sample_dbt.pagila.payment_p2020_06"]
},
"disable_dbt_node_creation": True,
"owner_extraction_pattern": r"^@(?P<owner>(.*))",
},
),
DbtTestConfig(
@ -179,7 +183,11 @@ def test_dbt_ingest(pytestconfig, tmp_path, mock_time, **kwargs):
tmp_path,
"dbt_enabled_with_schemas_mces.json",
"dbt_enabled_with_schemas_mces_golden.json",
source_config_modifiers={"load_schemas": True, "enable_meta_mapping": True},
source_config_modifiers={
"load_schemas": True,
"enable_meta_mapping": True,
"owner_extraction_pattern": r"^@(?P<owner>(.*))",
},
),
DbtTestConfig(
"dbt-test-without-schemas-dbt-enabled",
@ -188,7 +196,10 @@ def test_dbt_ingest(pytestconfig, tmp_path, mock_time, **kwargs):
tmp_path,
"dbt_enabled_without_schemas_mces.json",
"dbt_enabled_without_schemas_mces_golden.json",
source_config_modifiers={"load_schemas": False},
source_config_modifiers={
"load_schemas": False,
"owner_extraction_pattern": r"^@(?P<owner>(.*))",
},
),
DbtTestConfig(
"dbt-test-without-schemas-with-filter-dbt-enabled",
@ -202,6 +213,24 @@ def test_dbt_ingest(pytestconfig, tmp_path, mock_time, **kwargs):
"node_name_pattern": {
"deny": ["source.sample_dbt.pagila.payment_p2020_06"]
},
"owner_extraction_pattern": r"^@(?P<owner>(.*))",
},
),
DbtTestConfig(
"dbt-test-with-complex-owner-patterns",
test_resources_dir,
test_resources_dir,
tmp_path,
"dbt_test_with_complex_owner_patterns_mces.json",
"dbt_test_with_complex_owner_patterns_mces_golden.json",
manifest_file="dbt_manifest_complex_owner_patterns.json",
source_config_modifiers={
"load_schemas": False,
"node_name_pattern": {
"deny": ["source.sample_dbt.pagila.payment_p2020_06"]
},
"owner_extraction_pattern": "(.*)(?P<owner>(?<=\\().*?(?=\\)))",
"strip_user_ids_from_email": True,
},
),
]
@ -276,6 +305,7 @@ def test_dbt_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
"load_schemas": True,
# This will bypass check in get_workunits function of dbt.py
"write_semantics": "OVERRIDE",
"owner_extraction_pattern": r"^@(?P<owner>(.*))",
# enable stateful ingestion
**stateful_config,
}
@ -287,6 +317,7 @@ def test_dbt_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
"target_platform": "postgres",
"load_schemas": True,
"write_semantics": "OVERRIDE",
"owner_extraction_pattern": r"^@(?P<owner>(.*))",
# enable stateful ingestion
**stateful_config,
}
@ -307,7 +338,6 @@ def test_dbt_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
"datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
mock_datahub_graph,
) as mock_checkpoint:
mock_checkpoint.return_value = mock_datahub_graph
# Do the first run of the pipeline and get the default job's checkpoint.