Compare commits

...

2 Commits

Author SHA1 Message Date
restitux 6fc9af8280 Implement skeleton of RunCommand 2023-01-01 13:44:09 -07:00
restitux ff78167325 Cleanup runnermanager code 2023-01-01 13:43:45 -07:00
5 changed files with 190 additions and 136 deletions
+39 -6
View File
@@ -19,7 +19,8 @@ var log = logging.MustGetLogger("cursorius-server")
type ApiServer struct {
getRunnerCh chan runnermanager.GetRunnerRequest
allocatedRunners map[uuid.UUID]RunnerWrapper
allocatedRunners map[uuid.UUID]*RunnerWrapper
allocatedRunnersMutex sync.RWMutex
}
type RunnerWrapper struct {
@@ -27,6 +28,23 @@ type RunnerWrapper struct {
mutex sync.Mutex
}
func (r *RunnerWrapper) RunCommand(cmd string) (int64, string, string, error) {
r.mutex.Unlock()
defer r.mutex.Lock()
return_code, stdout, stderr, err := r.runner.RunCommand(cmd)
// TODO: run command by sending websocket packet
// TODO: get stdout and stderr response
return return_code, stdout, stderr, err
}
func (s *ApiServer) GetRunnerFromMap(uuid uuid) *RunnerWrapper {
s.allocatedRunnersMutex.RLock()
defer s.allocatedRunnersMutex.RUnlock()
return s.allocatedRunners[uuid]
}
func (s *ApiServer) GetRunner(
ctx context.Context,
req *connect.Request[apiv1.GetRunnerRequest],
@@ -54,7 +72,9 @@ func (s *ApiServer) GetRunner(
runnerUuid := uuid.New()
s.allocatedRunners[runnerUuid] = RunnerWrapper{runner: response.Runner}
s.allocatedRunnersMutex.Lock()
s.allocatedRunners[runnerUuid] = &RunnerWrapper{runner: response.Runner}
s.allocatedRunnersMutex.Unlock()
res := connect.NewResponse(&apiv1.GetRunnerResponse{
Id: runnerUuid.String(),
@@ -68,10 +88,23 @@ func (s *ApiServer) RunCommand(
req *connect.Request[apiv1.RunCommandRequest],
) (*connect.Response[apiv1.RunCommandResponse], error) {
uuid, err := uuid.Parse(req.Msg.Id)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("Invalid runner id"))
}
runner := s.GetRunnerFromMap(uuid)
return_code, stdout, stderr, err := runner.RunCommand(req.Msg.Command)
if err != nil {
log.Errorf("Could not run command on runner \"%v\", %v", runner.runner.Id(), err)
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("Could not run command"))
}
res := connect.NewResponse(&apiv1.RunCommandResponse{
ReturnCode: 0,
Stdout: "",
Stderr: "",
ReturnCode: return_code,
Stdout: stdout,
Stderr: stderr,
})
res.Header().Set("RunCommand-Version", "v1")
return res, nil
@@ -80,7 +113,7 @@ func (s *ApiServer) RunCommand(
func CreateHandler(mux *http.ServeMux, getRunnerCh chan runnermanager.GetRunnerRequest) {
api_server := &ApiServer{
getRunnerCh: getRunnerCh,
allocatedRunners: make(map[uuid.UUID]RunnerWrapper),
allocatedRunners: make(map[uuid.UUID]*RunnerWrapper),
}
path, handler := apiv1connect.NewGetRunnerServiceHandler(api_server)
mux.Handle(path, handler)
+1 -1
View File
@@ -5,7 +5,7 @@ package api.v1;
option go_package = "git.ohea.xyz/cursorius/server/proto/gen/api/v1;apiv1";
message RunCommandRequest {
int64 runner_id = 1;
string id = 1;
string command = 2;
}
+25 -26
View File
@@ -25,7 +25,7 @@ type RunCommandRequest struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
RunnerId int64 `protobuf:"varint,1,opt,name=runner_id,json=runnerId,proto3" json:"runner_id,omitempty"`
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Command string `protobuf:"bytes,2,opt,name=command,proto3" json:"command,omitempty"`
}
@@ -61,11 +61,11 @@ func (*RunCommandRequest) Descriptor() ([]byte, []int) {
return file_api_v1_run_command_proto_rawDescGZIP(), []int{0}
}
func (x *RunCommandRequest) GetRunnerId() int64 {
func (x *RunCommandRequest) GetId() string {
if x != nil {
return x.RunnerId
return x.Id
}
return 0
return ""
}
func (x *RunCommandRequest) GetCommand() string {
@@ -143,28 +143,27 @@ var File_api_v1_run_command_proto protoreflect.FileDescriptor
var file_api_v1_run_command_proto_rawDesc = []byte{
0x0a, 0x18, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x72, 0x75, 0x6e, 0x5f, 0x63, 0x6f, 0x6d,
0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x61, 0x70, 0x69, 0x2e,
0x76, 0x31, 0x22, 0x4a, 0x0a, 0x11, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x72, 0x75, 0x6e, 0x6e, 0x65,
0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x72, 0x75, 0x6e, 0x6e,
0x65, 0x72, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x22, 0x65,
0x0a, 0x12, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x74, 0x75, 0x72, 0x6e, 0x5f, 0x63,
0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x72, 0x65, 0x74, 0x75, 0x72,
0x6e, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x18,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74, 0x12, 0x16, 0x0a,
0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73,
0x74, 0x64, 0x65, 0x72, 0x72, 0x32, 0x5a, 0x0a, 0x11, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d,
0x61, 0x6e, 0x64, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x45, 0x0a, 0x0a, 0x52, 0x75,
0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x19, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76,
0x31, 0x2e, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x75, 0x6e,
0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22,
0x00, 0x42, 0x36, 0x5a, 0x34, 0x67, 0x69, 0x74, 0x2e, 0x6f, 0x68, 0x65, 0x61, 0x2e, 0x78, 0x79,
0x7a, 0x2f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x69, 0x75, 0x73, 0x2f, 0x73, 0x65, 0x72, 0x76,
0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x61, 0x70, 0x69,
0x2f, 0x76, 0x31, 0x3b, 0x61, 0x70, 0x69, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x33,
0x76, 0x31, 0x22, 0x3d, 0x0a, 0x11, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61,
0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e,
0x64, 0x22, 0x65, 0x0a, 0x12, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x72, 0x65, 0x74, 0x75, 0x72,
0x6e, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x72, 0x65,
0x74, 0x75, 0x72, 0x6e, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x6f,
0x75, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x64, 0x6f, 0x75, 0x74,
0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
0x52, 0x06, 0x73, 0x74, 0x64, 0x65, 0x72, 0x72, 0x32, 0x5a, 0x0a, 0x11, 0x52, 0x75, 0x6e, 0x43,
0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x45, 0x0a,
0x0a, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x19, 0x2e, 0x61, 0x70,
0x69, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x2e,
0x52, 0x75, 0x6e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x22, 0x00, 0x42, 0x36, 0x5a, 0x34, 0x67, 0x69, 0x74, 0x2e, 0x6f, 0x68, 0x65, 0x61,
0x2e, 0x78, 0x79, 0x7a, 0x2f, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x69, 0x75, 0x73, 0x2f, 0x73,
0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x2f,
0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x3b, 0x61, 0x70, 0x69, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x33,
}
var (
+26
View File
@@ -0,0 +1,26 @@
package runnermanager
import "nhooyr.io/websocket"
//var log = logging.MustGetLogger("cursorius-server")
type RunnerData struct {
msgType websocket.MessageType
data []byte
}
type Runner struct {
id string
tags []string
conn *websocket.Conn
receiveChan chan RunnerData
running bool
}
func (r *Runner) Id() string {
return r.id
}
func (r *Runner) RunCommand() (int64, string, string, error) {
return 0, "", "", nil
}
+32 -36
View File
@@ -21,18 +21,6 @@ type RunnerRegistration struct {
conn *websocket.Conn
}
type RunnerData struct {
msgType websocket.MessageType
data []byte
}
type Runner struct {
id string
tags []string
conn *websocket.Conn
receiveChan chan RunnerData
running bool
}
type runnerManager struct {
getRunnerCh chan GetRunnerRequest
registerCh chan RunnerRegistration
@@ -55,15 +43,10 @@ type runnerJob struct {
URL string
}
func runRunnerManager(r runnerManager) {
for {
msgCase:
select {
case request := <-r.getRunnerCh:
func (r *runnerManager) processRequest(req GetRunnerRequest) {
var runnerTagsStr strings.Builder
fmt.Fprintf(&runnerTagsStr, "%v", request.Tags[0])
for _, tag := range request.Tags[1:] {
fmt.Fprintf(&runnerTagsStr, "%v", req.Tags[0])
for _, tag := range req.Tags[1:] {
fmt.Fprintf(&runnerTagsStr, " %v", tag)
}
log.Infof("Got request for runner with tags \"%v\"", runnerTagsStr.String())
@@ -83,6 +66,7 @@ func runRunnerManager(r runnerManager) {
case _, ok := <-runner.receiveChan:
if ok {
// this should never happen
// TODO: should we disconnect the runner?
log.Errorf("Recieved data from inactive runner %v, this is a bug", runner.id)
continue
}
@@ -92,11 +76,11 @@ func runRunnerManager(r runnerManager) {
r.connectedRunners = r.connectedRunners[:len(r.connectedRunners)-1]
default:
runner.running = true
request.RespChan <- GetRunnerResponse{
req.RespChan <- GetRunnerResponse{
Runner: runner,
Err: nil,
}
break msgCase
return
}
}
errorMsg := "could not find valid runner"
@@ -104,20 +88,22 @@ func runRunnerManager(r runnerManager) {
errorMsg = "no connected runners"
}
log.Errorf("Could not allocate runner with tags \"%v\": %v", runnerTagsStr.String(), errorMsg)
request.RespChan <- GetRunnerResponse{
req.RespChan <- GetRunnerResponse{
Runner: Runner{},
Err: fmt.Errorf("Could not allocate runner: %v", errorMsg),
}
case registration := <-r.registerCh:
log.Debugf("New runner appeared with id: %v and secret: %v", registration.Id, registration.Secret)
if configuredRunner, doesExist := r.configuredRunners[registration.Id]; doesExist {
if configuredRunner.Secret == registration.Secret {
log.Infof("Registering runner \"%v\" with tags %v", registration.Id, registration.Tags)
}
func (r *runnerManager) processRegistration(reg RunnerRegistration) {
log.Debugf("New runner appeared with id: %v and secret: %v", reg.Id, reg.Secret)
if configuredRunner, doesExist := r.configuredRunners[reg.Id]; doesExist {
if configuredRunner.Secret == reg.Secret {
log.Infof("Registering runner \"%v\" with tags %v", reg.Id, reg.Tags)
runner := Runner{
id: registration.Id,
tags: registration.Tags,
conn: registration.conn,
id: reg.Id,
tags: reg.Tags,
conn: reg.conn,
receiveChan: make(chan RunnerData),
running: false,
}
@@ -126,7 +112,7 @@ func runRunnerManager(r runnerManager) {
// this is required to keep the connection functioning
go func() {
for {
msgType, data, err := registration.conn.Read(context.Background())
msgType, data, err := reg.conn.Read(context.Background())
if err != nil {
// TODO: this is still racy, since a runner could be alloctade between the
// connection returning an err and the channel closing
@@ -143,13 +129,23 @@ func runRunnerManager(r runnerManager) {
}()
} else {
log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", registration.Id, registration.Secret)
registration.conn.Close(websocket.StatusNormalClosure, "registration invalid")
log.Errorf("Disconnecting runner with id: %v and invalid secret: %v", reg.Id, registration.Secret)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
}
} else {
log.Errorf("Disconnecting runner with invalid id: %v", registration.Id)
registration.conn.Close(websocket.StatusNormalClosure, "registration invalid")
log.Errorf("Disconnecting runner with invalid id: %v", reg.Id)
reg.conn.Close(websocket.StatusNormalClosure, "registration invalid")
}
}
func runRunnerManager(r runnerManager) {
for {
select {
case request := <-r.getRunnerCh:
r.processRequest(request)
case registration := <-r.registerCh:
r.processRegistration(registration)
}
}
}