Support named functions #288

Open
gruckion opened this issue Aug 13, 2024 · 1 comment

It would be very useful if we could define reusable functions in stream_map_config and call them from stream_maps, for example:

version: 1
default_environment: dev
project_id: ee2b1be3-e8f2-4962-a6c8-0af0952c60a5
environments:
  - name: dev
  - name: staging
  - name: prod
send_anonymous_usage_stats: false

plugins:
  extractors:
    - name: tap-mysql
      variant: meltanolabs
      pip_url: git+https://github.com/MeltanoLabs/tap-mysql.git
      config:
        user: root
        port: 3306
        host: 127.0.0.1
        database: mydatabase
        filter_schemas:
          - mydatabase
        ssh_tunnel:
          enable: false
          host: your_ssh_host
          port: 22
          username: your_ssh_username
          private_key: |
            -----BEGIN OPENSSH PRIVATE KEY-----
            Your private key content here
            -----END OPENSSH PRIVATE KEY-----
      select:
        # Core Business Entities
        - "mydatabase-companies.*"
        - "mydatabase-sites.*"
        - "mydatabase-site_details.*"
        - "mydatabase-contacts.*"
        - "mydatabase-contacts_sites.*"

  loaders:
    - name: target-postgres
      variant: meltanolabs
      pip_url: meltanolabs-target-postgres
      config:
        host: 127.0.0.1
        port: 5432
        user: postgres
        dbname: postgres
        database: postgres

  utilities:
    - name: dbt-postgres
      variant: dbt-labs
      pip_url: dbt-core dbt-postgres git+https://github.com/meltano/dbt-ext.git@main
      config:
        host: 127.0.0.1
        port: 5432
        user: postgres
        dbname: postgres
        schema: tap_mysql

  mappers:
    - name: meltano-map-transformer
      variant: meltano
      pip_url: git+https://github.com/MeltanoLabs/meltano-map-transform.git
      mappings:
        - name: transform_dates
          config:
            stream_map_config:
              clean_string: |
                lambda s: s.replace('\0', '') if s else None
              clean_date: |
                lambda d: d if d and '0000' not in d and '-00' not in d else None
              clean_timestamp: |
                lambda t: t if t and '0000' not in t and '-00' not in t else None
            stream_maps:
              "*": # Apply to all tables
                created_at: clean_timestamp(record['created_at'])
                updated_at: clean_timestamp(record['updated_at'])
              mydatabase-contacts:
                dob: clean_date(record['dob'])
              mydatabase-sites:
                last_viewed: clean_timestamp(record['last_viewed'])
                date_moved_in: clean_date(record['date_moved_in'])
                address_postcode: clean_string(record['address_postcode'])
                password: clean_string(record['password'])
                address_1: clean_string(record['address_1'])
                address_2: clean_string(record['address_2'])
                address_3: clean_string(record['address_3'])
                address_county: clean_string(record['address_county'])
                tag: clean_string(record['tag'])
                company_name: clean_string(record['company_name'])
                office_number: clean_string(record['office_number'])
                office_number_2: clean_string(record['office_number_2'])
                sector: clean_string(record['sector'])
                address_town: clean_string(record['address_town'])
              mydatabase-site_details:
                date_incorporated: clean_date(record['date_incorporated'])
              mydatabase-site_sources:
                date_moved_in: clean_date(record['date_moved_in'])

jobs:
  - name: run_etl_with_transform
    tasks:
      - tap-mysql transform_dates target-postgres

This would avoid repeating the same expression over and over. Today, we actually have to write this instead:

version: 1
default_environment: dev
project_id: XXX
environments:
  - name: dev
  - name: staging
  - name: prod
send_anonymous_usage_stats: false

plugins:
  extractors:
    - name: tap-mysql
      variant: meltanolabs
      pip_url: git+https://github.com/MeltanoLabs/tap-mysql.git
      config:
        user: root
        port: 3306
        host: 127.0.0.1
        database: mydatabase
        filter_schemas:
          - mydatabase
        ssh_tunnel:
          enable: false
          host: your_ssh_host
          port: 22
          username: your_ssh_username
          private_key: |
            -----BEGIN OPENSSH PRIVATE KEY-----
            Your private key content here
            -----END OPENSSH PRIVATE KEY-----
      select:
        # Core Business Entities
        - "mydatabase-companies.*"
        - "mydatabase-sites.*"
        - "mydatabase-site_details.*"
        - "mydatabase-contacts.*"
        - "mydatabase-contacts_sites.*"

  loaders:
    - name: target-postgres
      variant: meltanolabs
      pip_url: meltanolabs-target-postgres
      config:
        host: 127.0.0.1
        port: 5432
        user: postgres
        dbname: postgres
        database: postgres

  utilities:
    - name: dbt-postgres
      variant: dbt-labs
      pip_url: dbt-core dbt-postgres git+https://github.com/meltano/dbt-ext.git@main
      config:
        host: 127.0.0.1
        port: 5432
        user: postgres
        dbname: postgres
        schema: tap_mysql

  mappers:
    - name: meltano-map-transformer
      variant: meltano
      pip_url: git+https://github.com/MeltanoLabs/meltano-map-transform.git
      mappings:
        - name: transform_dates
          config:
            stream_maps:
              "*": # Apply to all tables
                created_at:
                  record['created_at'] if record.get('created_at') and '0000'
                  not in record['created_at'] and '-00' not in record['created_at'] else None
                updated_at:
                  record['updated_at'] if record.get('updated_at') and '0000'
                  not in record['updated_at'] and '-00' not in record['updated_at'] else None
              mydatabase-contacts:
                dob:
                  record['dob'] if record.get('dob') and '0000' not in record['dob']
                  and '-00' not in record['dob'] else None
              mydatabase-sites:
                last_viewed:
                  record['last_viewed'] if record.get('last_viewed') and '0000'
                  not in record['last_viewed'] and '-00' not in record['last_viewed'] else None
                date_moved_in:
                  record['date_moved_in'] if record.get('date_moved_in') and
                  '0000' not in record['date_moved_in'] and '-00' not in record['date_moved_in'] else None
                address_postcode: record['address_postcode'].replace('\0', '') if record.get('address_postcode') else None
                password: record['password'].replace('\0', '') if record.get('password') else None
                address_1: record['address_1'].replace('\0', '') if record.get('address_1') else None
                address_2: record['address_2'].replace('\0', '') if record.get('address_2') else None
                address_3: record['address_3'].replace('\0', '') if record.get('address_3') else None
                address_county: record['address_county'].replace('\0', '') if record.get('address_county') else None
                tag: record['tag'].replace('\0', '') if record.get('tag') else None
                company_name: record['company_name'].replace('\0', '') if record.get('company_name') else None
                office_number: record['office_number'].replace('\0', '') if record.get('office_number') else None
                office_number_2: record['office_number_2'].replace('\0', '') if record.get('office_number_2') else None
                sector: record['sector'].replace('\0', '') if record.get('sector') else None
                address_town: record['address_town'].replace('\0', '') if record.get('address_town') else None
              mydatabase-site_details:
                date_incorporated:
                  record['date_incorporated'] if record.get('date_incorporated')
                  and '0000' not in record['date_incorporated'] and '-00' not in record['date_incorporated'] else None
              mydatabase-site_sources:
                date_moved_in:
                  record['date_moved_in'] if record.get('date_moved_in') and
                  '0000' not in record['date_moved_in'] and '-00' not in record['date_moved_in'] else None

jobs:
  - name: run_etl_with_transform
    tasks:
      - tap-mysql transform_dates target-postgres

edgarrmondragon (Member) commented Aug 13, 2024

We could support a new key (e.g. user_functions) under stream_map_config, then parse and cache each function definition and pass it to the simpleeval context.

EDIT: That should be done in https://github.com/meltano/sdk.
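
For illustration only, the "parse and cache each function definition" idea could look roughly like the sketch below. It is not the SDK's implementation. The `user_functions` shape (name mapped to a single lambda source string), the helper names `compile_user_functions` and `apply_stream_map`, and the use of Python's built-in `eval` are all assumptions made to keep the example dependency-free. The SDK evaluates stream map expressions with simpleeval, and how user-defined callables would be registered in its context is not settled in this issue.

```python
import ast

# Hypothetical config shape: a `user_functions` mapping under stream_map_config,
# where each value is the source of a single lambda, as in the issue's example.
USER_FUNCTIONS = {
    "clean_string": "lambda s: s.replace('\\0', '') if s else None",
    "clean_date": "lambda d: d if d and '0000' not in d and '-00' not in d else None",
}


def compile_user_functions(sources):
    """Parse each lambda source once and return a name -> callable cache."""
    compiled = {}
    for name, source in sources.items():
        tree = ast.parse(source.strip(), mode="eval")
        # Reject anything that is not exactly one lambda expression.
        if not isinstance(tree.body, ast.Lambda):
            raise ValueError(f"{name!r} must be a single lambda expression")
        code = compile(tree, filename=f"<user_function {name}>", mode="eval")
        # Stand-in only: eval with empty builtins is NOT a real sandbox.
        compiled[name] = eval(code, {"__builtins__": {}}, {})
    return compiled


def apply_stream_map(expressions, record, functions):
    """Evaluate each property expression with `record` and the cached functions in scope."""
    scope = {"__builtins__": {}, "record": record, **functions}
    return {prop: eval(expr, scope) for prop, expr in expressions.items()}


if __name__ == "__main__":
    functions = compile_user_functions(USER_FUNCTIONS)  # built once, reused per record
    stream_map = {
        "dob": "clean_date(record['dob'])",
        "tag": "clean_string(record['tag'])",
    }
    print(apply_stream_map(stream_map, {"dob": "0000-00-00", "tag": "a\x00b"}, functions))
```

Compiling the functions once up front and reusing the resulting dictionary for every record is the "cache" part of the suggestion. In a real implementation the cached callables would be handed to the expression evaluator rather than to `eval`.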
