The Collate Ingestion Agent facilitates metadata ingestion for hybrid deployments, allowing organizations to securely push metadata from their own infrastructure into the Collate platform without exposing their internal systems. It provides an efficient channel for running ingestion workflows while maintaining full control over data processing within your network. This document outlines the setup and usage of the Collate Ingestion Agent, with a focus on its role in hybrid environments and its key functionality.
### Overview
The Collate Ingestion Agent is ideal for scenarios where connectors must run on-premises, providing a secure and efficient way to process metadata within your own infrastructure. Because data processing stays inside your network, data privacy concerns are addressed and the ingestion process is streamlined.
With the Collate Ingestion Agent, you can:
- Set up ingestion workflows easily without configuring YAML files manually.
- Leverage the Collate UI for a seamless and user-friendly experience.
- Manage various ingestion types, including metadata, profiling, lineage, usage, dbt, and data quality.
### Setting Up the Collate Ingestion Agent
#### 1. Prepare Your Environment
To begin, download the Collate-provided Docker image for the Ingestion Agent. The Collate team will provide the necessary credentials to authenticate and pull the image from the repository.
**Run the following commands:**
- **Log in to Docker**: Use the credentials provided by Collate to authenticate.
- **Pull the Docker Image**: Run the command to pull the image into your local environment.
Once the image is downloaded, you can start the Docker container to initialize the Ingestion Agent.
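The exact registry, image name, tag, and runtime flags will be shared by the Collate team; as a rough sketch with placeholder values, the steps look like this:

```bash
# Authenticate against the registry using the credentials provided by Collate
docker login <collate-registry>

# Pull the Ingestion Agent image (registry, image name, and tag are placeholders)
docker pull <collate-registry>/<ingestion-agent-image>:<tag>

# Start the container to initialize the Ingestion Agent
docker run -d --name collate-ingestion-agent <collate-registry>/<ingestion-agent-image>:<tag>
```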
### Run the Ingestion Process Externally
Let's now jump into some examples of how you can create the functions that run the different workflows. Note that this code
can then be executed inside an Airflow DAG, a GitHub Action, or a vanilla Python script; it will work in any environment.
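As an illustration, here is a minimal sketch of how the `run()` function defined in the examples below could be scheduled from an Airflow DAG. The module name `ingestion`, the DAG id, and the schedule are assumptions for this example, and Airflow 2.x is assumed; adapt them to your environment.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# `run` is the function defined in the ingestion.py examples below
# (the module name is an assumption for this sketch).
from ingestion import run

with DAG(
    dag_id="metadata_ingestion",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
):
    # Execute the ingestion workflow as a single task.
    PythonOperator(task_id="ingest", python_callable=run)
```

For a vanilla Python script or a GitHub Action step, you can simply call `run()` directly, for example under an `if __name__ == "__main__":` guard.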
### Testing
You can easily test every YAML configuration using the `metadata` CLI from the Ingestion Framework.
To install it, just grab it from [PyPI](https://pypi.org/project/openmetadata-ingestion/).
In each of the examples below, we'll show how to run the CLI, assuming you have a YAML file containing
the workflow configuration.
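For example, assuming the configuration is saved in a file called `workflow.yaml` (the filename is just an illustration):

```bash
# Install the Ingestion Framework CLI from PyPI
pip install openmetadata-ingestion

# Run a metadata ingestion workflow from its YAML configuration
metadata ingest -c workflow.yaml
```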
### Metadata Workflow
This is the first workflow you have to configure and run. It will take care of fetching the metadata from your sources,
be it Database Services, Dashboard Services, Pipelines, etc.
The rest of the workflows (Lineage, Profiler,...) will be executed on top of the metadata already available in the platform.
{% codePreview %}
{% codeInfoContainer %}
{% codeInfo srNumber=1 %}
**Adding the imports**
The first step is to import the `MetadataWorkflow` class, which will take care of the full ingestion logic. We'll
print the results at the end using the workflow's `print_status` method.
{% /codeInfo %}
{% codeInfo srNumber=2 %}
**Defining the YAML**
Then, we need to pass the YAML configuration. For this simple example we are defining a variable, but you can
read it from a file, parse secrets from your environment, or use any other approach you need. In the end, it's just
Python code (see the sketch after this example for a file-based variation).
{% note %}
You can find complete YAMLs in each connector's [docs](/connectors), along with more information about the available
configurations.
{% /note %}
{% /codeInfo %}
{% codeInfo srNumber=3 %}
**Preparing the Workflow**
Finally, we'll prepare a function that we can execute anywhere.
It will take care of instantiating the workflow, executing it and giving us the results.
{% /codeInfo %}
{% /codeInfoContainer %}
{% codeBlock fileName="ingestion.py" %}
```python {% isCodeBlock=true %}
import yaml
```
```python {% srNumber=1 %}
from metadata.workflow.metadata import MetadataWorkflow
```
```python {% srNumber=2 %}
CONFIG = """
source:
type: snowflake
serviceName: <servicename>
serviceConnection:
config:
type: Snowflake
...
sourceConfig:
config:
type: DatabaseMetadata
markDeletedTables: true
includeTables: true
...
sink:
type: metadata-rest
config: {}
workflowConfig:
openMetadataServerConfig:
hostPort: "http://localhost:8585/api"
authProvider: openmetadata
securityConfig:
jwtToken: "{bot_jwt_token}"
"""
```
```python {% srNumber=3 %}
def run():
    workflow_config = yaml.safe_load(CONFIG)
    workflow = MetadataWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
```
{% /codeBlock %}
{% /codePreview %}
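If you prefer not to inline the YAML, here is a minimal variation of the same function that reads the configuration from a file and injects the bot JWT token from an environment variable. The file path and the `OM_JWT_TOKEN` variable name are assumptions for this sketch.

```python
import os
from pathlib import Path

import yaml

from metadata.workflow.metadata import MetadataWorkflow


def run_from_file(path: str = "workflow.yaml") -> None:
    # Read the workflow YAML from disk and substitute the bot JWT token
    # from an environment variable instead of hard-coding it.
    raw = Path(path).read_text()
    raw = raw.replace("{bot_jwt_token}", os.environ["OM_JWT_TOKEN"])
    workflow = MetadataWorkflow.create(yaml.safe_load(raw))
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
```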
{% note %}
You can test the workflow via `metadata ingest -c <path-to-yaml>`.
{% /note %}
### Lineage Workflow
This workflow will take care of scanning your query history and defining lineage relationships between your tables.
You can find more information about this workflow [here](/connectors/ingestion/lineage).
{% codePreview %}
{% codeInfoContainer %}
{% codeInfo srNumber=1 %}
**Adding the imports**
The first step is to import the `MetadataWorkflow` class, which will take care of the full ingestion logic. We'll
print the results at the end using the workflow's `print_status` method.
Note that we are using the same class as in the Metadata Ingestion.
{% /codeInfo %}
{% codeInfo srNumber=2 %}
**Defining the YAML**
Then, we need to pass the YAML configuration. For this simple example we are defining a variable, but you can
read it from a file, parse secrets from your environment, or use any other approach you need.
Note that we have not added the `serviceConnection` here. Since the service was already created during the
metadata ingestion, we can let the Ingestion Framework dynamically fetch the Service Connection information.
If, however, you are configuring the workflow with `storeServiceConnection: false`, you'll need to define
the `serviceConnection` explicitly.
{% note %}
You can find complete YAMLs in each connector's [docs](/connectors), along with more information about the available
configurations.
{% /note %}
{% /codeInfo %}
{% codeInfo srNumber=3 %}
**Preparing the Workflow**
Finally, we'll prepare a function that we can execute anywhere.
It will take care of instantiating the workflow, executing it and giving us the results.
{% /codeInfo %}
{% /codeInfoContainer %}
{% codeBlock fileName="ingestion.py" %}
```python {% isCodeBlock=true %}
import yaml
```
```python {% srNumber=1 %}
from metadata.workflow.metadata import MetadataWorkflow
```
```python {% srNumber=2 %}
CONFIG = """
source:
type: snowflake-lineage
serviceName: <servicename>
sourceConfig:
config:
type: DatabaseLineage
queryLogDuration: 1
parsingTimeoutLimit: 300
...
sink:
type: metadata-rest
config: {}
workflowConfig:
openMetadataServerConfig:
hostPort: "http://localhost:8585/api"
authProvider: openmetadata
securityConfig:
jwtToken: "{bot_jwt_token}"
"""
```
```python {% srNumber=3 %}
def run():
    workflow_config = yaml.safe_load(CONFIG)
    workflow = MetadataWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
```
{% /codeBlock %}
{% /codePreview %}
{% note %}
You can test the workflow via `metadata ingest -c <path-to-yaml>`.
{% /note %}
### Usage Workflow
As with the Lineage workflow, we'll scan the query history for DML statements. The goal is to ingest queries
into the platform, determine the relevance of your assets, and identify frequently joined tables.
{% codePreview %}
{% codeInfoContainer %}
{% codeInfo srNumber=1 %}
**Adding the imports**
The first step is to import the `UsageWorkflow` class, which will take care of the full ingestion logic. We'll
print the results at the end using the workflow's `print_status` method.
{% /codeInfo %}
{% codeInfo srNumber=2 %}
**Defining the YAML**
Then, we need to pass the YAML configuration. For this simple example we are defining a variable, but you can
read it from a file, parse secrets from your environment, or use any other approach you need.
Note that we have not added the `serviceConnection` here. Since the service was already created during the
metadata ingestion, we can let the Ingestion Framework dynamically fetch the Service Connection information.
If, however, you are configuring the workflow with `storeServiceConnection: false`, you'll need to define
the `serviceConnection` explicitly.
{% note %}
You can find complete YAMLs in each connector's [docs](/connectors), along with more information about the available
configurations.
{% /note %}
{% /codeInfo %}
{% codeInfo srNumber=3 %}
**Preparing the Workflow**
Finally, we'll prepare a function that we can execute anywhere.
It will take care of instantiating the workflow, executing it and giving us the results.
{% /codeInfo %}
{% /codeInfoContainer %}
{% codeBlock fileName="ingestion.py" %}
```python {% isCodeBlock=true %}
import yaml
```
```python {% srNumber=1 %}
from metadata.workflow.usage import UsageWorkflow
```
```python {% srNumber=2 %}
CONFIG = """
source:
type: snowflake-usage
serviceName: <servicename>
sourceConfig:
config:
type: DatabaseUsage
queryLogDuration: 1
parsingTimeoutLimit: 300
...
processor:
type: query-parser
config: {}
stage:
type: table-usage
config:
filename: "/tmp/snowflake_usage"
bulkSink:
type: metadata-usage
config:
filename: "/tmp/snowflake_usage"
workflowConfig:
openMetadataServerConfig:
hostPort: "http://localhost:8585/api"
authProvider: openmetadata
securityConfig:
jwtToken: "{bot_jwt_token}"
"""
```
```python {% srNumber=3 %}
def run():
    workflow_config = yaml.safe_load(CONFIG)
    workflow = UsageWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
```
{% /codeBlock %}
{% /codePreview %}
{% note %}
You can test the workflow via `metadata usage -c <path-to-yaml>`.
{% /note %}
### Profiler Workflow
This workflow will execute queries against your database and send the results into OpenMetadata. The goal is to compute
metrics about your data and give you a high-level view of its shape, together with the sample data.
This is a useful step to run before creating Data Quality Workflows.
You can find more information about this workflow [here](/how-to-guides/data-quality-observability/profiler/workflow).
{% codePreview %}
{% codeInfoContainer %}
{% codeInfo srNumber=1 %}
**Adding the imports**
The first step is to import the `ProfilerWorkflow` class, which will take care of the full ingestion logic. We'll
print the results at the end using the workflow's `print_status` method.
{% /codeInfo %}
{% codeInfo srNumber=2 %}
**Defining the YAML**
Then, we need to pass the YAML configuration. For this simple example we are defining a variable, but you can
read it from a file, parse secrets from your environment, or use any other approach you need.
Note that we have not added the `serviceConnection` here. Since the service was already created during the
metadata ingestion, we can let the Ingestion Framework dynamically fetch the Service Connection information.
If, however, you are configuring the workflow with `storeServiceConnection: false`, you'll need to define
the `serviceConnection` explicitly.
{% note %}
You can find complete YAMLs in each connector's [docs](/connectors), along with more information about the available
configurations.
{% /note %}
{% /codeInfo %}
{% codeInfo srNumber=3 %}
**Preparing the Workflow**
Finally, we'll prepare a function that we can execute anywhere.
It will take care of instantiating the workflow, executing it and giving us the results.
{% /codeInfo %}
{% /codeInfoContainer %}
{% codeBlock fileName="ingestion.py" %}
```python {% isCodeBlock=true %}
import yaml
```
```python {% srNumber=1 %}
from metadata.workflow.profiler import ProfilerWorkflow
```
```python {% srNumber=2 %}
CONFIG = """
source:
type: snowflake
serviceName: <servicename>
sourceConfig:
config:
type: Profiler
generateSampleData: true
...
processor:
type: orm-profiler
config: {}
sink:
type: metadata-rest
config: {}
workflowConfig:
openMetadataServerConfig:
hostPort: "http://localhost:8585/api"
authProvider: openmetadata
securityConfig:
jwtToken: "{bot_jwt_token}"
"""
```
```python {% srNumber=3 %}
def run():
    workflow_config = yaml.safe_load(CONFIG)
    workflow = ProfilerWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
```
{% /codeBlock %}
{% /codePreview %}
{% note %}
You can test the workflow via `metadata profile -c <path-to-yaml>`.
{% /note %}
### Data Quality Workflow
This workflow will execute the Data Quality tests configured for your tables and send the results into OpenMetadata.
The goal is to validate your data against the expectations you have defined and surface the results directly in the platform.
You can find more information about this workflow [here](/how-to-guides/data-quality-observability/quality/configure).
{% codePreview %}
{% codeInfoContainer %}
{% codeInfo srNumber=1 %}
**Adding the imports**
The first step is to import the `TestSuiteWorkflow` class, which will take care of the full ingestion logic. We'll
print the results at the end using the workflow's `print_status` method.
{% /codeInfo %}
{% codeInfo srNumber=2 %}
**Defining the YAML**
Then, we need to pass the YAML configuration. For this simple example we are defining a variable, but you can
read it from a file, parse secrets from your environment, or use any other approach you need.
Note that we have not added the `serviceConnection` here. Since the service was already created during the
metadata ingestion, we can let the Ingestion Framework dynamically fetch the Service Connection information.
If, however, you are configuring the workflow with `storeServiceConnection: false`, you'll need to define
the `serviceConnection` explicitly.
Moreover, note that we are not configuring any tests in the `processor`. You can do [that](/how-to-guides/data-quality-observability/quality/configure#full-yaml-config-example),
but even if nothing is defined in the YAML, all the tests already configured against the table will be executed.
{% note %}
You can find complete YAMLs in each connector's [docs](/connectors), along with more information about the available
configurations.
{% /note %}
{% /codeInfo %}
{% codeInfo srNumber=3 %}
**Preparing the Workflow**
Finally, we'll prepare a function that we can execute anywhere.
It will take care of instantiating the workflow, executing it and giving us the results.
{% /codeInfo %}
{% /codeInfoContainer %}
{% codeBlock fileName="ingestion.py" %}
```python {% isCodeBlock=true %}
import yaml
```
```python {% srNumber=1 %}
from metadata.workflow.data_quality import TestSuiteWorkflow