mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-10-31 02:29:03 +00:00 
			
		
		
		
	
		
			
	
	
		
			279 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Markdown
		
	
	
	
	
	
		
		
			
		
	
	
			279 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Markdown
		
	
	
	
	
	
|   | --- | ||
|  | title: OpenLineage | ||
|  | slug: /connectors/pipeline/openlineage | ||
|  | --- | ||
|  | 
 | ||
|  | {% connectorDetailsHeader | ||
|  | name="OpenLineage" | ||
|  | stage="PROD" | ||
|  | platform="OpenMetadata" | ||
|  | availableFeatures=["Pipelines", "Lineage"] | ||
|  | unavailableFeatures=["Pipeline Status", "Owners", "Tags"] | ||
|  | / %} | ||
|  | 
 | ||
|  | In this section, we provide guides and references to use the OpenLineage connector. | ||
|  | 
 | ||
|  | Configure and schedule OpenLineage metadata workflows from the OpenMetadata UI: | ||
|  | 
 | ||
|  | - [Requirements](#requirements) | ||
|  | - [Metadata Ingestion](#metadata-ingestion) | ||
|  | 
 | ||
|  | {% partial file="/v1.5/connectors/ingestion-modes-tiles.md" variables={yamlPath: "/connectors/pipeline/openlineage/yaml"} /%} | ||
|  | 
 | ||
|  | ## Requirements
 | ||
|  | 
 | ||
|  | OpenMetadata is integrated with OpenLineage up to version 1.7.0 and will continue to work for future OpenLineage versions. | ||
|  | 
 | ||
|  | OpenLineage is an open framework for data lineage collection and analysis. At its core is an extensible specification that systems can use to interoperate with lineage metadata. | ||
|  | 
 | ||
|  | Apart from being a specification, it is also a set of integrations collecting lineage from various systems such as Apache Airflow and Spark. | ||
|  | 
 | ||
|  | ### Openlineage connector events
 | ||
|  | 
 | ||
|  | OpenLineage connector consumes open lineage events from kafka broker and translates it to OpenMetadata Lineage information. | ||
|  | 
 | ||
|  | 1. #### Airflow OpenLineage events | ||
|  | 
 | ||
|  | To Configure your Airflow instance  | ||
|  | 
 | ||
|  |     1. To Configure your Airflow instance install appropriate provider in [Airflow](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/index.html) | ||
|  |     2. Configure OpenLineage Provider in [Airflow](https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/guides/user.html#using-openlineage-integration) | ||
|  |         - Remember to use `kafka` transport mode here as OL events are collected from kafka topic. Detailed list of configuration options for OpenLineage can be found [here](https://openlineage.io/docs/client/python/#configuration) | ||
|  | 
 | ||
|  | 2. #### Spark OpenLineage events | ||
|  | 
 | ||
|  |     Configure Spark Session to produce OpenLineage events compatible with OpenLineage connector available in OpenMetadata. complete the kafka config using the below code. | ||
|  |     ```shell | ||
|  |     from pyspark.sql import SparkSession | ||
|  |     from uuid import uuid4 | ||
|  | 
 | ||
|  |     spark = SparkSession.builder\ | ||
|  |     .config('spark.openlineage.namespace', 'mynamespace')\ | ||
|  |     .config('spark.openlineage.parentJobName', 'hello-world')\ | ||
|  |     .config('spark.openlineage.parentRunId', str(uuid4()))\ | ||
|  |     .config('spark.jars.packages', 'io.openlineage:openlineage-spark:1.7.0')\ | ||
|  |     .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')\ | ||
|  |     .config('spark.openlineage.transport.type', 'kafka')\ | ||
|  |     .getOrCreate() | ||
|  |     ``` | ||
|  | 
 | ||
|  | ## Metadata Ingestion
 | ||
|  | 
 | ||
|  | {% partial  | ||
|  |   file="/v1.5/connectors/metadata-ingestion-ui.md"  | ||
|  |   variables={ | ||
|  |     connector: "Openlineage",  | ||
|  |     selectServicePath: "/images/v1.3/connectors/openlineage/select-service.webp", | ||
|  |     addNewServicePath: "/images/v1.3/connectors/openlineage/add-new-service.webp", | ||
|  |     serviceConnectionPath: "/images/v1.3/connectors/openlineage/service-connection.webp", | ||
|  | }  | ||
|  | /%} | ||
|  | 
 | ||
|  | {% stepsContainer %} | ||
|  | {% extraContent parentTagNme="stepsContainer" %} | ||
|  | 
 | ||
|  | {% /extraContent %} | ||
|  | {% partial file="/v1.5/connectors/test-connection.md" /%} | ||
|  | {% partial file="/v1.5/connectors/pipeline/configure-ingestion.md" /%} | ||
|  | {% partial file="/v1.5/connectors/ingestion-schedule-and-deploy.md" /%} | ||
|  | {% /stepsContainer %} | ||
|  | 
 | ||
|  | 
 | ||
|  | ##### Providing connection details programmatically via API
 | ||
|  | ###### 1. Preparing the Client
 | ||
|  | 
 | ||
|  | ```python | ||
|  | from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( | ||
|  |     OpenMetadataConnection, | ||
|  | ) | ||
|  | from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( | ||
|  |     OpenMetadataJWTClientConfig, | ||
|  | ) | ||
|  | from metadata.ingestion.ometa.ometa_api import OpenMetadata | ||
|  | 
 | ||
|  | server_config = OpenMetadataConnection( | ||
|  |     hostPort="http://localhost:8585/api", | ||
|  |     authProvider="openmetadata", | ||
|  |     securityConfig=OpenMetadataJWTClientConfig( | ||
|  |         jwtToken="<token>" | ||
|  |     ), | ||
|  | ) | ||
|  | metadata = OpenMetadata(server_config) | ||
|  | 
 | ||
|  | assert metadata.health_check()  # Will fail if we cannot reach the server | ||
|  | ``` | ||
|  | ###### 2. Creating the OpenLineage Pipeline service
 | ||
|  | ```python | ||
|  | 
 | ||
|  | from metadata.generated.schema.api.services.createPipelineService import  CreatePipelineServiceRequest | ||
|  | from metadata.generated.schema.entity.services.pipelineService import ( | ||
|  |     PipelineServiceType, | ||
|  |     PipelineConnection, | ||
|  | ) | ||
|  | from metadata.generated.schema.entity.services.connections.pipeline.openLineageConnection import ( | ||
|  |     OpenLineageConnection, | ||
|  |     SecurityProtocol as KafkaSecurityProtocol, | ||
|  |     ConsumerOffsets | ||
|  | ) | ||
|  | from metadata.generated.schema.security.ssl.validateSSLClientConfig import ( | ||
|  |     ValidateSslClientConfig, | ||
|  | ) | ||
|  | 
 | ||
|  | openlineage_service_request = CreatePipelineServiceRequest( | ||
|  |     name='openlineage-service', | ||
|  |     displayName='OpenLineage Service', | ||
|  |     serviceType=PipelineServiceType.OpenLineage, | ||
|  |     connection=PipelineConnection( | ||
|  |         config=OpenLineageConnection( | ||
|  |             brokersUrl='broker1:9092,broker2:9092', | ||
|  |             topicName='openlineage-events', | ||
|  |             consumerGroupName='openmetadata-consumer', | ||
|  |             consumerOffsets=ConsumerOffsets.earliest, | ||
|  |             poolTimeout=3.0, | ||
|  |             sessionTimeout=60, | ||
|  |             securityProtocol=KafkaSecurityProtocol.SSL, | ||
|  |             # below ssl confing in optional and used only when securityProtocol=KafkaSecurityProtocol.SSL | ||
|  |             sslConfig=ValidateSslClientConfig( | ||
|  |                 sslCertificate='/path/to/kafka/certs/Certificate.pem', | ||
|  |                 sslKey='/path/to/kafka/certs/Key.pem', | ||
|  |                 caCertificate='/path/to/kafka/certs/RootCA.pem' | ||
|  |                 ) | ||
|  |         ) | ||
|  |     ), | ||
|  | ) | ||
|  | 
 | ||
|  | metadata.create_or_update(openlineage_service_request) | ||
|  | 
 | ||
|  | 
 | ||
|  | ``` | ||
|  | ###### 1. Preparing the Client
 | ||
|  | 
 | ||
|  | ```python | ||
|  | from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( | ||
|  |     OpenMetadataConnection, | ||
|  | ) | ||
|  | from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( | ||
|  |     OpenMetadataJWTClientConfig, | ||
|  | ) | ||
|  | from metadata.ingestion.ometa.ometa_api import OpenMetadata | ||
|  | 
 | ||
|  | server_config = OpenMetadataConnection( | ||
|  |     hostPort="http://localhost:8585/api", | ||
|  |     authProvider="openmetadata", | ||
|  |     securityConfig=OpenMetadataJWTClientConfig( | ||
|  |         jwtToken="<token>" | ||
|  |     ), | ||
|  | ) | ||
|  | metadata = OpenMetadata(server_config) | ||
|  | 
 | ||
|  | assert metadata.health_check()  # Will fail if we cannot reach the server | ||
|  | ``` | ||
|  | ###### 2. Creating the OpenLineage Pipeline service
 | ||
|  | ```python | ||
|  | 
 | ||
|  | from metadata.generated.schema.api.services.createPipelineService import  CreatePipelineServiceRequest | ||
|  | from metadata.generated.schema.entity.services.pipelineService import ( | ||
|  |     PipelineServiceType, | ||
|  |     PipelineConnection, | ||
|  | ) | ||
|  | from metadata.generated.schema.entity.services.connections.pipeline.openLineageConnection import ( | ||
|  |     OpenLineageConnection, | ||
|  |     SecurityProtocol as KafkaSecurityProtocol, | ||
|  |     ConsumerOffsets | ||
|  | ) | ||
|  | from metadata.generated.schema.security.ssl.validateSSLClientConfig import ( | ||
|  |     ValidateSslClientConfig, | ||
|  | ) | ||
|  | 
 | ||
|  | openlineage_service_request = CreatePipelineServiceRequest( | ||
|  |     name='openlineage-service', | ||
|  |     displayName='OpenLineage Service', | ||
|  |     serviceType=PipelineServiceType.OpenLineage, | ||
|  |     connection=PipelineConnection( | ||
|  |         config=OpenLineageConnection( | ||
|  |             brokersUrl='broker1:9092,broker2:9092', | ||
|  |             topicName='openlineage-events', | ||
|  |             consumerGroupName='openmetadata-consumer', | ||
|  |             consumerOffsets=ConsumerOffsets.earliest, | ||
|  |             poolTimeout=3.0, | ||
|  |             sessionTimeout=60, | ||
|  |             securityProtocol=KafkaSecurityProtocol.SSL, | ||
|  |             # below ssl confing in optional and used only when securityProtocol=KafkaSecurityProtocol.SSL | ||
|  |             sslConfig=ValidateSslClientConfig( | ||
|  |                 sslCertificate='/path/to/kafka/certs/Certificate.pem', | ||
|  |                 sslKey='/path/to/kafka/certs/Key.pem', | ||
|  |                 caCertificate='/path/to/kafka/certs/RootCA.pem' | ||
|  |                 ) | ||
|  |         ) | ||
|  |     ), | ||
|  | ) | ||
|  | 
 | ||
|  | metadata.create_or_update(openlineage_service_request) | ||
|  | 
 | ||
|  | 
 | ||
|  | ``` | ||
|  | ###### 1. Preparing the Client
 | ||
|  | 
 | ||
|  | ```python | ||
|  | from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( | ||
|  |     OpenMetadataConnection, | ||
|  | ) | ||
|  | from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( | ||
|  |     OpenMetadataJWTClientConfig, | ||
|  | ) | ||
|  | from metadata.ingestion.ometa.ometa_api import OpenMetadata | ||
|  | 
 | ||
|  | server_config = OpenMetadataConnection( | ||
|  |     hostPort="http://localhost:8585/api", | ||
|  |     authProvider="openmetadata", | ||
|  |     securityConfig=OpenMetadataJWTClientConfig( | ||
|  |         jwtToken="<token>" | ||
|  |     ), | ||
|  | ) | ||
|  | metadata = OpenMetadata(server_config) | ||
|  | 
 | ||
|  | assert metadata.health_check()  # Will fail if we cannot reach the server | ||
|  | ``` | ||
|  | ###### 2. Creating the OpenLineage Pipeline service
 | ||
|  | ```python | ||
|  | 
 | ||
|  | from metadata.generated.schema.api.services.createPipelineService import  CreatePipelineServiceRequest | ||
|  | from metadata.generated.schema.entity.services.pipelineService import ( | ||
|  |     PipelineServiceType, | ||
|  |     PipelineConnection, | ||
|  | ) | ||
|  | from metadata.generated.schema.entity.services.connections.pipeline.openLineageConnection import ( | ||
|  |     OpenLineageConnection, | ||
|  |     SecurityProtocol as KafkaSecurityProtocol, | ||
|  |     ConsumerOffsets | ||
|  | ) | ||
|  | 
 | ||
|  | 
 | ||
|  | openlineage_service_request = CreatePipelineServiceRequest( | ||
|  |     name='openlineage-service', | ||
|  |     displayName='OpenLineage Service', | ||
|  |     serviceType=PipelineServiceType.OpenLineage, | ||
|  |     connection=PipelineConnection( | ||
|  |         config=OpenLineageConnection( | ||
|  |             brokersUrl='broker1:9092,broker2:9092', | ||
|  |             topicName='openlineage-events', | ||
|  |             consumerGroupName='openmetadata-consumer', | ||
|  |             consumerOffsets=ConsumerOffsets.earliest, | ||
|  |             poolTimeout=3.0, | ||
|  |             sessionTimeout=60, | ||
|  |             securityProtocol=KafkaSecurityProtocol.SSL, | ||
|  |             # below ssl confing in optional and used only when securityProtocol=KafkaSecurityProtocol.SSL | ||
|  |             sslConfig=ValidateSslClientConfig( | ||
|  |                 sslCertificate='/path/to/kafka/certs/Certificate.pem', | ||
|  |                 sslKey='/path/to/kafka/certs/Key.pem', | ||
|  |                 caCertificate='/path/to/kafka/certs/RootCA.pem' | ||
|  |                 ) | ||
|  |         ) | ||
|  |     ), | ||
|  | ) | ||
|  | 
 | ||
|  | metadata.create_or_update(openlineage_service_request) | ||
|  | 
 | ||
|  | ``` | ||
|  | 
 | ||
|  | {% partial file="/v1.5/connectors/troubleshooting.md" /%} |