Go: downloading multiple files concurrently

  • 2020-07-21 08:29:30
  • OfStack

Background

Suppose you have a distributed file system from which you need to concurrently download a part of the files to your local machine.

The IPs of some nodes of the file system and the list of files to be downloaded are known, so the download URL for each file can be constructed from this information.

The list of node IPs is saved in `<clustername>_node.txt`, and the IDs of the files to be downloaded are saved in `<clustername>_fileID.txt`.

Code sample


package main

import (
  "bufio"
  "flag"
  "fmt"
  "io"
  "math/rand"
  "net/http"
  "os"
  "time"
)

var (
  clustername = flag.String("clustername", "c1", "download clustername")
)

//  Read the file contents line by line 
func ReadLines(fpath string) []string {
  fd, err := os.Open(fpath)
  if err != nil {
    panic(err)
  }
  defer fd.Close()

  var lines []string
  scanner := bufio.NewScanner(fd)
  for scanner.Scan() {
    lines = append(lines, scanner.Text())
  }
  if err := scanner.Err(); err != nil {
    fmt.Fprintln(os.Stderr, err)
  }

  return lines
}

// Download fetches a single file identified by fileID from the given
// node and writes it to /yourpath/download/<clustername>_<fileID>.
// It returns fileID on success, or "process failed for <fileID>" on any
// error (file creation, HTTP request, bad status, or copy failure).
func Download(clustername string, node string, fileID string) string {
	nt := time.Now().Format("2006-01-02 15:04:05")
	fmt.Printf("[%s]To download %s\n", nt, fileID)

	url := fmt.Sprintf("http://%s/file/%s", node, fileID)
	fpath := fmt.Sprintf("/yourpath/download/%s_%s", clustername, fileID)
	newFile, err := os.Create(fpath)
	if err != nil {
		fmt.Println(err.Error())
		return "process failed for " + fileID
	}
	defer newFile.Close()

	client := http.Client{Timeout: 900 * time.Second}
	resp, err := client.Get(url)
	// BUG FIX: the original deferred resp.Body.Close() BEFORE checking the
	// error; on a failed request resp is nil and the deferred Close panics.
	if err != nil {
		fmt.Println(err.Error())
		return "process failed for " + fileID
	}
	defer resp.Body.Close()

	// Don't save an error page (e.g. a 404 body) as the file's content.
	if resp.StatusCode != http.StatusOK {
		fmt.Printf("unexpected status %s for %s\n", resp.Status, url)
		return "process failed for " + fileID
	}

	// Stream the body straight to disk; constant memory regardless of size.
	if _, err = io.Copy(newFile, resp.Body); err != nil {
		fmt.Println(err.Error())
		// BUG FIX: the original returned fileID (success) even when the
		// copy failed, leaving a truncated file reported as downloaded.
		return "process failed for " + fileID
	}
	return fileID
}

func main() {
	flag.Parse()

	// Read the list of node IPs; nothing to do without nodes.
	nodelist := ReadLines(fmt.Sprintf("%s_node.txt", *clustername))
	if len(nodelist) == 0 {
		return
	}

	// Read the list of file IDs to download; nothing to do without files.
	fileIDlist := ReadLines(fmt.Sprintf("%s_fileID.txt", *clustername))
	if len(fileIDlist) == 0 {
		return
	}

	// Buffered so that workers finishing AFTER the global timeout can still
	// send their result and exit instead of blocking forever on an
	// unbuffered channel (the original leaked those goroutines).
	ch := make(chan string, len(fileIDlist))

	// One goroutine per file; each file is fetched from a randomly chosen node.
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	for _, fileID := range fileIDlist {
		node := nodelist[r.Intn(len(nodelist))]
		go func(node, fileID string) {
			ch <- Download(*clustername, node, fileID)
		}(node, fileID)
	}

	// Wait for every download, subject to one global timeout.
	timeout := time.After(900 * time.Second)
wait:
	for idx := 0; idx < len(fileIDlist); idx++ {
		select {
		case res := <-ch:
			nt := time.Now().Format("2006-01-02 15:04:05")
			fmt.Printf("[%s]Finish download %s\n", nt, res)
		case <-timeout:
			fmt.Println("Timeout...")
			// BUG FIX: a bare `break` inside a select only exits the
			// select, not the for loop, so the original kept looping
			// after the timeout fired. A labeled break exits the loop.
			break wait
		}
	}
}

summary

The default http.Client is deliberately not used; a client with an explicit timeout is created instead.

A goroutine is parked by the runtime when it blocks in a system call while downloading a file.

When the download completes, the parked goroutine is woken up; it sends its result on the channel and then exits.

A global timeout bounds the whole batch: the main goroutine stops waiting and exits once the timeout fires.


Related articles: