---
title: Run the Ingestion Framework Externally
slug: /deployment/ingestion/external
---

# Ingestion Framework External Deployment

Any tool capable of running Python code can be used to configure the metadata extraction from your sources.

{% partial file="/v1.5/connectors/python-requirements.md" /%}

## 1. How does the Ingestion Framework work?

The Ingestion Framework contains all the logic needed to connect to the sources, extract their metadata
and send it to the OpenMetadata server. We have built it from scratch with the main idea of making it an independent
component that can be run from - **literally** - anywhere.

To install it, you just need to get it from [PyPI](https://pypi.org/project/openmetadata-ingestion/).

```shell
pip install openmetadata-ingestion
```

We will show further examples later, but a piece of code is the best showcase of its simplicity. To run
a full ingestion process, you just need to execute a single function. For example, to run the metadata
ingestion from within a simple Python script:

```python
import yaml

from metadata.workflow.metadata import MetadataWorkflow

# Specify your YAML configuration
CONFIG = """
source:
  ...
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: openmetadata
    securityConfig:
      jwtToken: ...
"""

def run():
    workflow_config = yaml.safe_load(CONFIG)
    workflow = MetadataWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


if __name__ == "__main__":
    run()
```

Where this function runs is completely up to you, and you can adapt it to what makes the most sense within your
organization and engineering context. Below you'll see some examples of different orchestrators you can leverage
to execute the ingestion process.

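For instance, if you'd rather keep the recipe in a file than in an inline string, a minimal sketch of the same
`run()` function - assuming a hypothetical local `workflow.yaml` path - could look like this:

```python
import yaml

from metadata.workflow.metadata import MetadataWorkflow


def run_from_file(path: str) -> None:
    # Load the YAML recipe from disk instead of an inline string
    with open(path) as config_file:
        workflow_config = yaml.safe_load(config_file)

    workflow = MetadataWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


if __name__ == "__main__":
    run_from_file("workflow.yaml")  # hypothetical local path
```
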
## 2. Ingestion Configuration

In the example above, the workflow was created from a YAML configuration. Any workflow that you execute (ingestion,
profiler, lineage,...) will have its own YAML representation.

You can think about this configuration as the recipe you want to execute: where is your source, which pieces do you
extract, how are they processed and where are they sent.

An example YAML config for extracting MySQL metadata looks like this:

```yaml
source:
  type: mysql
  serviceName: mysql
  serviceConnection:
    config:
      type: Mysql
      username: openmetadata_user
      authType:
        password: openmetadata_password
      hostPort: localhost:3306
      databaseSchema: openmetadata_db
  sourceConfig:
    config:
      type: DatabaseMetadata
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: openmetadata
    securityConfig:
      jwtToken: ...
```

{% note %}
You will find examples of all the workflows' YAML files on each Connector [page](/connectors).
{% /note %}

We will now show you examples of how to configure and run every workflow externally, using Snowflake as an example. But
first, let's digest some information that will be common everywhere: the `workflowConfig`.

### Workflow Config

Here you will define information such as where you are hosting the OpenMetadata server and the JWT token to authenticate.

{% note noteType="Warning" %}

Review this section carefully to ensure you are properly managing service credentials and other security configurations.

{% /note %}

**Logger Level**

You can specify the `loggerLevel` depending on your needs. If you are trying to troubleshoot an ingestion, running
with `DEBUG` will give you far more traces for identifying issues.

**JWT Token**

JWT tokens allow your clients to authenticate against the OpenMetadata server.
You will find more details on enabling JWT tokens [here](/deployment/security/enable-jwt-tokens).

You can refer to the JWT Troubleshooting [section](/deployment/security/jwt-troubleshooting) for any issues in
your JWT configuration.

**Store Service Connection**

If set to `true` (default), we will store the sensitive information either encrypted via the Fernet Key in the database
or externally, if you have configured any [Secrets Manager](/deployment/secrets-manager).

If set to `false`, the service will be created, but the service connection information will only be used by the Ingestion
Framework at runtime, and won't be sent to the OpenMetadata server.

**Secrets Manager Configuration**

If you have configured any [Secrets Manager](/deployment/secrets-manager), you need to let the Ingestion Framework know
how to retrieve the credentials securely.

Follow the [docs](/deployment/secrets-manager) to configure the secret retrieval based on your environment.

**SSL Configuration**

If you have added SSL to the [OpenMetadata server](/deployment/security/enable-ssl), then you will need to handle
the certificates when running the ingestion too. You can either set `verifySSL` to `ignore`, or set it to `validate`,
which will require you to set `sslConfig.caCertificate` to a local path - accessible where your ingestion runs - that
points to the server certificate file.

Find more information on how to troubleshoot SSL issues [here](/deployment/security/enable-ssl/ssl-troubleshooting).

```yaml
workflowConfig:
  loggerLevel: INFO  # DEBUG, INFO, WARNING or ERROR
  openMetadataServerConfig:
    hostPort: "http://localhost:8585/api"
    authProvider: openmetadata
    securityConfig:
      jwtToken: "{bot_jwt_token}"
    ## Store the service Connection information
    # storeServiceConnection: true or false
    ## Secrets Manager Configuration
    # secretsManagerProvider: aws, azure or noop
    # secretsManagerLoader: airflow or env
    ## If SSL, fill the following
    # verifySSL: validate  # or ignore
    # sslConfig:
    #   caCertificate: /local/path/to/certificate
```

#### JWT Token with Secrets Manager

If you are using the [Secrets Manager](/deployment/secrets-manager), you can let the Ingestion client pick up
the JWT token dynamically from the Secrets Manager at runtime. Let's walk through an example:

We have an OpenMetadata server running with the `managed-aws` Secrets Manager. Since we set the `OPENMETADATA_CLUSTER_NAME` env var
to `test`, our `ingestion-bot` JWT token is safely stored under the secret ID `/test/bot/ingestion-bot/config/jwttoken`.

Now, we can use the following workflow config to run the ingestion without having to pass the token, just pointing to the secret itself:

```yaml
workflowConfig:
  loggerLevel: INFO  # DEBUG, INFO, WARNING or ERROR
  openMetadataServerConfig:
    hostPort: "http://localhost:8585/api"
    authProvider: openmetadata
    securityConfig:
      jwtToken: "secret:/test/bot/ingestion-bot/config/jwttoken"
    secretsManagerProvider: aws
    secretsManagerLoader: env
```

Notice how:
1. We specify the `secretsManagerProvider` pointing to `aws`, since that's the manager we are using.
2. We set `secretsManagerLoader` to `env`. Since we're running this locally, we'll let the AWS credentials be
  loaded from the local env vars. (When running this using the UI, note that the generated workflows will have this
  value set as `airflow`!)
3. We set the `jwtToken` value to `secret:/test/bot/ingestion-bot/config/jwttoken`, which tells the client that
  this value is a `secret` located under `/test/bot/ingestion-bot/config/jwttoken`.

These are our env vars:

```shell
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
export AWS_DEFAULT_REGION=...
```

And we can run this normally with `metadata ingest -c <path to yaml>`.

{% note %}

Note that **even if you are not using the Secrets Manager for the OpenMetadata Server**, you can still apply the same
approach by storing the JWT token manually in the secrets manager and letting the Ingestion client pick it up
from there automatically.

{% /note %}

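For reference, a minimal sketch of storing that token manually with `boto3` - assuming AWS credentials are available
in your environment and reusing the `test` cluster name from above - could be:

```python
import boto3

# Store the ingestion-bot JWT token under the secret ID convention shown above:
# /<clusterName>/bot/ingestion-bot/config/jwttoken
client = boto3.client("secretsmanager")

client.create_secret(
    Name="/test/bot/ingestion-bot/config/jwttoken",
    SecretString="<bot_jwt_token>",  # placeholder for the real token
)
```
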
## 3. (Optional) Ingestion Pipeline

Additionally, if you want to see your runs logged in the `Ingestions` tab of the connectors page in the UI, as you would
when running the connectors natively with OpenMetadata, you can add the following configuration to your YAMLs:

```yaml
source:
  type: mysql
  serviceName: mysql
[...]
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: openmetadata
    securityConfig:
      jwtToken: ...
ingestionPipelineFQN: <serviceName>.<pipelineName>  # E.g., mysql.marketing_metadata
```

Adding the `ingestionPipelineFQN` - the Ingestion Pipeline Fully Qualified Name - will tell the Ingestion Framework
to log the executions and update the ingestion status, which will appear in the UI. Note that the action buttons
will be disabled, since OpenMetadata won't be able to interact with external systems.

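If you build the configuration in code rather than in the YAML file itself, a minimal sketch of injecting this field -
assuming a `CONFIG` YAML string like the one in section 1, and a hypothetical `mysql.marketing_metadata` FQN - could be:

```python
import yaml

from metadata.workflow.metadata import MetadataWorkflow

workflow_config = yaml.safe_load(CONFIG)  # CONFIG as in the section 1 example
# Attach this run to an existing Ingestion Pipeline so it shows up in the UI.
# "mysql.marketing_metadata" is a hypothetical <serviceName>.<pipelineName>.
workflow_config["ingestionPipelineFQN"] = "mysql.marketing_metadata"

workflow = MetadataWorkflow.create(workflow_config)
```
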
## 4. (Optional) Disable the Pipeline Service Client

If you want to run your workflows **ONLY externally**, without relying on OpenMetadata for any workflow management
or scheduling, you can update the following server configuration:

```yaml
pipelineServiceClientConfiguration:
  enabled: ${PIPELINE_SERVICE_CLIENT_ENABLED:-true}
```

by setting `enabled: false` or setting `PIPELINE_SERVICE_CLIENT_ENABLED=false` as an environment variable.

This will stop certain APIs and monitors related to the Pipeline Service Client (e.g., Airflow) from being active.

## Examples

{% note %}

This is not an exhaustive list, and it will keep growing over time - not because orchestrator X or Y is unsupported,
but simply because we have not yet had the time to add it here. If you'd like to chip in and help us expand these guides and examples,
don't hesitate to reach out to us on [Slack](https://slack.open-metadata.org/) or directly open a PR in
[GitHub](https://github.com/open-metadata/OpenMetadata/tree/main/openmetadata-docs/content).

{% /note %}

{% inlineCalloutContainer %}
  {% inlineCallout
    color="violet-70"
    icon="10k"
    bold="Airflow"
    href="/deployment/ingestion/external/airflow" %}
    Run the ingestion process externally from Airflow
  {% /inlineCallout %}
  {% inlineCallout
    color="violet-70"
    icon="10k"
    bold="MWAA"
    href="/deployment/ingestion/external/mwaa" %}
    Run the ingestion process externally using AWS MWAA
  {% /inlineCallout %}
  {% inlineCallout
    color="violet-70"
    icon="10k"
    bold="GCS Composer"
    href="/deployment/ingestion/external/gcs-composer" %}
    Run the ingestion process externally from GCS Composer
  {% /inlineCallout %}
  {% inlineCallout
    color="violet-70"
    icon="10k"
    bold="GitHub Actions"
    href="/deployment/ingestion/external/github-actions" %}
    Run the ingestion process externally from GitHub Actions
  {% /inlineCallout %}
{% /inlineCalloutContainer %}

Let's now jump into some examples of how you could create the function that runs the different workflows. Note that this code
can then be executed inside a DAG, a GitHub Action, or a vanilla Python script. It will work in any environment.

### Testing

You can easily test every YAML configuration using the `metadata` CLI from the Ingestion Framework.
To install it, you just need to get it from [PyPI](https://pypi.org/project/openmetadata-ingestion/).

In each of the examples below, we'll showcase how to run the CLI, assuming you have a YAML file that contains
the workflow configuration.

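Before reaching for the CLI, a quick sanity check that the recipe at least parses as valid YAML - with `workflow.yaml`
as a hypothetical local path - can save a round trip:

```python
import yaml

# Parse the recipe and check for the top-level sections every workflow expects.
# "workflow.yaml" is a hypothetical local path.
with open("workflow.yaml") as config_file:
    workflow_config = yaml.safe_load(config_file)

for key in ("source", "workflowConfig"):
    assert key in workflow_config, f"Missing '{key}' section in the recipe"

print("Recipe parsed correctly")
```
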
### Metadata Workflow

This is the first workflow you have to configure and run. It will take care of fetching the metadata from your sources,
be it Database Services, Dashboard Services, Pipelines, etc.

The rest of the workflows (Lineage, Profiler,...) will be executed on top of the metadata already available in the platform.

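Since the remaining workflows build on the ingested metadata, a common pattern is simply to run the recipes in order.
A minimal sketch - assuming hypothetical local `metadata.yaml` and `lineage.yaml` recipes, both handled by the
`MetadataWorkflow` class as shown in this guide - could be:

```python
import yaml

from metadata.workflow.metadata import MetadataWorkflow

# Run the metadata recipe first, then lineage on top of it.
# Both file names are hypothetical local paths.
for recipe_path in ("metadata.yaml", "lineage.yaml"):
    with open(recipe_path) as config_file:
        workflow = MetadataWorkflow.create(yaml.safe_load(config_file))
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
```
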
{% codePreview %}

{% codeInfoContainer %}

{% codeInfo srNumber=1 %}
**Adding the imports**

The first step is to import the `MetadataWorkflow` class, which will take care of the full ingestion logic. We'll
add the import for printing the results at the end.
{% /codeInfo %}

{% codeInfo srNumber=2 %}
**Defining the YAML**

Then, we need to pass the YAML configuration. For this simple example we are defining a variable, but you can
read from a file, parse secrets from your environment, or use any other approach you need. In the end, it's just
Python code.

{% note %}
You can find complete YAMLs in each connector's [docs](/connectors), together with more information about the available
configurations.
{% /note %}

{% /codeInfo %}

{% codeInfo srNumber=3 %}
**Preparing the Workflow**

Finally, we'll prepare a function that we can execute anywhere.

It will take care of instantiating the workflow, executing it and giving us the results.
{% /codeInfo %}

{% /codeInfoContainer %}

{% codeBlock fileName="ingestion.py" %}

```python {% isCodeBlock=true %}
import yaml

```

```python {% srNumber=1 %}
from metadata.workflow.metadata import MetadataWorkflow

```

```python {% srNumber=2 %}

CONFIG = """
source:
  type: snowflake
  serviceName: <service name>
  serviceConnection:
    config:
      type: Snowflake
      ...
  sourceConfig:
    config:
      type: DatabaseMetadata
      markDeletedTables: true
      includeTables: true
      ...
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: "http://localhost:8585/api"
    authProvider: openmetadata
    securityConfig:
      jwtToken: "{bot_jwt_token}"
"""

```

```python {% srNumber=3 %}

def run():
    # Parse the YAML recipe before handing it to the workflow
    workflow_config = yaml.safe_load(CONFIG)
    workflow = MetadataWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
```

{% /codeBlock %}

{% /codePreview %}

{% note %}

You can test the workflow via `metadata ingest -c <path-to-yaml>`.

{% /note %}

### Lineage Workflow

This workflow will take care of scanning your query history and defining lineage relationships between your tables.

You can find more information about this workflow [here](/connectors/ingestion/lineage).

{% codePreview %}

{% codeInfoContainer %}

{% codeInfo srNumber=1 %}
**Adding the imports**

The first step is to import the `MetadataWorkflow` class, which will take care of the full ingestion logic. We'll
add the import for printing the results at the end.

Note that we are using the same class as in the Metadata Ingestion.
{% /codeInfo %}

{% codeInfo srNumber=2 %}
**Defining the YAML**

Then, we need to pass the YAML configuration. For this simple example we are defining a variable, but you can
read from a file, parse secrets from your environment, or use any other approach you need.

Note how we have not added the `serviceConnection` here. Since the service would have been created during the
metadata ingestion, we can let the Ingestion Framework dynamically fetch the Service Connection information.

If, however, you are configuring the workflow with `storeServiceConnection: false`, you'll need to explicitly
define the `serviceConnection`.

{% note %}
You can find complete YAMLs in each connector's [docs](/connectors), together with more information about the available
configurations.
{% /note %}

{% /codeInfo %}

{% codeInfo srNumber=3 %}
**Preparing the Workflow**

Finally, we'll prepare a function that we can execute anywhere.

It will take care of instantiating the workflow, executing it and giving us the results.
{% /codeInfo %}

{% /codeInfoContainer %}

{% codeBlock fileName="ingestion.py" %}

```python {% isCodeBlock=true %}
import yaml

```

```python {% srNumber=1 %}
from metadata.workflow.metadata import MetadataWorkflow

```

```python {% srNumber=2 %}

CONFIG = """
source:
  type: snowflake-lineage
  serviceName: <service name>
  sourceConfig:
    config:
      type: DatabaseLineage
      queryLogDuration: 1
      parsingTimeoutLimit: 300
      ...
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: "http://localhost:8585/api"
    authProvider: openmetadata
    securityConfig:
      jwtToken: "{bot_jwt_token}"
"""

```

```python {% srNumber=3 %}

def run():
    # Parse the YAML recipe before handing it to the workflow
    workflow_config = yaml.safe_load(CONFIG)
    workflow = MetadataWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
```

{% /codeBlock %}

{% /codePreview %}

{% note %}

You can test the workflow via `metadata ingest -c <path-to-yaml>`.

{% /note %}

### Usage Workflow

As with the lineage workflow, we'll scan the query history for any DML statements. The goal is to ingest queries
into the platform and figure out the relevancy of your assets and the most frequently joined tables.

{% codePreview %}

{% codeInfoContainer %}

{% codeInfo srNumber=1 %}
**Adding the imports**

The first step is to import the `UsageWorkflow` class, which will take care of the full ingestion logic. We'll
add the import for printing the results at the end.

{% /codeInfo %}

{% codeInfo srNumber=2 %}
**Defining the YAML**

Then, we need to pass the YAML configuration. For this simple example we are defining a variable, but you can
read from a file, parse secrets from your environment, or use any other approach you need.

Note how we have not added the `serviceConnection` here. Since the service would have been created during the
metadata ingestion, we can let the Ingestion Framework dynamically fetch the Service Connection information.

If, however, you are configuring the workflow with `storeServiceConnection: false`, you'll need to explicitly
define the `serviceConnection`.

{% note %}
You can find complete YAMLs in each connector's [docs](/connectors), together with more information about the available
configurations.
{% /note %}

{% /codeInfo %}

{% codeInfo srNumber=3 %}
**Preparing the Workflow**

Finally, we'll prepare a function that we can execute anywhere.

It will take care of instantiating the workflow, executing it and giving us the results.
{% /codeInfo %}

{% /codeInfoContainer %}

{% codeBlock fileName="ingestion.py" %}

```python {% isCodeBlock=true %}
import yaml

```

```python {% srNumber=1 %}
from metadata.workflow.usage import UsageWorkflow

```

```python {% srNumber=2 %}

CONFIG = """
source:
  type: snowflake-usage
  serviceName: <service name>
  sourceConfig:
    config:
      type: DatabaseUsage
      queryLogDuration: 1
      parsingTimeoutLimit: 300
      ...
processor:
  type: query-parser
  config: {}
stage:
  type: table-usage
  config:
    filename: "/tmp/snowflake_usage"
bulkSink:
  type: metadata-usage
  config:
    filename: "/tmp/snowflake_usage"
workflowConfig:
  openMetadataServerConfig:
    hostPort: "http://localhost:8585/api"
    authProvider: openmetadata
    securityConfig:
      jwtToken: "{bot_jwt_token}"
"""

```

```python {% srNumber=3 %}

def run():
    # Parse the YAML recipe before handing it to the workflow
    workflow_config = yaml.safe_load(CONFIG)
    workflow = UsageWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
```

{% /codeBlock %}

{% /codePreview %}

{% note %}

You can test the workflow via `metadata usage -c <path-to-yaml>`.

{% /note %}

### Profiler Workflow

This workflow will execute queries against your database and send the results to OpenMetadata. The goal is to compute
metrics about your data and give you a high-level view of its shape, together with sample data.

This is a useful step before creating Data Quality Workflows.

You can find more information about this workflow [here](/how-to-guides/data-quality-observability/profiler/workflow).

{% codePreview %}

{% codeInfoContainer %}

{% codeInfo srNumber=1 %}
**Adding the imports**

The first step is to import the `ProfilerWorkflow` class, which will take care of the full ingestion logic. We'll
add the import for printing the results at the end.

{% /codeInfo %}

{% codeInfo srNumber=2 %}
**Defining the YAML**

Then, we need to pass the YAML configuration. For this simple example we are defining a variable, but you can
read from a file, parse secrets from your environment, or use any other approach you need.

Note how we have not added the `serviceConnection` here. Since the service would have been created during the
metadata ingestion, we can let the Ingestion Framework dynamically fetch the Service Connection information.

If, however, you are configuring the workflow with `storeServiceConnection: false`, you'll need to explicitly
define the `serviceConnection`.

{% note %}
You can find complete YAMLs in each connector's [docs](/connectors), together with more information about the available
configurations.
{% /note %}

{% /codeInfo %}

{% codeInfo srNumber=3 %}
**Preparing the Workflow**

Finally, we'll prepare a function that we can execute anywhere.

It will take care of instantiating the workflow, executing it and giving us the results.
{% /codeInfo %}

{% /codeInfoContainer %}

{% codeBlock fileName="ingestion.py" %}

```python {% isCodeBlock=true %}
import yaml

```

```python {% srNumber=1 %}
from metadata.workflow.profiler import ProfilerWorkflow

```

```python {% srNumber=2 %}

CONFIG = """
source:
  type: snowflake
  serviceName: <service name>
  sourceConfig:
    config:
      type: Profiler
      generateSampleData: true
      ...
processor:
  type: orm-profiler
  config: {}
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: "http://localhost:8585/api"
    authProvider: openmetadata
    securityConfig:
      jwtToken: "{bot_jwt_token}"
"""

```

```python {% srNumber=3 %}

def run():
    # Parse the YAML recipe before handing it to the workflow
    workflow_config = yaml.safe_load(CONFIG)
    workflow = ProfilerWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
```

{% /codeBlock %}

{% /codePreview %}

{% note %}

You can test the workflow via `metadata profile -c <path-to-yaml>`.

{% /note %}

### Data Quality Workflow

This workflow will run the tests you have configured against your tables and send the results to OpenMetadata,
allowing you to keep a close eye on the quality of your data.

You can find more information about this workflow [here](/how-to-guides/data-quality-observability/quality/configure).

{% codePreview %}

{% codeInfoContainer %}

{% codeInfo srNumber=1 %}
**Adding the imports**

The first step is to import the `TestSuiteWorkflow` class, which will take care of the full ingestion logic. We'll
add the import for printing the results at the end.

{% /codeInfo %}

{% codeInfo srNumber=2 %}
**Defining the YAML**

Then, we need to pass the YAML configuration. For this simple example we are defining a variable, but you can
read from a file, parse secrets from your environment, or use any other approach you need.

Note how we have not added the `serviceConnection` here. Since the service would have been created during the
metadata ingestion, we can let the Ingestion Framework dynamically fetch the Service Connection information.

If, however, you are configuring the workflow with `storeServiceConnection: false`, you'll need to explicitly
define the `serviceConnection`.

Moreover, see how we are not configuring any tests in the `processor`. You can do [that](/how-to-guides/data-quality-observability/quality/configure#full-yaml-config-example),
but even if nothing gets defined in the YAML, we will still execute all the tests configured against the table.

{% note %}
You can find complete YAMLs in each connector's [docs](/connectors), together with more information about the available
configurations.
{% /note %}

{% /codeInfo %}

{% codeInfo srNumber=3 %}
**Preparing the Workflow**

Finally, we'll prepare a function that we can execute anywhere.

It will take care of instantiating the workflow, executing it and giving us the results.
{% /codeInfo %}

{% /codeInfoContainer %}

{% codeBlock fileName="ingestion.py" %}

```python {% isCodeBlock=true %}
import yaml

```

```python {% srNumber=1 %}
from metadata.workflow.data_quality import TestSuiteWorkflow

```

```python {% srNumber=2 %}

CONFIG = """
source:
  type: TestSuite
  serviceName: <service name>
  sourceConfig:
    config:
      type: TestSuite
      entityFullyQualifiedName: <Table FQN, e.g., `service.database.schema.table`>
processor:
  type: orm-test-runner
  config: {}
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: "http://localhost:8585/api"
    authProvider: openmetadata
    securityConfig:
      jwtToken: "{bot_jwt_token}"
"""

```

```python {% srNumber=3 %}

def run():
    # Parse the YAML recipe before handing it to the workflow
    workflow_config = yaml.safe_load(CONFIG)
    workflow = TestSuiteWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
```

{% /codeBlock %}

{% /codePreview %}

{% note %}

You can test the workflow via `metadata test -c <path-to-yaml>`.

{% /note %}