Dummy Data Generation

Synthetic dataset generation utilities.


Main API

csvw_eo.make_dummy_from_metadata

CSVW-EO Dummy Dataset Generator.

This module generates a synthetic dummy dataset from CSVW-EO metadata. It is intended for testing pipelines and validating metadata structures.

The generator supports:

  • categorical partitions
  • numeric partitions
  • datetime ranges
  • nullable proportions
  • column groups (joint partitions)

The resulting dataset respects the structural information contained in CSVW-EO metadata but does not guarantee semantic correctness.

apply_nulls_dataframe(df: pd.DataFrame, columns_meta: list[dict[str, Any]], rng: np.random.Generator) -> pd.DataFrame

Apply missing values (nulls) to a dataframe according to metadata.

Each column is assigned null values based on its configured null proportion and datatype-specific null handling strategy.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | Input dataframe to modify. | required |
| columns_meta | list[dict[str, Any]] | Metadata describing each column, including null proportions and datatype information. | required |
| rng | Generator | NumPy random number generator used for stochastic null injection. | required |

Returns:

| Type | Description |
|---|---|
| DataFrame | Dataframe with null values applied according to metadata rules. |

Source code in csvw-eo-library/src/csvw_eo/make_dummy_from_metadata.py
def apply_nulls_dataframe(
    df: pd.DataFrame, columns_meta: list[dict[str, Any]], rng: np.random.Generator
) -> pd.DataFrame:
    """
    Apply missing values (nulls) to a dataframe according to metadata.

    Each column is assigned null values based on its configured null
    proportion and datatype-specific null handling strategy.

    Parameters
    ----------
    df : pd.DataFrame
        Input dataframe to modify.
    columns_meta : list[dict[str, Any]]
        Metadata describing each column, including null proportions
        and datatype information.
    rng : np.random.Generator
        NumPy random number generator used for stochastic null injection.

    Returns
    -------
    pd.DataFrame
        Dataframe with null values applied according to metadata rules.

    """
    columns_meta_map = {c[COL_NAME]: c for c in columns_meta}
    for col in df.columns:
        series = df[col]
        col_meta = columns_meta_map[col]
        nullable_prop = col_meta.get(NULL_PROP, 0)
        datatype = col_meta[DATATYPE]
        df[col] = apply_nulls_serie(series, nullable_prop, datatype, rng)
    return df

apply_nulls_serie(series: pd.Series, nullable_prop: float, datatype: DataTypes, rng: np.random.Generator) -> pd.Series

Inject null values into a column according to metadata.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| series | Series | Column values. | required |
| nullable_prop | float | Proportion of null values. | required |
| datatype | DataTypes | Column datatype. | required |
| rng | Generator | Random number generator. | required |

Returns:

| Type | Description |
|---|---|
| Series | Column with nulls applied. |

Source code in csvw-eo-library/src/csvw_eo/make_dummy_from_metadata.py
def apply_nulls_serie(
    series: pd.Series,
    nullable_prop: float,
    datatype: DataTypes,
    rng: np.random.Generator,
) -> pd.Series:
    """
    Inject null values into a column according to metadata.

    Parameters
    ----------
    series : pd.Series
        Column values.
    nullable_prop : float
        Proportion of null values.
    datatype : DataTypes
        Column datatype.
    rng : numpy.random.Generator
        Random number generator.

    Returns
    -------
    pandas.Series
        Column with nulls applied.

    """
    if nullable_prop <= 0:
        return series

    n_null = max(1, int(len(series) * nullable_prop))
    idx = rng.choice(series.index, size=n_null, replace=False)

    group = XSD_GROUP_MAP[datatype]
    if group == DataTypesGroups.DATETIME:
        series.loc[idx] = pd.NaT
    else:
        series.loc[idx] = pd.NA

    return series
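The sampling step can be tried in isolation. The sketch below is a simplified stand-in rather than the library function: `inject_nulls` is a hypothetical name, it always writes `pd.NA` (the datetime branch is left out), and unlike the source above it works on a copy and caps the number of nulls at the series length. Like the source, it forces at least one null once the proportion is positive.

```python
import numpy as np
import pandas as pd


def inject_nulls(series: pd.Series, proportion: float, rng: np.random.Generator) -> pd.Series:
    """Return a copy of `series` with about `proportion` of its values set to null."""
    if proportion <= 0 or series.empty:
        return series.copy()
    # A positive proportion yields at least one null, never more than the series length.
    n_null = min(len(series), max(1, int(len(series) * proportion)))
    # Draw distinct index labels to overwrite.
    idx = rng.choice(series.index.to_numpy(), size=n_null, replace=False)
    out = series.copy()
    out.loc[idx] = pd.NA
    return out


rng = np.random.default_rng(0)
values = pd.Series(range(10), dtype="Int64")  # nullable integer dtype
with_nulls = inject_nulls(values, 0.5, rng)
print(int(with_nulls.isna().sum()))  # 5
```

Because the positions are drawn without replacement, the number of nulls is exact for a given length and proportion; only their positions depend on the generator state.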

build_generation_order(depends_map: dict[str, list[dict[str, Any]]]) -> list[str]

Compute a deterministic column generation order based on dependencies.

This function attempts to order columns such that each column appears after all the columns it depends on (i.e., a topological ordering).

The input is a mapping of column names to a list of dependency specifications. Each dependency dict may contain a DEPENDS_ON key indicating another column that must be generated first.

Behavior
  1. A dependency graph is built where each column maps to the set of columns it depends on.
  2. Self-dependencies are ignored.
  3. Columns are iteratively selected:
     • Prefer columns whose dependencies are already resolved.
     • Among those, selection is deterministic (alphabetical order).
  4. If no such column exists (e.g., due to cycles or missing dependencies), a fallback strategy is used:
     • Select the column with the fewest dependencies.
     • Break ties alphabetically.

This ensures the function always returns a complete, deterministic order, even when the dependency graph is not a valid DAG.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| depends_map | dict[str, list[dict[str, Any]]] | Mapping of column name to a list of dependency definitions. Each dependency dict may include a DEPENDS_ON key referencing another column. | required |

Returns:

| Type | Description |
|---|---|
| list[str] | A list of column names in generation order. Dependencies will appear before dependents whenever possible. |

Notes
  • Cycles are not explicitly detected or reported. Instead, they are handled via the fallback strategy.
  • Missing dependency references (i.e., dependencies not present as keys in depends_map) are treated as unresolved and may influence ordering.
  • The output is stable and deterministic for a given input.
Source code in csvw-eo-library/src/csvw_eo/make_dummy_from_metadata.py
def build_generation_order(depends_map: dict[str, list[dict[str, Any]]]) -> list[str]:
    """
    Compute a deterministic column generation order based on dependencies.

    This function attempts to order columns such that each column appears
    *after* all the columns it depends on (i.e., a topological ordering).

    The input is a mapping of column names to a list of dependency
    specifications. Each dependency dict may contain a `DEPENDS_ON` key
    indicating another column that must be generated first.

    Behavior
    --------
    1. A dependency graph is built where each column maps to the set of
       columns it depends on.
    2. Self-dependencies are ignored.
    3. Columns are iteratively selected:
       - Prefer columns whose dependencies are already resolved.
       - Among those, selection is deterministic (alphabetical order).
    4. If no such column exists (e.g., due to cycles or missing dependencies),
       a fallback strategy is used:
       - Select the column with the fewest dependencies.
       - Break ties alphabetically.

    This ensures the function always returns a complete, deterministic order,
    even when the dependency graph is not a valid DAG.

    Parameters
    ----------
    depends_map : dict[str, list[dict[str, Any]]]
        Mapping of column name to a list of dependency definitions.
        Each dependency dict may include a `DEPENDS_ON` key referencing
        another column.

    Returns
    -------
    list[str]
        A list of column names in generation order. Dependencies will
        appear before dependents whenever possible.

    Notes
    -----
    - Cycles are not explicitly detected or reported. Instead, they are
      handled via the fallback strategy.
    - Missing dependency references (i.e., dependencies not present as keys
      in `depends_map`) are treated as unresolved and may influence ordering.
    - The output is stable and deterministic for a given input.

    """
    # Build graph: col -> set(of columns it depends on)
    graph: dict[str, set[str]] = {
        col: {dep[DEPENDS_ON] for dep in deps if DEPENDS_ON in dep} for col, deps in depends_map.items()
    }

    # remove self-dependencies
    for col, deps in graph.items():
        deps.discard(col)

    remaining = set(graph)
    resolved: set[str] = set()
    order: list[str] = []

    while remaining:
        ready = sorted(c for c in remaining if graph[c].issubset(resolved))

        node = ready[0] if ready else min(remaining, key=lambda c: (len(graph[c]), c))

        order.append(node)
        resolved.add(node)
        remaining.remove(node)

    return order
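To see the ordering rules on concrete input, here is a small standalone sketch of the same selection loop. It is an illustration, not the library function: `generation_order` is a hypothetical name, and it takes plain sets of column names instead of dependency dicts carrying a `DEPENDS_ON` key. The column names are made up.

```python
def generation_order(graph: dict[str, set[str]]) -> list[str]:
    """Order columns so dependencies come first, with a deterministic fallback."""
    # Drop self-dependencies so a column never blocks itself.
    deps = {col: wanted - {col} for col, wanted in graph.items()}
    remaining = set(deps)
    resolved: set[str] = set()
    order: list[str] = []
    while remaining:
        # Columns whose dependencies are all resolved, in alphabetical order.
        ready = sorted(c for c in remaining if deps[c] <= resolved)
        # Fallback for cycles or unknown references: fewest dependencies, then name.
        node = ready[0] if ready else min(remaining, key=lambda c: (len(deps[c]), c))
        order.append(node)
        resolved.add(node)
        remaining.remove(node)
    return order


acyclic = {
    "end_date": {"start_date"},
    "start_date": set(),
    "city": {"country"},
    "country": set(),
}
print(generation_order(acyclic))  # ['country', 'city', 'start_date', 'end_date']

cyclic = {"a": {"b"}, "b": {"a"}, "c": set()}
print(generation_order(cyclic))  # ['c', 'a', 'b']
```

In the cyclic case no column in the `a`/`b` pair is ever ready, so the fallback picks `a` by name; `b` then becomes ready because `a` is already resolved.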

column_group_partitions(df: pd.DataFrame, columns_group_meta: list[dict[str, Any]]) -> pd.DataFrame

Filter a dataframe to keep only rows belonging to allowed column-group partitions.

The function builds a global boolean mask by combining per-column-group partition or key constraints, depending on metadata configuration.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| df | DataFrame | Input dataframe to filter. | required |
| columns_group_meta | list[dict[str, Any]] | Metadata describing column groups and their partitioning rules. | required |

Returns:

| Type | Description |
|---|---|
| DataFrame | Filtered dataframe containing only rows that satisfy all group constraints. |

Source code in csvw-eo-library/src/csvw_eo/make_dummy_from_metadata.py
def column_group_partitions(
    df: pd.DataFrame,
    columns_group_meta: list[dict[str, Any]],
) -> pd.DataFrame:
    """
    Filter a dataframe to keep only rows belonging to allowed column-group partitions.

    The function builds a global boolean mask by combining per-column-group
    partition or key constraints, depending on metadata configuration.

    Parameters
    ----------
    df : pd.DataFrame
        Input dataframe to filter.
    columns_group_meta : list[dict[str, Any]]
        Metadata describing column groups and their partitioning rules.

    Returns
    -------
    pd.DataFrame
        Filtered dataframe containing only rows that satisfy all group constraints.

    """
    global_mask = pd.Series(True, index=df.index)

    for col_group in columns_group_meta:
        group_mask = pd.Series(False, index=df.index)

        if col_group.get(EXHAUSTIVE_PARTITIONS, False):
            # Partitions take precedence over keys
            partitions = col_group.get(PUBLIC_PARTITIONS, [])
            for p in partitions:
                predicate = p.get(PREDICATE, p)
                group_mask |= _predicate_mask(df, predicate)

        elif col_group.get(EXHAUSTIVE_KEYS, False):
            key_values = col_group.get(KEY_VALUES, [])
            for key in key_values:
                group_mask |= _predicate_mask(df, key)

        else:
            continue  # nothing to apply

        global_mask &= group_mask

    return df[global_mask].reset_index(drop=True)
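The mask logic is an OR within each column group and an AND across groups. The sketch below illustrates that combination under a strong assumption: `_predicate_mask` is not shown in this page, so a hypothetical `equality_mask` that matches exact column values stands in for it. The names `keep_allowed` and the example columns are also made up.

```python
import pandas as pd


def equality_mask(df: pd.DataFrame, predicate: dict) -> pd.Series:
    """Hypothetical predicate: every listed column must equal the given value."""
    mask = pd.Series(True, index=df.index)
    for column, value in predicate.items():
        mask &= df[column] == value
    return mask


def keep_allowed(df: pd.DataFrame, groups: list[list[dict]]) -> pd.DataFrame:
    """Keep rows that match at least one predicate in every group."""
    global_mask = pd.Series(True, index=df.index)
    for allowed in groups:
        group_mask = pd.Series(False, index=df.index)
        for predicate in allowed:
            group_mask |= equality_mask(df, predicate)  # OR within a group
        global_mask &= group_mask  # AND across groups
    return df[global_mask].reset_index(drop=True)


df = pd.DataFrame(
    {"country": ["CH", "CH", "FR", "FR"], "lang": ["de", "fr", "fr", "de"]}
)
allowed = [
    [
        {"country": "CH", "lang": "de"},
        {"country": "CH", "lang": "fr"},
        {"country": "FR", "lang": "fr"},
    ]
]
filtered = keep_allowed(df, allowed)
print(len(filtered))  # 3
```

The `FR`/`de` row is dropped because it matches none of the listed combinations.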

main() -> None

Command-line interface for dummy dataset generation.

Source code in csvw-eo-library/src/csvw_eo/make_dummy_from_metadata.py
def main() -> None:
    """Command-line interface for dummy dataset generation."""
    parser = argparse.ArgumentParser(description="Generate a dummy dataset from CSVW-EO metadata.")

    parser.add_argument("metadata_file", type=str)
    parser.add_argument("--rows", type=int, default=100)
    parser.add_argument("--output", type=str, default="dummy.csv")
    parser.add_argument("--seed", type=int, default=0)

    args = parser.parse_args()

    metadata_path = Path(args.metadata_file)

    if not metadata_path.exists():
        raise FileNotFoundError(f"Metadata file not found: {metadata_path}")

    with metadata_path.open("r", encoding="utf-8") as f:
        metadata = json.load(f)

    df_dummy = make_dummy_from_metadata(
        metadata,
        nb_rows=args.rows,
        seed=args.seed,
    )

    df_dummy.to_csv(args.output, index=False)

    print(  # noqa: T201
        f"Dummy dataset written to {args.output} ({len(df_dummy)} rows, {len(df_dummy.columns)} columns)."
    )

make_dummy_from_metadata(metadata: dict[str, Any], nb_rows: int = 100, seed: int = 0) -> pd.DataFrame

Generate a dummy dataset from CSVW-EO metadata, respecting exhaustive column group partitions.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| metadata | dict | CSVW-EO metadata structure. | required |
| nb_rows | int | Number of rows to generate. | 100 |
| seed | int | Random seed. | 0 |

Returns:

| Type | Description |
|---|---|
| DataFrame | Generated dataset. |

Source code in csvw-eo-library/src/csvw_eo/make_dummy_from_metadata.py
def make_dummy_from_metadata(
    metadata: dict[str, Any],
    nb_rows: int = 100,
    seed: int = 0,
) -> pd.DataFrame:
    """
    Generate a dummy dataset from CSVW-EO metadata, respecting exhaustive column group partitions.

    Parameters
    ----------
    metadata : dict
        CSVW-EO metadata structure.
    nb_rows : int, default=100
        Number of rows to generate.
    seed : int, default=0
        Random seed.

    Returns
    -------
    pandas.DataFrame
        Generated dataset

    """
    rng = np.random.default_rng(seed)

    columns_meta = metadata[TABLE_SCHEMA][COL_LIST]
    columns_group_meta = metadata.get(ADD_INFO, [])

    # Based on dependency, create ordered plan for dummy generation
    depends_map: dict[str, list[dict[str, Any]]] = {
        c[COL_NAME]: list(c.get(ROW_DEP, [])) for c in columns_meta
    }
    order = build_generation_order(depends_map)

    # Generate dataframes until enough rows of existing partitions
    generated: list[pd.DataFrame] = []
    while sum(len(df) for df in generated) < nb_rows:
        df = generate_dataframe(
            depends_map,
            order,
            {c[COL_NAME]: c for c in columns_meta},
            nb_rows,
            rng,
        )

        # Remove rows from partitions that do not exist
        if columns_group_meta:
            df = column_group_partitions(df, columns_group_meta)

        generated.append(df)

    # Format in one dataframe with nb_rows
    output_df = pd.concat(generated, ignore_index=True)
    output_df = output_df.sample(n=nb_rows, random_state=seed)

    # Add nulls where required
    output_df = apply_nulls_dataframe(output_df, columns_meta, rng)

    # Metadata column order
    output_df = output_df.reindex(columns=[c[COL_NAME] for c in columns_meta])

    return output_df.reset_index(drop=True)
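The core of the function is a generate, filter, repeat loop followed by a seeded down-sample. The sketch below isolates that pattern with a toy generator and a toy filter (keep even numbers) in place of the metadata-driven steps. `sample_until_enough` and the `max_rounds` guard are assumptions added here for illustration; the source above has no such guard and would keep looping if the filter rejected every row.

```python
import numpy as np
import pandas as pd


def sample_until_enough(nb_rows: int, seed: int = 0, max_rounds: int = 1000) -> pd.DataFrame:
    """Generate batches until enough rows survive the filter, then keep nb_rows."""
    rng = np.random.default_rng(seed)
    batches: list[pd.DataFrame] = []
    rounds = 0
    while sum(len(b) for b in batches) < nb_rows:
        if rounds >= max_rounds:
            raise RuntimeError("filter rejected too many rows")
        batch = pd.DataFrame({"x": rng.integers(0, 10, size=nb_rows)})
        # Toy stand-in for the partition filter: keep even values only.
        batches.append(batch[batch["x"] % 2 == 0])
        rounds += 1
    pooled = pd.concat(batches, ignore_index=True)
    # Seeded down-sample to exactly nb_rows, as in the source above.
    return pooled.sample(n=nb_rows, random_state=seed).reset_index(drop=True)


result = sample_until_enough(20, seed=0)
print(len(result))  # 20
```

Each round generates a full batch of `nb_rows` candidates, so the pool usually overshoots and the final sample trims it back to the requested size.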

Series Generation Utilities

csvw_eo.generate_series

CSVW-EO Dummy Dataset Generator.

This module generates a synthetic dummy dataset from CSVW-EO metadata. It is intended for testing pipelines and validating metadata structures.

The generator supports:

  • categorical partitions
  • numeric partitions
  • datetime ranges
  • nullable proportions
  • column groups (joint partitions)

The resulting dataset respects the structural information contained in CSVW-EO metadata but does not guarantee semantic correctness.

bigger_series(depend_serie: pd.Series, col_meta: dict[str, Any], nb_rows: int, rng: np.random.Generator) -> pd.Series

Generate a series whose values exceed the matching values in depend_serie.

The generated values are clipped to the column's own bounds.

Works for numeric and datetime columns.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| depend_serie | Series | The series this column depends on. | required |
| col_meta | dict | Metadata for the column, must contain bounds and datatype. | required |
| nb_rows | int | Number of rows to generate. | required |
| rng | Generator | Random number generator for offsets. | required |

Returns:

| Type | Description |
|---|---|
| Series | Generated series satisfying BIGGER dependency. |

Source code in csvw-eo-library/src/csvw_eo/generate_series.py
def bigger_series(
    depend_serie: pd.Series,
    col_meta: dict[str, Any],
    nb_rows: int,
    rng: np.random.Generator,
) -> pd.Series:
    """
    Generate a series whose values exceed the matching values in `depend_serie`.

    The generated values are clipped to the column's own bounds.

    Works for numeric and datetime columns.

    Parameters
    ----------
    depend_serie : pd.Series
        The series this column depends on.
    col_meta : dict
        Metadata for the column, must contain bounds and datatype.
    nb_rows : int
        Number of rows to generate.
    rng : np.random.Generator
        Random number generator for offsets.

    Returns
    -------
    pd.Series
        Generated series satisfying BIGGER dependency.

    """
    lower, upper = get_bounds(col_meta)
    datatype: DataTypes = col_meta[DATATYPE]
    group = XSD_GROUP_MAP[datatype]

    # ---- DATETIME ----
    if group == DataTypesGroups.DATETIME:
        lower_dt, upper_dt = pd.to_datetime(lower), pd.to_datetime(upper)
        total_seconds = (upper_dt - lower_dt).total_seconds()

        offsets: np.ndarray = rng.integers(
            int(0.01 * total_seconds),
            int(0.2 * total_seconds),
            size=nb_rows,
        )

        series = depend_serie + pd.to_timedelta(offsets, unit="s")
        return series.clip(lower=lower_dt, upper=upper_dt)

    # ---- DURATION ----
    if group == DataTypesGroups.DURATION:
        lower_td = pd.to_timedelta(lower)
        upper_td = pd.to_timedelta(upper)

        total_seconds = (upper_td - lower_td).total_seconds()

        offsets = rng.uniform(0.01, 0.2, size=nb_rows) * total_seconds
        series = depend_serie + pd.to_timedelta(offsets, unit="s")

        return series.clip(lower=lower_td, upper=upper_td)

    # ---- INTEGER ----
    if group == DataTypesGroups.INTEGER:
        offsets = rng.integers(int(0.01 * (upper - lower)), int(0.2 * (upper - lower)), size=nb_rows)

        # Ensure depend_serie is integer array
        base = depend_serie.astype("Int64").to_numpy()

        series = pd.Series(base + offsets, index=depend_serie.index, dtype="Int64")
        return series.clip(lower=lower, upper=upper)

    # ---- FLOAT ----
    if group == DataTypesGroups.FLOAT:
        offsets = rng.uniform(0.01, 0.2, size=nb_rows) * float(upper - lower)
        series = pd.Series(
            depend_serie.astype("float64").values + offsets,
            index=depend_serie.index,
            dtype="float64",
        )
        return series.clip(lower=lower, upper=upper)

    raise ValueError(f"BIGGER not supported for datatype {datatype}")
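The float branch boils down to "add a positive offset of 1% to 20% of the column range, then clip". The sketch below reproduces only that branch as a standalone function. `bigger_float` is a hypothetical name, and the bounds are passed in directly instead of being read from column metadata.

```python
import numpy as np
import pandas as pd


def bigger_float(
    base: pd.Series, lower: float, upper: float, rng: np.random.Generator
) -> pd.Series:
    """Add a random positive offset to each value, then clip to [lower, upper]."""
    span = upper - lower
    # Offsets fall between 1% and 20% of the allowed range.
    offsets = rng.uniform(0.01, 0.2, size=len(base)) * span
    return (base.astype("float64") + offsets).clip(lower=lower, upper=upper)


rng = np.random.default_rng(0)
start = pd.Series([0.0, 50.0, 95.0])
end = bigger_float(start, 0.0, 100.0, rng)
print(bool((end > start).all()))  # True
```

One consequence of clipping is worth keeping in mind: a base value already at the upper bound cannot be exceeded, so the result equals it instead of being strictly greater.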

fixed_series(depend_serie: pd.Series, col_meta: dict[str, Any], rng: np.random.Generator) -> pd.Series

Generate a series where each unique entity in depend_serie has a fixed value.

This implements the multi-row fixedPerEntity dependency.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| depend_serie | Series | Entity identifier series. | required |
| col_meta | dict | Column metadata, must include DATATYPE. | required |
| rng | Generator | Random number generator for value generation. | required |

Returns:

| Type | Description |
|---|---|
| Series | Series satisfying FIXED dependency. |

Source code in csvw-eo-library/src/csvw_eo/generate_series.py
def fixed_series(
    depend_serie: pd.Series,
    col_meta: dict[str, Any],
    rng: np.random.Generator,
) -> pd.Series:
    """
    Generate a series where each unique entity in depend_serie has a fixed value.

    This implements the multi-row fixedPerEntity dependency.

    Parameters
    ----------
    depend_serie : pd.Series
        Entity identifier series.
    col_meta : dict
        Column metadata, must include DATATYPE.
    rng : np.random.Generator
        Random number generator for value generation.

    Returns
    -------
    pd.Series
        Series satisfying FIXED dependency.

    """
    value_for_entity = {}
    entity_meta = col_meta.copy()
    entity_meta[NULL_PROP] = 0  # avoid nulls for the entity value

    for ent in depend_serie.unique():
        value_for_entity[ent] = generate_column_series(entity_meta, 1, rng).iloc[0]

    return pd.Series(depend_serie.map(value_for_entity), dtype=to_pandas_dtype(col_meta[DATATYPE]))
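The pattern is "draw one value per distinct entity, then map it back onto every row". A minimal sketch, assuming a uniform float draw in place of the metadata-driven `generate_column_series` call; `fixed_per_entity` and the entity labels are hypothetical.

```python
import numpy as np
import pandas as pd


def fixed_per_entity(entities: pd.Series, rng: np.random.Generator) -> pd.Series:
    """Assign one random value per distinct entity and repeat it on every row."""
    value_for_entity = {ent: float(rng.uniform(0.0, 1.0)) for ent in entities.unique()}
    return entities.map(value_for_entity)


ids = pd.Series(["a", "b", "a", "c", "b"])
vals = fixed_per_entity(ids, np.random.default_rng(0))
# Every entity maps to exactly one distinct value.
print(bool((vals.groupby(ids).nunique() == 1).all()))  # True
```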

generate_boolean_column(nb_rows: int, rng: np.random.Generator) -> pd.Series

Generate a boolean column.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| nb_rows | int | Number of rows to generate. | required |
| rng | Generator | NumPy random number generator. | required |

Returns:

| Type | Description |
|---|---|
| Series | Series containing randomly generated boolean values. |

Source code in csvw-eo-library/src/csvw_eo/generate_series.py
def generate_boolean_column(nb_rows: int, rng: np.random.Generator) -> pd.Series:
    """
    Generate a boolean column.

    Parameters
    ----------
    nb_rows : int
        Number of rows to generate.
    rng : np.random.Generator
        NumPy random number generator.

    Returns
    -------
    pd.Series
        Series containing randomly generated boolean values.

    """
    return pd.Series(rng.choice([True, False], size=nb_rows), dtype="boolean")

generate_column_series(col_meta: dict[str, Any], nb_rows: int, rng: np.random.Generator) -> pd.Series

Generate a column series based on metadata and datatype.

Supports datetime, integer, floating-point, boolean, string, and duration datatypes.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| col_meta | dict[str, Any] | Column metadata describing datatype and constraints. | required |
| nb_rows | int | Number of rows to generate. | required |
| rng | Generator | NumPy random number generator. | required |

Returns:

| Type | Description |
|---|---|
| Series | Generated pandas Series with the appropriate datatype. |

Raises:

| Type | Description |
|---|---|
| ValueError | If the datatype is unknown or unsupported. |

Source code in csvw-eo-library/src/csvw_eo/generate_series.py
def generate_column_series(
    col_meta: dict[str, Any],
    nb_rows: int,
    rng: np.random.Generator,
) -> pd.Series:
    """
    Generate a column series based on metadata and datatype.

    Supports datetime, integer, floating-point, boolean,
    string, and duration datatypes.

    Parameters
    ----------
    col_meta : dict[str, Any]
        Column metadata describing datatype and constraints.
    nb_rows : int
        Number of rows to generate.
    rng : np.random.Generator
        NumPy random number generator.

    Returns
    -------
    pd.Series
        Generated pandas Series with the appropriate datatype.

    Raises
    ------
    ValueError
        If the datatype is unknown or unsupported.

    """
    datatype: DataTypes = col_meta[DATATYPE]
    group = XSD_GROUP_MAP.get(datatype)

    if group == DataTypesGroups.DATETIME:
        series = generate_datetime_column(col_meta, nb_rows, rng)

    elif group == DataTypesGroups.INTEGER:
        series = generate_integer_column(col_meta, nb_rows, rng)

    elif group == DataTypesGroups.FLOAT:
        series = generate_double_column(col_meta, nb_rows, rng)

    elif group == DataTypesGroups.BOOLEAN:
        series = generate_boolean_column(nb_rows, rng)

    elif group == DataTypesGroups.STRING:
        series = generate_string_column(col_meta, nb_rows, rng)

    elif group == DataTypesGroups.DURATION:
        series = generate_duration_column(col_meta, nb_rows, rng)

    else:
        raise ValueError(f"Unknown datatype {datatype}")

    return series.astype(to_pandas_dtype(datatype))

generate_dataframe(depends_map: dict[str, list[dict[str, Any]]], order: list[str], meta_map: dict[str, dict[str, Any]], nb_rows: int, rng: np.random.Generator) -> pd.DataFrame

Generate a dummy dataframe based on column metadata and dependency rules.

Columns are generated in a specified order, optionally using dependency relationships between columns (e.g., mapping, fixed, or relational constraints).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| depends_map | dict[str, list[dict[str, Any]]] | Mapping of column names to their dependency definitions. Each dependency may define how a column depends on another column. | required |
| order | list[str] | Ordered list of column names defining generation sequence. | required |
| meta_map | dict[str, dict[str, Any]] | Metadata for each column describing datatype, constraints, and generation rules. | required |
| nb_rows | int | Number of rows to generate. | required |
| rng | Generator | NumPy random number generator used for all stochastic operations. | required |

Returns:

| Type | Description |
|---|---|
| DataFrame | Generated dataframe with dummy random values respecting metadata and dependency constraints. |

Source code in csvw-eo-library/src/csvw_eo/generate_series.py
def generate_dataframe(
    depends_map: dict[str, list[dict[str, Any]]],
    order: list[str],
    meta_map: dict[str, dict[str, Any]],
    nb_rows: int,
    rng: np.random.Generator,
) -> pd.DataFrame:
    """
    Generate a dummy dataframe based on column metadata and dependency rules.

    Columns are generated in a specified order, optionally using dependency
    relationships between columns (e.g., mapping, fixed, or relational constraints).

    Parameters
    ----------
    depends_map : dict[str, list[dict[str, Any]]]
        Mapping of column names to their dependency definitions.
        Each dependency may define how a column depends on another column.
    order : list[str]
        Ordered list of column names defining generation sequence.
    meta_map : dict[str, dict[str, Any]]
        Metadata for each column describing datatype, constraints,
        and generation rules.
    nb_rows : int
        Number of rows to generate.
    rng : np.random.Generator
        NumPy random number generator used for all stochastic operations.

    Returns
    -------
    pd.DataFrame
        Generated dataframe with dummy random values respecting metadata
        and dependency constraints.

    """
    data = {}

    for col in order:
        col_meta = meta_map[col]
        deps = depends_map.get(col, [])

        if not deps:
            data[col] = generate_column_series(col_meta, nb_rows, rng)
        else:
            dep = next(
                (d for d in deps if d.get(DEPENDS_ON) in data),
                None,
            )
            if dep is None:
                data[col] = generate_column_series(col_meta, nb_rows, rng)
                continue
            mode = dep.get(DEPENDENCY_TYPE, DependencyType.NO_DEP)
            dep_col = dep.get(DEPENDS_ON, None)

            if mode == DependencyType.MAPPING:
                value_map = dep.get(VALUE_MAP, None)
                data[col] = mapping_series(data[dep_col], value_map, col_meta, rng)

            elif mode == DependencyType.BIGGER:
                data[col] = bigger_series(data[dep_col], col_meta, nb_rows, rng)

            elif mode == DependencyType.FIXED:
                data[col] = fixed_series(data[dep_col], col_meta, rng)

            else:
                data[col] = generate_column_series(col_meta, nb_rows, rng)

    return pd.DataFrame(data)

generate_datetime_column(col_meta: dict[str, Any], nb_rows: int, rng: np.random.Generator) -> pd.Series

Generate a datetime column between minimum and maximum values.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| col_meta | dict[str, Any] | Column metadata containing datetime bounds. | required |
| nb_rows | int | Number of rows to generate. | required |
| rng | Generator | NumPy random number generator. | required |

Returns:

| Type | Description |
|---|---|
| Series | Series containing randomly generated datetime values. |

Source code in csvw-eo-library/src/csvw_eo/generate_series.py
def generate_datetime_column(col_meta: dict[str, Any], nb_rows: int, rng: np.random.Generator) -> pd.Series:
    """
    Generate a datetime column between minimum and maximum values.

    Parameters
    ----------
    col_meta : dict[str, Any]
        Column metadata containing datetime bounds.
    nb_rows : int
        Number of rows to generate.
    rng : np.random.Generator
        NumPy random number generator.

    Returns
    -------
    pd.Series
        Series containing randomly generated datetime values.

    """
    lower, upper = get_bounds(col_meta)
    dates = pd.date_range(start=lower, end=upper)
    return pd.Series(rng.choice(dates, size=nb_rows))
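Since `pd.date_range` defaults to a daily frequency, sampling from it yields values on a one-day grid between the two bounds, not arbitrary timestamps. The sketch below shows this with made-up bounds; the dates and variable names are illustrative only.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)

# With no `freq` argument, date_range produces one timestamp per day.
days = pd.date_range(start="2024-01-01", end="2024-01-10")
sample = pd.Series(rng.choice(days.to_numpy(), size=5))

print(len(days))  # 10
# Every sampled value sits at midnight, i.e. on the daily grid.
print(bool((sample.dt.normalize() == sample).all()))  # True
```

If sub-day resolution matters for a test, an explicit `freq` on `pd.date_range` would be one way to get a finer grid.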

generate_double_column(col_meta: dict[str, Any], nb_rows: int, rng: np.random.Generator) -> pd.Series

Generate a floating-point column between minimum and maximum values.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| col_meta | dict[str, Any] | Column metadata containing numeric bounds. | required |
| nb_rows | int | Number of rows to generate. | required |
| rng | Generator | NumPy random number generator. | required |

Returns:

| Type | Description |
|---|---|
| Series | Series containing randomly generated floating-point values. |

Source code in csvw-eo-library/src/csvw_eo/generate_series.py
def generate_double_column(col_meta: dict[str, Any], nb_rows: int, rng: np.random.Generator) -> pd.Series:
    """
    Generate a floating-point column between minimum and maximum values.

    Parameters
    ----------
    col_meta : dict[str, Any]
        Column metadata containing numeric bounds.
    nb_rows : int
        Number of rows to generate.
    rng : np.random.Generator
        NumPy random number generator.

    Returns
    -------
    pd.Series
        Series containing randomly generated floating-point values.

    """
    lower, upper = get_bounds(col_meta)
    return pd.Series(rng.uniform(float(lower), float(upper), size=nb_rows))

generate_duration_column(col_meta: dict[str, Any], nb_rows: int, rng: np.random.Generator) -> pd.Series

Generate a duration column between minimum and maximum values.

Bounds are interpreted as seconds.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| col_meta | dict[str, Any] | Column metadata containing duration bounds. | required |
| nb_rows | int | Number of rows to generate. | required |
| rng | Generator | NumPy random number generator. | required |

Returns:

| Type | Description |
|---|---|
| Series | Series containing randomly generated durations. |

Source code in csvw-eo-library/src/csvw_eo/generate_series.py
def generate_duration_column(col_meta: dict[str, Any], nb_rows: int, rng: np.random.Generator) -> pd.Series:
    """
    Generate a duration column between minimum and maximum values.

    Bounds are interpreted as seconds.

    Parameters
    ----------
    col_meta : dict[str, Any]
        Column metadata containing duration bounds.
    nb_rows : int
        Number of rows to generate.
    rng : np.random.Generator
        NumPy random number generator.

    Returns
    -------
    pd.Series
        Series containing randomly generated durations.

    """
    lower, upper = get_bounds(col_meta)

    # assume bounds in seconds (simplest robust approach)
    values = rng.uniform(float(lower), float(upper), size=nb_rows)

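    # pd.to_timedelta returns a TimedeltaIndex for array input; wrap it so the
    # function returns a Series as documented
    return pd.Series(pd.to_timedelta(values, unit="s"))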
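
A minimal sketch of the seconds-to-duration conversion, with bounds chosen arbitrarily for the example. Note that `pd.to_timedelta` returns a `TimedeltaIndex` when given an array, so the sketch wraps the result in `pd.Series` to obtain a column-like object.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(seed=1)

# Illustrative bounds in seconds: between one minute and one hour.
lower, upper = 60.0, 3600.0
seconds = rng.uniform(lower, upper, size=5)

# Array input yields a TimedeltaIndex; pd.Series turns it into a Series.
durations = pd.Series(pd.to_timedelta(seconds, unit="s"))
print(durations)
```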

generate_integer_column(col_meta: dict[str, Any], nb_rows: int, rng: np.random.Generator) -> pd.Series

Generate an integer column between minimum and maximum values.

The generated values respect the XSD integer subtype constraints.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| col_meta | dict[str, Any] | Column metadata containing integer bounds and datatype. | required |
| nb_rows | int | Number of rows to generate. | required |
| rng | Generator | NumPy random number generator. | required |

Returns:

| Type | Description |
| --- | --- |
| Series | Series containing randomly generated integer values. |

Notes

If zero is allowed by the bounds, at least one generated value is forced to be zero.

Source code in csvw-eo-library/src/csvw_eo/generate_series.py
def generate_integer_column(col_meta: dict[str, Any], nb_rows: int, rng: np.random.Generator) -> pd.Series:
    """
    Generate an integer column between minimum and maximum values.

    The generated values respect the XSD integer subtype constraints.

    Parameters
    ----------
    col_meta : dict[str, Any]
        Column metadata containing integer bounds and datatype.
    nb_rows : int
        Number of rows to generate.
    rng : np.random.Generator
        NumPy random number generator.

    Returns
    -------
    pd.Series
        Series containing randomly generated integer values.

    Notes
    -----
    If zero is allowed by the bounds, at least one generated value
    is forced to be zero.

    """
    lower, upper = get_bounds(col_meta)
    datatype: DataTypes = col_meta[DATATYPE]

    low = int(lower)
    high = int(upper)

    # Clamp the bounds at zero according to the sign constraint of the subtype
    if datatype == DataTypes.POSITIVE_INTEGER:
        low = max(0, low)
    elif datatype == DataTypes.NEGATIVE_INTEGER:
        high = min(0, high)

    values = rng.integers(low, high + 1, size=nb_rows)

    # Ensure at least one zero if allowed
    if low <= 0 <= high and nb_rows > 0:
        values[0] = 0

    return pd.Series(values, dtype="Int64")
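
The core of the integer generation can be sketched as follows, with the bounds hard-coded for illustration rather than read from metadata. `Generator.integers` treats its upper bound as exclusive by default, which is why the maximum is passed as `high + 1`.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(seed=2)

# Illustrative inclusive bounds.
low, high = -3, 3

# The upper bound of rng.integers is exclusive, so add 1 to include `high`.
values = rng.integers(low, high + 1, size=10)

# When zero lies within the bounds, force the first value to zero.
if low <= 0 <= high and len(values) > 0:
    values[0] = 0

# "Int64" is the pandas nullable integer dtype, so nulls can be injected later.
ints = pd.Series(values, dtype="Int64")
print(ints)
```

Using the nullable `Int64` dtype rather than NumPy's `int64` keeps the column integer-typed even after missing values are applied.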

generate_string_column(col_meta: dict[str, Any], nb_rows: int, rng: np.random.Generator) -> pd.Series

Generate a string column based on partition metadata.

The generated values are selected from public keys, public partitions, or randomly generated strings depending on the available metadata.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| col_meta | dict[str, Any] | Column metadata describing available partitions or keys. | required |
| nb_rows | int | Number of rows to generate. | required |
| rng | Generator | NumPy random number generator. | required |

Returns:

| Type | Description |
| --- | --- |
| Series | Series containing randomly generated string values. |

Source code in csvw-eo-library/src/csvw_eo/generate_series.py
def generate_string_column(col_meta: dict[str, Any], nb_rows: int, rng: np.random.Generator) -> pd.Series:
    """
    Generate a string column based on partition metadata.

    The generated values are selected from public keys, public
    partitions, or randomly generated strings depending on the
    available metadata.

    Parameters
    ----------
    col_meta : dict[str, Any]
        Column metadata describing available partitions or keys.
    nb_rows : int
        Number of rows to generate.
    rng : np.random.Generator
        NumPy random number generator.

    Returns
    -------
    pd.Series
        Series containing randomly generated string values.

    """
    public_keys_values = []
    if KEY_VALUES in col_meta:
        # Copy the list so the extend() below does not mutate the metadata
        public_keys_values = list(col_meta[KEY_VALUES])

        if EXHAUSTIVE_KEYS in col_meta and not col_meta[EXHAUSTIVE_KEYS]:
            diff = col_meta[MAX_NUM_PARTITIONS] - len(col_meta[KEY_VALUES])
            public_keys_values.extend(RANDOM_STRINGS[0:diff])

    elif PUBLIC_PARTITIONS in col_meta:
        for partition in col_meta[PUBLIC_PARTITIONS]:
            public_keys_values.append(partition[PREDICATE][PARTITION_VALUE])

        if EXHAUSTIVE_PARTITIONS in col_meta and not col_meta[EXHAUSTIVE_PARTITIONS]:
            diff = col_meta[MAX_NUM_PARTITIONS] - len(col_meta[PUBLIC_PARTITIONS])
            public_keys_values.extend(RANDOM_STRINGS[0:diff])

    elif col_meta.get(MAX_NUM_PARTITIONS):
        public_keys_values = RANDOM_STRINGS[0 : col_meta[MAX_NUM_PARTITIONS]]
    else:
        public_keys_values = RANDOM_STRINGS[0:DEFAULT_NUMBER_PARTITIONS]

    return pd.Series(rng.choice(public_keys_values, size=nb_rows))
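
To make the non-exhaustive case concrete, the sketch below pads a list of known keys with placeholder strings up to a maximum number of partitions and then samples from the combined pool. The `RANDOM_STRINGS` pool here is a stand-in defined for the example, not the library's constant, and the `max(0, ...)` guard is an addition of this sketch rather than something the source does.

```python
import numpy as np
import pandas as pd

# Stand-in pool of placeholder values (assumed for this example).
RANDOM_STRINGS = ["a", "b", "c", "d", "e"]

known_keys = ["red", "green"]  # publicly known values
max_num_partitions = 4         # upper bound on distinct values

# Work on a copy so the original list of known keys is left untouched.
candidates = list(known_keys)

# Pad with placeholders for the values that are not publicly known.
missing = max(0, max_num_partitions - len(known_keys))
candidates.extend(RANDOM_STRINGS[:missing])

rng = np.random.default_rng(seed=3)
column = pd.Series(rng.choice(candidates, size=20))
print(column.unique())
```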

get_bounds(col_meta: dict[str, Any]) -> tuple[T, T]

Get the lower and upper bounds from column metadata.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| col_meta | dict[str, Any] | Column metadata containing minimum and maximum values. | required |

Returns:

| Type | Description |
| --- | --- |
| tuple[T, T] | Tuple containing the minimum and maximum bounds. |

Raises:

| Type | Description |
| --- | --- |
| KeyError | If the minimum or maximum bound is missing from the metadata. |

Source code in csvw-eo-library/src/csvw_eo/generate_series.py
def get_bounds(col_meta: dict[str, Any]) -> tuple[T, T]:
    """
    Get the lower and upper bounds from column metadata.

    Parameters
    ----------
    col_meta : dict[str, Any]
        Column metadata containing minimum and maximum values.

    Returns
    -------
    tuple[T, T]
        Tuple containing the minimum and maximum bounds.

    Raises
    ------
    KeyError
        If the minimum or maximum bound is missing from the metadata.

    """
    if MINIMUM not in col_meta:
        raise KeyError(f"Missing {MINIMUM} in column {col_meta[COL_NAME]}")

    if MAXIMUM not in col_meta:
        raise KeyError(f"Missing {MAXIMUM} in column {col_meta[COL_NAME]}")

    return col_meta[MINIMUM], col_meta[MAXIMUM]
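
The lookup-or-raise pattern can be sketched on its own as below. The key names and the helper `get_bounds_sketch` are hypothetical and exist only for this example; the library's `MINIMUM`, `MAXIMUM`, and `COL_NAME` constants may hold different strings.

```python
from typing import Any

# Assumed key names, for illustration only.
MINIMUM, MAXIMUM, COL_NAME = "minimum", "maximum", "name"


def get_bounds_sketch(col_meta: dict[str, Any]) -> tuple[Any, Any]:
    """Return (minimum, maximum), raising KeyError if either is absent."""
    for key in (MINIMUM, MAXIMUM):
        if key not in col_meta:
            raise KeyError(f"Missing {key} in column {col_meta.get(COL_NAME)}")
    return col_meta[MINIMUM], col_meta[MAXIMUM]


print(get_bounds_sketch({"name": "age", "minimum": 0, "maximum": 10}))

try:
    get_bounds_sketch({"name": "age", "minimum": 0})
except KeyError as err:
    print("KeyError:", err)
```

Failing early with the column name in the message makes incomplete metadata easier to locate than a bare `KeyError` raised deep inside a generator function.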

mapping_series(depend_serie: pd.Series, value_map: dict[Any, Any], col_meta: dict[str, Any], rng: np.random.Generator) -> pd.Series

Generate a series based on a valueMap dependency.

Each value in depend_serie is looked up in value_map. If the entry is a list of options, one is chosen at random; values without an entry map to pd.NA.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| depend_serie | Series | Series to map from. | required |
| value_map | dict[Any, Any] | Mapping from values of the origin column to a target value or a list of candidate target values. | required |
| col_meta | dict[str, Any] | Column metadata; DATATYPE determines the dtype of the result. | required |
| rng | Generator | Random number generator for choosing among multiple mapping values. | required |

Returns:

| Type | Description |
| --- | --- |
| Series | Generated series satisfying the MAPPING dependency. |

Source code in csvw-eo-library/src/csvw_eo/generate_series.py
def mapping_series(
    depend_serie: pd.Series,
    value_map: dict[Any, Any],
    col_meta: dict[str, Any],
    rng: np.random.Generator,
) -> pd.Series:
    """
    Generate a series based on a valueMap dependency.

    Each value in depend_serie is looked up in value_map. If the entry is
    a list of options, one is chosen at random; values without an entry
    map to pd.NA.

    Parameters
    ----------
    depend_serie : pd.Series
        Series to map from.
    value_map : dict[Any, Any]
        Mapping from values of the origin column to a target value or a
        list of candidate target values.
    col_meta : dict[str, Any]
        Column metadata; DATATYPE determines the dtype of the result.
    rng : np.random.Generator
        Random number generator for choosing among multiple mapping values.

    Returns
    -------
    pd.Series
        Generated series satisfying MAPPING dependency.

    """
    mapped = []
    for val in depend_serie:
        choices = value_map.get(val)

        if isinstance(choices, list):
            mapped.append(rng.choice(choices))
        elif choices is not None:
            mapped.append(choices)
        else:
            mapped.append(pd.NA)

    return pd.Series(mapped, dtype=to_pandas_dtype(col_meta[DATATYPE]))
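
A small worked example of the mapping logic, assuming a string-valued target column. The country and city data are invented for illustration, and the `"string"` dtype stands in for whatever `to_pandas_dtype` would return for the column's datatype.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(seed=4)

# Origin column and an illustrative value map.
countries = pd.Series(["FR", "CH", "XX"])
value_map = {
    "FR": "Paris",             # single target value
    "CH": ["Bern", "Zurich"],  # several candidates: one is picked at random
}

mapped = []
for val in countries:
    choices = value_map.get(val)
    if isinstance(choices, list):
        # Convert the NumPy string scalar back to a plain Python str.
        mapped.append(str(rng.choice(choices)))
    elif choices is not None:
        mapped.append(choices)
    else:
        # No entry in the map: the dependent value is missing.
        mapped.append(pd.NA)

cities = pd.Series(mapped, dtype="string")
print(cities)
```

The loop is row-by-row, which is simple to follow but slow for large series; for dummy datasets of modest size that trade-off is usually acceptable.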