diff --git a/ingestion/examples/workflows/redshift.json b/ingestion/examples/workflows/redshift.json index 77cf9824568..59fd2093e0f 100644 --- a/ingestion/examples/workflows/redshift.json +++ b/ingestion/examples/workflows/redshift.json @@ -4,10 +4,13 @@ "config": { "host_port": "redshift-cluster-1.clot5cqn1cnb.us-west-2.redshift.amazonaws.com:5439", "username": "awsuser", - "password": "focguC-kaqqe5-nepsok", + "password": "focguC-kaqqe5-nepsok", "database": "warehouse", "service_name": "aws_redshift", - "service_type": "Redshift" + "service_type": "Redshift", + "filter_pattern": { + "excludes": ["information_schema.*"] + } } }, "processor": { diff --git a/ingestion/src/metadata/ingestion/bulksink/elasticsearch.py b/ingestion/src/metadata/ingestion/bulksink/elasticsearch.py index cf9c3a4cdba..fac2ba9be03 100644 --- a/ingestion/src/metadata/ingestion/bulksink/elasticsearch.py +++ b/ingestion/src/metadata/ingestion/bulksink/elasticsearch.py @@ -119,7 +119,7 @@ class ElasticsearchBulkSink(BulkSink): table_name=table.name.__root__, suggest=suggest, description=table.description, - table_type=table.tableType, + table_type=table.tableType.name, last_updated_timestamp=timestamp, column_names=column_names, column_descriptions=column_descriptions, diff --git a/ingestion/src/metadata/ingestion/source/sample_tables.py b/ingestion/src/metadata/ingestion/source/sample_tables.py index b8b9b08ed15..16bb52a462c 100644 --- a/ingestion/src/metadata/ingestion/source/sample_tables.py +++ b/ingestion/src/metadata/ingestion/source/sample_tables.py @@ -175,7 +175,7 @@ class GenerateFakeSampleData: pass @classmethod - def checkColumns(self, columns): + def check_columns(self, columns): fake = Faker() colList = set() colData = [] @@ -226,7 +226,7 @@ class SampleTablesSource(Source): service=EntityReference(id=self.service.id, type=self.config.service_type)) for table in self.tables['tables']: if not table.get('sampleData'): - table['sampleData'] = GenerateFakeSampleData.checkColumns(table['columns']) + table['sampleData'] = GenerateFakeSampleData.check_columns(table['columns']) table_metadata = Table(**table) table_and_db = OMetaDatabaseAndTable(table=table_metadata, database=db) self.status.scanned(table_metadata.name.__root__) diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index 9cc74e04197..8c0f86d4185 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -28,7 +28,7 @@ from metadata.generated.schema.type.entityReference import EntityReference from metadata.generated.schema.entity.data.database import Database -from metadata.generated.schema.entity.data.table import Table, Column, ColumnConstraint, TableType +from metadata.generated.schema.entity.data.table import Table, Column, ColumnConstraint, TableType, TableData from sqlalchemy import create_engine, inspect from sqlalchemy.engine.reflection import Inspector from sqlalchemy.sql import sqltypes as types @@ -69,6 +69,7 @@ class SQLConnectionConfig(ConfigModel): options: dict = {} include_views: Optional[bool] = True include_tables: Optional[bool] = True + generate_sample_data: Optional[bool] = True filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all() @abstractmethod @@ -157,6 +158,9 @@ class SQLSource(Source): self.metadata_config = metadata_config self.service = get_service_or_create(config, metadata_config) self.status = SQLSourceStatus() + self.sql_config = self.config + self.engine = create_engine(self.sql_config.get_connection_url(), **self.sql_config.options) + self.connection = None def prepare(self): pass @@ -165,87 +169,112 @@ class SQLSource(Source): def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext): pass + def _get_connection(self) -> Any: + """ + Create a SQLAlchemy connection to Database + """ + if self.connection is None: + self.connection = self.engine.connect() + + def fetch_sample_data(self, schema: str, table: str): + try: + query = f"select * from {schema}.{table} limit 50" + logger.info("Fetching sample data, this may take a while {}".format(query)) + results = self.connection.execute(query) + cols = list(results.keys()) + rows = [] + for r in results: + row = list(r) + rows.append(row) + return TableData(columns=cols, rows=rows) + except: + logger.error("Failed to generate sample data for {}".format(table)) + def standardize_schema_table_names( self, schema: str, table: str ) -> Tuple[str, str]: return schema, table def next_record(self) -> Iterable[OMetaDatabaseAndTable]: - sql_config = self.config - url = sql_config.get_connection_url() - logger.debug(f"sql_connection_url={url}") - engine = create_engine(url, **sql_config.options) - inspector = inspect(engine) + + inspector = inspect(self.engine) for schema in inspector.get_schema_names(): - if not sql_config.filter_pattern.included(schema): + if not self.sql_config.filter_pattern.included(schema): self.status.filtered(schema, "Schema pattern not allowed") continue logger.debug("total tables {}".format(inspector.get_table_names(schema))) if self.config.include_tables: - yield from self.fetch_tables(inspector, schema, sql_config) + yield from self.fetch_tables(inspector, schema) if self.config.include_views: - yield from self.fetch_views(inspector, schema, sql_config) + yield from self.fetch_views(inspector, schema) def fetch_tables(self, inspector: Inspector, - schema: str, - sql_config: SQLConnectionConfig) -> Iterable[OMetaDatabaseAndTable]: - for table in inspector.get_table_names(schema): + schema: str) -> Iterable[OMetaDatabaseAndTable]: + for table_name in inspector.get_table_names(schema): try: - schema, table = self.standardize_schema_table_names(schema, table) - if not sql_config.filter_pattern.included(table): - self.status.filtered('{}.{}'.format(self.config.get_service_name(), table), + schema, table = self.standardize_schema_table_names(schema, table_name) + if not self.sql_config.filter_pattern.included(table_name): + self.status.filtered('{}.{}'.format(self.config.get_service_name(), table_name), "Table pattern not allowed") continue - self.status.scanned('{}.{}'.format(self.config.get_service_name(), table)) + self.status.scanned('{}.{}'.format(self.config.get_service_name(), table_name)) - description = self._get_table_description(schema, table, inspector) + description = self._get_table_description(schema, table_name, inspector) + + table_columns = self._get_columns(schema, table_name, inspector) - table_columns = self._get_columns(schema, table, inspector) table = Table(id=uuid.uuid4(), - name=table, + name=table_name, tableType='Regular', description=description if description is not None else ' ', columns=table_columns) + if self.sql_config.generate_sample_data: + self._get_connection() + table_data = self.fetch_sample_data(schema, table_name) + table.sampleData = table_data table_and_db = OMetaDatabaseAndTable(table=table, database=self._get_database(schema)) yield table_and_db except ValidationError as err: logger.error(err) - self.status.filtered('{}.{}'.format(self.config.service_name, table), + self.status.filtered('{}.{}'.format(self.config.service_name, table_name), "Validation error") continue def fetch_views(self, inspector: Inspector, - schema: str, - sql_config: SQLConnectionConfig) -> Iterable[OMetaDatabaseAndTable]: - for view in inspector.get_view_names(schema): + schema: str) -> Iterable[OMetaDatabaseAndTable]: + for view_name in inspector.get_view_names(schema): try: - if not sql_config.filter_pattern.included(view): - self.status.filtered('{}.{}'.format(self.config.get_service_name(), view), - "Table pattern not allowed") + if not self.sql_config.filter_pattern.included(view_name): + self.status.filtered('{}.{}'.format(self.config.get_service_name(), view_name), + "View pattern not allowed") continue - try: - view_definition = inspector.get_view_definition(view, schema) + view_definition = inspector.get_view_definition(view_name, schema) view_definition = "" if view_definition is None else str(view_definition) except NotImplementedError: view_definition = "" - description = self._get_table_description(schema, view, inspector) - table_columns = self._get_columns(schema, view, inspector) + description = self._get_table_description(schema, view_name, inspector) + table_columns = self._get_columns(schema, view_name, inspector) table = Table(id=uuid.uuid4(), - name=view, + name=view_name, tableType='View', description=description if description is not None else ' ', columns=table_columns, viewDefinition=view_definition) + if self.sql_config.generate_sample_data: + self._get_connection() + table_data = self.fetch_sample_data(schema, view_name) + table.sampleData = table_data + table_and_db = OMetaDatabaseAndTable(table=table, database=self._get_database(schema)) yield table_and_db except ValidationError as err: logger.error(err) - self.status.filtered('{}.{}'.format(self.config.service_name, view), + self.status.filtered('{}.{}'.format(self.config.service_name, view_name), "Validation error") continue