From f5194669aeb2d0b7a08c78c3314e7e2fd653ca02 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Sun, 15 Aug 2021 01:20:12 +0530 Subject: [PATCH 1/3] Filter Pattern Modification --- docs/install/metadata-ingestion/airflow.md | 2 +- ingestion/examples/workflows/mssql.json | 2 +- ingestion/examples/workflows/snowflake.json | 2 +- ingestion/pipelines/mysql.json | 2 +- ingestion/src/metadata/ingestion/api/common.py | 4 ++-- ingestion/src/metadata/ingestion/sink/ldap_rest_users.py | 3 +-- ingestion/src/metadata/ingestion/source/postgres.py | 4 ++-- ingestion/src/metadata/ingestion/source/sql_source.py | 6 +++--- 8 files changed, 12 insertions(+), 13 deletions(-) diff --git a/docs/install/metadata-ingestion/airflow.md b/docs/install/metadata-ingestion/airflow.md index b8e26a25ef5..7c631bc01c0 100644 --- a/docs/install/metadata-ingestion/airflow.md +++ b/docs/install/metadata-ingestion/airflow.md @@ -63,6 +63,6 @@ def metadata_ingestion_workflow(): workflow.stop() ``` -Create a Worfklow instance and pass a hive configuration which will read metadata from Hive +Create a Workflow instance and pass a hive configuration which will read metadata from Hive and ingest into OpenMetadata Server. You can customize this configuration or add different connectors please refer to our [examples](https://github.com/open-metadata/OpenMetadata/tree/main/ingestion/examples/workflows) and refer to [Metadata Connectors]( diff --git a/ingestion/examples/workflows/mssql.json b/ingestion/examples/workflows/mssql.json index 49b4cc187bd..449f3e4eaba 100644 --- a/ingestion/examples/workflows/mssql.json +++ b/ingestion/examples/workflows/mssql.json @@ -8,7 +8,7 @@ "database":"catalog_test", "username": "sa", "password": "test!Password", - "include_pattern": { + "filter_pattern": { "excludes": ["catalog_test.*"] } } diff --git a/ingestion/examples/workflows/snowflake.json b/ingestion/examples/workflows/snowflake.json index 86128d93665..4b7f4c41351 100644 --- a/ingestion/examples/workflows/snowflake.json +++ b/ingestion/examples/workflows/snowflake.json @@ -9,7 +9,7 @@ "account": "account_name", "service_name": "snowflake", "service_type": "Snowflake", - "include_pattern": { + "filter_pattern": { "includes": [ "(\\w)*tpcds_sf100tcl", "(\\w)*tpcds_sf100tcl", diff --git a/ingestion/pipelines/mysql.json b/ingestion/pipelines/mysql.json index 09e47380441..518eb58b483 100644 --- a/ingestion/pipelines/mysql.json +++ b/ingestion/pipelines/mysql.json @@ -6,7 +6,7 @@ "password": "openmetadata_password", "service_name": "local_mysql", "service_type": "MySQL", - "include_pattern": { + "filter_pattern": { "excludes": ["mysql.*", "information_schema.*"] } } diff --git a/ingestion/src/metadata/ingestion/api/common.py b/ingestion/src/metadata/ingestion/api/common.py index 3b0fb5aa80c..4e4deca2fed 100644 --- a/ingestion/src/metadata/ingestion/api/common.py +++ b/ingestion/src/metadata/ingestion/api/common.py @@ -78,8 +78,8 @@ class IncludeFilterPattern(ConfigModel): raise Exception("Regex Error: {}".format(err)) def is_fully_specified_include_list(self) -> bool: - for include_pattern in self.includes: - if not self.alphabet_pattern.match(include_pattern): + for filter_pattern in self.includes: + if not self.alphabet_pattern.match(filter_pattern): return False return True diff --git a/ingestion/src/metadata/ingestion/sink/ldap_rest_users.py b/ingestion/src/metadata/ingestion/sink/ldap_rest_users.py index a50783623a0..1cd422b23fd 100644 --- a/ingestion/src/metadata/ingestion/sink/ldap_rest_users.py +++ b/ingestion/src/metadata/ingestion/sink/ldap_rest_users.py @@ -38,8 +38,7 @@ class LdapRestUsersSink(Sink): self.config = config self.metadata_config = metadata_config self.status = SinkStatus() - self.api_users = self.metadata_config.api_endpoint + "/v1/users" - self.headers = {'Content-type': 'application/json'} + self.api_users = "/users" self.rest = REST(metadata_config) @classmethod diff --git a/ingestion/src/metadata/ingestion/source/postgres.py b/ingestion/src/metadata/ingestion/source/postgres.py index 35335076691..3166cfaaccd 100644 --- a/ingestion/src/metadata/ingestion/source/postgres.py +++ b/ingestion/src/metadata/ingestion/source/postgres.py @@ -95,7 +95,7 @@ class PostgresSource(Source): self.metadata_config = metadata_config self.status = SQLSourceStatus() self.service = get_service_or_create(config, metadata_config) - self.include_pattern = IncludeFilterPattern + self.filter_pattern = IncludeFilterPattern self.pattern = config @classmethod @@ -144,7 +144,7 @@ class PostgresSource(Source): col_type = 'BOOLEAN' else: col_type = None - if not self.pattern.include_pattern.included(f'{last_row[1]}.{last_row[2]}'): + if not self.pattern.filter_pattern.included(f'{last_row[1]}.{last_row[2]}'): self.status.filtered(f'{last_row[1]}.{last_row[2]}', "pattern not allowed", last_row[2]) continue if col_type is not None: diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index 687b69a20aa..450dc329c3d 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -66,7 +66,7 @@ class SQLConnectionConfig(ConfigModel): service_name: str service_type: str options: dict = {} - include_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all() + filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all() @abstractmethod def get_connection_url(self): @@ -174,7 +174,7 @@ class SQLSource(Source): engine = create_engine(url, **sql_config.options) inspector = inspect(engine) for schema in inspector.get_schema_names(): - if not sql_config.include_pattern.included(schema): + if not sql_config.filter_pattern.included(schema): self.status.filtered(schema, "Schema pattern not allowed") continue logger.debug("total tables {}".format(inspector.get_table_names(schema))) @@ -196,7 +196,7 @@ class SQLSource(Source): dataset_name = f"{schema}.{table}" self.status.scanned('{}.{}'.format(self.config.get_service_name(), dataset_name)) - if not sql_config.include_pattern.included(dataset_name): + if not sql_config.filter_pattern.included(dataset_name): self.status.filtered('{}.{}'.format(self.config.get_service_name(), dataset_name), "Table pattern not allowed") continue From 37e59518a535c1d90485418be7d92ea01acf8fac Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Sun, 15 Aug 2021 01:32:47 +0530 Subject: [PATCH 2/3] Scheduler Connector --- ingestion/setup.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/ingestion/setup.py b/ingestion/setup.py index e1752f2f6fa..86180b9a4c2 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -36,6 +36,11 @@ def get_long_description(): return description +scheduler_requirements = { + "apns@git+git://github.com/djacobs/PyAPNs.git#egg=apns", + "simplescheduler@git+https://github.com/StreamlineData/sdscheduler.git#egg=simplescheduler" +} + base_requirements = { "commonregex", "idna<3,>=2.5", @@ -82,6 +87,7 @@ plugins: Dict[str, Set[str]] = { "postgres": {"pymysql>=1.0.2", "psycopg2-binary", "GeoAlchemy2"}, "redshift": {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"}, "redshift-usage": {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"}, + "scheduler": scheduler_requirements, "snowflake": {"snowflake-sqlalchemy<=1.2.4"}, "snowflake-usage": {"snowflake-sqlalchemy<=1.2.4"}, "sample-tables": {"faker~=8.1.1", } @@ -102,8 +108,7 @@ setup( package_dir={"": "src"}, zip_safe=False, dependency_links=[ - "apns@git+git://github.com/djacobs/PyAPNs.git#egg=apns", - "simplescheduler@git+https://github.com/StreamlineData/sdscheduler.git#egg=simplescheduler" + ], project_urls={ "Documentation": "https://docs.open-metadata.org/", From 5e949eb202304ff3b6f08c1923b7eecb52eb2453 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Sun, 15 Aug 2021 01:45:41 +0530 Subject: [PATCH 3/3] Postgres IncludeFilterPattern Modification --- ingestion/src/metadata/ingestion/source/postgres.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestion/src/metadata/ingestion/source/postgres.py b/ingestion/src/metadata/ingestion/source/postgres.py index 3166cfaaccd..c5a609a5e17 100644 --- a/ingestion/src/metadata/ingestion/source/postgres.py +++ b/ingestion/src/metadata/ingestion/source/postgres.py @@ -95,8 +95,8 @@ class PostgresSource(Source): self.metadata_config = metadata_config self.status = SQLSourceStatus() self.service = get_service_or_create(config, metadata_config) - self.filter_pattern = IncludeFilterPattern self.pattern = config + self.filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all() @classmethod def create(cls, config_dict, metadata_config_dict, ctx):