diff --git a/.env b/.env index 4c08b3d..14346e8 100644 --- a/.env +++ b/.env @@ -1,2 +1,2 @@ -CROWDSEC_BOUNCER_API_KEY=40796d93c2958f9e58345514e67740e5 +CROWDSEC_BOUNCER_API_KEY=FIXME CROWDSEC_AGENT_HOST=localhost:8083 diff --git a/.gitignore b/.gitignore index 01e5891..e55df98 100644 --- a/.gitignore +++ b/.gitignore @@ -22,4 +22,7 @@ tmp/ .idea/ !.idea/runConfigurations/ -!.idea/rest-api.http \ No newline at end of file +!.idea/rest-api.http + +# Dev docker-compose +docker-compose-dev.yaml \ No newline at end of file diff --git a/README.md b/README.md index e69defd..1942342 100644 --- a/README.md +++ b/README.md @@ -11,8 +11,16 @@ A http service to verify request and bounce them according to decisions made by # Description This repository aim to implement a [CrowdSec](https://doc.crowdsec.net/) bouncer for the router [Traefik](https://doc.traefik.io/traefik/) to block malicious IP to access your services. + +It can operate with 2 modes: +- Request For this it leverages [Traefik v2 ForwardAuth middleware](https://doc.traefik.io/traefik/middlewares/http/forwardauth/) and query CrowdSec with client IP. If the client IP is on ban list, it will get a http code 403 response. Otherwise, request will continue as usual. +The bouncer can leverage use of a [local cache](https://github.com/patrickmn/go-cache) in order to reduce the number of requests made to the crowdsec local API. It will keep in cache the status for each IP that makes queries. + +- Stream +Streaming mode allows you to keep in the local cache only the Banned IPs, every requests that does not hit the cache is authorized. Every minute, the cache is updated with news from the Local API using [go-cron](https://github.com/robfig/cron) library. +It is the recommanded and most perfomant mode, (enabled by default) # Demo ## Prerequisites @@ -60,6 +68,10 @@ The webservice configuration is made via environment variables: * `CROWDSEC_BOUNCER_LOG_LEVEL` - Minimum log level for bouncer. Expected value [zerolog levels](https://pkg.go.dev/github.com/rs/zerolog#readme-leveled-logging). Default to 1 * `CROWDSEC_BOUNCER_BAN_RESPONSE_CODE` - HTTP code to respond in case of ban. Default to 403 * `CROWDSEC_BOUNCER_BAN_RESPONSE_MSG` - HTTP body as message to respond in case of ban. Default to Forbidden +* `CROWDSEC_BOUNCER_ENABLE_LOCAL_CACHE` - Configure the use of a local cache in memory. Default to false +* `CROWDSEC_DEFAULT_CACHE_DURATION` - Configure default duration of the cached data. Default to "15m00s" +* `CROWDSEC_LAPI_ENABLE_STREAM_MODE` - Enable streaming mode to pull decisions from the LAPI. Will override CROWDSEC_BOUNCER_ENABLE_LOCAL_CACHE and enable it. Default to "true" +* `CROWDSEC_LAPI_STREAM_MODE_INTERVAL` - Define the interval between two calls to LAPI. Default to "1m" * `PORT` - Change listening port of web server. Default listen on 8080 * `GIN_MODE` - By default, run app in "debug" mode. Set it to "release" in production * `TRUSTED_PROXIES` - List of trusted proxies IP addresses in CIDR format, delimited by ','. Default of 0.0.0.0/0 should be fine for most use cases, but you HAVE to add them directly in traefik. @@ -82,3 +94,5 @@ Any constructive feedback is welcome, fill free to add an issue or a pull reques 4. In `_test.env` replace `` with the previously generated key 5. Adding a banned IP to your crodwsec instance with : `docker exec traefik-crowdsec-bouncer-crowdsec-1 cscli decisions add -i 1.2.3.4` 6. Run test with `godotenv -f ./_test.env go test -cover` + +NB: Be aware that you cannot use network_mode: host with Docker Desktop on Windows. It is used in the docker-compose.yaml file for the traefik container to be able to contact a local instance of the bouncer through localhost \ No newline at end of file diff --git a/bouncer.go b/bouncer.go index 134ebb3..67ee6e0 100644 --- a/bouncer.go +++ b/bouncer.go @@ -1,19 +1,30 @@ package main import ( + "fmt" "os" + "time" + + "strings" . "github.com/fbonalair/traefik-crowdsec-bouncer/config" "github.com/fbonalair/traefik-crowdsec-bouncer/controler" "github.com/gin-contrib/logger" "github.com/gin-gonic/gin" + "github.com/patrickmn/go-cache" + "github.com/robfig/cron/v3" "github.com/rs/zerolog" "github.com/rs/zerolog/log" - "strings" ) var logLevel = OptionalEnv("CROWDSEC_BOUNCER_LOG_LEVEL", "1") var trustedProxiesList = strings.Split(OptionalEnv("TRUSTED_PROXIES", "0.0.0.0/0"), ",") +var crowdsecDefaultCacheDuration = OptionalEnv("CROWDSEC_BOUNCER_DEFAULT_CACHE_DURATION", "15m00s") +var crowdsecDefaultStreamModeInterval = OptionalEnv("CROWDSEC_LAPI_STREAM_MODE_INTERVAL", "1m") +var crowdsecEnableLocalCache = OptionalEnv("CROWDSEC_BOUNCER_ENABLE_LOCAL_CACHE", "false") +var crowdsecEnableStreamMode = OptionalEnv("CROWDSEC_LAPI_ENABLE_STREAM_MODE", "true") +var cr *cron.Cron +var lc *cache.Cache func main() { ValidateEnv() @@ -31,6 +42,61 @@ func main() { } +func cacheMiddleware(lc *cache.Cache) gin.HandlerFunc { + return func(c *gin.Context) { + c.Set("lc", lc) + c.Next() + } +} + +func cronMiddleware(cr *cron.Cron) gin.HandlerFunc { + return func(c *gin.Context) { + c.Set("cr", cr) + c.Next() + } +} + +func setupCacheStream() { + // local go-cache and streaming mode + if crowdsecEnableLocalCache == "true" || crowdsecEnableStreamMode == "true" { + duration, err := time.ParseDuration(crowdsecDefaultCacheDuration) + if err != nil { + log.Warn().Msg("Duration provided is not valid, defaulting to 15m00s") + duration, _ = time.ParseDuration("15m") + } + lc = cache.New(duration, 5*time.Minute) + if crowdsecEnableStreamMode == "true" { + duration, err := time.ParseDuration(crowdsecDefaultStreamModeInterval) + var strD string + if err != nil { + log.Warn().Msg("Duration provided is not valid, defaulting to 1m") + duration, _ = time.ParseDuration("1m") + strD = duration.String() + strD = fmt.Sprintf("@every %v", strD) + } else { + strD = duration.String() + strD = fmt.Sprintf("@every %v", strD) + } + go func() { + log.Debug().Msg("Streaming mode enabled") + cr = cron.New() + cr.Start() + cr.AddFunc(strD, func() { + controler.CallLAPIStream(lc, false) + }) + log.Debug().Msg("Start polling initial stream") + controler.CallLAPIStream(lc, true) + log.Debug().Msg("Finish polling initial stream") + }() + } else { + cr = nil + } + + } else { + lc = nil + cr = nil + } +} func setupRouter() (*gin.Engine, error) { // logger framework if gin.IsDebugging() { @@ -49,6 +115,8 @@ func setupRouter() (*gin.Engine, error) { } zerolog.SetGlobalLevel(level) + setupCacheStream() + // Web framework router := gin.New() err = router.SetTrustedProxies(trustedProxiesList) @@ -58,6 +126,8 @@ func setupRouter() (*gin.Engine, error) { router.Use(logger.SetLogger( logger.WithSkipPath([]string{"/api/v1/ping", "/api/v1/healthz"}), )) + router.Use(cacheMiddleware(lc)) + router.Use(cronMiddleware(cr)) router.GET("/api/v1/ping", controler.Ping) router.GET("/api/v1/healthz", controler.Healthz) router.GET("/api/v1/forwardAuth", controler.ForwardAuth) diff --git a/controler/controler.go b/controler/controler.go index 2f6aae0..09f74fb 100644 --- a/controler/controler.go +++ b/controler/controler.go @@ -4,9 +4,6 @@ import ( "bytes" "encoding/json" "fmt" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/client_golang/prometheus/promhttp" "io" "io/ioutil" "net/http" @@ -14,6 +11,11 @@ import ( "strconv" "time" + "github.com/patrickmn/go-cache" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" + . "github.com/fbonalair/traefik-crowdsec-bouncer/config" "github.com/fbonalair/traefik-crowdsec-bouncer/model" "github.com/gin-gonic/gin" @@ -21,11 +23,12 @@ import ( ) const ( - realIpHeader = "X-Real-Ip" - forwardHeader = "X-Forwarded-For" - crowdsecAuthHeader = "X-Api-Key" - crowdsecBouncerRoute = "v1/decisions" - healthCheckIp = "127.0.0.1" + realIpHeader = "X-Real-Ip" + forwardHeader = "X-Forwarded-For" + crowdsecAuthHeader = "X-Api-Key" + crowdsecBouncerRoute = "v1/decisions" + crowdsecBouncerStreamRoute = "v1/decisions/stream" + healthCheckIp = "127.0.0.1" ) var crowdsecBouncerApiKey = RequiredEnv("CROWDSEC_BOUNCER_API_KEY") @@ -33,13 +36,14 @@ var crowdsecBouncerHost = RequiredEnv("CROWDSEC_AGENT_HOST") var crowdsecBouncerScheme = OptionalEnv("CROWDSEC_BOUNCER_SCHEME", "http") var crowdsecBanResponseCode, _ = strconv.Atoi(OptionalEnv("CROWDSEC_BOUNCER_BAN_RESPONSE_CODE", "403")) // Validated via ValidateEnv() var crowdsecBanResponseMsg = OptionalEnv("CROWDSEC_BOUNCER_BAN_RESPONSE_MSG", "Forbidden") +var crowdsecDefaultCacheDuration = OptionalEnv("CROWDSEC_BOUNCER_DEFAULT_CACHE_DURATION", "15m00s") +var crowdsecEnableStreamMode = OptionalEnv("CROWDSEC_LAPI_ENABLE_STREAM_MODE", "true") var ( ipProcessed = promauto.NewCounter(prometheus.CounterOpts{ Name: "crowdsec_traefik_bouncer_processed_ip_total", Help: "The total number of processed IP", }) ) - var client = &http.Client{ Transport: &http.Transport{ MaxIdleConns: 10, @@ -51,7 +55,7 @@ var client = &http.Client{ /** Call Crowdsec local IP and with realIP and return true if IP does NOT have a ban decisions. */ -func isIpAuthorized(clientIP string) (bool, error) { +func isIpAuthorized(clientIP string) (authorized bool, decisions []model.Decision, err error) { // Generating crowdsec API request decisionUrl := url.URL{ Scheme: crowdsecBouncerScheme, @@ -61,7 +65,7 @@ func isIpAuthorized(clientIP string) (bool, error) { } req, err := http.NewRequest(http.MethodGet, decisionUrl.String(), nil) if err != nil { - return false, err + return false, nil, err } req.Header.Add(crowdsecAuthHeader, crowdsecBouncerApiKey) log.Debug(). @@ -72,10 +76,10 @@ func isIpAuthorized(clientIP string) (bool, error) { // Calling crowdsec API resp, err := client.Do(req) if err != nil { - return false, err + return false, nil, err } if resp.StatusCode == http.StatusForbidden { - return false, err + return false, nil, err } // Parsing response @@ -87,22 +91,134 @@ func isIpAuthorized(clientIP string) (bool, error) { }(resp.Body) reqBody, err := ioutil.ReadAll(resp.Body) if err != nil { - return false, err + return false, nil, err } if bytes.Equal(reqBody, []byte("null")) { log.Debug().Msgf("No decision for IP %q. Accepting", clientIP) - return true, nil + return true, nil, nil } log.Debug().RawJSON("decisions", reqBody).Msg("Found Crowdsec's decision(s), evaluating ...") - var decisions []model.Decision err = json.Unmarshal(reqBody, &decisions) if err != nil { - return false, err + return false, nil, err } // Authorization logic - return len(decisions) < 0, nil + if len(decisions) > 0 { + return false, decisions, nil + } else { + return true, nil, nil + } +} + +/* + Call to the LAPI stream and cache updates +*/ +func CallLAPIStream(lc *cache.Cache, init bool) { + if lc != nil { + log.Debug().Bool("init", init).Msg("Start polling stream") + streamUrl := url.URL{ + Scheme: crowdsecBouncerScheme, + Host: crowdsecBouncerHost, + Path: crowdsecBouncerStreamRoute, + RawQuery: fmt.Sprintf("startup=%t", init), + } + req, err := http.NewRequest(http.MethodGet, streamUrl.String(), nil) + if err != nil { + // log smth + log.Warn().Msg("Could not create http request") + } + req.Header.Add(crowdsecAuthHeader, crowdsecBouncerApiKey) + log.Debug(). + Str("method", http.MethodGet). + Str("url", streamUrl.String()). + Msg("Request Crowdsec's stream Local API") + + // Calling crowdsec stream LAPI + resp, err := client.Do(req) + defer func(Body io.ReadCloser) { + err := Body.Close() + if err != nil { + log.Err(err).Msg("An error occurred while closing body reader") + } + }(resp.Body) + reqBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Warn().Msg("Error reading resp.Body") + return + } + // if bytes.Equal(reqBody, []byte("null")) { + // log.Debug().Msgf("No decision for IP %q. Accepting", clientIP) + // return true, nil, nil + // } + var streamDecisions model.StreamDecision + // log.Debug().RawJSON("decisions", reqBody).Msg("Found Crowdsec's decision(s), evaluating ...") + err = json.Unmarshal(reqBody, &streamDecisions) + if err != nil { + log.Warn().Msg("Error unmarshall to streamDecision") + return + } + for _, d := range streamDecisions.New { + addToCache(lc, d) + } + for _, d := range streamDecisions.Deleted { + removeFromCache(lc, d) + } + } +} + +/* + Add to the cache information about the IP +*/ +func addToCache(lc *cache.Cache, d model.Decision) { + if lc != nil { + log.Debug().Interface("decision", d).Msg("Add IP to local cache") + + duration, err := time.ParseDuration(d.Duration) + if err != nil { + log.Warn().Str("Duration", duration.String()).Msg("Error parsing duration provided") + duration, _ = time.ParseDuration(cache.DefaultExpiration.String()) + } + lc.Set(d.Value, d, duration) + } +} + +/* + Remove from the cache information about the IP +*/ +func removeFromCache(lc *cache.Cache, d model.Decision) { + if lc != nil { + log.Debug().Interface("decision", d).Msg("Remove IP from local cache") + lc.Delete(d.Value) + } +} + +/* + Get Local cache result for the IP, return if we found it and if it is authorized +*/ +func getLocalCache(lc *cache.Cache, clientIP string) (lcFound bool, lcAuthorized bool) { + + if lc != nil { + log.Debug(). + Msg("Check if IP is in the local cache") + if cachedIP, time, found := lc.GetWithExpiration(clientIP); found { + value := cachedIP.(model.Decision) + log.Debug(). + Str("ClientIP", value.Value). + Time("expirationTime", time). + Str("duration", value.Duration). + Msg("IP was found in local cache") + // check if the result is lcAuthorized + return true, value.Authorized + } else { + log.Debug(). + Str("ClientIP", clientIP). + Msg("IP was not found in local cache") + return false, true + } + } + return false, true } /* @@ -119,15 +235,42 @@ func ForwardAuth(c *gin.Context) { Str(realIpHeader, c.Request.Header.Get(realIpHeader)). Msg("Handling forwardAuth request") - // Getting and verifying ip using ClientIP function - isAuthorized, err := isIpAuthorized(clientIP) - if err != nil { - log.Warn().Err(err).Msgf("An error occurred while checking IP %q", c.Request.Header.Get(clientIP)) - c.String(crowdsecBanResponseCode, crowdsecBanResponseMsg) - } else if !isAuthorized { + // check local cache + lc := c.MustGet("lc").(*cache.Cache) + + lcFound, lcAuthorized := getLocalCache(lc, clientIP) + log.Debug().Bool("lcFound", lcFound).Bool("lcAuthorized", lcAuthorized).Msg("Result of cache") + // the IP was banned and found in the cache + if !lcAuthorized { c.String(crowdsecBanResponseCode, crowdsecBanResponseMsg) - } else { + // The IP has been found in the cache but was not banned before + } else if lcFound || crowdsecEnableStreamMode == "true" { + // if we are in streaming mode, any IP not found in the cache will be cleared c.Status(http.StatusOK) + } else { + // Getting and verifying ip using ClientIP function + // We should look at the cache in the isIPAuthorized + isAuthorized, decisions, err := isIpAuthorized(clientIP) + if err != nil { + log.Warn().Err(err).Msgf("An error occurred while checking IP %q", c.Request.Header.Get(clientIP)) + c.String(crowdsecBanResponseCode, crowdsecBanResponseMsg) + } else if !isAuthorized { + // result is authorized = false, we take the first decision returned by lapi + log.Debug().Msg("Not Authorized") + d := decisions[0] + d.Authorized = false + addToCache(lc, d) + c.String(crowdsecBanResponseCode, crowdsecBanResponseMsg) + } else { + // result is autorized = true (nil), we create a decision based on the IP + log.Debug().Msg("Authorized") + var d model.Decision + d.Duration = crowdsecDefaultCacheDuration + d.Value = clientIP + d.Authorized = true + addToCache(lc, d) + c.Status(http.StatusOK) + } } } @@ -135,7 +278,7 @@ func ForwardAuth(c *gin.Context) { Route to check bouncer connectivity with Crowdsec agent. Mainly use for Kubernetes readiness probe */ func Healthz(c *gin.Context) { - isHealthy, err := isIpAuthorized(healthCheckIp) + isHealthy, _, err := isIpAuthorized(healthCheckIp) if err != nil || !isHealthy { log.Warn().Err(err).Msgf("The health check did not pass. Check error if present and if the IP %q is authorized", healthCheckIp) c.Status(http.StatusForbidden) diff --git a/docker-compose.yaml b/docker-compose.yaml index c382950..9f8fce5 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -2,7 +2,7 @@ version: "3.8" services: traefik: - image: "traefik:v2.5" + image: "traefik:v2.8.4" container_name: "traefik" command: # - "--log.level=DEBUG" @@ -29,14 +29,14 @@ services: - "traefik.http.routers.whoami.middlewares=crowdsec-bouncer@docker" crowdsec: - image: crowdsecurity/crowdsec:v1.2.0 + image: crowdsecurity/crowdsec:v1.4.1 environment: COLLECTIONS: "crowdsecurity/nginx" GID: "${GID-1000}" depends_on: - 'traefik' volumes: - - ./crowdsec/acquis.yaml:/etc/crowdsec/acquis.yaml + # - ./crowdsec/acquis.yaml:/etc/crowdsec/acquis.yaml - logs:/var/log/nginx - crowdsec-db:/var/lib/crowdsec/data/ - crowdsec-config:/etc/crowdsec/ diff --git a/go.mod b/go.mod index 8c5b846..f3e0585 100644 --- a/go.mod +++ b/go.mod @@ -27,10 +27,12 @@ require ( github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.26.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect + github.com/robfig/cron/v3 v3.0.0 // indirect github.com/ugorji/go/codec v1.1.7 // indirect golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 // indirect diff --git a/go.sum b/go.sum index a04be99..2f21d4a 100644 --- a/go.sum +++ b/go.sum @@ -89,6 +89,8 @@ github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -112,6 +114,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= +github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.23.0/go.mod h1:6c7hFfxPOy7TacJc4Fcdi24/J0NKYGzjG8FWRI916Qo= diff --git a/model/model.go b/model/model.go index 10cb384..7bf0dce 100644 --- a/model/model.go +++ b/model/model.go @@ -4,12 +4,18 @@ package model Data representing a decision made by Crowdsec */ type Decision struct { - Id int `json:"id"` - Origin string `json:"origin"` - Type string `json:"type"` - Scope string `json:"scope"` - Value string `json:"value"` - Duration string `json:"duration"` - Scenario string `json:"scenario"` - Simulated bool `json:"simulated"` + Id int `json:"id"` + Origin string `json:"origin"` + Type string `json:"type"` + Scope string `json:"scope"` + Value string `json:"value"` + Duration string `json:"duration"` + Scenario string `json:"scenario"` + Authorized bool `json:"authorized"` + Simulated bool `json:"simulated"` +} + +type StreamDecision struct { + Deleted []Decision `json:"deleted"` + New []Decision `json:"new"` }