Implment worker api (fixes #3)
This commit is contained in:
+73
-5
@@ -1,8 +1,15 @@
|
||||
package runnermanager
|
||||
|
||||
import "nhooyr.io/websocket"
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
//var log = logging.MustGetLogger("cursorius-server")
|
||||
"google.golang.org/protobuf/proto"
|
||||
"google.golang.org/protobuf/reflect/protoreflect"
|
||||
"nhooyr.io/websocket"
|
||||
|
||||
runner_api "git.ohea.xyz/cursorius/runner-api/go/api/v2"
|
||||
)
|
||||
|
||||
type RunnerData struct {
|
||||
msgType websocket.MessageType
|
||||
@@ -13,7 +20,7 @@ type Runner struct {
|
||||
id string
|
||||
tags []string
|
||||
conn *websocket.Conn
|
||||
receiveChan chan RunnerData
|
||||
receiveChan chan []byte
|
||||
running bool
|
||||
}
|
||||
|
||||
@@ -21,6 +28,67 @@ func (r *Runner) Id() string {
|
||||
return r.id
|
||||
}
|
||||
|
||||
func (r *Runner) RunCommand(string) (int64, string, string, error) {
|
||||
return 0, "", "", nil
|
||||
func (r *Runner) Release() {
|
||||
r.running = false
|
||||
}
|
||||
|
||||
func (r *Runner) RunCommand(cmd string, args []string) (returnCode int64, stdout string, stderr string, err error) {
|
||||
|
||||
// Write RunCommand message to client
|
||||
serverToRunnerMsg := &runner_api.ServerToRunnerMsg{
|
||||
Msg: &runner_api.ServerToRunnerMsg_RunCommandMsg{
|
||||
RunCommandMsg: &runner_api.RunCommand{
|
||||
Command: cmd,
|
||||
Args: args,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err = r.sendProtoStruct(serverToRunnerMsg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Could not send command to client: %w", err)
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
// Read RunCommandFinalResponse message from client
|
||||
data, ok := <-r.receiveChan
|
||||
if !ok {
|
||||
err = fmt.Errorf("Channel is closed on runner")
|
||||
return
|
||||
}
|
||||
|
||||
runnerToServerMsg := &runner_api.RunnerToServerMsg{}
|
||||
if err = proto.Unmarshal(data, runnerToServerMsg); err != nil {
|
||||
err = fmt.Errorf("Could not parse RunCommand response: %w", err)
|
||||
r.conn.Close(websocket.StatusUnsupportedData, "Invalid message")
|
||||
return
|
||||
}
|
||||
|
||||
switch x := runnerToServerMsg.Msg.(type) {
|
||||
case *runner_api.RunnerToServerMsg_RunCommandPartialResponseMsg:
|
||||
stdout += x.RunCommandPartialResponseMsg.Stdout
|
||||
stderr += x.RunCommandPartialResponseMsg.Stderr
|
||||
case *runner_api.RunnerToServerMsg_RunCommandFinalResponseMsg:
|
||||
stdout += x.RunCommandFinalResponseMsg.PartialResponse.Stdout
|
||||
stderr += x.RunCommandFinalResponseMsg.PartialResponse.Stderr
|
||||
returnCode = x.RunCommandFinalResponseMsg.ReturnCode
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Runner) sendProtoStruct(p protoreflect.ProtoMessage) error {
|
||||
protoOut, err := proto.Marshal(p)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Could not marshal proto: %w", err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
if err := r.conn.Write(ctx, websocket.MessageBinary, protoOut); err != nil {
|
||||
return fmt.Errorf("Could not send proto to websocket: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user