
Commit

Merge pull request #13 from pysql-beam-org/fix-precommit
pre-commit fix
pysql beam admin authored Dec 16, 2023
2 parents 86fb3a4 + 52ec8d4 commit 249d4ba
Showing 14 changed files with 749 additions and 407 deletions.
51 changes: 51 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,51 @@
repos:
  # must be initialized via pre-commit install
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.2.0
    hooks:
      - id: check-added-large-files # Prevent giant files from being committed
      - id: check-docstring-first # Checks a common error of defining a docstring after code.
      - id: check-executables-have-shebangs
      - id: check-shebang-scripts-are-executable
      - id: end-of-file-fixer
        exclude_types: [svg]
      - id: mixed-line-ending
      - id: trailing-whitespace

  - repo: https://github.com/pre-commit/mirrors-prettier
    rev: v2.6.2
    hooks:
      - id: prettier
        exclude: >
          (?x)^(
            .*\.yaml
          )$
  # Check for typos
  - repo: https://github.com/codespell-project/codespell
    rev: v2.1.0
    hooks:
      - id: codespell
        args: [--ignore-words=allow.txt, --exclude-file=allow.txt]

  # - repo: https://github.com/asottile/pyupgrade
  #   rev: v2.29.1
  #   hooks:
  #     - id: pyupgrade
  #       args: ["--py38-plus"]

  - repo: https://github.com/psf/black
    rev: 22.3.0
    hooks:
      - id: black
        language_version: python3

  - repo: https://github.com/pycqa/flake8
    rev: 6.1.0
    hooks:
      - id: flake8

  - repo: https://github.com/pycqa/bandit
    rev: 1.7.4
    hooks:
      - id: bandit
22 changes: 10 additions & 12 deletions README.md
@@ -2,11 +2,10 @@

This package aims to provide Apache Beam IO connectors for MySQL, Postgres and MSSQL databases.


This package provides Apache Beam IO connectors for Postgres, MSSQL and MySQL databases.
This package is a Python implementation of these three IO connectors.

FYI: it uses a pyodbc connector for the mssql implementation, but not for the other two connectors

Requirements:

@@ -16,14 +15,12 @@ Requirements:
4. psycopg2-binary
5. pyodbc


Installation:

1. pip install pysql-beam
or
1. pip install git@github.com:yesdeepakverma/pysql-beam.git


Current functionality:

1. Read from MySQL database by passing either table name or sql query
@@ -34,14 +31,15 @@ Current functionality:
Reference Guide:

1. Java IO connector for the same:
   https://github.com/spotify/dbeam

2. How to write io connector for Apache Beam:
   https://beam.apache.org/documentation/io/developing-io-overview/
   https://beam.apache.org/documentation/io/developing-io-python/

Usage Guide:

```
from pysql_beam.sql_io.sql import ReadFromSQL
@@ -54,6 +52,7 @@ ReadFromSQL(host=options.host, port=options.port,
batch=100000)
```
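
For a fuller sketch, see the example pipeline added in this commit (`pysql_beam/examples/cloud_sql_to_bigquery.py`). The snippet below is adapted from it; the connection values are placeholders, the MSSQL wrapper is used for illustration (swap in `MySQLWrapper` or `PostgresWrapper` as needed), and writing to BigQuery needs a GCS `--temp_location` as in the commands below:

```
import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition
from apache_beam.options.pipeline_options import PipelineOptions

from pysql_beam.sql_io.sql import ReadFromSQL
from pysql_beam.sql_io.wrapper import MSSQLWrapper

options = PipelineOptions()  # pass --temp_location etc. on the command line
pipeline = beam.Pipeline(options=options)

rows = pipeline | ReadFromSQL(
    host="localhost",            # placeholder connection details
    port="1433",
    username="SECRET_USER",
    password="SECRET_PASSWORD",
    database="SECRET_DATABASE",
    query="SELECT * FROM MyTable",
    wrapper=MSSQLWrapper,        # or MySQLWrapper / PostgresWrapper
)

rows | "Write to Table" >> WriteToBigQuery(
    table="MyProject:MyDataset.MyTable",
    schema="SCHEMA_AUTODETECT",
    write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
)

pipeline.run().wait_until_finish()
```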

Examples:

For mysql:
@@ -63,8 +62,7 @@ Examples:
`python cloud_sql_to_bigquery.py --host localhost --port 5432 --database SECRET_DATABASE --username SECRET_USER --password SECRET_PASSWORD --table YOUR_TABLE --output_table 'MyProject:MyDataset.MyTable' --temp_location "gs://MyBucket/tmp"`

For mssql:
`python cloud_sql_to_bigquery.py --host localhost --port 1433 --database SECRET_DATABASE --username SECRET_USER --password SECRET_PASSWORD --query 'SELECT * from MyTable' --output_table 'MyProject:MyDataset.MyTable' --temp_location "gs://MyBucket/tmp"`

contribution:
You can contribute to this package by raising bugs or sending pull requests
Empty file added allow.txt
Empty file.
Empty file modified pysql_beam/__init__.py
100755 → 100644
Empty file.
Empty file modified pysql_beam/examples/__init__.py
100755 → 100644
Empty file.
128 changes: 75 additions & 53 deletions pysql_beam/examples/cloud_sql_to_bigquery.py
100755 → 100644
@@ -4,34 +4,36 @@
Command to run this script:
python cloud_sql_to_file.py --host localhost --port 3306 \
    --database SECRET_DATABASE --username SECRET_USER --password SECRET_PASSWORD \
    --table YOUR_TABLE --output YOUR_OUTPUT_FILE
For postgres sql:
python cloud_sql_to_file.py --host localhost --port 5432 \
    --database SECRET_DATABASE \
    --username SECRET_USER --password SECRET_PASSWORD --table YOUR_TABLE \
    --output YOUR_OUTPUT_FILE
"""
import json
import logging

import apache_beam as beam
from apache_beam.io.gcp.bigquery import WriteToBigQuery, BigQueryDisposition
from apache_beam.options.pipeline_options import (
    PipelineOptions,
)

from pysql_beam.sql_io.sql import ReadFromSQL
from pysql_beam.sql_io.wrapper import MSSQLWrapper


def log(row, level="debug"):
    getattr(logging, level.lower())(row)
    return row


class SQLOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        # parser.add_value_provider_argument('--host', dest='host', required=False)
@@ -41,53 +43,74 @@ def _add_argparse_args(cls, parser):
        # parser.add_value_provider_argument('--query', dest='query', required=False)
        # parser.add_value_provider_argument('--username', dest='username', required=False)
        # parser.add_value_provider_argument('--password', dest='password', required=False)
        # parser.add_value_provider_argument('--db_type', dest='db_type', default="mssql",
        #     required=False, help="the type of database; allowed are 'mssql', 'mysql' and 'postgres'")
        parser.add_value_provider_argument("--host", dest="host", default="localhost")
        parser.add_value_provider_argument("--port", dest="port", default="3306")
        parser.add_value_provider_argument(
            "--database", dest="database", default="dverma"
        )
        parser.add_value_provider_argument(
            "--query", dest="query", default="SELECT * FROM dverma.userPointsLedger;"
        )
        parser.add_value_provider_argument(
            "--username", dest="username", default="dverma"
        )
        parser.add_value_provider_argument(
            "--password", dest="password", default="Deepak@123"
        )
        # parser.add_value_provider_argument('--output', dest='output', default="abc",
        #     help="output file name")
        parser.add_argument(
            "--output_table",
            dest="output_table",
            required=True,
            help=(
                "Output BigQuery table for results specified as: PROJECT:DATASET.TABLE "
                "or DATASET.TABLE."
            ),
        )


def parse_json(line):
    """Converts line from PubSub back to dictionary"""
    record = json.loads(line)
    return record


def run():
    pipeline_options = PipelineOptions()
    options = pipeline_options.view_as(SQLOptions)
    # options.view_as(SetupOptions).save_main_session = True
    # temp_location = options.view_as(GoogleCloudOptions).temp_location
    # print("Here!", temp_location)
    pipeline = beam.Pipeline(options=options)



    mysql_data = (  # noqa: E402, E501, F841
        pipeline
        | ReadFromSQL(
            host=options.host,
            port=options.port,
            username=options.username,
            password=options.password,
            database=options.database,
            query=options.query,
            # wrapper={'mssql': MSSQLWrapper, 'mysql': MySQLWrapper, 'postgres': PostgresWrapper}[options.db_type],
            wrapper=MSSQLWrapper,
            # wrapper=PostgresWrapper
            #
        )
        # | 'Parse' >> beam.Map(parse_json)
        | "Write to Table"
        >> WriteToBigQuery(
            table=options.output_table,
            schema="SCHEMA_AUTODETECT",
            write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        )
    )

    # transformed_data = mysql_data | "Transform records" >> beam.Map(transform_records, 'user_id')
    # transformed_data | "insert into mysql" >> SQLWriter(options.host, options.port,
    #     options.username, options.password,
    #     options.database,
@@ -97,9 +120,8 @@ def run():
    #     'postgres', options.password,
    #     options.database, table=options.output_table,
    #     wrapper=PostgresWrapper, autocommit=False, batch_size=500)
    # mysql_data | "Log records " >> beam.Map(log)
    # | beam.io.WriteToText(options.output, num_shards=1, file_name_suffix=".json")

    pipeline.run().wait_until_finish()

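The commented-out `wrapper={...}[options.db_type]` line above suggests selecting the wrapper class from a `--db_type` option instead of hard-coding `MSSQLWrapper`. A hypothetical sketch of that selection (not part of this commit; the `--db_type` value and `pick_wrapper` helper are illustrative):

```
from pysql_beam.sql_io.wrapper import MySQLWrapper, MSSQLWrapper, PostgresWrapper

# Map of supported database types to their pysql-beam wrapper classes.
WRAPPERS = {
    "mssql": MSSQLWrapper,
    "mysql": MySQLWrapper,
    "postgres": PostgresWrapper,
}


def pick_wrapper(db_type):
    """Return the wrapper class for the given database type string."""
    try:
        return WRAPPERS[db_type]
    except KeyError:
        raise ValueError("unsupported db_type: %r" % db_type)
```
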
Empty file modified pysql_beam/sql_io/__init__.py
100755 → 100644
Empty file.
Empty file modified pysql_beam/sql_io/exceptions.py
100755 → 100644
Empty file.
