The pipeline "spooling -> routing -> file deletion" works

lint-refactor
KatolaZ 8 years ago
parent 74f8a74ecf
commit 7df9111d9f
  1. 30
      commits.go
  2. 62
      scorsh.go
  3. 29
      spooler.go
  4. 95
      types.go
  5. 13
      workers.go

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/KatolaZ/git2go"
"golang.org/x/crypto/openpgp"
"log"
"os"
"strings"
// "log"
@ -23,15 +24,15 @@ func CommitToString(commit *git.Commit) string {
}
// FIXME: RETURN THE ENTITY PROVIDED BY THE CHECK, OR nil
func check_signature(commit *git.Commit, keys []*openpgp.KeyRing) (signature, signed string, err error) {
func check_signature(commit *git.Commit, keys *map[string]openpgp.KeyRing) (signature, signed string, err error) {
signature, signed, err = commit.ExtractSignature()
if err == nil {
for _, keyring := range keys {
for _, keyring := range *keys {
_, err_sig :=
openpgp.CheckArmoredDetachedSignature(*keyring, strings.NewReader(signed),
openpgp.CheckArmoredDetachedSignature(keyring, strings.NewReader(signed),
strings.NewReader(signature))
if err_sig == nil {
@ -45,6 +46,14 @@ func check_signature(commit *git.Commit, keys []*openpgp.KeyRing) (signature, si
return "", "", err
}
func find_scorsh_message(commit *git.Commit) (string, error) {
msg := commit.RawMessage()
debug.log("[find_scorsg_msg] found message:\n %s\n", msg)
return msg, nil
}
// traverse all the commits between two references, looking for scorsh
// commands
// fixme: we don't have just one keyring here....
@ -91,12 +100,17 @@ func walk_commits(msg SCORSHmsg, w *SCORSHworker) error {
// check if it can be verified by any of the keyrings associated
// with the scorsh-tag
//signature, signed, err := check_signature(commit, &keyring)
// check if the commit contains a scorsh command
_, err = find_scorsh_message(commit)
//signature, signed, err := check_signature(commit, &w.Keys)
//_, _, err := check_signature(commit, w.keys)
//if err != nil {
// log.Printf("%s\n", SCORSHerr(SCORSH_ERR_SIGNATURE))
//
//}
if err != nil {
log.Printf("[worker: %s] %s\n", w.Name, SCORSHerr(SCORSH_ERR_SIGNATURE))
} else {
}
cur_commit = commit.Parent(0)
} else {
fmt.Printf("Commit %x not found!\n", cur_commit.Id())

@ -62,32 +62,52 @@ func Master(master *SCORSHmaster) {
// master main loop:
var matching_workers []*SCORSHworker
var push_msg SCORSHmsg
matching_workers = make([]*SCORSHworker, len(master.Workers))
log.Println("[master] Master started ")
debug.log("[master] StatusChan: %s\n", master.StatusChan)
for {
debug.log("[master] Receive loop...\n")
select {
// - receive stuff from the spooler
case push_msg = <-master.Spooler:
case push_msg := <-master.Spooler:
// here we manage the stuff we receive from the spooler
debug.log("[master] received message: %s\n", push_msg)
// - lookup the repos map for matching workers
matching_workers = FindMatchingWorkers(master, &push_msg)
debug.log("[master] matching workers: %s\n", matching_workers)
// add the message to PendingMsg
//...
// - dispatch the message to all the matching workers
for _, w := range matching_workers {
// increase the counter associated to the message
w.MsgChan <- push_msg
debug.log("[master] matching workers: \n%s\n", matching_workers)
// add the message to WorkingMsg, if it's not a duplicate!
if _, ok := master.WorkingMsg[push_msg.Id]; ok {
log.Printf("[master] detected duplicate message %s \n", push_msg.Id)
} else {
master.WorkingMsg[push_msg.Id] = 0
// - dispatch the message to all the matching workers
for _, w := range matching_workers {
debug.log("[master] sending msg to worker: %s\n", w.Name)
// send the message to the worker
w.MsgChan <- push_msg
// increase the counter associated to the message
master.WorkingMsg[push_msg.Id] += 1
debug.log("[master] now WorkingMsg[%s] is: %d\n", push_msg.Id, master.WorkingMsg[push_msg.Id])
}
}
case done_msg := <-master.StatusChan:
// Here we manage a status message from a worker
debug.log("[master] received message from StatusChan: %s\n", done_msg)
if _, ok := master.WorkingMsg[done_msg.Id]; ok && master.WorkingMsg[done_msg.Id] > 0 {
master.WorkingMsg[done_msg.Id] -= 1
if master.WorkingMsg[done_msg.Id] == 0 {
delete(master.WorkingMsg, done_msg.Id)
master.Spooler <- done_msg
}
} else {
log.Printf("[master] received completion event for non-existing message name: %s\n", done_msg.Id)
}
}
}
debug.log("[master] Exiting the for loop, for some mysterious reason...\n")
}
func InitMaster() *SCORSHmaster {
@ -96,9 +116,12 @@ func InitMaster() *SCORSHmaster {
master.Repos = make(map[string][]*SCORSHworker)
master.WorkingMsg = make(map[string]int)
// This is the mutex-channel on which we receive acks from workers
master.StatusChan = make(chan SCORSHmsg, 1)
master.Spooler = make(chan SCORSHmsg, 1)
// This is the channel on which we receive acks from workers
master.StatusChan = make(chan SCORSHmsg)
// This is the channel on which we exchange messages with the spooler
master.Spooler = make(chan SCORSHmsg)
debug.log("[InitMaster] StatusChan: %s\n", master.StatusChan)
err_workers := StartWorkers(master)
if err_workers != nil {
@ -111,17 +134,18 @@ func InitMaster() *SCORSHmaster {
log.Fatal("Error starting spooler: ", err_spooler)
}
return master
}
func main() {
var done chan int
flag.Parse()
master := InitMaster()
go Master(master)
<-master.StatusChan
// wait indefinitely -- we should implement signal handling...
<-done
}

@ -6,6 +6,7 @@ import (
"github.com/go-yaml/yaml"
"io/ioutil"
"log"
"os"
// "time"
)
@ -32,28 +33,40 @@ func parse_request(fname string, msg *SCORSHmsg) error {
return nil
}
func spooler(watcher *fsnotify.Watcher, worker chan SCORSHmsg) {
func spooler(watcher *fsnotify.Watcher, master chan SCORSHmsg) {
log.Println("Spooler started correctly")
var msg *SCORSHmsg
msg = new(SCORSHmsg)
for {
select {
case event := <-watcher.Events:
// Here we manage genuine events from fsnotify. We catch the
// "Write" event, which should happen only when the file is
// created
if event.Op == fsnotify.Write {
//time.Sleep(1000 * time.Millisecond)
var msg SCORSHmsg
debug.log("[spooler] new file %s detected\n", event.Name)
err := parse_request(event.Name, msg)
err := parse_request(event.Name, &msg)
if err != nil {
log.Printf("Invalid packet received. [%s]\n", err)
}
debug.log("[spooler] read message: %s\n", msg)
worker <- *msg
msg.Path = event.Name
master <- msg
}
case err := <-watcher.Errors:
log.Println("error:", err)
// here we manage event errors
log.Println("[spooler] error: ", err)
case msg := <-master:
// Here we receive messages from the master about files to be
// removed
log.Printf("[spooler] received deletion request for: %s\n", msg.Path)
err := os.Remove(msg.Path)
if err != nil {
log.Printf("[spooler] error removing file: %s\n", err)
} else {
log.Printf("[spooler] file %s successfully removed\n", msg.Path)
}
}
}
}

@ -2,6 +2,7 @@ package main
import (
"bytes"
"fmt"
"golang.org/x/crypto/openpgp"
)
@ -16,11 +17,12 @@ const (
// the SCORSHmsg type represents messages received from the spool and
// sent to workers
type SCORSHmsg struct {
Name string `yaml:"m_id"`
Id string `yaml:"m_id"`
Repo string `yaml:"m_repo"`
Branch string `yaml:"m_branch"`
Old_rev string `yaml:"m_oldrev"`
New_rev string `yaml:"m_newrev"`
Path string
}
type SCORSHcmd struct {
@ -28,7 +30,7 @@ type SCORSHcmd struct {
Hash string `yaml:"c_hash"`
}
type SCORSHtag struct {
type SCORSHtag_cfg struct {
Name string `yaml:"t_name"`
Keyrings []string `yaml:"t_keyrings"`
Commands []SCORSHcmd `yaml:"t_commands"`
@ -36,13 +38,13 @@ type SCORSHtag struct {
// Configuration of a worker
type SCORSHworker_cfg struct {
Name string `yaml:"w_name"`
Repos []string `yaml:"w_repos"`
Folder string `yaml:"w_folder"`
Logfile string `yaml:"w_logfile"`
Tagfile string `yaml:"w_tagfile"`
Keyrings []string `yaml:"w_keyrings"`
Tags []SCORSHtag `yaml:"w_tags"`
Name string `yaml:"w_name"`
Repos []string `yaml:"w_repos"`
Folder string `yaml:"w_folder"`
Logfile string `yaml:"w_logfile"`
Tagfile string `yaml:"w_tagfile"`
Keyrings []string `yaml:"w_keyrings"`
Tags []SCORSHtag_cfg `yaml:"w_tags"`
}
// State of a worker
@ -82,39 +84,30 @@ type SCORSHmaster struct {
SCORSHmaster_state
}
// client commands
type SCORSHtag struct {
Tag string `yaml:"s_tag"`
Args []string `yaml:"s_args"`
}
type SCORSHclient_msg struct {
Tags []SCORSHtag `yaml:"scorsh"`
}
////////////////////////
func (cfg *SCORSHmaster) String() string {
var buff bytes.Buffer
buff.WriteString("spooldir: ")
buff.WriteString(cfg.Spooldir)
buff.WriteString("\nlogfile: ")
buff.WriteString(cfg.Logfile)
buff.WriteString("\nlogprefix: ")
buff.WriteString(cfg.LogPrefix)
buff.WriteString("\nWorkers: \n")
fmt.Fprintf(&buff, "spooldir: %s\n", cfg.Spooldir)
fmt.Fprintf(&buff, "logfile: %s\n", cfg.Logfile)
fmt.Fprintf(&buff, "logprefix: %s\n", cfg.LogPrefix)
fmt.Fprintf(&buff, "Workers: \n")
for _, w := range cfg.Workers {
buff.WriteString("---\n name: ")
buff.WriteString(w.Name)
buff.WriteString("\n repos: ")
for _, r := range w.Repos {
buff.WriteString("\n ")
buff.WriteString(r)
}
buff.WriteString("\n folder: ")
buff.WriteString(w.Folder)
buff.WriteString("\n logfile: ")
buff.WriteString(w.Logfile)
buff.WriteString("\n tagfile: ")
buff.WriteString(w.Tagfile)
buff.WriteString("\n keyrings: ")
for _, k := range w.Keyrings {
buff.WriteString("\n ")
buff.WriteString(k)
}
buff.WriteString("\n...\n")
fmt.Fprintf(&buff, "%s", &w)
}
return buff.String()
@ -123,16 +116,26 @@ func (cfg *SCORSHmaster) String() string {
func (msg *SCORSHmsg) String() string {
var buff bytes.Buffer
buff.WriteString("\nName: ")
buff.WriteString(msg.Name)
buff.WriteString("\nRepo: ")
buff.WriteString(msg.Repo)
buff.WriteString("\nBranch: ")
buff.WriteString(msg.Branch)
buff.WriteString("\nOld_rev: ")
buff.WriteString(msg.Old_rev)
buff.WriteString("\nNew_rev: ")
buff.WriteString(msg.New_rev)
fmt.Fprintf(&buff, "Id: %s\n", msg.Id)
fmt.Fprintf(&buff, "Repo: %s\n", msg.Repo)
fmt.Fprintf(&buff, "Branch: %s\n", msg.Branch)
fmt.Fprintf(&buff, "Old_Rev: %s\n", msg.Old_rev)
fmt.Fprintf(&buff, "New_rev: %s\n", msg.New_rev)
fmt.Fprintf(&buff, "Path: %s\n", msg.Path)
return buff.String()
}
func (w *SCORSHworker) String() string {
var buff bytes.Buffer
fmt.Fprintf(&buff, "Name: %s\n", w.Name)
fmt.Fprintf(&buff, "Repos: %s\n", w.Repos)
fmt.Fprintf(&buff, "Folder: %s\n", w.Folder)
fmt.Fprintf(&buff, "Logfile: %s\n", w.Logfile)
fmt.Fprintf(&buff, "Tagfile: %s\n", w.Tagfile)
fmt.Fprintf(&buff, "Keyrings: %s\n", w.Keyrings)
return buff.String()
}

@ -9,6 +9,7 @@ import (
"os"
"regexp"
"strings"
"time"
)
func (worker *SCORSHworker) Matches(repo, branch string) bool {
@ -80,20 +81,26 @@ func Worker(w *SCORSHworker) {
var msg SCORSHmsg
log.Printf("[worker: %s] Started\n", w.Name)
debug.log("[worker: %s] MsgChan: %s\n", w.Name, w.MsgChan)
// notify that we have been started!
w.StatusChan <- msg
// This is the main worker loop
for {
select {
case msg = <-w.MsgChan:
debug.log("[worker: %s] received message %s\n", w.Name, msg.Name)
debug.log("[worker: %s] received message %s\n", w.Name, msg.Id)
// process message
// err := walk_commits(msg, w)
// if err != nil {
// log.Printf("[worker: %s] error in walk_commits: %s", err)
// }
log.Printf("[worker: %s] Received message: ", w.Name, msg)
debug.log("[worker: %s] Received message: %s", w.Name, msg)
debug.log("[worker: %s] StatusChan: %s\n", w.Name, w.StatusChan)
time.Sleep(1000 * time.Millisecond)
w.StatusChan <- msg
debug.log("[worker: %s] Sent message back: %s", w.Name, msg)
}
}
}
@ -113,7 +120,7 @@ func StartWorkers(master *SCORSHmaster) error {
worker := &(master.Workers[w])
// Set the Status and Msg channels
worker.StatusChan = master.StatusChan
worker.MsgChan = make(chan SCORSHmsg)
worker.MsgChan = make(chan SCORSHmsg, 10)
// Load worker keyrings
err := worker.LoadKeyrings()
if err != nil {

Loading…
Cancel
Save