Skip to content

Commit

Permalink
Merge #6229
Browse files Browse the repository at this point in the history
6229: Adding zero-copy support on the receiving end of the TCP and MPI parcel ports r=hkaiser a=hkaiser

- flyby: cleaning up and modernizing TCP parcel port

`@JiakunYan` this implements what we discussed recently by de-serializing received parcels once the chunk information is available. This de-serialization however does not assume that the chunk data has been received, but merely allocates the memory for the subsequent networking operations to place received chunk data directly into the internal memory buffers.

Co-authored-by: Hartmut Kaiser <[email protected]>
  • Loading branch information
StellarBot and hkaiser committed May 7, 2023
2 parents 4a5f819 + 058c8b7 commit e82d578
Show file tree
Hide file tree
Showing 55 changed files with 1,304 additions and 515 deletions.
39 changes: 32 additions & 7 deletions .cmake-format.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,24 @@
'SOURCE_ROOT': 1},
'pargs': { 'flags': ['FAILURE_EXPECTED', 'NOLIBS'],
'nargs': '2+'}},
'add_hpx_regression_test': {'pargs': {'nargs': 2}},
'add_hpx_regression_test': { 'kwargs': { 'COMPONENT_DEPENDENCIES': '+',
'DEPENDENCIES': '+',
'FOLDER': 1,
'SOURCES': '+',
'SOURCE_ROOT': 1,
'ARGS': '+',
'EXECUTABLE': 1,
'PSEUDO_DEPS_NAME': 1,
'LOCALITIES': 1,
'PARCELPORTS': '+',
'THREADS_PER_LOCALITY': 1},
'pargs': { 'flags': ['FAILURE_EXPECTED',
'NOLIBS',
'RUN_SERIAL',
'NO_PARCELPORT_TCP',
'NO_PARCELPORT_LCI',
'NO_PARCELPORT_MPI'],
'nargs': '2+'}},
'add_hpx_source_group': { 'kwargs': { 'CLASS': 1,
'NAME': 1,
'ROOT': 1,
Expand All @@ -386,10 +403,7 @@
'EXECUTABLE': 1,
'LOCALITIES': 1,
'PARCELPORTS': '+',
'THREADS_PER_LOCALITY': 1,
'NO_PARCELPORT_TCP': 1,
'NO_PARCELPORT_MPI': 1,
'NO_PARCELPORT_LCI': 1},
'THREADS_PER_LOCALITY': 1},
'pargs': {'flags': ['FAILURE_EXPECTED',
'RUN_SERIAL',
'NO_PARCELPORT_TCP',
Expand All @@ -410,8 +424,19 @@
'DEPENDENCIES': '+',
'FOLDER': 1,
'SOURCES': '+',
'SOURCE_ROOT': 1},
'pargs': { 'flags': ['FAILURE_EXPECTED', 'NOLIBS'],
'SOURCE_ROOT': 1,
'ARGS': '+',
'EXECUTABLE': 1,
'PSEUDO_DEPS_NAME': 1,
'LOCALITIES': 1,
'PARCELPORTS': '+',
'THREADS_PER_LOCALITY': 1},
'pargs': { 'flags': ['FAILURE_EXPECTED',
'NOLIBS',
'RUN_SERIAL',
'NO_PARCELPORT_TCP',
'NO_PARCELPORT_LCI',
'NO_PARCELPORT_MPI'],
'nargs': '2+'}},
'add_parcelport': { 'kwargs': { 'COMPILE_FLAGS': '+',
'DEPENDENCIES': '+',
Expand Down
22 changes: 20 additions & 2 deletions docs/sphinx/manual/launching_and_configuring_hpx_applications.rst
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,7 @@ The ``hpx.parcel`` configuration section
max_outbound_message_size = ${HPX_PARCEL_MAX_OUTBOUND_MESSAGE_SIZE:<hpx_parcel_max_outbound_message_size>}
array_optimization = ${HPX_PARCEL_ARRAY_OPTIMIZATION:1}
zero_copy_optimization = ${HPX_PARCEL_ZERO_COPY_OPTIMIZATION:$[hpx.parcel.array_optimization]}
zero_copy_receive_optimization = ${HPX_PARCEL_ZERO_COPY_RECEIVE_OPTIMIZATION:$[hpx.parcel.array_optimization]}
async_serialization = ${HPX_PARCEL_ASYNC_SERIALIZATION:1}
message_handlers = ${HPX_PARCEL_MESSAGE_HANDLERS:0}
Expand Down Expand Up @@ -520,6 +521,11 @@ The ``hpx.parcel`` configuration section
* This property defines whether this :term:`locality` is allowed to utilize
zero copy optimizations during serialization of :term:`parcel` data. The default
is the same value as set for ``hpx.parcel.array_optimization``.
* * ``hpx.parcel.zero_copy_receive_optimization``
* This property defines whether this :term:`locality` is allowed to utilize
zero copy optimizations on the receiving end during de-serialization of
:term:`parcel` data. The default is the same value as set for
``hpx.parcel.zero_copy_optimization``.
* * ``hpx.parcel.zero_copy_serialization_threshold``
* This property defines the threshold value (in bytes) starting at which the
serialization layer will apply zero-copy optimizations for serialized
Expand All @@ -544,6 +550,7 @@ The following settings relate to the TCP/IP parcelport.
enable = ${HPX_HAVE_PARCELPORT_TCP:$[hpx.parcel.enabled]}
array_optimization = ${HPX_PARCEL_TCP_ARRAY_OPTIMIZATION:$[hpx.parcel.array_optimization]}
zero_copy_optimization = ${HPX_PARCEL_TCP_ZERO_COPY_OPTIMIZATION:$[hpx.parcel.zero_copy_optimization]}
zero_copy_receive_optimization = ${HPX_PARCEL_TCP_ZERO_COPY_RECEIVE_OPTIMIZATION:$[hpx.parcel.zero_copy_receive_optimization]}
zero_copy_serialization_threshold = ${HPX_PARCEL_TCP_ZERO_COPY_SERIALIZATION_THRESHOLD:$[hpx.parcel.zero_copy_serialization_threshold]}
async_serialization = ${HPX_PARCEL_TCP_ASYNC_SERIALIZATION:$[hpx.parcel.async_serialization]}
parcel_pool_size = ${HPX_PARCEL_TCP_PARCEL_POOL_SIZE:$[hpx.threadpools.parcel_pool_size]}
Expand Down Expand Up @@ -571,9 +578,14 @@ The following settings relate to the TCP/IP parcelport.
``hpx.parcel.array_optimization``.
* * ``hpx.parcel.tcp.zero_copy_optimization``
* This property defines whether this :term:`locality` is allowed to utilize
zero copy optimizations in the TCP/IP parcelport during serialization of
zero copy optimizations during serialization of
parcel data. The default is the same value as set for
``hpx.parcel.zero_copy_optimization``.
* * ``hpx.parcel.tcp.zero_copy_receive_optimization``
* This property defines whether this :term:`locality` is allowed to utilize
zero copy optimizations on the receiving end in the TCP/IP parcelport during
de-serialization of :term:`parcel` data. The default is the same value as set
for ``hpx.parcel.zero_copy_optimization``.
* * ``hpx.parcel.tcp.zero_copy_serialization_threshold``
* This property defines the threshold value (in bytes) starting at which the
serialization layer will apply zero-copy optimizations for serialized
Expand Down Expand Up @@ -623,6 +635,7 @@ equivalent CMake variable is ``HPX_WITH_PARCELPORT_MPI`` and has to be set to
processor_name = <MPI_processor_name>
array_optimization = ${HPX_HAVE_PARCEL_MPI_ARRAY_OPTIMIZATION:$[hpx.parcel.array_optimization]}
zero_copy_optimization = ${HPX_HAVE_PARCEL_MPI_ZERO_COPY_OPTIMIZATION:$[hpx.parcel.zero_copy_optimization]}
zero_copy_receive_optimization = ${HPX_HAVE_PARCEL_MPI_ZERO_COPY_RECEIVE_OPTIMIZATION:$[hpx.parcel.zero_copy_receive_optimization]}
zero_copy_serialization_threshold = ${HPX_PARCEL_MPI_ZERO_COPY_SERIALIZATION_THRESHOLD:$[hpx.parcel.zero_copy_serialization_threshold]}
use_io_pool = ${HPX_HAVE_PARCEL_MPI_USE_IO_POOL:$1}
async_serialization = ${HPX_HAVE_PARCEL_MPI_ASYNC_SERIALIZATION:$[hpx.parcel.async_serialization]}
Expand Down Expand Up @@ -670,6 +683,11 @@ equivalent CMake variable is ``HPX_WITH_PARCELPORT_MPI`` and has to be set to
zero copy optimizations in the MPI parcelport during serialization of
parcel data. The default is the same value as set for
``hpx.parcel.zero_copy_optimization``.
* * ``hpx.parcel.mpi.zero_copy_receive_optimization``
* This property defines whether this :term:`locality` is allowed to utilize
zero copy optimizations on the receiving end in the MPI parcelport during
de-serialization of :term:`parcel` data. The default is the same value as
set for ``hpx.parcel.zero_copy_optimization``.
* * ``hpx.parcel.mpi.zero_copy_serialization_threshold``
* This property defines the threshold value (in bytes) starting at which the
serialization layer will apply zero-copy optimizations for serialized
Expand Down Expand Up @@ -703,7 +721,7 @@ equivalent CMake variable is ``HPX_WITH_PARCELPORT_MPI`` and has to be set to
* This property defines the maximum allowed outbound coalesced message size
that will be transferrable through the :term:`parcel` layer. The default is
taken from ``hpx.parcel.max_outbound_connections``.
* * ``hpx.parcel.tcp.max_background_threads``
* * ``hpx.parcel.mpi.max_background_threads``
* This property defines how many cores should be used to perform background
operations. The default is taken from ``hpx.parcel.max_background_threads``.

Expand Down
2 changes: 1 addition & 1 deletion libs/core/assertion/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ add_hpx_module(
SOURCES ${assertion_sources}
HEADERS ${assertion_headers}
COMPAT_HEADERS ${assertion_compat_headers}
MODULE_DEPENDENCIES hpx_config hpx_preprocessor
MODULE_DEPENDENCIES hpx_config hpx_format hpx_preprocessor
CMAKE_SUBDIRS examples tests
)
26 changes: 14 additions & 12 deletions libs/core/assertion/include/hpx/modules/assertion.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <hpx/config.hpp>
#include <hpx/assertion/evaluate_assert.hpp>
#include <hpx/assertion/source_location.hpp>
#include <hpx/modules/format.hpp>
#include <hpx/preprocessor/stringize.hpp>

#if defined(HPX_COMPUTE_DEVICE_CODE)
Expand Down Expand Up @@ -59,36 +60,37 @@ namespace hpx::assertion {
#define HPX_ASSERT_MSG(expr, msg)
#else
/// \cond NOINTERNAL
#define HPX_ASSERT_(expr, msg) \
#define HPX_ASSERT_(expr, ...) \
(!!(expr) ? void() : \
::hpx::assertion::detail::handle_assert( \
HPX_CURRENT_SOURCE_LOCATION(), HPX_PP_STRINGIZE(expr), \
msg)) /**/
hpx::util::format(__VA_ARGS__))) /**/

#define HPX_ASSERT_LOCKED_(l, expr, msg) \
#define HPX_ASSERT_LOCKED_(l, expr, ...) \
(!!(expr) ? void() : \
((l).unlock(), \
::hpx::assertion::detail::handle_assert( \
HPX_CURRENT_SOURCE_LOCATION(), HPX_PP_STRINGIZE(expr), \
msg))) /**/
hpx::util::format(__VA_ARGS__)))) /**/

#if defined(HPX_DEBUG)
#if defined(HPX_COMPUTE_DEVICE_CODE)
#define HPX_ASSERT(expr) assert(expr)
#define HPX_ASSERT_MSG(expr, msg) HPX_ASSERT(expr)
#define HPX_ASSERT_MSG(expr, ...) HPX_ASSERT(expr)
#define HPX_ASSERT_LOCKED(l, expr) assert(expr)
#define HPX_ASSERT_LOCKED_MSG(l, expr, msg) HPX_ASSERT(expr)
#define HPX_ASSERT_LOCKED_MSG(l, expr, ...) HPX_ASSERT(expr)
#else
#define HPX_ASSERT(expr) HPX_ASSERT_(expr, std::string())
#define HPX_ASSERT_MSG(expr, msg) HPX_ASSERT_(expr, msg)
#define HPX_ASSERT_LOCKED(l, expr) HPX_ASSERT_LOCKED_(l, expr, std::string())
#define HPX_ASSERT_LOCKED_MSG(l, expr, msg) HPX_ASSERT_LOCKED_(l, expr, msg)
#define HPX_ASSERT(expr) HPX_ASSERT_(expr, "")
#define HPX_ASSERT_MSG(expr, ...) HPX_ASSERT_(expr, __VA_ARGS__)
#define HPX_ASSERT_LOCKED(l, expr) HPX_ASSERT_LOCKED_(l, expr, "")
#define HPX_ASSERT_LOCKED_MSG(l, expr, ...) \
HPX_ASSERT_LOCKED_(l, expr, __VA_ARGS__)
#endif
#else
#define HPX_ASSERT(expr)
#define HPX_ASSERT_MSG(expr, msg)
#define HPX_ASSERT_MSG(expr, ...)
#define HPX_ASSERT_LOCKED(l, expr)
#define HPX_ASSERT_LOCKED_MSG(l, expr, msg)
#define HPX_ASSERT_LOCKED_MSG(l, expr, ...)
#endif

#define HPX_UNREACHABLE \
Expand Down
2 changes: 1 addition & 1 deletion libs/core/format/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ add_hpx_module(
SOURCES ${format_sources}
HEADERS ${format_headers}
COMPAT_HEADERS ${format_compat_headers}
MODULE_DEPENDENCIES hpx_assertion hpx_config hpx_type_support
MODULE_DEPENDENCIES hpx_config
CMAKE_SUBDIRS examples tests
)
30 changes: 18 additions & 12 deletions libs/core/format/src/format.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

#include <hpx/assert.hpp>
#include <hpx/modules/format.hpp>
#include <hpx/type_support/unused.hpp>

#include <algorithm>
#include <cstddef>
Expand All @@ -17,6 +15,7 @@
#include <limits>
#include <ostream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <string_view>

Expand All @@ -34,7 +33,7 @@ namespace hpx::util::detail {

char const* first = buffer;
char* last = buffer;
std::size_t r = std::strtoull(first, &last, 10);
std::size_t const r = std::strtoull(first, &last, 10);
if (pos != nullptr)
*pos = last - first;
return r;
Expand All @@ -58,7 +57,7 @@ namespace hpx::util::detail {
inline format_field parse_field(std::string_view field) noexcept
{
std::size_t const sep = field.find(':');
if (sep != field.npos)
if (sep != std::string_view::npos)
{
std::string_view const arg_id = format_substr(field, 0, sep);
std::string_view const spec = format_substr(field, sep + 1);
Expand Down Expand Up @@ -87,17 +86,24 @@ namespace hpx::util::detail {
}
else
{
HPX_ASSERT(format_str[0] != '}');
if (format_str[0] == '}')
{
throw std::runtime_error("bad format string");
}
std::size_t const end = format_str.find('}');
std::string_view field_str =
std::string_view const field_str =
format_substr(format_str, 1, end);
format_field const field = parse_field(field_str);
format_str.remove_prefix(end - 1);

std::size_t const id =
field.arg_id ? field.arg_id - 1 : index;
HPX_ASSERT(id < count);
HPX_UNUSED(count);
if (id >= count)
{
throw std::runtime_error(
"bad format string (wrong number of arguments)");
}

args[id](os, field.spec);
++index;
}
Expand All @@ -106,11 +112,11 @@ namespace hpx::util::detail {
else
{
std::size_t const next = format_str.find_first_of("{}");
std::size_t const count =
next != format_str.npos ? next : format_str.size();
std::size_t const cnt =
next != std::string_view::npos ? next : format_str.size();

os.write(format_str.data(), count);
format_str.remove_prefix(count);
os.write(format_str.data(), cnt);
format_str.remove_prefix(cnt);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions libs/core/mpi_base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ add_hpx_module(
SOURCES ${mpi_base_sources}
HEADERS ${mpi_base_headers}
COMPAT_HEADERS ${mpi_base_compat_headers}
MODULE_DEPENDENCIES hpx_logging hpx_runtime_configuration hpx_string_util
hpx_util
MODULE_DEPENDENCIES hpx_format hpx_logging hpx_runtime_configuration
hpx_string_util hpx_util
DEPENDENCIES ${additional_dependencies}
CMAKE_SUBDIRS examples tests
)
Loading

0 comments on commit e82d578

Please sign in to comment.