feat(ingest/cockroachdb): add cockroachdb ingestion (#10226)

Dotan Mor 2024-04-10 04:36:51 +03:00 committed by GitHub
parent bffefd5735
commit fa0c1b3fa9
9 changed files with 124 additions and 2 deletions

View File

@ -16,6 +16,7 @@ import supersetLogo from '../../../../images/supersetlogo.png';
import athenaLogo from '../../../../images/awsathenalogo.png';
import mssqlLogo from '../../../../images/mssqllogo.png';
import clickhouseLogo from '../../../../images/clickhouselogo.png';
import cockroachdbLogo from '../../../../images/cockroachdblogo.png';
import trinoLogo from '../../../../images/trinologo.png';
import dbtLogo from '../../../../images/dbtlogo.png';
import druidLogo from '../../../../images/druidlogo.png';
@ -44,6 +45,8 @@ export const BIGQUERY_URN = `urn:li:dataPlatform:${BIGQUERY}`;
export const CLICKHOUSE = 'clickhouse';
export const CLICKHOUSE_USAGE = 'clickhouse-usage';
export const CLICKHOUSE_URN = `urn:li:dataPlatform:${CLICKHOUSE}`;
export const COCKROACHDB = 'cockroachdb';
export const COCKROACHDB_URN = `urn:li:dataPlatform:${COCKROACHDB}`;
export const DBT = 'dbt';
export const DBT_URN = `urn:li:dataPlatform:${DBT}`;
export const DRUID = 'druid';
@ -122,6 +125,7 @@ export const PLATFORM_URN_TO_LOGO = {
[AZURE_URN]: azureLogo,
[BIGQUERY_URN]: bigqueryLogo,
[CLICKHOUSE_URN]: clickhouseLogo,
[COCKROACHDB_URN]: cockroachdbLogo,
[DBT_URN]: dbtLogo,
[DRUID_URN]: druidLogo,
[DYNAMODB_URN]: dynamodbLogo,

View File

@ -243,5 +243,12 @@
"displayName": "Qlik Sense",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/qlik-sense/",
"recipe": "source:\n type: qlik-sense\n config:\n # Coordinates\n tenant_hostname: https://xyz12xz.us.qlikcloud.com\n # Coordinates\n api_key: QLIK_API_KEY\n\n # Optional - filter for certain space names instead of ingesting everything.\n # space_pattern:\n\n # allow:\n # - space_name\n ingest_owner: true"
},
{
"urn": "urn:li:dataPlatform:cockroachdb",
"name": "cockroachdb",
"displayName": "CockroachDb",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/cockroachdb/",
"recipe": "source: \n type: cockroachdb\n config:\n # Coordinates\n host_port: # Your CockroachDb host and port, e.g. cockroachdb:5432\n database: # Your CockroachDb Database, e.g. sample_db\n\n # Credentials\n # Add secret in Secrets Tab with relevant names for each variable\n username: null # Your CockroachDb username, e.g. admin\n\n # Options\n include_tables: true\n include_views: true\n\n # Profiling\n profiling:\n enabled: true\n profile_table_level_only: true\n stateful_ingestion:\n enabled: true"
}
]

Binary file not shown (new image added, 270 KiB).

View File

@ -0,0 +1,21 @@
source:
  type: cockroachdb
  config:
    # Coordinates
    host_port: localhost:26257
    database: DemoDatabase

    # Credentials
    username: user
    password: pass

    # Optional: SSL configuration.
    # options:
    #   connect_args:
    #     sslcert: "<<path to sslcert>>"
    #     sslkey: "<<path to sslkey>>"
    #     sslrootcert: "<<path to verification ca chain>>"
    #     sslmode: "verify-full"

sink:
  # sink configs
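
The recipe above can also be driven from Python instead of the datahub CLI. Below is a minimal sketch using DataHub's Pipeline API; the datahub-rest sink pointing at a local GMS (http://localhost:8080) is an assumption for illustration and not part of this commit.

# Minimal sketch (not part of this commit): run a CockroachDB recipe
# programmatically. Assumes the `cockroachdb` plugin is installed and a
# DataHub GMS is reachable at http://localhost:8080.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "cockroachdb",
            "config": {
                "host_port": "localhost:26257",
                "database": "DemoDatabase",
                "username": "user",
                "password": "pass",
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()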

View File

@ -230,6 +230,11 @@ iceberg_common = {
    *pydantic_no_v2,
}

postgres_common = {
    "psycopg2-binary",
    "GeoAlchemy2",
}

s3_base = {
    *aws_common,
    "more-itertools>=8.12.0",
@ -311,6 +316,7 @@ plugins: Dict[str, Set[str]] = {
    | classification_lib,
    "clickhouse": sql_common | clickhouse_common,
    "clickhouse-usage": sql_common | usage_common | clickhouse_common,
    "cockroachdb": sql_common | postgres_common | {"sqlalchemy-cockroachdb<2.0.0"},
    "datahub-lineage-file": set(),
    "datahub-business-glossary": set(),
    "delta-lake": {*data_lake_profiling, *delta_lake},
@ -365,7 +371,7 @@ plugins: Dict[str, Set[str]] = {
"mariadb": sql_common | {"pymysql>=1.0.2"},
"okta": {"okta~=1.7.0", "nest-asyncio"},
"oracle": sql_common | {"cx_Oracle"},
"postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"},
"postgres": sql_common | postgres_common,
"presto": sql_common | pyhive_common | trino,
"presto-on-hive": sql_common
| pyhive_common
@ -505,6 +511,7 @@ base_dev_requirements = {
"bigquery",
"clickhouse",
"clickhouse-usage",
"cockroachdb",
"delta-lake",
"druid",
"elasticsearch",
@ -596,6 +603,7 @@ entry_points = {
"bigquery = datahub.ingestion.source.bigquery_v2.bigquery:BigqueryV2Source",
"clickhouse = datahub.ingestion.source.sql.clickhouse:ClickHouseSource",
"clickhouse-usage = datahub.ingestion.source.usage.clickhouse_usage:ClickHouseUsageSource",
"cockroachdb = datahub.ingestion.source.sql.cockroachdb:CockroachDBSource",
"delta-lake = datahub.ingestion.source.delta_lake:DeltaLakeSource",
"s3 = datahub.ingestion.source.s3:S3Source",
"dbt = datahub.ingestion.source.dbt.dbt_core:DBTCoreSource",

View File

@ -0,0 +1,43 @@
from pydantic.fields import Field

from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
    SourceCapability,
    SupportStatus,
    capability,
    config_class,
    platform_name,
    support_status,
)
from datahub.ingestion.source.sql.postgres import PostgresConfig, PostgresSource


class CockroachDBConfig(PostgresConfig):
    scheme = Field(default="cockroachdb+psycopg2", description="database scheme")
    schema_pattern = Field(
        default=AllowDenyPattern(deny=["information_schema", "crdb_internal"])
    )


@platform_name("CockroachDB")
@config_class(CockroachDBConfig)
@support_status(SupportStatus.TESTING)
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.DOMAINS, "Supported via the `domain` config field")
@capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration")
@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion")
class CockroachDBSource(PostgresSource):
    config: CockroachDBConfig

    def __init__(self, config: CockroachDBConfig, ctx: PipelineContext):
        super().__init__(config, ctx)

    def get_platform(self):
        return "cockroachdb"

    @classmethod
    def create(cls, config_dict, ctx):
        config = CockroachDBConfig.parse_obj(config_dict)
        return cls(config, ctx)
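
The new source is a thin subclass of PostgresSource: it swaps the SQLAlchemy scheme to cockroachdb+psycopg2, denies CockroachDB's internal schemas by default, and reports the cockroachdb platform through the get_platform() hook added to PostgresSource in the next file, while reusing all of the Postgres extraction logic. A minimal construction sketch, mirroring the unit test further below (connection values are placeholders):

# Sketch (not part of this commit): build the source directly from a config
# dict, the same way the ingestion framework does via `create()`.
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.sql.cockroachdb import CockroachDBSource

source = CockroachDBSource.create(
    config_dict={
        "host_port": "localhost:26257",
        "database": "DemoDatabase",
        "username": "user",
        "password": "pass",
    },
    ctx=PipelineContext(run_id="cockroachdb-example"),
)
assert source.platform == "cockroachdb"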

View File

@ -145,7 +145,10 @@ class PostgresSource(SQLAlchemySource):
    config: PostgresConfig

    def __init__(self, config: PostgresConfig, ctx: PipelineContext):
-       super().__init__(config, ctx, "postgres")
+       super().__init__(config, ctx, self.get_platform())

+   def get_platform(self):
+       return "postgres"

    @classmethod
    def create(cls, config_dict, ctx):

View File

@ -0,0 +1,26 @@
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.sql.cockroachdb import (
    CockroachDBConfig,
    CockroachDBSource,
)
from datahub.ingestion.source.sql.postgres import PostgresConfig, PostgresSource


def _base_config():
    return {"username": "user", "password": "password", "host_port": "host:1521"}


def test_platform_correctly_set_cockroachdb():
    source = CockroachDBSource(
        ctx=PipelineContext(run_id="cockroachdb-source-test"),
        config=CockroachDBConfig.parse_obj(_base_config()),
    )
    assert source.platform == "cockroachdb"


def test_platform_correctly_set_postgres():
    source = PostgresSource(
        ctx=PipelineContext(run_id="postgres-source-test"),
        config=PostgresConfig.parse_obj(_base_config()),
    )
    assert source.platform == "postgres"

View File

@ -48,6 +48,16 @@
"logoUrl": "/assets/platforms/clickhouselogo.png"
}
},
{
"urn": "urn:li:dataPlatform:cockroachdb",
"aspect": {
"datasetNameDelimiter": ".",
"name": "cockroachdb",
"displayName": "CockroachDb",
"type": "RELATIONAL_DB",
"logoUrl": "/assets/platforms/cockroachdblogo.png"
}
},
{
"urn": "urn:li:dataPlatform:couchbase",
"aspect": {