1
0
Fork 0
cmpt815perf/lib/turso.go

338 lines
10 KiB
Go

package lib
import (
"context"
"database/sql"
"fmt"
"log"
"time"
_ "github.com/tursodatabase/libsql-client-go/libsql"
)
// TursoStorage implements a Turso (distributed SQLite) backed storage layer
// for persisting test data and performance metrics with automatic connection management.
type TursoStorage struct {
db *sql.DB // Database connection pool
}
// NewTursoStorage initializes a new database connection pool with the provided credentials.
// It configures connection pooling and timeout settings for optimal performance.
func NewTursoStorage(url, token string) (*TursoStorage, error) {
db, err := sql.Open("libsql", url+"?authToken="+token)
if err != nil {
return nil, fmt.Errorf("database connection failed: %v", err)
}
// Configure connection pool settings
db.SetMaxOpenConns(200) // Maximum concurrent connections
db.SetConnMaxLifetime(5 * time.Minute) // Connection time-to-live
db.SetMaxIdleConns(25) // Connections maintained when idle
return &TursoStorage{db: db}, nil
}
// Close safely shuts down the database connection pool.
// Should be called during application shutdown to prevent connection leaks.
func (s *TursoStorage) Close() error {
if err := s.db.Close(); err != nil {
return fmt.Errorf("error closing database connections: %v", err)
}
log.Printf("Database connections closed successfully")
return nil
}
// InitTables ensures all required database tables and indices exist.
// This should be called once during application startup.
func (s *TursoStorage) InitTables() error {
log.Printf("Initializing database schema...")
// Verify existing schema
var count int
err := s.db.QueryRow("SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='metrics'").Scan(&count)
if err != nil {
log.Printf("Error checking existing schema: %v", err)
} else {
log.Printf("Found %d existing metrics tables", count)
}
// Create required tables and indices
_, err = s.db.Exec(`
CREATE TABLE IF NOT EXISTS metrics (
timestamp INTEGER, -- Unix timestamp in milliseconds
service_time REAL, -- Total request processing time (ms)
db_time REAL, -- Database operation time (ms)
cache_time REAL, -- Cache operation time (ms)
db_rows_read INTEGER DEFAULT 0,
db_rows_written INTEGER DEFAULT 0,
db_total_rows INTEGER DEFAULT 0,
cache_hits INTEGER DEFAULT 0,
cache_misses INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS test_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
data TEXT NOT NULL,
timestamp DATETIME NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_test_timestamp ON test_data(timestamp);
`)
if err != nil {
return fmt.Errorf("schema initialization failed: %v", err)
}
// Verify tables were created
tables := []string{"metrics", "test_data"}
for _, table := range tables {
var count int
err := s.db.QueryRow("SELECT COUNT(*) FROM " + table).Scan(&count)
if err != nil {
log.Printf("Error verifying table %s: %v", table, err)
} else {
log.Printf("Table %s exists with %d rows", table, count)
}
}
return nil
}
// GetTotalRows returns the total number of rows in the test_data table.
// Used for monitoring database growth over time.
func (s *TursoStorage) GetTotalRows(ctx context.Context) (int64, error) {
var count int64
err := s.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM test_data").Scan(&count)
return count, err
}
// SaveTestData stores a new test data record in the database.
// It first clears existing data to maintain a single active test record.
func (s *TursoStorage) SaveTestData(ctx context.Context, data *TestData) error {
// Clear existing data
_, err := s.db.ExecContext(ctx, "DELETE FROM test_data")
if err != nil {
return fmt.Errorf("failed to clear existing data: %v", err)
}
// Insert new record
result, err := s.db.ExecContext(ctx, `
INSERT INTO test_data (data, timestamp)
VALUES (?, ?)
`, data.Data, data.Timestamp)
if err != nil {
return fmt.Errorf("failed to insert test data: %v", err)
}
// Update ID of inserted record
id, err := result.LastInsertId()
if err != nil {
return fmt.Errorf("failed to get inserted ID: %v", err)
}
data.ID = id
return nil
}
// GetTestData retrieves the most recent test data record.
func (s *TursoStorage) GetTestData(ctx context.Context) (*TestData, error) {
var data TestData
err := s.db.QueryRowContext(ctx, `
SELECT id, data, timestamp
FROM test_data
ORDER BY timestamp DESC
LIMIT 1
`).Scan(&data.ID, &data.Data, &data.Timestamp)
if err != nil {
return nil, fmt.Errorf("failed to retrieve test data: %v", err)
}
return &data, nil
}
// SaveMetrics stores a new performance metrics data point.
// This data is used for monitoring and visualization.
func (s *TursoStorage) SaveMetrics(ctx context.Context, point DataPoint) error {
log.Printf("Storing metrics - Service: %.2fms, DB: %.2fms, Cache: %.2fms",
point.ServiceTime, point.DBTime, point.CacheTime)
_, err := s.db.ExecContext(ctx, `
INSERT INTO metrics (
timestamp,
service_time,
db_time,
cache_time,
db_rows_read,
db_rows_written,
db_total_rows,
cache_hits,
cache_misses
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
point.Timestamp,
point.ServiceTime,
point.DBTime,
point.CacheTime,
point.DBRowsRead,
point.DBRowsWritten,
point.DBTotalRows,
point.CacheHits,
point.CacheMisses,
)
if err != nil {
return fmt.Errorf("failed to store metrics: %v", err)
}
log.Printf("Metrics stored successfully")
return nil
}
// ClearDB removes all data from both metrics and test_data tables.
// This operation is atomic - either all data is cleared or none is.
func (s *TursoStorage) ClearDB(ctx context.Context) error {
// Use transaction for atomicity
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to start transaction: %v", err)
}
defer tx.Rollback() // Ensure rollback on error
// Clear metrics table
if _, err := tx.ExecContext(ctx, "DELETE FROM metrics"); err != nil {
return fmt.Errorf("failed to clear metrics: %v", err)
}
// Clear test_data table
if _, err := tx.ExecContext(ctx, "DELETE FROM test_data"); err != nil {
return fmt.Errorf("failed to clear test data: %v", err)
}
// Commit transaction
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit clear operation: %v", err)
}
log.Printf("Database cleared successfully")
return nil
}
// GetMetrics retrieves performance metrics within the specified time range.
// Returns metrics sorted by timestamp in descending order, limited to 10000 points.
func (s *TursoStorage) GetMetrics(ctx context.Context, start, end time.Time) ([]DataPoint, error) {
log.Printf("Retrieving metrics from %v to %v", start, end)
// Convert timestamps to Unix milliseconds for storage
startMs := start.UnixMilli()
endMs := end.UnixMilli()
// Prepare query with time range filter
query := `
SELECT
timestamp, service_time, db_time, cache_time,
db_rows_read, db_rows_written, db_total_rows,
cache_hits, cache_misses
FROM metrics
WHERE timestamp BETWEEN ? AND ?
ORDER BY timestamp DESC
LIMIT 10000 -- Protect against excessive memory usage
`
log.Printf("Executing query with range: %d to %d", startMs, endMs)
rows, err := s.db.QueryContext(ctx, query, startMs, endMs)
if err != nil {
log.Printf("Query failed: %v", err)
return nil, err
}
defer rows.Close()
// Collect all matching metrics
points := make([]DataPoint, 0)
for rows.Next() {
var p DataPoint
if err := rows.Scan(
&p.Timestamp, &p.ServiceTime, &p.DBTime, &p.CacheTime,
&p.DBRowsRead, &p.DBRowsWritten, &p.DBTotalRows,
&p.CacheHits, &p.CacheMisses,
); err != nil {
log.Printf("Row scan failed: %v", err)
return nil, err
}
points = append(points, p)
}
// Log summary of retrieved data
if len(points) > 0 {
log.Printf("First point: %v (%v)",
points[0].Timestamp,
time.UnixMilli(points[0].Timestamp))
log.Printf("Last point: %v (%v)",
points[len(points)-1].Timestamp,
time.UnixMilli(points[len(points)-1].Timestamp))
}
log.Printf("Retrieved %d metric points", len(points))
return points, rows.Err()
}
// DebugMetrics performs diagnostic checks on the metrics table.
// Used during startup and for troubleshooting system state.
func (s *TursoStorage) DebugMetrics(ctx context.Context) error {
// Check total metrics count
var count int
err := s.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM metrics").Scan(&count)
if err != nil {
return fmt.Errorf("failed to count metrics: %v", err)
}
log.Printf("Debug: Total metrics in database: %d", count)
if count == 0 {
log.Printf("Debug: Metrics table is empty")
return nil
}
// Check timestamp range of stored metrics
var minTs, maxTs int64
err = s.db.QueryRowContext(ctx, "SELECT MIN(timestamp), MAX(timestamp) FROM metrics").Scan(&minTs, &maxTs)
if err != nil {
return fmt.Errorf("failed to get timestamp range: %v", err)
}
log.Printf("Debug: Metrics timestamp range: %v to %v",
time.UnixMilli(minTs),
time.UnixMilli(maxTs))
// Sample recent metrics for verification
rows, err := s.db.QueryContext(ctx, `
SELECT timestamp, service_time, db_time, cache_time,
db_rows_read, db_rows_written, db_total_rows,
cache_hits, cache_misses
FROM metrics
ORDER BY timestamp DESC
LIMIT 5
`)
if err != nil {
return fmt.Errorf("failed to query recent metrics: %v", err)
}
defer rows.Close()
log.Printf("Debug: Most recent metrics:")
for rows.Next() {
var p DataPoint
if err := rows.Scan(
&p.Timestamp, &p.ServiceTime, &p.DBTime, &p.CacheTime,
&p.DBRowsRead, &p.DBRowsWritten, &p.DBTotalRows,
&p.CacheHits, &p.CacheMisses,
); err != nil {
return fmt.Errorf("failed to scan row: %v", err)
}
log.Printf("Time: %v, Service: %.2fms, DB: %.2fms, Cache: %.2fms, "+
"Reads: %d, Writes: %d, Total: %d, Hits: %d, Misses: %d",
time.UnixMilli(p.Timestamp),
p.ServiceTime,
p.DBTime,
p.CacheTime,
p.DBRowsRead,
p.DBRowsWritten,
p.DBTotalRows,
p.CacheHits,
p.CacheMisses)
}
return rows.Err()
}