Blogging platform data generation

The project generates different datasets that you can use to test your data engineering applications. It simulates real-life data quality issues such as late, incomplete, or duplicate data.

Components

Data generation relies on different layers: entities, dataset generators, data blockers, and writers.

Entities

entities_layer.png

This layer is responsible for:

  • defining the entities' content (model); typically these are @dataclasses describing a given entity
  • creating the individual entity instances (generator); a minimal sketch of both follows this list
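
A minimal, hypothetical sketch of such an entity and its generator is shown below; the names (UserEntity, generate_user) are illustrative only and are not taken from the project:

import uuid
from dataclasses import dataclass
from datetime import datetime, timezone


# Hypothetical entity model; the class and field names are illustrative only.
@dataclass
class UserEntity:
    user_id: str
    registered_at: str


def generate_user() -> UserEntity:
    # Builds one entity instance with a random identifier and the current timestamp.
    return UserEntity(
        user_id=str(uuid.uuid4()),
        registered_at=datetime.now(timezone.utc).isoformat()
    )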

Dataset generators

dataset_generation_layer.png

Dataset generators control the dataset generation process, i.e. when the application should stop creating the given dataset. There are currently 3 supported controllers:

  • One-shot: generates the dataset only once
  • Fixed times: runs the generation loop a given number of times (see the sketch after this list)
  • Continuous: runs until interrupted by the user; typically useful for generating data for streaming jobs
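
Such a controller can be reduced to an object exposing a should_continue() method polled by the generation loop. The sketch below is a simplified, hypothetical version of the fixed-times behaviour, not the project's actual class:

class FixedTimesController:
    """Hypothetical sketch of a fixed-times controller allowing a given number of loops."""

    def __init__(self, all_runs: int):
        self.remaining_runs = all_runs

    def should_continue(self) -> bool:
        # Returns True until the configured number of runs is exhausted.
        if self.remaining_runs <= 0:
            return False
        self.remaining_runs -= 1
        return True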

Dataset generation blockers

dataset_generation_layer.png

Objects from this layer control the dataset generation rate, i.e. whether the generation job should pause between two generation loops.

There are currently 2 supported blockers:

  • None: there is no pause between the loops.
  • Sleeping: pauses between x and y seconds; the actual duration is drawn randomly from this range on each loop (sketched below)
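
A hedged sketch of what such a sleeping blocker could look like (not the project's actual implementation) is:

import random
import time


class SleepingBlocker:
    """Hypothetical sketch of the sleeping blocker pausing between two generation loops."""

    def __init__(self, from_seconds: int, to_seconds: int):
        self.from_seconds = from_seconds
        self.to_seconds = to_seconds

    def block(self):
        # Draws a random pause within the configured range on every call.
        time.sleep(random.randint(self.from_seconds, self.to_seconds))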

Writers

writer_classes.png

This layer exposes the API for writing the generated rows to the supported data stores.
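
The generation loop presented in the next section calls two methods on a writer: write_dataset_decorated_rows and flush. A hypothetical in-memory writer respecting that contract, shown purely for illustration, could look like this:

class InMemoryWriter:
    """Hypothetical writer buffering rows in memory; illustrates the writer contract."""

    def __init__(self):
        self.buffered_rows = []
        self.flushed_rows = []

    def write_dataset_decorated_rows(self, row):
        # Buffers one generated row.
        self.buffered_rows.append(row)

    def flush(self):
        # Moves buffered rows to the "persisted" collection, mimicking a real sink flush.
        self.flushed_rows.extend(self.buffered_rows)
        self.buffered_rows = []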

Data generation loop

The data generation process is in the main_generator.py file:

def generate_dataset(generator: DatasetGenerator, context: DatasetGenerationContext,
                     writer: DatasetWriter):
    rows_to_generate = context.get_rows_to_generate_with_maybe_decorators()
    while generator.should_continue():
        for row_decorator in rows_to_generate:
            row = row_decorator.return_decorated_row()
            logging.debug(f'Generated row is {row}')
            writer.write_dataset_decorated_rows(row)
        writer.flush()

        context.irregular_data_blocker.block()

The snippet shows all the components presented in the previous sections. Everything starts with the generation controller called in the while loop. Then, return_decorated_row invokes the entity generator, while the writer calls delegate to the sinks from the writer layer. Finally, irregular_data_blocker may pause the data generation by calling the configured data blocker.
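
To see how these pieces fit together, the hypothetical sketches from the previous sections (FixedTimesController, SleepingBlocker, InMemoryWriter) can be wired into the loop, assuming they are defined in the same module as generate_dataset. The row decorators and the DatasetGenerationContext are replaced by trivial stand-ins exposing only what the loop reads; none of these stand-ins come from the project itself.

from types import SimpleNamespace


class StaticRowDecorator:
    # Hypothetical stand-in; real decorators inject quality issues such as duplicates or missing fields.
    def __init__(self, row):
        self.row = row

    def return_decorated_row(self):
        return self.row


# Stand-in for DatasetGenerationContext exposing only the two members generate_dataset reads.
context = SimpleNamespace(
    get_rows_to_generate_with_maybe_decorators=lambda: [
        StaticRowDecorator({'visit_id': 1}), StaticRowDecorator({'visit_id': 2})
    ],
    irregular_data_blocker=SleepingBlocker(from_seconds=1, to_seconds=2)
)

writer = InMemoryWriter()
generate_dataset(FixedTimesController(all_runs=3), context, writer)
print(len(writer.flushed_rows))  # 6 rows: 2 rows per loop, 3 loops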

Configuration

To use the project, you can either adapt one of the examples present in dataset_generator_examples or use the Docker image.

Docker image

Building

Currently there is one Docker image for all available data stores. This may change in the future if new I/O integrations or library upgrades cause issues.

Building and releasing a Docker image is a manual task. Adapt the build_image and release_docker_image tasks in the Makefile.

Using

Using the Docker image requires creating a configuration YAML file following the schema below:

dataset:
  rows: x # number of rows to generate
  composition_percentage:
    duplicates: x # % of duplicates
    missing_fields: x # % of rows with missing fields
    unprocessable_rows: x # % of rows with unprocessable formats
data_blocker:
  type: x # type of the data blocker
entity:
  type: x # type of the generated entity w/ an optional configuration
  configuration:
    property1: x
generator:
  type: x # type of the data generator
writer:
  type: x # type of the dataset writer w/ an optional configuration
  configuration:
    property1: x
  partitions: [...] # it's an optional list of partitions, e.g. for JSON file writer

Since some of the attributes accept complex types, you can find more detailed configurations for them below.

Data blocker - None

data_blocker:
  type: 'no'

Data blocker - Sleeping

data_blocker:
  type: sleep
  configuration:
    sleep_time_range_seconds:
      from: 2
      to: 6   

Entity - devices or users

entity:
  type: 'device'
entity:
  type: 'user'

Entity - visits

entity:
  type: visit
  configuration:
    start_time: '2023-11-01T00:00:00Z'

start_time defines when the visits start.

Generator - one-shot or continuous

generator:
  type: one-shot
generator:
  type: continuous

Generator - fixed times

generator:
  type: fixed-times
  configuration:
    all_runs: 5

all_runs defines how many times the generation loop should run before quitting.

Writer - Apache Kafka

writer:
  type: kafka
  configuration:
    broker: 'localhost:9094'
    output_topic: 'visits'
    extra_producer_config:
      'queue.buffering.max.ms': 2000

extra_producer_config sets the extra configuration to pass to the Kafka Producer. The attributes must be accepted by the confluent_kafka.cimpl.Producer class.
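
For reference, extra settings of this kind typically end up merged into the configuration dictionary passed to the confluent_kafka Producer. The snippet below is a hedged illustration of that merging, assuming the values from the example above; it is not the project's writer code:

from confluent_kafka import Producer

# Hypothetical illustration of how the YAML attributes could map to a producer configuration.
broker = 'localhost:9094'
extra_producer_config = {'queue.buffering.max.ms': 2000}

producer_config = {'bootstrap.servers': broker}
producer_config.update(extra_producer_config)  # extra settings extend or override the defaults

producer = Producer(producer_config)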

Writer - file system CSV

writer:
  type: csv
  configuration:
    output_path: '/home/data_generator_user/data_generator_output/input'
    clean_path: true
  partitions: ['date=2023-11-01', 'date=2023-11-02', 'date=2023-11-03', 'date=2023-11-04',
               'date=2023-11-05', 'date=2023-11-06', 'date=2023-11-07']

output_path defines where the data should be written. Remember, this is a location inside the Docker container and not on your localhost. If you mount volumes, create the mounted directories on your file system first. Otherwise, you may encounter permission issues.

clean_path defines whether the writer should delete all files from the output directory before writing the new ones.

partitions defines a list of partitions to write data to. Each partition is generated from a new data generation loop. As a result, if you configure a fixed-times data generator with 5 runs, there will be 5 generation loops against each partition. The partitions are not related to the entities! They're optional: if you don't define this attribute, data is written directly to output_path. Otherwise, the writer creates files under ${output_path}/${partition}.
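
As a purely illustrative reconstruction of the resulting layout (not the writer's actual code), the configuration above makes the writer target paths such as:

import os

output_path = '/home/data_generator_user/data_generator_output/input'
partitions = ['date=2023-11-01', 'date=2023-11-02']

for partition in partitions:
    # Each generation loop writes its files under one partition subdirectory.
    print(os.path.join(output_path, partition))
# /home/data_generator_user/data_generator_output/input/date=2023-11-01
# /home/data_generator_user/data_generator_output/input/date=2023-11-02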

Writer - file system JSON

writer:
  type: json
  configuration:
    output_path: '/home/data_generator_user/data_generator_output/input'
    clean_path: true
  partitions: ['date=2023-11-01', 'date=2023-11-02', 'date=2023-11-03', 'date=2023-11-04',
               'date=2023-11-05', 'date=2023-11-06', 'date=2023-11-07']

Writer - PostgreSQL

writer:
  type: postgresql
  configuration:
    host: 'postgres'
    dbname: 'dedp'
    db_schema: 'dedp_schema'
    user: 'postgres'
    password: 'postgres'
    table_name: 'visits'
    table_columns: ['visit_id', 'event_time']
    row_fields_to_insert: ['visitId', 'eventTime']

table_columns defines the list of columns in the output table. row_fields_to_insert maps the attributes of the generated entity to those columns. In the example, the visitId attribute is mapped to the visit_id column and eventTime to event_time.
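
To make the mapping concrete, here is a hypothetical sketch of how these two lists could be combined into a parameterized INSERT statement; this is an assumption for illustration, not the writer's actual implementation:

# Hypothetical illustration of the column/field mapping from the configuration above.
table_name = 'visits'
table_columns = ['visit_id', 'event_time']
row_fields_to_insert = ['visitId', 'eventTime']

generated_row = {'visitId': '140-abc', 'eventTime': '2023-11-01T10:00:00Z'}

columns = ', '.join(table_columns)
placeholders = ', '.join(['%s'] * len(table_columns))
values = [generated_row[field] for field in row_fields_to_insert]

insert_query = f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders})'
print(insert_query, values)
# INSERT INTO visits (visit_id, event_time) VALUES (%s, %s) ['140-abc', '2023-11-01T10:00:00Z']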

Test

PyCharm

To launch the tests in PyCharm, you need to enable pytest as the test runner for the project. You can see how to do this on the jetbrains.com documentation page.

Command line

To execute all tests from the command line, use the make test_all command. To check test coverage, execute make test_coverage.

Development

virtualenv

Set up a virtualenv environment:

virtualenv -p python3 .venv/

Activate the installed environment:

source .venv/bin/activate

Install dependencies (venv activated):

pip3 install -r requirements.txt

Deactivate the virtualenv:

deactivate

Code checks

Check code format:

make lint_all

Reformat code:

make reformat_all

Pre-commit hook setup

The hook executes the code formatting before each commit and the unit tests before each push. To install it, install the pre-commit package and run pre-commit install:

pip install pre-commit
pre-commit install
