diff --git a/metadata-ingestion/source_docs/glue.md b/metadata-ingestion/source_docs/glue.md
index 9934bbc911..58775be1ab 100644
--- a/metadata-ingestion/source_docs/glue.md
+++ b/metadata-ingestion/source_docs/glue.md
@@ -38,26 +38,26 @@ sink:
 
 Note that a `.` is used to denote nested fields in the YAML recipe.
 
-| Field                    | Required | Default                     | Description                                                                          |
-| ------------------------ | -------- | --------------------------- | ------------------------------------------------------------------------------------ |
-| `aws_region`             | ✅       |                             | AWS region code.                                                                     |
-| `env`                    |          | `"PROD"`                    | Environment to use in namespace when constructing URNs.                              |
-| `aws_access_key_id`      |          | Autodetected                | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html   |
-| `aws_secret_access_key`  |          | Autodetected                | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html   |
-| `aws_session_token`      |          | Autodetected                | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html   |
-| `aws_role`               |          | Autodetected                | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html   |
-| `extract_transforms`     |          | `True`                      | Whether to extract Glue transform jobs.                                              |
-| `database_pattern.allow` |          |                             | List of regex patterns for databases to include in ingestion.                        |
-| `database_pattern.deny`  |          |                             | List of regex patterns for databases to exclude from ingestion.                      |
-| `database_pattern.ignoreCase` |     | `True`                      | Whether to ignore case sensitivity during pattern matching.                          |
-| `table_pattern.allow`    |          |                             | List of regex patterns for tables to include in ingestion.                           |
-| `table_pattern.deny`     |          |                             | List of regex patterns for tables to exclude from ingestion.                         |
-| `table_pattern.ignoreCase` |        | `True`                      | Whether to ignore case sensitivity during pattern matching.                          |
-| `underlying_platform`    |          | `glue`                      | Override for platform name. Allowed values - `glue`, `athena`                        |
+| Field                         | Required | Default      | Description                                                                          |
+| ----------------------------- | -------- | ------------ | ------------------------------------------------------------------------------------ |
+| `aws_region`                  | ✅       |              | AWS region code.                                                                     |
+| `env`                         |          | `"PROD"`     | Environment to use in namespace when constructing URNs.                              |
+| `aws_access_key_id`           |          | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html   |
+| `aws_secret_access_key`       |          | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html   |
+| `aws_session_token`           |          | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html   |
+| `aws_role`                    |          | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html   |
+| `extract_transforms`          |          | `True`       | Whether to extract Glue transform jobs.                                              |
+| `database_pattern.allow`      |          |              | List of regex patterns for databases to include in ingestion.                        |
+| `database_pattern.deny`       |          |              | List of regex patterns for databases to exclude from ingestion.                      |
+| `database_pattern.ignoreCase` |          | `True`       | Whether to ignore case sensitivity during pattern matching.                          |
+| `table_pattern.allow`         |          |              | List of regex patterns for tables to include in ingestion.                           |
+| `table_pattern.deny`          |          |              | List of regex patterns for tables to exclude from ingestion.                         |
+| `table_pattern.ignoreCase`    |          | `True`       | Whether to ignore case sensitivity during pattern matching.                          |
+| `underlying_platform`         |          | `glue`       | Override for platform name. Allowed values - `glue`, `athena`                        |
 
 ## Compatibility
 
-Coming soon!
+To capture lineage across Glue jobs and databases, one requirement must be met; otherwise, the AWS API is unable to report any lineage. The job must be created in Glue Studio with the "Generate classic script" option turned on (this option can be accessed in the "Script" tab). Custom scripts that lack the proper annotations will not have lineage reported.
 
 ## Questions
 
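For reference, a minimal `source` block exercising the options documented in the table above might look like the following sketch. The region, environment, and regex patterns are placeholder values for illustration only, not values prescribed by this change; the `sink` section would be the same as in the recipe earlier in the doc.

```yaml
source:
  type: glue
  config:
    aws_region: "us-west-2"      # placeholder region
    env: "PROD"                  # the default
    extract_transforms: True     # also the default
    database_pattern:
      allow:
        - "^flights.*"           # hypothetical database regex
    table_pattern:
      deny:
        - ".*_staging$"          # hypothetical table regex
      ignoreCase: True
    underlying_platform: glue    # or "athena"
```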
diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
index c62f2514da..1bad1a6520 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -1,4 +1,3 @@
-import json
 import typing
 from collections import defaultdict
 from dataclasses import dataclass
@@ -6,6 +5,8 @@ from dataclasses import field as dataclass_field
 from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union
 from urllib.parse import urlparse
 
+import yaml
+
 from datahub.emitter import mce_builder
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.source import Source, SourceReport
@@ -157,6 +158,15 @@ class GlueSource(Source):
 
         return None
 
+    def get_s3_uri(self, node_args):
+        s3_uri = node_args.get("connection_options", {}).get("path")
+
+        # sometimes the S3 location is a single-element list under "paths" rather than a string under "path"
+        if s3_uri is None:
+            s3_uri = node_args.get("connection_options", {}).get("paths", [None])[0]
+
+        return s3_uri
+
     def get_dataflow_s3_names(
         self, dataflow_graph: Dict[str, Any]
     ) -> Iterator[Tuple[str, Optional[str]]]:
@@ -169,12 +179,17 @@ class GlueSource(Source):
 
         # for nodes representing datasets, we construct a dataset URN accordingly
         if node_type in ["DataSource", "DataSink"]:
 
-            node_args = {x["Name"]: json.loads(x["Value"]) for x in node["Args"]}
+            node_args = {
+                x["Name"]: yaml.safe_load(x["Value"]) for x in node["Args"]
+            }
 
             # if data object is S3 bucket
             if node_args.get("connection_type") == "s3":
-                s3_uri = node_args["connection_options"]["path"]
+                s3_uri = self.get_s3_uri(node_args)
+
+                if s3_uri is None:
+                    continue
 
                 extension = node_args.get("format")
 
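The switch from `json.loads` to `yaml.safe_load` matters because the values in a Glue DAG node's `Args` list are not always strict JSON; some jobs emit Python-style, single-quoted literals, which YAML parsing tolerates. That motivation is inferred from the change rather than stated in it, and the sample `Args` values below are made up for illustration.

```python
import json
import yaml

# Hypothetical Glue DAG node "Args" entries (not taken from this change).
args = [
    {"Name": "connection_type", "Value": '"s3"'},
    {"Name": "connection_options", "Value": "{'paths': ['s3://my-bucket/prefix/']}"},
]

# Same comprehension shape as in the patch: parse each Value leniently.
node_args = {x["Name"]: yaml.safe_load(x["Value"]) for x in args}
print(node_args["connection_type"])                 # s3
print(node_args["connection_options"]["paths"][0])  # s3://my-bucket/prefix/

# The old json.loads call rejects the single-quoted variant outright.
try:
    json.loads(args[1]["Value"])
except json.JSONDecodeError as exc:
    print(f"json.loads fails on the same value: {exc}")
```

Together with the new `get_s3_uri` helper, this also covers connections that report their S3 location as a `paths` list instead of a single `path` string.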
Skipping", + ) + return None # append S3 format if different ones exist if len(s3_formats[s3_uri]) > 1: @@ -283,10 +305,13 @@ class GlueSource(Source): # iterate through each node to populate processed nodes for node in dataflow_graph["DagNodes"]: - nodes[node["Id"]] = self.process_dataflow_node( + processed_node = self.process_dataflow_node( node, flow_urn, new_dataset_ids, new_dataset_mces, s3_formats ) + if processed_node is not None: + nodes[node["Id"]] = processed_node + # traverse edges to fill in node properties for edge in dataflow_graph["DagEdges"]: