mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-04 06:33:10 +00:00
Pipeline Ingestion refactoring
This commit is contained in:
parent
f543c3f99b
commit
07dd7b36f4
@ -6,7 +6,14 @@
|
|||||||
"host_port": "https://bigquery.googleapis.com",
|
"host_port": "https://bigquery.googleapis.com",
|
||||||
"username": "gcpuser@project_id.iam.gserviceaccount.com",
|
"username": "gcpuser@project_id.iam.gserviceaccount.com",
|
||||||
"service_name": "gcp_bigquery",
|
"service_name": "gcp_bigquery",
|
||||||
"service_type": "BigQuery"
|
"service_type": "BigQuery",
|
||||||
|
"filter_pattern": {
|
||||||
|
"excludes": [
|
||||||
|
"[\\w]*cloudaudit.*",
|
||||||
|
"[\\w]*logging_googleapis_com.*",
|
||||||
|
"[\\w]*clouderrorreporting.*"
|
||||||
|
]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"processor": {
|
"processor": {
|
||||||
|
@ -45,8 +45,7 @@ class AthenaConfig(SQLConnectionConfig):
|
|||||||
|
|
||||||
return url
|
return url
|
||||||
|
|
||||||
def fetch_sample_data(self, schema: str, table: str, connection):
|
|
||||||
return super().fetch_sample_data(schema, table, connection)
|
|
||||||
|
|
||||||
class AthenaSource(SQLSource):
|
class AthenaSource(SQLSource):
|
||||||
def __init__(self, config, metadata_config, ctx):
|
def __init__(self, config, metadata_config, ctx):
|
||||||
|
@ -32,16 +32,6 @@ class BigQueryConfig(SQLConnectionConfig, SQLSource):
|
|||||||
return f"{self.scheme}://{self.project_id}"
|
return f"{self.scheme}://{self.project_id}"
|
||||||
return f"{self.scheme}://"
|
return f"{self.scheme}://"
|
||||||
|
|
||||||
def fetch_sample_data(self, schema: str, table: str, connection):
|
|
||||||
query = f"select * from {self.project_id}.{schema},{table} limit 50"
|
|
||||||
results = self.connection.execute(query)
|
|
||||||
cols = list(results.keys())
|
|
||||||
rows = []
|
|
||||||
for r in results:
|
|
||||||
row = list(r)
|
|
||||||
rows.append(row)
|
|
||||||
return TableData(columns=cols, rows=rows)
|
|
||||||
|
|
||||||
|
|
||||||
class BigquerySource(SQLSource):
|
class BigquerySource(SQLSource):
|
||||||
def __init__(self, config, metadata_config, ctx):
|
def __init__(self, config, metadata_config, ctx):
|
||||||
@ -52,3 +42,23 @@ class BigquerySource(SQLSource):
|
|||||||
config = BigQueryConfig.parse_obj(config_dict)
|
config = BigQueryConfig.parse_obj(config_dict)
|
||||||
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
|
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
|
||||||
return cls(config, metadata_config, ctx)
|
return cls(config, metadata_config, ctx)
|
||||||
|
|
||||||
|
def fetch_sample_data(self, schema: str, table: str):
|
||||||
|
query = f"select * from {self.config.project_id}.{schema}.{table} limit 50"
|
||||||
|
results = self.connection.execute(query)
|
||||||
|
cols = list(results.keys())
|
||||||
|
rows = []
|
||||||
|
for r in results:
|
||||||
|
row = list(r)
|
||||||
|
rows.append(row)
|
||||||
|
return TableData(columns=cols, rows=rows)
|
||||||
|
|
||||||
|
def standardize_schema_table_names(
|
||||||
|
self, schema: str, table: str
|
||||||
|
) -> Tuple[str, str]:
|
||||||
|
segments = table.split(".")
|
||||||
|
if len(segments) != 2:
|
||||||
|
raise ValueError(f"expected table to contain schema name already {table}")
|
||||||
|
if segments[0] != schema:
|
||||||
|
raise ValueError(f"schema {schema} does not match table {table}")
|
||||||
|
return segments[0], segments[1]
|
||||||
|
@ -31,15 +31,12 @@ register_custom_type(HiveDecimal, "NUMBER")
|
|||||||
|
|
||||||
class HiveConfig(SQLConnectionConfig):
|
class HiveConfig(SQLConnectionConfig):
|
||||||
scheme = "hive"
|
scheme = "hive"
|
||||||
auth_options = Optional[str]
|
auth_options: Optional[str] = None
|
||||||
|
|
||||||
def get_connection_url(self):
|
def get_connection_url(self):
|
||||||
url = super().get_connection_url()
|
url = super().get_connection_url()
|
||||||
return f'{url};{self.auth_options}'
|
return f'{url};{self.auth_options}'
|
||||||
|
|
||||||
def fetch_sample_data(self, schema: str, table: str, connection):
|
|
||||||
return super().fetch_sample_data(schema, table, connection)
|
|
||||||
|
|
||||||
|
|
||||||
class HiveSource(SQLSource):
|
class HiveSource(SQLSource):
|
||||||
def __init__(self, config, metadata_config, ctx):
|
def __init__(self, config, metadata_config, ctx):
|
||||||
|
@ -27,8 +27,7 @@ class MssqlConfig(SQLConnectionConfig):
|
|||||||
def get_connection_url(self):
|
def get_connection_url(self):
|
||||||
return super().get_connection_url()
|
return super().get_connection_url()
|
||||||
|
|
||||||
def fetch_sample_data(self, schema: str, table: str, connection):
|
|
||||||
return super().fetch_sample_data(schema, table, connection)
|
|
||||||
|
|
||||||
|
|
||||||
class MssqlSource(SQLSource):
|
class MssqlSource(SQLSource):
|
||||||
|
@ -24,9 +24,6 @@ class MySQLConfig(SQLConnectionConfig):
|
|||||||
def get_connection_url(self):
|
def get_connection_url(self):
|
||||||
return super().get_connection_url()
|
return super().get_connection_url()
|
||||||
|
|
||||||
def fetch_sample_data(self, schema: str, table: str, connection):
|
|
||||||
return super().fetch_sample_data(schema, table, connection)
|
|
||||||
|
|
||||||
|
|
||||||
class MysqlSource(SQLSource):
|
class MysqlSource(SQLSource):
|
||||||
def __init__(self, config, metadata_config, ctx):
|
def __init__(self, config, metadata_config, ctx):
|
||||||
|
@ -24,8 +24,7 @@ class OracleConfig(SQLConnectionConfig):
|
|||||||
# defaults
|
# defaults
|
||||||
scheme = "oracle+cx_oracle"
|
scheme = "oracle+cx_oracle"
|
||||||
|
|
||||||
def fetch_sample_data(self, schema: str, table: str, connection):
|
|
||||||
return super().fetch_sample_data(schema, table, connection)
|
|
||||||
|
|
||||||
def get_connection_url(self):
|
def get_connection_url(self):
|
||||||
return super().get_connection_url()
|
return super().get_connection_url()
|
||||||
|
@ -50,8 +50,7 @@ class PostgresSourceConfig(SQLConnectionConfig):
|
|||||||
def get_connection_url(self):
|
def get_connection_url(self):
|
||||||
return super().get_connection_url()
|
return super().get_connection_url()
|
||||||
|
|
||||||
def fetch_sample_data(self, schema: str, table: str, connection):
|
|
||||||
return super().fetch_sample_data(schema, table, connection)
|
|
||||||
|
|
||||||
|
|
||||||
def get_table_key(row: Dict[str, Any]) -> Union[TableKey, None]:
|
def get_table_key(row: Dict[str, Any]) -> Union[TableKey, None]:
|
||||||
|
@ -33,8 +33,7 @@ class PrestoConfig(SQLConnectionConfig):
|
|||||||
url += f"?schema={quote_plus(self.database)}"
|
url += f"?schema={quote_plus(self.database)}"
|
||||||
return url
|
return url
|
||||||
|
|
||||||
def fetch_sample_data(self, schema: str, table: str, connection):
|
|
||||||
return super().fetch_sample_data(schema, table, connection)
|
|
||||||
|
|
||||||
|
|
||||||
class PrestoSource(SQLSource):
|
class PrestoSource(SQLSource):
|
||||||
|
@ -37,8 +37,7 @@ class RedshiftConfig(SQLConnectionConfig):
|
|||||||
def get_connection_url(self):
|
def get_connection_url(self):
|
||||||
return super().get_connection_url()
|
return super().get_connection_url()
|
||||||
|
|
||||||
def fetch_sample_data(self, schema: str, table: str, connection):
|
|
||||||
return super().fetch_sample_data(schema, table, connection)
|
|
||||||
|
|
||||||
|
|
||||||
class RedshiftSource(SQLSource):
|
class RedshiftSource(SQLSource):
|
||||||
|
@ -49,8 +49,7 @@ class SnowflakeConfig(SQLConnectionConfig):
|
|||||||
connect_string = f"{connect_string}?{params}"
|
connect_string = f"{connect_string}?{params}"
|
||||||
return connect_string
|
return connect_string
|
||||||
|
|
||||||
def fetch_sample_data(self, schema: str, table: str, connection):
|
|
||||||
return super().fetch_sample_data(schema, table, connection)
|
|
||||||
|
|
||||||
|
|
||||||
class SnowflakeSource(SQLSource):
|
class SnowflakeSource(SQLSource):
|
||||||
|
@ -55,7 +55,7 @@ class SQLSourceStatus(SourceStatus):
|
|||||||
|
|
||||||
def filtered(self, table_name: str, err: str, dataset_name: str = None, col_type: str = None) -> None:
|
def filtered(self, table_name: str, err: str, dataset_name: str = None, col_type: str = None) -> None:
|
||||||
self.warnings.append(table_name)
|
self.warnings.append(table_name)
|
||||||
logger.warning("Dropped Table {} due to {}".format(dataset_name, err))
|
logger.warning("Dropped Table {} due to {}".format(table_name, err))
|
||||||
|
|
||||||
|
|
||||||
class SQLConnectionConfig(ConfigModel):
|
class SQLConnectionConfig(ConfigModel):
|
||||||
@ -72,21 +72,6 @@ class SQLConnectionConfig(ConfigModel):
|
|||||||
generate_sample_data: Optional[bool] = True
|
generate_sample_data: Optional[bool] = True
|
||||||
filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
|
filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def fetch_sample_data(self, schema: str, table: str, connection):
|
|
||||||
try:
|
|
||||||
query = f"select * from {schema}.{table} limit 50"
|
|
||||||
logger.info("Fetching sample data, this may take a while {}".format(query))
|
|
||||||
results = connection.execute(query)
|
|
||||||
cols = list(results.keys())
|
|
||||||
rows = []
|
|
||||||
for r in results:
|
|
||||||
row = list(r)
|
|
||||||
rows.append(row)
|
|
||||||
return TableData(columns=cols, rows=rows)
|
|
||||||
except Exception as err:
|
|
||||||
logger.error("Failed to generate sample data for {} - {}".format(table, err))
|
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def get_connection_url(self):
|
def get_connection_url(self):
|
||||||
url = f"{self.scheme}://"
|
url = f"{self.scheme}://"
|
||||||
@ -185,6 +170,26 @@ class SQLSource(Source):
|
|||||||
def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
|
def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def standardize_schema_table_names(
|
||||||
|
self, schema: str, table: str
|
||||||
|
) -> Tuple[str, str]:
|
||||||
|
print("IN SQL SOURCE")
|
||||||
|
return schema, table
|
||||||
|
|
||||||
|
def fetch_sample_data(self, schema: str, table: str):
|
||||||
|
try:
|
||||||
|
query = f"select * from {schema}.{table} limit 50"
|
||||||
|
logger.info("Fetching sample data, this may take a while {}".format(query))
|
||||||
|
results = self.connection.execute(query)
|
||||||
|
cols = list(results.keys())
|
||||||
|
rows = []
|
||||||
|
for r in results:
|
||||||
|
row = list(r)
|
||||||
|
rows.append(row)
|
||||||
|
return TableData(columns=cols, rows=rows)
|
||||||
|
except Exception as err:
|
||||||
|
logger.error("Failed to generate sample data for {} - {}".format(table, err))
|
||||||
|
|
||||||
def next_record(self) -> Iterable[OMetaDatabaseAndTable]:
|
def next_record(self) -> Iterable[OMetaDatabaseAndTable]:
|
||||||
inspector = inspect(self.engine)
|
inspector = inspect(self.engine)
|
||||||
for schema in inspector.get_schema_names():
|
for schema in inspector.get_schema_names():
|
||||||
@ -202,6 +207,7 @@ class SQLSource(Source):
|
|||||||
schema: str) -> Iterable[OMetaDatabaseAndTable]:
|
schema: str) -> Iterable[OMetaDatabaseAndTable]:
|
||||||
for table_name in inspector.get_table_names(schema):
|
for table_name in inspector.get_table_names(schema):
|
||||||
try:
|
try:
|
||||||
|
schema, table_name = self.standardize_schema_table_names(schema, table_name)
|
||||||
if not self.sql_config.filter_pattern.included(table_name):
|
if not self.sql_config.filter_pattern.included(table_name):
|
||||||
self.status.filtered('{}.{}'.format(self.config.get_service_name(), table_name),
|
self.status.filtered('{}.{}'.format(self.config.get_service_name(), table_name),
|
||||||
"Table pattern not allowed")
|
"Table pattern not allowed")
|
||||||
@ -211,17 +217,16 @@ class SQLSource(Source):
|
|||||||
description = self._get_table_description(schema, table_name, inspector)
|
description = self._get_table_description(schema, table_name, inspector)
|
||||||
|
|
||||||
table_columns = self._get_columns(schema, table_name, inspector)
|
table_columns = self._get_columns(schema, table_name, inspector)
|
||||||
|
table_entity = Table(id=uuid.uuid4(),
|
||||||
table = Table(id=uuid.uuid4(),
|
|
||||||
name=table_name,
|
name=table_name,
|
||||||
tableType='Regular',
|
tableType='Regular',
|
||||||
description=description if description is not None else ' ',
|
description=description if description is not None else ' ',
|
||||||
columns=table_columns)
|
columns=table_columns)
|
||||||
if self.sql_config.generate_sample_data:
|
if self.sql_config.generate_sample_data:
|
||||||
table_data = self.sql_config.fetch_sample_data(schema, table_name, self.connection)
|
table_data = self.fetch_sample_data(schema, table_name)
|
||||||
table.sampleData = table_data
|
table_entity.sampleData = table_data
|
||||||
|
|
||||||
table_and_db = OMetaDatabaseAndTable(table=table, database=self._get_database(schema))
|
table_and_db = OMetaDatabaseAndTable(table=table_entity, database=self._get_database(schema))
|
||||||
yield table_and_db
|
yield table_and_db
|
||||||
except ValidationError as err:
|
except ValidationError as err:
|
||||||
logger.error(err)
|
logger.error(err)
|
||||||
@ -253,7 +258,7 @@ class SQLSource(Source):
|
|||||||
columns=table_columns,
|
columns=table_columns,
|
||||||
viewDefinition=view_definition)
|
viewDefinition=view_definition)
|
||||||
if self.sql_config.generate_sample_data:
|
if self.sql_config.generate_sample_data:
|
||||||
table_data = self.sql_config.fetch_sample_data(schema, view_name, self.connection)
|
table_data = self.fetch_sample_data(schema, view_name)
|
||||||
table.sampleData = table_data
|
table.sampleData = table_data
|
||||||
|
|
||||||
table_and_db = OMetaDatabaseAndTable(table=table, database=self._get_database(schema))
|
table_and_db = OMetaDatabaseAndTable(table=table, database=self._get_database(schema))
|
||||||
@ -301,7 +306,6 @@ class SQLSource(Source):
|
|||||||
col_constraint = ColumnConstraint.PRIMARY_KEY
|
col_constraint = ColumnConstraint.PRIMARY_KEY
|
||||||
elif column['name'] in unique_columns:
|
elif column['name'] in unique_columns:
|
||||||
col_constraint = ColumnConstraint.UNIQUE
|
col_constraint = ColumnConstraint.UNIQUE
|
||||||
|
|
||||||
table_columns.append(Column(name=column['name'],
|
table_columns.append(Column(name=column['name'],
|
||||||
description=column.get("comment", None),
|
description=column.get("comment", None),
|
||||||
columnDataType=col_type,
|
columnDataType=col_type,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user