Merge pull request #244 from open-metadata/issue-243

ISSUE-243: Generate sample data
This commit is contained in:
Suresh Srinivas 2021-08-20 07:47:58 -07:00 committed by GitHub
commit dcf76af169
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 69 additions and 37 deletions

View File

@ -7,7 +7,10 @@
"password": "focguC-kaqqe5-nepsok", "password": "focguC-kaqqe5-nepsok",
"database": "warehouse", "database": "warehouse",
"service_name": "aws_redshift", "service_name": "aws_redshift",
"service_type": "Redshift" "service_type": "Redshift",
"filter_pattern": {
"excludes": ["information_schema.*"]
}
} }
}, },
"processor": { "processor": {

View File

@ -119,7 +119,7 @@ class ElasticsearchBulkSink(BulkSink):
table_name=table.name.__root__, table_name=table.name.__root__,
suggest=suggest, suggest=suggest,
description=table.description, description=table.description,
table_type=table.tableType, table_type=table.tableType.name,
last_updated_timestamp=timestamp, last_updated_timestamp=timestamp,
column_names=column_names, column_names=column_names,
column_descriptions=column_descriptions, column_descriptions=column_descriptions,

View File

@ -175,7 +175,7 @@ class GenerateFakeSampleData:
pass pass
@classmethod @classmethod
def checkColumns(self, columns): def check_columns(self, columns):
fake = Faker() fake = Faker()
colList = set() colList = set()
colData = [] colData = []
@ -226,7 +226,7 @@ class SampleTablesSource(Source):
service=EntityReference(id=self.service.id, type=self.config.service_type)) service=EntityReference(id=self.service.id, type=self.config.service_type))
for table in self.tables['tables']: for table in self.tables['tables']:
if not table.get('sampleData'): if not table.get('sampleData'):
table['sampleData'] = GenerateFakeSampleData.checkColumns(table['columns']) table['sampleData'] = GenerateFakeSampleData.check_columns(table['columns'])
table_metadata = Table(**table) table_metadata = Table(**table)
table_and_db = OMetaDatabaseAndTable(table=table_metadata, database=db) table_and_db = OMetaDatabaseAndTable(table=table_metadata, database=db)
self.status.scanned(table_metadata.name.__root__) self.status.scanned(table_metadata.name.__root__)

View File

@ -28,7 +28,7 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Table, Column, ColumnConstraint, TableType from metadata.generated.schema.entity.data.table import Table, Column, ColumnConstraint, TableType, TableData
from sqlalchemy import create_engine, inspect from sqlalchemy import create_engine, inspect
from sqlalchemy.engine.reflection import Inspector from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.sql import sqltypes as types from sqlalchemy.sql import sqltypes as types
@ -69,6 +69,7 @@ class SQLConnectionConfig(ConfigModel):
options: dict = {} options: dict = {}
include_views: Optional[bool] = True include_views: Optional[bool] = True
include_tables: Optional[bool] = True include_tables: Optional[bool] = True
generate_sample_data: Optional[bool] = True
filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all() filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
@abstractmethod @abstractmethod
@ -157,6 +158,9 @@ class SQLSource(Source):
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.service = get_service_or_create(config, metadata_config) self.service = get_service_or_create(config, metadata_config)
self.status = SQLSourceStatus() self.status = SQLSourceStatus()
self.sql_config = self.config
self.engine = create_engine(self.sql_config.get_connection_url(), **self.sql_config.options)
self.connection = None
def prepare(self): def prepare(self):
pass pass
@ -165,87 +169,112 @@ class SQLSource(Source):
def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext): def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
pass pass
def _get_connection(self) -> Any:
"""
Create a SQLAlchemy connection to Database
"""
if self.connection is None:
self.connection = self.engine.connect()
def fetch_sample_data(self, schema: str, table: str):
try:
query = f"select * from {schema}.{table} limit 50"
logger.info("Fetching sample data, this may take a while {}".format(query))
results = self.connection.execute(query)
cols = list(results.keys())
rows = []
for r in results:
row = list(r)
rows.append(row)
return TableData(columns=cols, rows=rows)
except:
logger.error("Failed to generate sample data for {}".format(table))
def standardize_schema_table_names( def standardize_schema_table_names(
self, schema: str, table: str self, schema: str, table: str
) -> Tuple[str, str]: ) -> Tuple[str, str]:
return schema, table return schema, table
def next_record(self) -> Iterable[OMetaDatabaseAndTable]: def next_record(self) -> Iterable[OMetaDatabaseAndTable]:
sql_config = self.config
url = sql_config.get_connection_url() inspector = inspect(self.engine)
logger.debug(f"sql_connection_url={url}")
engine = create_engine(url, **sql_config.options)
inspector = inspect(engine)
for schema in inspector.get_schema_names(): for schema in inspector.get_schema_names():
if not sql_config.filter_pattern.included(schema): if not self.sql_config.filter_pattern.included(schema):
self.status.filtered(schema, "Schema pattern not allowed") self.status.filtered(schema, "Schema pattern not allowed")
continue continue
logger.debug("total tables {}".format(inspector.get_table_names(schema))) logger.debug("total tables {}".format(inspector.get_table_names(schema)))
if self.config.include_tables: if self.config.include_tables:
yield from self.fetch_tables(inspector, schema, sql_config) yield from self.fetch_tables(inspector, schema)
if self.config.include_views: if self.config.include_views:
yield from self.fetch_views(inspector, schema, sql_config) yield from self.fetch_views(inspector, schema)
def fetch_tables(self, def fetch_tables(self,
inspector: Inspector, inspector: Inspector,
schema: str, schema: str) -> Iterable[OMetaDatabaseAndTable]:
sql_config: SQLConnectionConfig) -> Iterable[OMetaDatabaseAndTable]: for table_name in inspector.get_table_names(schema):
for table in inspector.get_table_names(schema):
try: try:
schema, table = self.standardize_schema_table_names(schema, table) schema, table = self.standardize_schema_table_names(schema, table_name)
if not sql_config.filter_pattern.included(table): if not self.sql_config.filter_pattern.included(table_name):
self.status.filtered('{}.{}'.format(self.config.get_service_name(), table), self.status.filtered('{}.{}'.format(self.config.get_service_name(), table_name),
"Table pattern not allowed") "Table pattern not allowed")
continue continue
self.status.scanned('{}.{}'.format(self.config.get_service_name(), table)) self.status.scanned('{}.{}'.format(self.config.get_service_name(), table_name))
description = self._get_table_description(schema, table, inspector) description = self._get_table_description(schema, table_name, inspector)
table_columns = self._get_columns(schema, table_name, inspector)
table_columns = self._get_columns(schema, table, inspector)
table = Table(id=uuid.uuid4(), table = Table(id=uuid.uuid4(),
name=table, name=table_name,
tableType='Regular', tableType='Regular',
description=description if description is not None else ' ', description=description if description is not None else ' ',
columns=table_columns) columns=table_columns)
if self.sql_config.generate_sample_data:
self._get_connection()
table_data = self.fetch_sample_data(schema, table_name)
table.sampleData = table_data
table_and_db = OMetaDatabaseAndTable(table=table, database=self._get_database(schema)) table_and_db = OMetaDatabaseAndTable(table=table, database=self._get_database(schema))
yield table_and_db yield table_and_db
except ValidationError as err: except ValidationError as err:
logger.error(err) logger.error(err)
self.status.filtered('{}.{}'.format(self.config.service_name, table), self.status.filtered('{}.{}'.format(self.config.service_name, table_name),
"Validation error") "Validation error")
continue continue
def fetch_views(self, def fetch_views(self,
inspector: Inspector, inspector: Inspector,
schema: str, schema: str) -> Iterable[OMetaDatabaseAndTable]:
sql_config: SQLConnectionConfig) -> Iterable[OMetaDatabaseAndTable]: for view_name in inspector.get_view_names(schema):
for view in inspector.get_view_names(schema):
try: try:
if not sql_config.filter_pattern.included(view): if not self.sql_config.filter_pattern.included(view_name):
self.status.filtered('{}.{}'.format(self.config.get_service_name(), view), self.status.filtered('{}.{}'.format(self.config.get_service_name(), view_name),
"Table pattern not allowed") "View pattern not allowed")
continue continue
try: try:
view_definition = inspector.get_view_definition(view, schema) view_definition = inspector.get_view_definition(view_name, schema)
view_definition = "" if view_definition is None else str(view_definition) view_definition = "" if view_definition is None else str(view_definition)
except NotImplementedError: except NotImplementedError:
view_definition = "" view_definition = ""
description = self._get_table_description(schema, view, inspector) description = self._get_table_description(schema, view_name, inspector)
table_columns = self._get_columns(schema, view, inspector) table_columns = self._get_columns(schema, view_name, inspector)
table = Table(id=uuid.uuid4(), table = Table(id=uuid.uuid4(),
name=view, name=view_name,
tableType='View', tableType='View',
description=description if description is not None else ' ', description=description if description is not None else ' ',
columns=table_columns, columns=table_columns,
viewDefinition=view_definition) viewDefinition=view_definition)
if self.sql_config.generate_sample_data:
self._get_connection()
table_data = self.fetch_sample_data(schema, view_name)
table.sampleData = table_data
table_and_db = OMetaDatabaseAndTable(table=table, database=self._get_database(schema)) table_and_db = OMetaDatabaseAndTable(table=table, database=self._get_database(schema))
yield table_and_db yield table_and_db
except ValidationError as err: except ValidationError as err:
logger.error(err) logger.error(err)
self.status.filtered('{}.{}'.format(self.config.service_name, view), self.status.filtered('{}.{}'.format(self.config.service_name, view_name),
"Validation error") "Validation error")
continue continue