338 lines
10 KiB
Go
338 lines
10 KiB
Go
package lib
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"log"
|
|
"time"
|
|
|
|
_ "github.com/tursodatabase/libsql-client-go/libsql"
|
|
)
|
|
|
|
// TursoStorage implements a Turso (distributed SQLite) backed storage layer
|
|
// for persisting test data and performance metrics with automatic connection management.
|
|
type TursoStorage struct {
|
|
db *sql.DB // Database connection pool
|
|
}
|
|
|
|
// NewTursoStorage initializes a new database connection pool with the provided credentials.
|
|
// It configures connection pooling and timeout settings for optimal performance.
|
|
func NewTursoStorage(url, token string) (*TursoStorage, error) {
|
|
db, err := sql.Open("libsql", url+"?authToken="+token)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("database connection failed: %v", err)
|
|
}
|
|
|
|
// Configure connection pool settings
|
|
db.SetMaxOpenConns(200) // Maximum concurrent connections
|
|
db.SetConnMaxLifetime(5 * time.Minute) // Connection time-to-live
|
|
db.SetMaxIdleConns(25) // Connections maintained when idle
|
|
|
|
return &TursoStorage{db: db}, nil
|
|
}
|
|
|
|
// Close safely shuts down the database connection pool.
|
|
// Should be called during application shutdown to prevent connection leaks.
|
|
func (s *TursoStorage) Close() error {
|
|
if err := s.db.Close(); err != nil {
|
|
return fmt.Errorf("error closing database connections: %v", err)
|
|
}
|
|
log.Printf("Database connections closed successfully")
|
|
return nil
|
|
}
|
|
|
|
// InitTables ensures all required database tables and indices exist.
|
|
// This should be called once during application startup.
|
|
func (s *TursoStorage) InitTables() error {
|
|
log.Printf("Initializing database schema...")
|
|
|
|
// Verify existing schema
|
|
var count int
|
|
err := s.db.QueryRow("SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='metrics'").Scan(&count)
|
|
if err != nil {
|
|
log.Printf("Error checking existing schema: %v", err)
|
|
} else {
|
|
log.Printf("Found %d existing metrics tables", count)
|
|
}
|
|
|
|
// Create required tables and indices
|
|
_, err = s.db.Exec(`
|
|
CREATE TABLE IF NOT EXISTS metrics (
|
|
timestamp INTEGER, -- Unix timestamp in milliseconds
|
|
service_time REAL, -- Total request processing time (ms)
|
|
db_time REAL, -- Database operation time (ms)
|
|
cache_time REAL, -- Cache operation time (ms)
|
|
db_rows_read INTEGER DEFAULT 0,
|
|
db_rows_written INTEGER DEFAULT 0,
|
|
db_total_rows INTEGER DEFAULT 0,
|
|
cache_hits INTEGER DEFAULT 0,
|
|
cache_misses INTEGER DEFAULT 0
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS test_data (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
data TEXT NOT NULL,
|
|
timestamp DATETIME NOT NULL
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_test_timestamp ON test_data(timestamp);
|
|
`)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("schema initialization failed: %v", err)
|
|
}
|
|
|
|
// Verify tables were created
|
|
tables := []string{"metrics", "test_data"}
|
|
for _, table := range tables {
|
|
var count int
|
|
err := s.db.QueryRow("SELECT COUNT(*) FROM " + table).Scan(&count)
|
|
if err != nil {
|
|
log.Printf("Error verifying table %s: %v", table, err)
|
|
} else {
|
|
log.Printf("Table %s exists with %d rows", table, count)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetTotalRows returns the total number of rows in the test_data table.
|
|
// Used for monitoring database growth over time.
|
|
func (s *TursoStorage) GetTotalRows(ctx context.Context) (int64, error) {
|
|
var count int64
|
|
err := s.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM test_data").Scan(&count)
|
|
return count, err
|
|
}
|
|
|
|
// SaveTestData stores a new test data record in the database.
|
|
// It first clears existing data to maintain a single active test record.
|
|
func (s *TursoStorage) SaveTestData(ctx context.Context, data *TestData) error {
|
|
// Clear existing data
|
|
_, err := s.db.ExecContext(ctx, "DELETE FROM test_data")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to clear existing data: %v", err)
|
|
}
|
|
|
|
// Insert new record
|
|
result, err := s.db.ExecContext(ctx, `
|
|
INSERT INTO test_data (data, timestamp)
|
|
VALUES (?, ?)
|
|
`, data.Data, data.Timestamp)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to insert test data: %v", err)
|
|
}
|
|
|
|
// Update ID of inserted record
|
|
id, err := result.LastInsertId()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get inserted ID: %v", err)
|
|
}
|
|
data.ID = id
|
|
return nil
|
|
}
|
|
|
|
// GetTestData retrieves the most recent test data record.
|
|
func (s *TursoStorage) GetTestData(ctx context.Context) (*TestData, error) {
|
|
var data TestData
|
|
err := s.db.QueryRowContext(ctx, `
|
|
SELECT id, data, timestamp
|
|
FROM test_data
|
|
ORDER BY timestamp DESC
|
|
LIMIT 1
|
|
`).Scan(&data.ID, &data.Data, &data.Timestamp)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to retrieve test data: %v", err)
|
|
}
|
|
return &data, nil
|
|
}
|
|
|
|
// SaveMetrics stores a new performance metrics data point.
|
|
// This data is used for monitoring and visualization.
|
|
func (s *TursoStorage) SaveMetrics(ctx context.Context, point DataPoint) error {
|
|
log.Printf("Storing metrics - Service: %.2fms, DB: %.2fms, Cache: %.2fms",
|
|
point.ServiceTime, point.DBTime, point.CacheTime)
|
|
|
|
_, err := s.db.ExecContext(ctx, `
|
|
INSERT INTO metrics (
|
|
timestamp,
|
|
service_time,
|
|
db_time,
|
|
cache_time,
|
|
db_rows_read,
|
|
db_rows_written,
|
|
db_total_rows,
|
|
cache_hits,
|
|
cache_misses
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
point.Timestamp,
|
|
point.ServiceTime,
|
|
point.DBTime,
|
|
point.CacheTime,
|
|
point.DBRowsRead,
|
|
point.DBRowsWritten,
|
|
point.DBTotalRows,
|
|
point.CacheHits,
|
|
point.CacheMisses,
|
|
)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to store metrics: %v", err)
|
|
}
|
|
|
|
log.Printf("Metrics stored successfully")
|
|
return nil
|
|
}
|
|
|
|
// ClearDB removes all data from both metrics and test_data tables.
|
|
// This operation is atomic - either all data is cleared or none is.
|
|
func (s *TursoStorage) ClearDB(ctx context.Context) error {
|
|
// Use transaction for atomicity
|
|
tx, err := s.db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to start transaction: %v", err)
|
|
}
|
|
defer tx.Rollback() // Ensure rollback on error
|
|
|
|
// Clear metrics table
|
|
if _, err := tx.ExecContext(ctx, "DELETE FROM metrics"); err != nil {
|
|
return fmt.Errorf("failed to clear metrics: %v", err)
|
|
}
|
|
|
|
// Clear test_data table
|
|
if _, err := tx.ExecContext(ctx, "DELETE FROM test_data"); err != nil {
|
|
return fmt.Errorf("failed to clear test data: %v", err)
|
|
}
|
|
|
|
// Commit transaction
|
|
if err := tx.Commit(); err != nil {
|
|
return fmt.Errorf("failed to commit clear operation: %v", err)
|
|
}
|
|
|
|
log.Printf("Database cleared successfully")
|
|
return nil
|
|
}
|
|
|
|
// GetMetrics retrieves performance metrics within the specified time range.
|
|
// Returns metrics sorted by timestamp in descending order, limited to 10000 points.
|
|
func (s *TursoStorage) GetMetrics(ctx context.Context, start, end time.Time) ([]DataPoint, error) {
|
|
log.Printf("Retrieving metrics from %v to %v", start, end)
|
|
|
|
// Convert timestamps to Unix milliseconds for storage
|
|
startMs := start.UnixMilli()
|
|
endMs := end.UnixMilli()
|
|
|
|
// Prepare query with time range filter
|
|
query := `
|
|
SELECT
|
|
timestamp, service_time, db_time, cache_time,
|
|
db_rows_read, db_rows_written, db_total_rows,
|
|
cache_hits, cache_misses
|
|
FROM metrics
|
|
WHERE timestamp BETWEEN ? AND ?
|
|
ORDER BY timestamp DESC
|
|
LIMIT 10000 -- Protect against excessive memory usage
|
|
`
|
|
|
|
log.Printf("Executing query with range: %d to %d", startMs, endMs)
|
|
rows, err := s.db.QueryContext(ctx, query, startMs, endMs)
|
|
if err != nil {
|
|
log.Printf("Query failed: %v", err)
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
// Collect all matching metrics
|
|
points := make([]DataPoint, 0)
|
|
for rows.Next() {
|
|
var p DataPoint
|
|
if err := rows.Scan(
|
|
&p.Timestamp, &p.ServiceTime, &p.DBTime, &p.CacheTime,
|
|
&p.DBRowsRead, &p.DBRowsWritten, &p.DBTotalRows,
|
|
&p.CacheHits, &p.CacheMisses,
|
|
); err != nil {
|
|
log.Printf("Row scan failed: %v", err)
|
|
return nil, err
|
|
}
|
|
points = append(points, p)
|
|
}
|
|
|
|
// Log summary of retrieved data
|
|
if len(points) > 0 {
|
|
log.Printf("First point: %v (%v)",
|
|
points[0].Timestamp,
|
|
time.UnixMilli(points[0].Timestamp))
|
|
log.Printf("Last point: %v (%v)",
|
|
points[len(points)-1].Timestamp,
|
|
time.UnixMilli(points[len(points)-1].Timestamp))
|
|
}
|
|
|
|
log.Printf("Retrieved %d metric points", len(points))
|
|
return points, rows.Err()
|
|
}
|
|
|
|
// DebugMetrics performs diagnostic checks on the metrics table.
|
|
// Used during startup and for troubleshooting system state.
|
|
func (s *TursoStorage) DebugMetrics(ctx context.Context) error {
|
|
// Check total metrics count
|
|
var count int
|
|
err := s.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM metrics").Scan(&count)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to count metrics: %v", err)
|
|
}
|
|
log.Printf("Debug: Total metrics in database: %d", count)
|
|
|
|
if count == 0 {
|
|
log.Printf("Debug: Metrics table is empty")
|
|
return nil
|
|
}
|
|
|
|
// Check timestamp range of stored metrics
|
|
var minTs, maxTs int64
|
|
err = s.db.QueryRowContext(ctx, "SELECT MIN(timestamp), MAX(timestamp) FROM metrics").Scan(&minTs, &maxTs)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get timestamp range: %v", err)
|
|
}
|
|
log.Printf("Debug: Metrics timestamp range: %v to %v",
|
|
time.UnixMilli(minTs),
|
|
time.UnixMilli(maxTs))
|
|
|
|
// Sample recent metrics for verification
|
|
rows, err := s.db.QueryContext(ctx, `
|
|
SELECT timestamp, service_time, db_time, cache_time,
|
|
db_rows_read, db_rows_written, db_total_rows,
|
|
cache_hits, cache_misses
|
|
FROM metrics
|
|
ORDER BY timestamp DESC
|
|
LIMIT 5
|
|
`)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to query recent metrics: %v", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
log.Printf("Debug: Most recent metrics:")
|
|
for rows.Next() {
|
|
var p DataPoint
|
|
if err := rows.Scan(
|
|
&p.Timestamp, &p.ServiceTime, &p.DBTime, &p.CacheTime,
|
|
&p.DBRowsRead, &p.DBRowsWritten, &p.DBTotalRows,
|
|
&p.CacheHits, &p.CacheMisses,
|
|
); err != nil {
|
|
return fmt.Errorf("failed to scan row: %v", err)
|
|
}
|
|
log.Printf("Time: %v, Service: %.2fms, DB: %.2fms, Cache: %.2fms, "+
|
|
"Reads: %d, Writes: %d, Total: %d, Hits: %d, Misses: %d",
|
|
time.UnixMilli(p.Timestamp),
|
|
p.ServiceTime,
|
|
p.DBTime,
|
|
p.CacheTime,
|
|
p.DBRowsRead,
|
|
p.DBRowsWritten,
|
|
p.DBTotalRows,
|
|
p.CacheHits,
|
|
p.CacheMisses)
|
|
}
|
|
|
|
return rows.Err()
|
|
}
|