Snowflake optional config (#3319)

* Update AirflowUtils.java

* Update databaseServiceMetadataPipeline.json

* Updated Account and schema description

* Snowflake UI support (#3318)

* added lineage for snowflake

* Formatted file

* initialized metadata_config

Co-authored-by: Shailesh Parmar <shailesh.parmar.webdev@gmail.com>
Co-authored-by: Ayush Shah <ayush@getcollate.io>
This commit is contained in:
Mayur Singal 2022-03-09 17:55:37 +05:30 committed by GitHub
parent 65c4030fb1
commit 40e7d37244
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 101 additions and 7 deletions

View File

@ -58,6 +58,8 @@ public final class AirflowUtils {
public static final String INGESTION_CONNECTION_ARGS = "connect_args";
public static final String INGESTION_USAGE_STAGE_FILE_PATH = "filename";
public static final String INGESTION_STATUS = "status";
public static final String INGESTION_ACCOUNT = "account";
public static final String INGESTION_WAREHOUSE = "warehouse";
private AirflowUtils() {}
@ -98,6 +100,12 @@ public final class AirflowUtils {
if (databaseServiceMetadataPipeline.getTableFilterPattern() != null) {
dbConfig.put(INGESTION_TABLE_FILTER_PATTERN, databaseServiceMetadataPipeline.getTableFilterPattern());
}
if (databaseServiceMetadataPipeline.getAccount() != null) {
dbConfig.put(INGESTION_ACCOUNT, databaseServiceMetadataPipeline.getAccount());
}
if (databaseServiceMetadataPipeline.getWarehouse() != null) {
dbConfig.put(INGESTION_WAREHOUSE, databaseServiceMetadataPipeline.getWarehouse());
}
dbConfig.put(INGESTION_MARK_DELETED_TABLES, databaseServiceMetadataPipeline.getMarkDeletedTables());
dbConfig.put(INGESTION_DBT_CATALOG_FILE_PATH, databaseServiceMetadataPipeline.getDbtCatalogFilePath());
dbConfig.put(INGESTION_DBT_MANIFEST_FILE_PATH, databaseServiceMetadataPipeline.getDbtManifestFilePath());

View File

@ -50,6 +50,16 @@
"type": "string",
"default": "select * from {}.{} limit 50"
},
"warehouse": {
"description": "Optional Warehouse.",
"type": "string",
"default": null
},
"account": {
"description": "Optional Account.",
"type": "string",
"default": null
},
"enableDataProfiler": {
"description": "Run data profiler as part of this metadata ingestion to get table profile data.",
"type": "boolean",

View File

@ -27,7 +27,7 @@ from metadata.ingestion.source.sql_alchemy_helper import (
SQLAlchemyHelper,
SQLSourceStatus,
)
from metadata.utils.helpers import get_start_and_end
from metadata.utils.helpers import get_start_and_end, ingest_lineage
from metadata.utils.sql_queries import SNOWFLAKE_SQL_STATEMENT
@ -75,6 +75,7 @@ class SnowflakeUsageSource(Source[TableQuery]):
self.config = config
start, end = get_start_and_end(config.duration)
self.analysis_date = start
self.metadata_config = metadata_config
self.sql_stmt = SnowflakeUsageSource.SQL_STATEMENT.format(
start_date=start, end_date=end
)
@ -126,6 +127,13 @@ class SnowflakeUsageSource(Source[TableQuery]):
else:
self.report.scanned(f"{row['database_name']}")
yield table_query
query_info = {
"sql": table_query.sql,
"from_type": "table",
"to_type": "table",
"service_name": self.config.service_name,
}
ingest_lineage(query_info, self.metadata_config)
def get_report(self):
"""

View File

@ -42,7 +42,10 @@ import {
} from '../../../generated/api/operations/pipelines/createAirflowPipeline';
import { DashboardServiceType } from '../../../generated/entity/services/dashboardService';
// import { DashboardService } from '../../../generated/entity/services/dashboardService';
import { DatabaseService } from '../../../generated/entity/services/databaseService';
import {
DatabaseService,
DatabaseServiceType,
} from '../../../generated/entity/services/databaseService';
import {
MessagingService,
MessagingServiceType,
@ -336,6 +339,9 @@ export const AddServiceModal: FunctionComponent<Props> = ({
DynamicFormFieldType[]
>(getKeyValuePair(data?.databaseConnection?.connectionArguments || {}) || []);
const [warehouse, setWarehouse] = useState('');
const [account, setAccount] = useState('');
const markdownRef = useRef<EditorContentRef>();
const getBrokerUrlPlaceholder = (): string => {
@ -548,6 +554,8 @@ export const AddServiceModal: FunctionComponent<Props> = ({
pipelineConfig: {
schema: Schema.DatabaseServiceMetadataPipeline,
config: {
warehouse: warehouse || undefined,
account: account || undefined,
includeViews: value.includeView,
generateSampleData: value.ingestSampleData,
enableDataProfiler: value.enableDataProfiler,
@ -870,6 +878,42 @@ export const AddServiceModal: FunctionComponent<Props> = ({
/>
</Field>
{/* optional fields for Snowflake */}
{selectService === DatabaseServiceType.Snowflake && (
<div className="tw-mt-4 tw-grid tw-grid-cols-2 tw-gap-2">
<div>
<label className="tw-block tw-form-label" htmlFor="warehouse">
Warehouse:
</label>
<input
className="tw-form-inputs tw-px-3 tw-py-1"
data-testid="warehouse"
id="warehouse"
name="warehouse"
placeholder="Warehouse name"
type="text"
value={warehouse}
onChange={(e) => setWarehouse(e.target.value)}
/>
</div>
<div>
<label className="tw-block tw-form-label" htmlFor="account">
Account:
</label>
<input
className="tw-form-inputs tw-px-3 tw-py-1"
data-testid="account"
id="account"
name="account"
placeholder="Account name"
type="text"
value={account}
onChange={(e) => setAccount(e.target.value)}
/>
</div>
</div>
)}
<div data-testid="connection-options">
<div className="tw-flex tw-items-center tw-mt-6">
<p className="w-form-label tw-mr-3">Connection Options</p>

View File

@ -127,11 +127,15 @@ export interface EntityReference {
* OpenMetadata Pipeline Config.
*/
export interface PipelineConfig {
config?: any[] | boolean | number | null | ConfigObject | string;
config?: any[] | boolean | ConfigClass | number | null | string;
schema?: Schema;
}
export interface ConfigObject {
export interface ConfigClass {
/**
* Optional Account.
*/
account?: string;
/**
* DBT catalog file to extract dbt models with their column schemas.
*/
@ -169,6 +173,10 @@ export interface ConfigObject {
* Regex exclude tables or databases that matches the pattern.
*/
tableFilterPattern?: FilterPattern;
/**
* Optional Warehouse.
*/
warehouse?: string;
/**
* Configuration to tune how far we want to look back in query logs to process usage data.
*/

View File

@ -210,11 +210,15 @@ export interface EntityReference {
* OpenMetadata Pipeline Config.
*/
export interface PipelineConfig {
config?: any[] | boolean | number | null | ConfigObject | string;
config?: any[] | boolean | ConfigClass | number | null | string;
schema?: Schema;
}
export interface ConfigObject {
export interface ConfigClass {
/**
* Optional Account.
*/
account?: string;
/**
* DBT catalog file to extract dbt models with their column schemas.
*/
@ -252,6 +256,10 @@ export interface ConfigObject {
* Regex exclude tables or databases that matches the pattern.
*/
tableFilterPattern?: FilterPattern;
/**
* Optional Warehouse.
*/
warehouse?: string;
/**
* Configuration to tune how far we want to look back in query logs to process usage data.
*/

View File

@ -12,7 +12,11 @@
* limitations under the License.
*/
export interface DatabaseServiceMetadataPipelineObject {
export interface DatabaseServiceMetadataPipelineClass {
/**
* Optional Account.
*/
account?: string;
/**
* DBT catalog file to extract dbt models with their column schemas.
*/
@ -50,6 +54,10 @@ export interface DatabaseServiceMetadataPipelineObject {
* Regex exclude tables or databases that matches the pattern.
*/
tableFilterPattern?: FilterPattern;
/**
* Optional Warehouse.
*/
warehouse?: string;
}
/**