Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schema: rewrite to run local only and handle newer schema versions #3117

Open
wants to merge 12 commits into
base: dev
Choose a base branch
from
2 changes: 1 addition & 1 deletion .prettierignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ nf_core/pipeline-template/tower.yml
# don't run on things handled by ruff
*.py
*.pyc

web-gui
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ include nf_core/assets/logo/nf-core-repo-logo-base-darkbg.png
include nf_core/assets/logo/placeholder_logo.svg
include nf_core/assets/logo/MavenPro-Bold.ttf
include nf_core/pipelines/create/create.tcss
recursive-include web-gui *
4 changes: 2 additions & 2 deletions nf_core/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ def command_pipelines_schema_validate(pipeline, params):
@click.option(
"--url",
type=str,
default="https://nf-co.re/pipeline_schema_builder",
default="http://localhost:8000/process-schema",
help="Customise the builder URL (for development work)",
)
def command_pipelines_schema_build(directory, no_prompts, web_only, url):
Expand Down Expand Up @@ -1687,7 +1687,7 @@ def command_schema_validate(pipeline, params):
@click.option(
"--url",
type=str,
default="https://nf-co.re/pipeline_schema_builder",
default="http://localhost:8000/process-schema",
help="Customise the builder URL (for development work)",
)
def command_schema_build(directory, no_prompts, web_only, url):
Expand Down
4 changes: 3 additions & 1 deletion nf_core/components/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,9 @@ def get_single_component_info(self, component):

sha = self.sha
config_entry = None
if self.update_config is not None:
if self.update_config is None:
raise UserWarning("Could not find '.nf-core.yml' file in pipeline directory")
else:
if any(
[
entry.count("/") == 1
Expand Down
40 changes: 25 additions & 15 deletions nf_core/pipelines/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from pathlib import Path
from typing import Union

import git
import jinja2
import jsonschema
import markdown
Expand All @@ -17,6 +18,7 @@
from rich.syntax import Syntax

import nf_core.pipelines.list
import nf_core.server
import nf_core.utils
from nf_core.pipelines.lint_utils import dump_json_with_prettier, run_prettier_on_file

Expand All @@ -43,7 +45,7 @@ def __init__(self):
self.schema_from_scratch = False
self.no_prompts = False
self.web_only = False
self.web_schema_build_url = "https://nf-co.re/pipeline_schema_builder"
self.web_schema_build_url = "http://localhost:8000/process-schema"
self.web_schema_build_web_url = None
self.web_schema_build_api_url = None

Expand Down Expand Up @@ -643,22 +645,14 @@ def build_schema(self, pipeline_dir, no_prompts, web_only, url):

# If running interactively, send to the web for customisation
if not self.no_prompts:
if Confirm.ask(":rocket: Launch web builder for customisation and editing?"):
if Confirm.ask(":rocket: Launch web builder for customisation and editing?", default=True):
try:
self.launch_web_builder()
except AssertionError as e:
log.error(e.args[0])
# Extra help for people running offline
if "Could not connect" in e.args[0]:
log.info(
f"If you're working offline, now copy your schema ({self.schema_filename}) and paste at https://nf-co.re/pipeline_schema_builder"
)
log.info("When you're finished, you can paste the edited schema back into the same file")
if self.web_schema_build_web_url:
log.info(
"To save your work, open {}\n"
f"Click the blue 'Finished' button, copy the schema and paste into this file: { self.web_schema_build_web_url, self.schema_filename}"
)
log.info("Could not connect to the web builder")
return False

def get_wf_params(self):
Expand Down Expand Up @@ -869,23 +863,38 @@ def launch_web_builder(self):
"""
Send pipeline schema to web builder and wait for response
"""
# Check whether self.schema_filename has uncommitted changes, using GitPython
if self.schema_filename and self.pipeline_dir:
repo = git.Repo(self.pipeline_dir)
if str(Path(self.schema_filename).relative_to(self.pipeline_dir)) in [
item.a_path for item in repo.index.diff(None)
]:
if not Confirm.ask(
f""":exclamation_mark: '{str(self.schema_filename)}' has uncommitted changes. These will be overwritten in the following steps. Do you still want to continue""",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this prompt is always shown? I just tested with a fresh pipeline and I see this log output:

INFO     [✓] Default parameters match schema validation                                                                       
INFO     [✓] Pipeline schema looks valid (found 32 params)                                                                    
INFO     Writing schema with 32 params: 'nextflow_schema.json'                                                                
🚀  Launch web builder for customisation and editing? [y/n] (y): 
❗  'nextflow_schema.json' has uncommitted changes. These will be overwritten in the following steps. Do you still want to 
continue [y/n] (y): 

default=True,
):
return

nf_core.server.start_server()
log.info("Sending pipeline schema to nf-core web builder for customisation")

content = {
"post_content": "json_schema",
"api": "true",
"version": nf_core.__version__,
"status": "waiting_for_user",
"schema": json.dumps(self.schema),
"schema_path": str(self.schema_filename),
}
web_response = nf_core.utils.poll_nfcore_web_api(self.web_schema_build_url, content)
try:
if "api_url" not in web_response:
raise AssertionError('"api_url" not in web_response')
if "web_url" not in web_response:
raise AssertionError('"web_url" not in web_response')
# DO NOT FIX THIS TYPO. Needs to stay in sync with the website. Maintaining for backwards compatability.
if web_response["status"] != "recieved":
if web_response["status"] != "received":
raise AssertionError(
f'web_response["status"] should be "recieved", but it is "{web_response["status"]}"'
f'web_response["status"] should be "received", but it is "{web_response["status"]}"'
)
except AssertionError:
log.debug(f"Response content:\n{json.dumps(web_response, indent=4)}")
Expand Down Expand Up @@ -914,7 +923,8 @@ def get_web_builder_response(self):
if web_response["status"] == "web_builder_edited":
log.info("Found saved status from nf-core pipelines schema builder")
try:
self.schema = web_response["schema"]
self.schema = web_response["data"]
log.debug(f"Schema from web builder:\n{json.dumps(self.schema, indent=4)}")
self.remove_schema_empty_definitions()
self.validate_schema()
except AssertionError as e:
Expand Down
140 changes: 140 additions & 0 deletions nf_core/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import json
import logging
import threading
import urllib.parse as urlparse
from http import HTTPStatus
from http.server import HTTPServer, SimpleHTTPRequestHandler
from pathlib import Path
from typing import Dict
from urllib.parse import parse_qsl

from nf_core.utils import NFCORE_CACHE_DIR

log: logging.Logger = logging.getLogger(__name__)


def parse_qsld(query: str) -> Dict:
    """Parse a URL-encoded query string into a plain dictionary.

    Args:
        query: The raw query string, e.g. ``"a=1&b=2"``.

    Returns:
        A dict mapping each parameter name to its value. When a name occurs
        more than once, the last occurrence wins.
    """
    pairs = parse_qsl(query)
    return dict(pairs)


class MyHandler(SimpleHTTPRequestHandler):
    """Request handler for the local schema builder.

    Static files are served from the ``web-gui`` directory. In addition the
    handler exposes a ``/process-schema`` endpoint:

    * ``POST`` writes the submitted schema to the file named by ``schema_path``
      and records the submitted status.
    * ``GET`` reads that file back and returns it with the last recorded status.
    """

    # Last status submitted through POST. Kept on the class because a new
    # handler object is created for each connection.
    status = "waiting_for_user"  # Default status

    def __init__(self, *args, **kwargs):
        # NOTE(review): "web-gui" is a relative path, so it is resolved against
        # the current working directory of the process - confirm this is intended.
        super().__init__(*args, directory="web-gui", **kwargs)

    def send_cors_headers(self):
        """Add the CORS headers that allow the web GUI dev server to call this API."""
        self.send_header("Access-Control-Allow-Origin", "http://localhost:4321")
        self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "X-Requested-With, Content-Type, Accept, message")

    def do_OPTIONS(self):  # noqa: N802
        """Answer CORS preflight requests with an empty 204 response."""
        self.send_response(HTTPStatus.NO_CONTENT)
        self.send_cors_headers()
        self.end_headers()

    def _send_response(self, status_code: int, body: Dict) -> None:
        """Send ``body`` as a JSON response with the given HTTP status code."""
        self.send_response(status_code)
        self.send_header("Content-type", "application/json")
        self.send_cors_headers()
        self.end_headers()
        self.wfile.write(json.dumps(body).encode())

    def do_POST(self) -> None:  # noqa: N802
        """Store a schema sent to ``/process-schema``.

        The body may be JSON (``Content-Type: application/json``, optionally
        with parameters such as ``charset``) or URL-encoded form data in which
        the ``schema`` field holds a JSON string. Malformed requests get a 4xx
        JSON error response instead of an unhandled exception.
        """
        log.debug("POST request received")
        content_type = self.headers.get("Content-Type") or ""
        try:
            content_length = int(self.headers.get("Content-Length", ""))
        except ValueError:
            content_length = -1
        if content_length < 0:
            # Without a valid length we cannot know how much of the body to read.
            self._send_response(HTTPStatus.BAD_REQUEST, {"error": "Missing or invalid Content-Length header"})
            return
        post_data = self.rfile.read(content_length)

        if urlparse.urlparse(self.path).path != "/process-schema":
            self._send_response(404, {"error": "Not Found"})
            return

        # Compare only the media type so "application/json; charset=utf-8" is
        # still treated as JSON.
        is_json = content_type.split(";")[0].strip().lower() == "application/json"
        try:
            if is_json:
                data = json.loads(post_data.decode())
            else:
                data = parse_qsld(post_data.decode())
                if "schema" in data:
                    data["schema"] = json.loads(data["schema"])
        except ValueError:
            # Covers both invalid JSON and undecodable bytes.
            self._send_response(HTTPStatus.BAD_REQUEST, {"error": "Request body is not valid JSON"})
            return

        if not isinstance(data, dict) or "schema" not in data:
            self._send_response(422, {"error": "schema parameter not found"})
            return
        schema_path = data.get("schema_path", None)
        if not isinstance(schema_path, str) or not schema_path:
            self._send_response(422, {"error": "schema_path parameter not found"})
            return

        # NOTE(review): schema_path comes straight from the request and is
        # written to without restriction - consider limiting it to the pipeline
        # directory.
        # Serialise before opening so a failure cannot leave a truncated file.
        schema_text = json.dumps(data["schema"], indent=4)
        try:
            # write data to local schema_file
            with open(schema_path, "w") as schema_file:
                schema_file.write(schema_text)
        except OSError as e:
            self._send_response(HTTPStatus.INTERNAL_SERVER_ERROR, {"error": f"Could not write schema file: {e}"})
            return

        status = data.get("status", "received")
        MyHandler.status = status
        if status == "waiting_for_user":
            # The initial submission is acknowledged as "received".
            status = "received"

        quoted_path = urlparse.quote(schema_path, safe="")
        self._send_response(
            200,
            {
                "message": "Data stored successfully",
                "status": status,
                "schema_path": schema_path,
                "web_url": "http://localhost:8000/schema_builder.html?schema_path=" + quoted_path,
                "api_url": "http://localhost:8000/process-schema?schema_path=" + quoted_path,
            },
        )

    def do_GET(self) -> None:  # noqa: N802
        """Return the stored schema for ``/process-schema``; serve static files otherwise."""
        parsed = urlparse.urlparse(self.path)
        if parsed.path != "/process-schema":
            super().do_GET()
            return

        schema_path = parse_qsld(parsed.query).get("schema_path", None)
        if schema_path is None:
            self._send_response(422, {"error": "schema_path parameter not found"})
            return

        try:
            with open(schema_path) as file:
                data = json.load(file)
        except OSError:
            # Missing or unreadable file: report it rather than dropping the connection.
            self._send_response(404, {"error": "Not Found"})
            return
        except ValueError:
            self._send_response(HTTPStatus.INTERNAL_SERVER_ERROR, {"error": "Schema file is not valid JSON"})
            return

        if data is None:
            self._send_response(404, {"error": "Not Found"})
        else:
            self._send_response(200, {"message": "GET request received", "status": MyHandler.status, "data": data})

    def log_message(self, format, *args):
        """Route the handler's log lines to the module logger at debug level."""
        log.debug(format % args)


def run(
    server_class=HTTPServer,
    handler_class=MyHandler,
):
    """Create the schema builder HTTP server on localhost:8000 and serve requests.

    The server object is stored in the module-level ``server_instance`` and the
    call blocks inside ``serve_forever()`` until the serving loop ends.

    Args:
        server_class: Server class to instantiate.
        handler_class: Request handler class passed to the server.
    """
    global server_instance

    host, port = "localhost", 8000
    log.info(f"Starting server on http://{host}:{port}")
    server_instance = server_class((host, port), handler_class)

    # Make sure the schema cache directory exists.
    schema_cache_dir = Path(NFCORE_CACHE_DIR / "schema")
    schema_cache_dir.mkdir(parents=True, exist_ok=True)

    try:
        server_instance.serve_forever()
    except KeyboardInterrupt:
        # An interrupt simply ends the serving loop.
        pass
    finally:
        log.info("Server loop stopped")


def start_server():
    """Launch ``run`` in a daemon thread and return that thread.

    Returns:
        The started ``threading.Thread`` running the server loop.
    """
    background = threading.Thread(target=run, daemon=True)
    background.start()
    return background


def stop_server():
    """Shut down the server created by ``run``, if there is one.

    Stops the serving loop, closes the server socket and resets the
    module-level ``server_instance`` to ``None``. When no server has been
    started a warning is logged and nothing else happens.
    """
    global server_instance

    # ``server_instance`` is only bound once ``run`` has executed, so look it
    # up defensively: a plain read would raise NameError if no server was ever
    # started.
    current = globals().get("server_instance")

    if current:
        log.info("Stopping server...")
        current.shutdown()
        current.server_close()
        server_instance = None
        log.info("Server stopped")
    else:
        log.warning("No server instance to stop")
9 changes: 7 additions & 2 deletions nf_core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,11 @@ def poll_nfcore_web_api(api_url: str, post_data: Optional[Dict] = None) -> Dict:
if post_data is None:
response = requests.get(api_url, headers={"Cache-Control": "no-cache"})
else:
response = requests.post(url=api_url, data=post_data)
log.debug(f"Sending POST request to {api_url} with data: {post_data}")
response = requests.post(
url=api_url,
data=post_data,
)
except requests.exceptions.Timeout:
raise AssertionError(f"URL timed out: {api_url}")
except requests.exceptions.ConnectionError:
Expand All @@ -451,7 +455,7 @@ def poll_nfcore_web_api(api_url: str, post_data: Optional[Dict] = None) -> Dict:
response_content = response.content
if isinstance(response_content, bytes):
response_content = response_content.decode()
log.debug(f"Response content:\n{response_content}")
log.debug(f"Response status_code: {response.status_code}, Response content:\n{response_content}")
raise AssertionError(
f"Could not access remote API results: {api_url} (HTML {response.status_code} Error)"
)
Expand All @@ -472,6 +476,7 @@ def poll_nfcore_web_api(api_url: str, post_data: Optional[Dict] = None) -> Dict:
"See verbose log for full response"
)
else:
log.debug(f"Got response from nf-core API: {web_response['status']}")
return web_response


Expand Down
Loading