Fix #17963: Fix PinotDB Ingestion (#18266)

* Fix #17963: Fix PinotDB Ingestion

* fix conn args
This commit is contained in:
Mayur Singal 2024-10-15 08:36:40 +05:30 committed by GitHub
parent 2ee015e426
commit 8322c0f684
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 24 additions and 2 deletions

View File

@ -12,6 +12,7 @@
""" """
Source connection handler Source connection handler
""" """
from copy import deepcopy
from typing import Optional from typing import Optional
from sqlalchemy.engine import Engine from sqlalchemy.engine import Engine
@ -26,6 +27,7 @@ from metadata.ingestion.connections.builders import (
create_generic_db_connection, create_generic_db_connection,
get_connection_args_common, get_connection_args_common,
get_connection_url_common, get_connection_url_common,
init_empty_connection_arguments,
) )
from metadata.ingestion.connections.test_connections import test_connection_db_common from metadata.ingestion.connections.test_connections import test_connection_db_common
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
@ -41,8 +43,16 @@ def get_connection(connection: PinotDBConnection) -> Engine:
""" """
Create connection Create connection
""" """
# TODO: Rename database field to DatabaseSchema
# Pinot does not support multi database concept
if connection.database is not None:
if not connection.connectionArguments:
connection.connectionArguments = init_empty_connection_arguments()
connection.connectionArguments.root["database"] = connection.database
connection_copy = deepcopy(connection)
connection_copy.database = None
return create_generic_db_connection( return create_generic_db_connection(
connection=connection, connection=connection_copy,
get_connection_url_fn=get_connection_url, get_connection_url_fn=get_connection_url,
get_connection_args_fn=get_connection_args_common, get_connection_args_fn=get_connection_args_common,
) )

View File

@ -9,7 +9,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
"""PinotDb source module""" """PinotDb source module"""
from typing import Optional from typing import Iterable, Optional
from metadata.generated.schema.entity.services.connections.database.pinotDBConnection import ( from metadata.generated.schema.entity.services.connections.database.pinotDBConnection import (
PinotDBConnection, PinotDBConnection,
@ -39,3 +39,15 @@ class PinotdbSource(CommonDbSourceService):
f"Expected PinotdbConnection, but got {connection}" f"Expected PinotdbConnection, but got {connection}"
) )
return cls(config, metadata) return cls(config, metadata)
def get_database_names(self) -> Iterable[str]:
"""
Default case with a single database.
It might come informed - or not - from the source.
Sources with multiple databases should overwrite this and
apply the necessary filters.
"""
# TODO: Add databaseDisplayName field in PinotDBConnection
yield "default"