feat(ingest/vertica): use 3 part naming (#10636)

This commit is contained in:
Rajasekhar-Vuppala 2024-06-15 05:34:55 +05:30 committed by GitHub
parent edb9cf61f7
commit a2d8a099d8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 6172 additions and 6286 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 25 KiB

View File

@ -426,7 +426,7 @@ plugins: Dict[str, Set[str]] = {
"nifi": {"requests", "packaging", "requests-gssapi"},
"powerbi": microsoft_common | {"lark[regex]==1.1.4", "sqlparse"} | sqlglot_lib,
"powerbi-report-server": powerbi_report_server,
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.1"},
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.2"},
"unity-catalog": databricks | sql_common | sqllineage_lib,
# databricks is alias for unity-catalog and needs to be kept in sync
"databricks": databricks | sql_common | sqllineage_lib,

View File

@ -133,17 +133,8 @@ class VerticaSource(SQLAlchemySource):
return cls(config, ctx)
def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
yield from super().get_workunits_internal()
sql_config = self.config
if logger.isEnabledFor(logging.DEBUG):
# If debug logging is enabled, we also want to echo each SQL query issued.
sql_config.options.setdefault("echo", True)
# Extra default SQLAlchemy option for better connection pooling and threading.
# https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
if sql_config.is_profiling_enabled():
sql_config.options.setdefault(
"max_overflow", sql_config.profiling.max_workers
)
for inspector in self.get_inspectors():
profiler = None
@ -170,11 +161,6 @@ class VerticaSource(SQLAlchemySource):
),
)
if sql_config.include_tables:
yield from self.loop_tables(inspector, schema, sql_config)
if sql_config.include_views:
yield from self.loop_views(inspector, schema, sql_config)
if sql_config.include_projections:
yield from self.loop_projections(inspector, schema, sql_config)
if sql_config.include_models:
@ -190,6 +176,15 @@ class VerticaSource(SQLAlchemySource):
profile_requests, profiler, platform=self.platform
)
def get_identifier(
self, *, schema: str, entity: str, inspector: VerticaInspector, **kwargs: Any
) -> str:
regular = f"{schema}.{entity}"
if self.config.database:
return f"{self.config.database}.{regular}"
current_database = self.get_db_name(inspector)
return f"{current_database}.{regular}"
def get_database_properties(
self, inspector: VerticaInspector, database: str
) -> Optional[Dict[str, str]]:

View File

@ -11,6 +11,7 @@ source:
include_models: true
include_view_lineage: true
include_projection_lineage: true
include_view_column_lineage: true
sink: