package main import ( "context" "embed" "encoding/json" "fmt" "html/template" "io" "log" "net/http" "os" "strconv" "sync" "time" "atri.dad/distributedperf/lib" "github.com/joho/godotenv" ) //go:embed static var static embed.FS // Embedded filesystem for static web assets // Global storage interfaces for database and cache access var ( db *lib.TursoStorage cache *lib.RedisStorage ) // Thread-safe performance counters for monitoring system behavior var ( cumulativeCacheHits int64 cumulativeCacheMisses int64 cumulativeRowsRead int64 cumulativeRowsWritten int64 counterMutex sync.RWMutex ) // resetCounters safely zeroes all performance counters and logs the before/after values. // This is typically called when clearing all data or starting a new test run. func resetCounters() { counterMutex.Lock() defer counterMutex.Unlock() // Log current values before reset for historical reference log.Printf("Resetting counters - Current values: hits=%d, misses=%d, reads=%d, writes=%d", cumulativeCacheHits, cumulativeCacheMisses, cumulativeRowsRead, cumulativeRowsWritten) // Zero all counters atomically cumulativeCacheHits = 0 cumulativeCacheMisses = 0 cumulativeRowsRead = 0 cumulativeRowsWritten = 0 // Confirm reset was successful log.Printf("Counters after reset: hits=%d, misses=%d, reads=%d, writes=%d", cumulativeCacheHits, cumulativeCacheMisses, cumulativeRowsRead, cumulativeRowsWritten) } // handleRequest processes GET and POST requests for test data using a cache-aside pattern. // GET requests attempt cache retrieval before falling back to database. // POST requests invalidate cache and write directly to database. func handleRequest(w http.ResponseWriter, r *http.Request) { requestStart := time.Now() var data *lib.TestData var err error var dbTime, cacheTime time.Duration log.Printf("Starting %s request", r.Method) switch r.Method { case http.MethodGet: // Try cache first for better performance cacheStart := time.Now() data, err = cache.GetTestData(r.Context()) cacheTime = time.Since(cacheStart) cacheHit := (err == nil && data != nil) if cacheHit { // Update cache hit statistics counterMutex.Lock() log.Printf("Before cache hit increment: hits=%d", cumulativeCacheHits) cumulativeCacheHits++ log.Printf("After cache hit increment: hits=%d", cumulativeCacheHits) counterMutex.Unlock() log.Printf("Cache HIT - total hits now: %d", cumulativeCacheHits) } else { // Handle cache miss - fallback to database counterMutex.Lock() log.Printf("Before cache miss increment: misses=%d", cumulativeCacheMisses) cumulativeCacheMisses++ log.Printf("After cache miss increment: misses=%d", cumulativeCacheMisses) counterMutex.Unlock() log.Printf("Cache MISS - total misses now: %d", cumulativeCacheMisses) // Retrieve from database dbStart := time.Now() data, err = db.GetTestData(r.Context()) dbTime = time.Since(dbStart) if err == nil && data != nil { counterMutex.Lock() cumulativeRowsRead++ counterMutex.Unlock() log.Printf("DB read successful - total rows read: %d", cumulativeRowsRead) // Update cache with fresh data if err := cache.SaveTestData(r.Context(), data); err != nil { log.Printf("Failed to update cache: %v", err) } else { log.Printf("Cache updated with fresh data") } } if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } } case http.MethodPost: // Invalidate cache before write to maintain consistency if err := cache.InvalidateTestData(r.Context()); err != nil { log.Printf("Warning: Cache invalidation failed: %v", err) } else { log.Printf("Cache invalidated for POST operation") } // Create new test data record data = &lib.TestData{ Data: fmt.Sprintf("test-%d", time.Now().Unix()), Timestamp: time.Now(), } // Write to database dbStart := time.Now() err = db.SaveTestData(r.Context(), data) dbTime = time.Since(dbStart) if err == nil { counterMutex.Lock() cumulativeRowsWritten++ counterMutex.Unlock() log.Printf("DB write successful - total rows written: %d", cumulativeRowsWritten) } if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } } // Send response to client json.NewEncoder(w).Encode(data) // Calculate core operation time before metrics processing serviceTime := time.Since(requestStart) // Process metrics asynchronously to minimize request latency go func(svcTime time.Duration, dbT time.Duration, cacheT time.Duration) { // Set timeout for metrics processing ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // Get total row count for monitoring totalRows, err := db.GetTotalRows(ctx) if err != nil { log.Printf("Failed to get total row count: %v", err) totalRows = 0 } // Capture current counter values atomically counterMutex.RLock() metrics := lib.DataPoint{ Timestamp: time.Now().UnixMilli(), ServiceTime: float64(svcTime.Milliseconds()), DBTime: float64(dbT.Milliseconds()), CacheTime: float64(cacheT.Milliseconds()), DBRowsRead: cumulativeRowsRead, DBRowsWritten: cumulativeRowsWritten, DBTotalRows: totalRows, CacheHits: cumulativeCacheHits, CacheMisses: cumulativeCacheMisses, } counterMutex.RUnlock() // Store metrics if err := db.SaveMetrics(ctx, metrics); err != nil { log.Printf("Failed to save performance metrics: %v", err) } }(serviceTime, dbTime, cacheTime) } // getMetrics retrieves performance metrics within the specified time range. // Supports both absolute and relative time ranges. func getMetrics(w http.ResponseWriter, r *http.Request) { ctx := r.Context() // Parse time range parameters var start, end time.Time end = time.Now() // Parse start time if provided if startStr := r.URL.Query().Get("start"); startStr != "" { if ts, err := strconv.ParseInt(startStr, 10, 64); err == nil { start = time.UnixMilli(ts) } else { http.Error(w, "Invalid start time format", http.StatusBadRequest) return } } // Parse end time if provided if endStr := r.URL.Query().Get("end"); endStr != "" { if ts, err := strconv.ParseInt(endStr, 10, 64); err == nil { end = time.UnixMilli(ts) } else { http.Error(w, "Invalid end time format", http.StatusBadRequest) return } } // Default to last 30 minutes if no start time specified if start.IsZero() { start = end.Add(-30 * time.Minute) } log.Printf("Retrieving metrics from %v to %v", start, end) points, err := db.GetMetrics(ctx, start, end) if err != nil { log.Printf("Metrics retrieval failed: %v", err) http.Error(w, "Internal server error", http.StatusInternalServerError) return } log.Printf("Retrieved %d metric points", len(points)) w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(points) } // clearDB removes all test data and metrics, resetting the system to initial state. func clearDB(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } log.Printf("Initiating system-wide data clear...") // Reset performance counters resetCounters() log.Printf("Performance counters reset") // Clear cache if err := cache.InvalidateTestData(r.Context()); err != nil { log.Printf("Warning: Cache clear failed: %v", err) } else { log.Printf("Cache cleared successfully") } // Clear database if err := db.ClearDB(r.Context()); err != nil { log.Printf("Database clear failed: %v", err) http.Error(w, "Internal server error", http.StatusInternalServerError) return } log.Printf("Database cleared successfully") w.Write([]byte("OK")) } func main() { var err error // Load environment configuration err = godotenv.Load() if err != nil { log.Printf("Warning: .env file not found: %v", err) } // Validate required environment variables dbURL := os.Getenv("TURSO_URL") dbToken := os.Getenv("TURSO_AUTH_TOKEN") redisURL := os.Getenv("REDIS_URL") if dbURL == "" || dbToken == "" || redisURL == "" { log.Fatal("Missing required environment variables") } // Initialize storage systems db, err = lib.NewTursoStorage(dbURL, dbToken) if err != nil { log.Fatalf("Database initialization failed: %v", err) } cache, err = lib.NewRedisStorage(redisURL) if err != nil { log.Fatalf("Cache initialization failed: %v", err) } // Initialize database schema if err := db.InitTables(); err != nil { log.Fatalf("Schema initialization failed: %v", err) } // Verify metrics system if err := db.DebugMetrics(context.Background()); err != nil { log.Printf("Warning: Metrics verification failed: %v", err) } // Configure API routes http.HandleFunc("/api/request", handleRequest) http.HandleFunc("/api/metrics", getMetrics) http.HandleFunc("/api/clear", clearDB) // Configure static file serving http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/" { // Serve index page tmpl := template.Must(template.ParseFS(static, "static/index.html")) tmpl.Execute(w, nil) return } // Serve static files from embedded filesystem fsys := http.FS(static) path := "static" + r.URL.Path file, err := fsys.Open(path) if err != nil { http.Error(w, "File not found", http.StatusNotFound) return } defer file.Close() stat, err := file.Stat() if err != nil { http.Error(w, "File error", http.StatusInternalServerError) return } http.ServeContent(w, r, stat.Name(), stat.ModTime(), file.(io.ReadSeeker)) }) // Start HTTP server port := os.Getenv("PORT") if port == "" { port = "8080" } log.Printf("Server starting on port %s", port) log.Fatal(http.ListenAndServe(":"+port, nil)) }