diff --git a/CHANGELOG.md b/CHANGELOG.md index 5bf1420..7df698e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,2 +1,13 @@ -# Release 0.1.0 -- Initial release \ No newline at end of file +# vNext + +# Release 1.1.0 +- AccountClient 相关 API 使用 appd V3 接口 +- 为异步 API 添加同步版本 +- 为 port 添加创建和更新时间 +- 为 job 添加 deps 域 +- 支持 domain 鉴权 +- 为 ServiceInspect 添加 progress 域 +- 修复BUG: 非 nil 空 slice 未被包含在发送 request body 中 + +# Release 1.0.0 +- Initial release diff --git a/kirksdk/account_api.go b/kirksdk/account_api.go index 4d6744a..92378bb 100644 --- a/kirksdk/account_api.go +++ b/kirksdk/account_api.go @@ -103,7 +103,7 @@ type CreateAppArgs struct { // AccountInfo 包含 Account 的相关信息 type AccountInfo struct { ID uint32 `json:"id"` - Name string `json:"uri"` + Name string `json:"name"` Title string `json:"title"` CreationTime time.Time `json:"ctime"` ModificationTime time.Time `json:"mtime"` diff --git a/kirksdk/account_client.go b/kirksdk/account_client.go index adcaf33..5548126 100644 --- a/kirksdk/account_client.go +++ b/kirksdk/account_client.go @@ -3,64 +3,13 @@ package kirksdk import ( "fmt" "net/http" - "sync" "golang.org/x/net/context" "qiniupkg.com/kirk/kirksdk/mac" "qiniupkg.com/x/rpc.v7" ) -const appVersionPrefix = "/v1" - -type appClient struct { - appURI string - accessKey string - secretKey string - host string - userAgent string - client rpc.Client -} - -func (p *appClient) getInfo(ctx context.Context) (ret AppInfo, err error) { - url := fmt.Sprintf("%s%s/info", p.host, appVersionPrefix) - err = p.client.Call(ctx, &ret, "GET", url) - return -} - -func (p *appClient) listAlertMethods(ctx context.Context) (ret []AlertMethodInfo, err error) { - ret1 := AlertMethods{ - Methods: []AlertMethodInfo{}, - } - - url := fmt.Sprintf("%s%s/alert/methods", p.host, appVersionPrefix) - err = p.client.Call(ctx, &ret1, "GET", url) - ret = ret1.Methods - return -} - -func (p *appClient) getAlertMethod(ctx context.Context, id string) (ret AlertMethodInfo, err error) { - url := fmt.Sprintf("%s%s/alert/methods/%s", p.host, appVersionPrefix, id) - err = p.client.Call(ctx, &ret, "GET", url) - return -} - -func (p *appClient) createAlertMethod(ctx context.Context, args CreateAlertMethodArgs) (ret AlertMethodInfo, err error) { - url := fmt.Sprintf("%s%s/alert/methods", p.host, appVersionPrefix) - err = p.client.CallWithJson(ctx, &ret, "POST", url, args) - return -} - -func (p *appClient) updateAlertMethod(ctx context.Context, id string, args UpdateAlertMethodArgs) (ret AlertMethodInfo, err error) { - url := fmt.Sprintf("%s%s/alert/methods/%s", p.host, appVersionPrefix, id) - err = p.client.CallWithJson(ctx, &ret, "POST", url, args) - return -} - -func (p *appClient) deleteAlertMethod(ctx context.Context, id string) (err error) { - url := fmt.Sprintf("%s%s/alert/methods/%s", p.host, appVersionPrefix, id) - err = p.client.Call(ctx, nil, "DELETE", url) - return -} +const appVersionPrefix = "/v3" type accountClientImp struct { accessKey string @@ -68,17 +17,12 @@ type accountClientImp struct { host string userAgent string client rpc.Client - - transport http.RoundTripper - appsClientMap map[string]*appClient - mapLock *sync.Mutex + transport http.RoundTripper } func NewAccountClient(cfg AccountConfig) AccountClient { p := new(accountClientImp) - p.appsClientMap = make(map[string]*appClient) - p.mapLock = &sync.Mutex{} p.host = cleanHost(cfg.Host) p.transport = cfg.Transport p.userAgent = cfg.UserAgent @@ -96,81 +40,24 @@ func NewAccountClient(cfg AccountConfig) AccountClient { return p } -func (p *accountClientImp) getAppClient(ctx context.Context, appURI string) (ret *appClient, err error) { - p.mapLock.Lock() - defer p.mapLock.Unlock() - - var ok bool - if ret, ok = p.appsClientMap[appURI]; !ok { - ret, err = p.createAppClient(ctx, appURI) - if err != nil { - return - } - - p.appsClientMap[appURI] = ret - } - - return -} - -func (p *accountClientImp) createAppClient(ctx context.Context, appURI string) (ret *appClient, err error) { - keyPairs, err := p.GetAppKeys(ctx, appURI) - if err != nil { - return - } - - var ak, sk string - for _, keyPair := range keyPairs { - if keyPair.State == KeyStateEnabled { - ak = keyPair.AccessKey - sk = keyPair.SecretKey - break - } - } - - if ak == "" { - err = fmt.Errorf("Fail to find keys for app \"%s\"", appURI) - return - } - - t := newKirksdkTransport(p.userAgent, p.transport) - m := mac.New(ak, sk) - c := rpc.Client{mac.NewClient(m, t)} - - return &appClient{ - appURI: appURI, - accessKey: ak, - secretKey: sk, - host: p.host, - userAgent: p.userAgent, - client: c, - }, nil -} - func (p *accountClientImp) GetAccountInfo(ctx context.Context) (ret AccountInfo, err error) { url := fmt.Sprintf("%s%s/info", p.host, appVersionPrefix) err = p.client.Call(ctx, &ret, "GET", url) return } -type createAppRet struct { - AppURI string `json:"appUri"` +type createAppArgsWithName struct { + Name string `json:"name"` + CreateAppArgs } func (p *accountClientImp) CreateApp(ctx context.Context, appName string, args CreateAppArgs) (ret AppInfo, err error) { - var createdURI createAppRet - url := fmt.Sprintf("%s%s/apps/%s", p.host, appVersionPrefix, appName) - err = p.client.CallWithJson(ctx, &createdURI, "POST", url, args) - if err != nil { - return - } - - client, err := p.getAppClient(ctx, createdURI.AppURI) - if err != nil { - return + argsWithName := createAppArgsWithName{ + Name: appName, + CreateAppArgs: args, } - - ret, err = client.getInfo(ctx) + url := fmt.Sprintf("%s%s/apps", p.host, appVersionPrefix) + err = p.client.CallWithJson(ctx, &ret, "POST", url, argsWithName) return } @@ -181,12 +68,9 @@ func (p *accountClientImp) DeleteApp(ctx context.Context, appURI string) (err er } func (p *accountClientImp) GetApp(ctx context.Context, appURI string) (ret AppInfo, err error) { - c, err := p.getAppClient(ctx, appURI) - if err != nil { - return - } - - return c.getInfo(ctx) + url := fmt.Sprintf("%s%s/apps/%s", p.host, appVersionPrefix, appURI) + err = p.client.Call(ctx, &ret, "GET", url) + return } func (p *accountClientImp) GetAppKeys(ctx context.Context, appURI string) (ret []KeyPair, err error) { @@ -196,7 +80,7 @@ func (p *accountClientImp) GetAppKeys(ctx context.Context, appURI string) (ret [ } func (p *accountClientImp) ListApps(ctx context.Context) (ret []AppInfo, err error) { - url := fmt.Sprintf("%s%s/children", p.host, appVersionPrefix) + url := fmt.Sprintf("%s%s/apps", p.host, appVersionPrefix) err = p.client.Call(ctx, &ret, "GET", url) return } @@ -214,54 +98,39 @@ func (p *accountClientImp) GetRegion(ctx context.Context, regionName string) (re } func (p *accountClientImp) ListRegions(ctx context.Context) (ret []RegionInfo, err error) { - url := p.host + appVersionPrefix + "/regions" + url := fmt.Sprintf("%s%s/regions", p.host, appVersionPrefix) err = p.client.Call(ctx, &ret, "GET", url) return } func (p *accountClientImp) CreateAlertMethod(ctx context.Context, appURI string, args CreateAlertMethodArgs) (ret AlertMethodInfo, err error) { - c, err := p.getAppClient(ctx, appURI) - if err != nil { - return - } - - return c.createAlertMethod(ctx, args) + url := fmt.Sprintf("%s%s/apps/%s/alert/methods", p.host, appVersionPrefix, appURI) + err = p.client.CallWithJson(ctx, &ret, "POST", url, args) + return } func (p *accountClientImp) DeleteAlertMethod(ctx context.Context, appURI string, id string) (err error) { - c, err := p.getAppClient(ctx, appURI) - if err != nil { - return - } - - return c.deleteAlertMethod(ctx, id) + url := fmt.Sprintf("%s%s/apps/%s/alert/methods/%s", p.host, appVersionPrefix, appURI, id) + err = p.client.Call(ctx, nil, "DELETE", url) + return } func (p *accountClientImp) GetAlertMethod(ctx context.Context, appURI string, id string) (ret AlertMethodInfo, err error) { - c, err := p.getAppClient(ctx, appURI) - if err != nil { - return - } - - return c.getAlertMethod(ctx, id) + url := fmt.Sprintf("%s%s/apps/%s/alert/methods/%s", p.host, appVersionPrefix, appURI, id) + err = p.client.Call(ctx, &ret, "GET", url) + return } func (p *accountClientImp) ListAlertMethod(ctx context.Context, appURI string) (ret []AlertMethodInfo, err error) { - c, err := p.getAppClient(ctx, appURI) - if err != nil { - return - } - - return c.listAlertMethods(ctx) + url := fmt.Sprintf("%s%s/apps/%s/alert/methods", p.host, appVersionPrefix, appURI) + err = p.client.Call(ctx, &ret, "GET", url) + return } func (p *accountClientImp) UpdateAlertMethod(ctx context.Context, appURI string, id string, args UpdateAlertMethodArgs) (ret AlertMethodInfo, err error) { - c, err := p.getAppClient(ctx, appURI) - if err != nil { - return - } - - return c.updateAlertMethod(ctx, id, args) + url := fmt.Sprintf("%s%s/apps/%s/alert/methods/%s", p.host, appVersionPrefix, appURI, id) + err = p.client.CallWithJson(ctx, &ret, "PUT", url, args) + return } func (p *accountClientImp) GetIndexClient(ctx context.Context) (client IndexClient, err error) { diff --git a/kirksdk/kirksdk.go b/kirksdk/kirksdk.go index 1cc2f32..c0f82bd 100644 --- a/kirksdk/kirksdk.go +++ b/kirksdk/kirksdk.go @@ -1,3 +1,3 @@ package kirksdk -const Version = "1.0.0" +const Version = "1.1.0" diff --git a/kirksdk/qcos_api.go b/kirksdk/qcos_api.go index cd330a6..d960585 100644 --- a/kirksdk/qcos_api.go +++ b/kirksdk/qcos_api.go @@ -13,11 +13,18 @@ type QcosClient interface { ListStacks(ctx context.Context) (ret []StackInfo, err error) // POST /v3/stacks + // Async CreateStack(ctx context.Context, args CreateStackArgs) (err error) + // Sync + SyncCreateStack(ctx context.Context, args CreateStackArgs) (err error) // POST /v3/stacks/ + // Async UpdateStack( ctx context.Context, stackName string, args UpdateStackArgs) (err error) + // Sync + SyncUpdateStack( + ctx context.Context, stackName string, args UpdateStackArgs) (err error) // GET /v3/stacks/ GetStack(ctx context.Context, stackName string) (ret StackInfo, err error) @@ -27,20 +34,27 @@ type QcosClient interface { ctx context.Context, stackName string) (ret CreateStackArgs, err error) // DELETE /v3/stacks/ + // Async DeleteStack(ctx context.Context, stackName string) (err error) // POST /v3/stacks//start + // Sync StartStack(ctx context.Context, stackName string) (err error) // POST /v3/stacks//stop + // Sync StopStack(ctx context.Context, stackName string) (err error) // GET /v3/stacks//services ListServices(ctx context.Context, stackName string) (ret []ServiceInfo, err error) // POST /v3/stacks//services + // Async CreateService( ctx context.Context, stackName string, args CreateServiceArgs) (err error) + // Sync + SyncCreateService( + ctx context.Context, stackName string, args CreateServiceArgs) (err error) // GET /v3/stacks//services//inspect GetServiceInspect(ctx context.Context, @@ -51,39 +65,71 @@ type QcosClient interface { stackName string, serviceName string) (ret ServiceExportInfo, err error) // POST /v3/stacks//services/ + // Async UpdateService(ctx context.Context, stackName string, serviceName string, args UpdateServiceArgs) (err error) + // Sync + SyncUpdateService(ctx context.Context, + stackName string, serviceName string, args UpdateServiceArgs) (err error) // POST /v3/stack//services//deploy + // Async DeployService(ctx context.Context, stackName string, serviceName string, args DeployServiceArgs) (err error) + // Sync + SyncDeployService(ctx context.Context, + stackName string, serviceName string, args DeployServiceArgs) (err error) // POST /v3/stacks//services//scale + // Async ScaleService(ctx context.Context, stackName string, serviceName string, args ScaleServiceArgs) (err error) + // Sync + SyncScaleService(ctx context.Context, + stackName string, serviceName string, args ScaleServiceArgs) (err error) // POST /v3/stacks//services//start + // Sync(status not synced) StartService( ctx context.Context, stackName string, serviceName string) (err error) + // Sync(status synced) + SyncStartService( + ctx context.Context, stackName string, serviceName string) (err error) // POST /v3/stacks//services//stop + // Sync(status not synced) StopService( ctx context.Context, stackName string, serviceName string) (err error) + // Sync(status synced) + SyncStopService( + ctx context.Context, stackName string, serviceName string) (err error) // DELETE /v3/stacks//services/ + // Async DeleteService( ctx context.Context, stackName string, serviceName string) (err error) // POST /v3/stacks//services//volumes + // Async CreateServiceVolume(ctx context.Context, stackName string, serviceName string, args CreateServiceVolumeArgs) (err error) + // Sync + SyncCreateServiceVolume(ctx context.Context, stackName string, + serviceName string, args CreateServiceVolumeArgs) (err error) // POST /v3/stacks//services//volumes//extend + // Async ExtendServiceVolume(ctx context.Context, stackName string, serviceName string, volumeName string, args ExtendVolumeArgs) (err error) + // Sync + SyncExtendServiceVolume(ctx context.Context, stackName string, + serviceName string, volumeName string, args ExtendVolumeArgs) (err error) // DELETE /v3/stacks//services//volumes/ + // Async DeleteServiceVolume(ctx context.Context, stackName string, serviceName string, volumeName string) (err error) + // Sync + SyncDeleteServiceVolume(ctx context.Context, stackName string, serviceName string, volumeName string) (err error) // POST /v3/stacks//services//natip SetServiceNatIP( @@ -105,9 +151,11 @@ type QcosClient interface { StartContainer(ctx context.Context, ip string) (err error) // POST /v3/containers//stop + // Sync StopContainer(ctx context.Context, ip string) (err error) // POST /v3/containers//restart + // Sync RestartContainer(ctx context.Context, ip string) (err error) // POST /v3/containers//commit @@ -335,6 +383,8 @@ type ServiceInfo struct { Status Status `json:"status"` UpdateSpec ServiceSpecExport `json:"updateSpec"` Volumes []VolumeSpec `json:"volumes"` + UpdateProgress int `json:"updateProgress"` + UpdatingProgress int `json:"updatingProgress"` CreatedAt time.Time `json:"createdAt"` UpdatedAt time.Time `json:"updatedAt"` } @@ -481,6 +531,10 @@ type CreateApArgs struct { UnitType string `json:"unitType"` Host string `json:"host"` Title string `json:"title"` + // domain auth + RequireAuth string `json:"requireAuth"` + UIDWhiteList []string `json:"uidWhiteList"` + UIDBlackList []string `json:"uidBlackList"` } type ListApsArgs struct { @@ -529,6 +583,8 @@ type ApPortInfo struct { SessionTmoSec int `json:"sessionTimeoutSec"` ProxyOpts ApProxyOpts `json:"proxyOptions"` HealthCheckOpts ApHealthCheckOpts `json:"healthCheck"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` Backends []struct { Stack string `json:"stack"` Service string `json:"service"` @@ -542,20 +598,24 @@ type ApPortInfo struct { } type FullApInfo struct { - ApID int `json:"apid"` - Type string `json:"type"` - Title string `json:"title"` - IP string `json:"ip,omitempty"` - Domain string `json:"domain,omitempty"` - Provider string `json:"provider"` - Bandwidth int `json:"bandwidthMbps"` - Traffic int `json:"trafficBytes"` - UserDomains []string `json:"userDomains,omitempty"` - Host string `json:"host,omitempty"` - UnitType string `json:"unitType,omitempty"` - CreatedAt time.Time `json:"createdAt"` - UpdatedAt time.Time `json:"updatedAt"` - Ports []ApPortInfo `json:"ports"` + ApID int `json:"apid"` + Type string `json:"type"` + Title string `json:"title"` + IP string `json:"ip,omitempty"` + Domain string `json:"domain,omitempty"` + Provider string `json:"provider"` + Bandwidth int `json:"bandwidthMbps"` + Traffic int `json:"trafficBytes"` + UserDomains []string `json:"userDomains,omitempty"` + Host string `json:"host,omitempty"` + UnitType string `json:"unitType,omitempty"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` + // domain auth + RequireAuth string `json:"requireAuth,omitempty"` + UIDWhiteList []string `json:"uidWhiteList,omitempty"` + UIDBlackList []string `json:"uidBlackList,omitempty"` + Ports []ApPortInfo `json:"ports"` } type SetApDescArgs struct { @@ -563,6 +623,10 @@ type SetApDescArgs struct { Host string `json:"host"` Title string `json:"title"` Bandwidth int `json:"bandwidthMbps"` + // domain auth + RequireAuth string `json:"requireAuth"` + UIDWhiteList []string `json:"uidWhiteList"` + UIDBlackList []string `json:"uidBlackList"` } type ApBackendArgs struct { @@ -653,6 +717,7 @@ type JobTaskSpec struct { Envs []string `json:"envs,omitempty"` Hosts []string `json:"hosts,omitempty"` LogCollectors []LogCollectorSpec `json:"logCollectors,omitempty"` + Deps []string `json:"deps,omitempty"` WorkDir string `json:"workDir,omitempty"` InstanceNum int `json:"instanceNum,omitempty"` UnitType string `json:"unitType,omitempty"` diff --git a/kirksdk/qcos_client.go b/kirksdk/qcos_client.go index c582ee6..f96fcc4 100644 --- a/kirksdk/qcos_client.go +++ b/kirksdk/qcos_client.go @@ -3,6 +3,7 @@ package kirksdk import ( "bufio" "crypto/tls" + "errors" "fmt" "io" "io/ioutil" @@ -22,6 +23,7 @@ import ( ) const DefaultStack = "default" +const waitTimeout = 120 * time.Second const MultiStatus = 207 @@ -81,6 +83,19 @@ func (p *qcosClientImp) CreateStack( return } +func (p *qcosClientImp) SyncCreateStack( + ctx context.Context, args CreateStackArgs) (err error) { + err = p.CreateStack(ctx, args) + if err != nil { + return + } + err = p.wait4StackRunning(args.Name, waitTimeout) + if err != nil { + return + } + return +} + // POST /v3/stacks/ func (p *qcosClientImp) UpdateStack(ctx context.Context, stackName string, args UpdateStackArgs) (err error) { @@ -94,6 +109,19 @@ func (p *qcosClientImp) UpdateStack(ctx context.Context, stackName string, return } +func (p *qcosClientImp) SyncUpdateStack(ctx context.Context, stackName string, + args UpdateStackArgs) (err error) { + err = p.UpdateStack(ctx, stackName, args) + if err != nil { + return + } + err = p.wait4StackRunning(stackName, waitTimeout) + if err != nil { + return + } + return +} + // GET /v3/stacks/ func (p *qcosClientImp) GetStack( ctx context.Context, stackName string) (ret StackInfo, err error) { @@ -183,6 +211,19 @@ func (p *qcosClientImp) CreateService( return } +func (p *qcosClientImp) SyncCreateService( + ctx context.Context, stackName string, args CreateServiceArgs) (err error) { + err = p.CreateService(ctx, stackName, args) + if err != nil { + return + } + err = p.wait4ServiceRunning(stackName, args.Name, waitTimeout) + if err != nil { + return + } + return +} + // GET /v3/stacks//services//inspect func (p *qcosClientImp) GetServiceInspect(ctx context.Context, stackName string, serviceName string) (ret ServiceInfo, err error) { @@ -222,6 +263,22 @@ func (p *qcosClientImp) UpdateService(ctx context.Context, stackName string, return } +func (p *qcosClientImp) SyncUpdateService(ctx context.Context, stackName string, + serviceName string, args UpdateServiceArgs) (err error) { + err = p.UpdateService(ctx, stackName, serviceName, args) + if err != nil { + return + } + + if args.ManualUpdate == false { + err = p.wait4ServiceRunning(stackName, serviceName, waitTimeout) + if err != nil { + return + } + } + return +} + // POST /v3/stacks//services//deploy func (p *qcosClientImp) DeployService(ctx context.Context, stackName string, serviceName string, args DeployServiceArgs) (err error) { @@ -235,6 +292,31 @@ func (p *qcosClientImp) DeployService(ctx context.Context, return } +func (p *qcosClientImp) SyncDeployService(ctx context.Context, + stackName string, serviceName string, args DeployServiceArgs) (err error) { + err = p.DeployService(ctx, stackName, serviceName, args) + if err != nil { + return + } + if args.Operation == "ROLLBACK" || args.Operation == "COMPLETE" { + err = p.wait4ServiceRunning(stackName, serviceName, waitTimeout) + if err != nil { + return + } + } + op := strings.Split(args.Operation, " ") + switch op[0] { + case "COMPLETE", "ROLLBACK": + err = p.wait4ServiceRunning(stackName, serviceName, waitTimeout) + if err != nil { + return + } + default: + return + } + return +} + // POST /v3/stacks//services//scale func (p *qcosClientImp) ScaleService(ctx context.Context, stackName string, serviceName string, args ScaleServiceArgs) (err error) { @@ -248,6 +330,19 @@ func (p *qcosClientImp) ScaleService(ctx context.Context, return } +func (p *qcosClientImp) SyncScaleService(ctx context.Context, + stackName string, serviceName string, args ScaleServiceArgs) (err error) { + err = p.ScaleService(ctx, stackName, serviceName, args) + if err != nil { + return + } + err = p.wait4ServiceRunning(stackName, serviceName, waitTimeout) + if err != nil { + return + } + return +} + // POST /v3/stacks//services//start func (p *qcosClientImp) StartService( ctx context.Context, stackName string, serviceName string) (err error) { @@ -261,6 +356,19 @@ func (p *qcosClientImp) StartService( return } +func (p *qcosClientImp) SyncStartService( + ctx context.Context, stackName string, serviceName string) (err error) { + err = p.StartService(ctx, stackName, serviceName) + if err != nil { + return + } + err = p.wait4ServiceRunning(stackName, serviceName, waitTimeout) + if err != nil { + return + } + return +} + // POST /v3/stacks//services//stop func (p *qcosClientImp) StopService( ctx context.Context, stackName string, serviceName string) (err error) { @@ -274,6 +382,19 @@ func (p *qcosClientImp) StopService( return } +func (p *qcosClientImp) SyncStopService( + ctx context.Context, stackName string, serviceName string) (err error) { + err = p.StopService(ctx, stackName, serviceName) + if err != nil { + return + } + err = p.wait4ServiceStopped(stackName, serviceName, waitTimeout) + if err != nil { + return + } + return +} + // DELETE /v3/stacks//services/ func (p *qcosClientImp) DeleteService(ctx context.Context, stackName string, serviceName string) (err error) { @@ -299,6 +420,19 @@ func (p *qcosClientImp) CreateServiceVolume(ctx context.Context, stackName strin return } +func (p *qcosClientImp) SyncCreateServiceVolume(ctx context.Context, stackName string, + serviceName string, args CreateServiceVolumeArgs) (err error) { + err = p.CreateServiceVolume(ctx, stackName, serviceName, args) + if err != nil { + return + } + err = p.wait4ServiceRunning(stackName, serviceName, waitTimeout) + if err != nil { + return + } + return +} + // POST /v3/stacks//services//volumes//extend func (p *qcosClientImp) ExtendServiceVolume(ctx context.Context, stackName string, serviceName string, volumeName string, args ExtendVolumeArgs) (err error) { @@ -312,6 +446,19 @@ func (p *qcosClientImp) ExtendServiceVolume(ctx context.Context, stackName strin return } +func (p *qcosClientImp) SyncExtendServiceVolume(ctx context.Context, stackName string, + serviceName string, volumeName string, args ExtendVolumeArgs) (err error) { + err = p.ExtendServiceVolume(ctx, stackName, serviceName, volumeName, args) + if err != nil { + return + } + err = p.wait4ServiceRunning(stackName, serviceName, waitTimeout) + if err != nil { + return + } + return +} + // DELETE /v3/stacks//services//volumes/ func (p *qcosClientImp) DeleteServiceVolume(ctx context.Context, stackName string, serviceName string, volumeName string) (err error) { if stackName == "" { @@ -323,6 +470,18 @@ func (p *qcosClientImp) DeleteServiceVolume(ctx context.Context, stackName strin return } +func (p *qcosClientImp) SyncDeleteServiceVolume(ctx context.Context, stackName string, serviceName string, volumeName string) (err error) { + err = p.DeleteServiceVolume(ctx, stackName, serviceName, volumeName) + if err != nil { + return + } + err = p.wait4ServiceRunning(stackName, serviceName, waitTimeout) + if err != nil { + return + } + return +} + //POST /v3/stacks//services//natip func (p *qcosClientImp) SetServiceNatIP(ctx context.Context, stackName string, serviceName string, args SetServiceNatIPArgs) (err error) { if stackName == "" { @@ -979,7 +1138,7 @@ func (p *qcosClientImp) CreateJob(ctx context.Context, args CreateJobArgs) (err // POST /v3/jobs/ func (p *qcosClientImp) UpdateJob(ctx context.Context, name string, args UpdateJobArgs) (err error) { url := fmt.Sprintf("%s/v3/jobs/%s", p.host, name) - err = p.client.Call(ctx, nil, "POST", url) + err = p.client.CallWithJson(ctx, nil, "POST", url, args) return } @@ -1092,3 +1251,128 @@ func (p *qcosClientImp) GetContainerAlert(ctx context.Context, ip string, level err = p.client.Call(ctx, &ret, "GET", url) return } + +func (p *qcosClientImp) wait4StackRunning(stackName string, timeout time.Duration) (err error) { + if stackName == "" { + stackName = DefaultStack + } + + done := make(chan struct{}) + + go func() { + ctx, _ := context.WithTimeout(context.Background(), timeout) + for { + stackInfo, err := p.GetStack(ctx, stackName) + if err != nil { + if err.Error() == "context deadline exceeded" { + break + } + } else if err == nil && stackInfo.IsDeployed == true && stackInfo.Status == "RUNNING" { + runningNum := 0 + for _, svcName := range stackInfo.Services { + err = p.wait4ServiceRunning(stackName, svcName, timeout) + if err != nil { + break + } + runningNum += 1 + } + if runningNum == len(stackInfo.Services) { + done <- struct{}{} + break + } + } + time.Sleep(time.Second) + } + }() + + select { + case <-time.After(timeout): + return errors.New("Timeout in wait4StackRunning") + case <-done: + return nil + } +} + +func (p *qcosClientImp) wait4ServiceRunning(stackName string, serviceName string, timeout time.Duration) (err error) { + if stackName == "" { + stackName = DefaultStack + } + + done := make(chan struct{}) + + go func() { + ctx, _ := context.WithTimeout(context.Background(), timeout) + for { + svcInfo, err := p.GetServiceInspect(ctx, stackName, serviceName) + if err != nil { + if err.Error() == "context deadline exceeded" { + break + } + } else if err == nil && svcInfo.State == "DEPLOYED" && svcInfo.Status == "RUNNING" { + runningNum := 0 + for _, contIp := range svcInfo.ContainerIPs { + ctx, _ := context.WithTimeout(context.Background(), timeout) + contInfo, err := p.GetContainerInspect(ctx, contIp) + if err != nil || contInfo.Status != "RUNNING" { + break + } + runningNum += 1 + } + if runningNum == len(svcInfo.ContainerIPs) { + done <- struct{}{} + break + } + } + time.Sleep(time.Second) + } + }() + + select { + case <-time.After(timeout): + return errors.New("Timeout in wait4ServiceRunning") + case <-done: + return nil + } +} + +func (p *qcosClientImp) wait4ServiceStopped(stackName string, serviceName string, timeout time.Duration) (err error) { + if stackName == "" { + stackName = DefaultStack + } + + done := make(chan struct{}) + + go func() { + ctx, _ := context.WithTimeout(context.Background(), timeout) + for { + svcInfo, err := p.GetServiceInspect(ctx, stackName, serviceName) + if err != nil { + if err.Error() == "context deadline exceeded" { + break + } + } else if err == nil && svcInfo.State == "STOPPED" && svcInfo.Status == "NOT-RUNNING" { + runningNum := 0 + for _, contIp := range svcInfo.ContainerIPs { + ctx, _ := context.WithTimeout(context.Background(), timeout) + contInfo, err := p.GetContainerInspect(ctx, contIp) + if err != nil || contInfo.Status != "EXITED" { + break + } + runningNum += 1 + } + if runningNum == len(svcInfo.ContainerIPs) { + done <- struct{}{} + break + } + } + time.Sleep(time.Second) + } + }() + + select { + case <-time.After(timeout): + return errors.New("Timeout in wait4ServiceRunning") + case <-done: + return nil + } +} diff --git a/kirksdk/qcos_client_test.go b/kirksdk/qcos_client_test.go index 64fb05d..afbebba 100644 --- a/kirksdk/qcos_client_test.go +++ b/kirksdk/qcos_client_test.go @@ -551,3 +551,75 @@ func TestServices(t *testing.T) { assert.NoError(t, err) assert.Equal(t, expectedRet, info) } + +func TestUpdateService(t *testing.T) { + expectedUrl := "/v3/stacks/stack/services/service" + expectedMethod := "POST" + expectedBody := `{"manualUpdate":false,"metadata":["a=a1","b=b2"],"spec":{"command":[],"entryPoint":[],"envs":[],"hosts":[],"logCollectors":[],"gpuUUIDs":[],"autoRestart":"always","image":"testimage","stopGraceSec":1,"workDir":"testdir","unitType":"testunittype"},"updateParallelism":2}` + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, expectedUrl, r.URL.Path) + assert.Equal(t, expectedMethod, r.Method) + body, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + assert.Equal(t, expectedBody, string(body)) + fmt.Fprintln(w, "") + })) + defer ts.Close() + + client := NewQcosClient(QcosConfig{ + Host: ts.URL, + }) + + args := UpdateServiceArgs{ + ManualUpdate: false, + Metadata: []string{"a=a1", "b=b2"}, + Spec: ServiceSpec{ + AutoRestart: "always", + Command: []string{}, + EntryPoint: []string{}, + Envs: []string{}, + Hosts: []string{}, + Image: "testimage", + LogCollectors: []LogCollectorSpec{}, + StopGraceSec: 1, + WorkDir: "testdir", + UnitType: "testunittype", + GpuUUIDs: []string{}, + }, + UpdateParallelism: 2, + } + + err := client.UpdateService(context.TODO(), "stack", "service", args) + assert.NoError(t, err) +} + +func TestUpdateJob(t *testing.T) { + expectedUrl := "/v3/jobs/default" + expectedMethod := "POST" + expectedBody := `{"metadata":[],"runAt":"testrunat","timeout":3000,"mode":"testmode"}` + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, expectedUrl, r.URL.Path) + assert.Equal(t, expectedMethod, r.Method) + body, err := ioutil.ReadAll(r.Body) + assert.NoError(t, err) + assert.Equal(t, expectedBody, string(body)) + fmt.Fprintln(w, "") + })) + defer ts.Close() + + client := NewQcosClient(QcosConfig{ + Host: ts.URL, + }) + + args := UpdateJobArgs{ + Spec: map[string]JobTaskSpec{}, + Metadata: []string{}, + RunAt: "testrunat", + Timeout: 3000, + Mode: "testmode", + } + err := client.UpdateJob(context.TODO(), "default", args) + assert.NoError(t, err) +} diff --git a/kirksdk/qcos_json.go b/kirksdk/qcos_json.go new file mode 100644 index 0000000..9306ecc --- /dev/null +++ b/kirksdk/qcos_json.go @@ -0,0 +1,95 @@ +package kirksdk + +import ( + "encoding/json" +) + +func (p ServiceSpec) MarshalJSON() ([]byte, error) { + type Alias ServiceSpec + return json.Marshal(&struct { + Command *[]string `json:"command,omitempty"` + EntryPoint *[]string `json:"entryPoint,omitempty"` + Envs *[]string `json:"envs,omitempty"` + Hosts *[]string `json:"hosts,omitempty"` + LogCollectors *[]LogCollectorSpec `json:"logCollectors,omitempty"` + GpuUUIDs *[]string `json:"gpuUUIDs,omitempty"` + *Alias + }{ + Command: stringS2P(p.Command), + EntryPoint: stringS2P(p.EntryPoint), + Envs: stringS2P(p.Envs), + Hosts: stringS2P(p.Hosts), + LogCollectors: logCollectorSpecS2P(p.LogCollectors), + GpuUUIDs: stringS2P(p.GpuUUIDs), + Alias: (*Alias)(&p), + }) +} + +func (p JobTaskSpec) MarshalJSON() ([]byte, error) { + type Alias JobTaskSpec + return json.Marshal(&struct { + Command *[]string `json:"command,omitempty"` + EntryPoint *[]string `json:"entryPoint,omitempty"` + Envs *[]string `json:"envs,omitempty"` + Hosts *[]string `json:"hosts,omitempty"` + LogCollectors *[]LogCollectorSpec `json:"logCollectors,omitempty"` + *Alias + }{ + Command: stringS2P(p.Command), + EntryPoint: stringS2P(p.EntryPoint), + Envs: stringS2P(p.Envs), + Hosts: stringS2P(p.Hosts), + LogCollectors: logCollectorSpecS2P(p.LogCollectors), + Alias: (*Alias)(&p), + }) +} + +func (p UpdateJobArgs) MarshalJSON() ([]byte, error) { + type Alias UpdateJobArgs + return json.Marshal(&struct { + Metadata *[]string `json:"metadata,omitempty"` + *Alias + }{ + Metadata: stringS2P(p.Metadata), + Alias: (*Alias)(&p), + }) +} + +func (p JobTaskSpecEx) MarshalJSON() ([]byte, error) { + type Alias JobTaskSpecEx + return json.Marshal(&struct { + LogCollectors *[]LogCollectorSpec `json:"logCollectors,omitempty"` + Command *[]string `json:"command,omitempty"` + EntryPoint *[]string `json:"entryPoint,omitempty"` + Envs *[]string `json:"envs,omitempty"` + Hosts *[]string `json:"hosts,omitempty"` + Deps *[]string `json:"deps,omitempty"` + *Alias + }{ + LogCollectors: logCollectorSpecS2P(p.LogCollectors), + Command: stringS2P(p.Command), + EntryPoint: stringS2P(p.EntryPoint), + Envs: stringS2P(p.Envs), + Hosts: stringS2P(p.Hosts), + Deps: stringS2P(p.Deps), + Alias: (*Alias)(&p), + }) +} + +//----------------------------------------------------------------- + +func stringS2P(s []string) *[]string { + if s == nil { + return nil + } + + return &s +} + +func logCollectorSpecS2P(s []LogCollectorSpec) *[]LogCollectorSpec { + if s == nil { + return nil + } + + return &s +} diff --git a/kirksdk/qcos_json_test.go b/kirksdk/qcos_json_test.go new file mode 100644 index 0000000..ac678ae --- /dev/null +++ b/kirksdk/qcos_json_test.go @@ -0,0 +1,102 @@ +package kirksdk + +import ( + "encoding/json" + "fmt" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestServiceSpecMarshal(t *testing.T) { + spec := ServiceSpec{ + AutoRestart: "always", + Command: []string{}, + EntryPoint: nil, + Envs: []string{"a=a1", "b=b1"}, + // Host: nil + Image: "Image", + LogCollectors: []LogCollectorSpec{}, + StopGraceSec: 1, + WorkDir: "~/", + UnitType: "unittype", + GpuUUIDs: []string{}, + } + + testMarshal( + t, + spec, + `{"command":[],"envs":["a=a1","b=b1"],"logCollectors":[],"gpuUUIDs":[],"autoRestart":"always","image":"Image","stopGraceSec":1,"workDir":"~/","unitType":"unittype"}`, + ) +} + +func TestJobTaskSpecMarshal(t *testing.T) { + spec := JobTaskSpec{ + Image: "testimage", + Command: nil, + EntryPoint: []string{}, + Envs: []string{}, + Hosts: []string{}, + LogCollectors: []LogCollectorSpec{}, + WorkDir: "testworkdir", + InstanceNum: 2, + UnitType: "testunittype", + } + + testMarshal( + t, + spec, + `{"entryPoint":[],"envs":[],"hosts":[],"logCollectors":[],"image":"testimage","workDir":"testworkdir","instanceNum":2,"unitType":"testunittype"}`, + ) +} + +func TestUpdateJobArgsMarshal(t *testing.T) { + args := UpdateJobArgs{ + Spec: map[string]JobTaskSpec{}, + Metadata: []string{}, + RunAt: "testrunat", + Timeout: 3000, + Mode: "testmode", + } + + testMarshal( + t, + args, + `{"metadata":[],"runAt":"testrunat","timeout":3000,"mode":"testmode"}`, + ) +} + +func TestJobTaskSpecExMarshal(t *testing.T) { + specEx := JobTaskSpecEx{ + WorkDir: "", + LogCollectors: []LogCollectorSpec{ + LogCollectorSpec{ + Directory: "testdir1", + Patterns: []string{"p1", "p2"}, + }, + }, + Command: []string{}, + EntryPoint: []string{}, + Envs: []string{}, + Hosts: []string{}, + UnitType: "testunittype", + Deps: []string{}, + InstanceNum: 3, + } + + testMarshal( + t, + &specEx, + `{"logCollectors":[{"directory":"testdir1","patterns":["p1","p2"]}],"command":[],"entryPoint":[],"envs":[],"hosts":[],"deps":[],"unitType":"testunittype","instanceNum":3}`, + ) +} + +func testMarshal(t *testing.T, input interface{}, expected string) { + bytes, err := json.Marshal(input) + assert.Nil(t, err) + + actual := string(bytes) + + fmt.Println(actual) + + assert.Equal(t, expected, actual) +}