Skip to content

Commit

Permalink
zb: pick client IPs from a pool, closes project-zot#472
Browse files Browse the repository at this point in the history
Signed-off-by: Petu Eusebiu <[email protected]>
  • Loading branch information
eusebiu-constantin-petu-dbk committed Apr 6, 2022
1 parent 475d97b commit 7998263
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 38 deletions.
8 changes: 6 additions & 2 deletions cmd/zb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func NewPerfRootCmd() *cobra.Command {
showVersion := false

var auth, workdir, repo, outFmt string
var auth, workdir, repo, outFmt, srcIPs, srcCIDR string

var concurrency, requests int

Expand Down Expand Up @@ -45,12 +45,16 @@ func NewPerfRootCmd() *cobra.Command {

requests = concurrency * (requests / concurrency)

Perf(workdir, url, auth, repo, concurrency, requests, outFmt)
Perf(workdir, url, auth, repo, concurrency, requests, outFmt, srcIPs, srcCIDR)
},
}

rootCmd.Flags().StringVarP(&auth, "auth-creds", "A", "",
"Use colon-separated BASIC auth creds")
rootCmd.Flags().StringVarP(&srcIPs, "src-ips", "i", "",
"Use colon-separated ips to make requests from, src-ips and src-cidr are mutually exclusive")
rootCmd.Flags().StringVarP(&srcCIDR, "src-cidr", "s", "",
"Use specified cidr to obtain ips to make requests from, src-ips and src-cidr are mutually exclusive")
rootCmd.Flags().StringVarP(&workdir, "working-dir", "d", "",
"Use specified directory to store test data")
rootCmd.Flags().StringVarP(&repo, "repo", "r", "",
Expand Down
147 changes: 111 additions & 36 deletions cmd/zb/perf.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"io/ioutil"
"log"
"math/big"
"net"
"net/http"
"os"
"path"
Expand Down Expand Up @@ -36,6 +38,10 @@ const (
mediumBlob = 10 * MiB
largeBlob = 100 * MiB
cicdFmt = "ci-cd"
httpTimeout = 30 * time.Second
httpKeepAlive = 30 * time.Second
TLSHandshakeTimeout = 10 * time.Second
maxSourceIPs = 1000
)

//nolint:gochecknoglobals // used only in this test
Expand Down Expand Up @@ -225,16 +231,11 @@ func printStats(requests int, summary *statsSummary, outFmt string) {

// test suites/funcs.

type testFunc func(workdir, url, auth, repo string, requests int, config testConfig, statsCh chan statsRecord) error

func GetCatalog(workdir, url, auth, repo string, requests int, config testConfig, statsCh chan statsRecord) error {
client := resty.New()

if auth != "" {
creds := strings.Split(auth, ":")
client.SetBasicAuth(creds[0], creds[1])
}
type testFunc func(workdir, url, repo string, requests int, config testConfig,
statsCh chan statsRecord, client *resty.Client) error

func GetCatalog(workdir, url, repo string, requests int, config testConfig,
statsCh chan statsRecord, client *resty.Client) error {
for count := 0; count < requests; count++ {
func() {
start := time.Now()
Expand Down Expand Up @@ -279,15 +280,8 @@ func GetCatalog(workdir, url, auth, repo string, requests int, config testConfig
return nil
}

func PushMonolithStreamed(workdir, url, auth, trepo string, requests int,
config testConfig, statsCh chan statsRecord) error {
client := resty.New()

if auth != "" {
creds := strings.Split(auth, ":")
client.SetBasicAuth(creds[0], creds[1])
}

func PushMonolithStreamed(workdir, url, trepo string, requests int, config testConfig,
statsCh chan statsRecord, client *resty.Client) error {
for count := 0; count < requests; count++ {
func() {
start := time.Now()
Expand Down Expand Up @@ -322,7 +316,7 @@ func PushMonolithStreamed(workdir, url, auth, trepo string, requests int,
}

// create a new upload
resp, err := resty.R().
resp, err := client.R().
Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo))

latency = time.Since(start)
Expand Down Expand Up @@ -383,7 +377,7 @@ func PushMonolithStreamed(workdir, url, auth, trepo string, requests int,
}

// upload image config blob
resp, err = resty.R().
resp, err = client.R().
Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo))

latency = time.Since(start)
Expand Down Expand Up @@ -452,7 +446,7 @@ func PushMonolithStreamed(workdir, url, auth, trepo string, requests int,
log.Fatal(err)
}

resp, err = resty.R().
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json").
SetBody(content).
Expand All @@ -479,15 +473,8 @@ func PushMonolithStreamed(workdir, url, auth, trepo string, requests int,
return nil
}

func PushChunkStreamed(workdir, url, auth, trepo string, requests int,
config testConfig, statsCh chan statsRecord) error {
client := resty.New()

if auth != "" {
creds := strings.Split(auth, ":")
client.SetBasicAuth(creds[0], creds[1])
}

func PushChunkStreamed(workdir, url, trepo string, requests int, config testConfig,
statsCh chan statsRecord, client *resty.Client) error {
for count := 0; count < requests; count++ {
func() {
start := time.Now()
Expand Down Expand Up @@ -522,7 +509,7 @@ func PushChunkStreamed(workdir, url, auth, trepo string, requests int,
}

// create a new upload
resp, err := resty.R().
resp, err := client.R().
Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo))

latency = time.Since(start)
Expand Down Expand Up @@ -607,7 +594,7 @@ func PushChunkStreamed(workdir, url, auth, trepo string, requests int,
}

// upload image config blob
resp, err = resty.R().
resp, err = client.R().
Post(fmt.Sprintf("%s/v2/%s/blobs/uploads/", url, repo))

latency = time.Since(start)
Expand Down Expand Up @@ -721,7 +708,7 @@ func PushChunkStreamed(workdir, url, auth, trepo string, requests int,
log.Fatal(err)
}

resp, err = resty.R().
resp, err = client.R().
SetContentLength(true).
SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json").
SetBody(content).
Expand Down Expand Up @@ -794,7 +781,8 @@ var testSuite = []testConfig{ // nolint:gochecknoglobals // used only in this te
},
}

func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt string) {
func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt string,
srcIPs string, srcCIDR string) {
json := jsoniter.ConfigCompatibleWithStandardLibrary
// logging
log.SetFlags(0)
Expand All @@ -812,6 +800,20 @@ func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt
log.Printf("Working dir:\t%v", workdir)
log.Printf("\n")

var err error
// get host ips from command line to make requests from
var ips []string
if len(srcIPs) > 0 {
ips = strings.Split(srcIPs, ",")
} else if len(srcCIDR) > 0 {
ips, err = getIPsFromCIDR(srcCIDR, maxSourceIPs)
if err != nil {
log.Fatal(err) //nolint: gocritic
}
}

fmt.Println(ips)

for _, tconfig := range testSuite {
statsCh := make(chan statsRecord, requests)

Expand All @@ -828,7 +830,12 @@ func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt
go func() {
defer wg.Done()

_ = tconfig.tfunc(workdir, url, auth, repo, requests/concurrency, tconfig, statsCh)
httpClient, err := getRandomClientFromIPs(auth, ips)
if err != nil {
log.Fatal(err)
}

_ = tconfig.tfunc(workdir, url, repo, requests/concurrency, tconfig, statsCh, httpClient)
}()
}
wg.Wait()
Expand All @@ -849,11 +856,79 @@ func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt
if outFmt == cicdFmt {
jsonOut, err := json.Marshal(cicdSummary)
if err != nil {
log.Fatal(err) //nolint:gocritic // file closed on exit
log.Fatal(err) // file closed on exit
}

if err := ioutil.WriteFile(fmt.Sprintf("%s.json", outFmt), jsonOut, defaultFilePerms); err != nil {
log.Fatal(err)
}
}
}

// getRandomClientFromIPs returns a resty client with a random bind address from ips slice.
func getRandomClientFromIPs(auth string, ips []string) (*resty.Client, error) {
client := resty.New()

if auth != "" {
creds := strings.Split(auth, ":")
client.SetBasicAuth(creds[0], creds[1])
}

// get random ip client
if len(ips) != 0 {
// get random number
nBig, err := rand.Int(rand.Reader, big.NewInt(int64(len(ips))))
if err != nil {
return nil, err
}

// get random ip
ip := ips[nBig.Int64()]

// set ip in transport
localAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:0", ip))
if err != nil {
return nil, err
}

transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: httpTimeout,
KeepAlive: httpKeepAlive,
LocalAddr: localAddr,
}).DialContext,
TLSHandshakeTimeout: TLSHandshakeTimeout,
}

client.SetTransport(transport)
}

return client, nil
}

// getIPsFromCIDR returns a list of ips given a cidr.
func getIPsFromCIDR(cidr string, maxIPs int) ([]string, error) {
// nolint:varnamelen
ip, ipnet, err := net.ParseCIDR(cidr)
if err != nil {
return nil, err
}

var ips []string
for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip) && len(ips) < maxIPs; inc(ip) {
ips = append(ips, ip.String())
}
// remove network address and broadcast address
return ips[1 : len(ips)-1], nil
}

// https://go.dev/play/p/sdzcMvZYWnc
func inc(ip net.IP) {
for j := len(ip) - 1; j >= 0; j-- {
ip[j]++
if ip[j] > 0 {
break
}
}
}

0 comments on commit 7998263

Please sign in to comment.