Merge branch 'main' into drive-missing-unit-tests

This commit is contained in:
Aniket Katkar 2025-09-26 19:40:03 +05:30 committed by GitHub
commit 29b738d265
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 261 additions and 61 deletions

View File

@ -10,6 +10,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
RED='\033[0;31m'
cd "$(dirname "${BASH_SOURCE[0]}")" || exit
helpFunction()
@ -116,6 +118,13 @@ until curl -s -f "http://localhost:9200/_cat/indices/openmetadata_team_search_in
done
# Poll Airflow until the `sample_data` DAG is served by the REST API.
# On every failed attempt, ask Airflow for its DAG import errors; if one of
# them belongs to airflow_sample_data.py the DAG will never become reachable,
# so print the matching error entries and abort instead of looping forever.
until curl -s -f --header 'Authorization: Basic YWRtaW46YWRtaW4=' "http://localhost:8080/api/v1/dags/sample_data"; do
    IMPORT_ERRORS="$(curl -s --header 'Authorization: Basic YWRtaW46YWRtaW4=' "http://localhost:8080/api/v1/importErrors")"
    # Here-string + quoted variable: no word splitting/globbing of the JSON
    # payload and no pipeline whose exit status must be inspected through $?.
    # -F matches the file name literally ('.' is not a regex wildcard), -q
    # suppresses output.
    if grep -qF "/airflow_sample_data.py" <<< "$IMPORT_ERRORS"; then
        # Reset the colour after the message so later terminal output is not red.
        echo -e "${RED}Airflow found an error importing \`sample_data\` DAG\033[0m"
        printf '%s' "$IMPORT_ERRORS" | jq '.["import_errors"] | .[] | select(.filename | endswith("airflow_sample_data.py"))'
        exit 1
    fi
    # printf interprets the escapes; plain `echo` in bash printed a literal "\n".
    printf 'Checking if Sample Data DAG is reachable...\n\n'
    sleep 5
done

View File

@ -72,7 +72,9 @@ class DatabricksClient:
"Content-Type": "application/json",
}
self.api_timeout = self.config.connectionTimeout or 120
self._job_table_lineage_executed: bool = False
self.job_table_lineage: dict[str, list[dict[str, str]]] = {}
self._job_column_lineage_executed: bool = False
self.job_column_lineage: dict[
str, dict[Tuple[str, str], list[Tuple[str, str]]]
] = {}
@ -193,7 +195,6 @@ class DatabricksClient:
"""
Method returns List all the created jobs in a Databricks Workspace
"""
self.cache_lineage()
try:
iteration_count = 1
data = {"limit": PAGE_SIZE, "expand_tasks": True, "offset": 0}
@ -266,6 +267,10 @@ class DatabricksClient:
Method returns table lineage for a job by the specified job_id
"""
try:
if not self._job_table_lineage_executed:
logger.debug("Executing cache_lineage...")
self.cache_lineage()
return self.job_table_lineage.get(str(job_id))
except Exception as exc:
logger.debug(
@ -281,6 +286,10 @@ class DatabricksClient:
Method returns column lineage for a job by the specified job_id and table key
"""
try:
if not self._job_column_lineage_executed:
logger.debug("Job column lineage not found. Executing cache_lineage...")
self.cache_lineage()
return self.job_column_lineage.get(str(job_id), {}).get(TableKey)
except Exception as exc:
logger.debug(
@ -325,6 +334,7 @@ class DatabricksClient:
f"Error parsing row: {row} due to {traceback.format_exc()}"
)
continue
self._job_table_lineage_executed = True
# Not every job has column lineage, so we need to check if the job exists in the column_lineage table
# we will cache the column lineage for jobs that have column lineage
@ -355,3 +365,5 @@ class DatabricksClient:
f"Error parsing row: {row} due to {traceback.format_exc()}"
)
continue
self._job_column_lineage_executed = True
logger.debug("Table and column lineage caching completed.")

View File

@ -96,6 +96,7 @@ DATABRICKS_GET_TABLE_LINEAGE_FOR_JOB = textwrap.dedent(
target_table_full_name as target_table_full_name
FROM system.access.table_lineage
WHERE entity_type ILIKE 'job'
AND event_time >= current_date() - INTERVAL 90 DAYS
"""
)
@ -110,5 +111,6 @@ DATABRICKS_GET_COLUMN_LINEAGE_FOR_JOB = textwrap.dedent(
target_column_name as target_column_name
FROM system.access.column_lineage
WHERE entity_type ILIKE 'job'
AND event_time >= current_date() - INTERVAL 90 DAYS
"""
)

View File

@ -130,9 +130,11 @@ class DatabrickspipelineSource(PipelineServiceSource):
displayName=pipeline_details.settings.name,
description=Markdown(description) if description else None,
tasks=self.get_tasks(pipeline_details),
scheduleInterval=str(pipeline_details.settings.schedule.cron)
if pipeline_details.settings.schedule
else None,
scheduleInterval=(
str(pipeline_details.settings.schedule.cron)
if pipeline_details.settings.schedule
else None
),
service=FullyQualifiedEntityName(self.context.get().pipeline_service),
)
yield Either(right=pipeline_request)
@ -176,12 +178,14 @@ class DatabrickspipelineSource(PipelineServiceSource):
Task(
name=str(task.name),
taskType=pipeline_details.settings.task_type,
sourceUrl=SourceUrl(run.run_page_url)
if run.run_page_url
else None,
description=Markdown(task.description)
if task.description
else None,
sourceUrl=(
SourceUrl(run.run_page_url)
if run.run_page_url
else None
),
description=(
Markdown(task.description) if task.description else None
),
downstreamTasks=[
depend_task.name
for depend_task in task.depends_on or []
@ -314,6 +318,9 @@ class DatabrickspipelineSource(PipelineServiceSource):
table_lineage_list = self.client.get_table_lineage(
job_id=pipeline_details.job_id
)
logger.debug(
f"Processing pipeline lineage for job {pipeline_details.job_id}"
)
if table_lineage_list:
for table_lineage in table_lineage_list:
source_table_full_name = table_lineage.get("source_table_full_name")

View File

@ -11,33 +11,58 @@
* limitations under the License.
*/
import test, { expect } from '@playwright/test';
import { SidebarItem } from '../../constant/sidebar';
import { Domain } from '../../support/domain/Domain';
import { TableClass } from '../../support/entity/TableClass';
import { UserClass } from '../../support/user/UserClass';
import { createNewPage, redirectToHomePage } from '../../utils/common';
import {
getEncodedFqn,
waitForAllLoadersToDisappear,
} from '../../utils/entity';
import { getJsonTreeObject } from '../../utils/exploreDiscovery';
import { sidebarClick } from '../../utils/sidebar';
// use the admin user to login
test.use({ storageState: 'playwright/.auth/admin.json' });
const table = new TableClass();
const table1 = new TableClass();
const user = new UserClass();
const domain = new Domain();
test.describe('Explore Assets Discovery', () => {
// Suite-wide fixture setup: creates a user, a domain and two tables through
// the API, attaches the user (as owner) and the domain to `table`, and then
// deletes `table` with the second argument set to `false`.
test.beforeAll(async ({ browser }) => {
const { apiContext, afterAction } = await createNewPage(browser);
await user.create(apiContext);
await domain.create(apiContext);
await table.create(apiContext);
await table1.create(apiContext);
// JSON-Patch `table` so the created user is its first owner and the created
// domain is its first domain; the tests below search for these two names in
// the Owners / Domains quick filters.
await table.patch({
apiContext,
patchData: [
{
op: 'add',
value: {
type: 'user',
id: user.responseData.id,
},
path: '/owners/0',
},
{
op: 'add',
path: '/domains/0',
value: {
id: domain.responseData.id,
type: 'domain',
name: domain.responseData.name,
displayName: domain.responseData.displayName,
},
},
],
});
// NOTE(review): the `false` argument presumably requests a soft delete so
// that `table` stays indexed as a deleted asset — confirm against
// TableClass.delete.
await table.delete(apiContext, false);
// NOTE(review): the call below is commented out in the source; whether
// `table1` is meant to stay undeleted cannot be determined from here.
// await table1.delete(apiContext, false);
await afterAction();
});
// Suite-wide teardown: removes every entity that `beforeAll` created.
// Previously only the two tables were deleted, so the owner user and the
// domain leaked into the shared instance after each run of this spec.
test.afterAll(async ({ browser }) => {
  const { apiContext, afterAction } = await createNewPage(browser);

  await table.delete(apiContext);
  await table1.delete(apiContext);
  // Tables go first so the user/domain are no longer referenced when removed.
  await user.delete(apiContext);
  await domain.delete(apiContext);

  await afterAction();
});
@ -229,4 +254,144 @@ test.describe('Explore Assets Discovery', () => {
page.locator('.ant-popover-inner-content').textContent()
).not.toContain(table1.entityResponseData.name);
});
// Negative counterpart of the "showDeleted is on" test: with the deleted
// toggle left off, the owner and the domain that are attached only to the
// deleted table must not be offered by the Owners / Domains quick filters.
//
// The search text and the option test id use the lower-cased display name,
// exactly as the positive test does. Looking the option up with the original
// casing would let `not.toBeAttached()` pass even if the option were wrongly
// rendered, because the positive test addresses it by the lower-cased name.
test('Should not display domain and owner of deleted asset in suggestions when showDeleted is off', async ({
  page,
}) => {
  await sidebarClick(page, SidebarItem.EXPLORE);
  await page.waitForLoadState('networkidle');
  await waitForAllLoadersToDisappear(page);

  // The user should not be visible in the owners filter when the deleted switch is off
  const ownerSearchText = user.responseData.displayName.toLowerCase();
  await page.click('[data-testid="search-dropdown-Owners"]');
  // Register the listener before typing so the aggregate request is not missed.
  const searchResOwner = page.waitForResponse(
    `/api/v1/search/aggregate?index=dataAsset&field=owners.displayName.keyword*deleted=false*`
  );
  await page.fill('[data-testid="search-input"]', ownerSearchText);
  await searchResOwner;
  await waitForAllLoadersToDisappear(page);

  await expect(
    page.getByTestId('drop-down-menu').getByTestId(ownerSearchText)
  ).not.toBeAttached();

  await page.getByTestId('close-btn').click();

  // The domain should not be visible in the domains filter when the deleted switch is off
  const domainSearchText = domain.responseData.displayName.toLowerCase();
  await page.click('[data-testid="search-dropdown-Domains"]');
  const searchResDomain = page.waitForResponse(
    `/api/v1/search/aggregate?index=dataAsset&field=domains.displayName.keyword*deleted=false*`
  );
  await page.fill('[data-testid="search-input"]', domainSearchText);
  await searchResDomain;
  await waitForAllLoadersToDisappear(page);

  await expect(
    page.getByTestId('drop-down-menu').getByTestId(domainSearchText)
  ).not.toBeAttached();

  await page.getByTestId('close-btn').click();
});
// With the "show deleted" toggle switched on, the owner and the domain of the
// deleted table must be offered by the Owners / Domains quick filters, can be
// applied as filters, and the Data Assets filter must then offer `table`.
test('Should display domain and owner of deleted asset in suggestions when showDeleted is on', async ({
page,
}) => {
await sidebarClick(page, SidebarItem.EXPLORE);
await page.waitForLoadState('networkidle');
await waitForAllLoadersToDisappear(page);
// Click on the show deleted toggle button
await page.getByTestId('show-deleted').click();
await page.waitForLoadState('networkidle');
await waitForAllLoadersToDisappear(page);
// The user should be visible in the owners filter when the deleted switch is on
// The lower-cased display name is used both as search text and as option test id.
const ownerSearchText = user.responseData.displayName.toLowerCase();
await page.click('[data-testid="search-dropdown-Owners"]');
// The response listener is registered before typing and awaited afterwards,
// so the aggregate request triggered by the input cannot be missed.
const searchResOwner = page.waitForResponse(
`/api/v1/search/aggregate?index=dataAsset&field=owners.displayName.keyword*deleted=true*`
);
await page.fill('[data-testid="search-input"]', ownerSearchText);
await searchResOwner;
await waitForAllLoadersToDisappear(page);
await expect(
page.getByTestId('drop-down-menu').getByTestId(ownerSearchText)
).toBeAttached();
await page
.getByTestId('drop-down-menu')
.getByTestId(ownerSearchText)
.click();
// Same pattern for applying the filter: listen first, then click "update".
const fetchWithOwner = page.waitForResponse(
`/api/v1/search/query?*deleted=true*owners.displayName.keyword*${ownerSearchText}*`
);
await page.getByTestId('update-btn').click();
await fetchWithOwner;
await page.waitForLoadState('networkidle');
await waitForAllLoadersToDisappear(page);
// The domain should be visible in the domains filter when the deleted switch is on
const domainSearchText = domain.responseData.displayName.toLowerCase();
await page.click('[data-testid="search-dropdown-Domains"]');
const searchResDomain = page.waitForResponse(
`/api/v1/search/aggregate?index=dataAsset&field=domains.displayName.keyword*deleted=true*`
);
await page.fill('[data-testid="search-input"]', domainSearchText);
await searchResDomain;
await waitForAllLoadersToDisappear(page);
await expect(
page.getByTestId('drop-down-menu').getByTestId(domainSearchText)
).toBeAttached();
await page
.getByTestId('drop-down-menu')
.getByTestId(domainSearchText)
.click();
// NOTE(review): the domain name is passed through getEncodedFqn(…, true) for
// the URL match while the owner name above is interpolated as-is — presumably
// because of characters in generated domain names; confirm.
const fetchWithDomain = page.waitForResponse(
`/api/v1/search/query?*deleted=true*domains.displayName.keyword*${getEncodedFqn(
domainSearchText,
true
)}*`
);
await page.getByTestId('update-btn').click();
await fetchWithDomain;
await page.waitForLoadState('networkidle');
await waitForAllLoadersToDisappear(page);
// Only the table option should be visible for the data assets filter when the deleted switch is on
// with the owner and domain filter applied
await page.click('[data-testid="search-dropdown-Data Assets"]');
await expect(
page.getByTestId('drop-down-menu').getByTestId('table')
).toBeAttached();
});
});

View File

@ -94,7 +94,8 @@ const ExploreQuickFilters: FC<ExploreQuickFiltersProps> = ({
key,
'',
JSON.stringify(combinedQueryFilter),
independent
independent,
showDeleted
),
key === TIER_FQN_KEY
? getTags({ parent: 'Tier', limit: 50 })
@ -159,7 +160,8 @@ const ExploreQuickFilters: FC<ExploreQuickFiltersProps> = ({
key,
value,
JSON.stringify(combinedQueryFilter),
independent
independent,
showDeleted
);
const buckets = res.data.aggregations[`sterms#${key}`].buckets;

View File

@ -168,7 +168,8 @@ export const getAggregateFieldOptions = (
field: string,
value: string,
q: string,
sourceFields?: string
sourceFields?: string,
deleted = false
) => {
const withWildCardValue = value
? `.*${escapeESReservedCharacters(value)}.*`
@ -179,6 +180,7 @@ export const getAggregateFieldOptions = (
value: withWildCardValue,
q,
sourceFields,
deleted,
};
return APIClient.get<SearchResponse<ExploreSearchIndex>>(

View File

@ -200,41 +200,6 @@ export const extractTermKeys = (objects: QueryFieldInterface[]): string[] => {
return termKeys;
};
/**
 * Builds the aggregation bucket name and the query filter for one level of
 * the explore tree hierarchy.
 *
 * @param isDatabaseHierarchy - when true, the bucket mapping additionally
 * walks service -> database -> database schema before reaching entity type.
 * @param filterField - optional quick-filter fields; when defined, the
 * filter's `must` clause is built by `getExploreQueryFilterMust` and
 * `key`/`value` are not used for it.
 * @param key - field of the current level; used as the term field when
 * `filterField` is undefined, and to look up the next bucket.
 * @param value - value matched by the term clause.
 * @returns `{ bucket, queryFilter }`; `bucket` falls back to
 * `EntityFields.SERVICE_TYPE` when `key` has no entry in the mapping.
 */
export const getSubLevelHierarchyKey = (
isDatabaseHierarchy = false,
filterField?: ExploreQuickFilterField[],
key?: EntityFields,
value?: string
) => {
const queryFilter = {
query: { bool: {} },
};
// `must` is only set when there is something to filter on: either a
// key/value pair or a list of quick-filter fields.
if ((key && value) || filterField) {
(queryFilter.query.bool as EsBoolQuery).must = isUndefined(filterField)
? { term: { [key ?? '']: value } }
: getExploreQueryFilterMust(filterField);
}
// Maps the field of the current level to the field to aggregate on next.
const bucketMapping = isDatabaseHierarchy
? {
[EntityFields.SERVICE_TYPE]: EntityFields.SERVICE,
[EntityFields.SERVICE]: EntityFields.DATABASE_DISPLAY_NAME,
[EntityFields.DATABASE_DISPLAY_NAME]:
EntityFields.DATABASE_SCHEMA_DISPLAY_NAME,
[EntityFields.DATABASE_SCHEMA_DISPLAY_NAME]: EntityFields.ENTITY_TYPE,
}
: {
[EntityFields.SERVICE_TYPE]: EntityFields.SERVICE,
[EntityFields.SERVICE]: EntityFields.ENTITY_TYPE,
};
return {
bucket: bucketMapping[key as DatabaseFields] ?? EntityFields.SERVICE_TYPE,
queryFilter,
};
};
export const getExploreQueryFilterMust = (data: ExploreQuickFilterField[]) => {
const must = [] as Array<QueryFieldInterface>;
@ -281,6 +246,41 @@ export const getExploreQueryFilterMust = (data: ExploreQuickFilterField[]) => {
return must;
};
/**
 * Resolves what to aggregate on next, and how to filter, for one level of the
 * explore tree.
 *
 * The query filter starts as an empty `bool` query. A `must` clause is added
 * only when a key/value pair or a `filterField` list is supplied: with
 * `filterField` undefined it is a single `term` on `key`, otherwise it is
 * whatever `getExploreQueryFilterMust(filterField)` produces.
 *
 * @param isDatabaseHierarchy - selects the longer service -> database ->
 * schema -> entity-type chain instead of service -> entity-type.
 * @param filterField - optional quick-filter fields used to build `must`.
 * @param key - field of the current level.
 * @param value - value of the current level.
 * @returns the next bucket (defaults to `EntityFields.SERVICE_TYPE` when
 * `key` is not part of the chain) together with the query filter.
 */
export const getSubLevelHierarchyKey = (
  isDatabaseHierarchy = false,
  filterField?: ExploreQuickFilterField[],
  key?: EntityFields,
  value?: string
) => {
  const queryFilter = {
    query: { bool: {} },
  };

  const hasTermPair = Boolean(key && value);
  if (hasTermPair || filterField) {
    const boolClause = queryFilter.query.bool as EsBoolQuery;
    if (isUndefined(filterField)) {
      boolClause.must = { term: { [key ?? '']: value } };
    } else {
      boolClause.must = getExploreQueryFilterMust(filterField);
    }
  }

  // Current-level field -> field to bucket on at the next level.
  const bucketMapping = isDatabaseHierarchy
    ? {
        [EntityFields.SERVICE_TYPE]: EntityFields.SERVICE,
        [EntityFields.SERVICE]: EntityFields.DATABASE_DISPLAY_NAME,
        [EntityFields.DATABASE_DISPLAY_NAME]:
          EntityFields.DATABASE_SCHEMA_DISPLAY_NAME,
        [EntityFields.DATABASE_SCHEMA_DISPLAY_NAME]: EntityFields.ENTITY_TYPE,
      }
    : {
        [EntityFields.SERVICE_TYPE]: EntityFields.SERVICE,
        [EntityFields.SERVICE]: EntityFields.ENTITY_TYPE,
      };

  const bucket =
    bucketMapping[key as DatabaseFields] ?? EntityFields.SERVICE_TYPE;

  return { bucket, queryFilter };
};
export const updateTreeData = (
list: ExploreTreeNode[],
key: React.Key,
@ -360,11 +360,12 @@ export const getAggregationOptions = async (
key: string,
value: string,
filter: string,
isIndependent: boolean
isIndependent: boolean,
deleted = false
) => {
return isIndependent
? postAggregateFieldOptions(index, key, value, filter)
: getAggregateFieldOptions(index, key, value, filter);
: getAggregateFieldOptions(index, key, value, filter, undefined, deleted);
};
export const updateTreeDataWithCounts = (
@ -412,7 +413,7 @@ export const isElasticsearchError = (error: unknown): boolean => {
return false;
}
const data = axiosError.response.data as Record<string, any>;
const data = axiosError.response.data as Record<string, unknown>;
const message = data.message as string;
return (