You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

channeldump.go 3.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package cmd
  2. import (
  3. "github.com/spf13/cobra"
  4. "os"
  5. "time"
  6. "bufio"
  7. "log"
  8. "github.com/terorie/yt-mango/api"
  9. "fmt"
  10. "github.com/terorie/yt-mango/common"
  11. "sync/atomic"
  12. "errors"
  13. )
  // channelDumpContext is the package-level state shared between the
  // "dumpurls" command body and the goroutine that processes page results.
  var channelDumpContext struct {
  	// pagesDone counts the pages that have been fully handled (successfully
  	// or not). It is only accessed through sync/atomic and is deliberately
  	// the first field: 64-bit atomic operations require 64-bit alignment,
  	// which on 32-bit platforms is only guaranteed for the first word of a
  	// variable.
  	pagesDone uint64
  	// errorOccured is an atomic flag (0 = no error, non-zero = a page
  	// failed); it is only accessed through sync/atomic.
  	errorOccured int32
  	// startTime is when work on the channel started; used to report the
  	// total duration.
  	startTime time.Time
  	// printResults selects printing URLs to stdout instead of writing them
  	// through writer.
  	printResults bool
  	// writer buffers output to the destination file; it stays nil when
  	// printResults is set.
  	writer *bufio.Writer
  }
  21. var channelDumpCmd = cobra.Command{
  22. Use: "dumpurls <channel ID> [file]",
  23. Short: "Get all public video URLs from channel",
  24. Long: "Write all videos URLs of a channel to a file",
  25. Args: cobra.RangeArgs(1, 2),
  26. Run: func(cmd *cobra.Command, args []string) {
  27. printResults := false
  28. fileName := ""
  29. channelID := args[0]
  30. if len(args) != 2 {
  31. printResults = true
  32. } else {
  33. fileName = args[1]
  34. }
  35. channelDumpContext.printResults = printResults
  36. channelID, err := api.GetChannelID(channelID)
  37. if err != nil {
  38. log.Print(err)
  39. os.Exit(1)
  40. }
  41. log.Printf("Starting work on channel ID \"%s\".", channelID)
  42. channelDumpContext.startTime = time.Now()
  43. var flags int
  44. if force {
  45. flags = os.O_WRONLY | os.O_CREATE | os.O_TRUNC
  46. } else {
  47. flags = os.O_WRONLY | os.O_CREATE | os.O_EXCL
  48. }
  49. var file *os.File
  50. if !printResults {
  51. var err error
  52. file, err = os.OpenFile(fileName, flags, 0640)
  53. if err != nil {
  54. log.Fatal(err)
  55. os.Exit(1)
  56. }
  57. defer file.Close()
  58. writer := bufio.NewWriter(file)
  59. defer writer.Flush()
  60. channelDumpContext.writer = writer
  61. }
  62. results := make(chan common.JobResult)
  63. terminateSub := make(chan bool)
  64. // TODO Clean up
  65. go processResults(results, terminateSub)
  66. page := offset
  67. for {
  68. // Terminate if error detected
  69. if atomic.LoadInt32(&channelDumpContext.errorOccured) != 0 {
  70. goto terminate
  71. }
  72. // Send new requests
  73. req := api.Main.GrabChannelPage(channelID, page)
  74. common.DoAsyncHTTP(req, results, page)
  75. page++
  76. }
  77. terminate:
  78. // Requests sent, wait for remaining requests to finish
  79. for {
  80. done := uint64(offset) + atomic.LoadUint64(&channelDumpContext.pagesDone)
  81. target := uint64(page)
  82. if done >= target { break }
  83. // TODO use semaphore
  84. time.Sleep(time.Millisecond)
  85. }
  86. // TODO Don't ignore pending results
  87. duration := time.Since(channelDumpContext.startTime)
  88. log.Printf("Done in %s.", duration.String())
  89. terminateSub <- true
  90. },
  91. }
  92. // TODO combine channels into one
  93. func processResults(results chan common.JobResult, terminateSub chan bool) {
  94. totalURLs := 0
  95. for {
  96. select {
  97. case <-terminateSub:
  98. log.Printf("Got %d URLs", totalURLs)
  99. os.Exit(0)
  100. return
  101. case res := <-results:
  102. var err error
  103. var channelURLs []string
  104. page := res.ReqData.(uint)
  105. if res.Err != nil {
  106. err = res.Err
  107. goto endError
  108. }
  109. channelURLs, err = api.Main.ParseChannelVideoURLs(res.Res)
  110. if err != nil { goto endError }
  111. if len(channelURLs) == 0 {
  112. err = errors.New("returned no videos")
  113. goto endError
  114. }
  115. totalURLs += len(channelURLs)
  116. log.Printf("Received page %d: %d videos.", page, len(channelURLs))
  117. if channelDumpContext.printResults {
  118. for _, _url := range channelURLs {
  119. fmt.Println(_url)
  120. }
  121. } else {
  122. for _, _url := range channelURLs {
  123. _, err := channelDumpContext.writer.WriteString(_url + "\n")
  124. if err != nil { panic(err) }
  125. }
  126. }
  127. // Increment done pages count
  128. atomic.AddUint64(&channelDumpContext.pagesDone, 1)
  129. continue
  130. endError:
  131. atomic.AddUint64(&channelDumpContext.pagesDone, 1)
  132. atomic.StoreInt32(&channelDumpContext.errorOccured, 1)
  133. log.Printf("Error at page %d: %v", page, err)
  134. }
  135. }
  136. }