diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md
index e829e41982..71865587ba 100644
--- a/metadata-ingestion/README.md
+++ b/metadata-ingestion/README.md
@@ -387,6 +387,8 @@ Extracts:
 - List of databases, schema, and tables
 - Column types associated with each table
 
+Using the Oracle source requires that you've also installed the correct drivers; see the [cx_Oracle docs](https://cx-oracle.readthedocs.io/en/latest/user_guide/installation.html). The easiest one is the [Oracle Instant Client](https://www.oracle.com/database/technologies/instant-client.html).
+
 ```yml
 source:
   type: oracle
@@ -398,6 +400,7 @@ source:
     password: pass
     host_port: localhost:5432
     database: dbname
+    service_name: svc # omit database if using this option
     # table_pattern/schema_pattern is same as above
     # options is same as above
 ```
@@ -730,6 +733,8 @@ sink:
     connection:
       bootstrap: "localhost:9092"
       producer_config: {} # passed to https://docs.confluent.io/platform/current/clients/confluent-kafka-python/index.html#serializingproducer
+      schema_registry_url: "http://localhost:8081"
+      schema_registry_config: {} # passed to https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.schema_registry.SchemaRegistryClient
 ```
 
 ### Console `console`
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 4f229bedd7..10d9d442c5 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -144,6 +144,7 @@ base_dev_requirements = {
             "looker",
             "glue",
             "hive",
+            "oracle",
             "datahub-kafka",
             "datahub-rest",
             # airflow is added below
diff --git a/metadata-ingestion/src/datahub/ingestion/source/oracle.py b/metadata-ingestion/src/datahub/ingestion/source/oracle.py
index 0d493e51f7..497330d6dd 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/oracle.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/oracle.py
@@ -1,5 +1,8 @@
+from typing import Optional
+
 # This import verifies that the dependencies are available.
 import cx_Oracle  # noqa: F401
+import pydantic
 
 from .sql_common import BasicSQLAlchemyConfig, SQLAlchemySource
 
@@ -8,6 +11,31 @@ class OracleConfig(BasicSQLAlchemyConfig):
     # defaults
     scheme = "oracle+cx_oracle"
 
+    # Oracle service name to connect to; mutually exclusive with `database`.
+    service_name: Optional[str] = None
+
+    @pydantic.validator("service_name")
+    def check_service_name(cls, v, values):
+        """Reject configs that set both `database` and `service_name`."""
+        if values.get("database") and v:
+            raise ValueError(
+                "specify one of 'database' and 'service_name', but not both"
+            )
+        return v
+
+    def get_sql_alchemy_url(self):
+        """Return the base SQLAlchemy URL, plus `service_name` when one is set."""
+        url = super().get_sql_alchemy_url()
+        if self.service_name:
+            # Raise instead of assert: asserts are stripped under `python -O`,
+            # which would let a conflicting config through silently.
+            if self.database:
+                raise ValueError(
+                    "specify one of 'database' and 'service_name', but not both"
+                )
+            url = f"{url}/?service_name={self.service_name}"
+        return url
+
 
 class OracleSource(SQLAlchemySource):
     def __init__(self, config, ctx):
diff --git a/metadata-ingestion/tests/unit/test_oracle_source.py b/metadata-ingestion/tests/unit/test_oracle_source.py
new file mode 100644
index 0000000000..22e94c7e92
--- /dev/null
+++ b/metadata-ingestion/tests/unit/test_oracle_source.py
@@ -0,0 +1,32 @@
+import pytest
+
+from datahub.ingestion.source.oracle import OracleConfig
+
+
+def test_oracle_config():
+    """service_name is appended to the URL and cannot be combined with database."""
+    base_config = {
+        "username": "user",
+        "password": "password",
+        "host_port": "host:1521",
+    }
+
+    config = OracleConfig.parse_obj(
+        {
+            **base_config,
+            "service_name": "svc01",
+        }
+    )
+    assert (
+        config.get_sql_alchemy_url()
+        == "oracle+cx_oracle://user:password@host:1521/?service_name=svc01"
+    )
+
+    with pytest.raises(ValueError):
+        OracleConfig.parse_obj(
+            {
+                **base_config,
+                "database": "db",
+                "service_name": "svc01",
+            }
+        )