OpenMetadata/scripts/deploy-pipelines.py

"""
Deploy Ingestion Pipelines Script
This script uses the OpenMetadata client to:
1. Paginate over ingestion pipelines in groups of 20
2. Fetch the IDs of those IngestionPipelines
3. Send bulk deploy requests to api/v1/services/ingestionPipelines/bulk/deploy
4. Track responses to monitor deployment success/failure
"""
import argparse
import json
import logging
import sys
from typing import Dict, List

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
    IngestionPipeline,
)
from metadata.generated.schema.entity.services.ingestionPipelines.pipelineServiceClientResponse import (
    PipelineServiceClientResponse,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.utils import model_str

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


class PipelineDeployer:
    """Class to handle bulk deployment of ingestion pipelines"""

    def __init__(self, server_url: str, jwt_token: str):
        """
        Initialize the PipelineDeployer with an OpenMetadata connection

        Args:
            server_url: OpenMetadata server URL
            jwt_token: JWT token for authentication
        """
        # Remove trailing slash if present
        self.server_url = server_url.rstrip("/")
        # Configure the OpenMetadata connection
        server_config = OpenMetadataConnection(
            hostPort=self.server_url,
            authProvider="openmetadata",
            securityConfig=OpenMetadataJWTClientConfig(jwtToken=jwt_token),
        )
        self.metadata = OpenMetadata(server_config)
        logger.info(f"Connected to OpenMetadata server: {server_url}")
    def bulk_deploy_pipelines(
        self, pipeline_ids: List[str]
    ) -> List[PipelineServiceClientResponse]:
        """
        Send a bulk deploy request to the OpenMetadata API

        Args:
            pipeline_ids: List of pipeline UUID strings to deploy

        Returns:
            List of PipelineServiceClientResponse objects
        """
        if not pipeline_ids:
            logger.warning("No pipeline IDs provided for deployment")
            return []

        logger.info(f"Deploying {len(pipeline_ids)} pipelines...")
        try:
            # POST the list of pipeline IDs to the bulk deploy endpoint
            response = self.metadata.client.post(
                "/services/ingestionPipelines/bulk/deploy",
                data=json.dumps(pipeline_ids),
            )
            if response:
                # Parse the response payload into Pydantic models
                parsed_responses = [
                    PipelineServiceClientResponse.model_validate(item)
                    for item in response
                ]
                logger.info(
                    f"Bulk deploy request completed with {len(parsed_responses)} responses"
                )
                return parsed_responses
            logger.error("No response from bulk deploy API")
            return []
        except Exception as e:
            logger.error(f"Error during bulk deployment: {e}")
            raise
    def analyze_deployment_results(
        self,
        responses: List[PipelineServiceClientResponse],
        pipeline_ids: List[str],
    ) -> Dict[str, int]:
        """
        Analyze deployment responses to track success/failure

        Args:
            responses: List of PipelineServiceClientResponse objects
            pipeline_ids: List of pipeline IDs that were deployed (for correlation)

        Returns:
            Dictionary with deployment statistics
        """
        stats = {
            "total": len(responses),
            "success": 0,
            "failed": 0,
            "unknown": 0,
        }
        success_codes = {200, 201}  # HTTP success codes
        # Correlate responses with pipeline IDs (assuming the same order)
        for i, response in enumerate(responses):
            pipeline_id = pipeline_ids[i] if i < len(pipeline_ids) else "unknown"
            code = response.code
            reason = response.reason or ""
            platform = response.platform
            version = response.version or "unknown"
            if code in success_codes:
                stats["success"] += 1
                logger.info(
                    f"✓ Pipeline {pipeline_id} deployed successfully on "
                    f"{platform} v{version} (code: {code})"
                )
            elif code:
                stats["failed"] += 1
                logger.warning(
                    f"✗ Pipeline {pipeline_id} deployment failed on "
                    f"{platform} v{version} (code: {code}): {reason}"
                )
            else:
                stats["unknown"] += 1
                logger.warning(
                    f"? Pipeline {pipeline_id} unknown deployment status on "
                    f"{platform}: {response.model_dump()}"
                )
        return stats
    def deploy_all_pipelines(self, batch_size: int = 20) -> Dict[str, int]:
        """
        Deploy all ingestion pipelines in batches

        Args:
            batch_size: Number of pipelines per bulk deploy request

        Returns:
            Dictionary with overall deployment statistics
        """
        try:
            # Fetch all pipelines (list_all_entities paginates internally)
            pipelines = list(
                self.metadata.list_all_entities(
                    entity=IngestionPipeline, skip_on_failure=True
                )
            )
            if not pipelines:
                logger.warning("No pipelines found to deploy")
                return {"total": 0, "success": 0, "failed": 0, "unknown": 0}

            logger.info("Found %d pipelines to deploy", len(pipelines))
            # Extract pipeline IDs as strings
            pipeline_ids = [model_str(pipeline.id) for pipeline in pipelines]
            if not pipeline_ids:
                logger.error("No valid pipeline IDs found")
                return {"total": 0, "success": 0, "failed": 0, "unknown": 0}

            # Deploy pipelines in batches
            all_responses = []
            total_batches = (len(pipeline_ids) + batch_size - 1) // batch_size
            for i in range(0, len(pipeline_ids), batch_size):
                batch = pipeline_ids[i : i + batch_size]
                batch_num = i // batch_size + 1
                remaining_batches = total_batches - batch_num
                logger.info(
                    f"Deploying batch {batch_num}/{total_batches} with "
                    f"{len(batch)} pipelines ({remaining_batches} batches remaining)..."
                )
                batch_responses = self.bulk_deploy_pipelines(batch)
                all_responses.extend(batch_responses)

                # Log the parsed responses for this batch
                logger.info(f"Batch {batch_num} responses:")
                for j, response in enumerate(batch_responses):
                    pipeline_id = batch[j] if j < len(batch) else "unknown"
                    logger.info(
                        f"  Pipeline {pipeline_id}: code={response.code}, "
                        f"platform={response.platform}, reason={response.reason or 'N/A'}"
                    )
                logger.info(
                    f"Batch {batch_num} completed ({batch_num}/{total_batches} batches processed)"
                )

            # Correlate all responses with the pipeline IDs that were sent
            stats = self.analyze_deployment_results(all_responses, pipeline_ids)
            logger.info("=== Deployment Summary ===")
            logger.info(f"Total pipelines: {stats['total']}")
            logger.info(f"Successfully deployed: {stats['success']}")
            logger.info(f"Failed deployments: {stats['failed']}")
            logger.info(f"Unknown status: {stats['unknown']}")
            return stats
        except Exception as e:
            logger.error(f"Deployment process failed: {e}")
            raise
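

# Programmatic usage sketch (a hypothetical example, assuming a reachable
# OpenMetadata server and a valid JWT token; the values are placeholders and
# the stats shown are illustrative, not real output):
#
#   deployer = PipelineDeployer("http://localhost:8585/api", "<jwt-token>")
#   stats = deployer.deploy_all_pipelines(batch_size=20)
#   print(stats)  # e.g. {"total": 40, "success": 38, "failed": 2, "unknown": 0}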


def main():
    """Main entry point for the pipeline deployment script"""
    parser = argparse.ArgumentParser(
        description="Deploy all ingestion pipelines using the OpenMetadata bulk deploy API"
    )
    parser.add_argument(
        "--server-url",
        required=True,
        help="OpenMetadata server URL (e.g., http://localhost:8585/api/)",
    )
    parser.add_argument(
        "--jwt-token",
        required=True,
        help="JWT token for authentication",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        default=20,
        help="Batch size for pagination and deployment (default: 20)",
    )
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Enable verbose logging",
    )
    args = parser.parse_args()

    # basicConfig already set INFO; only raise verbosity when requested
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    try:
        deployer = PipelineDeployer(args.server_url, args.jwt_token)
        stats = deployer.deploy_all_pipelines(batch_size=args.batch_size)
        # Exit with a non-zero code if any deployments failed
        sys.exit(1 if stats["failed"] > 0 else 0)
    except Exception as e:
        logger.error(f"Script failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()