feat(ingest): adding support for AWS Glue (#2319)

Co-authored-by: Harry Nash <harrywilliamnash@gmail.com>
amy m 2021-04-04 19:00:27 +01:00 committed by GitHub
parent 3fb71acf71
commit 759288161c
5 changed files with 486 additions and 0 deletions


@@ -89,6 +89,7 @@ We use a plugin architecture so that you can install only the dependencies you a
| console | _included by default_ | Console sink |
| athena | `pip install -e '.[athena]'` | AWS Athena source |
| bigquery | `pip install -e '.[bigquery]'` | BigQuery source |
| glue | `pip install -e '.[glue]'` | AWS Glue source |
| hive | `pip install -e '.[hive]'` | Hive source |
| mssql | `pip install -e '.[mssql]'` | SQL Server source |
| mysql | `pip install -e '.[mysql]'` | MySQL source |
@@ -355,6 +356,24 @@ source:
# table_pattern/schema_pattern is same as above
```
### AWS Glue `glue`
Extracts:
- List of tables
- Column types associated with each table
- Table metadata, such as owner, description and parameters
```yml
source:
  type: glue
  config:
    aws_region: aws_region_name # e.g. "eu-west-1"
    env: environment used for the DatasetSnapshot URN, one of "DEV", "EI", "PROD" or "CORP". # Optional; defaults to "PROD".
    databases: list of databases to process. # Optional; if not specified, all databases will be processed.
    aws_access_key_id: # Optional. If not specified, credentials are picked up according to boto3 rules.
    # See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
    aws_secret_access_key: # Optional.
    aws_session_token: # Optional.
```
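If no access key, secret, or session token is configured, the source simply builds a plain `boto3` client, so credentials are resolved through boto3's standard chain (environment variables, the shared credentials file, or an instance/task role). A minimal sketch of what that fallback amounts to (the region and database name below are assumptions for illustration):

```python
import boto3

# No explicit credentials passed: boto3 resolves them from its standard
# credential chain (env vars, ~/.aws/credentials, instance profile, ...).
glue_client = boto3.client("glue", region_name="eu-west-1")
tables = glue_client.get_tables(DatabaseName="my_database")  # hypothetical database name
```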
### Druid `druid`
Extracts:


@@ -0,0 +1,9 @@
source:
  type: glue
  config:
    aws_region: "us-east-1"
sink:
  type: "datahub-rest"
  config:
    server: 'http://localhost:8080'
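A recipe file like this is then run with the DataHub CLI, e.g. `datahub ingest -c glue_recipe.yml` (filename assumed here); the `datahub-rest` sink pushes the extracted metadata to the DataHub instance at the configured `server` address.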


@@ -82,6 +82,7 @@ plugins: Dict[str, Set[str]] = {
    # Sink plugins.
    "datahub-kafka": kafka_common,
    "datahub-rest": {"requests>=2.25.1"},
    "glue": {"boto3"},
}
dev_requirements = {
@@ -97,6 +98,8 @@ dev_requirements = {
    "pytest-docker",
    "sqlalchemy-stubs",
    "deepdiff",
    "freezegun",
    "botocore",
    # Also add the plugins which are used for tests.
    *list(
        dependency
@@ -108,6 +111,7 @@ dev_requirements = {
            "ldap",
            "datahub-kafka",
            "datahub-rest",
            "glue",
        ]
        for dependency in plugins[plugin]
    ),
@@ -159,6 +163,7 @@ setuptools.setup(
            "bigquery = datahub.ingestion.source.bigquery:BigQuerySource",
            "dbt = datahub.ingestion.source.dbt:DBTSource",
            "druid = datahub.ingestion.source.druid:DruidSource",
            "glue = datahub.ingestion.source.glue:GlueSource",
            "hive = datahub.ingestion.source.hive:HiveSource",
            "kafka = datahub.ingestion.source.kafka:KafkaSource",
            "ldap = datahub.ingestion.source.ldap:LDAPSource",


@@ -0,0 +1,268 @@
import time
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional

import boto3

from datahub.configuration import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.source.metadata_common import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp, Status
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
    ArrayTypeClass,
    BooleanTypeClass,
    BytesTypeClass,
    DateTypeClass,
    MySqlDDL,
    NullTypeClass,
    NumberTypeClass,
    SchemaField,
    SchemaFieldDataType,
    SchemaMetadata,
    StringTypeClass,
    TimeTypeClass,
    UnionTypeClass,
)
from datahub.metadata.schema_classes import (
    AuditStampClass,
    DatasetPropertiesClass,
    MapTypeClass,
    OwnerClass,
    OwnershipClass,
    OwnershipTypeClass,
)


class GlueSourceConfig(ConfigModel):
    env: str = "PROD"
    databases: Optional[List[str]] = None
    aws_access_key_id: Optional[str] = None
    aws_secret_access_key: Optional[str] = None
    aws_session_token: Optional[str] = None
    aws_region: str

    @property
    def glue_client(self):
        if (
            self.aws_access_key_id
            and self.aws_secret_access_key
            and self.aws_session_token
        ):
            return boto3.client(
                "glue",
                aws_access_key_id=self.aws_access_key_id,
                aws_secret_access_key=self.aws_secret_access_key,
                aws_session_token=self.aws_session_token,
                region_name=self.aws_region,
            )
        elif self.aws_access_key_id and self.aws_secret_access_key:
            return boto3.client(
                "glue",
                aws_access_key_id=self.aws_access_key_id,
                aws_secret_access_key=self.aws_secret_access_key,
                region_name=self.aws_region,
            )
        else:
            # No explicit credentials configured: fall back to boto3's
            # default credential resolution.
            return boto3.client("glue", region_name=self.aws_region)


@dataclass
class GlueSourceReport(SourceReport):
    tables_scanned: int = 0

    def report_table_scanned(self) -> None:
        self.tables_scanned += 1


class GlueSource(Source):
    source_config: GlueSourceConfig
    report: GlueSourceReport

    def __init__(self, config: GlueSourceConfig, ctx: PipelineContext):
        super().__init__(ctx)
        self.source_config = config
        self.report = GlueSourceReport()
        self.glue_client = config.glue_client
        self.env = config.env
        self.databases = config.databases

    @classmethod
    def create(cls, config_dict, ctx):
        config = GlueSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
        def get_all_tables(database_names: Optional[List[str]]):
            def get_tables_from_database(database_name: str, tables: List):
                kwargs = {"DatabaseName": database_name}
                while True:
                    data = self.glue_client.get_tables(**kwargs)
                    tables += data["TableList"]
                    if "NextToken" in data:
                        kwargs["NextToken"] = data["NextToken"]
                    else:
                        break
                return tables

            def get_tables_from_all_databases():
                tables = []
                kwargs: Dict = {}
                while True:
                    data = self.glue_client.search_tables(**kwargs)
                    tables += data["TableList"]
                    if "NextToken" in data:
                        kwargs["NextToken"] = data["NextToken"]
                    else:
                        break
                return tables

            if database_names:
                all_tables: List = []
                for database in database_names:
                    # get_tables_from_database extends all_tables in place and
                    # returns the same list, so reassign rather than concatenate
                    # to avoid duplicating tables.
                    all_tables = get_tables_from_database(database, all_tables)
            else:
                all_tables = get_tables_from_all_databases()
            return all_tables

        tables = get_all_tables(self.databases)

        for table in tables:
            table_name = table["Name"]
            database_name = table["DatabaseName"]
            full_table_name = f"{database_name}.{table_name}"
            self.report.report_table_scanned()
            mce = self._extract_record(table, full_table_name)
            workunit = MetadataWorkUnit(id=f"glue-{full_table_name}", mce=mce)
            self.report.report_workunit(workunit)
            yield workunit

    def _extract_record(self, table: Dict, table_name: str) -> MetadataChangeEvent:
        def get_owner(time) -> OwnershipClass:
            owner = table.get("Owner")
            if owner:
                owners = [
                    OwnerClass(
                        owner=f"urn:li:corpuser:{owner}",
                        type=OwnershipTypeClass.DATAOWNER,
                    )
                ]
            else:
                owners = []
            return OwnershipClass(
                owners=owners,
                lastModified=AuditStampClass(
                    time=time,
                    actor="urn:li:corpuser:datahub",
                ),
            )

        def get_dataset_properties() -> DatasetPropertiesClass:
            return DatasetPropertiesClass(
                description=table.get("Description"),
                customProperties={
                    **table.get("Parameters", {}),
                    **{
                        k: str(v)
                        for k, v in table["StorageDescriptor"].items()
                        if k not in ["Columns", "Parameters"]
                    },
                },
                uri=table.get("Location"),
                tags=[],
            )

        def get_schema_metadata(glue_source: GlueSource):
            schema = table["StorageDescriptor"]["Columns"]
            fields: List[SchemaField] = []
            for field in schema:
                schema_field = SchemaField(
                    fieldPath=field["Name"],
                    nativeDataType=field["Type"],
                    type=get_column_type(
                        glue_source, field["Type"], table_name, field["Name"]
                    ),
                    description=field.get("Comment"),
                    recursive=False,
                    nullable=True,
                )
                fields.append(schema_field)

            return SchemaMetadata(
                schemaName=table_name,
                version=0,
                fields=fields,
                platform="urn:li:dataPlatform:glue",
                created=AuditStamp(time=sys_time, actor="urn:li:corpuser:etl"),
                lastModified=AuditStamp(time=sys_time, actor="urn:li:corpuser:etl"),
                hash="",
                platformSchema=MySqlDDL(tableSchema=""),
            )

        sys_time = int(time.time() * 1000)
        metadata_record = MetadataChangeEvent()
        dataset_snapshot = DatasetSnapshot(
            urn=f"urn:li:dataset:(urn:li:dataPlatform:glue,{table_name},{self.env})",
            aspects=[],
        )

        dataset_snapshot.aspects.append(Status(removed=False))
        dataset_snapshot.aspects.append(get_owner(sys_time))
        dataset_snapshot.aspects.append(get_dataset_properties())
        dataset_snapshot.aspects.append(get_schema_metadata(self))

        metadata_record.proposedSnapshot = dataset_snapshot

        return metadata_record

    def get_report(self):
        return self.report


def get_column_type(
    glue_source: GlueSource, field_type: str, table_name: str, field_name: str
) -> SchemaFieldDataType:
    field_type_mapping = {
        "array": ArrayTypeClass,
        "bigint": NumberTypeClass,
        "binary": BytesTypeClass,
        "boolean": BooleanTypeClass,
        "char": StringTypeClass,
        "date": DateTypeClass,
        "decimal": NumberTypeClass,
        "double": NumberTypeClass,
        "float": NumberTypeClass,
        "int": NumberTypeClass,
        "integer": NumberTypeClass,
        "interval": TimeTypeClass,
        "long": NumberTypeClass,
        "map": MapTypeClass,
        "null": NullTypeClass,
        "set": ArrayTypeClass,
        "smallint": NumberTypeClass,
        "string": StringTypeClass,
        "struct": MapTypeClass,
        "timestamp": TimeTypeClass,
        "tinyint": NumberTypeClass,
        "union": UnionTypeClass,
        "varchar": StringTypeClass,
    }

    if field_type in field_type_mapping:
        type_class = field_type_mapping[field_type]
    elif field_type.startswith("array"):
        type_class = ArrayTypeClass
    elif field_type.startswith("map") or field_type.startswith("struct"):
        type_class = MapTypeClass
    elif field_type.startswith("set"):
        type_class = ArrayTypeClass
    else:
        glue_source.report.report_warning(
            field_type,
            f"The type '{field_type}' is not recognised for field '{field_name}' in table '{table_name}', setting as StringTypeClass.",
        )
        type_class = StringTypeClass

    data_type = SchemaFieldDataType(type=type_class())
    return data_type
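As a quick illustration of how the type mapping behaves, here is a minimal sketch mirroring the unit tests below; the `ctx=None` shortcut and the region come from the tests, while the table, field, and column type strings are hypothetical:

```python
# Hypothetical usage sketch of get_column_type; not part of the commit.
from datahub.ingestion.source.glue import GlueSource, GlueSourceConfig, get_column_type
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
    ArrayTypeClass,
    NumberTypeClass,
)

source = GlueSource(ctx=None, config=GlueSourceConfig(aws_region="us-east-1"))

# Exact matches resolve via the mapping table...
assert isinstance(get_column_type(source, "bigint", "db.tbl", "id").type, NumberTypeClass)
# ...while nested types like array<string> fall back to the prefix checks.
assert isinstance(get_column_type(source, "array<string>", "db.tbl", "tags").type, ArrayTypeClass)
```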


@@ -0,0 +1,185 @@
import unittest
from datetime import datetime

from botocore.stub import Stubber
from freezegun import freeze_time

from datahub.ingestion.source.glue import GlueSource, GlueSourceConfig, get_column_type
from datahub.ingestion.source.metadata_common import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp, Status
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
    ArrayTypeClass,
    MapTypeClass,
    MySqlDDL,
    NumberTypeClass,
    SchemaField,
    SchemaFieldDataType,
    SchemaMetadata,
    StringTypeClass,
)
from datahub.metadata.schema_classes import (
    AuditStampClass,
    DatasetPropertiesClass,
    OwnerClass,
    OwnershipClass,
    OwnershipTypeClass,
)

FROZEN_TIME = "2020-04-14 07:00:00"


class GlueSourceTest(unittest.TestCase):
    glue_source = GlueSource(ctx=None, config=GlueSourceConfig(aws_region="us-east-1"))

    def test_get_column_type_contains_key(self):
        field_type = "char"
        data_type = get_column_type(self.glue_source, field_type, "a_table", "a_field")
        self.assertEqual(
            data_type.to_obj(), SchemaFieldDataType(type=StringTypeClass()).to_obj()
        )

    def test_get_column_type_contains_array(self):
        field_type = "array_lol"
        data_type = get_column_type(self.glue_source, field_type, "a_table", "a_field")
        self.assertEqual(
            data_type.to_obj(), SchemaFieldDataType(type=ArrayTypeClass()).to_obj()
        )

    def test_get_column_type_contains_map(self):
        field_type = "map_hehe"
        data_type = get_column_type(self.glue_source, field_type, "a_table", "a_field")
        self.assertEqual(
            data_type.to_obj(), SchemaFieldDataType(type=MapTypeClass()).to_obj()
        )

    def test_get_column_type_contains_set(self):
        field_type = "set_yolo"
        data_type = get_column_type(self.glue_source, field_type, "a_table", "a_field")
        self.assertEqual(
            data_type.to_obj(), SchemaFieldDataType(type=ArrayTypeClass()).to_obj()
        )

    def test_get_column_type_not_contained(self):
        field_type = "bad_column_type"
        data_type = get_column_type(self.glue_source, field_type, "a_table", "a_field")
        self.assertEqual(
            data_type.to_obj(), SchemaFieldDataType(type=StringTypeClass()).to_obj()
        )
        self.assertEqual(
            self.glue_source.report.warnings["bad_column_type"],
            [
                "The type 'bad_column_type' is not recognised for field 'a_field' in table 'a_table', "
                "setting as StringTypeClass."
            ],
        )

    @freeze_time(FROZEN_TIME)
    def test_turn_boto_glue_data_to_metadata_event(self):
        stringy_timestamp = datetime.strptime(FROZEN_TIME, "%Y-%m-%d %H:%M:%S")
        timestamp = int(datetime.timestamp(stringy_timestamp) * 1000)

        response = {
            "TableList": [
                {
                    "Name": "Barbeque",
                    "Owner": "Susan",
                    "DatabaseName": "datalake_grilled",
                    "Description": "Grilled Food",
                    "StorageDescriptor": {
                        "Columns": [
                            {
                                "Name": "Size",
                                "Type": "int",
                                "Comment": "Maximum attendees permitted",
                            }
                        ]
                    },
                }
            ]
        }

        def flatten(d):
            out = {}
            for key, val in d.items():
                if isinstance(val, dict):
                    val = [val]
                if isinstance(val, list):
                    for subdict in val:
                        deeper = flatten(subdict).items()
                        out.update({key + "_" + key2: val2 for key2, val2 in deeper})
                else:
                    out[key] = val
            return out

        with Stubber(self.glue_source.glue_client) as stubber:
            stubber.add_response("search_tables", response, {})
            actual_work_unit = next(self.glue_source.get_workunits())
            expected_metadata_work_unit = create_metadata_work_unit(timestamp)
            self.assertTrue(
                sorted(flatten(vars(expected_metadata_work_unit)))
                == sorted(flatten(vars(actual_work_unit)))
            )


def create_metadata_work_unit(timestamp):
    mce = MetadataChangeEvent()
    dataset_snapshot = DatasetSnapshot(
        urn="urn:li:dataset:(urn:li:dataPlatform:glue,datalake_grilled.Barbeque,PROD)",
        aspects=[],
    )
    dataset_snapshot.aspects.append(
        OwnershipClass(
            owners=[
                OwnerClass(
                    owner="urn:li:corpuser:Susan", type=OwnershipTypeClass.DATAOWNER
                )
            ],
            lastModified=AuditStampClass(
                time=timestamp, actor="urn:li:corpuser:datahub"
            ),
        )
    )
    dataset_snapshot.aspects.append(
        DatasetPropertiesClass(
            description="Grilled Food",
            customProperties={},
            uri=None,
            tags=[],
        )
    )
    dataset_snapshot.aspects.append(Status(removed=False))
    mce.proposedSnapshot = dataset_snapshot

    fields = [
        SchemaField(
            fieldPath="Size",
            nativeDataType="int",
            type=SchemaFieldDataType(type=NumberTypeClass()),
            description="Maximum attendees permitted",
            nullable=True,
        )
    ]

    schema_metadata = SchemaMetadata(
        schemaName="datalake_grilled.Barbeque",
        version=0,
        fields=fields,
        platform="urn:li:dataPlatform:glue",
        created=AuditStamp(time=timestamp, actor="urn:li:corpuser:etl"),
        lastModified=AuditStamp(time=timestamp, actor="urn:li:corpuser:etl"),
        hash="",
        platformSchema=MySqlDDL(tableSchema=""),
    )
    dataset_snapshot.aspects.append(schema_metadata)

    return MetadataWorkUnit(id="glue-datalake_grilled.Barbeque", mce=mce)