Implementing a simple version of the Qiniu image processing API in Golang

  • 2020-05-30 20:23:55
  • OfStack

Previously, I used Qiniu's storage service to generate thumbnails and blurred versions of images, and webp versions of videos. Now I need to move our storage to S3, which means I have to implement these image and video operations myself.

Analyzing the requirements

First, look at how Qiniu's interface handles images. For example, to grab the frame at the first second of a video, thumbnail that frame, and finally save the result to a new key, the cmd is vframe/jpg/offset/1|imageMogr2/thumbnail/400x|saveas/xxx. You can see that the three operations are separated by the | symbol, much like a Unix pipe.

The whole string above counts as one cmd. A single API request can process several cmds at once, separated by semicolons. After processing, the results are returned in a callback, for example:


{
    "id": "xxxxx",
    "pipeline": "xxx",
    "code": 0,
    "desc": "The fop was completed successfully",
    "reqid": "xTsAAFnxUbR5J10U",
    "inputBucket": "xxx",
    "inputKey": "xxxxx",
    "items": [
        {
            "cmd": "vframe/jpg/offset/1|imageMogr2/thumbnail/400x|saveas/ZmFtZS1wcml2YXRlOm1vbWVudC9jb3Zlci9zbmFwL3ZpZGVvL2M5YzdjZjQ5LTU3NGQtNGZjMS1iZDFkLTRkYjZkMzlkZWY1Ni8wLzA=",
            "code": 0,
            "desc": "The fop was completed successfully",
            "hash": "FhdN6V8EI4vW4XJGALSfxutvMEIv",
            "key": "xx",
            "returnOld": 0
        },
        {
            "cmd": "vframe/jpg/offset/1|imageMogr2/thumbnail/400x|imageMogr2/blur/45x8|saveas/ZmFtZS1wcml2YXRlOm1vbWVudC9jb3Zlci9zbmFwL3ZpZGVvL2M5YzdjZjQ5LTU3NGQtNGZjMS1iZDFkLTRkYjZkMzlkZWY1Ni8wLzBfYmx1cg==",
            "code": 0,
            "desc": "The fop was completed successfully",
            "hash": "FgNiRzrCsa7TZx1xVSb_4d5TiaK3",
            "key": "xxx",
            "returnOld": 0
        }
    ]
}
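As an aside, the saveas argument is the URL-safe base64 of the destination bucket:key. A callback like this can be decoded into plain Go structs; the type and field names below are my own choice, while the json tags come from the payload above:


// Hypothetical types for decoding the callback payload shown above; only
// the json tags are taken from the payload, the Go names are my own.
type FopItem struct {
    Cmd       string `json:"cmd"`
    Code      int    `json:"code"`
    Desc      string `json:"desc"`
    Hash      string `json:"hash"`
    Key       string `json:"key"`
    ReturnOld int    `json:"returnOld"`
}

type FopCallback struct {
    ID          string    `json:"id"`
    Pipeline    string    `json:"pipeline"`
    Code        int       `json:"code"`
    Desc        string    `json:"desc"`
    Reqid       string    `json:"reqid"`
    InputBucket string    `json:"inputBucket"`
    InputKey    string    `json:"inputKey"`
    Items       []FopItem `json:"items"`
}

// var cb FopCallback
// err := json.Unmarshal(body, &cb) // "encoding/json"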

Breaking down the requirements

This program generally needs the following parts:

1. An HTTP interface that accepts tasks, pushes them onto a queue, and returns a job ID. Workers process the tasks asynchronously; the number of workers and the number of tasks each worker handles in parallel are configurable, and workers have a retry mechanism.
2. Parsing the work out of the job payload: extract each cmd, execute the cmds in parallel where possible, and record the result of each cmd.
3. Executing a single cmd: each cmd contains multiple operations connected by pipes, where the output of one operation becomes the input of the next.

I had already written a worker model along these lines, borrowing from the article Handling 1 Million Requests per Minute with Go, which covers it in detail. It uses a Go channel as the queue, and I added beanstalkd as another queue provider. One further improvement: the article only makes the number of workers configurable, so I added a parameter for the number of goroutines each worker can run in parallel. So let's focus on solving points 3 and 2.
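For reference, here is a minimal sketch of that worker model, assuming a plain Go channel as the queue; the beanstalkd provider and the retry logic are omitted, and handleJob is a hypothetical stand-in for the real task handler:


package main

import (
    "fmt"
    "sync"
)

// handleJob is a hypothetical stand-in: it would parse the cmds out of
// the job payload and execute them, re-enqueueing the job on failure.
func handleJob(payload string) {
    fmt.Println("processing", payload)
}

// startWorkers launches workerCount workers, each running `parallel`
// goroutines that consume jobs from the shared channel.
func startWorkers(jobs <-chan string, workerCount, parallel int) *sync.WaitGroup {
    wg := &sync.WaitGroup{}
    for w := 0; w < workerCount; w++ {
        for p := 0; p < parallel; p++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for job := range jobs {
                    handleJob(job)
                }
            }()
        }
    }
    return wg
}

func main() {
    jobs := make(chan string, 100)
    wg := startWorkers(jobs, 4, 8) // 4 workers x 8 goroutines each
    for i := 0; i < 20; i++ {
        jobs <- fmt.Sprintf("job-%d", i)
    }
    close(jobs)
    wg.Wait()
}


The two configuration knobs map directly to the nested loops: workerCount workers, each spinning up parallel goroutines over the same channel.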

Pipe

For the pipe part, you can use the pipe library, which is used as follows:


p := pipe.Line(
    pipe.ReadFile("test.png"),
    resize(300, 300),
    blur(0.5),
)
output, err := pipe.CombinedOutput(p)
if err != nil {
    fmt.Printf("%v\n", err)
}
buf := bytes.NewBuffer(output)
img, _ := imaging.Decode(buf)
imaging.Save(img, "test_a.png")
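The resize and blur stages are not part of the pipe library itself. Assuming the pipe library here is gopkg.in/pipe.v2 and the imaging package is github.com/disintegration/imaging, they could plausibly be written as custom stages with pipe.TaskFunc, each decoding the image from the previous stage, transforming it, and re-encoding it for the next:


import (
    "github.com/disintegration/imaging"
    "gopkg.in/pipe.v2"
)

// resize reads an image from the previous stage, resizes it (a zero
// width or height preserves the aspect ratio), and writes the encoded
// result downstream.
func resize(width, height int) pipe.Pipe {
    return pipe.TaskFunc(func(s *pipe.State) error {
        img, err := imaging.Decode(s.Stdin)
        if err != nil {
            return err
        }
        return imaging.Encode(s.Stdout, imaging.Resize(img, width, height, imaging.Lanczos), imaging.PNG)
    })
}

// blur works the same way, applying a gaussian blur with the given sigma.
func blur(sigma float64) pipe.Pipe {
    return pipe.TaskFunc(func(s *pipe.State) error {
        img, err := imaging.Decode(s.Stdin)
        if err != nil {
            return err
        }
        return imaging.Encode(s.Stdout, imaging.Blur(img, sigma), imaging.PNG)
    })
}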

It is more convenient to build a Cmd struct, parse each operation's parameters into an []Op slice via regular-expression matching, and finally execute them. The struct and its usage look like this:


type Cmd struct {
    cmd    string
    saveas string
    ops    []Op
    err    error
}

type Op interface {
    getPipe() pipe.Pipe
}

type ResizeOp struct {
    width, height int
}

func (c ResizeOp) getPipe() pipe.Pipe {
    return resize(c.width, c.height)
}

// Usage
cmdStr := `file/test.png|thumbnail/x300|blur/20x8`
cmd := Cmd{cmdStr, "test_b.png", nil, nil}
cmd.parse()
cmd.doOps()
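The parse and doOps methods themselves are not shown above, so here is a hypothetical sketch, assuming ops are split on | with the first segment naming the source file, and spelling out only the thumbnail rule (this also needs strings, regexp, strconv, and bytes in addition to the pipe and imaging packages):


var thumbRe = regexp.MustCompile(`^thumbnail/(\d*)x(\d*)$`)

func (c *Cmd) parse() {
    segs := strings.Split(c.cmd, "|")
    for _, seg := range segs[1:] { // segs[0] is the source, e.g. file/test.png
        if m := thumbRe.FindStringSubmatch(seg); m != nil {
            w, _ := strconv.Atoi(m[1]) // an empty match yields 0, keeping the aspect ratio
            h, _ := strconv.Atoi(m[2])
            c.ops = append(c.ops, ResizeOp{w, h})
        }
        // a blur/20x8 segment would be matched by a similar regexp
    }
}

func (c *Cmd) doOps() error {
    segs := strings.Split(c.cmd, "|")
    // First stage reads the source file; each Op contributes one stage.
    stages := []pipe.Pipe{pipe.ReadFile(strings.TrimPrefix(segs[0], "file/"))}
    for _, op := range c.ops {
        stages = append(stages, op.getPipe())
    }
    output, err := pipe.CombinedOutput(pipe.Line(stages...))
    if err != nil {
        c.err = err
        return err
    }
    img, err := imaging.Decode(bytes.NewBuffer(output))
    if err != nil {
        return err
    }
    return imaging.Save(img, c.saveas)
}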
sync.WaitGroup

With a single cmd handled, the next problem is running multiple cmds in parallel. There is not much to think about here: sync.WaitGroup solves it perfectly. Step by step, let's first look at how it is used:


func main() {
    cmds := []string{}
    for i := 0; i < 10000; i++ {
        cmds = append(cmds, fmt.Sprintf("cmd-%d", i))
    }
    results := handleCmds(cmds)
    fmt.Println(len(results)) // 10000
}

func doCmd(cmd string) string {
    return fmt.Sprintf("cmd=%s", cmd)
}

func handleCmds(cmds []string) (results []string) {
    fmt.Println(len(cmds)) // 10000
    var count uint64
    group := sync.WaitGroup{}
    lock := sync.Mutex{}
    for _, item := range cmds {
        // Increment the counter
        group.Add(1)
        go func(cmd string) {
            result := doCmd(cmd)
            atomic.AddUint64(&count, 1)
            lock.Lock()
            results = append(results, result)
            lock.Unlock()

            // Decrement the counter
            group.Done()
        }(item)
    }
    // Block until the counter drops to zero
    group.Wait()
    fmt.Printf("count=%d \n", count) // 10000
    return
}

The group is essentially a counter: while the count > 0, group.Wait() blocks, until count == 0. Another point to note is that results = append(results, result) is not thread-safe by itself, which is why it is wrapped in a mutex.
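If the mutex feels heavy, collecting results over a channel also avoids the race. A sketch of an equivalent handleCmds (my own variation, not from the original):


// handleCmdsChan collects results over a buffered channel: each goroutine
// sends exactly one result, and the main goroutine receives exactly
// len(cmds) values, so no mutex or WaitGroup is needed.
func handleCmdsChan(cmds []string) []string {
    ch := make(chan string, len(cmds))
    for _, item := range cmds {
        go func(cmd string) {
            ch <- doCmd(cmd)
        }(item)
    }
    results := make([]string, 0, len(cmds))
    for range cmds {
        results = append(results, <-ch)
    }
    return results
}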

So we build a BenchCmd to hold the cmds, as follows:


type BenchCmd struct {
    cmds      []Cmd
    waitGroup sync.WaitGroup
    errs      []error
    lock      sync.Mutex
}

func (b *BenchCmd) doCmds() {
    for _, item := range b.cmds {
        b.waitGroup.Add(1)
        go func(cmd Cmd) {
            cmd.parse()
            err := cmd.doOps()
            b.lock.Lock()
            b.errs = append(b.errs, err)
            b.lock.Unlock()
            b.waitGroup.Done()
        }(item)
    }
    b.waitGroup.Wait()
}

The final call looks like this:


var cmds []Cmd
cmd_a := Cmd{`file/test.png|thumbnail/x300|blur/20x8`, "test_a.png", nil, nil}
cmd_b := Cmd{`file/test.png|thumbnail/500x1000|blur/20x108`, "test_b.png", nil, nil}
cmd_c := Cmd{`file/test.png|thumbnail/300x300`, "test_c.png", nil, nil}

cmds = append(cmds, cmd_a)
cmds = append(cmds, cmd_b)
cmds = append(cmds, cmd_c)

bench := BenchCmd{
    cmds:      cmds,
    waitGroup: sync.WaitGroup{},
    lock:      sync.Mutex{},
}
bench.doCmds()
fmt.Println(bench.errs)

This is only a preliminary experiment, and the thinking is not yet comprehensive; it merely imitates the API. Qiniu presumably does not do it this way, and is more loosely coupled. Perhaps each kind of cmd has its own cluster to handle it, which is something the pipe library cannot cover for now: its current limitation is that every cmd must run inside a single process.
