signed-commit remote shell (see also https://github.com/dyne/scorsh)
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
scorsh/scorshd.go

151 lines
3.9 KiB

package main
import (
"flag"
"fmt"
"log"
)
// manage debugging messages
const debug debugging = true
type debugging bool
func (d debugging) log(format string, args ...interface{}) {
if d {
log.Printf(format, args...)
}
}
///////////
var conf_file = flag.String("c", "./scorsh.cfg", "Configuration file for SCORSH")
func SCORSHerr(err int) error {
var err_str string
switch err {
case SCORSH_ERR_NO_FILE:
err_str = "Invalid file name"
case SCORSH_ERR_KEYRING:
err_str = "Invalid keyring"
case SCORSH_ERR_NO_REPO:
err_str = "Invalid repository"
case SCORSH_ERR_NO_COMMIT:
err_str = "Invalid commit ID"
case SCORSH_ERR_SIGNATURE:
err_str = "Invalid signature"
default:
err_str = "Generic Error"
}
return fmt.Errorf("%s", err_str)
}
func FindMatchingWorkers(master *SCORSHmaster, msg *SCORSHmsg) []*SCORSHworker {
var ret []*SCORSHworker
for idx, w := range master.Workers {
if w.Matches(msg.Repo, msg.Branch) {
debug.log("--- Worker: %s matches %s:%s\n", w.Name, msg.Repo, msg.Branch)
ret = append(ret, &(master.Workers[idx]))
}
}
return ret
}
func Master(master *SCORSHmaster) {
// master main loop:
var matching_workers []*SCORSHworker
matching_workers = make([]*SCORSHworker, len(master.Workers))
log.Println("[master] Master started ")
debug.log("[master] StatusChan: %s\n", master.StatusChan)
for {
debug.log("[master] Receive loop...\n")
select {
case push_msg := <-master.Spooler:
// here we manage the stuff we receive from the spooler
debug.log("[master] received message: %s\n", push_msg)
// - lookup the repos map for matching workers
matching_workers = FindMatchingWorkers(master, &push_msg)
debug.log("[master] matching workers: \n%s\n", matching_workers)
// add the message to WorkingMsg, if it's not a duplicate!
if _, ok := master.WorkingMsg[push_msg.Id]; ok {
log.Printf("[master] detected duplicate message %s \n", push_msg.Id)
} else {
master.WorkingMsg[push_msg.Id] = 0
// - dispatch the message to all the matching workers
for _, w := range matching_workers {
debug.log("[master] sending msg to worker: %s\n", w.Name)
// send the message to the worker
w.MsgChan <- push_msg
// increase the counter associated to the message
master.WorkingMsg[push_msg.Id] += 1
debug.log("[master] now WorkingMsg[%s] is: %d\n", push_msg.Id, master.WorkingMsg[push_msg.Id])
}
}
case done_msg := <-master.StatusChan:
// Here we manage a status message from a worker
debug.log("[master] received message from StatusChan: %s\n", done_msg)
if _, ok := master.WorkingMsg[done_msg.Id]; ok && master.WorkingMsg[done_msg.Id] > 0 {
master.WorkingMsg[done_msg.Id] -= 1
if master.WorkingMsg[done_msg.Id] == 0 {
delete(master.WorkingMsg, done_msg.Id)
master.Spooler <- done_msg
}
} else {
log.Printf("[master] received completion event for non-existing message name: %s\n", done_msg.Id)
}
}
}
debug.log("[master] Exiting the for loop, for some mysterious reason...\n")
}
func InitMaster() *SCORSHmaster {
master := ReadGlobalConfig(*conf_file)
master.Repos = make(map[string][]*SCORSHworker)
master.WorkingMsg = make(map[string]int)
// This is the channel on which we receive acks from workers
master.StatusChan = make(chan SCORSHmsg)
// This is the channel on which we exchange messages with the spooler
master.Spooler = make(chan SCORSHmsg)
debug.log("[InitMaster] StatusChan: %s\n", master.StatusChan)
err_workers := StartWorkers(master)
if err_workers != nil {
log.Fatal("Error starting workers: ", err_workers)
} else {
log.Println("Workers started correctly")
}
err_spooler := StartSpooler(master)
if err_spooler != nil {
log.Fatal("Error starting spooler: ", err_spooler)
}
return master
}
func main() {
var done chan int
flag.Parse()
master := InitMaster()
go Master(master)
// wait indefinitely -- we should implement signal handling...
<-done
}