feat(ingest): Glue - Support for domains and containers (#4110)

* Add container and domain support for Glue.
Adding option to set aws profile for Glue.

* Adding domain doc for Glue

* Making get_workunits less complex

* Updating golden file

* Addressing pr review comments

* Remove unneeded empty line
Tamas Nemeth 2022-02-16 17:29:14 +01:00 committed by GitHub
parent 5640999670
commit b2664916e3
4 changed files with 272 additions and 74 deletions

View File

@ -20,6 +20,8 @@ This plugin extracts the following:
| Capability | Status | Details |
| -----------| ------ | ---- |
| Platform Instance | 🛑 | [link](../../docs/platform-instances.md) |
| Data Containers | ✔️ | |
| Data Domains | ✔️ | [link](../../docs/domains.md) |
## Quickstart recipe
@ -73,26 +75,30 @@ plus `s3:GetObject` for the job script locations.
Note that a `.` is used to denote nested fields in the YAML recipe.
| Field | Required | Default | Description |
|---------------------------------|----------|--------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `aws_region` | ✅ | | AWS region code. |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
| `aws_access_key_id` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `aws_secret_access_key` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `aws_session_token` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `aws_role` | | Autodetected | See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html |
| `aws_profile`                   |          |              | Named AWS profile to use. If not set, the default profile is used.                                                                                              |
| `extract_transforms` | | `True` | Whether to extract Glue transform jobs. |
| `database_pattern.allow` | | | List of regex patterns for databases to include in ingestion. |
| `database_pattern.deny` | | | List of regex patterns for databases to exclude from ingestion. |
| `database_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `table_pattern.allow` | | | List of regex patterns for tables to include in ingestion. |
| `table_pattern.deny` | | | List of regex patterns for tables to exclude from ingestion. |
| `table_pattern.ignoreCase` | | `True` | Whether to ignore case sensitivity during pattern matching. |
| `underlying_platform` | | `glue` | Override for platform name. Allowed values - `glue`, `athena` |
| `ignore_unsupported_connectors` | | `True` | Whether to ignore unsupported connectors. If disabled, an error will be raised. |
| `emit_s3_lineage` | | `True` | Whether to emit S3-to-Glue lineage. |
| `glue_s3_lineage_direction`     |          | `upstream`   | If `upstream`, S3 is treated as upstream of Glue; if `downstream`, S3 is treated as downstream of Glue.                                                         |
| `extract_owners` | | `True` | When enabled, extracts ownership from Glue directly and overwrites existing owners. When disabled, ownership is left empty for datasets. |
| `domain.domain_key.allow`       |          |              | List of regex patterns for tables to assign to the `domain_key` domain, where `domain_key` can be any string such as `sales`. Multiple domain keys can be specified; see the example recipe snippet below. |
| `domain.domain_key.deny`        |          |              | List of regex patterns for tables that should not be assigned to the `domain_key` domain. Multiple domain keys can be specified.                                |
| `domain.domain_key.ignoreCase`  |          | `True`       | Whether to ignore case sensitivity during pattern matching.                                                                                                     |
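
A minimal sketch of how the new options might look in a recipe (the region, profile name, domain names, and regex patterns below are placeholders; only the `source` section is shown):

```yml
source:
  type: glue
  config:
    aws_region: "us-west-2"
    # Named profile from ~/.aws/credentials or ~/.aws/config; omit to use the default.
    aws_profile: "my-profile"
    # Tables whose "database.table" name matches a pattern are assigned to that domain.
    domain:
      sales:
        allow:
          - "flights-database\\..*"
      analytics:
        deny:
          - ".*_tmp$"
        ignoreCase: false
```
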
## Compatibility

View File

@ -51,6 +51,7 @@ class AwsSourceConfig(ConfigModel):
aws_secret_access_key: Optional[str] = None
aws_session_token: Optional[str] = None
aws_role: Optional[Union[str, List[str]]] = None
aws_profile: Optional[str] = None
aws_region: str
def get_session(self) -> Session:
@ -89,7 +90,7 @@ class AwsSourceConfig(ConfigModel):
region_name=self.aws_region,
)
else:
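# Note: boto3's Session treats profile_name=None the same as leaving it unset,
# so the default profile / credential chain still applies when aws_profile is not configured.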
return Session(region_name=self.aws_region, profile_name=self.aws_profile)
def get_s3_client(self) -> "S3Client":
return self.get_session().client("s3")

View File

@ -9,14 +9,22 @@ from urllib.parse import urlparse
import yaml
from pydantic import validator
from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.emitter import mce_builder
from datahub.emitter.mce_builder import make_dataset_urn, make_domain_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import (
DatabaseKey,
add_dataset_to_container,
add_domain_to_entity_wu,
gen_containers,
)
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.aws_common import AwsSourceConfig
from datahub.ingestion.source.aws.s3_util import make_s3_urn
from datahub.ingestion.source.sql.sql_common import SqlContainerSubTypes
from datahub.metadata.com.linkedin.pegasus2avro.common import Status
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
@ -54,6 +62,7 @@ class GlueSourceConfig(AwsSourceConfig):
ignore_unsupported_connectors: Optional[bool] = True
emit_s3_lineage: bool = False
glue_s3_lineage_direction: str = "upstream"
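# maps a domain name (e.g. "sales") to an AllowDenyPattern over table names; see _gen_domain_urn()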
domain: Dict[str, AllowDenyPattern] = dict()
@property
def glue_client(self):
@ -523,8 +532,63 @@ class GlueSource(Source):
return mcp
return None
def gen_database_key(self, database: str) -> DatabaseKey:
return DatabaseKey(
database=database,
platform=self.get_underlying_platform(),
instance=self.env,
)
def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(database)
database_container_key = self.gen_database_key(database)
container_workunits = gen_containers(
container_key=database_container_key,
name=database,
sub_types=[SqlContainerSubTypes.DATABASE],
domain_urn=domain_urn,
)
for wu in container_workunits:
self.report.report_workunit(wu)
yield wu
def add_table_to_database_container(
self, dataset_urn: str, db_name: str
) -> Iterable[MetadataWorkUnit]:
database_container_key = self.gen_database_key(db_name)
container_workunits = add_dataset_to_container(
container_key=database_container_key,
dataset_urn=dataset_urn,
)
for wu in container_workunits:
self.report.report_workunit(wu)
yield wu
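# The first configured domain whose allow/deny pattern matches the table name wins,
# e.g. a (hypothetical) "sales" domain with an allow pattern of "flights-database\..*".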
def _gen_domain_urn(self, dataset_name: str) -> Optional[str]:
for domain, pattern in self.source_config.domain.items():
if pattern.allowed(dataset_name):
return make_domain_urn(domain)
return None
def _get_domain_wu(
self, dataset_name: str, entity_urn: str, entity_type: str
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(dataset_name)
if domain_urn:
wus = add_domain_to_entity_wu(
entity_type=entity_type,
entity_urn=entity_urn,
domain_urn=domain_urn,
)
for wu in wus:
self.report.report_workunit(wu)
yield wu
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
database_seen = set()
tables = self.get_all_tables()
for table in tables:
@ -537,11 +601,26 @@ class GlueSource(Source):
) or not self.source_config.table_pattern.allowed(full_table_name):
self.report.report_table_dropped(full_table_name)
continue
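# emit the container work units only once per database; each table then just receives
# a container-membership aspect via add_table_to_database_container() below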
if database_name not in database_seen:
database_seen.add(database_name)
yield from self.gen_database_containers(database_name)
mce = self._extract_record(table, full_table_name)
workunit = MetadataWorkUnit(full_table_name, mce=mce)
self.report.report_workunit(workunit)
yield workunit
dataset_urn: str = make_dataset_urn(
self.get_underlying_platform(), full_table_name, self.env
)
yield from self._get_domain_wu(
dataset_name=full_table_name,
entity_urn=dataset_urn,
entity_type="dataset",
)
yield from self.add_table_to_database_container(
dataset_urn=dataset_urn, db_name=database_name
)
mcp = self.get_lineage_if_enabled(mce)
if mcp:
mcp_wu = MetadataWorkUnit(
@ -551,67 +630,62 @@ class GlueSource(Source):
yield mcp_wu
if self.extract_transforms:
    yield from self._transform_extraction()

def _transform_extraction(self) -> Iterable[MetadataWorkUnit]:
    dags: Dict[str, Optional[Dict[str, Any]]] = {}
    flow_names: Dict[str, str] = {}
    for job in self.get_all_jobs():
        flow_urn = mce_builder.make_data_flow_urn(
            self.get_underlying_platform(), job["Name"], self.env
        )
        flow_wu = self.get_dataflow_wu(flow_urn, job)
        self.report.report_workunit(flow_wu)
        yield flow_wu
        job_script_location = job.get("Command", {}).get("ScriptLocation")
        dag: Optional[Dict[str, Any]] = None
        if job_script_location is not None:
            dag = self.get_dataflow_graph(job_script_location)
        dags[flow_urn] = dag
        flow_names[flow_urn] = job["Name"]
    # run a first pass to pick up s3 bucket names and formats
    # in Glue, it's possible for two buckets to have files of different extensions
    # if this happens, we append the extension in the URN so the sources can be distinguished
    # see process_dataflow_node() for details
    s3_formats: typing.DefaultDict[str, Set[Optional[str]]] = defaultdict(
        lambda: set()
    )
    for dag in dags.values():
        if dag is not None:
            for s3_name, extension in self.get_dataflow_s3_names(dag):
                s3_formats[s3_name].add(extension)
    # run second pass to generate node workunits
    for flow_urn, dag in dags.items():
        if dag is None:
            continue
        nodes, new_dataset_ids, new_dataset_mces = self.process_dataflow_graph(
            dag, flow_urn, s3_formats
        )
        for node in nodes.values():
            if node["NodeType"] not in ["DataSource", "DataSink"]:
                job_wu = self.get_datajob_wu(node, flow_names[flow_urn])
                self.report.report_workunit(job_wu)
                yield job_wu
        for dataset_id, dataset_mce in zip(new_dataset_ids, new_dataset_mces):
            dataset_wu = MetadataWorkUnit(id=dataset_id, mce=dataset_mce)
            self.report.report_workunit(dataset_wu)
            yield dataset_wu
def _extract_record(self, table: Dict, table_name: str) -> MetadataChangeEvent:
def get_owner() -> Optional[OwnershipClass]:
@ -676,7 +750,7 @@ class GlueSource(Source):
)
dataset_snapshot = DatasetSnapshot(
urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.get_underlying_platform()},{table_name},{self.env})",
urn=make_dataset_urn(self.get_underlying_platform(), table_name, self.env),
aspects=[],
)

View File

@ -1,4 +1,43 @@
[
{
"auditHeader": null,
"entityType": "container",
"entityUrn": "urn:li:container:0b9f1f731ecf6743be6207fec3dc9cba",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"value": "{\"customProperties\": {\"platform\": \"glue\", \"instance\": \"PROD\", \"database\": \"flights-database\"}, \"name\": \"flights-database\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "container",
"entityUrn": "urn:li:container:0b9f1f731ecf6743be6207fec3dc9cba",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"value": "{\"platform\": \"urn:li:dataPlatform:glue\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "container",
"entityUrn": "urn:li:container:0b9f1f731ecf6743be6207fec3dc9cba",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"value": "{\"typeNames\": [\"Database\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"proposedSnapshot": {
@ -229,6 +268,58 @@
"proposedDelta": null,
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,flights-database.avro,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"value": "{\"container\": \"urn:li:container:0b9f1f731ecf6743be6207fec3dc9cba\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "container",
"entityUrn": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"value": "{\"customProperties\": {\"platform\": \"glue\", \"instance\": \"PROD\", \"database\": \"test-database\"}, \"name\": \"test-database\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "container",
"entityUrn": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"value": "{\"platform\": \"urn:li:dataPlatform:glue\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "container",
"entityUrn": "urn:li:container:bdf4342ea6899d162eae685bfe9074a7",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"value": "{\"typeNames\": [\"Database\"]}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"proposedSnapshot": {
@ -396,6 +487,19 @@
"proposedDelta": null,
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_jsons_markers,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"value": "{\"container\": \"urn:li:container:bdf4342ea6899d162eae685bfe9074a7\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"proposedSnapshot": {
@ -574,6 +678,19 @@
"proposedDelta": null,
"systemMetadata": null
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:glue,test-database.test_parquet,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"value": "{\"container\": \"urn:li:container:bdf4342ea6899d162eae685bfe9074a7\"}",
"contentType": "application/json"
},
"systemMetadata": null
},
{
"auditHeader": null,
"proposedSnapshot": {