Fix #317: Ingestion bug introduced due to auth_token

This commit is contained in:
Suresh Srinivas 2021-08-26 16:26:34 -07:00
parent 8a5484144e
commit afb0d724eb
24 changed files with 49 additions and 41 deletions

View File

@ -5,7 +5,7 @@
"partitions": 56,
"retentionSize": 322122382273,
"cleanupPolicies": ["delete"],
"tags": ["Tier1"],
"schemaType": "Avro",
"schemaText": "{\"namespace\":\"org.open-metadata.kafka\",\"name\":\"Customer\",\"type\":\"record\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"address_line_1\",\"type\":\"string\"},{\"name\":\"address_line_2\",\"type\":\"string\"},{\"name\":\"post_code\",\"type\":\"string\"},{\"name\":\"country\",\"type\":\"string\"}]}"
},
@ -15,6 +15,7 @@
"partitions": 128,
"retentionSize": 322122382273,
"cleanupPolicies": ["delete"],
"tags": ["Tier2"],
"schemaType": "Avro",
"schemaText": "{\"namespace\":\"org.open-metadata.kafka\",\"type\":\"record\",\"name\":\"Product\",\"fields\":[{\"name\":\"product_id\",\"type\":\"int\"},{\"name\":\"title\",\"type\":\"string\"},{\"name\":\"price\",\"type\":\"double\"},{\"name\":\"sku\",\"type\":\"string\"},{\"name\":\"barcode\",\"type\":\"string\"},{\"name\":\"shop_id\",\"type\":\"int\"}]}"
},
@ -24,6 +25,7 @@
"partitions": 16,
"retentionSize": 322122382273,
"cleanupPolicies": ["delete"],
"tags": ["Tier1"],
"schemaType": "Avro",
"schemaText": "{\"namespace\":\"org.open-metadata.kafka\",\"type\":\"record\",\"name\":\"Shop\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"domain\",\"type\":\"string\"},{\"name\":\"user_id\",\"type\":\"int\"},{\"name\":\"first_name\",\"type\":\"string\"},{\"name\":\"last_name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"}]}"
},
@ -43,6 +45,7 @@
"retentionSize": 3222122382273,
"cleanupPolicies": ["delete"],
"schemaType": "Avro",
"tags": ["Tier2"],
"schemaText": "{\"namespace\":\"org.open-metadata.kafka\",\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"order_id\",\"type\":\"int\"},{\"name\":\"api_client_id\",\"type\":\"int\"},{\"name\":\"billing_address_id\",\"type\":\"int\"},{\"name\":\"customer_id\",\"type\":\"int\"},{\"name\":\"location_id\",\"type\":\"int\"},{\"name\":\"shipping_address_id\",\"type\":\"int\"},{\"name\":\"user_id\",\"type\":\"int\"},{\"name\":\"total_price\",\"type\":\"double\"},{\"name\":\"discount_code\",\"type\":\"string\"},{\"name\":\"processed_at\",\"type\":\"int\"}]}"
},
{
@ -51,6 +54,7 @@
"partitions": 128,
"retentionSize": 3222122382273,
"cleanupPolicies": ["delete"],
"tags": ["Tier2"],
"schemaType": "Avro",
"schemaText": "{\"namespace\":\"org.open-metadata.kafka\",\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"sale_id\",\"type\":\"int\"},{\"name\":\"billing_address_id\",\"type\":\"int\"},{\"name\":\"billing_address_id\",\"type\":\"int\"},{\"name\":\"api_client_id\",\"type\":\"int\"},{\"name\":\"customer_id\",\"type\":\"int\"},{\"name\":\"product_id\",\"type\":\"int\"},{\"name\":\"location_id\",\"type\":\"int\"},{\"name\":\"order_id\",\"type\":\"double\"}]}"
}

View File

@ -23,9 +23,8 @@ from metadata.ingestion.api.bulk_sink import BulkSink, BulkSinkStatus
from metadata.ingestion.api.common import WorkflowContext
from metadata.ingestion.models.table_queries import TableUsageCount, TableUsageRequest, TableColumn, \
ColumnJoinedWith
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
from metadata.ingestion.ometa.client import REST, APIError
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig
logger = logging.getLogger(__name__)

View File

@ -109,7 +109,7 @@ class REST(object):
version = api_version if api_version else self._api_version
url: URL = URL(base_url + '/' + version + path)
headers = {'Content-type': 'application/json'}
if self._auth_token is not None:
if self._auth_token is not None and self._auth_token != 'no_token':
headers[self.config.auth_header] = self._auth_token
opts = {
'headers': headers,
@ -129,8 +129,8 @@ class REST(object):
retry = 0
while retry >= 0:
try:
logger.debug('URL {}, method {}'.format(url, method))
logger.debug('Data {}'.format(opts))
logger.info('URL {}, method {}'.format(url, method))
logger.info('Data {}'.format(opts))
return self._one_request(method, url, opts, retry)
except RetryException:
retry_wait = self._retry_wait

View File

@ -133,6 +133,7 @@ class OpenMetadataAPIClient(object):
self._auth_provider: AuthenticationProvider = NoOpAuthenticationProvider.create(self.config)
client_config: ClientConfig = ClientConfig(base_url=self.config.api_endpoint,
api_version=self.config.api_version,
auth_header='X-Catalog-Source',
auth_token=self._auth_provider.auth_token())
self.client = REST(client_config)
self._use_raw_data = raw_data

View File

@ -28,8 +28,7 @@ from metadata.generated.schema.type.tagLabel import TagLabel
from metadata.ingestion.api.common import WorkflowContext, Record
from metadata.ingestion.api.processor import Processor, ProcessorStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig
from metadata.utils.helpers import snake_to_camel

View File

@ -22,7 +22,7 @@ from metadata.config.common import ConfigModel
from metadata.ingestion.api.common import WorkflowContext
from metadata.ingestion.api.processor import Processor, ProcessorStatus
from metadata.ingestion.models.table_queries import TableQuery, QueryParserData
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
class QueryParserProcessorConfig(ConfigModel):

View File

@ -24,14 +24,13 @@ import metadata
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.topic import Topic
from metadata.ingestion.api.sink import Sink, SinkStatus
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig
from metadata.ingestion.sink.elasticsearch_constants import TABLE_ELASTICSEARCH_INDEX_MAPPING, \
TOPIC_ELASTICSEARCH_INDEX_MAPPING
from metadata.config.common import ConfigModel
from metadata.ingestion.api.common import WorkflowContext, Record
from metadata.ingestion.models.table_metadata import TableESDocument, TopicESDocument
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
logger = logging.getLogger(__name__)

View File

@ -24,8 +24,8 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import WorkflowContext
from metadata.ingestion.api.sink import Sink, SinkStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.ometa.client import APIError, MetadataServerConfig
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig
logger = logging.getLogger(__name__)

View File

@ -19,7 +19,7 @@ from metadata.config.common import ConfigModel
from metadata.ingestion.api.common import WorkflowContext, Record
from metadata.ingestion.api.sink import Sink, SinkStatus
from metadata.ingestion.models.user import MetadataTeam, MetadataUser
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.ometa.client import REST, APIError
logger = logging.getLogger(__name__)

View File

@ -17,7 +17,7 @@ from typing import Optional
from urllib.parse import quote_plus
from .sql_source import SQLConnectionConfig, SQLSource
from ..ometa.auth_provider import MetadataServerConfig
from ..ometa.openmetadata_rest import MetadataServerConfig
class AthenaConfig(SQLConnectionConfig):

View File

@ -1,5 +1,17 @@
import concurrent
import uuid
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This import verifies that the dependencies are available.
from dataclasses import field, dataclass, Field
from typing import List, Iterable, Optional
@ -10,8 +22,6 @@ from metadata.generated.schema.entity.services.messagingService import Messaging
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import IncludeFilterPattern, Record, logger, WorkflowContext
from metadata.ingestion.api.source import SourceStatus, Source
from fastavro import json_reader
from fastavro import parse_schema
import confluent_kafka
from confluent_kafka.admin import AdminClient, ConfigResource
@ -20,7 +30,7 @@ from confluent_kafka.schema_registry.schema_registry_client import (
SchemaRegistryClient,
)
import concurrent.futures
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.utils.helpers import get_messaging_service_or_create

View File

@ -19,9 +19,7 @@ from typing import Iterable, Optional
from metadata.config.common import ConfigModel
from metadata.ingestion.api.common import WorkflowContext, Record
from metadata.ingestion.api.source import SourceStatus, Source
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig
logger = logging.getLogger(__name__)

View File

@ -14,7 +14,7 @@
# limitations under the License.
from .sql_source import SQLSource, SQLConnectionConfig
from ..ometa.auth_provider import MetadataServerConfig
from ..ometa.openmetadata_rest import MetadataServerConfig
class MySQLConfig(SQLConnectionConfig):

View File

@ -17,7 +17,7 @@
import cx_Oracle # noqa: F401
from .sql_source import SQLSource, SQLConnectionConfig
from ..ometa.auth_provider import MetadataServerConfig
from ..ometa.openmetadata_rest import MetadataServerConfig
class OracleConfig(SQLConnectionConfig):

View File

@ -32,8 +32,8 @@ from itertools import groupby
from typing import Iterator, Union, Dict, Any, Iterable
from collections import namedtuple
from ..ometa.auth_provider import MetadataServerConfig
from ...utils.helpers import get_service_or_create
from ..ometa.openmetadata_rest import MetadataServerConfig
from ...utils.helpers import get_database_service_or_create
TableKey = namedtuple('TableKey', ['schema', 'table_name'])
@ -96,7 +96,7 @@ class PostgresSource(Source):
self._database = 'postgres'
self.metadata_config = metadata_config
self.status = SQLSourceStatus()
self.service = get_service_or_create(config, metadata_config)
self.service = get_database_service_or_create(config, metadata_config)
self.pattern = config
self.filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()

View File

@ -15,7 +15,7 @@
from urllib.parse import quote_plus
from .sql_source import SQLSource, SQLConnectionConfig
from ..ometa.auth_provider import MetadataServerConfig
from ..ometa.openmetadata_rest import MetadataServerConfig
class PrestoConfig(SQLConnectionConfig):

View File

@ -16,7 +16,7 @@
# This import verifies that the dependencies are available.
import logging
from metadata.ingestion.models.table_queries import TableQuery
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sql_alchemy_helper import SQLAlchemyHelper, SQLSourceStatus
from metadata.ingestion.api.source import Source, SourceStatus
from typing import Iterator, Union, Dict, Any, Iterable

View File

@ -28,7 +28,7 @@ from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import SourceStatus, Source
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient
@ -233,7 +233,7 @@ class SampleTablesSource(Source):
yield table_and_db
def close(self):
self.close()
pass
def get_status(self):
return self.status

View File

@ -23,8 +23,7 @@ from metadata.generated.schema.api.services.createMessagingService import Create
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.source import SourceStatus, Source
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig
def get_service_or_create(service_json, metadata_config) -> MessagingService:

View File

@ -2,11 +2,10 @@ import json
import csv
from metadata.ingestion.api.source import Source
from .sample_tables import SampleTableSourceConfig, SampleTableSourceStatus, get_service_or_create
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
from metadata.ingestion.models.table_queries import TableQuery
from typing import Iterable
from datetime import datetime
from ..ometa.openmetadata_rest import OpenMetadataAPIClient
from ..ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig
class SampleUsageSource(Source):

View File

@ -22,7 +22,7 @@ from .sql_source import (
SQLSource,
register_custom_type,
)
from ..ometa.auth_provider import MetadataServerConfig
from ..ometa.openmetadata_rest import MetadataServerConfig
register_custom_type(custom_types.TIMESTAMP_TZ, "TIME")
register_custom_type(custom_types.TIMESTAMP_LTZ, "TIME")

View File

@ -37,7 +37,7 @@ from sqlalchemy.inspection import inspect
from metadata.ingestion.api.common import IncludeFilterPattern, ConfigModel, Record
from metadata.ingestion.api.common import WorkflowContext
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.utils.helpers import get_database_service_or_create
logger: logging.Logger = logging.getLogger(__name__)

View File

@ -21,7 +21,7 @@ from metadata.config.common import ConfigModel
from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.api.common import WorkflowContext
from metadata.ingestion.api.stage import Stage, StageStatus
from metadata.ingestion.ometa.client import MetadataServerConfig
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
logger = logging.getLogger(__name__)

View File

@ -20,7 +20,7 @@ import pathlib
from metadata.ingestion.api.common import WorkflowContext
from metadata.ingestion.api.stage import Stage, StageStatus
from metadata.ingestion.models.table_queries import TableUsageCount, QueryParserData, TableColumnJoin, TableColumn
from metadata.ingestion.ometa.client import MetadataServerConfig
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.stage.file import FileStageConfig
logger = logging.getLogger(__name__)