mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-02 04:13:17 +00:00
Fix post profiler time series migration (#23444)
* Fix post profiler time series migration * add prefix index * Update prefix index length for entityFQNHash * Update prefix index length for LIKE queries * add prefix index * fix mysql syntax
This commit is contained in:
parent
8528625ae9
commit
ffc4dcf643
@ -1,23 +0,0 @@
|
||||
-- Migrate individual profile data to EntityProfile format
|
||||
UPDATE profiler_data_time_series pdts
|
||||
INNER JOIN table_entity te ON (
|
||||
pdts.entityFQNHash = te.fqnHash OR
|
||||
pdts.entityFQNHash LIKE CONCAT(te.fqnHash, '.%')
|
||||
)
|
||||
SET pdts.json = JSON_OBJECT(
|
||||
'id', UUID(),
|
||||
'entityReference', JSON_OBJECT(
|
||||
'id', te.json -> '$.id',
|
||||
'type', 'table',
|
||||
'fullyQualifiedName', te.json -> '$.fullyQualifiedName',
|
||||
'name', te.name
|
||||
),
|
||||
'timestamp', pdts.timestamp,
|
||||
'profileData', pdts.json,
|
||||
'profileType',
|
||||
(CASE pdts.extension
|
||||
WHEN 'table.tableProfile' THEN 'table'
|
||||
WHEN 'table.columnProfile' THEN 'column'
|
||||
WHEN 'table.systemProfile' THEN 'system'
|
||||
END)
|
||||
);
|
@ -1,24 +0,0 @@
|
||||
-- Migrate individual profile data to EntityProfile format
|
||||
UPDATE profiler_data_time_series pdts
|
||||
SET json = jsonb_build_object(
|
||||
'id', gen_random_uuid(),
|
||||
'entityReference', jsonb_build_object(
|
||||
'id', te.json ->> 'id',
|
||||
'type', 'table',
|
||||
'fullyQualifiedName', te.json ->> 'fullyQualifiedName',
|
||||
'name', te.name
|
||||
),
|
||||
'timestamp', pdts.timestamp,
|
||||
'profileData', pdts.json,
|
||||
'profileType',
|
||||
(CASE pdts.extension
|
||||
WHEN 'table.tableProfile' THEN 'table'
|
||||
WHEN 'table.columnProfile' THEN 'column'
|
||||
WHEN 'table.systemProfile' THEN 'system'
|
||||
END)
|
||||
)
|
||||
FROM table_entity te
|
||||
WHERE (
|
||||
pdts.entityFQNHash = te.fqnHash OR
|
||||
pdts.entityFQNHash LIKE CONCAT(te.fqnHash, '.%')
|
||||
);
|
@ -0,0 +1,97 @@
|
||||
-- Create indexes for better performance
|
||||
CREATE INDEX idx_pdts_entityFQNHash ON profiler_data_time_series(entityFQNHash);
|
||||
CREATE INDEX idx_pdts_extension ON profiler_data_time_series(extension);
|
||||
CREATE INDEX idx_te_fqnHash ON table_entity(fqnHash);
|
||||
|
||||
-- Add prefix index for LIKE queries (service.database.schema.table = 4 MD5 hashes + 3 dots = 132 chars)
|
||||
CREATE INDEX idx_pdts_entityFQNHash_prefix ON profiler_data_time_series(entityFQNHash(132));
|
||||
|
||||
-- Add composite index for better join performance
|
||||
CREATE INDEX idx_pdts_composite ON profiler_data_time_series(extension, entityFQNHash);
|
||||
|
||||
-- Analyze tables for query optimizer (MySQL 8.0+)
|
||||
ANALYZE TABLE profiler_data_time_series;
|
||||
ANALYZE TABLE table_entity;
|
||||
|
||||
-- Migrate table profiles (direct match)
|
||||
UPDATE profiler_data_time_series pdts
|
||||
INNER JOIN table_entity te ON pdts.entityFQNHash = te.fqnHash
|
||||
SET pdts.json = JSON_OBJECT(
|
||||
'id', UUID(),
|
||||
'entityReference', JSON_OBJECT(
|
||||
'id', te.json -> '$.id',
|
||||
'type', 'table',
|
||||
'fullyQualifiedName', te.json -> '$.fullyQualifiedName',
|
||||
'name', te.name
|
||||
),
|
||||
'timestamp', pdts.timestamp,
|
||||
'profileData', pdts.json,
|
||||
'profileType', 'table'
|
||||
)
|
||||
WHERE pdts.extension = 'table.tableProfile';
|
||||
|
||||
-- Migrate system profiles (direct match)
|
||||
UPDATE profiler_data_time_series pdts
|
||||
INNER JOIN table_entity te ON pdts.entityFQNHash = te.fqnHash
|
||||
SET pdts.json = JSON_OBJECT(
|
||||
'id', UUID(),
|
||||
'entityReference', JSON_OBJECT(
|
||||
'id', te.json -> '$.id',
|
||||
'type', 'table',
|
||||
'fullyQualifiedName', te.json -> '$.fullyQualifiedName',
|
||||
'name', te.name
|
||||
),
|
||||
'timestamp', pdts.timestamp,
|
||||
'profileData', pdts.json,
|
||||
'profileType', 'system'
|
||||
)
|
||||
WHERE pdts.extension = 'table.systemProfile';
|
||||
|
||||
-- Migrate column profiles using temporary mapping table for better performance
|
||||
-- Create temporary mapping table to extract table hash from column hash
|
||||
CREATE TEMPORARY TABLE IF NOT EXISTS column_to_table_mapping (
|
||||
column_hash VARCHAR(768) PRIMARY KEY,
|
||||
table_hash VARCHAR(768),
|
||||
INDEX idx_table_hash (table_hash)
|
||||
) ENGINE=MEMORY;
|
||||
|
||||
-- Populate mapping by extracting table hash (everything before the last dot)
|
||||
INSERT INTO column_to_table_mapping (column_hash, table_hash)
|
||||
SELECT DISTINCT
|
||||
pdts.entityFQNHash as column_hash,
|
||||
SUBSTRING_INDEX(pdts.entityFQNHash, '.', 4) as table_hash
|
||||
FROM profiler_data_time_series pdts
|
||||
WHERE pdts.extension = 'table.columnProfile'
|
||||
AND CHAR_LENGTH(pdts.entityFQNHash) - CHAR_LENGTH(REPLACE(pdts.entityFQNHash, '.', '')) >= 4;
|
||||
|
||||
-- Update column profiles using the mapping (much faster than LIKE)
|
||||
UPDATE profiler_data_time_series pdts
|
||||
INNER JOIN column_to_table_mapping ctm ON pdts.entityFQNHash = ctm.column_hash
|
||||
INNER JOIN table_entity te ON ctm.table_hash = te.fqnHash
|
||||
SET pdts.json = JSON_OBJECT(
|
||||
'id', UUID(),
|
||||
'entityReference', JSON_OBJECT(
|
||||
'id', te.json -> '$.id',
|
||||
'type', 'table',
|
||||
'fullyQualifiedName', te.json -> '$.fullyQualifiedName',
|
||||
'name', te.name
|
||||
),
|
||||
'timestamp', pdts.timestamp,
|
||||
'profileData', pdts.json,
|
||||
'profileType', 'column'
|
||||
)
|
||||
WHERE pdts.extension = 'table.columnProfile';
|
||||
|
||||
-- Clean up temporary table
|
||||
DROP TEMPORARY TABLE IF EXISTS column_to_table_mapping;
|
||||
|
||||
-- Drop temporary indexes after migration
|
||||
DROP INDEX idx_pdts_entityFQNHash ON profiler_data_time_series;
|
||||
DROP INDEX idx_pdts_entityFQNHash_prefix ON profiler_data_time_series;
|
||||
DROP INDEX idx_pdts_extension ON profiler_data_time_series;
|
||||
DROP INDEX idx_te_fqnHash ON table_entity;
|
||||
DROP INDEX idx_pdts_composite ON profiler_data_time_series;
|
||||
|
||||
-- Analyze tables after migration for updated statistics
|
||||
ANALYZE TABLE profiler_data_time_series;
|
||||
ANALYZE TABLE table_entity;
|
@ -0,0 +1,118 @@
|
||||
-- Create indexes for better performance
|
||||
CREATE INDEX IF NOT EXISTS idx_pdts_entityFQNHash ON profiler_data_time_series(entityFQNHash);
|
||||
CREATE INDEX IF NOT EXISTS idx_pdts_extension ON profiler_data_time_series(extension);
|
||||
CREATE INDEX IF NOT EXISTS idx_te_fqnHash ON table_entity(fqnHash);
|
||||
|
||||
-- Add prefix index for LIKE queries (service.database.schema.table = 4 MD5 hashes + 3 dots = 132 chars)
|
||||
CREATE INDEX IF NOT EXISTS idx_pdts_entityFQNHash_prefix ON profiler_data_time_series(substring(entityFQNHash, 1, 132));
|
||||
|
||||
-- Add composite index for better join performance
|
||||
CREATE INDEX IF NOT EXISTS idx_pdts_composite ON profiler_data_time_series(extension, entityFQNHash);
|
||||
|
||||
-- Analyze tables for query planner optimization
|
||||
ANALYZE profiler_data_time_series;
|
||||
ANALYZE table_entity;
|
||||
|
||||
-- Set work_mem higher temporarily for better sort performance (session level)
|
||||
SET LOCAL work_mem = '256MB';
|
||||
SET LOCAL maintenance_work_mem = '512MB';
|
||||
|
||||
-- Migrate table profiles (direct match)
|
||||
UPDATE profiler_data_time_series pdts
|
||||
SET json = jsonb_build_object(
|
||||
'id', gen_random_uuid(),
|
||||
'entityReference', jsonb_build_object(
|
||||
'id', te.json ->> 'id',
|
||||
'type', 'table',
|
||||
'fullyQualifiedName', te.json ->> 'fullyQualifiedName',
|
||||
'name', te.name
|
||||
),
|
||||
'timestamp', pdts.timestamp,
|
||||
'profileData', pdts.json,
|
||||
'profileType', 'table'
|
||||
)
|
||||
FROM table_entity te
|
||||
WHERE pdts.entityFQNHash = te.fqnHash
|
||||
AND pdts.extension = 'table.tableProfile';
|
||||
|
||||
-- Migrate system profiles (direct match)
|
||||
UPDATE profiler_data_time_series pdts
|
||||
SET json = jsonb_build_object(
|
||||
'id', gen_random_uuid(),
|
||||
'entityReference', jsonb_build_object(
|
||||
'id', te.json ->> 'id',
|
||||
'type', 'table',
|
||||
'fullyQualifiedName', te.json ->> 'fullyQualifiedName',
|
||||
'name', te.name
|
||||
),
|
||||
'timestamp', pdts.timestamp,
|
||||
'profileData', pdts.json,
|
||||
'profileType', 'system'
|
||||
)
|
||||
FROM table_entity te
|
||||
WHERE pdts.entityFQNHash = te.fqnHash
|
||||
AND pdts.extension = 'table.systemProfile';
|
||||
|
||||
-- Migrate column profiles using temporary mapping table for better performance
|
||||
-- Use UNLOGGED table for memory-like performance (no WAL writes)
|
||||
CREATE UNLOGGED TABLE IF NOT EXISTS column_to_table_mapping (
|
||||
column_hash VARCHAR(768) PRIMARY KEY,
|
||||
table_hash VARCHAR(768)
|
||||
);
|
||||
CREATE INDEX idx_ctm_table_hash ON column_to_table_mapping(table_hash);
|
||||
|
||||
-- Optimize for in-memory operations
|
||||
ALTER TABLE column_to_table_mapping SET (autovacuum_enabled = false);
|
||||
SET LOCAL temp_buffers = '256MB'; -- Increase temp buffer size
|
||||
SET LOCAL work_mem = '256MB'; -- Already set above but ensuring it's set
|
||||
|
||||
-- Populate mapping by extracting table hash (first 4 dot-separated parts)
|
||||
INSERT INTO column_to_table_mapping (column_hash, table_hash)
|
||||
SELECT DISTINCT
|
||||
pdts.entityFQNHash as column_hash,
|
||||
SPLIT_PART(pdts.entityFQNHash, '.', 1) || '.' ||
|
||||
SPLIT_PART(pdts.entityFQNHash, '.', 2) || '.' ||
|
||||
SPLIT_PART(pdts.entityFQNHash, '.', 3) || '.' ||
|
||||
SPLIT_PART(pdts.entityFQNHash, '.', 4) as table_hash
|
||||
FROM profiler_data_time_series pdts
|
||||
WHERE pdts.extension = 'table.columnProfile'
|
||||
AND ARRAY_LENGTH(STRING_TO_ARRAY(pdts.entityFQNHash, '.'), 1) >= 5;
|
||||
|
||||
-- Update column profiles using the mapping (much faster than LIKE)
|
||||
UPDATE profiler_data_time_series pdts
|
||||
SET json = jsonb_build_object(
|
||||
'id', gen_random_uuid(),
|
||||
'entityReference', jsonb_build_object(
|
||||
'id', te.json ->> 'id',
|
||||
'type', 'table',
|
||||
'fullyQualifiedName', te.json ->> 'fullyQualifiedName',
|
||||
'name', te.name
|
||||
),
|
||||
'timestamp', pdts.timestamp,
|
||||
'profileData', pdts.json,
|
||||
'profileType', 'column'
|
||||
)
|
||||
FROM column_to_table_mapping ctm
|
||||
INNER JOIN table_entity te ON ctm.table_hash = te.fqnHash
|
||||
WHERE pdts.entityFQNHash = ctm.column_hash
|
||||
AND pdts.extension = 'table.columnProfile';
|
||||
|
||||
-- Clean up temporary table
|
||||
DROP TABLE IF EXISTS column_to_table_mapping;
|
||||
|
||||
-- Reset temp buffers
|
||||
RESET temp_buffers;
|
||||
|
||||
-- Drop temporary indexes after migration
|
||||
DROP INDEX IF EXISTS idx_pdts_entityFQNHash;
|
||||
DROP INDEX IF EXISTS idx_pdts_entityFQNHash_prefix;
|
||||
DROP INDEX IF EXISTS idx_pdts_extension;
|
||||
DROP INDEX IF EXISTS idx_te_fqnHash;
|
||||
DROP INDEX IF EXISTS idx_pdts_composite;
|
||||
|
||||
-- Reset work_mem to default
|
||||
RESET work_mem;
|
||||
RESET maintenance_work_mem;
|
||||
|
||||
-- Analyze tables after migration for updated statistics
|
||||
ANALYZE profiler_data_time_series;
|
Loading…
x
Reference in New Issue
Block a user