Skip to content

Commit

Permalink
[Function optimization] use file-lock to enable multi-process downloa…
Browse files Browse the repository at this point in the history
…ding (PaddlePaddle#3788)

* add file_lock unittest

* enable multi-processing downloading

* use small weight file & more process to download

* use global lock file

* fix testing

* modify the docstring

* use self.assertGreater

* change the file name & variable name

* add docstring for testing method

* update internal testing bert url

* update multiprocess testing code

* remove unused importing

* add Optional import

* use the origin method name

* format tokenize_utils_base
  • Loading branch information
wj-Mcat authored Nov 24, 2022
1 parent b122d4a commit 0d931ad
Show file tree
Hide file tree
Showing 7 changed files with 190 additions and 38 deletions.
17 changes: 10 additions & 7 deletions paddlenlp/transformers/model_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
from paddle import Tensor
from paddle.nn import Embedding, Layer
# TODO(fangzeyang) Temporary fix and replace by paddle framework downloader later
from paddle.utils.download import get_path_from_url, is_url
from paddle.utils.download import is_url
from paddlenlp.utils.downloader import (download_check, COMMUNITY_MODEL_PREFIX)
from paddlenlp.utils.downloader import get_path_from_url_with_filelock
from paddlenlp.utils.env import MODEL_HOME, LOCK_FILE_HOME

from paddlenlp.utils.downloader import COMMUNITY_MODEL_PREFIX, download_check
from paddlenlp.utils.env import MODEL_HOME
from paddlenlp.utils.log import logger
from paddlenlp.utils.file_lock import FileLock

from .configuration_utils import PretrainedConfig
from .generation_utils import GenerationMixin
Expand Down Expand Up @@ -447,8 +449,9 @@ def from_pretrained(cls,
logger.info("Downloading %s and saved to %s" %
(file_path, default_root))
try:
resolved_resource_files[file_id] = get_path_from_url(
file_path, default_root)
resolved_resource_files[
file_id] = get_path_from_url_with_filelock(
file_path, default_root)
except RuntimeError as err:
logger.error(err)
raise RuntimeError(
Expand Down Expand Up @@ -864,8 +867,8 @@ def _resolve_model_file_path(cls: Type[PretrainedModel],

# 3. when it is url
if is_url(pretrained_model_name_or_path):
weight_file_path = get_path_from_url(pretrained_model_name_or_path,
cache_dir)
weight_file_path = get_path_from_url_with_filelock(
pretrained_model_name_or_path, cache_dir)
# # check the downloaded weight file and registered weight file name

# make sure that
Expand Down
17 changes: 10 additions & 7 deletions paddlenlp/transformers/tokenizer_utils_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import paddle
from huggingface_hub import hf_hub_download

from paddlenlp.utils.downloader import (COMMUNITY_MODEL_PREFIX,
get_path_from_url)
from paddlenlp.utils.downloader import COMMUNITY_MODEL_PREFIX
from paddlenlp.utils.downloader import get_path_from_url_with_filelock
from paddlenlp.utils.env import MODEL_HOME
from paddlenlp.utils.log import logger

Expand Down Expand Up @@ -1578,8 +1578,9 @@ def from_pretrained(cls,
logger.info("Downloading %s and saved to %s" %
(file_path, default_root))
try:
resolved_vocab_files[file_id] = get_path_from_url(
file_path, default_root)
resolved_vocab_files[
file_id] = get_path_from_url_with_filelock(
file_path, default_root)
except RuntimeError as err:
if file_id not in cls.resource_files_names:
resolved_vocab_files[file_id] = None
Expand Down Expand Up @@ -1942,7 +1943,8 @@ def _get_padding_truncation_strategies(self,
warnings.warn(
"Though `pad_to_max_length` = `True`, it is ignored because `padding`=`True`."
)
padding_strategy = PaddingStrategy.LONGEST # Default to pad to the longest sequence in the batch
# Default to pad to the longest sequence in the batch
padding_strategy = PaddingStrategy.LONGEST
elif not isinstance(padding, PaddingStrategy):
padding_strategy = PaddingStrategy(padding)
elif isinstance(padding, PaddingStrategy):
Expand Down Expand Up @@ -2842,7 +2844,8 @@ def prepare_for_model(self,
offset_mapping = self.build_offset_mapping_with_special_tokens(
token_offset_mapping, token_pair_offset_mapping)
else:
offset_mapping = token_offset_mapping + token_pair_offset_mapping if token_pair_offset_mapping else token_offset_mapping
offset_mapping = token_offset_mapping + \
token_pair_offset_mapping if token_pair_offset_mapping else token_offset_mapping
encoded_inputs['offset_mapping'] = offset_mapping

# Check lengths
Expand All @@ -2864,7 +2867,7 @@ def prepare_for_model(self,

if return_length:
encoded_inputs["length"] = len(encoded_inputs["input_ids"])
#for compatibility
# for compatibility
encoded_inputs["seq_len"] = encoded_inputs["length"]

batch_outputs = BatchEncoding(encoded_inputs,
Expand Down
31 changes: 31 additions & 0 deletions paddlenlp/utils/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import os
import sys
import os.path as osp
from typing import Optional
import shutil
import json
import requests
Expand Down Expand Up @@ -173,6 +174,36 @@ def get_path_from_url(url, root_dir, md5sum=None, check_exist=True):
return fullpath


def get_path_from_url_with_filelock(url: str,
                                    root_dir: str,
                                    md5sum: Optional[str] = None,
                                    check_exist: bool = True) -> str:
    """wrap `get_path_from_url` with a file lock to make downloading multiprocess-safe
    Args:
        url (str): the url of resource file
        root_dir (str): the local download path
        md5sum (str, optional): md5sum string for file. Defaults to None.
        check_exist (bool, optional): whether to check if the file already exists. Defaults to True.
    Returns:
        str: the path of downloaded file
    """
    os.makedirs(root_dir, exist_ok=True)

    # Derive the lock-file name with a stable digest instead of the builtin
    # `hash()`: string hashing is salted per interpreter process
    # (PYTHONHASHSEED), so `hash(url + root_dir)` would give every worker
    # process a *different* lock file and silently defeat the lock.
    import hashlib
    lock_file_name = hashlib.md5((url + root_dir).encode("utf-8")).hexdigest()

    # create lock file, which is empty, under the `LOCK_FILE_HOME` directory.
    lock_file_path = os.path.join(LOCK_FILE_HOME, lock_file_name)
    with FileLock(lock_file_path):
        # import get_path_from_url from paddle framework
        from paddle.utils.download import get_path_from_url as _get_path_from_url
        result = _get_path_from_url(url=url,
                                    root_dir=root_dir,
                                    md5sum=md5sum,
                                    check_exist=check_exist)
    return result


def _download(url, path, md5sum=None):
"""
Download from url, save to path.
Expand Down
1 change: 1 addition & 0 deletions paddlenlp/utils/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def _get_sub_home(directory, parent_home=_get_ppnlp_home()):
PPNLP_HOME = _get_ppnlp_home()
MODEL_HOME = _get_sub_home('models')
DATA_HOME = _get_sub_home('datasets')
LOCK_FILE_HOME = _get_sub_home(".lock")
DOWNLOAD_SERVER = "http://paddlepaddle.org.cn/paddlehub"
FAILED_STATUS = -1
SUCCESS_STATUS = 0
24 changes: 0 additions & 24 deletions tests/transformers/test_modeling.py

This file was deleted.

82 changes: 82 additions & 0 deletions tests/transformers/test_modeling_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import unittest
import shutil
from tempfile import TemporaryDirectory
from tests.testing_utils import slow
from multiprocessing import Pool
from paddlenlp.transformers import TinyBertModel, BertModel
from paddlenlp.utils.env import MODEL_HOME


def download_bert_model(model_name: str):
    """Download/load a BERT model by name.

    Defined at module level because `multiprocessing` cannot pickle a
    locally-defined function for the worker processes.

    Args:
        model_name (str): the model name
    """
    bert = BertModel.from_pretrained(model_name)
    # free the model resource
    del bert


class TestModeling(unittest.TestCase):
    """Test PretrainedModel single time, not in Transformer models"""

    @slow
    def test_from_pretrained_with_load_as_state_np_params(self):
        """init model with `load_state_as_np` params"""
        model = TinyBertModel.from_pretrained("tinybert-4l-312d",
                                              load_state_as_np=True)
        self.assertIsNotNone(model)

    @slow
    def test_multiprocess_downloading(self):
        """test downloading with multi-process. Some errors may be triggered when downloading model
        weight file with multiprocess, so this test code was born.
        `num_process_in_pool` is the number of processes in the Pool.
        And `num_jobs` is the total number of download jobs submitted.
        """
        num_process_in_pool, num_jobs = 10, 20
        small_model_path = "https://paddlenlp.bj.bcebos.com/models/community/__internal_testing__/bert/model_state.pdparams"

        # `model_utils` no longer re-exports paddle's `get_path_from_url`;
        # exercise the file-lock-protected variant instead, which is the
        # code path this commit introduces.
        from paddlenlp.utils.downloader import get_path_from_url_with_filelock
        with TemporaryDirectory() as tempdir:

            with Pool(num_process_in_pool) as pool:
                pool.starmap(get_path_from_url_with_filelock,
                             [(small_model_path, tempdir)
                              for _ in range(num_jobs)])

    @slow
    def test_model_from_pretrained_with_multiprocessing(self):
        """
        this test can not init too many models which will occupy CPU/GPU memory.
        `num_process_in_pool` is the number of processes in the Pool.
        And `num_jobs` is the total number of download jobs submitted.
        """
        num_process_in_pool, num_jobs = 1, 10

        # 1. remove the cached model weight file so the download really runs
        model_name = "__internal_testing__/bert"
        shutil.rmtree(os.path.join(MODEL_HOME, model_name), ignore_errors=True)

        # 2. download the bert model using multi-processing
        with Pool(num_process_in_pool) as pool:
            pool.starmap(download_bert_model,
                         [(model_name, ) for _ in range(num_jobs)])
56 changes: 56 additions & 0 deletions tests/utils/test_file_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import unittest
import time
from datetime import datetime
from multiprocessing import Pool
from tempfile import TemporaryDirectory, TemporaryFile
from paddlenlp.utils.file_lock import FileLock


def time_lock(lock_file: str) -> datetime:
    """Hold the file lock for 1.2 seconds, then report the completion time.

    Args:
        lock_file (str): the path of lock file
    Returns:
        datetime: the moment this worker finished its critical section
    """
    lock = FileLock(lock_file)
    with lock:
        time.sleep(1.2)
    return datetime.now()


class TestFileLock(unittest.TestCase):
    """Checks that `FileLock` serializes concurrent critical sections."""

    def test_time_lock(self):
        """Run 10 lock-protected 1.2-second sleeps across 4 worker
        processes and assert that consecutive completion times are at
        least ~1.2 seconds apart, i.e. the critical sections never
        overlapped."""
        with TemporaryDirectory() as tempdir:
            lock_file = os.path.join(tempdir, 'download.lock')

            with Pool(4) as pool:
                finish_times = pool.map(time_lock,
                                        [lock_file for _ in range(10)])
            finish_times.sort()

            # Compare each completion time with the *previous* one — the
            # original never advanced `pre_time`, so it compared every
            # timestamp against the first, a much weaker check — and use
            # total_seconds() instead of the int-truncated `.seconds`.
            for previous, current in zip(finish_times, finish_times[1:]):
                self.assertGreater((current - previous).total_seconds(),
                                   1 - 1e-3)

0 comments on commit 0d931ad

Please sign in to comment.