fix(ingest): Glue lineage compatibility docs and more resilience (#3555)

Kevin Hu 2021-11-13 11:15:59 -05:00 committed by GitHub
parent a510a0c7c1
commit af366711b7
2 changed files with 49 additions and 24 deletions


@@ -38,26 +38,26 @@ sink:
Note that a `.` is used to denote nested fields in the YAML recipe.
| Field | Required | Default | Description |
| ----------------------------- | -------- | ------------ | ---------------------------------------------------------------------------------- |
| `aws_region` | ✅ | | AWS region code. |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
| `aws_access_key_id` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `aws_secret_access_key` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `aws_session_token` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `aws_role` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `extract_transforms` | | `True` | Whether to extract Glue transform jobs. |
| `database_pattern.allow` | | | List of regex patterns for databases to include in ingestion. |
| `database_pattern.deny` | | | List of regex patterns for databases to exclude from ingestion. |
| `database_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `table_pattern.allow` | | | List of regex patterns for tables to include in ingestion. |
| `table_pattern.deny` | | | List of regex patterns for tables to exclude from ingestion. |
| `table_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `underlying_platform` | | `glue` | Override for platform name. Allowed values - `glue`, `athena` |
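For illustration, here is a minimal sketch of how allow/deny regex patterns with `ignoreCase` might be applied when deciding which databases or tables to ingest. The helper and names below are hypothetical and are not the actual DataHub filtering code:

```python
import re
from typing import List


def is_allowed(name: str, allow: List[str], deny: List[str], ignore_case: bool = True) -> bool:
    """Hypothetical helper: deny patterns take precedence, then the name must match an allow pattern."""
    flags = re.IGNORECASE if ignore_case else 0
    if any(re.match(pattern, name, flags) for pattern in deny):
        return False
    # an empty allow list means "allow everything"
    return not allow or any(re.match(pattern, name, flags) for pattern in allow)


# e.g. ingest tables starting with "sales_" but skip temporary ones
print(is_allowed("sales_2021", allow=[r"sales_.*"], deny=[r".*_tmp$"]))      # True
print(is_allowed("sales_2021_tmp", allow=[r"sales_.*"], deny=[r".*_tmp$"]))  # False
```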
## Compatibility
Coming soon!
To capture lineage across Glue jobs and databases, a requirement must be met; otherwise the AWS API is unable to report any lineage. The job must be created in Glue Studio with the "Generate classic script" option turned on (this option can be accessed in the "Script" tab). Custom scripts that lack the proper annotations will not have lineage reported.
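As a rough way to check whether an existing job satisfies this requirement, one can feed its script to the Glue `GetDataflowGraph` API and see whether any nodes come back. This is only a sketch; the job name, region, and error handling are placeholders:

```python
import boto3
from urllib.parse import urlparse

glue = boto3.client("glue", region_name="us-east-1")  # placeholder region
s3 = boto3.client("s3", region_name="us-east-1")

# "my-glue-job" is a placeholder; use one of your own Glue Studio jobs
job = glue.get_job(JobName="my-glue-job")["Job"]
script_location = urlparse(job["Command"]["ScriptLocation"])

# download the generated script and ask Glue to parse its lineage annotations
script = (
    s3.get_object(Bucket=script_location.netloc, Key=script_location.path.lstrip("/"))["Body"]
    .read()
    .decode("utf-8")
)
dag = glue.get_dataflow_graph(PythonScript=script)

# a job created with "Generate classic script" enabled should yield DataSource/DataSink nodes
print(f"{len(dag.get('DagNodes', []))} nodes found")
```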
## Questions


@@ -1,4 +1,3 @@
import json
import typing
from collections import defaultdict
from dataclasses import dataclass
@@ -6,6 +5,8 @@ from dataclasses import field as dataclass_field
from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union
from urllib.parse import urlparse
import yaml
from datahub.emitter import mce_builder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
@@ -157,6 +158,15 @@ class GlueSource(Source):
return None
def get_s3_uri(self, node_args):
s3_uri = node_args.get("connection_options", {}).get("path")
# sometimes the path is stored as a single-element list under "paths" rather than a string under "path"
if s3_uri is None:
s3_uri = (node_args.get("connection_options", {}).get("paths") or [None])[0]
return s3_uri
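# Illustrative aside: get_s3_uri handles both of these arg shapes (hypothetical values):
#   {"connection_options": {"path": "s3://bucket/prefix"}}    -> "s3://bucket/prefix"
#   {"connection_options": {"paths": ["s3://bucket/prefix"]}} -> "s3://bucket/prefix"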
def get_dataflow_s3_names(
self, dataflow_graph: Dict[str, Any]
) -> Iterator[Tuple[str, Optional[str]]]:
@@ -169,12 +179,17 @@ class GlueSource(Source):
# for nodes representing datasets, we construct a dataset URN accordingly
if node_type in ["DataSource", "DataSink"]:
node_args = {x["Name"]: json.loads(x["Value"]) for x in node["Args"]}
node_args = {
x["Name"]: yaml.safe_load(x["Value"]) for x in node["Args"]
}
# if data object is S3 bucket
if node_args.get("connection_type") == "s3":
s3_uri = node_args["connection_options"]["path"]
s3_uri = self.get_s3_uri(node_args)
if s3_uri is None:
continue
extension = node_args.get("format")
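# Illustrative aside: yaml.safe_load is more forgiving than json.loads for Glue "Args"
# values, which are not always valid JSON (bare, unquoted strings are common).
# A minimal, self-contained demonstration with made-up values:
import json
import yaml
assert yaml.safe_load('"my_database"') == "my_database"  # JSON-style quoting still works
assert yaml.safe_load("my_database") == "my_database"    # bare value that json.loads rejects
try:
    json.loads("my_database")
except json.JSONDecodeError:
    pass  # this is the parse failure that switching to yaml.safe_load avoids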
@@ -187,14 +202,14 @@
new_dataset_ids: List[str],
new_dataset_mces: List[MetadataChangeEvent],
s3_formats: typing.DefaultDict[str, Set[Union[str, None]]],
) -> Dict[str, Any]:
) -> Optional[Dict[str, Any]]:
node_type = node["NodeType"]
# for nodes representing datasets, we construct a dataset URN accordingly
if node_type in ["DataSource", "DataSink"]:
node_args = {x["Name"]: json.loads(x["Value"]) for x in node["Args"]}
node_args = {x["Name"]: yaml.safe_load(x["Value"]) for x in node["Args"]}
# if data object is Glue table
if "database" in node_args and "table_name" in node_args:
@@ -207,7 +222,14 @@ class GlueSource(Source):
# if data object is S3 bucket
elif node_args.get("connection_type") == "s3":
s3_uri = node_args["connection_options"]["path"]
s3_uri = self.get_s3_uri(node_args)
if s3_uri is None:
self.report.report_warning(
f"{node['Nodetype']}-{node['Id']}",
f"Could not find script path for job {node['Nodetype']}-{node['Id']} in flow {flow_urn}. Skipping",
)
return None
# append S3 format if different ones exist
if len(s3_formats[s3_uri]) > 1:
@@ -283,10 +305,13 @@ class GlueSource(Source):
# iterate through each node to populate processed nodes
for node in dataflow_graph["DagNodes"]:
nodes[node["Id"]] = self.process_dataflow_node(
processed_node = self.process_dataflow_node(
node, flow_urn, new_dataset_ids, new_dataset_mces, s3_formats
)
if processed_node is not None:
nodes[node["Id"]] = processed_node
# traverse edges to fill in node properties
for edge in dataflow_graph["DagEdges"]: