---
title: Run Postgres Connector using Airflow SDK
slug: /openmetadata/connectors/database/postgres/airflow
---
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
# Run Postgres using the Airflow SDK
  
						 
					
						
							
								
									
										
										
										
In this section, we provide guides and references to use the Postgres connector.
							 
						 
					
						
							
								
									
										
										
										
Configure and schedule Postgres metadata and profiler workflows using the Airflow SDK:

- [Requirements](#requirements)
- [Metadata Ingestion](#metadata-ingestion)
- [Data Profiler](#data-profiler)
- [DBT Integration](#dbt-integration)
						 
					
						
							
								
									
										
										
										
## Requirements
  
						 
					
						
							
								
									
										
										
										
<InlineCallout color="violet-70" icon="description" bold="OpenMetadata 0.12 or later" href="/deployment">
To deploy OpenMetadata, check the <a href="/deployment">Deployment</a> guides.
</InlineCallout>
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
To run the Ingestion via the UI you'll need to use the OpenMetadata Ingestion Container, which comes shipped with
custom Airflow plugins to handle the workflow deployment.
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								### Python Requirements
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								To run the Postgres ingestion, you will need to install:
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
```bash
pip3 install "openmetadata-ingestion[postgres]"
```
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								## Metadata Ingestion
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
All connectors are defined as JSON Schemas.
[Here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-service/src/main/resources/json/schema/entity/services/connections/database/postgresConnection.json)
you can find the structure to create a connection to Postgres.
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
In order to create and run a Metadata Ingestion workflow, we will follow
the steps to create a YAML configuration able to connect to the source,
process the Entities if needed, and reach the OpenMetadata server.
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
The workflow is modeled around the following
[JSON Schema](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-service/src/main/resources/json/schema/metadataIngestion/workflow.json).
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								### 1. Define the YAML Config
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								This is a sample config for Postgres:
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
```yaml
source:
  type: postgres
  serviceName: local_postgres
  serviceConnection:
    config:
      type: Postgres
      username: username
      password: password
      hostPort: localhost:5432
      # database: database
  sourceConfig:
    config:
      markDeletedTables: true
      includeTables: true
      includeViews: true
      # includeTags: true
      # databaseFilterPattern:
      #   includes:
      #     - database1
      #     - database2
      #   excludes:
      #     - database3
      #     - database4
      # schemaFilterPattern:
      #   includes:
      #     - schema1
      #     - schema2
      #   excludes:
      #     - schema3
      #     - schema4
      # tableFilterPattern:
      #   includes:
      #     - table1
      #     - table2
      #   excludes:
      #     - table3
      #     - table4
      # For DBT, choose one of Cloud, Local, HTTP, S3 or GCS configurations
      # dbtConfigSource:
      # # For cloud
      #   dbtCloudAuthToken: token
      #   dbtCloudAccountId: ID
      # # For Local
      #   dbtCatalogFilePath: path-to-catalog.json
      #   dbtManifestFilePath: path-to-manifest.json
      # # For HTTP
      #   dbtCatalogHttpPath: http://path-to-catalog.json
      #   dbtManifestHttpPath: http://path-to-manifest.json
      # # For S3
      #   dbtSecurityConfig:  # These are modeled after all AWS credentials
      #     awsAccessKeyId: KEY
      #     awsSecretAccessKey: SECRET
      #     awsRegion: us-east-2
      #   dbtPrefixConfig:
      #     dbtBucketName: bucket
      #     dbtObjectPrefix: "dbt/"
      # # For GCS
      #   dbtSecurityConfig:  # These are modeled after all GCS credentials
      #     type: My Type
      #     projectId: project ID
      #     privateKeyId: us-east-2
      #     privateKey: |
      #      -----BEGIN PRIVATE KEY-----
      #      Super secret key
      #      -----END PRIVATE KEY-----
      #     clientEmail: client@mail.com
      #     clientId: 1234
      #     authUri: https://accounts.google.com/o/oauth2/auth (default)
      #     tokenUri: https://oauth2.googleapis.com/token (default)
      #     authProviderX509CertUrl: https://www.googleapis.com/oauth2/v1/certs (default)
      #     clientX509CertUrl: https://cert.url (URI)
      #   dbtPrefixConfig:
      #     dbtBucketName: bucket
      #     dbtObjectPrefix: "dbt/"
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  # loggerLevel: DEBUG  # DEBUG, INFO, WARN or ERROR
  openMetadataServerConfig:
    hostPort: <OpenMetadata host and port>
    authProvider: <OpenMetadata auth provider>
```
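If you prefer to assemble the configuration programmatically rather than by hand, the sample config can be mirrored as a plain Python dictionary. The following is a minimal sketch, not part of the OpenMetadata SDK: the helper name `build_postgres_workflow_config` and its parameters are hypothetical, and only the uncommented keys from the sample are reproduced. It serializes to JSON, which YAML parsers generally accept, so the output could be saved as the workflow config.

```python
import json


def build_postgres_workflow_config(username, password, host_port,
                                   om_host_port, auth_provider="no-auth"):
    """Hypothetical helper: return a dict mirroring the sample config."""
    return {
        "source": {
            "type": "postgres",
            "serviceName": "local_postgres",
            "serviceConnection": {
                "config": {
                    "type": "Postgres",
                    "username": username,
                    "password": password,
                    "hostPort": host_port,
                }
            },
            "sourceConfig": {
                "config": {
                    "markDeletedTables": True,
                    "includeTables": True,
                    "includeViews": True,
                }
            },
        },
        "sink": {"type": "metadata-rest", "config": {}},
        "workflowConfig": {
            "openMetadataServerConfig": {
                "hostPort": om_host_port,
                "authProvider": auth_provider,
            }
        },
    }


# Placeholder values taken from the sample config in this section.
config = build_postgres_workflow_config(
    "username", "password", "localhost:5432", "http://localhost:8585/api"
)
config_text = json.dumps(config, indent=2)
print(config_text)
```

Keeping the structure in one function makes it easier to swap credentials per environment without editing the config file by hand.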
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								#### Source Configuration - Service Connection
  
						 
					
						
							
								
									
										
										
										
- **username**: Specify the User to connect to Postgres. It should have enough privileges to read all the metadata.
- **password**: Password to connect to Postgres.
- **hostPort**: Enter the fully qualified hostname and port number for your Postgres deployment in the Host and Port field.
- **Connection Options (Optional)**: Enter the details for any additional connection options that can be sent to Postgres during the connection. These details must be added as Key-Value pairs.
- **Connection Arguments (Optional)**: Enter the details for any additional connection arguments such as security or protocol configs that can be sent to Postgres during the connection. These details must be added as Key-Value pairs.
  - In case you are using Single-Sign-On (SSO) for authentication, add the `authenticator` details in the Connection Arguments as a Key-Value pair as follows: `"authenticator" : "sso_login_url"`
  - In case you authenticate with SSO using an external browser popup, then add the `authenticator` details in the Connection Arguments as a Key-Value pair as follows: `"authenticator" : "externalbrowser"`
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
#### Source Configuration - Source Config
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
The `sourceConfig` is defined [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-service/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json):
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
- `markDeletedTables`: To flag tables as soft-deleted if they are no longer present in the source system.
- `includeTables`: true or false, to ingest table data. Default is true.
- `includeViews`: true or false, to ingest view definitions.
- `databaseFilterPattern`, `schemaFilterPattern`, `tableFilterPattern`: Each accepts regular expressions under `includes` or `excludes`. E.g.,
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
```yaml
tableFilterPattern:
  includes:
    - users
    - type_test
```
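Because the filter entries are regular expressions rather than literal names, a pattern can match more tables than expected. The sketch below illustrates this with Python's `re` module. The function `is_included` and its semantics (keep a name if it matches any include pattern and no exclude pattern, matching from the start of the name) are assumptions made for illustration, not OpenMetadata's actual implementation.

```python
import re


def is_included(name, includes=None, excludes=None):
    """Illustrative include/exclude check (assumed semantics, see lead-in)."""
    # If include patterns are given, the name must match at least one of them.
    if includes and not any(re.match(pattern, name) for pattern in includes):
        return False
    # If exclude patterns are given, the name must match none of them.
    if excludes and any(re.match(pattern, name) for pattern in excludes):
        return False
    return True


tables = ["users", "type_test", "orders", "users_archive"]

# Patterns from the example above: "users" also matches "users_archive",
# because re.match only anchors at the start of the string.
kept = [t for t in tables if is_included(t, includes=["users", "type_test"])]
print(kept)

# Anchoring the pattern restricts it to an exact name.
exact = [t for t in tables if is_included(t, includes=["^users$"])]
print(exact)
```

Under these assumptions, anchoring a pattern with `^` and `$` is the way to select a single table by its exact name.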
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								#### Sink Configuration
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
To send the metadata to OpenMetadata, the sink needs to be specified as `type: metadata-rest`.
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								#### Workflow Configuration
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
The main property here is the `openMetadataServerConfig`, where you can define the host and security provider of your OpenMetadata installation.
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								For a simple, local installation using our docker containers, this looks like:
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: no-auth
```
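Note that the sample `hostPort` for the OpenMetadata server is a full URL with a scheme and an `/api` path, unlike the bare `localhost:5432` used for the Postgres connection. A quick sanity check before running a workflow could look like the sketch below. The function `check_host_port` is hypothetical, and the assumption that the value must be an `http` or `https` URL is inferred from the sample value only.

```python
from urllib.parse import urlparse


def check_host_port(host_port):
    """Hypothetical check: split the server URL and reject values without a scheme."""
    parsed = urlparse(host_port)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"expected an http(s) URL, got: {host_port!r}")
    if not parsed.hostname:
        raise ValueError(f"missing host in: {host_port!r}")
    return parsed.hostname, parsed.port, parsed.path


host, port, path = check_host_port("http://localhost:8585/api")
print(host, port, path)
```

A value such as `localhost:8585`, with no scheme, raises `ValueError` in this sketch.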
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
We support different security providers. You can find their definitions [here](https://github.com/open-metadata/OpenMetadata/tree/main/openmetadata-service/src/main/resources/json/schema/security/client).
							 
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
You can find the different implementations of the ingestion below.
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
<Collapse title="Configure SSO in the Ingestion Workflows">
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								### Auth0 SSO
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: auth0
    securityConfig:
      clientId: '{your_client_id}'
      secretKey: '{your_client_secret}'
      domain: '{your_domain}'
```
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								### Azure SSO
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: azure
    securityConfig:
      clientSecret: '{your_client_secret}'
      authority: '{your_authority_url}'
      clientId: '{your_client_id}'
      scopes:
        - your_scopes
```
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								### Custom OIDC SSO
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: custom-oidc
    securityConfig:
      clientId: '{your_client_id}'
      secretKey: '{your_client_secret}'
      domain: '{your_domain}'
```
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								### Google SSO
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: google
    securityConfig:
      secretKey: '{path-to-json-creds}'
```
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								### Okta SSO
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: okta
    securityConfig:
      clientId: "{CLIENT_ID - SPA APP}"
      orgURL: "{ISSUER_URL}/v1/token"
      privateKey: "{public/private keypair}"
      email: "{email}"
      scopes:
        - token
```
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								### Amazon Cognito SSO
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
The ingestion can be configured by [Enabling JWT Tokens](https://docs.open-metadata.org/deployment/security/enable-jwt-tokens).
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: auth0
    securityConfig:
      clientId: '{your_client_id}'
      secretKey: '{your_client_secret}'
      domain: '{your_domain}'
```
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								### OneLogin SSO
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
OneLogin uses Custom OIDC for the ingestion:
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: custom-oidc
    securityConfig:
      clientId: '{your_client_id}'
      secretKey: '{your_client_secret}'
      domain: '{your_domain}'
```
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								### KeyCloak SSO
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
KeyCloak uses Custom OIDC for the ingestion:
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: custom-oidc
    securityConfig:
      clientId: '{your_client_id}'
      secretKey: '{your_client_secret}'
      domain: '{your_domain}'
```
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
</Collapse>
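
The SSO examples differ mainly in which keys appear under `securityConfig`. The sketch below collects those keys per `authProvider` and reports which ones a given configuration lacks. The table `EXAMPLE_SECURITY_KEYS` and the helper `missing_security_keys` are hypothetical names, and the key sets are copied from the examples on this page only; the JSON schemas linked earlier remain the authoritative definition.

```python
# Keys each provider's securityConfig uses in the examples on this page.
# Collected from the examples only, not from the JSON schemas.
EXAMPLE_SECURITY_KEYS = {
    "auth0": {"clientId", "secretKey", "domain"},
    "azure": {"clientSecret", "authority", "clientId", "scopes"},
    "custom-oidc": {"clientId", "secretKey", "domain"},
    "google": {"secretKey"},
    "okta": {"clientId", "orgURL", "privateKey", "email", "scopes"},
}


def missing_security_keys(server_config: dict) -> set:
    """Return the example keys absent from securityConfig (hypothetical helper)."""
    provider = server_config.get("authProvider", "no-auth")
    if provider == "no-auth":
        return set()  # the no-auth example has no securityConfig at all
    expected = EXAMPLE_SECURITY_KEYS.get(provider)
    if expected is None:
        raise ValueError(f"no example on this page for authProvider {provider!r}")
    return expected - set(server_config.get("securityConfig", {}))


azure_config = {
    "hostPort": "http://localhost:8585/api",
    "authProvider": "azure",
    "securityConfig": {
        "clientSecret": "{your_client_secret}",
        "authority": "{your_authority_url}",
        "clientId": "{your_client_id}",
    },
}
print(sorted(missing_security_keys(azure_config)))
```

Here the Azure configuration omits `scopes`, so that is the key the helper reports.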
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								### 2. Prepare the Ingestion DAG
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								Create a Python file in your Airflow DAGs directory with the following contents:
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
```python
import pathlib
import yaml
from datetime import timedelta
from airflow import DAG

# Airflow 2 import path, with a fallback for older Airflow releases
try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator

from metadata.config.common import load_config_file
from metadata.ingestion.api.workflow import Workflow
from airflow.utils.dates import days_ago

default_args = {
    "owner": "user_name",
    "email": ["username@org.com"],
    "email_on_failure": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=60)
}

# Paste the YAML configuration defined in the previous step here
config = """
<your YAML configuration>
"""

def metadata_ingestion_workflow():
    # Parse the YAML, run the workflow, and fail the task if the run reported errors
    workflow_config = yaml.safe_load(config)
    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()

with DAG(
    "sample_data",
    default_args=default_args,
    description="An example DAG which runs an OpenMetadata ingestion workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    schedule_interval='*/5 * * * *',  # every 5 minutes
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )
```
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
Note that this recipe stays the same from connector to connector. By updating only the YAML configuration, you can extract metadata from different sources.
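
Since only the YAML string changes between connectors, one option is to keep it as a template and fill in environment-specific values before it is parsed. The following is a minimal sketch using the standard library's `string.Template`; the placeholder names `OM_HOST_PORT` and `OM_AUTH_PROVIDER` and the helper `render_config` are invented for illustration and are not part of OpenMetadata or Airflow.

```python
import os
from string import Template

# Placeholders use string.Template syntax; the variable names are made up.
CONFIG_TEMPLATE = """
workflowConfig:
  openMetadataServerConfig:
    hostPort: ${OM_HOST_PORT}
    authProvider: ${OM_AUTH_PROVIDER}
"""


def render_config(template: str, env=None) -> str:
    """Fill placeholders from a mapping (the process environment by default).

    Raises KeyError if a placeholder has no value.
    """
    values = os.environ if env is None else env
    return Template(template).substitute(values)


rendered = render_config(
    CONFIG_TEMPLATE,
    {"OM_HOST_PORT": "http://localhost:8585/api", "OM_AUTH_PROVIDER": "no-auth"},
)
print(rendered)
```

The rendered string can then take the place of `config` in the DAG above. A literal `$` in the YAML would need to be written as `$$` for `string.Template`.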
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								## Data Profiler
  
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
The Data Profiler workflow will be using the `orm-profiler` processor.
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
While the `serviceConnection` will still be the same to reach the source system, the `sourceConfig` will be updated from previous configurations.
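
To make the relationship concrete, the sketch below derives a profiler source from a metadata-ingestion source by copying it and swapping only `sourceConfig`. It works on plain dictionaries; the helper `to_profiler_source` is hypothetical, and the `DatabaseMetadata` value for the original `sourceConfig` type is an assumption about the earlier configuration rather than something stated in this section.

```python
import copy

# A metadata-ingestion source written as a dict. The "DatabaseMetadata"
# value is an assumption about the earlier configuration.
metadata_source = {
    "type": "postgres",
    "serviceName": "local_postgres",
    "serviceConnection": {
        "config": {
            "type": "Postgres",
            "username": "username",
            "password": "password",
            "hostPort": "localhost:5432",
        }
    },
    "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}


def to_profiler_source(source: dict) -> dict:
    """Copy the source, keep serviceConnection, and swap in a Profiler sourceConfig."""
    profiler = copy.deepcopy(source)
    profiler["sourceConfig"] = {"config": {"type": "Profiler"}}
    return profiler


profiler_source = to_profiler_source(metadata_source)
print(profiler_source["sourceConfig"])
```

The deep copy leaves the original dictionary untouched, so both workflows can be built from the same connection details.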
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								### 1. Define the YAML Config
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								This is a sample config for the profiler:
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
```yaml
source:
  type: postgres
  serviceName: local_postgres
  serviceConnection:
    config:
      type: Postgres
      username: username
      password: password
      hostPort: localhost:5432
      # database: database
  sourceConfig:
    config:
      type: Profiler
      # generateSampleData: true
      # profileSample: 85
      # threadCount: 5 (default)
      # databaseFilterPattern:
      #   includes:
      #     - database1
      #     - database2
      #   excludes:
      #     - database3
      #     - database4
      # schemaFilterPattern:
      #   includes:
      #     - schema1
      #     - schema2
      #   excludes:
      #     - schema3
      #     - schema4
      # tableFilterPattern:
      #   includes:
      #     - table1
      #     - table2
      #   excludes:
      #     - table3
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      #     - table4
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								processor:
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  type: orm-profiler
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  config: {}  # Remove braces if adding properties
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # tableConfig:
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  #   - fullyQualifiedName: <table fqn>
								  #     profileSample: <number between 0 and 99>
								  #     columnConfig:
								  #       profileQuery: <query to use for sampling data for the profiler>
								  #       excludeColumns:
								  #         - <column name>
								  #       includeColumns:
								  #         - columnName: <column name>
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  #           metrics:
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  #           - MEAN
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  #           - MEDIAN
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  #           - ...
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								sink:
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  type: metadata-rest
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  config: {}
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								workflowConfig:
							 
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								  # loggerLevel: DEBUG  # DEBUG, INFO, WARN or ERROR
							 
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								  openMetadataServerConfig:
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    hostPort: <OpenMetadata host and port>
								    authProvider: <OpenMetadata auth provider>
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								```
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								#### Source Configuration
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
- You can find all the definitions and types for the `serviceConnection` [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-service/src/main/resources/json/schema/entity/services/connections/database/postgresConnection.json).
- The `sourceConfig` is defined [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-service/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json).
						 
					
						
							
								
									
										
										
										
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
Note that the filter patterns accept regular expressions in both `includes` and `excludes`. For example:
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
```yaml
tableFilterPattern:
  includes:
  - ".*users$"
```
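To make the effect of such patterns concrete, here is a minimal Python sketch of include/exclude filtering. It is not the connector's actual implementation: the `filter_names` helper is hypothetical, and it assumes that patterns are applied with `re.match` (anchored at the start of the name) and that excludes take precedence over includes.

```python
import re
from typing import Iterable, List, Optional


def filter_names(
    names: Iterable[str],
    includes: Optional[List[str]] = None,
    excludes: Optional[List[str]] = None,
) -> List[str]:
    """Keep names that match any include pattern and no exclude pattern."""
    kept = []
    for name in names:
        # With no include patterns, every name is a candidate.
        included = not includes or any(re.match(p, name) for p in includes)
        # Assumption: an exclude match always removes the name.
        excluded = bool(excludes) and any(re.match(p, name) for p in excludes)
        if included and not excluded:
            kept.append(name)
    return kept


# Hypothetical table names, for illustration only.
tables = ["users", "app_users", "users_archive", "orders"]
print(filter_names(tables, includes=[".*users$"]))
```

With a pattern such as `.*users$`, only names ending in `users` are kept, so `users_archive` and `orders` are dropped in this sketch.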
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								#### Processor
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
Choose the `orm-profiler`. Its config can also be updated to define tests from the YAML itself instead of the UI:
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
```yaml
processor:
  type: orm-profiler
  config:
    tableConfig:
      - fullyQualifiedName: <table fqn>
        profileSample: <number between 0 and 99>
        columnConfig:
          partitionConfig:
            partitionField: <field to use as a partition field>
            partitionQueryDuration: <for date/datetime based partitioning, set the offset from today>
            partitionValues: <values to use as a predicate for the query>
          profileQuery: <query to use for sampling data for the profiler>
          excludeColumns:
            - <column name>
          includeColumns:
            - columnName: <column name>
              metrics:
                - MEAN
                - MEDIAN
                - ...
```
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
`tableConfig` lets you configure the profiler for individual tables.
All the properties are optional. Each entry under `metrics` should be one of the metrics listed [here](https://docs.open-metadata.org/openmetadata/ingestion/workflows/profiler/metrics).
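As an illustration, the sketch below writes a filled-in `processor` section as a Python dictionary and checks the one constraint stated above, that `profileSample` lies between 0 and 99. The fully qualified name, the column name, and the `invalid_profile_samples` helper are all hypothetical; the helper is not part of the OpenMetadata SDK.

```python
from typing import Any, Dict, List

# Hypothetical values: the service, database, schema, table, and column
# names below are placeholders, not names from a real deployment.
processor = {
    "type": "orm-profiler",
    "config": {
        "tableConfig": [
            {
                "fullyQualifiedName": "my_service.my_db.public.users",
                "profileSample": 50,
                "columnConfig": {"excludeColumns": ["password_hash"]},
            }
        ]
    },
}


def invalid_profile_samples(processor_config: Dict[str, Any]) -> List[str]:
    """Return the FQNs of tables whose profileSample is outside 0-99."""
    bad = []
    config = processor_config.get("config") or {}
    for table in config.get("tableConfig") or []:
        sample = table.get("profileSample")
        # profileSample is optional, so a missing value is not an error.
        if sample is not None and not 0 <= sample <= 99:
            bad.append(table.get("fullyQualifiedName", "<unknown>"))
    return bad


print(invalid_profile_samples(processor))
```

A check like this can run before the workflow is created, so that an out-of-range sample is caught early rather than at profiling time.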
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								#### Workflow Configuration
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
The `workflowConfig` is the same as in the metadata ingestion workflow.
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								### 2. Prepare the Profiler DAG
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
Here, we follow an approach similar to the one used for the metadata and usage pipelines, but with a different workflow class, `ProfilerWorkflow`:
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
```python
import yaml
from datetime import timedelta

from airflow import DAG

# PythonOperator moved in Airflow 2; fall back to the old path on Airflow 1.
try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator

from airflow.utils.dates import days_ago

from metadata.orm_profiler.api.workflow import ProfilerWorkflow


default_args = {
    "owner": "user_name",
    "email_on_failure": False,
    "retries": 3,
    "retry_delay": timedelta(seconds=10),
    "execution_timeout": timedelta(minutes=60),
}

# Paste the profiler YAML from step 1 between the triple quotes.
config = """
<your YAML configuration>
"""


def metadata_ingestion_workflow():
    # Parse the YAML and run the profiler workflow end to end.
    workflow_config = yaml.safe_load(config)
    workflow = ProfilerWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


with DAG(
    "profiler_example",
    default_args=default_args,
    description="An example DAG which runs an OpenMetadata profiler workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="profile_and_test_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )
```
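Before handing the parsed configuration to the workflow class, it can help to confirm that the expected top-level sections are present. The following is a minimal sketch under stated assumptions: the `missing_sections` helper is hypothetical, and the list of sections (`source`, `processor`, `sink`, `workflowConfig`) is inferred from the YAML shown in this guide rather than taken from the SDK.

```python
from typing import Any, Dict, List

# Assumption: these are the top-level keys used by the profiler YAML in this guide.
EXPECTED_SECTIONS = ("source", "processor", "sink", "workflowConfig")


def missing_sections(workflow_config: Dict[str, Any]) -> List[str]:
    """Return the expected top-level sections absent from the parsed config."""
    return [name for name in EXPECTED_SECTIONS if name not in workflow_config]


# Example with an incomplete, already-parsed configuration.
partial_config = {"source": {}, "sink": {"type": "metadata-rest", "config": {}}}
print(missing_sections(partial_config))
```

One way to use it is to call the helper on the result of `yaml.safe_load(config)` inside the task callable and raise an error if the returned list is not empty, so that a truncated configuration fails fast with a clear message.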
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								## DBT Integration
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
You can learn more about how to ingest DBT models' definitions and their lineage [here](https://docs.open-metadata.org/openmetadata/ingestion/workflows/metadata/dbt).