Skip to content

✨ Add Async SQLAlchemy #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ django/bench.sh | tee -a outfile1
peewee/bench.sh | tee -a outfile1
pony/bench.sh | tee -a outfile1
sqlalchemy/bench.sh | tee -a outfile1
sqlalchemy_async/bench.sh | tee -a outfile1
sqlobject/bench.sh | tee -a outfile1
tortoise/bench.sh | tee -a outfile1

Expand All @@ -33,6 +34,7 @@ django/bench.sh | tee -a outfile2
peewee/bench.sh | tee -a outfile2
pony/bench.sh | tee -a outfile2
sqlalchemy/bench.sh | tee -a outfile2
sqlalchemy_async/bench.sh | tee -a outfile2
sqlobject/bench.sh | tee -a outfile2
tortoise/bench.sh | tee -a outfile2

Expand All @@ -45,6 +47,7 @@ django/bench.sh | tee -a outfile3
peewee/bench.sh | tee -a outfile3
pony/bench.sh | tee -a outfile3
sqlalchemy/bench.sh | tee -a outfile3
sqlalchemy_async/bench.sh | tee -a outfile3
sqlobject/bench.sh | tee -a outfile3
tortoise/bench.sh | tee -a outfile3

Expand Down
40 changes: 40 additions & 0 deletions benchmarks/sqlalchemy_async/bench.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/usr/bin/env python
import asyncio

import test_a
import test_b
import test_c
import test_d
import test_e
import test_f
import test_g
import test_h
import test_i
import test_j
import test_k
from models import Base, engine, loopstr


async def create_db():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)


async def run_benchmarks():
await create_db()
await test_a.runtest(loopstr)
await test_b.runtest(loopstr)
await test_c.runtest(loopstr)
await test_d.runtest(loopstr)
await test_e.runtest(loopstr)
await test_f.runtest(loopstr)
await test_g.runtest(loopstr)
await test_h.runtest(loopstr)
await test_i.runtest(loopstr)
await test_j.runtest(loopstr)
await test_k.runtest(loopstr)


if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(run_benchmarks())
15 changes: 15 additions & 0 deletions benchmarks/sqlalchemy_async/bench.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/bin/sh

cd $(dirname $0)

PYPY=`python -V | grep PyPy`

../db.sh

if [ -z "$PYPY" ]; then
# run uvloop benchmarks
PYTHONUNBUFFERED=x UVLOOP=1 python -m bench
else
# run regular loop benchmarks
PYTHONUNBUFFERED=x python -m bench
fi
146 changes: 146 additions & 0 deletions benchmarks/sqlalchemy_async/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import os
import sys
from datetime import datetime
from decimal import Decimal

from sqlalchemy import (JSON, BigInteger, Column, DateTime, Float, ForeignKey, Integer, Numeric,
SmallInteger, String, Text)
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship

try:
concurrents = int(os.environ.get("CONCURRENTS", "10"))

if concurrents != 10:
loopstr = f" C{concurrents}"
else:
loopstr = ""
if os.environ.get("UVLOOP", ""):
import asyncio

import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
finally:
pass

if concurrents > 1 and sys.version_info < (3, 7):
sys.exit()

dbtype = os.environ.get("DBTYPE", "")
if dbtype == "postgres":
engine = create_async_engine(
f"postgres+asyncpg://postgres:{os.environ.get('PASSWORD')}@127.0.0.1:5432/tbench?minsize={concurrents}&maxsize={concurrents}"
)
elif dbtype == "mysql":
engine = create_async_engine(
f"mysql+aiomysql://root:{os.environ.get('PASSWORD')}@127.0.0.1:3306/tbench?minsize={concurrents}&maxsize={concurrents}"
)
else:
engine = create_async_engine(
"sqlite+aiosqlite:////dev/shm/db.sqlite3",
connect_args={"check_same_thread": False},
)

Base = declarative_base()

test = int(os.environ.get("TEST", "1"))
if test == 1:

class Journal(Base):
__tablename__ = "journal"

id = Column(Integer, primary_key=True)
timestamp = Column(DateTime, default=datetime.now, nullable=False)
level = Column(SmallInteger, index=True, nullable=False)
text = Column(String(255), index=True, nullable=False)


if test == 2:

class JournalRelated(Base):
__tablename__ = "journal_related"
journal_id = Column(Integer, ForeignKey("journal.id"), primary_key=True)
journal_from_id = Column(Integer, ForeignKey("journal.id"), primary_key=True)

class Journal(Base):
__tablename__ = "journal"

id = Column(Integer, primary_key=True)
timestamp = Column(DateTime, default=datetime.now, nullable=False)
level = Column(SmallInteger, index=True, nullable=False)
text = Column(String(255), index=True, nullable=False)
parent_id = Column(Integer, ForeignKey("journal.id"))
parent = relationship("Journal", remote_side=id, backref="children")
related = relationship(
"JournalRelated", backref="to", primaryjoin=id == JournalRelated.journal_id
)
related_from = relationship(
"JournalRelated",
backref="from",
primaryjoin=id == JournalRelated.journal_from_id,
)


if test == 3:

class Journal(Base):
__tablename__ = "journal"

id = Column(Integer, primary_key=True)
timestamp = Column(DateTime, default=datetime.now, nullable=False)
level = Column(SmallInteger, index=True, nullable=False)
text = Column(String(255), index=True, nullable=False)

col_float1 = Column(Float, default=2.2, nullable=False)
col_smallint1 = Column(SmallInteger, default=2, nullable=False)
col_int1 = Column(Integer, default=2000000, nullable=False)
col_bigint1 = Column(BigInteger, default=99999999, nullable=False)
col_char1 = Column(String(255), default="value1", nullable=False)
col_text1 = Column(
Text,
default="Moo,Foo,Baa,Waa,Moo,Foo,Baa,Waa,Moo,Foo,Baa,Waa",
nullable=False,
)
col_decimal1 = Column(Numeric(12, 8), default=Decimal("2.2"), nullable=False)
col_json1 = Column(
JSON,
default={"a": 1, "b": "b", "c": [2], "d": {"e": 3}, "f": True},
nullable=False,
)

col_float2 = Column(Float)
col_smallint2 = Column(SmallInteger)
col_int2 = Column(Integer)
col_bigint2 = Column(BigInteger)
col_char2 = Column(String(255))
col_text2 = Column(Text)
col_decimal2 = Column(Numeric(12, 8))
col_json2 = Column(JSON)

col_float3 = Column(Float, default=2.2, nullable=False)
col_smallint3 = Column(SmallInteger, default=2, nullable=False)
col_int3 = Column(Integer, default=2000000, nullable=False)
col_bigint3 = Column(BigInteger, default=99999999, nullable=False)
col_char3 = Column(String(255), default="value1", nullable=False)
col_text3 = Column(
Text,
default="Moo,Foo,Baa,Waa,Moo,Foo,Baa,Waa,Moo,Foo,Baa,Waa",
nullable=False,
)
col_decimal3 = Column(Numeric(12, 8), default=Decimal("2.2"), nullable=False)
col_json3 = Column(
JSON,
default={"a": 1, "b": "b", "c": [2], "d": {"e": 3}, "f": True},
nullable=False,
)

col_float4 = Column(Float)
col_smallint4 = Column(SmallInteger)
col_int4 = Column(Integer)
col_bigint4 = Column(BigInteger)
col_char4 = Column(String(255))
col_text4 = Column(Text)
col_decimal4 = Column(Numeric(12, 8))
col_json4 = Column(JSON)
31 changes: 31 additions & 0 deletions benchmarks/sqlalchemy_async/test_a.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import asyncio
import os
import time
from random import choice

from models import Journal, engine
from sqlalchemy.ext.asyncio import AsyncSession

LEVEL_CHOICE = [10, 20, 30, 40, 50]
concurrents = int(os.environ.get("CONCURRENTS", "10"))
count = int(os.environ.get("ITERATIONS", "1000"))
count = int(count // concurrents) * concurrents


async def _runtest(count):
async with AsyncSession(engine) as session:
for i in range(count):
session.add(
Journal(level=choice(LEVEL_CHOICE), text=f"Insert from A, item {i}")
)
await session.commit()


async def runtest(loopstr):
start = now = time.time()

await asyncio.gather(*[_runtest(count // concurrents) for _ in range(concurrents)])

now = time.time()

print(f"Async SQLAlchemy ORM{loopstr}, A: Rows/sec: {count / (now - start): 10.2f}")
31 changes: 31 additions & 0 deletions benchmarks/sqlalchemy_async/test_b.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import asyncio
import os
import time
from random import choice

from models import Journal, engine
from sqlalchemy.ext.asyncio import AsyncSession

LEVEL_CHOICE = [10, 20, 30, 40, 50]
concurrents = int(os.environ.get("CONCURRENTS", "10"))
count = int(os.environ.get("ITERATIONS", "1000"))
count = int(count // concurrents) * concurrents


async def _runtest(count):
async with AsyncSession(engine) as session:
for i in range(count):
session.add(
Journal(level=choice(LEVEL_CHOICE), text=f"Insert from B, item {i}")
)
await session.commit()


async def runtest(loopstr):
start = now = time.time()

await asyncio.gather(*[_runtest(count // concurrents) for _ in range(concurrents)])

now = time.time()

print(f"Async SQLAlchemy ORM{loopstr}, B: Rows/sec: {count / (now - start): 10.2f}")
34 changes: 34 additions & 0 deletions benchmarks/sqlalchemy_async/test_c.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import asyncio
import os
import time
from random import choice

from models import Journal, engine
from sqlalchemy.ext.asyncio import AsyncSession

LEVEL_CHOICE = [10, 20, 30, 40, 50]
concurrents = int(os.environ.get("CONCURRENTS", "10"))
count = int(os.environ.get("ITERATIONS", "1000"))
count = int(count // concurrents) * concurrents


async def _runtest(count):
async with AsyncSession(engine) as session:
# NOTE: `bulk_save_objects` is not available.
session.add_all(
[
Journal(level=choice(LEVEL_CHOICE), text=f"Insert from C, item {i}")
for i in range(count)
]
)
await session.commit()


async def runtest(loopstr):
start = now = time.time()

await asyncio.gather(*[_runtest(count // concurrents) for _ in range(concurrents)])

now = time.time()

print(f"Async SQLAlchemy ORM{loopstr}, C: Rows/sec: {count / (now - start): 10.2f}")
44 changes: 44 additions & 0 deletions benchmarks/sqlalchemy_async/test_d.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import asyncio
import os
import time
from random import choice

from models import Journal, engine
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

LEVEL_CHOICE = [10, 20, 30, 40, 50]
concurrents = int(os.environ.get("CONCURRENTS", "10"))


async def _runtest(inrange):
count = 0

async with AsyncSession(engine) as session:
for _ in range(inrange):
for level in LEVEL_CHOICE:
res = (
(
await session.execute(
select(Journal).where(Journal.level == level)
)
)
.scalars()
.all()
)
count += len(res)
return count


async def runtest(loopstr):
inrange = 10 // concurrents
if inrange < 1:
inrange = 1

start = now = time.time()

count = sum(await asyncio.gather(*[_runtest(inrange) for _ in range(concurrents)]))

now = time.time()

print(f"Async SQLAlchemy ORM{loopstr}, D: Rows/sec: {count / (now - start): 10.2f}")
Loading