79 lines
3.4 KiB
Python
Raw Permalink Normal View History

2024-06-26 15:45:06 -04:00
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import os
from pathlib import Path
2024-07-15 16:42:22 -07:00
from typing import List
2024-06-26 15:45:06 -04:00
from graphrag.callbacks.file_workflow_callbacks import FileWorkflowCallbacks
2025-01-17 01:55:04 -05:00
from graphrag.callbacks.workflow_callbacks import WorkflowCallbacks
from graphrag.callbacks.workflow_callbacks_manager import WorkflowCallbacksManager
2024-06-26 15:45:06 -04:00
2025-01-25 04:07:53 -05:00
from graphrag_app.logger.application_insights_workflow_callbacks import (
2024-06-26 15:45:06 -04:00
ApplicationInsightsWorkflowCallbacks,
)
2025-01-25 04:07:53 -05:00
from graphrag_app.logger.blob_workflow_callbacks import BlobWorkflowCallbacks
from graphrag_app.logger.console_workflow_callbacks import ConsoleWorkflowCallbacks
from graphrag_app.logger.typing import Logger
from graphrag_app.utils.azure_clients import AzureClientManager
2024-06-26 15:45:06 -04:00
2024-12-30 01:59:08 -05:00
def load_pipeline_logger(
2025-01-21 18:43:55 -05:00
logging_dir: str = "",
2024-07-15 16:42:22 -07:00
index_name: str = "",
num_workflow_steps: int = 0,
2024-06-26 15:45:06 -04:00
) -> WorkflowCallbacks:
2025-01-17 01:55:04 -05:00
"""Create and load a list of loggers.
2024-12-30 01:59:08 -05:00
2025-01-21 18:43:55 -05:00
This function creates loggers for two different scenarios. Loggers can be instantiated as generic loggers or associated with a specified indexing job.
1. When an indexing job is running, custom index-specific loggers are created to log the job activity
2. When the fastapi app is running, generic loggers are used to log the app's activities.
2024-07-15 16:42:22 -07:00
"""
2025-01-21 18:43:55 -05:00
loggers: List[Logger] = []
for logger_type in ["BLOB", "CONSOLE", "APP_INSIGHTS"]:
loggers.append(Logger[logger_type])
2024-12-30 01:59:08 -05:00
azure_client_manager = AzureClientManager()
callback_manager = WorkflowCallbacksManager()
for logger in loggers:
match logger:
case Logger.BLOB:
2024-07-15 16:42:22 -07:00
# create a dedicated container for logs
2025-01-21 18:43:55 -05:00
log_blob_name = "logs"
if logging_dir:
log_blob_name = os.path.join(logging_dir, log_blob_name)
2024-07-15 16:42:22 -07:00
# ensure the root directory exists; if not, create it
2024-12-30 01:59:08 -05:00
blob_service_client = azure_client_manager.get_blob_service_client()
2025-01-21 18:43:55 -05:00
container_root = Path(log_blob_name).parts[0]
2024-07-15 16:42:22 -07:00
if not blob_service_client.get_container_client(
container_root
).exists():
blob_service_client.create_container(container_root)
callback_manager.register(
2024-07-15 16:42:22 -07:00
BlobWorkflowCallbacks(
2024-12-30 01:59:08 -05:00
blob_service_client=blob_service_client,
2025-01-21 18:43:55 -05:00
container_name=log_blob_name,
2024-07-15 16:42:22 -07:00
index_name=index_name,
num_workflow_steps=num_workflow_steps,
)
2024-06-26 15:45:06 -04:00
)
case Logger.FILE:
callback_manager.register(FileWorkflowCallbacks(dir=logging_dir))
case Logger.APP_INSIGHTS:
if os.getenv("APPLICATIONINSIGHTS_CONNECTION_STRING"):
callback_manager.register(
2024-07-15 16:42:22 -07:00
ApplicationInsightsWorkflowCallbacks(
index_name=index_name,
num_workflow_steps=num_workflow_steps,
)
)
case Logger.CONSOLE:
callback_manager.register(
2024-12-30 01:59:08 -05:00
ConsoleWorkflowCallbacks(
index_name=index_name, num_workflow_steps=num_workflow_steps
)
)
2024-07-15 16:42:22 -07:00
case _:
print(f"WARNING: unknown logger type: {logger}. Skipping.")
return callback_manager