Files
server/pipeline_executor/pipeline_executor.go
T

183 lines
4.7 KiB
Go

package pipeline_executor
import (
"bytes"
"context"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"git.ohea.xyz/cursorius/server/config"
"git.ohea.xyz/cursorius/server/database"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"github.com/op/go-logging"
)
var log = logging.MustGetLogger("cursorius-server")
type PipelineExecution struct {
Pipeline database.Pipeline
Ref string
Run database.Run
}
func ExecutePipeline(pe PipelineExecution, db database.Database, pipelineConf config.PipelineConf) {
jobFolder := filepath.Join(pipelineConf.WorkingDir, pe.Pipeline.Id.String(), pe.Run.Id.String())
log.Debugf("Job %v configured with URL \"%v\"", pe.Pipeline.Name, pe.Pipeline.Url)
log.Debugf("Job %v configured with folder \"%v\"", pe.Pipeline.Name, jobFolder)
err := os.RemoveAll(jobFolder)
if err != nil {
log.Errorf("could not delete existing folder %v", jobFolder)
return
}
err = os.MkdirAll(jobFolder, 0755)
if err != nil {
log.Errorf("could not create working directory for job %v: %w", pe.Pipeline.Name, err)
return
}
log.Infof("Cloning source from URL %v", pe.Pipeline.Url)
// TODO: should I use go-git here instead of shelling out to raw git?
cloneCmd := exec.Command("git", "clone", pe.Pipeline.Url, jobFolder)
output, err := cloneCmd.CombinedOutput()
if err != nil {
log.Debugf("%s", output)
log.Errorf("could not clone source: %w", err)
return
}
cli, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
log.Errorf("Could not create docker client: %w", err)
return
}
log.Info("Source cloned successfully")
ctx := context.Background()
imageName := "git.ohea.xyz/cursorius/pipeline-api/cursorius-pipeline:v2"
log.Infof("Pulling image %v", imageName)
pullOutput, err := cli.ImagePull(ctx, imageName, types.ImagePullOptions{})
if err != nil {
log.Errorf("could not pull image %v: %w", imageName, err)
return
}
buf, err := io.ReadAll(pullOutput)
if err != nil {
log.Errorf("could not read from io.ReadCloser:, %w", err)
return
}
log.Infof("%s", buf)
err = pullOutput.Close()
if err != nil {
log.Errorf("could not close io.ReadCloser: %w", err)
return
}
log.Info("Image pulled sucessfully")
hostConfig := container.HostConfig{}
if pipelineConf.DockerNetwork != nil {
hostConfig.NetworkMode = container.NetworkMode(*pipelineConf.DockerNetwork)
}
if pipelineConf.MountConf.Type == config.Bind {
sourceDir := filepath.Join(pipelineConf.MountConf.Source, pe.Pipeline.Id.String(), pe.Run.Id.String())
hostConfig.Mounts = append(hostConfig.Mounts,
mount.Mount{
Type: mount.TypeBind,
Source: sourceDir,
Target: "/cursorius/src",
ReadOnly: false,
Consistency: mount.ConsistencyDefault,
},
)
} else if pipelineConf.MountConf.Type == config.Volume {
hostConfig.Mounts = append(hostConfig.Mounts,
mount.Mount{
Type: mount.TypeVolume,
Source: pipelineConf.MountConf.Source,
Target: "/cursorius/src",
ReadOnly: false,
Consistency: mount.ConsistencyDefault,
},
)
}
resp, err := cli.ContainerCreate(ctx,
&container.Config{
Image: imageName,
Tty: false,
Env: []string{
fmt.Sprintf("RUNID=%v", pe.Run.Id),
"CURSORIUS_SRC_DIR=/cursorius/src",
fmt.Sprintf("CUROSRIUS_SERVER_URL=%v", pipelineConf.AccessURL),
},
},
// TODO: fix running the runner in docker (add VolumesFrom to HostConfig)
&hostConfig,
nil, nil, "",
)
if err != nil {
log.Errorf("could not create container: %w", err)
return
}
log.Info("Launching container")
if err := cli.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}); err != nil {
log.Errorf("could not start container: %v", err)
return
}
statusCh, errCh := cli.ContainerWait(ctx, resp.ID, container.WaitConditionNotRunning)
select {
case err := <-errCh:
if err != nil {
log.Errorf("container returned error: %v", err)
return
}
case okBody := <-statusCh:
if okBody.Error != nil {
log.Errorf("Could not wait on container: %v", err)
return
} else {
log.Debugf("Container finished running with return code: %v", okBody.StatusCode)
pe.Run.Result = &okBody.StatusCode
}
}
pe.Run.InProgress = false
out, err := cli.ContainerLogs(ctx, resp.ID, types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true})
if err != nil {
log.Errorf("could not get container logs: %w", err)
return
}
var stdOut bytes.Buffer
var stdErr bytes.Buffer
stdcopy.StdCopy(&stdOut, &stdErr, out)
pe.Run.Stdout = stdOut.Bytes()
pe.Run.Stderr = stdErr.Bytes()
db.UpdateRunResult(pe.Run)
return
}