postgres pattern and connection string changes

This commit is contained in:
Ayush Shah 2021-08-11 23:54:45 +05:30
parent a9c92b3b08
commit d87c641c4a

View File

@ -47,6 +47,9 @@ class PostgresSourceConfig(SQLConnectionConfig):
def get_service_type(self) -> DatabaseServiceType: def get_service_type(self) -> DatabaseServiceType:
return DatabaseServiceType[self.service_type] return DatabaseServiceType[self.service_type]
def get_connection_url(self):
return super().get_connection_url()
def get_table_key(row: Dict[str, Any]) -> Union[TableKey, None]: def get_table_key(row: Dict[str, Any]) -> Union[TableKey, None]:
""" """
@ -93,7 +96,7 @@ class PostgresSource(Source):
self.status = SQLSourceStatus() self.status = SQLSourceStatus()
self.service = get_service_or_create(config, metadata_config) self.service = get_service_or_create(config, metadata_config)
self.include_pattern = IncludeFilterPattern self.include_pattern = IncludeFilterPattern
self.pattern = config.include_pattern self.pattern = config
@classmethod @classmethod
def create(cls, config_dict, metadata_config_dict, ctx): def create(cls, config_dict, metadata_config_dict, ctx):
@ -139,8 +142,8 @@ class PostgresSource(Source):
col_type = 'CHAR' col_type = 'CHAR'
else: else:
col_type = row['col_type'].upper() col_type = row['col_type'].upper()
if not self.include_pattern.included(self.pattern, last_row[1]): if not self.pattern.include_pattern.included(f'{last_row[1]}.{last_row[2]}'):
self.status.report_dropped(last_row['name']) self.status.filtered(f'{last_row[1]}.{last_row[2]}', "pattern not allowed", last_row[2])
continue continue
columns.append(Column(name=row['col_name'], description=row['col_description'], columns.append(Column(name=row['col_name'], description=row['col_description'],
columnDataType=col_type, ordinalPosition=int(row['col_sort_order']))) columnDataType=col_type, ordinalPosition=int(row['col_sort_order'])))
@ -149,14 +152,13 @@ class PostgresSource(Source):
description=last_row['description'], description=last_row['description'],
columns=columns) columns=columns)
self.status.report_table_scanned(table_metadata.name) self.status.scanned(table_metadata.name.__root__)
dm = DatabaseEntity(id=uuid.uuid4(), dm = DatabaseEntity(id=uuid.uuid4(),
name=row['schema'], name=row['schema'],
description=row['description'] if row['description'] is not None else ' ', description=row['description'] if row['description'] is not None else ' ',
service=EntityReference(id=self.service.id, type=self.SERVICE_TYPE)) service=EntityReference(id=self.service.id, type=self.SERVICE_TYPE))
table_and_db = OMetaDatabaseAndTable(table=table_metadata, database=dm) table_and_db = OMetaDatabaseAndTable(table=table_metadata, database=dm)
self.status.records_produced(dm)
yield table_and_db yield table_and_db
def close(self): def close(self):