---
title: Run Hive Connector using Airflow SDK
slug: /connectors/database/hive/airflow
---

# Run Hive using the Airflow SDK

{% multiTablesWrapper %}

| Feature            | Status                       |
| :----------------- | :--------------------------- |
| Stage              | PROD                         |
| Metadata           | {% icon iconName="check" /%} |
| Query Usage        | {% icon iconName="cross" /%} |
| Data Profiler      | {% icon iconName="check" /%} |
| Data Quality       | {% icon iconName="check" /%} |
| Lineage            | Partially via Views          |
| DBT                | {% icon iconName="cross" /%} |
| Supported Versions | Hive >= 2.0                  |

| Feature      | Status                       |
| :----------- | :--------------------------- |
| Lineage      | Partially via Views          |
| Table-level  | {% icon iconName="check" /%} |
| Column-level | {% icon iconName="check" /%} |

{% /multiTablesWrapper %}

In this section, we provide guides and references to use the Hive connector.

Configure and schedule Hive metadata and profiler workflows using the Airflow SDK:

- [Requirements](#requirements)
- [Metadata Ingestion](#metadata-ingestion)
- [Data Profiler](#data-profiler)
- [dbt Integration](#dbt-integration)

## Requirements

{%inlineCallout icon="description" bold="OpenMetadata 0.12 or later" href="/deployment"%}
To deploy OpenMetadata, check the Deployment guides.
{%/inlineCallout%}

To run the Ingestion via the UI you'll need to use the OpenMetadata Ingestion Container, which comes shipped with custom Airflow plugins to handle the workflow deployment.

### Python Requirements

To run the Hive ingestion, you will need to install:

```bash
pip3 install "openmetadata-ingestion[hive]"
```

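Before wiring anything into Airflow, you can sanity-check that the package installed correctly. A minimal sketch (run it in the same virtual environment or Ingestion Container; the `Workflow` import is the same one used by the DAG later in this guide):

```python
# Minimal sanity check: this import only succeeds if
# openmetadata-ingestion[hive] installed correctly.
from metadata.ingestion.api.workflow import Workflow

print("openmetadata-ingestion is importable:", Workflow.__name__)
```
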
## Metadata Ingestion

All connectors are defined as JSON Schemas.
[Here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/hiveConnection.json)
you can find the structure to create a connection to Hive.

In order to create and run a Metadata Ingestion workflow, we will follow
the steps to create a YAML configuration able to connect to the source,
process the Entities if needed, and reach the OpenMetadata server.

The workflow is modeled around the following
[JSON Schema](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json).

### 1. Define the YAML Config

This is a sample config for Hive:

{% codePreview %}

{% codeInfoContainer %}

#### Source Configuration - Service Connection

{% codeInfo srNumber=1 %}

**username**: Specify the User to connect to Hive. It should have enough privileges to read all the metadata.

{% /codeInfo %}

{% codeInfo srNumber=2 %}

**password**: Password to connect to Hive.

{% /codeInfo %}

{% codeInfo srNumber=3 %}

**hostPort**: Enter the fully qualified hostname and port number of your Hive deployment in the Host and Port field (for example, `localhost:10000`, the default HiveServer2 port).

{% /codeInfo %}

{% codeInfo srNumber=4 %}

**authOptions**: Enter the auth options string for the Hive connection.

{% /codeInfo %}

#### Source Configuration - Source Config

{% codeInfo srNumber=7 %}

The `sourceConfig` is defined [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json):

**markDeletedTables**: To flag tables as soft-deleted if they are not present anymore in the source system.

**includeTables**: true or false, to ingest table data. Default is true.

**includeViews**: true or false, to ingest views definitions.

**databaseFilterPattern**, **schemaFilterPattern**, **tableFilterPattern**: Note that they support regex as include or exclude; see the commented examples in the YAML below.

{% /codeInfo %}

#### Sink Configuration

{% codeInfo srNumber=8 %}

To send the metadata to OpenMetadata, it needs to be specified as `type: metadata-rest`.

{% /codeInfo %}

#### Workflow Configuration

{% codeInfo srNumber=9 %}

The main property here is the `openMetadataServerConfig`, where you can define the host and security provider of your OpenMetadata installation.

For a simple, local installation using our docker containers, this looks like:

{% /codeInfo %}

#### Advanced Configuration

{% codeInfo srNumber=5 %}

**Connection Options (Optional)**: Enter the details for any additional connection options that can be sent to Hive during the connection. These details must be added as Key-Value pairs.

{% /codeInfo %}

{% codeInfo srNumber=6 %}

**Connection Arguments (Optional)**: Enter the details for any additional connection arguments such as security or protocol configs that can be sent to Hive during the connection. These details must be added as Key-Value pairs.

- In case you are using Single-Sign-On (SSO) for authentication, add the `authenticator` details in the Connection Arguments as a Key-Value pair as follows: `"authenticator" : "sso_login_url"`
- In case you authenticate with SSO using an external browser popup, then add the `authenticator` details in the Connection Arguments as a Key-Value pair as follows: `"authenticator" : "externalbrowser"`

{% /codeInfo %}

{% /codeInfoContainer %}

{% codeBlock fileName="filename.yaml" %}

```yaml
source:
  type: hive
  serviceName: local_hive
  serviceConnection:
    config:
      type: Hive
```
```yaml {% srNumber=1 %}
      username: <username>
```
```yaml {% srNumber=2 %}
      password: <password>
```
```yaml {% srNumber=3 %}
      hostPort: <hive connection host & port>
```
```yaml {% srNumber=4 %}
      authOptions: <auth options>
```
```yaml {% srNumber=5 %}
      # connectionOptions:
      #   key: value
```
```yaml {% srNumber=6 %}
      # connectionArguments:
      #   key: value
```

```yaml {% srNumber=7 %}
      sourceConfig:
        config:
          type: DatabaseMetadata
          markDeletedTables: true
          includeTables: true
          includeViews: true
          # includeTags: true
          # databaseFilterPattern:
          #   includes:
          #     - database1
          #     - database2
          #   excludes:
          #     - database3
          #     - database4
          # schemaFilterPattern:
          #   includes:
          #     - schema1
          #     - schema2
          #   excludes:
          #     - schema3
          #     - schema4
          # tableFilterPattern:
          #   includes:
          #     - users
          #     - type_test
          #   excludes:
          #     - table3
          #     - table4
```

```yaml {% srNumber=8 %}
sink:
  type: metadata-rest
  config: {}
```

```yaml {% srNumber=9 %}
workflowConfig:
  openMetadataServerConfig:
    hostPort: "http://localhost:8585/api"
    authProvider: openmetadata
    securityConfig:
      jwtToken: "{bot_jwt_token}"
```

{% /codeBlock %}

{% /codePreview %}

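Before deploying anything, it can help to confirm the YAML is well-formed. A minimal sketch, assuming you saved the config above as `filename.yaml` next to your script:

```python
# Parse the workflow YAML and check that the top-level sections the
# framework expects (source, sink, workflowConfig) are all present.
import yaml

with open("filename.yaml") as f:
    cfg = yaml.safe_load(f)

missing = {"source", "sink", "workflowConfig"} - cfg.keys()
assert not missing, f"missing sections: {missing}"
print(cfg["source"]["type"])  # -> hive
```
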
### Workflow Configs for Security Provider

We support different security providers. You can find their definitions [here](https://github.com/open-metadata/OpenMetadata/tree/main/openmetadata-spec/src/main/resources/json/schema/security/client).

### OpenMetadata JWT Auth

- JWT tokens will allow your clients to authenticate against the OpenMetadata server. To enable JWT Tokens, you can find more details [here](/deployment/security/enable-jwt-tokens).

```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: "http://localhost:8585/api"
    authProvider: openmetadata
    securityConfig:
      jwtToken: "{bot_jwt_token}"
```

- You can refer to the JWT Troubleshooting section [link](/deployment/security/jwt-troubleshooting) for any issues in your JWT configuration. If you need information on configuring the ingestion with other security providers in your bots, you can follow this doc [link](/deployment/security/workflow-config-auth).

### 2. Prepare the Ingestion DAG

Create a Python file in your Airflow DAGs directory with the following contents:

{% codePreview %}

{% codeInfoContainer %}

{% codeInfo srNumber=10 %}

#### Import necessary modules

The `Workflow` class that is being imported is a part of a metadata ingestion framework, which defines a process of getting data from different sources and ingesting it into a central metadata repository.

Here we are also importing all the basic requirements to parse YAMLs, handle dates and build our DAG.

{% /codeInfo %}

{% codeInfo srNumber=11 %}

**Default arguments for all tasks in the Airflow DAG.**

- The default arguments dictionary contains default arguments for tasks in the DAG, including the owner's name, email address, number of retries, retry delay, and execution timeout.

{% /codeInfo %}

{% codeInfo srNumber=12 %}

- **config**: Specifies the config for the metadata ingestion as we prepared above.

{% /codeInfo %}

{% codeInfo srNumber=13 %}

- **metadata_ingestion_workflow()**: This code defines a function `metadata_ingestion_workflow()` that loads a YAML configuration, creates a `Workflow` object, executes the workflow, checks its status, prints the status to the console, and stops the workflow.

{% /codeInfo %}

{% codeInfo srNumber=14 %}

- **DAG**: creates a DAG using the Airflow framework; tune the DAG configuration to whatever fits your requirements.
- For more details on creating Airflow DAGs, visit [here](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#declaring-a-dag).

{% /codeInfo %}

Note that from connector to connector, this recipe will always be the same.
By updating the YAML configuration, you will be able to extract metadata from different sources.

{% /codeInfoContainer %}

{% codeBlock fileName="filename.py" %}

```python {% srNumber=10 %}
import yaml
from datetime import timedelta
from airflow import DAG
from metadata.ingestion.api.workflow import Workflow
from airflow.utils.dates import days_ago

try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator

```

```python {% srNumber=11 %}
default_args = {
    "owner": "user_name",
    "email": ["username@org.com"],
    "email_on_failure": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=60)
}

```

```python {% srNumber=12 %}
config = """
<your YAML configuration>
"""

```

```python {% srNumber=13 %}
def metadata_ingestion_workflow():
    workflow_config = yaml.safe_load(config)
    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()

```

```python {% srNumber=14 %}
with DAG(
    "sample_data",
    default_args=default_args,
    description="An example DAG which runs an OpenMetadata ingestion workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    schedule_interval='*/5 * * * *',
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )

```

{% /codeBlock %}

{% /codePreview %}

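If you want to exercise the recipe once outside the scheduler (for example, to validate the YAML before the DAG picks it up), the same callable can be run directly. A minimal sketch, appended to the file above:

```python
# Run the ingestion function once, outside Airflow, to validate the
# configuration; any failure will raise via raise_from_status().
if __name__ == "__main__":
    metadata_ingestion_workflow()
```
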
## Data Profiler

The Data Profiler workflow will be using the `orm-profiler` processor.

After running a Metadata Ingestion workflow, we can run the Data Profiler workflow.
The `serviceName` should be the same as the one used in the Metadata Ingestion, so the ingestion bot can get the `serviceConnection` details from the server.

### 1. Define the YAML Config

This is a sample config for the profiler:

{% codePreview %}

{% codeInfoContainer %}

{% codeInfo srNumber=15 %}

#### Source Configuration - Source Config

You can find all the definitions and types for the `sourceConfig` [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json).

**generateSampleData**: Option to turn on/off generating sample data.

{% /codeInfo %}

{% codeInfo srNumber=16 %}

**profileSample**: Percentage of data or number of rows to run the profiler and tests on.

{% /codeInfo %}

{% codeInfo srNumber=17 %}

**threadCount**: Number of threads to use during metric computations.

{% /codeInfo %}

{% codeInfo srNumber=18 %}

**processPiiSensitive**: Optional configuration to automatically tag columns that might contain sensitive information.

{% /codeInfo %}

{% codeInfo srNumber=19 %}

**confidence**: Set the confidence value for which you want the column to be marked as PII.

{% /codeInfo %}

{% codeInfo srNumber=20 %}

**timeoutSeconds**: Profiler timeout in seconds.

{% /codeInfo %}

{% codeInfo srNumber=21 %}

**databaseFilterPattern**: Regex to only fetch databases that match the pattern.

{% /codeInfo %}

{% codeInfo srNumber=22 %}

**schemaFilterPattern**: Regex to only fetch schemas that match the pattern.

{% /codeInfo %}

{% codeInfo srNumber=23 %}

**tableFilterPattern**: Regex to only fetch tables that match the pattern.

{% /codeInfo %}

{% codeInfo srNumber=24 %}

#### Processor Configuration

Choose the `orm-profiler`. Its config can also be updated to define tests from the YAML itself instead of the UI:

**tableConfig**: `tableConfig` allows you to set up some configuration at the table level.

{% /codeInfo %}

{% codeInfo srNumber=25 %}

#### Sink Configuration

To send the metadata to OpenMetadata, it needs to be specified as `type: metadata-rest`.

{% /codeInfo %}

{% codeInfo srNumber=26 %}

#### Workflow Configuration

The main property here is the `openMetadataServerConfig`, where you can define the host and security provider of your OpenMetadata installation.

For a simple, local installation using our docker containers, this looks like:

{% /codeInfo %}

{% /codeInfoContainer %}

{% codeBlock fileName="filename.yaml" %}

```yaml
source:
  type: hive
  serviceName: local_hive
  sourceConfig:
    config:
      type: Profiler
```

```yaml {% srNumber=15 %}
      generateSampleData: true
```
```yaml {% srNumber=16 %}
      # profileSample: 85
```
```yaml {% srNumber=17 %}
      # threadCount: 5
```
```yaml {% srNumber=18 %}
      processPiiSensitive: false
```
```yaml {% srNumber=19 %}
      # confidence: 80
```
```yaml {% srNumber=20 %}
      # timeoutSeconds: 43200
```
```yaml {% srNumber=21 %}
      # databaseFilterPattern:
      #   includes:
      #     - database1
      #     - database2
      #   excludes:
      #     - database3
      #     - database4
```
```yaml {% srNumber=22 %}
      # schemaFilterPattern:
      #   includes:
      #     - schema1
      #     - schema2
      #   excludes:
      #     - schema3
      #     - schema4
```
```yaml {% srNumber=23 %}
      # tableFilterPattern:
      #   includes:
      #     - table1
      #     - table2
      #   excludes:
      #     - table3
      #     - table4
```

```yaml {% srNumber=24 %}
processor:
  type: orm-profiler
  config: {}  # Remove braces if adding properties
    # tableConfig:
    #   - fullyQualifiedName: <table fqn>
    #     profileSample: <number between 0 and 99> # default will be 100 if omitted
    #     profileQuery: <query to use for sampling data for the profiler>
    #     columnConfig:
    #       excludeColumns:
    #         - <column name>
    #       includeColumns:
    #         - columnName: <column name>
    #         - metrics:
    #           - MEAN
    #           - MEDIAN
    #           - ...
    #     partitionConfig:
    #       enablePartitioning: <set to true to use partitioning>
    #       partitionColumnName: <partition column name. Must be a timestamp or datetime/date field type>
    #       partitionInterval: <partition interval>
    #       partitionIntervalUnit: <YEAR, MONTH, DAY, HOUR>

```

```yaml {% srNumber=25 %}
sink:
  type: metadata-rest
  config: {}
```

```yaml {% srNumber=26 %}
workflowConfig:
  # loggerLevel: DEBUG  # DEBUG, INFO, WARN or ERROR
  openMetadataServerConfig:
    hostPort: <OpenMetadata host and port>
    authProvider: <OpenMetadata auth provider>
```

{% /codeBlock %}

{% /codePreview %}

- You can learn more about how to configure and run the Profiler Workflow to extract Profiler data and execute Data Quality tests [here](/connectors/ingestion/workflows/profiler).

### 2. Prepare the Profiler DAG

Here, we follow a similar approach as with the metadata and usage pipelines, although we will use a different Workflow class:

{% codePreview %}

{% codeInfoContainer %}

{% codeInfo srNumber=27 %}

#### Import necessary modules

The `ProfilerWorkflow` class that is being imported is part of the metadata orm_profiler framework, which defines a process of extracting Profiler data.

Here we are also importing all the basic requirements to parse YAMLs, handle dates and build our DAG.

{% /codeInfo %}

{% codeInfo srNumber=28 %}

**Default arguments for all tasks in the Airflow DAG.**

- The default arguments dictionary contains default arguments for tasks in the DAG, including the owner's name, email address, number of retries, retry delay, and execution timeout.

{% /codeInfo %}

{% codeInfo srNumber=29 %}

- **config**: Specifies the config for the profiler as we prepared above.

{% /codeInfo %}

{% codeInfo srNumber=30 %}

- **metadata_ingestion_workflow()**: This code defines a function `metadata_ingestion_workflow()` that loads a YAML configuration, creates a `ProfilerWorkflow` object, executes the workflow, checks its status, prints the status to the console, and stops the workflow.

{% /codeInfo %}

{% codeInfo srNumber=31 %}

- **DAG**: creates a DAG using the Airflow framework; tune the DAG configuration to whatever fits your requirements.
- For more details on creating Airflow DAGs, visit [here](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#declaring-a-dag).

{% /codeInfo %}

{% /codeInfoContainer %}

{% codeBlock fileName="filename.py" %}

```python {% srNumber=27 %}
import yaml
from datetime import timedelta
from airflow import DAG
from metadata.orm_profiler.api.workflow import ProfilerWorkflow

try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator

from airflow.utils.dates import days_ago

```

```python {% srNumber=28 %}
default_args = {
    "owner": "user_name",
    "email_on_failure": False,
    "retries": 3,
    "retry_delay": timedelta(seconds=10),
    "execution_timeout": timedelta(minutes=60),
}

```

```python {% srNumber=29 %}
config = """
<your YAML configuration>
"""

```

```python {% srNumber=30 %}
def metadata_ingestion_workflow():
    workflow_config = yaml.safe_load(config)
    workflow = ProfilerWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()

```

```python {% srNumber=31 %}
with DAG(
    "profiler_example",
    default_args=default_args,
    description="An example DAG which runs an OpenMetadata ingestion workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="profile_and_test_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )

```

{% /codeBlock %}

{% /codePreview %}

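If you would rather drive metadata ingestion and profiling from a single DAG, one option (a sketch, not the documented setup) is to keep both callables in the same file — renaming them, since both snippets above use `metadata_ingestion_workflow` — and chain the tasks so profiling only starts after ingestion succeeds:

```python
# Hypothetical combined DAG: `ingestion_workflow` and `profiler_workflow`
# are assumed to be the two functions defined above, renamed to avoid
# the name clash between the two snippets.
with DAG(
    "hive_ingest_then_profile",  # hypothetical DAG id
    default_args=default_args,
    start_date=days_ago(1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    ingest = PythonOperator(
        task_id="ingest_metadata",
        python_callable=ingestion_workflow,
    )
    profile = PythonOperator(
        task_id="profile_tables",
        python_callable=profiler_workflow,
    )
    ingest >> profile  # profiler runs only after ingestion succeeds
```
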
## dbt Integration

{% tilesContainer %}

{% tile
  icon="mediation"
  title="dbt Integration"
  description="Learn more about how to ingest dbt models' definitions and their lineage."
  link="/connectors/ingestion/workflows/dbt" /%}

{% /tilesContainer %}

## Related

{% tilesContainer %}

{% tile
    title="Ingest with the CLI"
    description="Run a one-time ingestion using the metadata CLI"
    link="/connectors/database/hive/cli"
  /%}

{% /tilesContainer %}