Merge pull request #167 from open-metadata/filter_pattern

Filter Pattern Modification
This commit is contained in:
Suresh Srinivas 2021-08-14 14:44:10 -07:00 committed by GitHub
commit 6cba58e8da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 20 additions and 15 deletions

View File

@@ -61,5 +61,6 @@ def metadata_ingestion_workflow():
workflow.stop() workflow.stop()
``` ```
Create a Worfklow instance and pass a hive configuration which will read metadata from Hive and ingest into OpenMetadata Server. You can customize this configuration or add different connectors please refer to our [examples](https://github.com/open-metadata/OpenMetadata/tree/main/ingestion/examples/workflows) and refer to \[Metadata Connectors\]\( Create a Workflow instance and pass a hive configuration which will read metadata from Hive
and ingest into OpenMetadata Server. You can customize this configuration or add different connectors please refer to our [examples](https://github.com/open-metadata/OpenMetadata/tree/main/ingestion/examples/workflows) and refer to [Metadata Connectors](

View File

@@ -8,7 +8,7 @@
"database":"catalog_test", "database":"catalog_test",
"username": "sa", "username": "sa",
"password": "test!Password", "password": "test!Password",
"include_pattern": { "filter_pattern": {
"excludes": ["catalog_test.*"] "excludes": ["catalog_test.*"]
} }
} }

View File

@@ -9,7 +9,7 @@
"account": "account_name", "account": "account_name",
"service_name": "snowflake", "service_name": "snowflake",
"service_type": "Snowflake", "service_type": "Snowflake",
"include_pattern": { "filter_pattern": {
"includes": [ "includes": [
"(\\w)*tpcds_sf100tcl", "(\\w)*tpcds_sf100tcl",
"(\\w)*tpcds_sf100tcl", "(\\w)*tpcds_sf100tcl",

View File

@@ -6,7 +6,7 @@
"password": "openmetadata_password", "password": "openmetadata_password",
"service_name": "local_mysql", "service_name": "local_mysql",
"service_type": "MySQL", "service_type": "MySQL",
"include_pattern": { "filter_pattern": {
"excludes": ["mysql.*", "information_schema.*"] "excludes": ["mysql.*", "information_schema.*"]
} }
} }

View File

@@ -36,6 +36,11 @@ def get_long_description():
return description return description
scheduler_requirements = {
"apns@git+git://github.com/djacobs/PyAPNs.git#egg=apns",
"simplescheduler@git+https://github.com/StreamlineData/sdscheduler.git#egg=simplescheduler"
}
base_requirements = { base_requirements = {
"commonregex", "commonregex",
"idna<3,>=2.5", "idna<3,>=2.5",
@@ -82,6 +87,7 @@ plugins: Dict[str, Set[str]] = {
"postgres": {"pymysql>=1.0.2", "psycopg2-binary", "GeoAlchemy2"}, "postgres": {"pymysql>=1.0.2", "psycopg2-binary", "GeoAlchemy2"},
"redshift": {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"}, "redshift": {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
"redshift-usage": {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"}, "redshift-usage": {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
"scheduler": scheduler_requirements,
"snowflake": {"snowflake-sqlalchemy<=1.2.4"}, "snowflake": {"snowflake-sqlalchemy<=1.2.4"},
"snowflake-usage": {"snowflake-sqlalchemy<=1.2.4"}, "snowflake-usage": {"snowflake-sqlalchemy<=1.2.4"},
"sample-tables": {"faker~=8.1.1", } "sample-tables": {"faker~=8.1.1", }
@@ -102,8 +108,7 @@ setup(
package_dir={"": "src"}, package_dir={"": "src"},
zip_safe=False, zip_safe=False,
dependency_links=[ dependency_links=[
"apns@git+git://github.com/djacobs/PyAPNs.git#egg=apns",
"simplescheduler@git+https://github.com/StreamlineData/sdscheduler.git#egg=simplescheduler"
], ],
project_urls={ project_urls={
"Documentation": "https://docs.open-metadata.org/", "Documentation": "https://docs.open-metadata.org/",

View File

@@ -78,8 +78,8 @@ class IncludeFilterPattern(ConfigModel):
raise Exception("Regex Error: {}".format(err)) raise Exception("Regex Error: {}".format(err))
def is_fully_specified_include_list(self) -> bool: def is_fully_specified_include_list(self) -> bool:
for include_pattern in self.includes: for filter_pattern in self.includes:
if not self.alphabet_pattern.match(include_pattern): if not self.alphabet_pattern.match(filter_pattern):
return False return False
return True return True

View File

@@ -38,8 +38,7 @@ class LdapRestUsersSink(Sink):
self.config = config self.config = config
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.status = SinkStatus() self.status = SinkStatus()
self.api_users = self.metadata_config.api_endpoint + "/v1/users" self.api_users = "/users"
self.headers = {'Content-type': 'application/json'}
self.rest = REST(metadata_config) self.rest = REST(metadata_config)
@classmethod @classmethod

View File

@@ -95,8 +95,8 @@ class PostgresSource(Source):
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.status = SQLSourceStatus() self.status = SQLSourceStatus()
self.service = get_service_or_create(config, metadata_config) self.service = get_service_or_create(config, metadata_config)
self.include_pattern = IncludeFilterPattern
self.pattern = config self.pattern = config
self.filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
@classmethod @classmethod
def create(cls, config_dict, metadata_config_dict, ctx): def create(cls, config_dict, metadata_config_dict, ctx):
@@ -144,7 +144,7 @@ class PostgresSource(Source):
col_type = 'BOOLEAN' col_type = 'BOOLEAN'
else: else:
col_type = None col_type = None
if not self.pattern.include_pattern.included(f'{last_row[1]}.{last_row[2]}'): if not self.pattern.filter_pattern.included(f'{last_row[1]}.{last_row[2]}'):
self.status.filtered(f'{last_row[1]}.{last_row[2]}', "pattern not allowed", last_row[2]) self.status.filtered(f'{last_row[1]}.{last_row[2]}', "pattern not allowed", last_row[2])
continue continue
if col_type is not None: if col_type is not None:

View File

@@ -66,7 +66,7 @@ class SQLConnectionConfig(ConfigModel):
service_name: str service_name: str
service_type: str service_type: str
options: dict = {} options: dict = {}
include_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all() filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
@abstractmethod @abstractmethod
def get_connection_url(self): def get_connection_url(self):
@@ -174,7 +174,7 @@ class SQLSource(Source):
engine = create_engine(url, **sql_config.options) engine = create_engine(url, **sql_config.options)
inspector = inspect(engine) inspector = inspect(engine)
for schema in inspector.get_schema_names(): for schema in inspector.get_schema_names():
if not sql_config.include_pattern.included(schema): if not sql_config.filter_pattern.included(schema):
self.status.filtered(schema, "Schema pattern not allowed") self.status.filtered(schema, "Schema pattern not allowed")
continue continue
logger.debug("total tables {}".format(inspector.get_table_names(schema))) logger.debug("total tables {}".format(inspector.get_table_names(schema)))
@@ -196,7 +196,7 @@ class SQLSource(Source):
dataset_name = f"{schema}.{table}" dataset_name = f"{schema}.{table}"
self.status.scanned('{}.{}'.format(self.config.get_service_name(), dataset_name)) self.status.scanned('{}.{}'.format(self.config.get_service_name(), dataset_name))
if not sql_config.include_pattern.included(dataset_name): if not sql_config.filter_pattern.included(dataset_name):
self.status.filtered('{}.{}'.format(self.config.get_service_name(), dataset_name), self.status.filtered('{}.{}'.format(self.config.get_service_name(), dataset_name),
"Table pattern not allowed") "Table pattern not allowed")
continue continue