Minor Fixes - Ingestion - Datetime format, Looker fix (#3365)

* Minor Fixes - UTC timezone for usage, tag method modification, looker fix

* Modified core version

* Snowflake Usage Result limit and Makefile ignore env fix
This commit is contained in:
Ayush Shah 2022-03-11 13:31:42 +05:30 committed by GitHub
parent 59867aaad7
commit 7c84f063e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 46 additions and 38 deletions

View File

@@ -37,7 +37,7 @@ lint: ## Run pylint on the Python sources to analyze the codebase
.PHONY: py_format .PHONY: py_format
py_format: ## Run black and isort to format the Python codebase py_format: ## Run black and isort to format the Python codebase
isort ingestion/ --skip $(PY_SOURCE)/metadata/generated --skip ingestion/build --profile black --multi-line 3 isort ingestion/ --skip $(PY_SOURCE)/metadata/generated --skip ingestion/env --skip ingestion/build --profile black --multi-line 3
black ingestion/ --extend-exclude $(PY_SOURCE)/metadata/generated black ingestion/ --extend-exclude $(PY_SOURCE)/metadata/generated
.PHONY: py_format_check .PHONY: py_format_check

View File

@@ -51,12 +51,12 @@
"default": "select * from {}.{} limit 50" "default": "select * from {}.{} limit 50"
}, },
"warehouse": { "warehouse": {
"description": "Optional Warehouse.", "description": "Snowflake warehouse.",
"type": "string", "type": "string",
"default": null "default": null
}, },
"account": { "account": {
"description": "Optional Account.", "description": "Snowflake account.",
"type": "string", "type": "string",
"default": null "default": null
}, },

View File

@@ -7,5 +7,5 @@ Provides metadata version information.
from incremental import Version from incremental import Version
__version__ = Version("metadata", 0, 9, 0, dev=26) __version__ = Version("metadata", 0, 9, 0)
__all__ = ["__version__"] __all__ = ["__version__"]

View File

@@ -7,7 +7,9 @@
"password": "strong_password", "password": "strong_password",
"database": "SNOWFLAKE_SAMPLE_DATA", "database": "SNOWFLAKE_SAMPLE_DATA",
"account": "account_name", "account": "account_name",
"warehouse": "COMPUTE_WH",
"service_name": "snowflake", "service_name": "snowflake",
"result_limit": 1000,
"duration": 2 "duration": 2
} }
}, },

View File

@@ -23,7 +23,7 @@ def get_long_description():
base_requirements = { base_requirements = {
"openmetadata-ingestion-core==0.8.0", "openmetadata-ingestion-core==0.9.0",
"commonregex", "commonregex",
"idna<3,>=2.5", "idna<3,>=2.5",
"click>=7.1.1", "click>=7.1.1",

View File

@@ -84,6 +84,7 @@ class LookerSource(Source[Entity]):
self.config = config self.config = config
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.client = self.looker_client() self.client = self.looker_client()
self.status = LookerDashboardSourceStatus()
self.service = get_dashboard_service_or_create( self.service = get_dashboard_service_or_create(
service_name=config.service_name, service_name=config.service_name,
dashboard_service_type=DashboardServiceType.Looker.name, dashboard_service_type=DashboardServiceType.Looker.name,
@@ -137,7 +138,7 @@ class LookerSource(Source[Entity]):
id=uuid.uuid4(), id=uuid.uuid4(),
name=dashboard_elements.id, name=dashboard_elements.id,
displayName=dashboard_elements.id, displayName=dashboard_elements.id,
description=dashboard_elements.title, description=dashboard_elements.title if dashboard_elements.title else "",
chart_type=dashboard_elements.type, chart_type=dashboard_elements.type,
url=self.config.url, url=self.config.url,
service=EntityReference(id=self.service.id, type="dashboardService"), service=EntityReference(id=self.service.id, type="dashboardService"),
@@ -164,9 +165,8 @@ class LookerSource(Source[Entity]):
for iter_chart in dashboard.dashboard_elements: for iter_chart in dashboard.dashboard_elements:
chart = self._get_dashboard_elements(iter_chart) chart = self._get_dashboard_elements(iter_chart)
if chart: if chart:
charts.append(chart) charts.append(chart.name)
yield Dashboard( yield Dashboard(
id=uuid.uuid4(),
name=dashboard.id, name=dashboard.id,
displayName=dashboard.title, displayName=dashboard.title,
description="temp", description="temp",

View File

@@ -64,7 +64,7 @@ class SampleUsageSource(Source[TableQuery]):
user_name="", user_name="",
starttime="", starttime="",
endtime="", endtime="",
analysis_date=datetime.today().strftime("%Y-%m-%d %H:%M:%S"), analysis_date=datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S"),
database="shopify", database="shopify",
aborted=False, aborted=False,
sql=row["query"], sql=row["query"],

View File

@@ -36,6 +36,7 @@ class SnowflakeConfig(SQLConnectionConfig):
account: str account: str
database: str database: str
warehouse: str warehouse: str
result_limit: int = 1000
role: Optional[str] role: Optional[str]
duration: Optional[int] duration: Optional[int]
service_type = DatabaseServiceType.Snowflake.value service_type = DatabaseServiceType.Snowflake.value

View File

@@ -11,7 +11,8 @@
""" """
Snowflake usage module Snowflake usage module
""" """
import logging
import traceback
from typing import Any, Dict, Iterable, Iterator, Union from typing import Any, Dict, Iterable, Iterator, Union
from metadata.generated.schema.entity.services.databaseService import ( from metadata.generated.schema.entity.services.databaseService import (
@@ -30,6 +31,8 @@ from metadata.ingestion.source.sql_alchemy_helper import (
from metadata.utils.helpers import get_start_and_end, ingest_lineage from metadata.utils.helpers import get_start_and_end, ingest_lineage
from metadata.utils.sql_queries import SNOWFLAKE_SQL_STATEMENT from metadata.utils.sql_queries import SNOWFLAKE_SQL_STATEMENT
logger: logging.Logger = logging.getLogger(__name__)
class SnowflakeUsageSource(Source[TableQuery]): class SnowflakeUsageSource(Source[TableQuery]):
""" """
@@ -77,7 +80,7 @@ class SnowflakeUsageSource(Source[TableQuery]):
self.analysis_date = start self.analysis_date = start
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.sql_stmt = SnowflakeUsageSource.SQL_STATEMENT.format( self.sql_stmt = SnowflakeUsageSource.SQL_STATEMENT.format(
start_date=start, end_date=end start_date=start, end_date=end, result_limit=self.config.result_limit
) )
self.alchemy_helper = SQLAlchemyHelper( self.alchemy_helper = SQLAlchemyHelper(
config, metadata_config, ctx, "Snowflake", self.sql_stmt config, metadata_config, ctx, "Snowflake", self.sql_stmt
@@ -111,29 +114,33 @@ class SnowflakeUsageSource(Source[TableQuery]):
:return: :return:
""" """
for row in self._get_raw_extract_iter(): for row in self._get_raw_extract_iter():
table_query = TableQuery( try:
query=row["query_type"], table_query = TableQuery(
user_name=row["user_name"], query=row["query_type"],
starttime=str(row["start_time"]), user_name=row["user_name"],
endtime=str(row["end_time"]), starttime=str(row["start_time"]),
analysis_date=self.analysis_date, endtime=str(row["end_time"]),
aborted="1969" in str(row["end_time"]), analysis_date=self.analysis_date,
database=row["database_name"], aborted="1969" in str(row["end_time"]),
sql=row["query_text"], database=row["database_name"],
service_name=self.config.service_name, sql=row["query_text"],
) service_name=self.config.service_name,
if row["schema_name"] is not None: )
self.report.scanned(f"{row['database_name']}.{row['schema_name']}") if row["schema_name"] is not None:
else: self.report.scanned(f"{row['database_name']}.{row['schema_name']}")
self.report.scanned(f"{row['database_name']}") else:
yield table_query self.report.scanned(f"{row['database_name']}")
query_info = { yield table_query
"sql": table_query.sql, query_info = {
"from_type": "table", "sql": table_query.sql,
"to_type": "table", "from_type": "table",
"service_name": self.config.service_name, "to_type": "table",
} "service_name": self.config.service_name,
ingest_lineage(query_info, self.metadata_config) }
ingest_lineage(query_info, self.metadata_config)
except Exception as err:
logger.debug(traceback.print_exc())
logger.debug(repr(err))
def get_report(self): def get_report(self):
""" """

View File

@@ -625,7 +625,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
and "policy_tags" in column and "policy_tags" in column
and column["policy_tags"] and column["policy_tags"]
): ):
self.metadata.create_primary_tag_category( self.metadata.create_tag_category(
category=self.config.tag_category_name, category=self.config.tag_category_name,
data=Tag( data=Tag(
name=column["policy_tags"], description="" name=column["policy_tags"], description=""

View File

@@ -22,7 +22,6 @@ REDSHIFT_SQL_STATEMENT = """
ORDER BY ss.endtime DESC; ORDER BY ss.endtime DESC;
""" """
REDSHIFT_GET_ALL_RELATION_INFO = """ REDSHIFT_GET_ALL_RELATION_INFO = """
SELECT SELECT
c.relkind, c.relkind,
@@ -46,7 +45,6 @@ REDSHIFT_GET_ALL_RELATION_INFO = """
ORDER BY c.relkind, n.oid, n.nspname; ORDER BY c.relkind, n.oid, n.nspname;
""" """
REDSHIFT_GET_SCHEMA_COLUMN_INFO = """ REDSHIFT_GET_SCHEMA_COLUMN_INFO = """
SELECT SELECT
n.nspname as "schema", n.nspname as "schema",
@@ -141,7 +139,7 @@ SNOWFLAKE_SQL_STATEMENT = """
schema_name,start_time,end_time schema_name,start_time,end_time
from table(information_schema.query_history( from table(information_schema.query_history(
end_time_range_start=>to_timestamp_ltz('{start_date}'), end_time_range_start=>to_timestamp_ltz('{start_date}'),
end_time_range_end=>to_timestamp_ltz('{end_date}'))); end_time_range_end=>to_timestamp_ltz('{end_date}'),RESULT_LIMIT=>{result_limit}));
""" """
NEO4J_AMUNDSEN_TABLE_QUERY = textwrap.dedent( NEO4J_AMUNDSEN_TABLE_QUERY = textwrap.dedent(