diff --git a/ingestion/src/metadata/ingestion/api/source.py b/ingestion/src/metadata/ingestion/api/source.py index ea26ee42628..e4294be2628 100644 --- a/ingestion/src/metadata/ingestion/api/source.py +++ b/ingestion/src/metadata/ingestion/api/source.py @@ -65,3 +65,7 @@ class Source(Closeable, Generic[Entity], metaclass=ABCMeta): @abstractmethod def get_status(self) -> SourceStatus: pass + + @abstractmethod + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/amundsen.py b/ingestion/src/metadata/ingestion/source/amundsen.py index 655121a1031..5b7e8d737a5 100644 --- a/ingestion/src/metadata/ingestion/source/amundsen.py +++ b/ingestion/src/metadata/ingestion/source/amundsen.py @@ -270,3 +270,6 @@ class AmundsenSource(Source[Entity]): CreateDatabaseServiceRequest(**service) ) return created_service + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/atlas.py b/ingestion/src/metadata/ingestion/source/atlas.py index 11f77f3970d..82df6b0d690 100644 --- a/ingestion/src/metadata/ingestion/source/atlas.py +++ b/ingestion/src/metadata/ingestion/source/atlas.py @@ -265,3 +265,6 @@ class AtlasSource(Source): if not pipeline: return return EntityReference(id=pipeline.id.__root__, type="pipeline") + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/deltalake.py b/ingestion/src/metadata/ingestion/source/deltalake.py index 2f98d3c2079..32fb3b4a8a6 100644 --- a/ingestion/src/metadata/ingestion/source/deltalake.py +++ b/ingestion/src/metadata/ingestion/source/deltalake.py @@ -299,3 +299,6 @@ class DeltalakeSource(Source[Entity]): or isinstance(delta_type, ArrayType) or isinstance(delta_type, MapType) ) + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/dynamodb.py b/ingestion/src/metadata/ingestion/source/dynamodb.py index 47e2eaa16cb..5f0a758e6ec 100644 --- a/ingestion/src/metadata/ingestion/source/dynamodb.py +++ b/ingestion/src/metadata/ingestion/source/dynamodb.py @@ -137,3 +137,6 @@ class DynamodbSource(Source[Entity]): def get_status(self) -> SourceStatus: return self.status + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/gcs.py b/ingestion/src/metadata/ingestion/source/gcs.py index 01c3da7aded..6e2e7204b41 100644 --- a/ingestion/src/metadata/ingestion/source/gcs.py +++ b/ingestion/src/metadata/ingestion/source/gcs.py @@ -179,3 +179,6 @@ class GcsSource(Source[Entity]): enabled=True, name=name, ) + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/glue.py b/ingestion/src/metadata/ingestion/source/glue.py index c70ef2a223d..cb951b1ff7e 100644 --- a/ingestion/src/metadata/ingestion/source/glue.py +++ b/ingestion/src/metadata/ingestion/source/glue.py @@ -290,3 +290,6 @@ class GlueSource(Source[Entity]): def get_status(self) -> SourceStatus: return self.status + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/kafka.py b/ingestion/src/metadata/ingestion/source/kafka.py index ce07c097e25..c4d34bcdb2d 100644 --- a/ingestion/src/metadata/ingestion/source/kafka.py +++ b/ingestion/src/metadata/ingestion/source/kafka.py @@ -181,3 +181,6 @@ class KafkaSource(Source[CreateTopicRequest]): def close(self): pass + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/ldap_users.py b/ingestion/src/metadata/ingestion/source/ldap_users.py index e0fcaded48d..3063b1c98f8 100644 --- a/ingestion/src/metadata/ingestion/source/ldap_users.py +++ b/ingestion/src/metadata/ingestion/source/ldap_users.py @@ -94,3 +94,6 @@ class LdapUsersSource(Source[OMetaUserProfile]): def close(self): pass + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/looker.py b/ingestion/src/metadata/ingestion/source/looker.py index f569dd3f4fb..0a312401fb5 100644 --- a/ingestion/src/metadata/ingestion/source/looker.py +++ b/ingestion/src/metadata/ingestion/source/looker.py @@ -167,3 +167,6 @@ class LookerSource(Source[Entity]): def close(self): pass + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/metabase.py b/ingestion/src/metadata/ingestion/source/metabase.py index e61aba319de..a8ee0777972 100644 --- a/ingestion/src/metadata/ingestion/source/metabase.py +++ b/ingestion/src/metadata/ingestion/source/metabase.py @@ -257,6 +257,9 @@ class MetabaseSource(Source[Entity]): def prepare(self): pass + def test_connection(self) -> None: + pass + def get_card_detail(self, card_list): metadata = OpenMetadata(self.metadata_config) for card in card_list: diff --git a/ingestion/src/metadata/ingestion/source/metadata.py b/ingestion/src/metadata/ingestion/source/metadata.py index 0bbf7c00c91..150fc289ee7 100644 --- a/ingestion/src/metadata/ingestion/source/metadata.py +++ b/ingestion/src/metadata/ingestion/source/metadata.py @@ -351,3 +351,6 @@ class MetadataSource(Source[Entity]): def close(self): pass + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/mlflow.py b/ingestion/src/metadata/ingestion/source/mlflow.py index 88f4244944a..3d8902809e6 100644 --- a/ingestion/src/metadata/ingestion/source/mlflow.py +++ b/ingestion/src/metadata/ingestion/source/mlflow.py @@ -209,3 +209,6 @@ class MlflowSource(Source[CreateMlModelRequest]): """ Don't need to close the client """ + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/powerbi.py b/ingestion/src/metadata/ingestion/source/powerbi.py index 9ca818b0159..2e7bb0a3ef2 100644 --- a/ingestion/src/metadata/ingestion/source/powerbi.py +++ b/ingestion/src/metadata/ingestion/source/powerbi.py @@ -178,3 +178,6 @@ class PowerbiSource(Source[Entity]): def prepare(self): pass + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/redash.py b/ingestion/src/metadata/ingestion/source/redash.py index 6b7aaaf7895..4d74d54cdf2 100644 --- a/ingestion/src/metadata/ingestion/source/redash.py +++ b/ingestion/src/metadata/ingestion/source/redash.py @@ -172,3 +172,6 @@ class RedashSource(Source[Entity]): def close(self): self.client.session.close() + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/s3.py b/ingestion/src/metadata/ingestion/source/s3.py index 06548f3f1dc..2d7d9d6876f 100644 --- a/ingestion/src/metadata/ingestion/source/s3.py +++ b/ingestion/src/metadata/ingestion/source/s3.py @@ -165,3 +165,6 @@ class S3Source(Source[Entity]): prefixFilter=prefix_filter, name=name, ) + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/salesforce.py b/ingestion/src/metadata/ingestion/source/salesforce.py index b5ff6992cbb..63a44f3fdcc 100644 --- a/ingestion/src/metadata/ingestion/source/salesforce.py +++ b/ingestion/src/metadata/ingestion/source/salesforce.py @@ -203,3 +203,6 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]): def close(self): pass + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/sample_data.py b/ingestion/src/metadata/ingestion/source/sample_data.py index 305d7531a93..1eeb1fe6005 100644 --- a/ingestion/src/metadata/ingestion/source/sample_data.py +++ b/ingestion/src/metadata/ingestion/source/sample_data.py @@ -673,3 +673,6 @@ class SampleDataSource(Source[Entity]): def get_status(self): return self.status + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/sample_entity.py b/ingestion/src/metadata/ingestion/source/sample_entity.py index d1d7bc76119..354440ee61f 100644 --- a/ingestion/src/metadata/ingestion/source/sample_entity.py +++ b/ingestion/src/metadata/ingestion/source/sample_entity.py @@ -331,3 +331,6 @@ class SampleEntitySource(Source[Entity]): def get_status(self): return self.status + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/sample_usage.py b/ingestion/src/metadata/ingestion/source/sample_usage.py index fe8528fb5bb..095d1755f08 100644 --- a/ingestion/src/metadata/ingestion/source/sample_usage.py +++ b/ingestion/src/metadata/ingestion/source/sample_usage.py @@ -92,3 +92,6 @@ class SampleUsageSource(Source[TableQuery]): def get_status(self): return self.status + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index b99c16828d9..f2c9b2b5b2f 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -62,7 +62,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.orm_profiler.orm.converter import ometa_to_orm from metadata.orm_profiler.profiler.default import DefaultProfiler from metadata.utils.column_type_parser import ColumnTypeParser -from metadata.utils.engines import create_and_bind_session, get_engine +from metadata.utils.engines import create_and_bind_session, get_engine, test_connection from metadata.utils.filters import filter_by_schema, filter_by_table from metadata.utils.fqdn_generator import get_fqdn from metadata.utils.helpers import get_database_service_or_create @@ -130,6 +130,8 @@ class SQLSource(Source[OMetaDatabaseAndTable]): self.metadata = OpenMetadata(metadata_config) self.status = SQLSourceStatus() self.engine = get_engine(service_connection=self.config.serviceConnection) + self.test_connection() + self._session = None # We will instantiate this just if needed self._connection = None # Lazy init as well self.data_profiler = None @@ -148,6 +150,13 @@ class SQLSource(Source[OMetaDatabaseAndTable]): self.dbt_manifest = json.load(manifest) self.profile_date = datetime.now() + def test_connection(self) -> None: + """ + Used a timed-bound function to test that the engine + can properly reach the source + """ + test_connection(self.engine) + def run_profiler(self, table: Table, schema: str) -> Optional[TableProfile]: """ Convert the table to an ORM object and run the ORM diff --git a/ingestion/src/metadata/ingestion/source/superset.py b/ingestion/src/metadata/ingestion/source/superset.py index a55b8205ec0..f30c5bd1d35 100644 --- a/ingestion/src/metadata/ingestion/source/superset.py +++ b/ingestion/src/metadata/ingestion/source/superset.py @@ -366,3 +366,6 @@ class SupersetSource(Source[Entity]): def close(self): pass + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/ingestion/source/tableau.py b/ingestion/src/metadata/ingestion/source/tableau.py index bfc963b9727..f4a1031d604 100644 --- a/ingestion/src/metadata/ingestion/source/tableau.py +++ b/ingestion/src/metadata/ingestion/source/tableau.py @@ -270,3 +270,6 @@ class TableauSource(Source[Entity]): def close(self): pass + + def test_connection(self) -> None: + pass diff --git a/ingestion/src/metadata/utils/engines.py b/ingestion/src/metadata/utils/engines.py index ce7c4c19423..b83ab2abe03 100644 --- a/ingestion/src/metadata/utils/engines.py +++ b/ingestion/src/metadata/utils/engines.py @@ -27,6 +27,7 @@ from metadata.generated.schema.entity.services.connections.serviceConnection imp ServiceConnection, ) from metadata.utils.source_connections import get_connection_args, get_connection_url +from metadata.utils.timeout import timeout logger = logging.getLogger("Utils") @@ -67,6 +68,7 @@ def create_and_bind_session(engine: Engine) -> Session: return session() +@timeout(seconds=120) def test_connection(engine: Engine) -> None: """ Test that we can connect to the source using the given engine