feat(ingest): add aws athena ingestion source (#2213)

Co-authored-by: thomas.larsson <thomas.larsson@klarna.com>
This commit is contained in:
Thomas Larsson 2021-03-11 19:12:27 +01:00 committed by GitHub
parent 94e485e4cb
commit 2b470c2d85
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 67 additions and 0 deletions

View File

@ -295,6 +295,29 @@ source:
# table_pattern/schema_pattern is same as above
```
### AWS Athena `athena`
Extracts:
- List of databases and tables
- Column types associated with each table
Extra requirements: `pip install PyAthena[SQLAlchemy]`
```yml
source:
type: athena
config:
username: aws_access_key_id # Optional. If not specified, credentials are picked up according to boto3 rules
# See https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
password: aws_secret_access_key # Optional.
database: database # Optional, defaults to "default"
aws_region: aws_region_name # i.e. "eu-west-1"
s3_output_location: s3_location # "s3://<bucket-name>/prefix/"
work_group: athena_workgroup # "primary"
# table_pattern/schema_pattern is same as above
```
### LDAP `ldap`
Extracts:

View File

@ -5,4 +5,5 @@ psycopg2-binary # Driver for Postgres
snowflake-sqlalchemy # Driver for Snowflake
pybigquery # Driver for BigQuery
python-ldap>=2.4 # LDAP client library
PyAthena[SQLAlchemy]<2.0.0 # Driver for Aws Athena

View File

@ -15,6 +15,7 @@ logger = logging.getLogger(__name__)
# Set to debug on the root logger.
logging.getLogger(None).setLevel(logging.DEBUG)
logging.getLogger("urllib3").setLevel(logging.WARN)
logging.getLogger("botocore").setLevel(logging.INFO)
# Configure logger.
BASE_LOGGING_FORMAT = (

View File

@ -0,0 +1,40 @@
from typing import Optional
from urllib.parse import quote_plus
from .sql_common import SQLAlchemyConfig, SQLAlchemySource
class AthenaConfig(SQLAlchemyConfig):
scheme: str = "awsathena+rest"
username: Optional[str] = None
password: Optional[str] = None
database: Optional[str] = None
aws_region: str
s3_output_location: str
work_group: str
def get_sql_alchemy_url(self):
url = f"{self.scheme}://"
if self.username:
url += f"{quote_plus(self.username)}"
if self.password:
url += f":{quote_plus(self.password)}"
else:
url += ":"
url += f"@athena.{self.aws_region}.amazonaws.com:443/"
if self.database:
url += f"{self.database}"
url += f"?s3_staging_dir={quote_plus(self.s3_output_location)}"
url += f"&work_group={self.work_group}"
return url
class AthenaSource(SQLAlchemySource):
def __init__(self, config, ctx):
super().__init__(config, ctx, "athena")
@classmethod
def create(cls, config_dict, ctx):
config = AthenaConfig.parse_obj(config_dict)
return cls(config, ctx)

View File

@ -1,6 +1,7 @@
from datahub.ingestion.api.registry import Registry
from datahub.ingestion.api.source import Source
from .athena import AthenaSource
from .bigquery import BigQuerySource
from .hive import HiveSource
from .kafka import KafkaSource
@ -21,6 +22,7 @@ source_registry.register("postgres", PostgresSource)
source_registry.register("snowflake", SnowflakeSource)
source_registry.register("bigquery", BigQuerySource)
source_registry.register("kafka", KafkaSource)
source_registry.register("athena", AthenaSource)
# Attempt to enable the LDAP source. Because it has some imports that we don't
# want to install by default, we instead use this approach.