Skip to content

Commit

Permalink
Merge pull request #1328 from AbsaOSS/temp-branch
Browse files Browse the repository at this point in the history
Merge release/0.7 to develop
  • Loading branch information
wajda authored Jun 1, 2024
2 parents ab87d35 + 24d471f commit b6fead7
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package za.co.absa.spline.producer.rest
import com.fasterxml.jackson.databind.PropertyNamingStrategies
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.twitter.finatra.jackson.FinatraInternalModules
import org.springframework.context.annotation.{Bean, ComponentScan, Configuration}
import org.springframework.context.annotation.{Bean, ComponentScan, Configuration, EnableAspectJAutoProxy}
import org.springframework.web.method.support.HandlerMethodReturnValueHandler
import org.springframework.web.servlet.config.annotation.{EnableWebMvc, WebMvcConfigurer}
import za.co.absa.commons.config.ConfTyped
Expand All @@ -32,6 +32,7 @@ import java.util

@EnableWebMvc
@Configuration
@EnableAspectJAutoProxy(proxyTargetClass = true)
@ComponentScan(basePackageClasses = Array(
classOf[common.webmvc.controller._package],
classOf[controller._package],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2020 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline.producer.rest.controller

import org.aspectj.lang.ProceedingJoinPoint
import org.aspectj.lang.annotation.{Around, Aspect, Pointcut}
import org.slf4s.Logging
import org.springframework.stereotype.Component
import org.springframework.web.context.request.{RequestContextHolder, ServletRequestAttributes}
import za.co.absa.spline.producer.model.{ExecutionPlan, v1_1, v1_2}
import za.co.absa.spline.producer.rest.filter.MessageLengthCapturingFilter

@Aspect
@Component
class ExecutionPlansControllerMessageLengthCapturingAspect extends Logging {

@Pointcut("execution(public * za.co.absa.spline.producer.rest.controller.*Controller.*(..))")
def publicControllerMethods(): Unit = {}

@Pointcut("execution(* *(.., za.co.absa.spline.producer.model.ExecutionPlan, ..))")
def acceptsEPv10(): Unit = {}

@Pointcut("execution(* *(.., za.co.absa.spline.producer.model.v1_1.ExecutionPlan, ..))")
def acceptsEPv11(): Unit = {}

@Pointcut("execution(* *(.., za.co.absa.spline.producer.model.v1_2.ExecutionPlan, ..))")
def acceptsEPv12(): Unit = {}

@Around("publicControllerMethods() && (acceptsEPv10() || acceptsEPv11() || acceptsEPv12())")
def aroundAdvice(jp: ProceedingJoinPoint): AnyRef = {
val origArgs = jp.getArgs
val fixedArgs = origArgs.map {
case ep: ExecutionPlan =>
ep.copy(extraInfo = withMessageLengthInfo(ep.extraInfo))
case ep: v1_1.ExecutionPlan =>
ep.copy(extraInfo = withMessageLengthInfo(ep.extraInfo))
case ep: v1_2.ExecutionPlan =>
ep.copy(extraInfo = withMessageLengthInfo(ep.extraInfo))
case x => x
}
jp.proceed(fixedArgs)
}

private def withMessageLengthInfo(m: Map[String, Any]): Map[String, Any] = {
val req = RequestContextHolder.getRequestAttributes.asInstanceOf[ServletRequestAttributes].getRequest
val counters = MessageLengthCapturingFilter.getCounters(req).toArray
m + ("__spline_msg_size" -> counters.map(_.count))
}
}


Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/*
* Copyright 2020 ABSA Group Limited
*
* Copyright 2023 ABSA Group Limited
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -14,13 +13,15 @@
* limitations under the License.
*/

package za.co.absa.spline.gateway.rest.filter
package za.co.absa.spline.producer.rest.filter

import javax.servlet._
import javax.servlet.http.HttpServletRequest
import org.springframework.http.HttpHeaders
import za.co.absa.spline.producer.rest.HttpConstants.Encoding

import java.util.zip.GZIPInputStream
import javax.servlet._
import javax.servlet.http.HttpServletRequest

/**
* Filter for decompressing gziped Http requests
*
Expand All @@ -30,7 +31,7 @@ class GzipFilter extends Filter {
override def doFilter(request: ServletRequest, response: ServletResponse, chain: FilterChain): Unit = {

val newRequest = request match {
case r: HttpServletRequest if isCompressed(r) => new GZIPRequestWrapper(r)
case r: HttpServletRequest if isCompressed(r) => new HttpRequestWrapper(r, new GZIPInputStream(r.getInputStream))
case _ => request
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/*
* Copyright 2020 ABSA Group Limited
*
* Copyright 2023 ABSA Group Limited
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -14,20 +13,19 @@
* limitations under the License.
*/

package za.co.absa.spline.gateway.rest.filter
package za.co.absa.spline.producer.rest.filter

import java.io.{BufferedReader, InputStreamReader}
import java.io.{BufferedReader, InputStream, InputStreamReader}
import javax.servlet.ServletInputStream
import javax.servlet.http.{HttpServletRequest, HttpServletRequestWrapper}


final class GZIPRequestWrapper(val request: HttpServletRequest) extends HttpServletRequestWrapper(request) {

val stream = new GZIPServletInputStream(request.getInputStream)
val reader = new BufferedReader(new InputStreamReader(stream))
final class HttpRequestWrapper(request: HttpServletRequest, stream: InputStream)
extends HttpServletRequestWrapper(request) {

private val reader = new BufferedReader(new InputStreamReader(stream))

override def getInputStream: ServletInputStream = stream
override def getInputStream: ServletInputStream = new ServletInputStreamAdapter(stream)

override def getReader: BufferedReader = reader
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2020 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.spline.producer.rest.filter

import za.co.absa.spline.producer.rest.filter.MessageLengthCapturingFilter.{LengthCountingInputStreamWrapper, getCounters}

import java.io.InputStream
import javax.servlet._
import javax.servlet.http.HttpServletRequest
import scala.collection.mutable

class MessageLengthCapturingFilter extends Filter {
override def doFilter(request: ServletRequest, response: ServletResponse, chain: FilterChain): Unit = {
val newRequest = request match {
case r: HttpServletRequest =>
val inputStreamWrapper = new LengthCountingInputStreamWrapper(r.getInputStream)
getCounters(r) += inputStreamWrapper.lengthCounter
new HttpRequestWrapper(r, inputStreamWrapper)
case _ => request
}
chain.doFilter(newRequest, response)
}

override def init(config: FilterConfig): Unit = {
// nothing to do here
}

override def destroy(): Unit = {
// nothing to do here
}
}

object MessageLengthCapturingFilter {
private val CountersRequestAttributeKey: String = s"${classOf[MessageLengthCapturingFilter].getName}.counters"

def getCounters(r: ServletRequest): mutable.Buffer[ReadOnlyCounter] = {
val countersAttrOrNull = r.getAttribute(CountersRequestAttributeKey).asInstanceOf[mutable.Buffer[ReadOnlyCounter]]
val counters = Option(countersAttrOrNull).getOrElse(mutable.Buffer.empty[ReadOnlyCounter])
if (counters.isEmpty) r.setAttribute(CountersRequestAttributeKey, counters)
counters
}

trait ReadOnlyCounter {
def count: Int
}

class LengthCountingInputStreamWrapper(r: InputStream) extends InputStream {
private var _bytesReadCount: Int = 0

val lengthCounter: ReadOnlyCounter = new ReadOnlyCounter {
override def count: Int = _bytesReadCount
}

override def read(): Int = {
_bytesReadCount += 1
r.read()
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/*
* Copyright 2020 ABSA Group Limited
*
* Copyright 2023 ABSA Group Limited
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -14,16 +13,12 @@
* limitations under the License.
*/

package za.co.absa.spline.gateway.rest.filter

import java.util.zip.GZIPInputStream
package za.co.absa.spline.producer.rest.filter

import java.io.InputStream
import javax.servlet.{ReadListener, ServletInputStream}

final class GZIPServletInputStream(val inputStream: ServletInputStream) extends ServletInputStream {

val gzipStream = new GZIPInputStream(inputStream)

final class ServletInputStreamAdapter(val gzipStream: InputStream) extends ServletInputStream {

override def read: Int = gzipStream.read

Expand Down
1 change: 1 addition & 0 deletions rest-gateway/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
<dependency>org.jboss.logging:jboss-logging</dependency>
<dependency>jakarta.validation:jakarta.validation-api</dependency>
<dependency>jakarta.el:jakarta.el-api</dependency>
<dependency>org.aspectj:aspectjweaver</dependency>
<dependency>org.glassfish:jakarta.el</dependency>
<dependency>com.github.ben-manes.caffeine:caffeine</dependency>
<dependency>com.fasterxml.jackson.datatype:jackson-datatype-joda</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import za.co.absa.spline.common.webmvc.cors.PermissiveCorsFilter
import za.co.absa.spline.common.webmvc.diagnostics.{DiagnosticsRESTConfig, RootWebContextConfig}
import za.co.absa.spline.consumer.rest.ConsumerRESTConfig
import za.co.absa.spline.consumer.service.ConsumerServicesConfig
import za.co.absa.spline.gateway.rest.filter.GzipFilter
import za.co.absa.spline.persistence.ArangoRepoConfig
import za.co.absa.spline.producer.rest.ProducerRESTConfig
import za.co.absa.spline.producer.rest.filter.{GzipFilter, MessageLengthCapturingFilter}
import za.co.absa.spline.producer.service.ProducerServicesConfig

import javax.servlet.ServletContext
Expand All @@ -43,7 +43,9 @@ object AppInitializer extends WebApplicationInitializer {
}))

registerFilter[PermissiveCorsFilter](container, "CORSFilter", "/*")
registerFilter[MessageLengthCapturingFilter](container, "MessageSizeCapturingFilter_before_gzip", "/*")
registerFilter[GzipFilter](container, "GzipFilter", "/*")
registerFilter[MessageLengthCapturingFilter](container, "MessageSizeCapturingFilter_after_gzip", "/*")

registerRESTDispatcher[ConsumerRESTConfig](container, "consumer")
registerRESTDispatcher[ProducerRESTConfig](container, "producer")
Expand Down

0 comments on commit b6fead7

Please sign in to comment.