Innovating with AWS Lambda Extensions: Boosting Your Serverless Capabilities
I) Introduction
Just when we think we’ve mastered AWS Lambda, we discover hidden features and capabilities that deepen our understanding and enhance our applications. One such powerful feature is Lambda extensions.
In this article, we’ll explore what a Lambda extension is and how it can power up our serverless applications by analyzing a real use case.
II) What is a Lambda extension?
This diagram illustrates the operation of the AWS Lambda service. The Lambda execution environment comprises Processes, which handle the actual computation by running the Lambda function’s code, and API Endpoints, which act as interfaces between the Lambda’s compute environment and the AWS platform.
When you invoke a Lambda function, the Lambda service sends an API call to the Runtime API, which then invokes the runtime within the Processes. After the function executes, the runtime sends the response back to the Runtime API, which forwards it to the Lambda service. This is how you receive the response from your Lambda function invocation.
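To make this flow more concrete, here is a minimal sketch of one iteration of the loop a runtime performs against the Runtime API. The helper names are mine and purely illustrative. The endpoint paths and the `Lambda-Runtime-Aws-Request-Id` header follow the Lambda Runtime API (version 2018-06-01) as I understand it, so treat this as an illustration rather than a reference implementation.

```python
import json
import urllib.request

RUNTIME_API_VERSION = "2018-06-01"


def next_invocation_url(runtime_api):
    """URL the runtime polls (GET) to fetch the next invocation event."""
    return f"http://{runtime_api}/{RUNTIME_API_VERSION}/runtime/invocation/next"


def response_url(runtime_api, request_id):
    """URL the runtime POSTs the handler's result to."""
    return f"http://{runtime_api}/{RUNTIME_API_VERSION}/runtime/invocation/{request_id}/response"


def handle_one_invocation(runtime_api, handler):
    """Fetch one event, run the handler on it, and send the result back."""
    with urllib.request.urlopen(next_invocation_url(runtime_api)) as resp:
        request_id = resp.headers["Lambda-Runtime-Aws-Request-Id"]
        event = json.loads(resp.read())
    body = json.dumps(handler(event)).encode("utf-8")
    req = urllib.request.Request(
        response_url(runtime_api, request_id), data=body, method="POST"
    )
    urllib.request.urlopen(req).close()
    return request_id
```

Inside a real execution environment, `runtime_api` would come from the `AWS_LAMBDA_RUNTIME_API` environment variable, the same variable the extension code in this article reads.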
Based on this, we can now explain in detail what a Lambda extension is. In simple terms, a Lambda extension is code that runs as an independent process alongside your Lambda handler code within the same execution environment, as depicted in the diagram. A Lambda extension comes as a layer and shares the execution environment’s memory, timeout, and ephemeral storage with the handler.
Lambda extension specifics
Although a Lambda extension shares the memory and timeout of your Lambda function, it does not share the same lifecycle. An extension can start before the Lambda function handler and continue to run after the function invocation is fully processed. Extensions can register for function and execution environment lifecycle events (initialization, invocation, and shutdown), allowing you to define and run your own logic during these phases (e.g., triggering a process when your Lambda finishes execution).
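As a rough illustration of how an extension can react to these lifecycle events, the sketch below inspects the JSON payload returned by the Extensions API and decides what to do. The function name `describe_event` is hypothetical, and the field names (`eventType`, `requestId`, `shutdownReason`) are the ones I understand the Extensions API to return, so double-check them against the AWS documentation. Initialization has no dedicated event in this sketch: init logic simply runs before the extension asks for its first event.

```python
import json


def describe_event(raw_event):
    """Turn a raw lifecycle event payload into a short action label."""
    event = json.loads(raw_event)
    event_type = event.get("eventType")
    if event_type == "INVOKE":
        # One INVOKE event is delivered per function invocation.
        return f"invoke:{event.get('requestId')}"
    if event_type == "SHUTDOWN":
        # Last chance to flush buffers before the environment is torn down.
        return f"shutdown:{event.get('shutdownReason')}"
    return f"unknown:{event_type}"
```

In a real extension, the `shutdown` branch is where you would flush any remaining logs or close connections.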
Another feature of extensions is that they can run in a different language than the function since they operate as independent processes.
Using Lambda extensions to send Lambda logs to custom destinations
One of the use cases for Lambda extensions is to handle Lambda's logs, allowing you to send logs to any destination you choose. Previously, to send logs to a custom destination, you would typically configure and operate a CloudWatch Log Group subscription, with a separate Lambda function forwarding logs to your desired destination.
With Lambda extensions, logging tools can now receive log streams directly from within the Lambda execution environment and send them to any destination. In this article, I will use an S3 bucket as the destination for Lambda's logs. Throughout the rest of the article, I will guide you through implementing a Lambda extension. You can find all the code in this repository.
How to create an extension?
To add an extension, you only need to add the layer to your Lambda. The extension process will be spawned automatically. Lambda looks for extensions in the /opt/extensions/ directory, interprets each file as an executable bootstrap for launching the extension, and starts all extensions in parallel.
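The lookup described above can be mimicked locally, for example to check that a layer is packaged correctly before deploying it. This is only a sketch of the idea, not how the Lambda service itself is implemented, and the function name `find_extension_entrypoints` is hypothetical.

```python
import os


def find_extension_entrypoints(extensions_dir="/opt/extensions"):
    """Return the executable regular files directly inside extensions_dir."""
    if not os.path.isdir(extensions_dir):
        return []
    found = []
    for name in sorted(os.listdir(extensions_dir)):
        path = os.path.join(extensions_dir, name)
        # Subfolders and non-executable files are not treated as entry points here.
        if os.path.isfile(path) and os.access(path, os.X_OK):
            found.append(name)
    return found
```

Pointing it at the unzipped layer's extensions folder should list exactly one entry per extension; a missing name usually means the file lacks execute permission.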
Lambda extension code for custom destination
Looking at the repo, we can see that the code has several components:
I) Lambda extension API
The first step is to register your process as an extension so that it can listen to the different events happening in your Lambda. For this, we need to use the Lambda Extensions API, as shown in the diagram below.
# extensions/logs_api_http_extension/extensions_api_client.py
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

import json
import os
import sys
import urllib.request

# Demonstrates code to register as an extension.

LAMBDA_AGENT_NAME_HEADER_KEY = "Lambda-Extension-Name"
LAMBDA_AGENT_IDENTIFIER_HEADER_KEY = "Lambda-Extension-Identifier"


class ExtensionsAPIClient():
    def __init__(self):
        try:
            runtime_api_address = os.environ['AWS_LAMBDA_RUNTIME_API']
            self.runtime_api_base_url = f"http://{runtime_api_address}/2020-01-01/extension"
        except Exception as e:
            raise Exception(f"AWS_LAMBDA_RUNTIME_API is not set {e}") from e

    # Register as early as possible - the runtime initialization starts after all extensions have registered.
    def register(self, agent_unique_name, registration_body):
        try:
            print(f"extension.extensions_api_client: Registering Extension at ExtensionsAPI address: {self.runtime_api_base_url}")
            req = urllib.request.Request(f"{self.runtime_api_base_url}/register")
            req.method = 'POST'
            req.add_header(LAMBDA_AGENT_NAME_HEADER_KEY, agent_unique_name)
            req.add_header("Content-Type", "application/json")
            data = json.dumps(registration_body).encode("utf-8")
            req.data = data
            resp = urllib.request.urlopen(req)
            if resp.status != 200:
                print(f"extension.extensions_api_client: /register request to ExtensionsAPI failed. Status: {resp.status}, Response: {resp.read()}")
                # Fail the extension
                sys.exit(1)
            agent_identifier = resp.headers.get(LAMBDA_AGENT_IDENTIFIER_HEADER_KEY)
            # print(f"extension.extensions_api_client: received agent_identifier header {agent_identifier}")
            return agent_identifier
        except Exception as e:
            raise Exception(
                f"Failed to register to ExtensionsAPI: on {self.runtime_api_base_url}/register "
                f"with agent_unique_name:{agent_unique_name} "
                f"and registration_body:{registration_body}\nError: {e}"
            ) from e

    # Call the following method when the extension is ready to receive the next invocation
    # and there is no job it needs to execute beforehand.
    def next(self, agent_id):
        try:
            print("extension.extensions_api_client: Requesting /event/next from Extensions API")
            req = urllib.request.Request(f"{self.runtime_api_base_url}/event/next")
            req.method = 'GET'
            req.add_header(LAMBDA_AGENT_IDENTIFIER_HEADER_KEY, agent_id)
            req.add_header("Content-Type", "application/json")
            resp = urllib.request.urlopen(req)
            if resp.status != 200:
                print(f"extension.extensions_api_client: /event/next request to ExtensionsAPI failed. Status: {resp.status}, Response: {resp.read()} ")
                # Fail the extension
                sys.exit(1)
            data = resp.read()
            print(f"extension.extensions_api_client: Received event from ExtensionsAPI: {data}")
            return data
        except Exception as e:
            raise Exception(f"Failed to get /event/next from ExtensionsAPI: {e}") from e
II) Logs API
The Logs API is what lets the extension subscribe to log events. Once subscribed, the extension can listen to the Lambda’s log events.
# extensions/logs_api_http_extension/logs_api_client.py
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

import json
import os
import sys
import urllib.request

# Demonstrates code to call the Logs API to subscribe to log events

LAMBDA_AGENT_IDENTIFIER_HEADER_KEY = "Lambda-Extension-Identifier"


class LogsAPIClient:
    def __init__(self):
        try:
            runtime_api_address = os.environ['AWS_LAMBDA_RUNTIME_API']
            self.logs_api_base_url = f"http://{runtime_api_address}/2020-08-15/logs"
        except Exception as e:
            raise Exception(f"AWS_LAMBDA_RUNTIME_API is not set {e}") from e

    # Method to call the Logs API to subscribe to log events.
    def subscribe(self, agent_id, subscription_body):
        try:
            print(f"extension.logs_api_client: Subscribing to Logs API on {self.logs_api_base_url}")
            req = urllib.request.Request(f"{self.logs_api_base_url}")
            req.method = 'PUT'
            req.add_header(LAMBDA_AGENT_IDENTIFIER_HEADER_KEY, agent_id)
            req.add_header("Content-Type", "application/json")
            data = json.dumps(subscription_body).encode("utf-8")
            req.data = data
            resp = urllib.request.urlopen(req)
            if resp.status == 202:
                print("extension.logs_api_client: WARNING!!! Logs API is not supported! Is this extension running in a local sandbox?")
            elif resp.status != 200:
                print(f"extension.logs_api_client: Could not subscribe to Logs API: {resp.status} {resp.read()}")
                # Fail the extension
                sys.exit(1)
            # print(f"extension.logs_api_client: Succesfully subscribed to Logs API: {resp.read()}")
        except Exception as e:
            raise Exception(
                f"Failed to subscribe to Logs API on {self.logs_api_base_url} with id: {agent_id} "
                f"and subscription_body: {json.dumps(subscription_body).encode('utf-8')} \nError:{e}"
            ) from e
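The `subscription_body` passed to `subscribe` is a plain dictionary. As a small sketch, the helper below builds one with the same shape and default values as the body used in this article's entry point file, and rejects log types other than the three mentioned in the sample's comments (`platform`, `function`, `extension`). The helper name `build_subscription_body` is hypothetical and not part of the sample repository.

```python
ALLOWED_LOG_TYPES = {"platform", "function", "extension"}


def build_subscription_body(port, types=("platform", "function"),
                            timeout_ms=1000, max_bytes=262144, max_items=10000):
    """Build a Logs API subscription body for an HTTP listener on the given port."""
    unknown = set(types) - ALLOWED_LOG_TYPES
    if not types or unknown:
        raise ValueError(f"Unsupported log types: {sorted(unknown) or 'none given'}")
    return {
        "destination": {
            "protocol": "HTTP",
            "URI": f"http://sandbox:{port}",
        },
        "types": list(types),
        "buffering": {
            "timeoutMs": timeout_ms,   # max time a batch is buffered
            "maxBytes": max_bytes,     # max size of buffered logs
            "maxItems": max_items,     # max number of buffered events
        },
    }
```

Building the body in one place makes it harder to subscribe with a mistyped log type and only notice the mistake at deploy time.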
III) HTTP listener
For the extension to receive the content of the logs, the extension (an external process) needs to expose an HTTP server inside the execution environment. Once the extension has subscribed, the Logs API delivers log batches to this listener through HTTP POST requests.
# extensions/logs_api_http_extension/http_listener.py
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

import json
import os
import sys
from http.server import BaseHTTPRequestHandler, HTTPServer
from threading import Event, Thread

# Demonstrates code to set up an HTTP listener and receive log events

RECEIVER_NAME = "sandbox"
LOCAL_DEBUGGING_IP = "0.0.0.0"
RECEIVER_PORT = 4243


def get_listener_address():
    return RECEIVER_NAME if ("true" != os.getenv("AWS_SAM_LOCAL")) else LOCAL_DEBUGGING_IP


def http_server_init(queue):
    def handler(*args):
        LogsHandler(queue, *args)

    listener_address = get_listener_address()
    server = HTTPServer((listener_address, RECEIVER_PORT), handler)

    # Ensure that the server thread is scheduled so that the server binds to the port
    # and starts listening before subscribing to the LogsAPI and asking for the next event.
    started_event = Event()
    server_thread = Thread(target=serve, daemon=True, args=(started_event, server, listener_address,))
    server_thread.start()
    rc = started_event.wait(timeout=9)
    if rc is not True:
        raise Exception("server_thread has timed out before starting")


# Server implementation
class LogsHandler(BaseHTTPRequestHandler):
    def __init__(self, queue, *args):
        self.queue = queue
        BaseHTTPRequestHandler.__init__(self, *args)

    def do_POST(self):
        try:
            cl = self.headers.get("Content-Length")
            if cl:
                data_len = int(cl)
            else:
                data_len = 0
            content = self.rfile.read(data_len)
            self.send_response(200)
            self.end_headers()
            batch = json.loads(content.decode("utf-8"))
            self.queue.put(batch)
        except Exception as e:
            print(f"Error processing message: {e}")


# Server thread
def serve(started_event, server, listener_name):
    # Notify that this thread is up and running
    started_event.set()
    try:
        print(f"extension.http_listener: Running HTTP Server on {listener_name}:{RECEIVER_PORT}")
        server.serve_forever()
    except:
        print(f"extension.http_listener: Error in HTTP server {sys.exc_info()[0]}", flush=True)
    finally:
        if server:
            server.shutdown()
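To get a feel for what the listener does without deploying anything, the following sketch runs a similar handler on a local ephemeral port, posts one sample batch to it, and reads the batch back from the queue. It is a simplified stand-in for the listener above, not the sample's code: the names `make_handler` and `roundtrip_batch` are hypothetical, and the record shape (`time`, `type`, `record`) is what I understand the Logs API to deliver.

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer
from queue import Queue


def make_handler(queue):
    """Build a request handler class that pushes each POSTed JSON batch to queue."""
    class Handler(BaseHTTPRequestHandler):
        def do_POST(self):
            length = int(self.headers.get("Content-Length") or 0)
            body = self.rfile.read(length)
            self.send_response(200)
            self.end_headers()
            queue.put(json.loads(body.decode("utf-8")))

        def log_message(self, *args):
            pass  # keep the demo output quiet

    return Handler


def roundtrip_batch(batch):
    """POST a batch to a throwaway local listener and return (status, received batch)."""
    queue = Queue()
    server = HTTPServer(("127.0.0.1", 0), make_handler(queue))  # port 0 = any free port
    threading.Thread(target=server.serve_forever, daemon=True).start()
    try:
        port = server.server_address[1]
        req = urllib.request.Request(
            f"http://127.0.0.1:{port}",
            data=json.dumps(batch).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        # Bypass any proxy configured in the environment for this local call.
        opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
        with opener.open(req, timeout=5) as resp:
            status = resp.status
        return status, queue.get(timeout=5)
    finally:
        server.shutdown()
        server.server_close()
```

Calling `roundtrip_batch([{"time": "2024-01-01T00:00:00.000Z", "type": "function", "record": "hello"}])` exercises the same receive-then-enqueue path the extension relies on.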
IV) Wrap it up
All the files we defined need to be referenced in a single entry point file, which should be placed under the path /extensions/. As mentioned before, Lambda looks for all the files under /opt/extensions/ and starts them in parallel. Therefore, we need to move the files defined so far into a dedicated folder (/logs_api_http_extension) and create a new entry point file that calls them in the desired order. Otherwise, all the scripts would be executed simultaneously.
Please note a small modification in the code below compared to the file available in the GitHub repository: I modified line 71 to add a sleep(5), because without it I couldn’t receive any logs in the S3 bucket due to the Lambda cold start. Here is the related GitHub issue: https://github.com/aws-samples/aws-lambda-extensions/issues/55
#!/bin/sh
# extensions/logs_api_http_extension.py
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
''''exec python -u -- "$0" ${1+"$@"} # '''

import os
import sys
from pathlib import Path
from datetime import datetime
from time import sleep

# Add lib folder to path to import boto3 library.
# Normally with Lambda Layers, python libraries are put into the /python folder which is in the path.
# As this extension is bringing its own Python runtime, and running a separate process, the path is not available.
# Hence, having the files in a different folder and adding it to the path, makes it available.
lib_folder = Path(__file__).parent / "lib"
sys.path.insert(0, str(lib_folder))
import boto3

from logs_api_http_extension.http_listener import http_server_init, RECEIVER_PORT
from logs_api_http_extension.logs_api_client import LogsAPIClient
from logs_api_http_extension.extensions_api_client import ExtensionsAPIClient
from queue import Queue

"""Here is the sample extension code.
    - The extension runs two threads. The "main" thread, will register with the Extension API and process its invoke and
    shutdown events (see next call). The second "listener" thread listens for HTTP POST events that deliver log batches.
    - The "listener" thread places every log batch it receives in a synchronized queue; during each execution slice,
    the "main" thread will make sure to process any event in the queue before returning control by invoking next again.
    - Note that because of the asynchronous nature of the system, it is possible that logs for one invoke are
    processed during the next invoke slice. Likewise, it is possible that logs for the last invoke are processed during
    the SHUTDOWN event.

Note:

1.  This is a simple example extension to help you understand the Lambda Logs API.
    This code is not production ready. Use it with your own discretion after testing it thoroughly.

2.  The extension code starts with a shebang. This is to bring Python runtime to the execution environment.
    This works if the lambda function is a python3.x function, therefore it brings the python3.x runtime with itself.
    It may not work for python 2.7 or other runtimes.
    The recommended best practice is to compile your extension into an executable binary and not rely on the runtime.

3.  This file needs to be executable, so make sure you add execute permission to the file
    `chmod +x logs_api_http_extension.py`
"""


class LogsAPIHTTPExtension():
    def __init__(self, agent_name, registration_body, subscription_body):
        # print(f"extension.logs_api_http_extension: Initializing LogsAPIExternalExtension {agent_name}")
        self.agent_name = agent_name
        self.queue = Queue()
        self.logs_api_client = LogsAPIClient()
        self.extensions_api_client = ExtensionsAPIClient()

        # Register early so Runtime could start in parallel
        self.agent_id = self.extensions_api_client.register(self.agent_name, registration_body)

        # Start listening before Logs API registration
        # print(f"extension.logs_api_http_extension: Starting HTTP Server {agent_name}")
        http_server_init(self.queue)
        self.logs_api_client.subscribe(self.agent_id, subscription_body)

    def run_forever(self):
        # Configuring S3 Connection
        s3_bucket = (os.environ['S3_BUCKET_NAME'])
        s3 = boto3.resource('s3')
        print(f"extension.logs_api_http_extension: Receiving Logs {self.agent_name}")
        while True:
            resp = self.extensions_api_client.next(self.agent_id)
            ## Added by me, without it couldn't receive any logs in S3
            ## This is related to the cold start of a lambda
            ## https://github.com/aws-samples/aws-lambda-extensions/issues/55
            sleep(5)
            # Process the received batches if any.
            while not self.queue.empty():
                batch = self.queue.get_nowait()
                # This following line logs the events received to CloudWatch.
                # Replace it to send logs to elsewhere.
                # If you've subscribed to extension logs, e.g. "types": ["platform", "function", "extension"],
                # you'll receive the logs of this extension back from Logs API.
                # And if you log it again with the line below, it will create a cycle since you receive it back again.
                # Use `extension` log type if you'll egress it to another endpoint,
                # or make sure you've implemented a protocol to handle this case.
                # print(f"Log Batch Received from Lambda: {batch}", flush=True)

                # There are two options illustrated:
                # 1. Sending the entire log batch to S3
                # 2. Parsing the batch and sending individual log lines.
                #    This could be used to parse the log lines and only selectively send logs to S3, or amend for any other destination.

                # 1. The following line writes the entire batch to S3
                s3_filename = (os.environ['AWS_LAMBDA_FUNCTION_NAME'])+'-'+(datetime.now().strftime('%Y-%m-%d-%H:%M:%S.%f'))+'.log'
                try:
                    response = s3.Bucket(s3_bucket).put_object(Key=s3_filename, Body=str(batch))
                except Exception as e:
                    raise Exception(f"Error sending log to S3 {e}") from e


# Register for the INVOKE events from the EXTENSIONS API
_REGISTRATION_BODY = {
    "events": ["INVOKE", "SHUTDOWN"],
}

# Subscribe to platform logs and receive them on ${local_ip}:4243 via HTTP protocol.

TIMEOUT_MS = 1000  # Maximum time (in milliseconds) that a batch is buffered.
MAX_BYTES = 262144  # Maximum size in bytes that the logs are buffered in memory.
MAX_ITEMS = 10000  # Maximum number of events that are buffered in memory.

_SUBSCRIPTION_BODY = {
    "destination": {
        "protocol": "HTTP",
        "URI": f"http://sandbox:{RECEIVER_PORT}",
    },
    "types": ["platform", "function"],
    "buffering": {
        "timeoutMs": TIMEOUT_MS,
        "maxBytes": MAX_BYTES,
        "maxItems": MAX_ITEMS
    }
}


def main():
    # print(f"extension.logs_api_http_extension: Starting Extension {_REGISTRATION_BODY} {_SUBSCRIPTION_BODY}")

    # Note: Agent name has to be file name to register as an external extension
    ext = LogsAPIHTTPExtension(os.path.basename(__file__), _REGISTRATION_BODY, _SUBSCRIPTION_BODY)
    ext.run_forever()


if __name__ == "__main__":
    main()
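The comments in `run_forever` mention a second option: parsing the batch and sending only selected log lines. Here is a minimal sketch of what that could look like, assuming each batch is a list of records carrying a `type` and a `record` field, which is the shape I understand the Logs API to deliver. The helper name `to_json_lines` is hypothetical and not part of the sample repository. It also serializes with `json.dumps`, since `str(batch)` writes a Python representation (single quotes) rather than valid JSON.

```python
import json


def to_json_lines(batch, wanted_types=("function",)):
    """Keep only records of the wanted types and serialize them as JSON lines."""
    selected = [record for record in batch if record.get("type") in wanted_types]
    return "\n".join(json.dumps(record) for record in selected)
```

The returned string could replace `str(batch)` as the `Body` of the S3 object, and an empty result could be used as a signal to skip the upload entirely.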
Deployment
The deployment is pretty straightforward: we can just follow the instructions in the repo.
The repo provides a SAM template that deploys an S3 bucket (in which we’ll store the Lambda logs) and a Lambda function with the extension attached to it as a layer. All that’s left for us is to execute these two commands:
sam build
sam deploy --stack-name s3-logs-extension-demo --guided
Testing phase: Moment of truth
If everything was set up correctly, we should see an S3 bucket and a Lambda function available in the AWS account.
After invoking the lambda, you should be able to see the logs in the S3 bucket.
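If you prefer to check this from a script rather than the console, something along these lines should work. It is only a sketch: the function and bucket names in `main` are placeholders you would replace with the ones created by your stack, and it relies on the object keys starting with the function name, as in the extension code above. The clients are passed in as parameters so that the helper can be tried without AWS access.

```python
import json


def invoke_and_list_logs(lambda_client, s3_client, function_name, bucket):
    """Invoke the function once, then list the log objects written for it."""
    result = lambda_client.invoke(
        FunctionName=function_name,
        Payload=json.dumps({}).encode("utf-8"),
    )
    # Single page only: enough for a quick check right after a deployment.
    listing = s3_client.list_objects_v2(Bucket=bucket, Prefix=f"{function_name}-")
    keys = [obj["Key"] for obj in listing.get("Contents", [])]
    return result["StatusCode"], keys


def main():
    import boto3  # imported here so the helper above can be tested without boto3

    status, keys = invoke_and_list_logs(
        boto3.client("lambda"),
        boto3.client("s3"),
        "my-function-name",  # placeholder, not a real resource name
        "my-log-bucket",     # placeholder, not a real resource name
    )
    print(status, keys)
```

Because the extension keeps working after the function has returned its response, the listing can be empty on the first try; waiting a few seconds and listing again is a reasonable thing to do.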
Congratulations 🎉🎉
You have now set up a Lambda extension that sends your Lambda logs to an S3 bucket. Feel free to let me know in the comments what you think about Lambda extensions and what your use case was.
Here are a couple of useful links before I go:
- https://aws.amazon.com/blogs/compute/using-aws-lambda-extensions-to-send-logs-to-custom-destinations/
- https://docs.aws.amazon.com/lambda/latest/dg/runtimes-extensions-api.html
- https://docs.aws.amazon.com/lambda/latest/dg/extensions-api-partners.html
- https://github.com/aws-samples/aws-lambda-extensions/issues/55