Merge branch 'master' into stable
Xarthisius committed Dec 20, 2022
2 parents 784e87c + 23b350b commit 58d890a
Showing 10 changed files with 898 additions and 656 deletions.
2 changes: 1 addition & 1 deletion Pipfile
@@ -19,7 +19,7 @@ girder-client = "==2.4.0"
girder-worker = "==0.5.0"
redis = "==3.5.3"
requests = "*"
girderfs = {git = "https://github.com/whole-tale/girderfs",ref = "v1.2rc1"}
girderfs = {git = "https://github.com/whole-tale/girderfs",ref = "v1.2rc2"}
pyOpenSSL = "*"
python-dateutil = "*"
PyYAML = "*"
1,236 changes: 657 additions & 579 deletions Pipfile.lock

Large diffs are not rendered by default.

80 changes: 51 additions & 29 deletions gwvolman/build_utils.py
@@ -10,21 +10,28 @@
from urllib.parse import urlparse

from .constants import R2D_FILENAMES
from .utils import _get_container_config, DEPLOYMENT, _get_stata_license_path
from .utils import (
_get_container_config,
DEPLOYMENT,
_get_stata_license_path,
DummyTask,
stop_container
)


class DockerHelper:
def __init__(self):
def __init__(self, auth=True):
username = os.environ.get("REGISTRY_USER", "fido")
password = os.environ.get("REGISTRY_PASS")
self.cli = docker.from_env(version="1.28")
self.cli.login(
username=username, password=password, registry=DEPLOYMENT.registry_url
)
self.apicli = docker.APIClient(base_url="unix://var/run/docker.sock")
self.apicli.login(
username=username, password=password, registry=DEPLOYMENT.registry_url
)
if auth:
self.cli.login(
username=username, password=password, registry=DEPLOYMENT.registry_url
)
self.apicli.login(
username=username, password=password, registry=DEPLOYMENT.registry_url
)
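
For orientation, a minimal usage sketch of the new auth flag (not part of this commit; the Girder URL, API-key handling, and tale id below are illustrative assumptions): passing auth=False skips the registry login, which is useful for read-only work such as computing a tag when REGISTRY_PASS is not available.

import os
import girder_client
from gwvolman.build_utils import ImageBuilder

gc = girder_client.GirderClient(apiUrl="https://girder.example.org/api/v1")  # assumed URL
gc.authenticate(apiKey=os.environ["GIRDER_API_KEY"])                         # assumed env var

tale = gc.get("/tale/0123456789abcdef01234567")    # hypothetical tale id
builder = ImageBuilder(gc, tale=tale, auth=False)  # no registry credentials needed
print(builder.get_tag())
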


class ImageBuilder:
@@ -40,21 +47,21 @@ def build_context(self):
def engine(self):
# See https://github.com/whole-tale/repo2docker_wholetale/pull/44
tag = self.container_config.repo2docker_version.rsplit(":")[-1]
r2d_version = version.parse(tag[1:])

if isinstance(
r2d_version,
version.LegacyVersion, # i.e. not something following v{version}
) or r2d_version >= version.Version("1.2dev0"):
return "--engine dockercli"
return ""
try:
if version.parse(tag[1:]) < version.Version("1.2dev0"):
return ""
except version.InvalidVersion:
# i.e. not something following v{version} which in our case
# will be either "latest" or some specific manual tag
pass
return "--engine dockercli"

def __init__(self, gc, imageId=None, tale=None):
def __init__(self, gc, imageId=None, tale=None, auth=True):
if (imageId is None) == (tale is None):
raise ValueError("Only one of 'imageId' and 'tale' can be set")

self.gc = gc
self.dh = DockerHelper()
self.dh = DockerHelper(auth=auth)
if tale is None:
tale = {
"_id": None,
@@ -74,7 +81,8 @@ def pull_r2d(self):
)

def _create_build_context(self):
temp_dir = tempfile.mkdtemp(dir=os.environ.get("HOSTDIR", "/host") + "/tmp")
tmp_path = os.path.join(os.environ.get("HOSTDIR", "/host"), "tmp")
temp_dir = tempfile.mkdtemp(dir=tmp_path)
logging.info(
"Downloading r2d files to %s (taleId:%s)", temp_dir, self.tale["_id"]
)
@@ -144,25 +152,24 @@ def get_tag(self, force=False):
self.build_context,
dry_run=True,
)
if ret["StatusCode"] != 0:
raise ValueError(f"Failed to compute a tag {ret=}")

# Remove the temporary directory, cause we want entire workspace for build
# NOTE: or maybe not? That would avoid bloating image with things we override anyway
# shutil.rmtree(self.build_context, ignore_errors=True)

return f"{registry_netloc}/tale/{env_hash.hexdigest()}:{output_digest}"

def run_r2d(
self,
tag,
build_dir,
dry_run=False,
):
def run_r2d(self, tag, build_dir, dry_run=False, task=None):
"""
Run repo2docker on the workspace using a shared temp directory. Note that
this uses the "local" provider. Use the same default user-id and
user-name as BinderHub
"""

task = task or DummyTask

# Extra arguments for r2d
extra_args = ""
if self.container_config.buildpack == "MatlabBuildPack":
@@ -173,8 +180,11 @@ def run_r2d(
# License is also needed at build time but can't easily
# be mounted. Pass it as a build arg

source_path = _get_stata_license_path()
with open("/host/" + source_path, "r") as license_file:
source_path = os.path.join(
os.environ.get("HOSTDIR", "/host"),
_get_stata_license_path()
)
with open(source_path, "r") as license_file:
stata_license = license_file.read()
encoded = base64.b64encode(stata_license.encode("ascii")).decode(
"ascii"
@@ -196,9 +206,13 @@

volumes = {
"/var/run/docker.sock": {"bind": "/var/run/docker.sock", "mode": "rw"},
"/tmp": {"bind": "/host/tmp", "mode": "ro"},
"/tmp": {
"bind": os.path.join(os.environ.get("HOSTDIR", "/host"), "tmp"),
"mode": "ro"
},
}

print(f"Using repo2docker {self.container_config.repo2docker_version}")
container = self.dh.cli.containers.run(
image=self.container_config.repo2docker_version,
command=r2d_cmd,
@@ -212,13 +226,21 @@ def run_r2d(
# Job output must come from stdout/stderr
h = hashlib.md5("R2D output".encode())
for line in container.logs(stream=True):
if task.canceled:
task.request.chain = None
stop_container(container)
break
output = line.decode("utf-8").strip()
if not output.startswith("Using local repo"): # contains variable path
h.update(output.encode("utf-8"))
if not dry_run: # We don't want to see it.
print(output)

ret = container.wait()
try:
ret = container.wait()
except docker.errors.NotFound:
ret = {"StatusCode": -123}

if ret["StatusCode"] != 0:
logging.error("Error building image")
# Since detach=True, then we need to explicitly check for the
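
run_r2d() now leans on two helpers imported from gwvolman.utils that are not shown in this diff, DummyTask and stop_container; a plausible shape for them, sketched only to make the cancellation path above readable (the real definitions may differ):

import docker

class DummyTask:
    # Fallback used as `task = task or DummyTask`: only `canceled` is read on
    # the normal path, so a constant False keeps the log loop running.
    canceled = False

def stop_container(container):
    # Assumed behavior: stop the repo2docker container on cancel, tolerating
    # the race where it has already exited or been removed.
    try:
        container.stop(timeout=10)
    except docker.errors.NotFound:
        pass
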
4 changes: 2 additions & 2 deletions gwvolman/constants.py
@@ -24,7 +24,7 @@

REPO2DOCKER_VERSION = os.environ.get(
"REPO2DOCKER_VERSION",
"wholetale/repo2docker_wholetale:v1.2rc1"
"wholetale/repo2docker_wholetale:v1.2rc2"
)
CPR_VERSION = os.environ.get("CPR_VERSION", "wholetale/wt-cpr:latest")

@@ -92,4 +92,4 @@ class RunStatus(object):
RUNNING = 2
COMPLETED = 3
FAILED = 4
CANCELLED = 5
CANCELED = 5
106 changes: 76 additions & 30 deletions gwvolman/tasks.py
@@ -356,9 +356,13 @@ def build_tale_image(task, tale_id, force=False):

# Prepare build context
# temp_dir = tempfile.mkdtemp(dir=HOSTDIR + "/tmp")
ret, _ = image_builder.run_r2d(tag, image_builder.build_context)
ret, _ = image_builder.run_r2d(tag, image_builder.build_context, task=task)
if task.canceled:
task.request.chain = None
logging.info("Build canceled.")
return

if ret['StatusCode'] != 0:
if ret["StatusCode"] != 0:
# repo2docker build failed
raise ValueError('Error building tale {}'.format(tale_id))

@@ -431,6 +435,7 @@ def publish(self,
if provider.published and provider.publication_info.get("versionId") == version_id:
raise ValueError(f"This version of the Tale ({version_id}) has already been published.")
provider.publish()
return provider.publication_info


@girder_job(title='Import Tale')
@@ -699,6 +704,7 @@ def _write_env_json(workspace_dir, image):
def recorded_run(self, run_id, tale_id, entrypoint):
"""Start a recorded run for a tale version"""
run = self.girder_client.get(f"/run/{run_id}")
state = RecordedRunCleaner(run, self.girder_client)
tale = self.girder_client.get(
f"/tale/{tale_id}/restore", parameters={"versionId": run["runVersionId"]}
)
@@ -720,16 +726,20 @@ def set_run_status(run, status):
# Create Docker volume
vol_name = "%s_%s_%s" % (run_id, user['login'], new_user(6))
mountpoint = _create_docker_volume(image_builder.dh.cli, vol_name)
state.volume_created = image_builder.dh.cli.volumes.get(vol_name)

# Create fuse directories
_make_fuse_dirs(mountpoint, ['data', 'workspace'])

# Mount data and workspace for the run
api_key = _get_api_key(self.girder_client)
session = _get_session(self.girder_client, version_id=run['runVersionId'])
state.session_created = session["_id"]

if session['_id'] is not None:
_mount_girderfs(mountpoint, 'data', 'wt_dms', session['_id'], api_key, hostns=True)
_mount_bind(mountpoint, "run", {"runId": run["_id"], "taleId": tale["_id"]})
state.fs_mounted = mountpoint

# Build the image for the run
self.job_manager.updateProgress(
@@ -742,59 +752,95 @@ def set_run_status(run, status):
"Computed tag: %s (taleId:%s, versionId:%s)", tag, tale_id, run["runVersionId"]
)

work_dir = os.path.join(mountpoint, 'workspace')
image = self.girder_client.get('/image/%s' % tale['imageId'])
env_json = _write_env_json(work_dir, image)

# Build currently assumes tmp directory, in this case mount the run workspace
container_config = image_builder.container_config
# TODO: What should we use here? Latest? What the tale was built with?
# repo2docker_version = REPO2DOCKER_VERSION
print(f"Using repo2docker {container_config.repo2docker_version}")
work_target = os.path.join(container_config.target_mount, 'workspace')

if self.canceled:
state.cleanup()
return

try:
if not image_builder.cached_image(tag):
print("Building image for recorded run " + tag)
ret, _ = image_builder.run_r2d(tag, work_target)
ret, _ = image_builder.run_r2d(tag, image_builder.build_context)
if self.canceled:
state.cleanup()
return
if ret['StatusCode'] != 0:
raise ValueError('Image build failed for recorded run {}'.format(run_id))
for line in image_builder.dh.apicli.push(tag, stream=True):
print(line.decode('utf-8'))

self.job_manager.updateProgress(
message='Recording run', total=RECORDED_RUN_STEP_TOTAL,
current=3, forceFlush=True)

set_run_status(run, RunStatus.RUNNING)
_recorded_run(image_builder.dh.cli, mountpoint, container_config, tag, entrypoint)
_recorded_run(
image_builder.dh.cli,
mountpoint,
container_config,
tag,
entrypoint,
task=self
)
if self.canceled:
state.cleanup()
return

set_run_status(run, RunStatus.COMPLETED)
self.job_manager.updateProgress(
message='Finished recorded run', total=RECORDED_RUN_STEP_TOTAL,
current=4, forceFlush=True)

except Exception as exc:
logging.error(exc, exc_info=True)
raise
finally:
# Remove the environment.json
os.remove(env_json)
state.cleanup(False)

# TODO: _cleanup_volumes
for suffix in ['data', 'workspace']:
dest = os.path.join(mountpoint, suffix)
logging.info("Unmounting %s", dest)
subprocess.call("umount %s" % dest, shell=True)

# Delete the session
try:
self.girder_client.delete('/dm/session/{}'.format(session['_id']))
except Exception as e:
logging.error("Unable to remove session. %s", e)
class RecordedRunCleaner:
volume_created = None
fs_mounted = None
session_created = None

# Delete the Docker volume
try:
volume = image_builder.dh.cli.volumes.get(vol_name)
def __init__(self, run, gc):
self.gc = gc
self.run = run

def set_run_status(self, status):
self.gc.patch(
"/run/{_id}/status".format(**self.run), parameters={'status': status}
)

def cleanup(self, canceled=True):

if self.fs_mounted:
for suffix in ["data", "workspace"]:
dest = os.path.join(self.fs_mounted, suffix)
logging.info("Unmounting %s", dest)
subprocess.call("umount %s" % dest, shell=True)
self.fs_mounted = None

if self.session_created:
# Delete the session
try:
self.gc.delete('/dm/session/{}'.format(self.session_created))
except Exception as e:
logging.error("Unable to remove session. %s", e)
self.session_created = None

if self.volume_created:
volume = self.volume_created
# Delete the Docker volume
try:
logging.info("Removing volume: %s", volume.id)
volume.remove()
except Exception as e:
logging.error("Unable to remove volume [%s]: %s", volume.id, e)
except docker.errors.NotFound:
logging.info("Volume not present [%s].", vol_name)
except docker.errors.NotFound:
logging.info("Volume not present [%s].", volume.id)
self.volume_created = None

if canceled:
self.set_run_status(RunStatus.CANCELED)
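
To tie the pieces together, a compressed sketch of how recorded_run() above drives this cleaner (the Girder client setup and run id are assumptions; error handling and the actual mount/build steps are elided):

import girder_client
from gwvolman.tasks import RecordedRunCleaner

gc = girder_client.GirderClient(apiUrl="https://girder.example.org/api/v1")  # assumed URL
run = gc.get("/run/0123456789abcdef01234567")                                # hypothetical id

state = RecordedRunCleaner(run, gc)
try:
    # recorded_run() records each resource on the cleaner as it is created
    # (state.volume_created, state.session_created, state.fs_mounted), so a
    # cancellation at any point tears down only what actually exists.
    pass
finally:
    # Normal completion: release resources but keep the COMPLETED/FAILED status
    # already set; calling cleanup() with the default canceled=True would also
    # flip the run to RunStatus.CANCELED.
    state.cleanup(False)
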