Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent Grading using asyncio #33

Closed
wants to merge 9 commits into from
2 changes: 1 addition & 1 deletion grader/grader/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
("build", "grader.commands.build"),
("import", "grader.commands.import"),
("list", "grader.commands.list"),
("grade", "grader.commands.grade"),
("grade", "grader.commands.grade.main"),
("inspect", "grader.commands.inspect"),
("cat", "grader.commands.cat"),
("report", "grader.commands.report"),
Expand Down
65 changes: 65 additions & 0 deletions grader/grader/commands/grade/async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import asyncio
import concurrent.futures
import logging
from threading import Lock

logger = logging.getLogger(__name__)


def grade(submission, print_lock, rebuild, suppress_output):
"""
Grade a single submission asynchronously. Locks to print
output if output is not suppressed.
"""
logger.info("Grading submission for %s", submission.user_id)
output = submission.grade(
rebuild_container=rebuild, show_output=False
)

if not suppress_output:
# Synchronously print
logger.debug("Waiting for print lock.")
with print_lock:
print(output)


def async_grade(args, users):
"""
Asynchronously grade submissions.

:param argparse.Namespace args: the arguments from the grade
comand.
:param dict users: user_id: [Submission, ...], all
submissions will be graded.
"""
print_lock = Lock()

@asyncio.coroutine
def run_blocking_tasks(executor):
loop = asyncio.get_event_loop()
blocking_tasks = []

logger.debug("Creating work queue.")
for user_id, submissions in users.items():
for submission in submissions:
blocking_tasks.append(
loop.run_in_executor(
executor, grade, submission,
print_lock, args.rebuild,
args.suppress_output))

logger.debug("Waiting on pool to finish.")
await asyncio.wait(blocking_tasks)

logger.debug("Spawning a worker pool with %s workers.", args.jobs)
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=int(args.jobs)
)
event_loop = asyncio.get_event_loop()

try:
event_loop.run_until_complete(
run_blocking_tasks(executor)
)
finally:
event_loop.close()
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from grader.models import Grader
from grader.utils.config import require_grader_config
from grader.commands.grade.async import async_grade

logger = logging.getLogger(__name__)

Expand All @@ -13,8 +14,10 @@
def setup_parser(parser):
parser.add_argument('--rebuild', action='store_true',
help='Rebuild containers (if they exist).')
parser.add_argument('--suppress_output', action='store_false',
parser.add_argument('--suppress_output', action='store_true',
help='Don\'t display output.')
parser.add_argument('--jobs', '-j', default="1",
help='How many concurrent containers to grade.')
parser.add_argument('assignment',
help='Name of the assignment to grade.')
parser.add_argument('student_id', nargs='?',
Expand All @@ -36,8 +39,12 @@ def run(args):
logger.error("Cannot find student %s", args.student_id)
return

if args.jobs != "1":
return async_grade(args, users)

for user_id, submissions in users.items():
logger.info("Grading submissions for %s", user_id)

for submission in submissions:
submission.grade(a, rebuild_container=args.rebuild,
show_output=args.suppress_output)
submission.grade(rebuild_container=args.rebuild,
show_output=(not args.suppress_output))
15 changes: 7 additions & 8 deletions grader/grader/models/submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,18 +669,16 @@ def _record_output(self, output):

logger.info("Wrote to %s", path)

def grade(self, assignment, rebuild_container=False, show_output=True):
def grade(self, rebuild_container=False, show_output=True):
"""Performs the magic--- prepares the docker container,
runs the grade command, and writes to logs.

:param Assignment assignment: The assignment we're grading, used for
results directory.
:param bool rebuild_container: Whether to discard the old
container and build a new one instead. Defaults to False.
:param bool show_output: Whether to output STDOUT/STDERR from the
container to STDOUT. Defaults to True.

:return: None
:return: output str from the container.

"""
c_id = self.get_container_id(rebuild=rebuild_container)
Expand Down Expand Up @@ -710,8 +708,9 @@ def grade(self, assignment, rebuild_container=False, show_output=True):
print(line, end="")
output_text.write(line)

self._record_output(output_text.getvalue())
output_text = output_text.getvalue()
self._record_output(output_text)

self.docker_cli.stop(
container=c_id
)
self.docker_cli.stop(container=c_id)

return output_text
1 change: 1 addition & 0 deletions misc/completions/_grader.zsh
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ _grader_completion() {
'--help[View help for grade and exit]' \
'--rebuild[Rebuild cointainers (if they exist)]' \
"--suppress_output[Don't display output]" \
"--jobs[Number of containers to grade concurrently]: :" \
"1: :{_describe 'assignments' assignments}" \
"2: :{_describe 'students' students }"

Expand Down