Fix runner logic and add git clone step

This commit is contained in:
2022-10-17 01:15:09 -06:00
parent c21e7d5d6b
commit f48872c1b3
3 changed files with 110 additions and 87 deletions
+10 -8
View File
@@ -6,10 +6,11 @@ import (
) )
type Config struct { type Config struct {
ServerUrl string ServerUrl string
Id string Id string
Secret string WorkingDir string
Tags []string Secret string
Tags []string
} }
func GetConfig() (config.Config[Config], bool, error) { func GetConfig() (config.Config[Config], bool, error) {
@@ -17,10 +18,11 @@ func GetConfig() (config.Config[Config], bool, error) {
Name: "cursorius", Name: "cursorius",
Filename: "runner", Filename: "runner",
Config: Config{ Config: Config{
ServerUrl: "FILL IN", ServerUrl: "FILL IN",
Id: "FILL IN", Id: "FILL IN",
Secret: "FILL IN", Secret: "FILL IN",
Tags: []string{}, WorkingDir: "FILL IN",
Tags: []string{},
}, },
} }
+12 -11
View File
@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"git.ohea.xyz/cursorius/runner/config" "git.ohea.xyz/cursorius/runner/config"
"git.ohea.xyz/cursorius/runner/runner"
"github.com/op/go-logging" "github.com/op/go-logging"
"nhooyr.io/websocket" "nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson" "nhooyr.io/websocket/wsjson"
@@ -19,11 +20,6 @@ type Register struct {
Tags []string Tags []string
} }
type Job struct {
URL *string
Folder *string
}
func main() { func main() {
var format = logging.MustStringFormatter( var format = logging.MustStringFormatter(
@@ -60,26 +56,31 @@ func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute) ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel() defer cancel()
c, _, err := websocket.Dial(ctx, connectString, nil) conn, _, err := websocket.Dial(ctx, connectString, nil)
if err != nil { if err != nil {
log.Fatalf("Could not connect to server at \"%v\", %v", connectString, err) log.Fatalf("Could not connect to server at \"%v\", %v", connectString, err)
os.Exit(1) os.Exit(1)
} }
defer c.Close(websocket.StatusInternalError, "the sky is falling") defer conn.Close(websocket.StatusInternalError, "the sky is falling")
err = wsjson.Write(ctx, c, registrationData) err = wsjson.Write(ctx, conn, registrationData)
if err != nil { if err != nil {
log.Fatalf("Could not send registration information to server: %v", err) log.Fatalf("Could not send registration information to server: %v", err)
os.Exit(1) os.Exit(1)
} }
for { for {
var job Job var job runner.Job
err = wsjson.Read(ctx, c, &job) err = wsjson.Read(ctx, conn, &job)
if err != nil { if err != nil {
log.Fatalf("Could not receive from connection: %v", err) log.Fatalf("Could not receive from connection: %v", err)
continue continue
} }
log.Infof("Received job from server: %v", *job.URL) log.Infof("Received job from server: id: %v, URL: %v", job.Id, job.URL)
err = runner.RunJob(job, configData.Config.WorkingDir)
if err != nil {
log.Errorf("Could not run job: %v", err)
}
} }
} }
+88 -68
View File
@@ -4,81 +4,101 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"os"
"os/exec"
"path/filepath"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/client" "github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy" "github.com/docker/docker/pkg/stdcopy"
"github.com/op/go-logging" "github.com/op/go-logging"
"os"
) )
var log = logging.MustGetLogger("cursorius-server") var log = logging.MustGetLogger("cursorius-server")
func runJob(job config.Job) { type Job struct {
if job.Folder != nil { Id string
log.Debugf("Job configured with folder \"%v\"", *job.Folder) URL string
}
cli, err := client.NewClientWithOpts(client.FromEnv)
if err != nil { func RunJob(job Job, workingDir string) error {
log.Errorf("Could not create docker clien: %va", err) jobFolder := filepath.Join(workingDir, job.Id)
return
} log.Debugf("Job %v configured with URL \"%v\"", job.Id, job.URL)
hostname, err := os.Hostname() log.Debugf("Job %v configured with folder \"%v\"", job.Id, jobFolder)
if err != nil {
log.Errorf("Could not get hostname: %v", err) err := os.MkdirAll(jobFolder, 0755)
return if err != nil {
} return fmt.Errorf("could not create working directory for job %v: %v", job.Id, err)
}
ctx := context.Background()
log.Infof("Cloning source from URL %v", job.URL)
resp, err := cli.ContainerCreate(ctx, cloneCmd := exec.Command("git", "clone", job.URL, jobFolder)
&container.Config{ output, err := cloneCmd.CombinedOutput()
Image: "cursorius:latest", if err != nil {
Cmd: []string{"/launcher.sh"}, log.Debugf("%s", output)
Tty: false, return fmt.Errorf("could not clone source: %v", err)
Env: []string{fmt.Sprintf("CURSORIUS_SRC_DIR=%s", *job.Folder)}, }
},
&container.HostConfig{ cli, err := client.NewClientWithOpts(client.FromEnv)
VolumesFrom: []string{hostname}, if err != nil {
}, return fmt.Errorf("Could not create docker client: %v", err)
nil, nil, "", }
)
if err != nil { ctx := context.Background()
log.Errorf("Could not create container: %v", err)
return resp, err := cli.ContainerCreate(ctx,
} &container.Config{
Image: "cursorius:latest",
if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil { Cmd: []string{"/launcher.sh"},
log.Errorf("Could not start container: %v", err) Tty: false,
return Env: []string{fmt.Sprintf("CURSORIUS_SRC_DIR=/job")},
} },
statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning) // TODO: fix running the runner in docker (add VolumesFrom to HostConfig)
select { &container.HostConfig{
case err := <-errCh: Mounts: []mount.Mount{{
if err != nil { Source: jobFolder,
log.Errorf("Container returned error: %v", err) Target: "/job",
return ReadOnly: false,
} Consistency: mount.ConsistencyDefault,
case retCode := <-statusCh: }},
log.Debugf("Container finished running with return code: %v", retCode) },
} nil, nil, "",
)
out, err := cli.ContainerLogs(ctx, resp.ID, types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true}) if err != nil {
if err != nil { return fmt.Errorf("could not create container: %v", err)
log.Errorf("Could not get container logs: %v", err) }
return
} log.Info("Launching container")
var stdOut bytes.Buffer if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
var stdErr bytes.Buffer return fmt.Errorf("could not start container: %v", err)
}
stdcopy.StdCopy(&stdOut, &stdErr, out) statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning)
select {
log.Debugf("%s", stdOut.Bytes()) case err := <-errCh:
log.Debugf("%s", stdErr.Bytes()) if err != nil {
return fmt.Errorf("container returned error: %v", err)
} else if job.URL != nil { }
log.Debugf("Job configured with URL \"%v\"", *job.URL) case retCode := <-statusCh:
} log.Debugf("Container finished running with return code: %v", retCode)
}
out, err := cli.ContainerLogs(ctx, resp.ID, types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true})
if err != nil {
return fmt.Errorf("could not get container logs: %v", err)
}
var stdOut bytes.Buffer
var stdErr bytes.Buffer
stdcopy.StdCopy(&stdOut, &stdErr, out)
log.Debugf("%s", stdOut.Bytes())
log.Debugf("%s", stdErr.Bytes())
return nil
} }