Prepare Ingestion Base Docker image (#8065)

Pere Miquel Brull 2022-10-11 07:50:49 +02:00 committed by GitHub
parent 801012d2a6
commit 339abc5bf3
11 changed files with 370 additions and 6 deletions

@ -0,0 +1,67 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: docker-openmetadata-ingestion-base docker
on:
  workflow_dispatch:
    inputs:
      tag:
        description: "Input tag"
        required: true
  release:
    types: [published]
jobs:
  push_to_docker_hub:
    runs-on: ubuntu-latest
    env:
      input: ${{ github.event.inputs.tag }}
    steps:
      - name: Check trigger type
        if: ${{ env.input == '' }}
        run: echo "input=0.12.0" >> $GITHUB_ENV
      - name: Check out the Repo
        uses: actions/checkout@v2
      - name: Set up QEMU
        uses: docker/setup-qemu-action@v1
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v1
      - name: Login to DockerHub
        uses: docker/login-action@v1
        with:
          username: ${{ secrets.DOCKERHUB_OPENMETADATA_USERNAME }}
          password: ${{ secrets.DOCKERHUB_OPENMETADATA_TOKEN }}
      - name: Install Ubuntu dependencies
        run: |
          sudo apt-get install -y python3-venv
      - name: Install open-metadata dependencies
        run: |
          python3 -m venv env
          source env/bin/activate
          sudo make install_antlr_cli
          make install_dev generate
      - name: Build and push
        uses: docker/build-push-action@v2
        with:
          context: .
          platforms: linux/amd64,linux/arm64
          push: ${{ github.event_name == 'release' }}
          # Update tags before every release
          tags: 'openmetadata/ingestion-base:${{ env.input }},openmetadata/ingestion-base:latest'
          file: ./ingestion/operators/docker/Dockerfile

@ -118,12 +118,6 @@ publish: ## Publish the ingestion module to PyPI
	twine check dist/*; \
	twine upload dist/*
## Docker operators
.PHONY: build_docker_base
build_docker_base: ## Build the base Docker image for the Ingestion Framework Sources
	$(MAKE) install_dev generate
	docker build -f ingestion/connectors/Dockerfile-base ingestion/ -t openmetadata/ingestion-connector-base
.PHONY: build_docker_connectors
build_docker_connectors: ## Build all Ingestion Framework Sources Images to be used as Docker Operators in Airflow
	@echo "Building Docker connectors. Make sure to run build_docker_base first"
@ -269,3 +263,9 @@ export-snyk-html-report: ## export json file from security-report/ to HTML
@echo "Reading all results"
npm install snyk-to-html -g
ls security-report | xargs -I % snyk-to-html -i security-report/% -o security-report/%.html
# Ingestion Operators
.PHONY: build-ingestion-base-local
build-ingestion-base-local: ## Builds the ingestion DEV docker operator with the local ingestion files
$(MAKE) install_dev generate
docker build -f ingestion/operators/docker/Dockerfile-dev . -t openmetadata/ingestion-base:local

@ -181,6 +181,7 @@ services:
      - ingestion-volume-dag-airflow:/opt/airflow/dag_generated_configs
      - ingestion-volume-dags:/opt/airflow/dags
      - ingestion-volume-tmp:/tmp
      - /var/run/docker.sock:/var/run/docker.sock:z # Need 600 permissions to run DockerOperator
    networks:
      local_app_net:

@ -184,6 +184,7 @@ services:
      - ingestion-volume-dag-airflow:/opt/airflow/dag_generated_configs
      - ingestion-volume-dags:/opt/airflow/dags
      - ingestion-volume-tmp:/tmp
      - /var/run/docker.sock:/var/run/docker.sock:z # Need 600 permissions to run DockerOperator
    networks:

@ -0,0 +1,68 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
You can run this DAG from the default OM installation
"""
from datetime import datetime

from airflow import models
from airflow.providers.docker.operators.docker import DockerOperator

from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
    PipelineType,
)

config = """
source:
  type: mysql
  serviceName: local_mysql
  serviceConnection:
    config:
      type: Mysql
      username: openmetadata_user
      password: openmetadata_password
      hostPort: localhost:3306
      databaseSchema: openmetadata_db
      connectionOptions: {}
      connectionArguments: {}
  sourceConfig:
    config:
      type: DatabaseMetadata
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
"""

with models.DAG(
    "ingestion-docker-operator",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["OpenMetadata"],
) as dag:
    DockerOperator(
        command="python main.py",
        image="openmetadata/ingestion-base:local",
        environment={"config": config, "pipelineType": PipelineType.metadata.value},
        docker_url="unix://var/run/docker.sock",  # To allow to start Docker. Needs chmod 666 permissions
        tty=True,
        auto_remove="True",
        network_mode="host",  # To reach the OM server
        task_id="ingest",
        dag=dag,
    )

@ -39,6 +39,7 @@ airflow users create \
(sleep 5; airflow db upgrade)
(sleep 5; airflow db upgrade)
# we need to do this in case the container is restarted and the scheduler exited without tidying up its lock file
rm -f /opt/airflow/airflow-webserver-monitor.pid
airflow webserver --port 8080 -D &

@ -0,0 +1,3 @@
# Ingestion Operators
Directory containing the required files and scripts to handle Airflow Operators.

@ -0,0 +1,57 @@
FROM python:3.9-buster
# Install Dependencies (listed in alphabetical order)
RUN apt-get update \
    && apt-get install -y build-essential \
    ca-certificates \
    default-libmysqlclient-dev \
    freetds-bin \
    freetds-dev \
    gcc \
    gnupg \
    libevent-dev \
    libffi-dev \
    libpq-dev \
    libsasl2-dev \
    libsasl2-modules \
    libssl-dev \
    libxml2 \
    openjdk-11-jre \
    openssl \
    postgresql \
    postgresql-contrib \
    tdsodbc \
    unixodbc \
    unixodbc-dev \
    wget --no-install-recommends
# Prep to install msodbcsql18
RUN apt-get update && \
    apt-get install -y apt-transport-https && \
    curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - && \
    curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list && \
    apt-get update && \
    ACCEPT_EULA=Y apt-get install msodbcsql18 unixodbc-dev -y
# Prep to install confluent-kafka https://github.com/confluentinc/confluent-kafka-python/issues/1326
RUN apt-get update && \
    apt-get install -y --no-install-recommends git g++ make && \
    cd /tmp && git clone https://github.com/edenhill/librdkafka.git && \
    cd librdkafka && git checkout tags/v1.9.0 && \
    ./configure && make && make install && \
    cd ../ && rm -rf librdkafka
WORKDIR ingestion/
# Required for Airflow DockerOperator, as we need to run the workflows from a `python main.py` command in the container.
COPY ingestion/operators/docker/main.py .
RUN pip install --upgrade pip
ARG INGESTION_DEPENDENCY="all"
RUN pip install --upgrade openmetadata-ingestion[${INGESTION_DEPENDENCY}]
# Uninstall psycopg2-binary and install psycopg2 instead,
# because psycopg2-binary raises an architecture-specific error
# when authenticating the connection with Airflow; psycopg2 solves this error
RUN pip uninstall psycopg2-binary -y
RUN pip install psycopg2 mysqlclient

@ -0,0 +1,63 @@
FROM python:3.9-buster
# Install Dependencies (listed in alphabetical order)
RUN apt-get update \
    && apt-get install -y build-essential \
    ca-certificates \
    default-libmysqlclient-dev \
    freetds-bin \
    freetds-dev \
    gcc \
    gnupg \
    libevent-dev \
    libffi-dev \
    libpq-dev \
    libsasl2-dev \
    libsasl2-modules \
    libssl-dev \
    libxml2 \
    openjdk-11-jre \
    openssl \
    postgresql \
    postgresql-contrib \
    tdsodbc \
    unixodbc \
    unixodbc-dev \
    wget --no-install-recommends
# Prep to install msodbcsql18
RUN apt-get update && \
    apt-get install -y apt-transport-https && \
    curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - && \
    curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list && \
    apt-get update && \
    ACCEPT_EULA=Y apt-get install msodbcsql18 unixodbc-dev -y
# Prep to install confluent-kafka https://github.com/confluentinc/confluent-kafka-python/issues/1326
RUN apt-get update && \
    apt-get install -y --no-install-recommends git g++ make && \
    cd /tmp && git clone https://github.com/edenhill/librdkafka.git && \
    cd librdkafka && git checkout tags/v1.9.0 && \
    ./configure && make && make install && \
    cd ../ && rm -rf librdkafka
WORKDIR ingestion/
# Only copy the necessary source files to execute Workflows
COPY ingestion/src/ src/
COPY ingestion/setup.* ./
COPY ingestion/README.md .
# Required for Airflow DockerOperator, as we need to run the workflows from a `python main.py` command in the container.
COPY ingestion/operators/docker/main.py .
RUN pip install --upgrade pip
ARG INGESTION_DEPENDENCY="all"
RUN pip install --upgrade ".[${INGESTION_DEPENDENCY}]"
# Uninstall psycopg2-binary and install psycopg2 instead,
# because psycopg2-binary raises an architecture-specific error
# when authenticating the connection with Airflow; psycopg2 solves this error
RUN pip uninstall psycopg2-binary -y
RUN pip install psycopg2 mysqlclient

@ -0,0 +1,63 @@
# OpenMetadata Ingestion Docker Operator
Utilities required to handle metadata ingestion in Airflow using `DockerOperator`.
The idea behind this approach is to avoid installing packages directly on the Airflow host, which adds
many (unnecessary) constraints: the host has to stay aligned with the `openmetadata-ingestion` package
just to keep a Python installation available as a `virtualenv` within the Airflow host.
The proposed solution - or alternative approach - is to use the
[DockerOperator](https://airflow.apache.org/docs/apache-airflow-providers-docker/stable/_api/airflow/providers/docker/operators/docker/index.html)
and run the ingestion workflows dynamically.
This requires the following:
- A Docker image with the bare `openmetadata-ingestion` requirements,
- A `main.py` file to execute the `Workflow`s,
- Handling of environment variables as input parameters for the operator.
Note that Airflow's Docker Operator works as follows (example from [here](https://github.com/apache/airflow/blob/providers-docker/3.0.0/tests/system/providers/docker/example_docker.py)):
```python
DockerOperator(
    docker_url='unix://var/run/docker.sock',  # Set your docker URL
    command='/bin/sleep 30',
    image='centos:latest',
    network_mode='bridge',
    task_id='docker_op_tester',
    dag=dag,
)
```
We need to provide two ingredients:
1. The Docker image to execute, and
2. The command to run.
This is not a Python-first approach, so it does not allow us to set a base image and pass a Python function
as a parameter (which would have been the preferred approach). Instead, we leverage the `environment` input
parameter of the `DockerOperator` and pass all the necessary information in there, as shown below.
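
As a rough illustration, mirroring the example DAG added in this commit (the task id and the placeholder
`config` string here are illustrative), the operator call looks roughly like this:

```python
from airflow.providers.docker.operators.docker import DockerOperator

config = "..."  # the full workflow YAML, as in the example DAG shipped with this commit

ingest = DockerOperator(
    task_id="ingest",
    image="openmetadata/ingestion-base:local",
    command="python main.py",
    # Everything the workflow needs travels as environment variables
    environment={"config": config, "pipelineType": "metadata"},
    docker_url="unix://var/run/docker.sock",
    network_mode="host",  # to reach the OpenMetadata server on localhost
)
```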
Our `main.py` Python file will then be in charge of:
1. Loading the workflow configuration from the environment variables,
2. Getting the required workflow class to run, and finally
3. Executing the workflow.
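
A condensed sketch of those three steps, assuming the same environment variable names the operator injects
(`config` and `pipelineType`); the helper name `run_from_env` is illustrative, and the full `main.py` added by
this commit appears at the end of this diff:

```python
import os

import yaml


def run_from_env(workflow_map: dict) -> None:
    """Run a workflow as main.py does; workflow_map maps a pipelineType string to a Workflow class."""
    workflow_config = yaml.safe_load(os.environ["config"])      # 1. load the configuration
    workflow_class = workflow_map[os.environ["pipelineType"]]   # 2. resolve the workflow class
    workflow = workflow_class.create(workflow_config)           # 3. execute the workflow
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()
```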
To try this locally, you can build the DEV image with `make build-ingestion-base-local` from the project root.
## Further improvements
We have two operators to leverage if we don't want to run the ingestion from Airflow's host environment:
```python
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
```
These can be installed via the `apache-airflow[docker]` and `apache-airflow[kubernetes]` extras, respectively; a sketch of the Kubernetes variant follows the list below.
If we want to handle both of these directly in the `openmetadata-managed-apis`, we need to consider a couple of things:
1. `DockerOperator` will only work with Docker and `KubernetesPodOperator` will only work with a k8s cluster. This means
   that we'll need to dynamically handle the internal logic to use either of them depending on the deployment.
   [Docs](https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html).
2. For GKE deployments, things get a bit more complicated, as we'll need to use and test yet another operator
   custom-built for GKE: `GKEStartPodOperator`. [Docs](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/kubernetes_engine.html#howto-operator-gkestartpodoperator)
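
As a hedged sketch (not part of this commit) of what the Kubernetes path could look like, assuming a reachable
cluster, a `default` namespace, and the image available in a registry; `main.py` would read the same two
environment variables either way:

```python
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

workflow_yaml = "..."  # the same workflow config YAML passed to the DockerOperator

ingest_k8s = KubernetesPodOperator(
    task_id="ingest",                           # assumption: task naming
    name="openmetadata-ingestion",
    namespace="default",                        # assumption: target namespace
    image="openmetadata/ingestion-base:latest",
    cmds=["python", "main.py"],
    # main.py reads the same `config` and `pipelineType` variables in either operator
    env_vars={"config": workflow_yaml, "pipelineType": "metadata"},
    is_delete_operator_pod=True,                # clean up the pod when the task finishes
)
```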

@ -0,0 +1,40 @@
import os

import yaml

from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
    PipelineType,
)
from metadata.ingestion.api.workflow import Workflow
from metadata.orm_profiler.api.workflow import ProfilerWorkflow
from metadata.test_suite.api.workflow import TestSuiteWorkflow

WORKFLOW_MAP = {
    PipelineType.metadata.value: Workflow,
    PipelineType.usage.value: Workflow,
    PipelineType.lineage.value: Workflow,
    PipelineType.profiler.value: ProfilerWorkflow,
    PipelineType.TestSuite.value: TestSuiteWorkflow,
}


def main():
    # DockerOperator expects an env var called config
    config = os.environ["config"]
    pipeline_type = os.environ["pipelineType"]

    workflow_class = WORKFLOW_MAP.get(pipeline_type)
    if workflow_class is None:
        raise ValueError(f"Missing workflow_class loaded from {pipeline_type}")

    # Load the config string representation
    workflow_config = yaml.safe_load(config)
    workflow = workflow_class.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


if __name__ == "__main__":
    main()