Skip to content

Commit

Permalink
ADD: Support for multi-file submissions and result versions (#97)
Browse files Browse the repository at this point in the history
* UPDATE: checkpoint changes

* FIX: dynamic graphql mutations for different IPA versions

* FIX: check for optional kwarg
  • Loading branch information
thearchitector authored Feb 23, 2021
1 parent 8f8960d commit 8321f13
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 41 deletions.
125 changes: 90 additions & 35 deletions indico/queries/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from indico.client.request import GraphQLRequest, RequestChain, Debouncer
from indico.errors import IndicoError, IndicoInputError
from indico.queries.storage import UploadDocument
from indico.types import Job, Submission, Workflow
from indico.types import Job, Submission, Workflow, SUBMISSION_RESULT_VERSIONS
from indico.types.utils import cc_to_snake, snake_to_cc


class ListWorkflows(GraphQLRequest):
Expand Down Expand Up @@ -83,8 +84,7 @@ def __init__(self, workflow_id: int, enable_review: bool):
query = self.query.replace("<QUERY NAME>", self.query_name)
query = query.replace("<TOGGLE>", self.toggle)
super().__init__(
query,
variables={"workflowId": workflow_id, "reviewState": enable_review},
query, variables={"workflowId": workflow_id, "reviewState": enable_review},
)

def process_response(self, response) -> Workflow:
Expand Down Expand Up @@ -134,23 +134,24 @@ def requests(self):
class _WorkflowSubmission(GraphQLRequest):

query = """
mutation workflowSubmissionMutation($workflowId: Int!, ${arg_}: {type_}, $recordSubmission: Boolean) {{
{mutation_name}(workflowId: $workflowId, {arg_}: ${arg_}, recordSubmission: $recordSubmission) {{
mutation workflowSubmissionMutation({signature}) {{
{mutation_name}({args}) {{
jobIds
submissionIds
}}
}}
"""

detailed_query = """
mutation workflowSubmissionMutation($workflowId: Int!, ${arg_}: {type_}, $recordSubmission: Boolean) {{
{mutation_name}(workflowId: $workflowId, {arg_}: ${arg_}, recordSubmission: $recordSubmission) {{
mutation workflowSubmissionMutation({signature}) {{
{mutation_name}({args}) {{
submissionIds
submissions {{
id
datasetId
workflowId
status
<SUBQUERY>
inputFile
inputFilename
resultFile
Expand All @@ -160,31 +161,54 @@ class _WorkflowSubmission(GraphQLRequest):
}}
}}
"""
files_subquery = """
inputFiles {{
filepath
filename
}}
""".strip()

query_format = {"arg_": "files", "type_": "[FileInput]!"}
mutation_name = "workflowSubmission"
mutation_args = {
"workflowId": "Int!",
"files": "[FileInput]!",
"recordSubmission": "Boolean",
"bundle": "Boolean",
"resultVersion": "SubmissionResultVersion",
}

def __init__(
self,
workflow_id: int,
submission: bool,
files: List[str] = None,
urls: List[str] = None,
detailed_response: bool = False,
self, detailed_response: bool, **kwargs,
):
self.workflow_id = workflow_id
self.record_submission = submission
self.workflow_id = kwargs["workflow_id"]
self.record_submission = kwargs["record_submission"]

# construct mutation signature and args based on provided kwargs to ensure
# backwards-compatible graphql calls
#
# inputFiles, bundle, and resultVersion only avaliable on IPA 4.9.0+
subq = (
self.files_subquery
if kwargs.get("bundle") or kwargs.get("result_version")
else ""
)
q = (
self.detailed_query.replace("<SUBQUERY>", subq)
if detailed_response
else self.query
)

q = self.detailed_query if detailed_response else self.query
args = [
_arg for _arg in self.mutation_args.keys() if kwargs.get(cc_to_snake(_arg))
]
signature = ",".join(f"${_arg}: {self.mutation_args[_arg]}" for _arg in args)
args = ",".join(f"{_arg}: ${_arg}" for _arg in args)

super().__init__(
query=q.format(mutation_name=self.mutation_name, **self.query_format),
variables={
"files": files,
"urls": urls,
"workflowId": workflow_id,
"recordSubmission": submission,
},
query=q.format(
mutation_name=self.mutation_name, signature=signature, args=args
),
variables={snake_to_cc(var): val for var, val in kwargs.items()},
)

def process_response(self, response):
Expand All @@ -199,12 +223,13 @@ def process_response(self, response):


class _WorkflowUrlSubmission(_WorkflowSubmission):
query_format = {"arg_": "urls", "type_": "[String]!"}
mutation_name = "workflowUrlSubmission"
mutation_args = {**_WorkflowSubmission.mutation_args, "urls": "[String]!"}
del mutation_args["files"]


class WorkflowSubmission(RequestChain):
"""
f"""
Submit files to a workflow for processing.
One of `files` or `urls` is required.
Expand All @@ -216,6 +241,11 @@ class WorkflowSubmission(RequestChain):
Defaults to True.
If False, files will be processed as AsyncJobs, ignoring any workflow
post-processing steps like Review and with no record in the system
bundle (bool, optional): Batch all files under a single submission id
result_version (str, optional):
The format of the submission result file. One of:
{SUBMISSION_RESULT_VERSIONS}
If bundle is enabled, this must be version TWO or later.
Returns:
List[int]: If `submission`, these will be submission ids.
Expand All @@ -231,11 +261,16 @@ def __init__(
files: List[str] = None,
urls: List[str] = None,
submission: bool = True,
bundle: bool = False,
result_version: str = None,
):
self.workflow_id = workflow_id
self.files = files
self.urls = urls
self.submission = submission
self.bundle = bundle
self.result_version = result_version

if not self.files and not self.urls:
raise IndicoInputError("One of 'files' or 'urls' must be specified")
elif self.files and self.urls:
Expand All @@ -245,22 +280,26 @@ def requests(self):
if self.files:
yield UploadDocument(files=self.files)
yield _WorkflowSubmission(
self.detailed_response,
workflow_id=self.workflow_id,
record_submission=self.submission,
files=self.previous,
submission=self.submission,
detailed_response=self.detailed_response,
bundle=self.bundle,
result_version=self.result_version,
)
elif self.urls:
yield _WorkflowUrlSubmission(
self.detailed_response,
workflow_id=self.workflow_id,
record_submission=self.submission,
urls=self.urls,
submission=self.submission,
detailed_response=self.detailed_response,
bundle=self.bundle,
result_version=self.result_version,
)


class WorkflowSubmissionDetailed(WorkflowSubmission):
"""
f"""
Submit files to a workflow for processing.
One of `files` or `urls` is required.
Submission recording is mandatory.
Expand All @@ -269,6 +308,11 @@ class WorkflowSubmissionDetailed(WorkflowSubmission):
workflow_id (int): Id of workflow to submit files to
files (List[str], optional): List of local file paths to submit
urls (List[str], optional): List of urls to submit
bundle (bool, optional): Batch all files under a single submission id
result_version (str, optional):
The format of the submission result file. One of:
{SUBMISSION_RESULT_VERSIONS}
If bundle is enabled, this must be version TWO or later.
Returns:
List[Submission]: Submission objects created
Expand All @@ -278,9 +322,21 @@ class WorkflowSubmissionDetailed(WorkflowSubmission):
detailed_response = True

def __init__(
self, workflow_id: int, files: List[str] = None, urls: List[str] = None
self,
workflow_id: int,
files: List[str] = None,
urls: List[str] = None,
bundle: bool = False,
result_version: str = None,
):
super().__init__(workflow_id, files=files, urls=urls, submission=True)
super().__init__(
workflow_id,
files=files,
urls=urls,
submission=True,
bundle=bundle,
result_version=result_version,
)


class _AddDataToWorkflow(GraphQLRequest):
Expand All @@ -298,8 +354,7 @@ class _AddDataToWorkflow(GraphQLRequest):

def __init__(self, workflow_id: int):
super().__init__(
self.query,
variables={"workflowId": workflow_id},
self.query, variables={"workflowId": workflow_id},
)

def process_response(self, response) -> Workflow:
Expand Down
1 change: 1 addition & 0 deletions indico/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
from .jobs import *
from .model_group import *
from .model import *
from .submission_file import *
from .submission import *
from .workflow import *
19 changes: 13 additions & 6 deletions indico/types/submission.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from indico.types import BaseType
from indico.types import BaseType, List
from . import SubmissionFile

VALID_SUBMISSION_STATUSES = [
"COMPLETE",
Expand All @@ -10,13 +11,17 @@
]


SUBMISSION_RESULT_VERSIONS = ["ONE", "TWO", "OLDEST_SUPPORTED", "LATEST"]


class Submission(BaseType):
f"""
A Submission in the Indico Platform.
Submissions represent a single input which has been sent for processing by a specific workflow.
The input file is generally a PDF, and the processing broadly consists of an input processor, a
series of processors and components associated with particular docbots, and finally an output processor
Submissions represent a single input which has been sent for processing by a
specific workflow. A submission consists of SubmissionFiles, generally a PDF.
Processing broadly consists of an input processor, a series of processors and
components associated with particular docbots, and finally an output processor
to generate the result file.
Attributes:
Expand All @@ -25,8 +30,9 @@ class Submission(BaseType):
workflow_id (int): the Workflow id
status (str): status of the submission. One of
{VALID_SUBMISSION_STATUSES}
input_file (str): URL of the input datafile within the Indico Platform.
input_filename (str): name of the original file
input_files (list[SubmissionFile]): the SubmissionFiles for the Submission
input_file (str): URL of the first input datafile within the Indico Platform.
input_filename (str): name of the first original file
result_file (str): URL of the result datafile within the Indico Platform
retrieved (bool): Whether the submission has been retrieved by a user
This flag is set manually by users.
Expand All @@ -38,6 +44,7 @@ class Submission(BaseType):
dataset_id: int
workflow_id: int
status: str
input_files: List[SubmissionFile]
input_file: str
input_filename: str
result_file: str
Expand Down
22 changes: 22 additions & 0 deletions indico/types/submission_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from indico.types import BaseType


class SubmissionFile(BaseType):
f"""
A Submission File in the Indico Platform.
Submissions files represent a single document, and can be grouped together under
a single submission to a submission to a workflow.
Attributes:
id (int): the Submission file id
filepath (str): URL of the input datafile within the Indico Platform.
filename (str): name of the original file
submission_id (int): the parent Submission id
"""

id: int
filepath: str
filename: str
submission_id: int
65 changes: 65 additions & 0 deletions tests/integration/queries/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,77 @@ def test_workflow_submission(
result = client.call(RetrieveStorageObject(result_url.result))
assert isinstance(result, dict)
assert result["submission_id"] == submission_id
assert result["file_version"] == 1
client.call(UpdateSubmission(submission_id, retrieved=True))
sub = client.call(GetSubmission(submission_id))
assert isinstance(sub, Submission)
assert sub.retrieved is True


@pytest.mark.parametrize(
"_input",
[
{"urls": [PUBLIC_URL + "mock.pdf"] * 3},
{"files": [str(Path(__file__).parents[1]) + "/data/mock.pdf"] * 3},
],
)
def test_workflow_submission_versioned(
indico, airlines_dataset, airlines_model_group: ModelGroup, _input
):
client = IndicoClient()
wfs = client.call(ListWorkflows(dataset_ids=[airlines_dataset.id]))
wf = max(wfs, key=lambda w: w.id)

submission_ids = client.call(
WorkflowSubmission(workflow_id=wf.id, result_version="LATEST", **_input)
)

assert len(submission_ids) == len(next(iter(_input.values())))
submission_id = submission_ids[0]
assert submission_id is not None

submissions = client.call(WaitForSubmissions(submission_id))
result = client.call(RetrieveStorageObject(submissions[0].result_file))

assert isinstance(result, dict)
assert result["file_version"] == 2
assert len(result["submission_results"]) == 1
assert result["submission_results"][0]["input_filename"] == "mock.pdf"


@pytest.mark.parametrize(
"_input",
[
{"urls": [PUBLIC_URL + "mock.pdf"] * 3},
{"files": [str(Path(__file__).parents[1]) + "/data/mock.pdf"] * 3},
],
)
def test_workflow_submission_bundled(
indico, airlines_dataset, airlines_model_group: ModelGroup, _input
):
client = IndicoClient()
wfs = client.call(ListWorkflows(dataset_ids=[airlines_dataset.id]))
wf = max(wfs, key=lambda w: w.id)

submission_ids = client.call(
WorkflowSubmission(
workflow_id=wf.id, bundle=True, result_version="LATEST", **_input
)
)

assert len(submission_ids) == 1
submission_id = submission_ids[0]
assert submission_id

submissions = client.call(WaitForSubmissions(submission_id))
result = client.call(RetrieveStorageObject(submissions[0].result_file))

assert isinstance(result, dict)
assert result["file_version"] == 2
assert len(result["submission_results"]) == len(next(iter(_input.values())))
assert result["submission_results"][0]["input_filename"] == "mock.pdf"


@pytest.mark.parametrize(
"_input,_output",
[
Expand Down

0 comments on commit 8321f13

Please sign in to comment.