diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..27b4ec0 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,27 @@ +name: build +on: [push] +jobs: + build: + strategy: + matrix: + build-type: [Release] + raptor: [ON, OFF] + os: [ubuntu-latest, ubuntu-22.04, ubuntu-20.04] + runs-on: ${{ matrix.os }} + steps: + - name: check out repository code + uses: actions/checkout@v3 + with: + submodules: recursive + - name: install dependencies + run: sudo apt install -y gcc ninja-build cmake libssl-dev libboost-all-dev libspdlog-dev libtinyxml2-dev libnl-3-dev pkg-config libconfig++-dev + - name: pre-cmake + run: cmake --version && mkdir ${{ github.workspace }}/build && echo `nproc` processing units available + - name: cmake + run: cmake -S ${{ github.workspace }} -B ${{ github.workspace }}/build -GNinja -DCMAKE_BUILD_TYPE=${{ matrix.build-type }} -DENABLE_RAPTOR=${{ matrix.raptor }} + - name: build + run: cd ${{ github.workspace }}/build && ninja -j`nproc` + - name: check for runtime linking errors in transmitter + run: ${{ github.workspace }}/build/examples/flute-transmitter --help + - name: check for runtime linking errors in receiver + run: ${{ github.workspace }}/build/examples/flute-receiver --help diff --git a/.gitignore b/.gitignore index c42b616..bdeb7aa 100644 --- a/.gitignore +++ b/.gitignore @@ -33,4 +33,8 @@ build/ .idea -cmake-build-debug/ \ No newline at end of file +cmake-build-debug/ +html/ +.vscode/ +*.swp +flute_download_* diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..8e8412b --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "raptor"] + path = raptor + url = https://github.com/SteezyE/raptor.git diff --git a/CMakeLists.txt b/CMakeLists.txt index 8e94b64..5431735 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,12 +1,16 @@ cmake_minimum_required(VERSION 3.16) -project (libflute VERSION 0.11.0) +project (libflute VERSION 0.11.0 LANGUAGES C CXX) set(CMAKE_CXX_STANDARD 17) +set(CMAKE_C_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED True) +set(CMAKE_C_STANDARD_REQUIRED True) set(CMAKE_CXX_FLAGS_DEBUG_INIT "-Wall -Wextra -Werror -g3") +set(CMAKE_C_FLAGS_DEBUG_INIT "-Wall -Wextra -Werror -g3") set(CMAKE_CXX_FLAGS_RELEASE_INIT "-Wall -O3") +set(CMAKE_C_FLAGS_RELEASE_INIT "-Wall -O3") find_package(Boost REQUIRED) find_package(spdlog REQUIRED) @@ -15,11 +19,14 @@ find_package(OpenSSL REQUIRED) pkg_check_modules(TINYXML REQUIRED IMPORTED_TARGET tinyxml2) pkg_check_modules(NETLINK REQUIRED IMPORTED_TARGET libnl-3.0) +option(ENABLE_RAPTOR "Enable support for Raptor FEC" ON) + add_subdirectory(examples) include_directories( "${PROJECT_BINARY_DIR}" ${PROJECT_SOURCE_DIR}/include + ${PROJECT_SOURCE_DIR}/raptor SYSTEM ${PROJECT_SOURCE_DIR}/utils @@ -29,25 +36,44 @@ include_directories( configure_file("include/Version.h.in" "Version.h") -link_directories( - ) - set(CMAKE_CXX_CLANG_TIDY clang-tidy) +if(ENABLE_RAPTOR) + message(STATUS "Compiling raptor library for Raptor FEC support. To disable support build with -DENABLE_RAPTOR=OFF") + add_library(raptor STATIC) + target_sources(raptor + PRIVATE + raptor/bipartite.c raptor/decoder.c raptor/encoder.c raptor/galois.c raptor/gaussian.c raptor/pivoting.c raptor/random.c + PUBLIC + raptor/raptor.h + ) + target_link_libraries(raptor LINK_PUBLIC m) +else() + message(STATUS "Skipping raptor library for Raptor FEC support. To enable support build with -DENABLE_RAPTOR=ON") +endif() + add_library(flute "") target_sources(flute PRIVATE - src/Receiver.cpp src/Transmitter.cpp src/AlcPacket.cpp src/File.cpp src/EncodingSymbol.cpp src/FileDeliveryTable.cpp src/IpSec.cpp - utils/base64.cpp + src/Receiver.cpp src/Transmitter.cpp src/AlcPacket.cpp src/File.cpp src/EncodingSymbol.cpp src/FileDeliveryTable.cpp src/IpSec.cpp utils/base64.cpp + PUBLIC include/Receiver.h include/Transmitter.h include/File.h + ) -target_include_directories(flute - PUBLIC - ${CMAKE_CURRENT_LIST_DIR}/include/ - ) +target_include_directories(flute PUBLIC ${CMAKE_CURRENT_LIST_DIR}/include/) -#add_library(flute src/Receiver.cpp src/Receiver.h src/AlcPacket.cpp src/File.cpp src/EncodingSymbol.cpp src/FileDeliveryTable.cpp) +if(ENABLE_RAPTOR) + add_compile_definitions(RAPTOR_ENABLED) + target_include_directories(flute PUBLIC ${CMAKE_CURRENT_LIST_DIR}/raptor/) + target_sources(flute + PRIVATE + src/RaptorFEC.cpp + PUBLIC + include/RaptorFEC.h + ) + target_link_libraries(flute LINK_PUBLIC raptor) +endif() target_link_libraries( flute LINK_PUBLIC @@ -58,4 +84,3 @@ target_link_libraries( flute PkgConfig::TINYXML PkgConfig::NETLINK ) - diff --git a/README.md b/README.md index 562bcdb..eca806f 100644 --- a/README.md +++ b/README.md @@ -5,28 +5,37 @@ ## Installation guide -Installation of libflute consists of 3 simple steps: +Installation of libflute consists of 2 simple steps: 1. Getting the source code -2. Build setup -3. Building +2. Building ### Step 1: Getting the source code -```` -cd ~ -git clone https://github.com/5G-MAG/rt-libflute.git -```` -### Step 2: Build setup +This repository uses git submodules so it must be cloned with the following command + ```` -cd libflute/ -mkdir build && cd build -cmake -GNinja .. +git clone --recurse-submodules https://github.com/5G-MAG/rt-libflute.git ```` -### Step 3: Building +If you had checked it out before the submodules were added or ran git clone withou the `--recurse-submodules` argument then initialise and update the submodules by running + +``` +git submodule update --init +``` + +### Step 2: Building ```` -ninja +cd rt-libflute/ +mkdir build +cd build +cmake .. +cmake --build . ```` +(alternatively you can use the `build.sh` script to build in debug mode with raptor enabled) + +Build options: + +- "Enable Raptor": build with support for the raptor10-based forward error correction. On by default. Disable by passing the `-DENABLE_RAPTOR=OFF` option to cmake ## Usage @@ -87,6 +96,19 @@ sudo setcap 'cap_net_admin=eip' ./flute-transmitter sudo setcap 'cap_net_admin=eip' ./flute-receiver ```` +### Optional: Forward Error Correction (FEC) for lossy environment + +To use forward error correction to overcome packet loss during transmission add the option `-f 1` to the transmitter. The receiver needs no such option, just make sure that both of them were properly built/rebuilt with raptor enabled in the build options (this is the default). + +To simulate packet loss over the loopback interface and test that FEC works, you can run the `setup_packet_loss_on_loopback` script, as root. + +Then start a transmission as you usually would. Make sure to use the default ip address, since the script will reroute this to go through the loopback interface. + +Finally revert the changes to your loopback interface and routing table with the `reset_loopback_settings` script. + ## Documentation Documentation of the source code can be found at: https://5g-mag.github.io/rt-libflute/ + +To generate it locally via doxygen run `doxygen` in the project root. +Then to view it open the local file `html/index.html` in a browser diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index bba302a..d8f3ded 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -32,6 +32,7 @@ target_link_libraries( flute-transmitter config++ flute pthread + m ) target_link_libraries( flute-receiver LINK_PUBLIC @@ -39,4 +40,5 @@ target_link_libraries( flute-receiver config++ flute pthread + m ) diff --git a/examples/flute-receiver.cpp b/examples/flute-receiver.cpp index 694d91a..a3df945 100644 --- a/examples/flute-receiver.cpp +++ b/examples/flute-receiver.cpp @@ -32,6 +32,7 @@ #include "Version.h" #include "Receiver.h" #include "File.h" +#include "flute_types.h" using libconfig::Config; @@ -52,6 +53,8 @@ static struct argp_option options[] = { // NOLINT "Log verbosity: 0 = trace, 1 = debug, 2 = info, 3 = warn, 4 = error, 5 = " "critical, 6 = none. Default: 2.", 0}, + {"download-dir", 'd', "Download directory", 0 , "Directory in which to store downloaded files, defaults to the current directory otherwise", 0}, + {"num-files", 'n', "Stop Receiving after n files", 0, "Stop the reception after n files have been received (default is to never stop)", 0}, {nullptr, 0, nullptr, 0, nullptr, 0}}; /** @@ -64,6 +67,8 @@ struct ft_arguments { const char *aes_key = {}; unsigned short mcast_port = 40085; unsigned log_level = 2; /**< log level */ + char *download_dir = nullptr; + unsigned nfiles = 0; /**< log level */ char **files; }; @@ -89,6 +94,12 @@ static auto parse_opt(int key, char *arg, struct argp_state *state) -> error_t { case 'l': arguments->log_level = static_cast(strtoul(arg, nullptr, 10)); break; + case 'd': + arguments->download_dir = arg; + break; + case 'n': + arguments->nfiles = static_cast(strtoul(arg, nullptr, 10)); + break; default: return ARGP_ERR_UNKNOWN; } @@ -155,18 +166,39 @@ auto main(int argc, char **argv) -> int { } receiver.register_completion_callback( - [](std::shared_ptr file) { //NOLINT + [&](std::shared_ptr file) { //NOLINT spdlog::info("{} (TOI {}) has been received", file->meta().content_location, file->meta().toi); - FILE* fd = fopen(file->meta().content_location.c_str(), "wb"); - fwrite(file->buffer(), 1, file->length(), fd); - fclose(fd); + char *buf = (char*) calloc(256,1); + char *fname = (char*) strrchr(file->meta().content_location.c_str(),'/'); + if(!fname){ + fname = (char*) file->meta().content_location.c_str(); + } else { + fname++; + } + if (arguments.download_dir) { + snprintf(buf,256,"%s/%s",arguments.download_dir, fname); + } else { + snprintf(buf,256,"flute_download_%d-%s",file->meta().toi, fname); + } + FILE* fd = fopen(buf, "wb"); + if (fd) { + fwrite(file->buffer(), 1, file->length(), fd); + fclose(fd); + } else { + spdlog::error("Error opening file {} to store received object",buf); + } + free(buf); + if (file->meta().toi == arguments.nfiles) { + spdlog::warn("{} file(s) received. Stopping reception",arguments.nfiles); + receiver.stop(); + } }); // Start the IO service io.run(); } catch (std::exception ex ) { - spdlog::error("Exiting on unhandled exception: %s", ex.what()); + spdlog::error("Exiting on unhandled exception: {}", ex.what()); } exit: diff --git a/examples/flute-transmitter.cpp b/examples/flute-transmitter.cpp index 93538f3..7ede617 100644 --- a/examples/flute-transmitter.cpp +++ b/examples/flute-transmitter.cpp @@ -16,9 +16,9 @@ #include #include #include - #include - +#include +#include #include #include #include @@ -31,6 +31,7 @@ #include "Version.h" #include "Transmitter.h" +#include "flute_types.h" using libconfig::Config; @@ -44,6 +45,7 @@ static char doc[] = "FLUTE/ALC transmitter demo"; // NOLINT static struct argp_option options[] = { // NOLINT {"target", 'm', "IP", 0, "Target multicast address (default: 238.1.1.95)", 0}, + {"fec", 'f', "FEC Scheme", 0, "Choose a scheme for Forward Error Correction. Compact No Code = 0, Raptor = 1 (default is 0)", 0}, {"port", 'p', "PORT", 0, "Target port (default: 40085)", 0}, {"mtu", 't', "BYTES", 0, "Path MTU to size ALC packets for (default: 1500)", 0}, {"rate-limit", 'r', "KBPS", 0, "Transmit rate limit (kbps), 0 = no limit, default: 1000 (1 Mbps)", 0}, @@ -65,6 +67,7 @@ struct ft_arguments { unsigned short mtu = 1500; uint32_t rate_limit = 1000; unsigned log_level = 2; /**< log level */ + unsigned fec = 0; /**< log level */ char **files; }; @@ -93,6 +96,13 @@ static auto parse_opt(int key, char *arg, struct argp_state *state) -> error_t { case 'l': arguments->log_level = static_cast(strtoul(arg, nullptr, 10)); break; + case 'f': + arguments->fec = static_cast(strtoul(arg, nullptr, 10)); + if ( (arguments->fec | 1) != 1 ) { + spdlog::error("Invalid FEC scheme ! Please pick either 0 (Compact No Code) or 1 (Raptor)"); + return ARGP_ERR_UNKNOWN; + } + break; case ARGP_KEY_NO_ARGS: argp_usage (state); case ARGP_KEY_ARG: @@ -131,7 +141,7 @@ auto main(int argc, char **argv) -> int { arguments.mcast_target = "238.1.1.95"; argp_parse(&argp, argc, argv, 0, nullptr, &arguments); - + // Set up logging std::string ident = "flute-transmitter"; auto syslog_logger = spdlog::syslog_logger_mt("syslog", ident, LOG_PID | LOG_PERROR | LOG_CONS ); @@ -156,14 +166,29 @@ auto main(int argc, char **argv) -> int { // read the file contents into the buffers for (int j = 0; arguments.files[j]; j++) { - std::string location = arguments.files[j]; - std::ifstream file(arguments.files[j], std::ios::binary | std::ios::ate); - std::streamsize size = file.tellg(); - file.seekg(0, std::ios::beg); - - char* buffer = (char*)malloc(size); - file.read(buffer, size); - files.push_back(FsFile{ arguments.files[j], buffer, (size_t)size}); + struct stat sb; + int fd; + fd = open(arguments.files[j], O_RDONLY); + if (fd == -1) { + spdlog::error("Couldnt open file {}",arguments.files[j]); + continue; + } + if (fstat(fd, &sb) == -1){ // To obtain file size + spdlog::error("fstat() call for file {} failed",arguments.files[j]); + close(fd); + continue; + } + if (sb.st_size <= 0) { + close(fd); + continue; + } + char* buffer = (char*) mmap(nullptr, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0); + close(fd); + if ( (long) buffer <= 0) { + spdlog::error("mmap() failed for file {}",arguments.files[j]); + continue; + } + files.push_back(FsFile{ arguments.files[j], buffer, (size_t) sb.st_size}); } // Create a Boost io_service @@ -173,9 +198,10 @@ auto main(int argc, char **argv) -> int { LibFlute::Transmitter transmitter( arguments.mcast_target, (short)arguments.mcast_port, - 0, + 16, arguments.mtu, arguments.rate_limit, + LibFlute::FecScheme(arguments.fec), io); // Configure IPSEC ESP, if enabled @@ -188,11 +214,10 @@ auto main(int argc, char **argv) -> int { transmitter.register_completion_callback( [&files](uint32_t toi) { for (auto& file : files) { - if (file.toi == toi) { - spdlog::info("{} (TOI {}) has been transmitted", - file.location, file.toi); - // could free() the buffer here - } + if (file.toi == toi) { + spdlog::info("{} (TOI {}) has been transmitted", file.location,file.toi); + munmap(file.buffer,file.len); + } } }); @@ -204,8 +229,10 @@ auto main(int argc, char **argv) -> int { file.buffer, file.len ); - spdlog::info("Queued {} ({} bytes) for transmission, TOI is {}", + if (file.toi > 0) { + spdlog::info("Queued {} ({} bytes) for transmission, TOI is {}", file.location, file.len, file.toi); + } } // Start the io_service, and thus sending data diff --git a/include/File.h b/include/File.h index 081ef5f..1378483 100644 --- a/include/File.h +++ b/include/File.h @@ -21,6 +21,7 @@ #include "AlcPacket.h" #include "FileDeliveryTable.h" #include "EncodingSymbol.h" +#include "flute_types.h" namespace LibFlute { /** @@ -84,12 +85,12 @@ namespace LibFlute { /** * Get the FEC OTI values */ - const FecOti& fec_oti() const { return _meta.fec_oti; }; + FecOti& fec_oti() { return _meta.fec_oti; }; /** * Get the file metadata from its FDT entry */ - const LibFlute::FileDeliveryTable::FileEntry& meta() const { return _meta; }; + LibFlute::FileDeliveryTable::FileEntry& meta() { return _meta; }; /** * Timestamp of file reception @@ -130,21 +131,10 @@ namespace LibFlute { void calculate_partitioning(); void create_blocks(); - struct SourceBlock { - bool complete = false; - struct Symbol { - char* data; - size_t length; - bool complete = false; - bool queued = false; - }; - std::map symbols; - }; - void check_source_block_completion(SourceBlock& block); void check_file_completion(); - std::map _source_blocks; + std::map _source_blocks; bool _complete = false;; @@ -163,4 +153,16 @@ namespace LibFlute { uint16_t _fdt_instance_id = 0; }; + + /** + * Calculate the md5 message digest + * + * @param input byte array whose md5 message digest shall be calculated + * @param length size of the input array + * @param result buffer to store the output of the md5 calculation. Make sure it is EVP_MAX_MD_SIZE bytes large + * + * @return length of the calculated md5 sum (should be 16 bytes for md5) + */ + int calculate_md5(char *input, int length, unsigned char *result); + }; diff --git a/include/FileDeliveryTable.h b/include/FileDeliveryTable.h index 57b98aa..8b585ad 100644 --- a/include/FileDeliveryTable.h +++ b/include/FileDeliveryTable.h @@ -46,7 +46,7 @@ namespace LibFlute { /** * Default destructor. */ - virtual ~FileDeliveryTable() {}; + virtual ~FileDeliveryTable(); /** * Get the FDT instance ID @@ -64,6 +64,7 @@ namespace LibFlute { std::string content_type; uint64_t expires; FecOti fec_oti; + FecTransformer *fec_transformer; }; /** @@ -74,7 +75,7 @@ namespace LibFlute { /** * Add a file entry */ - void add(const FileEntry& entry); + void add(FileEntry& entry); /** * Remove a file entry @@ -96,6 +97,7 @@ namespace LibFlute { std::vector _file_entries; FecOti _global_fec_oti; + FecTransformer *_fdt_fec_transformer = 0; uint64_t _expires; }; diff --git a/include/RaptorFEC.h b/include/RaptorFEC.h new file mode 100644 index 0000000..8be22eb --- /dev/null +++ b/include/RaptorFEC.h @@ -0,0 +1,9 @@ +#ifndef LIBFLUTE_RAPTORFEC_H +#define LIBFLUTE_RAPTORFEC_H + +#include "raptor.h" // raptor lib +#include "flute_types.h" +#include "spdlog/spdlog.h" +#include + +#endif //LIBFLUTE_RAPTORFEC_H diff --git a/include/Receiver.h b/include/Receiver.h index 959c4c8..e9ef621 100644 --- a/include/Receiver.h +++ b/include/Receiver.h @@ -21,6 +21,7 @@ #include #include "File.h" #include "FileDeliveryTable.h" +#include "flute_types.h" namespace LibFlute { /** @@ -45,8 +46,7 @@ namespace LibFlute { * @param io_service Boost io_service to run the socket operations in (must be provided by the caller) */ Receiver( const std::string& iface, const std::string& address, - short port, uint64_t tsi, - boost::asio::io_service& io_service); + short port, uint64_t tsi, boost::asio::io_service& io_service); /** * Default destructor. diff --git a/include/Transmitter.h b/include/Transmitter.h index 1dd0565..e6d6635 100644 --- a/include/Transmitter.h +++ b/include/Transmitter.h @@ -23,6 +23,7 @@ #include "File.h" #include "AlcPacket.h" #include "FileDeliveryTable.h" +#include "flute_types.h" namespace LibFlute { /** @@ -51,6 +52,7 @@ namespace LibFlute { Transmitter( const std::string& address, short port, uint64_t tsi, unsigned short mtu, uint32_t rate_limit, + FecScheme _fec_scheme, boost::asio::io_service& io_service); /** @@ -124,6 +126,7 @@ namespace LibFlute { uint16_t _toi = 1; uint32_t _max_payload; + FecScheme _fec_scheme; FecOti _fec_oti; completion_callback_t _completion_cb = nullptr; diff --git a/include/flute_types.h b/include/flute_types.h index fe976e6..da70ce0 100644 --- a/include/flute_types.h +++ b/include/flute_types.h @@ -13,8 +13,15 @@ // See the License for the specific language governing permissions and limitations // under the License. // -#pragma once +#include +#include +#include +#include "tinyxml2.h" +#ifdef RAPTOR_ENABLED +#include "raptor.h" +#endif +#pragma once /** \mainpage LibFlute - ALC/FLUTE library * * The library contains two simple **example applications** as a starting point: @@ -39,19 +46,175 @@ namespace LibFlute { }; /** - * Error correction schemes + * Error correction schemes. From the registry for FEC schemes http://www.iana.org/assignments/rmt-fec-parameters (RFC 5052) */ enum class FecScheme { - CompactNoCode + CompactNoCode, + Raptor, + Reed_Solomon_GF_2_m, + LDPC_Staircase_Codes, + LDPC_Triangle_Codes, + Reed_Solomon_GF_2_8, + RaptorQ + }; + + struct Symbol { + char* data; + size_t length; + bool complete = false; + bool queued = false; + }; + + struct SourceBlock { + uint16_t id = 0; + bool complete = false; + std::map symbols; }; - /** - * OTI values struct - */ struct FecOti { FecScheme encoding_id; uint64_t transfer_length; uint32_t encoding_symbol_length; uint32_t max_source_block_length; }; + + /** + * abstract class for FEC Object En/De-coding + */ + class FecTransformer { + + public: + + virtual ~FecTransformer() = default; + + + /** + * @brief Attempt to decode a source block + * + * @param srcblk the source block that should be decoded + * @return whether or not the decoding was successful + */ + virtual bool check_source_block_completion(SourceBlock& srcblk) = 0; + + /** + * @brief Encode a file into multiple source blocks + * + * @param buffer a pointer to the buffer containing the data + * @param bytes_read a pointer to an integer to store the number of bytes read out of buffer + * @return a map of source blocks that the object has been encoded to + */ + virtual std::map create_blocks(char *buffer, int *bytes_read) = 0; + + /** + * @brief Process a received symbol + * + * @param srcblk the source block this symbols corresponds to + * @param symb the received symbol + * @param id the symbols id + * @return success or failure + */ + virtual bool process_symbol(LibFlute::SourceBlock& srcblk, LibFlute::Symbol& symb, unsigned int id) = 0; + + virtual bool calculate_partitioning() = 0; + + /** + * @brief Attempt to parse relevent information for decoding from the FDT + * + * @return success status + */ + virtual bool parse_fdt_info(tinyxml2::XMLElement *file) = 0; + + /** + * @brief Add relevant information about the FEC Scheme which the decoder may need, to the FDT + * + * @return success status + */ + virtual bool add_fdt_info(tinyxml2::XMLElement *file) = 0; + + /** + * @brief Allocate the size of the buffer needed for this encoding scheme (since it may be larger) + * + * @param min_length this should be the size of the file (transfer length). This determines the minimum size of the returned buffer + * @return 0 on failure, otherwise return a pointer to the buffer + */ + virtual void *allocate_file_buffer(int min_length) = 0; + + /** + * @brief Called after the file is marked as complete, to finish extraction/decoding (if necessary) + * + * @param blocks the source blocks of the file, stored in the File object + */ + virtual bool extract_file(std::map blocks) = 0; + + uint32_t nof_source_symbols = 0; + uint32_t nof_source_blocks = 0; + uint32_t large_source_block_length = 0; + uint32_t small_source_block_length = 0; + uint32_t nof_large_source_blocks = 0; + + }; + +#ifdef RAPTOR_ENABLED + class RaptorFEC : public FecTransformer { + + private: + + bool is_encoder = true; + + unsigned int target_K(int blockno); + + Symbol translate_symbol(struct enc_context *encoder_ctx); + + LibFlute::SourceBlock create_block(char *buffer, int *bytes_read, int blockid); + + const float surplus_packet_ratio = 1.15; // adds 15% transmission overhead in exchange for protection against up to 15% packet loss. Assuming 1 symbol per packet, for smaller files packets may contain up to 10 symbols per packet but small files are much less vulnerable to packet loss anyways + + void extract_finished_block(LibFlute::SourceBlock& srcblk, struct dec_context *dc); + + public: + + RaptorFEC(unsigned int transfer_length, unsigned int max_payload); + + RaptorFEC() {}; + + ~RaptorFEC(); + + bool check_source_block_completion(SourceBlock& srcblk); + + std::map create_blocks(char *buffer, int *bytes_read); + + bool process_symbol(LibFlute::SourceBlock& srcblk, LibFlute::Symbol& symb, unsigned int id); + + bool calculate_partitioning(); + + bool parse_fdt_info(tinyxml2::XMLElement *file); + + bool add_fdt_info(tinyxml2::XMLElement *file); + + void *allocate_file_buffer(int min_length); + + bool extract_file(std::map blocks); + + std::map decoders; // map of source block number to decoders + + uint32_t nof_source_symbols = 0; + uint32_t nof_source_blocks = 0; + uint32_t large_source_block_length = 0; + uint32_t small_source_block_length = 0; + uint32_t nof_large_source_blocks = 0; + + unsigned int F; // object size in bytes + unsigned int Al = 4; // symbol alignment: 4 + unsigned int T; // symbol size in bytes + unsigned long W = 16*1024*1024; // target on sub block size- set default to 16 MB, to keep the number of sub-blocks, N, = 1 (you probably only need W >= 11MB to achieve this, assuming an ethernet mtu of ~1500 bytes, but we round to the nearest power of 2) + unsigned int G; // number of symbols per packet + unsigned int Z; // number of source blocks + unsigned int N; // number of sub-blocks per source block + unsigned int K; // number of symbols in a source block + unsigned int Kt; // total number of symbols + unsigned int P; // maximum payload size: e.g. 1436 for ipv4 over 802.3 + + }; +#endif + }; diff --git a/raptor b/raptor new file mode 160000 index 0000000..4ee8bf2 --- /dev/null +++ b/raptor @@ -0,0 +1 @@ +Subproject commit 4ee8bf2f154670007bde69c8cbbd2db57f938aa3 diff --git a/reset_loopback_settings b/reset_loopback_settings new file mode 100755 index 0000000..5a8ebfc --- /dev/null +++ b/reset_loopback_settings @@ -0,0 +1,14 @@ +#! /bin/bash + +t=238.1.1.95 + +echo Removing route to target address +sudo ip r del $t dev lo +echo Removing packet loss from loopback device +sudo tc qdisc del dev lo root netem loss 2% + +echo Check everything got removed +ip r get $t +tc qdisc show dev lo +echo 2 second ping check to assert packet loss is gone +ping -I lo 127.0.0.1 -i 0.002 -w 2 -q diff --git a/setup_packet_loss_on_loopback b/setup_packet_loss_on_loopback new file mode 100755 index 0000000..708117c --- /dev/null +++ b/setup_packet_loss_on_loopback @@ -0,0 +1,18 @@ +#! /bin/bash + +t=238.1.1.95 + +if [[ -z $(ip r get $t | grep 'dev lo') ]]; then + echo Routing default target address via loopback device + sudo ip r add $t dev lo +else + echo Looks like this is already routed via loopback +fi +echo Adding packet loss to loopback device +sudo tc qdisc add dev lo root netem loss 2% + +echo Check everything worked +ip r get $t +tc qdisc show dev lo +echo 2 second ping check to assert packet loss is working +ping -I lo 127.0.0.1 -i 0.002 -w 2 -q diff --git a/src/AlcPacket.cpp b/src/AlcPacket.cpp index d6e7fdc..3b959a2 100644 --- a/src/AlcPacket.cpp +++ b/src/AlcPacket.cpp @@ -17,6 +17,7 @@ #include #include #include "AlcPacket.h" +#include "spdlog/spdlog.h" LibFlute::AlcPacket::AlcPacket(char* data, size_t len) { @@ -79,10 +80,16 @@ LibFlute::AlcPacket::AlcPacket(char* data, size_t len) throw "TOI fields over 64 bits in length are not supported"; } - if (_lct_header.codepoint == 0) { - _fec_oti.encoding_id = FecScheme::CompactNoCode; - } else { - throw "Only Compact No-Code FEC is supported"; + switch (_lct_header.codepoint) { + case 0: + _fec_oti.encoding_id = FecScheme::CompactNoCode; + break; + case 1: + _fec_oti.encoding_id = FecScheme::Raptor; + break; + default: + throw "Only the Compact No-Code and Raptor FEC schemes are supported"; + break; } auto expected_header_len = 2 + @@ -110,19 +117,29 @@ LibFlute::AlcPacket::AlcPacket(char* data, size_t len) break; // ignored } case EXT_FTI: { - if (_fec_oti.encoding_id == FecScheme::CompactNoCode) { - if (hel != 4) { - throw "Invalid length for EXT_FTI header extension"; - } - _fec_oti.transfer_length = (uint64_t)(ntohs(*(uint16_t*)hdr_ptr)) << 32; - hdr_ptr += 2; - _fec_oti.transfer_length |= (uint64_t)(ntohl(*(uint32_t*)hdr_ptr)); - hdr_ptr += 4; - hdr_ptr += 2; // reserved - _fec_oti.encoding_symbol_length = ntohs(*(uint16_t*)hdr_ptr); - hdr_ptr += 2; - _fec_oti.max_source_block_length = ntohl(*(uint32_t*)hdr_ptr); - hdr_ptr += 4; + switch (_fec_oti.encoding_id) { + case FecScheme::CompactNoCode: + if (hel != 4) { + throw "Invalid length for EXT_FTI header extension for Compact No Code FEC scheme"; + } + _fec_oti.transfer_length = (uint64_t)(ntohs(*(uint16_t*)hdr_ptr)) << 32; + hdr_ptr += 2; + _fec_oti.transfer_length |= (uint64_t)(ntohl(*(uint32_t*)hdr_ptr)); + hdr_ptr += 4; + hdr_ptr += 2; // reserved + _fec_oti.encoding_symbol_length = ntohs(*(uint16_t*)hdr_ptr); + hdr_ptr += 2; + _fec_oti.max_source_block_length = ntohl(*(uint32_t*)hdr_ptr); + hdr_ptr += 4; + break; + case FecScheme::Raptor: + //TODO + spdlog::warn("Raptor FEC support in EXT_FTI header extension is still in progress"); + throw "Raptor FEC support in EXT_FTI header extension is still in progress"; + break; + default: + throw "Unsupported FEC scheme"; + break; } break; } @@ -174,6 +191,7 @@ LibFlute::AlcPacket::AlcPacket(uint16_t tsi, uint16_t toi, LibFlute::FecOti fec_ lct_header->version = 1; lct_header->half_word_flag = 1; lct_header->lct_header_len = lct_header_len; + lct_header->codepoint = (uint8_t) fec_oti.encoding_id; auto hdr_ptr = _buffer + 4; auto payload_ptr = _buffer + 4 * lct_header_len; diff --git a/src/EncodingSymbol.cpp b/src/EncodingSymbol.cpp index 5451534..553b5aa 100644 --- a/src/EncodingSymbol.cpp +++ b/src/EncodingSymbol.cpp @@ -18,6 +18,7 @@ #include #include #include +#include "spdlog/spdlog.h" #include "EncodingSymbol.h" auto LibFlute::EncodingSymbol::from_payload(char* encoded_data, size_t data_len, const FecOti& fec_oti, ContentEncoding encoding) -> std::vector @@ -30,20 +31,28 @@ auto LibFlute::EncodingSymbol::from_payload(char* encoded_data, size_t data_len, throw "Only unencoded content is supported"; } - if (fec_oti.encoding_id == FecScheme::CompactNoCode) { - source_block_number = ntohs(*(uint16_t*)encoded_data); - encoded_data += 2; - encoding_symbol_id = ntohs(*(uint16_t*)encoded_data); - encoded_data += 2; - data_len -= 4; - } else { - throw "Only compact no-code FEC is supported"; + switch (fec_oti.encoding_id) { + case FecScheme::CompactNoCode: + case FecScheme::Raptor: + source_block_number = ntohs(*(uint16_t*)encoded_data); + encoded_data += 2; + encoding_symbol_id = ntohs(*(uint16_t*)encoded_data); + encoded_data += 2; + data_len -= 4; + break; + default: + throw "Invalid FEC encoding ID. Only 2 FEC types are currently supported: compact no-code or raptor"; + break; } int nof_symbols = std::ceil((float)data_len / (float)fec_oti.encoding_symbol_length); for (int i = 0; i < nof_symbols; i++) { - if (fec_oti.encoding_id == FecScheme::CompactNoCode) { - symbols.emplace_back(encoding_symbol_id, source_block_number, encoded_data, std::min(data_len, (size_t)fec_oti.encoding_symbol_length), fec_oti.encoding_id); + switch(fec_oti.encoding_id) { + default: + case FecScheme::CompactNoCode: + case FecScheme::Raptor: + symbols.emplace_back(encoding_symbol_id, source_block_number, encoded_data, std::min(data_len, (size_t)fec_oti.encoding_symbol_length), fec_oti.encoding_id); + break; } encoded_data += fec_oti.encoding_symbol_length; encoding_symbol_id++; @@ -57,21 +66,25 @@ auto LibFlute::EncodingSymbol::to_payload(const std::vector& sym size_t len = 0; auto ptr = encoded_data; auto first_symbol = symbols.begin(); - if (fec_oti.encoding_id == FecScheme::CompactNoCode) { - *((uint16_t*)ptr) = htons(first_symbol->source_block_number()); - ptr += 2; - *((uint16_t*)ptr) = htons(first_symbol->id()); - ptr += 2; - len += 4; - } else { - throw "Only compact no-code FEC is supported"; + switch (fec_oti.encoding_id) { + case FecScheme::CompactNoCode: + case FecScheme::Raptor: + *((uint16_t*)ptr) = htons(first_symbol->source_block_number()); + ptr += 2; + *((uint16_t*)ptr) = htons(first_symbol->id()); + ptr += 2; + len += 4; + break; + default: + throw "Invalid FEC encoding ID. Only 2 FEC types are currently supported: compact no-code or raptor"; + break; } for (const auto& symbol : symbols) { if (symbol.len() <= data_len) { auto symbol_len = symbol.encode_to(ptr, data_len); data_len -= symbol_len; - encoded_data += symbol_len; + ptr += symbol_len; len += symbol_len; } } @@ -79,19 +92,30 @@ auto LibFlute::EncodingSymbol::to_payload(const std::vector& sym } auto LibFlute::EncodingSymbol::decode_to(char* buffer, size_t max_length) const -> void { - if (_fec_scheme == FecScheme::CompactNoCode) { - if (_data_len <= max_length) { - memcpy(buffer, _encoded_data, _data_len); - } + switch (_fec_scheme) { + case FecScheme::CompactNoCode: + case FecScheme::Raptor: + if (_data_len <= max_length) { + memcpy(buffer, _encoded_data, _data_len); + } + break; + default: + spdlog::warn("EncodingSymbol::decode_to() called for unknown fec scheme {}",(int)_fec_scheme); + throw "EncodingSymbol::decode_to() called for unknown fec scheme"; + break; } } auto LibFlute::EncodingSymbol::encode_to(char* buffer, size_t max_length) const -> size_t { - if (_fec_scheme == FecScheme::CompactNoCode) { - if (_data_len <= max_length) { - memcpy(buffer, _encoded_data, _data_len); - return _data_len; - } + switch (_fec_scheme) { + default: + case FecScheme::CompactNoCode: + case FecScheme::Raptor: + if (_data_len <= max_length) { + memcpy(buffer, _encoded_data, _data_len); + return _data_len; + } + break; } return 0; } diff --git a/src/File.cpp b/src/File.cpp index 6f08020..661dd32 100644 --- a/src/File.cpp +++ b/src/File.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include "base64.h" #include "spdlog/spdlog.h" @@ -33,7 +34,11 @@ LibFlute::File::File(LibFlute::FileDeliveryTable::FileEntry entry) spdlog::debug("Creating File from FileEntry"); // Allocate a data buffer spdlog::debug("Allocating buffer"); - _buffer = (char*)malloc(_meta.fec_oti.transfer_length); + if (_meta.fec_transformer){ + _buffer = (char*) _meta.fec_transformer->allocate_file_buffer(_meta.fec_oti.transfer_length); + } else { + _buffer = (char*) malloc(_meta.fec_oti.transfer_length); + } if (_buffer == nullptr) { throw "Failed to allocate file buffer"; @@ -53,7 +58,13 @@ LibFlute::File::File(uint32_t toi, size_t length, bool copy_data) { + if (!data) { + spdlog::error("File pointer is null"); + throw "Invalid file"; + } + spdlog::debug("Creating File from data"); + if (copy_data) { spdlog::debug("Allocating buffer"); _buffer = (char*)malloc(length); @@ -67,8 +78,10 @@ LibFlute::File::File(uint32_t toi, _buffer = data; } - unsigned char md5[MD5_DIGEST_LENGTH]; - MD5((const unsigned char*)data, length, md5); + unsigned char md5[EVP_MAX_MD_SIZE]; + if ( calculate_md5(data, length, md5) < 0 ){ + throw "Failed to calculate md5"; + } _meta.toi = toi; _meta.content_location = std::move(content_location); @@ -78,11 +91,27 @@ LibFlute::File::File(uint32_t toi, _meta.expires = expires; _meta.fec_oti = fec_oti; - // for no-code - if (_meta.fec_oti.encoding_id == FecScheme::CompactNoCode) { - _meta.fec_oti.transfer_length = length; - } else { - throw "Unsupported FEC scheme"; +#ifdef RAPTOR_ENABLED + RaptorFEC *r = nullptr; +#endif + + switch (_meta.fec_oti.encoding_id) { + case FecScheme::CompactNoCode: + _meta.fec_oti.transfer_length = length; + _meta.fec_transformer = 0; + break; +#ifdef RAPTOR_ENABLED + case FecScheme::Raptor: + _meta.fec_oti.transfer_length = length; + r = new RaptorFEC(length, fec_oti.encoding_symbol_length); + _meta.fec_oti.encoding_symbol_length = r->T; + _meta.fec_oti.max_source_block_length = r->K * r->T; + _meta.fec_transformer = r; + break; +#endif + default: + throw "FEC scheme not supported or not yet implemented"; + break; } calculate_partitioning(); @@ -101,22 +130,33 @@ LibFlute::File::~File() auto LibFlute::File::put_symbol( const LibFlute::EncodingSymbol& symbol ) -> void { + if(_complete) { + spdlog::debug("Not handling symbol {} , SBN {} since file is already complete",symbol.id(),symbol.source_block_number()); + return; + } if (symbol.source_block_number() > _source_blocks.size()) { throw "Source Block number too high"; } SourceBlock& source_block = _source_blocks[ symbol.source_block_number() ]; + if(source_block.complete){ + spdlog::warn("Ignoring symbol {} since block {} is already complete",symbol.id(),symbol.source_block_number()); + return; + } + if (symbol.id() > source_block.symbols.size()) { throw "Encoding Symbol ID too high"; } - SourceBlock::Symbol& target_symbol = source_block.symbols[symbol.id()]; + LibFlute::Symbol& target_symbol = source_block.symbols[symbol.id()]; if (!target_symbol.complete) { symbol.decode_to(target_symbol.data, target_symbol.length); target_symbol.complete = true; - + if (_meta.fec_transformer) { + _meta.fec_transformer->process_symbol(source_block,target_symbol,symbol.id()); + } check_source_block_completion(source_block); check_file_completion(); } @@ -125,6 +165,10 @@ auto LibFlute::File::put_symbol( const LibFlute::EncodingSymbol& symbol ) -> voi auto LibFlute::File::check_source_block_completion( SourceBlock& block ) -> void { + if (_meta.fec_transformer) { + block.complete = _meta.fec_transformer->check_source_block_completion(block); + return; + } block.complete = std::all_of(block.symbols.begin(), block.symbols.end(), [](const auto& symbol){ return symbol.second.complete; }); } @@ -133,13 +177,18 @@ auto LibFlute::File::check_file_completion() -> void _complete = std::all_of(_source_blocks.begin(), _source_blocks.end(), [](const auto& block){ return block.second.complete; }); if (_complete && !_meta.content_md5.empty()) { + + if(_meta.fec_transformer){ + _meta.fec_transformer->extract_file(_source_blocks); + } + //check MD5 sum - unsigned char md5[MD5_DIGEST_LENGTH]; - MD5((const unsigned char*)buffer(), length(), md5); + unsigned char md5[EVP_MAX_MD_SIZE]; + calculate_md5(buffer(),length(),md5); auto content_md5 = base64_decode(_meta.content_md5); if (memcmp(md5, content_md5.c_str(), MD5_DIGEST_LENGTH) != 0) { - spdlog::debug("MD5 mismatch for TOI {}, discarding", _meta.toi); + spdlog::error("MD5 mismatch for TOI {}, discarding", _meta.toi); // MD5 mismatch, try again for (auto& block : _source_blocks) { @@ -147,7 +196,7 @@ auto LibFlute::File::check_file_completion() -> void symbol.second.complete = false; } block.second.complete = false; - } + } _complete = false; } } @@ -155,6 +204,14 @@ auto LibFlute::File::check_file_completion() -> void auto LibFlute::File::calculate_partitioning() -> void { + if (_meta.fec_transformer && _meta.fec_transformer->calculate_partitioning()){ + _nof_source_symbols = _meta.fec_transformer->nof_source_symbols; + _nof_source_blocks = _meta.fec_transformer->nof_source_blocks; + _large_source_block_length = _meta.fec_transformer->large_source_block_length; + _small_source_block_length = _meta.fec_transformer->small_source_block_length; + _nof_large_source_blocks = _meta.fec_transformer->nof_large_source_blocks; + return; + } // Calculate source block partitioning (RFC5052 9.1) _nof_source_symbols = ceil((double)_meta.fec_oti.transfer_length / (double)_meta.fec_oti.encoding_symbol_length); _nof_source_blocks = ceil((double)_nof_source_symbols / (double)_meta.fec_oti.max_source_block_length); @@ -166,11 +223,22 @@ auto LibFlute::File::calculate_partitioning() -> void auto LibFlute::File::create_blocks() -> void { // Create the required source blocks and encoding symbols + + if (_meta.fec_transformer){ + int bytes_read = 0; + _source_blocks = _meta.fec_transformer->create_blocks(_buffer, &bytes_read); + if (_source_blocks.size() <= 0) { + spdlog::error("FEC Transformer failed to create source blocks"); + throw "FEC Transformer failed to create source blocks"; + } + return; + } + auto buffer_ptr = _buffer; size_t remaining_size = _meta.fec_oti.transfer_length; auto number = 0; while (remaining_size > 0) { - SourceBlock block; + LibFlute::SourceBlock block; auto symbol_id = 0; auto block_length = ( number < _nof_large_source_blocks ) ? _large_source_block_length : _small_source_block_length; @@ -178,7 +246,7 @@ auto LibFlute::File::create_blocks() -> void auto symbol_length = std::min(remaining_size, (size_t)_meta.fec_oti.encoding_symbol_length); assert(buffer_ptr + symbol_length <= _buffer + _meta.fec_oti.transfer_length); - SourceBlock::Symbol symbol{.data = buffer_ptr, .length = symbol_length, .complete = false}; + LibFlute::Symbol symbol{ .data = buffer_ptr, .length = symbol_length, .complete = false}; block.symbols[ symbol_id++ ] = symbol; remaining_size -= symbol_length; @@ -193,10 +261,10 @@ auto LibFlute::File::create_blocks() -> void auto LibFlute::File::get_next_symbols(size_t max_size) -> std::vector { auto block = _source_blocks.begin(); - int nof_symbols = std::ceil((float)(max_size - 4) / (float)_meta.fec_oti.encoding_symbol_length); + int nof_symbols = std::floor((float)max_size / (float)_meta.fec_oti.encoding_symbol_length); auto cnt = 0; std::vector symbols; - + spdlog::debug("Attempting to queue {} symbols",nof_symbols); for (auto& block : _source_blocks) { if (cnt >= nof_symbols) break; @@ -231,3 +299,30 @@ auto LibFlute::File::mark_completed(const std::vector& symbols, } } } + + int LibFlute::calculate_md5(char *input, int length, unsigned char *result) +{ + // simple implementation based on openssl docs (https://www.openssl.org/docs/man3.0/man3/EVP_DigestInit_ex.html) + if (!input || ! length) { + spdlog::error("MD5 called with invalid input"); + return -1; + } + spdlog::debug("MD5 calculation called for input length {}", length); + + EVP_MD_CTX* context = EVP_MD_CTX_new(); + const EVP_MD* md = EVP_md5(); + unsigned int md_len; + + EVP_DigestInit_ex(context, md, NULL); + EVP_DigestUpdate(context, input, length); + EVP_DigestFinal_ex(context, result, &md_len); + EVP_MD_CTX_free(context); + + char buf [EVP_MAX_MD_SIZE * 2] = {}; + for (unsigned int i = 0 ; i < md_len ; ++i){ + sprintf(&buf[i*2], "%02x", result[i]); + } + spdlog::debug("MD5 Digest is {}", buf); + + return md_len; +} diff --git a/src/FileDeliveryTable.cpp b/src/FileDeliveryTable.cpp index ec421d5..5887805 100644 --- a/src/FileDeliveryTable.cpp +++ b/src/FileDeliveryTable.cpp @@ -14,16 +14,32 @@ // under the License. // #include "FileDeliveryTable.h" -#include "tinyxml2.h" +#include "tinyxml2.h" #include #include #include "spdlog/spdlog.h" - LibFlute::FileDeliveryTable::FileDeliveryTable(uint32_t instance_id, FecOti fec_oti) : _instance_id( instance_id ) , _global_fec_oti( fec_oti ) { + switch (fec_oti.encoding_id){ +#ifdef RAPTOR_ENABLED + case FecScheme::Raptor: + _fdt_fec_transformer = new RaptorFEC(); + break; +#endif + default: + _fdt_fec_transformer = 0; + break; + } +} + +LibFlute::FileDeliveryTable::~FileDeliveryTable() { + if (_fdt_fec_transformer) { + delete _fdt_fec_transformer; + _fdt_fec_transformer = 0; + } } LibFlute::FileDeliveryTable::FileDeliveryTable(uint32_t instance_id, char* buffer, size_t len) @@ -99,6 +115,20 @@ LibFlute::FileDeliveryTable::FileDeliveryTable(uint32_t instance_id, char* buffe encoding_id = strtoul(val, nullptr, 0); } + FecTransformer *fec_transformer = 0; + + switch (encoding_id){ +#ifdef RAPTOR_ENABLED + case (int) FecScheme::Raptor: + fec_transformer = new RaptorFEC(); // corresponding delete calls in Receiver.cpp and destuctor function + spdlog::debug("Received FDT entry for a raptor encoded file"); + break; +#endif + default: + break; + } + + auto max_source_block_length = def_fec_max_source_block_length; val = file->Attribute("FEC-OTI-Maximum-Source-Block-Length"); if (val != nullptr) { @@ -110,6 +140,10 @@ LibFlute::FileDeliveryTable::FileDeliveryTable(uint32_t instance_id, char* buffe if (val != nullptr) { encoding_symbol_length = strtoul(val, nullptr, 0); } + + if (fec_transformer && !fec_transformer->parse_fdt_info(file)) { + throw "Failed to parse fdt info for specific FEC data"; + } uint32_t expires = 0; auto cc = file->FirstChildElement("mbms2007:Cache-Control"); if (cc) { @@ -128,18 +162,19 @@ LibFlute::FileDeliveryTable::FileDeliveryTable(uint32_t instance_id, char* buffe FileEntry fe{ toi, - std::string(content_location), - content_length, - std::string(content_md5), - std::string(content_type), - expires, - fec_oti + std::string(content_location), + content_length, + std::string(content_md5), + std::string(content_type), + expires, + fec_oti, + fec_transformer }; _file_entries.push_back(fe); } } -auto LibFlute::FileDeliveryTable::add(const FileEntry& fe) -> void +auto LibFlute::FileDeliveryTable::add(FileEntry& fe) -> void { _instance_id++; _file_entries.push_back(fe); @@ -147,8 +182,12 @@ auto LibFlute::FileDeliveryTable::add(const FileEntry& fe) -> void auto LibFlute::FileDeliveryTable::remove(uint32_t toi) -> void { - for (auto it = _file_entries.cbegin(); it != _file_entries.cend();) { + for (auto it = _file_entries.begin(); it != _file_entries.end();) { if (it->toi == toi) { + if (it->fec_transformer) { + delete it->fec_transformer; + it->fec_transformer = 0; + } it = _file_entries.erase(it); } else { ++it; @@ -176,6 +215,9 @@ auto LibFlute::FileDeliveryTable::to_string() const -> std::string { f->SetAttribute("Transfer-Length", (unsigned)file.fec_oti.transfer_length); f->SetAttribute("Content-MD5", file.content_md5.c_str()); f->SetAttribute("Content-Type", file.content_type.c_str()); + if(file.fec_transformer) { + file.fec_transformer->add_fdt_info(f); + } auto cc = doc.NewElement("mbms2007:Cache-Control"); auto exp = doc.NewElement("mbms2007:Expires"); exp->SetText(std::to_string(file.expires).c_str()); diff --git a/src/IpSec.cpp b/src/IpSec.cpp index de500ab..167311e 100644 --- a/src/IpSec.cpp +++ b/src/IpSec.cpp @@ -95,8 +95,8 @@ namespace LibFlute::IpSec { xsinfo.mode = XFRM_MODE_TRANSPORT; struct { - struct xfrm_algo xa; char buf[512]; + struct xfrm_algo xa; } algo = {}; std::vector binary_key; diff --git a/src/RaptorFEC.cpp b/src/RaptorFEC.cpp new file mode 100644 index 0000000..ea3b8be --- /dev/null +++ b/src/RaptorFEC.cpp @@ -0,0 +1,281 @@ +#include "RaptorFEC.h" + +LibFlute::RaptorFEC::RaptorFEC(unsigned int transfer_length, unsigned int max_payload) + : F(transfer_length) + , P(max_payload) +{ + double g = fmin( fmin(ceil((double)P*1024/(double)F), (double)P/(double)Al), 10.0f); + spdlog::debug("double g = fmin( fmin(ceil((double)P*1024/F), (double)P/(double)Al), 10.0f"); + spdlog::debug("G = {} = min( ceil({}*1024/{}), {}/{}, 10.0f)",g,P,F,P,Al); + G = (unsigned int) g; + + T = (unsigned int) floor((double)P/(double)(Al*g)) * Al; + spdlog::debug("T = (unsigned int) floor((double)P/(double)(Al*g)) * Al"); + spdlog::debug("T = {} = floor({}/({}*{})) * {}",T,P,Al,g,Al); + + if (T % Al){ + spdlog::error(" Symbol size T should be a multiple of symbol alignment parameter Al"); + throw "Symbol size doesnt align"; + } + + Kt = ceil((double)F/(double)T); // total symbols + spdlog::debug("double Kt = ceil((double)F/(double)T)"); + spdlog::debug("Kt = {} = ceil({}/{})",Kt,F,T); + + if (Kt < 4){ + spdlog::error("Input file is too small, it must be a minimum of 4 Symbols"); + throw "Input is less than 4 symbols"; + } + + Z = (unsigned int) ceil((double)Kt/(double)8192); + spdlog::debug("Z = (unsigned int) ceil(Kt/8192)"); + spdlog::debug("Z = {} = ceil({}/8192)",Z,Kt); + + K = (Kt > 8192) ? 8192 : (unsigned int) Kt; // symbols per source block + spdlog::debug("K = {}",K); + + N = fmin( ceil( ceil((double)Kt/(double)Z) * (double)T/(double)W ) , (double)T/(double)Al ); + spdlog::debug("N = fmin( ceil( ceil(Kt/(double)Z) * (double)T/(double)W ) , (double)T/(double)Al )"); + spdlog::debug("N = {} = min( ceil( ceil({}/{}) * {}/{} ) , {}/{} )",N,Kt,Z,T,W,T,Al); + + + // Set the values that the File class may need: + nof_source_symbols = (unsigned int) Kt; + nof_source_blocks = Z; + + small_source_block_length = (Z * K - nof_source_symbols) * T; // = (number of symbols in the final (small) source block, if nof_source_symbols isnt cleanly divisible by Z * K ) * symbol size + + // open question as to how we define "large source blocks" because either none of the remaining "regular" blocks are large, or all of them are, since raptor has a fixed block size + + /* + nof_large_source_blocks = K - (small_source_block_length != 0); // if we define a "large" source block as a normal one then its just the nof "regular" source blocks minus the nof small ones (which is either one or zero) + large_source_block_length = K * T; + */ + + nof_large_source_blocks = 0; //for now argue that there are no "large" blocks, only regular and small ones + large_source_block_length = 0; +} + +LibFlute::RaptorFEC::~RaptorFEC() { + for(auto iter = decoders.begin(); iter != decoders.end(); iter++){ + free_decoder_context(iter->second); + } +} + +bool LibFlute::RaptorFEC::calculate_partitioning() { + return true; +} + +void LibFlute::RaptorFEC::extract_finished_block(LibFlute::SourceBlock& srcblk, struct dec_context *dc) { + if(!dc){ + return; + } + for(auto iter = srcblk.symbols.begin(); iter != srcblk.symbols.end(); iter++) { + memcpy(iter->second.data,dc->pp[iter->first],T); // overwrite the encoded symbol with the source data; + } + spdlog::debug("Raptor Decoder: finished decoding source block {}",srcblk.id); +} + +void *LibFlute::RaptorFEC::allocate_file_buffer(int min_length){ + assert(min_length <= Z*target_K(0)*T); // min length should be exactly Z*K*T, so including repair symbols we should have a larger value + return malloc(Z*target_K(0)*T); +} + +bool LibFlute::RaptorFEC::process_symbol(LibFlute::SourceBlock& srcblk, LibFlute::Symbol& symbol, unsigned int id) { + assert(symbol.length == T); // symbol.length should always be the symbol size, T + struct dec_context *dc = decoders[srcblk.id]; + if (!dc) { + int nsymbs = (srcblk.id < Z - 1) ? K : Kt - K*(Z-1); // the last block will usually be smaller than the normal block size, unless the file size is an exact multiple + int blocksize = (srcblk.id < Z - 1) ? K*T : F - K*T*(Z-1); + spdlog::debug("Preparing decoder context with {} blocks and blocksize {}", nsymbs, blocksize); + struct enc_context *sc = create_encoder_context(NULL, nsymbs, T, blocksize, srcblk.id); + //struct enc_context *sc = create_encoder_context(NULL, K, T, K*T, srcblk.id); // the "length" will always be K*T from the decoders perspective + dc = create_decoder_context(sc); + decoders[srcblk.id] = dc; + } + if (dc->finished){ + spdlog::warn("Skipped processing of symbol for finished block : SBN {}, ESI {}",srcblk.id,id); + return true; + } + struct LT_packet * pkt = (struct LT_packet *) calloc(1, sizeof(struct LT_packet)); + pkt->id = id; + pkt->syms = (GF_ELEMENT *) malloc(symbol.length * sizeof(char)); + memcpy(pkt->syms, symbol.data, symbol.length * sizeof(char)); + + process_LT_packet(dc, pkt); + free_LT_packet(pkt); + return true; +} + +bool LibFlute::RaptorFEC::extract_file(std::map blocks) { + for(auto iter = blocks.begin(); iter != blocks.end(); iter++) { + extract_finished_block(iter->second,decoders[iter->second.id]); + } + return true; +} + +bool LibFlute::RaptorFEC::check_source_block_completion(LibFlute::SourceBlock& srcblk) { + if (is_encoder) { + // check source block completion for the Encoder + bool complete = std::all_of(srcblk.symbols.begin(), srcblk.symbols.end(), [](const auto& symbol){ return symbol.second.complete; }); + + if(complete) + std::for_each(srcblk.symbols.begin(), srcblk.symbols.end(), [](const auto& symbol){ delete[] symbol.second.data; }); + + return complete; + } + // else case- we are the Decoder + + if(!srcblk.symbols.size()){ + spdlog::warn("Empty source block (size 0) SBN {}",srcblk.id); + return false; + } + + struct dec_context *dc = decoders[srcblk.id]; + if (!dc) { + spdlog::error("Couldnt find raptor decoder for source block {}",srcblk.id); + return false; + } + return dc->finished; +} + +unsigned int LibFlute::RaptorFEC::target_K(int blockno) { + // always send at least one repair symbol + if (blockno < Z-1) { + int target = K * surplus_packet_ratio; + return (target > K) ? target : K + 1; + } + // last block gets special treatment + int remaining_symbs = Kt - K*(Z-1); + return (remaining_symbs + 1 > remaining_symbs*surplus_packet_ratio) ? remaining_symbs + 1 : remaining_symbs * surplus_packet_ratio; +} + +LibFlute::Symbol LibFlute::RaptorFEC::translate_symbol(struct enc_context *encoder_ctx){ + struct LT_packet *lt_packet = encode_LT_packet(encoder_ctx); + struct Symbol symbol { new char[T], T}; + + memcpy(symbol.data, lt_packet->syms, T); + + free_LT_packet(lt_packet); + return symbol; +} + +LibFlute::SourceBlock LibFlute::RaptorFEC::create_block(char *buffer, int *bytes_read, int blockid) { + struct SourceBlock source_block; + source_block.id = blockid; + int seed = blockid; + int nsymbs = (blockid < Z - 1) ? K : Kt - K*(Z-1); + int blocksize = (blockid < Z - 1) ? K*T : F - K*T*(Z-1); // the last block will usually be smaller than the normal block size, unless the file size is an exact multiple + struct enc_context *encoder_ctx = create_encoder_context((unsigned char *)buffer, nsymbs , T, blocksize, seed); +// struct enc_context *encoder_ctx = create_encoder_context((unsigned char *)buffer, K, T, blocksize, seed); + if (!encoder_ctx) { + spdlog::error("Error creating encoder context"); + throw "Error creating encoder context"; + } + unsigned int symbols_to_read = target_K(blockid); + for(unsigned int symbol_id = 0; symbol_id < symbols_to_read; symbol_id++) { + source_block.symbols[symbol_id] = translate_symbol(encoder_ctx); + } + *bytes_read += blocksize; + spdlog::debug("Creating encoder context with {} blocks and blocksize {}", nsymbs, blocksize); + + free_encoder_context(encoder_ctx); + return source_block; +} + + +std::map LibFlute::RaptorFEC::create_blocks(char *buffer, int *bytes_read) { + if(!bytes_read) + throw std::invalid_argument("bytes_read pointer shouldn't be null"); + if(N != 1) + throw std::invalid_argument("Currently the encoding only supports 1 sub-block per block"); + + std::map block_map; + *bytes_read = 0; + + for(unsigned int src_blocks = 0; src_blocks < Z; src_blocks++) { + if(!is_encoder) { + LibFlute::SourceBlock block; + unsigned int symbols_to_read = target_K(src_blocks); + for (int i = 0; i < symbols_to_read; i++) { + block.symbols[i] = Symbol {.data = buffer + src_blocks*K*T + T*i, .length = T, .complete = false}; + } + block.id = src_blocks; + block_map[src_blocks] = block; + } else { + block_map[src_blocks] = create_block(&buffer[*bytes_read], bytes_read, src_blocks); + } + } + return block_map; +} + + +bool LibFlute::RaptorFEC::parse_fdt_info(tinyxml2::XMLElement *file) { + is_encoder = false; + + const char* val = 0; + val = file->Attribute("Transfer-Length"); + if (val != nullptr) { + F = strtoul(val, nullptr, 0); + } else { + throw "Required field \"Transfer-Length\" is missing for an object in the FDT"; + } + + val = file->Attribute("FEC-OTI-Number-Of-Source-Blocks"); + if (val != nullptr) { + Z = strtoul(val, nullptr, 0); + } else { + throw "Required field \"FEC-OTI-Number-Of-Source-Blocks\" is missing for an object in the FDT"; + } + + val = file->Attribute("FEC-OTI-Number-Of-Sub-Blocks"); + if (val != nullptr) { + N = strtoul(val, nullptr, 0); + } else { + throw "Required field \"FEC-OTI-Number-Of-Sub-Blocks\" is missing for an object in the FDT"; + } + + val = file->Attribute("FEC-OTI-Encoding-Symbol-Length"); + if (val != nullptr) { + T = strtoul(val, nullptr, 0); + } else { + throw "Required field \"FEC-OTI-Encoding-Symbol-Length\" is missing for an object in the FDT"; + } + + val = file->Attribute("FEC-OTI-Symbol-Alignment-Parameter"); + if (val != nullptr) { + Al = strtoul(val, nullptr, 0); + } else { + throw "Required field \"FEC-OTI-Symbol-Alignment-Parameter\" is missing for an object in the FDT"; + } + + if (T % Al) { + throw "Symbol size T is not a multiple of Al. Invalid configuration from sender"; + } + + // Set the values that are missing that we or the File class may need, follows the same logic as in calculate_partitioning() + nof_source_symbols = ceil((double)F / (double)T); + K = (nof_source_symbols > 8192) ? 8192 : nof_source_symbols; + Kt = ceil((double)F/(double)T); // total symbols + + nof_source_blocks = Z; + small_source_block_length = (Z * K - nof_source_symbols) * T; + nof_large_source_blocks = 0; + large_source_block_length = 0; + + return true; +} + +bool LibFlute::RaptorFEC::add_fdt_info(tinyxml2::XMLElement *file) { + //TODO: do we need to set transfer length too? I already gets set earlier. Does it change based on FecScheme? + file->SetAttribute("FEC-OTI-FEC-Encoding-ID", (unsigned) FecScheme::Raptor); + file->SetAttribute("FEC-OTI-Encoding-Symbol-Length", T); + file->SetAttribute("FEC-OTI-Symbol-Alignment-Parameter", Al); + file->SetAttribute("FEC-OTI-Number-Of-Source-Blocks", Z); + file->SetAttribute("FEC-OTI-Number-Of-Sub-Blocks", N); + file->SetAttribute("FEC-OTI-Symbol-Alignment-Parameter", Al); + + is_encoder = true; + + return true; +} diff --git a/src/Receiver.cpp b/src/Receiver.cpp index 6279325..4152dfa 100644 --- a/src/Receiver.cpp +++ b/src/Receiver.cpp @@ -22,8 +22,7 @@ LibFlute::Receiver::Receiver ( const std::string& iface, const std::string& address, - short port, uint64_t tsi, - boost::asio::io_service& io_service) + short port, uint64_t tsi, boost::asio::io_service& io_service) : _socket(io_service) , _tsi(tsi) , _mcast_address(address) @@ -56,7 +55,12 @@ auto LibFlute::Receiver::enable_ipsec(uint32_t spi, const std::string& key) -> v auto LibFlute::Receiver::handle_receive_from(const boost::system::error_code& error, size_t bytes_recvd) -> void { - if (!_running) return; + if (!_running) { +#ifdef SIMULATED_PKT_LOSS + spdlog::warn("Stopping reception: total packets dropped {}", packets_dropped); +#endif // SIMULATED_PKT_LOSS + return; + } if (!error) { @@ -73,7 +77,7 @@ auto LibFlute::Receiver::handle_receive_from(const boost::system::error_code& er if (alc.toi() == 0 && (!_fdt || _fdt->instance_id() != alc.fdt_instance_id())) { if (_files.find(alc.toi()) == _files.end()) { - FileDeliveryTable::FileEntry fe{0, "", static_cast(alc.fec_oti().transfer_length), "", "", 0, alc.fec_oti()}; + FileDeliveryTable::FileEntry fe{0, "", static_cast(alc.fec_oti().transfer_length), "", "", 0, alc.fec_oti(), 0}; _files.emplace(alc.toi(), std::make_shared(fe)); } } @@ -86,17 +90,22 @@ auto LibFlute::Receiver::handle_receive_from(const boost::system::error_code& er alc.content_encoding()); for (const auto& symbol : encoding_symbols) { + spdlog::debug("received TOI {} SBN {} ID {}", alc.toi(), symbol.source_block_number(), symbol.id() ); - _files[alc.toi()]->put_symbol(symbol); + _files[alc.toi()]->put_symbol(symbol); } auto file = _files[alc.toi()].get(); if (_files[alc.toi()]->complete()) { - for (auto it = _files.cbegin(); it != _files.cend();) + for (auto it = _files.begin(); it != _files.end();) { if (it->second.get() != file && it->second->meta().content_location == file->meta().content_location) { spdlog::debug("Replacing file with TOI {}", it->first); + if (it->second.get()->meta().fec_transformer){ + delete it->second.get()->meta().fec_transformer; + it->second.get()->meta().fec_transformer = 0; + } it = _files.erase(it); } else @@ -108,6 +117,10 @@ auto LibFlute::Receiver::handle_receive_from(const boost::system::error_code& er spdlog::debug("File with TOI {} completed", alc.toi()); if (alc.toi() != 0 && _completion_cb) { _completion_cb(_files[alc.toi()]); + if (_files[alc.toi()]->meta().fec_transformer){ + delete _files[alc.toi()]->meta().fec_transformer; + _files[alc.toi()]->meta().fec_transformer = 0; + } _files.erase(alc.toi()); } @@ -161,6 +174,10 @@ auto LibFlute::Receiver::remove_expired_files(unsigned max_age) -> void { auto age = time(nullptr) - it->second->received_at(); if ( it->second->meta().content_location != "bootstrap.multipart" && age > max_age) { + if (it->second.get()->meta().fec_transformer){ + delete it->second.get()->meta().fec_transformer; + it->second.get()->meta().fec_transformer = 0; + } it = _files.erase(it); } else { ++it; @@ -174,6 +191,10 @@ auto LibFlute::Receiver::remove_file_with_content_location(const std::string& cl for (auto it = _files.cbegin(); it != _files.cend();) { if ( it->second->meta().content_location == cl) { + if (it->second.get()->meta().fec_transformer){ + delete it->second.get()->meta().fec_transformer; + it->second.get()->meta().fec_transformer = 0; + } it = _files.erase(it); } else { ++it; diff --git a/src/Transmitter.cpp b/src/Transmitter.cpp index 6d4d68d..bca3cb7 100644 --- a/src/Transmitter.cpp +++ b/src/Transmitter.cpp @@ -21,9 +21,9 @@ #include "spdlog/spdlog.h" #include "Transmitter.h" #include "IpSec.h" -LibFlute::Transmitter::Transmitter ( const std::string& address, - short port, uint64_t tsi, unsigned short mtu, uint32_t rate_limit, - boost::asio::io_service& io_service) +LibFlute::Transmitter::Transmitter ( const std::string& address, short port, +uint64_t tsi, unsigned short mtu, uint32_t rate_limit, FecScheme fec_scheme, +boost::asio::io_service& io_service) : _endpoint(boost::asio::ip::address::from_string(address), port) , _socket(io_service, _endpoint.protocol()) , _fdt_timer(io_service) @@ -33,18 +33,29 @@ LibFlute::Transmitter::Transmitter ( const std::string& address, , _mtu(mtu) , _rate_limit(rate_limit) , _mcast_address(address) + ,_fec_scheme(fec_scheme) { _max_payload = mtu - - 20 - // IPv4 header + ( _endpoint.address().is_v6() ? 40 : 20) - // IP header 8 - // UDP header 32 - // ALC Header with EXT_FDT and EXT_FTI - 4; // SBN and ESI for compact no-code FEC + 4; // SBN and ESI for compact no-code or raptor FEC uint32_t max_source_block_length = 64; + switch(_fec_scheme) { + case FecScheme::Raptor: + if (_max_payload % 4) { + _max_payload -= (_max_payload % 4); // must be divisible by Al = 4 + } + break; + default: + break; + } + _socket.set_option(boost::asio::ip::multicast::enable_loopback(true)); _socket.set_option(boost::asio::ip::udp::socket::reuse_address(true)); - _fec_oti = FecOti{FecScheme::CompactNoCode, 0, _max_payload, max_source_block_length}; + _fec_oti = FecOti{_fec_scheme, 0, _max_payload, max_source_block_length}; _fdt = std::make_unique(1, _fec_oti); _fdt_timer.expires_from_now(boost::posix_time::seconds(_fdt_repeat_interval)); @@ -75,9 +86,17 @@ auto LibFlute::Transmitter::seconds_since_epoch() -> uint64_t auto LibFlute::Transmitter::send_fdt() -> void { _fdt->set_expires(seconds_since_epoch() + _fdt_repeat_interval * 2); auto fdt = _fdt->to_string(); + auto fdt_fec_oti = _fec_oti; + fdt_fec_oti.encoding_id = FecScheme::CompactNoCode; // always send the FDT in "plaintext" + fdt_fec_oti.encoding_symbol_length = _mtu - + 20 - // IPv4 header + 8 - // UDP header + 32 - // ALC Header with EXT_FDT and EXT_FTI + 4; // SBN and ESI for compact no-code FEC + fdt_fec_oti.max_source_block_length = 64; auto file = std::make_shared( 0, - _fec_oti, + fdt_fec_oti, "", "", seconds_since_epoch() + _fdt_repeat_interval * 2, @@ -96,18 +115,24 @@ auto LibFlute::Transmitter::send( size_t length) -> uint16_t { auto toi = _toi; + std::shared_ptr file; + try { + file = std::make_shared( + toi, + _fec_oti, + content_location, + content_type, + expires, + data, + length); + } catch (const char *e) { + spdlog::error("Failed to create File object for file {} : {}", content_location, e); + return -1; + } + _toi++; if (_toi == 0) _toi = 1; // clamp to >= 1 in case it wraps - auto file = std::make_shared( - toi, - _fec_oti, - content_location, - content_type, - expires, - data, - length); - _fdt->add(file->meta()); send_fdt(); _files.insert({toi, file}); @@ -151,6 +176,7 @@ auto LibFlute::Transmitter::send_next_packet() -> void } auto packet = std::make_shared(_tsi, file->meta().toi, file->meta().fec_oti, symbols, _max_payload, file->fdt_instance_id()); bytes_queued += packet->size(); + spdlog::debug("Queued ALC packet of {} bytes, containing {} symbols, for TOI {} , for transmission", packet->size(), symbols.size(), file->meta().toi ); _socket.async_send_to( boost::asio::buffer(packet->data(), packet->size()), _endpoint, @@ -159,7 +185,7 @@ auto LibFlute::Transmitter::send_next_packet() -> void std::size_t bytes_transferred) { if (error) { - spdlog::debug("sent_to error: {}", error.message()); + spdlog::debug("send_to error: {}", error.message()); } else { file->mark_completed(symbols, !error); if (file->complete()) {