Skip to content

Commit

Permalink
Added an asynchronous grading option using asyncio.
Browse files Browse the repository at this point in the history
  • Loading branch information
brhoades committed Mar 4, 2017
1 parent cf7d351 commit 1577aed
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 5 deletions.
53 changes: 53 additions & 0 deletions grader/grader/commands/grade/async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import asyncio
import concurrent.futures
import logging

logger = logging.getLogger(__name__)


def grade(assignment, submission, user_id, rebuild=False):
"""
Grade a single submission asyncronously.
"""
logger.info("Grading submission for %s", user_id)
submission.grade(assignment, rebuild_container=rebuild,
show_output=False)


def async_grade(args, assignment, users):
"""
Asynchronously grade submissions.
:param argparse.Arguments args: the arguments from the grade
comand.
:param Assignment assignment: the assignment for which to grade
submissions.
:param dict users: user_id: [Submission, ...], all
submissions will be graded.
"""
async def run_blocking_tasks(executor):
loop = asyncio.get_event_loop()
blocking_tasks = []

logger.debug("Creating work queue.")
for user_id, submissions in users.items():
for submission in submissions:
blocking_tasks.append(
loop.run_in_executor(
executor, grade, assignment, submission, user_id,
args.rebuild))

logger.debug("Waiting on pool to finish.")
await asyncio.wait(blocking_tasks)

executor = concurrent.futures.ProcessPoolExecutor(
max_workers=int(args.j)
)
event_loop = asyncio.get_event_loop()

try:
event_loop.run_until_complete(
run_blocking_tasks(executor)
)
finally:
event_loop.close()
17 changes: 12 additions & 5 deletions grader/grader/commands/grade/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from grader.models import Grader
from grader.utils.config import require_grader_config
from grader.commands.grade.async import async_grade

logger = logging.getLogger(__name__)

Expand All @@ -15,6 +16,8 @@ def setup_parser(parser):
help='Rebuild containers (if they exist).')
parser.add_argument('--suppress_output', action='store_false',
help='Don\'t display output.')
parser.add_argument('-j', default="1",
help='How many concurrent containers to grade.')
parser.add_argument('assignment',
help='Name of the assignment to grade.')
parser.add_argument('student_id', nargs='?',
Expand All @@ -36,8 +39,12 @@ def run(args):
logger.error("Cannot find student %s", args.student_id)
return

for user_id, submissions in users.items():
logger.info("Grading submissions for %s", user_id)
for submission in submissions:
submission.grade(a, rebuild_container=args.rebuild,
show_output=args.suppress_output)
if args.j != "1":
async_grade(args, a, users)
else:
for user_id, submissions in users.items():
logger.info("Grading submissions for %s", user_id)

for submission in submissions:
submission.grade(a, rebuild_container=args.rebuild,
show_output=args.suppress_output)

0 comments on commit 1577aed

Please sign in to comment.