diff --git a/collectors/echarging-ocpi/infrastructure/helm/driwe.yaml b/collectors/echarging-ocpi/infrastructure/helm/driwe.yaml index be58a61..ef1da3a 100644 --- a/collectors/echarging-ocpi/infrastructure/helm/driwe.yaml +++ b/collectors/echarging-ocpi/infrastructure/helm/driwe.yaml @@ -33,6 +33,7 @@ env: GIN_MODE: release LOGLEVEL: DEBUG RABBITMQ_EXCHANGE: ingress + RABBITMQ_CLIENT: dc-echarging-ocpi-driwe # postfix is added to provides depending on the data source (push path, pull etc.) PROVIDER: echarging-ocpi/driwe PULL_LOCATIONS_CRON: "*/10 * * * *" diff --git a/collectors/echarging-ocpi/infrastructure/helm/neogy.yaml b/collectors/echarging-ocpi/infrastructure/helm/neogy.yaml index bc10e2d..9f620e4 100644 --- a/collectors/echarging-ocpi/infrastructure/helm/neogy.yaml +++ b/collectors/echarging-ocpi/infrastructure/helm/neogy.yaml @@ -40,6 +40,7 @@ env: RABBITMQ_EXCHANGE: ingress PROVIDER: echarging-ocpi/neogy PULL_LOCATIONS_CRON: "0 0,4,8,12,16 * * *" + RABBITMQ_CLIENT: dc-echarging-ocpi-driwe envSecret: # List of valid tokens, no spaces etc. diff --git a/collectors/echarging-ocpi/src/cron.go b/collectors/echarging-ocpi/src/cron.go index d614d6c..6b9d607 100644 --- a/collectors/echarging-ocpi/src/cron.go +++ b/collectors/echarging-ocpi/src/cron.go @@ -11,9 +11,12 @@ import ( "log/slog" "net/http" "time" + + "github.com/noi-techpark/go-opendatahub-ingest/dto" + "github.com/noi-techpark/go-opendatahub-ingest/mq" ) -func getAllLocations(rabbit RabbitC, provider string) error { +func getAllLocations(rabbit mq.R, provider string) error { slog.Debug("Pulling all locations") url := cfg.PULL_LOCATIONS_ENDPOINT for url != "" { @@ -24,11 +27,11 @@ func getAllLocations(rabbit RabbitC, provider string) error { return err } - err = rabbit.Publish(mqMsg{ + err = rabbit.Publish(dto.RawAny{ Provider: provider, Timestamp: time.Now(), Rawdata: locations, - }, cfg.RABBITMQ_EXCHANGE) + }, cfg.MQ_EXCHANGE) if err != nil { slog.Error("error getting locations") return err diff --git a/collectors/echarging-ocpi/src/endpoint.go b/collectors/echarging-ocpi/src/endpoint.go index f809ce0..1d999b6 100644 --- a/collectors/echarging-ocpi/src/endpoint.go +++ b/collectors/echarging-ocpi/src/endpoint.go @@ -12,13 +12,15 @@ import ( "time" "github.com/gin-gonic/gin" + "github.com/noi-techpark/go-opendatahub-ingest/dto" + "github.com/noi-techpark/go-opendatahub-ingest/mq" ) func health(c *gin.Context) { c.Status(http.StatusOK) } -func handlePush(rabbit RabbitC, provider string) gin.HandlerFunc { +func handlePush(rabbit mq.R, provider string) gin.HandlerFunc { return func(c *gin.Context) { var body map[string]any if err := c.BindJSON(&body); err != nil { @@ -34,7 +36,7 @@ func handlePush(rabbit RabbitC, provider string) gin.HandlerFunc { slog.Debug("Received message", "params", params, "body", body, "path", c.FullPath()) - err := rabbit.Publish(mqMsg{ + err := rabbit.Publish(dto.RawAny{ Provider: provider, Timestamp: time.Now(), Rawdata: map[string]any{ @@ -42,7 +44,7 @@ func handlePush(rabbit RabbitC, provider string) gin.HandlerFunc { "body": body, // TODO: once more than one endpoint are implemented, send some details about which endpoint this belongs to. or put it into the routing key }, - }, cfg.RABBITMQ_EXCHANGE) + }, cfg.MQ_EXCHANGE) if err != nil { c.AbortWithError(http.StatusInternalServerError, fmt.Errorf("cannot publish to rabbitmq: %w", err)) diff --git a/collectors/echarging-ocpi/src/go.mod b/collectors/echarging-ocpi/src/go.mod index 4f29d85..cb5b52d 100644 --- a/collectors/echarging-ocpi/src/go.mod +++ b/collectors/echarging-ocpi/src/go.mod @@ -5,25 +5,26 @@ go 1.22 require ( github.com/gin-contrib/cors v1.7.2 github.com/gin-gonic/gin v1.10.0 - github.com/samber/slog-gin v1.13.4 + github.com/samber/slog-gin v1.13.6 ) +require github.com/noi-techpark/go-opendatahub-ingest v0.0.0-20241119065535-d68a9695ad44 + require ( - github.com/bytedance/sonic v1.12.2 // indirect - github.com/bytedance/sonic/loader v0.2.0 // indirect + github.com/bytedance/sonic v1.12.4 // indirect + github.com/bytedance/sonic/loader v0.2.1 // indirect github.com/cloudwego/base64x v0.1.4 // indirect github.com/cloudwego/iasm v0.2.0 // indirect - github.com/gabriel-vasile/mimetype v1.4.5 // indirect + github.com/gabriel-vasile/mimetype v1.4.6 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect - github.com/go-playground/validator/v10 v10.22.0 // indirect + github.com/go-playground/validator/v10 v10.23.0 // indirect github.com/goccy/go-json v0.10.3 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/joho/godotenv v1.5.1 github.com/json-iterator/go v1.1.12 // indirect github.com/kelseyhightower/envconfig v1.4.0 - github.com/klauspost/cpuid/v2 v2.2.8 // indirect + github.com/klauspost/cpuid/v2 v2.2.9 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -33,13 +34,13 @@ require ( github.com/robfig/cron/v3 v3.0.1 github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect - go.opentelemetry.io/otel v1.28.0 // indirect - go.opentelemetry.io/otel/trace v1.28.0 // indirect - golang.org/x/arch v0.9.0 // indirect - golang.org/x/crypto v0.26.0 // indirect - golang.org/x/net v0.28.0 // indirect - golang.org/x/sys v0.24.0 // indirect - golang.org/x/text v0.17.0 // indirect - google.golang.org/protobuf v1.34.2 // indirect + go.opentelemetry.io/otel v1.32.0 // indirect + go.opentelemetry.io/otel/trace v1.32.0 // indirect + golang.org/x/arch v0.12.0 // indirect + golang.org/x/crypto v0.29.0 // indirect + golang.org/x/net v0.31.0 // indirect + golang.org/x/sys v0.27.0 // indirect + golang.org/x/text v0.20.0 // indirect + google.golang.org/protobuf v1.35.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/collectors/echarging-ocpi/src/go.sum b/collectors/echarging-ocpi/src/go.sum index 4da6449..4d72353 100644 --- a/collectors/echarging-ocpi/src/go.sum +++ b/collectors/echarging-ocpi/src/go.sum @@ -1,8 +1,8 @@ -github.com/bytedance/sonic v1.12.2 h1:oaMFuRTpMHYLpCntGca65YWt5ny+wAceDERTkT2L9lg= -github.com/bytedance/sonic v1.12.2/go.mod h1:B8Gt/XvtZ3Fqj+iSKMypzymZxw/FVwgIGKzMzT9r/rk= +github.com/bytedance/sonic v1.12.4 h1:9Csb3c9ZJhfUWeMtpCDCq6BUoH5ogfDFLUgQ/jG+R0k= +github.com/bytedance/sonic v1.12.4/go.mod h1:B8Gt/XvtZ3Fqj+iSKMypzymZxw/FVwgIGKzMzT9r/rk= github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= -github.com/bytedance/sonic/loader v0.2.0 h1:zNprn+lsIP06C/IqCHs3gPQIvnvpKbbxyXQP1iU4kWM= -github.com/bytedance/sonic/loader v0.2.0/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= +github.com/bytedance/sonic/loader v0.2.1 h1:1GgorWTqf12TA8mma4DDSbaQigE2wOgQo7iCjjJv3+E= +github.com/bytedance/sonic/loader v0.2.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU= github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y= github.com/cloudwego/base64x v0.1.4/go.mod h1:0zlkT4Wn5C6NdauXdJRhSKRlJvmclQ1hhJgA0rcu/8w= github.com/cloudwego/iasm v0.2.0 h1:1KNIy1I1H9hNNFEEH3DVnI4UujN+1zjpuk6gwHLTssg= @@ -10,8 +10,8 @@ github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/gabriel-vasile/mimetype v1.4.5 h1:J7wGKdGu33ocBOhGy0z653k/lFKLFDPJMG8Gql0kxn4= -github.com/gabriel-vasile/mimetype v1.4.5/go.mod h1:ibHel+/kbxn9x2407k1izTA1S81ku1z/DlgOW2QE0M4= +github.com/gabriel-vasile/mimetype v1.4.6 h1:3+PzJTKLkvgjeTbts6msPJt4DixhT4YtFNf1gtGe3zc= +github.com/gabriel-vasile/mimetype v1.4.6/go.mod h1:JX1qVKqZd40hUPpAfiNTe0Sne7hdfKSbOqqmkq8GCXc= github.com/gin-contrib/cors v1.7.2 h1:oLDHxdg8W/XDoN/8zamqk/Drgt4oVZDvaV0YmvVICQw= github.com/gin-contrib/cors v1.7.2/go.mod h1:SUJVARKgQ40dmrzgXEVxj2m7Ig1v1qIboQkPDTQ9t2E= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= @@ -24,8 +24,8 @@ github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/o github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= -github.com/go-playground/validator/v10 v10.22.0 h1:k6HsTZ0sTnROkhS//R0O+55JgM8C4Bx7ia+JlgcnOao= -github.com/go-playground/validator/v10 v10.22.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= +github.com/go-playground/validator/v10 v10.23.0 h1:/PwmTwZhS0dPkav3cdK9kV1FsAmrL8sThn8IHr/sO+o= +github.com/go-playground/validator/v10 v10.23.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= @@ -33,15 +33,13 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= -github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= -github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= -github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY= +github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8= github.com/knz/go-libedit v1.10.1/go.mod h1:MZTVkCWyz0oBc7JOWP3wNAzd002ZbM/5hgShxwh4x8M= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= @@ -56,6 +54,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/noi-techpark/go-opendatahub-ingest v0.0.0-20241119065535-d68a9695ad44 h1:rr/a0Lha81kj2sase8SXQrFrWHeFduUehgMAjiZwRC0= +github.com/noi-techpark/go-opendatahub-ingest v0.0.0-20241119065535-d68a9695ad44/go.mod h1:OR71Qot9WXWSNwLjNNuhhdIts1KbDfDzbwv2F/Q6KOk= github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M= github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -66,8 +66,8 @@ github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= -github.com/samber/slog-gin v1.13.4 h1:L3tkid2T+km1hjXGka8pVmqqdIH3K+AT9jWifEHlOl8= -github.com/samber/slog-gin v1.13.4/go.mod h1:vqUCcni2o7z/miSF3uj904ZL8+hVBiwnPKP8Id0RNe8= +github.com/samber/slog-gin v1.13.6 h1:clWpgtdL/3KM1eGkUMUHTLNM1fG4nA3bHC1Pt7v0z44= +github.com/samber/slog-gin v1.13.6/go.mod h1:iicbXYT1DozbzsbLfpRdXkAal3zmzIjayQCV5YR+A6M= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= @@ -82,26 +82,25 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65EE= github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= -go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= -go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= -go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= -go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= +go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= +go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= +go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= -golang.org/x/arch v0.9.0 h1:ub9TgUInamJ8mrZIGlBG6/4TqWeMszd4N8lNorbrr6k= -golang.org/x/arch v0.9.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= -golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= -golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= -golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= -golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/arch v0.12.0 h1:UsYJhbzPYGsT0HbEdmYcqtCv8UNGvnaL561NnIUvaKg= +golang.org/x/arch v0.12.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys= +golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= +golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= +golang.org/x/net v0.31.0 h1:68CPQngjLL0r2AlUKiSxtQFKvzRVbnzLwMUn5SzcLHo= +golang.org/x/net v0.31.0/go.mod h1:P4fl1q7dY2hnZFxEk4pPSkDHF+QqjitcnDjUQyMM+pM= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= -golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= -golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= -google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= -google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= +golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/collectors/echarging-ocpi/src/main.go b/collectors/echarging-ocpi/src/main.go index bcabc3f..8d1ec4b 100644 --- a/collectors/echarging-ocpi/src/main.go +++ b/collectors/echarging-ocpi/src/main.go @@ -5,43 +5,33 @@ package main import ( "log/slog" - "os" "github.com/gin-contrib/cors" "github.com/gin-gonic/gin" "github.com/kelseyhightower/envconfig" + "github.com/noi-techpark/go-opendatahub-ingest/dc" + "github.com/noi-techpark/go-opendatahub-ingest/mq" + "github.com/noi-techpark/go-opendatahub-ingest/ms" "github.com/rabbitmq/amqp091-go" "github.com/robfig/cron/v3" sloggin "github.com/samber/slog-gin" ) var cfg struct { - RABBITMQ_URI string - RABBITMQ_EXCHANGE string + dc.Env PULL_TOKEN string PULL_LOCATIONS_ENDPOINT string PULL_LOCATIONS_CRON string OCPI_TOKENS []string - - PROVIDER string - LOGLEVEL string `default:"INFO"` } const ver string = "2.2" -func initLogger() { - level := &slog.LevelVar{} - level.UnmarshalText([]byte(cfg.LOGLEVEL)) - slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ - Level: level, - }))) -} - func main() { envconfig.MustProcess("", &cfg) - initLogger() + ms.InitLog(cfg.LOG_LEVEL) mq := connectMq() defer mq.Close() @@ -55,20 +45,20 @@ func main() { select {} } -func connectMq() RabbitC { - rabbit, err := RabbitConnect(cfg.RABBITMQ_URI) +func connectMq() mq.R { + rabbit, err := mq.Connect(cfg.MQ_URI, cfg.MQ_CLIENT) if err != nil { slog.Error("cannot open rabbitmq connection. aborting") panic(err) } - rabbit.OnClose(func(err amqp091.Error) { + rabbit.OnClose(func(err *amqp091.Error) { slog.Error("rabbit connection closed unexpectedly", "err", err) panic(err) }) return rabbit } -func startCron(rabbit RabbitC) { +func startCron(rabbit mq.R) { c := cron.New() // Poll locations endpoint to get all charging stations and their plugs @@ -82,7 +72,7 @@ func startCron(rabbit RabbitC) { c.Start() } -func startEndpoint(rabbit RabbitC) { +func startEndpoint(rabbit mq.R) { r := gin.New() r.Use(gin.Recovery()) r.Use(cors.Default()) diff --git a/collectors/echarging-ocpi/src/mq.go b/collectors/echarging-ocpi/src/mq.go deleted file mode 100644 index 4032f61..0000000 --- a/collectors/echarging-ocpi/src/mq.go +++ /dev/null @@ -1,85 +0,0 @@ -// SPDX-FileCopyrightText: 2024 NOI Techpark -// -// SPDX-License-Identifier: AGPL-3.0-or-later - -package main - -import ( - "encoding/json" - "fmt" - "time" - - amqp "github.com/rabbitmq/amqp091-go" -) - -type mqMsg struct { - Provider string `json:"provider"` - Timestamp time.Time `json:"timestamp"` - Rawdata any `json:"rawdata"` -} - -type RabbitC struct { - Con *amqp.Connection - Ch *amqp.Channel -} - -func (r *RabbitC) Close() { - if r.Ch != nil && !r.Ch.IsClosed() { - _ = r.Ch.Close() - } - if r.Con != nil && !r.Con.IsClosed() { - _ = r.Con.Close() - } -} - -func (r *RabbitC) OnClose(handler func(amqp.Error)) { - r.Con.NotifyClose(func() chan *amqp.Error { - notifyClose := make(chan *amqp.Error) - go func() { - err := <-notifyClose - handler(*err) - }() - return notifyClose - }()) -} - -func RabbitConnect(url string) (RabbitC, error) { - r := RabbitC{} - con, err := amqp.Dial(url) - if err != nil { - return r, err - } - - ch, err := con.Channel() - if err != nil { - return r, err - } - - r.Ch = ch - r.Con = con - - return r, nil -} - -func (r *RabbitC) Publish(msg mqMsg, exchange string) error { - payload, err := json.Marshal(msg) - if err != nil { - return fmt.Errorf("error marshalling message to json: %w", err) - } - - err = r.Ch.Publish( - exchange, // exchange - msg.Provider, // routing key - false, // mandatory - false, // immediate - amqp.Publishing{ - ContentType: "application/json", - Body: payload, - Headers: amqp.Table{"provider": msg.Provider}, - }) - - if err != nil { - return fmt.Errorf("error sending amqp msg: %w", err) - } - return nil -}