I love serverless. I’ve written about it before in my previous post on how to implement a third-party logs ingestion workflow, but I now want to tackle the question: ‘Is it possible to scale stateless serverless functions on the runtime as opposed to leveraging function concurrency?’
In short, my answer is yes.
Stateless vs stateful applications
In the context of AWS Lambda, serverless makes it easy to scale applications out across multiple parallel executions. However, this also implies that stateful applications need to rely on some form of external storage to share memory: it is the only way in which function invocations can ‘communicate’ with each other about their evolving status.
This adds to the complexity. It is somewhat harder to design a system that does not store its shared status on any form of local resource, be it volatile memory or the filesystem. But is it always necessary to avoid all forms of local sharing in the serverless runtime environment?
I believe the answer is no. Purists may shudder at this but I will try to make my case.
It really depends on whether the application you’re writing is stateful as opposed to stateless.
If you’re writing code that runs periodically and perpetually, and that needs to communicate its evolving status (this is what I mean by stateful), then pushing the runtime for more parallelism will not significantly simplify your application design. You’ll still need external storage in which to track the status, as the runtime’s existence is ephemeral and bound in time.
If instead your application is stateless, then much can be gained by leveraging parallel executions on the runtime itself.
Let’s consider an example. You want to write a scavenger function that periodically checks whether entries on a DynamoDB table are old enough to be deleted, so that the table is kept minimal and scan operations consume less read capacity. In short, you want to save on your DynamoDB bill.
This is an example of a stateless application. The single application run – or Lambda function invocation – does not need to know anything about previous runs.
By increasing the parallelism on the Lambda runtime environment, we get more speed, and possibly some savings on the Lambda bill.
If we were instead invoking the Lambda function multiple times in parallel, we would possibly incur multiple cold starts, and we would have to share an application status to avoid different invocations reading the same segments of our DynamoDB table, which would throw away the benefits of parallel execution. In short, we’d be introducing state for a conceptually stateless invocation. That doesn’t sound right.
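To make that coordination overhead concrete, here is a minimal sketch of what each parallel invocation would need to do before scanning: atomically ‘claim’ a table segment through a conditional write, so that no two invocations read the same segment. The ScanCoordination table and the claimSegment helper are hypothetical, for illustration only.

package main

import (
	"strconv"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/service/dynamodb"
)

// claimSegment sketches the shared state that parallel invocations would have
// to agree on: a conditional write into a (hypothetical) coordination table
// that succeeds only for the first invocation claiming a given segment.
func claimSegment(ctx aws.Context, svc *dynamodb.DynamoDB, segment int64) (bool, error) {
	_, err := svc.PutItemWithContext(ctx, &dynamodb.PutItemInput{
		TableName: aws.String("ScanCoordination"), // hypothetical table name
		Item: map[string]*dynamodb.AttributeValue{
			"Segment": {N: aws.String(strconv.FormatInt(segment, 10))},
		},
		// Succeed only if no other invocation has claimed this segment yet.
		ConditionExpression: aws.String("attribute_not_exists(Segment)"),
	})
	if aerr, ok := err.(awserr.Error); ok && aerr.Code() == dynamodb.ErrCodeConditionalCheckFailedException {
		return false, nil // another invocation owns this segment
	}
	return err == nil, err
}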
Language considerations
Now this is where I get excited. Increasing the parallelism of code is something that can be done in any language supporting multithreading.
Golang has been my language of choice, because I think the Go pattern of sharing memory by communicating fits very well with stateless AWS Lambda functions.
Go’s support for cheap goroutines and channels is outstanding. Together with the context cancellation provided by the context package, this makes Go a great candidate for programming a parallel function for the Lambda runtime.
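As a minimal, illustrative sketch of the pattern the scavenger below relies on – a producer sending work over a buffered channel, a consumer draining it until it is closed, and a shared context bounding every goroutine’s lifetime – consider the following (all names are illustrative):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	// A parent context bounds the lifetime of every goroutine, mirroring
	// the Lambda execution timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	work := make(chan int, 10) // buffered, so the producer is not blocked

	// Producer: sends work items, then closes the channel so that consumers
	// ranging over it terminate without having to count items.
	go func() {
		defer close(work)
		for i := 0; i < 5; i++ {
			select {
			case <-ctx.Done():
				return // stop producing on timeout, do not leak
			case work <- i:
			}
		}
	}()

	// Consumer: ranges over the channel until it is closed and drained.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for item := range work {
			fmt.Println("processing item", item)
		}
	}()
	wg.Wait()
}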
The parallel implementation
Let’s revisit the example of the scavenger function. How do we translate our thoughts into code?
Here’s the code sample for our implementation, and some guidelines for interpreting the code snippet:
- The Lambda handler is conceptually divided into two blocks: one reads items from the DynamoDB table, while the other deletes them.
- Both reads and deletions are parallelised.
- A channel is used to send items from read to deletion.
- The channel is closed when the reading phase is complete, without having to rely on knowing exactly how many items have been read.
- Context objects are used to expire routines, without leaking them, when the Lambda execution timeout is reached.
package main
import (
"context"
"fmt"
"log"
"os"
"strconv"
"sync"
"time"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/remeh/sizedwaitgroup"
)
var svc *dynamodb.DynamoDB
func init() {
sess := session.Must(session.NewSession())
svc = dynamodb.New(sess)
}
type TableItem struct {
WorkerId int `dynamodbav:",string"`
StartTime string `dynamodbav:",string"`
StartExecutionTime string `dynamodbav:",string"`
EndTime string `dynamodbav:",string"`
PaginationToken string `dynamodbav:",string"`
Completed int `dynamodbav:",string"`
}
func HandleRequest(ctx aws.Context) {
deadline, _ := ctx.Deadline()
log.Println("Setting the parent context to control all context supporting operations.")
ctx, cancel := context.WithDeadline(ctx, deadline)
// Defer the context cancellation so that all the routines that support cancellation are cancelled and do not leak resources.
defer cancel()
log.Println("Reading DynamoDb table name from the Environment")
WorkersTableName := os.Getenv("WorkersTableName")
log.Println("Setting the total number of segments in which to split the DynamoDb table scanning operations.")
totalSegments, convErr := strconv.Atoi(os.Getenv("TotalSegments"))
if convErr != nil {
log.Fatalf("Fatal error occurred when trying to convert \"TotalSegments\" environment variable from string to int. Error message was: %s", convErr)
}
log.Println("Creating the channel onto which to push the items read from the table.")
readChannel := make(chan TableItem, 3*totalSegments) //we want this channel to be buffered so that the sender is not blocked if no receivers are ready.
// The scan can be parallelised among multiple readers with a set number of segments. The
// Segment and TotalSegments fields are set on each reader's ScanInput. The scanned items
// are made available on the readChannel.
readRoutine := func(ctx aws.Context, input *dynamodb.ScanInput, wg *sync.WaitGroup) {
defer wg.Done()
select {
case <-ctx.Done():
log.Println(ctx.Err())
return // returning not to leak the goroutine
default:
log.Printf("Starting scan operation for segment number %v", *(input.Segment))
scanErr := svc.ScanPagesWithContext(ctx, input, func(page *dynamodb.ScanOutput, lastPage bool) bool {
//fmt.Println(page.String()) //This is to debug the retrieved page.
//fmt.Println(page.Items[0])
// The application-defined TableItem type matches the structure of the items in the
// DynamoDB table the application is reading from. The scanned items are unmarshalled
// into a slice of TableItem structures and sent to the readChannel.
// Once deleteItem reads an object from the channel, it extracts the relevant properties
// and adds them to the DeleteItemInput structure it needs to build to send the delete
// request to DynamoDB. The scanned items cannot simply be passed over to the delete
// function, as the DeleteItemInput structure is not compatible with the custom,
// table-dependent item structure returned by the scan operation.
TableItems := []TableItem{}
log.Println("Unmarshalling the scanned page of items from the DynamoDB table into the scavenger-specific slice of TableItem objects.")
if readErr := dynamodbattribute.UnmarshalListOfMaps(page.Items, &TableItems); readErr != nil {
log.Fatalf("Failed to unmarshall DynamoDb scanned items: Error message was %s", readErr.(awserr.Error).Message())
}
log.Println("Outputting the table items to the readChannel")
log.Printf("The number of TableItems awaiting being output to the readChannel is: %v", len(TableItems))
for _, readTableItem := range TableItems {
readChannel <- readTableItem
log.Printf("TableItem with WorkerId #%v has been sent to the readChannel.", readTableItem.WorkerId)
}
log.Println("All Table Items from the scanned page have been sent to the readChannel.")
return !lastPage //see https://github.com/aws/aws-sdk-go/blob/v1.35.37/service/dynamodb/api.go#L5267 as to why this function returns the negated lastPage value.
})
if scanErr != nil {
if aerr, ok := scanErr.(awserr.Error); ok {
switch aerr.Code() {
case dynamodb.ErrCodeProvisionedThroughputExceededException:
fmt.Println(dynamodb.ErrCodeProvisionedThroughputExceededException, aerr.Error())
case dynamodb.ErrCodeResourceNotFoundException:
fmt.Println(dynamodb.ErrCodeResourceNotFoundException, aerr.Error())
case dynamodb.ErrCodeRequestLimitExceeded:
fmt.Println(dynamodb.ErrCodeRequestLimitExceeded, aerr.Error())
case dynamodb.ErrCodeInternalServerError:
fmt.Println(dynamodb.ErrCodeInternalServerError, aerr.Error())
default:
fmt.Println(aerr.Error())
}
} else {
// Print the error, cast err to awserr.Error to get the Code and
// Message from an error.
fmt.Println(scanErr.Error())
}
}
return
}
}
// Cast totalSegments to int64, as that is the type expected by SetSegment and SetTotalSegments.
totalSegmentsInt64 := int64(totalSegments)
var wg sync.WaitGroup
for segment := int64(0); segment < totalSegmentsInt64; segment++ {
log.Println("Creating the ScanInput object for each segment scanning routine.")
input := &dynamodb.ScanInput{
TableName: aws.String(WorkersTableName),
ExpressionAttributeNames: map[string]*string{
"#Comp": aws.String("Completed"),
},
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
":NCompl": {
N: aws.String("1"),
},
// Matching on a string value too is necessary to ensure that old entries are also
// returned by the scan operation, as Completed was originally of type 'string'.
":SCompl": {
S: aws.String("1"),
},
},
FilterExpression: aws.String("#Comp = :NCompl OR #Comp = :SCompl"),
//FilterExpression: aws.String("Completed = '1'"),
}
input.SetSegment(segment)
input.SetTotalSegments(totalSegmentsInt64)
log.Printf("Launching the scanning routine for segment #%v", segment)
log.Printf("The scanning input is: %s", input.String())
wg.Add(1)
go readRoutine(ctx, input, &wg)
}
// The routine launched here is responsible for closing the communication channel between
// producers and consumers, so that the Lambda does not run until the execution timeout
// expires. Since the receiving side listens on the channel with a 'range' clause, it
// terminates once all elements have been retrieved, without having to predict how many
// items will be sent or to use another mechanism to signal that the sending is over.
// Using a wait group before closing the channel, and then letting the channel drain, is
// more elegant than checking an out-of-band shared variable to detect that the sending
// is done. It is also more Go-like: we share memory by communicating, rather than
// communicating by sharing memory.
go func(wg *sync.WaitGroup) {
wg.Wait()
close(readChannel)
}(&wg)
// Utility variables used by the date operations within the deleteItem function.
loc, _ := time.LoadLocation("UTC")
maxItemLifeTime := 48
// This section of the handler is responsible for the deletion of the scanned items.
deleteItem := func(ctx aws.Context, input *dynamodb.DeleteItemInput, StartExecutionTime string, swg *sizedwaitgroup.SizedWaitGroup) {
defer swg.Done()
select {
case <-ctx.Done():
log.Println(ctx.Err())
return // returning not to leak the goroutine
default:
//if now - StartExecutionTime >= 48hours then delete.
if StartExecutionTime == "" {
log.Fatalf("The read item did not have a StartExecutionTime attribute. The related WorkerId is %s", *input.Key["WorkerId"].N)
}
log.Printf("Parsing the StartExecutionTime for item with WorkerId #%v", *input.Key["WorkerId"].N)
StartExecutionTime, dateError := time.Parse(time.RFC3339, StartExecutionTime)
if dateError != nil {
fmt.Println(dateError)
}
diff := time.Now().In(loc).Sub(StartExecutionTime)
hoursdiff := int(diff.Hours())
log.Printf("Checking whether the item with WorkerId #%v should be deleted, which is happening if the StartExecutionTime timestamp is older than %v hours.", *input.Key["WorkerId"].N, maxItemLifeTime)
if hoursdiff > maxItemLifeTime {
log.Printf("Deleting item with WorkerId #%v", *input.Key["WorkerId"].N)
_, deleteErr := svc.DeleteItemWithContext(ctx, input)
// Print the error, cast err to awserr.Error to get the Code and Message from an error.
if deleteErr != nil {
if aerr, ok := deleteErr.(awserr.Error); ok {
switch aerr.Code() {
case dynamodb.ErrCodeConditionalCheckFailedException:
fmt.Println(dynamodb.ErrCodeConditionalCheckFailedException, aerr.Error())
case dynamodb.ErrCodeProvisionedThroughputExceededException:
fmt.Println(dynamodb.ErrCodeProvisionedThroughputExceededException, aerr.Error())
case dynamodb.ErrCodeResourceNotFoundException:
fmt.Println(dynamodb.ErrCodeResourceNotFoundException, aerr.Error())
case dynamodb.ErrCodeItemCollectionSizeLimitExceededException:
fmt.Println(dynamodb.ErrCodeItemCollectionSizeLimitExceededException, aerr.Error())
case dynamodb.ErrCodeTransactionConflictException:
fmt.Println(dynamodb.ErrCodeTransactionConflictException, aerr.Error())
case dynamodb.ErrCodeRequestLimitExceeded:
fmt.Println(dynamodb.ErrCodeRequestLimitExceeded, aerr.Error())
case dynamodb.ErrCodeInternalServerError:
fmt.Println(dynamodb.ErrCodeInternalServerError, aerr.Error())
default:
fmt.Println(aerr.Error())
}
} else {
fmt.Println(deleteErr.Error())
}
}
}
return
}
}
log.Println("Getting the max number of concurrent deletion routines from the environment.")
maxDelRoutines, convErr := strconv.Atoi(os.Getenv("MaxDelRoutines"))
if convErr != nil {
log.Fatalf("An error occurred while attempting to convert environment variable \"MaxDelRoutines\" from string to int. Error message was: %s", convErr)
}
// If there is room in the sized wait group for another routine, launch it; otherwise,
// wait until there is availability, so as not to overwhelm the DynamoDB table, which can
// then retain most of its capacity for the main read and update operations performed by
// the other Lambdas in the orchestration framework.
log.Println("Creating the sized wait group to synchronise the group of concurrent deletion routines and keep their number maxed out as provided by the Environment.")
swg := sizedwaitgroup.New(maxDelRoutines)
for readOutput := range readChannel {
log.Println("Waiting to add a new deletion routine to the sized synchronous wait group.")
swg.Add()
log.Println("Added a deletion routine.")
input := &dynamodb.DeleteItemInput{
Key: map[string]*dynamodb.AttributeValue{
"WorkerId": {
N: aws.String(strconv.Itoa(readOutput.WorkerId)),
},
"StartTime": {
S: aws.String(readOutput.StartTime),
},
},
TableName: aws.String(WorkersTableName),
}
DeleteStartExecutionTime := readOutput.StartExecutionTime
log.Printf("Launching deletion routine for item with WorkerId #%v", *input.Key["WorkerId"].N)
go deleteItem(ctx, input, DeleteStartExecutionTime, &swg)
}
swg.Wait()
log.Printf("All completed and older than %v hours items from the DynamoDB table %v have been successfully scanned and deleted.", maxItemLifeTime, WorkersTableName)
}
func main() {
lambda.Start(HandleRequest)
}
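For local testing, a context carrying a deadline is all the handler needs, since aws.Context is an alias for context.Context. A minimal sketch, assuming AWS credentials are available in the environment and using illustrative variable values, could replace the main function above:

func main() {
// Local testing sketch: instead of lambda.Start(HandleRequest), call the
// handler directly with a deadline mirroring the Lambda timeout.
os.Setenv("WorkersTableName", "Workers") // illustrative values
os.Setenv("TotalSegments", "10")
os.Setenv("MaxDelRoutines", "10")

ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
defer cancel()

HandleRequest(ctx)
}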
Analysis
For the analysis, a sequential version of the code above was used as a comparison – essentially the same code, but with the parallelism variables set to 1.
Then two identical copies of a DynamoDB table were prepared by leveraging AWS Data Pipeline to export the table to S3 and reimport it. The tables contained approximately 130000 items, and the functions conditionally deleted all but about 3300 items that did not meet the deletion criteria.
A number of tests were then run.
The parameterised settings used for the analysis are shown in the following table:
| Allocated memory (MB) | # deletion routines | # reading routines | Billed duration (ms) | Max memory used (MB) | Billed cost ($) |
|---|---|---|---|---|---|
| 128 | 1 | 1 | 1965719 | 95.5* | 0.004128 |
| 512 | 10 | 10 | 701119 | 505 | 0.005819 |
| 1024 | 20 | 10 | 366121 | 445 | 0.006114 |
| 1024 | 50 | 20 | 381811 | 882 | 0.006376 |
| 1024 | 100 | 100 | 412897 | 766 | 0.006895 |
| 2048 | 100 | 50 | 210355 | 862 | 0.007005 |
| 2048 | 100 | 100 | 189988 | 709 | 0.006327 |

* The maximum memory used is the average consumed by the 3 consecutive sequential executions.
Let’s comment on the first two execution setups reported in the table; similar considerations can easily be deduced for the others.
The memory for the parallel Lambda function was set to 512MB, while 128MB was allocated for the sequential function. This changes the unitary billing cost per ms, as per AWS Lambda pricing, which on eu-west-1 translates to:
Parallel lambda: $0.0000000083 / ms
Sequential lambda: $0.0000000021 / ms
The parallelism for both the reading and the deleting parts of our handler was set to 10 in the parallel Lambda function. This worked out well for the chosen amount of memory, but the same cannot be said for all the other tested setups (see the table above).
The functions were executed with the same timeout, set to the currently allowed AWS maximum of 15 minutes.
The parallel code successfully deleted all the items from the DynamoDB table in just over 700000ms, or about 12 minutes, while the sequential implementation took three executions. Two of these ran down to the wire until the expiration of the execution timeout, for a billed duration of 2 x 900000ms; the last one completed in 165719ms.
In summary, the total costs for running the parallel and the sequential implementation were:
Parallel: 701119ms*0.0000000083$/ms = $0.005819
Sequential: (1800000+165719)ms*0.0000000021$/ms = $0.004128
Conclusions
In terms of total cost, the sequential implementation turns out to be cheaper by about 29%, although its execution takes about 180% longer than the first parallel case reported above.
The dataset allows us to draw a few more soft conclusions:
- Because DynamoDB is accessed over HTTP, network latency has a substantial impact on the billed duration, which scales with the allocated memory and parallelism only on average.
- The cost incurred when processing sequentially is not matched by any of the parallel tests; however, the sequential execution takes significantly longer than any of the others.
- The execution duration scales down, on average, with the allocated memory and parallelism, and seems more sensitive to the allocated memory, which for Lambda is tied to CPU power rather than to parallelism. The second virtual core assigned to the function at 1.8GB of allocated memory, as informally mentioned in some AWS re:Invent talks, does not seem to bring any additional benefit here.
Getting back to the initial question – whether there is a case for stretching the parallelism of the execution of a function on the Lambda runtime – I think the analysis suggests that there is.
In particular, if the focus is on reducing the execution duration of a stateless application, a Golang implementation seems like a good idea.
Happy coding!