Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@
.env
PLAN.md
issues/
.DS_Store
.DS_Store
branch_structure.json
temp_auto_push.bat
temp_interactive_push.bat
156 changes: 50 additions & 106 deletions cmd/runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,156 +2,100 @@ package main

import (
"context"
"database/sql"
"encoding/json"
"log"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"

"github.com/jackc/pgx/v5/pgxpool"
_ "github.com/jackc/pgx/v5/stdlib"
"github.com/redis/go-redis/v9"

"CodeSCE/internal/core/models"
"CodeSCE/internal/db"
"CodeSCE/internal/execution"
"CodeSCE/internal/services"
"CodeSCE/redisclient"
)

const runnerGroup = "runners"

func main() {
// connect to postgres
dsn := os.Getenv("DATABASE_URL")
if dsn == "" {
log.Fatal("DATABASE_URL environment variable is required")
}
database, err := sql.Open("pgx", dsn)
database, err := pgxpool.New(context.Background(), dsn)
if err != nil {
log.Fatalf("failed to open database: %v", err)
log.Fatalf("failed to create postgres pool: %v", err)
}
defer database.Close()
if err := database.Ping(); err != nil {
if err := database.Ping(context.Background()); err != nil {
log.Fatalf("failed to ping database: %v", err)
}
if err := db.InitSchema(database); err != nil {
log.Fatalf("failed to initialize schema: %v", err)
}

// connect to redis
redisURL := os.Getenv("REDIS_URL")
if redisURL == "" {
log.Fatal("REDIS_URL environment variable is required")
}
if err := redisclient.Init(); err != nil {
log.Fatalf("redis fail connect: %v", err)
}
defer redisclient.Close()

// creates context, cancels on kill signals
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

rdb := redisclient.GetClient()

// set up consumer group for jobs stream
hostname, _ := os.Hostname()
consumer := "runner-" + hostname

err = rdb.XGroupCreateMkStream(ctx, services.JobsStream, runnerGroup, "0").Err()
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
log.Printf("warning: could not create jobs consumer group: %v", err)
}

log.Printf("code runner started (consumer=%s), waiting for jobs...", consumer)

for {
select {
case <-ctx.Done():
log.Println("runner shutting down")
return
default:
workerCount := 4
if raw := os.Getenv("WORKER_COUNT"); raw != "" {
v, err := strconv.Atoi(raw)
if err != nil || v <= 0 {
log.Fatalf("invalid WORKER_COUNT: %q", raw)
}
workerCount = v
}

streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: runnerGroup,
Consumer: consumer,
Streams: []string{services.JobsStream, ">"},
Count: 1,
Block: 5 * time.Second,
}).Result()

executionTimeout := 10 * time.Second
if raw := os.Getenv("EXECUTION_TIMEOUT"); raw != "" {
d, err := time.ParseDuration(raw)
if err != nil {
if err == redis.Nil || ctx.Err() != nil {
continue
}
log.Printf("error reading jobs stream: %v", err)
time.Sleep(1 * time.Second)
continue
}

//iterates over messages and processes each job
for _, stream := range streams {
for _, msg := range stream.Messages {
processJob(ctx, rdb, msg)
}
log.Fatalf("invalid EXECUTION_TIMEOUT: %q", raw)
}
executionTimeout = d
}
}

func processJob(ctx context.Context, rdb *redis.Client, msg redis.XMessage) {
data, ok := msg.Values["data"].(string)
if !ok {
log.Printf("invalid job message: %s", msg.ID)
rdb.XAck(ctx, services.JobsStream, runnerGroup, msg.ID)
return
sandboxMemoryLimit := os.Getenv("SANDBOX_MEMORY_LIMIT")
if sandboxMemoryLimit == "" {
sandboxMemoryLimit = "256m"
}

var job models.ExecuteJob
if err := json.Unmarshal([]byte(data), &job); err != nil {
log.Printf("failed to unmarshal job: %v", err)
rdb.XAck(ctx, services.JobsStream, runnerGroup, msg.ID)
return
}
// creates context, cancels on kill signals
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

log.Printf("executing job %s: %s for room %s", job.JobID, job.Language, job.RoomID)
var wg sync.WaitGroup

result := models.ExecuteResult{
JobID: job.JobID,
RoomID: job.RoomID,
UserID: job.UserID,
}
// Run method in execution/sandbox.go look like
// Run(ctx context.Context, language string, sourceCode string, stdin string) (*Result, error)
execResult, err := execution.Run(ctx, job.Language, job.Code, "")
if err != nil {
result.Stdout = ""
result.Stderr = err.Error()
result.Code = -1
} else {
result.Stdout = execResult.Stdout
result.Stderr = execResult.Stderr
result.Code = execResult.Code
}
// start worker pool
for i := 0; i < workerCount; i++ {
worker := &execution.Worker{
ID: i + 1,
Redis: rdb,
DB: database,
ExecutionTimeout: executionTimeout,
SandboxMemoryLimit: sandboxMemoryLimit,
}

// result back to Redis
resultData, err := json.Marshal(result)
if err != nil {
log.Printf("failed to marshal result for job %s: %v", job.JobID, err)
rdb.XAck(ctx, services.JobsStream, runnerGroup, msg.ID)
return
wg.Add(1)
go func(w *execution.Worker) {
defer wg.Done()
w.Run(ctx)
}(worker)
}

if err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: services.ResultsStream,
MaxLen: 1000,
Approx: true,
Values: map[string]interface{}{
"data": string(resultData),
},
}).Err(); err != nil {
log.Printf("failed to publish result for job %s: %v", job.JobID, err)
}
// block until shutdown signal
<-ctx.Done()
log.Println("runner shutting down, waiting for in-flight workers to finish...")

// wait for workers to drain and exit
wg.Wait()

rdb.XAck(ctx, services.JobsStream, runnerGroup, msg.ID)
log.Println("runner shutdown complete")

log.Printf("completed job %s (exit code: %d)", job.JobID, result.Code)
}
6 changes: 5 additions & 1 deletion frontend/tailwind.config.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 40 additions & 0 deletions internal/execution/enqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package execution

import (
"context"
"encoding/json"
"fmt"

"github.com/redis/go-redis/v9"
)

const SubmissionsQueue = "codesce:submissions"

// same as worker.go Job
type EnqueueJob struct {
SubmissionID int64 `json:"submission_id"`
QuestionID int64 `json:"question_id"`
Language string `json:"language"`
SourceCode string `json:"source_code"`
TestCases []TestCase `json:"test_cases"`
}

type TestCase struct {
Input string `json:"input"`
ExpectedOutput string `json:"expected_output"`
Score int `json:"score"`
}

// push a job onto redis queue
func Enqueue(ctx context.Context, rdb *redis.Client, job EnqueueJob) error {
data, err := json.Marshal(job)
if err != nil {
return fmt.Errorf("marshal job: %w", err)
}

if err := rdb.LPush(ctx, SubmissionsQueue, data).Err(); err != nil {
return fmt.Errorf("enqueue job: %w", err)
}

return nil
}
Loading