feat(ingest/cassandra): Add support for Cassandra as a source (#11822)

sagar-salvi-apptware 2024-11-15 20:41:21 +05:30 committed by GitHub
parent becda8fd34
commit fd2da83ff4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 6327 additions and 96 deletions

View File

@ -36,6 +36,7 @@ import csvLogo from '../../../../images/csv-logo.png';
import qlikLogo from '../../../../images/qliklogo.png';
import sigmaLogo from '../../../../images/sigmalogo.png';
import sacLogo from '../../../../images/saclogo.svg';
import cassandraLogo from '../../../../images/cassandralogo.png';
import datahubLogo from '../../../../images/datahublogo.png';
export const ATHENA = 'athena';
@ -129,6 +130,8 @@ export const SIGMA = 'sigma';
export const SIGMA_URN = `urn:li:dataPlatform:${SIGMA}`;
export const SAC = 'sac';
export const SAC_URN = `urn:li:dataPlatform:${SAC}`;
export const CASSANDRA = 'cassandra';
export const CASSANDRA_URN = `urn:li:dataPlatform:${CASSANDRA}`;
export const DATAHUB = 'datahub';
export const DATAHUB_GC = 'datahub-gc';
export const DATAHUB_LINEAGE_FILE = 'datahub-lineage-file';
@ -175,6 +178,7 @@ export const PLATFORM_URN_TO_LOGO = {
[QLIK_SENSE_URN]: qlikLogo,
[SIGMA_URN]: sigmaLogo,
[SAC_URN]: sacLogo,
[CASSANDRA_URN]: cassandraLogo,
[DATAHUB_URN]: datahubLogo,
};

View File

@ -310,5 +310,12 @@
"description": "Import Spaces, Sources, Tables and statistics from Dremio.",
"docsUrl": "https://datahubproject.io/docs/metadata-ingestion/",
"recipe": "source:\n type: dremio\n config:\n # Coordinates\n hostname: null\n port: null\n #true if https, otherwise false\n tls: true\n\n #For cloud instance\n #is_dremio_cloud: True\n #dremio_cloud_project_id: <project_id>\n\n #Credentials with personal access token\n authentication_method: PAT\n password: pass\n\n #Or Credentials with basic auth\n #authentication_method: password\n #username: null\n #password: null\n\n stateful_ingestion:\n enabled: true"
},
{
"urn": "urn:li:dataPlatform:cassandra",
"name": "cassandra",
"displayName": "CassandraDB",
"docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/cassandra",
"recipe": "source:\n type: cassandra\n config:\n # Credentials for on prem cassandra\n contact_point: localhost\n port: 9042\n username: admin\n password: password\n\n # Or\n # Credentials Astra Cloud\n #cloud_config:\n # secure_connect_bundle: Path to Secure Connect Bundle (.zip)\n # token: Application Token\n\n # Optional Allow / Deny extraction of particular keyspaces.\n keyspace_pattern:\n allow: [.*]\n\n # Optional Allow / Deny extraction of particular tables.\n table_pattern:\n allow: [.*]"
}
]

Binary file not shown (new image, 124 KiB).

View File

@ -0,0 +1,40 @@
### Setup
This integration pulls metadata directly from Cassandra databases, including both **DataStax Astra DB** and **Cassandra Enterprise Edition (EE)**.
You'll need a Cassandra instance or an Astra DB database set up with appropriate access permissions.
#### Steps to Get the Required Information
1. **Set Up User Credentials**:
- **For Astra DB**:
- Log in to your Astra DB Console.
- Navigate to **Organization Settings** > **Token Management**.
- Generate an **Application Token** with the required permissions for read access.
- Download the **Secure Connect Bundle** from the Astra DB Console.
- **For Cassandra EE**:
- Ensure you have a **username** and **password** with read access to the necessary keyspaces.
2. **Permissions**:
- The user or token must have `SELECT` permissions that allow it to:
- Access metadata in system keyspaces (e.g., `system_schema`) to retrieve information about keyspaces, tables, columns, and views.
- Perform `SELECT` operations on the data tables if data profiling is enabled.
3. **Verify Database Access**:
- For Astra DB: Ensure the **Secure Connect Bundle** is used and configured correctly.
   - For Cassandra EE: Ensure the **contact point** and **port** are accessible.
:::caution
When enabling profiling, make sure to set a limit on the number of rows to sample. Profiling large tables without a limit may lead to excessive resource consumption and slow performance.
:::
:::note
For cloud configuration with Astra DB, the Secure Connect Bundle path must be specified in the configuration and must be readable by the process running ingestion. For that reason, use the CLI to ingest metadata into DataHub.
:::
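As a quick pre-flight check, the Python `cassandra-driver` (the same driver the connector uses) can confirm that the configured user can read `system_schema`. The following is a minimal sketch, assuming a self-managed cluster at `localhost:9042` with placeholder credentials; for Astra DB, pass the Secure Connect Bundle through the driver's `cloud` argument instead, as the connector itself does.
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

# Placeholder credentials; the user needs SELECT access on system_schema.
auth_provider = PlainTextAuthProvider(username="admin", password="password")
cluster = Cluster(["localhost"], port=9042, auth_provider=auth_provider)
session = cluster.connect()
# The connector reads keyspace/table/column metadata from system_schema,
# so this query should succeed for the configured user or token.
rows = session.execute("SELECT keyspace_name FROM system_schema.keyspaces")
print([row.keyspace_name for row in rows])
cluster.shutdown()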

View File

@ -0,0 +1,30 @@
source:
type: "cassandra"
config:
    # Credentials for on-prem Cassandra
contact_point: "localhost"
port: 9042
username: "admin"
password: "password"
# Or
    # Credentials for Astra Cloud
#cloud_config:
# secure_connect_bundle: "Path to Secure Connect Bundle (.zip)"
# token: "Application Token"
# Optional Allow / Deny extraction of particular keyspaces.
keyspace_pattern:
allow: [".*"]
# Optional Allow / Deny extraction of particular tables.
table_pattern:
allow: [".*"]
# Optional
profiling:
enabled: true
profile_table_level_only: true
sink:
# config sinks

View File

@ -404,6 +404,13 @@ plugins: Dict[str, Set[str]] = {
# https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/release-notes.html#rn-7-14-0
# https://github.com/elastic/elasticsearch-py/issues/1639#issuecomment-883587433
"elasticsearch": {"elasticsearch==7.13.4"},
"cassandra": {
"cassandra-driver>=3.28.0",
# We were seeing an error like this `numpy.dtype size changed, may indicate binary incompatibility. Expected 96 from C header, got 88 from PyObject`
# with numpy 2.0. This likely indicates a mismatch between scikit-learn and numpy versions.
# https://stackoverflow.com/questions/40845304/runtimewarning-numpy-dtype-size-changed-may-indicate-binary-incompatibility
"numpy<2",
},
"feast": {
"feast>=0.34.0,<1",
"flask-openid>=1.3.0",
@ -660,6 +667,7 @@ base_dev_requirements = {
"qlik-sense",
"sigma",
"sac",
"cassandra",
]
if plugin
for dependency in plugins[plugin]
@ -778,6 +786,7 @@ entry_points = {
"qlik-sense = datahub.ingestion.source.qlik_sense.qlik_sense:QlikSenseSource",
"sigma = datahub.ingestion.source.sigma.sigma:SigmaSource",
"sac = datahub.ingestion.source.sac.sac:SACSource",
"cassandra = datahub.ingestion.source.cassandra.cassandra:CassandraSource",
],
"datahub.ingestion.transformer.plugins": [
"pattern_cleanup_ownership = datahub.ingestion.transformer.pattern_cleanup_ownership:PatternCleanUpOwnership",

View File

@ -0,0 +1,476 @@
import dataclasses
import json
import logging
from typing import Any, Dict, Iterable, List, Optional
from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
make_schema_field_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import (
ContainerKey,
add_dataset_to_container,
gen_containers,
)
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
SupportStatus,
capability,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.cassandra.cassandra_api import (
CassandraAPI,
CassandraColumn,
CassandraEntities,
CassandraKeyspace,
CassandraTable,
CassandraView,
)
from datahub.ingestion.source.cassandra.cassandra_config import CassandraSourceConfig
from datahub.ingestion.source.cassandra.cassandra_profiling import CassandraProfiler
from datahub.ingestion.source.cassandra.cassandra_utils import (
SYSTEM_KEYSPACE_LIST,
CassandraSourceReport,
CassandraToSchemaFieldConverter,
)
from datahub.ingestion.source.common.subtypes import (
DatasetContainerSubTypes,
DatasetSubTypes,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import StatusClass
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
SchemaField,
SchemaMetadata,
)
from datahub.metadata.schema_classes import (
DataPlatformInstanceClass,
DatasetLineageTypeClass,
DatasetPropertiesClass,
FineGrainedLineageClass,
FineGrainedLineageDownstreamTypeClass,
FineGrainedLineageUpstreamTypeClass,
OtherSchemaClass,
SubTypesClass,
UpstreamClass,
UpstreamLineageClass,
ViewPropertiesClass,
)
logger = logging.getLogger(__name__)
PLATFORM_NAME_IN_DATAHUB = "cassandra"
class KeyspaceKey(ContainerKey):
keyspace: str
@platform_name("Cassandra")
@config_class(CassandraSourceConfig)
@support_status(SupportStatus.INCUBATING)
@capability(SourceCapability.CONTAINERS, "Enabled by default")
@capability(SourceCapability.SCHEMA_METADATA, "Enabled by default")
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(
SourceCapability.DELETION_DETECTION,
"Optionally enabled via `stateful_ingestion.remove_stale_metadata`",
supported=True,
)
class CassandraSource(StatefulIngestionSourceBase):
"""
This plugin extracts the following:
- Metadata for tables
- Column types associated with each table column
- The keyspace each table belongs to
"""
config: CassandraSourceConfig
report: CassandraSourceReport
platform: str
def __init__(self, ctx: PipelineContext, config: CassandraSourceConfig):
super().__init__(config, ctx)
self.ctx = ctx
self.platform = PLATFORM_NAME_IN_DATAHUB
self.config = config
self.report = CassandraSourceReport()
self.cassandra_api = CassandraAPI(config, self.report)
self.cassandra_data = CassandraEntities()
# For profiling
self.profiler = CassandraProfiler(config, self.report, self.cassandra_api)
@classmethod
def create(cls, config_dict, ctx):
config = CassandraSourceConfig.parse_obj(config_dict)
return cls(ctx, config)
def get_platform(self) -> str:
return PLATFORM_NAME_IN_DATAHUB
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
StaleEntityRemovalHandler.create(
self, self.config, self.ctx
).workunit_processor,
]
def get_workunits_internal(
self,
) -> Iterable[MetadataWorkUnit]:
if not self.cassandra_api.authenticate():
return
keyspaces: List[CassandraKeyspace] = self.cassandra_api.get_keyspaces()
for keyspace in keyspaces:
keyspace_name: str = keyspace.keyspace_name
if keyspace_name in SYSTEM_KEYSPACE_LIST:
continue
if not self.config.keyspace_pattern.allowed(keyspace_name):
self.report.report_dropped(keyspace_name)
continue
yield from self._generate_keyspace_container(keyspace)
try:
yield from self._extract_tables_from_keyspace(keyspace_name)
except Exception as e:
self.report.num_tables_failed += 1
self.report.failure(
message="Failed to extract table metadata for keyspace",
context=keyspace_name,
exc=e,
)
try:
yield from self._extract_views_from_keyspace(keyspace_name)
except Exception as e:
self.report.num_views_failed += 1
self.report.failure(
message="Failed to extract view metadata for keyspace ",
context=keyspace_name,
exc=e,
)
# Profiling
if self.config.is_profiling_enabled():
yield from self.profiler.get_workunits(self.cassandra_data)
def _generate_keyspace_container(
self, keyspace: CassandraKeyspace
) -> Iterable[MetadataWorkUnit]:
keyspace_container_key = self._generate_keyspace_container_key(
keyspace.keyspace_name
)
yield from gen_containers(
container_key=keyspace_container_key,
name=keyspace.keyspace_name,
qualified_name=keyspace.keyspace_name,
extra_properties={
"durable_writes": str(keyspace.durable_writes),
"replication": json.dumps(keyspace.replication),
},
sub_types=[DatasetContainerSubTypes.KEYSPACE],
)
def _generate_keyspace_container_key(self, keyspace_name: str) -> ContainerKey:
return KeyspaceKey(
keyspace=keyspace_name,
platform=self.platform,
instance=self.config.platform_instance,
env=self.config.env,
)
# get all tables for a given keyspace, iterate over them to extract column metadata
def _extract_tables_from_keyspace(
self, keyspace_name: str
) -> Iterable[MetadataWorkUnit]:
self.cassandra_data.keyspaces.append(keyspace_name)
tables: List[CassandraTable] = self.cassandra_api.get_tables(keyspace_name)
for table in tables:
# define the dataset urn for this table to be used downstream
table_name: str = table.table_name
dataset_name: str = f"{keyspace_name}.{table_name}"
if not self.config.table_pattern.allowed(dataset_name):
self.report.report_dropped(dataset_name)
continue
self.cassandra_data.tables.setdefault(keyspace_name, []).append(table_name)
self.report.report_entity_scanned(dataset_name, ent_type="Table")
dataset_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=dataset_name,
env=self.config.env,
platform_instance=self.config.platform_instance,
)
# 1. Extract columns from table, then construct and emit the schemaMetadata aspect.
try:
yield from self._extract_columns_from_table(
keyspace_name, table_name, dataset_urn
)
except Exception as e:
self.report.failure(
message="Failed to extract columns from table",
context=table_name,
exc=e,
)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=StatusClass(removed=False),
).as_workunit()
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=SubTypesClass(
typeNames=[
DatasetSubTypes.TABLE,
]
),
).as_workunit()
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=DatasetPropertiesClass(
name=table_name,
qualifiedName=f"{keyspace_name}.{table_name}",
description=table.comment,
customProperties={
"bloom_filter_fp_chance": str(table.bloom_filter_fp_chance),
"caching": json.dumps(table.caching),
"compaction": json.dumps(table.compaction),
"compression": json.dumps(table.compression),
"crc_check_chance": str(table.crc_check_chance),
"dclocal_read_repair_chance": str(
table.dclocal_read_repair_chance
),
"default_time_to_live": str(table.default_time_to_live),
"extensions": json.dumps(table.extensions),
"gc_grace_seconds": str(table.gc_grace_seconds),
"max_index_interval": str(table.max_index_interval),
"min_index_interval": str(table.min_index_interval),
"memtable_flush_period_in_ms": str(
table.memtable_flush_period_in_ms
),
"read_repair_chance": str(table.read_repair_chance),
"speculative_retry": str(table.speculative_retry),
},
),
).as_workunit()
yield from add_dataset_to_container(
container_key=self._generate_keyspace_container_key(keyspace_name),
dataset_urn=dataset_urn,
)
if self.config.platform_instance:
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
instance=make_dataplatform_instance_urn(
self.platform, self.config.platform_instance
),
),
).as_workunit()
# get all columns for a given table, iterate over them to extract column metadata
def _extract_columns_from_table(
self, keyspace_name: str, table_name: str, dataset_urn: str
) -> Iterable[MetadataWorkUnit]:
column_infos: List[CassandraColumn] = self.cassandra_api.get_columns(
keyspace_name, table_name
)
schema_fields: List[SchemaField] = list(
CassandraToSchemaFieldConverter.get_schema_fields(column_infos)
)
if not schema_fields:
self.report.report_warning(
message="Table has no columns, skipping", context=table_name
)
return
jsonable_column_infos: List[Dict[str, Any]] = []
for column in column_infos:
self.cassandra_data.columns.setdefault(table_name, []).append(column)
jsonable_column_infos.append(dataclasses.asdict(column))
schema_metadata: SchemaMetadata = SchemaMetadata(
schemaName=table_name,
platform=make_data_platform_urn(self.platform),
version=0,
hash="",
platformSchema=OtherSchemaClass(
rawSchema=json.dumps(jsonable_column_infos)
),
fields=schema_fields,
)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=schema_metadata,
).as_workunit()
def _extract_views_from_keyspace(
self, keyspace_name: str
) -> Iterable[MetadataWorkUnit]:
views: List[CassandraView] = self.cassandra_api.get_views(keyspace_name)
for view in views:
view_name: str = view.view_name
dataset_name: str = f"{keyspace_name}.{view_name}"
self.report.report_entity_scanned(dataset_name)
dataset_urn: str = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=dataset_name,
env=self.config.env,
platform_instance=self.config.platform_instance,
)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=StatusClass(removed=False),
).as_workunit()
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=SubTypesClass(
typeNames=[
DatasetSubTypes.VIEW,
]
),
).as_workunit()
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=ViewPropertiesClass(
materialized=True,
viewLogic=view.where_clause, # Use the WHERE clause as view logic
viewLanguage="CQL", # Use "CQL" as the language
),
).as_workunit()
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=DatasetPropertiesClass(
name=view_name,
qualifiedName=f"{keyspace_name}.{view_name}",
description=view.comment,
customProperties={
"bloom_filter_fp_chance": str(view.bloom_filter_fp_chance),
"caching": json.dumps(view.caching),
"compaction": json.dumps(view.compaction),
"compression": json.dumps(view.compression),
"crc_check_chance": str(view.crc_check_chance),
"include_all_columns": str(view.include_all_columns),
"dclocal_read_repair_chance": str(
view.dclocal_read_repair_chance
),
"default_time_to_live": str(view.default_time_to_live),
"extensions": json.dumps(view.extensions),
"gc_grace_seconds": str(view.gc_grace_seconds),
"max_index_interval": str(view.max_index_interval),
"min_index_interval": str(view.min_index_interval),
"memtable_flush_period_in_ms": str(
view.memtable_flush_period_in_ms
),
"read_repair_chance": str(view.read_repair_chance),
"speculative_retry": str(view.speculative_retry),
},
),
).as_workunit()
try:
yield from self._extract_columns_from_table(
keyspace_name, view_name, dataset_urn
)
except Exception as e:
self.report.failure(
message="Failed to extract columns from views",
context=view_name,
exc=e,
)
# Construct and emit lineage off of 'base_table_name'
# NOTE: we don't need to use 'base_table_id' since table is always in same keyspace, see https://docs.datastax.com/en/cql-oss/3.3/cql/cql_reference/cqlCreateMaterializedView.html#cqlCreateMaterializedView__keyspace-name
upstream_urn: str = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=f"{keyspace_name}.{view.table_name}",
env=self.config.env,
platform_instance=self.config.platform_instance,
)
fineGrainedLineages = self.get_upstream_fields_of_field_in_datasource(
view_name, dataset_urn, upstream_urn
)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=UpstreamLineageClass(
upstreams=[
UpstreamClass(
dataset=upstream_urn,
type=DatasetLineageTypeClass.VIEW,
)
],
fineGrainedLineages=fineGrainedLineages,
),
).as_workunit()
yield from add_dataset_to_container(
container_key=self._generate_keyspace_container_key(keyspace_name),
dataset_urn=dataset_urn,
)
if self.config.platform_instance:
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
instance=make_dataplatform_instance_urn(
self.platform, self.config.platform_instance
),
),
).as_workunit()
def get_upstream_fields_of_field_in_datasource(
self, table_name: str, dataset_urn: str, upstream_urn: str
) -> List[FineGrainedLineageClass]:
column_infos = self.cassandra_data.columns.get(table_name, [])
# Collect column-level lineage
fine_grained_lineages = []
for column_info in column_infos:
source_column = column_info.column_name
if source_column:
fine_grained_lineages.append(
FineGrainedLineageClass(
upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
downstreams=[make_schema_field_urn(dataset_urn, source_column)],
upstreams=[make_schema_field_urn(upstream_urn, source_column)],
)
)
return fine_grained_lineages
def get_report(self):
return self.report
def close(self):
self.cassandra_api.close()
super().close()
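For reference, a minimal sketch of driving this source directly in Python; the connection values are placeholders and assume a local cluster (the integration test further below uses the higher-level `Pipeline.create` instead).
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.cassandra.cassandra import CassandraSource

# Placeholder config mirroring the recipe's `source.config` block.
config_dict = {
    "contact_point": "localhost",
    "port": 9042,
    "username": "admin",
    "password": "password",
}
source = CassandraSource.create(config_dict, PipelineContext(run_id="cassandra-demo"))
for wu in source.get_workunits():
    # Each work unit wraps a metadata change proposal for a keyspace container,
    # table, view, schema, or profile.
    print(wu.id)
source.close()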

View File

@ -0,0 +1,325 @@
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from cassandra import DriverException, OperationTimedOut
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import (
EXEC_PROFILE_DEFAULT,
Cluster,
ExecutionProfile,
ProtocolVersion,
Session,
)
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.source.cassandra.cassandra_config import CassandraSourceConfig
@dataclass
class CassandraKeyspace:
keyspace_name: str
durable_writes: bool
replication: Dict
@dataclass
class CassandraTable:
keyspace_name: str
table_name: str
bloom_filter_fp_chance: Optional[float]
caching: Optional[Dict[str, str]]
comment: Optional[str]
compaction: Optional[Dict[str, Any]]
compression: Optional[Dict[str, Any]]
crc_check_chance: Optional[float]
dclocal_read_repair_chance: Optional[float]
default_time_to_live: Optional[int]
extensions: Optional[Dict[str, Any]]
gc_grace_seconds: Optional[int]
max_index_interval: Optional[int]
memtable_flush_period_in_ms: Optional[int]
min_index_interval: Optional[int]
read_repair_chance: Optional[float]
speculative_retry: Optional[str]
@dataclass
class CassandraColumn:
keyspace_name: str
table_name: str
column_name: str
type: str
clustering_order: Optional[str]
kind: Optional[str]
position: Optional[int]
@dataclass
class CassandraView(CassandraTable):
view_name: str
include_all_columns: Optional[bool]
where_clause: str = ""
@dataclass
class CassandraEntities:
keyspaces: List[str] = field(default_factory=list)
tables: Dict[str, List[str]] = field(
default_factory=dict
) # Maps keyspace -> tables
columns: Dict[str, List[CassandraColumn]] = field(
default_factory=dict
) # Maps tables -> columns
# - Referencing system_schema: https://docs.datastax.com/en/cql-oss/3.x/cql/cql_using/useQuerySystem.html#Table3.ColumnsinSystem_SchemaTables-Cassandra3.0 - #
# this keyspace contains details about the cassandra cluster's keyspaces, tables, and columns
class CassandraQueries:
# get all keyspaces
GET_KEYSPACES_QUERY = "SELECT * FROM system_schema.keyspaces"
# get all tables for a keyspace
GET_TABLES_QUERY = "SELECT * FROM system_schema.tables WHERE keyspace_name = %s"
# get all columns for a table
GET_COLUMNS_QUERY = "SELECT * FROM system_schema.columns WHERE keyspace_name = %s AND table_name = %s"
# get all views for a keyspace
GET_VIEWS_QUERY = "SELECT * FROM system_schema.views WHERE keyspace_name = %s"
# Row Count
ROW_COUNT = 'SELECT COUNT(*) AS row_count FROM {}."{}"'
# Column Count
COLUMN_COUNT = "SELECT COUNT(*) AS column_count FROM system_schema.columns WHERE keyspace_name = '{}' AND table_name = '{}'"
class CassandraAPI:
def __init__(self, config: CassandraSourceConfig, report: SourceReport):
self.config = config
self.report = report
self._cassandra_session: Optional[Session] = None
def authenticate(self) -> bool:
"""Establish a connection to Cassandra and return the session."""
try:
if self.config.cloud_config:
cloud_config = self.config.cloud_config
cluster_cloud_config = {
"connect_timeout": cloud_config.connect_timeout,
"use_default_tempdir": True,
"secure_connect_bundle": cloud_config.secure_connect_bundle,
}
profile = ExecutionProfile(request_timeout=cloud_config.request_timeout)
auth_provider = PlainTextAuthProvider(
"token",
cloud_config.token,
)
cluster = Cluster(
cloud=cluster_cloud_config,
auth_provider=auth_provider,
execution_profiles={EXEC_PROFILE_DEFAULT: profile},
protocol_version=ProtocolVersion.V4,
)
self._cassandra_session = cluster.connect()
return True
if self.config.username and self.config.password:
auth_provider = PlainTextAuthProvider(
username=self.config.username, password=self.config.password
)
cluster = Cluster(
[self.config.contact_point],
port=self.config.port,
auth_provider=auth_provider,
load_balancing_policy=None,
)
else:
cluster = Cluster(
[self.config.contact_point],
port=self.config.port,
load_balancing_policy=None,
)
self._cassandra_session = cluster.connect()
return True
except OperationTimedOut as e:
self.report.failure(
message="Failed to Authenticate", context=f"{str(e.errors)}", exc=e
)
return False
except DriverException as e:
self.report.failure(message="Failed to Authenticate", exc=e)
return False
except Exception as e:
self.report.failure(message="Failed to authenticate to Cassandra", exc=e)
return False
def get(self, query: str, parameters: Optional[List] = []) -> List:
if not self._cassandra_session:
return []
resp = self._cassandra_session.execute(query, parameters)
return resp
def get_keyspaces(self) -> List[CassandraKeyspace]:
"""Fetch all keyspaces."""
try:
keyspaces = self.get(CassandraQueries.GET_KEYSPACES_QUERY)
keyspace_list = [
CassandraKeyspace(
keyspace_name=row.keyspace_name,
durable_writes=row.durable_writes,
replication=dict(row.replication),
)
for row in keyspaces
]
return keyspace_list
except DriverException as e:
self.report.warning(
message="Failed to fetch keyspaces", context=f"{str(e)}", exc=e
)
return []
except Exception as e:
self.report.warning(message="Failed to fetch keyspaces", exc=e)
return []
def get_tables(self, keyspace_name: str) -> List[CassandraTable]:
"""Fetch all tables for a given keyspace."""
try:
tables = self.get(CassandraQueries.GET_TABLES_QUERY, [keyspace_name])
table_list = [
CassandraTable(
keyspace_name=row.keyspace_name,
table_name=row.table_name,
bloom_filter_fp_chance=row.bloom_filter_fp_chance,
caching=dict(row.caching),
comment=row.comment,
compaction=dict(row.compaction),
compression=dict(row.compression),
crc_check_chance=row.crc_check_chance,
dclocal_read_repair_chance=row.dclocal_read_repair_chance,
default_time_to_live=row.default_time_to_live,
extensions=dict(row.extensions),
gc_grace_seconds=row.gc_grace_seconds,
max_index_interval=row.max_index_interval,
memtable_flush_period_in_ms=row.memtable_flush_period_in_ms,
min_index_interval=row.min_index_interval,
read_repair_chance=row.read_repair_chance,
speculative_retry=row.speculative_retry,
)
for row in tables
]
return table_list
except DriverException as e:
self.report.warning(
message="Failed to fetch tables for keyspace",
context=f"{str(e)}",
exc=e,
)
return []
except Exception as e:
self.report.warning(
message="Failed to fetch tables for keyspace",
context=f"{keyspace_name}",
exc=e,
)
return []
def get_columns(self, keyspace_name: str, table_name: str) -> List[CassandraColumn]:
"""Fetch all columns for a given table."""
try:
column_infos = self.get(
CassandraQueries.GET_COLUMNS_QUERY, [keyspace_name, table_name]
)
column_list = [
CassandraColumn(
keyspace_name=row.keyspace_name,
table_name=row.table_name,
column_name=row.column_name,
clustering_order=row.clustering_order,
kind=row.kind,
position=row.position,
type=row.type,
)
for row in column_infos
]
return column_list
except DriverException as e:
self.report.warning(
message="Failed to fetch columns for table", context=f"{str(e)}", exc=e
)
return []
except Exception as e:
self.report.warning(
message="Failed to fetch columns for table",
context=f"{keyspace_name}.{table_name}",
exc=e,
)
return []
def get_views(self, keyspace_name: str) -> List[CassandraView]:
"""Fetch all views for a given keyspace."""
try:
views = self.get(CassandraQueries.GET_VIEWS_QUERY, [keyspace_name])
view_list = [
CassandraView(
table_name=row.base_table_name,
keyspace_name=row.keyspace_name,
view_name=row.view_name,
bloom_filter_fp_chance=row.bloom_filter_fp_chance,
caching=dict(row.caching),
comment=row.comment,
compaction=dict(row.compaction),
compression=dict(row.compression),
crc_check_chance=row.crc_check_chance,
dclocal_read_repair_chance=row.dclocal_read_repair_chance,
default_time_to_live=row.default_time_to_live,
extensions=dict(row.extensions),
gc_grace_seconds=row.gc_grace_seconds,
include_all_columns=row.include_all_columns,
max_index_interval=row.max_index_interval,
memtable_flush_period_in_ms=row.memtable_flush_period_in_ms,
min_index_interval=row.min_index_interval,
read_repair_chance=row.read_repair_chance,
speculative_retry=row.speculative_retry,
where_clause=row.where_clause,
)
for row in views
]
return view_list
except DriverException as e:
self.report.warning(
message="Failed to fetch views for keyspace", context=f"{str(e)}", exc=e
)
return []
except Exception as e:
self.report.warning(
message="Failed to fetch views for keyspace",
context=f"{keyspace_name}",
exc=e,
)
return []
def execute(self, query: str, limit: Optional[int] = None) -> List:
"""Fetch stats for cassandra"""
try:
if not self._cassandra_session:
return []
if limit:
query = query + f" LIMIT {limit}"
result_set = self._cassandra_session.execute(query).all()
return result_set
except DriverException as e:
self.report.warning(
message="Failed to fetch stats for keyspace", context=str(e), exc=e
)
return []
except Exception:
self.report.warning(
message="Failed to fetch stats for keyspace",
context=f"{query}",
)
return []
def close(self):
"""Close the Cassandra session."""
if self._cassandra_session:
self._cassandra_session.shutdown()

View File

@ -0,0 +1,111 @@
from typing import Optional
from pydantic import Field
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import (
EnvConfigMixin,
PlatformInstanceConfigMixin,
)
from datahub.ingestion.source.ge_profiling_config import GEProfilingBaseConfig
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from datahub.ingestion.source_config.operation_config import is_profiling_enabled
# - Referencing https://docs.datastax.com/en/cql-oss/3.x/cql/cql_using/useQuerySystem.html#Table3.ColumnsinSystem_SchemaTables-Cassandra3.0 - #
# this keyspace contains details about the cassandra cluster's keyspaces, tables, and columns
SYSTEM_SCHEMA_KEYSPACE_NAME = "system_schema"
# Reference:
# https://docs.datastax.com/en/astra-db-serverless/databases/python-driver.html
# https://docs.datastax.com/en/astra-db-serverless/databases/python-driver.html#production-configuration
class CassandraCloudConfig(ConfigModel):
"""
Configuration for connecting to DataStax Astra DB in the cloud.
"""
token: str = Field(
description="The Astra DB application token used for authentication.",
)
secure_connect_bundle: str = Field(
description="File path to the Secure Connect Bundle (.zip) used for a secure connection to DataStax Astra DB.",
)
connect_timeout: int = Field(
default=600,
description="Timeout in seconds for establishing new connections to Cassandra.",
)
request_timeout: int = Field(
default=600, description="Timeout in seconds for individual Cassandra requests."
)
class CassandraSourceConfig(
PlatformInstanceConfigMixin, StatefulIngestionConfigBase, EnvConfigMixin
):
"""
Configuration for connecting to a Cassandra or DataStax Astra DB source.
"""
contact_point: str = Field(
default="localhost",
description="Domain or IP address of the Cassandra instance (excluding port).",
)
port: int = Field(
default=9042, description="Port number to connect to the Cassandra instance."
)
username: Optional[str] = Field(
default=None,
description=f"Username credential with read access to the {SYSTEM_SCHEMA_KEYSPACE_NAME} keyspace.",
)
password: Optional[str] = Field(
default=None,
description="Password credential associated with the specified username.",
)
cloud_config: Optional[CassandraCloudConfig] = Field(
default=None,
description="Configuration for cloud-based Cassandra, such as DataStax Astra DB.",
)
keyspace_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns to filter keyspaces for ingestion.",
)
table_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns to filter keyspaces.tables for ingestion.",
)
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None,
description="Configuration for stateful ingestion and stale metadata removal.",
)
# Profiling
profile_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for tables to profile",
)
profiling: GEProfilingBaseConfig = Field(
default=GEProfilingBaseConfig(),
description="Configuration for profiling",
)
def is_profiling_enabled(self) -> bool:
return self.profiling.enabled and is_profiling_enabled(
self.profiling.operation_config
)
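A minimal sketch (with hypothetical values) of how a recipe's `source.config` block is parsed into this model:
# Placeholder values; mirrors the YAML recipe shown earlier in this change.
config = CassandraSourceConfig.parse_obj(
    {
        "contact_point": "localhost",
        "port": 9042,
        "username": "admin",
        "password": "password",
        "keyspace_pattern": {"allow": ["cass_test_.*"]},
        "profiling": {"enabled": True, "profile_table_level_only": True},
    }
)
assert config.keyspace_pattern.allowed("cass_test_1")
assert not config.keyspace_pattern.allowed("system_schema")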

View File

@ -0,0 +1,296 @@
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Optional
import numpy as np
from cassandra.util import OrderedMapSerializedKey, SortedSet
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.cassandra.cassandra_api import (
CassandraAPI,
CassandraColumn,
CassandraEntities,
CassandraQueries,
)
from datahub.ingestion.source.cassandra.cassandra_config import CassandraSourceConfig
from datahub.ingestion.source.cassandra.cassandra_utils import CassandraSourceReport
from datahub.ingestion.source_report.ingestion_stage import PROFILING
from datahub.metadata.schema_classes import (
DatasetFieldProfileClass,
DatasetProfileClass,
QuantileClass,
)
logger = logging.getLogger(__name__)
@dataclass
class ColumnMetric:
col_type: str = ""
values: List[Any] = field(default_factory=list)
null_count: int = 0
total_count: int = 0
distinct_count: Optional[int] = None
min: Optional[Any] = None
max: Optional[Any] = None
mean: Optional[float] = None
stdev: Optional[float] = None
median: Optional[float] = None
quantiles: Optional[List[float]] = None
sample_values: Optional[Any] = None
@dataclass
class ProfileData:
row_count: Optional[int] = None
column_count: Optional[int] = None
column_metrics: Dict[str, ColumnMetric] = field(default_factory=dict)
class CassandraProfiler:
config: CassandraSourceConfig
report: CassandraSourceReport
def __init__(
self,
config: CassandraSourceConfig,
report: CassandraSourceReport,
api: CassandraAPI,
) -> None:
self.api = api
self.config = config
self.report = report
def get_workunits(
self, cassandra_data: CassandraEntities
) -> Iterable[MetadataWorkUnit]:
for keyspace_name in cassandra_data.keyspaces:
tables = cassandra_data.tables.get(keyspace_name, [])
self.report.set_ingestion_stage(keyspace_name, PROFILING)
with ThreadPoolExecutor(
max_workers=self.config.profiling.max_workers
) as executor:
future_to_dataset = {
executor.submit(
self.generate_profile,
keyspace_name,
table_name,
cassandra_data.columns.get(table_name, []),
): table_name
for table_name in tables
}
for future in as_completed(future_to_dataset):
table_name = future_to_dataset[future]
try:
yield from future.result()
except Exception as exc:
self.report.profiling_skipped_other[table_name] += 1
self.report.failure(
message="Failed to profile for table",
context=f"{keyspace_name}.{table_name}",
exc=exc,
)
def generate_profile(
self,
keyspace_name: str,
table_name: str,
columns: List[CassandraColumn],
) -> Iterable[MetadataWorkUnit]:
dataset_name: str = f"{keyspace_name}.{table_name}"
dataset_urn = make_dataset_urn_with_platform_instance(
platform="cassandra",
name=dataset_name,
env=self.config.env,
platform_instance=self.config.platform_instance,
)
if not columns:
self.report.warning(
message="Skipping profiling as no columns found for table",
context=f"{keyspace_name}.{table_name}",
)
self.report.profiling_skipped_other[table_name] += 1
return
if not self.config.profile_pattern.allowed(f"{keyspace_name}.{table_name}"):
self.report.profiling_skipped_table_profile_pattern[keyspace_name] += 1
logger.info(
f"Table {table_name} in {keyspace_name}, not allowed for profiling"
)
return
try:
profile_data = self.profile_table(keyspace_name, table_name, columns)
except Exception as e:
self.report.warning(
message="Profiling Failed",
context=f"{keyspace_name}.{table_name}",
exc=e,
)
return
profile_aspect = self.populate_profile_aspect(profile_data)
if profile_aspect:
self.report.report_entity_profiled(table_name)
mcp = MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=profile_aspect
)
yield mcp.as_workunit()
def populate_profile_aspect(self, profile_data: ProfileData) -> DatasetProfileClass:
field_profiles = [
self._create_field_profile(column_name, column_metrics)
for column_name, column_metrics in profile_data.column_metrics.items()
]
return DatasetProfileClass(
timestampMillis=round(time.time() * 1000),
rowCount=profile_data.row_count,
columnCount=profile_data.column_count,
fieldProfiles=field_profiles,
)
def _create_field_profile(
self, field_name: str, field_stats: ColumnMetric
) -> DatasetFieldProfileClass:
quantiles = field_stats.quantiles
return DatasetFieldProfileClass(
fieldPath=field_name,
uniqueCount=field_stats.distinct_count,
nullCount=field_stats.null_count,
min=str(field_stats.min) if field_stats.min else None,
max=str(field_stats.max) if field_stats.max else None,
mean=str(field_stats.mean) if field_stats.mean else None,
median=str(field_stats.median) if field_stats.median else None,
stdev=str(field_stats.stdev) if field_stats.stdev else None,
quantiles=[
QuantileClass(quantile=str(0.25), value=str(quantiles[0])),
QuantileClass(quantile=str(0.75), value=str(quantiles[1])),
]
if quantiles
else None,
sampleValues=field_stats.sample_values
if field_stats.sample_values
else None,
)
def profile_table(
self, keyspace_name: str, table_name: str, columns: List[CassandraColumn]
) -> ProfileData:
profile_data = ProfileData()
resp = self.api.execute(
CassandraQueries.ROW_COUNT.format(keyspace_name, table_name)
)
if resp:
profile_data.row_count = resp[0].row_count
profile_data.column_count = len(columns)
if not self.config.profiling.profile_table_level_only:
resp = self.api.execute(
f'SELECT {", ".join([col.column_name for col in columns])} FROM {keyspace_name}."{table_name}"'
)
profile_data.column_metrics = self._collect_column_data(resp, columns)
return self._parse_profile_results(profile_data)
def _parse_profile_results(self, profile_data: ProfileData) -> ProfileData:
for cl_name, column_metrics in profile_data.column_metrics.items():
if column_metrics.values:
try:
self._compute_field_statistics(column_metrics)
except Exception as e:
self.report.warning(
message="Profiling Failed For Column Stats",
context=cl_name,
exc=e,
)
raise e
return profile_data
def _collect_column_data(
self, rows: List[Any], columns: List[CassandraColumn]
) -> Dict[str, ColumnMetric]:
metrics = {column.column_name: ColumnMetric() for column in columns}
for row in rows:
for column in columns:
if self._is_skippable_type(column.type):
continue
value: Any = getattr(row, column.column_name, None)
metric = metrics[column.column_name]
metric.col_type = column.type
metric.total_count += 1
if value is None:
metric.null_count += 1
else:
metric.values.extend(self._parse_value(value))
return metrics
def _is_skippable_type(self, data_type: str) -> bool:
return data_type.lower() in ["timeuuid", "blob", "frozen<tuple<tinyint, text>>"]
def _parse_value(self, value: Any) -> List[Any]:
if isinstance(value, SortedSet):
return list(value)
elif isinstance(value, OrderedMapSerializedKey):
return list(dict(value).values())
elif isinstance(value, list):
return value
return [value]
def _compute_field_statistics(self, column_metrics: ColumnMetric) -> None:
values = column_metrics.values
if not values:
return
        # Null count is collected by default
if not self.config.profiling.include_field_null_count:
column_metrics.null_count = 0
if self.config.profiling.include_field_distinct_count:
column_metrics.distinct_count = len(set(values))
if self.config.profiling.include_field_min_value:
column_metrics.min = min(values)
if self.config.profiling.include_field_max_value:
column_metrics.max = max(values)
if values and self._is_numeric_type(column_metrics.col_type):
if self.config.profiling.include_field_mean_value:
column_metrics.mean = round(float(np.mean(values)), 2)
if self.config.profiling.include_field_stddev_value:
column_metrics.stdev = round(float(np.std(values)), 2)
if self.config.profiling.include_field_median_value:
column_metrics.median = round(float(np.median(values)), 2)
if self.config.profiling.include_field_quantiles:
column_metrics.quantiles = [
float(np.percentile(values, 25)),
float(np.percentile(values, 75)),
]
if values and self.config.profiling.include_field_sample_values:
column_metrics.sample_values = [str(v) for v in values[:5]]
def _is_numeric_type(self, data_type: str) -> bool:
return data_type.lower() in [
"int",
"counter",
"bigint",
"float",
"double",
"decimal",
"smallint",
"tinyint",
"varint",
]

View File

@ -0,0 +1,152 @@
import logging
from dataclasses import dataclass, field
from typing import Dict, Generator, List, Optional, Type
from datahub.ingestion.source.cassandra.cassandra_api import CassandraColumn
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
)
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
SchemaField,
SchemaFieldDataType,
)
from datahub.metadata.schema_classes import (
ArrayTypeClass,
BooleanTypeClass,
BytesTypeClass,
DateTypeClass,
NullTypeClass,
NumberTypeClass,
RecordTypeClass,
StringTypeClass,
TimeTypeClass,
)
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.stats_collections import TopKDict, int_top_k_dict
logger = logging.getLogger(__name__)
# we always skip over ingesting metadata about these keyspaces
SYSTEM_KEYSPACE_LIST = set(
["system", "system_auth", "system_schema", "system_distributed", "system_traces"]
)
@dataclass
class CassandraSourceReport(StaleEntityRemovalSourceReport, IngestionStageReport):
num_tables_failed: int = 0
num_views_failed: int = 0
tables_scanned: int = 0
views_scanned: int = 0
entities_profiled: int = 0
filtered: LossyList[str] = field(default_factory=LossyList)
def report_entity_scanned(self, name: str, ent_type: str = "View") -> None:
"""
Entity could be a view or a table
"""
if ent_type == "Table":
self.tables_scanned += 1
elif ent_type == "View":
self.views_scanned += 1
else:
raise KeyError(f"Unknown entity {ent_type}.")
def set_ingestion_stage(self, keyspace: str, stage: str) -> None:
self.report_ingestion_stage_start(f"{keyspace}: {stage}")
    # TODO: Need to create a separate common config for the profiling report
profiling_skipped_other: TopKDict[str, int] = field(default_factory=int_top_k_dict)
profiling_skipped_table_profile_pattern: TopKDict[str, int] = field(
default_factory=int_top_k_dict
)
def report_entity_profiled(self, name: str) -> None:
self.entities_profiled += 1
def report_dropped(self, ent_name: str) -> None:
self.filtered.append(ent_name)
# This class helps convert Cassandra column types to SchemaFieldDataType for use by the DataHub metadata schema
class CassandraToSchemaFieldConverter:
# Mapping from cassandra field types to SchemaFieldDataType.
# https://cassandra.apache.org/doc/stable/cassandra/cql/types.html (version 4.1)
_field_type_to_schema_field_type: Dict[str, Type] = {
# Bool
"boolean": BooleanTypeClass,
# Binary
"blob": BytesTypeClass,
# Numbers
"bigint": NumberTypeClass,
"counter": NumberTypeClass,
"decimal": NumberTypeClass,
"double": NumberTypeClass,
"float": NumberTypeClass,
"int": NumberTypeClass,
"smallint": NumberTypeClass,
"tinyint": NumberTypeClass,
"varint": NumberTypeClass,
# Dates
"date": DateTypeClass,
# Times
"duration": TimeTypeClass,
"time": TimeTypeClass,
"timestamp": TimeTypeClass,
# Strings
"text": StringTypeClass,
"ascii": StringTypeClass,
"inet": StringTypeClass,
"timeuuid": StringTypeClass,
"uuid": StringTypeClass,
"varchar": StringTypeClass,
# Records
"geo_point": RecordTypeClass,
# Arrays
"histogram": ArrayTypeClass,
}
@staticmethod
def get_column_type(cassandra_column_type: str) -> SchemaFieldDataType:
type_class: Optional[
Type
] = CassandraToSchemaFieldConverter._field_type_to_schema_field_type.get(
cassandra_column_type
)
if type_class is None:
logger.warning(
f"Cannot map {cassandra_column_type!r} to SchemaFieldDataType, using NullTypeClass."
)
type_class = NullTypeClass
return SchemaFieldDataType(type=type_class())
def _get_schema_fields(
self, cassandra_column_infos: List[CassandraColumn]
) -> Generator[SchemaField, None, None]:
        # yield a schema field for each column, in the order returned by the API
for column_info in cassandra_column_infos:
column_name: str = column_info.column_name
cassandra_type: str = column_info.type
schema_field_data_type: SchemaFieldDataType = self.get_column_type(
cassandra_type
)
schema_field: SchemaField = SchemaField(
fieldPath=column_name,
nativeDataType=cassandra_type,
type=schema_field_data_type,
description=None,
nullable=True,
recursive=False,
)
yield schema_field
@classmethod
def get_schema_fields(
cls, cassandra_column_infos: List[CassandraColumn]
) -> Generator[SchemaField, None, None]:
converter = cls()
yield from converter._get_schema_fields(cassandra_column_infos)
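A minimal sketch (hypothetical column values) of the converter in isolation; `CassandraColumn` and `CassandraToSchemaFieldConverter` defined above are assumed to be in scope.
column = CassandraColumn(
    keyspace_name="example_keyspace",
    table_name="shopping_cart",
    column_name="item_count",
    type="int",
    clustering_order="none",
    kind="regular",
    position=-1,
)
fields = list(CassandraToSchemaFieldConverter.get_schema_fields([column]))
assert fields[0].fieldPath == "item_count"
assert fields[0].nativeDataType == "int"  # mapped to NumberTypeClass via the table above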

View File

@ -40,6 +40,7 @@ class DatasetContainerSubTypes(StrEnum):
S3_BUCKET = "S3 bucket"
GCS_BUCKET = "GCS bucket"
ABS_CONTAINER = "ABS container"
KEYSPACE = "Keyspace" # Cassandra
class BIContainerSubTypes(StrEnum):

View File

@ -1,9 +1,7 @@
import datetime
import os
from typing import List, Literal, Optional
import certifi
import pydantic
from pydantic import Field, validator
from datahub.configuration.common import AllowDenyPattern, ConfigModel
@ -11,7 +9,7 @@ from datahub.configuration.source_common import (
EnvConfigMixin,
PlatformInstanceConfigMixin,
)
from datahub.ingestion.source.ge_profiling_config import GEProfilingConfig
from datahub.ingestion.source.ge_profiling_config import GEProfilingBaseConfig
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StatefulStaleMetadataRemovalConfig,
)
@ -97,79 +95,15 @@ class DremioConnectionConfig(ConfigModel):
return value
class ProfileConfig(GEProfilingConfig):
class ProfileConfig(GEProfilingBaseConfig):
query_timeout: int = Field(
default=300, description="Time before cancelling Dremio profiling query"
)
row_count: bool = True
column_count: bool = True
sample_values: bool = True
# Below Configs inherited from GEProfilingConfig
# but not used in Dremio so we hide them from docs.
include_field_median_value: bool = Field(
default=False,
hidden_from_docs=True,
description="Median causes a number of issues in Dremio.",
)
partition_profiling_enabled: bool = Field(default=True, hidden_from_docs=True)
profile_table_row_count_estimate_only: bool = Field(
default=False, hidden_from_docs=True
)
query_combiner_enabled: bool = Field(default=True, hidden_from_docs=True)
max_number_of_fields_to_profile: Optional[pydantic.PositiveInt] = Field(
default=None, hidden_from_docs=True
)
profile_if_updated_since_days: Optional[pydantic.PositiveFloat] = Field(
default=None, hidden_from_docs=True
)
profile_table_size_limit: Optional[int] = Field(
default=5,
description="Profile tables only if their size is less then specified GBs. If set to `null`, no limit on the size of tables to profile. Supported only in `snowflake` and `BigQuery`",
hidden_from_docs=True,
)
profile_table_row_limit: Optional[int] = Field(
default=5000000,
hidden_from_docs=True,
description="Profile tables only if their row count is less then specified count. If set to `null`, no limit on the row count of tables to profile. Supported only in `snowflake` and `BigQuery`",
)
partition_datetime: Optional[datetime.datetime] = Field(
default=None,
hidden_from_docs=True,
description="If specified, profile only the partition which matches this datetime. "
"If not specified, profile the latest partition. Only Bigquery supports this.",
)
use_sampling: bool = Field(
default=True,
hidden_from_docs=True,
description="Whether to profile column level stats on sample of table. Only BigQuery and Snowflake support this. "
"If enabled, profiling is done on rows sampled from table. Sampling is not done for smaller tables. ",
)
sample_size: int = Field(
default=10000,
hidden_from_docs=True,
description="Number of rows to be sampled from table for column level profiling."
"Applicable only if `use_sampling` is set to True.",
)
profile_external_tables: bool = Field(
default=False,
hidden_from_docs=True,
description="Whether to profile external tables. Only Snowflake and Redshift supports this.",
)
tags_to_ignore_sampling: Optional[List[str]] = pydantic.Field(
default=None,
hidden_from_docs=True,
description=(
"Fixed list of tags to ignore sampling."
" If not specified, tables will be sampled based on `use_sampling`."
),
)
class DremioSourceMapping(EnvConfigMixin, PlatformInstanceConfigMixin, ConfigModel):

View File

@ -149,11 +149,8 @@ class DremioProfiler:
) -> str:
metrics = []
if self.config.profiling.row_count:
metrics.append("COUNT(*) AS row_count")
if self.config.profiling.column_count:
metrics.append(f"{len(columns)} AS column_count")
metrics.append("COUNT(*) AS row_count")
metrics.append(f"{len(columns)} AS column_count")
if not self.config.profiling.profile_table_level_only:
for column_name, data_type in columns:
@ -239,11 +236,9 @@ class DremioProfiler:
profile: Dict[str, Any] = {"column_stats": {}}
result = results[0] if results else {} # We expect only one row of results
if self.config.profiling.row_count:
profile["row_count"] = int(result.get("row_count", 0))
profile["row_count"] = int(result.get("row_count", 0))
if self.config.profiling.column_count:
profile["column_count"] = int(result.get("column_count", 0))
profile["column_count"] = int(result.get("column_count", 0))
for column_name, data_type in columns:
safe_column_name = re.sub(r"\W|^(?=\d)", "_", column_name)

View File

@ -19,7 +19,7 @@ _PROFILING_FLAGS_TO_REPORT = {
logger = logging.getLogger(__name__)
class GEProfilingConfig(ConfigModel):
class GEProfilingBaseConfig(ConfigModel):
enabled: bool = Field(
default=False, description="Whether profiling should be done."
)
@ -35,15 +35,6 @@ class GEProfilingConfig(ConfigModel):
default=None,
description="Offset in documents to profile. By default, uses no offset.",
)
report_dropped_profiles: bool = Field(
default=False,
description="Whether to report datasets or dataset columns which were not profiled. Set to `True` for debugging purposes.",
)
turn_off_expensive_profiling_metrics: bool = Field(
default=False,
description="Whether to turn off expensive profiling or not. This turns off profiling for quantiles, distinct_value_frequencies, histogram & sample_values. This also limits maximum number of fields being profiled to 10.",
)
profile_table_level_only: bool = Field(
default=False,
description="Whether to perform profiling at table-level only, or include column-level profiling as well.",
@ -92,6 +83,29 @@ class GEProfilingConfig(ConfigModel):
default=True,
description="Whether to profile for the sample values for all columns.",
)
# The default of (5 * cpu_count) is adopted from the default max_workers
# parameter of ThreadPoolExecutor. Given that profiling is often an I/O-bound
# task, it may make sense to increase this default value in the future.
# https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
max_workers: int = Field(
default=5 * (os.cpu_count() or 4),
description="Number of worker threads to use for profiling. Set to 1 to disable.",
)
class GEProfilingConfig(GEProfilingBaseConfig):
report_dropped_profiles: bool = Field(
default=False,
description="Whether to report datasets or dataset columns which were not profiled. Set to `True` for debugging purposes.",
)
turn_off_expensive_profiling_metrics: bool = Field(
default=False,
description="Whether to turn off expensive profiling or not. This turns off profiling for quantiles, distinct_value_frequencies, histogram & sample_values. This also limits maximum number of fields being profiled to 10.",
)
field_sample_values_limit: int = Field(
default=20,
description="Upper limit for number of sample values to collect for all columns.",
@ -126,15 +140,6 @@ class GEProfilingConfig(ConfigModel):
"less accurate. Only supported for Postgres and MySQL. ",
)
# The default of (5 * cpu_count) is adopted from the default max_workers
# parameter of ThreadPoolExecutor. Given that profiling is often an I/O-bound
# task, it may make sense to increase this default value in the future.
# https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
max_workers: int = Field(
default=5 * (os.cpu_count() or 4),
description="Number of worker threads to use for profiling. Set to 1 to disable.",
)
# The query combiner enables us to combine multiple queries into a single query,
# reducing the number of round-trips to the database and speeding up profiling.
query_combiner_enabled: bool = Field(

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,38 @@
version: "1"
services:
test-cassandra:
image: cassandra:latest
container_name: test-cassandra
ports:
- 9042:9042
volumes:
- ./setup/cassandra.yaml:/etc/cassandra/cassandra.yaml
- ./setup/init_keyspaces.cql:/docker-entrypoint-initdb.d/init_keyspaces.cql
networks:
- testnet
healthcheck:
test: ["CMD-SHELL", "cqlsh -e 'describe keyspaces' || exit 1"]
interval: 10s
timeout: 10s
retries: 10
test-cassandra-load-keyspace:
container_name: test-cassandra-load-keyspace
image: cassandra:latest
depends_on:
test-cassandra:
condition: service_healthy
volumes:
- ./setup/init_keyspaces.cql:/init_keyspaces.cql
command: /bin/bash -c "echo loading cassandra keyspace && cqlsh test-cassandra -f init_keyspaces.cql"
deploy:
restart_policy:
condition: on-failure
delay: 5s
max_attempts: 3
window: 100s
networks:
- testnet
networks:
testnet:

File diff suppressed because it is too large

View File

@ -0,0 +1,131 @@
CREATE KEYSPACE cass_test_1 WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE KEYSPACE cass_test_2 WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE cass_test_1.information (
person_id int PRIMARY KEY,
last_updated timestamp,
details text
);
CREATE TABLE cass_test_1.people (
person_id int PRIMARY KEY,
name text,
email text
);
CREATE TABLE cass_test_2.tasks (
task_id int PRIMARY KEY,
last_updated timestamp,
details text,
status text
);
CREATE MATERIALIZED VIEW cass_test_2.task_status AS
SELECT
task_id,
status
FROM cass_test_2.tasks
WHERE status IS NOT NULL AND task_id IS NOT NULL
PRIMARY KEY (task_id, status);
-- Create Keyspace with comments
CREATE KEYSPACE IF NOT EXISTS example_keyspace
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
-- Use Keyspace
USE example_keyspace;
-- Table with non-counter column types
CREATE TABLE IF NOT EXISTS all_data_types (
id uuid PRIMARY KEY,
ascii_column ascii,
bigint_column bigint,
blob_column blob,
boolean_column boolean,
date_column date,
decimal_column decimal,
double_column double,
float_column float,
inet_column inet,
int_column int,
list_column list<text>,
map_column map<text, text>,
set_column set<text>,
smallint_column smallint,
text_column text,
time_column time,
timestamp_column timestamp,
timeuuid_column timeuuid,
tinyint_column tinyint,
tuple_column tuple<int, text>,
uuid_column uuid,
varchar_column varchar,
varint_column varint,
frozen_map_column frozen<map<text, text>>,
frozen_list_column frozen<list<text>>,
frozen_set_column frozen<set<text>>
) WITH COMMENT = 'Table containing all supported Cassandra data types, excluding counters';
-- Separate table for counters
CREATE TABLE IF NOT EXISTS counter_table (
id uuid PRIMARY KEY,
counter_column counter
) WITH COMMENT = 'Separate table containing only counter column';
-- Sample view
CREATE MATERIALIZED VIEW IF NOT EXISTS example_view_1 AS
SELECT id, ascii_column, bigint_column
FROM all_data_types
WHERE id IS NOT NULL AND ascii_column IS NOT NULL
PRIMARY KEY (id, ascii_column) WITH COMMENT = 'Example view definition with id and ascii_column';
CREATE MATERIALIZED VIEW IF NOT EXISTS example_view_2 AS
SELECT id, ascii_column, float_column
FROM all_data_types
WHERE id IS NOT NULL AND ascii_column IS NOT NULL
PRIMARY KEY (id, ascii_column) WITH COMMENT = 'Example view definition with id and ascii_column';
-- Table created for profiling
CREATE TABLE IF NOT EXISTS shopping_cart (
userid text PRIMARY KEY,
item_count int,
last_update_timestamp timestamp
);
-- Insert some data
INSERT INTO shopping_cart
(userid, item_count, last_update_timestamp)
VALUES ('9876', 2, '2024-11-01T00:00:00.000+0000');
INSERT INTO shopping_cart
(userid, item_count, last_update_timestamp)
VALUES ('1234', 5, '2024-11-02T00:00:00.000+0000');
INSERT INTO shopping_cart
(userid, item_count, last_update_timestamp)
VALUES ('1235', 100, '2024-11-03T00:00:00.000+0000');
INSERT INTO shopping_cart
(userid, item_count, last_update_timestamp)
VALUES ('1236', 50, '2024-11-04T00:00:00.000+0000');
INSERT INTO shopping_cart
(userid, item_count, last_update_timestamp)
VALUES ('1237', 75, '2024-11-05T00:00:00.000+0000');
INSERT INTO shopping_cart
(userid, last_update_timestamp)
VALUES ('1238', '2024-11-06T00:00:00.000+0000');
INSERT INTO shopping_cart
(userid, last_update_timestamp)
VALUES ('1239', '2024-11-07T00:00:00.000+0000');
INSERT INTO shopping_cart
(userid, last_update_timestamp)
VALUES ('1240', '2024-11-08T00:00:00.000+0000');
INSERT INTO shopping_cart
(userid, last_update_timestamp)
VALUES ('1241', '2024-11-09T00:00:00.000+0000');

View File

@ -0,0 +1,53 @@
import logging
import time
import pytest
from datahub.ingestion.run.pipeline import Pipeline
from tests.test_helpers import mce_helpers
from tests.test_helpers.docker_helpers import wait_for_port
logger = logging.getLogger(__name__)
@pytest.mark.integration
def test_cassandra_ingest(docker_compose_runner, pytestconfig, tmp_path):
test_resources_dir = pytestconfig.rootpath / "tests/integration/cassandra"
with docker_compose_runner(
test_resources_dir / "docker-compose.yml", "cassandra"
) as docker_services:
wait_for_port(docker_services, "test-cassandra", 9042)
time.sleep(5)
# Run the metadata ingestion pipeline.
logger.info("Starting the ingestion test...")
pipeline_default_platform_instance = Pipeline.create(
{
"run_id": "cassandra-test",
"source": {
"type": "cassandra",
"config": {
"contact_point": "localhost",
"port": 9042,
"profiling": {"enabled": True},
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/cassandra_mcps.json",
},
},
}
)
pipeline_default_platform_instance.run()
pipeline_default_platform_instance.raise_from_status()
# Verify the output.
logger.info("Verifying output.")
mce_helpers.check_golden_file(
pytestconfig,
output_path=f"{tmp_path}/cassandra_mcps.json",
golden_path=test_resources_dir / "cassandra_mcps_golden.json",
)

View File

@ -0,0 +1,81 @@
import json
import logging
import re
from typing import Any, Dict, List, Tuple
import pytest
from datahub.ingestion.source.cassandra.cassandra import CassandraToSchemaFieldConverter
from datahub.ingestion.source.cassandra.cassandra_api import CassandraColumn
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
logger = logging.getLogger(__name__)
def assert_field_paths_are_unique(fields: List[SchemaField]) -> None:
fields_paths = [f.fieldPath for f in fields if re.match(".*[^]]$", f.fieldPath)]
if fields_paths:
assert len(fields_paths) == len(set(fields_paths))
def assert_field_paths_match(
fields: List[SchemaField], expected_field_paths: List[str]
) -> None:
logger.debug('FieldPaths=\n"' + '",\n"'.join(f.fieldPath for f in fields) + '"')
assert len(fields) == len(expected_field_paths)
for f, efp in zip(fields, expected_field_paths):
assert f.fieldPath == efp
assert_field_paths_are_unique(fields)
# TODO: cover one for every item on https://cassandra.apache.org/doc/stable/cassandra/cql/types.html (version 4.1)
schema_test_cases: Dict[str, Tuple[str, List[str]]] = {
"all_types_on_4.1": (
"""{
"column_infos": [
{"keyspace_name": "playground", "table_name": "people", "column_name": "birthday", "clustering_order": "none", "column_name_bytes": null, "kind": "regular", "position": -1, "type": "timestamp"},
{"keyspace_name": "playground", "table_name": "people", "column_name": "email", "clustering_order": "none", "column_name_bytes": null, "kind": "partition_key", "position": 0, "type": "text"},
{"keyspace_name": "playground", "table_name": "people", "column_name": "name", "clustering_order": "none", "column_name_bytes": null, "kind": "regular", "position": -1, "type": "text"}
]
}""",
[
"birthday",
"email",
"name",
],
)
}
@pytest.mark.parametrize(
"schema, expected_field_paths",
schema_test_cases.values(),
ids=schema_test_cases.keys(),
)
def test_cassandra_schema_conversion(
schema: str, expected_field_paths: List[str]
) -> None:
schema_dict: Dict[str, List[Any]] = json.loads(schema)
column_infos: List = schema_dict["column_infos"]
column_list: List[CassandraColumn] = [
CassandraColumn(
keyspace_name=row["keyspace_name"],
table_name=row["table_name"],
column_name=row["column_name"],
clustering_order=row["clustering_order"],
kind=row["kind"],
position=row["position"],
type=row["type"],
)
for row in column_infos
]
actual_fields = list(CassandraToSchemaFieldConverter.get_schema_fields(column_list))
assert_field_paths_match(actual_fields, expected_field_paths)
def test_no_properties_in_mappings_schema() -> None:
fields = list(CassandraToSchemaFieldConverter.get_schema_fields([]))
assert fields == []

View File

@ -717,3 +717,13 @@
displayName: Dremio
type: QUERY_ENGINE
logoUrl: "/assets/platforms/dremiologo.png"
- entityUrn: urn:li:dataPlatform:cassandra
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: cassandra
displayName: Cassandra
type: KEY_VALUE_STORE
logoUrl: "/assets/platforms/cassandralogo.png"