From 00c61083d7139f19b8d99dfc7ac6d7e22c4f9a78 Mon Sep 17 00:00:00 2001 From: KatolaZ Date: Thu, 13 Jul 2017 07:55:44 +0100 Subject: [PATCH] master and worker initialisation (draft) --- config.go | 23 ++++++--- scorsh.cfg | 12 ++--- scorsh.go | 31 +++++++++-- spooler.go | 2 + types.go | 31 ++++++----- worker_config.cfg | 28 +++++----- workers.go | 128 ++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 206 insertions(+), 49 deletions(-) create mode 100644 workers.go diff --git a/config.go b/config.go index 15e234b..e64a35c 100644 --- a/config.go +++ b/config.go @@ -20,9 +20,11 @@ func ReadGlobalConfig(fname string) *SCORSHmaster { log.Fatal("Error while reading file: ", err) } + var cfg *SCORSHmaster - cfg = new(SCORSHmaster) + cfg = new(SCORSHmaster) + // Unmarshal the YAML configuration file into a SCORSHcfg structure err = yaml.Unmarshal(data, cfg) if err != nil { @@ -30,27 +32,29 @@ func ReadGlobalConfig(fname string) *SCORSHmaster { } fmt.Printf("%s", cfg) - + // If the user has not set a spooldir, crash loudly if cfg.Spooldir == "" { log.Fatal("No spooldir defined in ", fname, ". Exiting\n") } // Check if the user has set a custom logprefix - if cfg.LogPrefix != "" { - log.SetPrefix(cfg.LogPrefix) - } // Check if the user wants to redirect the logs to a file if cfg.Logfile != "" { - f, err := os.Open(cfg.Logfile) + log.Printf("Opening log file: %s\n", cfg.Logfile) + f, err := os.OpenFile(cfg.Logfile, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) if err != nil { log.SetOutput(io.Writer(f)) } else { - log.Printf("Error opening logfile: \n", err) + log.Fatal("Error opening logfile: ", cfg.Logfile, err) } } + if cfg.LogPrefix != "" { + log.SetPrefix(cfg.LogPrefix) + } + // If we got so far, then there is some sort of config in cfg log.Printf("Successfully read config from %s\n", fname) @@ -58,7 +62,10 @@ func ReadGlobalConfig(fname string) *SCORSHmaster { } -func (cfg *SCORSHmaster_cfg) String() string { + + + +func (cfg *SCORSHmaster) String() string { var buff bytes.Buffer diff --git a/scorsh.cfg b/scorsh.cfg index a1320a3..01e8def 100644 --- a/scorsh.cfg +++ b/scorsh.cfg @@ -1,12 +1,6 @@ -# -# This is a typical scorsh configuration. We declare here the list of -# workers, with the corresponding repo/branches regular expressions -# and the associated folder -# - --- -s_spooldir: "/var/spool/scorsh" -s_logfile: "/var/log/scorsh/scorsh.log" +s_spooldir: "./spool" +s_logfile: "./scorsh.log" s_logprefix: "[scorsh]" s_workers: @@ -50,4 +44,4 @@ s_workers: ], } ] -... \ No newline at end of file +... diff --git a/scorsh.go b/scorsh.go index 1d345b4..dceb6d7 100644 --- a/scorsh.go +++ b/scorsh.go @@ -61,27 +61,48 @@ func Master(master *SCORSHmaster) { case push_msg = <- master.Spooler: // - lookup the repos map for matching workers matching_workers = FindMatchingWorkers(master, &push_msg) + // add the message to PendingMsg + //... // - dispatch the message to all the matching workers for _, w := range matching_workers { - w.Chan <- push_msg + // increase the counter associated to the message + w.MsgChan <- push_msg } } } } -func main() { - - flag.Parse() +func InitMaster() *SCORSHmaster { master := ReadGlobalConfig(*conf_file) - + + master.Repos = make(map[string][]*SCORSHworker) + master.WorkingMsg = make(map[string]int) + // This is the mutex-channel on which we receive acks from workers + master.StatusChan = make(chan SCORSHmsg, 1) + err_workers := StartWorkers(master) if err_workers != nil { log.Fatal("Error starting workers: ", err_workers) + } else { + log.Println("Workers started correctly") } err_spooler := StartSpooler(master) if err_spooler != nil { log.Fatal("Error starting spooler: ", err_spooler) + } else { + log.Println("Spooler started correctly") } + return master + +} + + +func main() { + + flag.Parse() + + master := InitMaster() + go Master(master) } diff --git a/spooler.go b/spooler.go index 8d7bdc9..e11b980 100644 --- a/spooler.go +++ b/spooler.go @@ -16,6 +16,8 @@ func parse_request(fname string) (SCORSHmsg, error) { log.Printf("Unable to open file: %s\n", fname) return ret, SCORSHerr(SCORSH_ERR_NO_FILE) } + + // FIXME: Fill in the ret structure return ret, nil diff --git a/types.go b/types.go index b14a103..3ccb6cc 100644 --- a/types.go +++ b/types.go @@ -15,22 +15,22 @@ const ( // the SCORSHmsg type represents messages received from the spool and // sent to workers type SCORSHmsg struct { + name string repo string branch string old_rev string new_rev string } - type SCORSHcmd struct { - URL string - hash string + URL string `yaml:"c_url"` + Hash string `yaml:"c_hash"` } type SCORSHtag struct { - TagName string - Keyrings []string - Commands []SCORSHcmd + Name string `yaml:"t_name"` + Keyrings []string `yaml:"t_keyrings"` + Commands []SCORSHcmd `yaml:"t_commands"` } // Configuration of a worker @@ -45,16 +45,17 @@ type SCORSHworker_cfg struct { // State of a worker type SCORSHworker_state struct { - Tags map[string]SCORSHtag - Keys map[string]openpgp.KeyRing - Chan chan SCORSHmsg + Tags []SCORSHtag `yaml:"w_tags"` + Keys map[string]openpgp.KeyRing + MsgChan chan SCORSHmsg + StatusChan chan SCORSHmsg } // The type SCORSHworker represents the configuration and state of a // worker type SCORSHworker struct { - SCORSHworker_cfg - SCORSHworker_state + SCORSHworker_cfg `yaml:",inline"` + SCORSHworker_state `yaml:",inline"` } // Configuration of the master @@ -67,13 +68,15 @@ type SCORSHmaster_cfg struct { // State of the master type SCORSHmaster_state struct { - Spooler chan SCORSHmsg - Repos map[string][]*SCORSHworker + Spooler chan SCORSHmsg + StatusChan chan SCORSHmsg + Repos map[string][]*SCORSHworker + WorkingMsg map[string]int } // The type SCORSHmaster represents the configuration and state of the // master type SCORSHmaster struct { - SCORSHmaster_cfg + SCORSHmaster_cfg `yaml:",inline"` SCORSHmaster_state } diff --git a/worker_config.cfg b/worker_config.cfg index 5173b6f..a156ac8 100644 --- a/worker_config.cfg +++ b/worker_config.cfg @@ -9,11 +9,11 @@ --- w_tags: - [ { t_name: "BUILD", - t_keyrings: ["build_keyring.asc", "general_keyring.asc"], - t_commands: [ + { + t_keyrings: ["build_keyring.asc", "general_keyring.asc"], + t_commands: [ { c_url: "file:///home/user/bin/script.sh $1 $2", c_hash: "12da324fb76s924acbce" @@ -21,17 +21,19 @@ w_tags: { c_url: "http://my.server.net/call.pl?branch=$1" } - ] - }, + ] + } { t_name: "PUBLISH", - t_keyrings: ["web_developers.asc"], - t_commands: [ - { - c_url: "file:///usr/local/bin/publish.py $repo $branch", - c_hash: "3234567898765432345678" - } - ] + { + t_keyrings: ["web_developers.asc"], + t_commands: [ + { + c_url: "file:///usr/local/bin/publish.py $repo $branch", + c_hash: "3234567898765432345678" + } + ] + } } - ] + } ... \ No newline at end of file diff --git a/workers.go b/workers.go new file mode 100644 index 0000000..d5462c1 --- /dev/null +++ b/workers.go @@ -0,0 +1,128 @@ +package main + +import ( + "fmt" + "github.com/go-yaml/yaml" + "golang.org/x/crypto/openpgp" + "io/ioutil" + "log" + "os" + "regexp" + "strings" +) + +func (worker *SCORSHworker) Matches(repo, branch string) bool { + + for _, r := range worker.Repos { + parts := strings.SplitN(r, ":", 2) + repo_pattern := parts[0] + branch_pattern := parts[1] + repo_match, _ := regexp.MatchString(repo_pattern, repo) + branch_match, _ := regexp.MatchString(branch_pattern, branch) + if repo_match && branch_match { + return true + } + } + return false +} + +func (w *SCORSHworker) LoadKeyrings() error { + + w.Keys = make(map[string]openpgp.KeyRing, len(w.Keyrings)) + + // Open the keyring files + for _, keyring := range w.Keyrings { + f, err_file := os.Open(keyring) + + if err_file != nil { + log.Printf("[worker] cannot open keyring:", err_file) + f.Close() + return fmt.Errorf("Unable to open keyring: ", err_file) + } + + // load the keyring + kr, err_key := openpgp.ReadArmoredKeyRing(f) + + if err_key != nil { + log.Printf("[worker] cannot load keyring: ", err_key) + f.Close() + return fmt.Errorf("Unable to load keyring: ", err_key) + } + w.Keys[keyring] = kr + f.Close() + } + return nil +} + +// Still to be implemented +func (w *SCORSHworker) LoadTags() error { + + w_tags, err := ioutil.ReadFile(w.Tagfile) + if err != nil{ + log.Printf("[worker:%s] Cannot read worker config: ", w.Name, err) + return err + } + + err = yaml.Unmarshal(w_tags, w.Tags) + + if err != nil { + log.Printf("[worker:%s] Error while reading tags: ", w.Name, err) + return err + } + + + return nil +} + +// FIXME--- STILL UNDER HEAVY WORK... +func SCORSHWorker(w *SCORSHworker) { + + + // This is the main worker loop + for { + select { + case msg := <-w.MsgChan: + // process message + err := walk_commits(msg, w) + if err != nil { + log.Printf("[worker: %s] error in walk_commits: %s", err) + } + } + } +} + +// StartWorkers starts all the workers specified in a given +// configuration and fills in the SCORSHmaster struct +func StartWorkers(master *SCORSHmaster) error { + + num_workers := len(master.Workers) + + // We should now start each worker + + for w:=1; w