mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-11-03 20:27:50 +00:00 
			
		
		
		
	fix(report): too long report causes MSG_SIZE_TOO_LARGE in kafka (#8857)
This commit is contained in:
		
							parent
							
								
									7f4395945e
								
							
						
					
					
						commit
						e254a50b50
					
				@ -367,12 +367,12 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    def warn(self, log: logging.Logger, key: str, reason: str) -> None:
 | 
			
		||||
        self.report.report_warning(key, reason)
 | 
			
		||||
        self.report.report_warning(key, reason[:100])
 | 
			
		||||
        log.warning(f"{key} => {reason}")
 | 
			
		||||
 | 
			
		||||
    def error(self, log: logging.Logger, key: str, reason: str) -> None:
 | 
			
		||||
        self.report.report_failure(key, reason)
 | 
			
		||||
        log.error(f"{key} => {reason}")
 | 
			
		||||
        self.report.report_failure(key, reason[:100])
 | 
			
		||||
        log.error(f"{key} => {reason}\n{traceback.format_exc()}")
 | 
			
		||||
 | 
			
		||||
    def get_inspectors(self) -> Iterable[Inspector]:
 | 
			
		||||
        # This method can be overridden in the case that you want to dynamically
 | 
			
		||||
@ -528,10 +528,8 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
 | 
			
		||||
                try:
 | 
			
		||||
                    self.add_profile_metadata(inspector)
 | 
			
		||||
                except Exception as e:
 | 
			
		||||
                    logger.warning(
 | 
			
		||||
                        "Failed to get enrichment data for profiler", exc_info=True
 | 
			
		||||
                    )
 | 
			
		||||
                    self.report.report_warning(
 | 
			
		||||
                    self.warn(
 | 
			
		||||
                        logger,
 | 
			
		||||
                        "profile_metadata",
 | 
			
		||||
                        f"Failed to get enrichment data for profile {e}",
 | 
			
		||||
                    )
 | 
			
		||||
@ -638,14 +636,9 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
 | 
			
		||||
                        dataset_name, inspector, schema, table, sql_config
 | 
			
		||||
                    )
 | 
			
		||||
                except Exception as e:
 | 
			
		||||
                    logger.warning(
 | 
			
		||||
                        f"Unable to ingest {schema}.{table} due to an exception.\n {traceback.format_exc()}"
 | 
			
		||||
                    )
 | 
			
		||||
                    self.report.report_warning(
 | 
			
		||||
                        f"{schema}.{table}", f"Ingestion error: {e}"
 | 
			
		||||
                    )
 | 
			
		||||
                    self.warn(logger, f"{schema}.{table}", f"Ingestion error: {e}")
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            self.report.report_failure(f"{schema}", f"Tables error: {e}")
 | 
			
		||||
            self.error(logger, f"{schema}", f"Tables error: {e}")
 | 
			
		||||
 | 
			
		||||
    def add_information_for_schema(self, inspector: Inspector, schema: str) -> None:
 | 
			
		||||
        pass
 | 
			
		||||
@ -806,9 +799,10 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
 | 
			
		||||
        try:
 | 
			
		||||
            columns = inspector.get_columns(table, schema)
 | 
			
		||||
            if len(columns) == 0:
 | 
			
		||||
                self.report.report_warning(MISSING_COLUMN_INFO, dataset_name)
 | 
			
		||||
                self.warn(logger, MISSING_COLUMN_INFO, dataset_name)
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            self.report.report_warning(
 | 
			
		||||
            self.warn(
 | 
			
		||||
                logger,
 | 
			
		||||
                dataset_name,
 | 
			
		||||
                f"unable to get column information due to an error -> {e}",
 | 
			
		||||
            )
 | 
			
		||||
@ -903,14 +897,9 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
 | 
			
		||||
                        sql_config=sql_config,
 | 
			
		||||
                    )
 | 
			
		||||
                except Exception as e:
 | 
			
		||||
                    logger.warning(
 | 
			
		||||
                        f"Unable to ingest view {schema}.{view} due to an exception.\n {traceback.format_exc()}"
 | 
			
		||||
                    )
 | 
			
		||||
                    self.report.report_warning(
 | 
			
		||||
                        f"{schema}.{view}", f"Ingestion error: {e}"
 | 
			
		||||
                    )
 | 
			
		||||
                    self.warn(logger, f"{schema}.{view}", f"Ingestion error: {e}")
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            self.report.report_failure(f"{schema}", f"Views error: {e}")
 | 
			
		||||
            self.error(logger, f"{schema}", f"Views error: {e}")
 | 
			
		||||
 | 
			
		||||
    def _process_view(
 | 
			
		||||
        self,
 | 
			
		||||
@ -924,9 +913,7 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
 | 
			
		||||
            columns = inspector.get_columns(view, schema)
 | 
			
		||||
        except KeyError:
 | 
			
		||||
            # For certain types of views, we are unable to fetch the list of columns.
 | 
			
		||||
            self.report.report_warning(
 | 
			
		||||
                dataset_name, "unable to get schema for this view"
 | 
			
		||||
            )
 | 
			
		||||
            self.warn(logger, dataset_name, "unable to get schema for this view")
 | 
			
		||||
            schema_metadata = None
 | 
			
		||||
        else:
 | 
			
		||||
            schema_fields = self.get_schema_fields(dataset_name, columns)
 | 
			
		||||
@ -1112,7 +1099,8 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
 | 
			
		||||
            if partition is None and self.is_table_partitioned(
 | 
			
		||||
                database=None, schema=schema, table=table
 | 
			
		||||
            ):
 | 
			
		||||
                self.report.report_warning(
 | 
			
		||||
                self.warn(
 | 
			
		||||
                    logger,
 | 
			
		||||
                    "profile skipped as partitioned table is empty or partition id was invalid",
 | 
			
		||||
                    dataset_name,
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user