mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-05 21:13:14 +00:00
MINOR: Snowflake fix lineage & usage muti db query (#14866)
This commit is contained in:
parent
1552aeb2de
commit
20605cf7c7
@ -24,7 +24,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
|
|||||||
from metadata.generated.schema.type.tableQuery import TableQuery
|
from metadata.generated.schema.type.tableQuery import TableQuery
|
||||||
from metadata.ingestion.api.steps import InvalidSourceException
|
from metadata.ingestion.api.steps import InvalidSourceException
|
||||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||||
from metadata.ingestion.source.connections import get_connection
|
|
||||||
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
|
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
|
||||||
from metadata.ingestion.source.database.snowflake.queries import (
|
from metadata.ingestion.source.database.snowflake.queries import (
|
||||||
SNOWFLAKE_SESSION_TAG_QUERY,
|
SNOWFLAKE_SESSION_TAG_QUERY,
|
||||||
@ -89,24 +88,8 @@ class SnowflakeQueryParserSource(QueryParserSource, ABC):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def get_table_query(self) -> Iterable[TableQuery]:
|
def get_table_query(self) -> Iterable[TableQuery]:
|
||||||
database = self.config.serviceConnection.__root__.config.database
|
self.set_session_query_tag()
|
||||||
if database:
|
yield from super().get_table_query()
|
||||||
use_db_query = f"USE DATABASE {database}"
|
|
||||||
self.engine.execute(use_db_query)
|
|
||||||
self.set_session_query_tag()
|
|
||||||
yield from super().get_table_query()
|
|
||||||
else:
|
|
||||||
query = "SHOW DATABASES"
|
|
||||||
results = self.engine.execute(query)
|
|
||||||
for res in results:
|
|
||||||
row = list(res)
|
|
||||||
use_db_query = f"USE DATABASE {row[1]}"
|
|
||||||
self.engine.execute(use_db_query)
|
|
||||||
logger.info(f"Ingesting from database: {row[1]}")
|
|
||||||
self.config.serviceConnection.__root__.config.database = row[1]
|
|
||||||
self.engine = get_connection(self.service_connection)
|
|
||||||
self.set_session_query_tag()
|
|
||||||
yield from super().get_table_query()
|
|
||||||
|
|
||||||
def get_database_name(self, data: dict) -> str: # pylint: disable=arguments-differ
|
def get_database_name(self, data: dict) -> str: # pylint: disable=arguments-differ
|
||||||
"""
|
"""
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user