From f48872c1b341d034e9ed0be9c71ac77cfd0c23bc Mon Sep 17 00:00:00 2001 From: restitux Date: Mon, 17 Oct 2022 01:15:09 -0600 Subject: [PATCH] Fix runner logic and add git clone step --- config/config.go | 18 +++--- main.go | 23 +++---- runner/runner.go | 156 ++++++++++++++++++++++++++--------------------- 3 files changed, 110 insertions(+), 87 deletions(-) diff --git a/config/config.go b/config/config.go index 8c7ef3a..53eca8c 100644 --- a/config/config.go +++ b/config/config.go @@ -6,10 +6,11 @@ import ( ) type Config struct { - ServerUrl string - Id string - Secret string - Tags []string + ServerUrl string + Id string + WorkingDir string + Secret string + Tags []string } func GetConfig() (config.Config[Config], bool, error) { @@ -17,10 +18,11 @@ func GetConfig() (config.Config[Config], bool, error) { Name: "cursorius", Filename: "runner", Config: Config{ - ServerUrl: "FILL IN", - Id: "FILL IN", - Secret: "FILL IN", - Tags: []string{}, + ServerUrl: "FILL IN", + Id: "FILL IN", + Secret: "FILL IN", + WorkingDir: "FILL IN", + Tags: []string{}, }, } diff --git a/main.go b/main.go index a7aad3f..3e2e7bb 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "git.ohea.xyz/cursorius/runner/config" + "git.ohea.xyz/cursorius/runner/runner" "github.com/op/go-logging" "nhooyr.io/websocket" "nhooyr.io/websocket/wsjson" @@ -19,11 +20,6 @@ type Register struct { Tags []string } -type Job struct { - URL *string - Folder *string -} - func main() { var format = logging.MustStringFormatter( @@ -60,26 +56,31 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - c, _, err := websocket.Dial(ctx, connectString, nil) + conn, _, err := websocket.Dial(ctx, connectString, nil) if err != nil { log.Fatalf("Could not connect to server at \"%v\", %v", connectString, err) os.Exit(1) } - defer c.Close(websocket.StatusInternalError, "the sky is falling") + defer conn.Close(websocket.StatusInternalError, "the sky is falling") - err = wsjson.Write(ctx, c, registrationData) + err = wsjson.Write(ctx, conn, registrationData) if err != nil { log.Fatalf("Could not send registration information to server: %v", err) os.Exit(1) } for { - var job Job - err = wsjson.Read(ctx, c, &job) + var job runner.Job + err = wsjson.Read(ctx, conn, &job) if err != nil { log.Fatalf("Could not receive from connection: %v", err) continue } - log.Infof("Received job from server: %v", *job.URL) + log.Infof("Received job from server: id: %v, URL: %v", job.Id, job.URL) + + err = runner.RunJob(job, configData.Config.WorkingDir) + if err != nil { + log.Errorf("Could not run job: %v", err) + } } } diff --git a/runner/runner.go b/runner/runner.go index e07b1eb..d9e341b 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -4,81 +4,101 @@ import ( "bytes" "context" "fmt" + "os" + "os/exec" + "path/filepath" + "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/mount" "github.com/docker/docker/client" "github.com/docker/docker/pkg/stdcopy" "github.com/op/go-logging" - "os" ) var log = logging.MustGetLogger("cursorius-server") -func runJob(job config.Job) { - if job.Folder != nil { - log.Debugf("Job configured with folder \"%v\"", *job.Folder) - - cli, err := client.NewClientWithOpts(client.FromEnv) - if err != nil { - log.Errorf("Could not create docker clien: %va", err) - return - } - - hostname, err := os.Hostname() - if err != nil { - log.Errorf("Could not get hostname: %v", err) - return - } - - ctx := context.Background() - - resp, err := cli.ContainerCreate(ctx, - &container.Config{ - Image: "cursorius:latest", - Cmd: []string{"/launcher.sh"}, - Tty: false, - Env: []string{fmt.Sprintf("CURSORIUS_SRC_DIR=%s", *job.Folder)}, - }, - &container.HostConfig{ - VolumesFrom: []string{hostname}, - }, - nil, nil, "", - ) - if err != nil { - log.Errorf("Could not create container: %v", err) - return - } - - if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil { - log.Errorf("Could not start container: %v", err) - return - } - statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning) - select { - case err := <-errCh: - if err != nil { - log.Errorf("Container returned error: %v", err) - return - } - case retCode := <-statusCh: - log.Debugf("Container finished running with return code: %v", retCode) - } - - out, err := cli.ContainerLogs(ctx, resp.ID, types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true}) - if err != nil { - log.Errorf("Could not get container logs: %v", err) - return - } - - var stdOut bytes.Buffer - var stdErr bytes.Buffer - - stdcopy.StdCopy(&stdOut, &stdErr, out) - - log.Debugf("%s", stdOut.Bytes()) - log.Debugf("%s", stdErr.Bytes()) - - } else if job.URL != nil { - log.Debugf("Job configured with URL \"%v\"", *job.URL) - } +type Job struct { + Id string + URL string +} + +func RunJob(job Job, workingDir string) error { + jobFolder := filepath.Join(workingDir, job.Id) + + log.Debugf("Job %v configured with URL \"%v\"", job.Id, job.URL) + + log.Debugf("Job %v configured with folder \"%v\"", job.Id, jobFolder) + + err := os.MkdirAll(jobFolder, 0755) + if err != nil { + return fmt.Errorf("could not create working directory for job %v: %v", job.Id, err) + } + + log.Infof("Cloning source from URL %v", job.URL) + cloneCmd := exec.Command("git", "clone", job.URL, jobFolder) + output, err := cloneCmd.CombinedOutput() + if err != nil { + log.Debugf("%s", output) + return fmt.Errorf("could not clone source: %v", err) + } + + cli, err := client.NewClientWithOpts(client.FromEnv) + if err != nil { + return fmt.Errorf("Could not create docker client: %v", err) + } + + ctx := context.Background() + + resp, err := cli.ContainerCreate(ctx, + &container.Config{ + Image: "cursorius:latest", + Cmd: []string{"/launcher.sh"}, + Tty: false, + Env: []string{fmt.Sprintf("CURSORIUS_SRC_DIR=/job")}, + }, + // TODO: fix running the runner in docker (add VolumesFrom to HostConfig) + &container.HostConfig{ + Mounts: []mount.Mount{{ + Source: jobFolder, + Target: "/job", + ReadOnly: false, + Consistency: mount.ConsistencyDefault, + }}, + }, + nil, nil, "", + ) + if err != nil { + return fmt.Errorf("could not create container: %v", err) + } + + log.Info("Launching container") + + if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil { + return fmt.Errorf("could not start container: %v", err) + } + statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning) + select { + case err := <-errCh: + if err != nil { + return fmt.Errorf("container returned error: %v", err) + } + case retCode := <-statusCh: + log.Debugf("Container finished running with return code: %v", retCode) + } + + out, err := cli.ContainerLogs(ctx, resp.ID, types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true}) + if err != nil { + return fmt.Errorf("could not get container logs: %v", err) + } + + var stdOut bytes.Buffer + var stdErr bytes.Buffer + + stdcopy.StdCopy(&stdOut, &stdErr, out) + + log.Debugf("%s", stdOut.Bytes()) + log.Debugf("%s", stdErr.Bytes()) + + return nil }