Skip to content

Commit

Permalink
merge the develop branch and fix conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
linchiahuisage committed Apr 30, 2021
2 parents 922106c + 53eef1f commit 56fca24
Show file tree
Hide file tree
Showing 14 changed files with 354 additions and 65 deletions.
9 changes: 8 additions & 1 deletion synapseclient/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def get(args, syn):
if args.recursive:
if args.version is not None:
raise ValueError('You cannot specify a version making a recursive download.')
_validate_id_arg(args)
synapseutils.syncFromSynapse(syn, args.id, args.downloadLocation, followLink=args.followLink,
manifest=args.manifest)
elif args.queryString is not None:
Expand All @@ -101,6 +102,7 @@ def get(args, syn):
for id in ids:
syn.get(id, downloadLocation=args.downloadLocation)
else:
_validate_id_arg(args)
# search by MD5
if isinstance(args.id, str) and os.path.isfile(args.id):
entity = syn.get(args.id, version=args.version, limitSearch=args.limitSearch, downloadFile=False)
Expand All @@ -121,6 +123,11 @@ def get(args, syn):
syn.logger.info('Creating %s', entity.path)


def _validate_id_arg(args):
if args.id is None:
raise ValueError(f'Missing expected id argument for use with the {args.subparser} command')


def sync(args, syn):
synapseutils.syncToSynapse(syn, manifestFile=args.manifestFile,
dryRun=args.dryRun, sendMessages=args.sendMessages,
Expand Down Expand Up @@ -586,7 +593,7 @@ def build_parser():
default=True, help='Download file using a multiple threaded implementation. '
'This flag will be removed in the future when multi-threaded download '
'is deemed fully stable and becomes the default implementation.')
parser_get.add_argument('id', metavar='syn123', nargs='?', type=str,
parser_get.add_argument('id', metavar='local path', nargs='?', type=str,
help='Synapse ID of form syn123 of desired data object.')
# add no manifest option
parser_get.add_argument('--manifest', type=str, choices=['all', 'root', 'suppress'],
Expand Down
63 changes: 48 additions & 15 deletions synapseclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,12 @@
from synapseclient.core.pool_provider import DEFAULT_NUM_THREADS
from synapseclient.core.utils import id_of, get_properties, MB, memoize, is_json, extract_synapse_id_from_query, \
find_data_file_handle, extract_zip_file_to_directory, is_integer, require_param
from synapseclient.core.retry import with_retry
from synapseclient.core.retry import (
with_retry,
DEFAULT_RETRY_STATUS_CODES,
RETRYABLE_CONNECTION_ERRORS,
RETRYABLE_CONNECTION_EXCEPTIONS,
)
from synapseclient.core import sts_transfer
from synapseclient.core.upload.multipart_upload import multipart_upload_file, multipart_upload_string
from synapseclient.core.remote_file_storage_wrappers import S3ClientWrapper, SFTPWrapper
Expand Down Expand Up @@ -127,12 +132,9 @@

# Defines the standard retry policy applied to the rest methods
# The retry period needs to span a minute because sending messages is limited to 10 per 60 seconds.
STANDARD_RETRY_PARAMS = {"retry_status_codes": [429, 500, 502, 503, 504],
"retry_errors": ["proxy error", "slow down", "timeout", "timed out",
"connection reset by peer", "unknown ssl protocol error",
"couldn't connect to host", "slowdown", "try again",
"connection reset by peer"],
"retry_exceptions": ["ConnectionError", "Timeout", "timeout", "ChunkedEncodingError"],
STANDARD_RETRY_PARAMS = {"retry_status_codes": DEFAULT_RETRY_STATUS_CODES,
"retry_errors": RETRYABLE_CONNECTION_ERRORS,
"retry_exceptions": RETRYABLE_CONNECTION_EXCEPTIONS,
"retries": 60, # Retries for up to about 30 minutes
"wait": 1,
"max_wait": 30,
Expand Down Expand Up @@ -907,7 +909,8 @@ def _download_file_entity(self, downloadLocation, entity, ifcollision, submissio
if downloadPath is None or not os.path.exists(downloadPath):
return

entity.path = downloadPath
# converts the path format from forward slashes back to backward slashes on Windows
entity.path = os.path.normpath(downloadPath)
entity.files = [os.path.basename(downloadPath)]
entity.cacheDir = os.path.dirname(downloadPath)

Expand Down Expand Up @@ -996,7 +999,15 @@ def store(self, obj, *, createOrUpdate=True, forceVersion=True, versionLabel=Non
# _synapse_store hook
# for objects that know how to store themselves
if hasattr(obj, '_synapse_store'):
return obj._synapse_store(self)
obj = obj._synapse_store(self)
return self._apply_provenance(
obj,
activity=activity,
used=used,
executed=executed,
activityName=activityName,
activityDescription=activityDescription,
)

# Handle all non-Entity objects
if not (isinstance(obj, Entity) or type(obj) == dict):
Expand Down Expand Up @@ -1152,6 +1163,30 @@ def store(self, obj, *, createOrUpdate=True, forceVersion=True, versionLabel=Non
annotations = self.set_annotations(Annotations(properties['id'], properties['etag'], annotations))
properties['etag'] = annotations.etag

properties = self._apply_provenance(
properties,
activity=activity,
used=used,
executed=executed,
activityName=activityName,
activityDescription=activityDescription,
)

# Return the updated Entity object
entity = Entity.create(properties, annotations, local_state)
return self.get(entity, downloadFile=False)

def _apply_provenance(
self,
entity,
activity=None,
used=None,
executed=None,
activityName=None,
activityDescription=None
):
    # apply any provenance passed via the store method to the entity

# If the parameters 'used' or 'executed' are given, create an Activity object
if used or executed:
if activity is not None:
Expand All @@ -1163,14 +1198,12 @@ def store(self, obj, *, createOrUpdate=True, forceVersion=True, versionLabel=Non

# If we have an Activity, set it as the Entity's provenance record
if activity:
self.setProvenance(properties, activity)
self.setProvenance(entity, activity)

# 'etag' has changed, so get the new Entity
properties = self._getEntity(properties)
entity = self._getEntity(entity)

# Return the updated Entity object
entity = Entity.create(properties, annotations, local_state)
return self.get(entity, downloadFile=False)
return entity

def _createAccessRequirementIfNone(self, entity):
"""
Expand Down Expand Up @@ -3187,7 +3220,7 @@ def create_snapshot_version(self, table: typing.Union[EntityViewSchema, Schema,
:param label: Optional snapshot label.
:param activity: Optional activity ID applied to snapshot version.
:param wait: True if this method should return the snapshot version after waiting for any necessary
asynchronous table updates to complete. If False this method will return return
asynchronous table updates to complete. If False this method will return
as soon as any updates are initiated.
:return: the snapshot version number if wait=True, None if wait=False
"""
Expand Down
41 changes: 30 additions & 11 deletions synapseclient/core/multithread_download/download_threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

from synapseclient.core.exceptions import SynapseError
from synapseclient.core.pool_provider import get_executor
from synapseclient.core.retry import (
with_retry,
RETRYABLE_CONNECTION_ERRORS,
RETRYABLE_CONNECTION_EXCEPTIONS,
)

# constants
MAX_QUEUE_SIZE: int = 20
MAX_RETRIES: int = 20
MiB: int = 2 ** 20
SYNAPSE_DEFAULT_DOWNLOAD_PART_SIZE: int = 8 * MiB
ISO_AWS_STR_FORMAT: str = '%Y%m%dT%H%M%SZ'
Expand Down Expand Up @@ -324,16 +328,31 @@ def download_file(self, request):
def _get_response_with_retry(presigned_url_provider, start: int, end: int) -> Response:
session = _get_thread_session()
range_header = {'Range': f'bytes={start}-{end}'}
response = session.get(presigned_url_provider.get_info().url, headers=range_header, stream=True)
# try request until successful or out of retries
try_counter = 1
while response.status_code != HTTPStatus.PARTIAL_CONTENT:
if try_counter >= MAX_RETRIES:
raise SynapseError(
f'Could not download the file: {presigned_url_provider.get_info().file_name},'
f' please try again.')
response = session.get(presigned_url_provider.get_info().url, headers=range_header, stream=True)
try_counter += 1

def session_get():
return session.get(presigned_url_provider.get_info().url, headers=range_header)

response = None
cause = None
try:
# currently when doing a range request to AWS we retry on anything other than a 206.
# this seems a bit excessive (i.e. some 400 statuses would suggest a non-retryable condition)
# but for now matching previous behavior.
response = with_retry(
session_get,
expected_status_codes=(HTTPStatus.PARTIAL_CONTENT,),
retry_errors=RETRYABLE_CONNECTION_ERRORS,
retry_exceptions=RETRYABLE_CONNECTION_EXCEPTIONS,
)
except Exception as ex:
cause = ex

if not response or response.status_code != HTTPStatus.PARTIAL_CONTENT:
raise SynapseError(
f'Could not download the file: {presigned_url_provider.get_info().file_name},'
f' please try again.'
) from cause

return start, response

@staticmethod
Expand Down
57 changes: 47 additions & 10 deletions synapseclient/core/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,54 @@
from synapseclient.core.utils import is_json
from synapseclient.core.dozer import doze

# Default retry policy knobs used by with_retry when callers do not override them.
DEFAULT_RETRIES = 3      # maximum number of retry attempts
DEFAULT_WAIT = 1         # initial wait between attempts, in seconds
DEFAULT_BACK_OFF = 2     # exponential back-off multiplier applied per failure
DEFAULT_MAX_WAIT = 30    # cap on the back-off wait, in seconds

# HTTP status codes that indicate a transient condition (throttling or a
# server-side error) and are therefore safe to retry.
DEFAULT_RETRY_STATUS_CODES = [429, 500, 502, 503, 504]

# strings that may appear in responses that suggest a retryable condition
# (matched against the response/error message text).
# NOTE: the previous inline list carried "connection reset by peer" twice;
# the duplicate is removed here — matching behavior is unchanged.
RETRYABLE_CONNECTION_ERRORS = [
    "proxy error",
    "slow down",
    "timeout",
    "timed out",
    "connection reset by peer",
    "unknown ssl protocol error",
    "couldn't connect to host",
    "slowdown",
    "try again",
]

# Exceptions that may be retryable. These are socket level exceptions
# not associated with an HTTP response. Specified by class name so they can
# be matched without importing the originating libraries.
RETRYABLE_CONNECTION_EXCEPTIONS = [
    "ChunkedEncodingError",
    "ConnectionError",
    "ConnectionResetError",
    "Timeout",
    "timeout",
]


def with_retry(function, verbose=False,
retry_status_codes=[429, 500, 502, 503, 504], retry_errors=[], retry_exceptions=[],
retries=3, wait=1, back_off=2, max_wait=30):
retry_status_codes=[429, 500, 502, 503, 504], expected_status_codes=[],
retry_errors=[], retry_exceptions=[],
retries=DEFAULT_RETRIES, wait=DEFAULT_WAIT, back_off=DEFAULT_BACK_OFF, max_wait=DEFAULT_MAX_WAIT):
"""
Retries the given function under certain conditions.
:param function: A function with no arguments. If arguments are needed, use a lambda (see example).
:param retry_status_codes: What status codes to retry upon in the case of a SynapseHTTPError.
:param retry_errors: What reasons to retry upon, if function().response.json()['reason'] exists.
:param retry_exceptions: What types of exceptions, specified as strings, to retry upon.
:param retries: How many times to retry maximum.
:param wait: How many seconds to wait between retries.
:param back_off: Exponential constant to increase wait for between progressive failures.
:param function: A function with no arguments. If arguments are needed, use a lambda (see example).
:param retry_status_codes: What status codes to retry upon in the case of a SynapseHTTPError.
:param expected_status_codes: If specified responses with any other status codes result in a retry.
:param retry_errors: What reasons to retry upon, if function().response.json()['reason'] exists.
:param retry_exceptions: What types of exceptions, specified as strings or Exception classes, to retry upon.
:param retries: How many times to retry maximum.
:param wait: How many seconds to wait between retries.
:param back_off: Exponential constant to increase wait for between progressive failures.
:param max_wait: back_off between requests will not exceed this value
:returns: function()
Expand Down Expand Up @@ -55,7 +89,10 @@ def foo(a, b, c): return [a, b, c]

# Check if we got a retry-able error
if response is not None and hasattr(response, 'status_code'):
if response.status_code in retry_status_codes:
if (
(expected_status_codes and response.status_code not in expected_status_codes) or
(retry_status_codes and response.status_code in retry_status_codes)
):
response_message = _get_message(response)
retry = True
logger.debug("retrying on status code: %s" % str(response.status_code))
Expand Down
3 changes: 0 additions & 3 deletions synapseclient/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,7 @@ def getURI(cls, id):
return '/evaluation/%s' % id

def __init__(self, **kwargs):
kwargs['status'] = kwargs.get('status', 'OPEN')
kwargs['contentSource'] = kwargs.get('contentSource', '')
if kwargs['status'] not in ['OPEN', 'PLANNED', 'CLOSED', 'COMPLETED']:
raise ValueError('Evaluation Status must be one of [OPEN, PLANNED, CLOSED, COMPLETED]')
if not kwargs['contentSource'].startswith('syn'): # Verify that synapse Id given
raise ValueError('The "contentSource" parameter must be specified as a Synapse Entity when creating an'
' Evaluation')
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/synapseclient/integration_test_Entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ def test_download_local_file_URL_path(syn, project, schedule_for_cleanup):

localFileEntity = syn.store(File(dataFileHandleId=filehandle['id'], parent=project))
e = syn.get(localFileEntity.id)
assert path == e.path
assert path == utils.normalize_path(e.path)


# SYNPY-424
Expand Down Expand Up @@ -566,7 +566,7 @@ def test_store__changing_externalURL_by_changing_path(syn, project, schedule_for

assert ext.externalURL != url
assert utils.normalize_path(temp_path) == utils.file_url_to_path(ext.externalURL)
assert temp_path == ext.path
assert temp_path == utils.normalize_path(ext.path)
assert not ext.synapseStore


Expand Down
10 changes: 1 addition & 9 deletions tests/integration/synapseclient/test_evaluations.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def test_evaluations(syn, project, schedule_for_cleanup):
# Create an Evaluation
name = 'Test Evaluation %s' % str(uuid.uuid4())
ev = Evaluation(name=name, description='Evaluation for testing',
contentSource=project['id'], status='CLOSED')
contentSource=project['id'])
ev = syn.store(ev)

try:
Expand All @@ -29,7 +29,6 @@ def test_evaluations(syn, project, schedule_for_cleanup):
assert ev['id'] == evalNamed['id']
assert ev['name'] == evalNamed['name']
assert ev['ownerId'] == evalNamed['ownerId']
assert ev['status'] == evalNamed['status']

# -- Get the Evaluation by project
evalProj = syn.getEvaluationByContentSource(project)
Expand All @@ -41,13 +40,6 @@ def test_evaluations(syn, project, schedule_for_cleanup):
assert ev['id'] == evalProj['id']
assert ev['name'] == evalProj['name']
assert ev['ownerId'] == evalProj['ownerId']
assert ev['status'] == evalProj['status']

# Update the Evaluation
ev['status'] = 'OPEN'
ev = syn.store(ev, createOrUpdate=True)
schedule_for_cleanup(ev)
assert ev.status == 'OPEN'

# Add the current user as a participant
myOwnerId = int(syn.getUserProfile()['ownerId'])
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/synapseutils/test_synapseutils_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def test_syncFromSynapse(test_state):

assert len(output) == len(uploaded_paths)
for f in output:
assert f.path in uploaded_paths
assert utils.normalize_path(f.path) in uploaded_paths


def test_syncFromSynapse__children_contain_non_file(test_state):
Expand Down Expand Up @@ -240,7 +240,7 @@ def test_syncFromSynapse_Links(test_state):

assert len(output) == len(uploaded_paths)
for f in output:
assert f.path in uploaded_paths
assert utils.normalize_path(f.path) in uploaded_paths


def test_write_manifest_data__unicode_characters_in_rows(test_state):
Expand Down
Loading

0 comments on commit 56fca24

Please sign in to comment.