Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement local cache for crowdsec #33

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ tmp/
.idea/

!.idea/runConfigurations/
!.idea/rest-api.http
!.idea/rest-api.http

traefik-crowdsec-bouncer
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,18 @@ A http service to verify request and bounce them according to decisions made by
# Description
This repository aim to implement a [CrowdSec](https://doc.crowdsec.net/) bouncer for the router [Traefik](https://doc.traefik.io/traefik/) to block malicious IP to access your services.
For this it leverages [Traefik v2 ForwardAuth middleware](https://doc.traefik.io/traefik/middlewares/http/forwardauth/) and query CrowdSec with client IP.

It can operate with 3 modes:
- None
If the client IP is on ban list, it will get a http code 403 response. Otherwise, request will continue as usual.
All request
maxlerebourg marked this conversation as resolved.
Show resolved Hide resolved

- Live
If the client IP is on ban list, it will get a http code 403 response. Otherwise, request will continue as usual.
The bouncer can leverage use of a [local cache](https://github.com/patrickmn/go-cache) in order to reduce the number of requests made to the crowdsec local API. It will keep in cache the status for each IP that makes queries.

- Stream
Streaming mode allows you to keep in the local cache only the Banned IPs, every requests that does not hit the cache is authorized. Every minute, the cache is updated with news from the Local API using [go-cron](https://github.com/robfig/cron) library.

# Demo
## Prerequisites
Expand Down Expand Up @@ -58,6 +69,9 @@ The webservice configuration is made via environment variables:
* `CROWDSEC_AGENT_HOST` - Host and port of CrowdSec agent, i.e. crowdsec-agent:8080 (required)`
* `CROWDSEC_BOUNCER_SCHEME` - Scheme to query CrowdSec agent. Expected value: http, https. Default to http`
* `CROWDSEC_BOUNCER_LOG_LEVEL` - Minimum log level for bouncer. Expected value [zerolog levels](https://pkg.go.dev/github.com/rs/zerolog#readme-leveled-logging). Default to 1
* `CROWDSEC_BOUNCER_STREAM_INTERVAL` - Configure delay between each call to pull decisions in stream cache mode. Default to "1m"
* `CROWDSEC_BOUNCER_CACHE_MODE` - (live, stream, none) Enable cache mode to pull decisions from the LAPI. Default to "none"
* `CROWDSEC_DEFAULT_CACHE_DURATION` - Configure default duration of the cached data. Default to "5m"
maxlerebourg marked this conversation as resolved.
Show resolved Hide resolved
* `CROWDSEC_BOUNCER_BAN_RESPONSE_CODE` - HTTP code to respond in case of ban. Default to 403
* `CROWDSEC_BOUNCER_BAN_RESPONSE_MSG` - HTTP body as message to respond in case of ban. Default to Forbidden
* `PORT` - Change listening port of web server. Default listen on 8080
Expand Down
6 changes: 5 additions & 1 deletion bouncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package main

import (
"os"
"strings"

. "github.com/fbonalair/traefik-crowdsec-bouncer/config"
"github.com/fbonalair/traefik-crowdsec-bouncer/controler"
"github.com/gin-contrib/logger"
"github.com/gin-gonic/gin"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"strings"
)

var crowdsecBouncerCacheMode = OptionalEnv("CROWDSEC_BOUNCER_CACHE_MODE", "none")
var logLevel = OptionalEnv("CROWDSEC_BOUNCER_LOG_LEVEL", "1")
var trustedProxiesList = strings.Split(OptionalEnv("TRUSTED_PROXIES", "0.0.0.0/0"), ",")

Expand Down Expand Up @@ -58,6 +59,9 @@ func setupRouter() (*gin.Engine, error) {
router.Use(logger.SetLogger(
logger.WithSkipPath([]string{"/api/v1/ping", "/api/v1/healthz"}),
))
if crowdsecBouncerCacheMode == "stream" {
go controler.HandleStreamCache("true")
}
router.GET("/api/v1/ping", controler.Ping)
router.GET("/api/v1/healthz", controler.Healthz)
router.GET("/api/v1/forwardAuth", controler.ForwardAuth)
Expand Down
16 changes: 16 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"log"
"os"
"strconv"
"time"
)

/*
Expand Down Expand Up @@ -62,4 +63,19 @@ func ValidateEnv() {
if parsedCode < 100 || parsedCode > 599 {
log.Fatalf("The value for env var %s should be a valid http response code between 100 and 599 included.", "CROWDSEC_BOUNCER_BAN_RESPONSE_CODE")
}
cacheMode := OptionalEnv("CROWDSEC_BOUNCER_CACHE_MODE", "none")
if !contains([]string{"none", "live", "stream"}, cacheMode) {
log.Fatalf("Cache mode must be one of 'none', 'stream' or 'live'")
}
cacheStreamInterval := OptionalEnv("CROWDSEC_BOUNCER_CACHE_STREAM_INTERVAL", "1m")
duration, err := time.ParseDuration(cacheStreamInterval)
if err != nil && duration.Seconds() < 3600 {
log.Fatalf("Cache stream interval provided is not valid")
}
defaultCacheDuration := OptionalEnv("CROWDSEC_DEFAULT_CACHE_DURATION", "5m")
duration2, err := time.ParseDuration(defaultCacheDuration)
if err != nil && duration2.Seconds() < 3600{
log.Fatalf("Cache default duration provided is not valid")
}

}
160 changes: 126 additions & 34 deletions controler/controler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,58 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/coocood/freecache"
. "github.com/fbonalair/traefik-crowdsec-bouncer/config"
"github.com/fbonalair/traefik-crowdsec-bouncer/model"
"github.com/gin-gonic/gin"
"github.com/rs/zerolog/log"
)

const (
realIpHeader = "X-Real-Ip"
forwardHeader = "X-Forwarded-For"
crowdsecAuthHeader = "X-Api-Key"
crowdsecBouncerRoute = "v1/decisions"
healthCheckIp = "127.0.0.1"
realIpHeader = "X-Real-Ip"
forwardHeader = "X-Forwarded-For"
crowdsecAuthHeader = "X-Api-Key"
crowdsecBouncerRoute = "v1/decisions"
crowdsecBouncerStreamRoute = "v1/decisions/stream"
healthCheckIp = "127.0.0.1"
)

var crowdsecBouncerApiKey = RequiredEnv("CROWDSEC_BOUNCER_API_KEY")
var crowdsecBouncerHost = RequiredEnv("CROWDSEC_AGENT_HOST")
var crowdsecBouncerCacheMode = OptionalEnv("CROWDSEC_BOUNCER_CACHE_MODE", "none") // Validated via ValidateEnv()
var crowdsecBouncerCacheStreamInterval, _ = time.ParseDuration(OptionalEnv("CROWDSEC_BOUNCER_CACHE_STREAM_INTERVAL", "1m")) // Validated via ValidateEnv()
var crowdsecBouncerDefaultCacheDuration, _ = time.ParseDuration(OptionalEnv("CROWDSEC_DEFAULT_CACHE_DURATION", "5m")) // Validated via ValidateEnv()
var crowdsecBouncerScheme = OptionalEnv("CROWDSEC_BOUNCER_SCHEME", "http")
var crowdsecBanResponseCode, _ = strconv.Atoi(OptionalEnv("CROWDSEC_BOUNCER_BAN_RESPONSE_CODE", "403")) // Validated via ValidateEnv()
var crowdsecBanResponseMsg = OptionalEnv("CROWDSEC_BOUNCER_BAN_RESPONSE_MSG", "Forbidden")
var (
ipProcessed = promauto.NewCounter(prometheus.CounterOpts{
Name: "crowdsec_traefik_bouncer_processed_ip_total",
Help: "The total number of processed IP",
})
)

var cache = freecache.NewCache(100 * 1024 * 1024)
var ipProcessed = promauto.NewCounter(prometheus.CounterOpts{
Name: "crowdsec_traefik_bouncer_processed_ip_total",
Help: "The total number of processed IP",
})
var client = &http.Client{
Transport: &http.Transport{
MaxIdleConns: 10,
IdleConnTimeout: 30 * time.Second,
},
Timeout: 5 * time.Second,
Timeout: 10 * time.Second,
}

/**
Call Crowdsec local IP and with realIP and return true if IP does NOT have a ban decisions.
maxlerebourg marked this conversation as resolved.
Show resolved Hide resolved
*/
func isIpAuthorized(clientIP string) (bool, error) {
func getBanDuration(clientIP string) (int, error) {
maxlerebourg marked this conversation as resolved.
Show resolved Hide resolved
// Generating crowdsec API request
decisionUrl := url.URL{
Scheme: crowdsecBouncerScheme,
Expand All @@ -61,7 +65,7 @@ func isIpAuthorized(clientIP string) (bool, error) {
}
req, err := http.NewRequest(http.MethodGet, decisionUrl.String(), nil)
if err != nil {
return false, err
return 0, err
}
req.Header.Add(crowdsecAuthHeader, crowdsecBouncerApiKey)
log.Debug().
Expand All @@ -71,11 +75,8 @@ func isIpAuthorized(clientIP string) (bool, error) {

// Calling crowdsec API
resp, err := client.Do(req)
if err != nil {
return false, err
}
if resp.StatusCode == http.StatusForbidden {
return false, err
if err != nil || resp.StatusCode == http.StatusForbidden {
return 0, err
}

// Parsing response
Expand All @@ -87,30 +88,93 @@ func isIpAuthorized(clientIP string) (bool, error) {
}(resp.Body)
reqBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return false, err
return 0, err
}
if bytes.Equal(reqBody, []byte("null")) {
log.Debug().Msgf("No decision for IP %q. Accepting", clientIP)
return true, nil
return -1, nil
}

log.Debug().RawJSON("decisions", reqBody).Msg("Found Crowdsec's decision(s), evaluating ...")
var decisions []model.Decision
err = json.Unmarshal(reqBody, &decisions)
if err != nil {
return false, err
return 0, err
}
if len(decisions) == 0 {
return -1, nil
}

// Authorization logic
return len(decisions) < 0, nil
duration, err := time.ParseDuration(decisions[0].Duration)
if err != nil {
return -1, err
}
return int(duration.Seconds()), nil
}

func HandleStreamCache(initialized string) {
time.AfterFunc(crowdsecBouncerCacheStreamInterval, func () {
HandleStreamCache("false")
})
streamUrl := url.URL{
Scheme: crowdsecBouncerScheme,
Host: crowdsecBouncerHost,
Path: crowdsecBouncerStreamRoute,
RawQuery: fmt.Sprintf("startup=%s", initialized),
}
req, err := http.NewRequest(http.MethodGet, streamUrl.String(), nil)
if err != nil {
return
}
req.Header.Add(crowdsecAuthHeader, crowdsecBouncerApiKey)
log.Debug().
Str("method", http.MethodGet).
Str("url", streamUrl.String()).
Msg("Request Crowdsec's decision Local API")

// Calling crowdsec API
resp, err := client.Do(req)
if err != nil || resp.StatusCode == http.StatusForbidden {
return
}

// Parsing response
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
log.Err(err).Msg("An error occurred while closing body reader")
}
}(resp.Body)
reqBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return
}
log.Debug().RawJSON("stream", reqBody).Msg("Found Crowdsec's decision(s), evaluating ...")
var stream model.Stream
err = json.Unmarshal(reqBody, &stream)
if err != nil {
return
}
for i := 0; i < len(stream.New); i++ {
maxlerebourg marked this conversation as resolved.
Show resolved Hide resolved
duration, err := time.ParseDuration(stream.New[i].Duration)
if err == nil {
cache.Set([]byte(stream.New[i].Value), []byte("t"), int(duration.Seconds()))
log.Warn().Str("decision", stream.New[i].Value).Msg("Add")
maxlerebourg marked this conversation as resolved.
Show resolved Hide resolved
}
}
for i := 0; i < len(stream.Deleted); i++ {
cache.Del([]byte(stream.Deleted[i].Value))
log.Warn().Str("decision", stream.Deleted[i].Value).Msg("Delete")
maxlerebourg marked this conversation as resolved.
Show resolved Hide resolved
}
}

/*
Main route used by Traefik to verify authorization for a request
Main route used by Traefik to verify authorization for a request
*/
func ForwardAuth(c *gin.Context) {
ipProcessed.Inc()
clientIP := c.ClientIP()
key := []byte(clientIP)

log.Debug().
Str("ClientIP", clientIP).
Expand All @@ -119,24 +183,52 @@ func ForwardAuth(c *gin.Context) {
Str(realIpHeader, c.Request.Header.Get(realIpHeader)).
Msg("Handling forwardAuth request")

if crowdsecBouncerCacheMode != "none" {
entry, err := cache.Get(key)
log.Warn().Str("entry", string(entry)).Str("key", string(key)).Msg("Entry")
maxlerebourg marked this conversation as resolved.
Show resolved Hide resolved

if err == nil && len(entry) > 0 {
log.Info().Str("Banned", string(string(entry)[0])).Msg("Reading cache")
if string(entry)[0] == 'f' {
c.Status(http.StatusOK)
} else {
c.String(crowdsecBanResponseCode, crowdsecBanResponseMsg)
}
return
}

if crowdsecBouncerCacheMode == "stream" {
c.Status(http.StatusOK)
return;
}
}

// Getting and verifying ip using ClientIP function
isAuthorized, err := isIpAuthorized(clientIP)
duration, err := getBanDuration(clientIP)
if err != nil {
log.Warn().Err(err).Msgf("An error occurred while checking IP %q", c.Request.Header.Get(clientIP))
c.String(crowdsecBanResponseCode, crowdsecBanResponseMsg)
} else if !isAuthorized {
return
}
if duration >= 0 {
if crowdsecBouncerCacheMode == "live" && duration != 0 {
cache.Set(key, []byte("t"), duration)
}
c.String(crowdsecBanResponseCode, crowdsecBanResponseMsg)
} else {
if crowdsecBouncerCacheMode == "live" {
cache.Set(key, []byte("f"), int(crowdsecBouncerDefaultCacheDuration.Seconds()))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using our value, could we use Crowdsec decision's duration value? The user would just have to configure it once, directly in Crowdsec.

Copy link
Author

@maxlerebourg maxlerebourg Sep 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fbonalair can Crowdsec response with duration value when the IP is authorized ?

maxlerebourg marked this conversation as resolved.
Show resolved Hide resolved
}
c.Status(http.StatusOK)
}
}

/*
Route to check bouncer connectivity with Crowdsec agent. Mainly use for Kubernetes readiness probe
Route to check bouncer connectivity with Crowdsec agent. Mainly use for Kubernetes readiness probe
*/
func Healthz(c *gin.Context) {
maxlerebourg marked this conversation as resolved.
Show resolved Hide resolved
isHealthy, err := isIpAuthorized(healthCheckIp)
if err != nil || !isHealthy {
duration, err := getBanDuration(healthCheckIp)
if err != nil || duration >= 0 {
log.Warn().Err(err).Msgf("The health check did not pass. Check error if present and if the IP %q is authorized", healthCheckIp)
c.Status(http.StatusForbidden)
} else {
Expand All @@ -145,7 +237,7 @@ func Healthz(c *gin.Context) {
}

/*
Simple route responding pong to every request. Mainly use for Kubernetes liveliness probe
Simple route responding pong to every request. Mainly use for Kubernetes liveliness probe
*/
func Ping(c *gin.Context) {
c.String(http.StatusOK, "pong")
Expand Down
Loading