Skip to content

Commit

Permalink
dongjiang, add cluster version
Browse files Browse the repository at this point in the history
Signed-off-by: dongjiang1989 <[email protected]>
  • Loading branch information
dongjiang1989 committed Aug 20, 2024
1 parent 3c70ea5 commit 3f1cf01
Show file tree
Hide file tree
Showing 7 changed files with 325 additions and 145 deletions.
39 changes: 21 additions & 18 deletions common/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,34 @@ package mysql
import (
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql"
"k8s.io/klog"
"net/url"
"strings"

_ "github.com/go-sql-driver/mysql"
"k8s.io/klog"
)

const (
DEFAULT_TABLE = "kube_event"
)

type MysqlService struct {
db *sql.DB
table string
dsn string
Db *sql.DB
Table string
Dsn string
Cluster string
}

type MysqlKubeEventPoint struct {
Cluster string
Namespace string
Kind string
Name string
Type string
Reason string
Message string
EventID string
Source string
FirstOccurrenceTimestamp string
LastOccurrenceTimestamp string
}
Expand All @@ -52,10 +56,9 @@ func (mySvc MysqlService) SaveData(sinkData []interface{}) error {
return nil
}

prepareStatement := fmt.Sprintf("INSERT INTO %s (namespace,kind,name,type,reason,message,event_id,first_occurrence_time,last_occurrence_time) VALUES(?,?,?,?,?,?,?,?,?)", mySvc.table)

prepareStatement := fmt.Sprintf("INSERT INTO %s (cluster,namespace,kind,name,type,reason,message,event_id,source,first_occurrence_time,last_occurrence_time) VALUES(?,?,?,?,?,?,?,?,?,?,?)", mySvc.Table)
// Prepare statement for inserting data
stmtIns, err := mySvc.db.Prepare(prepareStatement)
stmtIns, err := mySvc.Db.Prepare(prepareStatement)
if err != nil {
klog.Errorf("failed to Prepare statement for inserting data. SQL: %v, err: %v", prepareStatement, err)
return err
Expand All @@ -67,8 +70,8 @@ func (mySvc MysqlService) SaveData(sinkData []interface{}) error {

ked := data.(MysqlKubeEventPoint)
klog.V(7).Infof("Begin Insert Mysql Data ...")
klog.V(8).Infof("Namespace: %s, Kind: %s, Name: %s, Type: %s, Reason: %s, Message: %s, EventID: %s, FirstOccurrenceTimestamp: %s, LastOccurrenceTimestamp: %s ", ked.Namespace, ked.Kind, ked.Name, ked.Type, ked.Reason, ked.Message, ked.EventID, ked.FirstOccurrenceTimestamp, ked.LastOccurrenceTimestamp)
_, err = stmtIns.Exec(ked.Namespace, ked.Kind, ked.Name, ked.Type, ked.Reason, ked.Message, ked.EventID, ked.FirstOccurrenceTimestamp, ked.LastOccurrenceTimestamp)
klog.V(8).Infof("Cluster: %s, Namespace: %s, Kind: %s, Name: %s, Type: %s, Reason: %s, Message: %s, EventID: %s, Source:%s, FirstOccurrenceTimestamp: %s, LastOccurrenceTimestamp: %s ", ked.Cluster, ked.Namespace, ked.Kind, ked.Name, ked.Type, ked.Reason, ked.Message, ked.EventID, ked.Source, ked.FirstOccurrenceTimestamp, ked.LastOccurrenceTimestamp)
_, err = stmtIns.Exec(ked.Cluster, ked.Namespace, ked.Kind, ked.Name, ked.Type, ked.Reason, ked.Message, ked.EventID, ked.Source, ked.FirstOccurrenceTimestamp, ked.LastOccurrenceTimestamp)
if err != nil {
klog.Errorf("failed to Prepare statement for inserting data ")
return err
Expand All @@ -89,24 +92,24 @@ func (mySvc MysqlService) CreateDatabase(name string) error {
}

func (mySvc MysqlService) CloseDB() error {
return mySvc.db.Close()
return mySvc.Db.Close()
}

func NewMysqlClient(uri *url.URL) (*MysqlService, error) {
mysqlSvc := &MysqlService{}

if uri.Query().Get("table") != "" {
mysqlSvc.table = uri.Query().Get("table")
mysqlSvc.Table = uri.Query().Get("table")
slice := strings.Split(uri.RawQuery, "&")
mysqlSvc.dsn = slice[0]
mysqlSvc.Dsn = slice[0]
} else {
mysqlSvc.table = DEFAULT_TABLE
mysqlSvc.dsn = uri.RawQuery
mysqlSvc.Table = DEFAULT_TABLE
mysqlSvc.Dsn = uri.RawQuery
}

klog.Infof("mysql jdbc url: %s", mysqlSvc.dsn)
klog.Infof("mysql jdbc url: %s", mysqlSvc.Dsn)

db, err := sql.Open("mysql", mysqlSvc.dsn)
db, err := sql.Open("mysql", mysqlSvc.Dsn)
if err != nil {
return nil, fmt.Errorf("failed to connect mysql according jdbc url string: %s", err)
}
Expand All @@ -120,7 +123,7 @@ func NewMysqlClient(uri *url.URL) (*MysqlService, error) {
return nil, fmt.Errorf("cannot open a connection for mysql according jdbc url string: %s", err)
}

mysqlSvc.db = db
mysqlSvc.Db = db

return mysqlSvc, nil
}
4 changes: 3 additions & 1 deletion docs/en/mysql-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,19 @@ create table k8s_event
(
id bigint(20) not null auto_increment primary key comment 'event primary key',
name varchar(64) not null default '' comment 'event name',
cluster varchar(64) not null default '' comment 'event cluster name',
namespace varchar(64) not null default '' comment 'event namespace',
event_id varchar(64) not null default '' comment 'event_id',
type varchar(64) not null default '' comment 'event type Warning or Normal',
reason varchar(64) not null default '' comment 'event reason',
message text not null comment 'event message' ,
kind varchar(64) not null default '' comment 'event kind' ,
source varchar(64) not null default '' comment 'event source' ,
first_occurrence_time varchar(64) not null default '' comment 'event first occurrence time',
last_occurrence_time varchar(64) not null default '' comment 'event last occurrence time',
) ENGINE = InnoDB default CHARSET = utf8 comment ='Event info tables';
```

For example:

--sink=mysql:?root:transwarp@tcp(172.16.180.132:3306)/kube_eventer?charset=utf8&table=kube_event
--sink=mysql:?root:transwarp@tcp(172.16.180.132:3306)/kube_eventer?charset=utf8&table=kube_event&cluster=clusterA
14 changes: 6 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,25 @@ require (
github.com/aws/aws-sdk-go v1.34.28
github.com/denverdino/aliyungo v0.0.0-20190410085603-611ead8a6fed
github.com/go-sql-driver/mysql v1.5.0
github.com/golang/protobuf v1.4.3
github.com/golang/protobuf v1.5.4
github.com/google/cadvisor v0.33.1
github.com/google/uuid v1.1.1
github.com/google/uuid v1.3.0
github.com/imdario/mergo v0.3.7 // indirect
github.com/influxdata/influxdb v1.7.7
github.com/mailru/easyjson v0.7.0 // indirect
github.com/olivere/elastic v6.2.23+incompatible // indirect
github.com/olivere/elastic/v7 v7.0.6
github.com/pborman/uuid v1.2.0
github.com/prometheus/client_golang v1.11.1
github.com/riemann/riemann-go-client v0.4.0
github.com/smartystreets/go-aws-auth v0.0.0-20180515143844-0c1422d1fdb9
github.com/smartystreets/gunit v1.0.0 // indirect
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.8.2
go.mongodb.org/mongo-driver v1.5.1
golang.org/x/sys v0.2.0 // indirect
gopkg.in/olivere/elastic.v3 v3.0.75
gopkg.in/olivere/elastic.v5 v5.0.81
gopkg.in/olivere/elastic.v6 v6.2.23
k8s.io/api v0.17.4
k8s.io/apimachinery v0.17.4
k8s.io/client-go v0.17.4
k8s.io/api v0.28.11
k8s.io/apimachinery v0.28.11
k8s.io/client-go v0.28.11
k8s.io/klog v1.0.0
)
Loading

0 comments on commit 3f1cf01

Please sign in to comment.