mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-10-31 10:39:30 +00:00 
			
		
		
		
	 c742835766
			
		
	
	
		c742835766
		
			
		
	
	
	
	
		
			
			* Prepare the skeleton for generic app registration * Prepare the skeleton for generic app registration * Handle app runner * Prepare the skeleton for generic app registration * Prepare the skeleton for generic app registration * Allow deployment * Fix PII APP * Fix lint * Fix PII APP * Fix PII APP * Prepare config-based external apps * Prepare config-based external apps * Fix lint * Prepare config-based external apps * Fix DI errors * Amend comments
		
			
				
	
	
		
			74 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			74 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import logging
 | |
| from enum import Enum
 | |
| from logging.handlers import RotatingFileHandler
 | |
| from typing import Union
 | |
| 
 | |
| from airflow.configuration import conf
 | |
| 
 | |
| from metadata.generated.schema.metadataIngestion.application import (
 | |
|     OpenMetadataApplicationConfig,
 | |
| )
 | |
| from metadata.generated.schema.metadataIngestion.workflow import (
 | |
|     OpenMetadataWorkflowConfig,
 | |
| )
 | |
| from metadata.utils.logger import set_loggers_level
 | |
| 
 | |
| BASE_LOGGING_FORMAT = (
 | |
|     "[%(asctime)s] %(levelname)-8s {%(name)s:%(module)s:%(lineno)d} - %(message)s"
 | |
| )
 | |
| 
 | |
| 
 | |
| class Loggers(Enum):
 | |
|     API_ROUTES = "AirflowAPIRoutes"
 | |
|     API = "AirflowAPI"
 | |
|     OPERATIONS = "AirflowOperations"
 | |
|     WORKFLOW = "AirflowWorkflow"
 | |
|     UTILS = "AirflowUtils"
 | |
| 
 | |
| 
 | |
| def build_logger(logger_name: str) -> logging.Logger:
 | |
|     logger = logging.getLogger(logger_name)
 | |
|     log_format = logging.Formatter(BASE_LOGGING_FORMAT)
 | |
|     rotating_log_handler = RotatingFileHandler(
 | |
|         f"{conf.get('logging', 'base_log_folder', fallback='')}/openmetadata_airflow_api.log",
 | |
|         maxBytes=1000000,
 | |
|         backupCount=10,
 | |
|     )
 | |
|     rotating_log_handler.setFormatter(log_format)
 | |
|     logger.addHandler(rotating_log_handler)
 | |
|     # We keep the log level as DEBUG to have all the traces in case anything fails
 | |
|     # during a deployment of a DAG
 | |
|     logger.setLevel(logging.DEBUG)
 | |
|     return logger
 | |
| 
 | |
| 
 | |
| def routes_logger() -> logging.Logger:
 | |
|     return build_logger(Loggers.API_ROUTES.value)
 | |
| 
 | |
| 
 | |
| def api_logger():
 | |
|     return build_logger(Loggers.API.value)
 | |
| 
 | |
| 
 | |
| def operations_logger():
 | |
|     return build_logger(Loggers.OPERATIONS.value)
 | |
| 
 | |
| 
 | |
| def workflow_logger():
 | |
|     return build_logger(Loggers.WORKFLOW.value)
 | |
| 
 | |
| 
 | |
| def utils_logger():
 | |
|     return build_logger(Loggers.UTILS.value)
 | |
| 
 | |
| 
 | |
| def set_operator_logger(
 | |
|     workflow_config: Union[OpenMetadataWorkflowConfig, OpenMetadataApplicationConfig]
 | |
| ) -> None:
 | |
|     """
 | |
|     Handle logging for the Python Operator that
 | |
|     will execute the ingestion
 | |
|     """
 | |
|     logging.getLogger().setLevel(logging.WARNING)
 | |
|     set_loggers_level(workflow_config.workflowConfig.loggerLevel.value)
 |