Working implementation of runner API (fixes #2)

This commit is contained in:
2023-01-09 00:53:34 -07:00
parent 6ea6c6f437
commit 71de335534
4 changed files with 136 additions and 23 deletions
+126 -22
View File
@@ -3,13 +3,19 @@ package main
import (
"context"
"fmt"
"git.ohea.xyz/cursorius/runner/config"
"git.ohea.xyz/cursorius/runner/jobrunner"
"github.com/op/go-logging"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
"io"
"os"
"os/exec"
"time"
runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2"
"git.ohea.xyz/cursorius/runner/config"
//"git.ohea.xyz/cursorius/runner/jobrunner"
"github.com/op/go-logging"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"nhooyr.io/websocket"
)
var log = logging.MustGetLogger("cursorius-server")
@@ -45,16 +51,9 @@ func main() {
os.Exit(0)
}
registrationData := Register{
Secret: configData.Config.Secret,
Id: configData.Config.Id,
Tags: configData.Config.Tags,
}
connectString := fmt.Sprintf("ws://%v/runner", configData.Config.ServerUrl)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
conn, _, err := websocket.Dial(ctx, connectString, nil)
if err != nil {
@@ -63,24 +62,129 @@ func main() {
}
defer conn.Close(websocket.StatusInternalError, "the sky is falling")
err = wsjson.Write(ctx, conn, registrationData)
cancel()
ctx = context.Background()
registrationProto := &runner_api.Register{}
registrationProto.Secret = configData.Config.Secret
registrationProto.Id = configData.Config.Id
registrationProto.Tags = configData.Config.Tags
err = sendProtoStruct(conn, registrationProto)
if err != nil {
log.Fatalf("Could not send registration information to server: %v", err)
log.Fatalf("Could not send registration proto: %v", err)
os.Exit(1)
}
for {
var job jobrunner.Job
err = wsjson.Read(ctx, conn, &job)
typ, r, err := conn.Read(ctx)
if err != nil {
log.Fatalf("Could not receive from connection: %v", err)
continue
log.Errorf("Could not read from runner websocket connection: %v", err)
log.Errorf("Disconnecting...")
return
}
if typ != websocket.MessageBinary {
log.Error("Got non binary message from runner, disconnecting...")
conn.Close(websocket.StatusUnsupportedData, "Requires binary data")
return
}
serverToRunnerMsg := &runner_api.ServerToRunnerMsg{}
if err := proto.Unmarshal(r, serverToRunnerMsg); err != nil {
log.Error("Could not parse registration message from runner, disconnection....")
conn.Close(websocket.StatusUnsupportedData, "Invalid message")
return
}
log.Infof("Received job from server: id: %v, URL: %v", job.Id, job.URL)
err = jobrunner.RunJob(job, configData.Config.WorkingDir)
if err != nil {
log.Errorf("Could not run job: %v", err)
switch x := serverToRunnerMsg.Msg.(type) {
case *runner_api.ServerToRunnerMsg_RunCommandMsg:
log.Debugf("Server asked to run the command %v", x.RunCommandMsg.Command)
returnCode, stdout, stderr, err := RunCommand(x.RunCommandMsg.Command, x.RunCommandMsg.Args)
if err != nil {
log.Errorf("Could not run command: %v", err)
os.Exit(1)
}
runnerToServerMsg := &runner_api.RunnerToServerMsg{
Msg: &runner_api.RunnerToServerMsg_RunCommandFinalResponseMsg{
RunCommandFinalResponseMsg: &runner_api.RunCommandFinalResponse{
ReturnCode: returnCode,
PartialResponse: &runner_api.RunCommandPartialResponse{
Stdout: stdout,
Stderr: stderr,
},
},
},
}
err = sendProtoStruct(conn, runnerToServerMsg)
if err != nil {
log.Errorf("Could not send results to server: %v", err)
os.Exit(1)
}
}
}
}
func sendProtoStruct(conn *websocket.Conn, p protoreflect.ProtoMessage) error {
protoOut, err := proto.Marshal(p)
if err != nil {
return fmt.Errorf("Could not marshal proto: %w", err)
}
ctx := context.Background()
if err := conn.Write(ctx, websocket.MessageBinary, protoOut); err != nil {
return fmt.Errorf("Could not send proto to websocket: %w", err)
}
return nil
}
func RunCommand(command string, args []string) (returnCode int64, stdout string, stderr string, err error) {
cmd := exec.Command(command, args...)
stdoutPipe, err := cmd.StdoutPipe()
if err != nil {
err = fmt.Errorf("Could not get stdout pipe: %w", err)
return
}
stderrPipe, err := cmd.StderrPipe()
if err != nil {
err = fmt.Errorf("Could not get stderr pipe: %w", err)
return
}
log.Infof("Running command: \"%v\"", command)
if err = cmd.Start(); err != nil {
err = fmt.Errorf("Could not start command: %w", err)
return
}
stdoutBytes, err := io.ReadAll(stdoutPipe)
if err != nil {
err = fmt.Errorf("Could not read stdout: %w", err)
return
}
stdout = string(stdoutBytes)
stderrBytes, err := io.ReadAll(stderrPipe)
if err != nil {
err = fmt.Errorf("Could not read stderr: %w", err)
return
}
stderr = string(stderrBytes)
if err = cmd.Wait(); err != nil {
switch x := err.(type) {
case *exec.ExitError:
returnCode = int64(x.ExitCode())
return
default:
err = fmt.Errorf("Could not wait for command: %w", err)
return
}
}
return
}