AWS Kinesis
This tutorial will demonstrate how to use a AWS Kinesis trigger to invoke a function. We’ll assume you have Fission and Kubernetes installed. If not, please head over to the install guide.
You will also need AWS Kinesis setup which is reachable from the Fission Kubernetes cluster.
Installation
If you want to setup Kinesis on the Kubernetes cluster, you can use the information here or you can create streams using your aws account docs.
Also note that, if you are using localstack then it is only good for testing and dev environments and not for production usage.
Overview
Before we dive into details, let’s walk through overall flow of event and functions involved.
- A Go producer function (producerfunc) or aws cli command which acts as a producer and drops a message in a Kinesis stream named
request
. - Fission Kinesis trigger activates and invokes another function (consumerfunc) with body of Kinesis message.
- The consumer function (consumerfunc) gets body of message and returns a response.
- Fission Kinesis trigger takes the response of consumer function (consumerfunc) and drops the message in a response stream named
response
. If there is an error, the message is dropped in error stream namederror
.
When communicating to localstack we need aws cli installed in the respactive container(deployment). This is because it uses aws configuration to connect to localstack. Below are the command to create and send the message to a stream
$ aws kinesis create-stream --shard-count 2 --stream-name request $ aws kinesis create-stream --shard-count 1 --stream-name response $ aws kinesis create-stream --shard-count 1 --stream-name error $ aws kinesis list-streams $ aws kinesis put-record --stream-name request --partition-key 111 --data 'Test Message'
Building the app
Producer Function
The producer function is a go program which creates a message with timestamp and drops into a kinesis stream request
.
For brevity all values have been hard coded in the code itself.
package main
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type message struct {
Content string `json:"content"`
}
func Handler(w http.ResponseWriter, r *http.Request) {
region := "ap-south-1"
config := &aws.Config{
Region: ®ion,
Credentials: credentials.NewStaticCredentials("xxxxxxxxxxxx", "xxxxxxxxxx", ""),
}
s, err := session.NewSession(config)
if err != nil {
w.Write([]byte(fmt.Sprintf("error creating a session: %v", err)))
return
}
kc := kinesis.New(s)
for i := 11; i <= 20; i++ {
record, err := json.Marshal(&message{
Content: fmt.Sprintf("message count %v", i+1),
})
if err != nil {
w.Write([]byte(fmt.Sprintf("error marshaling the message: %v", err)))
return
}
params := &kinesis.PutRecordInput{
Data: record, // required
PartitionKey: aws.String(strconv.Itoa(i)), // required
StreamName: aws.String("request"), // required
}
_, err = kc.PutRecord(params)
if err != nil {
w.Write([]byte(fmt.Sprintf("error putting a record: %v", err)))
return
}
}
w.Write([]byte("messages sent successfully"))
}
Since the go program uses Kinesis stream, we need to create the request stream to run the above program.
We are now ready to package this code and create a function so that we can execute it later. Following commands will create a environment, package and function. Verify that build for package succeeded before proceeding.
$ fission env create --name goenv --image fission/go-env --builder fission/go-builder
$ zip -qr kinesis.zip *
$ fission package create --env goenv --src kinesis.zip
Package 'kinesis-zip-cy16' created
$ fission fn create --name producerfunc --env goenv --pkg kinesis-zip-cy16 --entrypoint Handler
$ fission package info --name kinesis-zip-cy16
Name: kinesis-zip-cy16
Environment: goenv
Status: succeeded
Build Logs:
Building in directory /usr/src/kinesis-zip-cy16-o3vrx1
Consumer function
The consumer function is nodejs function which takes the body of the request, appends a “Hello” and returns the resulting string.
module.exports = async function (context) {
console.log(context.request.body);
let obj = context.request.body;
return {
status: 200,
body: "Hello "+ JSON.stringify(obj)
};
}
Let’s create the environment and function:
fission env create --name nodeenv --image fission/node-env
fission fn create --name consumerfunc --env nodeenv --code hellokinesis.js
Connecting via trigger
We have both the functions ready but the connection between them is the missing glue.
Let’s create a message queue trigger which will invoke the consumerfunc every time there is a message in request
stream.
The response will be sent to response
stream and in case of consumerfunc invocation fails, the error is written to error
stream.
fission mqt create --name kinesisdeployment --function helloworld --mqtype aws-kinesis-stream --topic request --resptopic response --mqtkind keda --errortopic error --maxretries 3 --metadata streamName=request --metadata shardCount=2 --metadata awsRegion=ap-south-1 --secret awsSecrets
Parameter list:
- streamName - Name of AWS Kinesis Stream
- awsRegion - AWS Region for the Kinesis Stream
- shardCount - The target value that a Kinesis data streams consumer can handle.
- secret - AWS credentials require to connect the stream e.g. below
if we are using localstack we don’t have to give secret but if we are using aws to create kinesis stream we need to provide the secret, below is the example to create secret
kubectl create secret generic awsSecrets --from-env-file=./secret.yaml
and secret.yaml file should contain values like
AWS_ACCESS_KEY_ID=foo AWS_SECRET_ACCESS_KEY=bar
Testing it out
Let’s invoke the producer function so that the stream request
gets some messages and we can see the consumer function in action.
$ fission fn test --name
Successfully sent to input
There are a couple of ways you can verify that the consumerfunc is called:
- Check the logs of
mqtrigger-kinesis
pods:
{"level":"info","ts":1603106377.3910372,"caller":"aws-kinesis-http-connector/main.go:212","msg":"done processing message","shardID":"shardId-000000000001","message":"Hello Msg 112"}
{"level":"info","ts":1603106377.3916092,"caller":"aws-kinesis-http-connector/main.go:212","msg":"done processing message","shardID":"shardId-000000000000","message":"Hello Msg 111"}
- Go to aws Kinesis stream and check if messages are coming in response stream
Introducing an error
Let’s introduce an error scenario - instead of consumer function returning a 200, you can return 400 which will cause an error:
module.exports = async function (context) {
console.log(context.request.body);
let obj = context.request.body;
return {
status: 400,
body: "Hello "+ JSON.stringify(obj)
};
}
Update the function with new code and invoke the producer function:
$ fission fn update --name consumerfunc --code hellokinesis.js
$ fission fn test --name producerfunc
Successfully sent to input
We can verify the message in error stream as we did earlier:
- Go to aws Kinesis stream and check if messages are coming in error stream