Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add connection pooling to MySQL and Postgres #900

Merged
merged 26 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
19eda09
Refactor MySQLConnection to use connection pooling
josephmancuso Oct 21, 2024
fd7d200
Refactor PostgresConnection to use connection pooling
josephmancuso Oct 22, 2024
3ff0a83
Refactor MySQLConnection to use connection pooling
josephmancuso Oct 26, 2024
bc08c93
Refactor MySQLConnection to use connection pooling and add connection…
josephmancuso Oct 27, 2024
28418eb
Refactor MySQLConnection to use connection pooling and initialize con…
josephmancuso Oct 27, 2024
341db08
Refactor MySQLConnection to use connection pooling and remove debug p…
josephmancuso Oct 27, 2024
1ad5ad7
Refactor MySQLConnection to use connection pooling and update connect…
josephmancuso Oct 27, 2024
ba7372f
Refactor MySQLConnection to use connection pooling and update connect…
josephmancuso Oct 27, 2024
cf9bc84
Refactor MySQLConnection to use connection pooling and update connect…
josephmancuso Oct 27, 2024
3010894
Refactor MySQLConnection to use connection pooling and update connect…
josephmancuso Oct 27, 2024
db112ca
format
josephmancuso Oct 27, 2024
8291bc7
Refactor PostgresConnection to use connection pooling and update conn…
josephmancuso Oct 27, 2024
4cbd0e4
Refactor PostgresConnection to remove unused variable
josephmancuso Oct 27, 2024
a4656b9
Refactor PostgresConnection to enable connection pooling with a maxim…
josephmancuso Oct 27, 2024
6a237fa
Refactor database configuration to update connection pool minimum size
josephmancuso Oct 27, 2024
6c4ff02
Refactor SQLiteConnection to use a separate method for creating the c…
josephmancuso Oct 27, 2024
d14c826
Refactor SQLiteConnection to use a separate method for creating the c…
josephmancuso Oct 27, 2024
76590f8
linted
josephmancuso Oct 27, 2024
d28435b
Refactor PostgresConnection to handle closed connections during query…
josephmancuso Oct 27, 2024
9475b9e
Refactor PostgresConnection to handle closed connections during query…
josephmancuso Oct 27, 2024
09d082b
Refactor MySQLConnection to handle closed connections during query ex…
josephmancuso Oct 27, 2024
e466282
Refactor MySQLConnection to handle closed connections during query ex…
josephmancuso Oct 27, 2024
94b1347
Refactor MySQLConnection to handle closed connections during query ex…
josephmancuso Oct 27, 2024
fd6785f
linted
josephmancuso Oct 27, 2024
0468c45
Refactor MySQLConnection to handle closed connections during query ex…
josephmancuso Oct 27, 2024
7a6e7a8
linted
josephmancuso Oct 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 81 additions & 30 deletions src/masoniteorm/connections/MySQLConnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,16 @@ def __init__(
if str(port).isdigit():
self.port = int(self.port)
self.database = database

self.user = user
self.password = password
self.prefix = prefix
self.full_details = full_details or {}
self.connection_pool_size = (
full_details.get(
"connection_pooling_max_size", 100
)
)
self.options = options or {}
self._cursor = None
self.open = 0
Expand All @@ -48,42 +54,80 @@ def make_connection(self):
if self._dry:
return

if self.has_global_connection():
return self.get_global_connection()

# Check if there is an available connection in the pool
self._connection = self.create_connection()
self.enable_disable_foreign_keys()

return self

def close_connection(self):
if (
self.full_details.get("connection_pooling_enabled")
and len(CONNECTION_POOL) < self.connection_pool_size
):
CONNECTION_POOL.append(self._connection)
self.open = 0
self._connection = None

def create_connection(self, autocommit=True):

try:
import pymysql
except ModuleNotFoundError:
raise DriverNotFound(
"You must have the 'pymysql' package installed to make a connection to MySQL. Please install it using 'pip install pymysql'"
"You must have the 'pymysql' package "
"installed to make a connection to MySQL. "
"Please install it using 'pip install pymysql'"
)
import pendulum
import pymysql.converters

try:
import pendulum
import pymysql.converters

pymysql.converters.conversions[
pendulum.DateTime
] = pymysql.converters.escape_datetime
except ImportError:
pass

if self.has_global_connection():
return self.get_global_connection()

self._connection = pymysql.connect(
cursorclass=pymysql.cursors.DictCursor,
autocommit=True,
host=self.host,
user=self.user,
password=self.password,
port=self.port,
db=self.database,
**self.options
pymysql.converters.conversions[pendulum.DateTime] = (
pymysql.converters.escape_datetime
)

self.enable_disable_foreign_keys()
# Initialize the connection pool if the option is set
initialize_size = self.full_details.get("connection_pooling_min_size")
if initialize_size and len(CONNECTION_POOL) < initialize_size:
for _ in range(initialize_size - len(CONNECTION_POOL)):
connection = pymysql.connect(
cursorclass=pymysql.cursors.DictCursor,
autocommit=autocommit,
host=self.host,
user=self.user,
password=self.password,
port=self.port,
database=self.database,
**self.options
)
CONNECTION_POOL.append(connection)

if (
self.full_details.get("connection_pooling_enabled")
and CONNECTION_POOL
and len(CONNECTION_POOL) > 0
):
connection = CONNECTION_POOL.pop()
else:
connection = pymysql.connect(
cursorclass=pymysql.cursors.DictCursor,
autocommit=autocommit,
host=self.host,
user=self.user,
password=self.password,
port=self.port,
database=self.database,
**self.options
)

connection.close = self.close_connection

self.open = 1

return self
return connection

def reconnect(self):
self._connection.connect()
Expand Down Expand Up @@ -139,15 +183,19 @@ def get_cursor(self):
return self._cursor

def query(self, query, bindings=(), results="*"):
"""Make the actual query that will reach the database and come back with a result.
"""Make the actual query that
will reach the database and come back with a result.

Arguments:
query {string} -- A string query. This could be a qmarked string or a regular query.
query {string} -- A string query.
This could be a qmarked string or a regular query.
bindings {tuple} -- A tuple of bindings

Keyword Arguments:
results {str|1} -- If the results is equal to an asterisks it will call 'fetchAll'
else it will return 'fetchOne' and return a single record. (default: {"*"})
results {str|1} -- If the results is equal to an
asterisks it will call 'fetchAll'
else it will return 'fetchOne' and
return a single record. (default: {"*"})

Returns:
dict|None -- Returns a dictionary of results or None
Expand All @@ -156,7 +204,10 @@ def query(self, query, bindings=(), results="*"):
if self._dry:
return {}

if not self._connection.open:
if not self.open:
if self._connection is None:
self._connection = self.create_connection()

self._connection.connect()

self._cursor = self._connection.cursor()
Expand Down
76 changes: 64 additions & 12 deletions src/masoniteorm/connections/PostgresConnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ def __init__(
self.database = database
self.user = user
self.password = password

self.prefix = prefix
self.full_details = full_details or {}
self.connection_pool_size = full_details.get("connection_pooling_max_size", 100)
self.options = options or {}
self._cursor = None
self.transaction_level = 0
Expand All @@ -56,16 +58,7 @@ def make_connection(self):
if self.has_global_connection():
return self.get_global_connection()

schema = self.schema or self.full_details.get("schema")

self._connection = psycopg2.connect(
database=self.database,
user=self.user,
password=self.password,
host=self.host,
port=self.port,
options=f"-c search_path={schema}" if schema else "",
)
self._connection = self.create_connection()

self._connection.autocommit = True

Expand All @@ -75,6 +68,53 @@ def make_connection(self):

return self

def create_connection(self):
import psycopg2

# Initialize the connection pool if the option is set
initialize_size = self.full_details.get("connection_pooling_min_size")
if (
self.full_details.get("connection_pooling_enabled")
and initialize_size
and len(CONNECTION_POOL) < initialize_size
):
for _ in range(initialize_size - len(CONNECTION_POOL)):
connection = psycopg2.connect(
database=self.database,
user=self.user,
password=self.password,
host=self.host,
port=self.port,
options=(
f"-c search_path={self.schema or self.full_details.get('schema')}"
if self.schema or self.full_details.get("schema")
else ""
),
)
CONNECTION_POOL.append(connection)

if (
self.full_details.get("connection_pooling_enabled")
and CONNECTION_POOL
and len(CONNECTION_POOL) > 0
):
connection = CONNECTION_POOL.pop()
else:
connection = psycopg2.connect(
database=self.database,
user=self.user,
password=self.password,
host=self.host,
port=self.port,
options=(
f"-c search_path={self.schema or self.full_details.get('schema')}"
if self.schema or self.full_details.get("schema")
else ""
),
)

return connection

def get_database_name(self):
return self.database

Expand All @@ -93,6 +133,17 @@ def get_default_post_processor(cls):
def reconnect(self):
pass

def close_connection(self):
if (
self.full_details.get("connection_pooling_enabled")
and len(CONNECTION_POOL) < self.connection_pool_size
):
CONNECTION_POOL.append(self._connection)
else:
self._connection.close()

self._connection = None

def commit(self):
"""Transaction"""
if self.get_transaction_level() == 1:
Expand Down Expand Up @@ -140,7 +191,7 @@ def query(self, query, bindings=(), results="*"):
dict|None -- Returns a dictionary of results or None
"""
try:
if self._connection.closed:
if not self._connection or self._connection.closed:
self.make_connection()

self.set_cursor()
Expand All @@ -164,4 +215,5 @@ def query(self, query, bindings=(), results="*"):
finally:
if self.get_transaction_level() <= 0:
self.open = 0
self._connection.close()
self.close_connection()
# self._connection.close()
9 changes: 9 additions & 0 deletions tests/integrations/config/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
They can be named whatever you want.
"""


DATABASES = {
"default": "mysql",
"mysql": {
Expand All @@ -37,6 +38,9 @@
"options": {"charset": "utf8mb4"},
"log_queries": True,
"propagate": False,
"connection_pooling_enabled": True,
"connection_pooling_max_size": 10,
"connection_pooling_min_size": None,
},
"t": {"driver": "sqlite", "database": "orm.sqlite3", "log_queries": True, "foreign_keys": True},
"devprod": {
Expand Down Expand Up @@ -69,6 +73,9 @@
"password": os.getenv("POSTGRES_DATABASE_PASSWORD"),
"database": os.getenv("POSTGRES_DATABASE_DATABASE"),
"port": os.getenv("POSTGRES_DATABASE_PORT"),
"connection_pooling_enabled": True,
"connection_pooling_max_size": 10,
"connection_pooling_min_size": 2,
"prefix": "",
"log_queries": True,
"propagate": False,
Expand Down Expand Up @@ -101,6 +108,8 @@
"authentication": "ActiveDirectoryPassword",
"driver": "ODBC Driver 17 for SQL Server",
"connection_timeout": 15,
"connection_pooling": False,
"connection_pooling_size": 100,
},
},
}
Expand Down
Loading