Operators directory cleanup (#10834)

* Remove old connectors directory

* Add mssql test for url

* Format
Pere Miquel Brull 2023-03-30 09:51:21 +02:00 committed by GitHub
parent 7a5a6e31ec
commit 2e2c6a0cdf
6 changed files with 23 additions and 262 deletions


@@ -1,16 +0,0 @@
FROM python:3.9-slim
MAINTAINER OpenMetadata Committers
WORKDIR /ingestion
RUN apt-get update && \
    apt-get install -y gcc libsasl2-dev curl build-essential libssl-dev libffi-dev librdkafka-dev unixodbc-dev python3.9-dev libevent-dev wget --no-install-recommends && \
    rm -rf /var/lib/apt/lists/*
COPY src/ src/
COPY setup.* ./
COPY README.md .
COPY connectors/main.py .
RUN pip install .


@@ -1,53 +0,0 @@
# Ingestion Framework Connectors
This directory contains the Dockerfiles needed to prepare the images that hold the Connectors' code.
These images can then be used either in isolation or as a `DockerOperator` in Airflow ingestions.
- `Dockerfile-base` contains the minimal requirements for all Connectors, i.e., the ingestion framework with its base requirements.
All connector images are built on top of this base image, adding only the extra dependencies needed to run their own connector.
The Connector images are named `ingestion-connector-${connectorName}`.
## Process
- We first need to build the base image: `make build_docker_base`
- Then, we build the connectors: `make build_docker_connectors`
- Optionally, we can push the connector images to DockerHub with `make push_docker_connectors`
We can then use the `DockerOperator` in Airflow as follows:
```python
ingest_task = DockerOperator(
    task_id="ingest_using_docker",
    image="openmetadata/ingestion-connector-base",
    command="python main.py",
    environment={"config": config},
    tty=True,
    auto_remove=True,
    mounts=[
        Mount(
            source="/tmp/openmetadata/examples/",
            target="/opt/operator/",
            type="bind",
        ),
    ],
    mount_tmp_dir=False,
    network_mode="host",  # Needed to reach Docker OM
)
```
Note that the `config` object should be a `str` containing the JSON connector
configuration. The connector images are built with a packaged `main.py` script that
loads the `config` environment variable and passes it to the `Workflow`.
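
For illustration, a minimal sketch of how the `config` string could be assembled before defining the task; the workflow shape below is hypothetical, and real configs follow each connector's JSON schema:
```python
import json

# Hypothetical workflow configuration; the exact shape depends on the
# connector's JSON schema in the ingestion framework.
workflow_config = {
    "source": {
        "type": "sample-data",
        "serviceName": "sample_data",
        "sourceConfig": {"config": {}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
}

# DockerOperator `environment` values must be plain strings
config = json.dumps(workflow_config)
```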
## Configs
Note that in the example DAG `airflow_sample_data.py` we pass the `config` object with `"sample_data_folder": "/opt/operator/sample_data"`.
In the DAG definition we mount a volume with the `examples` data to `/opt/operator/` in the `DockerOperator`; the symlink is generated in `run_local_docker.sh`.
These specific configurations are only needed in our development/showcase DAG because of its infra requirements. Each architecture will need to prepare its own volume and network settings.


@@ -1,106 +0,0 @@
"""
Dynamically build docker images for
all the connectors
"""
import argparse
import io
import logging
import sys
import traceback
from distutils.core import run_setup
from enum import Enum
import docker
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger = logging.getLogger(__name__)
client = docker.from_env()
TARGET = "openmetadata/ingestion-connector-{name}"
DOCKERFILE = "FROM openmetadata/ingestion-connector-base\n"
REQUIREMENTS = "RUN pip install {requirements}\n"
ENTRYPOINT = 'ENTRYPOINT ["python", "main.py"]'
class DockerCommands(Enum):
    BUILD = "build"
    PUSH = "push"


def get_setup_data():
    """
    Get setup and filtered plugins data from setup.py
    """
    setup = run_setup("./ingestion/setup.py", stop_after="init")

    plugins = {
        item[0]: item[1]
        for item in setup.extras_require.items()
        if item[0] not in {"base", "all"}
    }

    return setup, plugins
def build():
    """
    Build all docker images for the connectors
    """
    setup, plugins = get_setup_data()

    for conn in plugins.keys():
        logger.info(f"Building docker image for {conn}")
        conn_reqs = " ".join(f'"{req}"' for req in plugins[conn])

        if plugins[conn]:
            file = DOCKERFILE + REQUIREMENTS.format(requirements=conn_reqs) + ENTRYPOINT
        else:
            file = DOCKERFILE + ENTRYPOINT

        target = TARGET.format(name=conn)

        try:
            client.images.build(
                fileobj=io.BytesIO(file.encode()), tag=f"{target}:latest"
            )
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Error trying to build {conn}: {exc}")
def push():
    """
    Push the previously built images for the connectors
    to DockerHub
    """
    setup, plugins = get_setup_data()

    for conn in plugins.keys():
        logger.info(f"Pushing docker image for {conn}")
        target = TARGET.format(name=conn)

        try:
            client.images.push(
                f"{target}:{setup.get_version()}",
                stream=True,
                decode=True,
            )
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Error trying to push {conn}: {exc}")
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Docker Framework for OpenMetadata")
    sub_parser = parser.add_subparsers(dest="command")
    sub_parser.add_parser(DockerCommands.BUILD.value)
    sub_parser.add_parser(DockerCommands.PUSH.value)

    # Dispatch on the subcommand name; comparing the full vars() dict
    # against a string would never match.
    args = parser.parse_args()
    if args.command == DockerCommands.BUILD.value:
        build()
    if args.command == DockerCommands.PUSH.value:
        push()
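
For reference, a sketch of the Dockerfile text that `build()` composes, reusing the constants defined above, for a hypothetical `mysql` plugin with a single pinned requirement (the plugin name and version are illustrative):
```python
# Hypothetical extras entry as get_setup_data() would return it.
plugins = {"mysql": ["pymysql>=1.0.2"]}

conn = "mysql"
conn_reqs = " ".join(f'"{req}"' for req in plugins[conn])
file = DOCKERFILE + REQUIREMENTS.format(requirements=conn_reqs) + ENTRYPOINT
print(file)
# FROM openmetadata/ingestion-connector-base
# RUN pip install "pymysql>=1.0.2"
# ENTRYPOINT ["python", "main.py"]
```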


@@ -1,21 +0,0 @@
import json
import os
from metadata.ingestion.api.workflow import Workflow
def main():
    # DockerOperator expects an env var called config
    config = os.environ["config"]

    # Load the config string representation
    workflow_config = json.loads(config)

    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


if __name__ == "__main__":
    main()
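
A minimal local sketch of driving this entrypoint by hand, assuming a hypothetical workflow config; in production the `DockerOperator` injects the `config` env var through its `environment` argument:
```python
import json
import os

# Hypothetical minimal workflow config; real ones follow the Workflow schema.
os.environ["config"] = json.dumps(
    {
        "source": {
            "type": "sample-data",
            "serviceName": "sample",
            "sourceConfig": {"config": {}},
        },
        "sink": {"type": "metadata-rest", "config": {}},
    }
)

main()  # loads the env var, builds the Workflow, and runs it
```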


@@ -1,66 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Entrypoint to test the connection to a source
"""
import os
import yaml
from metadata.generated.schema.entity.automations.testServiceConnection import (
    TestServiceConnectionRequest,
)
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
from metadata.utils.secrets.secrets_manager_factory import SecretsManagerFactory
def main():
    """
    Test Connection entrypoint.

    The expected information is in the shape of `TestServiceConnectionRequest`, e.g.,

    ```
    connection:
      config:
        type: Mysql
        scheme: mysql+pymysql
        username: openmetadata_user
        password: openmetadata_password
        hostPort: 'localhost:3306'
    connectionType: Database
    ```
    """
    config = os.getenv("config")
    if not config:
        raise RuntimeError(
            "Missing environment variable `config` with the TestServiceConnectionRequest dict."
        )

    test_connection_dict = yaml.safe_load(config)
    test_service_connection = TestServiceConnectionRequest.parse_obj(
        test_connection_dict
    )

    # We need to instantiate the secrets manager in case secrets are passed
    SecretsManagerFactory(test_service_connection.secretsManagerProvider, None)

    connection = get_connection(test_service_connection.connection.config)

    # We won't wrap the call in a try/except. If the connection fails, we want to
    # raise the SourceConnectionException as it comes.
    test_connection_fn = get_test_connection_fn(
        test_service_connection.connection.config
    )
    test_connection_fn(connection, test_service_connection.connection.config)


if __name__ == "__main__":
    main()
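
A local invocation sketch reusing the MySQL example from the docstring above (the credentials and `hostPort` are illustrative):
```python
import os

os.environ["config"] = """
connection:
  config:
    type: Mysql
    scheme: mysql+pymysql
    username: openmetadata_user
    password: openmetadata_password
    hostPort: 'localhost:3306'
connectionType: Database
"""

main()  # raises as-is if the connection test fails
```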


@@ -14,6 +14,10 @@ OpenMetadata source URL building tests
"""
from unittest import TestCase
from metadata.generated.schema.entity.services.connections.database.mssqlConnection import (
    MssqlConnection,
    MssqlScheme,
)
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
    MysqlConnection,
)
@@ -48,3 +52,22 @@ class TestConfig(TestCase):
        )
        url = get_connection_url_common(connection)
        assert url == "redshift+psycopg2://username:password@localhost:1234/dev"

    def test_mssql_url(self):
        """
        Validate URL building for MSSQL
        """
        from metadata.ingestion.source.database.mssql.connection import (
            get_connection_url,
        )

        expected_url = "mssql+pytds://sa:password@james\\bond:1433"
        mssql_conn_obj = MssqlConnection(
            username="sa",
            password="password",
            hostPort="james\\bond:1433",
            scheme=MssqlScheme.mssql_pytds,
            database=None,
        )
        assert expected_url == get_connection_url(mssql_conn_obj)
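
If useful, a companion case could cover an explicit database; this sketch assumes `get_connection_url` appends the database after the host as a path segment, which is the conventional SQLAlchemy URL layout:
```python
# Hypothetical companion case inside TestConfig; assumes the builder
# appends the database as a trailing URL path segment.
def test_mssql_url_with_database(self):
    """
    Validate URL building for MSSQL when a database is set (hypothetical)
    """
    from metadata.ingestion.source.database.mssql.connection import (
        get_connection_url,
    )

    mssql_conn_obj = MssqlConnection(
        username="sa",
        password="password",
        hostPort="james\\bond:1433",
        scheme=MssqlScheme.mssql_pytds,
        database="master",
    )
    assert get_connection_url(mssql_conn_obj) == (
        "mssql+pytds://sa:password@james\\bond:1433/master"
    )
```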