diff --git a/.gitignore b/.gitignore
index 3c931eb8ea..aa814fcf5b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -59,4 +59,5 @@ smoke-test/spark-smoke-test/__pycache__/
 metadata-ingestion/generated/**
 
 # docs
-docs/generated/
\ No newline at end of file
+docs/generated/
+tmp*
\ No newline at end of file
diff --git a/metadata-ingestion/source_docs/snowflake.md b/metadata-ingestion/source_docs/snowflake.md
index 9811d9a5e1..bc3387baa2 100644
--- a/metadata-ingestion/source_docs/snowflake.md
+++ b/metadata-ingestion/source_docs/snowflake.md
@@ -17,7 +17,7 @@ create or replace role datahub_role;
 // Grant privileges to use and select from your target warehouses / dbs / schemas / tables
 grant operate, usage on warehouse "<your-warehouse>" to role datahub_role;
-grant usage on "<your-database>" to role datahub_role;
+grant usage on DATABASE "<your-database>" to role datahub_role;
 grant usage on all schemas in database "<your-database>" to role datahub_role;
 grant select on all tables in database "<your-database>" to role datahub_role;
 grant select on all external tables in database "<your-database>" to role datahub_role;
@@ -27,6 +27,9 @@ grant select on all views in database "<your-database>" to role datahub_role;
 grant usage on future schemas in database "<your-database>" to role datahub_role;
 grant select on future tables in database "<your-database>" to role datahub_role;
 
+// Grant privileges on snowflake default database - needed for lineage
+grant imported privileges on DATABASE snowflake to role datahub_role;
+
 // Create a new DataHub user and assign the DataHub role to it
 create user datahub_user display_name = 'DataHub' password='<your-password>' default_role = datahub_role default_warehouse = '<your-warehouse>';
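For reference, the grants above can be sanity-checked from Python before running ingestion. A minimal sketch, assuming a SQLAlchemy Snowflake URL built from the same placeholders used in the doc; the `check_grants` helper is hypothetical and not part of the connector:

```python
# Hypothetical pre-flight check for the datahub_role grants described above.
# The connection URL placeholders must be substituted with real values.
from sqlalchemy import create_engine

engine = create_engine(
    "snowflake://datahub_user:<your-password>@<your-account>"
    "/?warehouse=<your-warehouse>&role=datahub_role"
)

def check_grants(role: str = "datahub_role") -> None:
    # One row per privilege; lineage additionally requires
    # IMPORTED PRIVILEGES on the shared "snowflake" database.
    for row in engine.execute(f"show grants to role {role}"):
        print(row["privilege"], "on", row["granted_on"], row["name"])

check_grants()
```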
diff --git a/metadata-ingestion/src/datahub/emitter/rest_emitter.py b/metadata-ingestion/src/datahub/emitter/rest_emitter.py
index a35356e7c8..e9beb06b9a 100644
--- a/metadata-ingestion/src/datahub/emitter/rest_emitter.py
+++ b/metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -136,12 +136,17 @@ class DatahubRestEmitter:
         self._session.mount("http://", adapter)
         self._session.mount("https://", adapter)
 
-    def test_connection(self) -> None:
+    def test_connection(self) -> str:
         response = self._session.get(f"{self._gms_server}/config")
         if response.status_code == 200:
             config: dict = response.json()
             if config.get("noCode") == "true":
-                return
+                return (
+                    config.get("versions", {})
+                    .get("linkedin/datahub", {})
+                    .get("version", "")
+                )
+
             else:
                 # Looks like we either connected to an old GMS or to some other service. Let's see if we can determine which before raising an error
                 # A common misconfiguration is connecting to datahub-frontend so we special-case this check
diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py
index b39b12a125..dbb1d9981e 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -14,6 +14,7 @@ class SourceReport(Report):
 
     warnings: Dict[str, List[str]] = field(default_factory=dict)
     failures: Dict[str, List[str]] = field(default_factory=dict)
+    cli_version: str = ""
 
     def report_workunit(self, wu: WorkUnit) -> None:
         self.workunits_produced += 1
diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
index ea6c14a9ad..02b81e0e96 100644
--- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
+++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -8,6 +8,7 @@ from typing import Any, Dict, Iterable, List, Optional
 import click
 from pydantic import validator
 
+import datahub
 from datahub.configuration.common import (
     ConfigModel,
     DynamicTypedConfig,
@@ -178,6 +179,7 @@ class Pipeline:
         callback = LoggingCallback()
         extractor: Extractor = self.extractor_class()
+        self.source.get_report().cli_version = datahub.nice_version_name()
         for wu in itertools.islice(
             self.source.get_workunits(), 10 if self.preview_mode else None
         ):
diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
index 5b240ce335..7e028b7d5e 100644
--- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
+++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
@@ -24,17 +24,22 @@ class DatahubRestSinkConfig(DatahubClientConfig):
     pass
 
 
+@dataclass
+class DataHubRestSinkReport(SinkReport):
+    gms_version: str = ""
+
+
 @dataclass
 class DatahubRestSink(Sink):
     config: DatahubRestSinkConfig
     emitter: DatahubRestEmitter
-    report: SinkReport
+    report: DataHubRestSinkReport
     treat_errors_as_warnings: bool = False
 
     def __init__(self, ctx: PipelineContext, config: DatahubRestSinkConfig):
         super().__init__(ctx)
         self.config = config
-        self.report = SinkReport()
+        self.report = DataHubRestSinkReport()
         self.emitter = DatahubRestEmitter(
             self.config.server,
             self.config.token,
@@ -45,7 +50,7 @@ class DatahubRestSink(Sink):
             extra_headers=self.config.extra_headers,
             ca_certificate_path=self.config.ca_certificate_path,
         )
-        self.emitter.test_connection()
+        self.report.gms_version = self.emitter.test_connection()
         self.executor = concurrent.futures.ThreadPoolExecutor(
             max_workers=self.config.max_threads
        )
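With the changes above, a run records both sides of the version handshake: the pipeline stamps `cli_version` onto the source report, and the REST sink stores the GMS version string that `test_connection()` now returns. A rough sketch of the two calls; the localhost URL is an assumption:

```python
# Sketch of the version handshake; the GMS URL is an assumption.
import datahub
from datahub.emitter.rest_emitter import DatahubRestEmitter

emitter = DatahubRestEmitter("http://localhost:8080")

# Previously returned None on success; now returns the GMS version string
# read from /config, which DatahubRestSink stores as report.gms_version.
gms_version = emitter.test_connection()

# The same value the pipeline writes into SourceReport.cli_version.
cli_version = datahub.nice_version_name()

print(f"CLI {cli_version} -> GMS {gms_version or 'unknown'}")
```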
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
index effc41ebea..0ddf1df654 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py
@@ -364,9 +364,10 @@ class BigQuerySource(SQLAlchemySource):
             )
             self.lineage_metadata = self._create_lineage_map(parsed_entries)
         except Exception as e:
-            logger.error(
-                "Error computing lineage information using GCP logs.",
-                e,
+            self.error(
+                logger,
+                "lineage-gcp-logs",
+                f"Error was {e}",
             )
 
     def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata(
@@ -385,9 +386,10 @@ class BigQuerySource(SQLAlchemySource):
             )
             self.lineage_metadata = self._create_lineage_map(parsed_entries)
         except Exception as e:
-            logger.error(
-                "Error computing lineage information using exported GCP audit logs.",
-                e,
+            self.error(
+                logger,
+                "lineage-exported-gcp-audit-logs",
+                f"Error: {e}",
             )
 
     def _make_bigquery_client(
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py b/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py
index f28fc26a8d..3e5299f06f 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/snowflake.py
@@ -66,6 +66,10 @@ class SnowflakeReport(SQLSourceReport):
     num_view_to_table_edges_scanned: int = 0
     num_external_table_edges_scanned: int = 0
     upstream_lineage: Dict[str, List[str]] = field(default_factory=dict)
+    # https://community.snowflake.com/s/topic/0TO0Z000000Unu5WAC/releases
+    saas_version: str = ""
+    role: str = ""
+    role_grants: List[str] = field(default_factory=list)
 
 
 class BaseSnowflakeConfig(BaseTimeWindowConfig):
@@ -203,27 +207,59 @@ class SnowflakeSource(SQLAlchemySource):
         config = SnowflakeConfig.parse_obj(config_dict)
         return cls(config, ctx)
 
-    def get_inspectors(self) -> Iterable[Inspector]:
-        url = self.config.get_sql_alchemy_url(database=None)
+    def get_metadata_engine(
+        self, database: Optional[str] = None
+    ) -> sqlalchemy.engine.Engine:
+        url = self.config.get_sql_alchemy_url(database=database)
         logger.debug(f"sql_alchemy_url={url}")
-
-        db_listing_engine = create_engine(
+        return create_engine(
             url,
             connect_args=self.config.get_sql_alchemy_connect_args(),
             **self.config.options,
         )
 
+    def inspect_version(self) -> Any:
+        db_engine = self.get_metadata_engine()
+        logger.info("Checking current version")
+        for db_row in db_engine.execute("select CURRENT_VERSION()"):
+            self.report.saas_version = db_row[0]
+
+    def inspect_role_grants(self) -> Any:
+        db_engine = self.get_metadata_engine()
+        cur_role = None
+        if self.config.role is None:
+            for db_row in db_engine.execute("select CURRENT_ROLE()"):
+                cur_role = db_row[0]
+        else:
+            cur_role = self.config.role
+
+        if cur_role is None:
+            return
+
+        self.report.role = cur_role
+        logger.info(f"Current role is {cur_role}")
+        if cur_role.lower() == "accountadmin":
+            return
+
+        logger.info(f"Checking grants for role {cur_role}")
+        for db_row in db_engine.execute(text(f"show grants to role {cur_role}")):
+            privilege = db_row["privilege"]
+            granted_on = db_row["granted_on"]
+            name = db_row["name"]
+            self.report.role_grants.append(
+                f"{privilege} granted on {granted_on} {name}"
+            )
+
+    def get_inspectors(self) -> Iterable[Inspector]:
+        db_listing_engine = self.get_metadata_engine(database=None)
+
         for db_row in db_listing_engine.execute(text("SHOW DATABASES")):
             db = db_row.name
             if self.config.database_pattern.allowed(db):
                 # We create a separate engine for each database in order to ensure that
                 # they are isolated from each other.
                 self.current_database = db
-                engine = create_engine(
-                    self.config.get_sql_alchemy_url(database=db),
-                    connect_args=self.config.get_sql_alchemy_connect_args(),
-                    **self.config.options,
-                )
+                engine = self.get_metadata_engine(database=db)
 
                 with engine.connect() as conn:
                     inspector = inspect(conn)
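The two new inspection helpers boil down to three plain Snowflake statements executed over the shared engine. A standalone sketch of the same queries; the connection URL is an assumption:

```python
# Standalone sketch of the checks done by inspect_version / inspect_role_grants;
# the connection URL is a placeholder.
from sqlalchemy import create_engine, text

engine = create_engine("snowflake://<user>:<password>@<account>/")

# CURRENT_VERSION() -> e.g. "5.43.1"; recorded as report.saas_version.
saas_version = list(engine.execute("select CURRENT_VERSION()"))[0][0]

# CURRENT_ROLE() -> the role this connection actually runs with.
role = list(engine.execute("select CURRENT_ROLE()"))[0][0]

# For anything other than accountadmin, record what the role can actually see.
grants = [
    f'{row["privilege"]} granted on {row["granted_on"]} {row["name"]}'
    for row in engine.execute(text(f"show grants to role {role}"))
]
print(saas_version, role, grants[:5])
```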
+ + f"Please check your permissions. Continuing...\nError was {e}.", ) logger.info(f"A total of {num_edges} View upstream edges found.") self.report.num_table_to_view_edges_scanned = num_edges @@ -387,9 +425,11 @@ WHERE num_edges += 1 except Exception as e: - logger.warning( + self.warn( + logger, + "view_downstream_lineage", f"Extracting the view lineage from Snowflake failed." - f"Please check your permissions. Continuing...\nError was {e}." + f"Please check your permissions. Continuing...\nError was {e}.", ) logger.info( f"Found {num_edges} View->Table edges. Removed {num_false_edges} false Table->Table edges." @@ -399,16 +439,12 @@ WHERE def _populate_view_lineage(self) -> None: if not self.config.include_view_lineage: return - url = self.config.get_sql_alchemy_url() - logger.debug(f"sql_alchemy_url={url}") - engine = create_engine(url, **self.config.options) + engine = self.get_metadata_engine(database=None) self._populate_view_upstream_lineage(engine) self._populate_view_downstream_lineage(engine) def _populate_external_lineage(self) -> None: - url = self.config.get_sql_alchemy_url() - logger.debug(f"sql_alchemy_url={url}") - engine = create_engine(url, **self.config.options) + engine = self.get_metadata_engine(database=None) # Handles the case where a table is populated from an external location via copy. # Eg: copy into category_english from 's3://acryl-snow-demo-olist/olist_raw_data/category_english'credentials=(aws_key_id='...' aws_secret_key='...') pattern='.*.csv'; query: str = """ @@ -464,21 +500,17 @@ WHERE ) num_edges += 1 except Exception as e: - logger.warning( + self.warn( + logger, + "external_lineage", f"Populating external table lineage from Snowflake failed." - f"Please check your premissions. Continuing...\nError was {e}." + f"Please check your premissions. Continuing...\nError was {e}.", ) logger.info(f"Found {num_edges} external lineage edges.") self.report.num_external_table_edges_scanned = num_edges def _populate_lineage(self) -> None: - url = self.config.get_sql_alchemy_url() - logger.debug(f"sql_alchemy_url={url}") - engine = create_engine( - url, - connect_args=self.config.get_sql_alchemy_connect_args(), - **self.config.options, - ) + engine = self.get_metadata_engine(database=None) query: str = """ WITH table_lineage_history AS ( SELECT @@ -521,9 +553,11 @@ QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name, upstream_table_na f"Lineage[Table(Down)={key}]:Table(Up)={self._lineage_map[key]}" ) except Exception as e: - logger.warning( + self.warn( + logger, + "lineage", f"Extracting lineage from Snowflake failed." - f"Please check your premissions. Continuing...\nError was {e}." + f"Please check your premissions. Continuing...\nError was {e}.", ) logger.info( f"A total of {num_edges} Table->Table edges found" @@ -611,6 +645,13 @@ QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name, upstream_table_na # Override the base class method. 
@@ -611,6 +645,13 @@ QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name, upstream_table_na
     # Override the base class method.
     def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
+        try:
+            self.inspect_version()
+        except Exception as e:
+            self.report.report_failure("version", f"Error: {e}")
+            return
+
+        self.inspect_role_grants()
         for wu in super().get_workunits():
             if (
                 self.config.include_table_lineage
diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
index 5d10181ea5..2ada238ae1 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
@@ -441,6 +441,14 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
             },
         )
 
+    def warn(self, log: logging.Logger, key: str, reason: str) -> Any:
+        self.report.report_warning(key, reason)
+        log.warning(reason)
+
+    def error(self, log: logging.Logger, key: str, reason: str) -> Any:
+        self.report.report_failure(key, reason)
+        log.error(reason)
+
     def get_inspectors(self) -> Iterable[Inspector]:
         # This method can be overridden in the case that you want to dynamically
         # run on multiple databases.
@@ -767,37 +775,42 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
         sql_config: SQLAlchemyConfig,
     ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
         tables_seen: Set[str] = set()
-        for table in inspector.get_table_names(schema):
-            schema, table = self.standardize_schema_table_names(
-                schema=schema, entity=table
-            )
-            dataset_name = self.get_identifier(
-                schema=schema, entity=table, inspector=inspector
-            )
-
-            dataset_name = self.normalise_dataset_name(dataset_name)
-
-            if dataset_name not in tables_seen:
-                tables_seen.add(dataset_name)
-            else:
-                logger.debug(f"{dataset_name} has already been seen, skipping...")
-                continue
-
-            self.report.report_entity_scanned(dataset_name, ent_type="table")
-
-            if not sql_config.table_pattern.allowed(dataset_name):
-                self.report.report_dropped(dataset_name)
-                continue
-
-            try:
-                yield from self._process_table(
-                    dataset_name, inspector, schema, table, sql_config
+        try:
+            for table in inspector.get_table_names(schema):
+                schema, table = self.standardize_schema_table_names(
+                    schema=schema, entity=table
                 )
-            except Exception as e:
-                logger.warning(
-                    f"Unable to ingest {schema}.{table} due to an exception.\n {traceback.format_exc()}"
+                dataset_name = self.get_identifier(
+                    schema=schema, entity=table, inspector=inspector
                 )
-                self.report.report_warning(f"{schema}.{table}", f"Ingestion error: {e}")
+
+                dataset_name = self.normalise_dataset_name(dataset_name)
+
+                if dataset_name not in tables_seen:
+                    tables_seen.add(dataset_name)
+                else:
+                    logger.debug(f"{dataset_name} has already been seen, skipping...")
+                    continue
+
+                self.report.report_entity_scanned(dataset_name, ent_type="table")
+
+                if not sql_config.table_pattern.allowed(dataset_name):
+                    self.report.report_dropped(dataset_name)
+                    continue
+
+                try:
+                    yield from self._process_table(
+                        dataset_name, inspector, schema, table, sql_config
+                    )
+                except Exception as e:
+                    logger.warning(
+                        f"Unable to ingest {schema}.{table} due to an exception.\n {traceback.format_exc()}"
+                    )
+                    self.report.report_warning(
+                        f"{schema}.{table}", f"Ingestion error: {e}"
+                    )
+        except Exception as e:
+            self.report.report_failure(f"{schema}", f"Tables error: {e}")
 
     def _process_table(
         self,
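End to end, the new report fields are easiest to see by running the source through a pipeline and dumping its report. A sketch under the assumption that the recipe keys below match your Snowflake setup; consult `source_docs/snowflake.md` for the exact config options:

```python
# Sketch: run the snowflake source and read the report fields added above.
# The recipe values are placeholders; see source_docs/snowflake.md for the
# config keys supported by your version.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "snowflake",
            "config": {
                "username": "datahub_user",
                "password": "<your-password>",
                "host_port": "<your-account>",
                "warehouse": "<your-warehouse>",
                "role": "datahub_role",
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()

report = pipeline.source.get_report()
print(report.cli_version)    # stamped by pipeline.py
print(report.saas_version)   # from select CURRENT_VERSION(); empty means the check failed
print(report.role, report.role_grants)
print(report.warnings, report.failures)
```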
@@ -979,34 +992,39 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
         schema: str,
         sql_config: SQLAlchemyConfig,
     ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
-        for view in inspector.get_view_names(schema):
-            schema, view = self.standardize_schema_table_names(
-                schema=schema, entity=view
-            )
-            dataset_name = self.get_identifier(
-                schema=schema, entity=view, inspector=inspector
-            )
-            dataset_name = self.normalise_dataset_name(dataset_name)
-
-            self.report.report_entity_scanned(dataset_name, ent_type="view")
-
-            if not sql_config.view_pattern.allowed(dataset_name):
-                self.report.report_dropped(dataset_name)
-                continue
-
-            try:
-                yield from self._process_view(
-                    dataset_name=dataset_name,
-                    inspector=inspector,
-                    schema=schema,
-                    view=view,
-                    sql_config=sql_config,
+        try:
+            for view in inspector.get_view_names(schema):
+                schema, view = self.standardize_schema_table_names(
+                    schema=schema, entity=view
                 )
-            except Exception as e:
-                logger.warning(
-                    f"Unable to ingest view {schema}.{view} due to an exception.\n {traceback.format_exc()}"
+                dataset_name = self.get_identifier(
+                    schema=schema, entity=view, inspector=inspector
                 )
-                self.report.report_warning(f"{schema}.{view}", f"Ingestion error: {e}")
+                dataset_name = self.normalise_dataset_name(dataset_name)
+
+                self.report.report_entity_scanned(dataset_name, ent_type="view")
+
+                if not sql_config.view_pattern.allowed(dataset_name):
+                    self.report.report_dropped(dataset_name)
+                    continue
+
+                try:
+                    yield from self._process_view(
+                        dataset_name=dataset_name,
+                        inspector=inspector,
+                        schema=schema,
+                        view=view,
+                        sql_config=sql_config,
+                    )
+                except Exception as e:
+                    logger.warning(
+                        f"Unable to ingest view {schema}.{view} due to an exception.\n {traceback.format_exc()}"
+                    )
+                    self.report.report_warning(
+                        f"{schema}.{view}", f"Ingestion error: {e}"
+                    )
+        except Exception as e:
+            self.report.report_failure(f"{schema}", f"Views error: {e}")
 
     def _process_view(
         self,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/usage/snowflake_usage.py b/metadata-ingestion/src/datahub/ingestion/source/usage/snowflake_usage.py
index ef3115d707..8b20e92876 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/usage/snowflake_usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/usage/snowflake_usage.py
@@ -235,11 +235,13 @@ class SnowflakeUsageSource(StatefulIngestionSourceBase):
                 and int(self.config.start_time.timestamp() * 1000)
                 <= last_successful_pipeline_run_end_time_millis
             ):
-                logger.info(
+                warn_msg = (
                     f"Skippig this run, since the last run's bucket duration end: "
                     f"{datetime.fromtimestamp(last_successful_pipeline_run_end_time_millis/1000, tz=timezone.utc)}"
                     f" is later than the current start_time: {self.config.start_time}"
                 )
+                logger.warning(warn_msg)
+                self.report.report_warning("skip-run", warn_msg)
                 return True
         return False
@@ -395,7 +397,10 @@ class SnowflakeUsageSource(StatefulIngestionSourceBase):
 
             if not event_dict["email"] and self.config.email_domain:
                 if not event_dict["user_name"]:
-                    logging.warning(
+                    self.report.report_warning(
+                        "user-name-miss", f"Missing in {event_dict}"
+                    )
+                    logger.warning(
                         f"The user_name is missing from {event_dict}. Skipping ...."
                     )
                     continue
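The skip-run condition above compares the configured `start_time` against the end of the last successful run's bucket; the change is that the decision is now also recorded on the report instead of only being logged at INFO level. A small sketch of the comparison, with illustrative values:

```python
# Sketch of the skip-run check; the timestamps below are illustrative only.
from datetime import datetime, timezone

last_successful_run_end_millis = 1_637_000_000_000   # from the stateful-ingestion checkpoint
start_time = datetime(2021, 11, 14, tzinfo=timezone.utc)  # self.config.start_time

should_skip = int(start_time.timestamp() * 1000) <= last_successful_run_end_millis
if should_skip:
    warn_msg = (
        f"Skipping this run, since the last run's bucket duration end: "
        f"{datetime.fromtimestamp(last_successful_run_end_millis / 1000, tz=timezone.utc)}"
        f" is later than the current start_time: {start_time}"
    )
    print(warn_msg)  # the source also calls report.report_warning("skip-run", warn_msg)
```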