# Shared mock query results for Snowflake ingestion tests: maps exact
# SnowflakeQuery strings to canned result rows.

import functools
import json
import random
from datetime import datetime, timezone

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BucketDuration
from datahub.ingestion.source.snowflake import snowflake_query
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
from datahub.utilities.prefix_batch_builder import PrefixGroup
NUM_TABLES = 10
NUM_VIEWS = 2
NUM_STREAMS = 1
NUM_COLS = 10
NUM_OPS = 10
NUM_USAGE = 0
def is_secure(view_idx):
return view_idx == 1
FROZEN_TIME = "2022-06-07 17:00:00"
large_sql_query = """WITH object_access_history AS
(
select
object.value : "objectName"::varchar AS object_name,
object.value : "objectDomain"::varchar AS object_domain,
object.value : "columns" AS object_columns,
query_start_time,
query_id,
user_name
from
(
select
query_id,
query_start_time,
user_name,
-- Construct the email in the query, should match the Python behavior.
-- The user_email is only used by the email_filter_query.
NVL(USERS.email, CONCAT(LOWER(user_name), '')) AS user_email,
DIRECT_OBJECTS_ACCESSED
from
snowflake.account_usage.access_history
LEFT JOIN
snowflake.account_usage.users USERS
ON user_name = users.name
WHERE
query_start_time >= to_timestamp_ltz(1710720000000, 3)
AND query_start_time < to_timestamp_ltz(1713332173148, 3)
)
t,
lateral flatten(input => t.DIRECT_OBJECTS_ACCESSED) object
where
NOT RLIKE(object_name,'.*\\.FIVETRAN_.*_STAGING\\..*','i') AND NOT RLIKE(object_name,'.*__DBT_TMP$','i') AND NOT RLIKE(object_name,'.*\\.SEGMENT_[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i') AND NOT RLIKE(object_name,'.*\\.STAGING_.*_[a-f0-9]{8}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{4}[-_][a-f0-9]{12}','i')
)
,
field_access_history AS
(
select
o.*,
col.value : "columnName"::varchar AS column_name
from
object_access_history o,
lateral flatten(input => o.object_columns) col
)
,
basic_usage_counts AS
(
SELECT
object_name,
ANY_VALUE(object_domain) AS object_domain,
DATE_TRUNC('DAY', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
count(distinct(query_id)) AS total_queries,
count( distinct(user_name) ) AS total_users
FROM
object_access_history
GROUP BY
bucket_start_time,
object_name
)
,
field_usage_counts AS
(
SELECT
object_name,
column_name,
DATE_TRUNC('DAY', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
count(distinct(query_id)) AS total_queries
FROM
field_access_history
GROUP BY
bucket_start_time,
object_name,
column_name
)
,
user_usage_counts AS
(
SELECT
object_name,
DATE_TRUNC('DAY', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
count(distinct(query_id)) AS total_queries,
user_name,
ANY_VALUE(users.email) AS user_email
FROM
object_access_history
LEFT JOIN
snowflake.account_usage.users users
ON user_name = users.name
GROUP BY
bucket_start_time,
object_name,
user_name
)
,
top_queries AS
(
SELECT
object_name,
DATE_TRUNC('DAY', CONVERT_TIMEZONE('UTC', query_start_time)) AS bucket_start_time,
query_history.query_text AS query_text,
count(distinct(access_history.query_id)) AS total_queries
FROM
object_access_history access_history
LEFT JOIN
(
SELECT * FROM snowflake.account_usage.query_history
WHERE query_history.start_time >= to_timestamp_ltz(1710720000000, 3)
AND query_history.start_time < to_timestamp_ltz(1713332173148, 3)
) query_history
ON access_history.query_id = query_history.query_id
GROUP BY
bucket_start_time,
object_name,
query_text
QUALIFY row_number() over ( partition by bucket_start_time, object_name
order by
total_queries desc, query_text asc ) <= 10
)
select
basic_usage_counts.object_name AS "OBJECT_NAME",
basic_usage_counts.bucket_start_time AS "BUCKET_START_TIME",
ANY_VALUE(basic_usage_counts.object_domain) AS "OBJECT_DOMAIN",
ANY_VALUE(basic_usage_counts.total_queries) AS "TOTAL_QUERIES",
ANY_VALUE(basic_usage_counts.total_users) AS "TOTAL_USERS",
ARRAY_UNIQUE_AGG(top_queries.query_text) AS "TOP_SQL_QUERIES",
ARRAY_UNIQUE_AGG(OBJECT_CONSTRUCT( 'col', field_usage_counts.column_name, 'total', field_usage_counts.total_queries ) ) AS "FIELD_COUNTS",
ARRAY_UNIQUE_AGG(OBJECT_CONSTRUCT( 'user_name', user_usage_counts.user_name, 'email', user_usage_counts.user_email, 'total', user_usage_counts.total_queries ) ) AS "USER_COUNTS"
from
basic_usage_counts basic_usage_counts
left join
top_queries top_queries
on basic_usage_counts.bucket_start_time = top_queries.bucket_start_time
and basic_usage_counts.object_name = top_queries.object_name
left join
field_usage_counts field_usage_counts
on basic_usage_counts.bucket_start_time = field_usage_counts.bucket_start_time
and basic_usage_counts.object_name = field_usage_counts.object_name
left join
user_usage_counts user_usage_counts
on basic_usage_counts.bucket_start_time = user_usage_counts.bucket_start_time
and basic_usage_counts.object_name = user_usage_counts.object_name
where
basic_usage_counts.object_domain in ('Table','External table','View','Materialized view','Iceberg table')
and basic_usage_counts.object_name is not null
group by
basic_usage_counts.object_name,
basic_usage_counts.bucket_start_time
order by
basic_usage_counts.bucket_start_time
"""
class RowCountList(list):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@property
def rowcount(self):
return len(self)
def inject_rowcount(func):
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
if result is None or isinstance(result, RowCountList):
return result
if not isinstance(result, list):
raise ValueError(f"Mocked result is not a list: {result}")
result = RowCountList(result)
return result
return wrapper
@inject_rowcount
def default_query_results( # noqa: C901
query,
num_tables=NUM_TABLES,
num_views=NUM_VIEWS,
num_streams=NUM_STREAMS,
num_cols=NUM_COLS,
num_ops=NUM_OPS,
num_usages=NUM_USAGE,
):
if query == SnowflakeQuery.current_account():
return [{"CURRENT_ACCOUNT()": "ABC12345"}]
if query == SnowflakeQuery.current_region():
return [{"CURRENT_REGION()": "AWS_AP_SOUTH_1"}]
if query == SnowflakeQuery.show_tags():
return []
if query == SnowflakeQuery.current_role():
return [{"CURRENT_ROLE()": "TEST_ROLE"}]
elif query == SnowflakeQuery.current_version():
return [{"CURRENT_VERSION()": "X.Y.Z"}]
elif query == SnowflakeQuery.current_warehouse():
return [{"CURRENT_WAREHOUSE()": "TEST_WAREHOUSE"}]
elif query == SnowflakeQuery.show_databases():
return [
{
"name": "TEST_DB",
"created_on": datetime(2021, 6, 8, 0, 0, 0, 0),
"comment": "Comment for TEST_DB",
}
]
elif query == SnowflakeQuery.get_databases("TEST_DB"):
return [
{
"DATABASE_NAME": "TEST_DB",
"CREATED": datetime(2021, 6, 8, 0, 0, 0, 0),
"LAST_ALTERED": datetime(2021, 6, 8, 0, 0, 0, 0),
"COMMENT": "Comment for TEST_DB",
}
]
elif query == SnowflakeQuery.schemas_for_database("TEST_DB"):
return [
{
"SCHEMA_NAME": "TEST_SCHEMA",
"CREATED": datetime(2021, 6, 8, 0, 0, 0, 0),
"LAST_ALTERED": datetime(2021, 6, 8, 0, 0, 0, 0),
"COMMENT": "comment for TEST_DB.TEST_SCHEMA",
},
{
"SCHEMA_NAME": "TEST2_SCHEMA",
"CREATED": datetime(2021, 6, 8, 0, 0, 0, 0),
"LAST_ALTERED": datetime(2021, 6, 8, 0, 0, 0, 0),
"COMMENT": "comment for TEST_DB.TEST_SCHEMA",
},
]
elif query == SnowflakeQuery.tables_for_database("TEST_DB"):
raise Exception("Information schema query returned too much data")
elif query == SnowflakeQuery.tables_for_schema("TEST_SCHEMA", "TEST_DB"):
return [
{
"TABLE_SCHEMA": "TEST_SCHEMA",
"TABLE_NAME": f"TABLE_{tbl_idx}",
"TABLE_TYPE": "BASE TABLE",
"CREATED": datetime(2021, 6, 8, 0, 0, 0, 0),
"LAST_ALTERED": datetime(2021, 6, 8, 0, 0, 0, 0),
"BYTES": 1024,
"ROW_COUNT": 10000,
"COMMENT": "Comment for Table",
"CLUSTERING_KEY": "LINEAR(COL_1)",
"IS_ICEBERG": "YES" if tbl_idx == 1 else "NO",
"IS_DYNAMIC": "YES" if tbl_idx == 2 else "NO",
"IS_HYBRID": "YES" if tbl_idx == 3 else "NO",
}
for tbl_idx in range(1, num_tables + 1)
]
elif query == SnowflakeQuery.show_views_for_database("TEST_DB"):
# TODO: Add tests for view pagination.
return [
{
"schema_name": "TEST_SCHEMA",
"name": f"VIEW_{view_idx}",
"created_on": datetime(2021, 6, 8, 0, 0, 0, 0),
"comment": "Comment for View",
"is_secure": "true" if is_secure(view_idx) else "false",
"text": (
f"create view view_{view_idx} as select * from table_{view_idx}"
if not is_secure(view_idx)
else None
),
}
for view_idx in range(1, num_views + 1)
]
elif query == SnowflakeQuery.get_secure_view_definitions():
return [
{
"TABLE_CATALOG": "TEST_DB",
"TABLE_SCHEMA": "TEST_SCHEMA",
"TABLE_NAME": f"VIEW_{view_idx}",
"VIEW_DEFINITION": f"create view view_{view_idx} as select * from table_{view_idx}",
}
for view_idx in range(1, num_views + 1)
if is_secure(view_idx)
]
elif query == SnowflakeQuery.columns_for_schema(
"TEST_SCHEMA",
"TEST_DB",
[PrefixGroup(prefix="TABLE_1", names=[], exact_match=True)],
):
return [
{
"TABLE_CATALOG": "TEST_DB",
"TABLE_SCHEMA": "TEST_SCHEMA",
"TABLE_NAME": "TABLE_1",
"COLUMN_NAME": f"COL_{col_idx}",
"ORDINAL_POSITION": col_idx,
"IS_NULLABLE": "NO",
"DATA_TYPE": "TEXT" if col_idx > 1 else "NUMBER",
"COMMENT": "Comment for column",
"CHARACTER_MAXIMUM_LENGTH": 255 if col_idx > 1 else None,
"NUMERIC_PRECISION": None if col_idx > 1 else 38,
"NUMERIC_SCALE": None if col_idx > 1 else 0,
}
for col_idx in range(1, num_cols + 1)
]
elif query == SnowflakeQuery.columns_for_schema("TEST_SCHEMA", "TEST_DB"):
return [
{
"TABLE_CATALOG": "TEST_DB",
"TABLE_SCHEMA": "TEST_SCHEMA",
"TABLE_NAME": table_name,
"COLUMN_NAME": f"COL_{col_idx}",
"ORDINAL_POSITION": col_idx,
"IS_NULLABLE": "NO",
"DATA_TYPE": "TEXT" if col_idx > 1 else "NUMBER",
"COMMENT": "Comment for column",
"CHARACTER_MAXIMUM_LENGTH": 255 if col_idx > 1 else None,
"NUMERIC_PRECISION": None if col_idx > 1 else 38,
"NUMERIC_SCALE": None if col_idx > 1 else 0,
}
for table_name in (
[f"TABLE_{tbl_idx}" for tbl_idx in range(1, num_tables + 1)]
+ [f"VIEW_{view_idx}" for view_idx in range(1, num_views + 1)]
)
for col_idx in range(1, num_cols + 1)
]
elif query == SnowflakeQuery.streams_for_database("TEST_DB"):
# TODO: Add tests for stream pagination.
return [
{
"created_on": datetime(2021, 6, 8, 0, 0, 0, 0, tzinfo=timezone.utc),
"name": f"STREAM_{stream_idx}",
"database_name": "TEST_DB",
"schema_name": "TEST_SCHEMA",
"owner": "ACCOUNTADMIN",
"comment": f"Comment for Stream {stream_idx}",
"table_name": f"TEST_DB.TEST_SCHEMA.TABLE_{stream_idx}",
"source_type": "Table",
"base_tables": f"TEST_DB.TEST_SCHEMA.TABLE_{stream_idx}",
"type": "DELTA",
"stale": "false",
"mode": "DEFAULT",
"stale_after": datetime(2021, 6, 22, 0, 0, 0, 0, tzinfo=timezone.utc),
"invalid_reason": None,
"owner_role_type": "ROLE",
}
for stream_idx in range(1, num_streams + 1)
]
elif query in (
SnowflakeQuery.use_database("TEST_DB"),
SnowflakeQuery.show_primary_keys_for_schema("TEST_SCHEMA", "TEST_DB"),
SnowflakeQuery.show_foreign_keys_for_schema("TEST_SCHEMA", "TEST_DB"),
):
return []
elif query == SnowflakeQuery.get_access_history_date_range():
return [
{
"MIN_TIME": datetime(2021, 6, 8, 0, 0, 0, 0),
"MAX_TIME": datetime(2022, 6, 7, 7, 17, 0, 0),
}
]
elif query == snowflake_query.SnowflakeQuery.operational_data_for_time_window(
1654473600000,
1654586220000,
):
return [
{
"QUERY_START_TIME": datetime(2022, 6, 2, 4, 41, 1, 367000).replace(
tzinfo=timezone.utc
),
"QUERY_TEXT": "create or replace table TABLE_{} as select * from TABLE_2 left join TABLE_3 using COL_1 left join TABLE 4 using COL2".format(
op_idx
),
"QUERY_TYPE": "CREATE_TABLE_AS_SELECT",
"ROWS_INSERTED": 0,
"ROWS_UPDATED": 0,
"ROWS_DELETED": 0,
"BASE_OBJECTS_ACCESSED": json.dumps(
[
{
"columns": [
{"columnId": 0, "columnName": f"COL_{col_idx}"}
for col_idx in range(1, num_cols + 1)
],
"objectDomain": "Table",
"objectId": 0,
"objectName": "TEST_DB.TEST_SCHEMA.TABLE_2",
},
{
"columns": [
{"columnId": 0, "columnName": f"COL_{col_idx}"}
for col_idx in range(1, num_cols + 1)
],
"objectDomain": "Table",
"objectId": 0,
"objectName": "TEST_DB.TEST_SCHEMA.TABLE_3",
},
{
"columns": [
{"columnId": 0, "columnName": f"COL_{col_idx}"}
for col_idx in range(1, num_cols + 1)
],
"objectDomain": "Table",
"objectId": 0,
"objectName": "TEST_DB.TEST_SCHEMA.TABLE_4",
},
]
),
"DIRECT_OBJECTS_ACCESSED": json.dumps(
[
{
"columns": [
{"columnId": 0, "columnName": f"COL_{col_idx}"}
for col_idx in range(1, num_cols + 1)
],
"objectDomain": "Table",
"objectId": 0,
"objectName": "TEST_DB.TEST_SCHEMA.TABLE_2",
},
{
"columns": [
{"columnId": 0, "columnName": f"COL_{col_idx}"}
for col_idx in range(1, num_cols + 1)
],
"objectDomain": "Table",
"objectId": 0,
"objectName": "TEST_DB.TEST_SCHEMA.TABLE_3",
},
{
"columns": [
{"columnId": 0, "columnName": f"COL_{col_idx}"}
for col_idx in range(1, num_cols + 1)
],
"objectDomain": "Table",
"objectId": 0,
"objectName": "TEST_DB.TEST_SCHEMA.TABLE_4",
},
]
),
"OBJECTS_MODIFIED": json.dumps(
[
{
"columns": [
{
"columnId": 0,
"columnName": f"COL_{col_idx}",
"directSources": [
{
"columnName": f"COL_{col_idx}",
"objectDomain": "Table",
"objectId": 0,
"objectName": "TEST_DB.TEST_SCHEMA.TABLE_2",
}
],
}
for col_idx in range(1, num_cols + 1)
],
"objectDomain": "Table",
"objectId": 0,
"objectName": f"TEST_DB.TEST_SCHEMA.TABLE_{op_idx}",
}
]
),
"USER_NAME": "SERVICE_ACCOUNT_TESTS_ADMIN",
"FIRST_NAME": None,
"LAST_NAME": None,
"DISPLAY_NAME": "SERVICE_ACCOUNT_TESTS_ADMIN",
"EMAIL": "abc@xyz.com",
"ROLE_NAME": "ACCOUNTADMIN",
}
for op_idx in range(1, num_ops + 1)
]
elif (
query
== snowflake_query.SnowflakeQuery.usage_per_object_per_time_bucket_for_time_window(
1654473600000,
1654586220000,
use_base_objects=False,
top_n_queries=10,
include_top_n_queries=True,
time_bucket_size=BucketDuration.DAY,
email_domain=None,
email_filter=AllowDenyPattern.allow_all(),
)
):
return [
{
"OBJECT_NAME": f"TEST_DB.TEST_SCHEMA.TABLE_{i}{random.randint(99, 999) if i > num_tables else ''}",
"BUCKET_START_TIME": datetime(2022, 6, 6, 0, 0, 0, 0).replace(
tzinfo=timezone.utc
),
"OBJECT_DOMAIN": "Table",
"TOTAL_QUERIES": 10,
"TOTAL_USERS": 1,
"TOP_SQL_QUERIES": json.dumps([large_sql_query for _ in range(10)]),
"FIELD_COUNTS": json.dumps(
[{"col": f"col{c}", "total": 10} for c in range(num_cols)]
),
"USER_COUNTS": json.dumps(
[
{"email": f"abc{i}@xyz.com", "user_name": f"abc{i}", "total": 1}
for i in range(10)
]
),
}
for i in range(num_usages)
]
elif query in (
snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
start_time_millis=1654473600000,
end_time_millis=1654586220000,
include_column_lineage=True,
),
):
return [
{
"DOWNSTREAM_TABLE_NAME": f"TEST_DB.TEST_SCHEMA.TABLE_{op_idx}",
"DOWNSTREAM_TABLE_DOMAIN": "TABLE",
"UPSTREAM_TABLES": json.dumps(
[
{
"upstream_object_name": "TEST_DB.TEST_SCHEMA.TABLE_2",
"upstream_object_domain": "TABLE",
"query_id": f"01b2576e-0804-4957-0034-7d83066cd0ee{op_idx}",
}
]
+ ( # This additional upstream is only for TABLE_1
[
{
"upstream_object_name": "TEST_DB.TEST_SCHEMA.VIEW_1",
"upstream_object_domain": "VIEW",
"query_id": f"01b2576e-0804-4957-0034-7d83066cd0ee{op_idx}",
},
{
"upstream_object_name": "OTHER_DB.OTHER_SCHEMA.TABLE_1",
"upstream_object_domain": "TABLE",
"query_id": f"01b2576e-0804-4957-0034-7d83066cd0ee{op_idx}",
},
]
if op_idx == 1
else []
)
),
"UPSTREAM_COLUMNS": json.dumps(
[
{
"column_name": f"COL_{col_idx}",
"upstreams": [
{
"query_id": f"01b2576e-0804-4957-0034-7d83066cd0ee{op_idx}",
"column_upstreams": [
{
"object_name": "TEST_DB.TEST_SCHEMA.TABLE_2",
"object_domain": "Table",
"column_name": f"COL_{col_idx}",
}
],
}
],
}
for col_idx in range(1, num_cols + 1)
]
+ ( # This additional upstream is only for TABLE_1
[
{
"column_name": "COL_1",
"upstreams": [
{
"query_id": f"01b2576e-0804-4957-0034-7d83066cd0ee{op_idx}",
"column_upstreams": [
{
"object_name": "OTHER_DB.OTHER_SCHEMA.TABLE_1",
"object_domain": "Table",
"column_name": "COL_1",
}
],
}
],
}
]
if op_idx == 1
else []
)
),
"QUERIES": json.dumps(
[
{
"query_text": f"INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_{op_idx} SELECT * FROM TEST_DB.TEST_SCHEMA.TABLE_2",
"query_id": f"01b2576e-0804-4957-0034-7d83066cd0ee{op_idx}",
"start_time": "06-06-2022",
}
]
),
}
for op_idx in range(1, num_ops + 1)
]
elif query in (
snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
start_time_millis=1654473600000,
end_time_millis=1654586220000,
include_column_lineage=False,
),
):
return [
{
"DOWNSTREAM_TABLE_NAME": f"TEST_DB.TEST_SCHEMA.TABLE_{op_idx}",
"DOWNSTREAM_TABLE_DOMAIN": "TABLE",
"UPSTREAM_TABLES": json.dumps(
[
{
"upstream_object_name": "TEST_DB.TEST_SCHEMA.TABLE_2",
"upstream_object_domain": "TABLE",
"query_id": f"01b2576e-0804-4957-0034-7d83066cd0ee{op_idx}",
},
]
+ ( # This additional upstream is only for TABLE_1
[
{
"upstream_object_name": "OTHER_DB.OTHER_SCHEMA.TABLE_1",
"upstream_object_domain": "TABLE",
"query_id": f"01b2576e-0804-4957-0034-7d83066cd0ee{op_idx}",
},
]
if op_idx == 1
else []
)
),
"QUERIES": json.dumps(
[
{
"query_text": f"INSERT INTO TEST_DB.TEST_SCHEMA.TABLE_{op_idx} SELECT * FROM TEST_DB.TEST_SCHEMA.TABLE_2",
"query_id": f"01b2576e-0804-4957-0034-7d83066cd0ee{op_idx}",
"start_time": (
datetime(2022, 6, 6, 0, 0, 0, 0)
.replace(tzinfo=timezone.utc)
.date()
.isoformat()
),
}
]
),
}
for op_idx in range(1, num_ops + 1)
]
elif query in [
snowflake_query.SnowflakeQuery.show_external_tables(),
snowflake_query.SnowflakeQuery.copy_lineage_history(
start_time_millis=1654473600000, end_time_millis=1654621200000
),
snowflake_query.SnowflakeQuery.copy_lineage_history(
start_time_millis=1654473600000, end_time_millis=1654586220000
),
]:
return []
elif query == snowflake_query.SnowflakeQuery.get_all_tags():
return [
*[
{
"TAG_DATABASE": "TEST_DB",
"TAG_SCHEMA": "TEST_SCHEMA",
"TAG_NAME": f"my_tag_{ix}",
}
for ix in range(3)
],
{
"TAG_DATABASE": "TEST_DB",
"TAG_SCHEMA": "TEST_SCHEMA",
"TAG_NAME": "security",
},
{
"TAG_DATABASE": "OTHER_DB",
"TAG_SCHEMA": "OTHER_SCHEMA",
"TAG_NAME": "my_other_tag",
},
]
elif (
query
== snowflake_query.SnowflakeQuery.get_all_tags_in_database_without_propagation(
"TEST_DB"
)
):
return [
*[
{
"TAG_DATABASE": "TEST_DB",
"TAG_SCHEMA": "TEST_SCHEMA",
"TAG_NAME": f"my_tag_{ix}",
"TAG_VALUE": f"my_value_{ix}",
"OBJECT_DATABASE": "TEST_DB",
"OBJECT_SCHEMA": "TEST_SCHEMA",
"OBJECT_NAME": "VIEW_2",
"COLUMN_NAME": None,
"DOMAIN": "TABLE",
}
for ix in range(3)
],
{
"TAG_DATABASE": "TEST_DB",
"TAG_SCHEMA": "TEST_SCHEMA",
"TAG_NAME": "security",
"TAG_VALUE": "pii",
"OBJECT_DATABASE": "TEST_DB",
"OBJECT_SCHEMA": "TEST_SCHEMA",
"OBJECT_NAME": "VIEW_1",
"COLUMN_NAME": "COL_1",
"DOMAIN": "COLUMN",
},
{
"TAG_DATABASE": "OTHER_DB",
"TAG_SCHEMA": "OTHER_SCHEMA",
"TAG_NAME": "my_other_tag",
"TAG_VALUE": "other",
"OBJECT_DATABASE": "TEST_DB",
"OBJECT_SCHEMA": None,
"OBJECT_NAME": "TEST_SCHEMA",
"COLUMN_NAME": None,
"DOMAIN": "SCHEMA",
},
{
"TAG_DATABASE": "OTHER_DB",
"TAG_SCHEMA": "OTHER_SCHEMA",
"TAG_NAME": "my_other_tag",
"TAG_VALUE": "other",
"OBJECT_DATABASE": None,
"OBJECT_SCHEMA": None,
"OBJECT_NAME": "TEST_DB",
"COLUMN_NAME": None,
"DOMAIN": "DATABASE",
},
]
elif query == SnowflakeQuery.procedures_for_database("TEST_DB"):
return [
{
"PROCEDURE_CATALOG": "TEST_DB",
"PROCEDURE_SCHEMA": "TEST_SCHEMA",
"PROCEDURE_NAME": "my_procedure",
"PROCEDURE_LANGUAGE": "SQL",
"ARGUMENT_SIGNATURE": "(arg1 VARCHAR, arg2 VARCHAR)",
"PROCEDURE_RETURN_TYPE": "VARCHAR",
"PROCEDURE_DEFINITION": "BEGIN RETURN 'Hello World'; END",
"CREATED": "2021-01-01T00:00:00.000Z",
"LAST_ALTERED": "2021-01-01T00:00:00.000Z",
"COMMENT": "This is a test procedure",
},
{
"PROCEDURE_CATALOG": "TEST_DB",
"PROCEDURE_SCHEMA": "TEST_SCHEMA",
"PROCEDURE_NAME": "my_procedure",
"PROCEDURE_LANGUAGE": "SQL",
"ARGUMENT_SIGNATURE": "(arg1 VARCHAR)",
"PROCEDURE_RETURN_TYPE": "VARCHAR",
"PROCEDURE_DEFINITION": "BEGIN RETURN 'Hello World'; END",
"CREATED": "2021-01-01T00:00:00.000Z",
"LAST_ALTERED": "2021-01-01T00:00:00.000Z",
"COMMENT": "This is a test procedure 2",
},
]
raise ValueError(f"Unexpected query: {query}")