fix(ingestion/dremio): Ignore filtered containers in schema allow/deny pattern (#11959)

Co-authored-by: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com>
Jonny Dixon 2024-12-13 09:25:31 +00:00 committed by GitHub
parent 7c1d3b09ed
commit 06edf23a33
10 changed files with 12962 additions and 1536 deletions


@@ -1,6 +1,7 @@
 import concurrent.futures
 import json
 import logging
+import re
 import warnings
 from collections import defaultdict
 from enum import Enum
@@ -609,32 +610,6 @@ class DremioAPIOperations:
         return self.execute_query(query=jobs_query)

-    def get_source_by_id(self, source_id: str) -> Optional[Dict]:
-        """
-        Fetch source details by ID.
-        """
-        response = self.get(
-            url=f"/source/{source_id}",
-        )
-        return response if response else None
-
-    def get_source_for_dataset(self, schema: str, dataset: str) -> Optional[Dict]:
-        """
-        Get source information for a dataset given its schema and name.
-        """
-        dataset_id = self.get_dataset_id(schema, dataset)
-        if not dataset_id:
-            return None
-
-        catalog_entry = self.get(
-            url=f"/catalog/{dataset_id}",
-        )
-        if not catalog_entry or "path" not in catalog_entry:
-            return None
-
-        source_id = catalog_entry["path"][0]
-        return self.get_source_by_id(source_id)
-
     def get_tags_for_resource(self, resource_id: str) -> Optional[List[str]]:
         """
         Get Dremio tags for a given resource_id.
@@ -673,55 +648,119 @@ class DremioAPIOperations:
             )
             return None

-    def get_containers_for_location(
-        self, resource_id: str, path: List[str]
-    ) -> List[Dict[str, str]]:
-        containers = []
-
-        def traverse_path(location_id: str, entity_path: List[str]) -> List:
-            nonlocal containers
-            try:
-                response = self.get(url=f"/catalog/{location_id}")
-                if (
-                    response.get("entityType")
-                    == DremioEntityContainerType.FOLDER.value.lower()
-                ):
-                    containers.append(
-                        {
-                            "id": location_id,
-                            "name": entity_path[-1],
-                            "path": entity_path[:-1],
-                            "container_type": DremioEntityContainerType.FOLDER,
-                        }
-                    )
-
-                for container in response.get("children", []):
-                    if (
-                        container.get("type")
-                        == DremioEntityContainerType.CONTAINER.value
-                    ):
-                        traverse_path(container.get("id"), container.get("path"))
-
-            except Exception as exc:
-                logging.info(
-                    "Location {} contains no tables or views. Skipping...".format(id)
-                )
-                self.report.warning(
-                    message="Failed to get tables or views",
-                    context=f"{id}",
-                    exc=exc,
-                )
-
-            return containers
-
-        return traverse_path(location_id=resource_id, entity_path=path)
+    def _check_pattern_match(
+        self,
+        pattern: str,
+        paths: List[str],
+        allow_prefix: bool = True,
+    ) -> bool:
+        """
+        Helper method to check if a pattern matches any of the paths.
+        Handles hierarchical matching where each level is matched independently.
+        Also handles prefix matching for partial paths.
+        """
+        if pattern == ".*":
+            return True
+
+        # Convert the pattern to regex with proper anchoring
+        regex_pattern = pattern
+        if pattern.startswith("^"):
+            # Already has start anchor
+            regex_pattern = pattern.replace(".", r"\.")  # Escape dots
+            regex_pattern = regex_pattern.replace(
+                r"\.*", ".*"
+            )  # Convert .* to wildcard
+        else:
+            # Add start anchor and handle dots
+            regex_pattern = "^" + pattern.replace(".", r"\.").replace(r"\.*", ".*")
+
+        # Handle end matching
+        if not pattern.endswith(".*"):
+            if pattern.endswith("$"):
+                # Keep explicit end anchor
+                pass
+            elif not allow_prefix:
+                # Add end anchor for exact matching
+                regex_pattern = regex_pattern + "$"
+
+        for path in paths:
+            if re.match(regex_pattern, path, re.IGNORECASE):
+                return True
+        return False
+
+    def should_include_container(self, path: List[str], name: str) -> bool:
+        """
+        Helper method to check if a container should be included based on schema patterns.
+        Used by both get_all_containers and get_containers_for_location.
+        """
+        path_components = path + [name] if path else [name]
+        full_path = ".".join(path_components)
+
+        # Default allow everything case
+        if self.allow_schema_pattern == [".*"] and not self.deny_schema_pattern:
+            self.report.report_container_scanned(full_path)
+            return True
+
+        # Check deny patterns first
+        if self.deny_schema_pattern:
+            for pattern in self.deny_schema_pattern:
+                if self._check_pattern_match(
+                    pattern=pattern,
+                    paths=[full_path],
+                    allow_prefix=False,
+                ):
+                    self.report.report_container_filtered(full_path)
+                    return False
+
+        # Check allow patterns
+        for pattern in self.allow_schema_pattern:
+            # For patterns with wildcards, check if this path is a parent of the pattern
+            if "*" in pattern:
+                pattern_parts = pattern.split(".")
+                path_parts = path_components
+
+                # If pattern has exact same number of parts, check each component
+                if len(pattern_parts) == len(path_parts):
+                    matches = True
+                    for p_part, c_part in zip(pattern_parts, path_parts):
+                        if p_part != "*" and p_part.lower() != c_part.lower():
+                            matches = False
+                            break
+                    if matches:
+                        self.report.report_container_scanned(full_path)
+                        return True
+                # Otherwise check if current path is prefix match
+                else:
+                    # Remove the trailing wildcard if present
+                    if pattern_parts[-1] == "*":
+                        pattern_parts = pattern_parts[:-1]
+
+                    for i in range(len(path_parts)):
+                        current_path = ".".join(path_parts[: i + 1])
+                        pattern_prefix = ".".join(pattern_parts[: i + 1])
+                        if pattern_prefix.startswith(current_path):
+                            self.report.report_container_scanned(full_path)
+                            return True
+
+            # Direct pattern matching
+            if self._check_pattern_match(
+                pattern=pattern,
+                paths=[full_path],
+                allow_prefix=True,
+            ):
+                self.report.report_container_scanned(full_path)
+                return True
+
+        self.report.report_container_filtered(full_path)
+        return False

     def get_all_containers(self):
         """
-        Query the Dremio sources API and return source information.
+        Query the Dremio sources API and return filtered source information.
         """
         containers = []

         response = self.get(url="/catalog")

         def process_source(source):
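
For reference (an editor's sketch, not part of the commit): the snippet below exercises the two new helpers the same way the unit tests at the end of this commit do, by constructing DremioAPIOperations against a mocked requests.Session and then setting the pattern attributes directly. The mocked construction is copied from the test fixture; anything outside this hunk is an assumption.

# Illustrative sketch only -- mirrors the unit-test fixture added later in this
# commit; not part of the diff itself.
from unittest.mock import patch

from datahub.ingestion.source.dremio.dremio_api import DremioAPIOperations
from datahub.ingestion.source.dremio.dremio_config import DremioSourceConfig
from datahub.ingestion.source.dremio.dremio_reporting import DremioSourceReport

with patch("requests.Session") as mock_session_cls:
    # Fake the login call made during construction, as the test fixture does.
    session = mock_session_cls.return_value
    session.post.return_value.status_code = 200
    session.post.return_value.json.return_value = {"token": "dummy-token"}

    api = DremioAPIOperations(
        DremioSourceConfig(
            hostname="dummy-host",
            port=9047,
            tls=False,
            authentication_method="password",
            username="dummy-user",
            password="dummy-password",
            schema_pattern=dict(allow=[".*"], deny=[]),
        ),
        DremioSourceReport(),
    )

api.allow_schema_pattern = ["prod.data.*"]
api.deny_schema_pattern = []

# Parents of an allowed path are kept so traversal can reach the allowed folders...
assert api.should_include_container([], "prod")
assert api.should_include_container(["prod"], "data")
# ...while paths outside the allowed hierarchy are filtered and reported as dropped.
assert not api.should_include_container([], "dev")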
@@ -731,34 +770,41 @@ class DremioAPIOperations:
                 )

                 source_config = source_resp.get("config", {})
-                if source_config.get("database"):
-                    db = source_config.get("database")
-                else:
-                    db = source_config.get("databaseName", "")
+                db = source_config.get(
+                    "database", source_config.get("databaseName", "")
+                )

-                return {
-                    "id": source.get("id"),
-                    "name": source.get("path")[0],
-                    "path": [],
-                    "container_type": DremioEntityContainerType.SOURCE,
-                    "source_type": source_resp.get("type"),
-                    "root_path": source_config.get("rootPath"),
-                    "database_name": db,
-                }
+                if self.should_include_container([], source.get("path")[0]):
+                    return {
+                        "id": source.get("id"),
+                        "name": source.get("path")[0],
+                        "path": [],
+                        "container_type": DremioEntityContainerType.SOURCE,
+                        "source_type": source_resp.get("type"),
+                        "root_path": source_config.get("rootPath"),
+                        "database_name": db,
+                    }
             else:
-                return {
-                    "id": source.get("id"),
-                    "name": source.get("path")[0],
-                    "path": [],
-                    "container_type": DremioEntityContainerType.SPACE,
-                }
+                if self.should_include_container([], source.get("path")[0]):
+                    return {
+                        "id": source.get("id"),
+                        "name": source.get("path")[0],
+                        "path": [],
+                        "container_type": DremioEntityContainerType.SPACE,
+                    }
+            return None

         def process_source_and_containers(source):
             container = process_source(source)
+            if not container:
+                return []
+
+            # Get sub-containers
             sub_containers = self.get_containers_for_location(
                 resource_id=container.get("id"),
                 path=[container.get("name")],
             )

             return [container] + sub_containers

         # Use ThreadPoolExecutor to parallelize the processing of sources
@@ -771,7 +817,16 @@ class DremioAPIOperations:
             }

             for future in concurrent.futures.as_completed(future_to_source):
-                containers.extend(future.result())
+                source = future_to_source[future]
+                try:
+                    containers.extend(future.result())
+                except Exception as exc:
+                    logger.error(f"Error processing source: {exc}")
+                    self.report.warning(
+                        message="Failed to process source",
+                        context=f"{source}",
+                        exc=exc,
+                    )

         return containers
@@ -785,3 +840,55 @@ class DremioAPIOperations:
             )
         else:
             return ""
+
+    def get_containers_for_location(
+        self, resource_id: str, path: List[str]
+    ) -> List[Dict[str, str]]:
+        containers = []
+
+        def traverse_path(location_id: str, entity_path: List[str]) -> List:
+            nonlocal containers
+            try:
+                response = self.get(url=f"/catalog/{location_id}")
+
+                # Check if current folder should be included
+                if (
+                    response.get("entityType")
+                    == DremioEntityContainerType.FOLDER.value.lower()
+                ):
+                    folder_name = entity_path[-1]
+                    folder_path = entity_path[:-1]
+
+                    if self.should_include_container(folder_path, folder_name):
+                        containers.append(
+                            {
+                                "id": location_id,
+                                "name": folder_name,
+                                "path": folder_path,
+                                "container_type": DremioEntityContainerType.FOLDER,
+                            }
+                        )
+
+                # Recursively process child containers
+                for container in response.get("children", []):
+                    if (
+                        container.get("type")
+                        == DremioEntityContainerType.CONTAINER.value
+                    ):
+                        traverse_path(container.get("id"), container.get("path"))
+
+            except Exception as exc:
+                logging.info(
+                    "Location {} contains no tables or views. Skipping...".format(
+                        location_id
+                    )
+                )
+                self.report.warning(
+                    message="Failed to get tables or views",
+                    context=f"{location_id}",
+                    exc=exc,
+                )
+
+            return containers
+
+        return traverse_path(location_id=resource_id, entity_path=path)
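
Again for reference (editor's sketch, not part of the commit): how the re-added get_containers_for_location interacts with the filtering helpers while walking a hypothetical catalog tree. The fake catalog payloads, the __init__ bypass, and importing DremioEntityContainerType via the dremio_api module are assumptions made purely for illustration.

# Illustrative sketch only -- a hypothetical two-folder tree under a "prod"
# source, with a deny pattern that drops the "tmp" folder.
from datahub.ingestion.source.dremio.dremio_api import (
    DremioAPIOperations,
    DremioEntityContainerType,  # assumed importable from this module
)
from datahub.ingestion.source.dremio.dremio_reporting import DremioSourceReport

fake_catalog = {
    "source-id": {
        "entityType": "source",
        "children": [
            {
                "id": "folder-data",
                "type": DremioEntityContainerType.CONTAINER.value,
                "path": ["prod", "data"],
            },
            {
                "id": "folder-tmp",
                "type": DremioEntityContainerType.CONTAINER.value,
                "path": ["prod", "tmp"],
            },
        ],
    },
    "folder-data": {
        "entityType": DremioEntityContainerType.FOLDER.value.lower(),
        "children": [],
    },
    "folder-tmp": {
        "entityType": DremioEntityContainerType.FOLDER.value.lower(),
        "children": [],
    },
}

# Bypass __init__ (network auth) purely for the sketch and wire up only the
# attributes that get_containers_for_location and should_include_container use.
api = DremioAPIOperations.__new__(DremioAPIOperations)
api.report = DremioSourceReport()
api.allow_schema_pattern = [".*"]
api.deny_schema_pattern = ["prod.tmp.*"]
api.get = lambda url: fake_catalog[url.rsplit("/", 1)[-1]]

found = api.get_containers_for_location(resource_id="source-id", path=["prod"])

# Only prod.data survives; prod.tmp is counted in containers_filtered.
assert [c["name"] for c in found] == ["data"]
assert api.report.containers_filtered == 1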


@@ -31,6 +31,7 @@ class DremioToDataHubSourceTypeMapping:
         "SNOWFLAKE": "snowflake",
         "SYNAPSE": "mssql",
         "TERADATA": "teradata",
+        "VERTICA": "vertica",
     }

     DATABASE_SOURCE_TYPES = {
@@ -52,6 +53,7 @@ class DremioToDataHubSourceTypeMapping:
         "SNOWFLAKE",
         "SYNAPSE",
         "TERADATA",
+        "VERTICA",
     }

     FILE_OBJECT_STORAGE_TYPES = {


@@ -14,12 +14,27 @@ class DremioSourceReport(
 ):
     num_containers_failed: int = 0
     num_datasets_failed: int = 0
+    containers_scanned: int = 0
+    containers_filtered: int = 0

     def report_upstream_latency(self, start_time: datetime, end_time: datetime) -> None:
         # recording total combined latency is not very useful, keeping this method as a placeholder
         # for future implementation of min / max / percentiles etc.
         pass

+    def report_container_scanned(self, name: str) -> None:
+        """
+        Record that a container was successfully scanned
+        """
+        self.containers_scanned += 1
+
+    def report_container_filtered(self, container_name: str) -> None:
+        """
+        Record that a container was filtered out
+        """
+        self.containers_filtered += 1
+        self.report_dropped(container_name)
+
     def report_entity_scanned(self, name: str, ent_type: str = "View") -> None:
         """
         Entity could be a view or a table
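
A quick sketch (editor's note, not part of the diff) of what the two new counters record; DremioSourceReport is constructed standalone exactly as the new unit tests do, and report_container_filtered also routes the name through report_dropped.

from datahub.ingestion.source.dremio.dremio_reporting import DremioSourceReport

report = DremioSourceReport()
report.report_container_scanned("Samples")
report.report_container_filtered("prod.internal")  # also recorded via report_dropped

assert report.containers_scanned == 1
assert report.containers_filtered == 1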


@@ -0,0 +1,26 @@
source:
  type: dremio
  config:
    # Coordinates
    hostname: localhost
    port: 9047
    tls: false

    # Credentials
    authentication_method: password
    username: admin
    password: "2310Admin1234!@"

    platform_instance: test-platform
    include_query_lineage: false

    source_mappings:
      - platform: s3
        source_name: samples
        platform_instance: s3_test_samples

sink:
  type: file
  config:
    filename: "./dremio_mces.json"
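
This recipe is exercised by the new test_dremio_platform_instance_urns test further down. For reference, the aspect value that test expects for platform_instance: test-platform is shown below; make_dataplatform_instance_urn is the standard builder from datahub.emitter.mce_builder, not something introduced by this commit.

from datahub.emitter.mce_builder import make_dataplatform_instance_urn

# Matches the expected_instance string asserted in test_dremio_platform_instance_urns.
assert make_dataplatform_instance_urn("dremio", "test-platform") == (
    "urn:li:dataPlatformInstance:(urn:li:dataPlatform:dremio,test-platform)"
)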

File diff suppressed because it is too large


@@ -0,0 +1,28 @@
source:
  type: dremio
  config:
    # Coordinates
    hostname: localhost
    port: 9047
    tls: false

    # Credentials
    authentication_method: password
    username: admin
    password: "2310Admin1234!@"

    include_query_lineage: false

    source_mappings:
      - platform: s3
        source_name: samples
        platform_instance: s3_test_samples

    schema_pattern:
      allow:
        - "Samples"

sink:
  type: file
  config:
    filename: "./dremio_mces.json"
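
The schema_pattern block above is what the new container filtering consumes: only containers whose dotted path starts with "Samples" are ingested, and everything else is reported via containers_filtered. A minimal sketch of the same filter built programmatically, using only DremioSourceConfig fields that already appear in this commit's recipes and test fixture:

from datahub.ingestion.source.dremio.dremio_config import DremioSourceConfig

config = DremioSourceConfig(
    hostname="localhost",
    port=9047,
    tls=False,
    authentication_method="password",
    username="admin",
    password="2310Admin1234!@",
    include_query_lineage=False,
    # Equivalent of the recipe's schema_pattern / allow list above.
    schema_pattern=dict(allow=["Samples"]),
)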


@@ -1,6 +1,7 @@
 import json
 import os
 import subprocess
+from typing import Dict

 import boto3
 import pytest
@@ -75,9 +76,10 @@ def create_spaces_and_folders(headers):


 def create_sample_source(headers):
-    url = f"{DREMIO_HOST}/apiv2/source/Samples"
+    url = f"{DREMIO_HOST}/api/v3/catalog"
     payload = {
+        "entityType": "source",
         "config": {
             "externalBucketList": ["samples.dremio.com"],
             "credentialType": "NONE",
@@ -95,14 +97,15 @@ def create_sample_source(headers):
         "type": "S3",
     }

-    response = requests.put(url, headers=headers, data=json.dumps(payload))
+    response = requests.post(url, headers=headers, data=json.dumps(payload))
     assert response.status_code == 200, f"Failed to add dataset: {response.text}"


 def create_s3_source(headers):
-    url = f"{DREMIO_HOST}/apiv2/source/s3"
+    url = f"{DREMIO_HOST}/api/v3/catalog"
     payload = {
+        "entityType": "source",
         "name": "s3",
         "config": {
             "credentialType": "ACCESS_KEY",
@@ -139,24 +142,25 @@ def create_s3_source(headers):
         "metadataPolicy": {
             "deleteUnavailableDatasets": True,
             "autoPromoteDatasets": False,
-            "namesRefreshMillis": 3600000,
-            "datasetDefinitionRefreshAfterMillis": 3600000,
-            "datasetDefinitionExpireAfterMillis": 10800000,
-            "authTTLMillis": 86400000,
-            "updateMode": "PREFETCH_QUERIED",
+            "namesRefreshMs": 3600000,
+            "datasetRefreshAfterMs": 3600000,
+            "datasetExpireAfterMs": 10800000,
+            "authTTLMs": 86400000,
+            "datasetUpdateMode": "PREFETCH_QUERIED",
         },
         "type": "S3",
         "accessControlList": {"userControls": [], "roleControls": []},
     }

-    response = requests.put(url, headers=headers, data=json.dumps(payload))
+    response = requests.post(url, headers=headers, data=json.dumps(payload))
     assert response.status_code == 200, f"Failed to add s3 datasource: {response.text}"


 def create_mysql_source(headers):
-    url = f"{DREMIO_HOST}/apiv2/source/mysql"
+    url = f"{DREMIO_HOST}/api/v3/catalog"
     payload = {
+        "entityType": "source",
         "config": {
             "username": "root",
             "password": "rootpwd123",
@@ -169,7 +173,7 @@ def create_mysql_source(headers):
             "maxIdleConns": 8,
             "idleTimeSec": 60,
         },
-        "name": "mysql-source",
+        "name": "mysql",
         "accelerationRefreshPeriod": 3600000,
         "accelerationGracePeriod": 10800000,
         "accelerationActivePolicyType": "PERIOD",
@@ -177,72 +181,121 @@ def create_mysql_source(headers):
         "accelerationRefreshOnDataChanges": False,
         "metadataPolicy": {
             "deleteUnavailableDatasets": True,
-            "namesRefreshMillis": 3600000,
-            "datasetDefinitionRefreshAfterMillis": 3600000,
-            "datasetDefinitionExpireAfterMillis": 10800000,
-            "authTTLMillis": 86400000,
-            "updateMode": "PREFETCH_QUERIED",
+            "namesRefreshMs": 3600000,
+            "datasetRefreshAfterMs": 3600000,
+            "datasetExpireAfterMs": 10800000,
+            "authTTLMs": 86400000,
+            "datasetUpdateMode": "PREFETCH_QUERIED",
         },
         "type": "MYSQL",
     }

-    response = requests.put(url, headers=headers, data=json.dumps(payload))
+    response = requests.post(url, headers=headers, data=json.dumps(payload))
     assert (
         response.status_code == 200
     ), f"Failed to add mysql datasource: {response.text}"


 def upload_dataset(headers):
-    url = f"{DREMIO_HOST}/apiv2/source/s3/file_format/warehouse/sample.parquet"
-    payload = {"ignoreOtherFileFormats": False, "type": "Parquet"}
-
-    response = requests.put(url, headers=headers, data=json.dumps(payload))
-    assert response.status_code == 200, f"Failed to add dataset: {response.text}"
-
-    url = f"{DREMIO_HOST}/apiv2/source/Samples/file_format/samples.dremio.com/NYC-weather.csv"
+    url = f"{DREMIO_HOST}/api/v3/catalog/dremio%3A%2Fs3%2Fwarehouse"
     payload = {
-        "fieldDelimiter": ",",
-        "quote": '"',
-        "comment": "#",
-        "lineDelimiter": "\r\n",
-        "escape": '"',
-        "extractHeader": False,
-        "trimHeader": True,
-        "skipFirstLine": False,
-        "type": "Text",
+        "entityType": "dataset",
+        "type": "PHYSICAL_DATASET",
+        "path": [
+            "s3",
+            "warehouse",
+        ],
+        "format": {"type": "Parquet"},
     }

-    response = requests.put(url, headers=headers, data=json.dumps(payload))
+    response = requests.post(url, headers=headers, data=json.dumps(payload))
     assert response.status_code == 200, f"Failed to add dataset: {response.text}"

-    url = f"{DREMIO_HOST}/apiv2/source/Samples/file_format/samples.dremio.com/Dremio%20University/oracle-departments.xlsx"
-    payload = {"extractHeader": True, "hasMergedCells": False, "type": "Excel"}
-
-    response = requests.put(url, headers=headers, data=json.dumps(payload))
-    assert response.status_code == 200, f"Failed to add dataset: {response.text}"
-
-    url = f"{DREMIO_HOST}/apiv2/source/Samples/file_format/samples.dremio.com/Dremio%20University/googleplaystore.csv"
+    url = f"{DREMIO_HOST}/api/v3/catalog/dremio%3A%2FSamples%2Fsamples.dremio.com%2FNYC-weather.csv"
     payload = {
-        "fieldDelimiter": ",",
-        "quote": '"',
-        "comment": "#",
-        "lineDelimiter": "\r\n",
-        "escape": '"',
-        "extractHeader": False,
-        "trimHeader": True,
-        "skipFirstLine": False,
-        "type": "Text",
+        "entityType": "dataset",
+        "type": "PHYSICAL_DATASET",
+        "path": [
+            "Samples",
+            "samples.dremio.com",
+            "NYC-weather.csv",
+        ],
+        "format": {
+            "fieldDelimiter": ",",
+            "quote": '"',
+            "comment": "#",
+            "lineDelimiter": "\r\n",
+            "escape": '"',
+            "extractHeader": False,
+            "trimHeader": True,
+            "skipFirstLine": False,
+            "type": "Text",
+        },
     }
-    response = requests.put(url, headers=headers, data=json.dumps(payload))
+    response = requests.post(url, headers=headers, data=json.dumps(payload))
     assert response.status_code == 200, f"Failed to add dataset: {response.text}"

-    url = f"{DREMIO_HOST}/apiv2/source/Samples/file_format/samples.dremio.com/tpcds_sf1000/catalog_page/1ab266d5-18eb-4780-711d-0fa337fa6c00/0_0_0.parquet"
-    payload = {"ignoreOtherFileFormats": False, "type": "Parquet"}
-    response = requests.put(url, headers=headers, data=json.dumps(payload))
+    url = f"{DREMIO_HOST}/api/v3/catalog/dremio%3A%2FSamples%2Fsamples.dremio.com%2FDremio%20University%2Foracle-departments.xlsx"
+
+    payload = {
+        "entityType": "dataset",
+        "type": "PHYSICAL_DATASET",
+        "path": [
+            "Samples",
+            "samples.dremio.com",
+            "Dremio University",
+            "oracle-departments.xlsx",
+        ],
+        "format": {"extractHeader": True, "hasMergedCells": False, "type": "Excel"},
+    }
+    response = requests.post(url, headers=headers, data=json.dumps(payload))
+    assert response.status_code == 200, f"Failed to add dataset: {response.text}"
+
+    url = f"{DREMIO_HOST}/api/v3/catalog/dremio%3A%2FSamples%2Fsamples.dremio.com%2FDremio%20University%2Fgoogleplaystore.csv"
+    payload = {
+        "entityType": "dataset",
+        "type": "PHYSICAL_DATASET",
+        "path": [
+            "Samples",
+            "samples.dremio.com",
+            "Dremio University",
+            "googleplaystore.csv",
+        ],
+        "format": {
+            "fieldDelimiter": ",",
+            "quote": '"',
+            "comment": "#",
+            "lineDelimiter": "\r\n",
+            "escape": '"',
+            "extractHeader": False,
+            "trimHeader": True,
+            "skipFirstLine": False,
+            "type": "Text",
+        },
+    }
+    response = requests.post(url, headers=headers, data=json.dumps(payload))
+    assert response.status_code == 200, f"Failed to add dataset: {response.text}"
+
+    url = f"{DREMIO_HOST}/api/v3/catalog/dremio%3A%2FSamples%2Fsamples.dremio.com%2Ftpcds_sf1000%2Fcatalog_page%2F1ab266d5-18eb-4780-711d-0fa337fa6c00%2F0_0_0.parquet"
+    payload = {
+        "entityType": "dataset",
+        "type": "PHYSICAL_DATASET",
+        "path": [
+            "Samples",
+            "samples.dremio.com",
+            "tpcds_sf1000",
+            "catalog_page",
+            "1ab266d5-18eb-4780-711d-0fa337fa6c00",
+            "0_0_0.parquet",
+        ],
+        "format": {"type": "Parquet"},
+    }
+    response = requests.post(url, headers=headers, data=json.dumps(payload))
     assert response.status_code == 200, f"Failed to add dataset: {response.text}"
@@ -253,7 +306,7 @@ def create_view(headers):
         "entityType": "dataset",
         "type": "VIRTUAL_DATASET",
         "path": ["space", "test_folder", "raw"],
-        "sql": 'SELECT * FROM s3.warehouse."sample.parquet"',
+        "sql": "SELECT * FROM s3.warehouse",
     }
     response = requests.post(url, headers=headers, data=json.dumps(payload))
     assert response.status_code == 200, f"Failed to create view: {response.text}"
@@ -273,7 +326,7 @@ def create_view(headers):
         "entityType": "dataset",
         "type": "VIRTUAL_DATASET",
         "path": ["space", "test_folder", "customers"],
-        "sql": 'SELECT * FROM "mysql".northwind.customers',
+        "sql": "SELECT * FROM mysql.northwind.customers",
         "sqlContext": ["mysql", "northwind"],
     }
     response = requests.post(url, headers=headers, data=json.dumps(payload))
@@ -283,7 +336,7 @@ def create_view(headers):
         "entityType": "dataset",
         "type": "VIRTUAL_DATASET",
         "path": ["space", "test_folder", "orders"],
-        "sql": 'SELECT * FROM "mysql".northwind.orders',
+        "sql": "SELECT * FROM mysql.northwind.orders",
         "sqlContext": ["mysql", "northwind"],
     }
     response = requests.post(url, headers=headers, data=json.dumps(payload))
@@ -293,7 +346,7 @@ def create_view(headers):
         "entityType": "dataset",
         "type": "VIRTUAL_DATASET",
         "path": ["space", "test_folder", "metadata_aspect"],
-        "sql": 'SELECT * FROM "mysql".metagalaxy."metadata_aspect"',
+        "sql": "SELECT * FROM mysql.metagalaxy.metadata_aspect",
         "sqlContext": ["mysql", "metagalaxy"],
     }
     response = requests.post(url, headers=headers, data=json.dumps(payload))
@@ -303,7 +356,7 @@ def create_view(headers):
         "entityType": "dataset",
         "type": "VIRTUAL_DATASET",
         "path": ["space", "test_folder", "metadata_index"],
-        "sql": 'SELECT * FROM "mysql".metagalaxy."metadata_index"',
+        "sql": "SELECT * FROM mysql.metagalaxy.metadata_index",
         "sqlContext": ["mysql", "metagalaxy"],
     }
     response = requests.post(url, headers=headers, data=json.dumps(payload))
@@ -313,7 +366,7 @@ def create_view(headers):
         "entityType": "dataset",
         "type": "VIRTUAL_DATASET",
         "path": ["space", "test_folder", "metadata_index_view"],
-        "sql": 'SELECT * FROM "mysql".metagalaxy."metadata_index_view"',
+        "sql": "SELECT * FROM mysql.metagalaxy.metadata_index_view",
         "sqlContext": ["mysql", "metagalaxy"],
     }
     response = requests.post(url, headers=headers, data=json.dumps(payload))
@@ -422,14 +475,119 @@ def test_dremio_ingest(
     pytestconfig,
     tmp_path,
 ):
-    # Run the metadata ingestion pipeline.
+    # Run the metadata ingestion pipeline with specific output file
     config_file = (test_resources_dir / "dremio_to_file.yml").resolve()
+    output_path = tmp_path / "dremio_mces.json"
     run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)

-    # Verify the output.
+    # Verify the output
     mce_helpers.check_golden_file(
         pytestconfig,
-        output_path=tmp_path / "dremio_mces.json",
+        output_path=output_path,
         golden_path=test_resources_dir / "dremio_mces_golden.json",
         ignore_paths=[],
     )
+
+
+@freeze_time(FROZEN_TIME)
+@pytest.mark.integration
+def test_dremio_platform_instance_urns(
+    test_resources_dir,
+    dremio_setup,
+    pytestconfig,
+    tmp_path,
+):
+    config_file = (
+        test_resources_dir / "dremio_platform_instance_to_file.yml"
+    ).resolve()
+    output_path = tmp_path / "dremio_mces.json"
+
+    run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)
+
+    with output_path.open() as f:
+        content = f.read()
+        # Skip if file is empty or just contains brackets
+        if not content or content.strip() in ("[]", "[", "]"):
+            pytest.fail(f"Output file is empty or invalid: {content}")
+
+        try:
+            # Try to load as JSON Lines first
+            mces = []
+            for line in content.splitlines():
+                line = line.strip()
+                if line and line not in ("[", "]"):  # Skip empty lines and bare brackets
+                    mce = json.loads(line)
+                    mces.append(mce)
+        except json.JSONDecodeError:
+            # If that fails, try loading as a single JSON array
+            try:
+                mces = json.loads(content)
+            except json.JSONDecodeError as e:
+                print(f"Failed to parse file content: {content}")
+                raise e
+
+    # Verify MCEs
+    assert len(mces) > 0, "No MCEs found in output file"
+
+    # Verify the platform instances
+    for mce in mces:
+        if "entityType" not in mce:
+            continue
+
+        # Check dataset URN structure
+        if mce["entityType"] == "dataset" and "entityUrn" in mce:
+            assert (
+                "test-platform.dremio" in mce["entityUrn"]
+            ), f"Platform instance missing in dataset URN: {mce['entityUrn']}"
+
+        # Check aspects for both datasets and containers
+        if "aspectName" in mce:
+            # Check dataPlatformInstance aspect
+            if mce["aspectName"] == "dataPlatformInstance":
+                aspect = mce["aspect"]
+                if not isinstance(aspect, Dict) or "json" not in aspect:
+                    continue
+
+                aspect_json = aspect["json"]
+                if not isinstance(aspect_json, Dict):
+                    continue
+
+                if "instance" not in aspect_json:
+                    continue
+
+                instance = aspect_json["instance"]
+                expected_instance = "urn:li:dataPlatformInstance:(urn:li:dataPlatform:dremio,test-platform)"
+                assert (
+                    instance == expected_instance
+                ), f"Invalid platform instance format: {instance}"
+
+    # Verify against golden file
+    mce_helpers.check_golden_file(
+        pytestconfig,
+        output_path=output_path,
+        golden_path=test_resources_dir / "dremio_platform_instance_mces_golden.json",
+        ignore_paths=[],
+    )
+
+
+@freeze_time(FROZEN_TIME)
+@pytest.mark.integration
+def test_dremio_schema_filter(
+    test_resources_dir,
+    dremio_setup,
+    pytestconfig,
+    tmp_path,
+):
+    config_file = (test_resources_dir / "dremio_schema_filter_to_file.yml").resolve()
+    output_path = tmp_path / "dremio_mces.json"
+
+    run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)
+
+    # Verify against golden file
+    mce_helpers.check_golden_file(
+        pytestconfig,
+        output_path=output_path,
+        golden_path=test_resources_dir / "dremio_schema_filter_mces_golden.json",
+        ignore_paths=[],
+    )


@@ -0,0 +1,123 @@
from unittest.mock import Mock

import pytest

from datahub.ingestion.source.dremio.dremio_api import DremioAPIOperations
from datahub.ingestion.source.dremio.dremio_config import DremioSourceConfig
from datahub.ingestion.source.dremio.dremio_reporting import DremioSourceReport


class TestDremioContainerFiltering:
    @pytest.fixture
    def dremio_api(self, monkeypatch):
        # Mock the requests.Session
        mock_session = Mock()
        monkeypatch.setattr("requests.Session", Mock(return_value=mock_session))

        # Mock the authentication response
        mock_session.post.return_value.json.return_value = {"token": "dummy-token"}
        mock_session.post.return_value.status_code = 200

        config = DremioSourceConfig(
            hostname="dummy-host",
            port=9047,
            tls=False,
            authentication_method="password",
            username="dummy-user",
            password="dummy-password",
            schema_pattern=dict(allow=[".*"], deny=[]),
        )
        report = DremioSourceReport()
        return DremioAPIOperations(config, report)

    def test_basic_allow_pattern(self, dremio_api):
        """Test basic allow pattern matching"""
        dremio_api.allow_schema_pattern = ["test"]
        dremio_api.deny_schema_pattern = []

        assert dremio_api.should_include_container([], "test")
        assert dremio_api.should_include_container(["test"], "subfolder")
        assert not dremio_api.should_include_container([], "prod_space")

    def test_basic_deny_pattern(self, dremio_api):
        """Test basic deny pattern matching"""
        dremio_api.allow_schema_pattern = [".*"]
        dremio_api.deny_schema_pattern = ["test_space.*"]

        assert not dremio_api.should_include_container([], "test_space")
        assert not dremio_api.should_include_container(["test_space"], "subfolder")
        assert dremio_api.should_include_container([], "prod_space")

    def test_hierarchical_matching(self, dremio_api):
        """Test matching with hierarchical paths"""
        dremio_api.allow_schema_pattern = ["prod.data.*"]
        dremio_api.deny_schema_pattern = []

        assert dremio_api.should_include_container([], "prod")
        assert dremio_api.should_include_container(["prod"], "data")
        assert dremio_api.should_include_container(["prod", "data"], "sales")
        assert not dremio_api.should_include_container([], "dev")
        assert not dremio_api.should_include_container(["dev"], "data")

    def test_allow_and_deny_patterns(self, dremio_api):
        """Test combination of allow and deny patterns"""
        dremio_api.allow_schema_pattern = ["prod.*"]
        dremio_api.deny_schema_pattern = ["prod.internal.*"]

        assert dremio_api.should_include_container([], "prod")
        assert dremio_api.should_include_container(["prod"], "public")
        assert dremio_api.should_include_container(["prod", "public"], "next")
        assert not dremio_api.should_include_container(["prod"], "internal")
        assert not dremio_api.should_include_container(["prod", "internal"], "secrets")

    def test_wildcard_patterns(self, dremio_api):
        """Test wildcard pattern handling"""
        dremio_api.allow_schema_pattern = [".*"]
        dremio_api.deny_schema_pattern = []

        assert dremio_api.should_include_container([], "any_space")
        assert dremio_api.should_include_container(["any_space"], "any_folder")

        # Test with specific wildcard in middle
        dremio_api.allow_schema_pattern = ["prod.*.public"]
        assert dremio_api.should_include_container(["prod", "customer"], "public")
        assert not dremio_api.should_include_container(["prod", "customer"], "private")

    def test_case_insensitive_matching(self, dremio_api):
        """Test case-insensitive pattern matching"""
        dremio_api.allow_schema_pattern = ["PROD.*"]
        dremio_api.deny_schema_pattern = []

        assert dremio_api.should_include_container([], "prod")
        assert dremio_api.should_include_container([], "PROD")
        assert dremio_api.should_include_container(["prod"], "DATA")
        assert dremio_api.should_include_container(["PROD"], "data")

    def test_empty_patterns(self, dremio_api):
        """Test behavior with empty patterns"""
        dremio_api.allow_schema_pattern = [".*"]
        dremio_api.deny_schema_pattern = []

        # Should allow everything when allow pattern is empty
        assert dremio_api.should_include_container([], "any_space")
        assert dremio_api.should_include_container(["any_space"], "any_folder")

    def test_partial_path_matching(self, dremio_api):
        """Test matching behavior with partial paths"""
        dremio_api.allow_schema_pattern = ["^pr.*.data.*"]
        dremio_api.deny_schema_pattern = []

        assert dremio_api.should_include_container(["prod"], "data")
        # Should match the partial path even though pattern doesn't have wildcards
        assert dremio_api.should_include_container(["prod", "data"], "sales")
        assert not dremio_api.should_include_container([], "dev")
        assert not dremio_api.should_include_container(["dev", "data"], "sales")

    def test_partial_start_end_chars(self, dremio_api):
        """Test matching behavior with partial paths"""
        dremio_api.allow_schema_pattern = ["pr.*.data$"]
        dremio_api.deny_schema_pattern = []

        assert dremio_api.should_include_container(["prod"], "data")
        # Should match the partial path even though pattern doesn't have wildcards
        assert not dremio_api.should_include_container(["prod", "data"], "sales")