Enhance Ingestion Framework: Add Drive Service support and improve logging for User Profiles (#22733)

This commit is contained in:
Ayush Shah 2025-08-05 18:17:55 +05:30 committed by GitHub
parent e7249d2027
commit c68ea8c83f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 22 additions and 6 deletions

View File

@ -77,6 +77,9 @@ from metadata.generated.schema.api.services.createDashboardService import (
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.api.services.createDriveService import (
CreateDriveServiceRequest,
)
from metadata.generated.schema.api.services.createMessagingService import (
CreateMessagingServiceRequest,
)
@ -160,6 +163,7 @@ from metadata.generated.schema.entity.services.connections.testConnectionDefinit
)
from metadata.generated.schema.entity.services.dashboardService import DashboardService
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.driveService import DriveService
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
IngestionPipeline,
)
@ -302,4 +306,7 @@ ROUTES = {
# Data Contracts
DataContract.__name__: "/dataContracts",
CreateDataContractRequest.__name__: "/dataContracts",
# Drives
DriveService.__name__: "/services/driveServices",
CreateDriveServiceRequest.__name__: "/services/driveServices",
}

View File

@ -1064,13 +1064,12 @@ class SampleDataSource(
if directory_data.get("parent"):
parent_name = directory_data["parent"]
if parent_name in directory_refs:
directory_request.parent = directory_refs[parent_name]
directory_request.parent = FullyQualifiedEntityName(
root=directory_refs[parent_name]
)
else:
# For nested references like "Marketing.Campaigns_2024"
# Build parent FQN manually
parent_path = parent_name.replace(".", "/")
directory_request.parent = (
f"{self.drive_service.fullyQualifiedName.root}.{parent_path}"
directory_request.parent = FullyQualifiedEntityName(
f"{self.drive_service.fullyQualifiedName.root}.{parent_name}"
)
# Use direct API call instead of yielding since suffix mapping is missing

View File

@ -36,6 +36,7 @@ from metadata.ingestion.models.life_cycle import OMetaLifeCycleData
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.patch_request import PatchRequest
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.models.user import OMetaUserProfile
METADATA_LOGGER = "metadata"
BASE_LOGGING_FORMAT = (
@ -310,6 +311,15 @@ def _(record: DataContractResult) -> str:
return f"DataContractResult for [{record.dataContractFQN.root}]; status: {record.contractExecutionStatus.value}]"
@get_log_name.register
def _(record: OMetaUserProfile) -> str:
    """Build the log name for a user profile record.

    The user part is delegated to ``get_log_name`` for ``record.user``;
    teams and roles are appended, falling back to the literal ``None``
    when the corresponding attribute is empty or unset.
    """
    user_name = get_log_name(record.user)
    # Falsy values (None, empty list) are rendered as the string "None".
    teams = record.teams or "None"
    roles = record.roles or "None"
    # Keep the name on a single line with a uniform ", " separator so it
    # reads cleanly when embedded in log messages.
    return f"User Profile: {user_name}, Teams: {teams}, Roles: {roles}"
def redacted_config(config: Dict[str, Union[str, dict]]) -> Dict[str, Union[str, dict]]:
config_copy = deepcopy(config)