From b91d0cf63b8b37f62bd38182f3c886bbb5902db5 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 15 Feb 2021 14:39:59 -0800 Subject: [PATCH] Add bigquery and refactor others --- metadata-ingestion/CHANGELOG | 3 +++ metadata-ingestion/README.md | 24 ++++++++++++++++--- metadata-ingestion/setup.py | 5 ++-- .../src/gometa/ingestion/source/__init__.py | 7 +++--- .../src/gometa/ingestion/source/bigquery.py | 21 ++++++++++++++++ .../src/gometa/ingestion/source/hive.py | 4 ++-- .../src/gometa/ingestion/source/mssql.py | 4 ++-- .../src/gometa/ingestion/source/mysql.py | 4 ++-- .../src/gometa/ingestion/source/postgres.py | 4 ++-- .../src/gometa/ingestion/source/snowflake.py | 4 ++-- .../src/gometa/ingestion/source/sql_common.py | 14 ++++++++--- 11 files changed, 73 insertions(+), 21 deletions(-) create mode 100644 metadata-ingestion/CHANGELOG create mode 100644 metadata-ingestion/src/gometa/ingestion/source/bigquery.py diff --git a/metadata-ingestion/CHANGELOG b/metadata-ingestion/CHANGELOG new file mode 100644 index 0000000000..17299958f0 --- /dev/null +++ b/metadata-ingestion/CHANGELOG @@ -0,0 +1,3 @@ +0.0.1 +----- +* Modernizing python scripts and creating first package diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md index a619f5f32c..f7b3ae5d4f 100644 --- a/metadata-ingestion/README.md +++ b/metadata-ingestion/README.md @@ -160,7 +160,7 @@ Extracts: - List of databases, schema, and tables - Column types associated with each table -Extra requirements: `pip install psycopg2-binary` +Extra requirements: `pip install psycopg2-binary` or `pip install psycopg2` ```yml source: @@ -190,6 +190,23 @@ source: # table_pattern is same as above ``` +## Google BigQuery `bigquery` +Extracts: +- List of databases, schema, and tables +- Column types associated with each table + +Extra requirements: `pip install pybigquery` + +```yml +source: + type: snowflake + config: + project_id: project + options: + credential_path: "/path/to/keyfile.json" + # table_pattern is same as above +``` + ## File `file` Pulls metadata from a previously generated file. Note that the file sink can produce such files, and a number of samples are included in the @@ -265,9 +282,10 @@ pytest tests/integration ## Sanity check code before checkin ```sh -flake8 src tests -mypy -p gometa +# Requries test_requirements.txt to have been installed. black --exclude 'gometa/metadata' -S -t py36 src tests isort src tests +flake8 src tests +mypy -p gometa pytest ``` diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index b93f81e2c9..d5d1373546 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -71,12 +71,13 @@ setuptools.setup( "toml>=0.10.0", "pydantic>=1.5.1", "requests>=2.25.1", - "confluent_kafka[avro]>=1.5.0", "avro_gen @ https://api.github.com/repos/hsheth2/avro_gen/tarball/master", # Note: we currently require both Avro libraries. The codegen uses avro-python3 # schema parsers at runtime for generating and reading JSON into Python objects. # At the same time, we use Kafka's AvroSerializer, which internally relies on - # fastavro for serialization. + # fastavro for serialization. We do not use confluent_kafka[avro], since it + # is incompatible with its own dep on avro-python3. + "confluent_kafka>=1.5.0", "fastavro>=1.3.0", "avro-python3>=1.8.2", "sqlalchemy>=1.3.23", # Required for SQL sources diff --git a/metadata-ingestion/src/gometa/ingestion/source/__init__.py b/metadata-ingestion/src/gometa/ingestion/source/__init__.py index d49edb5b3f..311026e892 100644 --- a/metadata-ingestion/src/gometa/ingestion/source/__init__.py +++ b/metadata-ingestion/src/gometa/ingestion/source/__init__.py @@ -2,13 +2,13 @@ from typing import Dict, Type from gometa.ingestion.api.source import Source -from .kafka import KafkaSource - # from .ldap import LDAPSource +from .bigquery import BigQuerySource +from .hive import HiveSource +from .kafka import KafkaSource from .mce_file import MetadataFileSource from .mssql import SQLServerSource from .mysql import MySQLSource -from .hive import HiveSource from .postgres import PostgresSource from .snowflake import SnowflakeSource @@ -18,6 +18,7 @@ source_class_mapping: Dict[str, Type[Source]] = { "hive": HiveSource, "postgres": PostgresSource, "snowflake": SnowflakeSource, + "bigquery": BigQuerySource, "kafka": KafkaSource, # "ldap": LDAPSource, "file": MetadataFileSource, diff --git a/metadata-ingestion/src/gometa/ingestion/source/bigquery.py b/metadata-ingestion/src/gometa/ingestion/source/bigquery.py new file mode 100644 index 0000000000..a65b05d80f --- /dev/null +++ b/metadata-ingestion/src/gometa/ingestion/source/bigquery.py @@ -0,0 +1,21 @@ +from typing import Optional + +from .sql_common import SQLAlchemyConfig, SQLAlchemySource + + +class BigQueryConfig(SQLAlchemyConfig): + scheme = "bigquery" + project_id: Optional[str] + + def get_sql_alchemy_url(self): + return f"{self.scheme}://{self.project_id}" + + +class BigQuerySource(SQLAlchemySource): + def __init__(self, config, ctx): + super().__init__(config, ctx, "bigquery") + + @classmethod + def create(cls, config_dict, ctx): + config = BigQueryConfig.parse_obj(config_dict) + return cls(config, ctx) diff --git a/metadata-ingestion/src/gometa/ingestion/source/hive.py b/metadata-ingestion/src/gometa/ingestion/source/hive.py index 4bf76d9948..16e2b4d646 100644 --- a/metadata-ingestion/src/gometa/ingestion/source/hive.py +++ b/metadata-ingestion/src/gometa/ingestion/source/hive.py @@ -1,7 +1,7 @@ -from .sql_common import SQLAlchemyConfig, SQLAlchemySource +from .sql_common import BasicSQLAlchemyConfig, SQLAlchemySource -class HiveConfig(SQLAlchemyConfig): +class HiveConfig(BasicSQLAlchemyConfig): # defaults scheme = "hive" diff --git a/metadata-ingestion/src/gometa/ingestion/source/mssql.py b/metadata-ingestion/src/gometa/ingestion/source/mssql.py index fc7dd92a69..05ca149e74 100644 --- a/metadata-ingestion/src/gometa/ingestion/source/mssql.py +++ b/metadata-ingestion/src/gometa/ingestion/source/mssql.py @@ -1,7 +1,7 @@ -from .sql_common import SQLAlchemyConfig, SQLAlchemySource +from .sql_common import BasicSQLAlchemyConfig, SQLAlchemySource -class SQLServerConfig(SQLAlchemyConfig): +class SQLServerConfig(BasicSQLAlchemyConfig): # defaults host_port = "localhost:1433" scheme = "mssql+pytds" diff --git a/metadata-ingestion/src/gometa/ingestion/source/mysql.py b/metadata-ingestion/src/gometa/ingestion/source/mysql.py index 00ffe9dca7..23951d2614 100644 --- a/metadata-ingestion/src/gometa/ingestion/source/mysql.py +++ b/metadata-ingestion/src/gometa/ingestion/source/mysql.py @@ -1,7 +1,7 @@ -from .sql_common import SQLAlchemyConfig, SQLAlchemySource +from .sql_common import BasicSQLAlchemyConfig, SQLAlchemySource -class MySQLConfig(SQLAlchemyConfig): +class MySQLConfig(BasicSQLAlchemyConfig): # defaults host_port = "localhost:3306" scheme = "mysql+pymysql" diff --git a/metadata-ingestion/src/gometa/ingestion/source/postgres.py b/metadata-ingestion/src/gometa/ingestion/source/postgres.py index 6e838762bf..aaaeb12df0 100644 --- a/metadata-ingestion/src/gometa/ingestion/source/postgres.py +++ b/metadata-ingestion/src/gometa/ingestion/source/postgres.py @@ -1,7 +1,7 @@ -from .sql_common import SQLAlchemyConfig, SQLAlchemySource +from .sql_common import BasicSQLAlchemyConfig, SQLAlchemySource -class PostgresConfig(SQLAlchemyConfig): +class PostgresConfig(BasicSQLAlchemyConfig): # defaults scheme = "postgresql+psycopg2" diff --git a/metadata-ingestion/src/gometa/ingestion/source/snowflake.py b/metadata-ingestion/src/gometa/ingestion/source/snowflake.py index 4b91563ac7..4aac53bde1 100644 --- a/metadata-ingestion/src/gometa/ingestion/source/snowflake.py +++ b/metadata-ingestion/src/gometa/ingestion/source/snowflake.py @@ -1,7 +1,7 @@ -from .sql_common import SQLAlchemyConfig, SQLAlchemySource +from .sql_common import BasicSQLAlchemyConfig, SQLAlchemySource -class SnowflakeConfig(SQLAlchemyConfig): +class SnowflakeConfig(BasicSQLAlchemyConfig): # defaults scheme = "snowflake" diff --git a/metadata-ingestion/src/gometa/ingestion/source/sql_common.py b/metadata-ingestion/src/gometa/ingestion/source/sql_common.py index 068d2f07b6..cc4da47aec 100644 --- a/metadata-ingestion/src/gometa/ingestion/source/sql_common.py +++ b/metadata-ingestion/src/gometa/ingestion/source/sql_common.py @@ -1,5 +1,6 @@ import logging import time +from abc import abstractmethod from dataclasses import dataclass, field from typing import Any, Dict, List, Optional @@ -48,17 +49,23 @@ class SQLSourceReport(SourceReport): class SQLAlchemyConfig(BaseModel): + options: Optional[dict] = {} + table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() + + @abstractmethod + def get_sql_alchemy_url(self): + pass + + +class BasicSQLAlchemyConfig(SQLAlchemyConfig): username: str password: str host_port: str database: str = "" scheme: str - options: Optional[dict] = {} - table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() def get_sql_alchemy_url(self): url = f"{self.scheme}://{self.username}:{self.password}@{self.host_port}/{self.database}" - logger.debug("sql_alchemy_url={url}") return url @@ -145,6 +152,7 @@ class SQLAlchemySource(Source): sql_config = self.config platform = self.platform url = sql_config.get_sql_alchemy_url() + logger.debug(f"sql_alchemy_url={url}") engine = create_engine(url, **sql_config.options) inspector = reflection.Inspector.from_engine(engine) database = sql_config.database