351 lines
9.7 KiB
Go
351 lines
9.7 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"embed"
|
|
"encoding/json"
|
|
"fmt"
|
|
"html/template"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"atri.dad/distributedperf/lib"
|
|
"github.com/joho/godotenv"
|
|
)
|
|
|
|
//go:embed static
|
|
var static embed.FS // Embedded filesystem for static web assets
|
|
|
|
// Global storage interfaces for database and cache access
|
|
var (
|
|
db *lib.TursoStorage
|
|
cache *lib.RedisStorage
|
|
)
|
|
|
|
// Cumulative performance counters used for monitoring. counterMutex guards
// all four counters: take the write lock to modify them and at least the read
// lock to observe them.
var (
	counterMutex sync.RWMutex

	cumulativeCacheHits, cumulativeCacheMisses int64
	cumulativeRowsRead, cumulativeRowsWritten  int64
)
|
|
|
|
// resetCounters safely zeroes all performance counters and logs the before/after values.
|
|
// This is typically called when clearing all data or starting a new test run.
|
|
func resetCounters() {
|
|
counterMutex.Lock()
|
|
defer counterMutex.Unlock()
|
|
|
|
// Log current values before reset for historical reference
|
|
log.Printf("Resetting counters - Current values: hits=%d, misses=%d, reads=%d, writes=%d",
|
|
cumulativeCacheHits, cumulativeCacheMisses, cumulativeRowsRead, cumulativeRowsWritten)
|
|
|
|
// Zero all counters atomically
|
|
cumulativeCacheHits = 0
|
|
cumulativeCacheMisses = 0
|
|
cumulativeRowsRead = 0
|
|
cumulativeRowsWritten = 0
|
|
|
|
// Confirm reset was successful
|
|
log.Printf("Counters after reset: hits=%d, misses=%d, reads=%d, writes=%d",
|
|
cumulativeCacheHits, cumulativeCacheMisses, cumulativeRowsRead, cumulativeRowsWritten)
|
|
}
|
|
|
|
// handleRequest processes GET and POST requests for test data using a cache-aside pattern.
|
|
// GET requests attempt cache retrieval before falling back to database.
|
|
// POST requests invalidate cache and write directly to database.
|
|
func handleRequest(w http.ResponseWriter, r *http.Request) {
|
|
requestStart := time.Now()
|
|
var data *lib.TestData
|
|
var err error
|
|
var dbTime, cacheTime time.Duration
|
|
|
|
log.Printf("Starting %s request", r.Method)
|
|
|
|
switch r.Method {
|
|
case http.MethodGet:
|
|
// Try cache first for better performance
|
|
cacheStart := time.Now()
|
|
data, err = cache.GetTestData(r.Context())
|
|
cacheTime = time.Since(cacheStart)
|
|
cacheHit := (err == nil && data != nil)
|
|
|
|
if cacheHit {
|
|
// Update cache hit statistics
|
|
counterMutex.Lock()
|
|
log.Printf("Before cache hit increment: hits=%d", cumulativeCacheHits)
|
|
cumulativeCacheHits++
|
|
log.Printf("After cache hit increment: hits=%d", cumulativeCacheHits)
|
|
counterMutex.Unlock()
|
|
log.Printf("Cache HIT - total hits now: %d", cumulativeCacheHits)
|
|
} else {
|
|
// Handle cache miss - fallback to database
|
|
counterMutex.Lock()
|
|
log.Printf("Before cache miss increment: misses=%d", cumulativeCacheMisses)
|
|
cumulativeCacheMisses++
|
|
log.Printf("After cache miss increment: misses=%d", cumulativeCacheMisses)
|
|
counterMutex.Unlock()
|
|
log.Printf("Cache MISS - total misses now: %d", cumulativeCacheMisses)
|
|
|
|
// Retrieve from database
|
|
dbStart := time.Now()
|
|
data, err = db.GetTestData(r.Context())
|
|
dbTime = time.Since(dbStart)
|
|
if err == nil && data != nil {
|
|
counterMutex.Lock()
|
|
cumulativeRowsRead++
|
|
counterMutex.Unlock()
|
|
log.Printf("DB read successful - total rows read: %d", cumulativeRowsRead)
|
|
|
|
// Update cache with fresh data
|
|
if err := cache.SaveTestData(r.Context(), data); err != nil {
|
|
log.Printf("Failed to update cache: %v", err)
|
|
} else {
|
|
log.Printf("Cache updated with fresh data")
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
}
|
|
|
|
case http.MethodPost:
|
|
// Invalidate cache before write to maintain consistency
|
|
if err := cache.InvalidateTestData(r.Context()); err != nil {
|
|
log.Printf("Warning: Cache invalidation failed: %v", err)
|
|
} else {
|
|
log.Printf("Cache invalidated for POST operation")
|
|
}
|
|
|
|
// Create new test data record
|
|
data = &lib.TestData{
|
|
Data: fmt.Sprintf("test-%d", time.Now().Unix()),
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
// Write to database
|
|
dbStart := time.Now()
|
|
err = db.SaveTestData(r.Context(), data)
|
|
dbTime = time.Since(dbStart)
|
|
if err == nil {
|
|
counterMutex.Lock()
|
|
cumulativeRowsWritten++
|
|
counterMutex.Unlock()
|
|
log.Printf("DB write successful - total rows written: %d", cumulativeRowsWritten)
|
|
}
|
|
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
}
|
|
|
|
// Send response to client
|
|
json.NewEncoder(w).Encode(data)
|
|
|
|
// Calculate core operation time before metrics processing
|
|
serviceTime := time.Since(requestStart)
|
|
|
|
// Process metrics asynchronously to minimize request latency
|
|
go func(svcTime time.Duration, dbT time.Duration, cacheT time.Duration) {
|
|
// Set timeout for metrics processing
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
// Get total row count for monitoring
|
|
totalRows, err := db.GetTotalRows(ctx)
|
|
if err != nil {
|
|
log.Printf("Failed to get total row count: %v", err)
|
|
totalRows = 0
|
|
}
|
|
|
|
// Capture current counter values atomically
|
|
counterMutex.RLock()
|
|
metrics := lib.DataPoint{
|
|
Timestamp: time.Now().UnixMilli(),
|
|
ServiceTime: float64(svcTime.Milliseconds()),
|
|
DBTime: float64(dbT.Milliseconds()),
|
|
CacheTime: float64(cacheT.Milliseconds()),
|
|
DBRowsRead: cumulativeRowsRead,
|
|
DBRowsWritten: cumulativeRowsWritten,
|
|
DBTotalRows: totalRows,
|
|
CacheHits: cumulativeCacheHits,
|
|
CacheMisses: cumulativeCacheMisses,
|
|
}
|
|
counterMutex.RUnlock()
|
|
|
|
// Store metrics
|
|
if err := db.SaveMetrics(ctx, metrics); err != nil {
|
|
log.Printf("Failed to save performance metrics: %v", err)
|
|
}
|
|
}(serviceTime, dbTime, cacheTime)
|
|
}
|
|
|
|
// getMetrics retrieves performance metrics within the specified time range.
|
|
// Supports both absolute and relative time ranges.
|
|
func getMetrics(w http.ResponseWriter, r *http.Request) {
|
|
ctx := r.Context()
|
|
|
|
// Parse time range parameters
|
|
var start, end time.Time
|
|
end = time.Now()
|
|
|
|
// Parse start time if provided
|
|
if startStr := r.URL.Query().Get("start"); startStr != "" {
|
|
if ts, err := strconv.ParseInt(startStr, 10, 64); err == nil {
|
|
start = time.UnixMilli(ts)
|
|
} else {
|
|
http.Error(w, "Invalid start time format", http.StatusBadRequest)
|
|
return
|
|
}
|
|
}
|
|
|
|
// Parse end time if provided
|
|
if endStr := r.URL.Query().Get("end"); endStr != "" {
|
|
if ts, err := strconv.ParseInt(endStr, 10, 64); err == nil {
|
|
end = time.UnixMilli(ts)
|
|
} else {
|
|
http.Error(w, "Invalid end time format", http.StatusBadRequest)
|
|
return
|
|
}
|
|
}
|
|
|
|
// Default to last 30 minutes if no start time specified
|
|
if start.IsZero() {
|
|
start = end.Add(-30 * time.Minute)
|
|
}
|
|
|
|
log.Printf("Retrieving metrics from %v to %v", start, end)
|
|
points, err := db.GetMetrics(ctx, start, end)
|
|
if err != nil {
|
|
log.Printf("Metrics retrieval failed: %v", err)
|
|
http.Error(w, "Internal server error", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
log.Printf("Retrieved %d metric points", len(points))
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(points)
|
|
}
|
|
|
|
// clearDB removes all test data and metrics, resetting the system to initial state.
|
|
func clearDB(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
log.Printf("Initiating system-wide data clear...")
|
|
|
|
// Reset performance counters
|
|
resetCounters()
|
|
log.Printf("Performance counters reset")
|
|
|
|
// Clear cache
|
|
if err := cache.InvalidateTestData(r.Context()); err != nil {
|
|
log.Printf("Warning: Cache clear failed: %v", err)
|
|
} else {
|
|
log.Printf("Cache cleared successfully")
|
|
}
|
|
|
|
// Clear database
|
|
if err := db.ClearDB(r.Context()); err != nil {
|
|
log.Printf("Database clear failed: %v", err)
|
|
http.Error(w, "Internal server error", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
log.Printf("Database cleared successfully")
|
|
|
|
w.Write([]byte("OK"))
|
|
}
|
|
|
|
func main() {
|
|
var err error
|
|
|
|
// Load environment configuration
|
|
err = godotenv.Load()
|
|
if err != nil {
|
|
log.Printf("Warning: .env file not found: %v", err)
|
|
}
|
|
|
|
// Validate required environment variables
|
|
dbURL := os.Getenv("TURSO_URL")
|
|
dbToken := os.Getenv("TURSO_AUTH_TOKEN")
|
|
redisURL := os.Getenv("REDIS_URL")
|
|
|
|
if dbURL == "" || dbToken == "" || redisURL == "" {
|
|
log.Fatal("Missing required environment variables")
|
|
}
|
|
|
|
// Initialize storage systems
|
|
db, err = lib.NewTursoStorage(dbURL, dbToken)
|
|
if err != nil {
|
|
log.Fatalf("Database initialization failed: %v", err)
|
|
}
|
|
|
|
cache, err = lib.NewRedisStorage(redisURL)
|
|
if err != nil {
|
|
log.Fatalf("Cache initialization failed: %v", err)
|
|
}
|
|
|
|
// Initialize database schema
|
|
if err := db.InitTables(); err != nil {
|
|
log.Fatalf("Schema initialization failed: %v", err)
|
|
}
|
|
|
|
// Verify metrics system
|
|
if err := db.DebugMetrics(context.Background()); err != nil {
|
|
log.Printf("Warning: Metrics verification failed: %v", err)
|
|
}
|
|
|
|
// Configure API routes
|
|
http.HandleFunc("/api/request", handleRequest)
|
|
http.HandleFunc("/api/metrics", getMetrics)
|
|
http.HandleFunc("/api/clear", clearDB)
|
|
|
|
// Configure static file serving
|
|
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
|
if r.URL.Path == "/" {
|
|
// Serve index page
|
|
tmpl := template.Must(template.ParseFS(static, "static/index.html"))
|
|
tmpl.Execute(w, nil)
|
|
return
|
|
}
|
|
|
|
// Serve static files from embedded filesystem
|
|
fsys := http.FS(static)
|
|
path := "static" + r.URL.Path
|
|
|
|
file, err := fsys.Open(path)
|
|
if err != nil {
|
|
http.Error(w, "File not found", http.StatusNotFound)
|
|
return
|
|
}
|
|
defer file.Close()
|
|
|
|
stat, err := file.Stat()
|
|
if err != nil {
|
|
http.Error(w, "File error", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
http.ServeContent(w, r, stat.Name(), stat.ModTime(), file.(io.ReadSeeker))
|
|
})
|
|
|
|
// Start HTTP server
|
|
port := os.Getenv("PORT")
|
|
if port == "" {
|
|
port = "8080"
|
|
}
|
|
|
|
log.Printf("Server starting on port %s", port)
|
|
log.Fatal(http.ListenAndServe(":"+port, nil))
|
|
}
|