Skip to content

Draft: Split push for blocking and non-blocking #244

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ Commands:
share Fetch permissions to project
share-add Add permissions to [users] to project
share-remove Remove [users] permissions from project
show-file-changeset Displays information about project changes.
show-file-history Displays information about a single version of a...
show-version Displays information about a single version of a...
show-file-changeset Display information about project changes.
show-file-history Display information about a single version of a...
show-version Display information about a single version of a...
status Show all changes in project files - upstream and...
```

Expand All @@ -99,7 +99,7 @@ To download a specific version of a project:
$ mergin --username john download --version v42 john/project1 ~/mergin/project1
```

To download a sepecific version of a single file:
To download a specific version of a single file:

1. First you need to download the project:
```
Expand Down
29 changes: 15 additions & 14 deletions mergin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,17 +412,18 @@ def push(ctx):
return
directory = os.getcwd()
try:
job = push_project_async(mc, directory)
if job is not None: # if job is none, we don't upload any files, and the transaction is finished already
with click.progressbar(length=job.total_size) as bar:
last_transferred_size = 0
while push_project_is_running(job):
time.sleep(1 / 10) # 100ms
new_transferred_size = job.transferred_size
bar.update(new_transferred_size - last_transferred_size) # the update() needs increment only
last_transferred_size = new_transferred_size
push_project_finalize(job)
click.echo("Done")
blocking_job, non_blocking_job = push_project_async(mc, directory)
for job in [blocking_job, non_blocking_job]:
if job is not None: # if job is none, we don't upload any files, and the transaction is finished already
with click.progressbar(length=job.total_size) as bar:
last_transferred_size = 0
while push_project_is_running(job):
time.sleep(1 / 10) # 100ms
new_transferred_size = job.transferred_size
bar.update(new_transferred_size - last_transferred_size) # the update() needs increment only
last_transferred_size = new_transferred_size
push_project_finalize(job)
click.echo("Done")
except InvalidProject as e:
click.secho("Invalid project directory ({})".format(str(e)), fg="red")
except ClientError as e:
Expand Down Expand Up @@ -473,7 +474,7 @@ def pull(ctx):
@click.argument("version")
@click.pass_context
def show_version(ctx, version):
"""Displays information about a single version of a project. `version` is 'v1', 'v2', etc."""
"""Display information about a single version of a project. `version` is 'v1', 'v2', etc."""
mc = ctx.obj["client"]
if mc is None:
return
Expand All @@ -492,7 +493,7 @@ def show_version(ctx, version):
@click.argument("path")
@click.pass_context
def show_file_history(ctx, path):
"""Displays information about a single version of a project."""
"""Display information about a single version of a project."""
mc = ctx.obj["client"]
if mc is None:
return
Expand All @@ -516,7 +517,7 @@ def show_file_history(ctx, path):
@click.argument("version")
@click.pass_context
def show_file_changeset(ctx, path, version):
"""Displays information about project changes."""
"""Display information about project changes."""
mc = ctx.obj["client"]
if mc is None:
return
Expand Down
11 changes: 6 additions & 5 deletions mergin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,11 +876,12 @@ def push_project(self, directory):
:param directory: Project's directory
:type directory: String
"""
job = push_project_async(self, directory)
if job is None:
return # there is nothing to push (or we only deleted some files)
push_project_wait(job)
push_project_finalize(job)
blocking_job, non_blocking_job = push_project_async(self, directory)
for job in [blocking_job, non_blocking_job]:
if job is None:
return # there is nothing to push (or we only deleted some files)
push_project_wait(job)
push_project_finalize(job)

def pull_project(self, directory):
"""
Expand Down
43 changes: 41 additions & 2 deletions mergin/client_push.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from .common import UPLOAD_CHUNK_SIZE, ClientError
from .merginproject import MerginProject
from .editor import filter_changes
from .utils import is_qgis_file, is_versioned_file


class UploadJob:
Expand Down Expand Up @@ -122,6 +123,23 @@ def push_project_async(mc, directory):

changes = mp.get_push_changes()
changes = filter_changes(mc, project_info, changes)

blocking_changes, non_blocking_changes = split_changes(changes)

blocking_job = (
_prepare_upload_job(mp, mc, project_path, local_version, blocking_changes)
if any(len(v) for v in blocking_changes.values())
else None
)
non_blocking_job = (
_prepare_upload_job(mp, mc, project_path, local_version, non_blocking_changes)
if any(len(v) for v in non_blocking_changes.values())
else None
)

return blocking_job, non_blocking_job

def _prepare_upload_job(mp, mc, project_path, local_version, changes):
mp.log.debug("push changes:\n" + pprint.pformat(changes))

tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-")
Expand Down Expand Up @@ -206,10 +224,8 @@ def push_project_async(mc, directory):
for item in upload_queue_items:
future = job.executor.submit(_do_upload, item, job)
job.futures.append(future)

return job


def push_project_wait(job):
"""blocks until all upload tasks are finished"""

Expand Down Expand Up @@ -334,3 +350,26 @@ def remove_diff_files(job) -> None:
diff_file = job.mp.fpath_meta(change["diff"]["path"])
if os.path.exists(diff_file):
os.remove(diff_file)


def split_changes(changes):
"""Split changes into blocking and non-blocking.

Blocking criteria:
- any updated files
- any removed files
- added files that are .gpkg or .qgz (.ggs)
"""
blocking = non_blocking = {"added": [], "updated": [], "removed": [], "renamed": []}

blocking["updated"] = changes["updated"]
blocking["removed"] = changes["removed"]
blocking["renamed"] = changes["renamed"]

for f in changes["added"]:
if is_qgis_file(f["path"]) or is_versioned_file(f["path"]):
blocking["added"].append(f)
else:
non_blocking["added"].append(f)

return blocking, non_blocking
2 changes: 1 addition & 1 deletion mergin/common.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
from enum import Enum

CHUNK_SIZE = 100 * 1024 * 1024
CHUNK_SIZE = 10 * 1024 * 1024

# there is an upper limit for chunk size on server, ideally should be requested from there once implemented
UPLOAD_CHUNK_SIZE = 10 * 1024 * 1024
Expand Down
2 changes: 1 addition & 1 deletion mergin/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ def test_cancel_push(mc):
assert next((f for f in push_changes["updated"] if f["path"] == f_updated), None)

# start pushing and then cancel the job
job = push_project_async(mc, project_dir)
job, _ = push_project_async(mc, project_dir)
push_project_cancel(job)

# if cancelled properly, we should be now able to do the push without any problem
Expand Down
Loading