package dbstore

import (
	"context"
	"database/sql"
	"fmt"
	"time"

	"git.linuxforward.com/byop/byop-engine/models"
)

// CreateBuildJob inserts job as a new row of the build_jobs table.
//
// job.CreatedAt and job.UpdatedAt are overwritten with the current time before
// the insert. On success job.ID is set to the row ID reported by the database.
func (s *SQLiteStore) CreateBuildJob(ctx context.Context, job *models.BuildJob) error {
	// One clock reading feeds both timestamps so a fresh record carries identical values.
	stamp := time.Now()
	job.CreatedAt, job.UpdatedAt = stamp, stamp

	const insertSQL = `
		INSERT INTO build_jobs (
			component_id, request_id, source_url, version, status,
			image_name, image_tag, full_image_uri, registry_url, registry_user, registry_password,
			build_context, dockerfile, dockerfile_content, no_cache, build_args,
			logs, error_message, requested_at, started_at, finished_at,
			worker_node_id, created_at, updated_at
		) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

	stmt, err := s.db.PrepareContext(ctx, insertSQL)
	if err != nil {
		return fmt.Errorf("failed to prepare statement for CreateBuildJob: %w", err)
	}
	defer stmt.Close()

	// Values are listed in exactly the order of the column list above.
	values := []interface{}{
		job.ComponentID, job.RequestID, job.SourceURL, job.Version, job.Status,
		job.ImageName, job.ImageTag, job.FullImageURI, job.RegistryURL, job.RegistryUser, job.RegistryPassword,
		job.BuildContext, job.Dockerfile, job.DockerfileContent, job.NoCache, job.BuildArgs,
		job.Logs, job.ErrorMessage, job.RequestedAt, job.StartedAt, job.FinishedAt,
		job.WorkerNodeID, job.CreatedAt, job.UpdatedAt,
	}

	result, err := stmt.ExecContext(ctx, values...)
	if err != nil {
		return fmt.Errorf("failed to execute statement for CreateBuildJob: %w", err)
	}

	newID, err := result.LastInsertId()
	if err != nil {
		return fmt.Errorf("failed to get last insert ID for CreateBuildJob: %w", err)
	}
	job.ID = uint(newID)

	return nil
}

// GetBuildJobByID retrieves a build job by its ID.
func (s *SQLiteStore) GetBuildJobByID(ctx context.Context, id uint) (*models.BuildJob, error) { query := ` SELECT id, component_id, request_id, source_url, version, status, image_name, image_tag, full_image_uri, registry_url, registry_user, registry_password, build_context, dockerfile, dockerfile_content, no_cache, build_args, logs, error_message, requested_at, started_at, finished_at, worker_node_id, created_at, updated_at FROM build_jobs WHERE id = ?` row := s.db.QueryRowContext(ctx, query, id) job := &models.BuildJob{} var startedAt, finishedAt sql.NullTime err := row.Scan( &job.ID, &job.ComponentID, &job.RequestID, &job.SourceURL, &job.Version, &job.Status, &job.ImageName, &job.ImageTag, &job.FullImageURI, &job.RegistryURL, &job.RegistryUser, &job.RegistryPassword, &job.BuildContext, &job.Dockerfile, &job.DockerfileContent, &job.NoCache, &job.BuildArgs, &job.Logs, &job.ErrorMessage, &job.RequestedAt, &startedAt, &finishedAt, &job.WorkerNodeID, &job.CreatedAt, &job.UpdatedAt, ) if err != nil { if err == sql.ErrNoRows { return nil, fmt.Errorf("build job with ID %d not found", id) } return nil, fmt.Errorf("failed to scan build job: %w", err) } if startedAt.Valid { job.StartedAt = &startedAt.Time } if finishedAt.Valid { job.FinishedAt = &finishedAt.Time } return job, nil } // UpdateBuildJob updates an existing build job record in the database. func (s *SQLiteStore) UpdateBuildJob(ctx context.Context, job *models.BuildJob) error { job.UpdatedAt = time.Now() query := ` UPDATE build_jobs SET component_id = ?, request_id = ?, source_url = ?, version = ?, status = ?, image_name = ?, image_tag = ?, full_image_uri = ?, registry_url = ?, registry_user = ?, registry_password = ?, build_context = ?, dockerfile = ?, no_cache = ?, build_args = ?, logs = ?, error_message = ?, requested_at = ?, started_at = ?, finished_at = ?, worker_node_id = ?, updated_at = ? 
WHERE id = ?` stmt, err := s.db.PrepareContext(ctx, query) if err != nil { return fmt.Errorf("failed to prepare statement for UpdateBuildJob: %w", err) } defer stmt.Close() _, err = stmt.ExecContext(ctx, job.ComponentID, job.RequestID, job.SourceURL, job.Version, job.Status, job.ImageName, job.ImageTag, job.FullImageURI, job.RegistryURL, job.RegistryUser, job.RegistryPassword, job.BuildContext, job.Dockerfile, job.NoCache, job.BuildArgs, job.Logs, job.ErrorMessage, job.RequestedAt, job.StartedAt, job.FinishedAt, job.WorkerNodeID, job.UpdatedAt, job.ID, ) if err != nil { return fmt.Errorf("failed to execute statement for UpdateBuildJob: %w", err) } return nil } // UpdateBuildJobStatus updates the status, error message, and relevant timestamps of a build job. func (s *SQLiteStore) UpdateBuildJobStatus(ctx context.Context, id uint, status models.BuildStatus, errorMessage string) error { now := time.Now() var startedAtExpr, finishedAtExpr string var args []interface{} baseQuery := "UPDATE build_jobs SET status = ?, error_message = ?, updated_at = ?" args = append(args, status, errorMessage, now) switch status { case models.BuildStatusFetching, models.BuildStatusBuilding, models.BuildStatusPushing: startedAtExpr = ", started_at = COALESCE(started_at, ?)" args = append(args, now) case models.BuildStatusSuccess, models.BuildStatusFailed, models.BuildStatusCancelled: startedAtExpr = ", started_at = COALESCE(started_at, ?)" finishedAtExpr = ", finished_at = ?" args = append(args, now, now) } finalQuery := baseQuery + startedAtExpr + finishedAtExpr + " WHERE id = ?" args = append(args, id) stmt, err := s.db.PrepareContext(ctx, finalQuery) if err != nil { return fmt.Errorf("failed to prepare statement for UpdateBuildJobStatus: %w", err) } defer stmt.Close() _, err = stmt.ExecContext(ctx, args...) 
if err != nil { return fmt.Errorf("failed to execute statement for UpdateBuildJobStatus: %w", err) } return nil } // AppendBuildJobLog appends a new log entry to the build job's logs. func (s *SQLiteStore) AppendBuildJobLog(ctx context.Context, id uint, logMessage string) error { tx, err := s.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("failed to begin transaction for AppendBuildJobLog: %w", err) } defer tx.Rollback() var currentLogs string querySelect := "SELECT logs FROM build_jobs WHERE id = ?" err = tx.QueryRowContext(ctx, querySelect, id).Scan(¤tLogs) if err != nil { if err == sql.ErrNoRows { return fmt.Errorf("build job with ID %d not found for AppendBuildJobLog", id) } return fmt.Errorf("failed to query current logs for AppendBuildJobLog: %w", err) } newLogEntry := fmt.Sprintf("%s: %s", time.Now().Format(time.RFC3339Nano), logMessage) var updatedLogs string if currentLogs == "" { updatedLogs = newLogEntry } else { updatedLogs = currentLogs + "\n" + newLogEntry } queryUpdate := "UPDATE build_jobs SET logs = ?, updated_at = ? WHERE id = ?" stmt, err := tx.PrepareContext(ctx, queryUpdate) if err != nil { return fmt.Errorf("failed to prepare update statement for AppendBuildJobLog: %w", err) } defer stmt.Close() _, err = stmt.ExecContext(ctx, updatedLogs, time.Now(), id) if err != nil { return fmt.Errorf("failed to execute update statement for AppendBuildJobLog: %w", err) } return tx.Commit() } // GetQueuedBuildJobs retrieves a list of build jobs that are in the 'pending' status, // ordered by their request time. 
func (s *SQLiteStore) GetQueuedBuildJobs(ctx context.Context, limit int) ([]models.BuildJob, error) { query := ` SELECT id, component_id, request_id, source_url, version, status, image_name, image_tag, full_image_uri, registry_url, registry_user, registry_password, build_context, dockerfile, dockerfile_content, no_cache, build_args, logs, error_message, requested_at, started_at, finished_at, worker_node_id, created_at, updated_at FROM build_jobs WHERE status = ? ORDER BY requested_at ASC LIMIT ?` rows, err := s.db.QueryContext(ctx, query, models.BuildStatusPending, limit) if err != nil { return nil, fmt.Errorf("failed to query queued build jobs: %w", err) } defer rows.Close() var jobs []models.BuildJob for rows.Next() { job := models.BuildJob{} var startedAt, finishedAt sql.NullTime err := rows.Scan( &job.ID, &job.ComponentID, &job.RequestID, &job.SourceURL, &job.Version, &job.Status, &job.ImageName, &job.ImageTag, &job.FullImageURI, &job.RegistryURL, &job.RegistryUser, &job.RegistryPassword, &job.BuildContext, &job.Dockerfile, &job.DockerfileContent, &job.NoCache, &job.BuildArgs, &job.Logs, &job.ErrorMessage, &job.RequestedAt, &startedAt, &finishedAt, &job.WorkerNodeID, &job.CreatedAt, &job.UpdatedAt, ) if err != nil { return nil, fmt.Errorf("failed to scan queued build job: %w", err) } if startedAt.Valid { job.StartedAt = &startedAt.Time } if finishedAt.Valid { job.FinishedAt = &finishedAt.Time } jobs = append(jobs, job) } if err = rows.Err(); err != nil { return nil, fmt.Errorf("error iterating queued build jobs: %w", err) } return jobs, nil } // GetBuildJobsByComponentID retrieves all build jobs for a specific application with pagination. func (s *SQLiteStore) GetBuildJobsByComponentID(ctx context.Context, componentID uint, page, pageSize int) ([]models.BuildJob, int64, error) { var total int64 countQuery := "SELECT COUNT(*) FROM build_jobs WHERE component_id = ?" 
err := s.db.QueryRowContext(ctx, countQuery, componentID).Scan(&total) if err != nil { return nil, 0, fmt.Errorf("failed to count build jobs by component ID: %w", err) } if total == 0 { return []models.BuildJob{}, 0, nil } offset := (page - 1) * pageSize query := ` SELECT id, component_id, request_id, source_url, version, status, image_name, image_tag, full_image_uri, registry_url, registry_user, registry_password, build_context, dockerfile, dockerfile_content, no_cache, build_args, logs, error_message, requested_at, started_at, finished_at, worker_node_id, created_at, updated_at FROM build_jobs WHERE component_id = ? ORDER BY requested_at DESC LIMIT ? OFFSET ?` rows, err := s.db.QueryContext(ctx, query, componentID, pageSize, offset) if err != nil { return nil, 0, fmt.Errorf("failed to query build jobs by component ID: %w", err) } defer rows.Close() var jobs []models.BuildJob for rows.Next() { job := models.BuildJob{} var startedAt, finishedAt sql.NullTime err := rows.Scan( &job.ID, &job.ComponentID, &job.RequestID, &job.SourceURL, &job.Version, &job.Status, &job.ImageName, &job.ImageTag, &job.FullImageURI, &job.RegistryURL, &job.RegistryUser, &job.RegistryPassword, &job.BuildContext, &job.Dockerfile, &job.DockerfileContent, &job.NoCache, &job.BuildArgs, &job.Logs, &job.ErrorMessage, &job.RequestedAt, &startedAt, &finishedAt, &job.WorkerNodeID, &job.CreatedAt, &job.UpdatedAt, ) if err != nil { return nil, 0, fmt.Errorf("failed to scan build job by component ID: %w", err) } if startedAt.Valid { job.StartedAt = &startedAt.Time } if finishedAt.Valid { job.FinishedAt = &finishedAt.Time } jobs = append(jobs, job) } if err = rows.Err(); err != nil { return nil, 0, fmt.Errorf("error iterating build jobs by component ID: %w", err) } return jobs, total, nil } // Helper to marshal map to JSON string for BuildArgs, if needed before calling Create/Update. // This is more of a service-layer concern or model method. 
/*
func marshalBuildArgs(args map[string]string) (string, error) {
	if args == nil {
		return "{}", nil // Or "null" or "" depending on preference for empty args
	}
	bytes, err := json.Marshal(args)
	if err != nil {
		return "", err
	}
	return string(bytes), nil
}

// Helper to unmarshal JSON string to map for BuildArgs, if needed after fetching.
// This is more of a service-layer concern or model method.
func unmarshalBuildArgs(argsStr string) (map[string]string, error) {
	if strings.TrimSpace(argsStr) == "" || argsStr == "null" {
		return make(map[string]string), nil
	}
	var args map[string]string
	err := json.Unmarshal([]byte(argsStr), &args)
	if err != nil {
		return nil, err
	}
	return args, nil
}
*/