diff --git a/.github/workflows/docker-openmetadata-ingestion-base.yml b/.github/workflows/docker-openmetadata-ingestion-base.yml
new file mode 100644
index 00000000000..80adec974b3
--- /dev/null
+++ b/.github/workflows/docker-openmetadata-ingestion-base.yml
@@ -0,0 +1,67 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: docker-openmetadata-ingestion-base docker
+on:
+  workflow_dispatch:
+    inputs:
+      tag:
+        description: "Input tag"
+        required: true
+  release:
+    types: [published]
+
+jobs:
+  push_to_docker_hub:
+    runs-on: ubuntu-latest
+    env:
+      input: ${{ github.event.inputs.tag }}
+
+    steps:
+      - name: Check trigger type
+        if: ${{ env.input == '' }}
+        run: echo "input=0.12.0" >> $GITHUB_ENV
+
+      - name: Check out the Repo
+        uses: actions/checkout@v2
+
+      - name: Set up QEMU
+        uses: docker/setup-qemu-action@v1
+
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v1
+
+      - name: Login to DockerHub
+        uses: docker/login-action@v1
+        with:
+          username: ${{ secrets.DOCKERHUB_OPENMETADATA_USERNAME }}
+          password: ${{ secrets.DOCKERHUB_OPENMETADATA_TOKEN }}
+
+      - name: Install Ubuntu dependencies
+        run: |
+          sudo apt-get install -y python3-venv
+
+      - name: Install open-metadata dependencies
+        run: |
+          python3 -m venv env
+          source env/bin/activate
+          sudo make install_antlr_cli
+          make install_dev generate
+
+      - name: Build and push
+        uses: docker/build-push-action@v2
+        with:
+          context: .
+          platforms: linux/amd64,linux/arm64
+          push: ${{ github.event_name == 'release' }}
+          # Update tags before every release
+          tags: 'openmetadata/ingestion-base:${{ env.input }},openmetadata/ingestion-base:latest'
+          file: ./ingestion/operators/docker/Dockerfile
diff --git a/Makefile b/Makefile
index 542012b070d..9cd11f1d7be 100644
--- a/Makefile
+++ b/Makefile
@@ -118,12 +118,6 @@ publish: ## Publish the ingestion module to PyPI
 	twine check dist/*; \
 	twine upload dist/*
 
-## Docker operators
-.PHONY: build_docker_base
-build_docker_base: ## Build the base Docker image for the Ingestion Framework Sources
-	$(MAKE) install_dev generate
-	docker build -f ingestion/connectors/Dockerfile-base ingestion/ -t openmetadata/ingestion-connector-base
-
 .PHONY: build_docker_connectors
 build_docker_connectors: ## Build all Ingestion Framework Sources Images to be used as Docker Operators in Airflow
 	@echo "Building Docker connectors. Make sure to run build_docker_base first"
@@ -269,3 +263,9 @@ export-snyk-html-report: ## export json file from security-report/ to HTML
 	@echo "Reading all results"
 	npm install snyk-to-html -g
 	ls security-report | xargs -I % snyk-to-html -i security-report/% -o security-report/%.html
+
+# Ingestion Operators
+.PHONY: build-ingestion-base-local
+build-ingestion-base-local: ## Builds the ingestion DEV docker operator with the local ingestion files
+	$(MAKE) install_dev generate
+	docker build -f ingestion/operators/docker/Dockerfile-dev . -t openmetadata/ingestion-base:local
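A note on the `workflow_dispatch` trigger above: besides the GitHub UI, the workflow can also be dispatched programmatically. The sketch below is not part of this diff; the repository slug, the token scope, and the example tag value are assumptions for illustration.

```python
# Hedged sketch: trigger the docker-openmetadata-ingestion-base workflow via
# the GitHub REST API. The repo slug and the "0.12.1" tag are illustrative.
import os

import requests

resp = requests.post(
    "https://api.github.com/repos/open-metadata/OpenMetadata/actions/workflows/"
    "docker-openmetadata-ingestion-base.yml/dispatches",
    headers={
        "Accept": "application/vnd.github+json",
        "Authorization": f"token {os.environ['GITHUB_TOKEN']}",  # token needs workflow scope
    },
    # Matches the `tag` input above; when the input is empty, the job defaults it to 0.12.0
    json={"ref": "main", "inputs": {"tag": "0.12.1"}},
)
resp.raise_for_status()
```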
diff --git a/docker/local-metadata/docker-compose-postgres.yml b/docker/local-metadata/docker-compose-postgres.yml
index 3d74820db93..59828b36fc3 100644
--- a/docker/local-metadata/docker-compose-postgres.yml
+++ b/docker/local-metadata/docker-compose-postgres.yml
@@ -181,6 +181,7 @@ services:
       - ingestion-volume-dag-airflow:/opt/airflow/dag_generated_configs
       - ingestion-volume-dags:/opt/airflow/dags
       - ingestion-volume-tmp:/tmp
+      - /var/run/docker.sock:/var/run/docker.sock:z # Needs 666 permissions to run the DockerOperator
 networks:
   local_app_net:
diff --git a/docker/local-metadata/docker-compose.yml b/docker/local-metadata/docker-compose.yml
index b25af161493..c8f47e3e62d 100644
--- a/docker/local-metadata/docker-compose.yml
+++ b/docker/local-metadata/docker-compose.yml
@@ -184,6 +184,7 @@ services:
       - ingestion-volume-dag-airflow:/opt/airflow/dag_generated_configs
       - ingestion-volume-dags:/opt/airflow/dags
       - ingestion-volume-tmp:/tmp
+      - /var/run/docker.sock:/var/run/docker.sock:z # Needs 666 permissions to run the DockerOperator
 networks:
diff --git a/ingestion/examples/airflow/dags/airflow_docker_operator.py b/ingestion/examples/airflow/dags/airflow_docker_operator.py
new file mode 100644
index 00000000000..4c7f45ebc63
--- /dev/null
+++ b/ingestion/examples/airflow/dags/airflow_docker_operator.py
@@ -0,0 +1,68 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" +You can run this DAG from the default OM installation +""" +from datetime import datetime + +from airflow import models +from airflow.providers.docker.operators.docker import DockerOperator + +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineType, +) + +config = """ +source: + type: mysql + serviceName: local_mysql + serviceConnection: + config: + type: Mysql + username: openmetadata_user + password: openmetadata_password + hostPort: localhost:3306 + databaseSchema: openmetadata_db + connectionOptions: {} + connectionArguments: {} + sourceConfig: + config: + type: DatabaseMetadata +sink: + type: metadata-rest + config: {} +workflowConfig: + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: openmetadata + securityConfig: + jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" +""" + + +with models.DAG( + "ingestion-docker-operator", + schedule_interval="@once", + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["OpenMetadata"], +) as dag: + DockerOperator( + command="python main.py", + image="openmetadata/ingestion-base:local", + environment={"config": config, "pipelineType": PipelineType.metadata.value}, + docker_url="unix://var/run/docker.sock", # To allow to start Docker. Needs chmod 666 permissions + tty=True, + auto_remove="True", + network_mode="host", # To reach the OM server + task_id="ingest", + dag=dag, + ) diff --git a/ingestion/ingestion_dependency.sh b/ingestion/ingestion_dependency.sh index 22458e09a3d..5b2b78d0d29 100755 --- a/ingestion/ingestion_dependency.sh +++ b/ingestion/ingestion_dependency.sh @@ -39,6 +39,7 @@ airflow users create \ (sleep 5; airflow db upgrade) (sleep 5; airflow db upgrade) + # we need to this in case the container is restarted and the scheduler exited without tidying up its lock file rm -f /opt/airflow/airflow-webserver-monitor.pid airflow webserver --port 8080 -D & diff --git a/ingestion/operators/README.md b/ingestion/operators/README.md new file mode 100644 index 00000000000..a052500d82d --- /dev/null +++ b/ingestion/operators/README.md @@ -0,0 +1,3 @@ +# Ingestion Operators + +Directory containing the required files and scripts to handle Airflow Operators. 
diff --git a/ingestion/operators/docker/Dockerfile b/ingestion/operators/docker/Dockerfile
new file mode 100644
index 00000000000..c165d8d02e9
--- /dev/null
+++ b/ingestion/operators/docker/Dockerfile
@@ -0,0 +1,57 @@
+FROM python:3.9-buster
+
+# Install Dependencies (listed in alphabetical order)
+RUN apt-get update \
+    && apt-get install -y build-essential \
+    ca-certificates \
+    default-libmysqlclient-dev \
+    freetds-bin \
+    freetds-dev \
+    gcc \
+    gnupg \
+    libevent-dev \
+    libffi-dev \
+    libpq-dev \
+    libsasl2-dev \
+    libsasl2-modules \
+    libssl-dev \
+    libxml2 \
+    openjdk-11-jre \
+    openssl \
+    postgresql \
+    postgresql-contrib \
+    tdsodbc \
+    unixodbc \
+    unixodbc-dev \
+    wget --no-install-recommends
+
+# Prep to install msodbcsql18
+RUN apt-get update && \
+    apt-get install -y apt-transport-https && \
+    curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - && \
+    curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list && \
+    apt-get update && \
+    ACCEPT_EULA=Y apt-get install msodbcsql18 unixodbc-dev -y
+
+# Prep to install confluent-kafka https://github.com/confluentinc/confluent-kafka-python/issues/1326
+RUN apt-get update && \
+    apt-get install -y --no-install-recommends git g++ make && \
+    cd /tmp && git clone https://github.com/edenhill/librdkafka.git && \
+    cd librdkafka && git checkout tags/v1.9.0 && \
+    ./configure && make && make install && \
+    cd ../ && rm -rf librdkafka
+
+WORKDIR ingestion/
+
+# Required for the Airflow DockerOperator, as we need to run the workflows from a `python main.py` command in the container.
+COPY ingestion/operators/docker/main.py .
+
+RUN pip install --upgrade pip
+
+ARG INGESTION_DEPENDENCY="all"
+RUN pip install --upgrade openmetadata-ingestion[${INGESTION_DEPENDENCY}]
+
+# Uninstall psycopg2-binary and install psycopg2 instead,
+# because psycopg2-binary raises an architecture-specific error
+# when authenticating the connection with Airflow; psycopg2 solves this.
+RUN pip uninstall psycopg2-binary -y
+RUN pip install psycopg2 mysqlclient
diff --git a/ingestion/operators/docker/Dockerfile-dev b/ingestion/operators/docker/Dockerfile-dev
new file mode 100644
index 00000000000..8224c32c11e
--- /dev/null
+++ b/ingestion/operators/docker/Dockerfile-dev
@@ -0,0 +1,63 @@
+FROM python:3.9-buster
+
+# Install Dependencies (listed in alphabetical order)
+RUN apt-get update \
+    && apt-get install -y build-essential \
+    ca-certificates \
+    default-libmysqlclient-dev \
+    freetds-bin \
+    freetds-dev \
+    gcc \
+    gnupg \
+    libevent-dev \
+    libffi-dev \
+    libpq-dev \
+    libsasl2-dev \
+    libsasl2-modules \
+    libssl-dev \
+    libxml2 \
+    openjdk-11-jre \
+    openssl \
+    postgresql \
+    postgresql-contrib \
+    tdsodbc \
+    unixodbc \
+    unixodbc-dev \
+    wget --no-install-recommends
+
+# Prep to install msodbcsql18
+RUN apt-get update && \
+    apt-get install -y apt-transport-https && \
+    curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - && \
+    curl https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list && \
+    apt-get update && \
+    ACCEPT_EULA=Y apt-get install msodbcsql18 unixodbc-dev -y
+
+# Prep to install confluent-kafka https://github.com/confluentinc/confluent-kafka-python/issues/1326
+RUN apt-get update && \
+    apt-get install -y --no-install-recommends git g++ make && \
+    cd /tmp && git clone https://github.com/edenhill/librdkafka.git && \
+    cd librdkafka && git checkout tags/v1.9.0 && \
+    ./configure && make && make install && \
+    cd ../ && rm -rf librdkafka
+
+WORKDIR ingestion/
+
+# Only copy the necessary source files to execute Workflows
+COPY ingestion/src/ src/
+COPY ingestion/setup.* ./
+COPY ingestion/README.md .
+
+# Required for the Airflow DockerOperator, as we need to run the workflows from a `python main.py` command in the container.
+COPY ingestion/operators/docker/main.py .
+
+RUN pip install --upgrade pip
+
+ARG INGESTION_DEPENDENCY="all"
+RUN pip install --upgrade ".[${INGESTION_DEPENDENCY}]"
+
+# Uninstall psycopg2-binary and install psycopg2 instead,
+# because psycopg2-binary raises an architecture-specific error
+# when authenticating the connection with Airflow; psycopg2 solves this.
+RUN pip uninstall psycopg2-binary -y
+RUN pip install psycopg2 mysqlclient
diff --git a/ingestion/operators/docker/README.md b/ingestion/operators/docker/README.md
new file mode 100644
index 00000000000..88dec67e580
--- /dev/null
+++ b/ingestion/operators/docker/README.md
@@ -0,0 +1,63 @@
+# OpenMetadata Ingestion Docker Operator
+
+Utilities required to handle metadata ingestion in Airflow using the `DockerOperator`.
+
+The whole idea behind this approach is to avoid installing packages directly
+on the Airflow host, as that adds many (unnecessary) constraints: the host
+environment has to stay aligned with the `openmetadata-ingestion` package
+just to keep a Python `virtualenv` on the Airflow host.
+
+The proposed alternative is to use the
+[DockerOperator](https://airflow.apache.org/docs/apache-airflow-providers-docker/stable/_api/airflow/providers/docker/operators/docker/index.html)
+and run the ingestion workflows dynamically.
+
+This requires the following:
+- A Docker image with the bare `openmetadata-ingestion` requirements,
+- A `main.py` file to execute the `Workflow`s,
+- Handling of environment variables as input parameters for the operator.
+
+Note that Airflow's `DockerOperator` works as follows (example from [here](https://github.com/apache/airflow/blob/providers-docker/3.0.0/tests/system/providers/docker/example_docker.py)):
+
+```python
+DockerOperator(
+    docker_url='unix://var/run/docker.sock',  # Set your docker URL
+    command='/bin/sleep 30',
+    image='centos:latest',
+    network_mode='bridge',
+    task_id='docker_op_tester',
+    dag=dag,
+)
+```
+
+We need to provide two ingredients:
+1. The Docker image to execute,
+2. The command to run.
+
+This is not a Python-first approach, so it does not let us set a base image and pass a Python function
+as a parameter (which would have been the preferred approach). Instead, we leverage the `environment` input
+parameter of the `DockerOperator` and pass all the necessary information there.
+
+Our `main.py` file is then in charge of:
+1. Loading the workflow configuration from the environment variables,
+2. Getting the required workflow class to run, and finally
+3. Executing the workflow.
+
+To try this locally, you can build the DEV image with `make build-ingestion-base-local` from the project root.
+
+## Further improvements
+
+We have two operators to leverage if we don't want to run the ingestion from Airflow's host environment:
+
+```python
+from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
+```
+
+These can be installed from `apache-airflow[docker]` and `apache-airflow[kubernetes]` respectively.
+
+If we want to handle both of these directly in the `openmetadata-managed-apis`, we need to consider a couple of things:
+1. The `DockerOperator` will only work with Docker, and the `KubernetesPodOperator` will only work with a k8s cluster. This means
+   that we'll need to dynamically handle the internal logic to use either of them depending on the deployment.
+   [Docs](https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html).
+2. For GKE deployments, things get a bit more complicated, as we'll need to use and test yet another operator
+   custom-built for GKE: `GKEStartPodOperator`. [Docs](https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/kubernetes_engine.html#howto-operator-gkestartpodoperator)
diff --git a/ingestion/operators/docker/main.py b/ingestion/operators/docker/main.py
new file mode 100644
index 00000000000..c277e189216
--- /dev/null
+++ b/ingestion/operators/docker/main.py
@@ -0,0 +1,40 @@
+import os
+
+import yaml
+
+from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
+    PipelineType,
+)
+from metadata.ingestion.api.workflow import Workflow
+from metadata.orm_profiler.api.workflow import ProfilerWorkflow
+from metadata.test_suite.api.workflow import TestSuiteWorkflow
+
+# Map each pipeline type to the Workflow class that knows how to run it
+WORKFLOW_MAP = {
+    PipelineType.metadata.value: Workflow,
+    PipelineType.usage.value: Workflow,
+    PipelineType.lineage.value: Workflow,
+    PipelineType.profiler.value: ProfilerWorkflow,
+    PipelineType.TestSuite.value: TestSuiteWorkflow,
+}
+
+
+def main():
+    # The DockerOperator expects an env var called `config`
+    config = os.environ["config"]
+    pipeline_type = os.environ["pipelineType"]
+
+    workflow_class = WORKFLOW_MAP.get(pipeline_type)
+    if workflow_class is None:
+        raise ValueError(f"Could not load a Workflow class for pipeline type: {pipeline_type}")
+
+    # Load the config string representation
+    workflow_config = yaml.safe_load(config)
+    workflow = workflow_class.create(workflow_config)
+    workflow.execute()
+    workflow.raise_from_status()
+    workflow.print_status()
+    workflow.stop()
+
+
+if __name__ == "__main__":
+    main()
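Closing note on the README's "Further improvements" section: because `main.py` only consumes the `config` and `pipelineType` environment variables, switching between the two operators is mostly a matter of how those variables are injected. Below is a rough sketch of the dynamic selection the README describes; the `OPENMETADATA_RUNNER` flag and the helper itself are hypothetical, not part of this diff.

```python
# Hedged sketch: pick the operator based on the deployment. Meant to be called
# inside a `with DAG(...)` block; OPENMETADATA_RUNNER is a hypothetical flag.
import os

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from airflow.providers.docker.operators.docker import DockerOperator
from kubernetes.client import models as k8s


def build_ingestion_task(task_id: str, config: str, pipeline_type: str):
    """Return the operator matching the current deployment."""
    if os.getenv("OPENMETADATA_RUNNER") == "kubernetes":
        return KubernetesPodOperator(
            task_id=task_id,
            name=task_id,
            namespace="default",
            image="openmetadata/ingestion-base:latest",
            cmds=["python", "main.py"],
            # Same env contract consumed by main.py above
            env_vars=[
                k8s.V1EnvVar(name="config", value=config),
                k8s.V1EnvVar(name="pipelineType", value=pipeline_type),
            ],
        )
    return DockerOperator(
        task_id=task_id,
        image="openmetadata/ingestion-base:latest",
        command="python main.py",
        environment={"config": config, "pipelineType": pipeline_type},
        docker_url="unix://var/run/docker.sock",
        network_mode="host",
    )
```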