Merge pull request #280 from open-metadata/pipeline-fix

Pipeline fix
commit 6ba87a5297, authored by Suresh Srinivas on 2021-08-24 08:05:42 -07:00, committed via GitHub
18 changed files with 93 additions and 46 deletions

View File

@@ -39,14 +39,14 @@ You only need to run above command once.
 ```text
 source env/bin/activate
-metadata ingest -c ./pipelines/redshift.json
+metadata ingest -c ./examples/workflows/redshift.json
 ```
 #### Generate Redshift Usage Data
 ```text
 source env/bin/activate
-metadata ingest -c ./pipelines/redshift_usage.json
+metadata ingest -c ./examples/workflows/redshift_usage.json
 ```
 #### Generate Sample Tables
@@ -55,7 +55,12 @@ metadata ingest -c ./pipelines/redshift.json
 source env/bin/activate
 metadata ingest -c ./pipelines/sample_tables.json
 ```
+#### Generate Sample Usage
+```text
+source env/bin/activate
+metadata ingest -c ./pipelines/sample_usage.json
+```
 #### Generate Sample Users
 ```text
@@ -75,7 +80,7 @@ metadata ingest -c ./pipelines/redshift.json
 ```text
 source env/bin/activate
 export GOOGLE_APPLICATION_CREDENTIALS="$PWD/examples/creds/bigquery-cred.json"
-metadata ingest -c ./pipelines/bigquery.json
+metadata ingest -c ./examples/workflows/bigquery.json
 ```
 #### Index Metadata into ElasticSearch
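
For context, here is a minimal sketch of what `metadata ingest -c <file>` does with these JSON workflow files, based on the `ingest` CLI change later in this diff; the import path is assumed from the package layout.

```python
import json

# Assumed import path; the diff below only shows Workflow.create()/execute() being used.
from metadata.ingestion.api.workflow import Workflow


def run_workflow(config_path: str) -> None:
    with open(config_path) as f:
        workflow_config = json.load(f)
    # After this PR, the 'cron' key is optional and is dropped only when present.
    if workflow_config.get('cron'):
        del workflow_config['cron']
    workflow = Workflow.create(workflow_config)
    workflow.execute()


run_workflow("./examples/workflows/redshift.json")
```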

View File

@@ -6,7 +6,14 @@
       "host_port": "https://bigquery.googleapis.com",
       "username": "gcpuser@project_id.iam.gserviceaccount.com",
       "service_name": "gcp_bigquery",
-      "service_type": "BigQuery"
+      "service_type": "BigQuery",
+      "filter_pattern": {
+        "excludes": [
+          "[\\w]*cloudaudit.*",
+          "[\\w]*logging_googleapis_com.*",
+          "[\\w]*clouderrorreporting.*"
+        ]
+      }
     }
   },
   "processor": {

View File

@@ -10,10 +10,8 @@
       "service_name": "snowflake",
       "service_type": "Snowflake",
       "filter_pattern": {
-        "includes": [
-          "(\\w)*tpcds_sf100tcl",
-          "(\\w)*tpcds_sf100tcl",
-          "(\\w)*tpcds_sf10tcl"
+        "excludes": [
+          "tpcds_sf100tcl"
         ]
       }
     }

View File

@@ -35,6 +35,7 @@ class MetadataLoaderJob(job.JobBase, Workflow):
     def run(self, pipeline_data, *args, **kwargs):
         config_data = json.loads(pipeline_data)
-        del config_data['cron']
+        if config_data.get('cron'):
+            del config_data['cron']
         self.workflow = Workflow.create(config_data)
         self.workflow.execute()

View File

@@ -66,6 +66,7 @@ def ingest(config: str) -> None:
     try:
         logger.info(f"Using config: {workflow_config}")
-        del workflow_config['cron']
+        if workflow_config.get('cron'):
+            del workflow_config['cron']
         workflow = Workflow.create(workflow_config)
     except ValidationError as e:

View File

@@ -25,21 +25,18 @@ from .status import Status
 class SourceStatus(Status):
     records = 0
-    warnings: Dict[str, List[str]] = field(default_factory=dict)
-    failures: Dict[str, List[str]] = field(default_factory=dict)
+    warnings: List[str] = field(default_factory=list)
+    failures: List[str] = field(default_factory=list)

     def scanned(self, record: Record) -> None:
         self.records += 1

     def warning(self, key: str, reason: str) -> None:
-        if key not in self.warnings:
-            self.warnings[key] = []
-        self.warnings[key].append(reason)
+        self.warnings.append({key:reason})

     def failure(self, key: str, reason: str) -> None:
-        if key not in self.failures:
-            self.failures[key] = []
-        self.failures[key].append(reason)
+        self.failures.append({key:reason})

 @dataclass # type: ignore[misc]
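
A quick usage sketch of the reshaped status fields: `warnings` and `failures` are now flat lists, and each call appends a single-entry `{key: reason}` dict instead of grouping reasons under a shared key. The import path is an assumption.

```python
from metadata.ingestion.api.source import SourceStatus  # assumed module path

status = SourceStatus()
status.warning("public.orders", "Table pattern not allowed")
status.failure("public.events", "ValidationError on column types")

print(status.warnings)  # [{'public.orders': 'Table pattern not allowed'}]
print(status.failures)  # [{'public.events': 'ValidationError on column types'}]
```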

View File

@@ -71,6 +71,8 @@ class MetadataUsageBulkSink(BulkSink):
         usage_records = [json.loads(l) for l in self.file_handler.readlines()]
         for record in usage_records:
             table_usage = TableUsageCount(**json.loads(record))
+            if '.' in table_usage.table:
+                table_usage.table = table_usage.table.split(".")[1]
             if table_usage.table in self.tables_dict:
                 table_entity = self.tables_dict[table_usage.table]
                 table_usage_request = TableUsageRequest(date=table_usage.date, count=table_usage.count)
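
The split appears to normalize schema-qualified names coming from usage records to the bare table names that `self.tables_dict` is keyed by. A tiny example with hypothetical values:

```python
table = "dbt_demo.fact_sales"  # hypothetical schema-qualified name from a usage record
if '.' in table:
    table = table.split(".")[1]  # keep only the table part
print(table)  # fact_sales
```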

View File

@@ -46,6 +46,7 @@ class AthenaConfig(SQLConnectionConfig):
         return url

 class AthenaSource(SQLSource):
     def __init__(self, config, metadata_config, ctx):
         super().__init__(config, metadata_config, ctx)

View File

@@ -15,6 +15,8 @@
 from typing import Optional, Tuple

+from metadata.generated.schema.entity.data.table import TableData
+
 # This import verifies that the dependencies are available.
 from .sql_source import SQLConnectionConfig, SQLSource

@@ -41,6 +43,16 @@ class BigquerySource(SQLSource):
         metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
         return cls(config, metadata_config, ctx)

+    def fetch_sample_data(self, schema: str, table: str):
+        query = f"select * from {self.config.project_id}.{schema}.{table} limit 50"
+        results = self.connection.execute(query)
+        cols = list(results.keys())
+        rows = []
+        for r in results:
+            row = list(r)
+            rows.append(row)
+        return TableData(columns=cols, rows=rows)
+
     def standardize_schema_table_names(
         self, schema: str, table: str
     ) -> Tuple[str, str]:
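
The BigQuery override differs from the base `fetch_sample_data` only in qualifying the table with the project id. With hypothetical values, the generated sample query looks like this:

```python
project_id = "my-gcp-project"  # hypothetical
schema = "sales_dataset"       # hypothetical
table = "orders"               # hypothetical

query = f"select * from {project_id}.{schema}.{table} limit 50"
print(query)  # select * from my-gcp-project.sales_dataset.orders limit 50
```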

View File

@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+from typing import Optional
 from pyhive import hive  # noqa: F401
 from pyhive.sqlalchemy_hive import HiveDate, HiveDecimal, HiveTimestamp

@@ -30,9 +31,11 @@ register_custom_type(HiveDecimal, "NUMBER")
 class HiveConfig(SQLConnectionConfig):
     scheme = "hive"
+    auth_options: Optional[str] = None

     def get_connection_url(self):
-        return super().get_connection_url()
+        url = super().get_connection_url()
+        return f'{url};{self.auth_options}'

 class HiveSource(SQLSource):
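
A sketch of the URL the new `HiveConfig.get_connection_url` produces, assuming the base class returns `scheme://host_port/database`; the host, database, and auth option below are hypothetical. Note that when `auth_options` is left at its default of `None`, the returned URL ends in `;None`.

```python
scheme = "hive"
host_port = "localhost:10000"  # hypothetical
database = "default"           # hypothetical
auth_options = "auth=NOSASL"   # hypothetical Hive connection option

base_url = f"{scheme}://{host_port}/{database}"  # what super().get_connection_url() is assumed to return
url = f"{base_url};{auth_options}"
print(url)  # hive://localhost:10000/default;auth=NOSASL
```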

View File

@@ -28,6 +28,8 @@ class MssqlConfig(SQLConnectionConfig):
         return super().get_connection_url()

 class MssqlSource(SQLSource):
     def __init__(self, config, metadata_config, ctx):
         super().__init__(config, metadata_config, ctx)

View File

@@ -25,6 +25,11 @@ class OracleConfig(SQLConnectionConfig):
     scheme = "oracle+cx_oracle"

+    def get_connection_url(self):
+        return super().get_connection_url()
+
 class OracleSource(SQLSource):
     def __init__(self, config, metadata_config, ctx):
         super().__init__(config, metadata_config, ctx)

View File

@@ -51,6 +51,8 @@ class PostgresSourceConfig(SQLConnectionConfig):
         return super().get_connection_url()

 def get_table_key(row: Dict[str, Any]) -> Union[TableKey, None]:
     """
     Table key consists of schema and table name

View File

@@ -34,6 +34,8 @@ class PrestoConfig(SQLConnectionConfig):
         return url

 class PrestoSource(SQLSource):
     def __init__(self, config, metadata_config, ctx):
         super().__init__(config, metadata_config, ctx)

View File

@@ -38,6 +38,8 @@ class RedshiftConfig(SQLConnectionConfig):
         return super().get_connection_url()

 class RedshiftSource(SQLSource):
     def __init__(self, config, metadata_config, ctx):

View File

@@ -50,6 +50,8 @@ class SnowflakeConfig(SQLConnectionConfig):
         return connect_string

 class SnowflakeSource(SQLSource):
     def __init__(self, config, metadata_config, ctx):
         super().__init__(config, metadata_config, ctx)

View File

@@ -83,7 +83,10 @@ class SnowflakeUsageSource(Source):
         for row in self._get_raw_extract_iter():
             tq = TableQuery(row['query'], row['label'], 0, 0, 0, str(row['starttime']),
                             str(row['endtime']), str(row['starttime'])[0:19], 2, row['database'], 0, row['sql'])
-            self.report.scanned(f"{row['database']}.{row['schema_name']}")
+            if row['schema_name'] is not None:
+                self.report.scanned(f"{row['database']}.{row['schema_name']}")
+            else:
+                self.report.scanned(f"{row['database']}")
             yield tq

     def get_report(self):

View File

@@ -55,7 +55,7 @@ class SQLSourceStatus(SourceStatus):
     def filtered(self, table_name: str, err: str, dataset_name: str = None, col_type: str = None) -> None:
         self.warnings.append(table_name)
-        logger.warning("Dropped Table {} due to {}".format(dataset_name, err))
+        logger.warning("Dropped Table {} due to {}".format(table_name, err))

 class SQLConnectionConfig(ConfigModel):

@@ -83,6 +83,7 @@ class SQLConnectionConfig(ConfigModel):
         url += f"{self.host_port}"
         if self.database:
             url += f"/{self.database}"
+        logger.info(url)
         return url

     def get_service_type(self) -> DatabaseServiceType:

@@ -160,7 +161,7 @@ class SQLSource(Source):
         self.status = SQLSourceStatus()
         self.sql_config = self.config
         self.engine = create_engine(self.sql_config.get_connection_url(), **self.sql_config.options)
-        self.connection = None
+        self.connection = self.engine.connect()

     def prepare(self):
         pass

@@ -169,10 +170,14 @@ class SQLSource(Source):
     def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
         pass

+    def standardize_schema_table_names(
+            self, schema: str, table: str
+    ) -> Tuple[str, str]:
+        print("IN SQL SOURCE")
+        return schema, table
+
     def fetch_sample_data(self, schema: str, table: str):
         try:
-            if self.connection is None:
-                self.connection = self.engine.connect()
             query = f"select * from {schema}.{table} limit 50"
             logger.info("Fetching sample data, this may take a while {}".format(query))
             results = self.connection.execute(query)

@@ -182,16 +187,10 @@ class SQLSource(Source):
                 row = list(r)
                 rows.append(row)
             return TableData(columns=cols, rows=rows)
-        except:
-            logger.error("Failed to generate sample data for {}".format(table))
+        except Exception as err:
+            logger.error("Failed to generate sample data for {} - {}".format(table, err))

-    def standardize_schema_table_names(
-        self, schema: str, table: str
-    ) -> Tuple[str, str]:
-        return schema, table
-
     def next_record(self) -> Iterable[OMetaDatabaseAndTable]:
         inspector = inspect(self.engine)
         for schema in inspector.get_schema_names():
             if not self.sql_config.filter_pattern.included(schema):

@@ -208,7 +207,7 @@
                       schema: str) -> Iterable[OMetaDatabaseAndTable]:
         for table_name in inspector.get_table_names(schema):
             try:
-                schema, table = self.standardize_schema_table_names(schema, table_name)
+                schema, table_name = self.standardize_schema_table_names(schema, table_name)
                 if not self.sql_config.filter_pattern.included(table_name):
                     self.status.filtered('{}.{}'.format(self.config.get_service_name(), table_name),
                                          "Table pattern not allowed")

@@ -218,17 +217,16 @@
                 description = self._get_table_description(schema, table_name, inspector)
                 table_columns = self._get_columns(schema, table_name, inspector)
-                table = Table(id=uuid.uuid4(),
+                table_entity = Table(id=uuid.uuid4(),
                               name=table_name,
                               tableType='Regular',
                               description=description if description is not None else ' ',
                               columns=table_columns)
                 if self.sql_config.generate_sample_data:
                     table_data = self.fetch_sample_data(schema, table_name)
-                    table.sampleData = table_data
+                    table_entity.sampleData = table_data
-                table_and_db = OMetaDatabaseAndTable(table=table, database=self._get_database(schema))
+                table_and_db = OMetaDatabaseAndTable(table=table_entity, database=self._get_database(schema))
                 yield table_and_db
             except ValidationError as err:
                 logger.error(err)

@@ -293,7 +291,11 @@
         table_columns = []
         row_order = 1
         for column in columns:
-            col_type = get_column_type(self.status, dataset_name, column['type'])
+            col_type = None
+            try:
+                col_type = get_column_type(self.status, dataset_name, column['type'])
+            except Exception as err:
+                logger.error(err)
             col_constraint = None
             if column['nullable']:
                 col_constraint = ColumnConstraint.NULL

@@ -304,7 +306,6 @@
                 col_constraint = ColumnConstraint.PRIMARY_KEY
             elif column['name'] in unique_columns:
                 col_constraint = ColumnConstraint.UNIQUE
             table_columns.append(Column(name=column['name'],
                                         description=column.get("comment", None),
                                         columnDataType=col_type,

@@ -315,10 +316,11 @@
         return table_columns

     def _get_table_description(self, schema: str, table: str, inspector: Inspector) -> str:
+        description = None
         try:
             table_info: dict = inspector.get_table_comment(table, schema)
-        except NotImplementedError:
-            description: Optional[str] = None
+        except Exception as err:
+            logger.error(f"Table Description Error : {err}")
         else:
             description = table_info["text"]
         return description
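
Moving `standardize_schema_table_names` ahead of `fetch_sample_data` and fixing the call site to assign back to `table_name` make it a proper per-source hook: `next_record()` calls it, and subclasses (like the BigQuery source earlier in this diff) can reshape names before filtering and sample-data fetching. A hypothetical subclass, with an assumed import path for `SQLSource`:

```python
from typing import Tuple

from metadata.ingestion.source.sql_source import SQLSource  # assumed import path


class ExampleSource(SQLSource):
    def standardize_schema_table_names(self, schema: str, table: str) -> Tuple[str, str]:
        # Hypothetical normalization: strip a schema prefix the driver bakes into table names.
        if "." in table:
            table = table.split(".", 1)[1]
        return schema, table
```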