DynamoDB-based DR sample

This document explains how DynamoDB can be used to implement disaster recovery (DR) in Admiral. The active and passive Admiral instances each try to obtain a lease on a lock object maintained in a DynamoDB table. DynamoDB supports global tables, which replicate the lock object between the regions where Admiral is hosted. Writes are supported in multiple regions.

Admiral state changes

  • An Admiral instance is promoted to the Active state when it obtains a lease on the lock object.
  • Once the lease is obtained, the Active Admiral instance renews the lease every x seconds.
  • An Admiral instance in the Passive state checks every x seconds whether the lease has been updated. If the lease has not been updated for more than y*x seconds, where y is the failure threshold, the Passive instance takes over as the Active instance.
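
To make the timing concrete, here is a minimal sketch of when a passive instance first sees a stale lease. The function name firstTakeoverCheck is hypothetical, and the sketch assumes, purely for illustration, that the passive instance's checks land exactly on multiples of x seconds after the last renewal. It uses the same strict comparison as the sample code later in this document.

```go
package main

import "fmt"

// firstTakeoverCheck returns how many seconds after the last lease renewal
// the first passive check finds the lease stale.
// Assumption (illustration only): passive checks happen exactly at
// x, 2x, 3x, ... seconds after the last renewal.
// It returns -1 when intervalSecs is not positive.
func firstTakeoverCheck(intervalSecs, failureThreshold int64) int64 {
	if intervalSecs <= 0 {
		return -1
	}
	staleAfter := intervalSecs * failureThreshold // y*x
	for elapsed := intervalSecs; ; elapsed += intervalSecs {
		// Strict comparison: a lease exactly y*x seconds old is still honoured.
		if elapsed > staleAfter {
			return elapsed
		}
	}
}

func main() {
	// x = 15 seconds, y = 3: the lease is stale once it is older than 45 seconds.
	fmt.Println(firstTakeoverCheck(15, 3))
}
```

Under this assumption, takeover happens at the first check after the y*x window has passed, so failover takes between y*x and (y+1)*x seconds from the last successful renewal.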

Sample Code

Define the structs needed to unmarshal the DynamoDB config from the YAML file

package clusters

type DynamoDBConfigWrapper struct {
	DynamoDBConfig DynamoDBConfig `yaml:"dynamoDB,omitempty"`
}

// Reference struct used to unmarshal the DynamoDB config present in the YAML config file
type DynamoDBConfig struct {
	LeaseName         string `yaml:"leaseName,omitempty"`
	PodIdentifier     string `yaml:"podIdentifier,omitempty"`
	WaitTimeInSeconds int    `yaml:"waitTimeInSeconds,omitempty"`
	FailureThreshold  int    `yaml:"failureThreshold,omitempty"`
	TableName         string `yaml:"tableName,omitempty"`
	Role              string `yaml:"role,omitempty"`
	Region            string `yaml:"region,omitempty"`
}

Define a new struct for the DynamoDB-based DR implementation and implement the required methods of the AdmiralStateChecker interface

package clusters

import (
	log ""

// The skip lease pod can be used for testing DynamoDB-based DR.
// Update the podname field to "SKIP-LEASE-POD" to test Admiral pods in passive mode.
const SKIP_LEASE_CHECK_POD_NAME = "SKIP-LEASE-POD"

type DynamoDBBasedStateChecker struct {
	drConfigFileLocation string
}

func (DynamoDBBasedStateChecker) shouldRunOnIndependentGoRoutine() bool {
	return true
}

/*
This method updates the ReadOnly field of the AdmiralState object based on the lease obtained on the shared lock object.
The AdmiralState object is checked everywhere in the code before creating, updating, or deleting Istio custom objects.

Logic for an Admiral instance in the Active state:
1. Get the latest lease information from the DynamoDB table.
2. If the current pod owns the lease, update the last updated field with the current timestamp.
3. Update the ReadOnly field to false.
4. Sleep for the configured duration.
5. The Active Admiral instance constantly monitors all the clusters for changes and is responsible for creating, updating, and deleting Istio custom objects
such as Service Entry, Destination Rule, Virtual Service, Sidecar, and others.

Logic for an Admiral instance in the Passive state:
1. Get the latest lease information from the DynamoDB table.
2. If the current pod does not own the lease, check whether the last updated time is within the configured wait threshold.
3. If the last updated time is older than the computed threshold, set the current pod as the owner of the lease, with the current timestamp as the last updated time.
4. If the last updated time is within the computed threshold, mark the current pod as read-only.
5. Sleep for the configured duration.
*/
func (dr DynamoDBBasedStateChecker) runStateCheck(ctx context.Context) {
	// Start in read-only mode until this pod is confirmed to hold the lease
	CurrentAdmiralState.ReadOnly = ReadOnlyEnabled
	var dynamodbClient *DynamoClient
	dynamoDBConfig, err := BuildDynamoDBConfig(dr.drConfigFileLocation)
	if nil != err {
		log.Error("DynamoDR: Could not start DynamoDBBasedStateChecker ", err)
		panic("Could not start DynamoDBBasedStateChecker")
	}
	dynamodbClient = NewDynamoClient(dynamoDBConfig)
	waitDuration := time.Duration(dynamoDBConfig.WaitTimeInSeconds) * time.Second
	ticker := time.NewTicker(waitDuration)
	tickChan := ticker.C
	// Call ExecuteStateCheck explicitly to speed up initialization. Without this, initialization is delayed by waitDuration
	ExecuteStateCheck(dynamoDBConfig, dynamodbClient)
	for {
		select {
		case <-ctx.Done():
			log.Infoln("DynamoDR: context done stopping ticker")
			ticker.Stop()
			return
		case <-tickChan:
			ExecuteStateCheck(dynamoDBConfig, dynamodbClient)
		}
	}
}
func ExecuteStateCheck(dynamoDBConfig DynamoDBConfig, dynamodbClient *DynamoClient) {
	leaseName := dynamoDBConfig.LeaseName
	podIdentifier := dynamoDBConfig.PodIdentifier
	waitTimeInSeconds := dynamoDBConfig.WaitTimeInSeconds
	failureThreshold := dynamoDBConfig.FailureThreshold
	log.Infof("DynamoDR: CurrentPod = %v LeaseName = %v WaitTime= %v sec tableName= %v role= %v region= %v", podIdentifier, leaseName, waitTimeInSeconds, dynamoDBConfig.TableName, dynamoDBConfig.Role, dynamoDBConfig.Region)

	currentTime := time.Now().UTC().Unix()
	log.Infof("DynamoDR: Retrieving latest value of read write value for leaseName : %v , timestamp : %v ", leaseName, currentTime)
	readWriteLeases, err := dynamodbClient.getReadWriteLease()
	if nil != err {
		log.WithFields(log.Fields{
			"error": err.Error(),
		}).Error("DynamoDR: Error retrieving the latest lease")
		// Transition Admiral to read-only mode in case of an issue connecting to DynamoDB
		CurrentAdmiralState.ReadOnly = ReadOnlyEnabled
		log.Error("DynamoDR: Error retrieving the latest lease. Admiral will not write")
		return
	}
	readWriteLease := filterOrCreateLeaseIfNotFound(readWriteLeases, leaseName)
	if "" == readWriteLease.LeaseOwner {
		log.Infof("DynamoDR: Lease with name=%v does not exist. Creating a new lease with owner=%v", leaseName, podIdentifier)
		readWriteLease.LeaseOwner = podIdentifier
		readWriteLease.UpdatedTime = currentTime
		dynamodbClient.updatedReadWriteLease(readWriteLease, dynamoDBConfig.TableName)
		// Not updating read-write mode until we confirm this pod has the lease
	} else if SKIP_LEASE_CHECK_POD_NAME == readWriteLease.LeaseOwner {
		log.Info("DynamoDR: Lease held by skip lease check pod. Setting Admiral to read only mode")
		CurrentAdmiralState.ReadOnly = ReadOnlyEnabled
		CurrentAdmiralState.IsStateInitialized = StateInitialized
	} else if podIdentifier == readWriteLease.LeaseOwner {
		CurrentAdmiralState.ReadOnly = ReadWriteEnabled
		CurrentAdmiralState.IsStateInitialized = StateInitialized
		log.Infof("DynamoDR: Lease with name=%v is owned by the current pod. Extending lease ownership till %v. Admiral will write", leaseName, currentTime)
		readWriteLease.UpdatedTime = currentTime
		dynamodbClient.updatedReadWriteLease(readWriteLease, dynamoDBConfig.TableName)
	} else if readWriteLease.UpdatedTime < (currentTime - int64(waitTimeInSeconds*failureThreshold)) {
		diffSecs := currentTime - readWriteLease.UpdatedTime
		log.Infof("DynamoDR: Current time %v is more than the lastUpdated time of lease %v by %v sec. Taking over the lease from %v.", currentTime, readWriteLease.UpdatedTime, diffSecs, readWriteLease.LeaseOwner)
		readWriteLease.LeaseOwner = podIdentifier
		readWriteLease.UpdatedTime = currentTime
		dynamodbClient.updatedReadWriteLease(readWriteLease, dynamoDBConfig.TableName)
		// Not updating read-write mode until we confirm this pod has the lease
	} else {
		log.Infof("DynamoDR: Lease held by %v till %v . Admiral will not write ", readWriteLease.LeaseOwner, readWriteLease.UpdatedTime)
		CurrentAdmiralState.ReadOnly = ReadOnlyEnabled
		CurrentAdmiralState.IsStateInitialized = StateInitialized
	}
}

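The branch order in ExecuteStateCheck can be easier to reason about, and to unit test without DynamoDB, when it is expressed as a pure function. The sketch below is one possible way to do that and is not part of Admiral: the Action type, its values, and the decide function are hypothetical names introduced for illustration. It mirrors the four cases above: no owner, skip-lease owner, self-owned, and stale lease held by another pod.

```go
package main

import "fmt"

// Action is a hypothetical label for what a pod does after reading the lease.
type Action string

const (
	CreateLease  Action = "create-lease"   // no owner yet: claim the lease, stay read-only for now
	RenewLease   Action = "renew-lease"    // this pod owns the lease: extend it and allow writes
	TakeOver     Action = "take-over"      // another pod's lease is stale: claim it, stay read-only for now
	StayReadOnly Action = "stay-read-only" // lease is held by someone else (or by the skip-lease pod)
)

// skipLeasePod matches the "SKIP-LEASE-POD" value described in this document.
const skipLeasePod = "SKIP-LEASE-POD"

// decide reproduces the branch order of the sample state check as a pure function.
func decide(owner string, updatedTime int64, pod string, now int64, waitSecs, failureThreshold int) Action {
	switch {
	case owner == "":
		return CreateLease
	case owner == skipLeasePod:
		return StayReadOnly
	case owner == pod:
		return RenewLease
	case updatedTime < now-int64(waitSecs*failureThreshold):
		return TakeOver
	default:
		return StayReadOnly
	}
}

func main() {
	now := int64(1000)
	fmt.Println(decide("", 0, "pod-a", now, 15, 3))
	fmt.Println(decide("pod-a", 990, "pod-a", now, 15, 3))
	fmt.Println(decide("pod-b", 954, "pod-a", now, 15, 3))
	fmt.Println(decide("pod-b", 955, "pod-a", now, 15, 3))
}
```

Note that in the create and take-over cases the pod only writes its claim to the table. It switches to read-write on a later check, once it reads the lease back and finds itself as the owner.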
Utility methods needed to communicate with DynamoDB

package clusters

import (
	log ""

// Utility function to block the goroutine for the specified duration
func sleep(sleepDuration time.Duration, sleepSeconds int) {
	log.Info("Sleeping for ", sleepSeconds, " seconds")
	time.Sleep(sleepDuration)
}

// Utility function to filter the requested lease from all the leases returned from DynamoDB.
// The DynamoDB table may be used for multiple environments.
func filterOrCreateLeaseIfNotFound(allLeases []ReadWriteLease, leaseName string) ReadWriteLease {
	for _, readWriteLease := range allLeases {
		if readWriteLease.LeaseName == leaseName {
			return readWriteLease
		}
	}
	// No matching lease found: return a new lease with an empty owner
	readWriteLease := ReadWriteLease{}
	readWriteLease.LeaseName = leaseName
	readWriteLease.Notes = "Created at " + strconv.FormatInt(time.Now().UTC().Unix(), 10)
	return readWriteLease
}

type ReadWriteLease struct {
	LeaseName   string `json:"leaseName"`
	LeaseOwner  string `json:"leaseOwner"`
	UpdatedTime int64  `json:"updatedTime"`
	Notes       string `json:"notes"`
}

type DynamoClient struct {
	svc dynamodbiface.DynamoDBAPI
}

func NewDynamoClient(dynamoDBConfig DynamoDBConfig) *DynamoClient {
	return &DynamoClient{
		svc: GetDynamoSvc(dynamoDBConfig.Role, dynamoDBConfig.Region),
	}
}

// Utility function to update the lease.
// The Active instance calls this at the configured interval.
// A Passive instance calls this when it finds that the existing Active instance has not updated the lease within the specified duration.
func (client *DynamoClient) updatedReadWriteLease(lease ReadWriteLease, tableName string) error {
	svc := client.svc
	av, err := dynamodbattribute.MarshalMap(lease)
	if err != nil {
		log.WithFields(log.Fields{
			"error": err.Error(),
		}).Error("Error marshalling readWriteLease item.")
		return err
	}

	input := &dynamodb.PutItemInput{
		Item:      av,
		TableName: aws.String(tableName),
	}
	_, err = svc.PutItem(input)
	if err != nil {
		log.WithFields(log.Fields{
			"error": err.Error(),
		}).Error("Got error calling PutItem:")
		return err
	}
	log.WithFields(log.Fields{
		"leaseName":   lease.LeaseName,
		"leaseOwner":  lease.LeaseOwner,
		"updatedTime": lease.UpdatedTime,
		"notes":       lease.Notes,
	}).Info("Successfully added item to table " + tableName)

	return nil
}

// Utility function to get all the entries from the DynamoDB table
func (client *DynamoClient) getReadWriteLease() ([]ReadWriteLease, error) {
	var readWriteLeases []ReadWriteLease
	svc := client.svc
	log.Info("Fetching existing readWrite entries...")
	// The table name is hard-coded here, so tableName in the config file should match it
	readWriteLeaseEntries, err := svc.Scan(&dynamodb.ScanInput{
		TableName: aws.String("admiral-lease"),
	})
	if err != nil {
		log.WithFields(log.Fields{
			"error": err.Error(),
		}).Error("Failed to scan dynamo table")
		return nil, err
	}

	log.WithFields(log.Fields{
		"readWriteLeaseEntries": readWriteLeaseEntries,
	}).Debug("retrieved records...")

	for _, v := range readWriteLeaseEntries.Items {
		// Use a fresh item per record so values from a previous record cannot leak into this one
		item := ReadWriteLease{}
		err = dynamodbattribute.UnmarshalMap(v, &item)
		if err != nil {
			log.WithFields(log.Fields{
				"error": err.Error(),
			}).Panic("Failed to unmarshall record")
		}
		readWriteLeases = append(readWriteLeases, item)
	}
	return readWriteLeases, nil
}

// Utility function to initialize the AWS session for the DynamoDB connection
func GetDynamoSvc(dynamoArn string, region string) *dynamodb.DynamoDB {
	log.Info("dynamoArn: " + dynamoArn)
	sess := session.Must(session.NewSession())
	// Create the credentials from AssumeRoleProvider to assume the role
	// referenced by the dynamoArn ARN.
	creds := stscreds.NewCredentials(sess, dynamoArn)
	// Create a Session with a custom region
	dynamoSession := session.Must(session.NewSession(&aws.Config{
		Credentials: creds,
		Region:      &region,
	}))
	// Create service client value configured for credentials
	// from assumed role.
	svc := dynamodb.New(dynamoSession)
	return svc
}

// Utility function to read the YAML file containing the DynamoDB configuration.
// The file will be present inside the pod. The file name should be provided as a program argument.
func BuildDynamoDBConfig(configFile string) (DynamoDBConfig, error) {
	data, err := ioutil.ReadFile(configFile)
	if err != nil {
		return DynamoDBConfig{}, fmt.Errorf("error reading config file to build Dynamo DB config: %v", err)
	}

	dynamoDBConfigWrapper := &DynamoDBConfigWrapper{}
	err = yaml.Unmarshal(data, dynamoDBConfigWrapper)
	if err != nil {
		return DynamoDBConfig{}, fmt.Errorf("error unmarshaling config file err: %v", err)
	}

	return dynamoDBConfigWrapper.DynamoDBConfig, nil
}

Add DynamoDBBasedStateChecker as an option in startAdmiralStateChecker function

// Utility function to identify the Admiral DR implementation based on the program parameters
func startAdmiralStateChecker(ctx context.Context, params common.AdmiralParams) {
	var admiralStateChecker AdmiralStateChecker
	admiralStateCheckerName := strings.ToLower(params.AdmiralStateCheckerName)
	switch admiralStateCheckerName {
	/*
	   Add entries for your custom Disaster Recovery state checkers below
	   case "keywordforsomecustomchecker":
	   	admiralStateChecker = customChecker{}
	*/
	case "dynamodbbasedstatechecker":
		admiralStateChecker = DynamoDBBasedStateChecker{params.DRStateStoreConfigPath}
	default:
		admiralStateChecker = NoOPStateChecker{}
	}
	// The selected state checker is started after this point; that call is not shown in this sample.
}

Configuration changes

Create a DynamoDB table with the specifications below. To span your DR regions, create it as a global table by following the AWS instructions for DynamoDB global tables.

Table name: admiral-lease

Attribute name   Type     Key
leaseName        String   Partition key
leaseOwner       String
notes            String
updatedTime      Number

Below is a sample config file that can be used to configure the DynamoDB connection settings. It can be added as a ConfigMap and made available to the Admiral pod through a volume mount.

dynamoDB:
  leaseName: mylease
  podIdentifier: myname
  waitTimeInSeconds: 15
  failureThreshold: 3
  tableName: admiral-lease
  role: arn:aws:iam::<awsaccountno>:role/<rolename>
  region: us-west-2
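
Because the sample state checker passes waitTimeInSeconds to time.NewTicker, which panics on a non-positive duration, it can help to validate the loaded settings before starting. The following is a minimal sketch of such a check and is not part of Admiral: the leaseSettings type and validate function are hypothetical, and treating a non-positive failureThreshold as an error is an assumption made here (a zero threshold would make any lease look stale immediately).

```go
package main

import (
	"errors"
	"fmt"
)

// leaseSettings is a hypothetical stand-in for the fields of the sample
// config that this check inspects.
type leaseSettings struct {
	LeaseName         string
	PodIdentifier     string
	TableName         string
	WaitTimeInSeconds int
	FailureThreshold  int
}

// validate returns an error describing the first problem found, or nil.
func validate(s leaseSettings) error {
	if s.LeaseName == "" || s.PodIdentifier == "" || s.TableName == "" {
		return errors.New("leaseName, podIdentifier and tableName must be set")
	}
	if s.WaitTimeInSeconds <= 0 {
		// time.NewTicker panics when given a non-positive duration.
		return fmt.Errorf("waitTimeInSeconds must be positive, got %d", s.WaitTimeInSeconds)
	}
	if s.FailureThreshold <= 0 {
		// Assumption: a non-positive threshold is treated as a misconfiguration.
		return fmt.Errorf("failureThreshold must be positive, got %d", s.FailureThreshold)
	}
	return nil
}

func main() {
	sample := leaseSettings{
		LeaseName:         "mylease",
		PodIdentifier:     "myname",
		TableName:         "admiral-lease",
		WaitTimeInSeconds: 15,
		FailureThreshold:  3,
	}
	fmt.Println(validate(sample))

	sample.WaitTimeInSeconds = 0
	fmt.Println(validate(sample))
}
```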

Include the program arguments below during Admiral startup

--admiral_state_checker_name dynamodbbasedstatechecker
--dr_state_store_config_path <some file location inside Admiral pod>
--se_ip_prefix 242.0 

Use a different se_ip_prefix value for each Admiral deployment. This ensures that the same IP address is not used in different service entries, which would cause collisions in Envoy clusters and result in failed config pushes from istiod.