feat(ingest): Add support for druid (#2235)

This commit is contained in:
Pedro Silva 2021-03-18 03:06:48 +00:00 committed by GitHub
parent 350e7aaf3c
commit 6a0c402a58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 60 additions and 2 deletions

View File

@ -93,6 +93,7 @@ We use a plugin architecture so that you can install only the dependencies you a
| snowflake | `pip install -e '.[snowflake]'` | Snowflake source |
| ldap | `pip install -e '.[ldap]'` ([extra requirements]) | LDAP source |
| kafka | `pip install -e '.[kafka]'` | Kafka source |
| druid | `pip install -e '.[druid]'` | Druid source |
| datahub-rest | `pip install -e '.[datahub-rest]'` | DataHub sink over REST API |
| datahub-kafka | `pip install -e '.[datahub-kafka]'` | DataHub sink over Kafka |
@ -343,6 +344,29 @@ source:
# table_pattern/schema_pattern is same as above
```
### Druid `druid`
Extracts:
- List of databases, schema, and tables
- Column types associated with each table
**Note**: When adding a schema pattern, it is important to explicitly define a deny pattern for the internal Druid databases (lookup & sys);
otherwise the crawler may crash before processing relevant databases.
This deny pattern is defined by default but is overridden by user-submitted configurations.
```yml
source:
type: druid
config:
# Point to broker address
host_port: localhost:8082
schema_pattern:
deny:
- "^(lookup|sys).*"
# options is same as above
```
### LDAP `ldap`
Extracts:

View File

@ -42,6 +42,8 @@ ignore_missing_imports = yes
ignore_missing_imports = yes
[mypy-snowflake.*]
ignore_missing_imports = yes
[mypy-pydruid.*]
ignore_missing_imports = yes
[isort]
profile = black
@ -56,7 +58,7 @@ testpaths =
tests/integration
[coverage:report]
fail_under = 80
fail_under = 75
show_missing = true
exclude_lines =
pragma: no cover

View File

@ -68,6 +68,7 @@ plugins: Dict[str, Set[str]] = {
"postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"},
"snowflake": sql_common | {"snowflake-sqlalchemy"},
"ldap": {"python-ldap>=2.4"},
"druid": sql_common | {"pydruid>=0.6.2"},
# Sink plugins.
"datahub-kafka": kafka_common,
"datahub-rest": {"requests>=2.25.1"},

View File

@ -0,0 +1,25 @@
# This import verifies that the dependencies are available.
import pydruid # noqa: F401
from datahub.configuration.common import AllowDenyPattern
from .sql_common import BasicSQLAlchemyConfig, SQLAlchemySource
class DruidConfig(BasicSQLAlchemyConfig):
    """Connection settings for the Druid source.

    Sets the URL scheme to ``druid`` and, unless the user supplies their own
    ``schema_pattern``, denies every schema whose name matches
    ``^(lookup|sys).*``.
    """

    # Scheme used when the SQLAlchemy URL is assembled.
    scheme = "druid"

    # Default filter: skip schemas whose names start with "lookup" or "sys".
    schema_pattern: AllowDenyPattern = AllowDenyPattern(deny=["^(lookup|sys).*"])

    def get_sql_alchemy_url(self):
        """Return the parent-built URL with ``/druid/v2/sql/`` appended."""
        base_url = super().get_sql_alchemy_url()
        return f"{base_url}/druid/v2/sql/"
class DruidSource(SQLAlchemySource):
    """SQLAlchemy-backed source for Druid.

    Forwards the given config and context to ``SQLAlchemySource`` together
    with the literal ``"druid"`` (presumably the platform name -- confirm
    against ``SQLAlchemySource.__init__``).
    """

    def __init__(self, config, ctx):
        super().__init__(config, ctx, "druid")

    @classmethod
    def create(cls, config_dict, ctx):
        """Build a source from a raw config mapping parsed as ``DruidConfig``."""
        return cls(DruidConfig.parse_obj(config_dict), ctx)

View File

@ -64,10 +64,16 @@ try:
except ImportError as e:
source_registry.register_disabled("kafka", e)
# Register the LDAP source; if importing it fails, record it as disabled
# together with the ImportError that caused the failure.
try:
    from .ldap import LDAPSource
    source_registry.register("ldap", LDAPSource)
except ImportError as e:
    source_registry.register_disabled("ldap", e)
# Register the Druid source the same way: enabled when the import succeeds,
# otherwise disabled with the triggering ImportError attached.
try:
    from .druid import DruidSource
    source_registry.register("druid", DruidSource)
except ImportError as e:
    source_registry.register_disabled("druid", e)