2011-05-24

Simple task distribution to worker goroutines -- example

The utility program stsearchsdf that I wrote about previously has a simple supervisor-workers setup for processing the input molecules in parallel. To begin with, I have an iterator that reads the input SDF file and returns the text of the next molecule on each call to Next(). The input file itself is represented by an SDFile wrapping an *os.File; the iterator reads it through a *bufio.Reader, as shown here.

// SDFile represents an on-disk file in MDL's SDF format.
type SDFile struct {
    f *os.File
}

// Init initializes an SDFile with the underlying file.
func (s *SDFile) Init(fn string, flag int, mode uint32) os.Error {
    f, e := os.OpenFile(fn, flag, mode)
    if e != nil {
        return e
    }

    s.f = f
    return nil
}

// Close closes the underlying file.
func (s *SDFile) Close() {
    s.f.Close()
}

// SdfIter is an iterator over SDFile.
type SdfIter struct {
    r *bufio.Reader
}

Once we have opened the input SDF file, we can request an iterator as follows.

// Iter creates and returns an iterator over the current SDF file's
// molecules.
func (s *SDFile) Iter() *SdfIter {
    it := new(SdfIter)
    it.r, _ = bufio.NewReaderSize(s.f, BUF_SIZE)
    return it
}

This iterator uses the character sequence that separates consecutive molecules ($$$$) to read the text corresponding to one molecule. On success, it returns that text.

// Next reads the text of one molecule from the input buffered stream
// into a slice, and returns the same.
func (it *SdfIter) Next() (v string, b bool, err os.Error) {
    sl := make([]string, 0, SLICE_SIZE)

    for {
        s, e := it.r.ReadSlice(REC_SEP_H)
        if e != nil {
            err = io.ErrUnexpectedEOF
            return
        }
        sl = append(sl, string(s))
        s, e = it.r.ReadSlice('\n')
        if e != nil {
            err = io.ErrUnexpectedEOF
            return
        }
        sl = append(sl, string(s))

        if REC_SEP_T == strings.TrimRightFunc(string(s), IsWspace) {
            b = true
            break
        }
    }
    v = strings.Join(sl, "")

    return
}
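
For readers on current Go, where os.Error has been replaced by the built-in error type, the record-splitting idea behind Next() can be sketched roughly as follows. This is not the stsearchsdf code: nextRecord, splitAll and recSep are hypothetical names, and the sketch assumes every record ends with a line that holds only $$$$.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// recSep is the line that terminates each molecule record (assumed).
const recSep = "$$$$"

// nextRecord reads lines up to and including the next "$$$$" line and
// returns them as one string. It returns io.EOF when no text remains,
// and io.ErrUnexpectedEOF if the input ends in the middle of a record.
func nextRecord(r *bufio.Reader) (string, error) {
	var sb strings.Builder
	for {
		line, err := r.ReadString('\n')
		sb.WriteString(line)
		if strings.TrimRight(line, " \t\r\n") == recSep {
			return sb.String(), nil
		}
		if err == io.EOF {
			if strings.TrimSpace(sb.String()) == "" {
				return "", io.EOF // clean end of input
			}
			return "", io.ErrUnexpectedEOF // record was cut short
		}
		if err != nil {
			return "", err
		}
	}
}

// splitAll collects every record in s; a convenience for demonstration.
func splitAll(s string) ([]string, error) {
	r := bufio.NewReader(strings.NewReader(s))
	var recs []string
	for {
		rec, err := nextRecord(r)
		if err == io.EOF {
			return recs, nil
		}
		if err != nil {
			return recs, err
		}
		recs = append(recs, rec)
	}
}

func main() {
	recs, err := splitAll("mol A\n  atoms\n$$$$\nmol B\n$$$$\n")
	fmt.Println(len(recs), err)
}
```

Unlike the Next() above, this variant distinguishes a clean end of input from a truncated record, which lets the caller tell "no more molecules" apart from a damaged file.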

The supervisor then uses this iterator to read the molecules one at a time. Before it starts reading, the supervisor creates the necessary communication channels: one task channel per worker and a single shared result channel. Since the nature of the actual task varies with the user's request, the function that represents the task itself (simply denoted fn here) is injected into the supervisor loop function. opts.mx, taken from the command line, determines the number of goroutines to spawn.

    tchs := make([]chan TMsg, opts.mx)
    rch := make(chan RMsg, opts.mx)
    for j := 0; j < int(opts.mx); j++ {
        tch := make(chan TMsg)
        tchs[j] = tch
        go fn(j, tch, rch)
    }
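
The channel layout above (a private, unbuffered task channel per worker and one shared result channel buffered to the worker count) can be tried in isolation with a sketch like the one below, written for current Go. The task and result types and the startWorkers, echo and roundTrip functions are hypothetical stand-ins, not part of stsearchsdf, and the sketch stops its workers by closing their task channels rather than with the termination message used in this post.

```go
package main

import "fmt"

// task and result are hypothetical stand-ins for the post's TMsg and RMsg.
type task struct{ n int }
type result struct{ id, n int }

// startWorkers spawns n workers. Each gets a private task channel; all
// share one result channel buffered to n, so the initial idle messages
// never block.
func startWorkers(n int, work func(id int, tch <-chan task, rch chan<- result)) ([]chan task, chan result) {
	tchs := make([]chan task, n)
	rch := make(chan result, n)
	for j := 0; j < n; j++ {
		tchs[j] = make(chan task)
		go work(j, tchs[j], rch)
	}
	return tchs, rch
}

// echo announces itself as idle (n == 0), then doubles each task's n.
func echo(id int, tch <-chan task, rch chan<- result) {
	rch <- result{id: id}
	for t := range tch {
		rch <- result{id: id, n: 2 * t.n}
	}
}

// roundTrip sends one task to each of n workers and sums the replies.
func roundTrip(n int) int {
	tchs, rch := startWorkers(n, echo)
	for i := 0; i < n; i++ {
		<-rch // wait for each worker's idle message
	}
	for j, tch := range tchs {
		tch <- task{n: j + 1}
	}
	sum := 0
	for i := 0; i < n; i++ {
		sum += (<-rch).n
	}
	for _, tch := range tchs {
		close(tch) // lets each worker's range loop finish
	}
	return sum
}

func main() {
	fmt.Println(roundTrip(3))
}
```

Because every worker has at most one message in flight at a time, a result buffer equal to the worker count is enough to keep workers from blocking on their sends.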

The supervisor keeps track of whether the entire input file has been processed, whether the search result has been found, and the number of live worker goroutines. When there are no more live worker goroutines, the supervisor loop itself can exit.

    // Loop over the file.
    found := false
    eof := false
    liveHelpers := opts.mx
L1: for {
        if 0 == liveHelpers {
            break L1
        }

When the result has been found, the input file is exhausted, or there was an error, every worker is sent a termination message, which it acknowledges. The acknowledgement is shown later.

        select {
        case rmsg := <-rch:
            if (-1 == rmsg.mn) && (!rmsg.b) {
                liveHelpers--
                continue L1
            }

            if !found && rmsg.b {
                fmt.Println("-- Molecule number :", rmsg.mn, "\n")
                fmt.Println(rmsg.v)
                found = true
            }

            if found || eof {
                tchs[rmsg.id] <- TMsg{-1, nil, nil}
                continue L1
            }

We are now ready for the task distribution itself. A fixed number of molecules (TASK_SIZE) serves as the unit of work. This grouping helps reduce the number of messages sent back and forth over the channels.

            var tmsg TMsg
            tmsg.mn = i
            tmsg.opts = opts
            tmsg.v = make([]string, 0, TASK_SIZE)
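L2:         for k := 0; k < TASK_SIZE; k++ {
                v, b, _ := it.Next()
                if !b {
                    eof = true // No more molecules, or a read error.
                    break L2   // TODO: This may need better handling.
                }
                tmsg.v = append(tmsg.v, v)
                i++
                if i >= opts.to {
                    eof = true // Reached opts.to; stop reading.
                    break L2
                }
            }

            // Test the batch itself: k is still 0 when the loop breaks
            // right after appending the first molecule.
            if len(tmsg.v) > 0 {
                tchs[rmsg.id] <- tmsg
            } else {
                tchs[rmsg.id] <- TMsg{-1, nil, nil}
                continue L1
            }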
        }
    }

Once we exit the above loop, we can close the receiving channel since all the workers will have exited by then.
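
To see the whole protocol run end to end (idle message, batched tasks, termination request, acknowledgement), here is a compact sketch for current Go. It is a simplified model and not the stsearchsdf code: the tmsg and rmsg types, taskSize, worker and search are hypothetical, and plain strings stand in for molecules.

```go
package main

import "fmt"

const taskSize = 2 // items per task; hypothetical stand-in for TASK_SIZE

// tmsg carries a batch to a worker; first == -1 asks the worker to stop.
type tmsg struct {
	first int
	items []string
}

// rmsg is a worker's reply. n == 0 means "idle", n == -1 acknowledges a
// stop request, and found reports a match at 1-based position n.
type rmsg struct {
	id, n int
	found bool
}

func worker(id int, tch <-chan tmsg, rch chan<- rmsg, match func(string) bool) {
	rch <- rmsg{id: id} // announce idle
	for t := range tch {
		if t.first == -1 {
			rch <- rmsg{id: id, n: -1} // acknowledge and exit
			return
		}
		r := rmsg{id: id}
		for k, s := range t.items {
			if match(s) {
				r.n, r.found = t.first+k+1, true
				break
			}
		}
		rch <- r
	}
}

// search returns the 1-based position of a matching item, or 0 if none.
// With several matches, which one is reported depends on scheduling.
func search(items []string, workers int, match func(string) bool) int {
	tchs := make([]chan tmsg, workers)
	rch := make(chan rmsg, workers)
	for j := range tchs {
		tchs[j] = make(chan tmsg)
		go worker(j, tchs[j], rch, match)
	}

	next, hit := 0, 0
	for live := workers; live > 0; {
		r := <-rch
		if r.n == -1 { // stop acknowledged
			live--
			continue
		}
		if r.found && hit == 0 {
			hit = r.n
		}
		if hit != 0 || next >= len(items) {
			tchs[r.id] <- tmsg{first: -1} // nothing left to hand out
			continue
		}
		end := next + taskSize
		if end > len(items) {
			end = len(items)
		}
		tchs[r.id] <- tmsg{first: next, items: items[next:end]}
		next = end
	}
	return hit
}

func main() {
	items := []string{"a", "b", "c", "x", "e"}
	fmt.Println(search(items, 3, func(s string) bool { return s == "x" }))
}
```

The supervisor only ever sends to a worker from which it has just received a message, so each send on an unbuffered task channel finds its worker ready to receive.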

Now we take a look at one of the task functions that is run in a worker goroutine. As soon as a worker is created, it sends an idle message to the supervisor.

func searchaHelper(id int, tch chan TMsg, rch chan RMsg) {
    rch <- RMsg{id, 0, "", false, nil}

In response, the supervisor starts assigning work to it. The task handling loop is quite simple: it exits when it receives a termination request (which it first acknowledges); otherwise it processes the input data.

    for {
        tmsg := <-tch
        if -1 == tmsg.mn {
            rch <- RMsg{id, -1, "", false, nil}
            close(tch)
            break
        }

        var rmsg RMsg
        rmsg.id = id
        rmsg.b = false
        for k := 0; k < len(tmsg.v); k++ {
            m := mdl.NewMolecule()
            rmsg.e = m.ParseMol(strings.Split(tmsg.v[k], "\n", -1), skip)
            if nil == rmsg.e {
                if m.HasAtoms(tmsg.opts.acount, tmsg.opts.comp) {
                    rmsg.mn = tmsg.mn + k + 1
                    rmsg.v = tmsg.v[k]
                    rmsg.b = true
                    break
                }
            }
        }

        rch <- rmsg
    }
}

Finally, depending on the actual outcome of the search, we show an appropriate message to the user.

The above is conceptually simple and easy to understand. For production, we have to introduce the requisite error handling code, of course. But Go makes it really easy to think about the task distribution structure at a reasonably high level. What is more, it allows us to implement such a structure in a straightforward manner, too!
