1
0
Fork 0
cmpt815perf/main.go

351 lines
9.7 KiB
Go

package main
import (
"context"
"embed"
"encoding/json"
"fmt"
"html/template"
"io"
"log"
"net/http"
"os"
"strconv"
"sync"
"time"
"atri.dad/distributedperf/lib"
"github.com/joho/godotenv"
)
//go:embed static
var static embed.FS // Embedded filesystem for static web assets
// Global storage interfaces for database and cache access
var (
db *lib.TursoStorage
cache *lib.RedisStorage
)
// Thread-safe performance counters for monitoring system behavior
var (
cumulativeCacheHits int64
cumulativeCacheMisses int64
cumulativeRowsRead int64
cumulativeRowsWritten int64
counterMutex sync.RWMutex
)
// resetCounters safely zeroes all performance counters and logs the before/after values.
// This is typically called when clearing all data or starting a new test run.
func resetCounters() {
counterMutex.Lock()
defer counterMutex.Unlock()
// Log current values before reset for historical reference
log.Printf("Resetting counters - Current values: hits=%d, misses=%d, reads=%d, writes=%d",
cumulativeCacheHits, cumulativeCacheMisses, cumulativeRowsRead, cumulativeRowsWritten)
// Zero all counters atomically
cumulativeCacheHits = 0
cumulativeCacheMisses = 0
cumulativeRowsRead = 0
cumulativeRowsWritten = 0
// Confirm reset was successful
log.Printf("Counters after reset: hits=%d, misses=%d, reads=%d, writes=%d",
cumulativeCacheHits, cumulativeCacheMisses, cumulativeRowsRead, cumulativeRowsWritten)
}
// handleRequest processes GET and POST requests for test data using a cache-aside pattern.
// GET requests attempt cache retrieval before falling back to database.
// POST requests invalidate cache and write directly to database.
func handleRequest(w http.ResponseWriter, r *http.Request) {
requestStart := time.Now()
var data *lib.TestData
var err error
var dbTime, cacheTime time.Duration
log.Printf("Starting %s request", r.Method)
switch r.Method {
case http.MethodGet:
// Try cache first for better performance
cacheStart := time.Now()
data, err = cache.GetTestData(r.Context())
cacheTime = time.Since(cacheStart)
cacheHit := (err == nil && data != nil)
if cacheHit {
// Update cache hit statistics
counterMutex.Lock()
log.Printf("Before cache hit increment: hits=%d", cumulativeCacheHits)
cumulativeCacheHits++
log.Printf("After cache hit increment: hits=%d", cumulativeCacheHits)
counterMutex.Unlock()
log.Printf("Cache HIT - total hits now: %d", cumulativeCacheHits)
} else {
// Handle cache miss - fallback to database
counterMutex.Lock()
log.Printf("Before cache miss increment: misses=%d", cumulativeCacheMisses)
cumulativeCacheMisses++
log.Printf("After cache miss increment: misses=%d", cumulativeCacheMisses)
counterMutex.Unlock()
log.Printf("Cache MISS - total misses now: %d", cumulativeCacheMisses)
// Retrieve from database
dbStart := time.Now()
data, err = db.GetTestData(r.Context())
dbTime = time.Since(dbStart)
if err == nil && data != nil {
counterMutex.Lock()
cumulativeRowsRead++
counterMutex.Unlock()
log.Printf("DB read successful - total rows read: %d", cumulativeRowsRead)
// Update cache with fresh data
if err := cache.SaveTestData(r.Context(), data); err != nil {
log.Printf("Failed to update cache: %v", err)
} else {
log.Printf("Cache updated with fresh data")
}
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
case http.MethodPost:
// Invalidate cache before write to maintain consistency
if err := cache.InvalidateTestData(r.Context()); err != nil {
log.Printf("Warning: Cache invalidation failed: %v", err)
} else {
log.Printf("Cache invalidated for POST operation")
}
// Create new test data record
data = &lib.TestData{
Data: fmt.Sprintf("test-%d", time.Now().Unix()),
Timestamp: time.Now(),
}
// Write to database
dbStart := time.Now()
err = db.SaveTestData(r.Context(), data)
dbTime = time.Since(dbStart)
if err == nil {
counterMutex.Lock()
cumulativeRowsWritten++
counterMutex.Unlock()
log.Printf("DB write successful - total rows written: %d", cumulativeRowsWritten)
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
// Send response to client
json.NewEncoder(w).Encode(data)
// Calculate core operation time before metrics processing
serviceTime := time.Since(requestStart)
// Process metrics asynchronously to minimize request latency
go func(svcTime time.Duration, dbT time.Duration, cacheT time.Duration) {
// Set timeout for metrics processing
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Get total row count for monitoring
totalRows, err := db.GetTotalRows(ctx)
if err != nil {
log.Printf("Failed to get total row count: %v", err)
totalRows = 0
}
// Capture current counter values atomically
counterMutex.RLock()
metrics := lib.DataPoint{
Timestamp: time.Now().UnixMilli(),
ServiceTime: float64(svcTime.Milliseconds()),
DBTime: float64(dbT.Milliseconds()),
CacheTime: float64(cacheT.Milliseconds()),
DBRowsRead: cumulativeRowsRead,
DBRowsWritten: cumulativeRowsWritten,
DBTotalRows: totalRows,
CacheHits: cumulativeCacheHits,
CacheMisses: cumulativeCacheMisses,
}
counterMutex.RUnlock()
// Store metrics
if err := db.SaveMetrics(ctx, metrics); err != nil {
log.Printf("Failed to save performance metrics: %v", err)
}
}(serviceTime, dbTime, cacheTime)
}
// getMetrics retrieves performance metrics within the specified time range.
// Supports both absolute and relative time ranges.
func getMetrics(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// Parse time range parameters
var start, end time.Time
end = time.Now()
// Parse start time if provided
if startStr := r.URL.Query().Get("start"); startStr != "" {
if ts, err := strconv.ParseInt(startStr, 10, 64); err == nil {
start = time.UnixMilli(ts)
} else {
http.Error(w, "Invalid start time format", http.StatusBadRequest)
return
}
}
// Parse end time if provided
if endStr := r.URL.Query().Get("end"); endStr != "" {
if ts, err := strconv.ParseInt(endStr, 10, 64); err == nil {
end = time.UnixMilli(ts)
} else {
http.Error(w, "Invalid end time format", http.StatusBadRequest)
return
}
}
// Default to last 30 minutes if no start time specified
if start.IsZero() {
start = end.Add(-30 * time.Minute)
}
log.Printf("Retrieving metrics from %v to %v", start, end)
points, err := db.GetMetrics(ctx, start, end)
if err != nil {
log.Printf("Metrics retrieval failed: %v", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
log.Printf("Retrieved %d metric points", len(points))
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(points)
}
// clearDB removes all test data and metrics, resetting the system to initial state.
func clearDB(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
log.Printf("Initiating system-wide data clear...")
// Reset performance counters
resetCounters()
log.Printf("Performance counters reset")
// Clear cache
if err := cache.InvalidateTestData(r.Context()); err != nil {
log.Printf("Warning: Cache clear failed: %v", err)
} else {
log.Printf("Cache cleared successfully")
}
// Clear database
if err := db.ClearDB(r.Context()); err != nil {
log.Printf("Database clear failed: %v", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
log.Printf("Database cleared successfully")
w.Write([]byte("OK"))
}
func main() {
var err error
// Load environment configuration
err = godotenv.Load()
if err != nil {
log.Printf("Warning: .env file not found: %v", err)
}
// Validate required environment variables
dbURL := os.Getenv("TURSO_URL")
dbToken := os.Getenv("TURSO_AUTH_TOKEN")
redisURL := os.Getenv("REDIS_URL")
if dbURL == "" || dbToken == "" || redisURL == "" {
log.Fatal("Missing required environment variables")
}
// Initialize storage systems
db, err = lib.NewTursoStorage(dbURL, dbToken)
if err != nil {
log.Fatalf("Database initialization failed: %v", err)
}
cache, err = lib.NewRedisStorage(redisURL)
if err != nil {
log.Fatalf("Cache initialization failed: %v", err)
}
// Initialize database schema
if err := db.InitTables(); err != nil {
log.Fatalf("Schema initialization failed: %v", err)
}
// Verify metrics system
if err := db.DebugMetrics(context.Background()); err != nil {
log.Printf("Warning: Metrics verification failed: %v", err)
}
// Configure API routes
http.HandleFunc("/api/request", handleRequest)
http.HandleFunc("/api/metrics", getMetrics)
http.HandleFunc("/api/clear", clearDB)
// Configure static file serving
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/" {
// Serve index page
tmpl := template.Must(template.ParseFS(static, "static/index.html"))
tmpl.Execute(w, nil)
return
}
// Serve static files from embedded filesystem
fsys := http.FS(static)
path := "static" + r.URL.Path
file, err := fsys.Open(path)
if err != nil {
http.Error(w, "File not found", http.StatusNotFound)
return
}
defer file.Close()
stat, err := file.Stat()
if err != nil {
http.Error(w, "File error", http.StatusInternalServerError)
return
}
http.ServeContent(w, r, stat.Name(), stat.ModTime(), file.(io.ReadSeeker))
})
// Start HTTP server
port := os.Getenv("PORT")
if port == "" {
port = "8080"
}
log.Printf("Server starting on port %s", port)
log.Fatal(http.ListenAndServe(":"+port, nil))
}