fix(ingest): oracle - support large view definitions (#4027)

This commit is contained in:
Harshal Sheth 2022-02-10 02:18:19 -05:00 committed by GitHub
parent 54eb155f3e
commit 076848ff55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,10 +1,12 @@
from typing import Optional
from typing import Iterable, Optional
from unittest.mock import patch
# This import verifies that the dependencies are available.
import cx_Oracle # noqa: F401
import pydantic
from sqlalchemy import event
from sqlalchemy.dialects.oracle.base import OracleDialect
from sqlalchemy.engine.reflection import Inspector
from datahub.ingestion.source.sql.sql_common import (
BasicSQLAlchemyConfig,
@ -21,6 +23,19 @@ extra_oracle_types = {
assert OracleDialect.ischema_names
def output_type_handler(cursor, name, defaultType, size, precision, scale):
"""Add CLOB and BLOB support to Oracle connection."""
if defaultType == cx_Oracle.CLOB:
return cursor.var(cx_Oracle.LONG_STRING, arraysize=cursor.arraysize)
elif defaultType == cx_Oracle.BLOB:
return cursor.var(cx_Oracle.LONG_BINARY, arraysize=cursor.arraysize)
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
cursor.outputtypehandler = output_type_handler
class OracleConfig(BasicSQLAlchemyConfig):
# defaults
scheme = "oracle+cx_oracle"
@ -52,6 +67,13 @@ class OracleSource(SQLAlchemySource):
config = OracleConfig.parse_obj(config_dict)
return cls(config, ctx)
def get_inspectors(self) -> Iterable[Inspector]:
for inspector in super().get_inspectors():
event.listen(
inspector.engine, "before_cursor_execute", before_cursor_execute
)
yield inspector
def get_workunits(self):
with patch.dict(
"sqlalchemy.dialects.oracle.base.OracleDialect.ischema_names",