Skip to content

Commit

Permalink
Merge pull request #225 from outbrain/merge-downstream-gh
Browse files Browse the repository at this point in the history
Merge downstream gh
  • Loading branch information
Shlomi Noach authored Jul 11, 2016
2 parents 83c746b + e27a740 commit 69145f2
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 30 deletions.
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#
set -e

RELEASE_VERSION="1.5.0"
RELEASE_VERSION="1.5.5"
TOPDIR=/tmp/orchestrator-release
export RELEASE_VERSION TOPDIR
export GO15VENDOREXPERIMENT=1
Expand Down
15 changes: 13 additions & 2 deletions go/app/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ func Cli(command string, strict bool, instance string, destination string, owner
fmt.Println(clusterInstance.Key.DisplayString())
}
}
case registerCliCommand("which-cluster-osc-slaves", "Information", `Output a list of slaves in same cluster as given instance, that could serve as a pt-online-schema-change operation control slaves`):
case registerCliCommand("which-cluster-osc-slaves", "Information", `Output a list of slaves in a cluster, that could serve as a pt-online-schema-change operation control slaves`):
{
clusterName := getClusterName(clusterAlias, instanceKey)
instances, err := inst.GetClusterOSCSlaves(clusterName)
Expand All @@ -989,6 +989,17 @@ func Cli(command string, strict bool, instance string, destination string, owner
fmt.Println(clusterInstance.Key.DisplayString())
}
}
case registerCliCommand("which-cluster-gh-ost-slaves", "Information", `Output a list of slaves in a cluster, that could serve as a gh-ost working server`):
{
clusterName := getClusterName(clusterAlias, instanceKey)
instances, err := inst.GetClusterGhostSlaves(clusterName)
if err != nil {
log.Fatale(err)
}
for _, clusterInstance := range instances {
fmt.Println(clusterInstance.Key.DisplayString())
}
}
case registerCliCommand("which-master", "Information", `Output the fully-qualified hostname:port representation of a given instance's master`):
{
instanceKey = deduceInstanceKeyIfNeeded(instance, instanceKey, true)
Expand Down Expand Up @@ -1080,7 +1091,7 @@ func Cli(command string, strict bool, instance string, destination string, owner
log.Fatalf("Duration value must be non-negative. Given value: %d", durationSeconds)
}
}
maintenanceKey, err := inst.BeginBoundedMaintenance(instanceKey, inst.GetMaintenanceOwner(), reason, uint(durationSeconds))
maintenanceKey, err := inst.BeginBoundedMaintenance(instanceKey, inst.GetMaintenanceOwner(), reason, uint(durationSeconds), true)
if err == nil {
log.Infof("Maintenance key: %+v", maintenanceKey)
log.Infof("Maintenance duration: %d seconds", durationSeconds)
Expand Down
15 changes: 13 additions & 2 deletions go/app/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package app

import (
"net"
nethttp "net/http"
"strings"

Expand Down Expand Up @@ -122,7 +123,17 @@ func standardHttp(discovery bool) {
http.Web.RegisterRequests(m)

// Serve
if config.Config.UseSSL {
if config.Config.ListenSocket != "" {
log.Infof("Starting HTTP listener on unix socket %v", config.Config.ListenSocket)
unixListener, err := net.Listen("unix", config.Config.ListenSocket)
if err != nil {
log.Fatale(err)
}
defer unixListener.Close()
if err := nethttp.Serve(unixListener, m); err != nil {
log.Fatale(err)
}
} else if config.Config.UseSSL {
log.Info("Starting HTTPS listener")
tlsConfig, err := ssl.NewTLSConfig(config.Config.SSLCAFile, config.Config.UseMutualTLS)
if err != nil {
Expand All @@ -136,7 +147,7 @@ func standardHttp(discovery bool) {
log.Fatale(err)
}
} else {
log.Info("Starting HTTP listener")
log.Infof("Starting HTTP listener on %+v", config.Config.ListenAddress)
if err := nethttp.ListenAndServe(config.Config.ListenAddress, m); err != nil {
log.Fatale(err)
}
Expand Down
10 changes: 7 additions & 3 deletions go/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ var (
// Some of the parameters have reasonable default values, and some (like database credentials) are
// strictly expected from user.
type Configuration struct {
Debug bool // set debug mode (similar to --debug option)
EnableSyslog bool // Should logs be directed (in addition) to syslog daemon?
ListenAddress string
Debug bool // set debug mode (similar to --debug option)
EnableSyslog bool // Should logs be directed (in addition) to syslog daemon?
ListenAddress string // Where orchestrator HTTP should listen for TCP
ListenSocket string // Where orchestrator HTTP should listen for unix socket (default: empty; when given, TCP is disabled)
AgentsServerPort string // port orchestrator agents talk back to
MySQLTopologyUser string
MySQLTopologyPassword string // my.cnf style configuration file from where to pick credentials. Expecting `user`, `password` under `[client]` section
Expand Down Expand Up @@ -71,6 +72,7 @@ type Configuration struct {
SnapshotTopologiesIntervalHours uint // Interval in hour between snapshot-topologies invocation. Default: 0 (disabled)
InstanceBulkOperationsWaitTimeoutSeconds uint // Time to wait on a single instance when doing bulk (many instances) operation
ActiveNodeExpireSeconds uint // Maximum time to wait for active node to send keepalive before attempting to take over as active node.
NodeHealthExpiry bool // Do we expire the node_health table? Usually this is true but it might be disabled on command line tools if an orchestrator daemon is running.
HostnameResolveMethod string // Method by which to "normalize" hostname ("none"/"default"/"cname")
MySQLHostnameResolveMethod string // Method by which to "normalize" hostname via MySQL server. ("none"/"@@hostname"/"@@report_host"; default "@@hostname")
SkipBinlogServerUnresolveCheck bool // Skip the double-check that an unresolved hostname resolves back to same hostname for binlog servers
Expand Down Expand Up @@ -192,6 +194,7 @@ func newConfiguration() *Configuration {
Debug: false,
EnableSyslog: false,
ListenAddress: ":3000",
ListenSocket: "",
AgentsServerPort: ":3001",
StatusEndpoint: "/api/status",
StatusSimpleHealth: true,
Expand All @@ -212,6 +215,7 @@ func newConfiguration() *Configuration {
DiscoverByShowSlaveHosts: false,
InstanceBulkOperationsWaitTimeoutSeconds: 10,
ActiveNodeExpireSeconds: 5,
NodeHealthExpiry: true,
HostnameResolveMethod: "default",
MySQLHostnameResolveMethod: "@@hostname",
SkipBinlogServerUnresolveCheck: true,
Expand Down
16 changes: 16 additions & 0 deletions go/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,22 @@ var generateSQLPatches = []string{
database_instance
MODIFY cluster_name varchar(128) NOT NULL
`,
`
ALTER TABLE
node_health
ADD INDEX last_seen_active_idx (last_seen_active)
`,
`
ALTER TABLE
database_instance_maintenance
ADD COLUMN processing_node_hostname varchar(128) CHARACTER SET ascii NOT NULL,
ADD COLUMN processing_node_token varchar(128) NOT NULL
`,
`
ALTER TABLE
database_instance_maintenance
ADD COLUMN explicitly_bounded TINYINT UNSIGNED NOT NULL
`,
}

// Track if a TLS has already been configured for topology
Expand Down
2 changes: 1 addition & 1 deletion go/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (this *HttpAPI) BeginMaintenance(params martini.Params, r render.Render, re
r.JSON(200, &APIResponse{Code: ERROR, Message: err.Error()})
return
}
key, err := inst.BeginMaintenance(&instanceKey, params["owner"], params["reason"])
key, err := inst.BeginBoundedMaintenance(&instanceKey, params["owner"], params["reason"], 0, true)
if err != nil {
r.JSON(200, &APIResponse{Code: ERROR, Message: err.Error(), Details: key})
return
Expand Down
49 changes: 47 additions & 2 deletions go/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,13 @@ func ReadTopologyInstance(instanceKey *InstanceKey) (*Instance, error) {
instance.MasterKey = *masterKey
instance.IsDetachedMaster = instance.MasterKey.IsDetached()
instance.SecondsBehindMaster = m.GetNullInt64("Seconds_Behind_Master")
if instance.SecondsBehindMaster.Valid && instance.SecondsBehindMaster.Int64 < 0 {
log.Warningf("Host: %+v, instance.SecondsBehindMaster < 0 [%+v], correcting to 0", instanceKey, instance.SecondsBehindMaster.Int64)
instance.SecondsBehindMaster.Int64 = 0
}
// And until told otherwise:
instance.SlaveLagSeconds = instance.SecondsBehindMaster

instance.AllowTLS = (m.GetString("Master_SSL_Allowed") == "Yes")
// Not breaking the flow even on error
slaveStatusFound = true
Expand Down Expand Up @@ -424,8 +429,12 @@ func ReadTopologyInstance(instanceKey *InstanceKey) (*Instance, error) {
}

if config.Config.SlaveLagQuery != "" && !isMaxScale {
err := db.QueryRow(config.Config.SlaveLagQuery).Scan(&instance.SlaveLagSeconds)
if err != nil {
if err := db.QueryRow(config.Config.SlaveLagQuery).Scan(&instance.SlaveLagSeconds); err == nil {
if instance.SlaveLagSeconds.Valid && instance.SlaveLagSeconds.Int64 < 0 {
log.Warningf("Host: %+v, instance.SlaveLagSeconds < 0 [%+v], correcting to 0", instanceKey, instance.SlaveLagSeconds.Int64)
instance.SlaveLagSeconds.Int64 = 0
}
} else {
instance.SlaveLagSeconds = instance.SecondsBehindMaster
logReadTopologyInstanceError(instanceKey, "SlaveLagQuery", err)
}
Expand Down Expand Up @@ -1119,6 +1128,42 @@ func GetClusterOSCSlaves(clusterName string) ([](*Instance), error) {
return result, nil
}

// GetClusterGhostSlaves lists the replicas of the given cluster that are eligible to be
// the server a [gh-ost](https://github.com/github/gh-ost) operation connects to.
// gh-ost prefers a row-based-replication (RBR) replica with no children of its own.
//
// Candidates are read with replication_depth > 0 and binlog_format = 'ROW', using
// "num_slave_hosts asc" as the ordering argument (presumably so replicas with the fewest
// children come first). Binlog servers, instances whose last check is not valid, and
// instances without binary logging or log-slave-updates enabled are then filtered out.
//
// On a read failure the error is returned together with a nil result.
func GetClusterGhostSlaves(clusterName string) (result [](*Instance), err error) {
	// NOTE: the raw string content is kept exactly as in the original query text.
	ghostCandidateFilter := `
replication_depth > 0
and binlog_format = 'ROW'
and cluster_name = ?
`
	candidates, err := readInstancesByCondition(ghostCandidateFilter, sqlutils.Args(clusterName), "num_slave_hosts asc")
	if err != nil {
		return nil, err
	}

	for _, candidate := range candidates {
		// A candidate is rejected as soon as any one of the disqualifying traits holds.
		unsuitable := candidate.IsBinlogServer() ||
			!candidate.IsLastCheckValid ||
			!candidate.LogBinEnabled ||
			!candidate.LogSlaveUpdatesEnabled
		if unsuitable {
			continue
		}
		result = append(result, candidate)
	}

	return result, nil
}

// GetInstancesMaxLag returns the maximum lag in a set of instances
func GetInstancesMaxLag(instances [](*Instance)) (maxLag int64, err error) {
if len(instances) == 0 {
Expand Down
62 changes: 44 additions & 18 deletions go/inst/maintenance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@ package inst

import (
"fmt"

"github.com/outbrain/golib/log"
"github.com/outbrain/golib/sqlutils"
"github.com/outbrain/orchestrator/go/config"
"github.com/outbrain/orchestrator/go/db"
"github.com/outbrain/orchestrator/go/process"
)

// ReadActiveMaintenance returns the list of currently active maintenance entries
func ReadActiveMaintenance() ([]Maintenance, error) {
res := []Maintenance{}
query := `
select
select
database_instance_maintenance_id,
hostname,
port,
Expand All @@ -37,7 +39,7 @@ func ReadActiveMaintenance() ([]Maintenance, error) {
maintenance_active,
owner,
reason
from
from
database_instance_maintenance
where
maintenance_active = 1
Expand Down Expand Up @@ -67,24 +69,29 @@ func ReadActiveMaintenance() ([]Maintenance, error) {
}

// BeginBoundedMaintenance will make new maintenance entry for given instanceKey.
func BeginBoundedMaintenance(instanceKey *InstanceKey, owner string, reason string, durationSeconds uint) (int64, error) {
func BeginBoundedMaintenance(instanceKey *InstanceKey, owner string, reason string, durationSeconds uint, explicitlyBounded bool) (int64, error) {
var maintenanceToken int64 = 0
if durationSeconds == 0 {
durationSeconds = config.Config.MaintenanceExpireMinutes * 60
}
res, err := db.ExecOrchestrator(`
insert ignore
into database_instance_maintenance (
hostname, port, maintenance_active, begin_timestamp, end_timestamp, owner, reason
hostname, port, maintenance_active, begin_timestamp, end_timestamp, owner, reason,
processing_node_hostname, processing_node_token, explicitly_bounded
) VALUES (
?, ?, 1, NOW(), NOW() + INTERVAL ? SECOND, ?, ?
?, ?, 1, NOW(), NOW() + INTERVAL ? SECOND, ?, ?,
?, ?, ?
)
`,
instanceKey.Hostname,
instanceKey.Port,
durationSeconds,
owner,
reason,
process.ThisHostname,
process.ProcessToken.Hash,
explicitlyBounded,
)
if err != nil {
return maintenanceToken, log.Errore(err)
Expand All @@ -102,19 +109,19 @@ func BeginBoundedMaintenance(instanceKey *InstanceKey, owner string, reason stri

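// BeginMaintenance will make new maintenance entry for given instanceKey. No explicit duration is given:
// a duration of 0 is passed on, which BeginBoundedMaintenance replaces with the configured
// MaintenanceExpireMinutes default.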
func BeginMaintenance(instanceKey *InstanceKey, owner string, reason string) (int64, error) {
return BeginBoundedMaintenance(instanceKey, owner, reason, 0)
return BeginBoundedMaintenance(instanceKey, owner, reason, 0, false)
}

// EndMaintenanceByInstanceKey will terminate an active maintenance using given instanceKey as hint
func EndMaintenanceByInstanceKey(instanceKey *InstanceKey) error {
res, err := db.ExecOrchestrator(`
update
database_instance_maintenance
set
set
maintenance_active = NULL,
end_timestamp = NOW()
where
hostname = ?
hostname = ?
and port = ?
and maintenance_active = 1
`,
Expand All @@ -138,10 +145,10 @@ func EndMaintenanceByInstanceKey(instanceKey *InstanceKey) error {
func ReadMaintenanceInstanceKey(maintenanceToken int64) (*InstanceKey, error) {
var res *InstanceKey
query := `
select
hostname, port
from
database_instance_maintenance
select
hostname, port
from
database_instance_maintenance
where
database_instance_maintenance_id = ?
`
Expand All @@ -167,11 +174,11 @@ func EndMaintenance(maintenanceToken int64) error {
res, err := db.ExecOrchestrator(`
update
database_instance_maintenance
set
set
maintenance_active = NULL,
end_timestamp = NOW()
where
database_instance_maintenance_id = ?
database_instance_maintenance_id = ?
`,
maintenanceToken,
)
Expand All @@ -196,7 +203,7 @@ func ExpireMaintenance() error {
database_instance_maintenance
where
maintenance_active is null
and end_timestamp < NOW() - INTERVAL ? DAY
and end_timestamp < NOW() - INTERVAL ? DAY
`,
config.Config.MaintenancePurgeDays,
)
Expand All @@ -211,11 +218,11 @@ func ExpireMaintenance() error {
res, err := db.ExecOrchestrator(`
update
database_instance_maintenance
set
maintenance_active = NULL
set
maintenance_active = NULL
where
maintenance_active = 1
and end_timestamp < NOW()
and end_timestamp < NOW()
`,
)
if err != nil {
Expand All @@ -225,6 +232,25 @@ func ExpireMaintenance() error {
AuditOperation("expire-maintenance", nil, fmt.Sprintf("Expired bounded: %d", rowsAffected))
}
}
{
res, err := db.ExecOrchestrator(`
update
database_instance_maintenance
left join node_health on (processing_node_hostname = node_health.hostname AND processing_node_token = node_health.token)
set
database_instance_maintenance.maintenance_active = NULL
where
node_health.last_seen_active IS NULL
and explicitly_bounded = 0
`,
)
if err != nil {
return log.Errore(err)
}
if rowsAffected, _ := res.RowsAffected(); rowsAffected > 0 {
AuditOperation("expire-maintenance", nil, fmt.Sprintf("Expired dead: %d", rowsAffected))
}
}

return nil
}
Loading

0 comments on commit 69145f2

Please sign in to comment.