Data insights with metadata command-line (#11045)

* Data insights with metadata command-line

* Python Checkstyle

* Python Checkstyle
This commit is contained in:
Milan Bariya 2023-04-13 19:12:07 +05:30 committed by GitHub
parent de70c585bc
commit dda5963e34
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 146 additions and 1 deletions

View File

@ -0,0 +1,49 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Data Insigt utility for the metadata CLI
"""
import pathlib
import sys
import traceback
from metadata.config.common import load_config_file
from metadata.data_insight.api.workflow import DataInsightWorkflow
from metadata.utils.logger import cli_logger
from metadata.utils.workflow_output_handler import WorkflowType, print_init_error
# Module-level logger for CLI output (configured by metadata.utils.logger.cli_logger).
logger = cli_logger()
def run_insight(config_path: str) -> None:
    """
    Execute the Data Insight workflow described by a JSON or YAML
    configuration file, then exit with the workflow's result status.

    :param config_path: Path to load JSON config
    """
    config_file = pathlib.Path(config_path)
    # Keep the loaded config around so a creation failure can still be
    # reported with the config the user supplied (None if loading failed).
    workflow_config = None
    try:
        workflow_config = load_config_file(config_file)
        workflow = DataInsightWorkflow.create(workflow_config)
        logger.debug(f"Using config: {workflow.config}")
    except Exception as exc:
        # Full traceback only at debug level; the user gets a friendly
        # initialization error message instead.
        logger.debug(traceback.format_exc())
        print_init_error(exc, workflow_config, WorkflowType.INSIGHT)
        sys.exit(1)

    workflow.execute()
    workflow.stop()
    workflow.print_status()
    # Propagate the workflow outcome as the process exit code.
    sys.exit(workflow.result_status())

View File

@ -22,6 +22,7 @@ from metadata.cli.backup import UploadDestinationType, run_backup
from metadata.cli.dataquality import run_test
from metadata.cli.docker import BACKEND_DATABASES, DockerActions, run_docker
from metadata.cli.ingest import run_ingest
from metadata.cli.insight import run_insight
from metadata.cli.openmetadata_dag_config_migration import (
run_openmetadata_dag_config_migration,
)
@ -44,6 +45,7 @@ class MetadataCommands(Enum):
BACKUP = "backup"
RESTORE = "restore"
WEBHOOK = "webhook"
INSIGHT = "insight"
OPENMETADATA_IMPORTS_MIGRATION = "openmetadata_imports_migration"
OPENMETADATA_DAG_CONFIG_MIGRATION = "openmetadata_dag_config_migration"
@ -378,13 +380,18 @@ def get_parser(args=None):
help="Simple Webserver to test webhook metadata events",
)
)
create_common_config_parser_args(
sub_parser.add_parser(
MetadataCommands.INSIGHT.value, help="Data Insigt Workflow"
)
)
add_metadata_args(parser)
parser.add_argument("--debug", help="Debug Mode", action="store_true")
return parser.parse_args(args)
def metadata(args=None):
def metadata(args=None): # pylint: disable=too-many-branches
"""
This method implements parsing of the arguments passed from CLI
"""
@ -400,6 +407,8 @@ def metadata(args=None):
if metadata_workflow == MetadataCommands.INGEST.value:
run_ingest(config_path=config_file)
if metadata_workflow == MetadataCommands.INSIGHT.value:
run_insight(config_path=config_file)
if metadata_workflow == MetadataCommands.PROFILE.value:
run_profiler(config_path=config_file)
if metadata_workflow == MetadataCommands.TEST.value:

View File

@ -0,0 +1,20 @@
source:
type: dataInsight
serviceName: OpenMetadata
sourceConfig:
config:
type: MetadataToElasticSearch
processor:
type: data-insight-processor
config: {}
sink:
type: elasticsearch
config:
es_host: localhost
es_port: 9200
recreate_indexes: false
workflowConfig:
loggerLevel: DEBUG
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: no-auth

View File

@ -74,6 +74,7 @@ class WorkflowType(Enum):
TEST = "test"
LINEAGE = "lineage"
USAGE = "usage"
INSIGHT = "insight"
EXAMPLES_WORKFLOW_PATH: Path = Path(__file__).parent / "../examples" / "workflows"

View File

@ -276,6 +276,72 @@ with DAG(
)
```
# Run Data Insights using the metadata CLI
### 1. Define the YAML Config
This is a sample config for Data Insights:
```yaml
source:
type: dataInsight
serviceName: OpenMetadata
sourceConfig:
config:
type: MetadataToElasticSearch
processor:
type: data-insight-processor
config: {}
sink:
type: elasticsearch
config:
es_host: localhost
es_port: 9200
recreate_indexes: false
workflowConfig:
loggerLevel: DEBUG
openMetadataServerConfig:
hostPort: "<OpenMetadata host and port>"
authProvider: openmetadata
securityConfig:
jwtToken: '{bot_jwt_token}'
```
#### Source Configuration - Source Config
- To send the metadata to OpenMetadata, it needs to be specified as `type: MetadataToElasticSearch`.
#### Processor Configuration
- To send the metadata to OpenMetadata, it needs to be specified as `type: data-insight-processor`.
#### Workflow Configuration
The main property here is the `openMetadataServerConfig`, where you can define the host and security provider of your OpenMetadata installation.
For a simple, local installation using our docker containers, this looks like:
```yaml
workflowConfig:
openMetadataServerConfig:
hostPort: 'http://localhost:8585/api'
authProvider: openmetadata
securityConfig:
jwtToken: '{bot_jwt_token}'
```
We support different security providers. You can find their definitions [here](https://github.com/open-metadata/OpenMetadata/tree/main/openmetadata-spec/src/main/resources/json/schema/security/client).
You can find the different implementations of the ingestion below.
### 2. Run with the CLI
First, we will need to save the YAML file. Afterward, and with all requirements installed, we can run:
```bash
metadata insight -c <path-to-yaml>
```
# Run Elasticsearch Reindex using the Airflow SDK