Skip to content

Commit

Permalink
Move relevant examples to the Serverless repo (#815)
Browse files Browse the repository at this point in the history
  • Loading branch information
grego952 authored Mar 15, 2024
1 parent 8476bf3 commit 667a989
Show file tree
Hide file tree
Showing 16 changed files with 658 additions and 0 deletions.
13 changes: 13 additions & 0 deletions examples/custom-serverless-runtime-image/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Custom Serverless runtime image: Python 3.10 on Debian bullseye (glibc).
FROM python:3.10-bullseye

# Install the runtime dependencies first so this layer is cached
# independently of the runtime sources copied below.
COPY kubeless/requirements.txt /kubeless/requirements.txt
RUN pip install -r /kubeless/requirements.txt
# Force-pin protobuf 3.20.x — presumably required by the pinned
# opentelemetry packages; TODO confirm before bumping either.
RUN pip install protobuf==3.20.* --force-reinstall

# Copy the runtime sources (kubeless.py, ce.py, ...) into the image root.
COPY kubeless/ /

WORKDIR /

# Run as a non-root user.
USER 1000

CMD ["python", "/kubeless.py"]
28 changes: 28 additions & 0 deletions examples/custom-serverless-runtime-image/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Custom Serverless Runtime Image

## Overview

This example shows how to create your own custom runtime for a Serverless Function, based on the Python runtime and the `python:3.10-bullseye` base image to provide support for glibc.

## Prerequisites

- Docker as a build tool

## Build an Example Runtime

1. Export the following environment variables:

```bash
export IMAGE_NAME=<image_name>
export IMAGE_TAG=<image_tag>
```

2. Build and push the image:

```bash
docker build -t "${IMAGE_NAME}:${IMAGE_TAG}" .
docker push "${IMAGE_NAME}:${IMAGE_TAG}"
```

> [!NOTE]
> You can use it to define your Functions in Kyma. To learn more, read [how to override runtime image](https://kyma-project.io/#/serverless-manager/user/resources/06-20-serverless-cr?id=custom-resource-parameters).
95 changes: 95 additions & 0 deletions examples/custom-serverless-runtime-image/kubeless/ce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import requests
import bottle
import io
import os
import json

# Endpoint of the eventing publisher proxy; used by Event.publishCloudEvent.
publisher_proxy_address = os.getenv('PUBLISHER_PROXY_ADDRESS')

class PicklableBottleRequest(bottle.BaseRequest):
    """A pickle-friendly variant of ``bottle.BaseRequest``.

    ``bottle.BaseRequest`` cannot be pickled and therefore cannot be handed
    to a multiprocessing ``Process`` under the forkserver or spawn start
    methods. This subclass keeps the body in an in-memory buffer and strips
    the unpicklable entries from the WSGI environ on serialization.
    """

    def __init__(self, data, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Bottle spills bodies larger than MEMFILE_MAX into a
        # tempfile.TemporaryFile, which cannot be pickled — force the body
        # into an io.BytesIO buffer instead.
        self.environ['bottle.request.body'] = io.BytesIO(data)

    def __getstate__(self):
        env = self.environ.copy()
        # Remove entries that cannot be pickled: the file-like WSGI streams
        # and the bottle objects whose config holds a lambda.
        for unpicklable_key in (
            'wsgi.errors',
            'wsgi.input',
            'bottle.app',
            'bottle.route',
            'route.handle',
        ):
            del env[unpicklable_key]
        return env

    def __setstate__(self, env):
        self.environ = env


class Event:
    """CloudEvent view of an incoming HTTP request.

    Exposes the CloudEvents context attributes (``ce-*`` headers) and the
    request payload through dict-style access, plus helpers for building
    and publishing response events to the publisher proxy.
    """

    def __init__(self, req, tracer):
        """Capture the body, CE headers, and a picklable copy of *req*.

        :param req: the incoming bottle request.
        :param tracer: tracer used for distributed tracing of this request.
        """
        data = req.body.read()
        # Keep a picklable copy so the event can cross process boundaries
        # under the forkserver/spawn multiprocessing contexts.
        picklable_req = PicklableBottleRequest(data, req.environ.copy())
        # JSON payloads are exposed decoded; other content types stay raw.
        if req.get_header('content-type') == 'application/json':
            data = req.json

        self.req = req
        self.tracer = tracer
        # Instance attribute on purpose: a class-level mutable dict would be
        # shared state between all Event instances.
        self.ceHeaders = {
            'data': data,
            'ce-type': req.get_header('ce-type'),
            'ce-source': req.get_header('ce-source'),
            'ce-eventtypeversion': req.get_header('ce-eventtypeversion'),
            'ce-specversion': req.get_header('ce-specversion'),
            'ce-id': req.get_header('ce-id'),
            'ce-time': req.get_header('ce-time'),
            'extensions': {'request': picklable_req}
        }

    def __getitem__(self, item):
        return self.ceHeaders[item]

    def __setitem__(self, name, value):
        self.ceHeaders[name] = value

    def publishCloudEvent(self, data):
        """POST *data* as a CloudEvent to the publisher proxy; return the response."""
        return requests.post(
            publisher_proxy_address,
            data=json.dumps(data),
            headers={"Content-Type": "application/cloudevents+json"}
        )

    def resolveDataType(self, event_data):
        """Map the payload type to a content type; None when unrecognized."""
        # isinstance (rather than an exact type check) also accepts
        # dict/str subclasses, which serialize the same way.
        if isinstance(event_data, dict):
            return 'application/json'
        if isinstance(event_data, str):
            return 'text/plain'
        return None

    def buildResponseCloudEvent(self, event_id, event_type, event_data):
        """Build a response CloudEvent dict reusing this event's context headers."""
        return {
            'type': event_type,
            'source': self.ceHeaders['ce-source'],
            'eventtypeversion': self.ceHeaders['ce-eventtypeversion'],
            'specversion': self.ceHeaders['ce-specversion'],
            'id': event_id,
            'data': event_data,
            'datacontenttype': self.resolveDataType(event_data)
        }

193 changes: 193 additions & 0 deletions examples/custom-serverless-runtime-image/kubeless/kubeless.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
#!/usr/bin/env python

import importlib
import os
import queue
import threading

import bottle
import prometheus_client as prom
import sys

import tracing
from ce import Event
from tracing import set_req_context


def create_service_name(pod_name: str, service_namespace: str) -> str:
    """Derive ``<deployment>.<namespace>`` from a pod name.

    Kubernetes appends two generated sections (replicaset hash and pod id)
    to the deployment name; both are stripped here.
    """
    sections = pod_name.split('-')
    deployment_name = '-'.join(sections[:len(sections) - 2])
    return '.'.join([deployment_name, service_namespace])


# NOTE(review): the original comment about an "underscore prefix" predates a
# rename — this file is kubeless.py; the guard below still prevents the
# user-defined module from shadowing this entrypoint module.
module_name = os.getenv('MOD_NAME')
if module_name is None:
    print('MOD_NAME have to be provided', flush=True)
    exit(1)
# Name of this module (e.g. 'kubeless'), derived from the file name.
current_mod = os.path.basename(__file__).split('.')[0]
if module_name == current_mod:
    print('Module cannot be named {} as current module'.format(current_mod), flush=True)
    exit(2)

# The user Function's sources are mounted under /kubeless.
sys.path.append('/kubeless')

mod = importlib.import_module(module_name)
func_name = os.getenv('FUNC_HANDLER')
if func_name is None:
    print('FUNC_HANDLER have to be provided', flush=True)
    exit(3)

# The handler callable exported by the user-defined module.
func = getattr(mod, os.getenv('FUNC_HANDLER'))

# Server tuning knobs. Note: FUNC_PORT from the environment is a string,
# the 8080 fallback is an int — bottle accepts both.
func_port = os.getenv('FUNC_PORT', 8080)
timeout = float(os.getenv('FUNC_TIMEOUT', 180))
memfile_max = int(os.getenv('FUNC_MEMFILE_MAX', 100 * 1024 * 1024))
bottle.BaseRequest.MEMFILE_MAX = memfile_max

app = application = bottle.app()

# Static metadata passed to every handler invocation.
function_context = {
    'function-name': func.__name__,
    'timeout': timeout,
    'runtime': os.getenv('FUNC_RUNTIME'),
    'memory-limit': os.getenv('FUNC_MEMORY_LIMIT'),
}

tracecollector_endpoint = os.getenv('TRACE_COLLECTOR_ENDPOINT')
pod_name = os.getenv('HOSTNAME')
service_namespace = os.getenv('SERVICE_NAMESPACE')
service_name = create_service_name(pod_name, service_namespace)

tracer_provider = None
# To not create several tracer providers, when the server start forking.
if __name__ == "__main__":
    tracer_provider = tracing.ServerlessTracerProvider(tracecollector_endpoint, service_name)


def func_with_context(e, function_context):
    """Invoke the user handler with the event's request tracing context active."""
    request = e.ceHeaders["extensions"]["request"]
    with set_req_context(request):
        return func(e, function_context)


@app.get('/healthz')
def healthz():
    # Liveness/readiness probe endpoint.
    return 'OK'


@app.get('/metrics')
def metrics():
    # Expose the Prometheus registry in the text exposition format.
    bottle.response.content_type = prom.CONTENT_TYPE_LATEST
    return prom.generate_latest(prom.REGISTRY)


@app.error(500)
def exception_handler():
    # Generic 500 body; hides internal error details from the caller.
    return 'Internal server error'


@app.route('/<:re:.*>', method=['GET', 'POST', 'PATCH', 'DELETE'])
def handler():
    """Catch-all route: run the user Function for any path, wrapped with
    tracing, Prometheus metrics, and a timeout."""
    req = bottle.request
    tracer = tracer_provider.get_tracer(req)
    event = Event(req, tracer)

    method = req.method
    func_calls.labels(method).inc()
    with func_errors.labels(method).count_exceptions():
        with func_hist.labels(method).time():
            # Run the Function on a worker thread so this request can be
            # failed with 408 when it exceeds the configured timeout.
            que = queue.Queue()
            t = threading.Thread(target=lambda q, e: q.put(func_with_context(e, function_context)), args=(que, event))
            t.start()
            try:
                res = que.get(block=True, timeout=timeout)
                # Propagate the Function's content type when it returned a
                # response-like object.
                # NOTE(review): assumes res.headers tolerates a missing
                # "content-type" key without raising — confirm for all
                # supported return types.
                if hasattr(res, 'headers') and res.headers["content-type"]:
                    bottle.response.content_type = res.headers["content-type"]
            except queue.Empty:
                # NOTE(review): the worker thread is not cancelled on
                # timeout — only the HTTP response is cut short.
                return bottle.HTTPError(408, "Timeout while processing the function")
            else:
                t.join()
                return res


def preload():
    """No-op target: running this once in a child process warms up the forkserver."""
    return None


if __name__ == '__main__':
    import logging
    import multiprocessing as mp
    import requestlogger

    # Start method for user-Function subprocesses. 'fork' is rejected below;
    # only 'forkserver' (default) and 'spawn' are allowed.
    mp_context = os.getenv('MP_CONTEXT', 'forkserver')

    if mp_context == "fork":
        raise ValueError(
            '"fork" multiprocessing context is not supported because cherrypy is a '
            'multithreaded server and safely forking a multithreaded process is '
            'problematic'
        )
    if mp_context not in ["forkserver", "spawn"]:
        raise ValueError(
            f'"{mp_context}" is an invalid multiprocessing context. Possible values '
            'are "forkserver" and "spawn"'
        )

    try:
        ctx = mp.get_context(mp_context)

        if ctx.get_start_method() == 'forkserver':
            # Preload the current module and consequently also the user-defined module
            # so that all the child processes forked from the forkserver in response to
            # a request immediately have access to the global data in the user-defined
            # module without having to load it for every request.
            ctx.set_forkserver_preload([current_mod])

            # Start the forkserver before we start accepting requests.
            d = ctx.Process(target=preload)
            d.start()
            d.join()

    except ValueError:
        # Default to 'spawn' if 'forkserver' is unavailable.
        ctx = mp.get_context('spawn')
        # Fix: logging.warn is a deprecated alias of logging.warning.
        logging.warning(
            f'"{mp_context}" multiprocessing context is unavailable. Using "spawn"'
        )

    # Per-HTTP-method Prometheus metrics consumed by the request handler.
    func_hist = prom.Histogram(
        'function_duration_seconds', 'Duration of user function in seconds', ['method']
    )
    func_calls = prom.Counter(
        'function_calls_total', 'Number of calls to user function', ['method']
    )
    func_errors = prom.Counter(
        'function_failures_total', 'Number of exceptions in user function', ['method']
    )

    # added by Kyma team
    if os.getenv('KYMA_INTERNAL_LOGGER_ENABLED'):
        # Apache-style access logging to stdout (default that has been used so far).
        loggedapp = requestlogger.WSGILogger(
            app,
            [logging.StreamHandler(stream=sys.stdout)],
            requestlogger.ApacheFormatter(),
        )
    else:
        loggedapp = app
    # end of modified section

    bottle.run(
        loggedapp,
        server='cherrypy',
        host='0.0.0.0',
        port=func_port,
        # Set this flag to True to auto-reload the server after any source files change
        reloader=os.getenv('CHERRYPY_RELOADED', False),
        # Number of requests that can be handled in parallel (default = 50).
        numthreads=int(os.getenv('CHERRYPY_NUMTHREADS', 50)),
        quiet='KYMA_BOTTLE_QUIET_OPTION_DISABLED' not in os.environ,
    )
11 changes: 11 additions & 0 deletions examples/custom-serverless-runtime-image/kubeless/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
setuptools==69.1.1
requests
bottle==0.12.21
cherrypy==8.9.1
wsgi-request-logger==0.4.6
prometheus_client==0.8.0
opentelemetry-api==1.9.1
opentelemetry-sdk==1.9.1
opentelemetry-exporter-otlp-proto-http==1.9.1
opentelemetry-propagator-b3==1.9.1
opentelemetry-instrumentation-requests==0.28b1
Loading

0 comments on commit 667a989

Please sign in to comment.