#13696: add support for dot in schema name to fetch tables (#14246)

This commit is contained in:
NiharDoshi99 2023-12-08 12:04:28 +05:30 committed by GitHub
parent 2cfa562d63
commit 8d925c46a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 674 additions and 4 deletions

View File

@@ -27,8 +27,13 @@ from metadata.ingestion.source.database.common_db_source import CommonDbSourceSe
from metadata.ingestion.source.database.mssql.queries import MSSQL_GET_DATABASE from metadata.ingestion.source.database.mssql.queries import MSSQL_GET_DATABASE
from metadata.ingestion.source.database.mssql.utils import ( from metadata.ingestion.source.database.mssql.utils import (
get_columns, get_columns,
get_foreign_keys,
get_pk_constraint,
get_table_comment, get_table_comment,
get_table_names,
get_unique_constraints,
get_view_definition, get_view_definition,
get_view_names,
) )
from metadata.ingestion.source.database.multi_db_source import MultiDBSource from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.utils import fqn from metadata.utils import fqn
@ -53,6 +58,11 @@ MSDialect.get_view_definition = get_view_definition
MSDialect.get_all_view_definitions = get_all_view_definitions MSDialect.get_all_view_definitions = get_all_view_definitions
MSDialect.get_all_table_comments = get_all_table_comments MSDialect.get_all_table_comments = get_all_table_comments
MSDialect.get_columns = get_columns MSDialect.get_columns = get_columns
MSDialect.get_pk_constraint = get_pk_constraint
MSDialect.get_unique_constraints = get_unique_constraints
MSDialect.get_foreign_keys = get_foreign_keys
MSDialect.get_table_names = get_table_names
MSDialect.get_view_names = get_view_names
class MssqlSource(CommonDbSourceService, MultiDBSource): class MssqlSource(CommonDbSourceService, MultiDBSource):

View File

@ -86,3 +86,103 @@ MSSQL_TEST_GET_QUERIES = textwrap.dedent(
ON db.database_id = t.dbid ON db.database_id = t.dbid
""" """
) )
MSSQL_GET_FOREIGN_KEY = """\
WITH fk_info AS (
SELECT
ischema_ref_con.constraint_schema,
ischema_ref_con.constraint_name,
ischema_key_col.ordinal_position,
ischema_key_col.[table_schema],
ischema_key_col.table_name,
ischema_ref_con.unique_constraint_schema,
ischema_ref_con.unique_constraint_name,
ischema_ref_con.match_option,
ischema_ref_con.update_rule,
ischema_ref_con.delete_rule,
ischema_key_col.column_name AS constrained_column
FROM
INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS ischema_ref_con
INNER JOIN
INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col ON
ischema_key_col.[table_schema] = ischema_ref_con.constraint_schema
AND ischema_key_col.constraint_name =
ischema_ref_con.constraint_name
WHERE ischema_key_col.table_name = :tablename
AND ischema_key_col.[table_schema] = :owner
),
constraint_info AS (
SELECT
ischema_key_col.constraint_schema,
ischema_key_col.constraint_name,
ischema_key_col.ordinal_position,
ischema_key_col.[table_schema],
ischema_key_col.table_name,
ischema_key_col.column_name
FROM
INFORMATION_SCHEMA.KEY_COLUMN_USAGE ischema_key_col
),
index_info AS (
SELECT
sys.schemas.name AS index_schema,
sys.indexes.name AS index_name,
sys.index_columns.key_ordinal AS ordinal_position,
sys.schemas.name AS [table_schema],
sys.objects.name AS table_name,
sys.columns.name AS column_name
FROM
sys.indexes
INNER JOIN
sys.objects ON
sys.objects.object_id = sys.indexes.object_id
INNER JOIN
sys.schemas ON
sys.schemas.schema_id = sys.objects.schema_id
INNER JOIN
sys.index_columns ON
sys.index_columns.object_id = sys.objects.object_id
AND sys.index_columns.index_id = sys.indexes.index_id
INNER JOIN
sys.columns ON
sys.columns.object_id = sys.indexes.object_id
AND sys.columns.column_id = sys.index_columns.column_id
)
SELECT
fk_info.constraint_schema,
fk_info.constraint_name,
fk_info.ordinal_position,
fk_info.constrained_column,
constraint_info.[table_schema] AS referred_table_schema,
constraint_info.table_name AS referred_table_name,
constraint_info.column_name AS referred_column,
fk_info.match_option,
fk_info.update_rule,
fk_info.delete_rule
FROM
fk_info INNER JOIN constraint_info ON
constraint_info.constraint_schema =
fk_info.unique_constraint_schema
AND constraint_info.constraint_name =
fk_info.unique_constraint_name
AND constraint_info.ordinal_position = fk_info.ordinal_position
UNION
SELECT
fk_info.constraint_schema,
fk_info.constraint_name,
fk_info.ordinal_position,
fk_info.constrained_column,
index_info.[table_schema] AS referred_table_schema,
index_info.table_name AS referred_table_name,
index_info.column_name AS referred_column,
fk_info.match_option,
fk_info.update_rule,
fk_info.delete_rule
FROM
fk_info INNER JOIN index_info ON
index_info.index_schema = fk_info.unique_constraint_schema
AND index_info.index_name = fk_info.unique_constraint_name
AND index_info.ordinal_position = fk_info.ordinal_position
ORDER BY fk_info.constraint_schema, fk_info.constraint_name,
fk_info.ordinal_position
"""

View File

@ -12,7 +12,7 @@
MSSQL SQLAlchemy Helper Methods MSSQL SQLAlchemy Helper Methods
""" """
from sqlalchemy import Column, Integer, MetaData, String, Table, alias, sql from sqlalchemy import Column, Integer, MetaData, String, Table, alias, sql, text
from sqlalchemy import types as sqltypes from sqlalchemy import types as sqltypes
from sqlalchemy import util from sqlalchemy import util
from sqlalchemy.dialects.mssql import information_schema as ischema from sqlalchemy.dialects.mssql import information_schema as ischema
@ -25,7 +25,9 @@ from sqlalchemy.dialects.mssql.base import (
MSString, MSString,
MSText, MSText,
MSVarBinary, MSVarBinary,
_db_plus_owner, _owner_plus_db,
_switch_db,
update_wrapper,
) )
from sqlalchemy.engine import reflection from sqlalchemy.engine import reflection
from sqlalchemy.sql import func from sqlalchemy.sql import func
@ -34,6 +36,7 @@ from sqlalchemy.util import compat
from metadata.ingestion.source.database.mssql.queries import ( from metadata.ingestion.source.database.mssql.queries import (
MSSQL_ALL_VIEW_DEFINITIONS, MSSQL_ALL_VIEW_DEFINITIONS,
MSSQL_GET_FOREIGN_KEY,
MSSQL_GET_TABLE_COMMENTS, MSSQL_GET_TABLE_COMMENTS,
) )
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
@ -59,8 +62,39 @@ def get_table_comment(
) )
def db_plus_owner_listing(fn):
    """Decorator for listing-style reflection methods (no table argument).

    Splits an incoming ``database.owner`` schema into database and owner
    parts and runs *fn* with the connection switched to that database.
    A schema whose own name contains a dot (e.g. ``my.schema``) is wrapped
    in brackets first so the dot inside the name is not treated as a
    database/owner separator by ``_owner_plus_db``.
    """

    def wrap(dialect, connection, schema=None, **kw):
        # Bracket-quote dotted schema names; skip names that already start
        # with "[" so the quoting stays idempotent (the original code would
        # double-bracket an already-quoted name).
        if schema and "." in schema and not schema.startswith("["):
            schema = f"[{schema}]"
        dbname, owner = _owner_plus_db(dialect, schema)
        return _switch_db(
            dbname, connection, fn, dialect, connection, dbname, owner, schema, **kw
        )

    return update_wrapper(wrap, fn)
def db_plus_owner(fn):
    """Decorator for table-level reflection methods (take a ``tablename``).

    Splits an incoming ``database.owner`` schema into database and owner
    parts and runs *fn* with the connection switched to that database.
    A schema whose own name contains a dot (e.g. ``my.schema``) is wrapped
    in brackets first so the dot inside the name is not treated as a
    database/owner separator by ``_owner_plus_db``.
    """

    def wrap(dialect, connection, tablename, schema=None, **kw):
        # Bracket-quote dotted schema names; skip names that already start
        # with "[" so the quoting stays idempotent (the original code would
        # double-bracket an already-quoted name).
        if schema and "." in schema and not schema.startswith("["):
            schema = f"[{schema}]"
        dbname, owner = _owner_plus_db(dialect, schema)
        return _switch_db(
            dbname,
            connection,
            fn,
            dialect,
            connection,
            tablename,
            dbname,
            owner,
            schema,
            **kw,
        )

    return update_wrapper(wrap, fn)
@reflection.cache @reflection.cache
@_db_plus_owner @db_plus_owner
def get_columns( def get_columns(
self, connection, tablename, dbname, owner, schema, **kw self, connection, tablename, dbname, owner, schema, **kw
): # pylint: disable=unused-argument, too-many-locals, disable=too-many-branches, too-many-statements ): # pylint: disable=unused-argument, too-many-locals, disable=too-many-branches, too-many-statements
@ -271,7 +305,7 @@ def get_columns(
@reflection.cache @reflection.cache
@_db_plus_owner @db_plus_owner
def get_view_definition( def get_view_definition(
self, connection, viewname, dbname, owner, schema, **kw self, connection, viewname, dbname, owner, schema, **kw
): # pylint: disable=unused-argument ): # pylint: disable=unused-argument
@ -282,3 +316,168 @@ def get_view_definition(
schema=owner, schema=owner,
query=MSSQL_ALL_VIEW_DEFINITIONS, query=MSSQL_ALL_VIEW_DEFINITIONS,
) )
@reflection.cache
@db_plus_owner
def get_pk_constraint(
    self, connection, tablename, dbname, owner=None, schema=None, **kw
):  # pylint: disable=unused-argument
    """
    Override of the MSSQL dialect's primary-key reflection so that it works
    with the ``db_plus_owner`` dotted-schema handling.

    Queries INFORMATION_SCHEMA constraint tables for the given table/owner
    and returns ``{"constrained_columns": [...], "name": <constraint name>}``.
    """
    pkeys = []
    tc = ischema.constraints
    c = ischema.key_constraints.alias("C")
    # Primary key constraints: join TABLE_CONSTRAINTS to KEY_COLUMN_USAGE
    # on constraint name + schema, filtered to this table and owner.
    s = (
        sql.select(c.c.column_name, tc.c.constraint_type, c.c.constraint_name)
        .where(
            sql.and_(
                tc.c.constraint_name == c.c.constraint_name,
                tc.c.table_schema == c.c.table_schema,
                c.c.table_name == tablename,
                c.c.table_schema == owner,
            ),
        )
        .order_by(tc.c.constraint_name, c.c.ordinal_position)
    )
    cursor = connection.execution_options(future_result=True).execute(s)
    constraint_name = None
    for row in cursor.mappings():
        # Only PRIMARY KEY rows contribute; UNIQUE constraints are skipped.
        if "PRIMARY" in row[tc.c.constraint_type.name]:
            pkeys.append(row["COLUMN_NAME"])
            # All rows of one PK share the same name; capture it once.
            if constraint_name is None:
                constraint_name = row[c.c.constraint_name.name]
    return {"constrained_columns": pkeys, "name": constraint_name}
@reflection.cache
def get_unique_constraints(self, connection, table_name, schema=None, **kw):
    # Unique-constraint reflection is deliberately left unimplemented for
    # this dialect override; NOTE(review): SQLAlchemy's inspector appears to
    # be expected to handle NotImplementedError as "unsupported" — confirm
    # against the caller before relying on this.
    raise NotImplementedError()
@reflection.cache
@db_plus_owner
def get_foreign_keys(
    self, connection, tablename, dbname, owner=None, schema=None, **kw
):  # pylint: disable=unused-argument, too-many-locals
    """
    Override of the MSSQL dialect's foreign-key reflection so that it works
    with the ``db_plus_owner`` dotted-schema handling.

    Runs MSSQL_GET_FOREIGN_KEY for the given table/owner and folds the
    per-column rows into one record per constraint, in SQLAlchemy's
    standard ``get_foreign_keys`` result shape.
    """
    s = (
        text(MSSQL_GET_FOREIGN_KEY)
        .bindparams(
            sql.bindparam("tablename", tablename, ischema.CoerceUnicode()),
            sql.bindparam("owner", owner, ischema.CoerceUnicode()),
        )
        .columns(
            constraint_schema=sqltypes.Unicode(),
            constraint_name=sqltypes.Unicode(),
            table_schema=sqltypes.Unicode(),
            table_name=sqltypes.Unicode(),
            constrained_column=sqltypes.Unicode(),
            referred_table_schema=sqltypes.Unicode(),
            referred_table_name=sqltypes.Unicode(),
            referred_column=sqltypes.Unicode(),
        )
    )

    def fkey_rec():
        # Template record for one foreign-key constraint.
        return {
            "name": None,
            "constrained_columns": [],
            "referred_schema": None,
            "referred_table": None,
            "referred_columns": [],
            "options": {},
        }

    # Group rows by constraint name, to handle multi-column FKs.
    # (The original code first assigned `fkeys = []` and immediately
    # shadowed it with this defaultdict; the dead assignment is removed.)
    fkeys = util.defaultdict(fkey_rec)
    for r in connection.execute(s).fetchall():
        (
            _,  # constraint schema
            rfknm,
            _,  # ordinal position
            scol,
            rschema,
            rtbl,
            rcol,
            # TODO: we support match=<keyword> for foreign keys so
            # we can support this also, PG has match=FULL for example
            # but this seems to not be a valid value for SQL Server
            _,  # match rule
            fkuprule,
            fkdelrule,
        ) = r
        rec = fkeys[rfknm]
        rec["name"] = rfknm
        # "NO ACTION" is the server default; only record explicit rules.
        if fkuprule != "NO ACTION":
            rec["options"]["onupdate"] = fkuprule
        if fkdelrule != "NO ACTION":
            rec["options"]["ondelete"] = fkdelrule
        if not rec["referred_table"]:
            rec["referred_table"] = rtbl
            # Record the referred schema only when it differs from the
            # current owner (or an explicit schema was requested),
            # qualifying it with the database name when present.
            if schema is not None or owner != rschema:
                if dbname:
                    rschema = dbname + "." + rschema
                rec["referred_schema"] = rschema
        local_cols, remote_cols = (
            rec["constrained_columns"],
            rec["referred_columns"],
        )
        local_cols.append(scol)
        remote_cols.append(rcol)
    return list(fkeys.values())
@reflection.cache
@db_plus_owner_listing
def get_table_names(
    self, connection, dbname, owner, schema, **kw
):  # pylint: disable=unused-argument
    """Return the names of all base tables owned by *owner*, sorted by name."""
    tables = ischema.tables
    query = (
        sql.select(tables.c.table_name)
        .where(tables.c.table_schema == owner)
        .where(tables.c.table_type == "BASE TABLE")
        .order_by(tables.c.table_name)
    )
    return [row[0] for row in connection.execute(query)]
@reflection.cache
@db_plus_owner_listing
def get_view_names(
    self, connection, dbname, owner, schema, **kw
):  # pylint: disable=unused-argument
    """Return the names of all views owned by *owner*, sorted by name."""
    tables = ischema.tables
    query = (
        sql.select(tables.c.table_name)
        .where(tables.c.table_schema == owner)
        .where(tables.c.table_type == "VIEW")
        .order_by(tables.c.table_name)
    )
    return [row[0] for row in connection.execute(query)]

View File

@@ -0,0 +1,361 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test Mssql using the topology
"""
import types
from unittest import TestCase
from unittest.mock import patch
from sqlalchemy.types import INTEGER, VARCHAR
import metadata.ingestion.source.database.mssql.utils as mssql_dialet
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import (
Column,
ColumnName,
DataType,
TableType,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.basic import EntityName, FullyQualifiedEntityName
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source.database.mssql.metadata import MssqlSource
# Minimal MSSQL ingestion workflow configuration used to build the
# MssqlSource under test; the JWT below is a non-secret test fixture.
mock_mssql_config = {
    "source": {
        "type": "mssql",
        "serviceName": "test2",
        "serviceConnection": {
            "config": {
                "type": "Mssql",
                "database": "test_database",
                "username": "username",
                "password": "password",
                "hostPort": "localhost:1466",
            }
        },
        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {
                "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc"
                "iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE"
                "2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB"
                "iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN"
                "r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u"
                "d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
            },
        }
    },
}
# Mock topology entities: service -> database -> schema.  The schema name
# "sample.schema" deliberately contains a dot — the behaviour under test.
MOCK_DATABASE_SERVICE = DatabaseService(
    id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
    name="mssql_source_test",
    connection=DatabaseConnection(),
    serviceType=DatabaseServiceType.Mssql,
)
MOCK_DATABASE = Database(
    id="a58b1856-729c-493b-bc87-6d2269b43ec0",
    name="sample_database",
    fullyQualifiedName="mssql_source_test.sample_database",
    displayName="sample_database",
    description="",
    service=EntityReference(
        id="85811038-099a-11ed-861d-0242ac120002", type="databaseService"
    ),
)
MOCK_DATABASE_SCHEMA = DatabaseSchema(
    id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
    name="sample.schema",
    fullyQualifiedName="mssql_source_test.sample_database.sample.schema",
    service=EntityReference(id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb", type="database"),
    database=EntityReference(
        id="a58b1856-729c-493b-bc87-6d2269b43ec0",
        type="database",
    ),
)
# Column payloads returned by the stubbed inspector.get_columns, shaped like
# SQLAlchemy reflection output plus the extra "system_data_type" key.
MOCK_COLUMN_VALUE = [
    {
        "name": "sample_col_1",
        "type": VARCHAR(),
        "nullable": True,
        "default": None,
        "autoincrement": False,
        "system_data_type": "varchar(50)",
        "comment": None,
    },
    {
        "name": "sample_col_2",
        "type": INTEGER(),
        "nullable": True,
        "default": None,
        "autoincrement": False,
        "system_data_type": "int",
        "comment": None,
    },
    {
        "name": "sample_col_3",
        "type": VARCHAR(),
        "nullable": True,
        "default": None,
        "autoincrement": False,
        "system_data_type": "varchar(50)",
        "comment": None,
    },
    {
        "name": "sample_col_4",
        "type": VARCHAR(),
        "nullable": True,
        "default": None,
        "autoincrement": False,
        "comment": None,
        "system_data_type": "varchar(50)",
    },
]
# Expected create-requests the source should yield for the mock topology.
EXPECTED_DATABASE = [
    CreateDatabaseRequest(
        name=EntityName(__root__="sample_database"),
        displayName=None,
        description=None,
        tags=None,
        owner=None,
        service=FullyQualifiedEntityName(__root__="mssql_source_test"),
        dataProducts=None,
        default=False,
        retentionPeriod=None,
        extension=None,
        sourceUrl=None,
        domain=None,
        lifeCycle=None,
        sourceHash=None,
    )
]
EXPECTED_DATABASE_SCHEMA = [
    CreateDatabaseSchemaRequest(
        name=EntityName(__root__="sample.schema"),
        displayName=None,
        description=None,
        owner=None,
        database=FullyQualifiedEntityName(__root__="mssql_source_test.sample_database"),
        dataProducts=None,
        tags=None,
        retentionPeriod=None,
        extension=None,
        sourceUrl=None,
        domain=None,
        lifeCycle=None,
        sourceHash=None,
    )
]
# Note the quoted schema segment in the table FQN below: the dotted schema
# name is escaped as "sample.schema" inside the fully qualified name.
EXPECTED_TABLE = [
    CreateTableRequest(
        name=EntityName(__root__="sample_table"),
        displayName=None,
        description=None,
        tableType=TableType.Regular.name,
        columns=[
            Column(
                name=ColumnName(__root__="sample_col_1"),
                displayName=None,
                dataType=DataType.VARCHAR.name,
                arrayDataType=None,
                dataLength=1,
                precision=None,
                scale=None,
                dataTypeDisplay="varchar(50)",
                description=None,
                fullyQualifiedName=None,
                tags=None,
                constraint="NULL",
                ordinalPosition=None,
                jsonSchema=None,
                children=None,
                profile=None,
                customMetrics=None,
            ),
            Column(
                name=ColumnName(__root__="sample_col_2"),
                displayName=None,
                dataType=DataType.INT.name,
                arrayDataType=None,
                dataLength=1,
                precision=None,
                scale=None,
                dataTypeDisplay="int",
                description=None,
                fullyQualifiedName=None,
                tags=None,
                constraint="NULL",
                ordinalPosition=None,
                jsonSchema=None,
                children=None,
                profile=None,
                customMetrics=None,
            ),
            Column(
                name=ColumnName(__root__="sample_col_3"),
                displayName=None,
                dataType=DataType.VARCHAR.name,
                arrayDataType=None,
                dataLength=1,
                precision=None,
                scale=None,
                dataTypeDisplay="varchar(50)",
                description=None,
                fullyQualifiedName=None,
                tags=None,
                constraint="NULL",
                ordinalPosition=None,
                jsonSchema=None,
                children=None,
                profile=None,
                customMetrics=None,
            ),
            Column(
                name=ColumnName(__root__="sample_col_4"),
                displayName=None,
                dataType=DataType.VARCHAR.name,
                arrayDataType=None,
                dataLength=1,
                precision=None,
                scale=None,
                dataTypeDisplay="varchar(50)",
                description=None,
                fullyQualifiedName=None,
                tags=None,
                constraint="NULL",
                ordinalPosition=None,
                jsonSchema=None,
                children=None,
                profile=None,
                customMetrics=None,
            ),
        ],
        tableConstraints=[],
        tablePartition=None,
        tableProfilerConfig=None,
        owner=None,
        databaseSchema=FullyQualifiedEntityName(
            __root__='mssql_source_test.sample_database."sample.schema"'
        ),
        tags=None,
        viewDefinition=None,
        retentionPeriod=None,
        extension=None,
        sourceUrl=None,
        domain=None,
        dataProducts=None,
        fileFormat=None,
        lifeCycle=None,
        sourceHash=None,
    )
]
class MssqlUnitTest(TestCase):
    """
    Implements the necessary methods to extract
    Mssql Unit Test
    """

    # test_connection is patched so MssqlSource.create() does not try to
    # reach a real SQL Server during construction.
    @patch(
        "metadata.ingestion.source.database.common_db_source.CommonDbSourceService.test_connection"
    )
    def __init__(
        self,
        methodName,
        test_connection,
    ) -> None:
        super().__init__(methodName)
        test_connection.return_value = False
        self.config = OpenMetadataWorkflowConfig.parse_obj(mock_mssql_config)
        self.mssql = MssqlSource.create(
            mock_mssql_config["source"],
            self.config.workflowConfig.openMetadataServerConfig,
        )
        self.mssql.context.__dict__[
            "database_service"
        ] = MOCK_DATABASE_SERVICE.name.__root__
        # Replace the SQLAlchemy inspector with a bare namespace of stub
        # reflection callables so yield_table needs no live connection.
        self.mssql.inspector = types.SimpleNamespace()
        self.mssql.inspector.get_columns = (
            lambda table_name, schema_name, db_name: MOCK_COLUMN_VALUE
        )
        self.mssql.inspector.get_pk_constraint = lambda table_name, schema_name: []
        self.mssql.inspector.get_unique_constraints = lambda table_name, schema_name: []
        self.mssql.inspector.get_foreign_keys = lambda table_name, schema_name: []

    def test_yield_database(self):
        assert EXPECTED_DATABASE == [
            either.right for either in self.mssql.yield_database(MOCK_DATABASE.name)
        ]

        # Seed the shared topology context for the later tests.
        # NOTE(review): the tests appear to rely on execution order
        # (database -> schema -> table) for this context hand-off — confirm.
        self.mssql.context.__dict__[
            "database_service"
        ] = MOCK_DATABASE_SERVICE.name.__root__
        self.mssql.context.__dict__["database"] = MOCK_DATABASE.name.__root__

    # Wrapped with the real decorator under test; when called as a bound
    # method, `self` fills the decorator's `dialect` parameter.
    @mssql_dialet.db_plus_owner
    def mock_function(
        self, connection, tablename, dbname, owner, schema, **kw
    ):  # pylint: disable=unused-argument
        # Mock function for testing
        return schema

    def test_schema_with_dot(self):
        # Test when the schema contains a dot: the decorator must
        # bracket-quote it before database/owner splitting.
        result = self.mock_function(  # pylint: disable=no-value-for-parameter
            "mock_dialect",
            "mock_connection",
            "your.schema",
        )
        self.assertEqual(result, "[your.schema]")

    def test_yield_schema(self):
        assert EXPECTED_DATABASE_SCHEMA == [
            either.right
            for either in self.mssql.yield_database_schema(MOCK_DATABASE_SCHEMA.name)
        ]

        self.mssql.context.__dict__[
            "database_schema"
        ] = MOCK_DATABASE_SCHEMA.name.__root__

    def test_yield_table(self):
        assert EXPECTED_TABLE == [
            either.right
            for either in self.mssql.yield_table(("sample_table", "Regular"))
        ]