Skip to content

Commit

Permalink
sync otel code opensource main branch
Browse files Browse the repository at this point in the history
  • Loading branch information
rajesh-1983 committed Sep 26, 2024
2 parents d1a9059 + 850f18c commit 4f2b727
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 49 deletions.
99 changes: 82 additions & 17 deletions lib/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ import (
"sync/atomic"
)

const (
mux_config_cal_name = "OCC_CONFIG"
oracle_worker_config_cal_name = "OCC_ORACLE_WORKER_CONFIG"
)

// The Config contains all the static configuration
type Config struct {
CertChainFile string
Expand Down Expand Up @@ -181,6 +186,14 @@ type Config struct {

// Max desired percentage of healthy workers for the worker pool
MaxDesiredHealthyWorkerPct int

// Oracle Worker Configs
EnableCache bool
EnableHeartBeat bool
EnableQueryReplaceNL bool
EnableBindHashLogging bool
EnableSessionVariables bool
UseNonBlocking bool
}

// The OpsConfig contains the configuration that can be modified during run time
Expand Down Expand Up @@ -232,7 +245,6 @@ func InitConfig(poolName string) error {
} else {
currentDir = currentDir + "/"
}

filename := currentDir + "hera.txt"

cdb, err := config.NewTxtConfig(filename)
Expand Down Expand Up @@ -310,6 +322,7 @@ func InitConfig(poolName string) error {
}

gAppConfig.EnableSharding = cdb.GetOrDefaultBool("enable_sharding", false)

gAppConfig.UseShardMap = cdb.GetOrDefaultBool("use_shardmap", true)
gAppConfig.NumOfShards = cdb.GetOrDefaultInt("num_shards", 1)
if gAppConfig.EnableSharding == false || gAppConfig.UseShardMap == false {
Expand Down Expand Up @@ -368,6 +381,14 @@ func InitConfig(poolName string) error {
// TODO:
gAppConfig.NumStdbyDbs = 1

// Fetch Oracle worker configurations.. The defaults must be same between oracle worker and here for accurate logging.
gAppConfig.EnableCache = cdb.GetOrDefaultBool("enable_cache", false)
gAppConfig.EnableHeartBeat = cdb.GetOrDefaultBool("enable_heart_beat", false)
gAppConfig.EnableQueryReplaceNL = cdb.GetOrDefaultBool("enable_query_replace_nl", true)
gAppConfig.EnableBindHashLogging = cdb.GetOrDefaultBool("enable_bind_hash_logging", false)
gAppConfig.EnableSessionVariables = cdb.GetOrDefaultBool("enable_session_variables", false)
gAppConfig.UseNonBlocking = cdb.GetOrDefaultBool("use_non_blocking", false)

var numWorkers int
numWorkers = 6
//err = config.InitOpsConfigWithName("../opscfg/hera.txt")
Expand Down Expand Up @@ -544,17 +565,15 @@ func LogOccConfigs() {
"hostname_prefix": gAppConfig.HostnamePrefix,
"sharding_cross_keys_err": gAppConfig.ShardingCrossKeysErr,
//"enable_sql_rewrite", // not found anywhere?
"sharding_algo": gAppConfig.ShardingAlgoHash,
"cfg_from_tns_override_num_shards": gAppConfig.CfgFromTnsOverrideNumShards,
"sharding_algo": gAppConfig.ShardingAlgoHash,
},
"TAF": {
"enable_taf": gAppConfig.EnableTAF,
"cfg_from_tns_override_taf": gAppConfig.CfgFromTnsOverrideTaf,
"testing_enable_dml_taf": gAppConfig.TestingEnableDMLTaf,
"taf_timeout_ms": gAppConfig.TAFTimeoutMs,
"taf_bin_duration": gAppConfig.TAFBinDuration,
"taf_allow_slow_every_x": gAppConfig.TAFAllowSlowEveryX,
"taf_normally_slow_count": gAppConfig.TAFNormallySlowCount,
"enable_taf": gAppConfig.EnableTAF,
"testing_enable_dml_taf": gAppConfig.TestingEnableDMLTaf,
"taf_timeout_ms": gAppConfig.TAFTimeoutMs,
"taf_bin_duration": gAppConfig.TAFBinDuration,
"taf_allow_slow_every_x": gAppConfig.TAFAllowSlowEveryX,
"taf_normally_slow_count": gAppConfig.TAFNormallySlowCount,
},
"BIND-EVICTION": {
"child.executable": gAppConfig.ChildExecutable,
Expand Down Expand Up @@ -595,24 +614,42 @@ func LogOccConfigs() {
"max_desire_healthy_worker_pct": gAppConfig.MaxDesiredHealthyWorkerPct,
},
"R-W-SPLIT": {
"readonly_children_pct": gAppConfig.ReadonlyPct,
"cfg_from_tns_override_rw_split": gAppConfig.CfgFromTnsOverrideRWSplit,
"readonly_children_pct": gAppConfig.ReadonlyPct,
},
"RAC": {
"management_table_prefix": gAppConfig.ManagementTablePrefix,
"rac_sql_interval": gAppConfig.RacMaintReloadInterval,
"rac_restart_window": gAppConfig.RacRestartWindow,
},
"NO-CATEGORY": {
"GENERAL-CONFIGURATIONS": {
"database_type": gAppConfig.DatabaseType, // Oracle = 0; MySQL=1; POSTGRES=2
"cfg_from_tns": gAppConfig.CfgFromTns,
"log_level": gOpsConfig.logLevel,
"high_load_pct": gAppConfig.HighLoadPct,
"init_limit_pct": gAppConfig.InitLimitPct,
"num_standby_dbs": gAppConfig.NumStdbyDbs,
},
"ENABLE_CFG_FROM_TNS": {
"cfg_from_tns": gAppConfig.CfgFromTns,
"cfg_from_tns_override_num_shards": gAppConfig.CfgFromTnsOverrideNumShards,
"cfg_from_tns_override_taf": gAppConfig.CfgFromTnsOverrideTaf,
"cfg_from_tns_override_rw_split": gAppConfig.CfgFromTnsOverrideRWSplit,
},
"STATEMENT-CACHE": {
"enable_cache": gAppConfig.EnableCache,
"enable_heart_beat": gAppConfig.EnableHeartBeat,
"enable_query_replace_nl": gAppConfig.EnableQueryReplaceNL,
},
"SESSION-VARIABLES": {
"enable_session_variables": gAppConfig.EnableSessionVariables,
},
"BIND-HASH-LOGGING": {
"enable_bind_hash_logging": gAppConfig.EnableBindHashLogging,
},
"KEEP-ALIVE": {
"use_non_blocking": gAppConfig.UseNonBlocking,
},
}

calName := mux_config_cal_name
for feature, configs := range whiteListConfigs {
switch feature {
case "BACKLOG":
Expand Down Expand Up @@ -643,17 +680,45 @@ func LogOccConfigs() {
if gAppConfig.ReadonlyPct == 0 {
continue
}
case "SOFT-EVICTION", "BIND-EVICTION":
case "SATURATION-RECOVERY", "BIND-EVICTION":
if GetSatRecoverThrottleRate() <= 0 {
continue
}
case "SOFT-EVICTION":
if GetSatRecoverThrottleRate() <= 0 && gAppConfig.SoftEvictionProbability <= 0 {
continue
}
case "MANUAL-RATE-LIMITER":
if !gAppConfig.EnableQueryBindBlocker {
continue
}
case "ENABLE_CFG_FROM_TNS":
if !gAppConfig.CfgFromTns {
continue
}
case "STATEMENT-CACHE":
if !gAppConfig.EnableCache {
continue
}
calName = oracle_worker_config_cal_name
case "SESSION-VARIABLES":
if !gAppConfig.EnableSessionVariables {
continue
}
calName = oracle_worker_config_cal_name
case "BIND-HASH-LOGGING":
if !gAppConfig.EnableBindHashLogging {
continue
}
calName = oracle_worker_config_cal_name
case "KEEP-ALIVE":
if !gAppConfig.UseNonBlocking {
continue
}
calName = oracle_worker_config_cal_name
}

evt := cal.NewCalEvent("OCC_CONFIG", fmt.Sprintf(feature), cal.TransOK, "")
evt := cal.NewCalEvent(calName, fmt.Sprintf(feature), cal.TransOK, "")
for cfg, val := range configs {
s := fmt.Sprintf("%v", val)
evt.AddDataStr(cfg, s)
Expand Down
2 changes: 1 addition & 1 deletion lib/coordinatortaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (p *tafResponsePreproc) Write(bf []byte) (int, error) {
}
ora, sz := atoi(ns.Payload)
switch ora {
case 3113, 3114, 3135, 12514, 3128, 3127, 3123, 3111, 3106, 1012, 28, 31, 51, 25400, 25401, 25402, 25403, 25404, 25405, 25407, 25408, 25409, 25425, 24343, 1041, 600, 700, 7445:
case 3113, 3114, 3135, 12514, 3128, 3127, 3123, 3111, 3106, 1012, 28, 31, 51, 25400, 25401, 25402, 25403, 25404, 25405, 25407, 25408, 25409, 25425, 24343, 1041, 600, 700, 7445, 4025:
//for testing 962=<table doesn't exist>: case 942:
p.ok = false
p.ora = string(ns.Payload[:sz])
Expand Down
14 changes: 8 additions & 6 deletions lib/racmaint.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ func InitRacMaint(cmdLineModuleName string) {
interval := GetConfig().RacMaintReloadInterval
if interval > 0 {
for i := 0; i < GetConfig().NumOfShards; i++ {
go racMaintMain(i, interval, cmdLineModuleName)
shardIndex := i //Address the behavior called variable capture.
go racMaintMain(shardIndex, interval, cmdLineModuleName)
}
}
}

// racMaintMain wakes up every n seconds (configured in "rac_sql_interval") and reads the table
//
// [ManagementTablePrefix]_maint table to see if maintenance is requested
func racMaintMain(shard int, interval int, cmdLineModuleName string) {
if logger.GetLogger().V(logger.Debug) {
Expand Down Expand Up @@ -109,8 +111,8 @@ func racMaintMain(shard int, interval int, cmdLineModuleName string) {
}

/*
racMaint is the main function for RAC maintenance processing, being called regularly.
When maintenance is planned, it calls workerpool.RacMaint to start the actuall processing
racMaint is the main function for RAC maintenance processing, being called regularly.
When maintenance is planned, it calls workerpool.RacMaint to start the actuall processing
*/
func racMaint(ctx context.Context, shard int, db *sql.DB, racSQL string, cmdLineModuleName string, prev map[racCfgKey]racCfg) {
//
Expand Down Expand Up @@ -220,12 +222,12 @@ func racMaint(ctx context.Context, shard int, db *sql.DB, racSQL string, cmdLine
workerpool, err = GetWorkerBrokerInstance().GetWorkerPool(wtypeRW, 0, shard)
}
if err == nil {
go workerpool.RacMaint(racReq)
workerpool.RacMaint(racReq)
}
if GetConfig().ReadonlyPct > 0 {
workerpool, err := GetWorkerBrokerInstance().GetWorkerPool(wtypeRO, 0, shard)
workerpool, err = GetWorkerBrokerInstance().GetWorkerPool(wtypeRO, 0, shard)
if err == nil {
go workerpool.RacMaint(racReq)
workerpool.RacMaint(racReq)
}
}
prev[cfgKey] = row
Expand Down
50 changes: 25 additions & 25 deletions lib/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (pool *WorkerPool) spawnWorker(wid int) error {
worker.setState(wsSchd)
millis := rand.Intn(GetConfig().RandomStartMs)
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, wid, "randomized start ms",millis)
logger.GetLogger().Log(logger.Alert, wid, "randomized start ms", millis)
}
time.Sleep(time.Millisecond * time.Duration(millis))

Expand All @@ -131,7 +131,7 @@ func (pool *WorkerPool) spawnWorker(wid int) error {
}
millis := rand.Intn(3000)
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, initCnt, "is too many in init state. waiting to start",wid)
logger.GetLogger().Log(logger.Alert, initCnt, "is too many in init state. waiting to start", wid)
}
time.Sleep(time.Millisecond * time.Duration(millis))
}
Expand Down Expand Up @@ -233,8 +233,10 @@ func (pool *WorkerPool) WorkerReady(worker *WorkerClient) (err error) {
// GetWorker gets the active worker if available. backlog with timeout if not.
//
// @param sqlhash to check for soft eviction against a blacklist of slow queries.
// if getworker needs to exam the incoming sql, there does not seem to be another elegant
// way to do this except to pass in the sqlhash as a parameter.
//
// if getworker needs to exam the incoming sql, there does not seem to be another elegant
// way to do this except to pass in the sqlhash as a parameter.
//
// @param timeoutMs[0] timeout in milliseconds. default to adaptive queue timeout.
func (pool *WorkerPool) GetWorker(sqlhash int32, timeoutMs ...int) (worker *WorkerClient, t string, err error) {
if logger.GetLogger().V(logger.Debug) {
Expand Down Expand Up @@ -559,10 +561,10 @@ func (pool *WorkerPool) ReturnWorker(worker *WorkerClient, ticket string) (err e
}
if skipRecycle {
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=",pool.moduleName,"shard_id=",pool.ShardID, "HEALTHY worker Count=",pool.GetHealthyWorkersCount(),"TotalWorkers:=", pool.desiredSize)
logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=", pool.moduleName, "shard_id=", pool.ShardID, "HEALTHY worker Count=", pool.GetHealthyWorkersCount(), "TotalWorkers:=", pool.desiredSize)
}
calMsg := fmt.Sprintf("Recycle(worker_pid)=%d, module_name=%s,shard_id=%d", worker.pid, worker.moduleName, worker.shardID)
evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER","ReturnWorker", cal.TransOK, calMsg)
evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER", "ReturnWorker", cal.TransOK, calMsg)
evt.Completed()
}

Expand Down Expand Up @@ -697,8 +699,6 @@ func (pool *WorkerPool) RacMaint(racReq racAct) {
}
now := time.Now().Unix()
window := GetConfig().RacRestartWindow
dbUname := ""
cnt := 0
pool.poolCond.L.Lock()
for i := 0; i < pool.currentSize; i++ {
if (pool.workers[i] != nil) && (racReq.instID == 0 || pool.workers[i].racID == racReq.instID) && (pool.workers[i].startTime < int64(racReq.tm)) {
Expand All @@ -716,23 +716,23 @@ func (pool *WorkerPool) RacMaint(racReq racAct) {
}

if logger.GetLogger().V(logger.Verbose) {
logger.GetLogger().Log(logger.Verbose, "Rac maint activating, worker", i, pool.workers[i].pid, "exittime=", pool.workers[i].exitTime, now, window, pool.currentSize)
}
cnt++
if len(dbUname) == 0 {
dbUname = pool.workers[i].dbUname
logger.GetLogger().Log(logger.Verbose, "Rac maint activating, worker", i, pool.workers[i].pid, "exittime=", pool.workers[i].exitTime, now, window, pool.currentSize, "rac.req timestamp=", racReq.tm)
}
//Trigger individual event for worker
evt := cal.NewCalEvent("RAC_ID", fmt.Sprintf("%d", racReq.instID), cal.TransOK, "")
evt.AddDataStr("poolModName", pool.moduleName)
evt.AddDataInt("workerId", int64(i))
evt.AddDataInt("pid", int64(pool.workers[i].pid))
evt.AddDataInt("shardId", int64(pool.ShardID))
evt.AddDataInt("tm", int64(racReq.tm))
evt.AddDataInt("exitTime", pool.workers[i].exitTime)
evt.AddDataStr("exitInSec", fmt.Sprintf("%dsec", pool.workers[i].exitTime-now))
evt.Completed()
evt = cal.NewCalEvent("DB_UNAME", pool.workers[i].dbUname, cal.TransOK, "")
evt.Completed()
}
}
pool.poolCond.L.Unlock()
// TODO: C++ worker logs one event for each worker, in the worker, so
// we keep the same. Think about changing it
for i := 0; i < cnt; i++ {
evt := cal.NewCalEvent("RAC_ID", fmt.Sprintf("%d", racReq.instID), cal.TransOK, "")
evt.Completed()
evt = cal.NewCalEvent("DB_UNAME", dbUname, cal.TransOK, "")
evt.Completed()
}
}

// checkWorkerLifespan is called periodically to check if any worker lifetime has expired and terminates it
Expand Down Expand Up @@ -768,12 +768,12 @@ func (pool *WorkerPool) checkWorkerLifespan() {
pool.poolCond.L.Lock()
for i := 0; i < pool.currentSize; i++ {
if (pool.workers[i] != nil) && (pool.workers[i].exitTime != 0) && (pool.workers[i].exitTime <= now) {
if pool.GetHealthyWorkersCount() < (int32(pool.desiredSize*GetConfig().MaxDesiredHealthyWorkerPct/100)) { // Should it be a config value
if pool.GetHealthyWorkersCount() < (int32(pool.desiredSize * GetConfig().MaxDesiredHealthyWorkerPct / 100)) { // Should it be a config value
if logger.GetLogger().V(logger.Alert) {
logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=",pool.moduleName,"shard_id=",pool.ShardID, "HEALTHY worker Count=",pool.GetHealthyWorkersCount(),"TotalWorkers:", pool.desiredSize)
logger.GetLogger().Log(logger.Alert, "Non Healthy Worker found in pool, module_name=", pool.moduleName, "shard_id=", pool.ShardID, "HEALTHY worker Count=", pool.GetHealthyWorkersCount(), "TotalWorkers:", pool.desiredSize)
}
calMsg := fmt.Sprintf("module_name=%s,shard_id=%d", pool.moduleName, pool.ShardID)
evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER","checkWorkerLifespan", cal.TransOK, calMsg)
evt := cal.NewCalEvent("SKIP_RECYCLE_WORKER", "checkWorkerLifespan", cal.TransOK, calMsg)
evt.Completed()
break
}
Expand Down Expand Up @@ -814,7 +814,7 @@ func (pool *WorkerPool) checkWorkerLifespan() {
pool.poolCond.L.Unlock()
for _, w := range workers {
if logger.GetLogger().V(logger.Info) {
logger.GetLogger().Log(logger.Info, "checkworkerlifespan - Lifespan exceeded, terminate worker: pid =", w.pid, ", pool_type =", w.Type, ", inst =", w.instID ,"HEALTHY worker Count=",pool.GetHealthyWorkersCount(),"TotalWorkers:", pool.desiredSize)
logger.GetLogger().Log(logger.Info, "checkworkerlifespan - Lifespan exceeded, terminate worker: pid =", w.pid, ", pool_type =", w.Type, ", inst =", w.instID, "HEALTHY worker Count=", pool.GetHealthyWorkersCount(), "TotalWorkers:", pool.desiredSize)
}
w.Terminate()
}
Expand Down
Loading

0 comments on commit 4f2b727

Please sign in to comment.