Skip to content

Commit

Permalink
Merge pull request #17 from pwillie/feature/restructure
Browse files Browse the repository at this point in the history
restructure and update - note: breaking change
  • Loading branch information
pwillie authored Oct 16, 2018
2 parents 616b927 + 1f2a3a6 commit 071d0ee
Show file tree
Hide file tree
Showing 77 changed files with 5,028 additions and 1,798 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
bin
coverage.out
51 changes: 36 additions & 15 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,11 @@
[[constraint]]
name = "gopkg.in/olivere/elastic.v6"
version = "6.2.10"

[[constraint]]
name = "github.com/TV4/graceful"
version = "0.3.3"

[[constraint]]
name = "github.com/gorilla/handlers"
version = "1.4.0"
9 changes: 6 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ help:
build:
@echo "building ${BIN_NAME} ${VERSION}"
@echo "GOPATH=${GOPATH}"
go build -ldflags "-X main.GitCommit=${GIT_COMMIT}${GIT_DIRTY} -X main.VersionPrerelease=DEV" -o bin/${BIN_NAME}
go build -ldflags "-X main.GitCommit=${GIT_COMMIT}${GIT_DIRTY} -X main.VersionPrerelease=DEV" -o bin/${BIN_NAME} cmd/adapter/*.go

build-alpine:
@echo "building ${BIN_NAME} ${VERSION}"
@echo "GOPATH=${GOPATH}"
go build -ldflags '-w -extldflags "-static" -X main.Version=${VERSION} -X main.GitCommit=${GIT_COMMIT}${GIT_DIRTY} -X main.VersionPrerelease=VersionPrerelease=RC' -o bin/${BIN_NAME}
go build -ldflags '-w -extldflags "-static" -X main.Version=${VERSION} -X main.GitCommit=${GIT_COMMIT}${GIT_DIRTY} -X main.VersionPrerelease=RC' -o bin/${BIN_NAME} cmd/adapter/*.go

package:
@echo "building image ${BIN_NAME} ${VERSION} $(GIT_COMMIT)"
Expand All @@ -52,5 +52,8 @@ clean:
@test ! -e bin/${BIN_NAME} || rm bin/${BIN_NAME}

test:
go test $(glide nv)
go vet ./...
go test -cover -coverprofile=coverage.out -v ./...

watch:
watcher -run github.com/pwillie/prometheus-es-adapter/cmd/adapter -watch github.com/pwillie/prometheus-es-adapter
32 changes: 23 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,28 @@

A read and write adapter for prometheus persistent storage

## Types
## Config

| Env Variables | Default | Description |
| ----------------- | --------------------- | ------------------------------------------------------------------ |
| ES_URL | http://localhost:9200 | Elasticsearch URL |
| ES_USER | | Elasticsearch User |
| ES_PASSWORD | | Elasticsearch User Password |
| ES_WORKERS | 0 | Number of batch workers |
| ES_BATCH_COUNT | 1000 | Max items for bulk Elasticsearch insert operation |
| ES_BATCH_SIZE | 4096 | Max size in bytes for bulk Elasticsearch insert operation |
| ES_BATCH_INTERVAL | 10 | Max period in seconds between bulk Elasticsearch insert operations |
| ES_WORKERS | 1 | Number of batch workers |
| ES_BATCH_MAX_AGE | 10 | Max period in seconds between bulk Elasticsearch insert operations |
| ES_BATCH_MAX_DOCS | 1000 | Max items for bulk Elasticsearch insert operation |
| ES_BATCH_MAX_SIZE | 4096 | Max size in bytes for bulk Elasticsearch insert operation |
| ES_ALIAS | prom-metrics | Elasticsearch alias pointing to active write index |
| ES_INDEX_SHARDS | 5 | Number of Elasticsearch shards to create per index |
| ES_INDEX_REPLICAS | 1 | Number of Elasticsearch replicas to create per index |
| ES_INDEX_MAX_AGE | 7d | Max age of Elasticsearch index before rollover |
| ES_INDEX_MAX_DOCS | 1000000 | Max documents in Elasticsearch index before rollover |
| ES_SEARCH_MAX_DOCS | 1000 | Max documents returned by Elasticsearch search operations |
| ES_INDEX_MAX_DOCS | 1000000 | Max number of docs in Elasticsearch index before rollover |
| ES_INDEX_MAX_SIZE | | Max size of Elasticsearch index before rollover, e.g. 5gb |
| ES_SEARCH_MAX_DOCS | 1000 | Max number of docs returned for Elasticsearch search operation |
| ES_SNIFF | false | Enable Elasticsearch sniffing |
| STATS | true | Expose Prometheus metrics endpoint |
| LISTEN | :8080 | TCP network address to start http listener on |
| VERSION | | Display version and exit |
| VERSION | false | Display version and exit |
| DEBUG | false | Display extra debug logs |

## Requirements

Expand All @@ -45,3 +49,13 @@ $ ./bin/prometheus-es-adapter
### Testing

`make test`

#### e2e

To run end to end tests using docker-compose, from the "test" directory:
```
docker-compose up -d
docker-compose ps
docker-compose up -d --build prometheus-es-adapter
docker-compose logs -f prometheus-es-adapter
```
114 changes: 114 additions & 0 deletions cmd/adapter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package main

import (
"context"
"fmt"
"net/http"

"go.uber.org/zap"

"github.com/TV4/graceful"
gorilla "github.com/gorilla/handlers"
"github.com/namsral/flag"
"github.com/pwillie/prometheus-es-adapter/pkg/elasticsearch"
"github.com/pwillie/prometheus-es-adapter/pkg/handlers"
"github.com/pwillie/prometheus-es-adapter/pkg/logger"
elastic "gopkg.in/olivere/elastic.v6"
)

// main is the entry point for the prometheus-es-adapter. It parses
// configuration (flags and, via namsral/flag, matching environment
// variables), connects to Elasticsearch, wires up the index/read/write
// services, and serves the Prometheus remote read/write HTTP endpoints
// until the process is stopped.
func main() {
	var (
		url           = flag.String("es_url", "http://localhost:9200", "Elasticsearch URL.")
		user          = flag.String("es_user", "", "Elasticsearch User.")
		pass          = flag.String("es_password", "", "Elasticsearch User Password.")
		workers       = flag.Int("es_workers", 1, "Number of batch workers.")
		batchMaxAge   = flag.Int("es_batch_max_age", 10, "Max period in seconds between bulk Elasticsearch insert operations")
		batchMaxDocs  = flag.Int("es_batch_max_docs", 1000, "Max items for bulk Elasticsearch insert operation")
		batchMaxSize  = flag.Int("es_batch_max_size", 4096, "Max size in bytes for bulk Elasticsearch insert operation")
		indexAlias    = flag.String("es_alias", "prom-metrics", "Elasticsearch alias pointing to active write index")
		indexShards   = flag.Int("es_index_shards", 5, "Number of Elasticsearch shards to create per index")
		indexReplicas = flag.Int("es_index_replicas", 1, "Number of Elasticsearch replicas to create per index")
		indexMaxAge   = flag.String("es_index_max_age", "7d", "Max age of Elasticsearch index before rollover")
		indexMaxDocs  = flag.Int64("es_index_max_docs", 1000000, "Max number of docs in Elasticsearch index before rollover")
		indexMaxSize  = flag.String("es_index_max_size", "", "Max size of index before rollover eg 5gb")
		searchMaxDocs = flag.Int("es_search_max_docs", 1000, "Max number of docs returned for Elasticsearch search operation")
		sniffEnabled  = flag.Bool("es_sniff", false, "Enable Elasticsearch sniffing")
		statsEnabled  = flag.Bool("stats", true, "Expose Prometheus metrics endpoint")
		// The README documents a LISTEN env var; namsral/flag maps the
		// LISTEN environment variable onto this flag. Default preserves
		// the previously hardcoded ":8000" (NOTE(review): README says
		// ":8080" — confirm which default is intended).
		listen      = flag.String("listen", ":8000", "TCP network address to start http listener on")
		versionFlag = flag.Bool("version", false, "Version")
		debug       = flag.Bool("debug", false, "Debug logging")
	)
	flag.Parse()

	log := logger.NewLogger(*debug)

	// -version: print build metadata (injected via -ldflags, see Makefile)
	// and exit without touching Elasticsearch.
	if *versionFlag {
		fmt.Println("Git Commit:", GitCommit)
		fmt.Println("Version:", Version)
		if VersionPrerelease != "" {
			fmt.Println("Version PreRelease:", VersionPrerelease)
		}
		return
	}

	log.Info(fmt.Sprintf("Starting commit: %+v, version: %+v, prerelease: %+v",
		GitCommit, Version, VersionPrerelease))

	if *url == "" {
		log.Fatal("missing url")
	}

	ctx := context.TODO()

	client, err := elastic.NewClient(
		elastic.SetURL(*url),
		elastic.SetBasicAuth(*user, *pass),
		elastic.SetSniff(*sniffEnabled),
	)
	if err != nil {
		log.Fatal("Failed to create elastic client", zap.Error(err))
	}
	defer client.Stop()

	// Index service manages the rollover alias and index lifecycle.
	indexCfg := &elasticsearch.IndexConfig{
		Alias:    *indexAlias,
		MaxAge:   *indexMaxAge,
		MaxDocs:  *indexMaxDocs,
		MaxSize:  *indexMaxSize,
		Shards:   *indexShards,
		Replicas: *indexReplicas,
	}
	_, err = elasticsearch.NewIndexService(ctx, log, client, indexCfg)
	if err != nil {
		log.Fatal("Failed to create indexer", zap.Error(err))
	}

	// Read service backs the Prometheus remote-read endpoint.
	readCfg := &elasticsearch.ReadConfig{
		Alias:   *indexAlias,
		MaxDocs: *searchMaxDocs,
	}
	readSvc := elasticsearch.NewReadService(log, client, readCfg)

	// Write service batches remote-write samples into bulk inserts.
	writeCfg := &elasticsearch.WriteConfig{
		Alias:   *indexAlias,
		MaxAge:  *batchMaxAge,
		MaxDocs: *batchMaxDocs,
		MaxSize: *batchMaxSize,
		Workers: *workers,
		Stats:   *statsEnabled,
	}
	writeSvc, err := elasticsearch.NewWriteService(ctx, log, client, writeCfg)
	if err != nil {
		log.Fatal("Unable to create elasticsearch adapter", zap.Error(err))
	}
	defer writeSvc.Close()

	router := handlers.NewRouter(writeSvc, readSvc)

	// Serve with panic recovery and gzip compression; graceful handles
	// signal-driven shutdown of the HTTP server.
	graceful.ListenAndServe(&http.Server{
		Addr: *listen,
		Handler: gorilla.RecoveryHandler(gorilla.PrintRecoveryStack(true))(
			gorilla.CompressHandler(router),
		),
	})
	// TODO: graceful shutdown of bulk processor
}
File renamed without changes.
Loading

0 comments on commit 071d0ee

Please sign in to comment.