master and worker initialisation (draft)

pull/1/head
KatolaZ 8 years ago
parent b2b083e059
commit 00c61083d7
  1. 19
      config.go
  2. 10
      scorsh.cfg
  3. 29
      scorsh.go
  4. 2
      spooler.go
  5. 31
      types.go
  6. 28
      worker_config.cfg
  7. 128
      workers.go

@ -20,7 +20,9 @@ func ReadGlobalConfig(fname string) *SCORSHmaster {
log.Fatal("Error while reading file: ", err) log.Fatal("Error while reading file: ", err)
} }
var cfg *SCORSHmaster var cfg *SCORSHmaster
cfg = new(SCORSHmaster) cfg = new(SCORSHmaster)
// Unmarshal the YAML configuration file into a SCORSHcfg structure // Unmarshal the YAML configuration file into a SCORSHcfg structure
@ -37,20 +39,22 @@ func ReadGlobalConfig(fname string) *SCORSHmaster {
} }
// Check if the user has set a custom logprefix // Check if the user has set a custom logprefix
if cfg.LogPrefix != "" {
log.SetPrefix(cfg.LogPrefix)
}
// Check if the user wants to redirect the logs to a file // Check if the user wants to redirect the logs to a file
if cfg.Logfile != "" { if cfg.Logfile != "" {
f, err := os.Open(cfg.Logfile) log.Printf("Opening log file: %s\n", cfg.Logfile)
f, err := os.OpenFile(cfg.Logfile, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
if err != nil { if err != nil {
log.SetOutput(io.Writer(f)) log.SetOutput(io.Writer(f))
} else { } else {
log.Printf("Error opening logfile: \n", err) log.Fatal("Error opening logfile: ", cfg.Logfile, err)
} }
} }
if cfg.LogPrefix != "" {
log.SetPrefix(cfg.LogPrefix)
}
// If we got so far, then there is some sort of config in cfg // If we got so far, then there is some sort of config in cfg
log.Printf("Successfully read config from %s\n", fname) log.Printf("Successfully read config from %s\n", fname)
@ -58,7 +62,10 @@ func ReadGlobalConfig(fname string) *SCORSHmaster {
} }
func (cfg *SCORSHmaster_cfg) String() string {
func (cfg *SCORSHmaster) String() string {
var buff bytes.Buffer var buff bytes.Buffer

@ -1,12 +1,6 @@
#
# This is a typical scorsh configuration. We declare here the list of
# workers, with the corresponding repo/branches regular expressions
# and the associated folder
#
--- ---
s_spooldir: "/var/spool/scorsh" s_spooldir: "./spool"
s_logfile: "/var/log/scorsh/scorsh.log" s_logfile: "./scorsh.log"
s_logprefix: "[scorsh]" s_logprefix: "[scorsh]"
s_workers: s_workers:

@ -61,27 +61,48 @@ func Master(master *SCORSHmaster) {
case push_msg = <- master.Spooler: case push_msg = <- master.Spooler:
// - lookup the repos map for matching workers // - lookup the repos map for matching workers
matching_workers = FindMatchingWorkers(master, &push_msg) matching_workers = FindMatchingWorkers(master, &push_msg)
// add the message to PendingMsg
//...
// - dispatch the message to all the matching workers // - dispatch the message to all the matching workers
for _, w := range matching_workers { for _, w := range matching_workers {
w.Chan <- push_msg // increase the counter associated to the message
w.MsgChan <- push_msg
} }
} }
} }
} }
func main() { func InitMaster() *SCORSHmaster {
flag.Parse()
master := ReadGlobalConfig(*conf_file) master := ReadGlobalConfig(*conf_file)
master.Repos = make(map[string][]*SCORSHworker)
master.WorkingMsg = make(map[string]int)
// This is the mutex-channel on which we receive acks from workers
master.StatusChan = make(chan SCORSHmsg, 1)
err_workers := StartWorkers(master) err_workers := StartWorkers(master)
if err_workers != nil { if err_workers != nil {
log.Fatal("Error starting workers: ", err_workers) log.Fatal("Error starting workers: ", err_workers)
} else {
log.Println("Workers started correctly")
} }
err_spooler := StartSpooler(master) err_spooler := StartSpooler(master)
if err_spooler != nil { if err_spooler != nil {
log.Fatal("Error starting spooler: ", err_spooler) log.Fatal("Error starting spooler: ", err_spooler)
} else {
log.Println("Spooler started correctly")
} }
return master
}
func main() {
flag.Parse()
master := InitMaster()
go Master(master) go Master(master)
} }

@ -17,6 +17,8 @@ func parse_request(fname string) (SCORSHmsg, error) {
return ret, SCORSHerr(SCORSH_ERR_NO_FILE) return ret, SCORSHerr(SCORSH_ERR_NO_FILE)
} }
// FIXME: Fill in the ret structure
return ret, nil return ret, nil
} }

@ -15,22 +15,22 @@ const (
// the SCORSHmsg type represents messages received from the spool and // the SCORSHmsg type represents messages received from the spool and
// sent to workers // sent to workers
type SCORSHmsg struct { type SCORSHmsg struct {
name string
repo string repo string
branch string branch string
old_rev string old_rev string
new_rev string new_rev string
} }
type SCORSHcmd struct { type SCORSHcmd struct {
URL string URL string `yaml:"c_url"`
hash string Hash string `yaml:"c_hash"`
} }
type SCORSHtag struct { type SCORSHtag struct {
TagName string Name string `yaml:"t_name"`
Keyrings []string Keyrings []string `yaml:"t_keyrings"`
Commands []SCORSHcmd Commands []SCORSHcmd `yaml:"t_commands"`
} }
// Configuration of a worker // Configuration of a worker
@ -45,16 +45,17 @@ type SCORSHworker_cfg struct {
// State of a worker // State of a worker
type SCORSHworker_state struct { type SCORSHworker_state struct {
Tags map[string]SCORSHtag Tags []SCORSHtag `yaml:"w_tags"`
Keys map[string]openpgp.KeyRing Keys map[string]openpgp.KeyRing
Chan chan SCORSHmsg MsgChan chan SCORSHmsg
StatusChan chan SCORSHmsg
} }
// The type SCORSHworker represents the configuration and state of a // The type SCORSHworker represents the configuration and state of a
// worker // worker
type SCORSHworker struct { type SCORSHworker struct {
SCORSHworker_cfg SCORSHworker_cfg `yaml:",inline"`
SCORSHworker_state SCORSHworker_state `yaml:",inline"`
} }
// Configuration of the master // Configuration of the master
@ -67,13 +68,15 @@ type SCORSHmaster_cfg struct {
// State of the master // State of the master
type SCORSHmaster_state struct { type SCORSHmaster_state struct {
Spooler chan SCORSHmsg Spooler chan SCORSHmsg
Repos map[string][]*SCORSHworker StatusChan chan SCORSHmsg
Repos map[string][]*SCORSHworker
WorkingMsg map[string]int
} }
// The type SCORSHmaster represents the configuration and state of the // The type SCORSHmaster represents the configuration and state of the
// master // master
type SCORSHmaster struct { type SCORSHmaster struct {
SCORSHmaster_cfg SCORSHmaster_cfg `yaml:",inline"`
SCORSHmaster_state SCORSHmaster_state
} }

@ -9,11 +9,11 @@
--- ---
w_tags: w_tags:
[
{ {
t_name: "BUILD", t_name: "BUILD",
t_keyrings: ["build_keyring.asc", "general_keyring.asc"], {
t_commands: [ t_keyrings: ["build_keyring.asc", "general_keyring.asc"],
t_commands: [
{ {
c_url: "file:///home/user/bin/script.sh $1 $2", c_url: "file:///home/user/bin/script.sh $1 $2",
c_hash: "12da324fb76s924acbce" c_hash: "12da324fb76s924acbce"
@ -21,17 +21,19 @@ w_tags:
{ {
c_url: "http://my.server.net/call.pl?branch=$1" c_url: "http://my.server.net/call.pl?branch=$1"
} }
] ]
}, }
{ {
t_name: "PUBLISH", t_name: "PUBLISH",
t_keyrings: ["web_developers.asc"], {
t_commands: [ t_keyrings: ["web_developers.asc"],
{ t_commands: [
c_url: "file:///usr/local/bin/publish.py $repo $branch", {
c_hash: "3234567898765432345678" c_url: "file:///usr/local/bin/publish.py $repo $branch",
} c_hash: "3234567898765432345678"
] }
]
}
} }
] }
... ...

@ -0,0 +1,128 @@
package main
import (
"fmt"
"github.com/go-yaml/yaml"
"golang.org/x/crypto/openpgp"
"io/ioutil"
"log"
"os"
"regexp"
"strings"
)
func (worker *SCORSHworker) Matches(repo, branch string) bool {
for _, r := range worker.Repos {
parts := strings.SplitN(r, ":", 2)
repo_pattern := parts[0]
branch_pattern := parts[1]
repo_match, _ := regexp.MatchString(repo_pattern, repo)
branch_match, _ := regexp.MatchString(branch_pattern, branch)
if repo_match && branch_match {
return true
}
}
return false
}
func (w *SCORSHworker) LoadKeyrings() error {
w.Keys = make(map[string]openpgp.KeyRing, len(w.Keyrings))
// Open the keyring files
for _, keyring := range w.Keyrings {
f, err_file := os.Open(keyring)
if err_file != nil {
log.Printf("[worker] cannot open keyring:", err_file)
f.Close()
return fmt.Errorf("Unable to open keyring: ", err_file)
}
// load the keyring
kr, err_key := openpgp.ReadArmoredKeyRing(f)
if err_key != nil {
log.Printf("[worker] cannot load keyring: ", err_key)
f.Close()
return fmt.Errorf("Unable to load keyring: ", err_key)
}
w.Keys[keyring] = kr
f.Close()
}
return nil
}
// Still to be implemented
func (w *SCORSHworker) LoadTags() error {
w_tags, err := ioutil.ReadFile(w.Tagfile)
if err != nil{
log.Printf("[worker:%s] Cannot read worker config: ", w.Name, err)
return err
}
err = yaml.Unmarshal(w_tags, w.Tags)
if err != nil {
log.Printf("[worker:%s] Error while reading tags: ", w.Name, err)
return err
}
return nil
}
// FIXME--- STILL UNDER HEAVY WORK...
func SCORSHWorker(w *SCORSHworker) {
// This is the main worker loop
for {
select {
case msg := <-w.MsgChan:
// process message
err := walk_commits(msg, w)
if err != nil {
log.Printf("[worker: %s] error in walk_commits: %s", err)
}
}
}
}
// StartWorkers starts all the workers specified in a given
// configuration and fills in the SCORSHmaster struct
func StartWorkers(master *SCORSHmaster) error {
num_workers := len(master.Workers)
// We should now start each worker
for w:=1; w<num_workers; w++ {
worker := & (master.Workers[w])
// Set the Status and Msg channels
worker.StatusChan = master.StatusChan
worker.MsgChan = make(chan SCORSHmsg)
// Load worker keyrings
err := worker.LoadKeyrings()
if err != nil {
log.Printf("[worker: %s] Unable to load keyrings (Exiting): %s\n", worker.Name, err)
close(worker.MsgChan)
return err
}
// Load worker tags from worker.Tagfile
err = worker.LoadTags()
if err != nil {
log.Printf("[worker: %s] Unable to load tags (Exiting): %s\n", worker.Name, err)
close(worker.MsgChan)
return err
}
// Add the repos definitions to the map master.Repos
for _, repo_name := range worker.Repos {
master.Repos[repo_name] = append(master.Repos[repo_name], worker)
}
}
return nil
}
Loading…
Cancel
Save