From 4d389b3a3002c6f8865b4fccb025286dfeb70d0b Mon Sep 17 00:00:00 2001 From: Suresh Srinivas Date: Thu, 19 Aug 2021 13:33:07 -0700 Subject: [PATCH] ISSUE-249: Ingest views into OpenMetadata --- ingestion/examples/workflows/redshift.json | 6 +- .../metadata/ingestion/source/sql_source.py | 185 +++++++++++------- 2 files changed, 118 insertions(+), 73 deletions(-) diff --git a/ingestion/examples/workflows/redshift.json b/ingestion/examples/workflows/redshift.json index 45eea9f86ba..77cf9824568 100644 --- a/ingestion/examples/workflows/redshift.json +++ b/ingestion/examples/workflows/redshift.json @@ -2,9 +2,9 @@ "source": { "type": "redshift", "config": { - "host_port": "cluster.name.region.redshift.amazonaws.com:5439", - "username": "username", - "password": "strong_password", + "host_port": "redshift-cluster-1.clot5cqn1cnb.us-west-2.redshift.amazonaws.com:5439", + "username": "awsuser", + "password": "focguC-kaqqe5-nepsok", "database": "warehouse", "service_name": "aws_redshift", "service_type": "Redshift" diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index b8f67a97af2..bcde41887d8 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -28,8 +28,9 @@ from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.entity.data.database import Database -from metadata.generated.schema.entity.data.table import Table, Column, ColumnConstraint -from sqlalchemy import create_engine +from metadata.generated.schema.entity.data.table import Table, Column, ColumnConstraint, TableType +from sqlalchemy import create_engine, inspect +from sqlalchemy.engine.reflection import Inspector from sqlalchemy.sql import sqltypes as types from sqlalchemy.inspection import inspect @@ -66,6 +67,8 @@ class SQLConnectionConfig(ConfigModel): service_name: str service_type: str options: dict = {} + include_views: Optional[bool] = True + include_tables: Optional[bool] = True filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all() @abstractmethod @@ -170,7 +173,7 @@ class SQLSource(Source): def next_record(self) -> Iterable[OMetaDatabaseAndTable]: sql_config = self.config url = sql_config.get_connection_url() - logger.debug(f"sql_alchemy_url={url}") + logger.debug(f"sql_connection_url={url}") engine = create_engine(url, **sql_config.options) inspector = inspect(engine) for schema in inspector.get_schema_names(): @@ -178,75 +181,117 @@ class SQLSource(Source): self.status.filtered(schema, "Schema pattern not allowed") continue logger.debug("total tables {}".format(inspector.get_table_names(schema))) - for table in inspector.get_table_names(schema): - try: - schema, table = self.standardize_schema_table_names(schema, table) - pk_constraints = inspector.get_pk_constraint(table, schema) - pk_columns = pk_constraints['column_constraints'] if len( - pk_constraints) > 0 and "column_constraints" in pk_constraints.keys() else {} - unique_constraints = [] - try: - unique_constraints = inspector.get_unique_constraints(table, schema) - except NotImplementedError: - pass - unique_columns = [] - for constraint in unique_constraints: - if 'column_names' in constraint.keys(): - unique_columns = constraint['column_names'] + if self.config.include_tables: + yield from self.fetch_tables(inspector, schema, sql_config) + if self.config.include_views: + yield from self.fetch_views(inspector, schema, sql_config) - dataset_name = f"{schema}.{table}" - self.status.scanned('{}.{}'.format(self.config.get_service_name(), dataset_name)) - if not sql_config.filter_pattern.included(dataset_name): - self.status.filtered('{}.{}'.format(self.config.get_service_name(), dataset_name), - "Table pattern not allowed") - continue - - columns = inspector.get_columns(table, schema) - table_info = {} - try: - table_info: dict = inspector.get_table_comment(table, schema) - except NotImplementedError: - description: Optional[str] = None - else: - description = table_info["text"] - - table_columns = [] - row_order = 1 - for column in columns: - col_type = get_column_type(self.status, dataset_name, column['type']) - col_constraint = None - if column['nullable']: - col_constraint = ColumnConstraint.NULL - elif not column['nullable']: - col_constraint = ColumnConstraint.NOT_NULL - - if column['name'] in pk_columns: - col_constraint = ColumnConstraint.PRIMARY_KEY - elif column['name'] in unique_columns: - col_constraint = ColumnConstraint.UNIQUE - - table_columns.append(Column(name=column['name'], - description=column.get("comment", None), - columnDataType=col_type, - columnConstraint=col_constraint, - ordinalPosition=row_order)) - row_order = row_order + 1 - - db = Database( - name=schema, - service=EntityReference(id=self.service.id, type=self.config.service_type)) - table = Table(id=uuid.uuid4(), - name=table, - description=description if description is not None else ' ', - columns=table_columns) - - table_and_db = OMetaDatabaseAndTable(table=table, database=db) - yield table_and_db - except ValidationError as err: - logger.error(err) - self.status.filtered('{}.{}'.format(self.config.service_name, dataset_name), - "Validation error") + def fetch_tables(self, + inspector: Inspector, + schema: str, + sql_config: SQLConnectionConfig) -> Iterable[OMetaDatabaseAndTable]: + for table in inspector.get_table_names(schema): + try: + schema, table = self.standardize_schema_table_names(schema, table) + if not sql_config.filter_pattern.included(table): + self.status.filtered('{}.{}'.format(self.config.get_service_name(), table), + "Table pattern not allowed") continue + self.status.scanned('{}.{}'.format(self.config.get_service_name(), table)) + + description = self._get_table_description(schema, table, inspector) + + table_columns = self._get_columns(schema, table, inspector) + table = Table(id=uuid.uuid4(), + name=table, + tableType='Regular', + description=description if description is not None else ' ', + columns=table_columns) + + table_and_db = OMetaDatabaseAndTable(table=table, database=self._get_database(schema)) + yield table_and_db + except ValidationError as err: + logger.error(err) + self.status.filtered('{}.{}'.format(self.config.service_name, table), + "Validation error") + continue + + def fetch_views(self, + inspector: Inspector, + schema: str, + sql_config: SQLConnectionConfig) -> Iterable[OMetaDatabaseAndTable]: + for view in inspector.get_view_names(schema): + try: + if not sql_config.filter_pattern.included(view): + self.status.filtered('{}.{}'.format(self.config.get_service_name(), view), + "Table pattern not allowed") + continue + description = self._get_table_description(schema, view, inspector) + table_columns = self._get_columns(schema, view, inspector) + table = Table(id=uuid.uuid4(), + name=view, + tableType='View', + description=description if description is not None else ' ', + columns=table_columns) + table_and_db = OMetaDatabaseAndTable(table=table, database=self._get_database(schema)) + yield table_and_db + except ValidationError as err: + logger.error(err) + self.status.filtered('{}.{}'.format(self.config.service_name, view), + "Validation error") + continue + + def _get_database(self, schema: str) -> Database: + return Database(name=schema, + service=EntityReference(id=self.service.id, type=self.config.service_type)) + + def _get_columns(self, schema: str, table: str, inspector: Inspector) -> List[Column]: + pk_constraints = inspector.get_pk_constraint(table, schema) + pk_columns = pk_constraints['column_constraints'] if len( + pk_constraints) > 0 and "column_constraints" in pk_constraints.keys() else {} + unique_constraints = [] + try: + unique_constraints = inspector.get_unique_constraints(table, schema) + except NotImplementedError: + pass + unique_columns = [] + for constraint in unique_constraints: + if 'column_names' in constraint.keys(): + unique_columns = constraint['column_names'] + dataset_name = f"{schema}.{table}" + columns = inspector.get_columns(table, schema) + table_columns = [] + row_order = 1 + for column in columns: + col_type = get_column_type(self.status, dataset_name, column['type']) + col_constraint = None + if column['nullable']: + col_constraint = ColumnConstraint.NULL + elif not column['nullable']: + col_constraint = ColumnConstraint.NOT_NULL + + if column['name'] in pk_columns: + col_constraint = ColumnConstraint.PRIMARY_KEY + elif column['name'] in unique_columns: + col_constraint = ColumnConstraint.UNIQUE + + table_columns.append(Column(name=column['name'], + description=column.get("comment", None), + columnDataType=col_type, + columnConstraint=col_constraint, + ordinalPosition=row_order)) + row_order = row_order + 1 + + return table_columns + + def _get_table_description(self, schema: str, table: str, inspector: Inspector) -> str: + try: + table_info: dict = inspector.get_table_comment(table, schema) + except NotImplementedError: + description: Optional[str] = None + else: + description = table_info["text"] + return description def close(self): pass