Override test_connection (#4078)

This commit is contained in:
Pere Miquel Brull 2022-04-12 22:14:17 +02:00 committed by GitHub
parent 5e6e878d8e
commit cc68a3369d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 79 additions and 1 deletions

View File

@ -65,3 +65,7 @@ class Source(Closeable, Generic[Entity], metaclass=ABCMeta):
@abstractmethod
def get_status(self) -> SourceStatus:
pass
@abstractmethod
def test_connection(self) -> None:
pass

View File

@ -270,3 +270,6 @@ class AmundsenSource(Source[Entity]):
CreateDatabaseServiceRequest(**service)
)
return created_service
def test_connection(self) -> None:
pass

View File

@ -265,3 +265,6 @@ class AtlasSource(Source):
if not pipeline:
return
return EntityReference(id=pipeline.id.__root__, type="pipeline")
def test_connection(self) -> None:
pass

View File

@ -299,3 +299,6 @@ class DeltalakeSource(Source[Entity]):
or isinstance(delta_type, ArrayType)
or isinstance(delta_type, MapType)
)
def test_connection(self) -> None:
pass

View File

@ -137,3 +137,6 @@ class DynamodbSource(Source[Entity]):
def get_status(self) -> SourceStatus:
return self.status
def test_connection(self) -> None:
pass

View File

@ -179,3 +179,6 @@ class GcsSource(Source[Entity]):
enabled=True,
name=name,
)
def test_connection(self) -> None:
pass

View File

@ -290,3 +290,6 @@ class GlueSource(Source[Entity]):
def get_status(self) -> SourceStatus:
return self.status
def test_connection(self) -> None:
pass

View File

@ -181,3 +181,6 @@ class KafkaSource(Source[CreateTopicRequest]):
def close(self):
pass
def test_connection(self) -> None:
pass

View File

@ -94,3 +94,6 @@ class LdapUsersSource(Source[OMetaUserProfile]):
def close(self):
pass
def test_connection(self) -> None:
pass

View File

@ -167,3 +167,6 @@ class LookerSource(Source[Entity]):
def close(self):
pass
def test_connection(self) -> None:
pass

View File

@ -257,6 +257,9 @@ class MetabaseSource(Source[Entity]):
def prepare(self):
pass
def test_connection(self) -> None:
pass
def get_card_detail(self, card_list):
metadata = OpenMetadata(self.metadata_config)
for card in card_list:

View File

@ -351,3 +351,6 @@ class MetadataSource(Source[Entity]):
def close(self):
pass
def test_connection(self) -> None:
pass

View File

@ -209,3 +209,6 @@ class MlflowSource(Source[CreateMlModelRequest]):
"""
Don't need to close the client
"""
def test_connection(self) -> None:
pass

View File

@ -178,3 +178,6 @@ class PowerbiSource(Source[Entity]):
def prepare(self):
pass
def test_connection(self) -> None:
pass

View File

@ -172,3 +172,6 @@ class RedashSource(Source[Entity]):
def close(self):
self.client.session.close()
def test_connection(self) -> None:
pass

View File

@ -165,3 +165,6 @@ class S3Source(Source[Entity]):
prefixFilter=prefix_filter,
name=name,
)
def test_connection(self) -> None:
pass

View File

@ -203,3 +203,6 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]):
def close(self):
pass
def test_connection(self) -> None:
pass

View File

@ -673,3 +673,6 @@ class SampleDataSource(Source[Entity]):
def get_status(self):
return self.status
def test_connection(self) -> None:
pass

View File

@ -331,3 +331,6 @@ class SampleEntitySource(Source[Entity]):
def get_status(self):
return self.status
def test_connection(self) -> None:
pass

View File

@ -92,3 +92,6 @@ class SampleUsageSource(Source[TableQuery]):
def get_status(self):
return self.status
def test_connection(self) -> None:
pass

View File

@ -62,7 +62,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.orm_profiler.orm.converter import ometa_to_orm
from metadata.orm_profiler.profiler.default import DefaultProfiler
from metadata.utils.column_type_parser import ColumnTypeParser
from metadata.utils.engines import create_and_bind_session, get_engine
from metadata.utils.engines import create_and_bind_session, get_engine, test_connection
from metadata.utils.filters import filter_by_schema, filter_by_table
from metadata.utils.fqdn_generator import get_fqdn
from metadata.utils.helpers import get_database_service_or_create
@ -130,6 +130,8 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
self.metadata = OpenMetadata(metadata_config)
self.status = SQLSourceStatus()
self.engine = get_engine(service_connection=self.config.serviceConnection)
self.test_connection()
self._session = None # We will instantiate this just if needed
self._connection = None # Lazy init as well
self.data_profiler = None
@ -148,6 +150,13 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
self.dbt_manifest = json.load(manifest)
self.profile_date = datetime.now()
def test_connection(self) -> None:
"""
Used a timed-bound function to test that the engine
can properly reach the source
"""
test_connection(self.engine)
def run_profiler(self, table: Table, schema: str) -> Optional[TableProfile]:
"""
Convert the table to an ORM object and run the ORM

View File

@ -366,3 +366,6 @@ class SupersetSource(Source[Entity]):
def close(self):
pass
def test_connection(self) -> None:
pass

View File

@ -270,3 +270,6 @@ class TableauSource(Source[Entity]):
def close(self):
pass
def test_connection(self) -> None:
pass

View File

@ -27,6 +27,7 @@ from metadata.generated.schema.entity.services.connections.serviceConnection imp
ServiceConnection,
)
from metadata.utils.source_connections import get_connection_args, get_connection_url
from metadata.utils.timeout import timeout
logger = logging.getLogger("Utils")
@ -67,6 +68,7 @@ def create_and_bind_session(engine: Engine) -> Session:
return session()
@timeout(seconds=120)
def test_connection(engine: Engine) -> None:
"""
Test that we can connect to the source using the given engine