191 lines
4.8 KiB
Go
191 lines
4.8 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"os/exec"
|
|
"time"
|
|
|
|
runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2"
|
|
"git.ohea.xyz/cursorius/runner/config"
|
|
|
|
//"git.ohea.xyz/cursorius/runner/jobrunner"
|
|
"github.com/op/go-logging"
|
|
"google.golang.org/protobuf/proto"
|
|
"google.golang.org/protobuf/reflect/protoreflect"
|
|
"nhooyr.io/websocket"
|
|
)
|
|
|
|
var log = logging.MustGetLogger("cursorius-server")
|
|
|
|
type Register struct {
|
|
Secret string
|
|
Id string
|
|
Tags []string
|
|
}
|
|
|
|
func main() {
|
|
|
|
var format = logging.MustStringFormatter(
|
|
`%{color}%{time:15:04:05.000} %{level:.4s}:%{color:reset} %{message}`,
|
|
)
|
|
|
|
backend := logging.NewLogBackend(os.Stderr, "", 0)
|
|
backendFormatter := logging.NewBackendFormatter(backend, format)
|
|
backendLeveled := logging.AddModuleLevel(backendFormatter)
|
|
backendLeveled.SetLevel(logging.DEBUG, "")
|
|
|
|
logging.SetBackend(backendLeveled)
|
|
|
|
log.Info("Starting cursorius-runner")
|
|
|
|
configData, isNew, err := config.GetConfig()
|
|
if err != nil {
|
|
log.Fatalf("Could not get configuration: %v", err)
|
|
os.Exit(1)
|
|
}
|
|
if isNew {
|
|
log.Infof("New config file created at %s, please update and relaunch", *configData.ConfigFile)
|
|
os.Exit(0)
|
|
}
|
|
|
|
connectString := fmt.Sprintf("ws://%v/runner", configData.Config.ServerUrl)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
|
|
|
conn, _, err := websocket.Dial(ctx, connectString, nil)
|
|
if err != nil {
|
|
log.Fatalf("Could not connect to server at \"%v\", %v", connectString, err)
|
|
os.Exit(1)
|
|
}
|
|
defer conn.Close(websocket.StatusInternalError, "the sky is falling")
|
|
|
|
cancel()
|
|
|
|
ctx = context.Background()
|
|
|
|
registrationProto := &runner_api.Register{}
|
|
registrationProto.Secret = configData.Config.Secret
|
|
registrationProto.Id = configData.Config.Id
|
|
registrationProto.Tags = configData.Config.Tags
|
|
|
|
err = sendProtoStruct(conn, registrationProto)
|
|
if err != nil {
|
|
log.Fatalf("Could not send registration proto: %v", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
for {
|
|
typ, r, err := conn.Read(ctx)
|
|
if err != nil {
|
|
log.Errorf("Could not read from runner websocket connection: %v", err)
|
|
log.Errorf("Disconnecting...")
|
|
return
|
|
}
|
|
if typ != websocket.MessageBinary {
|
|
log.Error("Got non binary message from runner, disconnecting...")
|
|
conn.Close(websocket.StatusUnsupportedData, "Requires binary data")
|
|
return
|
|
}
|
|
serverToRunnerMsg := &runner_api.ServerToRunnerMsg{}
|
|
if err := proto.Unmarshal(r, serverToRunnerMsg); err != nil {
|
|
log.Error("Could not parse registration message from runner, disconnection....")
|
|
conn.Close(websocket.StatusUnsupportedData, "Invalid message")
|
|
return
|
|
}
|
|
|
|
switch x := serverToRunnerMsg.Msg.(type) {
|
|
case *runner_api.ServerToRunnerMsg_RunCommandMsg:
|
|
log.Debugf("Server asked to run the command %v", x.RunCommandMsg.Command)
|
|
returnCode, stdout, stderr, err := RunCommand(x.RunCommandMsg.Command, x.RunCommandMsg.Args)
|
|
if err != nil {
|
|
log.Errorf("Could not run command: %v", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
runnerToServerMsg := &runner_api.RunnerToServerMsg{
|
|
Msg: &runner_api.RunnerToServerMsg_RunCommandFinalResponseMsg{
|
|
RunCommandFinalResponseMsg: &runner_api.RunCommandFinalResponse{
|
|
ReturnCode: returnCode,
|
|
PartialResponse: &runner_api.RunCommandPartialResponse{
|
|
Stdout: stdout,
|
|
Stderr: stderr,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
err = sendProtoStruct(conn, runnerToServerMsg)
|
|
if err != nil {
|
|
log.Errorf("Could not send results to server: %v", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
func sendProtoStruct(conn *websocket.Conn, p protoreflect.ProtoMessage) error {
|
|
protoOut, err := proto.Marshal(p)
|
|
if err != nil {
|
|
return fmt.Errorf("Could not marshal proto: %w", err)
|
|
}
|
|
|
|
ctx := context.Background()
|
|
|
|
if err := conn.Write(ctx, websocket.MessageBinary, protoOut); err != nil {
|
|
return fmt.Errorf("Could not send proto to websocket: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func RunCommand(command string, args []string) (returnCode int64, stdout string, stderr string, err error) {
|
|
cmd := exec.Command(command, args...)
|
|
stdoutPipe, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
err = fmt.Errorf("Could not get stdout pipe: %w", err)
|
|
return
|
|
}
|
|
|
|
stderrPipe, err := cmd.StderrPipe()
|
|
if err != nil {
|
|
err = fmt.Errorf("Could not get stderr pipe: %w", err)
|
|
return
|
|
}
|
|
|
|
log.Infof("Running command: \"%v\"", command)
|
|
if err = cmd.Start(); err != nil {
|
|
err = fmt.Errorf("Could not start command: %w", err)
|
|
return
|
|
}
|
|
|
|
stdoutBytes, err := io.ReadAll(stdoutPipe)
|
|
if err != nil {
|
|
err = fmt.Errorf("Could not read stdout: %w", err)
|
|
return
|
|
}
|
|
stdout = string(stdoutBytes)
|
|
|
|
stderrBytes, err := io.ReadAll(stderrPipe)
|
|
if err != nil {
|
|
err = fmt.Errorf("Could not read stderr: %w", err)
|
|
return
|
|
}
|
|
stderr = string(stderrBytes)
|
|
|
|
if err = cmd.Wait(); err != nil {
|
|
switch x := err.(type) {
|
|
case *exec.ExitError:
|
|
returnCode = int64(x.ExitCode())
|
|
return
|
|
default:
|
|
err = fmt.Errorf("Could not wait for command: %w", err)
|
|
return
|
|
}
|
|
}
|
|
return
|
|
}
|