mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-27 01:45:32 +00:00
parent
b7555806fd
commit
05a8401c51
@ -10,6 +10,11 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
"""MSSQL source module"""
|
"""MSSQL source module"""
|
||||||
|
|
||||||
|
from typing import Iterable
|
||||||
|
|
||||||
|
from sqlalchemy.engine.reflection import Inspector
|
||||||
|
from sqlalchemy.inspection import inspect
|
||||||
|
|
||||||
from metadata.generated.schema.entity.services.connections.database.mssqlConnection import (
|
from metadata.generated.schema.entity.services.connections.database.mssqlConnection import (
|
||||||
MssqlConnection,
|
MssqlConnection,
|
||||||
)
|
)
|
||||||
@ -24,6 +29,11 @@ from metadata.generated.schema.metadataIngestion.workflow import (
|
|||||||
)
|
)
|
||||||
from metadata.ingestion.api.source import InvalidSourceException
|
from metadata.ingestion.api.source import InvalidSourceException
|
||||||
from metadata.ingestion.source.sql_source import SQLSource
|
from metadata.ingestion.source.sql_source import SQLSource
|
||||||
|
from metadata.utils.connections import get_connection
|
||||||
|
from metadata.utils.filters import filter_by_database
|
||||||
|
from metadata.utils.logger import ingestion_logger
|
||||||
|
|
||||||
|
logger = ingestion_logger()
|
||||||
|
|
||||||
|
|
||||||
class MssqlSource(SQLSource):
|
class MssqlSource(SQLSource):
|
||||||
@ -49,3 +59,31 @@ class MssqlSource(SQLSource):
|
|||||||
"select top 50 * from [{}].[{}]"
|
"select top 50 * from [{}].[{}]"
|
||||||
)
|
)
|
||||||
return cls(config, metadata_config)
|
return cls(config, metadata_config)
|
||||||
|
|
||||||
|
def get_databases(self) -> Iterable[Inspector]:
|
||||||
|
if self.service_connection.database:
|
||||||
|
yield from super().get_databases()
|
||||||
|
else:
|
||||||
|
query = "SELECT name FROM master.sys.databases order by name;"
|
||||||
|
results = self.connection.execute(query)
|
||||||
|
db_list = []
|
||||||
|
|
||||||
|
for res in results:
|
||||||
|
db_list.append(list(res))
|
||||||
|
|
||||||
|
for row in db_list:
|
||||||
|
try:
|
||||||
|
if filter_by_database(
|
||||||
|
self.source_config.databaseFilterPattern, database_name=row[0]
|
||||||
|
):
|
||||||
|
self.status.filter(row[0], "Database pattern not allowed")
|
||||||
|
continue
|
||||||
|
logger.info(f"Ingesting from database: {row[0]}")
|
||||||
|
self.service_connection.database = row[0]
|
||||||
|
self.engine = get_connection(
|
||||||
|
self.config.serviceConnection.__root__.config
|
||||||
|
)
|
||||||
|
self.engine.connect()
|
||||||
|
yield inspect(self.engine)
|
||||||
|
except Exception as err:
|
||||||
|
logger.error(f"Failed to Connect: {row[0]} due to error {err}")
|
||||||
|
Loading…
x
Reference in New Issue
Block a user