diff --git a/plugins/extractors/http/README.md b/plugins/extractors/http/README.md
index 92f6a9144..2d631a0d0 100644
--- a/plugins/extractors/http/README.md
+++ b/plugins/extractors/http/README.md
@@ -20,10 +20,10 @@ The user specified script has access to the response, if the API call was
 successful, and can use it for constructing and emitting assets using a custom
 script. Currently, [Tengo][tengo] is the only supported script engine.
 
-Refer Tengo documentation for script language syntax and supported functionality
-\- https://github.com/d5/tengo/tree/v2.13.0#references
-. [Tengo standard library modules][tengo-stdlib] can also be imported and used
-if required.
+Refer Tengo documentation for script language syntax and supported
+functionality - https://github.com/d5/tengo/tree/v2.13.0#references.
+[Tengo standard library modules][tengo-stdlib] can also be imported and used if
+required (except the `os` module).
 
 ## Usage
 
@@ -46,6 +46,7 @@ source:
         key: value
       timeout: 5s
     success_codes: [ 200 ]
+    concurrency: 3
     script:
       engine: tengo
       source: |
@@ -56,19 +57,26 @@ source:
 
 ## Inputs
 
-| Key                    | Value               | Example                                | Description                                                                                     | Required? |
-|:-----------------------|:--------------------|:---------------------------------------|:------------------------------------------------------------------------------------------------|:----------|
-| `request.url`          | `string`            | `http://example.com/api/v1/endpoint`   | The HTTP endpoint to send request to                                                            | ✅         |
-| `request.query_params` | `[]{key, value}`    | `[{"key":"s","value":"One Piece"}]`    | The query parameters to be added to the request URL.                                            | ✘         |
-| `request.method`       | `string`            | `GET`/`POST`                           | The HTTP verb/method to use with request. Default is `GET`.                                     | ✘         |
-| `request.headers`      | `map[string]string` | `{"Api-Token": "..."}`                 | Headers to send in the HTTP request.                                                            | ✘         |
-| `request.content_type` | `string`            | `application/json`                     | Content type for encoding request body. Also sent as a header.                                  | ✅         |
-| `request.accept`       | `string`            | `application/json`                     | Sent as the `Accept` header. Also indicates the format to use for decoding.                     | ✅         |
-| `request.body`         | `Object`            | `{"key": "value"}`                     | The request body to be sent.                                                                    | ✘         |
-| `request.timeout`      | `string`            | `1s`                                   | Timeout for the HTTP request. Default is 5s.                                                    | ✘         |
-| `success_codes`        | `[]int`             | `[200]`                                | The list of status codes that would be considered as a successful response. Default is `[200]`. | ✘         |
-| `script.engine`        | `string`            | `tengo`                                | Script engine. Only `"tengo"` is supported currently                                            | ✅         |
-| `script.source`        | `string`            | see [Worked Example](#worked-example). | [Tengo][tengo] script used to map the response into 0 or more assets.                           | ✅         |
+| Key             | Value    | Example                                | Description                                                                                     | Required? |
+|:----------------|:---------|:---------------------------------------|:------------------------------------------------------------------------------------------------|:----------|
+| `request`       | `Object` | see [Request](#request)                | The configuration for constructing and sending HTTP request.                                    | ✅         |
+| `success_codes` | `[]int`  | `[200]`                                | The list of status codes that would be considered as a successful response. Default is `[200]`. | ✘         |
+| `concurrency`   | `int`    | `5`                                    | Number of concurrent child requests to execute. Default is `5`                                  | ✘         |
+| `script.engine` | `string` | `tengo`                                | Script engine. Only `"tengo"` is supported currently                                            | ✅         |
+| `script.source` | `string` | see [Worked Example](#worked-example). | [Tengo][tengo] script used to map the response into 0 or more assets.                           | ✅         |
+
+### Request
+
+| Key            | Value               | Example                              | Description                                                                 | Required? |
+|:---------------|:--------------------|:-------------------------------------|:----------------------------------------------------------------------------|:----------|
+| `url`          | `string`            | `http://example.com/api/v1/endpoint` | The HTTP endpoint to send request to                                        | ✅         |
+| `query_params` | `[]{key, value}`    | `[{"key":"s","value":"One Piece"}]`  | The query parameters to be added to the request URL.                        | ✘         |
+| `method`       | `string`            | `GET`/`POST`                         | The HTTP verb/method to use with request. Default is `GET`.                 | ✘         |
+| `headers`      | `map[string]string` | `{"Api-Token": "..."}`               | Headers to send in the HTTP request.                                        | ✘         |
+| `content_type` | `string`            | `application/json`                   | Content type for encoding request body. Also sent as a header.              | ✅         |
+| `accept`       | `string`            | `application/json`                   | Sent as the `Accept` header. Also indicates the format to use for decoding. | ✅         |
+| `body`         | `Object`            | `{"key": "value"}`                   | The request body to be sent.                                                | ✘         |
+| `timeout`      | `string`            | `1s`                                 | Timeout for the HTTP request. Default is 5s.                                | ✘         |
 
 ### Notes
 
@@ -86,7 +94,24 @@ source:
 
 #### `response`
 
-HTTP response received.
+HTTP response received with the `status_code`, `header` and `body`. Ex:
+
+```json
+{
+  "status_code": "200",
+  "header": {
+    "link": "</products?page=5&perPage=20>;rel=self,</products?page=0&perPage=20>;rel=first,</products?page=4&perPage=20>;rel=previous,</products?page=6&perPage=20>;rel=next,</products?page=26&perPage=20>;rel=last"
+  },
+  "body": [
+    {"id": 1, "name": "Widget #1"},
+    {"id": 2, "name": "Widget #2"},
+    {"id": 3, "name": "Widget #3"}
+  ]
+}
+```
+
+The header names are always in lower case. See
+[Worked Example](#worked-example) for detailed usage.
 
 #### `new_asset(string): Asset`
 
@@ -125,6 +150,60 @@ asset.data.full_name = "Daiyamondo Jozu"
 Takes an asset and emits the asset that can then be consumed by the
 processor/sink.
 
+#### `execute_request(...requests)`
+
+Takes 1 or more requests and executes the requests with the concurrency defined
+in the recipe. The results are returned as an array. Each item in the array can
+be an error or the HTTP response. The request object supports the properties
+defined in the [Request](#request) input section.
+
+When a request is executed, it can fail due to temporary errors such as network
+errors. These instances need to be handled in the script.
+
+[//]: # (@formatter:off)
+
+```go
+if !response.body.success {
+	exit()
+}
+
+reqs := []
+for j in response.body.jobs {
+	reqs = append(reqs, {
+		url: format("http://my.server.com/jobs/%s/config", j.id),
+		method: "GET",
+		content_type: "application/json", 
+		accept: "application/json",
+		timeout: "5s" 
+	})
+}
+
+responses := execute_request(reqs...)
+for r in responses {
+	if is_error(r) {
+		// TODO: Handle it appropriately. The error value has the request and 
+		//  error string:
+		//  r.value.{request, error}
+		continue 
+	}
+	
+	asset := new_asset("job")
+	asset.name = r.body.name
+	exec_cfg := r.body["execution-config"]
+	asset.data.attributes = {
+	  "job_id": r.body.jid,
+	  "job_parallelism": exec_cfg["job-parallelism"],
+	  "config": exec_cfg["user-config"]
+	}
+	emit(asset)
+}
+```
+
+[//]: # (@formatter:on)
+
+If the request passed to the function fails validation, a runtime error is
+thrown.
+
 #### `exit()`
 
 Terminates the script execution.
@@ -200,11 +279,11 @@ source:
     script:
       engine: tengo
       source: |
-        if !response.success {
+        if !response.body.success {
           exit()
         }
 
-        users := response.data
+        users := response.body.data
         for u in users {
           if u.email == "" {
             continue
@@ -229,19 +308,21 @@ source:
             manager_id:           u.manager_id,
             cost_center_id:       u.cost_center_id, 
             supervisory_org_name: u.supervisory_org_name,
-            location_id:          u.location_id
+            location_id:          u.location_id,
+            service_job_id:       response.header["x-job-id"]
           }
           emit(asset)
         }
 ```
 
-This would emit a 'User' asset for each user object in `response.data`.
+This would emit a 'User' asset for each user object in `response.data`. Note 
+that the response headers can be accessed under `response.header` and can be 
+used as needed.
 
 ## Caveats
 
 The following features are currently not supported:
 
-- Pagination.
 - Explicit authentication support, ex: Basic auth/OAuth/OAuth2/JWT etc.
 - Retries with configurable backoff.
 - Content type for request/response body other than `application/json`.
diff --git a/plugins/extractors/http/execute_request.go b/plugins/extractors/http/execute_request.go
index 5bfe6a1c1..e7e441e4f 100644
--- a/plugins/extractors/http/execute_request.go
+++ b/plugins/extractors/http/execute_request.go
@@ -7,31 +7,32 @@ import (
 	"fmt"
 	"io"
 	"net/http"
+	"strings"
 )
 
-func (e *Extractor) executeRequest(ctx context.Context) (interface{}, error) {
-	cfg := e.config
+type executeRequestFunc func(ctx context.Context, reqCfg RequestConfig) (map[string]interface{}, error)
 
-	ctx, cancel := context.WithTimeout(ctx, cfg.Request.Timeout)
-	defer cancel()
+func makeRequestExecutor(successCodes []int, httpClient *http.Client) executeRequestFunc {
+	return func(ctx context.Context, reqCfg RequestConfig) (map[string]interface{}, error) {
+		ctx, cancel := context.WithTimeout(ctx, reqCfg.Timeout)
+		defer cancel()
 
-	req, err := buildRequest(ctx, cfg)
-	if err != nil {
-		return nil, err
-	}
+		req, err := buildRequest(ctx, reqCfg)
+		if err != nil {
+			return nil, err
+		}
 
-	resp, err := e.http.Do(req)
-	defer drainBody(resp)
-	if err != nil {
-		return nil, fmt.Errorf("do request: %w", err)
-	}
+		resp, err := httpClient.Do(req)
+		defer drainBody(resp)
+		if err != nil {
+			return nil, fmt.Errorf("do request: %w", err)
+		}
 
-	return handleResponse(cfg, resp)
+		return handleResponse(successCodes, resp)
+	}
 }
 
-func buildRequest(ctx context.Context, cfg Config) (*http.Request, error) {
-	reqCfg := cfg.Request
-
+func buildRequest(ctx context.Context, reqCfg RequestConfig) (*http.Request, error) {
 	body, err := asReader(reqCfg.Body)
 	if err != nil {
 		return nil, fmt.Errorf("encode request body: %w", err)
@@ -72,17 +73,26 @@ func addQueryParams(req *http.Request, params []QueryParam) {
 	req.URL.RawQuery = q.Encode()
 }
 
-func handleResponse(cfg Config, resp *http.Response) (interface{}, error) {
-	if !has(cfg.SuccessCodes, resp.StatusCode) {
+func handleResponse(successCodes []int, resp *http.Response) (map[string]interface{}, error) {
+	if !has(successCodes, resp.StatusCode) {
 		return nil, fmt.Errorf("unsuccessful request: response status code: %d", resp.StatusCode)
 	}
 
-	var res interface{}
-	if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
+	h := make(map[string]interface{}, len(resp.Header))
+	for k := range resp.Header {
+		h[strings.ToLower(k)] = resp.Header.Get(k)
+	}
+
+	var body interface{}
+	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
 		return nil, fmt.Errorf("decode response: %w", err)
 	}
 
-	return res, nil
+	return map[string]interface{}{
+		"status_code": resp.StatusCode,
+		"header":      h,
+		"body":        body,
+	}, nil
 }
 
 func asReader(v interface{}) (io.Reader, error) {
diff --git a/plugins/extractors/http/execute_script.go b/plugins/extractors/http/execute_script.go
index 8a8e95976..43a9c4e5d 100644
--- a/plugins/extractors/http/execute_script.go
+++ b/plugins/extractors/http/execute_script.go
@@ -4,8 +4,13 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"reflect"
+	"strings"
+	"sync"
 
 	"github.com/d5/tengo/v2"
+	"github.com/go-playground/validator/v10"
+	"github.com/mcuadros/go-defaults"
 	"github.com/odpf/meteor/models"
 	v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2"
 	"github.com/odpf/meteor/plugins"
@@ -15,9 +20,11 @@ import (
 )
 
 func (e *Extractor) executeScript(ctx context.Context, res interface{}, emit plugins.Emit) error {
-	s := tengoutil.NewSecureScript(([]byte)(e.config.Script.Source))
-	if err := declareGlobals(s, res, emit); err != nil {
-		return fmt.Errorf("declare globals: %w", err)
+	s, err := tengoutil.NewSecureScript(
+		([]byte)(e.config.Script.Source), e.scriptGlobals(ctx, res, emit),
+	)
+	if err != nil {
+		return err
 	}
 
 	c, err := s.Compile()
@@ -32,8 +39,8 @@ func (e *Extractor) executeScript(ctx context.Context, res interface{}, emit plu
 	return nil
 }
 
-func declareGlobals(s *tengo.Script, res interface{}, emit plugins.Emit) error {
-	for name, v := range map[string]interface{}{
+func (e *Extractor) scriptGlobals(ctx context.Context, res interface{}, emit plugins.Emit) map[string]interface{} {
+	return map[string]interface{}{
 		"response": res,
 		"new_asset": &tengo.UserFunction{
 			Name:  "new_asset",
@@ -43,18 +50,17 @@ func declareGlobals(s *tengo.Script, res interface{}, emit plugins.Emit) error {
 			Name:  "emit",
 			Value: emitWrapper(emit),
 		},
+		"execute_request": &tengo.UserFunction{
+			Name:  "execute_request",
+			Value: executeRequestWrapper(ctx, e.config.Concurrency, e.executeRequest),
+		},
 		"exit": &tengo.UserFunction{
 			Name: "exit",
 			Value: func(...tengo.Object) (tengo.Object, error) {
 				return nil, errUserExit
 			},
 		},
-	} {
-		if err := s.Add(name, v); err != nil {
-			return fmt.Errorf("declare script globals: %w", err)
-		}
 	}
-	return nil
 }
 
 func newAssetWrapper() tengo.CallableFunc {
@@ -103,6 +109,120 @@ func emitWrapper(emit plugins.Emit) tengo.CallableFunc {
 	}
 }
 
+func executeRequestWrapper(ctx context.Context, concurrency int, executeRequest executeRequestFunc) tengo.CallableFunc {
+	type job struct {
+		i      int
+		reqCfg RequestConfig
+	}
+	requestsChan := func(ctx context.Context, reqs []RequestConfig) <-chan job {
+		ch := make(chan job)
+
+		go func() {
+			defer close(ch)
+
+			for i, r := range reqs {
+				select {
+				case <-ctx.Done():
+					return
+
+				case ch <- job{i, r}:
+				}
+			}
+		}()
+
+		return ch
+	}
+
+	type result struct {
+		resp interface{}
+		err  error
+	}
+	processJobs := func(ctx context.Context, n int, ch <-chan job) []result {
+		var wg sync.WaitGroup
+		wg.Add(concurrency)
+
+		results := make([]result, n)
+		work := func() {
+			defer wg.Done()
+
+			for {
+				select {
+				case <-ctx.Done():
+					return
+
+				case j, ok := <-ch:
+					if !ok {
+						return
+					}
+
+					resp, err := executeRequest(ctx, j.reqCfg)
+					if err != nil {
+						results[j.i] = result{err: fmt.Errorf("execute request #%d: %w", j.i, err)}
+						continue
+					}
+
+					results[j.i] = result{resp: resp}
+				}
+			}
+		}
+
+		for i := 0; i < concurrency; i++ {
+			go work()
+		}
+
+		wg.Wait()
+		return results
+	}
+
+	validate := validator.New()
+	validate.RegisterTagNameFunc(func(fld reflect.StructField) string {
+		name := strings.SplitN(fld.Tag.Get("mapstructure"), ",", 2)[0]
+		if name == "-" {
+			return ""
+		}
+		return name
+	})
+	return func(args ...tengo.Object) (tengo.Object, error) {
+		if len(args) < 1 {
+			return nil, tengo.ErrWrongNumArguments
+		}
+
+		ctx, cancel := context.WithCancel(ctx)
+		defer cancel()
+
+		reqs, err := argsToRequestConfigs(args, validate)
+		if err != nil {
+			return nil, fmt.Errorf("execute request: %w", err)
+		}
+
+		results := processJobs(ctx, len(reqs), requestsChan(ctx, reqs))
+
+		var ret tengo.Array
+		for i, res := range results {
+			if res.err != nil {
+				ret.Value = append(ret.Value, &tengo.Error{
+					Value: &tengo.Map{
+						Value: map[string]tengo.Object{
+							"request": args[i],
+							"error":   &tengo.String{Value: res.err.Error()},
+						},
+					},
+				})
+				continue
+			}
+
+			o, err := tengo.FromInterface(res.resp)
+			if err != nil {
+				return nil, fmt.Errorf("execute request: translate response: %s: %w", args[i], err)
+			}
+
+			ret.Value = append(ret.Value, o)
+		}
+
+		return &ret, nil
+	}
+}
+
 func newAsset(typeURLs map[string]string, typ string) (tengo.Object, error) {
 	u, ok := typeURLs[typ]
 	if !ok {
@@ -121,6 +241,24 @@ func newAsset(typeURLs map[string]string, typ string) (tengo.Object, error) {
 	}, nil
 }
 
+func argsToRequestConfigs(args []tengo.Object, validate *validator.Validate) ([]RequestConfig, error) {
+	reqs := make([]RequestConfig, 0, len(args))
+	for _, arg := range args {
+		var r RequestConfig
+		defaults.SetDefaults(&r)
+		if err := structmap.AsStructWithTag("mapstructure", tengo.ToInterface(arg), &r); err != nil {
+			return nil, fmt.Errorf("map arg to request config: %s: %w", arg, err)
+		}
+
+		if err := validate.Struct(r); err != nil {
+			return nil, fmt.Errorf("validate request config: %s, %w", arg, err)
+		}
+
+		reqs = append(reqs, r)
+	}
+	return reqs, nil
+}
+
 func knownTypeURLs() map[string]string {
 	typeURLs := make(map[string]string, 12)
 	for _, typ := range []string{
diff --git a/plugins/extractors/http/http_extractor.go b/plugins/extractors/http/http_extractor.go
index 703463d53..185230be5 100644
--- a/plugins/extractors/http/http_extractor.go
+++ b/plugins/extractors/http/http_extractor.go
@@ -30,23 +30,26 @@ var summary string
 
 // Config holds the set of configuration for the HTTP extractor.
 type Config struct {
-	Request struct {
-		URL         string            `mapstructure:"url" validate:"required,url"`
-		QueryParams []QueryParam      `mapstructure:"query_params" validate:"dive"`
-		Method      string            `mapstructure:"method" validate:"oneof=GET POST" default:"GET"`
-		Headers     map[string]string `mapstructure:"headers"`
-		ContentType string            `mapstructure:"content_type" validate:"required,oneof=application/json"`
-		Accept      string            `mapstructure:"accept" validate:"required,oneof=application/json"`
-		Body        interface{}       `mapstructure:"body"`
-		Timeout     time.Duration     `mapstructure:"timeout" validate:"min=1ms" default:"5s"`
-	} `mapstructure:"request"`
-	SuccessCodes []int `mapstructure:"success_codes" validate:"dive,gte=200,lt=300" default:"[200]"`
+	Request      RequestConfig `mapstructure:"request"`
+	SuccessCodes []int         `mapstructure:"success_codes" validate:"dive,gte=200,lt=300" default:"[200]"`
+	Concurrency  int           `mapstructure:"concurrency" validate:"gte=1,lte=100" default:"5"`
 	Script       struct {
 		Engine string `mapstructure:"engine" validate:"required,oneof=tengo"`
 		Source string `mapstructure:"source" validate:"required"`
 	} `mapstructure:"script"`
 }
 
+type RequestConfig struct {
+	URL         string            `mapstructure:"url" validate:"required,url"`
+	QueryParams []QueryParam      `mapstructure:"query_params" validate:"dive"`
+	Method      string            `mapstructure:"method" validate:"oneof=GET POST" default:"GET"`
+	Headers     map[string]string `mapstructure:"headers"`
+	ContentType string            `mapstructure:"content_type" validate:"required,oneof=application/json"`
+	Accept      string            `mapstructure:"accept" validate:"required,oneof=application/json"`
+	Body        interface{}       `mapstructure:"body"`
+	Timeout     time.Duration     `mapstructure:"timeout" validate:"min=1ms" default:"5s"`
+}
+
 type QueryParam struct {
 	Key   string `mapstructure:"key" validate:"required"`
 	Value string `mapstructure:"value" validate:"required"`
@@ -83,9 +86,9 @@ var info = plugins.Info{
 type Extractor struct {
 	plugins.BaseExtractor
 
-	logger log.Logger
-	config Config
-	http   *http.Client
+	logger         log.Logger
+	config         Config
+	executeRequest executeRequestFunc
 }
 
 // New returns a pointer to an initialized Extractor Object
@@ -104,7 +107,7 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
 		return err
 	}
 
-	e.http = &http.Client{}
+	e.executeRequest = makeRequestExecutor(e.config.SuccessCodes, &http.Client{})
 	return nil
 }
 
@@ -112,7 +115,7 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {
 // executes the script. The script has access to the response and can use the
 // same to 'emit' assets from within the script.
 func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
-	res, err := e.executeRequest(ctx)
+	res, err := e.executeRequest(ctx, e.config.Request)
 	if err != nil {
 		return fmt.Errorf("http extractor: execute request: %w", err)
 	}
diff --git a/plugins/extractors/http/http_extractor_test.go b/plugins/extractors/http/http_extractor_test.go
index 2f03b35c3..36bdd05cc 100644
--- a/plugins/extractors/http/http_extractor_test.go
+++ b/plugins/extractors/http/http_extractor_test.go
@@ -290,26 +290,27 @@ func TestExtract(t *testing.T) {
 				"script": map[string]interface{}{
 					"engine": "tengo",
 					"source": heredoc.Doc(`
+						body := response.body
 						asset := new_asset("user")
 						// URN format: "urn:{service}:{scope}:{type}:{id}"
-						asset.urn = format("urn:%s:staging:user:%s", "my_usr_svc", response.employee_id)
-						asset.name = response.fullname
+						asset.urn = format("urn:%s:staging:user:%s", "my_usr_svc", body.employee_id)
+						asset.name = body.fullname
 						asset.service = "my_usr_svc"
 						// asset.type = "user" // not required, new_asset("user") sets the field.
-						asset.data.email = response.work_email
-						asset.data.username = response.employee_id
-						asset.data.first_name = response.legal_first_name
-						asset.data.last_name = response.legal_last_name
-						asset.data.full_name = response.fullname
-						asset.data.display_name = response.fullname
-						asset.data.title = response.business_title
-						asset.data.status = response.terminated == "true" ? "suspended" : "active"
-						asset.data.manager_email = response.manager_email
+						asset.data.email = body.work_email
+						asset.data.username = body.employee_id
+						asset.data.first_name = body.legal_first_name
+						asset.data.last_name = body.legal_last_name
+						asset.data.full_name = body.fullname
+						asset.data.display_name = body.fullname
+						asset.data.title = body.business_title
+						asset.data.status = body.terminated == "true" ? "suspended" : "active"
+						asset.data.manager_email = body.manager_email
 						asset.data.attributes = {
-							manager_id:           response.manager_id,
-							cost_center_id:       response.cost_center_id,
-							supervisory_org_name: response.supervisory_org_name,
-							location_id:          response.location_id
+							manager_id:           body.manager_id,
+							cost_center_id:       body.cost_center_id,
+							supervisory_org_name: body.supervisory_org_name,
+							location_id:          body.location_id
 						}
 						emit(asset)
 					`),
@@ -393,19 +394,20 @@ func TestExtract(t *testing.T) {
 							return upstreams
 						}
 						
-						for ft in response.tables {
+						body := response.body
+						for ft in body.tables {
 							ast := new_asset("feature_table")
 							ast.urn = format(
 								"urn:caramlstore:staging:feature_table:%s.%s",
-								response.project, ft.spec.name
+								body.project, ft.spec.name
 							)
 							ast.name = ft.spec.name
 							ast.service = "caramlstore"
 							ast.type = "feature_table"
 							ast.data = merge(ast.data, {
-								namespace: response.project,
+								namespace: body.project,
 								entities: enum.map(ft.spec.entities, func(i, e){
-									entity := enum.find(response.entities, func(i, entity) {
+									entity := enum.find(body.entities, func(i, entity) {
 										return e == entity.spec.name
 									})
 									return {
@@ -514,6 +516,241 @@ func TestExtract(t *testing.T) {
 				},
 			},
 		},
+		{
+			name: "AdditionalRequestsFromScript",
+			rawCfg: map[string]interface{}{
+				"request": map[string]interface{}{
+					"url":          "{{serverURL}}/jobs",
+					"content_type": "application/json",
+					"accept":       "application/json",
+				},
+				"script": map[string]interface{}{
+					"engine": "tengo",
+					"source": heredoc.Doc(`
+						reqs := []
+						for j in response.body.jobs {
+						  reqs = append(reqs, {
+							url: format("{{serverURL}}/jobs/%s/config", j.id),
+							method: "GET",
+							content_type: "application/json",
+							accept: "application/json",
+							timeout: "5s"
+						  })
+						}
+						  
+						responses := execute_request(reqs...)
+						for r in responses {
+						  if is_error(r) {
+							continue
+						  }
+						  
+						  asset := new_asset("job")
+						  asset.name = r.body.name
+						  exec_cfg := r.body["execution-config"]
+						  asset.data.attributes = {
+							"job_id": r.body.jid,
+							"job_parallelism": exec_cfg["job-parallelism"],
+							"config": exec_cfg["user-config"]
+						  }
+						  emit(asset)
+						}
+					`),
+				},
+			},
+			handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				switch r.URL.Path {
+				case "/jobs":
+					testutils.Respond(t, w, http.StatusOK,
+						`{"jobs":[{"id":"72b6753ab1984be6a65055b95ea9dd32","status":"RUNNING"},{"id":"3473947d1115c155513014cc6ecbd2fa","status":"RUNNING"},{"id":"9b12cb10b119b957b085c08e49bde3f2","status":"RESTARTING"},{"id":"fc308f1ac8c23b5f5a7942742b253917","status":"RESTARTING"}]}`,
+					)
+				case "/jobs/72b6753ab1984be6a65055b95ea9dd32/config":
+					testutils.Respond(t, w, http.StatusOK,
+						`{"jid":"72b6753ab1984be6a65055b95ea9dd32","name":"data-test-external-voucher-dagger","execution-config":{"execution-mode":"PIPELINED","restart-strategy":"Cluster level default restart strategy","job-parallelism":1,"object-reuse-mode":false,"user-config":{"SINK_KAFKA_TOPIC":"test_external_voucher","FLINK_ROWTIME_ATTRIBUTE_NAME":"rowtime","ENABLE_STENCIL_URL":"true","FLINK_SQL_QUERY":"SELECT member_ids.member_id as customer_id, '96962e7a-cd9e-4fb2-87fe-96091c124de6' as voucher_batch_id, rowtime as event_timestampfrom table1, UNNEST(table1.members) AS member_ids (member_id)where segment_name = 'testdagger' and action = 'ADD_MEMBERS'","SINK_TYPE":"kafka","PROCESSOR_PREPROCESSOR_CONFIG":"","FLINK_WATERMARK_INTERVAL_MS":"60000","FLINK_PARALLELISM":"1","SINK_INFLUX_BATCH_SIZE":"100","PROCESSOR_POSTPROCESSOR_ENABLE":"true","SINK_KAFKA_PROTO_MESSAGE":"com.company.esb.growth.AllocatePromoRequestMessage","PROCESSOR_PREPROCESSOR_ENABLE":"","SINK_INFLUX_MEASUREMENT_NAME":"data-test-external-voucher-dagger","SCHEMA_REGISTRY_STENCIL_ENABLE":"true","SINK_INFLUX_DB_NAME":"DAGGERS_COLLECTIVE","SCHEMA_REGISTRY_STENCIL_URLS":"http://data-systems-stencil.company.io/v1beta1/namespaces/company/schemas/esb/versions/68","SINK_KAFKA_STREAM":"data-dagstream","PROCESSOR_LONGBOW_GCP_INSTANCE_ID":"","PROCESSOR_POSTPROCESSOR_CONFIG":"{\"external_source\":{\"http\":[{\"endpoint\":\"ase1.company.io/internal/v2/voucher/allocate\",\"verb\":\"post\",\"request_pattern\":\"{\\\"voucher_batch_id\\\": \\\"%s\\\",\\\"customer_id\\\": \\\"%s-60\\\"}\",\"request_variables\":\"customer_id,customer_id\",\"stream_timeout\":\"5000\",\"connect_timeout\":\"5000\",\"fail_on_errors\":\"false\",\"capacity\":\"30\",\"headers\":{\"Content-Type\":\"application/json\",\"Accept-Language\":\"en\"},\"type\":\"com.company.esb.growth.AllocatePromoRequestMessage\",\"output_mapping\":{\"voucher_batch_id\":{\"path\":\"$.data.id\"}}}]}}","STREAMS":"[{\"SOURCE_KAFKA_TOPIC_NAMES\":\"segmentation-message\",\"INPUT_SCHEMA_PROTO_CLASS\":\"com.company.esb.segmentation.UpdateLogMessage\",\"INPUT_SCHEMA_TABLE\":\"table1\",\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\":\"false\",\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\":\"latest\",\"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\":\"data-test-external-voucher-dagger-0001\",\"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\":\"<REDACTED>\",\"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\":\"3\",\"SOURCE_KAFKA_NAME\":\"data-dagstream\"}]","SINK_INFLUX_FLUSH_DURATION_MS":"1000","SINK_INFLUX_URL":"http://data-dagger-shared-influx.company.io:6798","SINK_KAFKA_BROKERS":"<REDACTED>","FLINK_JOB_ID":"data-test-external-voucher-dagger","FLINK_WATERMARK_DELAY_MS":"1000","SINK_KAFKA_PROTO_KEY":"com.company.esb.growth.AllocatePromoRequestKey"}}}`,
+					)
+				case "/jobs/3473947d1115c155513014cc6ecbd2fa/config":
+					testutils.Respond(t, w, http.StatusOK,
+						`{"jid":"3473947d1115c155513014cc6ecbd2fa","name":"data-booking-map-matching-dagger","execution-config":{"execution-mode":"PIPELINED","restart-strategy":"Cluster level default restart strategy","job-parallelism":1,"object-reuse-mode":false,"user-config":{"SINK_KAFKA_TOPIC":"booking-map-matching-log","FLINK_ROWTIME_ATTRIBUTE_NAME":"rowtime","SINK_BIGQUERY_TABLE_CLUSTERING_ENABLE":"false","SINK_TYPE":"kafka","FLINK_SQL_QUERY":"SELECT driver_id, booking_id, country_code, event_timestamp, booking_status, vehicle_type, driver_locations, ping_processing_driver_locations(driver_locations) as filtered_driver_locations, gh_map_matching_response, polylineFROM data_streams_0","SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS":"-1","SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS":"","PROCESSOR_PREPROCESSOR_CONFIG":"","FLINK_WATERMARK_INTERVAL_MS":"60000","FLINK_PARALLELISM":"1","SINK_METRICS_APPLICATION_PREFIX":"dagger_","SINK_INFLUX_BATCH_SIZE":"100","SINK_KAFKA_PROTO_MESSAGE":"company.esb.cartography.erp.ERPMapMatchingLogV2Message","SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS":"-1","SINK_BIGQUERY_DATASET_LABELS":"","SCHEMA_REGISTRY_STENCIL_ENABLE":"true","SINK_INFLUX_DB_NAME":"DAGGERS_COLLECTIVE","SINK_BIGQUERY_TABLE_LABELS":"","SINK_CONNECTOR_SCHEMA_DATA_TYPE":"PROTOBUF","SINK_CONNECTOR_SCHEMA_PROTO_ALLOW_UNKNOWN_FIELDS_ENABLE":"false","SINK_INFLUX_URL":"http://data-dagger-shared-influx.company.io:6798","FLINK_JOB_ID":"data-booking-map-matching-dagger","SINK_KAFKA_BROKERS":"<REDACTED>","PYTHON_UDF_ENABLE":"true","FLINK_WATERMARK_DELAY_MS":"1000","SINK_KAFKA_PROTO_KEY":"company.esb.cartography.erp.ERPMapMatchingLogV2Key","SINK_CONNECTOR_SCHEMA_PROTO_KEY_CLASS":"","SINK_BIGQUERY_BATCH_SIZE":"","SINK_BIGQUERY_METADATA_COLUMNS_TYPES":"","ENABLE_STENCIL_URL":"true","PYTHON_UDF_CONFIG":"{\"PYTHON_FILES\":\"gs://data-dagger-magic/python/master/88.41.13/python_udfs.zip\",\"PYTHON_ARCHIVES\":\"gs://data-dagger-magic/python/master/88.41.13/data.zip\",\"PYTHON_REQUIREMENTS\":\"gs://data-dagger-magic/python/master/88.41.13/requirements.txt\"}","SINK_CONNECTOR_SCHEMA_MESSAGE_MODE":"LOG_MESSAGE","PROCESSOR_POSTPROCESSOR_ENABLE":"true","SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS":"-1","SINK_INFLUX_MEASUREMENT_NAME":"map-matching-dagger","PROCESSOR_PREPROCESSOR_ENABLE":"false","SINK_BIGQUERY_TABLE_NAME":"","SINK_KAFKA_STREAM":"data-dagstream","SCHEMA_REGISTRY_STENCIL_URLS":"http://data-systems-stencil.company.io/v1beta1/namespaces/company/schemas/esb/versions/590","SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID":"","SINK_BIGQUERY_DATASET_NAME":"","PROCESSOR_LONGBOW_GCP_INSTANCE_ID":"","PROCESSOR_POSTPROCESSOR_CONFIG":"{\"external_source\":{\"http\":[{\"endpoint\":\"http://11.126.1.18:6798/match\",\"verb\":\"post\",\"request_pattern\":\"{\"locations\":%s,\"hints\":{\"vehicle\":\"car\",\"instructions\":false,\"points_encoded\":true,\"gps_accuracy\":50}}\",\"request_variables\":\"filtered_driver_locations\",\"stream_timeout\":\"100000\",\"connect_timeout\":\"10000\",\"fail_on_errors\":\"false\",\"capacity\":\"30\",\"headers\":{\"content-type\":\"application/json\"},\"output_mapping\":{\"gh_map_matching_response\":{\"path\":\"$\"},\"polyline\":{\"path\":\"$.map_matching.edge_geometry_polyline\"}}}]},\"internal_source\":[{\"output_field\":\"driver_id\",\"value\":\"driver_id\",\"type\":\"sql\"},{\"output_field\":\"booking_id\",\"value\":\"booking_id\",\"type\":\"sql\"},{\"output_field\":\"country_code\",\"value\":\"country_code\",\"type\":\"sql\"},{\"output_field\":\"event_timestamp\",\"value\":\"event_timestamp\",\"type\":\"sql\"},{\"output_field\":\"booking_status\",\"value\":\"booking_status\",\"type\":\"sql\"},{\"output_field\":\"vehicle_type\",\"value\":\"vehicle_type\",\"type\":\"sql\"},{\"output_field\":\"driver_locations\",\"value\":\"driver_locations\",\"type\":\"sql\"},{\"output_field\":\"filtered_driver_locations\",\"value\":\"filtered_driver_locations\",\"type\":\"sql\"}]}","SINK_BIGQUERY_TABLE_CLUSTERING_KEYS":"","STREAMS":"[{\"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\":\"4\",\"INPUT_SCHEMA_PROTO_CLASS\":\"company.esb.cartography.erp.ERPMapMatchingLogV2Message\",\"INPUT_SCHEMA_TABLE\":\"data_streams_0\",\"SOURCE_DETAILS\":[{\"SOURCE_NAME\":\"KAFKA_CONSUMER\",\"SOURCE_TYPE\":\"UNBOUNDED\"}],\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\":\"false\",\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\":\"latest\",\"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\":\"company-mainstream.company.io:6668\",\"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\":\"data-booking-map-matching-dagger-0002\",\"SOURCE_KAFKA_NAME\":\"company-mainstream\",\"SOURCE_KAFKA_TOPIC_NAMES\":\"aggregated-busy-driver-location-ping\",\"SOURCE_PARQUET_FILE_DATE_RANGE\":null,\"SOURCE_PARQUET_FILE_PATHS\":null}]","SINK_INFLUX_FLUSH_DURATION_MS":"1000","SINK_BIGQUERY_METADATA_NAMESPACE":""}}}`,
+					)
+				case "/jobs/9b12cb10b119b957b085c08e49bde3f2/config":
+					testutils.Respond(t, w, http.StatusOK,
+						`{"jid":"9b12cb10b119b957b085c08e49bde3f2","name":"data-eim-driver-nearby-staging-dagger","execution-config":{"execution-mode":"PIPELINED","restart-strategy":"Cluster level default restart strategy","job-parallelism":1,"object-reuse-mode":false,"user-config":{"SINK_KAFKA_TOPIC":"eim-driver-nearby-log","FLINK_ROWTIME_ATTRIBUTE_NAME":"rowtime","ENABLE_STENCIL_URL":"true","SINK_TYPE":"kafka","FLINK_SQL_QUERY":"SELECT gofood_booking.event_timestamp as event_timestamp, gofood_booking.order_number AS order_number, gofood_booking.service_type AS gofood_booking_message.service_type, gofood_booking.order_number AS gofood_booking_message.order_number, gofood_booking.order_url AS gofood_booking_message.order_url, gofood_booking.status AS gofood_booking_message.status, gofood_booking.event_timestamp AS gofood_booking_message.event_timestamp, gofood_booking.customer_id AS gofood_booking_message.customer_id, gofood_booking.customer_url AS gofood_booking_message.customer_url, gofood_booking.driver_id AS gofood_booking_message.driver_id, gofood_booking.driver_url AS gofood_booking_message.driver_url, gofood_booking.is_reblast AS gofood_booking_message.is_reblast, gofood_booking.activity_source AS gofood_booking_message.activity_source, gofood_booking.service_area_id AS gofood_booking_message.service_area_id, gofood_booking.payment_type AS gofood_booking_message.payment_type, gofood_booking.total_unsubsidised_price AS gofood_booking_message.total_unsubsidised_price, gofood_booking.customer_price AS gofood_booking_message.customer_price, gofood_booking.amount_paid_by_cash AS gofood_booking_message.amount_paid_by_cash, gofood_booking.amount_paid_by_credits AS gofood_booking_message.amount_paid_by_credits, gofood_booking.surcharge_amount AS gofood_booking_message.surcharge_amount, gofood_booking.tip_amount AS gofood_booking_message.tip_amount, gofood_booking.driver_cut_amount AS gofood_booking_message.driver_cut_amount, gofood_booking.requested_payment_type AS gofood_booking_message.requested_payment_type, gofood_booking.total_distance_in_kms AS gofood_booking_message.total_distance_in_kms, gofood_booking.route_polyline AS gofood_booking_message.route_polyline, gofood_booking.routes AS gofood_booking_message.routes, gofood_booking.driver_eta_pickup AS gofood_booking_message.driver_eta_pickup, gofood_booking.driver_eta_dropoff AS gofood_booking_message.driver_eta_dropoff, gofood_booking.driver_pickup_location AS gofood_booking_message.driver_pickup_location, gofood_booking.driver_dropoff_location AS gofood_booking_message.driver_dropoff_location, gofood_booking.customer_email AS gofood_booking_message.customer_email, gofood_booking.customer_name AS gofood_booking_message.customer_name, gofood_booking.customer_phone AS gofood_booking_message.customer_phone, gofood_booking.driver_email AS gofood_booking_message.driver_email, gofood_booking.driver_name AS gofood_booking_message.driver_name, gofood_booking.driver_phone AS gofood_booking_message.driver_phone, gofood_booking.driver_phone2 AS gofood_booking_message.driver_phone2, gofood_booking.driver_phone3 AS gofood_booking_message.driver_phone3, gofood_booking.cancel_reason_id AS gofood_booking_message.cancel_reason_id, gofood_booking.cancel_reason_description AS gofood_booking_message.cancel_reason_description, gofood_booking.cancel_source AS gofood_booking_message.cancel_source, gofood_booking.customer_type AS gofood_booking_message.customer_type, gofood_booking.cancel_owner AS gofood_booking_message.cancel_owner, gofood_booking.booking_creation_time AS gofood_booking_message.booking_creation_time, gofood_booking.total_customer_discount AS gofood_booking_message.total_customer_discount, gofood_booking.gopay_customer_discount AS gofood_booking_message.gopay_customer_discount, gofood_booking.voucher_customer_discount AS gofood_booking_message.voucher_customer_discount, gofood_booking.pickup_time AS gofood_booking_message.pickup_time, gofood_booking.driver_paid_in_cash AS gofood_booking_message.driver_paid_in_cash, gofood_booking.driver_paid_in_credit AS gofood_booking_message.driver_paid_in_credit, gofood_booking.receiver_name AS gofood_booking_message.receiver_name, gofood_booking.driver_photo_url AS gofood_booking_message.driver_photo_url, gofood_booking.previous_booking_status AS gofood_booking_message.previous_booking_status, gofood_booking.vehicle_type AS gofood_booking_message.vehicle_type, gofood_booking.customer_total_fare_without_surge AS gofood_booking_message.customer_total_fare_without_surge, gofood_booking.customer_surge_factor AS gofood_booking_message.customer_surge_factor, gofood_booking.customer_dynamic_surge AS gofood_booking_message.customer_dynamic_surge, gofood_booking.customer_dynamic_surge_enabled AS gofood_booking_message.customer_dynamic_surge_enabled, gofood_booking.driver_total_fare_without_surge AS gofood_booking_message.driver_total_fare_without_surge, gofood_booking.driver_surge_factor AS gofood_booking_message.driver_surge_factor, gofood_booking.driver_dynamic_surge AS gofood_booking_message.driver_dynamic_surge, gofood_booking.driver_dynamic_surge_enabled AS gofood_booking_message.driver_dynamic_surge_enabled, gofood_booking.driver_ata_pickup AS gofood_booking_message.driver_ata_pickup, gofood_booking.driver_ata_dropoff AS gofood_booking_message.driver_ata_dropoff, gofood_booking.gcm_key AS gofood_booking_message.gcm_key, gofood_booking.device_token AS gofood_booking_message.device_token, gofood_booking.pricing_service_id AS gofood_booking_message.pricing_service_id, gofood_booking.payment_invoice_number AS gofood_booking_message.payment_invoice_number, gofood_booking.pricing_currency AS gofood_booking_message.pricing_currency, gofood_booking.country_code AS gofood_booking_message.country_code, gofood_booking.service_area_tzname AS gofood_booking_message.service_area_tzname, gofood_booking.payment_option_type AS gofood_booking_message.payment_option_type, gofood_booking.payment_option_metadata AS gofood_booking_message.payment_option_metadata, gofood_booking.payment_option_name AS gofood_booking_message.payment_option_name, gofood_booking.booking_info AS gofood_booking_message.booking_info, gofood_booking.price_edit_reason AS gofood_booking_message.price_edit_reason, gofood_booking.driver_arrived_location AS gofood_booking_message.driver_arrived_location, gofood_booking.driver_arrived_time AS gofood_booking_message.driver_arrived_time, gofood_booking.driver_order_placed_location AS gofood_booking_message.driver_order_placed_location, gofood_booking.driver_order_placed_time AS gofood_booking_message.driver_order_placed_time, gofood_booking.merchant_accepted_time AS gofood_booking_message.merchant_accepted_time, gofood_booking.merchant_acknowledged_time AS gofood_booking_message.merchant_acknowledged_time, gofood_booking.merchant_received_time AS gofood_booking_message.merchant_received_time, gofood_booking.merchant_cancel_reason AS gofood_booking_message.merchant_cancel_reason, gofood_booking.merchant_cancel_time AS gofood_booking_message.merchant_cancel_time, gofood_booking.merchant_cancel_description AS gofood_booking_message.merchant_cancel_description, gofood_booking.takeaway_charges AS gofood_booking_message.takeaway_charges, gofood_booking.food_prepared_time AS gofood_booking_message.food_prepared_time, gofood_booking.cancel_reason_code AS gofood_booking_message.cancel_reason_code, gofood_booking.verification_requested_time AS gofood_booking_message.verification_requested_time, gofood_booking.customer_pickup_time AS gofood_booking_message.customer_pickup_time, gofood_booking.customer_pickup_location AS gofood_booking_message.customer_pickup_location, gofood_booking.verification_failed_time AS gofood_booking_message.verification_failed_time, gofood_booking.verification_requested_location AS gofood_booking_message.verification_requested_location, gofood_booking.convenience_fee AS gofood_booking_message.convenience_fee, gofood_booking.payment_actions AS gofood_booking_message.payment_actions, gofood_booking.marketplace_serviceability_log_id AS gofood_booking_message.marketplace_serviceability_log_id, gofood_booking.order_completion_time AS gofood_booking_message.order_completion_time, gofood_booking.restaurant_id AS gofood_booking_message.restaurant_id, gofood_booking.sub_status AS gofood_booking_message.sub_status, gofood_booking.shopping_price AS gofood_booking_message.shopping_price, gofood_booking.commission_price AS gofood_booking_message.commission_price, gofood_booking.withholding_income_tax AS gofood_booking_message.withholding_income_tax, gofood_booking.voucher_redeemed_value AS gofood_booking_message.voucher_redeemed_value, gofood_booking.voucher_commission AS gofood_booking_message.voucher_commission, gofood_booking.voucher_id AS gofood_booking_message.voucher_id, gofood_booking.voucher_title AS gofood_booking_message.voucher_title, gofood_booking.otp AS gofood_booking_message.otp, gofood_booking.driver_entered_price AS gofood_booking_message.driver_entered_price, gofood_booking.driver_wallet_id AS gofood_booking_message.driver_wallet_id, gofood_booking.saudagar_id AS gofood_booking_message.saudagar_id, gofood_booking.validated_at AS gofood_booking_message.validated_at, gofood_booking.merchant_wallet_id AS gofood_booking_message.merchant_wallet_id, gofood_booking.restaurant_uuid AS gofood_booking_message.restaurant_uuid, gofood_booking.search_id AS gofood_booking_message.search_id, gofood_booking.gopay_driver_reservation_id AS gofood_booking_message.gopay_driver_reservation_id, gofood_booking.voucher_batch_id AS gofood_booking_message.voucher_batch_id, gofood_booking.merchant_config AS gofood_booking_message.merchant_config, gofood_booking.brand_id AS gofood_booking_message.brand_id, gofood_booking.customer_wallet_id AS gofood_booking_message.customer_wallet_id, gofood_booking.customer_payment_details AS gofood_booking_message.customer_payment_details, gofood_booking.merchant_phone AS gofood_booking_message.merchant_phone, gofood_booking.previous_sub_status AS gofood_booking_message.previous_sub_status, gofood_booking.merchant_acceptance_deadline AS gofood_booking_message.merchant_acceptance_deadline, gofood_booking.inapplicable_voucher_id AS gofood_booking_message.inapplicable_voucher_id, gofood_booking.driver_fee_adjustments AS gofood_booking_message.driver_fee_adjustments, gofood_booking.receipt_url AS gofood_booking_message.receipt_url, gofood_booking.is_goresto AS gofood_booking_message.is_goresto, gofood_booking.shopping_items AS gofood_booking_message.shopping_items, gofood_booking.has_promo AS gofood_booking_message.has_promo, gofood_booking.analytics AS gofood_booking_message.analytics, gofood_booking.fraud_reason AS gofood_booking_message.fraud_reason, gofood_booking.driver_completion_time AS gofood_booking_message.driver_completion_time, gofood_booking.use_service_wallet_fund_flow AS gofood_booking_message.use_service_wallet_fund_flow, gofood_booking.campaign_discounts AS gofood_booking_message.campaign_discounts, gofood_booking.tracer_bullet AS gofood_booking_message.tracer_bullet, gofood_booking.bid_delay_in_seconds AS gofood_booking_message.bid_delay_in_seconds, gofood_booking.driver_gopay_account_id AS gofood_booking_message.driver_gopay_account_id, gofood_booking.use_gopay_v3 AS gofood_booking_message.use_gopay_v3, gofood_booking.experiments AS gofood_booking_message.experiments, gofood_booking.eta_in_minutes AS gofood_booking_message.eta_in_minutes, gofood_booking.eta_source AS gofood_booking_message.eta_source, gofood_booking.eta_performance_bucket AS gofood_booking_message.eta_performance_bucket, gofood_booking.payment_method AS gofood_booking_message.payment_method, gofood_booking.vehicle_info AS gofood_booking_message.vehicle_info, gofood_booking.actor AS gofood_booking_message.actor, gofood_booking.vehicle_tags AS gofood_booking_message.vehicle_tags, gofood_booking.goclub AS gofood_booking_message.goclub, gofood_nearby.event_timestamp AS gofood_nearby_message.event_timestamp, gofood_nearby.order_number AS gofood_nearby_message.order_number, gofood_nearby.driver_id AS gofood_nearby_message.driver_id, gofood_nearby.customer_id AS gofood_nearby_message.customer_id, gofood_nearby.restaurant_uuid AS gofood_nearby_message.restaurant_uuid, gofood_nearby.saudagar_id AS gofood_nearby_message.saudagar_id, gofood_nearby.type AS gofood_nearby_message.type, gofood_nearby.radius AS gofood_nearby_message.radius, gofood_nearby.actor AS gofood_nearby_message.actorFROM gofood_nearby JOIN gofood_booking ON gofood_nearby.order_number = gofood_booking.order_number AND gofood_nearby.driver_id = gofood_booking.driver_id AND gofood_nearby.customer_id = gofood_booking.customer_id AND gofood_nearby.restaurant_uuid = gofood_booking.restaurant_uuid AND gofood_nearby.rowtime BETWEEN (gofood_booking.rowtime - INTERVAL '10' MINUTE) AND (gofood_booking.rowtime + INTERVAL '40' MINUTE) AND gofood_booking.sub_status = 'OTW_PICKUP' AND gofood_nearby.type = 'PICKUP' AND gofood_nearby.actor = 'DRIVER'","PYTHON_UDF_CONFIG":"{\"PYTHON_FILES\": \"gs://data-dagger/python/master/latest/python_udfs.zip\",\"PYTHON_REQUIREMENTS\": \"gs://data-dagger/python/master/latest/requirements.txt\",\"PYTHON_ARCHIVES\": \"gs://data-dagger/python/master/latest/data.zip#data\",\"PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE\": \"10000\",\"PYTHON_FN_EXECUTION_BUNDLE_SIZE\": \"100000\",\"PYTHON_FN_EXECUTION_BUNDLE_TIME\": \"1000\"}","PROCESSOR_PREPROCESSOR_CONFIG":"","FLINK_WATERMARK_INTERVAL_MS":"30000","FLINK_PARALLELISM":"1","SINK_INFLUX_BATCH_SIZE":"100","PROCESSOR_POSTPROCESSOR_ENABLE":"false","SINK_KAFKA_PROTO_MESSAGE":"company.esb.gomerchants.eim.EimDriverNearbyEventMessage","SINK_INFLUX_MEASUREMENT_NAME":"data-eim-driver-nearby-staging-dagger","PROCESSOR_PREPROCESSOR_ENABLE":"","SCHEMA_REGISTRY_STENCIL_ENABLE":"true","SINK_INFLUX_DB_NAME":"DAGGERS_COLLECTIVE","SINK_KAFKA_STREAM":"data-dagstream","SCHEMA_REGISTRY_STENCIL_URLS":"http://data-systems-stencil.company.io/v1beta1/namespaces/company/schemas/esb/versions/72","PROCESSOR_LONGBOW_GCP_INSTANCE_ID":"","PROCESSOR_POSTPROCESSOR_CONFIG":"{}","STREAMS":"[{\"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\":\"5\",\"INPUT_SCHEMA_PROTO_CLASS\":\"company.esb.booking.GoFoodBookingLogMessage\",\"INPUT_SCHEMA_TABLE\":\"gofood_booking\",\"SOURCE_DETAILS\":[{\"SOURCE_NAME\":\"KAFKA_CONSUMER\",\"SOURCE_TYPE\":\"UNBOUNDED\"}],\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\":\"false\",\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\":\"latest\",\"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\":\"<REDACTED>\",\"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\":\"data-eim-driver-nearby-staging-dagger-0039\",\"SOURCE_KAFKA_NAME\":\"company-mainstream\",\"SOURCE_KAFKA_TOPIC_NAMES\":\"gofood-booking-log\"},{\"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\":\"1\",\"INPUT_SCHEMA_PROTO_CLASS\":\"company.esb.gofood.NearbyEventMessage\",\"INPUT_SCHEMA_TABLE\":\"gofood_nearby\",\"SOURCE_DETAILS\":[{\"SOURCE_NAME\":\"KAFKA_CONSUMER\",\"SOURCE_TYPE\":\"UNBOUNDED\"}],\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\":\"false\",\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\":\"latest\",\"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\":\"<REDACTED>\",\"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\":\"data-eim-driver-nearby-staging-dagger-0040\",\"SOURCE_KAFKA_NAME\":\"company-mainstream\",\"SOURCE_KAFKA_TOPIC_NAMES\":\"gofood-nearby-log\"}]","SINK_INFLUX_FLUSH_DURATION_MS":"1000","SINK_INFLUX_URL":"http://data-dagger-shared-influx.company.io:6798","FLINK_JOB_ID":"data-eim-driver-nearby-staging-dagger","SINK_KAFKA_BROKERS":"<REDACTED>","PYTHON_UDF_ENABLE":"false","FLINK_WATERMARK_DELAY_MS":"1000","SINK_KAFKA_PROTO_KEY":"company.esb.gomerchants.eim.EimDriverNearbyEventKey"}}}`,
+					)
+				case "/jobs/fc308f1ac8c23b5f5a7942742b253917/config":
+					testutils.Respond(t, w, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError))
+				default:
+					t.Error("Unexpected HTTP call on", r.URL.Path)
+				}
+			},
+			expected: []*v1beta2.Asset{
+				{
+					Name: "data-test-external-voucher-dagger",
+					Type: "job",
+					Data: testutils.BuildAny(t, &v1beta2.Job{
+						Attributes: &structpb.Struct{Fields: map[string]*structpb.Value{
+							"job_id":          structpb.NewStringValue("72b6753ab1984be6a65055b95ea9dd32"),
+							"job_parallelism": structpb.NewNumberValue(1),
+							"config": structpb.NewStructValue(&structpb.Struct{Fields: map[string]*structpb.Value{
+								"ENABLE_STENCIL_URL":                structpb.NewStringValue("true"),
+								"FLINK_JOB_ID":                      structpb.NewStringValue("data-test-external-voucher-dagger"),
+								"FLINK_PARALLELISM":                 structpb.NewStringValue("1"),
+								"FLINK_ROWTIME_ATTRIBUTE_NAME":      structpb.NewStringValue("rowtime"),
+								"FLINK_SQL_QUERY":                   structpb.NewStringValue("SELECT member_ids.member_id as customer_id, '96962e7a-cd9e-4fb2-87fe-96091c124de6' as voucher_batch_id, rowtime as event_timestampfrom table1, UNNEST(table1.members) AS member_ids (member_id)where segment_name = 'testdagger' and action = 'ADD_MEMBERS'"),
+								"FLINK_WATERMARK_DELAY_MS":          structpb.NewStringValue("1000"),
+								"FLINK_WATERMARK_INTERVAL_MS":       structpb.NewStringValue("60000"),
+								"PROCESSOR_LONGBOW_GCP_INSTANCE_ID": structpb.NewStringValue(""),
+								"PROCESSOR_POSTPROCESSOR_CONFIG":    structpb.NewStringValue(`{"external_source":{"http":[{"endpoint":"ase1.company.io/internal/v2/voucher/allocate","verb":"post","request_pattern":"{\"voucher_batch_id\": \"%s\",\"customer_id\": \"%s-60\"}","request_variables":"customer_id,customer_id","stream_timeout":"5000","connect_timeout":"5000","fail_on_errors":"false","capacity":"30","headers":{"Content-Type":"application/json","Accept-Language":"en"},"type":"com.company.esb.growth.AllocatePromoRequestMessage","output_mapping":{"voucher_batch_id":{"path":"$.data.id"}}}]}}`),
+								"PROCESSOR_POSTPROCESSOR_ENABLE":    structpb.NewStringValue("true"),
+								"PROCESSOR_PREPROCESSOR_CONFIG":     structpb.NewStringValue(""),
+								"PROCESSOR_PREPROCESSOR_ENABLE":     structpb.NewStringValue(""),
+								"SCHEMA_REGISTRY_STENCIL_ENABLE":    structpb.NewStringValue("true"),
+								"SCHEMA_REGISTRY_STENCIL_URLS":      structpb.NewStringValue("http://data-systems-stencil.company.io/v1beta1/namespaces/company/schemas/esb/versions/68"),
+								"SINK_INFLUX_BATCH_SIZE":            structpb.NewStringValue("100"),
+								"SINK_INFLUX_DB_NAME":               structpb.NewStringValue("DAGGERS_COLLECTIVE"),
+								"SINK_INFLUX_FLUSH_DURATION_MS":     structpb.NewStringValue("1000"),
+								"SINK_INFLUX_MEASUREMENT_NAME":      structpb.NewStringValue("data-test-external-voucher-dagger"),
+								"SINK_INFLUX_URL":                   structpb.NewStringValue("http://data-dagger-shared-influx.company.io:6798"),
+								"SINK_KAFKA_BROKERS":                structpb.NewStringValue("<REDACTED>"),
+								"SINK_KAFKA_PROTO_KEY":              structpb.NewStringValue("com.company.esb.growth.AllocatePromoRequestKey"),
+								"SINK_KAFKA_PROTO_MESSAGE":          structpb.NewStringValue("com.company.esb.growth.AllocatePromoRequestMessage"),
+								"SINK_KAFKA_STREAM":                 structpb.NewStringValue("data-dagstream"),
+								"SINK_KAFKA_TOPIC":                  structpb.NewStringValue("test_external_voucher"),
+								"SINK_TYPE":                         structpb.NewStringValue("kafka"),
+								"STREAMS":                           structpb.NewStringValue(`[{"SOURCE_KAFKA_TOPIC_NAMES":"segmentation-message","INPUT_SCHEMA_PROTO_CLASS":"com.company.esb.segmentation.UpdateLogMessage","INPUT_SCHEMA_TABLE":"table1","SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE":"false","SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET":"latest","SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID":"data-test-external-voucher-dagger-0001","SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS":"<REDACTED>","INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX":"3","SOURCE_KAFKA_NAME":"data-dagstream"}]`),
+							}}),
+						}},
+					}),
+				},
+				{
+					Name: "data-booking-map-matching-dagger",
+					Type: "job",
+					Data: testutils.BuildAny(t, &v1beta2.Job{
+						Attributes: &structpb.Struct{Fields: map[string]*structpb.Value{
+							"job_id":          structpb.NewStringValue("3473947d1115c155513014cc6ecbd2fa"),
+							"job_parallelism": structpb.NewNumberValue(1),
+							"config": structpb.NewStructValue(&structpb.Struct{Fields: map[string]*structpb.Value{
+								"ENABLE_STENCIL_URL":                                      structpb.NewStringValue("true"),
+								"FLINK_JOB_ID":                                            structpb.NewStringValue("data-booking-map-matching-dagger"),
+								"FLINK_PARALLELISM":                                       structpb.NewStringValue("1"),
+								"FLINK_ROWTIME_ATTRIBUTE_NAME":                            structpb.NewStringValue("rowtime"),
+								"FLINK_SQL_QUERY":                                         structpb.NewStringValue("SELECT driver_id, booking_id, country_code, event_timestamp, booking_status, vehicle_type, driver_locations, ping_processing_driver_locations(driver_locations) as filtered_driver_locations, gh_map_matching_response, polylineFROM data_streams_0"),
+								"FLINK_WATERMARK_DELAY_MS":                                structpb.NewStringValue("1000"),
+								"FLINK_WATERMARK_INTERVAL_MS":                             structpb.NewStringValue("60000"),
+								"PROCESSOR_LONGBOW_GCP_INSTANCE_ID":                       structpb.NewStringValue(""),
+								"PROCESSOR_POSTPROCESSOR_CONFIG":                          structpb.NewStringValue(`{"external_source":{"http":[{"endpoint":"http://11.126.1.18:6798/match","verb":"post","request_pattern":"{"locations":%s,"hints":{"vehicle":"car","instructions":false,"points_encoded":true,"gps_accuracy":50}}","request_variables":"filtered_driver_locations","stream_timeout":"100000","connect_timeout":"10000","fail_on_errors":"false","capacity":"30","headers":{"content-type":"application/json"},"output_mapping":{"gh_map_matching_response":{"path":"$"},"polyline":{"path":"$.map_matching.edge_geometry_polyline"}}}]},"internal_source":[{"output_field":"driver_id","value":"driver_id","type":"sql"},{"output_field":"booking_id","value":"booking_id","type":"sql"},{"output_field":"country_code","value":"country_code","type":"sql"},{"output_field":"event_timestamp","value":"event_timestamp","type":"sql"},{"output_field":"booking_status","value":"booking_status","type":"sql"},{"output_field":"vehicle_type","value":"vehicle_type","type":"sql"},{"output_field":"driver_locations","value":"driver_locations","type":"sql"},{"output_field":"filtered_driver_locations","value":"filtered_driver_locations","type":"sql"}]}`),
+								"PROCESSOR_POSTPROCESSOR_ENABLE":                          structpb.NewStringValue("true"),
+								"PROCESSOR_PREPROCESSOR_CONFIG":                           structpb.NewStringValue(""),
+								"PROCESSOR_PREPROCESSOR_ENABLE":                           structpb.NewStringValue("false"),
+								"PYTHON_UDF_CONFIG":                                       structpb.NewStringValue(`{"PYTHON_FILES":"gs://data-dagger-magic/python/master/88.41.13/python_udfs.zip","PYTHON_ARCHIVES":"gs://data-dagger-magic/python/master/88.41.13/data.zip","PYTHON_REQUIREMENTS":"gs://data-dagger-magic/python/master/88.41.13/requirements.txt"}`),
+								"PYTHON_UDF_ENABLE":                                       structpb.NewStringValue("true"),
+								"SCHEMA_REGISTRY_STENCIL_ENABLE":                          structpb.NewStringValue("true"),
+								"SCHEMA_REGISTRY_STENCIL_URLS":                            structpb.NewStringValue("http://data-systems-stencil.company.io/v1beta1/namespaces/company/schemas/esb/versions/590"),
+								"SINK_BIGQUERY_BATCH_SIZE":                                structpb.NewStringValue(""),
+								"SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS":                 structpb.NewStringValue("-1"),
+								"SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS":                    structpb.NewStringValue("-1"),
+								"SINK_BIGQUERY_DATASET_LABELS":                            structpb.NewStringValue(""),
+								"SINK_BIGQUERY_DATASET_NAME":                              structpb.NewStringValue(""),
+								"SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID":                   structpb.NewStringValue(""),
+								"SINK_BIGQUERY_METADATA_COLUMNS_TYPES":                    structpb.NewStringValue(""),
+								"SINK_BIGQUERY_METADATA_NAMESPACE":                        structpb.NewStringValue(""),
+								"SINK_BIGQUERY_TABLE_CLUSTERING_ENABLE":                   structpb.NewStringValue("false"),
+								"SINK_BIGQUERY_TABLE_CLUSTERING_KEYS":                     structpb.NewStringValue(""),
+								"SINK_BIGQUERY_TABLE_LABELS":                              structpb.NewStringValue(""),
+								"SINK_BIGQUERY_TABLE_NAME":                                structpb.NewStringValue(""),
+								"SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS":                 structpb.NewStringValue("-1"),
+								"SINK_CONNECTOR_SCHEMA_DATA_TYPE":                         structpb.NewStringValue("PROTOBUF"),
+								"SINK_CONNECTOR_SCHEMA_MESSAGE_MODE":                      structpb.NewStringValue("LOG_MESSAGE"),
+								"SINK_CONNECTOR_SCHEMA_PROTO_ALLOW_UNKNOWN_FIELDS_ENABLE": structpb.NewStringValue("false"),
+								"SINK_CONNECTOR_SCHEMA_PROTO_KEY_CLASS":                   structpb.NewStringValue(""),
+								"SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS":               structpb.NewStringValue(""),
+								"SINK_INFLUX_BATCH_SIZE":                                  structpb.NewStringValue("100"),
+								"SINK_INFLUX_DB_NAME":                                     structpb.NewStringValue("DAGGERS_COLLECTIVE"),
+								"SINK_INFLUX_FLUSH_DURATION_MS":                           structpb.NewStringValue("1000"),
+								"SINK_INFLUX_MEASUREMENT_NAME":                            structpb.NewStringValue("map-matching-dagger"),
+								"SINK_INFLUX_URL":                                         structpb.NewStringValue("http://data-dagger-shared-influx.company.io:6798"),
+								"SINK_KAFKA_BROKERS":                                      structpb.NewStringValue("<REDACTED>"),
+								"SINK_KAFKA_PROTO_KEY":                                    structpb.NewStringValue("company.esb.cartography.erp.ERPMapMatchingLogV2Key"),
+								"SINK_KAFKA_PROTO_MESSAGE":                                structpb.NewStringValue("company.esb.cartography.erp.ERPMapMatchingLogV2Message"),
+								"SINK_KAFKA_STREAM":                                       structpb.NewStringValue("data-dagstream"),
+								"SINK_KAFKA_TOPIC":                                        structpb.NewStringValue("booking-map-matching-log"),
+								"SINK_METRICS_APPLICATION_PREFIX":                         structpb.NewStringValue("dagger_"),
+								"SINK_TYPE":                                               structpb.NewStringValue("kafka"),
+								"STREAMS":                                                 structpb.NewStringValue(`[{"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX":"4","INPUT_SCHEMA_PROTO_CLASS":"company.esb.cartography.erp.ERPMapMatchingLogV2Message","INPUT_SCHEMA_TABLE":"data_streams_0","SOURCE_DETAILS":[{"SOURCE_NAME":"KAFKA_CONSUMER","SOURCE_TYPE":"UNBOUNDED"}],"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE":"false","SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET":"latest","SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS":"company-mainstream.company.io:6668","SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID":"data-booking-map-matching-dagger-0002","SOURCE_KAFKA_NAME":"company-mainstream","SOURCE_KAFKA_TOPIC_NAMES":"aggregated-busy-driver-location-ping","SOURCE_PARQUET_FILE_DATE_RANGE":null,"SOURCE_PARQUET_FILE_PATHS":null}]`),
+							}}),
+						}},
+					}),
+				},
+				{
+					Name: "data-eim-driver-nearby-staging-dagger",
+					Type: "job",
+					Data: testutils.BuildAny(t, &v1beta2.Job{
+						Attributes: &structpb.Struct{Fields: map[string]*structpb.Value{
+							"job_id":          structpb.NewStringValue("9b12cb10b119b957b085c08e49bde3f2"),
+							"job_parallelism": structpb.NewNumberValue(1),
+							"config": structpb.NewStructValue(&structpb.Struct{Fields: map[string]*structpb.Value{
+								"ENABLE_STENCIL_URL":                structpb.NewStringValue("true"),
+								"FLINK_JOB_ID":                      structpb.NewStringValue("data-eim-driver-nearby-staging-dagger"),
+								"FLINK_PARALLELISM":                 structpb.NewStringValue("1"),
+								"FLINK_ROWTIME_ATTRIBUTE_NAME":      structpb.NewStringValue("rowtime"),
+								"FLINK_SQL_QUERY":                   structpb.NewStringValue("SELECT gofood_booking.event_timestamp as event_timestamp, gofood_booking.order_number AS order_number, gofood_booking.service_type AS gofood_booking_message.service_type, gofood_booking.order_number AS gofood_booking_message.order_number, gofood_booking.order_url AS gofood_booking_message.order_url, gofood_booking.status AS gofood_booking_message.status, gofood_booking.event_timestamp AS gofood_booking_message.event_timestamp, gofood_booking.customer_id AS gofood_booking_message.customer_id, gofood_booking.customer_url AS gofood_booking_message.customer_url, gofood_booking.driver_id AS gofood_booking_message.driver_id, gofood_booking.driver_url AS gofood_booking_message.driver_url, gofood_booking.is_reblast AS gofood_booking_message.is_reblast, gofood_booking.activity_source AS gofood_booking_message.activity_source, gofood_booking.service_area_id AS gofood_booking_message.service_area_id, gofood_booking.payment_type AS gofood_booking_message.payment_type, gofood_booking.total_unsubsidised_price AS gofood_booking_message.total_unsubsidised_price, gofood_booking.customer_price AS gofood_booking_message.customer_price, gofood_booking.amount_paid_by_cash AS gofood_booking_message.amount_paid_by_cash, gofood_booking.amount_paid_by_credits AS gofood_booking_message.amount_paid_by_credits, gofood_booking.surcharge_amount AS gofood_booking_message.surcharge_amount, gofood_booking.tip_amount AS gofood_booking_message.tip_amount, gofood_booking.driver_cut_amount AS gofood_booking_message.driver_cut_amount, gofood_booking.requested_payment_type AS gofood_booking_message.requested_payment_type, gofood_booking.total_distance_in_kms AS gofood_booking_message.total_distance_in_kms, gofood_booking.route_polyline AS gofood_booking_message.route_polyline, gofood_booking.routes AS gofood_booking_message.routes, gofood_booking.driver_eta_pickup AS gofood_booking_message.driver_eta_pickup, gofood_booking.driver_eta_dropoff AS gofood_booking_message.driver_eta_dropoff, gofood_booking.driver_pickup_location AS gofood_booking_message.driver_pickup_location, gofood_booking.driver_dropoff_location AS gofood_booking_message.driver_dropoff_location, gofood_booking.customer_email AS gofood_booking_message.customer_email, gofood_booking.customer_name AS gofood_booking_message.customer_name, gofood_booking.customer_phone AS gofood_booking_message.customer_phone, gofood_booking.driver_email AS gofood_booking_message.driver_email, gofood_booking.driver_name AS gofood_booking_message.driver_name, gofood_booking.driver_phone AS gofood_booking_message.driver_phone, gofood_booking.driver_phone2 AS gofood_booking_message.driver_phone2, gofood_booking.driver_phone3 AS gofood_booking_message.driver_phone3, gofood_booking.cancel_reason_id AS gofood_booking_message.cancel_reason_id, gofood_booking.cancel_reason_description AS gofood_booking_message.cancel_reason_description, gofood_booking.cancel_source AS gofood_booking_message.cancel_source, gofood_booking.customer_type AS gofood_booking_message.customer_type, gofood_booking.cancel_owner AS gofood_booking_message.cancel_owner, gofood_booking.booking_creation_time AS gofood_booking_message.booking_creation_time, gofood_booking.total_customer_discount AS gofood_booking_message.total_customer_discount, gofood_booking.gopay_customer_discount AS gofood_booking_message.gopay_customer_discount, gofood_booking.voucher_customer_discount AS gofood_booking_message.voucher_customer_discount, gofood_booking.pickup_time AS gofood_booking_message.pickup_time, gofood_booking.driver_paid_in_cash AS gofood_booking_message.driver_paid_in_cash, gofood_booking.driver_paid_in_credit AS gofood_booking_message.driver_paid_in_credit, gofood_booking.receiver_name AS gofood_booking_message.receiver_name, gofood_booking.driver_photo_url AS gofood_booking_message.driver_photo_url, gofood_booking.previous_booking_status AS gofood_booking_message.previous_booking_status, gofood_booking.vehicle_type AS gofood_booking_message.vehicle_type, gofood_booking.customer_total_fare_without_surge AS gofood_booking_message.customer_total_fare_without_surge, gofood_booking.customer_surge_factor AS gofood_booking_message.customer_surge_factor, gofood_booking.customer_dynamic_surge AS gofood_booking_message.customer_dynamic_surge, gofood_booking.customer_dynamic_surge_enabled AS gofood_booking_message.customer_dynamic_surge_enabled, gofood_booking.driver_total_fare_without_surge AS gofood_booking_message.driver_total_fare_without_surge, gofood_booking.driver_surge_factor AS gofood_booking_message.driver_surge_factor, gofood_booking.driver_dynamic_surge AS gofood_booking_message.driver_dynamic_surge, gofood_booking.driver_dynamic_surge_enabled AS gofood_booking_message.driver_dynamic_surge_enabled, gofood_booking.driver_ata_pickup AS gofood_booking_message.driver_ata_pickup, gofood_booking.driver_ata_dropoff AS gofood_booking_message.driver_ata_dropoff, gofood_booking.gcm_key AS gofood_booking_message.gcm_key, gofood_booking.device_token AS gofood_booking_message.device_token, gofood_booking.pricing_service_id AS gofood_booking_message.pricing_service_id, gofood_booking.payment_invoice_number AS gofood_booking_message.payment_invoice_number, gofood_booking.pricing_currency AS gofood_booking_message.pricing_currency, gofood_booking.country_code AS gofood_booking_message.country_code, gofood_booking.service_area_tzname AS gofood_booking_message.service_area_tzname, gofood_booking.payment_option_type AS gofood_booking_message.payment_option_type, gofood_booking.payment_option_metadata AS gofood_booking_message.payment_option_metadata, gofood_booking.payment_option_name AS gofood_booking_message.payment_option_name, gofood_booking.booking_info AS gofood_booking_message.booking_info, gofood_booking.price_edit_reason AS gofood_booking_message.price_edit_reason, gofood_booking.driver_arrived_location AS gofood_booking_message.driver_arrived_location, gofood_booking.driver_arrived_time AS gofood_booking_message.driver_arrived_time, gofood_booking.driver_order_placed_location AS gofood_booking_message.driver_order_placed_location, gofood_booking.driver_order_placed_time AS gofood_booking_message.driver_order_placed_time, gofood_booking.merchant_accepted_time AS gofood_booking_message.merchant_accepted_time, gofood_booking.merchant_acknowledged_time AS gofood_booking_message.merchant_acknowledged_time, gofood_booking.merchant_received_time AS gofood_booking_message.merchant_received_time, gofood_booking.merchant_cancel_reason AS gofood_booking_message.merchant_cancel_reason, gofood_booking.merchant_cancel_time AS gofood_booking_message.merchant_cancel_time, gofood_booking.merchant_cancel_description AS gofood_booking_message.merchant_cancel_description, gofood_booking.takeaway_charges AS gofood_booking_message.takeaway_charges, gofood_booking.food_prepared_time AS gofood_booking_message.food_prepared_time, gofood_booking.cancel_reason_code AS gofood_booking_message.cancel_reason_code, gofood_booking.verification_requested_time AS gofood_booking_message.verification_requested_time, gofood_booking.customer_pickup_time AS gofood_booking_message.customer_pickup_time, gofood_booking.customer_pickup_location AS gofood_booking_message.customer_pickup_location, gofood_booking.verification_failed_time AS gofood_booking_message.verification_failed_time, gofood_booking.verification_requested_location AS gofood_booking_message.verification_requested_location, gofood_booking.convenience_fee AS gofood_booking_message.convenience_fee, gofood_booking.payment_actions AS gofood_booking_message.payment_actions, gofood_booking.marketplace_serviceability_log_id AS gofood_booking_message.marketplace_serviceability_log_id, gofood_booking.order_completion_time AS gofood_booking_message.order_completion_time, gofood_booking.restaurant_id AS gofood_booking_message.restaurant_id, gofood_booking.sub_status AS gofood_booking_message.sub_status, gofood_booking.shopping_price AS gofood_booking_message.shopping_price, gofood_booking.commission_price AS gofood_booking_message.commission_price, gofood_booking.withholding_income_tax AS gofood_booking_message.withholding_income_tax, gofood_booking.voucher_redeemed_value AS gofood_booking_message.voucher_redeemed_value, gofood_booking.voucher_commission AS gofood_booking_message.voucher_commission, gofood_booking.voucher_id AS gofood_booking_message.voucher_id, gofood_booking.voucher_title AS gofood_booking_message.voucher_title, gofood_booking.otp AS gofood_booking_message.otp, gofood_booking.driver_entered_price AS gofood_booking_message.driver_entered_price, gofood_booking.driver_wallet_id AS gofood_booking_message.driver_wallet_id, gofood_booking.saudagar_id AS gofood_booking_message.saudagar_id, gofood_booking.validated_at AS gofood_booking_message.validated_at, gofood_booking.merchant_wallet_id AS gofood_booking_message.merchant_wallet_id, gofood_booking.restaurant_uuid AS gofood_booking_message.restaurant_uuid, gofood_booking.search_id AS gofood_booking_message.search_id, gofood_booking.gopay_driver_reservation_id AS gofood_booking_message.gopay_driver_reservation_id, gofood_booking.voucher_batch_id AS gofood_booking_message.voucher_batch_id, gofood_booking.merchant_config AS gofood_booking_message.merchant_config, gofood_booking.brand_id AS gofood_booking_message.brand_id, gofood_booking.customer_wallet_id AS gofood_booking_message.customer_wallet_id, gofood_booking.customer_payment_details AS gofood_booking_message.customer_payment_details, gofood_booking.merchant_phone AS gofood_booking_message.merchant_phone, gofood_booking.previous_sub_status AS gofood_booking_message.previous_sub_status, gofood_booking.merchant_acceptance_deadline AS gofood_booking_message.merchant_acceptance_deadline, gofood_booking.inapplicable_voucher_id AS gofood_booking_message.inapplicable_voucher_id, gofood_booking.driver_fee_adjustments AS gofood_booking_message.driver_fee_adjustments, gofood_booking.receipt_url AS gofood_booking_message.receipt_url, gofood_booking.is_goresto AS gofood_booking_message.is_goresto, gofood_booking.shopping_items AS gofood_booking_message.shopping_items, gofood_booking.has_promo AS gofood_booking_message.has_promo, gofood_booking.analytics AS gofood_booking_message.analytics, gofood_booking.fraud_reason AS gofood_booking_message.fraud_reason, gofood_booking.driver_completion_time AS gofood_booking_message.driver_completion_time, gofood_booking.use_service_wallet_fund_flow AS gofood_booking_message.use_service_wallet_fund_flow, gofood_booking.campaign_discounts AS gofood_booking_message.campaign_discounts, gofood_booking.tracer_bullet AS gofood_booking_message.tracer_bullet, gofood_booking.bid_delay_in_seconds AS gofood_booking_message.bid_delay_in_seconds, gofood_booking.driver_gopay_account_id AS gofood_booking_message.driver_gopay_account_id, gofood_booking.use_gopay_v3 AS gofood_booking_message.use_gopay_v3, gofood_booking.experiments AS gofood_booking_message.experiments, gofood_booking.eta_in_minutes AS gofood_booking_message.eta_in_minutes, gofood_booking.eta_source AS gofood_booking_message.eta_source, gofood_booking.eta_performance_bucket AS gofood_booking_message.eta_performance_bucket, gofood_booking.payment_method AS gofood_booking_message.payment_method, gofood_booking.vehicle_info AS gofood_booking_message.vehicle_info, gofood_booking.actor AS gofood_booking_message.actor, gofood_booking.vehicle_tags AS gofood_booking_message.vehicle_tags, gofood_booking.goclub AS gofood_booking_message.goclub, gofood_nearby.event_timestamp AS gofood_nearby_message.event_timestamp, gofood_nearby.order_number AS gofood_nearby_message.order_number, gofood_nearby.driver_id AS gofood_nearby_message.driver_id, gofood_nearby.customer_id AS gofood_nearby_message.customer_id, gofood_nearby.restaurant_uuid AS gofood_nearby_message.restaurant_uuid, gofood_nearby.saudagar_id AS gofood_nearby_message.saudagar_id, gofood_nearby.type AS gofood_nearby_message.type, gofood_nearby.radius AS gofood_nearby_message.radius, gofood_nearby.actor AS gofood_nearby_message.actorFROM gofood_nearby JOIN gofood_booking ON gofood_nearby.order_number = gofood_booking.order_number AND gofood_nearby.driver_id = gofood_booking.driver_id AND gofood_nearby.customer_id = gofood_booking.customer_id AND gofood_nearby.restaurant_uuid = gofood_booking.restaurant_uuid AND gofood_nearby.rowtime BETWEEN (gofood_booking.rowtime - INTERVAL '10' MINUTE) AND (gofood_booking.rowtime + INTERVAL '40' MINUTE) AND gofood_booking.sub_status = 'OTW_PICKUP' AND gofood_nearby.type = 'PICKUP' AND gofood_nearby.actor = 'DRIVER'"),
+								"FLINK_WATERMARK_DELAY_MS":          structpb.NewStringValue("1000"),
+								"FLINK_WATERMARK_INTERVAL_MS":       structpb.NewStringValue("30000"),
+								"PROCESSOR_LONGBOW_GCP_INSTANCE_ID": structpb.NewStringValue(""),
+								"PROCESSOR_POSTPROCESSOR_CONFIG":    structpb.NewStringValue("{}"),
+								"PROCESSOR_POSTPROCESSOR_ENABLE":    structpb.NewStringValue("false"),
+								"PROCESSOR_PREPROCESSOR_CONFIG":     structpb.NewStringValue(""),
+								"PROCESSOR_PREPROCESSOR_ENABLE":     structpb.NewStringValue(""),
+								"PYTHON_UDF_CONFIG":                 structpb.NewStringValue(`{"PYTHON_FILES": "gs://data-dagger/python/master/latest/python_udfs.zip","PYTHON_REQUIREMENTS": "gs://data-dagger/python/master/latest/requirements.txt","PYTHON_ARCHIVES": "gs://data-dagger/python/master/latest/data.zip#data","PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE": "10000","PYTHON_FN_EXECUTION_BUNDLE_SIZE": "100000","PYTHON_FN_EXECUTION_BUNDLE_TIME": "1000"}`),
+								"PYTHON_UDF_ENABLE":                 structpb.NewStringValue("false"),
+								"SCHEMA_REGISTRY_STENCIL_ENABLE":    structpb.NewStringValue("true"),
+								"SCHEMA_REGISTRY_STENCIL_URLS":      structpb.NewStringValue("http://data-systems-stencil.company.io/v1beta1/namespaces/company/schemas/esb/versions/72"),
+								"SINK_INFLUX_BATCH_SIZE":            structpb.NewStringValue("100"),
+								"SINK_INFLUX_DB_NAME":               structpb.NewStringValue("DAGGERS_COLLECTIVE"),
+								"SINK_INFLUX_FLUSH_DURATION_MS":     structpb.NewStringValue("1000"),
+								"SINK_INFLUX_MEASUREMENT_NAME":      structpb.NewStringValue("data-eim-driver-nearby-staging-dagger"),
+								"SINK_INFLUX_URL":                   structpb.NewStringValue("http://data-dagger-shared-influx.company.io:6798"),
+								"SINK_KAFKA_BROKERS":                structpb.NewStringValue("<REDACTED>"),
+								"SINK_KAFKA_PROTO_KEY":              structpb.NewStringValue("company.esb.gomerchants.eim.EimDriverNearbyEventKey"),
+								"SINK_KAFKA_PROTO_MESSAGE":          structpb.NewStringValue("company.esb.gomerchants.eim.EimDriverNearbyEventMessage"),
+								"SINK_KAFKA_STREAM":                 structpb.NewStringValue("data-dagstream"),
+								"SINK_KAFKA_TOPIC":                  structpb.NewStringValue("eim-driver-nearby-log"),
+								"SINK_TYPE":                         structpb.NewStringValue("kafka"),
+								"STREAMS":                           structpb.NewStringValue(`[{"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX":"5","INPUT_SCHEMA_PROTO_CLASS":"company.esb.booking.GoFoodBookingLogMessage","INPUT_SCHEMA_TABLE":"gofood_booking","SOURCE_DETAILS":[{"SOURCE_NAME":"KAFKA_CONSUMER","SOURCE_TYPE":"UNBOUNDED"}],"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE":"false","SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET":"latest","SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS":"<REDACTED>","SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID":"data-eim-driver-nearby-staging-dagger-0039","SOURCE_KAFKA_NAME":"company-mainstream","SOURCE_KAFKA_TOPIC_NAMES":"gofood-booking-log"},{"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX":"1","INPUT_SCHEMA_PROTO_CLASS":"company.esb.gofood.NearbyEventMessage","INPUT_SCHEMA_TABLE":"gofood_nearby","SOURCE_DETAILS":[{"SOURCE_NAME":"KAFKA_CONSUMER","SOURCE_TYPE":"UNBOUNDED"}],"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE":"false","SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET":"latest","SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS":"<REDACTED>","SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID":"data-eim-driver-nearby-staging-dagger-0040","SOURCE_KAFKA_NAME":"company-mainstream","SOURCE_KAFKA_TOPIC_NAMES":"gofood-nearby-log"}]`),
+							}}),
+						}},
+					}),
+				},
+			},
+		},
+		{
+			name: "InvalidRequestConfigFromScript",
+			rawCfg: map[string]interface{}{
+				"request": map[string]interface{}{
+					"url":          "{{serverURL}}/api/v1/endpoint",
+					"content_type": "application/json",
+					"accept":       "application/json",
+				},
+				"script": map[string]interface{}{
+					"engine": "tengo",
+					"source": heredoc.Doc(`
+						responses := execute_request(
+						  {
+						    "url":          "{{serverURL}}/api/v1/endpoint",
+						    "content_type": "application/json",
+						    "accept":       "application/json"
+						  },
+						  {
+						    "content_type": "application/json",
+						    "accept":       "application/json"
+						  }
+						)
+					`),
+				},
+			},
+			handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) {
+				testutils.Respond(t, w, http.StatusOK, `{}`)
+			},
+			expectedErr: `Runtime Error: execute request: validate request config: {content_type: "application/json", accept: "application/json"}, Key: 'RequestConfig.url' Error:Field validation for 'url' failed on the 'required' tag`,
+		},
 		{
 			name: "ConditionalExit",
 			rawCfg: map[string]interface{}{
@@ -525,7 +762,7 @@ func TestExtract(t *testing.T) {
 				"script": map[string]interface{}{
 					"engine": "tengo",
 					"source": heredoc.Doc(`
-						if !response.success {
+						if !response.body.success {
 							exit()
 						}
 						a := new_asset("invalid")
@@ -651,4 +888,6 @@ func TestExtract(t *testing.T) {
 func replaceServerURL(cfg map[string]interface{}, serverURL string) {
 	reqCfg := cfg["request"].(map[string]interface{})
 	reqCfg["url"] = strings.Replace(reqCfg["url"].(string), "{{serverURL}}", serverURL, 1)
+	scriptCfg := cfg["script"].(map[string]interface{})
+	scriptCfg["source"] = strings.Replace(scriptCfg["source"].(string), "{{serverURL}}", serverURL, -1)
 }
diff --git a/plugins/internal/tengoutil/secure_script.go b/plugins/internal/tengoutil/secure_script.go
index 610969880..388fd2cb8 100644
--- a/plugins/internal/tengoutil/secure_script.go
+++ b/plugins/internal/tengoutil/secure_script.go
@@ -1,6 +1,8 @@
 package tengoutil
 
 import (
+	"fmt"
+
 	"github.com/d5/tengo/v2"
 	"github.com/d5/tengo/v2/stdlib"
 )
@@ -10,15 +12,21 @@ const (
 	maxConsts = 500
 )
 
-func NewSecureScript(input []byte) *tengo.Script {
+func NewSecureScript(input []byte, globals map[string]interface{}) (*tengo.Script, error) {
 	s := tengo.NewScript(input)
 
 	s.SetImports(stdlib.GetModuleMap(
-		// `os` is excluded, should not be importable from script.
+		// `os` is excluded, should *not* be importable from script.
 		"math", "text", "times", "rand", "fmt", "json", "base64", "hex", "enum",
 	))
 	s.SetMaxAllocs(maxAllocs)
 	s.SetMaxConstObjects(maxConsts)
 
-	return s
+	for name, v := range globals {
+		if err := s.Add(name, v); err != nil {
+			return nil, fmt.Errorf("new secure script: declare globals: %w", err)
+		}
+	}
+
+	return s, nil
 }
diff --git a/plugins/internal/tengoutil/secure_script_test.go b/plugins/internal/tengoutil/secure_script_test.go
index 308eb0d8e..cb2445111 100644
--- a/plugins/internal/tengoutil/secure_script_test.go
+++ b/plugins/internal/tengoutil/secure_script_test.go
@@ -12,7 +12,7 @@ import (
 
 func TestNewSecureScript(t *testing.T) {
 	t.Run("Allows import of builtin modules except os", func(t *testing.T) {
-		s := NewSecureScript(([]byte)(heredoc.Doc(`
+		s, err := NewSecureScript(([]byte)(heredoc.Doc(`
 			math := import("math")
 			text := import("text")
 			times := import("times")
@@ -22,20 +22,37 @@ func TestNewSecureScript(t *testing.T) {
 			base64 := import("base64")
 			hex := import("hex")
 			enum := import("enum")
-		`)))
-		_, err := s.Compile()
+		`)), nil)
+		assert.NoError(t, err)
+		_, err = s.Compile()
 		assert.NoError(t, err)
 	})
 
 	t.Run("os import disallowed", func(t *testing.T) {
-		s := NewSecureScript(([]byte)(`os := import("os")`))
-		_, err := s.Compile()
+		s, err := NewSecureScript(([]byte)(`os := import("os")`), nil)
+		assert.NoError(t, err)
+		_, err = s.Compile()
 		assert.ErrorContains(t, err, "Compile Error: module 'os' not found")
 	})
 
 	t.Run("File import disallowed", func(t *testing.T) {
-		s := NewSecureScript(([]byte)(`sum := import("./testdata/sum")`))
-		_, err := s.Compile()
+		s, err := NewSecureScript(([]byte)(`sum := import("./testdata/sum")`), nil)
+		assert.NoError(t, err)
+		_, err = s.Compile()
 		assert.ErrorContains(t, err, "Compile Error: module './testdata/sum' not found")
 	})
+
+	t.Run("Script globals", func(t *testing.T) {
+		s, err := NewSecureScript(([]byte)(`obj.prop = 1`), nil)
+		assert.NoError(t, err)
+		_, err = s.Compile()
+		assert.ErrorContains(t, err, "Compile Error: unresolved reference 'obj'")
+
+		s, err = NewSecureScript(([]byte)(`obj.prop = 1`), map[string]interface{}{
+			"obj": map[string]interface{}{},
+		})
+		assert.NoError(t, err)
+		_, err = s.Compile()
+		assert.NoError(t, err)
+	})
 }
diff --git a/plugins/internal/tengoutil/structmap/structmap.go b/plugins/internal/tengoutil/structmap/structmap.go
index 60a813bd8..be0c2b0ab 100644
--- a/plugins/internal/tengoutil/structmap/structmap.go
+++ b/plugins/internal/tengoutil/structmap/structmap.go
@@ -42,12 +42,17 @@ func AsMap(v interface{}) (interface{}, error) {
 }
 
 func AsStruct(input, output interface{}) error {
+	return AsStructWithTag("json", input, output)
+}
+
+func AsStructWithTag(tagName string, input, output interface{}) error {
 	dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
 		DecodeHook: mapstructure.ComposeDecodeHookFunc(
 			checkAssetDataHookFunc(),
 			stringToTimestampHookFunc(time.RFC3339),
 			timeToTimestampHookFunc(),
 			mapstructure.StringToTimeHookFunc(time.RFC3339),
+			mapstructure.StringToTimeDurationHookFunc(),
 			mapToAttributesHookFunc(),
 			mapToAnyPBHookFunc(),
 		),
@@ -55,7 +60,7 @@ func AsStruct(input, output interface{}) error {
 		ErrorUnused:      true,
 		ZeroFields:       true,
 		Result:           output,
-		TagName:          "json",
+		TagName:          tagName,
 	})
 	if err != nil {
 		return fmt.Errorf("structmap: decode into %T: create decoder: %w", output, err)
diff --git a/plugins/internal/tengoutil/structmap/structmap_test.go b/plugins/internal/tengoutil/structmap/structmap_test.go
index 81df0d082..8df71c59e 100644
--- a/plugins/internal/tengoutil/structmap/structmap_test.go
+++ b/plugins/internal/tengoutil/structmap/structmap_test.go
@@ -181,6 +181,17 @@ func TestAsMap(t *testing.T) {
 	}
 }
 
+func TestAsStructWithTag(t *testing.T) {
+	type V struct {
+		Duration time.Duration `myspltag:"duration"`
+	}
+	input := map[string]interface{}{"duration": "5s"}
+	var v V
+	err := AsStructWithTag("myspltag", input, &v)
+	assert.NoError(t, err)
+	assert.Equal(t, V{Duration: time.Second * 5}, v)
+}
+
 func TestAsStruct(t *testing.T) {
 	cases := []struct {
 		name        string
@@ -383,7 +394,7 @@ func TestAsStruct(t *testing.T) {
 				"type":           "table",
 				"data":           map[string]interface{}{},
 			},
-			expected: &v1beta2.Asset{},
+			expected:    &v1beta2.Asset{},
 			expectedErr: "invalid keys: does-not-exist",
 		},
 		{
diff --git a/plugins/processors/script/tengo_script.go b/plugins/processors/script/tengo_script.go
index 0e3563f91..bd70d67a0 100644
--- a/plugins/processors/script/tengo_script.go
+++ b/plugins/processors/script/tengo_script.go
@@ -70,8 +70,10 @@ func (p *Processor) Init(ctx context.Context, config plugins.Config) error {
 		return fmt.Errorf("script processor init: %w", err)
 	}
 
-	s := tengoutil.NewSecureScript(([]byte)(p.config.Script))
-	if err := p.declareGlobals(s); err != nil {
+	s, err := tengoutil.NewSecureScript(([]byte)(p.config.Script), map[string]interface{}{
+		"asset": map[string]interface{}{},
+	})
+	if err != nil {
 		return fmt.Errorf("script processor init: %w", err)
 	}
 
@@ -108,14 +110,3 @@ func (p *Processor) Process(ctx context.Context, src models.Record) (models.Reco
 
 	return models.NewRecord(transformed), nil
 }
-
-func (p *Processor) declareGlobals(s *tengo.Script) error {
-	for name, v := range map[string]interface{}{
-		"asset": map[string]interface{}{},
-	} {
-		if err := s.Add(name, v); err != nil {
-			return fmt.Errorf("declare script globals: %w", err)
-		}
-	}
-	return nil
-}