diff --git a/Makefile b/Makefile index ded4bed04d7..fa0d593c5b0 100644 --- a/Makefile +++ b/Makefile @@ -37,7 +37,7 @@ lint: ## Run pylint on the Python sources to analyze the codebase .PHONY: py_format py_format: ## Run black and isort to format the Python codebase - isort ingestion/ --skip $(PY_SOURCE)/metadata/generated --skip ingestion/build --profile black --multi-line 3 + isort ingestion/ --skip $(PY_SOURCE)/metadata/generated --skip ingestion/env --skip ingestion/build --profile black --multi-line 3 black ingestion/ --extend-exclude $(PY_SOURCE)/metadata/generated .PHONY: py_format_check diff --git a/catalog-rest-service/src/main/resources/json/schema/operations/pipelines/databaseServiceMetadataPipeline.json b/catalog-rest-service/src/main/resources/json/schema/operations/pipelines/databaseServiceMetadataPipeline.json index 925378ff981..199fbc5bb56 100644 --- a/catalog-rest-service/src/main/resources/json/schema/operations/pipelines/databaseServiceMetadataPipeline.json +++ b/catalog-rest-service/src/main/resources/json/schema/operations/pipelines/databaseServiceMetadataPipeline.json @@ -51,12 +51,12 @@ "default": "select * from {}.{} limit 50" }, "warehouse": { - "description": "Optional Warehouse.", + "description": "Sample data extraction query.", "type": "string", "default": null }, "account": { - "description": "Optional Account.", + "description": "Sample data extraction query.", "type": "string", "default": null }, diff --git a/ingestion-core/src/metadata/_version.py b/ingestion-core/src/metadata/_version.py index 10545954fdb..82a5ec83861 100644 --- a/ingestion-core/src/metadata/_version.py +++ b/ingestion-core/src/metadata/_version.py @@ -7,5 +7,5 @@ Provides metadata version information. from incremental import Version -__version__ = Version("metadata", 0, 9, 0, dev=26) +__version__ = Version("metadata", 0, 9, 0) __all__ = ["__version__"] diff --git a/ingestion/examples/workflows/snowflake_usage.json b/ingestion/examples/workflows/snowflake_usage.json index 285df234e5e..da3f9b2115e 100644 --- a/ingestion/examples/workflows/snowflake_usage.json +++ b/ingestion/examples/workflows/snowflake_usage.json @@ -7,7 +7,9 @@ "password": "strong_password", "database": "SNOWFLAKE_SAMPLE_DATA", "account": "account_name", + "warehouse": "COMPUTE_WH", "service_name": "snowflake", + "result_limit": 1000, "duration": 2 } }, diff --git a/ingestion/setup.py b/ingestion/setup.py index 4607f33cb45..86fc2699b44 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -23,7 +23,7 @@ def get_long_description(): base_requirements = { - "openmetadata-ingestion-core==0.8.0", + "openmetadata-ingestion-core==0.9.0", "commonregex", "idna<3,>=2.5", "click>=7.1.1", diff --git a/ingestion/src/metadata/ingestion/source/looker.py b/ingestion/src/metadata/ingestion/source/looker.py index dd98bb5c5c3..e495d23d0d9 100644 --- a/ingestion/src/metadata/ingestion/source/looker.py +++ b/ingestion/src/metadata/ingestion/source/looker.py @@ -84,6 +84,7 @@ class LookerSource(Source[Entity]): self.config = config self.metadata_config = metadata_config self.client = self.looker_client() + self.status = LookerDashboardSourceStatus() self.service = get_dashboard_service_or_create( service_name=config.service_name, dashboard_service_type=DashboardServiceType.Looker.name, @@ -137,7 +138,7 @@ class LookerSource(Source[Entity]): id=uuid.uuid4(), name=dashboard_elements.id, displayName=dashboard_elements.id, - description=dashboard_elements.title, + description=dashboard_elements.title if dashboard_elements.title else "", chart_type=dashboard_elements.type, url=self.config.url, service=EntityReference(id=self.service.id, type="dashboardService"), @@ -164,9 +165,8 @@ class LookerSource(Source[Entity]): for iter_chart in dashboard.dashboard_elements: chart = self._get_dashboard_elements(iter_chart) if chart: - charts.append(chart) + charts.append(chart.name) yield Dashboard( - id=uuid.uuid4(), name=dashboard.id, displayName=dashboard.title, description="temp", diff --git a/ingestion/src/metadata/ingestion/source/sample_usage.py b/ingestion/src/metadata/ingestion/source/sample_usage.py index 01f05484f5f..9abe84bd294 100644 --- a/ingestion/src/metadata/ingestion/source/sample_usage.py +++ b/ingestion/src/metadata/ingestion/source/sample_usage.py @@ -64,7 +64,7 @@ class SampleUsageSource(Source[TableQuery]): user_name="", starttime="", endtime="", - analysis_date=datetime.today().strftime("%Y-%m-%d %H:%M:%S"), + analysis_date=datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"), database="shopify", aborted=False, sql=row["query"], diff --git a/ingestion/src/metadata/ingestion/source/snowflake.py b/ingestion/src/metadata/ingestion/source/snowflake.py index 45be570e585..5b3fdf2cf7d 100644 --- a/ingestion/src/metadata/ingestion/source/snowflake.py +++ b/ingestion/src/metadata/ingestion/source/snowflake.py @@ -36,6 +36,7 @@ class SnowflakeConfig(SQLConnectionConfig): account: str database: str warehouse: str + result_limit: int = 1000 role: Optional[str] duration: Optional[int] service_type = DatabaseServiceType.Snowflake.value diff --git a/ingestion/src/metadata/ingestion/source/snowflake_usage.py b/ingestion/src/metadata/ingestion/source/snowflake_usage.py index fd7e27260a1..2f4b7426e14 100644 --- a/ingestion/src/metadata/ingestion/source/snowflake_usage.py +++ b/ingestion/src/metadata/ingestion/source/snowflake_usage.py @@ -11,7 +11,8 @@ """ Snowflake usage module """ - +import logging +import traceback from typing import Any, Dict, Iterable, Iterator, Union from metadata.generated.schema.entity.services.databaseService import ( @@ -30,6 +31,8 @@ from metadata.ingestion.source.sql_alchemy_helper import ( from metadata.utils.helpers import get_start_and_end, ingest_lineage from metadata.utils.sql_queries import SNOWFLAKE_SQL_STATEMENT +logger: logging.Logger = logging.getLogger(__name__) + class SnowflakeUsageSource(Source[TableQuery]): """ @@ -77,7 +80,7 @@ class SnowflakeUsageSource(Source[TableQuery]): self.analysis_date = start self.metadata_config = metadata_config self.sql_stmt = SnowflakeUsageSource.SQL_STATEMENT.format( - start_date=start, end_date=end + start_date=start, end_date=end, result_limit=self.config.result_limit ) self.alchemy_helper = SQLAlchemyHelper( config, metadata_config, ctx, "Snowflake", self.sql_stmt @@ -111,29 +114,33 @@ class SnowflakeUsageSource(Source[TableQuery]): :return: """ for row in self._get_raw_extract_iter(): - table_query = TableQuery( - query=row["query_type"], - user_name=row["user_name"], - starttime=str(row["start_time"]), - endtime=str(row["end_time"]), - analysis_date=self.analysis_date, - aborted="1969" in str(row["end_time"]), - database=row["database_name"], - sql=row["query_text"], - service_name=self.config.service_name, - ) - if row["schema_name"] is not None: - self.report.scanned(f"{row['database_name']}.{row['schema_name']}") - else: - self.report.scanned(f"{row['database_name']}") - yield table_query - query_info = { - "sql": table_query.sql, - "from_type": "table", - "to_type": "table", - "service_name": self.config.service_name, - } - ingest_lineage(query_info, self.metadata_config) + try: + table_query = TableQuery( + query=row["query_type"], + user_name=row["user_name"], + starttime=str(row["start_time"]), + endtime=str(row["end_time"]), + analysis_date=self.analysis_date, + aborted="1969" in str(row["end_time"]), + database=row["database_name"], + sql=row["query_text"], + service_name=self.config.service_name, + ) + if row["schema_name"] is not None: + self.report.scanned(f"{row['database_name']}.{row['schema_name']}") + else: + self.report.scanned(f"{row['database_name']}") + yield table_query + query_info = { + "sql": table_query.sql, + "from_type": "table", + "to_type": "table", + "service_name": self.config.service_name, + } + ingest_lineage(query_info, self.metadata_config) + except Exception as err: + logger.debug(traceback.print_exc()) + logger.debug(repr(err)) def get_report(self): """ diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index 44502d864c0..bace8616338 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -625,7 +625,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]): and "policy_tags" in column and column["policy_tags"] ): - self.metadata.create_primary_tag_category( + self.metadata.create_tag_category( category=self.config.tag_category_name, data=Tag( name=column["policy_tags"], description="" diff --git a/ingestion/src/metadata/utils/sql_queries.py b/ingestion/src/metadata/utils/sql_queries.py index 22801488fdc..70e9ca7825b 100644 --- a/ingestion/src/metadata/utils/sql_queries.py +++ b/ingestion/src/metadata/utils/sql_queries.py @@ -22,7 +22,6 @@ REDSHIFT_SQL_STATEMENT = """ ORDER BY ss.endtime DESC; """ - REDSHIFT_GET_ALL_RELATION_INFO = """ SELECT c.relkind, @@ -46,7 +45,6 @@ REDSHIFT_GET_ALL_RELATION_INFO = """ ORDER BY c.relkind, n.oid, n.nspname; """ - REDSHIFT_GET_SCHEMA_COLUMN_INFO = """ SELECT n.nspname as "schema", @@ -141,7 +139,7 @@ SNOWFLAKE_SQL_STATEMENT = """ schema_name,start_time,end_time from table(information_schema.query_history( end_time_range_start=>to_timestamp_ltz('{start_date}'), - end_time_range_end=>to_timestamp_ltz('{end_date}'))); + end_time_range_end=>to_timestamp_ltz('{end_date}'),RESULT_LIMIT=>{result_limit})); """ NEO4J_AMUNDSEN_TABLE_QUERY = textwrap.dedent(