mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-27 23:48:19 +00:00
* Fix #6744: allow more than one metadata ingestion workflow * Rename to markDeletedTablesFromFilterOnly * ui support for new field markDeletedTablesFromFilterOnly Co-authored-by: Chirag Madlani <12962843+chirag-madlani@users.noreply.github.com>
This commit is contained in:
parent
322f6445e1
commit
404cc67911
@ -72,6 +72,7 @@
|
||||
},
|
||||
"oracleConnectionType": {
|
||||
"title": "Oracle Connection Type",
|
||||
"type": "object",
|
||||
"description": "Connect with oracle by either passing service name or database schema name.",
|
||||
"oneOf": [
|
||||
{
|
||||
|
||||
@ -126,6 +126,11 @@
|
||||
"type": "boolean",
|
||||
"default": true
|
||||
},
|
||||
"markDeletedTablesFromFilterOnly": {
|
||||
"description": "Optional configuration to mark deleted tables only to the filtered schema",
|
||||
"type": "boolean",
|
||||
"default": false
|
||||
},
|
||||
"includeTables": {
|
||||
"description": "Optional configuration to turn off fetching metadata for tables.",
|
||||
"type": "boolean",
|
||||
|
||||
@ -137,23 +137,24 @@ class CommonDbSourceService(
|
||||
),
|
||||
)
|
||||
|
||||
def get_raw_database_schema_names(self) -> Iterable[str]:
|
||||
if self.service_connection.__dict__.get("databaseSchema"):
|
||||
yield self.service_connection.databaseSchema
|
||||
else:
|
||||
for schema_name in self.inspector.get_schema_names():
|
||||
yield schema_name
|
||||
|
||||
def get_database_schema_names(self) -> Iterable[str]:
|
||||
"""
|
||||
return schema names
|
||||
"""
|
||||
if self.service_connection.__dict__.get("databaseSchema"):
|
||||
yield self.service_connection.databaseSchema
|
||||
|
||||
else:
|
||||
for schema_name in self.inspector.get_schema_names():
|
||||
|
||||
if filter_by_schema(
|
||||
self.source_config.schemaFilterPattern, schema_name=schema_name
|
||||
):
|
||||
self.status.filter(schema_name, "Schema pattern not allowed")
|
||||
continue
|
||||
|
||||
yield schema_name
|
||||
for schema_name in self.get_raw_database_schema_names():
|
||||
if filter_by_schema(
|
||||
self.source_config.schemaFilterPattern, schema_name=schema_name
|
||||
):
|
||||
self.status.filter(schema_name, "Schema pattern not allowed")
|
||||
continue
|
||||
yield schema_name
|
||||
|
||||
def yield_database_schema(
|
||||
self, schema_name: str
|
||||
|
||||
@ -337,6 +337,12 @@ class DatabaseServiceSource(DBTMixin, TopologyRunnerMixin, Source, ABC):
|
||||
"""
|
||||
return
|
||||
|
||||
def get_raw_database_schema_names(self) -> Iterable[str]:
|
||||
"""
|
||||
fetch all schema names without any filtering.
|
||||
"""
|
||||
yield from self.get_database_schema_names()
|
||||
|
||||
def yield_datamodel(
|
||||
self, table_name_and_type: Tuple[str, TableType]
|
||||
) -> Iterable[DataModelLink]:
|
||||
@ -440,12 +446,12 @@ class DatabaseServiceSource(DBTMixin, TopologyRunnerMixin, Source, ABC):
|
||||
self.database_source_state.add(table_fqn)
|
||||
self.status.scanned(table_fqn)
|
||||
|
||||
def delete_database_tables(self, database_fqn: str) -> Iterable[DeleteTable]:
|
||||
def delete_schema_tables(self, schema_fqn: str) -> Iterable[DeleteTable]:
|
||||
"""
|
||||
Returns Deleted tables
|
||||
"""
|
||||
database_state = self.metadata.list_all_entities(
|
||||
entity=Table, params={"database": database_fqn}
|
||||
entity=Table, params={"database": schema_fqn}
|
||||
)
|
||||
for table in database_state:
|
||||
if str(table.fullyQualifiedName.__root__) not in self.database_source_state:
|
||||
@ -459,10 +465,18 @@ class DatabaseServiceSource(DBTMixin, TopologyRunnerMixin, Source, ABC):
|
||||
logger.info(
|
||||
f"Mark Deleted Tables set to True. Processing database [{self.context.database.name.__root__}]"
|
||||
)
|
||||
databse_fqn = fqn.build(
|
||||
self.metadata,
|
||||
entity_type=Database,
|
||||
service_name=self.config.serviceName,
|
||||
database_name=self.context.database.name.__root__,
|
||||
schema_names_list = (
|
||||
self.get_database_schema_names()
|
||||
if self.source_config.markDeletedTablesFromFilterOnly
|
||||
else self.get_raw_database_schema_names()
|
||||
)
|
||||
yield from self.delete_database_tables(databse_fqn)
|
||||
for schema_name in schema_names_list:
|
||||
schema_fqn = fqn.build(
|
||||
self.metadata,
|
||||
entity_type=DatabaseSchema,
|
||||
service_name=self.config.serviceName,
|
||||
database_name=self.context.database.name.__root__,
|
||||
schema_name=schema_name,
|
||||
)
|
||||
|
||||
yield from self.delete_schema_tables(schema_fqn)
|
||||
|
||||
@ -154,6 +154,15 @@ const AddIngestion = ({
|
||||
)
|
||||
: undefined
|
||||
);
|
||||
const [markDeletedTablesFromFilterOnly, setMarkDeletedTablesFromFilterOnly] =
|
||||
useState(
|
||||
isDatabaseService
|
||||
? Boolean(
|
||||
(data?.sourceConfig.config as ConfigClass)
|
||||
?.markDeletedTablesFromFilterOnly ?? false
|
||||
)
|
||||
: undefined
|
||||
);
|
||||
const [includeView, setIncludeView] = useState(
|
||||
Boolean((data?.sourceConfig.config as ConfigClass)?.includeViews)
|
||||
);
|
||||
@ -400,6 +409,7 @@ const AddIngestion = ({
|
||||
showTableFilter
|
||||
),
|
||||
markDeletedTables,
|
||||
markDeletedTablesFromFilterOnly,
|
||||
...DatabaseConfigData,
|
||||
type: ConfigType.DatabaseMetadata,
|
||||
};
|
||||
@ -636,6 +646,9 @@ const AddIngestion = ({
|
||||
handleIngestSampleData={() => setIngestSampleData((pre) => !pre)}
|
||||
handleIngestionName={(val) => setIngestionName(val)}
|
||||
handleMarkDeletedTables={() => setMarkDeletedTables((pre) => !pre)}
|
||||
handleMarkDeletedTablesFromFilterOnly={() =>
|
||||
setMarkDeletedTablesFromFilterOnly((pre) => !pre)
|
||||
}
|
||||
handleProfileSample={(val) => setProfileSample(val)}
|
||||
handleQueryLogDuration={(val) => setQueryLogDuration(val)}
|
||||
handleResultLimit={setResultLimit}
|
||||
@ -648,6 +661,7 @@ const AddIngestion = ({
|
||||
ingestSampleData={ingestSampleData}
|
||||
ingestionName={ingestionName}
|
||||
markDeletedTables={markDeletedTables}
|
||||
markDeletedTablesFromFilterOnly={markDeletedTablesFromFilterOnly}
|
||||
pipelineFilterPattern={pipelineFilterPattern}
|
||||
pipelineType={pipelineType}
|
||||
profileSample={profileSample}
|
||||
|
||||
@ -40,6 +40,7 @@ const ConfigureIngestion = ({
|
||||
includeView,
|
||||
includeTags,
|
||||
markDeletedTables,
|
||||
markDeletedTablesFromFilterOnly,
|
||||
serviceCategory,
|
||||
pipelineType,
|
||||
showDatabaseFilter,
|
||||
@ -66,6 +67,7 @@ const ConfigureIngestion = ({
|
||||
handleIncludeView,
|
||||
handleIncludeTags,
|
||||
handleMarkDeletedTables,
|
||||
handleMarkDeletedTablesFromFilterOnly,
|
||||
handleIngestSampleData,
|
||||
handleDatasetServiceName,
|
||||
handleQueryLogDuration,
|
||||
@ -228,6 +230,27 @@ const ConfigureIngestion = ({
|
||||
{getSeparator('')}
|
||||
</Field>
|
||||
)}
|
||||
{!isNil(markDeletedTablesFromFilterOnly) && (
|
||||
<Field>
|
||||
<div className="tw-flex tw-gap-1">
|
||||
<label>Mark Deleted Tables from Filter Only</label>
|
||||
<ToggleSwitchV1
|
||||
checked={markDeletedTablesFromFilterOnly}
|
||||
handleCheck={() => {
|
||||
if (handleMarkDeletedTablesFromFilterOnly) {
|
||||
handleMarkDeletedTablesFromFilterOnly();
|
||||
}
|
||||
}}
|
||||
testId="mark-deleted-filter-only"
|
||||
/>
|
||||
</div>
|
||||
<p className="tw-text-grey-muted tw-mt-3">
|
||||
Optional configuration to mark deleted tables only to the
|
||||
filtered schema
|
||||
</p>
|
||||
{getSeparator('')}
|
||||
</Field>
|
||||
)}
|
||||
</div>
|
||||
</>
|
||||
);
|
||||
|
||||
@ -70,6 +70,7 @@ export interface ConfigureIngestionProps {
|
||||
includeView: boolean;
|
||||
includeTags: boolean;
|
||||
markDeletedTables?: boolean;
|
||||
markDeletedTablesFromFilterOnly?: boolean;
|
||||
enableDebugLog: boolean;
|
||||
profileSample?: number;
|
||||
ingestSampleData: boolean;
|
||||
@ -92,6 +93,7 @@ export interface ConfigureIngestionProps {
|
||||
handleIncludeView: () => void;
|
||||
handleIncludeTags: () => void;
|
||||
handleMarkDeletedTables?: () => void;
|
||||
handleMarkDeletedTablesFromFilterOnly?: () => void;
|
||||
handleEnableDebugLog: () => void;
|
||||
handleIngestSampleData: () => void;
|
||||
getIncludeValue: (value: string[], type: FilterPatternEnum) => void;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user