"""
Generates data for performance testing of warehouse sources.
In the future, we could try to create a more realistic dataset
by anonymizing and reduplicating a production datahub instance's data.
We could also get more human data by using Faker.
This is a work in progress, built piecemeal as needed.
"""
import random
from abc import ABCMeta, abstractmethod
from collections import OrderedDict
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Collection, Iterable, List, Optional, TypeVar, Union, cast

from faker import Faker

from tests.performance.data_model import (
    Column,
    ColumnType,
    Container,
    FieldAccess,
    Query,
    StatementType,
    Table,
    View,
)

T = TypeVar("T")

OPERATION_TYPES: List[StatementType] = [
    "INSERT",
    "UPDATE",
    "DELETE",
    "CREATE",
    "ALTER",
    "DROP",
    "CUSTOM",
    "UNKNOWN",
]

ID_COLUMN = "id"  # Used to allow joins between all tables


class Distribution(metaclass=ABCMeta):
    """Interface for sampling integers from a probability distribution."""

    @abstractmethod
    def _sample(self) -> int:
        raise NotImplementedError

    def sample(
        self, *, floor: Optional[int] = None, ceiling: Optional[int] = None
    ) -> int:
        """Draws a sample, clamped to [floor, ceiling] when bounds are given."""
        value = self._sample()
        if floor is not None:
            value = max(value, floor)
        if ceiling is not None:
            value = min(value, ceiling)
        return value


@dataclass(frozen=True)
class NormalDistribution(Distribution):
    mu: float
    sigma: float

    def _sample(self) -> int:
        return int(random.gauss(mu=self.mu, sigma=self.sigma))
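

# Illustrative usage (an assumption about typical call sites, not module code):
# clamping via sample() keeps extreme Gaussian draws in a sane range, e.g.
#   NormalDistribution(mu=5, sigma=2).sample(floor=1)  # always returns >= 1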


@dataclass(frozen=True)
class LomaxDistribution(Distribution):
    """See https://en.wikipedia.org/wiki/Lomax_distribution.

    Equivalent to pareto(scale, shape) - scale; scale * beta_prime(1, shape)
    """

    scale: float
    shape: float

    def _sample(self) -> int:
        return int(self.scale * (random.paretovariate(self.shape) - 1))
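

# Sanity check (math sketch, not executed): the Lomax CDF is
# P(X <= x) = 1 - (1 + x/scale)**(-shape), so for scale=3, shape=5 the
# 75th percentile is 3 * (4**(1/5) - 1), roughly 0.96, which int() truncates
# to 0. This matches the percentile note on upstream_distribution in
# generate_lineage below.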


@dataclass
class SeedMetadata:
    # Each list is a layer of containers, e.g. [[databases], [schemas]]
    containers: List[List[Container]]

    tables: List[Table]
    views: List[View]

    start_time: datetime
    end_time: datetime

    @property
    def all_tables(self) -> List[Table]:
        return self.tables + cast(List[Table], self.views)


def generate_data(
    num_containers: Union[List[int], int],
    num_tables: int,
    num_views: int,
    columns_per_table: Distribution = NormalDistribution(5, 2),
    parents_per_view: Distribution = NormalDistribution(2, 1),
    view_definition_length: Distribution = NormalDistribution(150, 50),
    time_range: timedelta = timedelta(days=14),
) -> SeedMetadata:
    # Assemble containers: each entry of num_containers is one layer, and each
    # container is parented to a random container in the previous layer.
    if isinstance(num_containers, int):
        num_containers = [num_containers]

    containers: List[List[Container]] = []
    for i, num_in_layer in enumerate(num_containers):
        layer = [
            Container(
                f"{_container_type(i)}_{j}",
                parent=random.choice(containers[-1]) if containers else None,
            )
            for j in range(num_in_layer)
        ]
        containers.append(layer)

    # Assemble tables and views, lineage, and definitions
    tables = [
        _generate_table(i, containers[-1], columns_per_table) for i in range(num_tables)
    ]
    views = [
        View(
            **{  # type: ignore
                **_generate_table(i, containers[-1], columns_per_table).__dict__,
                "name": f"view_{i}",
                "definition": f"--{'*' * view_definition_length.sample(floor=0)}",
            },
        )
        for i in range(num_views)
    ]

    for view in views:
        view.upstreams = random.sample(tables, k=parents_per_view.sample(floor=1))

    generate_lineage(tables, views)

    now = datetime.now(tz=timezone.utc)
    return SeedMetadata(
        containers=containers,
        tables=tables,
        views=views,
        start_time=now - time_range,
        end_time=now,
    )
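

# Example (illustrative, not part of the module): layered containers.
#   seed = generate_data(num_containers=[3, 5], num_tables=100, num_views=10)
# creates 3 databases and 5 schemas, with each schema assigned a random
# database parent; tables and views are placed in the last (schema) layer.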


def generate_lineage(
    tables: Collection[Table],
    views: Collection[Table],
    # Percentiles: 75th=0, 80th=1, 95th=2, 99th=4, 99.99th=15
    upstream_distribution: Distribution = LomaxDistribution(scale=3, shape=5),
) -> None:
    num_upstreams = [upstream_distribution.sample(ceiling=100) for _ in tables]

    # Prioritize tables with a lot of upstreams themselves
    factor = 1 + len(tables) // 10
    table_weights = [1 + (num_upstreams[i] * factor) for i in range(len(tables))]
    view_weights = [1] * len(views)

    # TODO: Python 3.9 use random.sample with counts
    sample = []
    for table, weight in zip(tables, table_weights):
        for _ in range(weight):
            sample.append(table)
    for view, weight in zip(views, view_weights):
        for _ in range(weight):
            sample.append(view)
    for i, table in enumerate(tables):
        table.upstreams = random.sample(  # type: ignore
            sample,
            k=num_upstreams[i],
        )
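

# Sketch for the TODO above (assumes Python 3.9+, not executed here): the
# weighted pool can be expressed with random.sample's `counts` parameter
# instead of the append loops:
#   table.upstreams = random.sample(
#       [*tables, *views],
#       counts=[*table_weights, *view_weights],
#       k=num_upstreams[i],
#   )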


def generate_queries(
    seed_metadata: SeedMetadata,
    num_selects: int,
    num_operations: int,
    num_unique_queries: int,
    num_users: int,
    tables_per_select: NormalDistribution = NormalDistribution(3, 5),
    columns_per_select: NormalDistribution = NormalDistribution(10, 5),
    upstream_tables_per_operation: NormalDistribution = NormalDistribution(2, 2),
    query_length: NormalDistribution = NormalDistribution(100, 50),
) -> Iterable[Query]:
    faker = Faker()
    query_texts = [
        faker.paragraph(query_length.sample(floor=30) // 30)
        for _ in range(num_unique_queries)
    ]

    all_tables = seed_metadata.tables + seed_metadata.views
    users = [f"user_{i}@xyz.com" for i in range(num_users)]

    for _ in range(num_selects):  # Pure SELECT statements
        tables = _sample_list(all_tables, tables_per_select)
        all_columns = [
            FieldAccess(column, table) for table in tables for column in table.columns
        ]
        yield Query(
            text=random.choice(query_texts),
            type="SELECT",
            actor=random.choice(users),
            timestamp=_random_time_between(
                seed_metadata.start_time, seed_metadata.end_time
            ),
            fields_accessed=_sample_list(all_columns, columns_per_select),
        )

    for _ in range(num_operations):
        modified_table = random.choice(seed_metadata.tables)
        n_col = len(modified_table.columns)
        num_columns_modified = NormalDistribution(n_col / 2, n_col / 2)
        upstream_tables = _sample_list(all_tables, upstream_tables_per_operation)
        all_columns = [
            FieldAccess(column, table)
            for table in upstream_tables
            for column in table.columns
        ]
        yield Query(
            text=random.choice(query_texts),
            type=random.choice(OPERATION_TYPES),
            actor=random.choice(users),
            timestamp=_random_time_between(
                seed_metadata.start_time, seed_metadata.end_time
            ),
            # Can have no field accesses, e.g. on a standard INSERT
            fields_accessed=_sample_list(all_columns, num_columns_modified, 0),
            object_modified=modified_table,
        )
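

# Example (illustrative, values invented): a yielded SELECT resembles
#   Query(text="Lorem ipsum ...", type="SELECT", actor="user_3@xyz.com",
#         timestamp=..., fields_accessed=[FieldAccess(...), ...])
# while operation queries additionally set object_modified to the chosen table.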


def _container_type(i: int) -> str:
    if i == 0:
        return "database"
    elif i == 1:
        return "schema"
    else:
        return f"{i}container"


def _generate_table(
    i: int, parents: List[Container], columns_per_table: Distribution
) -> Table:
    num_columns = columns_per_table.sample(floor=1)

    columns = OrderedDict({ID_COLUMN: Column(ID_COLUMN, ColumnType.INTEGER, False)})
    for j in range(num_columns):
        name = f"column_{j}"
        columns[name] = Column(
            name=name,
            type=random.choice(list(ColumnType)),
            nullable=random.random() < 0.1,  # Fixed 10% chance for now
        )
    return Table(
        f"table_{i}",
        container=random.choice(parents),
        columns=columns,
        upstreams=[],
    )


def _sample_list(lst: List[T], dist: NormalDistribution, floor: int = 1) -> List[T]:
    # Sample size is capped at len(lst) since random.sample is without replacement.
    return random.sample(lst, min(dist.sample(floor=floor), len(lst)))


def _random_time_between(start: datetime, end: datetime) -> datetime:
    return start + timedelta(seconds=(end - start).total_seconds() * random.random())


if __name__ == "__main__":
    z = generate_data(10, 1000, 10)
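    # A fuller smoke test might also exercise query generation; the argument
    # values below are illustrative, not canonical:
    queries = list(
        generate_queries(
            z,
            num_selects=100,
            num_operations=20,
            num_unique_queries=10,
            num_users=5,
        )
    )
    print(f"Generated {len(z.all_tables)} tables/views and {len(queries)} queries")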