Skip to content

Commit

Permalink
Merge pull request #4 from devforth/agent-fix
Browse files Browse the repository at this point in the history
Agent fix, better dbs managing
  • Loading branch information
LbP22 committed Feb 28, 2024
2 parents 025b4ee + 0aab629 commit 82f473f
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 105 deletions.
4 changes: 0 additions & 4 deletions application/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ RUN npm run build
COPY . /code/

FROM alpine
RUN apk add bash curl
# tmp

COPY --from=frontbuilder /code/dist/ /backend/dist/

Expand All @@ -26,8 +24,6 @@ RUN go mod download \
&& go build -o main .

FROM alpine
RUN apk add bash curl
# tmp

COPY --from=frontbuilder /code/dist/ /dist/
COPY --from=backendbuilder /backend/main /backend/main
Expand Down
33 changes: 19 additions & 14 deletions application/backend/app/containerdb/containerdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"strings"
"time"

"github.com/devforth/OnLogs/app/util"
"github.com/devforth/OnLogs/app/vars"
Expand All @@ -28,6 +29,9 @@ func PutLogMessage(db *leveldb.DB, host string, container string, message_item [
panic("Host is not mentioned!")
}
location := host + "/" + container
if vars.Statuses_DBs[location] == nil {
vars.Statuses_DBs[location] = util.GetDB(host, container, "statuses")
}

if strings.Contains(message_item[1], "ERROR") || strings.Contains(message_item[1], "ERR") || // const statuses_errors = ["ERROR", "ERR", "Error", "Err"];
strings.Contains(message_item[1], "Error") || strings.Contains(message_item[1], "Err") {
Expand All @@ -49,21 +53,28 @@ func PutLogMessage(db *leveldb.DB, host string, container string, message_item [
} else if strings.Contains(message_item[1], "ONLOGS") {
vars.Counters_For_Containers_Last_30_Min[location]["meta"]++
vars.Statuses_DBs[location].Put([]byte(message_item[0]), []byte("meta"), nil)

} else {
vars.Counters_For_Containers_Last_30_Min[location]["other"]++
vars.Statuses_DBs[location].Put([]byte(message_item[0]), []byte("other"), nil)
}

return db.Put([]byte(message_item[0]), []byte(message_item[1]), nil)
err := db.Put([]byte(message_item[0]), []byte(message_item[1]), nil)
tries := 0
for err != nil && tries < 10 {
db = util.GetDB(host, container, "logs")
err = db.Put([]byte(message_item[0]), []byte(message_item[1]), nil)
time.Sleep(10 * time.Millisecond)
tries++
}
if err != nil {
panic(err)
}
return err
}

func GetLogsByStatus(host string, container string, message string, status string, limit int, startWith string, getPrev bool, include bool, caseSensetivity bool) [][]string {
logs_db := util.GetDB(host, container, "logs")
db := util.GetDB(host, container, "statuses")
if host != util.GetHost() || vars.ActiveDBs[container] == nil {
defer logs_db.Close()
}

iter := db.NewIterator(nil, nil)
defer iter.Release()
Expand Down Expand Up @@ -136,9 +147,6 @@ func GetLogsByStatus(host string, container string, message string, status strin

func GetLogs(getPrev bool, include bool, host string, container string, message string, limit int, startWith string, caseSensetivity bool) [][]string {
db := util.GetDB(host, container, "logs")
if host != util.GetHost() || vars.ActiveDBs[container] == nil {
defer db.Close()
}

iter := db.NewIterator(nil, nil)
defer iter.Release()
Expand Down Expand Up @@ -215,18 +223,15 @@ func DeleteContainer(host string, container string, fullDelete bool) {

if vars.ActiveDBs[container] != nil {
vars.ActiveDBs[container].Close()
newActiveDB, _ := leveldb.OpenFile("leveldb/hosts/"+host+"/containers/"+container+"/logs", nil)
vars.ActiveDBs[container] = newActiveDB
vars.ActiveDBs[container] = util.GetDB(host, container, "active")
}
if vars.Statuses_DBs[host+"/"+container] != nil {
vars.Statuses_DBs[host+"/"+container].Close()
newStatusesDB, _ := leveldb.OpenFile("leveldb/hosts/"+host+"/containers/"+container+"/statuses", nil)
vars.Statuses_DBs[host+"/"+container] = newStatusesDB
vars.Statuses_DBs[host+"/"+container] = util.GetDB(host, container, "statuses")
}
if vars.Stat_Containers_DBs[host+"/"+container] != nil {
vars.Stat_Containers_DBs[host+"/"+container].Close()
newStatDB, _ := leveldb.OpenFile("leveldb/hosts/"+host+"/containers/"+container+"/statistics", nil)
vars.Statuses_DBs[host+"/"+container] = newStatDB
vars.Statuses_DBs[host+"/"+container] = util.GetDB(host, container, "statistics")
}
vars.Counters_For_Containers_Last_30_Min[host+"/"+container] = map[string]uint64{"error": 0, "debug": 0, "info": 0, "warn": 0, "meta": 0, "other": 0}
}
22 changes: 15 additions & 7 deletions application/backend/app/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ func closeActiveStream(containerName string) {
newDaemonStreams = append(newDaemonStreams, stream)
}
}
vars.ActiveDBs[containerName].Close()
if vars.ActiveDBs[containerName] != nil {
vars.ActiveDBs[containerName].Close()
}
vars.ActiveDBs[containerName] = nil
vars.Active_Daemon_Streams = newDaemonStreams
}
Expand Down Expand Up @@ -109,8 +111,8 @@ func CreateDaemonToDBStream(containerName string) {
reader := bufio.NewReader(connection)
readHeader(*reader)

current_db := vars.ActiveDBs[containerName]
host := util.GetHost()
current_db := util.GetDB(host, containerName, "logs")
createLogMessage(current_db, host, containerName, "ONLOGS: Container listening started!")

defer current_db.Close()
Expand Down Expand Up @@ -171,17 +173,23 @@ func GetContainersList() []string {
json.Unmarshal([]byte(body), &result)

var names []string
containersDB, err := leveldb.OpenFile("leveldb/hosts/"+util.GetHost()+"/containersMeta", nil)
if err != nil {
panic(err)

containersMetaDB := vars.ContainersMeta_DBs[util.GetHost()]
if containersMetaDB == nil {
containersMetaDB, err := leveldb.OpenFile("leveldb/hosts/"+util.GetHost()+"/containersMeta", nil)
if err != nil {
panic(err)
}
vars.ContainersMeta_DBs[util.GetHost()] = containersMetaDB
}
defer containersDB.Close()
containersMetaDB = vars.ContainersMeta_DBs[util.GetHost()]

for i := 0; i < len(result); i++ {
name := fmt.Sprintf("%v", result[i]["Names"].([]interface{})[0].(string))[1:]
id := result[i]["Id"].(string)

names = append(names, name)
containersDB.Put([]byte(name), []byte(id), nil)
containersMetaDB.Put([]byte(name), []byte(id), nil)
}

return names
Expand Down
31 changes: 15 additions & 16 deletions application/backend/app/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,46 @@ import (
"time"

"github.com/devforth/OnLogs/app/util"
"github.com/devforth/OnLogs/app/vars"
"github.com/syndtr/goleveldb/leveldb"
)

func CreateOnLogsToken() string {
tokensDB, _ := leveldb.OpenFile("leveldb/tokens", nil)
defer tokensDB.Close()

token := util.GenerateJWTSecret()
to_put := time.Now().UTC().Add(24 * time.Hour).String()
tokensDB.Put([]byte(token), []byte(to_put), nil)
err := vars.TokensDB.Put([]byte(token), []byte(to_put), nil)
if err != nil {
vars.TokensDB.Close()
vars.TokensDB, vars.TokensDBErr = leveldb.OpenFile("leveldb/tokens", nil)

err = vars.TokensDB.Put([]byte(token), []byte(to_put), nil)
if err != nil {
panic(err)
}
}
return token
}

func IsTokenExists(token string) bool {
tokensDB, _ := leveldb.OpenFile("leveldb/tokens", nil)
defer tokensDB.Close()

iter := tokensDB.NewIterator(nil, nil)
iter := vars.TokensDB.NewIterator(nil, nil)
defer iter.Release()
iter.First()
if string(iter.Key()) == token {
tokensDB.Put([]byte(token), []byte("was used"), nil)
vars.TokensDB.Put([]byte(token), []byte("was used"), nil)
return true
}
for iter.Next() {
if string(iter.Key()) == token {
tokensDB.Put([]byte(token), []byte("was used"), nil)
vars.TokensDB.Put([]byte(token), []byte("was used"), nil)
return true
}
}

return false
}

func DeleteUnusedTokens() {
for {
db, r := leveldb.OpenFile("leveldb/tokens", nil)
if r != nil {
panic(r)
}

db := vars.TokensDB
iter := db.NewIterator(nil, nil)
for iter.Next() {
wasUsed := string(iter.Value())
Expand Down
14 changes: 6 additions & 8 deletions application/backend/app/routes/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/devforth/OnLogs/app/util"
"github.com/devforth/OnLogs/app/vars"
"github.com/gorilla/websocket"
"github.com/syndtr/goleveldb/leveldb"
)

func enableCors(w *http.ResponseWriter) {
Expand Down Expand Up @@ -128,16 +127,14 @@ func AddLogLine(w http.ResponseWriter, req *http.Request) {
if vars.Counters_For_Hosts_Last_30_Min[logItem.Host] == nil {
go statistics.RunStatisticForContainer(logItem.Host, logItem.Container)
}
location := logItem.Host + "/" + logItem.Container
if vars.Statuses_DBs[location] == nil {
vars.Statuses_DBs[location] = util.GetDB(logItem.Host, logItem.Container, "statuses")
err := containerdb.PutLogMessage(util.GetDB(logItem.Host, logItem.Container, "logs"), logItem.Host, logItem.Container, logItem.LogLine)
if err != nil {
defer w.WriteHeader(http.StatusInternalServerError)
panic(err)
}
current_db, _ := leveldb.OpenFile("leveldb/hosts/"+logItem.Host+"/containers/"+logItem.Container+"/logs", nil)
containerdb.PutLogMessage(current_db, logItem.Host, logItem.Container, logItem.LogLine)
defer current_db.Close()

to_send, _ := json.Marshal([]string{logItem.LogLine[0], logItem.LogLine[1]})
for _, c := range vars.Connections[location] {
for _, c := range vars.Connections[logItem.Host+"/"+logItem.Container] {
c.WriteMessage(1, to_send)
}
}
Expand All @@ -162,6 +159,7 @@ func AddHost(w http.ResponseWriter, req *http.Request) {
}

vars.AgentsActiveContainers[addReq.Hostname] = addReq.Services
fmt.Println("New host added: " + addReq.Hostname)
for _, container := range addReq.Services {
os.MkdirAll("leveldb/hosts/"+addReq.Hostname+"/containers/"+container, 0700)
}
Expand Down
18 changes: 5 additions & 13 deletions application/backend/app/statistics/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (

"github.com/devforth/OnLogs/app/util"
"github.com/devforth/OnLogs/app/vars"
"github.com/syndtr/goleveldb/leveldb"
)

func restartStats(host string, container string, current_db *leveldb.DB) {
func restartStats(host string, container string) {
current_db := util.GetDB(host, container, "statistics")

var used_storage map[string]map[string]uint64
var location string
if container == "" {
Expand Down Expand Up @@ -41,15 +42,9 @@ func restartStats(host string, container string, current_db *leveldb.DB) {
func RunStatisticForContainer(host string, container string) {
location := host + "/" + container
vars.Counters_For_Containers_Last_30_Min[location] = map[string]uint64{"error": 0, "debug": 0, "info": 0, "warn": 0, "meta": 0, "other": 0}
if vars.Stat_Containers_DBs[location] == nil {
current_db, _ := leveldb.OpenFile("leveldb/hosts/"+host+"/containers/"+container+"/statistics", nil)
defer current_db.Close()
vars.Stat_Containers_DBs[location] = current_db
}
defer delete(vars.Stat_Containers_DBs, location)
defer restartStats(host, container, vars.Stat_Containers_DBs[location])
defer restartStats(host, container)
for {
restartStats(host, container, vars.Stat_Containers_DBs[location])
restartStats(host, container)
time.Sleep(30 * time.Minute)
}
}
Expand All @@ -72,9 +67,6 @@ func GetStatisticsByService(host string, service string, value int) map[string]u
searchTo := time.Now().Add(-(time.Hour * time.Duration(value/2))).UTC()
var tmp_stats map[string]uint64
current_db := util.GetDB(host, service, "statistics")
if vars.Stat_Containers_DBs[location] == nil {
defer current_db.Close()
}
iter := current_db.NewIterator(nil, nil)
defer iter.Release()
iter.Last()
Expand Down
18 changes: 0 additions & 18 deletions application/backend/app/streamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,14 @@ import (
"github.com/devforth/OnLogs/app/statistics"
"github.com/devforth/OnLogs/app/util"
"github.com/devforth/OnLogs/app/vars"
"github.com/syndtr/goleveldb/leveldb"
)

func createStreams(containers []string) {
for _, container := range vars.DockerContainers {
if !util.Contains(container, vars.Active_Daemon_Streams) {
go statistics.RunStatisticForContainer(util.GetHost(), container)
newDB, err := leveldb.OpenFile("leveldb/hosts/"+util.GetHost()+"/containers/"+container+"/logs", nil)
if err != nil {
fmt.Println("ERROR: " + container + ": " + err.Error())
newDB, err = leveldb.RecoverFile("leveldb/hosts/"+util.GetHost()+"/containers/"+container+"/logs", nil)
fmt.Println("INFO: " + container + ": recovering db...")
if err == nil {
fmt.Println("INFO: " + container + ": db recovered!")
} else {
fmt.Println("ERROR: " + container + ": " + err.Error())
}
}
if vars.Statuses_DBs[util.GetHost()+"/"+container] == nil {
statusesDB, _ := leveldb.OpenFile("leveldb/hosts/"+util.GetHost()+"/containers/"+container+"/statuses", nil)
vars.Statuses_DBs[util.GetHost()+"/"+container] = statusesDB
}
vars.ActiveDBs[container] = newDB
vars.Active_Daemon_Streams = append(vars.Active_Daemon_Streams, container)
if os.Getenv("AGENT") != "" {
vars.BrokenLogs_DBs[container] = util.GetDB(util.GetHost(), container, "/brokenlogs")
go daemon.CreateDaemonToHostStream(container)
} else {
go daemon.CreateDaemonToDBStream(container)
Expand Down
47 changes: 37 additions & 10 deletions application/backend/app/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,37 @@ func GetDB(host string, container string, dbType string) *leveldb.DB {
res_db = vars.Statuses_DBs[host+"/"+container]
} else if dbType == "statistics" {
res_db = vars.Stat_Containers_DBs[host+"/"+container]
} else if dbType == "brokenlogs" {
res_db = vars.BrokenLogs_DBs[container]
}

if res_db != nil {
return res_db
}

var err error
if res_db == nil {
path := "leveldb/hosts/" + host + "/containers/" + container + "/" + dbType
res_db, err = leveldb.OpenFile(path, nil)
if err != nil {
res_db, err = leveldb.RecoverFile(path, nil)
}
tries := 0
path := "leveldb/hosts/" + host + "/containers/" + container + "/" + dbType
res_db, err = leveldb.OpenFile(path, nil)
for (err != nil && res_db == nil) && tries < 10 {
res_db, err = leveldb.RecoverFile(path, nil)
fmt.Println(path, err)
time.Sleep(10 * time.Millisecond)
tries++
}

if err != nil {
fmt.Println("ERROR: unable to open db for "+host+"/"+container+"/"+dbType, err)
panic("ERROR: unable to open db for " + host + "/" + container + "/" + dbType + "\n" + err.Error())
}

if dbType == "logs" {
vars.ActiveDBs[container] = res_db
} else if dbType == "statuses" {
vars.Statuses_DBs[host+"/"+container] = res_db
} else if dbType == "statistics" {
vars.Stat_Containers_DBs[host+"/"+container] = res_db
} else if dbType == "brokenlogs" {
vars.BrokenLogs_DBs[container] = res_db
}

return res_db
Expand Down Expand Up @@ -185,9 +204,17 @@ func GetDockerContainerID(host string, container string) string {
return ""
}

idDB, _ := leveldb.OpenFile("leveldb/hosts/"+host+"/containersMeta", nil)
defer idDB.Close()
iter := idDB.NewIterator(nil, nil)
containersMetaDB := vars.ContainersMeta_DBs[host]
if containersMetaDB == nil {
containersMetaDB, err := leveldb.OpenFile("leveldb/hosts/"+host+"/containersMeta", nil)
if err != nil {
panic(err)
}
vars.ContainersMeta_DBs[host] = containersMetaDB
}
containersMetaDB = vars.ContainersMeta_DBs[host]

iter := containersMetaDB.NewIterator(nil, nil)
defer iter.Release()

iter.Last()
Expand Down
Loading

0 comments on commit 82f473f

Please sign in to comment.