Completed DynamoDB + DAX Benchmarker with a nice TUI to boot

This commit is contained in:
hamilcarBarca17
2023-08-02 18:11:41 -06:00
parent 09862c1b43
commit e42070eefa
55 changed files with 3574 additions and 1 deletions
+245
View File
@@ -0,0 +1,245 @@
package main
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"math/rand"
"os"
"strconv"
"strings"
"time"
"github.com/Dark-Alex-17/dynamodb-benchmarker/pkg/models"
"github.com/Dark-Alex-17/dynamodb-benchmarker/pkg/simulators"
"github.com/Dark-Alex-17/dynamodb-benchmarker/pkg/utils"
"github.com/aws/aws-dax-go/dax"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/elastic/go-elasticsearch/v8"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
var concurrentSimulations, buffer, attributes, duration int
var username, password, index, endpoint, tableName string
var readOnly bool
func main() {
rootCmd := &cobra.Command{
Use: "dax-benchmarker",
Short: "A CLI tool for simulating heavy usage against DAX and publishing metrics to an Elastic Stack for analysis",
RunE: func(cmd *cobra.Command, args []string) error {
if err := validateFlags(); err != nil {
return err
}
execute()
return nil
},
}
rootCmd.PersistentFlags().IntVarP(&concurrentSimulations, "concurrent-simulations", "c", 1000, "The number of concurrent simulations to run")
rootCmd.PersistentFlags().IntVarP(&buffer, "buffer", "b", 500, "The buffer size of the Elasticsearch goroutine's channel")
rootCmd.PersistentFlags().IntVarP(&attributes, "attributes", "a", 5, "The number of attributes to use when populating and querying the DynamoDB table; minimum value of 1")
rootCmd.PersistentFlags().IntVarP(&duration, "duration", "d", 1800, "The length of time (in seconds) to run the benchmark for")
rootCmd.PersistentFlags().StringVarP(&username, "username", "u", "elastic", "Local Elasticsearch cluster username")
rootCmd.PersistentFlags().StringVarP(&password, "password", "p", "changeme", "Local Elasticsearch cluster password")
rootCmd.PersistentFlags().StringVarP(&index, "index", "i", "dax", "The Elasticsearch Index to insert data into")
rootCmd.PersistentFlags().StringVarP(&tableName, "table", "t", fmt.Sprintf("%s-high-velocity-table", os.Getenv("USER")), "The DynamoDB table to perform operations against")
rootCmd.PersistentFlags().StringVarP(&endpoint, "endpoint", "e", "", "The DAX endpoint to hit when running simulations (assumes secure endpoint, so do not specify port)")
rootCmd.PersistentFlags().BoolVarP(&readOnly, "read-only", "r", false, "Whether to run a read-only scenario for benchmarking")
if err := rootCmd.Execute(); err != nil {
log.Errorf("Something went wrong parsing CLI args and executing the client: %v", err)
}
}
func validateFlags() error {
if len(endpoint) == 0 {
daxEndpointEnvironmentVariable := os.Getenv("DAX_ENDPOINT")
if len(daxEndpointEnvironmentVariable) == 0 {
return errors.New("a DAX endpoint must be specified either via -e, --endpoint or via the DAX_ENDPOINT environment variable")
} else {
endpoint = daxEndpointEnvironmentVariable
}
}
if attributes < 1 {
return errors.New("the number of attributes cannot be lower than 1")
}
if len(os.Getenv("AWS_REGION")) == 0 {
return errors.New("an AWS region must be specified using the AWS_REGION environment variable")
}
return nil
}
func execute() {
esChan := make(chan models.DynamoDbSimulationMetrics, buffer)
defer close(esChan)
daxEndpoint := fmt.Sprintf("%s:9111", endpoint)
region := os.Getenv("AWS_REGION")
sess := session.Must(session.NewSession(&aws.Config{
Credentials: credentials.NewChainCredentials([]credentials.Provider{&credentials.EnvProvider{}}),
Endpoint: &daxEndpoint,
Region: &region,
}))
if _, err := sess.Config.Credentials.Get(); err != nil {
log.Errorf("credentials were not loaded! %v+", err)
}
client, err := dax.NewWithSession(*sess)
if err != nil {
log.Errorf("unable to initialize dax client %v", err)
}
partitionKeys, err := scanAllPartitionKeys(client)
if err != nil {
log.Errorf("Unable to fetch partition keys! Simulation failed! %v+", err)
}
go startElasticsearchPublisher(esChan)
for i := 0; i < concurrentSimulations; i++ {
go simulationLoop(esChan, client, partitionKeys)
}
duration, err := time.ParseDuration(strconv.Itoa(duration) + "s")
if err != nil {
log.Errorf("Unable to create duration from the provided time: %v", err)
return
}
<-time.After(duration)
}
func startElasticsearchPublisher(c <-chan models.DynamoDbSimulationMetrics) {
config := elasticsearch.Config{
Addresses: []string{
"http://localhost:9200",
},
Username: username,
Password: password,
}
esClient, err := elasticsearch.NewClient(config)
if err != nil {
log.Errorf("unable to initialize elasticsearch client %v", err)
}
mapping := `{
"properties": {
"timestamp": {
"type": "date"
}
}
}`
log.Infof("Setting the explicit mappings for the %s index", index)
if _, err := esClient.Indices.Create(index); err != nil {
log.Warnf("Unable to create the %s index. Encountered the following error: %v", index, err)
}
if _, err := esClient.Indices.PutMapping([]string{index}, strings.NewReader(mapping)); err != nil {
log.Errorf("unable to create mapping for the %s index! %v+", index, err)
}
for metric := range c {
log.Info("Publishing metrics to Elasticsearch...")
data, _ := json.Marshal(metric)
_, err := esClient.Index(index, bytes.NewReader(data))
if err != nil {
log.Error("Was unable to publish metrics to Elasticsearch! Received a non 2XX response")
} else {
log.Info("Successfully published metrics to Elasticsearch")
}
}
}
func simulationLoop(c chan<- models.DynamoDbSimulationMetrics, client *dax.Dax, partitionKeys []string) {
for {
metrics := new(models.DynamoDbSimulationMetrics)
metrics.Successful = true
metrics.Timestamp = time.Now().UnixNano() / 1e6
startTime := time.Now()
if readOnly {
log.Info("Running a read-only simulation...")
metrics.Scenario = models.ScenarioReadOnly.String()
runReadOnlySimulation(client, metrics, partitionKeys)
} else {
log.Info("Running a CRUD simulation...")
metrics.Scenario = models.ScenarioCrud.String()
runCrudSimulation(client, metrics, partitionKeys)
}
log.Info("Simulation completed successfully!")
duration := time.Since(startTime).Microseconds()
millisecondDuration := float64(duration) / 1000
metrics.SimulationTime = &millisecondDuration
log.Infof("Metrics: %v+", metrics)
c <- *metrics
}
}
func runReadOnlySimulation(client *dax.Dax, metrics *models.DynamoDbSimulationMetrics, partitionKeys []string) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
time.Sleep(time.Duration(r.Intn(16)))
metrics.Operation = models.DynamoRead.String()
simulators.SimulateReadOperation(client, tableName, partitionKeys, metrics)
}
func runCrudSimulation(client *dax.Dax, metrics *models.DynamoDbSimulationMetrics, partitionKeys []string) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
operation := r.Intn(3)
log.Infof("Operation number: %d", operation)
switch operation {
case int(models.DynamoRead):
metrics.Operation = models.DynamoRead.String()
simulators.SimulateReadOperation(client, tableName, partitionKeys, metrics)
case int(models.DynamoWrite):
metrics.Operation = models.DynamoWrite.String()
simulators.SimulateWriteOperation(client, tableName, attributes, metrics)
case int(models.DynamoUpdate):
metrics.Operation = models.DynamoUpdate.String()
simulators.SimulateUpdateOperation(client, tableName, attributes, metrics)
}
}
func scanAllPartitionKeys(client *dax.Dax) ([]string, error) {
log.Info("Fetching a large list of partition keys to randomly read...")
projectionExpression := "id"
var limit int64 = 10000
response, err := client.Scan(&dynamodb.ScanInput{
TableName: &tableName,
Limit: &limit,
ProjectionExpression: &projectionExpression,
})
if err != nil {
log.Errorf("Unable to fetch partition keys! %v", err)
return []string{}, err
} else {
log.Info("Fetched partition keys!")
keys := make([]string, 100)
for _, itemsMap := range response.Items {
keys = append(keys, *utils.MapValues(itemsMap)[0].S)
}
log.Infof("Found a total of %d keys", len(keys))
return keys, nil
}
}
+89
View File
@@ -0,0 +1,89 @@
package models
import (
"fmt"
"math/rand"
"strconv"
"time"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/google/uuid"
"gopkg.in/loremipsum.v1"
)
type DynamoOperation int
const (
DynamoRead DynamoOperation = iota
DynamoWrite
DynamoUpdate
)
func (d DynamoOperation) String() string {
switch d {
case DynamoRead:
return "read"
case DynamoWrite:
return "write"
case DynamoUpdate:
return "update"
default:
return "read"
}
}
type Scenario int
const (
ScenarioCrud Scenario = iota
ScenarioReadOnly
)
func (s Scenario) String() string {
switch s {
case ScenarioCrud:
return "crud"
case ScenarioReadOnly:
return "readOnly"
default:
return "crud"
}
}
type BenchmarkingItem map[string]*dynamodb.AttributeValue
func NewBenchmarkingItem(attributes int) BenchmarkingItem {
benchmarkingItem := make(map[string]*dynamodb.AttributeValue)
r := rand.New(rand.NewSource(time.Now().UnixNano()))
loremIpsumGenerator := loremipsum.NewWithSeed(time.Now().UnixNano())
id := uuid.New().String()
benchmarkingItem["id"] = &dynamodb.AttributeValue{S: &id}
for i := 0; i < attributes; i++ {
switch i % 2 {
case 1:
float := fmt.Sprintf("%.2f", r.Float64()*32.00)
benchmarkingItem[strconv.Itoa(i)] = &dynamodb.AttributeValue{N: &float}
default:
sentence := loremIpsumGenerator.Sentence()
benchmarkingItem[strconv.Itoa(i)] = &dynamodb.AttributeValue{S: &sentence}
}
}
return benchmarkingItem
}
type DynamoDbSimulationMetrics struct {
Operation string `json:"operation"`
Timestamp int64 `json:"timestamp"`
Successful bool `json:"successful"`
Scenario string `json:"scenario"`
SimulationTime *float64 `json:"simulationTime,omitempty"`
ReadTime *float64 `json:"readTime,omitempty"`
WriteTime *float64 `json:"writeTime,omitempty"`
WriteItemConfirmationTime *float64 `json:"writeItemConfirmationTime,omitempty"`
UpdateTime *float64 `json:"updateItem,omitempty"`
UpdateItemConfirmationTime *float64 `json:"updateItemConfirmationTime,omitempty"`
DeleteTime *float64 `json:"deleteTime,omitempty"`
DeleteItemConfirmationTime *float64 `json:"deleteItemConfirmationTime,omitempty"`
}
+110
View File
@@ -0,0 +1,110 @@
package simulators
import (
"time"
"github.com/Dark-Alex-17/dynamodb-benchmarker/pkg/models"
"github.com/aws/aws-dax-go/dax"
"github.com/aws/aws-sdk-go/service/dynamodb"
log "github.com/sirupsen/logrus"
)
func ReadItem(client *dax.Dax, tableName string, id dynamodb.AttributeValue, metrics *models.DynamoDbSimulationMetrics, recordMetrics bool) (dynamodb.GetItemOutput, error) {
partitionKey := *id.S
startTime := time.Now()
response, err := client.GetItem(&dynamodb.GetItemInput{
TableName: &tableName,
Key: map[string]*dynamodb.AttributeValue{
"id": {S: id.S},
},
})
if recordMetrics {
duration := time.Since(startTime).Microseconds()
millisecondDuration := float64(duration) / 1000
metrics.ReadTime = &millisecondDuration
}
if err != nil {
log.Errorf("Could not fetch item with partition key: %v. %v+", partitionKey, err)
metrics.Successful = false
return dynamodb.GetItemOutput{}, err
}
if len(response.Item) == 0 {
log.Infof("No items found with partition key: %v", partitionKey)
return dynamodb.GetItemOutput{}, nil
}
return *response, nil
}
func UpdateItem(client *dax.Dax, tableName string, id dynamodb.AttributeValue, attributes int, metrics *models.DynamoDbSimulationMetrics) {
updatedItem := models.NewBenchmarkingItem(attributes)
updatedItem["id"] = &id
partitionKey := *id.S
startTime := time.Now()
_, err := client.PutItem(&dynamodb.PutItemInput{
TableName: &tableName,
Item: updatedItem,
})
duration := time.Since(startTime).Microseconds()
millisecondDuration := float64(duration) / 1000
metrics.UpdateTime = &millisecondDuration
if err != nil {
log.Errorf("Could not update item with partition key: %v. %v+", partitionKey, err)
metrics.Successful = false
} else {
log.Infof("Successfully updated item with partition key: %v", partitionKey)
}
}
func PutItem(client *dax.Dax, tableName string, attributes int, metrics *models.DynamoDbSimulationMetrics) (models.BenchmarkingItem, error) {
newItem := models.NewBenchmarkingItem(attributes)
partitionKey := *newItem["id"].S
startTime := time.Now()
_, err := client.PutItem(&dynamodb.PutItemInput{
TableName: &tableName,
Item: newItem,
})
duration := time.Since(startTime).Microseconds()
millisecondDuration := float64(duration) / 1000
metrics.WriteTime = &millisecondDuration
if err != nil {
log.Errorf("Could not put new item with partition key: %v. %v+", partitionKey, err)
metrics.Successful = false
return models.BenchmarkingItem{}, err
}
log.Infof("Successfully put new item with partition key: %v", partitionKey)
return newItem, nil
}
func DeleteItem(client *dax.Dax, tableName string, id dynamodb.AttributeValue, metrics *models.DynamoDbSimulationMetrics) {
partitionKey := *id.S
startTime := time.Now()
_, err := client.DeleteItem(&dynamodb.DeleteItemInput{
TableName: &tableName,
Key: map[string]*dynamodb.AttributeValue{
"charger_id": &id,
},
})
duration := time.Since(startTime).Microseconds()
millisecondDuration := float64(duration) / 1000
metrics.DeleteTime = &millisecondDuration
if err != nil {
log.Errorf("Could not delete item with partition key: %v. %v+", partitionKey, err)
metrics.Successful = false
} else {
log.Infof("Successfully deleted item with partition key: %v", partitionKey)
}
}
+111
View File
@@ -0,0 +1,111 @@
package simulators
import (
"math/rand"
"strings"
"time"
"github.com/Dark-Alex-17/dynamodb-benchmarker/pkg/models"
"github.com/aws/aws-dax-go/dax"
"github.com/aws/aws-sdk-go/service/dynamodb"
log "github.com/sirupsen/logrus"
)
func SimulateReadOperation(client *dax.Dax, tableName string, partitionKeys []string, metrics *models.DynamoDbSimulationMetrics) {
log.Info("Performing READ operation...")
r := rand.New(rand.NewSource(time.Now().UnixNano()))
var partitionKey string
for {
partitionKey = partitionKeys[r.Intn(len(partitionKeys))]
if len(strings.TrimSpace(partitionKey)) == 0 {
log.Info("Parition key was empty. Trying again to choose a non-empty partition key")
} else {
break
}
}
id := dynamodb.AttributeValue{S: &partitionKey}
for i := 0; i < 10; i++ {
log.Infof("Attempt %d: Fetching existing item with partition key: %v", i, partitionKey)
response, _ := ReadItem(client, tableName, id, metrics, true)
if response.Item["id"] != nil {
log.Infof("Successfully read existing item with partition key: %v", partitionKey)
break
}
log.Errorf("Unable to find existing item with partition key: %v", partitionKey)
if i == 9 {
log.Errorf("All attempts to fetch the existing item with partition key: %v failed!", partitionKey)
metrics.Successful = false
}
}
}
func SimulateWriteOperation(client *dax.Dax, tableName string, attributes int, metrics *models.DynamoDbSimulationMetrics) {
log.Info("Performing WRITE operation...")
benchmarkingItem, err := PutItem(client, tableName, attributes, metrics)
if err != nil {
log.Errorf("Unable to complete PUT simulation. %v+", err)
metrics.Successful = false
return
}
id := *benchmarkingItem["id"]
AssertItemWasCreated(client, tableName, id, metrics)
DeleteItem(client, tableName, id, metrics)
AssertItemWasDeleted(client, tableName, id, metrics)
}
func SimulateUpdateOperation(client *dax.Dax, tableName string, attributes int, metrics *models.DynamoDbSimulationMetrics) {
log.Info("Performing UPDATE operation...")
newItem, err := PutItem(client, tableName, attributes, metrics)
if err != nil {
log.Errorf("Unable to complete UPDATE simulation. %v+", err)
metrics.Successful = false
return
}
id := *newItem["id"]
partitionKey := *id.S
attemptsExhausted := false
AssertItemWasCreated(client, tableName, id, metrics)
UpdateItem(client, tableName, id, attributes, metrics)
startTime := time.Now()
for i := 0; i < 10; i++ {
log.Infof("Attempt %d: Fetching updated item for partition key: %v...", i, partitionKey)
updatedItem, err := ReadItem(client, tableName, id, metrics, false)
if err != nil {
log.Errorf("Unable to complete UPDATE simulation. %v+", err)
metrics.Successful = false
return
}
if *newItem["1"].N != *updatedItem.Item["1"].N {
log.Infof("Confirmed update for partition key: %v", partitionKey)
break
} else {
log.Errorf("Update for partition key %v failed! Values are still equal!", partitionKey)
if i == 9 {
log.Error("Exhausted attempts to fetch updated item!")
metrics.Successful = false
attemptsExhausted = true
}
}
}
if !attemptsExhausted {
duration := time.Since(startTime).Microseconds()
millisecondDuration := float64(duration) / 1000
metrics.UpdateItemConfirmationTime = &millisecondDuration
}
DeleteItem(client, tableName, id, metrics)
AssertItemWasDeleted(client, tableName, id, metrics)
}
+69
View File
@@ -0,0 +1,69 @@
package simulators
import (
"time"
"github.com/Dark-Alex-17/dynamodb-benchmarker/pkg/models"
"github.com/aws/aws-dax-go/dax"
"github.com/aws/aws-sdk-go/service/dynamodb"
log "github.com/sirupsen/logrus"
)
func AssertItemWasCreated(client *dax.Dax, tableName string, id dynamodb.AttributeValue, metrics *models.DynamoDbSimulationMetrics) {
partitionKey := *id.S
attemptsExhausted := false
startTime := time.Now()
for i := 0; i < 10; i++ {
log.Infof("Attempt %d: Fetching newly added item with partition key: %v", i, partitionKey)
newItem, err := ReadItem(client, tableName, id, metrics, false)
if err != nil || newItem.Item["id"].S == nil {
log.Errorf("Unable to find new item with partition key: %v", partitionKey)
if i == 9 {
log.Errorf("All attempts to fetch the newly added item with partition key: %v failed!", partitionKey)
attemptsExhausted = true
metrics.Successful = false
}
} else {
log.Infof("Successfully read new item with partition key: %v", partitionKey)
break
}
}
if !attemptsExhausted {
duration := time.Since(startTime).Microseconds()
millisecondDuration := float64(duration) / 1000
metrics.WriteItemConfirmationTime = &millisecondDuration
}
}
func AssertItemWasDeleted(client *dax.Dax, tableName string, id dynamodb.AttributeValue, metrics *models.DynamoDbSimulationMetrics) {
partitionKey := *id.S
attemptsExhausted := false
startTime := time.Now()
for i := 0; i < 10; i++ {
log.Infof("Attempt %d: Fetching deleted item with partition key: %v ...", i, partitionKey)
deletedItem, _ := ReadItem(client, tableName, id, metrics, false)
if deletedItem.Item["id"].S == nil {
log.Infof("Item with partition key: %v was successfully deleted.", partitionKey)
break
} else {
log.Errorf("Item with partition key %v was not deleted as expected!", partitionKey)
if i == 9 {
log.Errorf("All attempts to receive an empty response to verify item with partition key: %v was deleted failed!", partitionKey)
attemptsExhausted = true
metrics.Successful = false
}
}
}
if !attemptsExhausted {
duration := time.Since(startTime).Microseconds()
millisecondDuration := float64(duration) / 1000
metrics.DeleteItemConfirmationTime = &millisecondDuration
}
}
+11
View File
@@ -0,0 +1,11 @@
package utils
func MapValues[K comparable, V any](inputMap map[K]V) []V {
valuesSlice := make([]V, 0)
for _, value := range inputMap {
valuesSlice = append(valuesSlice, value)
}
return valuesSlice
}