
---
title: Run the MongoDB Connector Externally
slug: /connectors/database/mongodb/yaml
---
{% connectorDetailsHeader name="MongoDB" stage="PROD" platform="OpenMetadata" availableFeatures=["Metadata", "Data Profiler", "Sample Data"] unavailableFeatures=["Query Usage", "Data Quality", "dbt", "Owners", "Lineage", "Column-level Lineage", "Tags", "Stored Procedures"] /%}
In this section, we provide guides and references to use the MongoDB connector.
Configure and schedule MongoDB metadata workflows from the OpenMetadata UI:
{% partial file="/v1.8/connectors/ingestion-modes-tiles.md" variables={yamlPath: "/connectors/database/mongodb/yaml"} /%}
{% partial file="/v1.8/connectors/external-ingestion-deployment.md" /%}
## Requirements
To fetch metadata from MongoDB into OpenMetadata, the MongoDB user must have permission to perform the `find` operation on collections and the `listCollections` operation on the databases available in MongoDB.
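Before running the ingestion, you can sanity-check these permissions with a short `pymongo` script; the connection URI and database name below are placeholders for your own values:

```python
from pymongo import MongoClient

# Minimal permission check, assuming pymongo is installed.
client = MongoClient("mongodb://username:password@localhost:27017")
db = client["my_database"]  # hypothetical database name

collections = db.list_collection_names()  # requires the listCollections action
print(collections)
if collections:
    db[collections[0]].find_one()  # requires the find action
```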
### Python Requirements
{% partial file="/v1.8/connectors/python-requirements.md" /%}
To run the MongoDB ingestion, you will need to install:

```bash
pip3 install "openmetadata-ingestion[mongo]"
```
## Metadata Ingestion
All connectors are defined as JSON Schemas. Here you can find the structure to create a connection to MongoDB.
In order to create and run a Metadata Ingestion workflow, we will follow the steps to create a YAML configuration able to connect to the source, process the Entities if needed, and reach the OpenMetadata server.
The workflow is modeled around the following JSON Schema.
### 1. Define the YAML Config
This is a sample config for MongoDB:
{% codePreview %}
{% codeInfoContainer %}
#### Source Configuration - Service Connection
{% codeInfo srNumber=1 %}
**username**: Username to connect to MongoDB. This user must have access to perform the `find` operation on collections and the `listCollections` operation on the databases available in MongoDB.
{% /codeInfo %}
{% codeInfo srNumber=2 %}
**password**: Password to connect to MongoDB.
{% /codeInfo %}
{% codeInfo srNumber=3 %}
**hostPort**: When using the `mongodb` connection scheme, the hostPort parameter specifies the host and port of MongoDB. This should be specified as a string in the format `hostname:port`, e.g., `localhost:27017`. When using the `mongodb+srv` connection scheme, the hostPort parameter specifies the hostname of the MongoDB cluster. This should be specified as a plain `hostname` string, e.g., `cluster0-abcde.mongodb.net`.

Using Atlas? Follow this guide to get the connection string.
{% /codeInfo %}
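As an illustration, here is how the two schemes map to `pymongo` connection URIs; the hostnames and credentials below are placeholders, and the `mongodb+srv` scheme additionally needs the `dnspython` package installed:

```python
from pymongo import MongoClient

# mongodb scheme: hostPort is "hostname:port"
plain_client = MongoClient("mongodb://username:password@localhost:27017")

# mongodb+srv scheme: hostPort is the bare hostname
srv_client = MongoClient("mongodb+srv://username:password@cluster0-abcde.mongodb.net")

plain_client.admin.command("ping")  # quick connectivity check
```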
{% codeInfo srNumber=6 %}
**databaseName**: Optional name to give to the database in OpenMetadata. If left blank, we will use `default` as the database name.
{% /codeInfo %}
{% partial file="/v1.8/connectors/yaml/database/source-config-def.md" /%}
{% partial file="/v1.8/connectors/yaml/ingestion-sink-def.md" /%}
{% partial file="/v1.8/connectors/yaml/workflow-config-def.md" /%}
#### Advanced Configuration
{% codeInfo srNumber=7 %}
**Connection Options (Optional)**: Enter the details for any additional connection options that can be sent to the database during the connection. These details must be added as Key-Value pairs.
{% /codeInfo %}
{% /codeInfoContainer %}
{% codeBlock fileName="filename.yaml" %}
```yaml
source:
  type: mongodb
  serviceName: local_mongodb
  serviceConnection:
    config:
      type: MongoDB
      username: username
      password: password
      hostPort: localhost:27017
      # connectionOptions:
      #   key: value
      databaseName: custom_database_name
```
{% partial file="/v1.8/connectors/yaml/database/source-config.md" /%}
{% partial file="/v1.8/connectors/yaml/ingestion-sink.md" /%}
{% partial file="/v1.8/connectors/yaml/workflow-config.md" /%}
{% /codeBlock %}
{% /codePreview %}
{% partial file="/v1.8/connectors/yaml/ingestion-cli.md" /%}
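Alternatively, the same YAML can be run from Python instead of the CLI. A minimal sketch, assuming the `openmetadata-ingestion[mongo]` package installed above and the configuration built in the previous step:

```python
import yaml
from metadata.workflow.metadata import MetadataWorkflow

# Paste the YAML assembled above; left as a placeholder here.
config = """
<your YAML configuration>
"""

workflow = MetadataWorkflow.create(yaml.safe_load(config))
workflow.execute()
workflow.raise_from_status()  # fail loudly if the run had errors
workflow.print_status()
workflow.stop()
```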
## Data Profiler
The Data Profiler workflow will use the `orm-profiler` processor.

After running a Metadata Ingestion workflow, we can run the Data Profiler workflow. The `serviceName` should be the same as the one used in the Metadata Ingestion, so that the ingestion bot can get the `serviceConnection` details from the server.
### Limitations
The MongoDB data profiler currently supports only the following features:
- **Row count**: The number of rows in the collection. Sampling or a custom query is not supported.
- **Sample data**: If a custom query is defined, it will be used for sample data.
### 1. Define the YAML Config
This is a sample config for the profiler:
{% codePreview %}
{% codeInfoContainer %}
#### Source Configuration - Source Config

{% codeInfo srNumber=13 %}

You can find all the definitions and types for the `sourceConfig` here.

**generateSampleData**: Option to turn on/off generating sample data.
{% /codeInfo %}
{% codeInfo srNumber=16 %}
**processPiiSensitive**: Optional configuration to automatically tag columns that might contain sensitive information.
{% /codeInfo %}
{% codeInfo srNumber=18 %}
**timeoutSeconds**: Profiler timeout in seconds.
{% /codeInfo %}
{% codeInfo srNumber=20 %}
**schemaFilterPattern**: Regex to only fetch schemas that match the pattern.
{% /codeInfo %}
{% codeInfo srNumber=21 %}
**tableFilterPattern**: Regex to only fetch tables that match the pattern.
{% /codeInfo %}
#### Processor Configuration

{% codeInfo srNumber=22 %}

Choose the `orm-profiler`. Its config can also be updated to define tests from the YAML itself instead of the UI:

**tableConfig**: `tableConfig` allows you to set up some configuration at the table level.
{% /codeInfo %}
#### Sink Configuration

{% codeInfo srNumber=23 %}

To send the metadata to OpenMetadata, it needs to be specified as `type: metadata-rest`.
{% /codeInfo %}
#### Workflow Configuration

{% codeInfo srNumber=24 %}

The main property here is the `openMetadataServerConfig`, where you can define the host and security provider of your OpenMetadata installation.

For a simple, local installation using our docker containers, this looks like:
{% /codeInfo %}
{% /codeInfoContainer %}
{% codeBlock fileName="filename.yaml" %}
```yaml
source:
  type: mongodb
  serviceName: local_mongodb
  sourceConfig:
    config:
      type: Profiler
      generateSampleData: true
      processPiiSensitive: false
      # timeoutSeconds: 43200
      # schemaFilterPattern:
      #   includes:
      #     - schema1
      #     - schema2
      #   excludes:
      #     - schema3
      #     - schema4
      # tableFilterPattern:
      #   includes:
      #     - table1
      #     - table2
      #   excludes:
      #     - table3
      #     - table4

processor:
  type: orm-profiler
  config: {}  # Remove braces if adding properties
  # tableConfig:
  #   - fullyQualifiedName: <table fqn>
  #     profileQuery: <query to use for fetching the sample data>

sink:
  type: metadata-rest
  config: {}

workflowConfig:
  # loggerLevel: DEBUG  # DEBUG, INFO, WARN or ERROR
  openMetadataServerConfig:
    hostPort: <OpenMetadata host and port>
    authProvider: <OpenMetadata auth provider>
```
{% /codeBlock %}
{% /codePreview %}
- You can learn more about how to configure and run the Profiler Workflow to extract Profiler data and execute Data Quality tests from here.
### 2. Prepare the Profiler DAG
Here, we follow a similar approach as with the metadata and usage pipelines, although we will use a different Workflow class:
{% codePreview %}
{% codeInfoContainer %}
{% codeInfo srNumber=25 %}
**Import necessary modules**

The `ProfilerWorkflow` class that is being imported is part of the metadata `orm_profiler` framework, which defines the process of extracting Profiler data.

Here we are also importing all the basic requirements to parse YAMLs, handle dates and build our DAG.
{% /codeInfo %}
{% codeInfo srNumber=26 %}
**Default arguments for all tasks in the Airflow DAG.**

- The default arguments dictionary contains the default arguments for tasks in the DAG, including the owner's name, email address, number of retries, retry delay, and execution timeout.
{% /codeInfo %}
{% codeInfo srNumber=27 %}
- **config**: Specifies the config for the profiler as we prepared above.
{% /codeInfo %}
{% codeInfo srNumber=28 %}
- **metadata_ingestion_workflow()**: This code defines a function `metadata_ingestion_workflow()` that loads a YAML configuration, creates a `ProfilerWorkflow` object, executes the workflow, checks its status, prints the status to the console, and stops the workflow.
{% /codeInfo %}
{% codeInfo srNumber=29 %}
- **DAG**: creates a DAG using the Airflow framework, and tunes the DAG configuration to whatever fits your requirements.
- For more details on Airflow DAG creation, visit here.
{% /codeInfo %}
{% /codeInfoContainer %}
{% codeBlock fileName="filename.py" %}
```python
import yaml
from datetime import timedelta
from airflow import DAG
from metadata.workflow.profiler import ProfilerWorkflow

try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator

from airflow.utils.dates import days_ago

default_args = {
    "owner": "user_name",
    "email_on_failure": False,
    "retries": 3,
    "retry_delay": timedelta(seconds=10),
    "execution_timeout": timedelta(minutes=60),
}

config = """
<your YAML configuration>
"""


def metadata_ingestion_workflow():
    workflow_config = yaml.safe_load(config)
    workflow = ProfilerWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


with DAG(
    "profiler_example",
    default_args=default_args,
    description="An example DAG which runs an OpenMetadata ingestion workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="profile_and_test_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )
```
{% /codeBlock %}
{% /codePreview %}
## dbt Integration
{% tilesContainer %}
{% tile icon="mediation" title="dbt Integration" description="Learn more about how to ingest dbt models' definitions and their lineage." link="/connectors/ingestion/workflows/dbt" /%}
{% /tilesContainer %}