diff --git a/cpp/common/include/OdinDataDefaults.h b/cpp/common/include/OdinDataDefaults.h new file mode 100644 index 000000000..17f1e6ec3 --- /dev/null +++ b/cpp/common/include/OdinDataDefaults.h @@ -0,0 +1,31 @@ +/* + * OdinDataDefaults.h + * + * Created on: 07/07/2023 + * Author: Gary Yendell + */ + +#ifndef ODINDATADEFAULTS_H_ +#define ODINDATADEFAULTS_H_ + +#include +#include +#include + +namespace OdinData +{ + +namespace Defaults +{ + +const unsigned int default_io_threads = 1; +const std::string default_frame_ready_endpoint = "tcp://127.0.0.1:5001"; +const std::string default_frame_release_endpoint = "tcp://127.0.0.1:5002"; +const std::string default_json_config_file = ""; +const std::string default_shared_buffer_name = "OdinDataBuffer"; + +} // namespace Defaults + +} // namespace OdinData + +#endif // ODINDATADEFAULTS_H_ diff --git a/cpp/config/CMakeLists.txt b/cpp/config/CMakeLists.txt index 588954f36..b7cf500d0 100644 --- a/cpp/config/CMakeLists.txt +++ b/cpp/config/CMakeLists.txt @@ -1,16 +1,6 @@ set(TEST_CONFIGS fp_log4cxx.xml fr_log4cxx.xml - fr_test.config - fp_test.config - fr_test_osx.config - fp_py_test.config - fp_py_test_osx.config - fp_py_test_excalibur.config - fr_excalibur1.config - fr_excalibur2.config - fp_excalibur1.config - fp_excalibur2.config ) foreach(test_config ${TEST_CONFIGS}) diff --git a/cpp/config/README.md b/cpp/config/README.md deleted file mode 100644 index 5c46fcb7a..000000000 --- a/cpp/config/README.md +++ /dev/null @@ -1,20 +0,0 @@ -Configurations -============== - -This dir contain a number of configuration files to be used by frameReceiver and file writer. - -Logging Configuration ---------------------- - -Logging of various messages from the software applications is all done using log4cxx (a C++ port of the log4j 1.x library). This allow using simple configuration files for all each application. - -A sensible default configuration is provided, but it can be adjusted to be more or less verbose, send messages to different files, etc etc. For details of the XML configuration format see: https://wiki.apache.org/logging-log4j/Log4jXmlFormat - -The logging configuration will by default write messages to stdout - and the following files will also be produced. Each run will overwrite the previous log file: - -Application | Config | Log type | Log file | Log level ---------------|----------------|----------------------|---------------------------------------|----------- -frameReceiver | fr_log4cxx.xml | Application messages | /tmp/frameReceiver.log | DEBUG -frameReceiver | fr_log4cxx.xml | UDP packet info | /tmp/tmp/frameReceiver_packetDump.log | INFO -filewriter | fw_log4cxx.xml | Application messages | /tmp/fileWriter.log | DEBUG - diff --git a/cpp/config/fp_excalibur1.config b/cpp/config/fp_excalibur1.config deleted file mode 100644 index b23e9706c..000000000 --- a/cpp/config/fp_excalibur1.config +++ /dev/null @@ -1,5 +0,0 @@ -logconfig=../config/fw_log4cxx.xml -ready=tcp://127.0.0.1:5001 -release=tcp://127.0.0.1:5002 -ctrl=tcp://127.0.0.1:5004 -sharedbuf=FrameReceiverBuffer1 \ No newline at end of file diff --git a/cpp/config/fp_excalibur2.config b/cpp/config/fp_excalibur2.config deleted file mode 100644 index 9c1597c51..000000000 --- a/cpp/config/fp_excalibur2.config +++ /dev/null @@ -1,5 +0,0 @@ -logconfig=../config/fw_log4cxx.xml -ready=tcp://127.0.0.1:5011 -release=tcp://127.0.0.1:5012 -ctrl=tcp://127.0.0.1:5014 -sharedbuf=FrameReceiverBuffer2 \ No newline at end of file diff --git a/cpp/config/fp_py_test.config b/cpp/config/fp_py_test.config deleted file mode 100644 index 6f19c0b88..000000000 --- a/cpp/config/fp_py_test.config +++ /dev/null @@ -1,9 +0,0 @@ -[FrameProcessor] -ctrl_endpoint=tcp://127.0.0.1:5000 -sharedbuf=FrameReceiverBuffer -bypass_mode=0 -packet_state=0 -boost_mmap_mode=0 -sensortype=percivalemulator - - diff --git a/cpp/config/fp_py_test_excalibur.config b/cpp/config/fp_py_test_excalibur.config deleted file mode 100644 index df6d28027..000000000 --- a/cpp/config/fp_py_test_excalibur.config +++ /dev/null @@ -1,8 +0,0 @@ -[FrameProcessor] -ctrl_endpoint=tcp://127.0.0.1:5000 -sharedbuf=FrameReceiverBuffer -bypass_mode=0 -packet_state=0 -boost_mmap_mode=0 -sensortype=excalibur - diff --git a/cpp/config/fp_py_test_osx.config b/cpp/config/fp_py_test_osx.config deleted file mode 100644 index f31aec653..000000000 --- a/cpp/config/fp_py_test_osx.config +++ /dev/null @@ -1,6 +0,0 @@ -[FrameProcessor] -ctrl_endpoint=tcp://127.0.0.1:5000 -sharedbuf=FrameReceiverBuffer -bypass_mode=0 -packet_state=0 -boost_mmap_mode=True diff --git a/cpp/config/fp_test.config b/cpp/config/fp_test.config deleted file mode 100644 index 8dcbf83cd..000000000 --- a/cpp/config/fp_test.config +++ /dev/null @@ -1,9 +0,0 @@ -logconfig=../config/fp_log4cxx.xml -no-client=true -detector=excalibur -path=../../excalibur-detector/data/build/lib -datasets=data -dtype=1 -dims=256 -dims=2048 -acqid=0 \ No newline at end of file diff --git a/cpp/config/fr_excalibur1.config b/cpp/config/fr_excalibur1.config deleted file mode 100644 index 9b1bcfdb4..000000000 --- a/cpp/config/fr_excalibur1.config +++ /dev/null @@ -1,11 +0,0 @@ -logconfig=../config/fr_log4cxx.xml -sensortype=Excalibur -#maxmem=1073741824 -maxmem=840000000 -port=61649 -ipaddress=10.0.2.1 -sharedbuf=FrameReceiverBuffer1 -packetlog=true -ctrl=tcp://*:5000 -ready=tcp://*:5001 -release=tcp://*:5002 \ No newline at end of file diff --git a/cpp/config/fr_excalibur2.config b/cpp/config/fr_excalibur2.config deleted file mode 100644 index b159baee9..000000000 --- a/cpp/config/fr_excalibur2.config +++ /dev/null @@ -1,11 +0,0 @@ -logconfig=../config/fr_log4cxx.xml -sensortype=Excalibur -#maxmem=1073741824 -maxmem=840000000 -port=61649 -ipaddress=10.0.3.1 -sharedbuf=FrameReceiverBuffer2 -packetlog=true -ctrl=tcp://*:6000 -ready=tcp://*:6001 -release=tcp://*:6002 diff --git a/cpp/config/fr_test.config b/cpp/config/fr_test.config deleted file mode 100644 index 7ac14f79a..000000000 --- a/cpp/config/fr_test.config +++ /dev/null @@ -1,8 +0,0 @@ -logconfig=../config/fr_log4cxx.xml -sensortype=DummyUDP -#maxmem=1073741824 -maxmem=840000000 -port=8000,8001 -ipaddress=0.0.0.0 -sharedbuf=FrameReceiverBuffer -packetlog=true diff --git a/cpp/config/fr_test_excalibur.config b/cpp/config/fr_test_excalibur.config deleted file mode 100644 index 7adb9f36f..000000000 --- a/cpp/config/fr_test_excalibur.config +++ /dev/null @@ -1,8 +0,0 @@ -logconfig=../config/fr_log4cxx.xml -decodertype=Excalibur -#maxmem=1073741824 -maxmem=840000000 -port=61649 -ipaddress=0.0.0.0 -sharedbuf=FrameReceiverBuffer -packetlog=true diff --git a/cpp/config/fr_test_osx.config b/cpp/config/fr_test_osx.config deleted file mode 100644 index 6ba816076..000000000 --- a/cpp/config/fr_test_osx.config +++ /dev/null @@ -1,9 +0,0 @@ -logconfig=../config/fr_log4cxx.xml -sensortype=DummyUDP -#maxmem=1073741824 -maxmem=840000000 -port=8000,8001 -ipaddress=127.0.0.1 -sharedbuf=FrameReceiverBuffer -packetlog=true -rxbuffer=1048576 \ No newline at end of file diff --git a/cpp/frameProcessor/include/FrameProcessorApp.h b/cpp/frameProcessor/include/FrameProcessorApp.h index bfee5d9a3..0d9161bcf 100644 --- a/cpp/frameProcessor/include/FrameProcessorApp.h +++ b/cpp/frameProcessor/include/FrameProcessorApp.h @@ -22,28 +22,19 @@ class FrameProcessorApp ~FrameProcessorApp(); int parse_arguments(int argc, char** argv); - - bool isBloscRequired(); - - void configureController(); - void configureDetector(); - void configureDetectorDecoder(); - void configureBlosc(); - void configureHDF5(); - void configureDataset(string name, bool master=false); - void configurePlugins(); - void configureFileWriter(); - - void checkNoClientArgs(); - + void configure_controller(OdinData::IpcMessage& config_msg); void run(); private: LoggerPtr logger_; //!< Log4CXX logger instance pointer - std::string json_config_file_; //!< Full path to JSON configuration file static boost::shared_ptr controller_; //!< FrameProcessor controller object - boost::program_options::variables_map vm_; //!< Boost program options variable map + + // Command line options + unsigned int io_threads_; //!< Number of IO threads for IPC channels + std::string ctrl_channel_endpoint_; //!< IPC channel endpoint for control communication with other processes + std::string config_file_; //!< Full path to JSON configuration file + }; } // namespace FrameProcessor diff --git a/cpp/frameProcessor/include/FrameProcessorController.h b/cpp/frameProcessor/include/FrameProcessorController.h index d8395018c..d2944bdd6 100644 --- a/cpp/frameProcessor/include/FrameProcessorController.h +++ b/cpp/frameProcessor/include/FrameProcessorController.h @@ -18,6 +18,7 @@ #include "SharedBufferManager.h" #include "ClassLoader.h" #include "FrameProcessorPlugin.h" +#include "OdinDataDefaults.h" namespace FrameProcessor { @@ -36,7 +37,7 @@ class FrameProcessorController : public IFrameCallback, public boost::enable_shared_from_this { public: - FrameProcessorController(unsigned int num_io_threads=1); + FrameProcessorController(unsigned int num_io_threads=OdinData::Defaults::default_io_threads); virtual ~FrameProcessorController(); void handleCtrlChannel(); void handleMetaRxChannel(); @@ -134,8 +135,8 @@ class FrameProcessorController : public IFrameCallback, std::map stored_configs_; /** Condition for exiting this file writing process */ boost::condition_variable exitCondition_; - /** Frames per dataset */ - int datasetSize; + /** Frames to write before shutting down - 0 to disable shutdown */ + int shutdownFrameCount; /** Total frames processed */ int totalFrames; /** Master frame specifier - Frame to include in count of total frames processed */ diff --git a/cpp/frameProcessor/src/FrameProcessorApp.cpp b/cpp/frameProcessor/src/FrameProcessorApp.cpp index 543f1321f..77da547a8 100644 --- a/cpp/frameProcessor/src/FrameProcessorApp.cpp +++ b/cpp/frameProcessor/src/FrameProcessorApp.cpp @@ -2,7 +2,7 @@ * FrameProcessorApp.cpp * * Created on: 25 May 2016 - * Author: gnx91527 + * Author: Alan Greer */ #include @@ -49,7 +49,7 @@ static bool has_suffix(const std::string &str, const std::string &suffix) str.compare(str.size() - suffix.size(), suffix.size(), suffix) == 0; } -FrameProcessorApp::FrameProcessorApp(void) : json_config_file_("") +FrameProcessorApp::FrameProcessorApp(void) : config_file_("") { // Retrieve and configure a logger instance OdinData::configure_logging_mdc(OdinData::app_path.c_str()); @@ -62,9 +62,16 @@ FrameProcessorApp::~FrameProcessorApp() controller_.reset(); } +/** Parse command line arguments + * + * @return return code: + * * -1 if option parsing succeeded and we should continue running the application + * * 0 if specific command completed (e.g. --help) and we should exit with success + * * 1 if option parsing failed and we should exit with failure + */ int FrameProcessorApp::parse_arguments(int argc, char** argv) { - int rc = 0; + int rc = -1; try { std::string config_file; @@ -76,282 +83,73 @@ int FrameProcessorApp::parse_arguments(int argc, char** argv) "Print this help message") ("version,v", "Print program version string") - ("config,c", po::value(&config_file), - "Specify program configuration file") ; - // Declare a group of options that will be allowed both on the command line - // and in the configuration file po::options_description config("Configuration options"); config.add_options() - ("debug-level,d", po::value()->default_value(debug_level), - "Set the debug level") - ("logconfig,l", po::value(), - "Set the log4cxx logging configuration file") - ("iothreads", po::value()->default_value(1), - "Set number of IPC channel IO threads") - ("ctrl", po::value()->default_value("tcp://0.0.0.0:5004"), - "Set the control endpoint") - ("ready", po::value()->default_value("tcp://127.0.0.1:5001"), - "Ready ZMQ endpoint from frameReceiver") - ("release", po::value()->default_value("tcp://127.0.0.1:5002"), - "Release frame ZMQ endpoint from frameReceiver") - ("meta", po::value()->default_value("tcp://*:5558"), - "ZMQ meta data channel publish stream") - ("init,I", po::bool_switch(), - "Initialise frame receiver and meta interfaces.") - ("no-client,N", po::bool_switch(), - "Enable full initial configuration to run without any client controller." - "You must also be provide: detector, path, datasets, dtype and dims.") - ("processes,p", po::value()->default_value(1), - "Number of concurrent file writer processes") - ("rank,r", po::value()->default_value(0), - "The rank (index number) of the current file writer process in relation to the other concurrent ones") - ("detector", po::value(), - "Detector to configure for") - ("path", po::value(), - "Path to detector shared library with format 'libProcessPlugin.so'") - ("datasets", po::value >()->multitoken(), - "Name(s) of datasets to write (space separated)") - ("dtype", po::value(), - "Data type of raw detector data (0: 8bit, 1: 16bit, 2: 32bit, 3:64bit)") - ("dims", po::value >()->multitoken(), - "Dimensions of each frame (space separated)") - ("chunk-dims,C", po::value >()->multitoken(), - "Chunk size of each sub-frame (space separated)") - ("bit-depth", po::value(), - "Bit-depth mode of detector") - ("compression", po::value(), - "Compression type of input data (0: None, 1: LZ4, 2: BSLZ4, 3: Blosc)") - ("output,o", po::value()->default_value("test.hdf5"), - "Name of HDF5 file to write frames to (default: test.hdf5)") - ("output-dir", po::value()->default_value("/tmp/"), - "Directory to write HDF5 file to (default: /tmp/)") - ("extension", po::value()->default_value("h5"), - "Set the file extension of the data files (default: h5)") - ("single-shot,S", po::bool_switch(), - "Shutdown after one dataset completed") - ("frames,f", po::value()->default_value(0), - "Set the number of frames to write into dataset") - ("acqid", po::value()->default_value(""), - "Set the Acquisition Id of the acquisition") - ("timeout", po::value()->default_value(0), - "Set the timeout period for closing the file (milliseconds)") - ("block-size", po::value()->default_value(1), - "Set the number of consecutive frames to write per block") - ("blocks-per-file", po::value()->default_value(0), - "Set the number of blocks to write to file. Default is 0 (unlimited)") - ("earliest-hdf-ver", po::bool_switch(), - "Set to use earliest hdf5 file version. Default is off (use latest)") - ("alignment-threshold", po::value()->default_value(1), - "Set the hdf5 alignment threshold. Default is 1 (no alignment)") - ("alignment-value", po::value()->default_value(1), - "Set the hdf5 alignment value. Default is 1 (no alignment)") - ("json_file,j", po::value()->default_value(""), - "Path to a JSON configuration file to submit to the application") + ("debug-level,d", po::value()->default_value(debug_level), "Set the debug level") + ("log-config,l", po::value(), "Set the log4cxx logging configuration file") + ("io-threads", po::value()->default_value(OdinData::Defaults::default_io_threads), "Set number of IPC channel IO threads") + ("ctrl", po::value()->default_value("tcp://127.0.0.1:5004"), "Set the control endpoint") + ("config,c", po::value()->default_value(""), "File path of inital JSON config for controller") ; - // Group the variables for parsing at the command line and/or from the configuration file + // Group the variables for parsing at the command line po::options_description cmdline_options; cmdline_options.add(generic).add(config); - po::options_description config_file_options; - config_file_options.add(config); // Parse the command line options - po::store(po::parse_command_line(argc, argv, cmdline_options), vm_); - po::notify(vm_); + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, cmdline_options), vm); + po::notify(vm); // If the command-line help option was given, print help and exit - if (vm_.count("help")) + if (vm.count("help")) { - std::cout << "usage: frameProcessor [options]" << std::endl << std::endl; + std::cout << "Usage: frameProcessor [options]" << std::endl << std::endl; std::cout << cmdline_options << std::endl; - return 1; + return 0; } // If the command line version option was given, print version and exit - if (vm_.count("version")) { + if (vm.count("version")) { std::cout << "frameProcessor version " << ODIN_DATA_VERSION_STR << std::endl; - return 1; + return 0; } - // If the command line config option was given, parse the specified configuration - // file for additional options. Note that boost::program_options gives precedence - // to the first instance occurring. In this case, that implies that command-line - // options have precedence over equivalent configuration file entries. - if (vm_.count("config")) + if (vm.count("log-config")) { - std::ifstream config_ifs(config_file.c_str()); - if (config_ifs) - { - LOG4CXX_DEBUG(logger_, "Parsing configuration file " << config_file); - po::store(po::parse_config_file(config_ifs, config_file_options, true), vm_); - po::notify(vm_); - } - else - { - LOG4CXX_ERROR(logger_, "Unable to open configuration file " << config_file << " for parsing"); - return 1; - } - } - - if (vm_.count("logconfig")) - { - std::string logconf_fname = vm_["logconfig"].as(); - if (has_suffix(logconf_fname, ".xml")) { - log4cxx::xml::DOMConfigurator::configure(logconf_fname); + std::string log_config = vm["log-config"].as(); + if (has_suffix(log_config, ".xml")) { + log4cxx::xml::DOMConfigurator::configure(log_config); } else { - PropertyConfigurator::configure(logconf_fname); + PropertyConfigurator::configure(log_config); } - LOG4CXX_DEBUG(logger_, "log4cxx config file is set to " << vm_["logconfig"].as()); + LOG4CXX_DEBUG(logger_, "log4cxx config file is set to " << vm["log-config"].as()); } - if (vm_.count("debug-level")) + if (vm.count("debug-level")) { - set_debug_level(vm_["debug-level"].as()); + set_debug_level(vm["debug-level"].as()); LOG4CXX_DEBUG_LEVEL(1, logger_, "Debug level set to " << debug_level); } - if (vm_.count("iothreads")) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting number of IO threads to " << vm_["iothreads"].as()); - } - - bool no_client = vm_["no-client"].as(); - - if (no_client) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Running FileWriter without client"); - } - - if (no_client && vm_.count("ready")) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting frame ready notification ZMQ address to " << vm_["ready"].as()); - } - - if (no_client && vm_.count("release")) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting frame release notification ZMQ address to " << vm_["release"].as()); - } - - if (no_client && vm_.count("frames")) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting number of frames to receive to " << vm_["frames"].as()); - } - - if (no_client && vm_.count("datasets")) - { - string message = "Configuring datasets: "; - std::vector datasets = vm_["datasets"].as >(); - for (std::vector::iterator it = datasets.begin(); it != datasets.end(); ++it) { - message += *it + ", "; - } - LOG4CXX_DEBUG_LEVEL(1, logger_, message); - } - - if (no_client && vm_.count("dims")) - { - string message = "Setting dataset dimensions to: "; - std::vector dims = vm_["dims"].as >(); - for (std::vector::iterator it = dims.begin(); it != dims.end(); ++it) { - message += boost::lexical_cast(*it) + ", "; - } - LOG4CXX_DEBUG_LEVEL(1, logger_, message); - } - - if (no_client && vm_.count("chunk-dims")) - { - string message = "Setting dataset chunk dimensions to: "; - std::vector chunkDims = vm_["chunk-dims"].as >(); - for (std::vector::iterator it = chunkDims.begin(); it != chunkDims.end(); ++it) { - message += boost::lexical_cast(*it) + ", "; - } - LOG4CXX_DEBUG_LEVEL(1, logger_, message); - } - - if (no_client && vm_.count("detector")) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Configuring for " << vm_["detector"].as() << " detector"); - } - - if (no_client && vm_.count("bit-depth")) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting bit-depth to " << vm_["bit-depth"].as()); - } - - if (no_client && vm_.count("dtype")) + if (vm.count("io-threads")) { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting data type to " << vm_["dtype"].as()); + io_threads_ = vm["io-threads"].as(); + LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting number of IO threads to " << vm["io-threads"].as()); } - if (no_client && vm_.count("path")) + if (vm.count("ctrl")) { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting shared library path to " << vm_["path"].as()); + ctrl_channel_endpoint_ = vm["ctrl"].as(); + LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting control channel endpoint to " << ctrl_channel_endpoint_); } - if (no_client && vm_.count("compression")) + if (vm.count("config")) { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting compression type to " << vm_["compression"].as()); - } - - if (no_client && vm_.count("ctrl")) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting control endpoint to: " << vm_["ctrl"].as()); - } - - if (no_client && vm_.count("meta")) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting meta endpoint to: " << vm_["meta"].as()); - } - - if (no_client && vm_.count("output")) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Writing frames to file: " << vm_["output"].as()); - } - - if (no_client && vm_.count("processes")) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Number of concurrent file writer processes: " << vm_["processes"].as()); - } - - if (no_client && vm_.count("rank")) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "This process rank (index): " << vm_["rank"].as()); - } - - if (no_client && vm_.count("acqid")) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting acquisition ID to " << vm_["acqid"].as()); - } - - if (no_client && vm_.count("timeout")) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting close file timeout period to " << vm_["timeout"].as()); - } - - if (no_client && vm_.count("block-size")) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting number of frames per block to " << vm_["block-size"].as()); - } - - if (no_client && vm_.count("blocks-per-file")) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting number of blocks per file to " << vm_["blocks-per-file"].as()); - } - - if (no_client && vm_["earliest-hdf-ver"].as()) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Using earliest HDF5 version"); - } - - if (no_client && vm_.count("alignment-threshold")) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting alignment threshold to " << vm_["alignment-threshold"].as()); - } - - if (no_client && vm_.count("alignment-value")) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting alignment value to " << vm_["alignment-value"].as()); + config_file_ = vm["config"].as(); + LOG4CXX_DEBUG_LEVEL(1, logger_, "Loading JSON configuration file " << config_file_); } } @@ -379,267 +177,62 @@ int FrameProcessorApp::parse_arguments(int argc, char** argv) return rc; } -bool FrameProcessorApp::isBloscRequired() -{ - return vm_.count("compression") && vm_["compression"].as() == 3; -} - -void FrameProcessorApp::configureController() -{ - OdinData::IpcMessage cfg; - OdinData::IpcMessage reply; - - // Configure ZMQ channels - cfg.set_param("fr_setup/fr_ready_cnxn", vm_["ready"].as()); - cfg.set_param("fr_setup/fr_release_cnxn", vm_["release"].as()); - cfg.set_param("meta_endpoint", vm_["meta"].as()); - - // Configure single-shot mode - cfg.set_param("single-shot", vm_["single-shot"].as()); - cfg.set_param("frames", vm_["frames"].as()); - - controller_->configure(cfg, reply); -} - -void FrameProcessorApp::configureDetector() -{ - OdinData::IpcMessage cfg; - OdinData::IpcMessage reply; - - string name = vm_["detector"].as(); - name[0] = std::toupper(name[0]); - string libraryName = name + "ProcessPlugin"; - boost::filesystem::path libraryPath = boost::filesystem::path(vm_["path"].as()) / - boost::filesystem::path("lib" + libraryName + ".so"); - - cfg.set_param("plugin/load/library", libraryPath.string()); - cfg.set_param("plugin/load/index", vm_["detector"].as()); - cfg.set_param("plugin/load/name", libraryName); - cfg.set_param("plugin/connect/index", vm_["detector"].as()); - cfg.set_param("plugin/connect/connection", "frame_receiver"); - if (vm_.count("bit-depth")) { - cfg.set_param(vm_["detector"].as() + "/bitdepth", vm_["bit-depth"].as()); - } - - controller_->configure(cfg, reply); -} - -void FrameProcessorApp::configureDetectorDecoder() -{ - OdinData::IpcMessage cfg; - OdinData::IpcMessage reply; - - string detector_decoder = vm_["detector"].as(); - detector_decoder[0] = std::tolower(detector_decoder[0]); - - if (vm_.count("bit-depth")) { - cfg.set_param(detector_decoder + "/bitdepth", vm_["bit-depth"].as()); - } - - if (vm_.count("dims")) { - std::vector dims = vm_["dims"].as >(); - cfg.set_param(detector_decoder + "/height", dims[0]); - cfg.set_param(detector_decoder + "/width", dims[1]); - } - controller_->configure(cfg, reply); -} - -void FrameProcessorApp::configureBlosc() -{ - OdinData::IpcMessage cfg; - OdinData::IpcMessage reply; - - cfg.set_param("plugin/load/library", "./lib/libBloscPlugin.so"); - cfg.set_param("plugin/load/index", "blosc"); - cfg.set_param("plugin/load/name", "BloscPlugin"); - cfg.set_param("plugin/connect/index", "blosc"); - cfg.set_param("plugin/connect/connection", vm_["detector"].as()); - - controller_->configure(cfg, reply); -} - -void FrameProcessorApp::configureHDF5() -{ - OdinData::IpcMessage cfg; - OdinData::IpcMessage reply; - - cfg.set_param("plugin/load/library", "./lib/libHdf5Plugin.so"); - cfg.set_param("plugin/load/index", "hdf"); - cfg.set_param("plugin/load/name", "FileWriterPlugin"); - cfg.set_param("plugin/connect/index", "hdf"); - if (isBloscRequired()) { - cfg.set_param("plugin/connect/connection", "blosc"); - } else { - cfg.set_param("plugin/connect/connection", vm_["detector"].as()); - } - cfg.set_param("hdf/process/number", vm_["processes"].as()); - cfg.set_param("hdf/process/rank", vm_["rank"].as()); - cfg.set_param("hdf/process/frames_per_block", vm_["block-size"].as()); - cfg.set_param("hdf/process/blocks_per_file", vm_["blocks-per-file"].as()); - cfg.set_param("hdf/process/earliest_version", vm_["earliest-hdf-ver"].as()); - cfg.set_param("hdf/process/alignment_threshold", vm_["alignment-threshold"].as()); - cfg.set_param("hdf/process/alignment_value", vm_["alignment-value"].as()); - - controller_->configure(cfg, reply); -} - -void FrameProcessorApp::configureDataset(string name, bool master) -{ - OdinData::IpcMessage cfg; - OdinData::IpcMessage reply; - - rapidjson::Document jsonDoc; - rapidjson::Value jsonDims(rapidjson::kArrayType); - rapidjson::Document::AllocatorType& dimAllocator = jsonDoc.GetAllocator(); - jsonDoc.SetObject(); - - if (vm_.count("dims")) { - std::vector dims = vm_["dims"].as >(); - for (std::vector::iterator it = dims.begin(); it != dims.end(); ++it) { - jsonDims.PushBack(*it, dimAllocator); - } - } - - cfg.set_param("hdf/dataset/" + name + "/datatype", vm_["dtype"].as()); - cfg.set_param("hdf/dataset/" + name + "/dims", jsonDims); - - if (vm_.count("chunk-dims")) { - std::vector chunkDims = vm_["chunk-dims"].as >(); - rapidjson::Value jsonChunkDims(rapidjson::kArrayType); - rapidjson::Document::AllocatorType &chunkDimAllocator = jsonDoc.GetAllocator(); - jsonDoc.SetObject(); - for (std::vector::iterator it2 = chunkDims.begin(); it2 != chunkDims.end(); ++it2) { - jsonChunkDims.PushBack(*it2, chunkDimAllocator); - } - cfg.set_param("hdf/dataset/" + name + "/chunks", jsonChunkDims); - } - if (master) { - cfg.set_param("hdf/master", name); - } - if (vm_.count("compression")) { - cfg.set_param("hdf/dataset/" + name + "/compression", vm_["compression"].as()); - } - - controller_->configure(cfg, reply); -} - -void FrameProcessorApp::configureFileWriter() -{ - OdinData::IpcMessage cfg; - OdinData::IpcMessage reply; - - cfg.set_param("hdf/file/name", vm_["output"].as()); - cfg.set_param("hdf/file/path", vm_["output-dir"].as()); - cfg.set_param("hdf/file/extension", vm_["extension"].as()); - cfg.set_param("hdf/frames", vm_["frames"].as()); - cfg.set_param("hdf/acquisition_id", vm_["acqid"].as()); - cfg.set_param("hdf/timeout_timer_period", vm_["timeout"].as()); - cfg.set_param("hdf/write", true); - - controller_->configure(cfg, reply); -} - -void FrameProcessorApp::configurePlugins() -{ - configureDetector(); - configureDetectorDecoder(); - if (isBloscRequired()) { - configureBlosc(); - } - configureHDF5(); - - std::vector datasets = vm_["datasets"].as >(); - if (datasets.size() > 1) { - configureDataset(datasets[0], true); - for (std::vector::iterator it = datasets.begin() + 1; it != datasets.end(); ++it) { - configureDataset(*it, false); - } - } - else { - configureDataset(datasets[0], false); - } -} - -void FrameProcessorApp::checkNoClientArgs() { - if (!(vm_.count("detector") && vm_.count("path") && vm_.count("datasets") - && vm_.count("dtype") && vm_.count("dims"))) { - throw runtime_error("Must provide detector, path, datasets, dtype and " - "dims to run no client mode."); - } -} - void FrameProcessorApp::run(void) { LOG4CXX_INFO(logger_, "frameProcessor version " << ODIN_DATA_VERSION_STR << " starting up"); - try { - // Instantiate a controller - unsigned int num_io_threads = vm_["iothreads"].as(); controller_ = boost::shared_ptr( - new FrameProcessorController(num_io_threads) + new FrameProcessorController(io_threads_) ); + try { + // Configure the control channel for the FrameProcessor - OdinData::IpcMessage cfg; + OdinData::IpcMessage ctrl_endpoint_cfg; OdinData::IpcMessage reply; - cfg.set_param("ctrl_endpoint", vm_["ctrl"].as()); - controller_->configure(cfg, reply); - - if (vm_["init"].as() || vm_["no-client"].as()) { - configureController(); - if (vm_["no-client"].as()) { - checkNoClientArgs(); - LOG4CXX_DEBUG_LEVEL(1, logger_, "Adding configuration options to work without a client"); - configurePlugins(); - configureFileWriter(); - } - } + ctrl_endpoint_cfg.set_param("ctrl_endpoint", ctrl_channel_endpoint_); + controller_->configure(ctrl_endpoint_cfg, reply); - if (vm_["json_file"].as() != "") { - std::string json_config_file = vm_["json_file"].as(); + if (config_file_ != "") { // Attempt to open the file specified and read in the string as a JSON parameter set - std::ifstream t(json_config_file.c_str()); - std::string json((std::istreambuf_iterator(t)), - std::istreambuf_iterator()); + std::ifstream config_file_stream(config_file_.c_str()); + std::string config_text((std::istreambuf_iterator(config_file_stream)), std::istreambuf_iterator()); // Check for empty JSON and throw an exception. - if (json == "") { + if (config_text == "") { throw OdinData::OdinDataException("Incorrect or empty JSON configuration file specified"); } // Parse the JSON file - rapidjson::Document param_doc; + rapidjson::Document config_json; - if (param_doc.Parse(json.c_str()).HasParseError()) { + if (config_json.Parse(config_text.c_str()).HasParseError()) { std::stringstream msg; - std::string error_snippet = extract_substr_at_pos(json, param_doc.GetErrorOffset(), 15); + std::string error_snippet = extract_substr_at_pos(config_text, config_json.GetErrorOffset(), 15); msg << "Parsing JSON configuration failed at line " - << extract_line_no(json, param_doc.GetErrorOffset()) << ": " - << rapidjson::GetParseError_En(param_doc.GetParseError()) << " " << error_snippet; + << extract_line_no(config_text, config_json.GetErrorOffset()) << ": " + << rapidjson::GetParseError_En(config_json.GetParseError()) << " " << error_snippet; throw OdinData::OdinDataException(msg.str()); } // Check if the top level object is an array - if (param_doc.IsArray()) { + OdinData::IpcMessage config_msg; + if (config_json.IsArray()) { // Loop over the array submitting the child objects in order - for (rapidjson::SizeType i = 0; i < param_doc.Size(); ++i) { - // Create a configuration message - OdinData::IpcMessage json_config_msg(param_doc[i], - OdinData::IpcMessage::MsgTypeCmd, - OdinData::IpcMessage::MsgValCmdConfigure); - // Now submit the config to the controller - controller_->configure(json_config_msg, reply); + for (rapidjson::SizeType i = 0; i < config_json.Size(); ++i) { + OdinData::IpcMessage config_msg( + config_json[i], OdinData::IpcMessage::MsgTypeCmd, OdinData::IpcMessage::MsgValCmdConfigure + ); + configure_controller(config_msg); } } else { // Single level JSON object - // Create a configuration message - OdinData::IpcMessage json_config_msg(param_doc, - OdinData::IpcMessage::MsgTypeCmd, - OdinData::IpcMessage::MsgValCmdConfigure); - // Now submit the config to the controller - controller_->configure(json_config_msg, reply); + OdinData::IpcMessage json_config_msg( + config_json, OdinData::IpcMessage::MsgTypeCmd, OdinData::IpcMessage::MsgValCmdConfigure + ); + configure_controller(config_msg); } } @@ -654,9 +247,26 @@ void FrameProcessorApp::run(void) } } +/** + * Configure the controller with the given configuration message. + * + * Any runtime_error from the controller is caught and reported as an error. + * + * @param config_msg the message containing the configuration + */ +void FrameProcessorApp::configure_controller(OdinData::IpcMessage& config_msg) +{ + OdinData::IpcMessage reply; + try { + controller_->configure(config_msg, reply); + } catch (const std::runtime_error& e) { + LOG4CXX_ERROR(logger_, "Failed to configure controller with message " << config_msg.encode()); + } +} + int main (int argc, char** argv) { - int rc = 0; + int rc = -1; // Initialise unexpected fault handling OdinData::init_seg_fault_handler(); @@ -666,15 +276,15 @@ int main (int argc, char** argv) OdinData::app_path = argv[0]; // Create a FrameProcessorApp instance - FrameProcessor::FrameProcessorApp fp_instance; + FrameProcessor::FrameProcessorApp app; // Parse commnd line arguments - rc = fp_instance.parse_arguments (argc, argv); + rc = app.parse_arguments(argc, argv); - // Run the frame processor - if (rc == 0) - { - fp_instance.run (); + if (rc == -1) { + // Run the application + app.run(); + rc = 0; } return rc; diff --git a/cpp/frameProcessor/src/FrameProcessorController.cpp b/cpp/frameProcessor/src/FrameProcessorController.cpp index 54c132d25..3a61f1ce5 100644 --- a/cpp/frameProcessor/src/FrameProcessorController.cpp +++ b/cpp/frameProcessor/src/FrameProcessorController.cpp @@ -63,8 +63,8 @@ FrameProcessorController::FrameProcessorController(unsigned int num_io_threads) metaRxChannel_(ZMQ_PULL), metaTxChannelEndpoint_(""), metaTxChannel_(ZMQ_PUB), - frReadyEndpoint_(""), - frReleaseEndpoint_("") + frReadyEndpoint_(OdinData::Defaults::default_frame_ready_endpoint), + frReleaseEndpoint_(OdinData::Defaults::default_frame_release_endpoint) { OdinData::configure_logging_mdc(OdinData::app_path.c_str()); LOG4CXX_DEBUG_LEVEL(1, logger_, "Constructing FrameProcessorController"); @@ -110,7 +110,7 @@ void FrameProcessorController::handleCtrlChannel() std::string clientIdentity; std::string ctrlMsgEncoded = ctrlChannel_.recv(&clientIdentity); unsigned int msg_id = 0; - + LOG4CXX_DEBUG_LEVEL(3, logger_, "Control thread called with message: " << ctrlMsgEncoded); // Parse and handle the message @@ -252,8 +252,8 @@ void FrameProcessorController::callback(boost::shared_ptr frame) { LOG4CXX_DEBUG_LEVEL(2, logger_, "Frame " << totalFrames << " complete."); } - if (totalFrames == datasetSize) { - LOG4CXX_DEBUG_LEVEL(2, logger_, "Dataset complete. Shutting down."); + if (totalFrames == shutdownFrameCount) { + LOG4CXX_DEBUG_LEVEL(2, logger_, "Shutdown frame count reached"); // Set exit condition so main thread can continue and shutdown exitCondition_.notify_all(); // Wait until the main thread has sent stop commands to the plugins @@ -317,7 +317,7 @@ void FrameProcessorController::provideVersion(OdinData::IpcMessage& reply) reply.set_param("version/odin-data/patch", ODIN_DATA_VERSION_PATCH); reply.set_param("version/odin-data/short", std::string(ODIN_DATA_VERSION_STR_SHORT)); reply.set_param("version/odin-data/full", std::string(ODIN_DATA_VERSION_STR)); - + // Loop over plugins, list version information from each std::map >::iterator iter; for (iter = plugins_.begin(); iter != plugins_.end(); ++iter) { @@ -329,7 +329,7 @@ void FrameProcessorController::provideVersion(OdinData::IpcMessage& reply) /** * Set configuration options for the FrameProcessorController. * - * Sets up the overall FileWriter application according to the + * Sets up the overall frameProcessor application according to the * configuration IpcMessage objects that are received. The objects * are searched for: * CONFIG_SHUTDOWN - Shuts down the application @@ -363,11 +363,10 @@ void FrameProcessorController::configure(OdinData::IpcMessage& config, OdinData: } } - // If single-shot and frames given then we are running for defined number and then shutting down - if (config.has_param("single-shot") && config.get_param("single-shot") && - config.has_param("frames") && config.get_param("frames") != 0) { - datasetSize = config.get_param("frames"); - LOG4CXX_DEBUG_LEVEL(1, logger_, "Dataset size: " << datasetSize); + // If frames given then we are running for defined number and then shutting down + if (config.has_param("frames") && config.get_param("frames") != 0) { + shutdownFrameCount = config.get_param("frames"); + LOG4CXX_DEBUG_LEVEL(1, logger_, "Shutdown frame count set to: " << shutdownFrameCount); } // Check for a request to inject an End Of Acquisition object @@ -621,10 +620,11 @@ void FrameProcessorController::loadPlugin(const std::string& index, const std::s plugin->connect_meta_channel(); plugins_[index] = plugin; - // Register callback to FWC with FileWriter plugin - if (name == "FileWriter") { + // Register callback with self for FileWriterPlugin + if (name == "FileWriterPlugin") { plugin->register_callback("controller", this->shared_from_this(), true); } + LOG4CXX_INFO(logger_, "Class " << name << " loaded as index = " << index); // Start the plugin worker thread plugin->start(); @@ -788,8 +788,9 @@ void FrameProcessorController::setupFrameReceiverInterface(const std::string& fr { LOG4CXX_DEBUG_LEVEL(1, logger_, "Shared Memory Config: Publisher=" << frPublisherString << " Subscriber=" << frSubscriberString); - // Only re-construct the shared memory object if either of the endpoints has been changed - if (frPublisherString != frReleaseEndpoint_ || frSubscriberString != frReadyEndpoint_){ + // Only reconstruct the shared memory controller if it has never been created or either + // of the endpoints has been changed + if (!sharedMemController_ || frPublisherString != frReleaseEndpoint_ || frSubscriberString != frReadyEndpoint_) { try { // Release the current shared memory controller if one exists diff --git a/cpp/frameReceiver/include/FrameReceiverApp.h b/cpp/frameReceiver/include/FrameReceiverApp.h index 08bc71c04..478ae30a3 100644 --- a/cpp/frameReceiver/include/FrameReceiverApp.h +++ b/cpp/frameReceiver/include/FrameReceiverApp.h @@ -52,10 +52,12 @@ class FrameReceiverApp private: - LoggerPtr logger_; //!< Log4CXX logger instance pointer - FrameReceiverConfig config_; //!< Configuration storage object - std::string json_config_file_; //!< Full path to JSON configuration file - static boost::shared_ptr controller_; //!< FrameReceiver controller object + LoggerPtr logger_; //!< Log4CXX logger instance pointer + static boost::shared_ptr controller_; //!< FrameReceiver controller object + + // Command line options + FrameReceiverConfig config_; //!< Configuration storage object + std::string config_file_; //!< Full path to JSON configuration file }; diff --git a/cpp/frameReceiver/include/FrameReceiverConfig.h b/cpp/frameReceiver/include/FrameReceiverConfig.h index 931bfbf82..f3e4959e4 100644 --- a/cpp/frameReceiver/include/FrameReceiverConfig.h +++ b/cpp/frameReceiver/include/FrameReceiverConfig.h @@ -19,6 +19,7 @@ #include #include "FrameReceiverDefaults.h" +#include "OdinDataDefaults.h" #include "IpcMessage.h" using namespace OdinData; @@ -44,13 +45,14 @@ namespace FrameReceiver const std::string CONFIG_ENABLE_PACKET_LOGGING = "enable_packet_logging"; const std::string CONFIG_FORCE_RECONFIG = "force_reconfig"; const std::string CONFIG_DEBUG = "debug_level"; + const std::string CONFIG_FRAMES = "frames"; class FrameReceiverConfig { public: FrameReceiverConfig() : - + max_buffer_mem_(Defaults::default_max_buffer_mem), decoder_path_(Defaults::default_decoder_path), decoder_type_(Defaults::default_decoder_type), @@ -58,12 +60,12 @@ class FrameReceiverConfig rx_type_(Defaults::default_rx_type), rx_address_(Defaults::default_rx_address), rx_recv_buffer_size_(Defaults::default_rx_recv_buffer_size), - io_threads_(Defaults::default_io_threads), + io_threads_(OdinData::Defaults::default_io_threads), rx_channel_endpoint_(Defaults::default_rx_chan_endpoint), ctrl_channel_endpoint_(Defaults::default_ctrl_chan_endpoint), - frame_ready_endpoint_(Defaults::default_frame_ready_endpoint), - frame_release_endpoint_(Defaults::default_frame_release_endpoint), - shared_buffer_name_(Defaults::default_shared_buffer_name), + frame_ready_endpoint_(OdinData::Defaults::default_frame_ready_endpoint), + frame_release_endpoint_(OdinData::Defaults::default_frame_release_endpoint), + shared_buffer_name_(OdinData::Defaults::default_shared_buffer_name), frame_timeout_ms_(Defaults::default_frame_timeout_ms), enable_packet_logging_(Defaults::default_enable_packet_logging), force_reconfig_(Defaults::default_force_reconfig) diff --git a/cpp/frameReceiver/include/FrameReceiverDefaults.h b/cpp/frameReceiver/include/FrameReceiverDefaults.h index 63994c752..cec11ddad 100644 --- a/cpp/frameReceiver/include/FrameReceiverDefaults.h +++ b/cpp/frameReceiver/include/FrameReceiverDefaults.h @@ -27,8 +27,6 @@ enum RxType RxTypeZMQ, }; -const int default_node = 1; -const unsigned int default_io_threads = 1; const std::size_t default_max_buffer_mem = 1048576; const std::string default_decoder_path = std::string(BUILD_DIR) + "/lib/"; const std::string default_decoder_type = "unknown"; @@ -41,13 +39,8 @@ const int default_rx_recv_buffer_size = 1048576; const int default_rx_recv_buffer_size = 30000000; #endif const std::string default_rx_chan_endpoint = "inproc://rx_channel"; -const std::string default_ctrl_chan_endpoint = "tcp://*:5000"; -const std::string default_frame_ready_endpoint = "tcp://*:5001"; -const std::string default_frame_release_endpoint = "tcp://*:5002"; -const std::string default_json_config_file = ""; -const std::string default_shared_buffer_name = "FrameReceiverBuffer"; +const std::string default_ctrl_chan_endpoint = "tcp://127.0.0.1:5000"; const unsigned int default_frame_timeout_ms = 1000; -const unsigned int default_frame_count = 0; const bool default_enable_packet_logging = false; const bool default_force_reconfig = false; diff --git a/cpp/frameReceiver/src/FrameReceiverApp.cpp b/cpp/frameReceiver/src/FrameReceiverApp.cpp index 0f89437b5..7874224d6 100644 --- a/cpp/frameReceiver/src/FrameReceiverApp.cpp +++ b/cpp/frameReceiver/src/FrameReceiverApp.cpp @@ -36,7 +36,7 @@ static bool has_suffix(const std::string &str, const std::string &suffix) //! //! This constructor initialises the FrameReceiverApp instance -FrameReceiverApp::FrameReceiverApp(void) : json_config_file_("") +FrameReceiverApp::FrameReceiverApp(void) : config_file_("") { // Retrieve a logger instance @@ -63,12 +63,14 @@ FrameReceiverApp::~FrameReceiverApp() //! //! \param argc - standard command-line argument count //! \param argv - array of char command-line options -//! \return return code, 0 if OK, 1 if option parsing failed +//! \return return code: +//! * -1 if option parsing succeeded and we should continue running the application +//! * 0 if specific command completed (e.g. --help) and we should exit with success +//! * 1 if option parsing failed and we should exit with failure int FrameReceiverApp::parse_arguments(int argc, char** argv) { - - int rc = 0; + int rc = -1; try { std::string config_file; @@ -80,8 +82,6 @@ int FrameReceiverApp::parse_arguments(int argc, char** argv) "Print this help message") ("version,v", "Print program version string") - ("config,c", po::value(&config_file), - "Specify program configuration file") ; // Declare a group of options that will be allowed both on the command line @@ -90,50 +90,19 @@ int FrameReceiverApp::parse_arguments(int argc, char** argv) config.add_options() ("debug-level,d", po::value()->default_value(debug_level), "Set the debug level") - ("node,n", po::value()->default_value(FrameReceiver::Defaults::default_node), - "Set the frame receiver node ID") - ("logconfig,l", po::value(), + ("log-config,l", po::value(), "Set the log4cxx logging configuration file") - ("maxmem,m", po::value()->default_value(FrameReceiver::Defaults::default_max_buffer_mem), - "Set the maximum amount of shared memory to allocate for frame buffers") - ("decodertype,t", po::value()->default_value(FrameReceiver::Defaults::default_decoder_type), - "Set the decoder type to to handle data reception") - ("path", po::value()->default_value(FrameReceiver::Defaults::default_decoder_path), - "Path to load the decoder library from") - ("rxtype", po::value()->default_value( - FrameReceiverConfig::map_rx_type_to_name(FrameReceiver::Defaults::default_rx_type)), - "Set the interface to use for receiving frame data (udp or zmq)") - ("port,p", po::value()->default_value(FrameReceiver::Defaults::default_rx_port_list), - "Set the port to receive frame data on") - ("ipaddress,i", po::value()->default_value(FrameReceiver::Defaults::default_rx_address), - "Set the IP address of the interface to receive frame data on") - ("sharedbuf", po::value()->default_value(FrameReceiver::Defaults::default_shared_buffer_name), - "Set the name of the shared memory frame buffer") - ("frametimeout", po::value()->default_value(FrameReceiver::Defaults::default_frame_timeout_ms), - "Set the incomplete frame timeout in ms") - ("frames,f", po::value()->default_value(FrameReceiver::Defaults::default_frame_count), - "Set the number of frames to receive before terminating") - ("packetlog", po::value()->default_value(FrameReceiver::Defaults::default_enable_packet_logging), - "Enable logging of packet diagnostics to file") - ("rxbuffer", po::value()->default_value(FrameReceiver::Defaults::default_rx_recv_buffer_size), - "Set UDP receive buffer size") - ("iothreads", po::value()->default_value(FrameReceiver::Defaults::default_io_threads), + ("io-threads", po::value()->default_value(OdinData::Defaults::default_io_threads), "Set number of IPC channel IO threads") ("ctrl", po::value()->default_value(FrameReceiver::Defaults::default_ctrl_chan_endpoint), "Set the control channel endpoint") - ("ready", po::value()->default_value(FrameReceiver::Defaults::default_frame_ready_endpoint), - "Set the frame ready channel endpoint") - ("release", po::value()->default_value(FrameReceiver::Defaults::default_frame_release_endpoint), - "Set the frame release channel endpoint") - ("json_file,j", po::value()->default_value(FrameReceiver::Defaults::default_json_config_file), + ("config,c", po::value()->default_value(OdinData::Defaults::default_json_config_file), "Path to a JSON configuration file to submit to the application") ; - // Group the variables for parsing at the command line and/or from the configuration file + // Group the variables for parsing at the command line po::options_description cmdline_options; cmdline_options.add(generic).add(config); - po::options_description config_file_options; - config_file_options.add(config); // Parse the command line options po::variables_map vm; @@ -145,130 +114,36 @@ int FrameReceiverApp::parse_arguments(int argc, char** argv) { std::cout << "usage: frameReceiver [options]" << std::endl << std::endl; std::cout << cmdline_options << std::endl; - return 1; + return 0; } // If the command line version option was given, print version and exit if (vm.count("version")) { std::cout << "frameReceiver version " ODIN_DATA_VERSION_STR << std::endl; - return 1; - } - - if (vm.count("debug-level")) - { - set_debug_level(vm["debug-level"].as()); - LOG4CXX_DEBUG_LEVEL(1, logger_, "Debug level set to " << debug_level); - } - // If the command line config option was given, parse the specified configuration - // file for additional options. Note that boost::program_options gives precedence - // to the first instance occurring. In this case, that implies that command-line - // options have precedence over equivalent configuration file entries. - if (vm.count("config")) - { - std::ifstream config_ifs(config_file.c_str()); - if (config_ifs) - { - LOG4CXX_DEBUG_LEVEL(1, logger_, "Parsing configuration file " << config_file); - po::store(po::parse_config_file(config_ifs, config_file_options, true), vm); - po::notify(vm); - } - else - { - LOG4CXX_ERROR(logger_, "Unable to open configuration file " << config_file << " for parsing"); - return 1; - } + return 0; } - if (vm.count("logconfig")) + if (vm.count("log-config")) { - std::string logconf_fname = vm["logconfig"].as(); - if (has_suffix(logconf_fname, ".xml")) { - log4cxx::xml::DOMConfigurator::configure(logconf_fname); + std::string log_config = vm["log-config"].as(); + if (has_suffix(log_config, ".xml")) { + log4cxx::xml::DOMConfigurator::configure(log_config); } else { - PropertyConfigurator::configure(logconf_fname); - } - LOG4CXX_DEBUG_LEVEL(1, logger_, "log4cxx config file is set to " << logconf_fname); - } - - if (vm.count("maxmem")) - { - config_.max_buffer_mem_ = vm["maxmem"].as(); - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting frame buffer maximum memory size to " << config_.max_buffer_mem_); - } - - if (vm.count("decodertype")) - { - config_.decoder_type_ = vm["decodertype"].as(); - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting decoder type to " << config_.decoder_type_); - } - - if (vm.count("path")) - { - config_.decoder_path_ = vm["path"].as(); - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting decoder path to " << config_.decoder_path_); - } - - if (vm.count("rxtype")) - { - std::string rx_name = vm["rxtype"].as(); - config_.rx_type_ = config_.map_rx_name_to_type(rx_name); - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting rx type to " << rx_name << " (" << config_.rx_type_ << ")"); - } - - if (vm.count("port")) - { - config_.rx_ports_.clear(); - config_.tokenize_port_list(config_.rx_ports_, vm["port"].as()); - - std::stringstream ss; - for (std::vector::iterator itr = config_.rx_ports_.begin(); itr !=config_.rx_ports_.end(); itr++) - { - ss << *itr << " "; + PropertyConfigurator::configure(log_config); } - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting RX port(s) to " << ss.str()); - } - - if (vm.count("ipaddress")) - { - config_.rx_address_ = vm["ipaddress"].as(); - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting RX interface address to " << config_.rx_address_); - } - - if (vm.count("sharedbuf")) - { - config_.shared_buffer_name_ = vm["sharedbuf"].as(); - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting shared frame buffer name to " << config_.shared_buffer_name_); - } - - if (vm.count("frametimeout")) - { - config_.frame_timeout_ms_ = vm["frametimeout"].as(); - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting incomplete frame timeout to " << config_.frame_timeout_ms_); - } - - if (vm.count("frames")) - { - config_.frame_count_ = vm["frames"].as(); - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting number of frames to receive to " << config_.frame_count_); - } - - if (vm.count("packetlog")) - { - config_.enable_packet_logging_ = vm["packetlog"].as(); - LOG4CXX_DEBUG_LEVEL(1, logger_, "Packet diagnostic logging is " << - (config_.enable_packet_logging_ ? "enabled" : "disabled")); + LOG4CXX_DEBUG_LEVEL(1, logger_, "log4cxx config file is set to " << log_config); } - if (vm.count("rxbuffer")) + if (vm.count("debug-level")) { - config_.rx_recv_buffer_size_ = vm["rxbuffer"].as(); - LOG4CXX_DEBUG_LEVEL(1, logger_, "RX receive buffer size is " << config_.rx_recv_buffer_size_); + set_debug_level(vm["debug-level"].as()); + LOG4CXX_DEBUG_LEVEL(1, logger_, "Debug level set to " << debug_level); } - if (vm.count("iothreads")) + if (vm.count("io-threads")) { - config_.io_threads_ = vm["iothreads"].as(); + config_.io_threads_ = vm["io-threads"].as(); LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting number of IO threads to " << config_.io_threads_); } @@ -278,22 +153,10 @@ int FrameReceiverApp::parse_arguments(int argc, char** argv) LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting control channel endpoint to " << config_.ctrl_channel_endpoint_); } - if (vm.count("ready")) - { - config_.frame_ready_endpoint_ = vm["ready"].as(); - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting frame ready channel endpoint to " << config_.frame_ready_endpoint_); - } - - if (vm.count("release")) - { - config_.frame_release_endpoint_ = vm["release"].as(); - LOG4CXX_DEBUG_LEVEL(1, logger_, "Setting frame release channel endpoint to " << config_.frame_release_endpoint_); - } - - if (vm.count("json_file")) + if (vm.count("config")) { - json_config_file_ = vm["json_file"].as(); - LOG4CXX_DEBUG_LEVEL(1, logger_, "Loading JSON configuration file " << json_config_file_); + config_file_ = vm["config"].as(); + LOG4CXX_DEBUG_LEVEL(1, logger_, "Loading JSON configuration file " << config_file_); } } @@ -333,9 +196,9 @@ void FrameReceiverApp::run(void) LOG4CXX_INFO(logger_, "frameReceiver version " << ODIN_DATA_VERSION_STR << " starting up"); - // Instantiate a controller + // Instantiate a controller controller_ = boost::shared_ptr( - new FrameReceiverController(config_) + new FrameReceiverController(config_) ); try { @@ -347,46 +210,45 @@ void FrameReceiverApp::run(void) controller_->configure(config_msg, config_reply); // Check for a JSON configuration file option - if (json_config_file_ != "") { + if (config_file_ != "") { // Attempt to open the file specified and read in the string as a JSON parameter set - std::ifstream t(json_config_file_.c_str()); - std::string json((std::istreambuf_iterator(t)), - std::istreambuf_iterator()); + std::ifstream config_file_stream(config_file_.c_str()); + std::string config_text((std::istreambuf_iterator(config_file_stream)), std::istreambuf_iterator()); // Check for empty JSON and throw an exception. - if (json == "") { + if (config_text == "") { throw OdinData::OdinDataException("Incorrect or empty JSON configuration file specified"); } // Parse the JSON file - rapidjson::Document param_doc; + rapidjson::Document config_json; - if (param_doc.Parse(json.c_str()).HasParseError()) { + if (config_json.Parse(config_text.c_str()).HasParseError()) { std::stringstream msg; - std::string error_snippet = extract_substr_at_pos(json, param_doc.GetErrorOffset(), 15); + std::string error_snippet = extract_substr_at_pos(config_text, config_json.GetErrorOffset(), 15); msg << "Parsing JSON configuration failed at line " - << extract_line_no(json, param_doc.GetErrorOffset()) << ": " - << rapidjson::GetParseError_En(param_doc.GetParseError()) << " " << error_snippet; + << extract_line_no(config_text, config_json.GetErrorOffset()) << ": " + << rapidjson::GetParseError_En(config_json.GetParseError()) << " " << error_snippet; throw OdinData::OdinDataException(msg.str()); } // Check if the top level object is an array - if (param_doc.IsArray()) { + if (config_json.IsArray()) { // Loop over the array submitting the child objects in order - for (rapidjson::SizeType i = 0; i < param_doc.Size(); ++i) { + for (rapidjson::SizeType i = 0; i < config_json.Size(); ++i) { // Create a configuration message - OdinData::IpcMessage json_config_msg(param_doc[i], - OdinData::IpcMessage::MsgTypeCmd, - OdinData::IpcMessage::MsgValCmdConfigure); + OdinData::IpcMessage json_config_msg( + config_json[i], OdinData::IpcMessage::MsgTypeCmd, OdinData::IpcMessage::MsgValCmdConfigure + ); // Now submit the config to the controller controller_->configure(json_config_msg, config_reply); } } else { // Single level JSON object // Create a configuration message - OdinData::IpcMessage json_config_msg(param_doc, - OdinData::IpcMessage::MsgTypeCmd, - OdinData::IpcMessage::MsgValCmdConfigure); + OdinData::IpcMessage json_config_msg( + config_json, OdinData::IpcMessage::MsgTypeCmd, OdinData::IpcMessage::MsgValCmdConfigure + ); // Now submit the config to the controller controller_->configure(json_config_msg, config_reply); } @@ -431,7 +293,7 @@ void intHandler (int sig) int main (int argc, char** argv) { - int rc = 0; + int rc = -1; // Initialise unexpected fault handling OdinData::init_seg_fault_handler(); @@ -445,17 +307,16 @@ int main (int argc, char** argv) OdinData::app_path = argv[0]; // Create a FrameReceiverApp instance - FrameReceiver::FrameReceiverApp fr_instance; + FrameReceiver::FrameReceiverApp app; // Parse command line arguments and set up node configuration - rc = fr_instance.parse_arguments (argc, argv); + rc = app.parse_arguments(argc, argv); - if (rc == 0) - { - // Run the instance - fr_instance.run (); + if (rc == -1) { + // Run the application + app.run(); + rc = 0; } return rc; - } diff --git a/cpp/frameReceiver/src/FrameReceiverController.cpp b/cpp/frameReceiver/src/FrameReceiverController.cpp index a995037a0..f61f637c9 100644 --- a/cpp/frameReceiver/src/FrameReceiverController.cpp +++ b/cpp/frameReceiver/src/FrameReceiverController.cpp @@ -94,6 +94,13 @@ void FrameReceiverController::configure(OdinData::IpcMessage& config_msg, set_debug_level(debug_level); } + // If frames given then we are running for defined number and then shutting down + if (config_msg.has_param(CONFIG_FRAMES) && config_msg.get_param(CONFIG_FRAMES) != 0) { + unsigned int frame_count = config_msg.get_param(CONFIG_FRAMES); + LOG4CXX_DEBUG_LEVEL(1, logger_, "Shutdown frame count set to: " << frame_count); + config_.frame_count_ = frame_count; + } + // Configure IPC channels this->configure_ipc_channels(config_msg); @@ -180,7 +187,7 @@ void FrameReceiverController::stop(const bool deferred) boost::bind(&FrameReceiverController::stop, this, false) ); } - else + else { LOG4CXX_TRACE(logger_, "FrameReceiverController::stop()"); terminate_controller_ = true; @@ -484,7 +491,7 @@ void FrameReceiverController::configure_frame_decoder(OdinData::IpcMessage& conf // Clear the decoder configuration status until succesful completion decoder_configured_ = false; - + if (decoder_type != Defaults::default_decoder_type) { std::string lib_name = "lib" + decoder_type + "FrameDecoder" + SHARED_LIBRARY_SUFFIX; @@ -602,7 +609,7 @@ void FrameReceiverController::configure_buffer_manager(OdinData::IpcMessage& con // Clear the buffer manager configuration status until succesful completion buffer_manager_configured_ = false; - + if (frame_decoder_) { @@ -704,7 +711,7 @@ void FrameReceiverController::configure_rx_thread(OdinData::IpcMessage& config_m // Clear the RX thread configuration status until succesful completion rx_thread_configured_ = false; - + if (frame_decoder_ && buffer_manager_) { @@ -814,7 +821,7 @@ void FrameReceiverController::handle_ctrl_channel(void) "Got control channel version request from client " << client_identity); this->get_version(ctrl_reply); break; - + case IpcMessage::MsgValCmdResetStatistics: LOG4CXX_DEBUG_LEVEL(3, logger_, "Got reset statistics request from client " << client_identity); @@ -827,7 +834,7 @@ void FrameReceiverController::handle_ctrl_channel(void) this->stop(true); ctrl_reply.set_msg_type(IpcMessage::MsgTypeAck); break; - + default: request_ok = false; error_ss << "Illegal command request value: " << req_val; @@ -884,15 +891,15 @@ void FrameReceiverController::handle_rx_channel(void) switch (msg_type) { case IpcMessage::MsgTypeCmd: - switch (msg_val) + switch (msg_val) { case IpcMessage::MsgValCmdBufferPrechargeRequest: LOG4CXX_DEBUG_LEVEL(2, logger_, "Got buffer precharge request from RX thread"); - this->precharge_buffers(); + this->precharge_buffers(); break; default: - LOG4CXX_ERROR(logger_, + LOG4CXX_ERROR(logger_, "Got unexpected value on command message from RX thread: " << rx_msg_encoded); break; } @@ -915,7 +922,7 @@ void FrameReceiverController::handle_rx_channel(void) rx_thread_identity_ = msg_indentity; { IpcMessage rx_reply(IpcMessage::MsgTypeAck, IpcMessage::MsgValNotifyIdentity); - rx_channel_.send(rx_reply.encode(), 0, rx_thread_identity_); + rx_channel_.send(rx_reply.encode(), 0, rx_thread_identity_); } break; @@ -925,14 +932,14 @@ void FrameReceiverController::handle_rx_channel(void) break; default: - LOG4CXX_ERROR(logger_, + LOG4CXX_ERROR(logger_, "Got unexpected value on notification message from RX thread: " << rx_msg_encoded); break; } break; default: - LOG4CXX_ERROR(logger_, + LOG4CXX_ERROR(logger_, "Got unexpected type on message from RX thread: " << rx_msg_encoded); break; } @@ -1056,7 +1063,7 @@ void FrameReceiverController::notify_buffer_config(const bool deferred) //! Store the RX thread status. //! -//! This method stores all the parameters present in the RX thread status message passed as an +//! This method stores all the parameters present in the RX thread status message passed as an //! argument, allowing them to be returned in subsequent get_status calls //! //! \param[in] rx_status_msg - IpcMessage containing RX thread status parameters @@ -1090,7 +1097,7 @@ void FrameReceiverController::get_status(OdinData::IpcMessage& status_reply) unsigned int frames_timedout = 0; unsigned int frames_dropped = 0; - if (rx_thread_status_) + if (rx_thread_status_) { empty_buffers = rx_thread_status_->get_param("rx_thread/empty_buffers"); mapped_buffers = rx_thread_status_->get_param("rx_thread/mapped_buffers"); diff --git a/cpp/frameSimulator/src/FrameSimulatorApp.cpp b/cpp/frameSimulator/src/FrameSimulatorApp.cpp index 21367ce67..7e7bfa6b7 100644 --- a/cpp/frameSimulator/src/FrameSimulatorApp.cpp +++ b/cpp/frameSimulator/src/FrameSimulatorApp.cpp @@ -36,9 +36,8 @@ static const std::string librarySuffix = "FrameSimulatorPlugin"; // Basic command line options static const FrameSimulatorOption opt_detector("detector", "Set the detector (Excalibur, Eiger etc.)"); static const FrameSimulatorOption opt_libpath("lib-path", "Path to detector plugin library"); -static const FrameSimulatorOption opt_debuglevel("debug-level", "Set the debug level"); -static const FrameSimulatorOption opt_logconfig("logconfig", - "Set the log4cxx logging configuration file"); +static const FrameSimulatorOption opt_debug_level("debug-level", "Set the debug level"); +static const FrameSimulatorOption opt_log_config("log-config", "Set the log4cxx logging configuration file"); /** Load requested plugin * /param[in] map of command line specified variables @@ -53,7 +52,7 @@ boost::shared_ptr get_requested_plugin(co boost::filesystem::path("lib" + pluginClass + ".so"); boost::shared_ptr plugin; - + try { plugin = OdinData::ClassLoader::load_class(pluginClass, libraryPathAndName.string()); } @@ -97,8 +96,8 @@ int parse_arguments(int argc, char **argv, po::variables_map &vm, LoggerPtr &log ("subargs", po::value < std::vector < std::string > > (), "Detector specific arguments"); opt_detector.add_option_to(generic); opt_libpath.add_option_to(generic); - opt_debuglevel.add_option_to(generic); - opt_logconfig.add_option_to(generic); + opt_debug_level.add_option_to(generic); + opt_log_config.add_option_to(generic); po::positional_options_description pos; pos.add(opt_detector.get_arg(), 1).add("subargs", -1); @@ -118,8 +117,8 @@ int parse_arguments(int argc, char **argv, po::variables_map &vm, LoggerPtr &log // Setup logging - if (opt_logconfig.is_specified(vm)) { - std::string logconf_fname = opt_logconfig.get_val(vm); + if (opt_log_config.is_specified(vm)) { + std::string logconf_fname = opt_log_config.get_val(vm); if (has_suffix(logconf_fname, ".xml")) { log4cxx::xml::DOMConfigurator::configure(logconf_fname); } else { @@ -167,8 +166,8 @@ int parse_arguments(int argc, char **argv, po::variables_map &vm, LoggerPtr &log std::cout << "usage: frameSimulator " + detector << " --lib-path " << std::endl << std::endl; std::cout << " --version Print version information" << std::endl; - std::cout << " --" + opt_debuglevel.get_argstring() + " " + opt_debuglevel.get_description() << std::endl; - std::cout << " --" + opt_logconfig.get_argstring() + " " + opt_logconfig.get_description() << std::endl << std::endl; + std::cout << " --" + opt_debug_level.get_argstring() + " " + opt_debug_level.get_description() << std::endl; + std::cout << " --" + opt_log_config.get_argstring() + " " + opt_log_config.get_description() << std::endl << std::endl; std::cout << config << std::endl; exit(1); } @@ -180,8 +179,8 @@ int parse_arguments(int argc, char **argv, po::variables_map &vm, LoggerPtr &log std::cout << "usage: frameSimulator --lib-path [options]" << std::endl << std::endl; std::cout << " --version Print version information" << std::endl; - std::cout << " --" + opt_debuglevel.get_argstring() + " " + opt_debuglevel.get_description() << std::endl; - std::cout << " --" + opt_logconfig.get_argstring() + " " + opt_logconfig.get_description() << std::endl; + std::cout << " --" + opt_debug_level.get_argstring() + " " + opt_debug_level.get_description() << std::endl; + std::cout << " --" + opt_log_config.get_argstring() + " " + opt_log_config.get_description() << std::endl; exit(1); } @@ -190,8 +189,8 @@ int parse_arguments(int argc, char **argv, po::variables_map &vm, LoggerPtr &log exit(1); } - if (opt_debuglevel.is_specified(vm)) { - set_debug_level(opt_debuglevel.get_val(vm)); + if (opt_debug_level.is_specified(vm)) { + set_debug_level(opt_debug_level.get_val(vm)); LOG4CXX_DEBUG_LEVEL(1, logger, "Debug level set to " << debug_level); }