
---
title: Run Postgres Connector using Airflow SDK
slug: /connectors/database/postgres/airflow
---
# Run Postgres using the Airflow SDK
In this section, we provide guides and references to use the Postgres connector.
Configure and schedule Postgres metadata and profiler workflows using the Airflow SDK:
- [Requirements](#requirements)
- [Metadata Ingestion](#metadata-ingestion)
- [Query Usage and Lineage Ingestion](#query-usage-and-lineage-ingestion)
- [Data Profiler](#data-profiler)
- [dbt Integration](#dbt-integration)
## Requirements
<InlineCallout color="violet-70" icon="description" bold="OpenMetadata 0.12 or later" href="/deployment">
To deploy OpenMetadata, check the <a href="/deployment">Deployment</a> guides.
</InlineCallout>
To run the Ingestion via the UI you'll need to use the OpenMetadata Ingestion Container, which comes shipped with
custom Airflow plugins to handle the workflow deployment.
<Note>
Note that we only support officially supported Postgres versions. You can check the version list [here](https://www.postgresql.org/support/versioning/).
</Note>
### Usage and Lineage considerations
When extracting lineage and usage information from Postgres, we base our findings on the `pg_stat_statements` view.
You can find more information about it in the official [docs](https://www.postgresql.org/docs/current/pgstatstatements.html#id-1.11.7.39.6).
Another important consideration is explained in the following Stack Overflow [question](https://stackoverflow.com/questions/50803147/what-is-the-timeframe-for-pg-stat-statements).
In summary:
- `pg_stat_statements` does not record when each query ran.
- It shows all queries executed since the last reset (which can be triggered by calling `pg_stat_statements_reset()`).

As a result, when extracting usage and lineage data, the query log duration has no impact; only the query limit does.
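
To make this consequence concrete, here is a minimal sketch of the kind of query a usage extraction could issue. It is an illustration under stated assumptions, not the connector's actual SQL: the function name `build_usage_query` is hypothetical, and only the `query` and `calls` columns of `pg_stat_statements` are used. There is no per-execution time to filter on, so the limit is the only bound on the result.

```python
def build_usage_query(result_limit: int = 1000) -> str:
    """Build an illustrative query against pg_stat_statements.

    The view holds cumulative statistics since the last reset and does not
    record when each execution happened, so a look-back window cannot be
    applied here. Only the row limit restricts the result.
    """
    if result_limit <= 0:
        raise ValueError("result_limit must be a positive integer")
    return (
        "SELECT query, calls "
        "FROM pg_stat_statements "
        f"LIMIT {int(result_limit)}"
    )


print(build_usage_query(500))
```
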
### Python Requirements
To run the Postgres ingestion, you will need to install:
```bash
pip3 install "openmetadata-ingestion[postgres]"
```
## Metadata Ingestion
All connectors are defined as JSON Schemas.
[Here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/postgresConnection.json)
you can find the structure to create a connection to Postgres.
In order to create and run a Metadata Ingestion workflow, we will follow
the steps to create a YAML configuration able to connect to the source,
process the Entities if needed, and reach the OpenMetadata server.
The workflow is modeled around the following
[JSON Schema](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json)
### 1. Define the YAML Config
This is a sample config for Postgres:
```yaml
source:
  type: postgres
  serviceName: local_postgres
  serviceConnection:
    config:
      type: Postgres
      username: username
      password: password
      hostPort: localhost:5432
      # database: database
  sourceConfig:
    config:
      markDeletedTables: true
      includeTables: true
      includeViews: true
      # includeTags: true
      # databaseFilterPattern:
      #   includes:
      #     - database1
      #     - database2
      #   excludes:
      #     - database3
      #     - database4
      # schemaFilterPattern:
      #   includes:
      #     - schema1
      #     - schema2
      #   excludes:
      #     - schema3
      #     - schema4
      # tableFilterPattern:
      #   includes:
      #     - table1
      #     - table2
      #   excludes:
      #     - table3
      #     - table4
      # For dbt, choose one of Cloud, Local, HTTP, S3 or GCS configurations
      # dbtConfigSource:
      #   # For cloud
      #   dbtCloudAuthToken: token
      #   dbtCloudAccountId: ID
      #   # For Local
      #   dbtCatalogFilePath: path-to-catalog.json
      #   dbtManifestFilePath: path-to-manifest.json
      #   # For HTTP
      #   dbtCatalogHttpPath: http://path-to-catalog.json
      #   dbtManifestHttpPath: http://path-to-manifest.json
      #   # For S3
      #   dbtSecurityConfig:  # These are modeled after all AWS credentials
      #     awsAccessKeyId: KEY
      #     awsSecretAccessKey: SECRET
      #     awsRegion: us-east-2
      #   dbtPrefixConfig:
      #     dbtBucketName: bucket
      #     dbtObjectPrefix: "dbt/"
      #   # For GCS
      #   dbtSecurityConfig:  # These are modeled after all GCS credentials
      #     type: My Type
      #     projectId: project ID
      #     privateKeyId: us-east-2
      #     privateKey: |
      #       -----BEGIN PRIVATE KEY-----
      #       Super secret key
      #       -----END PRIVATE KEY-----
      #     clientEmail: client@mail.com
      #     clientId: 1234
      #     authUri: https://accounts.google.com/o/oauth2/auth (default)
      #     tokenUri: https://oauth2.googleapis.com/token (default)
      #     authProviderX509CertUrl: https://www.googleapis.com/oauth2/v1/certs (default)
      #     clientX509CertUrl: https://cert.url (URI)
      #   dbtPrefixConfig:
      #     dbtBucketName: bucket
      #     dbtObjectPrefix: "dbt/"
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  # loggerLevel: DEBUG  # DEBUG, INFO, WARN or ERROR
  openMetadataServerConfig:
    hostPort: <OpenMetadata host and port>
    authProvider: <OpenMetadata auth provider>
```
#### Source Configuration - Service Connection
- **username**: Specify the User to connect to Postgres. It should have enough privileges to read all the metadata.
- **password**: Password to connect to Postgres.
- **hostPort**: Enter the fully qualified hostname and port number for your Postgres deployment, e.g., `localhost:5432`.
- **Connection Options (Optional)**: Enter the details for any additional connection options that can be sent to Postgres during the connection. These details must be added as Key-Value pairs.
- **Connection Arguments (Optional)**: Enter the details for any additional connection arguments such as security or protocol configs that can be sent to Postgres during the connection. These details must be added as Key-Value pairs.
  - In case you are using Single-Sign-On (SSO) for authentication, add the `authenticator` details in the Connection Arguments as a Key-Value pair as follows: `"authenticator" : "sso_login_url"`
  - In case you authenticate with SSO using an external browser popup, then add the `authenticator` details in the Connection Arguments as a Key-Value pair as follows: `"authenticator" : "externalbrowser"`
#### Source Configuration - Source Config
The `sourceConfig` is defined [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceMetadataPipeline.json):
- `markDeletedTables`: Flags tables as soft-deleted if they are no longer present in the source system.
- `includeTables`: true or false, to ingest table data. Default is true.
- `includeViews`: true or false, to ingest view definitions.
- `databaseFilterPattern`, `schemaFilterPattern`, `tableFilterPattern`: Note that they support regex as `includes` or `excludes`. E.g.,
```yaml
tableFilterPattern:
  includes:
    - users
    - type_test
```
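
The sketch below illustrates how include and exclude regex filters of this kind can be evaluated. It rests on assumptions rather than on the connector's actual code: the helper name `is_filtered_out` is hypothetical, and matching is assumed to use `re.match`, which anchors at the start of the name only. OpenMetadata's real matching rules (for example case sensitivity) may differ.

```python
import re
from typing import List, Optional


def is_filtered_out(
    name: str,
    includes: Optional[List[str]] = None,
    excludes: Optional[List[str]] = None,
) -> bool:
    """Return True when `name` should be skipped.

    Assumed semantics: a name is kept if it matches at least one include
    pattern (when any are given) and matches no exclude pattern.
    """
    if includes and not any(re.match(pattern, name) for pattern in includes):
        return True
    if excludes and any(re.match(pattern, name) for pattern in excludes):
        return True
    return False


tables = ["users", "type_test", "orders"]
kept = [t for t in tables if not is_filtered_out(t, includes=["users", "type_test"])]
print(kept)  # ['users', 'type_test']
```

Under these assumptions, a pattern such as `users` would also keep `users_archive`, because only the start of the name is anchored. Adding `$` (`users$`) restricts the match to the exact name.
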
#### Sink Configuration
To send the metadata to OpenMetadata, it needs to be specified as `type: metadata-rest`.
#### Workflow Configuration
The main property here is the `openMetadataServerConfig`, where you can define the host and security provider of your OpenMetadata installation.
For a simple, local installation using our docker containers, this looks like:
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: openmetadata
    securityConfig:
      jwtToken: '{bot_jwt_token}'
```
We support different security providers. You can find their definitions [here](https://github.com/open-metadata/OpenMetadata/tree/main/openmetadata-spec/src/main/resources/json/schema/security/client).
You can find the different implementations of the ingestion below.
<Collapse title="Configure SSO in the Ingestion Workflows">
### OpenMetadata JWT Auth
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: openmetadata
    securityConfig:
      jwtToken: '{bot_jwt_token}'
```
### Auth0 SSO
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: auth0
    securityConfig:
      clientId: '{your_client_id}'
      secretKey: '{your_client_secret}'
      domain: '{your_domain}'
```
### Azure SSO
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: azure
    securityConfig:
      clientSecret: '{your_client_secret}'
      authority: '{your_authority_url}'
      clientId: '{your_client_id}'
      scopes:
        - your_scopes
```
### Custom OIDC SSO
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: custom-oidc
    securityConfig:
      clientId: '{your_client_id}'
      secretKey: '{your_client_secret}'
      domain: '{your_domain}'
```
### Google SSO
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: google
    securityConfig:
      secretKey: '{path-to-json-creds}'
```
### Okta SSO
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: okta
    securityConfig:
      clientId: "{CLIENT_ID - SPA APP}"
      orgURL: "{ISSUER_URL}/v1/token"
      privateKey: "{public/private keypair}"
      email: "{email}"
      scopes:
        - token
```
### Amazon Cognito SSO
The ingestion can be configured by [Enabling JWT Tokens](https://docs.open-metadata.org/deployment/security/enable-jwt-tokens).
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: auth0
    securityConfig:
      clientId: '{your_client_id}'
      secretKey: '{your_client_secret}'
      domain: '{your_domain}'
```
### OneLogin SSO
OneLogin uses Custom OIDC for the ingestion:
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: custom-oidc
    securityConfig:
      clientId: '{your_client_id}'
      secretKey: '{your_client_secret}'
      domain: '{your_domain}'
```
### KeyCloak SSO
KeyCloak uses Custom OIDC for the ingestion:
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: custom-oidc
    securityConfig:
      clientId: '{your_client_id}'
      secretKey: '{your_client_secret}'
      domain: '{your_domain}'
```
</Collapse>
### 2. Prepare the Ingestion DAG
Create a Python file in your Airflow DAGs directory with the following contents:
```python
from datetime import timedelta

import yaml
from airflow import DAG

# PythonOperator moved between Airflow versions; support both import paths
try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator

from airflow.utils.dates import days_ago
from metadata.ingestion.api.workflow import Workflow

default_args = {
    "owner": "user_name",
    "email": ["username@org.com"],
    "email_on_failure": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=60),
}

config = """
<your YAML configuration>
"""


def metadata_ingestion_workflow():
    # Parse the YAML config, then run the workflow and report its status
    workflow_config = yaml.safe_load(config)
    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


with DAG(
    "sample_data",
    default_args=default_args,
    description="An example DAG which runs an OpenMetadata ingestion workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    schedule_interval='*/5 * * * *',
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )
```
Note that from connector to connector, this recipe will always be the same.
By updating the YAML configuration, you will be able to extract metadata from different sources.
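
Because the recipe stays the same across connectors, the call sequence can be factored into a small helper. The following is a minimal sketch, not part of the OpenMetadata API: `build_workflow_callable` and `FakeWorkflow` are hypothetical names, and `FakeWorkflow` only stands in for a real workflow class so the example runs without OpenMetadata or Airflow installed. Unlike the DAG file, the sketch calls `stop()` in a `finally` block, which is a design choice made here so that resources are released even when a step raises.

```python
from typing import Any, Callable, Dict, List


def build_workflow_callable(
    workflow_cls: Any, workflow_config: Dict[str, Any]
) -> Callable[[], None]:
    """Return a zero-argument function usable as a `python_callable`."""

    def run() -> None:
        workflow = workflow_cls.create(workflow_config)
        try:
            workflow.execute()
            workflow.raise_from_status()
            workflow.print_status()
        finally:
            # Always stop the workflow, even if one of the steps failed
            workflow.stop()

    return run


class FakeWorkflow:
    """Hypothetical stand-in that records which methods were called."""

    calls: List[str] = []

    @classmethod
    def create(cls, config: Dict[str, Any]) -> "FakeWorkflow":
        cls.calls.append("create")
        return cls()

    def execute(self) -> None:
        self.calls.append("execute")

    def raise_from_status(self) -> None:
        self.calls.append("raise_from_status")

    def print_status(self) -> None:
        self.calls.append("print_status")

    def stop(self) -> None:
        self.calls.append("stop")


run_ingestion = build_workflow_callable(FakeWorkflow, {"source": {"type": "postgres"}})
run_ingestion()
print(FakeWorkflow.calls)
```

In a real DAG, the workflow class imported in the file above would be passed instead of the stand-in, together with the parsed YAML configuration.
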
## Query Usage and Lineage Ingestion
To ingest the Query Usage and Lineage information, the `serviceConnection` configuration will remain the same.
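However, the `sourceConfig` is now modeled after [this](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryUsagePipeline.json) JSON Schema.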
### 1. Define the YAML Config
This is a sample config for Postgres Usage:
```yaml
source:
  type: postgres
  serviceName: local_postgres
  serviceConnection:
    config:
      type: Postgres
      username: username
      password: password
      hostPort: localhost:5432
      # database: database
  sourceConfig:
    config:
      # Number of days to look back
      queryLogDuration: 7
      # This is a directory that will be DELETED after the usage runs
      stageFileLocation: <path to store the stage file>
      # resultLimit: 1000
      # If instead of getting the query logs from the database we want to pass a file with the queries
      # queryLogFilePath: path-to-file
processor:
  type: query-parser
  config: {}
stage:
  type: table-usage
  config:
    filename: /tmp/postgres_usage
bulkSink:
  type: metadata-usage
  config:
    filename: /tmp/postgres_usage
workflowConfig:
  # loggerLevel: DEBUG  # DEBUG, INFO, WARN or ERROR
  openMetadataServerConfig:
    hostPort: <OpenMetadata host and port>
    authProvider: <OpenMetadata auth provider>
```
#### Source Configuration - Service Connection
You can find all the definitions and types for the `serviceConnection` [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/postgresConnection.json).
They are the same as metadata ingestion.
#### Source Configuration - Source Config
The `sourceConfig` is defined [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryUsagePipeline.json).
- `queryLogDuration`: Configuration to tune how far we want to look back in query logs to process usage data. As explained in the Usage and Lineage considerations, this has no impact for Postgres.
- `resultLimit`: Configuration to set the limit for query logs.
#### Processor, Stage and Bulk Sink
These sections specify where the staging files will be located.
Note that the location is a directory that will be cleaned at the end of the ingestion.
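
In the sample config, `stage` and `bulkSink` share the same `filename`. Assuming the bulk sink reads what the stage wrote (which the shared value suggests, but which is not stated here), a quick pre-flight check could look like the sketch below. The helper name `staging_files_match` is hypothetical, and the config is shown as an already parsed dictionary.

```python
from typing import Any, Dict


def staging_files_match(workflow_config: Dict[str, Any]) -> bool:
    """Check that `stage` and `bulkSink` point at the same staging file."""
    stage_file = workflow_config.get("stage", {}).get("config", {}).get("filename")
    sink_file = workflow_config.get("bulkSink", {}).get("config", {}).get("filename")
    # Both values must be present and identical
    return stage_file is not None and stage_file == sink_file


usage_config = {
    "stage": {"type": "table-usage", "config": {"filename": "/tmp/postgres_usage"}},
    "bulkSink": {"type": "metadata-usage", "config": {"filename": "/tmp/postgres_usage"}},
}
print(staging_files_match(usage_config))  # True
```
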
#### Workflow Configuration
The same as the metadata ingestion.
### 2. Prepare the Usage DAG
There is an extra requirement to run the Usage pipelines. You will need to install:
```bash
pip3 install --upgrade 'openmetadata-ingestion[postgres]'
```
For the usage workflow creation, the Airflow file will look the same as for the metadata ingestion. Updating the YAML configuration will be enough.
## Data Profiler
The Data Profiler workflow will be using the `orm-profiler` processor.
While the `serviceConnection` will still be the same to reach the source system, the `sourceConfig` will be
updated from previous configurations.
### 1. Define the YAML Config
This is a sample config for the profiler:
```yaml
source:
  type: postgres
  serviceName: local_postgres
  serviceConnection:
    config:
      type: Postgres
      username: username
      password: password
      hostPort: localhost:5432
      # database: database
  sourceConfig:
    config:
      type: Profiler
      # generateSampleData: true
      # profileSample: 85
      # threadCount: 5 (default)
      # databaseFilterPattern:
      #   includes:
      #     - database1
      #     - database2
      #   excludes:
      #     - database3
      #     - database4
      # schemaFilterPattern:
      #   includes:
      #     - schema1
      #     - schema2
      #   excludes:
      #     - schema3
      #     - schema4
      # tableFilterPattern:
      #   includes:
      #     - table1
      #     - table2
      #   excludes:
      #     - table3
      #     - table4
processor:
  type: orm-profiler
  config: {}  # Remove braces if adding properties
    # tableConfig:
    #   - fullyQualifiedName: <table fqn>
    #     profileSample: <number between 0 and 99> # default will be 100 if omitted
    #     profileQuery: <query to use for sampling data for the profiler>
    #     columnConfig:
    #       excludeColumns:
    #         - <column name>
    #       includeColumns:
    #         - columnName: <column name>
    #         - metrics:
    #           - MEAN
    #           - MEDIAN
    #           - ...
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  # loggerLevel: DEBUG  # DEBUG, INFO, WARN or ERROR
  openMetadataServerConfig:
    hostPort: <OpenMetadata host and port>
    authProvider: <OpenMetadata auth provider>
```
#### Source Configuration
- You can find all the definitions and types for the `serviceConnection` [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/postgresConnection.json).
- The `sourceConfig` is defined [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceProfilerPipeline.json).
Note that the filter patterns support regex as includes or excludes. E.g.,
```yaml
tableFilterPattern:
  includes:
    - ".*users$"
```
#### Processor
Choose the `orm-profiler`. Its config can also be updated to define tests from the YAML itself instead of the UI:
```yaml
processor:
  type: orm-profiler
  config:
    tableConfig:
      - fullyQualifiedName: <table fqn>
        profileSample: <number between 0 and 99>
        partitionConfig:
          partitionField: <field to use as a partition field>
          partitionQueryDuration: <for date/datetime-based partitioning, set the offset from today>
          partitionValues: <values to use as a predicate for the query>
        profileQuery: <query to use for sampling data for the profiler>
        columnConfig:
          excludeColumns:
            - <column name>
          includeColumns:
            - columnName: <column name>
            - metrics:
              - MEAN
              - MEDIAN
              - ...
```
`tableConfig` allows you to set up some configuration at the table level.
All the properties are optional. `metrics` should be one of the metrics listed [here](https://docs.open-metadata.org/openmetadata/ingestion/workflows/profiler/metrics).
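
When the configuration is assembled in Python rather than written by hand, a small helper can leave out the optional properties that are not set. This is a sketch under assumptions: `build_table_config` is a hypothetical helper, the 0 to 99 range is taken from the `profileSample` placeholder in the sample config, and the fully qualified name used below is made up for illustration.

```python
from typing import Any, Dict, Optional


def build_table_config(
    fully_qualified_name: str,
    profile_sample: Optional[float] = None,
    profile_query: Optional[str] = None,
) -> Dict[str, Any]:
    """Build one `tableConfig` entry, omitting properties that are not set."""
    entry: Dict[str, Any] = {"fullyQualifiedName": fully_qualified_name}
    if profile_sample is not None:
        # Range taken from the placeholder in the sample config
        if not 0 <= profile_sample <= 99:
            raise ValueError("profile_sample must be between 0 and 99")
        entry["profileSample"] = profile_sample
    if profile_query is not None:
        entry["profileQuery"] = profile_query
    return entry


# The table name here is illustrative only
processor = {
    "type": "orm-profiler",
    "config": {
        "tableConfig": [
            build_table_config("local_postgres.db.public.users", profile_sample=85)
        ]
    },
}
print(processor)
```
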
#### Workflow Configuration
The same as the metadata ingestion.
### 2. Prepare the Profiler DAG
Here, we follow a similar approach as with the metadata and usage pipelines, although we will use a different Workflow class:
```python
from datetime import timedelta

import yaml
from airflow import DAG

try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator

from airflow.utils.dates import days_ago
from metadata.orm_profiler.api.workflow import ProfilerWorkflow

default_args = {
    "owner": "user_name",
    "email_on_failure": False,
    "retries": 3,
    "retry_delay": timedelta(seconds=10),
    "execution_timeout": timedelta(minutes=60),
}

config = """
<your YAML configuration>
"""


def metadata_ingestion_workflow():
    workflow_config = yaml.safe_load(config)
    workflow = ProfilerWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


with DAG(
    "profiler_example",
    default_args=default_args,
    description="An example DAG which runs an OpenMetadata ingestion workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="profile_and_test_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )
```
## dbt Integration
You can learn more about how to ingest dbt models' definitions and their lineage [here](/connectors/ingestion/workflows/dbt).