From c302d730d1291599f903a4cdba627dbb58c590d1 Mon Sep 17 00:00:00 2001 From: Deepnarayan Sett Date: Wed, 16 Apr 2025 13:37:36 +0530 Subject: [PATCH 1/3] replaced nanomsg with nanomsg_sys --- src/lib_ccx/ccx_share.c | 55 +- src/lib_ccx/ccx_share.h | 1 - src/rust/Cargo.lock | 80 ++- src/rust/Cargo.toml | 1 + src/rust/lib_ccxr/Cargo.lock | 114 ++++ src/rust/lib_ccxr/Cargo.toml | 4 + src/rust/lib_ccxr/src/lib.rs | 2 + .../src/share/ccxr_sub_entry_message.rs | 16 + src/rust/lib_ccxr/src/share/mod.rs | 3 + src/rust/lib_ccxr/src/share/share.rs | 516 ++++++++++++++++++ src/rust/lib_ccxr/src/share/tests.rs | 260 +++++++++ src/rust/src/libccxr_exports/mod.rs | 2 + src/rust/src/libccxr_exports/share.rs | 115 ++++ 13 files changed, 1165 insertions(+), 4 deletions(-) create mode 100644 src/rust/lib_ccxr/src/share/ccxr_sub_entry_message.rs create mode 100644 src/rust/lib_ccxr/src/share/mod.rs create mode 100644 src/rust/lib_ccxr/src/share/share.rs create mode 100644 src/rust/lib_ccxr/src/share/tests.rs create mode 100644 src/rust/src/libccxr_exports/share.rs diff --git a/src/lib_ccx/ccx_share.c b/src/lib_ccx/ccx_share.c index e5026b9e2..4c13c2d8c 100644 --- a/src/lib_ccx/ccx_share.c +++ b/src/lib_ccx/ccx_share.c @@ -8,26 +8,46 @@ #include "ccx_common_option.h" #include "ccx_decoders_structs.h" #include "lib_ccx.h" - #ifdef ENABLE_SHARING #include #include ccx_share_service_ctx ccx_share_ctx; +#ifndef DISABLE_RUST +extern void ccxr_sub_entry_msg_cleanup_c(CcxSubEntryMessage *msg); +extern void ccxr_sub_entry_msg_print_c(const CcxSubEntryMessage *msg); +extern void ccxr_sub_entries_cleanup_c(ccx_sub_entries *entries); +extern void ccxr_sub_entries_print_c(const ccx_sub_entries *entries); +extern ccx_share_status ccxr_share_start_c(const char *stream_name); +extern ccx_share_status ccxr_share_stop_c(void); +extern ccx_share_status _ccxr_share_send_c(const CcxSubEntryMessage *msg); +extern ccx_share_status ccxr_share_send_c(const struct cc_subtitle *sub); +extern ccx_share_status ccxr_share_stream_done_c(const char *stream_name); +extern ccx_share_status _ccxr_share_sub_to_entries_c(const struct cc_subtitle *sub, ccx_sub_entries *entries); + +#endif void ccx_sub_entry_msg_cleanup(CcxSubEntryMessage *msg) { +#ifndef DISABLE_RUST + return ccxr_sub_entry_msg_cleanup_c(msg); +#else + for (int i = 0; i < msg->n_lines; i++) { free(msg->lines[i]); } free(msg->lines); free(msg->stream_name); +#endif } void ccx_sub_entry_msg_print(CcxSubEntryMessage *msg) { +#ifndef DISABLE_RUST + return ccxr_sub_entry_msg_print_c(msg); +#else if (!msg) { dbg_print(CCX_DMT_SHARE, "[share] print(!msg)\n"); @@ -55,6 +75,7 @@ void ccx_sub_entry_msg_print(CcxSubEntryMessage *msg) } dbg_print(CCX_DMT_SHARE, "[share] %s\n", msg->lines[i]); } +#endif } void ccx_sub_entries_init(ccx_sub_entries *entries) @@ -65,6 +86,9 @@ void ccx_sub_entries_init(ccx_sub_entries *entries) void ccx_sub_entries_cleanup(ccx_sub_entries *entries) { +#ifndef DISABLE_RUST + return ccxr_sub_entries_cleanup_c(entries); +#else for (int i = 0; i < entries->count; i++) { ccx_sub_entry_msg_cleanup(entries->messages + i); @@ -72,19 +96,27 @@ void ccx_sub_entries_cleanup(ccx_sub_entries *entries) free(entries->messages); entries->messages = NULL; entries->count = 0; +#endif } void ccx_sub_entries_print(ccx_sub_entries *entries) { +#ifndef DISABLE_RUST + return ccxr_sub_entries_print_c(entries); +#else dbg_print(CCX_DMT_SHARE, "[share] ccx_sub_entries_print (%u entries)\n", entries->count); for (int i = 0; i < entries->count; i++) { ccx_sub_entry_msg_print(entries->messages + i); } +#endif } ccx_share_status ccx_share_start(const char *stream_name) // TODO add stream { +#ifndef DISABLE_RUST + return ccxr_share_start_c(stream_name); +#else dbg_print(CCX_DMT_SHARE, "[share] ccx_share_start: starting service\n"); // TODO for multiple files we have to move creation to ccx_share_init ccx_share_ctx.nn_sock = nn_socket(AF_SP, NN_PUB); @@ -121,18 +153,26 @@ ccx_share_status ccx_share_start(const char *stream_name) // TODO add stream sleep(1); // We have to sleep a while, because it takes some time for subscribers to subscribe return CCX_SHARE_OK; +#endif } ccx_share_status ccx_share_stop() { +#ifndef DISABLE_RUST + return ccxr_share_stop_c(); +#else dbg_print(CCX_DMT_SHARE, "[share] ccx_share_stop: stopping service\n"); nn_shutdown(ccx_share_ctx.nn_sock, ccx_share_ctx.nn_binder); free(ccx_share_ctx.stream_name); return CCX_SHARE_OK; +#endif } ccx_share_status ccx_share_send(struct cc_subtitle *sub) { +#ifndef DISABLE_RUST + return ccxr_share_send_c(sub); +#else dbg_print(CCX_DMT_SHARE, "[share] ccx_share_send: sending\n"); ccx_sub_entries entries; ccx_sub_entries_init(&entries); @@ -154,10 +194,14 @@ ccx_share_status ccx_share_send(struct cc_subtitle *sub) ccx_sub_entries_cleanup(&entries); return CCX_SHARE_OK; +#endif } ccx_share_status _ccx_share_send(CcxSubEntryMessage *msg) { +#ifndef DISABLE_RUST + return _ccxr_share_send_c(msg); +#else dbg_print(CCX_DMT_SHARE, "[share] _ccx_share_send\n"); size_t len = ccx_sub_entry_message__get_packed_size(msg); void *buf = malloc(len); @@ -175,10 +219,14 @@ ccx_share_status _ccx_share_send(CcxSubEntryMessage *msg) free(buf); dbg_print(CCX_DMT_SHARE, "[share] _ccx_share_send: sent\n"); return CCX_SHARE_OK; +#endif } ccx_share_status ccx_share_stream_done(char *stream_name) { +#ifndef DISABLE_RUST + return ccxr_share_stream_done_c(stream_name); +#else CcxSubEntryMessage msg = CCX_SUB_ENTRY_MESSAGE__INIT; msg.eos = 1; msg.stream_name = strdup(stream_name); @@ -197,10 +245,14 @@ ccx_share_status ccx_share_stream_done(char *stream_name) ccx_sub_entry_msg_cleanup(&msg); return CCX_SHARE_OK; +#endif } ccx_share_status _ccx_share_sub_to_entries(struct cc_subtitle *sub, ccx_sub_entries *entries) { +#ifndef DISABLE_RUST + return _ccxr_share_sub_to_entries_c(sub, entries); +#else dbg_print(CCX_DMT_SHARE, "\n[share] _ccx_share_sub_to_entry\n"); if (sub->type == CC_608) { @@ -295,6 +347,7 @@ ccx_share_status _ccx_share_sub_to_entries(struct cc_subtitle *sub, ccx_sub_entr dbg_print(CCX_DMT_SHARE, "[share] done\n"); return CCX_SHARE_OK; +#endif } ccx_share_status ccx_share_launch_translator(char *langs, char *auth) diff --git a/src/lib_ccx/ccx_share.h b/src/lib_ccx/ccx_share.h index c6ddd6b0a..6148f7053 100644 --- a/src/lib_ccx/ccx_share.h +++ b/src/lib_ccx/ccx_share.h @@ -1,7 +1,6 @@ // // Created by Oleg Kisselef (olegkisselef at gmail dot com) on 6/21/15 // - #ifndef CCEXTRACTOR_CCX_SHARE_H #define CCEXTRACTOR_CCX_SHARE_H diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index 37aa15a8b..ac1ab2615 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -61,6 +61,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "anyhow" +version = "1.0.98" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" + [[package]] name = "approx" version = "0.5.1" @@ -144,12 +150,27 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" +[[package]] +name = "bytes" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" + [[package]] name = "camino" version = "1.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b96ec4966b5813e2c0507c1f86115c8c5abaadc3980879c3424042a02fd1ad3" +[[package]] +name = "cc" +version = "1.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e3a13707ac958681c13b39b458c073d0d9bc8a22cb1b2f4c8e55eb72c13f362" +dependencies = [ + "shlex", +] + [[package]] name = "ccx_rust" version = "0.1.0" @@ -161,6 +182,7 @@ dependencies = [ "iconv", "leptonica-sys", "lib_ccxr", + "libc", "log", "num-integer", "palette", @@ -239,6 +261,15 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" +[[package]] +name = "cmake" +version = "0.1.54" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0" +dependencies = [ + "cc", +] + [[package]] name = "colorchoice" version = "1.0.3" @@ -352,6 +383,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "gcc" +version = "0.3.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" + [[package]] name = "glob" version = "0.3.2" @@ -610,7 +647,11 @@ dependencies = [ "bitflags 2.9.0", "crc32fast", "derive_more", + "lazy_static", + "libc", + "nanomsg-sys", "num_enum", + "prost", "strum 0.26.3", "strum_macros 0.26.4", "thiserror", @@ -620,9 +661,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.170" +version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "libloading" @@ -664,6 +705,18 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" +[[package]] +name = "nanomsg-sys" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78aa3ccb6d007dfecb4f7070725c4b1670a87677babb6621cb0c8cce9cfdc004" +dependencies = [ + "cmake", + "gcc", + "libc", + "pkg-config", +] + [[package]] name = "nom" version = "7.1.3" @@ -850,6 +903,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.99", +] + [[package]] name = "quote" version = "1.0.39" diff --git a/src/rust/Cargo.toml b/src/rust/Cargo.toml index b94861db2..da6601ee9 100644 --- a/src/rust/Cargo.toml +++ b/src/rust/Cargo.toml @@ -28,6 +28,7 @@ cfg-if = "1.0.0" num-integer = "0.1.46" lib_ccxr = { path = "lib_ccxr" } url = "2.5.4" +libc = "0.2.172" [build-dependencies] bindgen = "0.64.0" diff --git a/src/rust/lib_ccxr/Cargo.lock b/src/rust/lib_ccxr/Cargo.lock index 8ec94ecf1..7b0338bae 100644 --- a/src/rust/lib_ccxr/Cargo.lock +++ b/src/rust/lib_ccxr/Cargo.lock @@ -2,18 +2,48 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "anyhow" +version = "1.0.98" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" + [[package]] name = "bitflags" version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" +[[package]] +name = "bytes" +version = "1.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" + +[[package]] +name = "cc" +version = "1.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e3a13707ac958681c13b39b458c073d0d9bc8a22cb1b2f4c8e55eb72c13f362" +dependencies = [ + "shlex", +] + [[package]] name = "cfg-if" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cmake" +version = "0.1.54" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0" +dependencies = [ + "cc", +] + [[package]] name = "convert_case" version = "0.4.0" @@ -62,6 +92,12 @@ dependencies = [ "syn", ] +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + [[package]] name = "equivalent" version = "1.0.2" @@ -77,6 +113,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "gcc" +version = "0.3.55" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f5f3913fa0bfe7ee1fd8248b6b9f42a5af4b9d65ec2dd2c3c26132b950ecfc2" + [[package]] name = "hashbrown" version = "0.15.2" @@ -238,12 +280,27 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "lib_ccxr" version = "0.1.0" @@ -251,7 +308,11 @@ dependencies = [ "bitflags", "crc32fast", "derive_more", + "lazy_static", + "libc", + "nanomsg-sys", "num_enum", + "prost", "strum", "strum_macros", "thiserror", @@ -259,6 +320,12 @@ dependencies = [ "url", ] +[[package]] +name = "libc" +version = "0.2.172" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" + [[package]] name = "litemap" version = "0.7.5" @@ -271,6 +338,18 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" +[[package]] +name = "nanomsg-sys" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78aa3ccb6d007dfecb4f7070725c4b1670a87677babb6621cb0c8cce9cfdc004" +dependencies = [ + "cmake", + "gcc", + "libc", + "pkg-config", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -310,6 +389,12 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pkg-config" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + [[package]] name = "powerfmt" version = "0.2.0" @@ -335,6 +420,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-derive" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "quote" version = "1.0.39" @@ -385,6 +493,12 @@ dependencies = [ "syn", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "smallvec" version = "1.14.0" diff --git a/src/rust/lib_ccxr/Cargo.toml b/src/rust/lib_ccxr/Cargo.toml index 6162aef3a..7d0bcb141 100644 --- a/src/rust/lib_ccxr/Cargo.toml +++ b/src/rust/lib_ccxr/Cargo.toml @@ -15,6 +15,10 @@ strum = "0.26.3" strum_macros = "0.26.4" crc32fast = "1.4.2" num_enum = "0.6.1" +prost = "0.13.5" +lazy_static = "1.5.0" +nanomsg-sys = "0.7.2" +libc = "0.2.172" [features] default = [ diff --git a/src/rust/lib_ccxr/src/lib.rs b/src/rust/lib_ccxr/src/lib.rs index 9f32678db..d789efdee 100644 --- a/src/rust/lib_ccxr/src/lib.rs +++ b/src/rust/lib_ccxr/src/lib.rs @@ -5,3 +5,5 @@ pub mod subtitle; pub mod teletext; pub mod time; pub mod util; +#[cfg(feature = "enable_sharing")] +pub mod share; \ No newline at end of file diff --git a/src/rust/lib_ccxr/src/share/ccxr_sub_entry_message.rs b/src/rust/lib_ccxr/src/share/ccxr_sub_entry_message.rs new file mode 100644 index 000000000..2cabcdc56 --- /dev/null +++ b/src/rust/lib_ccxr/src/share/ccxr_sub_entry_message.rs @@ -0,0 +1,16 @@ +// This file is @generated by prost-build. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct CcxSubEntryMessage { + #[prost(int32, required, tag = "1")] + pub eos: i32, + #[prost(string, required, tag = "2")] + pub stream_name: ::prost::alloc::string::String, + #[prost(int64, required, tag = "3")] + pub counter: i64, + #[prost(int64, required, tag = "4")] + pub start_time: i64, + #[prost(int64, required, tag = "5")] + pub end_time: i64, + #[prost(string, repeated, tag = "7")] + pub lines: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} diff --git a/src/rust/lib_ccxr/src/share/mod.rs b/src/rust/lib_ccxr/src/share/mod.rs new file mode 100644 index 000000000..8938af992 --- /dev/null +++ b/src/rust/lib_ccxr/src/share/mod.rs @@ -0,0 +1,3 @@ +pub mod share; +pub mod ccxr_sub_entry_message; +mod tests; diff --git a/src/rust/lib_ccxr/src/share/share.rs b/src/rust/lib_ccxr/src/share/share.rs new file mode 100644 index 000000000..31c4138fa --- /dev/null +++ b/src/rust/lib_ccxr/src/share/share.rs @@ -0,0 +1,516 @@ +use crate::common::Options; +use crate::share::ccxr_sub_entry_message::CcxSubEntryMessage; +use crate::util::log::{debug, DebugMessageFlag}; +use lazy_static::lazy_static; +use libc::size_t; +use nanomsg_sys::{nn_bind, nn_send, nn_setsockopt, nn_shutdown, nn_socket, AF_SP, NN_LINGER, NN_PUB, NN_SOL_SOCKET}; +use prost::Message; +use std::cmp::PartialEq; +use std::ffi::c_void; +use std::os::raw::{c_char, c_int}; +use std::sync::{LazyLock, Mutex}; +use std::{ffi::{CStr, CString}, thread, time::Duration}; +// use crate::bindings::{cc_subtitle, ccx_output_format}; + +pub const CCX_DECODER_608_SCREEN_ROWS: usize = 15; +pub const CCX_DECODER_608_SCREEN_WIDTH: usize = 32; +pub static CCX_OPTIONS: LazyLock> = LazyLock::new(|| Mutex::new(Options::default())); + +#[repr(C)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum CcxEia608Format { + SFormatCcScreen = 0, + SFormatCcLine = 1, + SFormatXds = 2, +} + +#[repr(C)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum CcModes { + ModePopOn = 0, + ModeRollUp2 = 1, + ModeRollUp3 = 2, + ModeRollUp4 = 3, + ModeText = 4, + ModePaintOn = 5, + ModeFakeRollUp1 = 100, +} + + +#[repr(C)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum CcxDecoder608ColorCode { + White = 0, + Green = 1, + Blue = 2, + Cyan = 3, + Red = 4, + Yellow = 5, + Magenta = 6, + UserDefined = 7, + Black = 8, + Transparent = 9, + Max = 10, +} + +#[repr(C)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum FontBits { + Normal = 0, + Italics = 1, + Underline = 2, + UnderlineItalics = 3, +} + + +#[repr(C)] +#[derive(Debug, Clone)] +pub struct Eia608Screen { + pub format: CcxEia608Format, + pub characters: [[c_char; CCX_DECODER_608_SCREEN_WIDTH + 1]; CCX_DECODER_608_SCREEN_ROWS], + pub colors: [[CcxDecoder608ColorCode; CCX_DECODER_608_SCREEN_WIDTH + 1]; CCX_DECODER_608_SCREEN_ROWS], + pub fonts: [[FontBits; CCX_DECODER_608_SCREEN_WIDTH + 1]; CCX_DECODER_608_SCREEN_ROWS], + pub row_used: [c_int; CCX_DECODER_608_SCREEN_ROWS], + pub empty: c_int, + pub start_time: i64, + pub end_time: i64, + pub mode: CcModes, + pub channel: c_int, + pub my_field: c_int, + pub xds_str: *mut c_char, + pub xds_len: usize, + pub cur_xds_packet_class: c_int, +} + +impl Default for Eia608Screen { + fn default() -> Self { + Self { + format: CcxEia608Format::SFormatCcScreen, + characters: [[0; CCX_DECODER_608_SCREEN_WIDTH + 1]; CCX_DECODER_608_SCREEN_ROWS], + colors: [[CcxDecoder608ColorCode::Black; CCX_DECODER_608_SCREEN_WIDTH + 1]; CCX_DECODER_608_SCREEN_ROWS], + fonts: [[FontBits::Normal; CCX_DECODER_608_SCREEN_WIDTH + 1]; CCX_DECODER_608_SCREEN_ROWS], + row_used: [0; CCX_DECODER_608_SCREEN_ROWS], + empty: 1, + start_time: 0, + end_time: 0, + mode: CcModes::ModePopOn, + channel: 0, + my_field: 0, + xds_str: std::ptr::null_mut(), + xds_len: 0, + cur_xds_packet_class: 0, + } + } +} + +impl Eia608Screen { + pub fn new() -> Self { + Self::default() + } + + pub fn set_xds_str(&mut self, xds: &str) { + let c_string = CString::new(xds).expect("CString::new failed"); + self.xds_str = c_string.into_raw(); + self.xds_len = xds.len(); + } + + pub fn free_xds_str(&mut self) { + if !self.xds_str.is_null() { + unsafe { + let _ = CString::from_raw(self.xds_str); + } + self.xds_str = std::ptr::null_mut(); + self.xds_len = 0; + } + } +} + +impl Drop for Eia608Screen { + fn drop(&mut self) { + self.free_xds_str(); + } +} + + +pub type SubDataType = std::os::raw::c_uint; +pub type LLONG = i64; +pub const SUBTYPE_CC_BITMAP: Subtype = 0; +pub const SUBTYPE_CC_608: Subtype = 1; +pub const SUBTYPE_CC_TEXT: Subtype = 2; +pub const SUBTYPE_CC_RAW: Subtype = 3; +pub type Subtype = std::os::raw::c_uint; + +pub type CcxEncodingType = std::os::raw::c_uint; + +pub struct CcSubtitle { + #[doc = " A generic data which contain data according to decoder\n @warn decoder cant output multiple types of data"] + pub data: *mut std::os::raw::c_void, + pub datatype: SubDataType, + #[doc = " number of data"] + pub nb_data: std::os::raw::c_uint, + #[doc = " type of subtitle"] + pub type_: Subtype, + #[doc = " Encoding type of Text, must be ignored in case of subtype as bitmap or cc_screen"] + pub enc_type: CcxEncodingType, + pub start_time: LLONG, + pub end_time: LLONG, + pub flags: c_int, + pub lang_index: c_int, + #[doc = " flag to tell that decoder has given output"] + pub got_output: c_int, + pub mode: [c_char; 5usize], + pub info: [c_char; 4usize], + #[doc = " Used for DVB end time in ms"] + pub time_out: c_int, + pub next: *mut CcSubtitle, + pub prev: *mut CcSubtitle, +} + +#[repr(C)] +#[derive(Debug)] +pub enum CcxShareStatus { + Ok = 0, + Fail, +} +impl PartialEq for CcxShareStatus { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (CcxShareStatus::Ok, CcxShareStatus::Ok) => true, + (CcxShareStatus::Fail, CcxShareStatus::Fail) => true, + _ => false, + } + } +} +pub struct CcxShareServiceCtx { + counter: i64, + stream_name: Option, + nn_sock: libc::c_int, + nn_binder: libc::c_int, +} + +pub struct CcxSubEntries { + pub messages: Vec, +} +impl CcxShareServiceCtx { + fn new() -> Self { + CcxShareServiceCtx { + counter: 0, + stream_name: None, + nn_sock: 0, + nn_binder: 0, + } + } +} + + +lazy_static! { + pub static ref CCX_SHARE_CTX: Mutex = Mutex::new(CcxShareServiceCtx::new()); +} + + + +pub fn ccxr_sub_entry_msg_init(msg: &mut CcxSubEntryMessage) { + msg.eos = 0; + msg.stream_name = "".parse().unwrap(); + msg.counter = 0; + msg.start_time = 0; + msg.end_time = 0; + msg.lines.clear(); +} + + +pub fn ccxr_sub_entry_msg_cleanup(msg: &mut CcxSubEntryMessage) { + msg.lines.clear(); + msg.stream_name = "".parse().unwrap(); +} + +pub fn ccxr_sub_entry_msg_print(msg: &CcxSubEntryMessage) { + if msg.lines.is_empty() { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] no lines allocated"); + return; + } + + debug!(msg_type = DebugMessageFlag::SHARE; "\n[share] sub msg #{}", msg.counter); + if !msg.stream_name.is_empty() { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] name: {}", msg.stream_name); + } else { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] name: None"); + } + debug!(msg_type = DebugMessageFlag::SHARE; "[share] start: {}", msg.start_time); + debug!(msg_type = DebugMessageFlag::SHARE; "[share] end: {}", msg.end_time); + debug!(msg_type = DebugMessageFlag::SHARE; "[share] lines count: {}", msg.lines.len()); + + if msg.lines.is_empty() { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] no lines allocated"); + return; + } + for (i, line) in msg.lines.iter().enumerate() { + if !line.is_empty() { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] line[{}]: {}", i, line); + } else { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] line[{}] is not allocated", i); + } + } +} + + +pub fn ccxr_sub_entries_cleanup(entries: &mut CcxSubEntries) { + entries.messages.clear(); +} + +pub fn ccxr_sub_entries_print(entries: &CcxSubEntries) { + eprintln!("[share] ccxr_sub_entries_print ({} entries)", entries.messages.len()); + for message in &entries.messages { + ccxr_sub_entry_msg_print(message); + } +} + +pub unsafe fn ccxr_share_start(stream_name: Option<&str>) -> CcxShareStatus { + let mut ccx_options = CCX_OPTIONS.lock().unwrap(); + + // Debug print similar to dbg_print in C + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_start: starting service\n"); + + // Create a nanomsg socket with domain AF_SP and protocol NN_PUB + let nn_sock = nn_socket(AF_SP, NN_PUB); + if nn_sock < 0 { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_start: can't nn_socket()\n"); + return CcxShareStatus::Fail; + } + CCX_SHARE_CTX.lock().unwrap().nn_sock = nn_sock; + // Set a default URL if one was not already provided. + if ccx_options.sharing_url.is_none() { + ccx_options.sharing_url = Some("tcp://*:3269".to_string().parse().unwrap()); + } + + // Convert the sharing URL into a C-compatible string. + let url = ccx_options.sharing_url.as_ref().unwrap(); + let sharing_url_cstr = CString::new(url.as_str()).expect("Failed to create CString"); + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_start: url={}", ccx_options.sharing_url.as_mut().unwrap()); + + // Bind the socket to the URL. + let nn_binder = nn_bind(nn_sock, sharing_url_cstr.as_ptr()); + if nn_binder < 0 { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_start: can't nn_bind()"); + return CcxShareStatus::Fail; + } + CCX_SHARE_CTX.lock().unwrap().nn_binder = nn_binder; + + // Set the linger socket option to -1. + let linger: i32 = -1; + let ret = nn_setsockopt( + nn_sock, + NN_SOL_SOCKET, + NN_LINGER, + &linger as *const _ as *const c_void, + std::mem::size_of::() as size_t, + ); + if ret < 0 { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_start: can't nn_setsockopt()"); + return CcxShareStatus::Fail; + } + + // Save the stream name into the context, defaulting to "unknown" if not provided. + CCX_SHARE_CTX.lock().unwrap().stream_name = Some(stream_name.unwrap_or("unknown").to_string()); + + // Sleep for 1 second to allow subscribers to connect. + thread::sleep(Duration::from_secs(1)); + + CcxShareStatus::Ok +} + + +pub unsafe fn ccxr_share_stop() -> CcxShareStatus { + let mut ctx = CCX_SHARE_CTX.lock().unwrap(); + + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_stop: stopping service"); + + + nn_shutdown(CCX_SHARE_CTX.lock().unwrap().nn_sock, CCX_SHARE_CTX.lock().unwrap().nn_binder); + ctx.stream_name = None; + CcxShareStatus::Ok +} + + +pub unsafe fn ccxr_share_send(sub: *mut CcSubtitle) -> CcxShareStatus { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_send: sending"); + + // Create an entries structure and populate it from the subtitle. + let mut entries = CcxSubEntries { messages: Vec::new() }; + if ccxr_share_sub_to_entries( &*sub , &mut entries) == CcxShareStatus::Fail { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] failed to convert subtitle to entries"); + return CcxShareStatus::Fail; + } + + // Debug print of entries. + ccxr_sub_entries_print(&entries); + debug!(msg_type = DebugMessageFlag::SHARE; "[share] entry obtained:"); + + // Iterate over all entries and send them. + for (i, message) in entries.messages.iter().enumerate() { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_send: sending entry {}", i); + if entries.messages[i].lines.is_empty() { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] skipping empty message"); + continue; + } + if _ccxr_share_send(message) != CcxShareStatus::Ok { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] can't send message"); + return CcxShareStatus::Fail; + } + } + + ccxr_sub_entries_cleanup(&mut entries); + CcxShareStatus::Ok +} + +pub fn ccxr_sub_entry_message_get_packed_size(message: &CcxSubEntryMessage) -> usize { + message.encoded_len() +} + +pub fn ccxr_sub_entry_message_pack(message: &CcxSubEntryMessage, buf: &mut Vec) -> Result<(), prost::EncodeError> { + message.encode(buf) +} + + + +pub unsafe fn _ccxr_share_send(msg: &CcxSubEntryMessage) -> CcxShareStatus { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_send"); + + let len: usize = ccxr_sub_entry_message_get_packed_size(msg); + + // Allocate a buffer to hold the packed message. + let mut buf = Vec::with_capacity(len); + if buf.is_empty() { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_send: malloc failed"); + return CcxShareStatus::Fail; + } + + debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_send: packing"); + ccxr_sub_entry_message_pack(msg, &mut buf).expect( + "Failed to pack message", + ); + + debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_send: sending"); + let sent: c_int = nn_send( + CCX_SHARE_CTX.lock().unwrap().nn_sock, + buf.as_ptr() as *const c_void, + len, + 0, + ); + if sent != len as c_int { + buf.clear(); + debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_send: len={} sent={}", len, sent); + return CcxShareStatus::Fail; + } + buf.clear(); + debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_send: sent"); + CcxShareStatus::Ok +} + +pub unsafe fn ccxr_share_stream_done(stream_name: &str) -> CcxShareStatus { + let mut msg = CcxSubEntryMessage { + eos: 1, + stream_name: stream_name.parse().unwrap(), + counter: 0, + start_time: 0, + end_time: 0, + lines: Vec::new(), + }; + #[allow(unused)] + let mut ctx = CCX_SHARE_CTX.lock().unwrap(); + + if _ccxr_share_send(&msg) != CcxShareStatus::Ok { + ccxr_sub_entry_msg_cleanup(&mut msg); + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_stream_done: can't send message"); + return CcxShareStatus::Fail; + } + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_stream_done: message sent successfully"); + CcxShareStatus::Ok +} + +pub fn ccxr_share_sub_to_entries(sub: &CcSubtitle, entries: &mut CcxSubEntries) -> CcxShareStatus { + unsafe { + let mut ctx = CCX_SHARE_CTX.lock().unwrap(); + + debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_sub_to_entries"); + + if sub.type_ == SUBTYPE_CC_608 { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] CC_608"); + + let data_ptr = sub.data as *const Eia608Screen; + let mut nb_data = sub.nb_data; + + while nb_data > 0 { + let data = &*data_ptr.add(sub.nb_data as usize - nb_data as usize); + + debug!(msg_type = DebugMessageFlag::SHARE; "[share] data item"); + + if data.format == CcxEia608Format::SFormatXds { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] XDS. Skipping"); + nb_data -= 1; + continue; + } + + if data.start_time == 0 { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] No start time. Skipping"); + break; + } + + entries.messages.push(CcxSubEntryMessage { + eos: 0, + stream_name: String::new(), + counter: ctx.counter + 1, + start_time: data.start_time, + end_time: data.end_time, + lines: Vec::new(), + }); + + let entry_index = entries.messages.len() - 1; + let entry = &mut entries.messages[entry_index]; + + for row in 0..CCX_DECODER_608_SCREEN_ROWS { + if data.row_used[row] != 0 { + let characters = CStr::from_ptr(data.characters[row].as_ptr()) + .to_string_lossy() + .to_string(); + entry.lines.push(characters); + } + } + + if entry.lines.is_empty() { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] buffer is empty"); + entries.messages.pop(); + return CcxShareStatus::Ok; + } + + debug!( + msg_type = DebugMessageFlag::SHARE; + "[share] Copied {} lines", entry.lines.len() + ); + + ctx.counter += 1; + nb_data -= 1; + + debug!(msg_type = DebugMessageFlag::SHARE; "[share] item done"); + } + } else { + match sub.type_ { + SUBTYPE_CC_BITMAP => { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] CC_BITMAP. Skipping"); + } + SUBTYPE_CC_RAW => { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] CC_RAW. Skipping"); + } + SUBTYPE_CC_TEXT => { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] CC_TEXT. Skipping"); + } + _ => { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] Unknown subtitle type"); + } + } + } + + debug!(msg_type = DebugMessageFlag::SHARE; "[share] done"); + CcxShareStatus::Ok + } +} diff --git a/src/rust/lib_ccxr/src/share/tests.rs b/src/rust/lib_ccxr/src/share/tests.rs new file mode 100644 index 000000000..b9c2c712e --- /dev/null +++ b/src/rust/lib_ccxr/src/share/tests.rs @@ -0,0 +1,260 @@ +#![allow(unused_imports)] +#![allow(unused)] +use libc::c_char; +use crate::share::share::*; +use crate::share::ccxr_sub_entry_message::*; +use std::sync::Once; +use crate::util::log::{set_logger, CCExtractorLogger, DebugMessageFlag, DebugMessageMask, OutputTarget}; + +mod test { + use super::*; + + static INIT: Once = Once::new(); + + fn initialize_logger() { + INIT.call_once(|| { + set_logger(CCExtractorLogger::new( + OutputTarget::Stdout, + DebugMessageMask::new(DebugMessageFlag::VERBOSE, DebugMessageFlag::VERBOSE), + false, + )) + .ok(); + }); + } + #[test] + fn test_ffi_sub_entry_msg_cleanup() { + let mut msg = CcxSubEntryMessage { + eos: 0, + stream_name: "test".to_string(), + counter: 0, + start_time: 0, + end_time: 0, + lines: vec!["test".to_string()], + }; + + unsafe { + ccxr_sub_entry_msg_cleanup(&mut *(&mut msg as *mut CcxSubEntryMessage)); + } + + assert!(msg.lines.is_empty()); + assert_eq!(msg.stream_name, ""); + } + + #[test] + fn test_ffi_sub_entry_msg_print() { + let msg = CcxSubEntryMessage { + eos: 0, + stream_name: "test".to_string(), + counter: 0, + start_time: 0, + end_time: 0, + lines: vec![ + "test".to_string(), + "test".to_string(), + "test".to_string(), + ], + }; + + unsafe { + ccxr_sub_entry_msg_print(&*(&msg as *const CcxSubEntryMessage)); + } + } + #[test] + fn test_ffi_sub_entries_init() { + let mut entries = CcxSubEntries { + messages: vec![CcxSubEntryMessage { + eos: 0, + counter: 1, + stream_name: "Test".parse().unwrap(), + start_time: 0, + end_time: 0, + lines: vec![], + }], + }; + + unsafe { + ccxr_sub_entries_cleanup(&mut *(&mut entries as *mut CcxSubEntries)); + } + + assert!(entries.messages.is_empty()); + } + + #[test] + fn test_ffi_sub_entries_cleanup() { + let mut entries = CcxSubEntries { + messages: vec![CcxSubEntryMessage { + eos: 0, + counter: 1, + stream_name: "Test".parse().unwrap(), + start_time: 0, + end_time: 0, + lines: vec![], + }], + }; + + unsafe { + ccxr_sub_entries_cleanup(&mut *(&mut entries as *mut CcxSubEntries)); + } + + assert!(entries.messages.is_empty()); + } + + #[test] + fn test_ffi_sub_entries_print() { + let entries = CcxSubEntries { + messages: vec![CcxSubEntryMessage { + eos: 0, + counter: 1, + stream_name: "Test".parse().unwrap(), + start_time: 0, + end_time: 0, + lines: vec![], + }], + }; + + unsafe { + ccxr_sub_entries_print(&*(&entries as *const CcxSubEntries)); + } + } + + + #[test] + fn test_ccxr_share_send() { + initialize_logger(); + + let mut sub = CcSubtitle { + data: std::ptr::null_mut(), + datatype: 1, + nb_data: 0, + type_: 1, + enc_type: 1, + start_time: 0, + end_time: 0, + flags: 0, + lang_index: 0, + got_output: 0, + mode: [0; 5], + info: [0; 4], + time_out: 0, + next: std::ptr::null_mut(), + prev: std::ptr::null_mut(), + + }; + let status = unsafe { ccxr_share_send(&mut sub as *mut CcSubtitle) }; + assert!(matches!(status, CcxShareStatus::Ok | CcxShareStatus::Fail)); + } + + #[test] + fn test_ccxr_share_send_c() { + initialize_logger(); + let msg = CcxSubEntryMessage { + eos: 0, + stream_name: "test_stream".to_string(), + counter: 1, + start_time: 0, + end_time: 0, + lines: vec!["Line 1".to_string(), "Line 2".to_string()], + }; + + let status = unsafe { _ccxr_share_send(&*(&msg as *const CcxSubEntryMessage)) }; + assert!(matches!(status, CcxShareStatus::Ok | CcxShareStatus::Fail)); + } + + + const CCX_DECODER_608_SCREEN_WIDTH: usize = 32; + const CCX_DECODER_608_SCREEN_ROWS: usize = 15; + + #[test] + fn test_ccxr_share_sub_to_entries() { + initialize_logger(); + const NUM_ROWS: usize = CCX_DECODER_608_SCREEN_ROWS; + let mut screen = Eia608Screen::new(); + screen.row_used[0] = 1; + screen.row_used[2] = 1; + screen.row_used[4] = 1; + + let row_0_content = b"Hello, World!"; + let row_2_content = b"Subtitle line 2"; + let row_4_content = b"Subtitle line 3"; + + for (i, &ch) in row_0_content.iter().enumerate() { + screen.characters[0][i] = ch as c_char; + } + for (i, &ch) in row_2_content.iter().enumerate() { + screen.characters[2][i] = ch as c_char; + } + for (i, &ch) in row_4_content.iter().enumerate() { + screen.characters[4][i] = ch as c_char; + } + + screen.start_time = 1000; + screen.end_time = 2000; + screen.mode = CcModes::ModePaintOn; + screen.channel = 1; + screen.my_field = 42; + + let sub = CcSubtitle { + data: &screen as *const _ as *mut std::os::raw::c_void, + datatype: 1, + nb_data: 1, + type_: SUBTYPE_CC_608, + enc_type: 1, + start_time: 0, + end_time: 0, + flags: 0, + lang_index: 0, + got_output: 1, + mode: [b'M' as c_char, b'O' as c_char, b'D' as c_char, b'E' as c_char, 0], + info: [b'I' as c_char, b'N' as c_char, b'F' as c_char, 0], + time_out: 0, + next: std::ptr::null_mut(), + prev: std::ptr::null_mut(), + }; + let mut entries = CcxSubEntries { messages: Vec::new() }; + let status = ccxr_share_sub_to_entries(&sub, &mut entries); + assert_eq!(status, CcxShareStatus::Ok, "Function should return OK status"); + assert_eq!(entries.messages.len(), 1, "There should be one entry in messages"); + + let message = &entries.messages[0]; + assert_eq!(message.start_time, 1000, "Start time should match input"); + assert_eq!(message.end_time, 2000, "End time should match input"); + assert_eq!(message.lines.len(), 3, "There should be 3 lines of content"); + + assert_eq!(message.lines[0], "Hello, World!", "First line content mismatch"); + assert_eq!(message.lines[1], "Subtitle line 2", "Second line content mismatch"); + assert_eq!(message.lines[2], "Subtitle line 3", "Third line content mismatch"); + } + + #[test] + fn test_ccxr_share_sub_to_entries_empty_rows() { + let mut screen = Eia608Screen::new(); + + screen.start_time = 1000; + screen.end_time = 2000; + + let sub = CcSubtitle { + data: &screen as *const _ as *mut std::os::raw::c_void, + datatype: 1, + nb_data: 1, + type_: SUBTYPE_CC_608, + enc_type: 1, + start_time: 0, + end_time: 0, + flags: 0, + lang_index: 0, + got_output: 1, + mode: [b'M' as c_char, b'O' as c_char, b'D' as c_char, b'E' as c_char, 0], + info: [b'I' as c_char, b'N' as c_char, b'F' as c_char, 0], + time_out: 0, + next: std::ptr::null_mut(), + prev: std::ptr::null_mut(), + }; + + let mut entries = CcxSubEntries { messages: Vec::new() }; + + let status = ccxr_share_sub_to_entries(&sub, &mut entries); + + assert_eq!(status, CcxShareStatus::Ok, "Function should return OK status"); + assert_eq!(entries.messages.len(), 0, "There should be no messages for empty rows"); + } +} diff --git a/src/rust/src/libccxr_exports/mod.rs b/src/rust/src/libccxr_exports/mod.rs index c2f64a258..19834235d 100644 --- a/src/rust/src/libccxr_exports/mod.rs +++ b/src/rust/src/libccxr_exports/mod.rs @@ -1,6 +1,8 @@ //! Provides C-FFI functions that are direct equivalent of functions available in C. pub mod time; +#[cfg(feature = "enable_sharing")] +pub mod share; use crate::ccx_options; use lib_ccxr::util::log::*; use lib_ccxr::util::{bits::*, levenshtein::*}; diff --git a/src/rust/src/libccxr_exports/share.rs b/src/rust/src/libccxr_exports/share.rs new file mode 100644 index 000000000..50136f509 --- /dev/null +++ b/src/rust/src/libccxr_exports/share.rs @@ -0,0 +1,115 @@ +use lib_ccxr::share::ccxr_sub_entry_message::*; +use lib_ccxr::share::share::*; +use lib_ccxr::util::log::{debug, DebugMessageFlag}; +use std::ffi::CStr; +/// C-compatible function to clean up a `CcxSubEntryMessage`. +#[no_mangle] +pub extern "C" fn ccxr_sub_entry_msg_cleanup_c(msg: *mut CcxSubEntryMessage) { + unsafe { + if msg.is_null() { + return; + } + let msg = &mut *msg; + ccxr_sub_entry_msg_cleanup(msg); + } +} + +/// C-compatible function to print a `CcxSubEntryMessage`. +#[no_mangle] +pub unsafe extern "C" fn ccxr_sub_entry_msg_print_c(msg: *const CcxSubEntryMessage) { + if msg.is_null() { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] print(!msg)\n"); + return; + } + + let msg = &*msg; + + // Call the main Rust function + ccxr_sub_entry_msg_print(msg); +} + + +#[no_mangle] +pub unsafe extern "C" fn ccxr_sub_entries_cleanup_c(entries: *mut CcxSubEntries) { + if entries.is_null() { + return; + } + let entries = &mut *entries; + ccxr_sub_entries_cleanup(entries); +} + +#[no_mangle] +pub unsafe extern "C" fn ccxr_sub_entries_print_c(entries: *const CcxSubEntries) { + if entries.is_null() { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccxr_sub_entries_print (null entries)\n"); + return; + } + let entries = &*entries; + ccxr_sub_entries_print(entries); +} + +/// C-compatible function to start the sharing service. +#[no_mangle] +pub unsafe extern "C" fn ccxr_share_start_c(stream_name: *const libc::c_char) -> CcxShareStatus { + if stream_name.is_null() { + return ccxr_share_start(Option::from("unknown")); + } + + let c_str = CStr::from_ptr(stream_name); + let stream_name = match c_str.to_str() { + Ok(name) => name, + Err(_) => return CcxShareStatus::Fail, + }; + + ccxr_share_start(Option::from(stream_name)) +} +#[no_mangle] +pub unsafe extern "C" fn ccxr_share_stop_c() -> CcxShareStatus { + ccxr_share_stop() +} + +#[no_mangle] +pub unsafe extern "C" fn _ccxr_share_send_c(msg: *const CcxSubEntryMessage) -> CcxShareStatus { + if msg.is_null() { + return CcxShareStatus::Fail; + } + _ccxr_share_send(&*msg) +} + +#[no_mangle] +pub unsafe extern "C" fn ccxr_share_send_c(sub: *const CcSubtitle) -> CcxShareStatus { + if sub.is_null() { + return CcxShareStatus::Fail; + } + ccxr_share_send(sub as *mut CcSubtitle) +} + +#[no_mangle] +pub unsafe extern "C" fn ccxr_share_stream_done_c(stream_name: *const libc::c_char) -> CcxShareStatus { + if stream_name.is_null() { + return CcxShareStatus::Fail; + } + + let c_str = CStr::from_ptr(stream_name); + match c_str.to_str() { + Ok(name) => ccxr_share_stream_done(name), + Err(_) => CcxShareStatus::Fail, + } +} + +/// C-compatible function to convert subtitles to sub-entry messages. +#[no_mangle] +pub unsafe extern "C" fn _ccxr_share_sub_to_entries_c( + sub: *const CcSubtitle, + entries: *mut CcxSubEntries, +) -> CcxShareStatus { + if sub.is_null() || entries.is_null() { + return CcxShareStatus::Fail; + } + + // Dereference the pointers safely + let sub_ref = &*sub; + let entries_ref = &mut *entries; + + ccxr_share_sub_to_entries(sub_ref, entries_ref) +} From a4d04a6abf941acbada407b073ea225483db5a66 Mon Sep 17 00:00:00 2001 From: Deepnarayan Sett Date: Sun, 20 Apr 2025 13:18:42 +0530 Subject: [PATCH 2/3] feat: Share Module - squash commits --- .github/workflows/build_linux.yml | 2 +- .github/workflows/build_mac.yml | 2 +- docs/CHANGES.TXT | 1 + linux/Makefile.am | 2 +- mac/Makefile.am | 2 +- src/rust/Cargo.lock | 8 +- src/rust/Cargo.toml | 1 - src/rust/lib_ccxr/Cargo.lock | 1 - src/rust/lib_ccxr/Cargo.toml | 16 +- src/rust/lib_ccxr/src/lib.rs | 4 +- src/rust/lib_ccxr/src/share/functions.rs | 536 +++++++++++++++++++++++ src/rust/lib_ccxr/src/share/mod.rs | 4 +- src/rust/lib_ccxr/src/share/share.rs | 516 ---------------------- src/rust/lib_ccxr/src/share/tests.rs | 88 ++-- src/rust/src/libccxr_exports/mod.rs | 2 +- src/rust/src/libccxr_exports/share.rs | 11 +- 16 files changed, 625 insertions(+), 571 deletions(-) create mode 100644 src/rust/lib_ccxr/src/share/functions.rs delete mode 100644 src/rust/lib_ccxr/src/share/share.rs diff --git a/.github/workflows/build_linux.yml b/.github/workflows/build_linux.yml index cd3349ecb..153ca43d4 100644 --- a/.github/workflows/build_linux.yml +++ b/.github/workflows/build_linux.yml @@ -46,7 +46,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Install dependencies - run: sudo apt update && sudo apt-get install libgpac-dev + run: sudo apt update && sudo apt-get install libgpac-dev libnanomsg-dev - uses: actions/checkout@v4 - name: run autogen run: ./autogen.sh diff --git a/.github/workflows/build_mac.yml b/.github/workflows/build_mac.yml index c81af5327..7dfebd82a 100644 --- a/.github/workflows/build_mac.yml +++ b/.github/workflows/build_mac.yml @@ -47,7 +47,7 @@ jobs: steps: - uses: actions/checkout@v4 - name: Install dependencies - run: brew install pkg-config autoconf automake libtool gpac + run: brew install pkg-config autoconf automake libtool gpac nanomsg - name: run autogen run: ./autogen.sh working-directory: ./mac diff --git a/docs/CHANGES.TXT b/docs/CHANGES.TXT index d6765ffb2..c9a615014 100644 --- a/docs/CHANGES.TXT +++ b/docs/CHANGES.TXT @@ -1291,3 +1291,4 @@ version of CCExtractor. - Added video information (as extracted from sequence header). - Some code clean-up. - FF sanity check enabled by default. +- Added Share Module to lib_ccxr \ No newline at end of file diff --git a/linux/Makefile.am b/linux/Makefile.am index 0b8c167fc..7491c1f7d 100644 --- a/linux/Makefile.am +++ b/linux/Makefile.am @@ -334,7 +334,7 @@ ccextractor_LDADD += $(LEPT_LIB) endif if WITH_RUST -ccextractor_LDADD += ./rust/@RUST_TARGET_SUBDIR@/libccx_rust.a +ccextractor_LDADD += ./rust/@RUST_TARGET_SUBDIR@/libccx_rust.a -lnanomsg else ccextractor_CFLAGS += -DDISABLE_RUST ccextractor_CPPFLAGS += -DDISABLE_RUST diff --git a/mac/Makefile.am b/mac/Makefile.am index 9870c07bf..13f0a039e 100644 --- a/mac/Makefile.am +++ b/mac/Makefile.am @@ -244,7 +244,7 @@ endif ccextractor_CFLAGS = -std=gnu99 -Wno-write-strings -Wno-pointer-sign -D_FILE_OFFSET_BITS=64 -DVERSION_FILE_PRESENT -DFT2_BUILD_LIBRARY -DGPAC_DISABLE_VTT -DGPAC_DISABLE_OD_DUMP -DGPAC_DISABLE_REMOTERY -DNO_GZIP -ccextractor_LDFLAGS = $(shell pkg-config --libs gpac) +ccextractor_LDFLAGS = $(shell pkg-config --libs gpac nanomsg) GPAC_CPPFLAGS = $(shell pkg-config --cflags gpac) ccextractor_CPPFLAGS =-I../src/lib_ccx/ -I../src/thirdparty/libpng/ -I../src/thirdparty/zlib/ -I../src/lib_ccx/zvbi/ -I../src/thirdparty/lib_hash/ -I../src/thirdparty/protobuf-c/ -I../src/thirdparty -I../src/ -I../src/thirdparty/freetype/include/ diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index ac1ab2615..277a657b9 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -182,7 +182,6 @@ dependencies = [ "iconv", "leptonica-sys", "lib_ccxr", - "libc", "log", "num-integer", "palette", @@ -648,7 +647,6 @@ dependencies = [ "crc32fast", "derive_more", "lazy_static", - "libc", "nanomsg-sys", "num_enum", "prost", @@ -661,9 +659,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.172" +version = "0.2.170" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" +checksum = "875b3680cb2f8f71bdcf9a30f38d48282f5d3c95cbf9b3fa57269bb5d5c06828" [[package]] name = "libloading" @@ -1517,4 +1515,4 @@ dependencies = [ "proc-macro2", "quote", "syn 2.0.99", -] +] \ No newline at end of file diff --git a/src/rust/Cargo.toml b/src/rust/Cargo.toml index da6601ee9..b94861db2 100644 --- a/src/rust/Cargo.toml +++ b/src/rust/Cargo.toml @@ -28,7 +28,6 @@ cfg-if = "1.0.0" num-integer = "0.1.46" lib_ccxr = { path = "lib_ccxr" } url = "2.5.4" -libc = "0.2.172" [build-dependencies] bindgen = "0.64.0" diff --git a/src/rust/lib_ccxr/Cargo.lock b/src/rust/lib_ccxr/Cargo.lock index 7b0338bae..fa7a432fa 100644 --- a/src/rust/lib_ccxr/Cargo.lock +++ b/src/rust/lib_ccxr/Cargo.lock @@ -309,7 +309,6 @@ dependencies = [ "crc32fast", "derive_more", "lazy_static", - "libc", "nanomsg-sys", "num_enum", "prost", diff --git a/src/rust/lib_ccxr/Cargo.toml b/src/rust/lib_ccxr/Cargo.toml index 7d0bcb141..dae77c866 100644 --- a/src/rust/lib_ccxr/Cargo.toml +++ b/src/rust/lib_ccxr/Cargo.toml @@ -17,18 +17,16 @@ crc32fast = "1.4.2" num_enum = "0.6.1" prost = "0.13.5" lazy_static = "1.5.0" -nanomsg-sys = "0.7.2" -libc = "0.2.172" - +nanomsg-sys = { version = "0.7.2", optional = true, default-features = false, features = ["bundled"] } [features] default = [ - "enable_sharing", - "wtv_debug", - "enable_ffmpeg", - "debug", - "with_libcurl", + "enable_sharing", + "wtv_debug", + "enable_ffmpeg", + "debug", + "with_libcurl", ] -enable_sharing = [] +enable_sharing = ["nanomsg-sys"] wtv_debug = [] enable_ffmpeg = [] debug_out = [] diff --git a/src/rust/lib_ccxr/src/lib.rs b/src/rust/lib_ccxr/src/lib.rs index d789efdee..1e5d1eb33 100644 --- a/src/rust/lib_ccxr/src/lib.rs +++ b/src/rust/lib_ccxr/src/lib.rs @@ -1,9 +1,9 @@ pub mod activity; pub mod common; pub mod hardsubx; +#[cfg(feature = "enable_sharing")] +pub mod share; pub mod subtitle; pub mod teletext; pub mod time; pub mod util; -#[cfg(feature = "enable_sharing")] -pub mod share; \ No newline at end of file diff --git a/src/rust/lib_ccxr/src/share/functions.rs b/src/rust/lib_ccxr/src/share/functions.rs new file mode 100644 index 000000000..8329f5990 --- /dev/null +++ b/src/rust/lib_ccxr/src/share/functions.rs @@ -0,0 +1,536 @@ +#[cfg(feature = "enable_sharing")] +pub mod sharing { + use crate::common::Options; + use crate::share::ccxr_sub_entry_message::CcxSubEntryMessage; + use crate::util::log::{debug, DebugMessageFlag}; + use lazy_static::lazy_static; + use nanomsg_sys::{ + nn_bind, nn_send, nn_setsockopt, nn_shutdown, nn_socket, AF_SP, NN_LINGER, NN_PUB, + NN_SOL_SOCKET, + }; + use prost::Message; + use std::cmp::PartialEq; + use std::ffi::c_void; + use std::os::raw::{c_char, c_int}; + use std::sync::{LazyLock, Mutex}; + use std::{ + ffi::{CStr, CString}, + thread, + time::Duration, + }; + + pub const CCX_DECODER_608_SCREEN_ROWS: usize = 15; + pub const CCX_DECODER_608_SCREEN_WIDTH: usize = 32; + pub static CCX_OPTIONS: LazyLock> = + LazyLock::new(|| Mutex::new(Options::default())); + + #[repr(C)] + #[derive(Debug, Copy, Clone, PartialEq, Eq)] + pub enum CcxEia608Format { + SFormatCcScreen = 0, + SFormatCcLine = 1, + SFormatXds = 2, + } + + #[repr(C)] + #[derive(Debug, Copy, Clone, PartialEq, Eq)] + pub enum CcModes { + ModePopOn = 0, + ModeRollUp2 = 1, + ModeRollUp3 = 2, + ModeRollUp4 = 3, + ModeText = 4, + ModePaintOn = 5, + ModeFakeRollUp1 = 100, + } + + #[repr(C)] + #[derive(Debug, Copy, Clone, PartialEq, Eq)] + pub enum CcxDecoder608ColorCode { + White = 0, + Green = 1, + Blue = 2, + Cyan = 3, + Red = 4, + Yellow = 5, + Magenta = 6, + UserDefined = 7, + Black = 8, + Transparent = 9, + Max = 10, + } + + #[repr(C)] + #[derive(Debug, Copy, Clone, PartialEq, Eq)] + pub enum FontBits { + Normal = 0, + Italics = 1, + Underline = 2, + UnderlineItalics = 3, + } + + #[repr(C)] + #[derive(Debug, Clone)] + pub struct Eia608Screen { + pub format: CcxEia608Format, + pub characters: [[c_char; CCX_DECODER_608_SCREEN_WIDTH + 1]; CCX_DECODER_608_SCREEN_ROWS], + pub colors: [[CcxDecoder608ColorCode; CCX_DECODER_608_SCREEN_WIDTH + 1]; + CCX_DECODER_608_SCREEN_ROWS], + pub fonts: [[FontBits; CCX_DECODER_608_SCREEN_WIDTH + 1]; CCX_DECODER_608_SCREEN_ROWS], + pub row_used: [c_int; CCX_DECODER_608_SCREEN_ROWS], + pub empty: c_int, + pub start_time: i64, + pub end_time: i64, + pub mode: CcModes, + pub channel: c_int, + pub my_field: c_int, + pub xds_str: *mut c_char, + pub xds_len: usize, + pub cur_xds_packet_class: c_int, + } + + impl Default for Eia608Screen { + fn default() -> Self { + Self { + format: CcxEia608Format::SFormatCcScreen, + characters: [[0; CCX_DECODER_608_SCREEN_WIDTH + 1]; CCX_DECODER_608_SCREEN_ROWS], + colors: [[CcxDecoder608ColorCode::Black; CCX_DECODER_608_SCREEN_WIDTH + 1]; + CCX_DECODER_608_SCREEN_ROWS], + fonts: [[FontBits::Normal; CCX_DECODER_608_SCREEN_WIDTH + 1]; + CCX_DECODER_608_SCREEN_ROWS], + row_used: [0; CCX_DECODER_608_SCREEN_ROWS], + empty: 1, + start_time: 0, + end_time: 0, + mode: CcModes::ModePopOn, + channel: 0, + my_field: 0, + xds_str: std::ptr::null_mut(), + xds_len: 0, + cur_xds_packet_class: 0, + } + } + } + + impl Eia608Screen { + pub fn new() -> Self { + Self::default() + } + + pub fn set_xds_str(&mut self, xds: &str) { + let c_string = CString::new(xds).expect("CString::new failed"); + self.xds_str = c_string.into_raw(); + self.xds_len = xds.len(); + } + + pub fn free_xds_str(&mut self) { + if !self.xds_str.is_null() { + unsafe { + let _ = CString::from_raw(self.xds_str); + } + self.xds_str = std::ptr::null_mut(); + self.xds_len = 0; + } + } + } + + impl Drop for Eia608Screen { + fn drop(&mut self) { + self.free_xds_str(); + } + } + + pub type SubDataType = std::os::raw::c_uint; + pub type LLONG = i64; + pub const SUBTYPE_CC_BITMAP: Subtype = 0; + pub const SUBTYPE_CC_608: Subtype = 1; + pub const SUBTYPE_CC_TEXT: Subtype = 2; + pub const SUBTYPE_CC_RAW: Subtype = 3; + pub type Subtype = std::os::raw::c_uint; + + pub type CcxEncodingType = std::os::raw::c_uint; + + pub struct CcSubtitle { + #[doc = " A generic data which contain data according to decoder\n @warn decoder cant output multiple types of data"] + pub data: *mut std::os::raw::c_void, + pub datatype: SubDataType, + #[doc = " number of data"] + pub nb_data: std::os::raw::c_uint, + #[doc = " type of subtitle"] + pub type_: Subtype, + #[doc = " Encoding type of Text, must be ignored in case of subtype as bitmap or cc_screen"] + pub enc_type: CcxEncodingType, + pub start_time: LLONG, + pub end_time: LLONG, + pub flags: c_int, + pub lang_index: c_int, + #[doc = " flag to tell that decoder has given output"] + pub got_output: c_int, + pub mode: [c_char; 5usize], + pub info: [c_char; 4usize], + #[doc = " Used for DVB end time in ms"] + pub time_out: c_int, + pub next: *mut CcSubtitle, + pub prev: *mut CcSubtitle, + } + + #[repr(C)] + #[derive(Debug)] + pub enum CcxShareStatus { + Ok = 0, + Fail, + } + impl PartialEq for CcxShareStatus { + fn eq(&self, other: &Self) -> bool { + matches!( + (self, other), + (CcxShareStatus::Ok, CcxShareStatus::Ok) + | (CcxShareStatus::Fail, CcxShareStatus::Fail) + ) + } + } + pub struct CcxShareServiceCtx { + counter: i64, + stream_name: Option, + nn_sock: c_int, + nn_binder: c_int, + } + + pub struct CcxSubEntries { + pub messages: Vec, + } + impl CcxShareServiceCtx { + fn new() -> Self { + CcxShareServiceCtx { + counter: 0, + stream_name: None, + nn_sock: 0, + nn_binder: 0, + } + } + } + + lazy_static! { + pub static ref CCX_SHARE_CTX: Mutex = + Mutex::new(CcxShareServiceCtx::new()); + } + + pub fn ccxr_sub_entry_msg_init(msg: &mut CcxSubEntryMessage) { + msg.eos = 0; + msg.stream_name = "".parse().unwrap(); + msg.counter = 0; + msg.start_time = 0; + msg.end_time = 0; + msg.lines.clear(); + } + + pub fn ccxr_sub_entry_msg_cleanup(msg: &mut CcxSubEntryMessage) { + msg.lines.clear(); + msg.stream_name = "".parse().unwrap(); + } + + pub fn ccxr_sub_entry_msg_print(msg: &CcxSubEntryMessage) { + if msg.lines.is_empty() { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] no lines allocated"); + return; + } + + debug!(msg_type = DebugMessageFlag::SHARE; "\n[share] sub msg #{}", msg.counter); + if !msg.stream_name.is_empty() { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] name: {}", msg.stream_name); + } else { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] name: None"); + } + debug!(msg_type = DebugMessageFlag::SHARE; "[share] start: {}", msg.start_time); + debug!(msg_type = DebugMessageFlag::SHARE; "[share] end: {}", msg.end_time); + debug!(msg_type = DebugMessageFlag::SHARE; "[share] lines count: {}", msg.lines.len()); + + if msg.lines.is_empty() { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] no lines allocated"); + return; + } + for (i, line) in msg.lines.iter().enumerate() { + if !line.is_empty() { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] line[{}]: {}", i, line); + } else { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] line[{}] is not allocated", i); + } + } + } + + pub fn ccxr_sub_entries_cleanup(entries: &mut CcxSubEntries) { + entries.messages.clear(); + } + + pub fn ccxr_sub_entries_print(entries: &CcxSubEntries) { + eprintln!( + "[share] ccxr_sub_entries_print ({} entries)", + entries.messages.len() + ); + for message in &entries.messages { + ccxr_sub_entry_msg_print(message); + } + } + /// # Safety + /// This function is unsafe as it calls unsafe functions like nn_socket and nn_bind. + pub unsafe fn ccxr_share_start(stream_name: Option<&str>) -> CcxShareStatus { + let mut ccx_options = CCX_OPTIONS.lock().unwrap(); + + // Debug print similar to dbg_print in C + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_start: starting service\n"); + + // Create a nanomsg socket with domain AF_SP and protocol NN_PUB + let nn_sock = nn_socket(AF_SP, NN_PUB); + if nn_sock < 0 { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_start: can't nn_socket()\n"); + return CcxShareStatus::Fail; + } + CCX_SHARE_CTX.lock().unwrap().nn_sock = nn_sock; + // Set a default URL if one was not already provided. + if ccx_options.sharing_url.is_none() { + ccx_options.sharing_url = Some("tcp://*:3269".to_string().parse().unwrap()); + } + + // Convert the sharing URL into a C-compatible string. + let url = ccx_options.sharing_url.as_ref().unwrap(); + let sharing_url_cstr = CString::new(url.as_str()).expect("Failed to create CString"); + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_start: url={}", ccx_options.sharing_url.as_mut().unwrap()); + + // Bind the socket to the URL. + let nn_binder = nn_bind(nn_sock, sharing_url_cstr.as_ptr()); + if nn_binder < 0 { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_start: can't nn_bind()"); + return CcxShareStatus::Fail; + } + CCX_SHARE_CTX.lock().unwrap().nn_binder = nn_binder; + + // Set the linger socket option to -1. + let linger: i32 = -1; + let ret = nn_setsockopt( + nn_sock, + NN_SOL_SOCKET, + NN_LINGER, + &linger as *const _ as *const c_void, + size_of::(), + ); + if ret < 0 { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_start: can't nn_setsockopt()"); + return CcxShareStatus::Fail; + } + + // Save the stream name into the context, defaulting to "unknown" if not provided. + CCX_SHARE_CTX.lock().unwrap().stream_name = + Some(stream_name.unwrap_or("unknown").to_string()); + + // Sleep for 1 second to allow subscribers to connect. + thread::sleep(Duration::from_secs(1)); + + CcxShareStatus::Ok + } + /// # Safety + /// This function is unsafe as it calls unsafe functions like nn_shutdown. + pub unsafe fn ccxr_share_stop() -> CcxShareStatus { + let mut ctx = CCX_SHARE_CTX.lock().unwrap(); + + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_stop: stopping service"); + + nn_shutdown( + CCX_SHARE_CTX.lock().unwrap().nn_sock, + CCX_SHARE_CTX.lock().unwrap().nn_binder, + ); + ctx.stream_name = None; + CcxShareStatus::Ok + } + /// # Safety + /// This function is unsafe as it calls unsafe functions like _ccxr_share_send. + pub unsafe fn ccxr_share_send(sub: *mut CcSubtitle) -> CcxShareStatus { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_send: sending"); + + // Create an entries structure and populate it from the subtitle. + let mut entries = CcxSubEntries { + messages: Vec::new(), + }; + if ccxr_share_sub_to_entries(&*sub, &mut entries) == CcxShareStatus::Fail { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] failed to convert subtitle to entries"); + return CcxShareStatus::Fail; + } + + // Debug print of entries. + ccxr_sub_entries_print(&entries); + debug!(msg_type = DebugMessageFlag::SHARE; "[share] entry obtained:"); + + // Iterate over all entries and send them. + for (i, message) in entries.messages.iter().enumerate() { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_send: sending entry {}", i); + if entries.messages[i].lines.is_empty() { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] skipping empty message"); + continue; + } + if _ccxr_share_send(message) != CcxShareStatus::Ok { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] can't send message"); + return CcxShareStatus::Fail; + } + } + + ccxr_sub_entries_cleanup(&mut entries); + CcxShareStatus::Ok + } + + pub fn ccxr_sub_entry_message_get_packed_size(message: &CcxSubEntryMessage) -> usize { + message.encoded_len() + } + + pub fn ccxr_sub_entry_message_pack( + message: &CcxSubEntryMessage, + buf: &mut Vec, + ) -> Result<(), prost::EncodeError> { + message.encode(buf) + } + + /// # Safety + /// This function is unsafe as it calls unsafe functions like nn_send + pub unsafe fn _ccxr_share_send(msg: &CcxSubEntryMessage) -> CcxShareStatus { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_send"); + + let len: usize = ccxr_sub_entry_message_get_packed_size(msg); + + // Allocate a buffer to hold the packed message. + let mut buf = Vec::with_capacity(len); + if buf.is_empty() { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_send: malloc failed"); + return CcxShareStatus::Fail; + } + + debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_send: packing"); + ccxr_sub_entry_message_pack(msg, &mut buf).expect("Failed to pack message"); + + debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_send: sending"); + let sent: c_int = nn_send( + CCX_SHARE_CTX.lock().unwrap().nn_sock, + buf.as_ptr() as *const c_void, + len, + 0, + ); + if sent != len as c_int { + buf.clear(); + debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_send: len={} sent={}", len, sent); + return CcxShareStatus::Fail; + } + buf.clear(); + debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_send: sent"); + CcxShareStatus::Ok + } + + /// # Safety + /// This function is unsafe as it calls unsafe functions like _ccxr_share_send + pub unsafe fn ccxr_share_stream_done(stream_name: &str) -> CcxShareStatus { + let mut msg = CcxSubEntryMessage { + eos: 1, + stream_name: stream_name.parse().unwrap(), + counter: 0, + start_time: 0, + end_time: 0, + lines: Vec::new(), + }; + #[allow(unused)] + let mut ctx = CCX_SHARE_CTX.lock().unwrap(); + + if _ccxr_share_send(&msg) != CcxShareStatus::Ok { + ccxr_sub_entry_msg_cleanup(&mut msg); + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_stream_done: can't send message"); + return CcxShareStatus::Fail; + } + debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_stream_done: message sent successfully"); + CcxShareStatus::Ok + } + + pub fn ccxr_share_sub_to_entries( + sub: &CcSubtitle, + entries: &mut CcxSubEntries, + ) -> CcxShareStatus { + unsafe { + let mut ctx = CCX_SHARE_CTX.lock().unwrap(); + + debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_sub_to_entries"); + + if sub.type_ == SUBTYPE_CC_608 { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] CC_608"); + + let data_ptr = sub.data as *const Eia608Screen; + let mut nb_data = sub.nb_data; + + while nb_data > 0 { + let data = &*data_ptr.add(sub.nb_data as usize - nb_data as usize); + + debug!(msg_type = DebugMessageFlag::SHARE; "[share] data item"); + + if data.format == CcxEia608Format::SFormatXds { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] XDS. Skipping"); + nb_data -= 1; + continue; + } + + if data.start_time == 0 { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] No start time. Skipping"); + break; + } + + entries.messages.push(CcxSubEntryMessage { + eos: 0, + stream_name: String::new(), + counter: ctx.counter + 1, + start_time: data.start_time, + end_time: data.end_time, + lines: Vec::new(), + }); + + let entry_index = entries.messages.len() - 1; + let entry = &mut entries.messages[entry_index]; + + for row in 0..CCX_DECODER_608_SCREEN_ROWS { + if data.row_used[row] != 0 { + let characters = CStr::from_ptr(data.characters[row].as_ptr()) + .to_string_lossy() + .to_string(); + entry.lines.push(characters); + } + } + + if entry.lines.is_empty() { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] buffer is empty"); + entries.messages.pop(); + return CcxShareStatus::Ok; + } + + debug!( + msg_type = DebugMessageFlag::SHARE; + "[share] Copied {} lines", entry.lines.len() + ); + + ctx.counter += 1; + nb_data -= 1; + + debug!(msg_type = DebugMessageFlag::SHARE; "[share] item done"); + } + } else { + match sub.type_ { + SUBTYPE_CC_BITMAP => { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] CC_BITMAP. Skipping"); + } + SUBTYPE_CC_RAW => { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] CC_RAW. Skipping"); + } + SUBTYPE_CC_TEXT => { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] CC_TEXT. Skipping"); + } + _ => { + debug!(msg_type = DebugMessageFlag::SHARE; "[share] Unknown subtitle type"); + } + } + } + + debug!(msg_type = DebugMessageFlag::SHARE; "[share] done"); + CcxShareStatus::Ok + } + } +} diff --git a/src/rust/lib_ccxr/src/share/mod.rs b/src/rust/lib_ccxr/src/share/mod.rs index 8938af992..d981ff34c 100644 --- a/src/rust/lib_ccxr/src/share/mod.rs +++ b/src/rust/lib_ccxr/src/share/mod.rs @@ -1,3 +1,3 @@ -pub mod share; pub mod ccxr_sub_entry_message; -mod tests; +pub mod functions; +pub mod tests; diff --git a/src/rust/lib_ccxr/src/share/share.rs b/src/rust/lib_ccxr/src/share/share.rs deleted file mode 100644 index 31c4138fa..000000000 --- a/src/rust/lib_ccxr/src/share/share.rs +++ /dev/null @@ -1,516 +0,0 @@ -use crate::common::Options; -use crate::share::ccxr_sub_entry_message::CcxSubEntryMessage; -use crate::util::log::{debug, DebugMessageFlag}; -use lazy_static::lazy_static; -use libc::size_t; -use nanomsg_sys::{nn_bind, nn_send, nn_setsockopt, nn_shutdown, nn_socket, AF_SP, NN_LINGER, NN_PUB, NN_SOL_SOCKET}; -use prost::Message; -use std::cmp::PartialEq; -use std::ffi::c_void; -use std::os::raw::{c_char, c_int}; -use std::sync::{LazyLock, Mutex}; -use std::{ffi::{CStr, CString}, thread, time::Duration}; -// use crate::bindings::{cc_subtitle, ccx_output_format}; - -pub const CCX_DECODER_608_SCREEN_ROWS: usize = 15; -pub const CCX_DECODER_608_SCREEN_WIDTH: usize = 32; -pub static CCX_OPTIONS: LazyLock> = LazyLock::new(|| Mutex::new(Options::default())); - -#[repr(C)] -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum CcxEia608Format { - SFormatCcScreen = 0, - SFormatCcLine = 1, - SFormatXds = 2, -} - -#[repr(C)] -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum CcModes { - ModePopOn = 0, - ModeRollUp2 = 1, - ModeRollUp3 = 2, - ModeRollUp4 = 3, - ModeText = 4, - ModePaintOn = 5, - ModeFakeRollUp1 = 100, -} - - -#[repr(C)] -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum CcxDecoder608ColorCode { - White = 0, - Green = 1, - Blue = 2, - Cyan = 3, - Red = 4, - Yellow = 5, - Magenta = 6, - UserDefined = 7, - Black = 8, - Transparent = 9, - Max = 10, -} - -#[repr(C)] -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum FontBits { - Normal = 0, - Italics = 1, - Underline = 2, - UnderlineItalics = 3, -} - - -#[repr(C)] -#[derive(Debug, Clone)] -pub struct Eia608Screen { - pub format: CcxEia608Format, - pub characters: [[c_char; CCX_DECODER_608_SCREEN_WIDTH + 1]; CCX_DECODER_608_SCREEN_ROWS], - pub colors: [[CcxDecoder608ColorCode; CCX_DECODER_608_SCREEN_WIDTH + 1]; CCX_DECODER_608_SCREEN_ROWS], - pub fonts: [[FontBits; CCX_DECODER_608_SCREEN_WIDTH + 1]; CCX_DECODER_608_SCREEN_ROWS], - pub row_used: [c_int; CCX_DECODER_608_SCREEN_ROWS], - pub empty: c_int, - pub start_time: i64, - pub end_time: i64, - pub mode: CcModes, - pub channel: c_int, - pub my_field: c_int, - pub xds_str: *mut c_char, - pub xds_len: usize, - pub cur_xds_packet_class: c_int, -} - -impl Default for Eia608Screen { - fn default() -> Self { - Self { - format: CcxEia608Format::SFormatCcScreen, - characters: [[0; CCX_DECODER_608_SCREEN_WIDTH + 1]; CCX_DECODER_608_SCREEN_ROWS], - colors: [[CcxDecoder608ColorCode::Black; CCX_DECODER_608_SCREEN_WIDTH + 1]; CCX_DECODER_608_SCREEN_ROWS], - fonts: [[FontBits::Normal; CCX_DECODER_608_SCREEN_WIDTH + 1]; CCX_DECODER_608_SCREEN_ROWS], - row_used: [0; CCX_DECODER_608_SCREEN_ROWS], - empty: 1, - start_time: 0, - end_time: 0, - mode: CcModes::ModePopOn, - channel: 0, - my_field: 0, - xds_str: std::ptr::null_mut(), - xds_len: 0, - cur_xds_packet_class: 0, - } - } -} - -impl Eia608Screen { - pub fn new() -> Self { - Self::default() - } - - pub fn set_xds_str(&mut self, xds: &str) { - let c_string = CString::new(xds).expect("CString::new failed"); - self.xds_str = c_string.into_raw(); - self.xds_len = xds.len(); - } - - pub fn free_xds_str(&mut self) { - if !self.xds_str.is_null() { - unsafe { - let _ = CString::from_raw(self.xds_str); - } - self.xds_str = std::ptr::null_mut(); - self.xds_len = 0; - } - } -} - -impl Drop for Eia608Screen { - fn drop(&mut self) { - self.free_xds_str(); - } -} - - -pub type SubDataType = std::os::raw::c_uint; -pub type LLONG = i64; -pub const SUBTYPE_CC_BITMAP: Subtype = 0; -pub const SUBTYPE_CC_608: Subtype = 1; -pub const SUBTYPE_CC_TEXT: Subtype = 2; -pub const SUBTYPE_CC_RAW: Subtype = 3; -pub type Subtype = std::os::raw::c_uint; - -pub type CcxEncodingType = std::os::raw::c_uint; - -pub struct CcSubtitle { - #[doc = " A generic data which contain data according to decoder\n @warn decoder cant output multiple types of data"] - pub data: *mut std::os::raw::c_void, - pub datatype: SubDataType, - #[doc = " number of data"] - pub nb_data: std::os::raw::c_uint, - #[doc = " type of subtitle"] - pub type_: Subtype, - #[doc = " Encoding type of Text, must be ignored in case of subtype as bitmap or cc_screen"] - pub enc_type: CcxEncodingType, - pub start_time: LLONG, - pub end_time: LLONG, - pub flags: c_int, - pub lang_index: c_int, - #[doc = " flag to tell that decoder has given output"] - pub got_output: c_int, - pub mode: [c_char; 5usize], - pub info: [c_char; 4usize], - #[doc = " Used for DVB end time in ms"] - pub time_out: c_int, - pub next: *mut CcSubtitle, - pub prev: *mut CcSubtitle, -} - -#[repr(C)] -#[derive(Debug)] -pub enum CcxShareStatus { - Ok = 0, - Fail, -} -impl PartialEq for CcxShareStatus { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - (CcxShareStatus::Ok, CcxShareStatus::Ok) => true, - (CcxShareStatus::Fail, CcxShareStatus::Fail) => true, - _ => false, - } - } -} -pub struct CcxShareServiceCtx { - counter: i64, - stream_name: Option, - nn_sock: libc::c_int, - nn_binder: libc::c_int, -} - -pub struct CcxSubEntries { - pub messages: Vec, -} -impl CcxShareServiceCtx { - fn new() -> Self { - CcxShareServiceCtx { - counter: 0, - stream_name: None, - nn_sock: 0, - nn_binder: 0, - } - } -} - - -lazy_static! { - pub static ref CCX_SHARE_CTX: Mutex = Mutex::new(CcxShareServiceCtx::new()); -} - - - -pub fn ccxr_sub_entry_msg_init(msg: &mut CcxSubEntryMessage) { - msg.eos = 0; - msg.stream_name = "".parse().unwrap(); - msg.counter = 0; - msg.start_time = 0; - msg.end_time = 0; - msg.lines.clear(); -} - - -pub fn ccxr_sub_entry_msg_cleanup(msg: &mut CcxSubEntryMessage) { - msg.lines.clear(); - msg.stream_name = "".parse().unwrap(); -} - -pub fn ccxr_sub_entry_msg_print(msg: &CcxSubEntryMessage) { - if msg.lines.is_empty() { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] no lines allocated"); - return; - } - - debug!(msg_type = DebugMessageFlag::SHARE; "\n[share] sub msg #{}", msg.counter); - if !msg.stream_name.is_empty() { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] name: {}", msg.stream_name); - } else { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] name: None"); - } - debug!(msg_type = DebugMessageFlag::SHARE; "[share] start: {}", msg.start_time); - debug!(msg_type = DebugMessageFlag::SHARE; "[share] end: {}", msg.end_time); - debug!(msg_type = DebugMessageFlag::SHARE; "[share] lines count: {}", msg.lines.len()); - - if msg.lines.is_empty() { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] no lines allocated"); - return; - } - for (i, line) in msg.lines.iter().enumerate() { - if !line.is_empty() { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] line[{}]: {}", i, line); - } else { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] line[{}] is not allocated", i); - } - } -} - - -pub fn ccxr_sub_entries_cleanup(entries: &mut CcxSubEntries) { - entries.messages.clear(); -} - -pub fn ccxr_sub_entries_print(entries: &CcxSubEntries) { - eprintln!("[share] ccxr_sub_entries_print ({} entries)", entries.messages.len()); - for message in &entries.messages { - ccxr_sub_entry_msg_print(message); - } -} - -pub unsafe fn ccxr_share_start(stream_name: Option<&str>) -> CcxShareStatus { - let mut ccx_options = CCX_OPTIONS.lock().unwrap(); - - // Debug print similar to dbg_print in C - debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_start: starting service\n"); - - // Create a nanomsg socket with domain AF_SP and protocol NN_PUB - let nn_sock = nn_socket(AF_SP, NN_PUB); - if nn_sock < 0 { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_start: can't nn_socket()\n"); - return CcxShareStatus::Fail; - } - CCX_SHARE_CTX.lock().unwrap().nn_sock = nn_sock; - // Set a default URL if one was not already provided. - if ccx_options.sharing_url.is_none() { - ccx_options.sharing_url = Some("tcp://*:3269".to_string().parse().unwrap()); - } - - // Convert the sharing URL into a C-compatible string. - let url = ccx_options.sharing_url.as_ref().unwrap(); - let sharing_url_cstr = CString::new(url.as_str()).expect("Failed to create CString"); - debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_start: url={}", ccx_options.sharing_url.as_mut().unwrap()); - - // Bind the socket to the URL. - let nn_binder = nn_bind(nn_sock, sharing_url_cstr.as_ptr()); - if nn_binder < 0 { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_start: can't nn_bind()"); - return CcxShareStatus::Fail; - } - CCX_SHARE_CTX.lock().unwrap().nn_binder = nn_binder; - - // Set the linger socket option to -1. - let linger: i32 = -1; - let ret = nn_setsockopt( - nn_sock, - NN_SOL_SOCKET, - NN_LINGER, - &linger as *const _ as *const c_void, - std::mem::size_of::() as size_t, - ); - if ret < 0 { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_start: can't nn_setsockopt()"); - return CcxShareStatus::Fail; - } - - // Save the stream name into the context, defaulting to "unknown" if not provided. - CCX_SHARE_CTX.lock().unwrap().stream_name = Some(stream_name.unwrap_or("unknown").to_string()); - - // Sleep for 1 second to allow subscribers to connect. - thread::sleep(Duration::from_secs(1)); - - CcxShareStatus::Ok -} - - -pub unsafe fn ccxr_share_stop() -> CcxShareStatus { - let mut ctx = CCX_SHARE_CTX.lock().unwrap(); - - debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_stop: stopping service"); - - - nn_shutdown(CCX_SHARE_CTX.lock().unwrap().nn_sock, CCX_SHARE_CTX.lock().unwrap().nn_binder); - ctx.stream_name = None; - CcxShareStatus::Ok -} - - -pub unsafe fn ccxr_share_send(sub: *mut CcSubtitle) -> CcxShareStatus { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_send: sending"); - - // Create an entries structure and populate it from the subtitle. - let mut entries = CcxSubEntries { messages: Vec::new() }; - if ccxr_share_sub_to_entries( &*sub , &mut entries) == CcxShareStatus::Fail { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] failed to convert subtitle to entries"); - return CcxShareStatus::Fail; - } - - // Debug print of entries. - ccxr_sub_entries_print(&entries); - debug!(msg_type = DebugMessageFlag::SHARE; "[share] entry obtained:"); - - // Iterate over all entries and send them. - for (i, message) in entries.messages.iter().enumerate() { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_send: sending entry {}", i); - if entries.messages[i].lines.is_empty() { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] skipping empty message"); - continue; - } - if _ccxr_share_send(message) != CcxShareStatus::Ok { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] can't send message"); - return CcxShareStatus::Fail; - } - } - - ccxr_sub_entries_cleanup(&mut entries); - CcxShareStatus::Ok -} - -pub fn ccxr_sub_entry_message_get_packed_size(message: &CcxSubEntryMessage) -> usize { - message.encoded_len() -} - -pub fn ccxr_sub_entry_message_pack(message: &CcxSubEntryMessage, buf: &mut Vec) -> Result<(), prost::EncodeError> { - message.encode(buf) -} - - - -pub unsafe fn _ccxr_share_send(msg: &CcxSubEntryMessage) -> CcxShareStatus { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_send"); - - let len: usize = ccxr_sub_entry_message_get_packed_size(msg); - - // Allocate a buffer to hold the packed message. - let mut buf = Vec::with_capacity(len); - if buf.is_empty() { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_send: malloc failed"); - return CcxShareStatus::Fail; - } - - debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_send: packing"); - ccxr_sub_entry_message_pack(msg, &mut buf).expect( - "Failed to pack message", - ); - - debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_send: sending"); - let sent: c_int = nn_send( - CCX_SHARE_CTX.lock().unwrap().nn_sock, - buf.as_ptr() as *const c_void, - len, - 0, - ); - if sent != len as c_int { - buf.clear(); - debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_send: len={} sent={}", len, sent); - return CcxShareStatus::Fail; - } - buf.clear(); - debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_send: sent"); - CcxShareStatus::Ok -} - -pub unsafe fn ccxr_share_stream_done(stream_name: &str) -> CcxShareStatus { - let mut msg = CcxSubEntryMessage { - eos: 1, - stream_name: stream_name.parse().unwrap(), - counter: 0, - start_time: 0, - end_time: 0, - lines: Vec::new(), - }; - #[allow(unused)] - let mut ctx = CCX_SHARE_CTX.lock().unwrap(); - - if _ccxr_share_send(&msg) != CcxShareStatus::Ok { - ccxr_sub_entry_msg_cleanup(&mut msg); - debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_stream_done: can't send message"); - return CcxShareStatus::Fail; - } - debug!(msg_type = DebugMessageFlag::SHARE; "[share] ccx_share_stream_done: message sent successfully"); - CcxShareStatus::Ok -} - -pub fn ccxr_share_sub_to_entries(sub: &CcSubtitle, entries: &mut CcxSubEntries) -> CcxShareStatus { - unsafe { - let mut ctx = CCX_SHARE_CTX.lock().unwrap(); - - debug!(msg_type = DebugMessageFlag::SHARE; "[share] _ccx_share_sub_to_entries"); - - if sub.type_ == SUBTYPE_CC_608 { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] CC_608"); - - let data_ptr = sub.data as *const Eia608Screen; - let mut nb_data = sub.nb_data; - - while nb_data > 0 { - let data = &*data_ptr.add(sub.nb_data as usize - nb_data as usize); - - debug!(msg_type = DebugMessageFlag::SHARE; "[share] data item"); - - if data.format == CcxEia608Format::SFormatXds { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] XDS. Skipping"); - nb_data -= 1; - continue; - } - - if data.start_time == 0 { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] No start time. Skipping"); - break; - } - - entries.messages.push(CcxSubEntryMessage { - eos: 0, - stream_name: String::new(), - counter: ctx.counter + 1, - start_time: data.start_time, - end_time: data.end_time, - lines: Vec::new(), - }); - - let entry_index = entries.messages.len() - 1; - let entry = &mut entries.messages[entry_index]; - - for row in 0..CCX_DECODER_608_SCREEN_ROWS { - if data.row_used[row] != 0 { - let characters = CStr::from_ptr(data.characters[row].as_ptr()) - .to_string_lossy() - .to_string(); - entry.lines.push(characters); - } - } - - if entry.lines.is_empty() { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] buffer is empty"); - entries.messages.pop(); - return CcxShareStatus::Ok; - } - - debug!( - msg_type = DebugMessageFlag::SHARE; - "[share] Copied {} lines", entry.lines.len() - ); - - ctx.counter += 1; - nb_data -= 1; - - debug!(msg_type = DebugMessageFlag::SHARE; "[share] item done"); - } - } else { - match sub.type_ { - SUBTYPE_CC_BITMAP => { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] CC_BITMAP. Skipping"); - } - SUBTYPE_CC_RAW => { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] CC_RAW. Skipping"); - } - SUBTYPE_CC_TEXT => { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] CC_TEXT. Skipping"); - } - _ => { - debug!(msg_type = DebugMessageFlag::SHARE; "[share] Unknown subtitle type"); - } - } - } - - debug!(msg_type = DebugMessageFlag::SHARE; "[share] done"); - CcxShareStatus::Ok - } -} diff --git a/src/rust/lib_ccxr/src/share/tests.rs b/src/rust/lib_ccxr/src/share/tests.rs index b9c2c712e..ecb845222 100644 --- a/src/rust/lib_ccxr/src/share/tests.rs +++ b/src/rust/lib_ccxr/src/share/tests.rs @@ -1,13 +1,15 @@ #![allow(unused_imports)] #![allow(unused)] -use libc::c_char; -use crate::share::share::*; -use crate::share::ccxr_sub_entry_message::*; -use std::sync::Once; -use crate::util::log::{set_logger, CCExtractorLogger, DebugMessageFlag, DebugMessageMask, OutputTarget}; +#[cfg(feature = "enable_sharing")] mod test { - use super::*; + use crate::share::ccxr_sub_entry_message::*; + use crate::share::functions::sharing::*; + use crate::util::log::{ + set_logger, CCExtractorLogger, DebugMessageFlag, DebugMessageMask, OutputTarget, + }; + use std::os::raw::c_char; + use std::sync::Once; static INIT: Once = Once::new(); @@ -18,7 +20,7 @@ mod test { DebugMessageMask::new(DebugMessageFlag::VERBOSE, DebugMessageFlag::VERBOSE), false, )) - .ok(); + .ok(); }); } #[test] @@ -48,11 +50,7 @@ mod test { counter: 0, start_time: 0, end_time: 0, - lines: vec![ - "test".to_string(), - "test".to_string(), - "test".to_string(), - ], + lines: vec!["test".to_string(), "test".to_string(), "test".to_string()], }; unsafe { @@ -117,7 +115,6 @@ mod test { } } - #[test] fn test_ccxr_share_send() { initialize_logger(); @@ -138,7 +135,6 @@ mod test { time_out: 0, next: std::ptr::null_mut(), prev: std::ptr::null_mut(), - }; let status = unsafe { ccxr_share_send(&mut sub as *mut CcSubtitle) }; assert!(matches!(status, CcxShareStatus::Ok | CcxShareStatus::Fail)); @@ -160,7 +156,6 @@ mod test { assert!(matches!(status, CcxShareStatus::Ok | CcxShareStatus::Fail)); } - const CCX_DECODER_608_SCREEN_WIDTH: usize = 32; const CCX_DECODER_608_SCREEN_ROWS: usize = 15; @@ -204,25 +199,50 @@ mod test { flags: 0, lang_index: 0, got_output: 1, - mode: [b'M' as c_char, b'O' as c_char, b'D' as c_char, b'E' as c_char, 0], + mode: [ + b'M' as c_char, + b'O' as c_char, + b'D' as c_char, + b'E' as c_char, + 0, + ], info: [b'I' as c_char, b'N' as c_char, b'F' as c_char, 0], time_out: 0, next: std::ptr::null_mut(), prev: std::ptr::null_mut(), }; - let mut entries = CcxSubEntries { messages: Vec::new() }; + let mut entries = CcxSubEntries { + messages: Vec::new(), + }; let status = ccxr_share_sub_to_entries(&sub, &mut entries); - assert_eq!(status, CcxShareStatus::Ok, "Function should return OK status"); - assert_eq!(entries.messages.len(), 1, "There should be one entry in messages"); + assert_eq!( + status, + CcxShareStatus::Ok, + "Function should return OK status" + ); + assert_eq!( + entries.messages.len(), + 1, + "There should be one entry in messages" + ); let message = &entries.messages[0]; assert_eq!(message.start_time, 1000, "Start time should match input"); assert_eq!(message.end_time, 2000, "End time should match input"); assert_eq!(message.lines.len(), 3, "There should be 3 lines of content"); - assert_eq!(message.lines[0], "Hello, World!", "First line content mismatch"); - assert_eq!(message.lines[1], "Subtitle line 2", "Second line content mismatch"); - assert_eq!(message.lines[2], "Subtitle line 3", "Third line content mismatch"); + assert_eq!( + message.lines[0], "Hello, World!", + "First line content mismatch" + ); + assert_eq!( + message.lines[1], "Subtitle line 2", + "Second line content mismatch" + ); + assert_eq!( + message.lines[2], "Subtitle line 3", + "Third line content mismatch" + ); } #[test] @@ -243,18 +263,34 @@ mod test { flags: 0, lang_index: 0, got_output: 1, - mode: [b'M' as c_char, b'O' as c_char, b'D' as c_char, b'E' as c_char, 0], + mode: [ + b'M' as c_char, + b'O' as c_char, + b'D' as c_char, + b'E' as c_char, + 0, + ], info: [b'I' as c_char, b'N' as c_char, b'F' as c_char, 0], time_out: 0, next: std::ptr::null_mut(), prev: std::ptr::null_mut(), }; - let mut entries = CcxSubEntries { messages: Vec::new() }; + let mut entries = CcxSubEntries { + messages: Vec::new(), + }; let status = ccxr_share_sub_to_entries(&sub, &mut entries); - assert_eq!(status, CcxShareStatus::Ok, "Function should return OK status"); - assert_eq!(entries.messages.len(), 0, "There should be no messages for empty rows"); + assert_eq!( + status, + CcxShareStatus::Ok, + "Function should return OK status" + ); + assert_eq!( + entries.messages.len(), + 0, + "There should be no messages for empty rows" + ); } } diff --git a/src/rust/src/libccxr_exports/mod.rs b/src/rust/src/libccxr_exports/mod.rs index 19834235d..8e4740f12 100644 --- a/src/rust/src/libccxr_exports/mod.rs +++ b/src/rust/src/libccxr_exports/mod.rs @@ -1,8 +1,8 @@ //! Provides C-FFI functions that are direct equivalent of functions available in C. -pub mod time; #[cfg(feature = "enable_sharing")] pub mod share; +pub mod time; use crate::ccx_options; use lib_ccxr::util::log::*; use lib_ccxr::util::{bits::*, levenshtein::*}; diff --git a/src/rust/src/libccxr_exports/share.rs b/src/rust/src/libccxr_exports/share.rs index 50136f509..6213876a8 100644 --- a/src/rust/src/libccxr_exports/share.rs +++ b/src/rust/src/libccxr_exports/share.rs @@ -1,5 +1,5 @@ use lib_ccxr::share::ccxr_sub_entry_message::*; -use lib_ccxr::share::share::*; +use lib_ccxr::share::functions::sharing::*; use lib_ccxr::util::log::{debug, DebugMessageFlag}; use std::ffi::CStr; /// C-compatible function to clean up a `CcxSubEntryMessage`. @@ -28,7 +28,6 @@ pub unsafe extern "C" fn ccxr_sub_entry_msg_print_c(msg: *const CcxSubEntryMessa ccxr_sub_entry_msg_print(msg); } - #[no_mangle] pub unsafe extern "C" fn ccxr_sub_entries_cleanup_c(entries: *mut CcxSubEntries) { if entries.is_null() { @@ -50,7 +49,9 @@ pub unsafe extern "C" fn ccxr_sub_entries_print_c(entries: *const CcxSubEntries) /// C-compatible function to start the sharing service. #[no_mangle] -pub unsafe extern "C" fn ccxr_share_start_c(stream_name: *const libc::c_char) -> CcxShareStatus { +pub unsafe extern "C" fn ccxr_share_start_c( + stream_name: *const std::os::raw::c_char, +) -> CcxShareStatus { if stream_name.is_null() { return ccxr_share_start(Option::from("unknown")); } @@ -85,7 +86,9 @@ pub unsafe extern "C" fn ccxr_share_send_c(sub: *const CcSubtitle) -> CcxShareSt } #[no_mangle] -pub unsafe extern "C" fn ccxr_share_stream_done_c(stream_name: *const libc::c_char) -> CcxShareStatus { +pub unsafe extern "C" fn ccxr_share_stream_done_c( + stream_name: *const std::os::raw::c_char, +) -> CcxShareStatus { if stream_name.is_null() { return CcxShareStatus::Fail; } From 270bf658e834ab4dcf129d464aa944ba90b981aa Mon Sep 17 00:00:00 2001 From: Deepnarayan Sett Date: Sat, 3 May 2025 22:32:28 +0530 Subject: [PATCH 3/3] Share Module: Added Documentation --- src/rust/lib_ccxr/src/share/mod.rs | 59 ++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/src/rust/lib_ccxr/src/share/mod.rs b/src/rust/lib_ccxr/src/share/mod.rs index d981ff34c..f23d0d08d 100644 --- a/src/rust/lib_ccxr/src/share/mod.rs +++ b/src/rust/lib_ccxr/src/share/mod.rs @@ -1,3 +1,62 @@ +//! Module `share` broadcasts subtitle entries over a pub/sub message bus +//! (Nanomsg) and to interoperate with C code via C-compatible FFI bindings. +//! +//! # Overview +//! +//! - CcxSubEntryMessage Represents a single subtitle entry, including timing and text lines. +//! - CcxSubEntries is A collection of `CcxSubEntryMessage` instances generated from a subtitle frame. +//! - CcxShareStatus Enumeration of possible return statuses for share operations. +//! - Ccx_share_ctx Global context holding the Nanomsg socket and stream metadata. +//! +//! ## Features +//! +//! - Initialize and manage a Nanomsg PUB socket for broadcasting messages. +//! - Convert internal `CcSubtitle` frames into one or more [`CcxSubEntryMessage`] instances. +//! - Send packed protobuf messages over the socket, handling lifecycle of messages. +//! - Signal end-of-stream events to subscribers. +//! - Launch external translation processes via system calls. +//! +//! # C-Compatible API +//! +//! All `extern "C"` functions are safe to call from C code and mirror the underlying Rust logic. +//! +//! ## Message Cleanup and Printing +//!| C Function | Rust Binding | +//!|---------------------------------------|--------------------------------------| +//!| [`ccx_sub_entry_msg_cleanup`] | [`ccxr_sub_entry_msg_cleanup`] | +//!| [`ccx_sub_entry_msg_print`] | [`ccxr_sub_entry_msg_print`] | +//!| [`ccx_sub_entries_cleanup`] | [`ccxr_sub_entries_cleanup`] | +//!| [`ccx_sub_entries_print`] | [`ccxr_sub_entries_print`] | +//! +//! ## Service Control +//!| C Function | Rust Binding | +//!|----------------------|-------------------------------| +//!| [`ccx_share_start`] | [`ccxr_share_start`] | +//!| [`ccx_share_stop`] | [`ccxr_share_stop`] | +//! +//! ## Sending Subtitles +//!| C Function | Rust Binding | +//!|-----------------------------|-------------------------------| +//!| [`ccx_share_send`] | [`ccxr_share_send`] | +//!| [`_ccx_share_send`] | [`_ccxr_share_send`] | +//!| [`ccx_share_stream_done`] | [`ccxr_share_stream_done`] | +//!| [`_ccx_share_sub_to_entries`] | [`_ccxr_share_sub_to_entries`]| +//! +//! ## Translator Launch +//!| C Function | Description | +//!|-------------------------------|-------------------------------------------------| +//!| `ccx_share_launch_translator` | Spawn external `cctranslate` process for translation. +//! +//!# Nanomsg Options +//! +//! - Default URL: `tcp://*:3269` (configurable via `ccx_options.sharing_url`). +//! - Linger option set to infinite to ensure messages are delivered before shutdown. + +use crate::share::functions::sharing::{ + ccxr_sub_entries_cleanup, ccxr_sub_entries_print, ccxr_sub_entry_msg_cleanup, + ccxr_sub_entry_msg_print, +}; + pub mod ccxr_sub_entry_message; pub mod functions; pub mod tests;