fix(ingestion): add logging, make job more resilient to errors (#4331)

Aseem Bansal 2022-03-08 04:02:44 +05:30 committed by GitHub
parent 2903646a15
commit beb51ebf59
10 changed files with 184 additions and 101 deletions

.gitignore
View File

@@ -59,4 +59,5 @@ smoke-test/spark-smoke-test/__pycache__/
metadata-ingestion/generated/**
# docs
docs/generated/
docs/generated/
tmp*

View File

@@ -17,7 +17,7 @@ create or replace role datahub_role;
// Grant privileges to use and select from your target warehouses / dbs / schemas / tables
grant operate, usage on warehouse <your-warehouse> to role datahub_role;
grant usage on <your-database> to role datahub_role;
grant usage on DATABASE <your-database> to role datahub_role;
grant usage on all schemas in database <your-database> to role datahub_role;
grant select on all tables in database <your-database> to role datahub_role;
grant select on all external tables in database <your-database> to role datahub_role;
@@ -27,6 +27,9 @@ grant select on all views in database <your-database> to role datahub_role;
grant usage on future schemas in database "<your-database>" to role datahub_role;
grant select on future tables in database "<your-database>" to role datahub_role;
// Grant privileges on snowflake default database - needed for lineage
grant imported privileges on DATABASE snowflake to role datahub_role;
// Create a new DataHub user and assign the DataHub role to it
create user datahub_user display_name = 'DataHub' password='' default_role = datahub_role default_warehouse = '<your-warehouse>';

View File

@@ -136,12 +136,17 @@ class DatahubRestEmitter:
self._session.mount("http://", adapter)
self._session.mount("https://", adapter)
def test_connection(self) -> None:
def test_connection(self) -> str:
response = self._session.get(f"{self._gms_server}/config")
if response.status_code == 200:
config: dict = response.json()
if config.get("noCode") == "true":
return
return (
config.get("versions", {})
.get("linkedin/datahub", {})
.get("version", "")
)
else:
# Looks like we either connected to an old GMS or to some other service. Let's see if we can determine which before raising an error
# A common misconfiguration is connecting to datahub-frontend so we special-case this check

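With this change, test_connection() doubles as a version probe: it still raises on connection problems or misconfiguration, but on success it now returns the server version reported under versions -> linkedin/datahub -> version in the /config payload (or an empty string when that key is absent). A minimal sketch of calling it directly; the endpoint URL below is illustrative, not part of this commit:

from datahub.emitter.rest_emitter import DatahubRestEmitter

# hypothetical local GMS endpoint; substitute your own server and token
emitter = DatahubRestEmitter("http://localhost:8080")
gms_version = emitter.test_connection()  # raises if GMS is unreachable or misconfigured
print(gms_version or "<GMS did not report a version>")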
View File

@@ -14,6 +14,7 @@ class SourceReport(Report):
warnings: Dict[str, List[str]] = field(default_factory=dict)
failures: Dict[str, List[str]] = field(default_factory=dict)
cli_version: str = ""
def report_workunit(self, wu: WorkUnit) -> None:
self.workunits_produced += 1

View File

@@ -8,6 +8,7 @@ from typing import Any, Dict, Iterable, List, Optional
import click
from pydantic import validator
import datahub
from datahub.configuration.common import (
ConfigModel,
DynamicTypedConfig,
@@ -178,6 +179,7 @@ class Pipeline:
callback = LoggingCallback()
extractor: Extractor = self.extractor_class()
self.source.get_report().cli_version = datahub.nice_version_name()
for wu in itertools.islice(
self.source.get_workunits(), 10 if self.preview_mode else None
):

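The pipeline now stamps the CLI version onto the source report before any work units are pulled, so every ingestion report records which client produced it. A minimal standalone sketch of the same idea, assuming the usual SourceReport import path:

import datahub
from datahub.ingestion.api.source import SourceReport

report = SourceReport()
report.cli_version = datahub.nice_version_name()  # human-readable client version string
print(report.cli_version)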
View File

@@ -24,17 +24,22 @@ class DatahubRestSinkConfig(DatahubClientConfig):
pass
@dataclass
class DataHubRestSinkReport(SinkReport):
gms_version: str = ""
@dataclass
class DatahubRestSink(Sink):
config: DatahubRestSinkConfig
emitter: DatahubRestEmitter
report: SinkReport
report: DataHubRestSinkReport
treat_errors_as_warnings: bool = False
def __init__(self, ctx: PipelineContext, config: DatahubRestSinkConfig):
super().__init__(ctx)
self.config = config
self.report = SinkReport()
self.report = DataHubRestSinkReport()
self.emitter = DatahubRestEmitter(
self.config.server,
self.config.token,
@@ -45,7 +50,7 @@ class DatahubRestSink(Sink):
extra_headers=self.config.extra_headers,
ca_certificate_path=self.config.ca_certificate_path,
)
self.emitter.test_connection()
self.report.gms_version = self.emitter.test_connection()
self.executor = concurrent.futures.ThreadPoolExecutor(
max_workers=self.config.max_threads
)

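On the sink side, the connectivity check performed at construction time now also records the server version, so a run summary can show the client and GMS versions side by side when debugging mismatches. A rough sketch, assuming sink is an already-constructed DatahubRestSink instance:

# `sink` is assumed to be an already-constructed DatahubRestSink
print(sink.report.gms_version)  # version string captured from emitter.test_connection()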
View File

@@ -364,9 +364,10 @@ class BigQuerySource(SQLAlchemySource):
)
self.lineage_metadata = self._create_lineage_map(parsed_entries)
except Exception as e:
logger.error(
"Error computing lineage information using GCP logs.",
e,
self.error(
logger,
"lineage-gcp-logs",
f"Error was {e}",
)
def _compute_bigquery_lineage_via_exported_bigquery_audit_metadata(
@@ -385,9 +386,10 @@ class BigQuerySource(SQLAlchemySource):
)
self.lineage_metadata = self._create_lineage_map(parsed_entries)
except Exception as e:
logger.error(
"Error computing lineage information using exported GCP audit logs.",
e,
self.error(
logger,
"lineage-exported-gcp-audit-logs",
f"Error: {e}",
)
def _make_bigquery_client(

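The two BigQuery lineage paths now report failures through self.error(...) (a helper added to the shared SQLAlchemySource later in this commit) instead of only logging them, so a broken lineage extraction is visible in the ingestion report under a stable key. Roughly, the report then carries something like the sketch below; the field names come from SourceReport, the exception text is illustrative:

# SourceReport.failures is a Dict[str, List[str]], so after a failed GCP-log-based
# lineage pass the report carries an entry along the lines of:
#   report.failures == {"lineage-gcp-logs": ["Error was <exception message>"]}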
View File

@@ -66,6 +66,10 @@ class SnowflakeReport(SQLSourceReport):
num_view_to_table_edges_scanned: int = 0
num_external_table_edges_scanned: int = 0
upstream_lineage: Dict[str, List[str]] = field(default_factory=dict)
# https://community.snowflake.com/s/topic/0TO0Z000000Unu5WAC/releases
saas_version: str = ""
role: str = ""
role_grants: List[str] = field(default_factory=list)
class BaseSnowflakeConfig(BaseTimeWindowConfig):
@@ -203,27 +207,59 @@ class SnowflakeSource(SQLAlchemySource):
config = SnowflakeConfig.parse_obj(config_dict)
return cls(config, ctx)
def get_inspectors(self) -> Iterable[Inspector]:
url = self.config.get_sql_alchemy_url(database=None)
def get_metadata_engine(
self, database: Optional[str] = None
) -> sqlalchemy.engine.Engine:
url = self.config.get_sql_alchemy_url(database=database)
logger.debug(f"sql_alchemy_url={url}")
db_listing_engine = create_engine(
return create_engine(
url,
connect_args=self.config.get_sql_alchemy_connect_args(),
**self.config.options,
)
def inspect_version(self) -> Any:
db_engine = self.get_metadata_engine()
logger.info("Checking current version")
for db_row in db_engine.execute("select CURRENT_VERSION()"):
self.report.saas_version = db_row[0]
def inspect_role_grants(self) -> Any:
db_engine = self.get_metadata_engine()
cur_role = None
if self.config.role is None:
for db_row in db_engine.execute("select CURRENT_ROLE()"):
cur_role = db_row[0]
else:
cur_role = self.config.role
if cur_role is None:
return
self.report.role = cur_role
logger.info(f"Current role is {cur_role}")
if cur_role.lower() == "accountadmin":
return
logger.info(f"Checking grants for role {cur_role}")
for db_row in db_engine.execute(text(f"show grants to role {cur_role}")):
privilege = db_row["privilege"]
granted_on = db_row["granted_on"]
name = db_row["name"]
self.report.role_grants.append(
f"{privilege} granted on {granted_on} {name}"
)
def get_inspectors(self) -> Iterable[Inspector]:
db_listing_engine = self.get_metadata_engine(database=None)
for db_row in db_listing_engine.execute(text("SHOW DATABASES")):
db = db_row.name
if self.config.database_pattern.allowed(db):
# We create a separate engine for each database in order to ensure that
# they are isolated from each other.
self.current_database = db
engine = create_engine(
self.config.get_sql_alchemy_url(database=db),
connect_args=self.config.get_sql_alchemy_connect_args(),
**self.config.options,
)
engine = self.get_metadata_engine(database=db)
with engine.connect() as conn:
inspector = inspect(conn)
@@ -273,9 +309,11 @@ WHERE
f"Upstream->View: Lineage[View(Down)={view_name}]:Upstream={view_upstream}"
)
except Exception as e:
logger.warning(
f"Extracting the upstream view lineage from Snowflake failed."
f"Please check your permissions. Continuing...\nError was {e}."
self.warn(
logger,
"view_upstream_lineage",
"Extracting the upstream view lineage from Snowflake failed."
+ f"Please check your permissions. Continuing...\nError was {e}.",
)
logger.info(f"A total of {num_edges} View upstream edges found.")
self.report.num_table_to_view_edges_scanned = num_edges
@@ -387,9 +425,11 @@ WHERE
num_edges += 1
except Exception as e:
logger.warning(
self.warn(
logger,
"view_downstream_lineage",
f"Extracting the view lineage from Snowflake failed."
f"Please check your permissions. Continuing...\nError was {e}."
f"Please check your permissions. Continuing...\nError was {e}.",
)
logger.info(
f"Found {num_edges} View->Table edges. Removed {num_false_edges} false Table->Table edges."
@@ -399,16 +439,12 @@ WHERE
def _populate_view_lineage(self) -> None:
if not self.config.include_view_lineage:
return
url = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(url, **self.config.options)
engine = self.get_metadata_engine(database=None)
self._populate_view_upstream_lineage(engine)
self._populate_view_downstream_lineage(engine)
def _populate_external_lineage(self) -> None:
url = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(url, **self.config.options)
engine = self.get_metadata_engine(database=None)
# Handles the case where a table is populated from an external location via copy.
# Eg: copy into category_english from 's3://acryl-snow-demo-olist/olist_raw_data/category_english'credentials=(aws_key_id='...' aws_secret_key='...') pattern='.*.csv';
query: str = """
@@ -464,21 +500,17 @@ WHERE
)
num_edges += 1
except Exception as e:
logger.warning(
self.warn(
logger,
"external_lineage",
f"Populating external table lineage from Snowflake failed."
f"Please check your premissions. Continuing...\nError was {e}."
f"Please check your premissions. Continuing...\nError was {e}.",
)
logger.info(f"Found {num_edges} external lineage edges.")
self.report.num_external_table_edges_scanned = num_edges
def _populate_lineage(self) -> None:
url = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(
url,
connect_args=self.config.get_sql_alchemy_connect_args(),
**self.config.options,
)
engine = self.get_metadata_engine(database=None)
query: str = """
WITH table_lineage_history AS (
SELECT
@@ -521,9 +553,11 @@ QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name, upstream_table_na
f"Lineage[Table(Down)={key}]:Table(Up)={self._lineage_map[key]}"
)
except Exception as e:
logger.warning(
self.warn(
logger,
"lineage",
f"Extracting lineage from Snowflake failed."
f"Please check your premissions. Continuing...\nError was {e}."
f"Please check your premissions. Continuing...\nError was {e}.",
)
logger.info(
f"A total of {num_edges} Table->Table edges found"
@@ -611,6 +645,13 @@ QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name, upstream_table_na
# Override the base class method.
def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
try:
self.inspect_version()
except Exception as e:
self.report.report_failure("version", f"Error: {e}")
return
self.inspect_role_grants()
for wu in super().get_workunits():
if (
self.config.include_table_lineage

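The Snowflake source now front-loads two checks before emitting any work units: a version probe, whose failure aborts the run with a "version" failure (it almost always means the connection or credentials are broken), and, for roles other than accountadmin, a dump of the role's grants into the report so permission gaps are easy to spot. Illustratively, a report after these checks might carry values like the sketch below (all values made up):

# hypothetical SnowflakeReport contents after the new checks; values are illustrative
# saas_version = "6.5.2"
# role         = "datahub_role"
# role_grants  = [
#     "USAGE granted on WAREHOUSE COMPUTE_WH",
#     "USAGE granted on DATABASE MY_DB",
#     "SELECT granted on TABLE MY_DB.PUBLIC.ORDERS",
# ]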
View File

@@ -441,6 +441,14 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
},
)
def warn(self, log: logging.Logger, key: str, reason: str) -> Any:
self.report.report_warning(key, reason)
log.warning(reason)
def error(self, log: logging.Logger, key: str, reason: str) -> Any:
self.report.report_failure(key, reason)
log.error(reason)
def get_inspectors(self) -> Iterable[Inspector]:
# This method can be overridden in the case that you want to dynamically
# run on multiple databases.
@@ -767,37 +775,42 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
sql_config: SQLAlchemyConfig,
) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
tables_seen: Set[str] = set()
for table in inspector.get_table_names(schema):
schema, table = self.standardize_schema_table_names(
schema=schema, entity=table
)
dataset_name = self.get_identifier(
schema=schema, entity=table, inspector=inspector
)
dataset_name = self.normalise_dataset_name(dataset_name)
if dataset_name not in tables_seen:
tables_seen.add(dataset_name)
else:
logger.debug(f"{dataset_name} has already been seen, skipping...")
continue
self.report.report_entity_scanned(dataset_name, ent_type="table")
if not sql_config.table_pattern.allowed(dataset_name):
self.report.report_dropped(dataset_name)
continue
try:
yield from self._process_table(
dataset_name, inspector, schema, table, sql_config
try:
for table in inspector.get_table_names(schema):
schema, table = self.standardize_schema_table_names(
schema=schema, entity=table
)
except Exception as e:
logger.warning(
f"Unable to ingest {schema}.{table} due to an exception.\n {traceback.format_exc()}"
dataset_name = self.get_identifier(
schema=schema, entity=table, inspector=inspector
)
self.report.report_warning(f"{schema}.{table}", f"Ingestion error: {e}")
dataset_name = self.normalise_dataset_name(dataset_name)
if dataset_name not in tables_seen:
tables_seen.add(dataset_name)
else:
logger.debug(f"{dataset_name} has already been seen, skipping...")
continue
self.report.report_entity_scanned(dataset_name, ent_type="table")
if not sql_config.table_pattern.allowed(dataset_name):
self.report.report_dropped(dataset_name)
continue
try:
yield from self._process_table(
dataset_name, inspector, schema, table, sql_config
)
except Exception as e:
logger.warning(
f"Unable to ingest {schema}.{table} due to an exception.\n {traceback.format_exc()}"
)
self.report.report_warning(
f"{schema}.{table}", f"Ingestion error: {e}"
)
except Exception as e:
self.report.report_failure(f"{schema}", f"Tables error: {e}")
def _process_table(
self,
@@ -979,34 +992,39 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
schema: str,
sql_config: SQLAlchemyConfig,
) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
for view in inspector.get_view_names(schema):
schema, view = self.standardize_schema_table_names(
schema=schema, entity=view
)
dataset_name = self.get_identifier(
schema=schema, entity=view, inspector=inspector
)
dataset_name = self.normalise_dataset_name(dataset_name)
self.report.report_entity_scanned(dataset_name, ent_type="view")
if not sql_config.view_pattern.allowed(dataset_name):
self.report.report_dropped(dataset_name)
continue
try:
yield from self._process_view(
dataset_name=dataset_name,
inspector=inspector,
schema=schema,
view=view,
sql_config=sql_config,
try:
for view in inspector.get_view_names(schema):
schema, view = self.standardize_schema_table_names(
schema=schema, entity=view
)
except Exception as e:
logger.warning(
f"Unable to ingest view {schema}.{view} due to an exception.\n {traceback.format_exc()}"
dataset_name = self.get_identifier(
schema=schema, entity=view, inspector=inspector
)
self.report.report_warning(f"{schema}.{view}", f"Ingestion error: {e}")
dataset_name = self.normalise_dataset_name(dataset_name)
self.report.report_entity_scanned(dataset_name, ent_type="view")
if not sql_config.view_pattern.allowed(dataset_name):
self.report.report_dropped(dataset_name)
continue
try:
yield from self._process_view(
dataset_name=dataset_name,
inspector=inspector,
schema=schema,
view=view,
sql_config=sql_config,
)
except Exception as e:
logger.warning(
f"Unable to ingest view {schema}.{view} due to an exception.\n {traceback.format_exc()}"
)
self.report.report_warning(
f"{schema}.{view}", f"Ingestion error: {e}"
)
except Exception as e:
self.report.report_failure(f"{schema}", f"Views error: {e}")
def _process_view(
self,

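Two changes land in the shared SQLAlchemy source: warn()/error() helpers that write to both the logger and the report, and an outer try/except around the per-schema table and view listing so that a schema that cannot even be enumerated becomes a single reported failure rather than an aborted run. A minimal, self-contained sketch of what the helpers boil down to; the keys and messages below are examples, not the committed ones:

import logging
from datahub.ingestion.api.source import SourceReport

logger = logging.getLogger(__name__)
report = SourceReport()

# what self.warn(logger, key, reason) and self.error(logger, key, reason) reduce to:
def warn(key: str, reason: str) -> None:
    report.report_warning(key, reason)   # recorded in the ingestion report
    logger.warning(reason)               # and still written to the log

def error(key: str, reason: str) -> None:
    report.report_failure(key, reason)
    logger.error(reason)

warn("view_upstream_lineage", "Extracting the upstream view lineage failed. Continuing...")
error("my_schema", "Tables error: could not enumerate schema")
print(report.warnings, report.failures)  # both are Dict[str, List[str]] keyed by the reported key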
View File

@@ -235,11 +235,13 @@ class SnowflakeUsageSource(StatefulIngestionSourceBase):
and int(self.config.start_time.timestamp() * 1000)
<= last_successful_pipeline_run_end_time_millis
):
logger.info(
warn_msg = (
f"Skippig this run, since the last run's bucket duration end: "
f"{datetime.fromtimestamp(last_successful_pipeline_run_end_time_millis/1000, tz=timezone.utc)}"
f" is later than the current start_time: {self.config.start_time}"
)
logger.warning(warn_msg)
self.report.report_warning("skip-run", warn_msg)
return True
return False
@@ -395,7 +397,10 @@ class SnowflakeUsageSource(StatefulIngestionSourceBase):
if not event_dict["email"] and self.config.email_domain:
if not event_dict["user_name"]:
logging.warning(
self.report.report_warning(
"user-name-miss", f"Missing in {event_dict}"
)
logger.warning(
f"The user_name is missing from {event_dict}. Skipping ...."
)
continue
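
Finally, the usage source's two silent skips become visible in the report: a run skipped because the configured window overlaps the last successful run is recorded under skip-run, and access events whose user name cannot be resolved are recorded under user-name-miss, so gaps in usage data can be traced from the report rather than only from the logs. A report from such a run would look roughly like this (messages shortened):

# illustrative report warnings after such a run; actual messages are longer
# warnings = {
#     "skip-run": ["Skipping this run, since the last run's bucket duration end ..."],
#     "user-name-miss": ["Missing in <access event dict>"],
# }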