mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-07 08:38:23 +00:00
* Use new JSON schemas in lineage * Add sqlite
This commit is contained in:
parent
b93c4611bb
commit
4e55ea0154
@ -1,115 +0,0 @@
|
|||||||
# Copyright 2021 Collate
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
|
|
||||||
"""
|
|
||||||
OpenMetadata Airflow Lineage Backend
|
|
||||||
"""
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
from airflow.configuration import conf
|
|
||||||
|
|
||||||
|
|
||||||
# TODO refactor https://github.com/open-metadata/OpenMetadata/issues/3844
|
|
||||||
class OpenMetadataLineageConfig(MetadataServerConfig):
|
|
||||||
"""
|
|
||||||
Base class for OpenMetada lineage config
|
|
||||||
|
|
||||||
Attributes
|
|
||||||
airflow_service_name (str): name of the service
|
|
||||||
api_endpoint (str): the endpoint for the API
|
|
||||||
auth_provider_type (str):
|
|
||||||
secret_key (str):
|
|
||||||
"""
|
|
||||||
|
|
||||||
airflow_service_name: str = "airflow"
|
|
||||||
api_endpoint: str = "http://localhost:8585"
|
|
||||||
auth_provider_type: str = "no-auth"
|
|
||||||
secret_key: Optional[str] = None
|
|
||||||
|
|
||||||
|
|
||||||
def get_lineage_config() -> OpenMetadataLineageConfig:
|
|
||||||
"""
|
|
||||||
Load the lineage config from airflow_provider_openmetadata.cfg.
|
|
||||||
"""
|
|
||||||
airflow_service_name = conf.get("lineage", "airflow_service_name", fallback=None)
|
|
||||||
if airflow_service_name:
|
|
||||||
api_endpoint = conf.get(
|
|
||||||
"lineage", "openmetadata_api_endpoint", fallback="http://localhost:8585"
|
|
||||||
)
|
|
||||||
auth_provider_type = conf.get(
|
|
||||||
"lineage", "auth_provider_type", fallback="no-auth"
|
|
||||||
)
|
|
||||||
secret_key = conf.get("lineage", "secret_key", fallback=None)
|
|
||||||
api_version = conf.get("lineage", "api_version", fallback="v1")
|
|
||||||
retry = conf.get("lineage", "retry", fallback=3)
|
|
||||||
retry_wait = conf.get("lineage", "retry_wait", fallback=3)
|
|
||||||
org_url = conf.get("lineage", "org_url", fallback=None)
|
|
||||||
client_id = conf.get("lineage", "client_id", fallback=None)
|
|
||||||
private_key = conf.get("lineage", "private_key", fallback=None)
|
|
||||||
domain = conf.get("lineage", "domain", fallback=None)
|
|
||||||
email = conf.get("lineage", "email", fallback=None)
|
|
||||||
audience = conf.get(
|
|
||||||
"lineage", "audience", fallback="https://www.googleapis.com/oauth2/v4/token"
|
|
||||||
)
|
|
||||||
auth_header = conf.get("lineage", "auth_header", fallback="Authorization")
|
|
||||||
authority = conf.get("lineage", "authority", fallback="")
|
|
||||||
scopes = conf.get("lineage", "scopes", fallback=[])
|
|
||||||
return OpenMetadataLineageConfig.parse_obj(
|
|
||||||
{
|
|
||||||
"airflow_service_name": airflow_service_name,
|
|
||||||
"api_endpoint": api_endpoint,
|
|
||||||
"auth_provider_type": auth_provider_type,
|
|
||||||
"secret_key": secret_key,
|
|
||||||
"audience": audience,
|
|
||||||
"auth_header": auth_header,
|
|
||||||
"email": email,
|
|
||||||
"domain": domain,
|
|
||||||
"private_key": private_key,
|
|
||||||
"client_id": client_id,
|
|
||||||
"org_url": org_url,
|
|
||||||
"retry_wait": retry_wait,
|
|
||||||
"retry": retry,
|
|
||||||
"api_version": api_version,
|
|
||||||
"authority": authority,
|
|
||||||
"scopes": scopes,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
openmetadata_config_file = os.getenv("OPENMETADATA_LINEAGE_CONFIG")
|
|
||||||
if openmetadata_config_file:
|
|
||||||
with open(openmetadata_config_file, encoding="utf-8") as config_file:
|
|
||||||
config = json.load(config_file)
|
|
||||||
return OpenMetadataLineageConfig.parse_obj(config)
|
|
||||||
|
|
||||||
return OpenMetadataLineageConfig.parse_obj(
|
|
||||||
{
|
|
||||||
"airflow_service_name": "airflow",
|
|
||||||
"api_endpoint": "http://localhost:8585/api",
|
|
||||||
"auth_provider_type": "no-auth",
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def get_metadata_config(config: OpenMetadataLineageConfig) -> MetadataServerConfig:
|
|
||||||
"""
|
|
||||||
Return MetadataServerConfig to interact with the API.
|
|
||||||
:param config: get_lineage_config()
|
|
||||||
"""
|
|
||||||
|
|
||||||
return MetadataServerConfig.parse_obj(
|
|
||||||
{
|
|
||||||
"api_endpoint": config.api_endpoint,
|
|
||||||
"auth_provider_type": config.auth_provider_type,
|
|
||||||
"secret_key": config.secret_key,
|
|
||||||
}
|
|
||||||
)
|
|
@ -0,0 +1,16 @@
|
|||||||
|
# Copyright 2021 Collate
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
OpenMetadata Airflow Lineage Backend config commons & cte
|
||||||
|
"""
|
||||||
|
|
||||||
|
LINEAGE = "lineage"
|
@ -0,0 +1,89 @@
|
|||||||
|
# Copyright 2021 Collate
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
OpenMetadata Airflow Lineage Backend
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
|
||||||
|
from airflow.configuration import conf
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from airflow_provider_openmetadata.lineage.config.commons import LINEAGE
|
||||||
|
from airflow_provider_openmetadata.lineage.config.providers import (
|
||||||
|
provider_config_registry,
|
||||||
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||||
|
AuthProvider,
|
||||||
|
OpenMetadataServerConfig,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class AirflowLineageConfig(BaseModel):
|
||||||
|
airflow_service_name: str
|
||||||
|
metadata_config: OpenMetadataServerConfig
|
||||||
|
|
||||||
|
|
||||||
|
def parse_airflow_config(airflow_service_name: str) -> AirflowLineageConfig:
|
||||||
|
"""
|
||||||
|
Get airflow config from airflow.cfg and parse it
|
||||||
|
to the config model
|
||||||
|
"""
|
||||||
|
auth_provider_type = conf.get(
|
||||||
|
LINEAGE, "auth_provider_type", fallback=AuthProvider.no_auth.value
|
||||||
|
)
|
||||||
|
|
||||||
|
if auth_provider_type == AuthProvider.no_auth.value:
|
||||||
|
security_config = None
|
||||||
|
else:
|
||||||
|
load_security_config_fn = provider_config_registry.registry(auth_provider_type)
|
||||||
|
security_config = load_security_config_fn()
|
||||||
|
|
||||||
|
return AirflowLineageConfig(
|
||||||
|
airflow_service_name=airflow_service_name,
|
||||||
|
metadata_config=OpenMetadataServerConfig(
|
||||||
|
hostPort=conf.get(
|
||||||
|
LINEAGE,
|
||||||
|
"openmetadata_api_endpoint",
|
||||||
|
fallback="http://localhost:8585/api",
|
||||||
|
),
|
||||||
|
authProvider=auth_provider_type,
|
||||||
|
securityConfig=security_config,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def get_lineage_config() -> AirflowLineageConfig:
|
||||||
|
"""
|
||||||
|
Load the lineage config from airflow.cfg, from
|
||||||
|
a JSON file path configures as env in OPENMETADATA_LINEAGE_CONFIG
|
||||||
|
or return a default config.
|
||||||
|
"""
|
||||||
|
airflow_service_name = conf.get(LINEAGE, "airflow_service_name", fallback=None)
|
||||||
|
if airflow_service_name:
|
||||||
|
return parse_airflow_config(airflow_service_name)
|
||||||
|
|
||||||
|
openmetadata_config_file = os.getenv("OPENMETADATA_LINEAGE_CONFIG")
|
||||||
|
|
||||||
|
# If config file, parse the JSON config, that should conform to AirflowLineageConfig
|
||||||
|
if openmetadata_config_file:
|
||||||
|
with open(openmetadata_config_file, encoding="utf-8") as config_file:
|
||||||
|
config = json.load(config_file)
|
||||||
|
return AirflowLineageConfig.parse_obj(config)
|
||||||
|
|
||||||
|
# If nothing is configured, let's use a default for local
|
||||||
|
return AirflowLineageConfig(
|
||||||
|
airflow_service_name="airflow",
|
||||||
|
metadata_config=OpenMetadataServerConfig(
|
||||||
|
hostPort="http://localhost:8585/api",
|
||||||
|
),
|
||||||
|
)
|
@ -0,0 +1,85 @@
|
|||||||
|
# Copyright 2021 Collate
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
OpenMetadata Airflow Lineage Backend security providers config
|
||||||
|
"""
|
||||||
|
|
||||||
|
from collections import namedtuple
|
||||||
|
|
||||||
|
from airflow.configuration import conf
|
||||||
|
|
||||||
|
from airflow_provider_openmetadata.lineage.config.commons import LINEAGE
|
||||||
|
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||||
|
Auth0SSOConfig,
|
||||||
|
AuthProvider,
|
||||||
|
GoogleSSOConfig,
|
||||||
|
OktaSSOConfig,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def register_provider_config():
|
||||||
|
"""
|
||||||
|
Helps us register custom functions to parse provider config
|
||||||
|
"""
|
||||||
|
registry = dict()
|
||||||
|
|
||||||
|
def add(provider: str):
|
||||||
|
def inner(fn):
|
||||||
|
registry[provider] = fn
|
||||||
|
return fn
|
||||||
|
|
||||||
|
return inner
|
||||||
|
|
||||||
|
Register = namedtuple("Register", ["add", "registry"])
|
||||||
|
return Register(add, registry)
|
||||||
|
|
||||||
|
|
||||||
|
provider_config_registry = register_provider_config()
|
||||||
|
|
||||||
|
|
||||||
|
@provider_config_registry.add(AuthProvider.google.value)
|
||||||
|
def load_google_auth() -> GoogleSSOConfig:
|
||||||
|
"""
|
||||||
|
Load config for Google Auth
|
||||||
|
"""
|
||||||
|
return GoogleSSOConfig(
|
||||||
|
secretKey=conf.get(LINEAGE, "secret_key"),
|
||||||
|
audience=conf.get(
|
||||||
|
LINEAGE, "audience", fallback="https://www.googleapis.com/oauth2/v4/token"
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@provider_config_registry.add(AuthProvider.okta.value)
|
||||||
|
def load_okta_auth() -> OktaSSOConfig:
|
||||||
|
"""
|
||||||
|
Load config for Google Auth
|
||||||
|
"""
|
||||||
|
return OktaSSOConfig(
|
||||||
|
clientId=conf.get(LINEAGE, "client_id"),
|
||||||
|
orgURL=conf.get(LINEAGE, "org_url"),
|
||||||
|
privateKey=conf.get(LINEAGE, "private_key"),
|
||||||
|
email=conf.get(LINEAGE, "email"),
|
||||||
|
scopes=conf.get(LINEAGE, "scopes", fallback=[]),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@provider_config_registry.add(AuthProvider.auth0.value)
|
||||||
|
def load_auth0_auth() -> Auth0SSOConfig:
|
||||||
|
"""
|
||||||
|
Load config for Google Auth
|
||||||
|
"""
|
||||||
|
return Auth0SSOConfig(
|
||||||
|
clientId=conf.get(LINEAGE, "client_id"),
|
||||||
|
secretKey=conf.get(LINEAGE, "secret_key"),
|
||||||
|
domain=conf.get(LINEAGE, "domain"),
|
||||||
|
)
|
@ -18,9 +18,9 @@ from typing import TYPE_CHECKING, Dict, List, Optional
|
|||||||
|
|
||||||
from airflow.lineage.backend import LineageBackend
|
from airflow.lineage.backend import LineageBackend
|
||||||
|
|
||||||
from airflow_provider_openmetadata.lineage.config import (
|
from airflow_provider_openmetadata.lineage.config.loader import (
|
||||||
|
AirflowLineageConfig,
|
||||||
get_lineage_config,
|
get_lineage_config,
|
||||||
get_metadata_config,
|
|
||||||
)
|
)
|
||||||
from airflow_provider_openmetadata.lineage.utils import get_xlets, parse_lineage
|
from airflow_provider_openmetadata.lineage.utils import get_xlets, parse_lineage
|
||||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||||
@ -78,9 +78,8 @@ class OpenMetadataLineageBackend(LineageBackend):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
config = get_lineage_config()
|
config: AirflowLineageConfig = get_lineage_config()
|
||||||
metadata_config = get_metadata_config(config)
|
client = OpenMetadata(config.metadata_config)
|
||||||
client = OpenMetadata(metadata_config)
|
|
||||||
|
|
||||||
op_inlets = get_xlets(operator, "_inlets")
|
op_inlets = get_xlets(operator, "_inlets")
|
||||||
op_outlets = get_xlets(operator, "_outlets")
|
op_outlets = get_xlets(operator, "_outlets")
|
||||||
|
@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional
|
|||||||
|
|
||||||
from airflow.configuration import conf
|
from airflow.configuration import conf
|
||||||
|
|
||||||
from airflow_provider_openmetadata.lineage.config import OpenMetadataLineageConfig
|
from airflow_provider_openmetadata.lineage.config.loader import AirflowLineageConfig
|
||||||
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
|
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
|
||||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||||
from metadata.generated.schema.api.services.createPipelineService import (
|
from metadata.generated.schema.api.services.createPipelineService import (
|
||||||
@ -308,7 +308,7 @@ def add_status(
|
|||||||
|
|
||||||
# pylint: disable=too-many-arguments,too-many-locals
|
# pylint: disable=too-many-arguments,too-many-locals
|
||||||
def parse_lineage(
|
def parse_lineage(
|
||||||
config: OpenMetadataLineageConfig,
|
config: AirflowLineageConfig,
|
||||||
context: Dict,
|
context: Dict,
|
||||||
operator: "BaseOperator",
|
operator: "BaseOperator",
|
||||||
inlets: List,
|
inlets: List,
|
||||||
@ -382,7 +382,7 @@ def parse_lineage(
|
|||||||
|
|
||||||
|
|
||||||
def get_or_create_pipeline_service(
|
def get_or_create_pipeline_service(
|
||||||
operator: "BaseOperator", client: OpenMetadata, config: OpenMetadataLineageConfig
|
operator: "BaseOperator", client: OpenMetadata, config: AirflowLineageConfig
|
||||||
) -> PipelineService:
|
) -> PipelineService:
|
||||||
"""
|
"""
|
||||||
Check if we already have the airflow instance as a PipelineService,
|
Check if we already have the airflow instance as a PipelineService,
|
||||||
|
@ -11,7 +11,6 @@
|
|||||||
"""
|
"""
|
||||||
Generic source to build SQL connectors.
|
Generic source to build SQL connectors.
|
||||||
"""
|
"""
|
||||||
import copy
|
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
@ -209,7 +208,6 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
|
|||||||
inspectors = self.get_databases()
|
inspectors = self.get_databases()
|
||||||
for inspector in inspectors:
|
for inspector in inspectors:
|
||||||
schema_names = inspector.get_schema_names()
|
schema_names = inspector.get_schema_names()
|
||||||
print(schema_names)
|
|
||||||
for schema in schema_names:
|
for schema in schema_names:
|
||||||
# clear any previous source database state
|
# clear any previous source database state
|
||||||
self.database_source_state.clear()
|
self.database_source_state.clear()
|
||||||
|
@ -17,6 +17,9 @@ from urllib.parse import quote_plus
|
|||||||
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
|
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
|
||||||
MysqlConnection,
|
MysqlConnection,
|
||||||
)
|
)
|
||||||
|
from metadata.generated.schema.entity.services.connections.database.sqliteConnection import (
|
||||||
|
SQLiteConnection,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@singledispatch
|
@singledispatch
|
||||||
@ -53,3 +56,12 @@ def _(connection: MysqlConnection):
|
|||||||
url = f"{url}?{params}"
|
url = f"{url}?{params}"
|
||||||
|
|
||||||
return url
|
return url
|
||||||
|
|
||||||
|
|
||||||
|
@get_connection_url.register
|
||||||
|
def _(connection: SQLiteConnection):
|
||||||
|
"""
|
||||||
|
SQLite is only used for testing with the in-memory db
|
||||||
|
"""
|
||||||
|
|
||||||
|
return f"{connection.scheme.value}:///:memory:"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user