speechAnalyzer 3.2.0 - PostgreSQL database implementation (#14)
* Added Vosk server to docker-compose file

* Added command line arguments for Vosk server

* Created framework for SpeechWrapperVosk class

* Updated WebsocketSession to support Vosk speech service

* Ensured the WebSocket session closes correctly

* Vosk output now handled and published from JsonBuilder

* Added uuid field to Vosk ASR messages

* Fixed bug that caused a crash at trial end

* Fixed formatting errors for ASR messages

* Updated logic to destringify features

* Resolved bug where the stream closed before the end of the trial

* Handled the case of the Vosk container crashing

* Updated version number

* Removed configuration file

* Added data.features and data.sentiment fields for ASR messages using the Vosk speech backend

* Updated version number to 3.1.1

* Created framework for Database class

* Implemented connection/disconnection to database as well as framework for publishing features

* Added support for postgres container and libpq

* Added connection properties for postgres db

* Added '.' to list of invalid characters in db column names

* Set participant_id for postgres class

* Timestamp and participant_id fields added to features table

* Created framework for retrieving features stored in the Postgres database

* Features now retrieved from postgres rather than stored in memory

* Turned off storing features in memory

* Postgres database can now be used across multiple trials without needing to reset agent

* Enabled Vosk speech engine for use with Postgres database

* Updated postgres docker image name

* Fixed client_id formatting for database connection

* Updated version number to 3.2.0

* Conforming code to style guidelines

Co-authored-by: Vincent Raymond <[email protected]>
vincentraymond-ua and Vincent Raymond authored Jan 7, 2022
1 parent e11fbd6 commit 06c4fc1
Showing 11 changed files with 462 additions and 209 deletions.
11 changes: 8 additions & 3 deletions CMakeLists.txt
@@ -2,7 +2,7 @@ project(speechAnalyzer)

cmake_minimum_required(VERSION 3.15)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

add_compile_definitions(BOOST_DATE_TIME_POSIX_TIME_STD_CONFIG)
@@ -24,7 +24,7 @@ find_package(nlohmann_json REQUIRED)
find_package(SMILEapi REQUIRED)
find_package(Mosquitto REQUIRED)
find_package(range-v3 REQUIRED)

find_package(PostgreSQL REQUIRED)

option(BUILD_GOOGLE_CLOUD_SPEECH_LIB "Build google_cloud_speech library" ON)

@@ -47,16 +47,21 @@ add_executable(speechAnalyzer
src/OpensmileSession.cpp
src/SpeechWrapper.cpp
src/SpeechWrapperVosk.cpp
src/DBWrapper.cpp
src/util.cpp
src/base64.c
)

target_include_directories(
speechAnalyzer PRIVATE
${PostgreSQL_INCLUDE_DIRS}
)
target_link_libraries(
speechAnalyzer
SMILEapi
nlohmann_json::nlohmann_json
${Mosquitto_LIBRARIES}
${Boost_LIBRARIES}
${PostgreSQL_LIBRARIES}
-pthread
google_cloud_speech
)
4 changes: 3 additions & 1 deletion Dockerfile
@@ -16,7 +16,9 @@ RUN apt-get update -y && apt-get upgrade -y && \
# nlohmann::json
apt-get install nlohmann-json3-dev -y && \
# Mosquitto
apt-get install mosquitto mosquitto-clients libmosquitto-dev -y
apt-get install mosquitto mosquitto-clients libmosquitto-dev -y && \
# PostgreSQL
apt-get install libpq-dev postgresql-server-dev-all -y

#Install protobuf and grpc
ENV GRPC_RELEASE_TAG v1.35.x
23 changes: 22 additions & 1 deletion docker-compose.yml
@@ -16,7 +16,7 @@ services:
asist_net:
aliases:
- speech_analyzer_agent
entrypoint: ./speechAnalyzer --mode websocket --disable_opensmile false --disable_asr_google true
entrypoint: ./speechAnalyzer --mode websocket --disable_opensmile false --disable_asr_google true --disable_asr_vosk false
vosk:
image: alphacep/kaldi-en:latest
container_name: vosk
@@ -35,6 +35,27 @@
- mmc
entrypoint: uvicorn mmc_server:app --host 0.0.0.0 --port 8001

db:
image: features_database:latest
restart: always
environment:
POSTGRES_PASSWORD: docker
ports:
- 63332:5432
networks:
asist_net:
aliases:
- features_db

adminer:
image: adminer
restart: always
ports:
- 63333:8080
networks:
asist_net:
aliases:
- features_adminer
heartbeat:
image: heartbeat:latest
container_name: heartbeat
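
For the compose file above: the db service publishes PostgreSQL on host port 63332 (mapped to 5432 inside the container), while services on asist_net reach it through the features_db alias. A minimal libpq sketch of a host-side connection, assuming the container's default postgres superuser, the POSTGRES_PASSWORD set above, and the features database that DBWrapper.h (below) defaults to:

#include <iostream>
#include <libpq-fe.h>

// Illustrative host-side connectivity check through the published port (63332 -> 5432).
int main() {
    PGconn* conn = PQconnectdb(
        "host=localhost port=63332 dbname=features user=postgres password=docker");
    if (PQstatus(conn) != CONNECTION_OK) {
        std::cout << "Connection error: " << PQerrorMessage(conn) << std::endl;
    }
    PQfinish(conn);
    return 0;
}
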
163 changes: 163 additions & 0 deletions src/DBWrapper.cpp
@@ -0,0 +1,163 @@
#include <iostream>
#include <iterator>
#include <map>
#include <sstream>
#include <stdlib.h>
#include <string>
#include <vector>

#include <boost/algorithm/string.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>

#include <libpq-fe.h>
#include <nlohmann/json.hpp>

#include "DBWrapper.h"
#include "GlobalMosquittoListener.h"

using namespace std;

const vector<char> DBWrapper::INVALID_COLUMN_CHARACTERS = {
'+', '-', '(', ')', '\n', '.'};

DBWrapper::DBWrapper() {}

DBWrapper::~DBWrapper() {}

void DBWrapper::initialize() {
// Create UUID for client
this->client_id =
boost::uuids::to_string(boost::uuids::random_generator()());

// Create connection string
this->connection_string = "host=" + this->host + " port=" + this->port +
" dbname=" + this->db + " user=" + this->user +
" password= " + this->pass;
}
void DBWrapper::shutdown() {}
void DBWrapper::publish_chunk(nlohmann::json message) {
PGconn* conn;
PGresult* result;

// Create connection
conn = PQconnectdb(this->connection_string.c_str());
if (PQstatus(conn) != CONNECTION_OK) {
std::cout << "Connection error: " << std::endl;
std::cout << PQerrorMessage(conn) << std::endl;
}

// Generate columns and values
vector<string> columns;
vector<double> values;
for (auto element : message["data"]["features"]["lld"].items()) {
columns.push_back(this->format_to_db_string(element.key()));
values.push_back(element.value());
}

// Get timestamp
this->timestamp = message["data"]["tmeta"]["time"];

// Convert columns to string format
ostringstream oss;
for (string element : columns) {
oss << element << ",";
}
oss << "participant, "
<< "timestamp, "
<< "client_id";
string column_string = oss.str();
oss.str("");

// Convert values to string format
for (double element : values) {
oss << to_string(element) << ",";
}
oss << message["data"]["participant_id"] << ","
<< message["data"]["tmeta"]["time"] << ","
<< "\'" << this->client_id << "\'";
string value_string = oss.str();

// Generate sql query
string query = "INSERT INTO features (" + column_string + ") VALUES (" +
value_string + ")";

boost::replace_all(query, "\"", "\'");
// Send query
result = PQexec(conn, query.c_str());
if (result == NULL) {
std::cout << "Execution error: " << std::endl;
std::cout << PQerrorMessage(conn) << std::endl;
}
// Clear result
PQclear(result);

// End connection
PQfinish(conn);
}

vector<nlohmann::json> DBWrapper::features_between(double start_time,
double end_time) {
PGconn* conn;
PGresult* result;

// Create connection
conn = PQconnectdb(this->connection_string.c_str());
if (PQstatus(conn) != CONNECTION_OK) {
std::cout << "Connection error: " << std::endl;
std::cout << PQerrorMessage(conn) << std::endl;
}

// Get features from database
std::string query =
"SELECT * FROM features WHERE timestamp >= " + to_string(start_time) +
" and timestamp <= " + to_string(end_time) + " and client_id=" + "\'" +
this->client_id + "\'";
result = PQexec(conn, query.c_str());
if (result == NULL) {
std::cout << "FAILURE" << std::endl;
std::cout << PQerrorMessage(conn) << std::endl;
}
// Turn features into json object
vector<nlohmann::json> out;
for (int i = 0; i < PQntuples(result); i++) {
nlohmann::json message;
for (int j = 0; j < PQnfields(result); j++) {
if (this->column_map.find(PQfname(result, j)) ==
this->column_map.end()) {
continue;
}
string field = this->column_map[PQfname(result, j)];
double value = atof(PQgetvalue(result, i, j));
message[field] = value;
}
out.push_back(message);
}

// Clear result
PQclear(result);

// End connection
PQfinish(conn);

return out;
}

string DBWrapper::format_to_db_string(std::string in) {
// Check if value already in map
if (this->column_map.find(in) != this->column_map.end()) {
return this->column_map[in];
}

string original = string(in);
boost::to_lower(in);
boost::replace_all(in, ")", "");
for (char c : this->INVALID_COLUMN_CHARACTERS) {
boost::replace_all(in, string(1, c), "_");
}
this->column_map[in] = original;
this->column_map[original] = in;

return in;
}
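
For comparison, a minimal sketch of the features_between lookup using libpq parameter binding (PQexecParams) rather than string concatenation; this is not part of the commit, it assumes a live PGconn* conn and the same features table, and the variable names are illustrative:

// Sketch only: parameterized form of the SELECT used in features_between().
std::string start = std::to_string(start_time);
std::string end = std::to_string(end_time);
const char* params[3] = {start.c_str(), end.c_str(), client_id.c_str()};
PGresult* res = PQexecParams(
    conn,
    "SELECT * FROM features WHERE timestamp >= $1 AND timestamp <= $2 "
    "AND client_id = $3",
    3,        // number of parameters
    nullptr,  // let the server infer parameter types
    params,
    nullptr,  // lengths not needed for text-format parameters
    nullptr,  // all parameters passed as text
    0);       // request text-format results
if (PQresultStatus(res) != PGRES_TUPLES_OK) {
    std::cout << "Execution error: " << PQerrorMessage(conn) << std::endl;
}
PQclear(res);
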
34 changes: 34 additions & 0 deletions src/DBWrapper.h
@@ -0,0 +1,34 @@
#include <libpq-fe.h>
#include <map>
#include <nlohmann/json.hpp>
#include <string>
#include <vector>

class DBWrapper {
public:
DBWrapper();
~DBWrapper();

void initialize();
void shutdown();
void publish_chunk(nlohmann::json message);
std::vector<nlohmann::json> features_between(double start_time,
double end_time);

std::string participant_id;
double timestamp;

private:
static const std::vector<char> INVALID_COLUMN_CHARACTERS;
std::map<std::string, std::string> column_map;
std::string format_to_db_string(std::string in);

std::string client_id;

std::string connection_string;
std::string user = "postgres";
std::string pass = "docker";
std::string db = "features";
std::string host = "features_db";
std::string port = "5432";
};
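
A minimal usage sketch of the class declared above, assumed to run on the asist_net network so the features_db host resolves; the participant id and openSMILE-style feature names are illustrative, and the message shape mirrors the fields publish_chunk() reads:

#include <vector>
#include <nlohmann/json.hpp>
#include "DBWrapper.h"

int main() {
    DBWrapper db;
    db.initialize();  // builds the connection string and a per-client UUID

    // Illustrative chunk; field paths match what publish_chunk() accesses
    nlohmann::json msg;
    msg["data"]["participant_id"] = "P000042";
    msg["data"]["tmeta"]["time"] = 12.34;
    msg["data"]["features"]["lld"]["F0final_sma"] = 118.2;
    msg["data"]["features"]["lld"]["pcm_RMSenergy_sma"] = 0.031;
    db.publish_chunk(msg);

    // Retrieve the rows this client wrote between 10 s and 20 s
    std::vector<nlohmann::json> rows = db.features_between(10.0, 20.0);

    db.shutdown();
    return 0;
}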