Merge pull request #241 from open-metadata/issue-240

ISSUE-249: Ingest views into OpenMetadata
This commit is contained in:
Suresh Srinivas 2021-08-19 14:02:44 -07:00 committed by GitHub
commit 864673cce1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 118 additions and 73 deletions

View File

@ -2,9 +2,9 @@
"source": {
"type": "redshift",
"config": {
"host_port": "cluster.name.region.redshift.amazonaws.com:5439",
"username": "username",
"password": "strong_password",
"host_port": "redshift-cluster-1.clot5cqn1cnb.us-west-2.redshift.amazonaws.com:5439",
"username": "awsuser",
"password": "focguC-kaqqe5-nepsok",
"database": "warehouse",
"service_name": "aws_redshift",
"service_type": "Redshift"

View File

@ -28,8 +28,9 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Table, Column, ColumnConstraint
from sqlalchemy import create_engine
from metadata.generated.schema.entity.data.table import Table, Column, ColumnConstraint, TableType
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.sql import sqltypes as types
from sqlalchemy.inspection import inspect
@ -66,6 +67,8 @@ class SQLConnectionConfig(ConfigModel):
service_name: str
service_type: str
options: dict = {}
include_views: Optional[bool] = True
include_tables: Optional[bool] = True
filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
@abstractmethod
@ -170,7 +173,7 @@ class SQLSource(Source):
def next_record(self) -> Iterable[OMetaDatabaseAndTable]:
sql_config = self.config
url = sql_config.get_connection_url()
logger.debug(f"sql_alchemy_url={url}")
logger.debug(f"sql_connection_url={url}")
engine = create_engine(url, **sql_config.options)
inspector = inspect(engine)
for schema in inspector.get_schema_names():
@ -178,75 +181,117 @@ class SQLSource(Source):
self.status.filtered(schema, "Schema pattern not allowed")
continue
logger.debug("total tables {}".format(inspector.get_table_names(schema)))
for table in inspector.get_table_names(schema):
try:
schema, table = self.standardize_schema_table_names(schema, table)
pk_constraints = inspector.get_pk_constraint(table, schema)
pk_columns = pk_constraints['column_constraints'] if len(
pk_constraints) > 0 and "column_constraints" in pk_constraints.keys() else {}
unique_constraints = []
try:
unique_constraints = inspector.get_unique_constraints(table, schema)
except NotImplementedError:
pass
unique_columns = []
for constraint in unique_constraints:
if 'column_names' in constraint.keys():
unique_columns = constraint['column_names']
if self.config.include_tables:
yield from self.fetch_tables(inspector, schema, sql_config)
if self.config.include_views:
yield from self.fetch_views(inspector, schema, sql_config)
dataset_name = f"{schema}.{table}"
self.status.scanned('{}.{}'.format(self.config.get_service_name(), dataset_name))
if not sql_config.filter_pattern.included(dataset_name):
self.status.filtered('{}.{}'.format(self.config.get_service_name(), dataset_name),
"Table pattern not allowed")
continue
columns = inspector.get_columns(table, schema)
table_info = {}
try:
table_info: dict = inspector.get_table_comment(table, schema)
except NotImplementedError:
description: Optional[str] = None
else:
description = table_info["text"]
table_columns = []
row_order = 1
for column in columns:
col_type = get_column_type(self.status, dataset_name, column['type'])
col_constraint = None
if column['nullable']:
col_constraint = ColumnConstraint.NULL
elif not column['nullable']:
col_constraint = ColumnConstraint.NOT_NULL
if column['name'] in pk_columns:
col_constraint = ColumnConstraint.PRIMARY_KEY
elif column['name'] in unique_columns:
col_constraint = ColumnConstraint.UNIQUE
table_columns.append(Column(name=column['name'],
description=column.get("comment", None),
columnDataType=col_type,
columnConstraint=col_constraint,
ordinalPosition=row_order))
row_order = row_order + 1
db = Database(
name=schema,
service=EntityReference(id=self.service.id, type=self.config.service_type))
table = Table(id=uuid.uuid4(),
name=table,
description=description if description is not None else ' ',
columns=table_columns)
table_and_db = OMetaDatabaseAndTable(table=table, database=db)
yield table_and_db
except ValidationError as err:
logger.error(err)
self.status.filtered('{}.{}'.format(self.config.service_name, dataset_name),
"Validation error")
def fetch_tables(self,
inspector: Inspector,
schema: str,
sql_config: SQLConnectionConfig) -> Iterable[OMetaDatabaseAndTable]:
for table in inspector.get_table_names(schema):
try:
schema, table = self.standardize_schema_table_names(schema, table)
if not sql_config.filter_pattern.included(table):
self.status.filtered('{}.{}'.format(self.config.get_service_name(), table),
"Table pattern not allowed")
continue
self.status.scanned('{}.{}'.format(self.config.get_service_name(), table))
description = self._get_table_description(schema, table, inspector)
table_columns = self._get_columns(schema, table, inspector)
table = Table(id=uuid.uuid4(),
name=table,
tableType='Regular',
description=description if description is not None else ' ',
columns=table_columns)
table_and_db = OMetaDatabaseAndTable(table=table, database=self._get_database(schema))
yield table_and_db
except ValidationError as err:
logger.error(err)
self.status.filtered('{}.{}'.format(self.config.service_name, table),
"Validation error")
continue
def fetch_views(self,
inspector: Inspector,
schema: str,
sql_config: SQLConnectionConfig) -> Iterable[OMetaDatabaseAndTable]:
for view in inspector.get_view_names(schema):
try:
if not sql_config.filter_pattern.included(view):
self.status.filtered('{}.{}'.format(self.config.get_service_name(), view),
"Table pattern not allowed")
continue
description = self._get_table_description(schema, view, inspector)
table_columns = self._get_columns(schema, view, inspector)
table = Table(id=uuid.uuid4(),
name=view,
tableType='View',
description=description if description is not None else ' ',
columns=table_columns)
table_and_db = OMetaDatabaseAndTable(table=table, database=self._get_database(schema))
yield table_and_db
except ValidationError as err:
logger.error(err)
self.status.filtered('{}.{}'.format(self.config.service_name, view),
"Validation error")
continue
def _get_database(self, schema: str) -> Database:
return Database(name=schema,
service=EntityReference(id=self.service.id, type=self.config.service_type))
def _get_columns(self, schema: str, table: str, inspector: Inspector) -> List[Column]:
pk_constraints = inspector.get_pk_constraint(table, schema)
pk_columns = pk_constraints['column_constraints'] if len(
pk_constraints) > 0 and "column_constraints" in pk_constraints.keys() else {}
unique_constraints = []
try:
unique_constraints = inspector.get_unique_constraints(table, schema)
except NotImplementedError:
pass
unique_columns = []
for constraint in unique_constraints:
if 'column_names' in constraint.keys():
unique_columns = constraint['column_names']
dataset_name = f"{schema}.{table}"
columns = inspector.get_columns(table, schema)
table_columns = []
row_order = 1
for column in columns:
col_type = get_column_type(self.status, dataset_name, column['type'])
col_constraint = None
if column['nullable']:
col_constraint = ColumnConstraint.NULL
elif not column['nullable']:
col_constraint = ColumnConstraint.NOT_NULL
if column['name'] in pk_columns:
col_constraint = ColumnConstraint.PRIMARY_KEY
elif column['name'] in unique_columns:
col_constraint = ColumnConstraint.UNIQUE
table_columns.append(Column(name=column['name'],
description=column.get("comment", None),
columnDataType=col_type,
columnConstraint=col_constraint,
ordinalPosition=row_order))
row_order = row_order + 1
return table_columns
def _get_table_description(self, schema: str, table: str, inspector: Inspector) -> str:
try:
table_info: dict = inspector.get_table_comment(table, schema)
except NotImplementedError:
description: Optional[str] = None
else:
description = table_info["text"]
return description
def close(self):
pass