Browse Source

Multithreading proof-of-concept

master
terorie 4 years ago
parent
commit
593c61acca
  1. 32
      api/api.go
  2. 25
      apijson/get.go
  3. 50
      apijson/grab.go
  4. 19
      apijson/parsechannel.go
  5. 15
      apijson/parsevideo.go
  6. 4
      cmd/channel.go
  7. 104
      cmd/channeldump.go
  8. 16
      main.go

32
api/api.go

@ -2,29 +2,37 @@ package api @@ -2,29 +2,37 @@ package api
import (
"github.com/terorie/yt-mango/data"
"github.com/terorie/yt-mango/apiclassic"
"net/http"
"github.com/terorie/yt-mango/apijson"
)
type API struct {
GetVideo func(*data.Video) error
GetVideoSubtitleList func(*data.Video) error
GetChannel func(*data.Channel) error
GetChannelVideoURLs func(channelID string, page uint) ([]string, error)
GrabVideo func(videoID string) *http.Request
ParseVideo func(*data.Video, *http.Response) error
GrabVideoSubtitleList func(videoID string) *http.Request
ParseVideoSubtitleList func(*data.Video, *http.Response) error
GrabChannel func(channelID string) *http.Request
ParseChannel func(*data.Channel, *http.Response) error
GrabChannelPage func(channelID string, page uint) *http.Request
ParseChannelVideoURLs func(*http.Response) ([]string, error)
}
// TODO Fallback option
var DefaultAPI *API = nil
var Main *API = nil
// TODO: Remove when everything is implemented
var TempAPI = API{
GetVideo: apiclassic.GetVideo,
GetVideoSubtitleList: apiclassic.GetVideoSubtitleList,
GetChannel: apiclassic.GetChannel,
GetChannelVideoURLs: apijson.GetChannelVideoURLs,
GrabVideo: apijson.GrabVideo,
ParseVideo: apijson.ParseVideo,
GrabChannelPage: apijson.GrabChannelPage,
ParseChannelVideoURLs: apijson.ParseChannelVideoURLs,
}
var ClassicAPI = API{
/*var ClassicAPI = API{
GetVideo: apiclassic.GetVideo,
GetVideoSubtitleList: apiclassic.GetVideoSubtitleList,
GetChannel: apiclassic.GetChannel,
@ -36,4 +44,4 @@ var JsonAPI = API{ @@ -36,4 +44,4 @@ var JsonAPI = API{
GetVideoSubtitleList: apiclassic.GetVideoSubtitleList,
GetChannel: apijson.GetChannel,
GetChannelVideoURLs: apijson.GetChannelVideoURLs,
}
}*/

25
apijson/get.go

@ -1,25 +0,0 @@ @@ -1,25 +0,0 @@
package apijson
import (
"github.com/terorie/yt-mango/data"
"errors"
)
func GetVideo(v *data.Video) (err error) {
jsn, err := GrabVideo(v)
if err != nil { return }
err = ParseVideo(v, jsn)
if err != nil { return }
return
}
func GetChannel(c *data.Channel) error {
return errors.New("not implemented")
}
func GetChannelVideoURLs(channelID string, page uint) (urls []string, err error) {
jsn, err := GrabChannelPage(channelID, page)
if err != nil { return }
urls, err = ParseChannelPageLinks(jsn)
return
}

50
apijson/grab.go

@ -1,68 +1,32 @@ @@ -1,68 +1,32 @@
package apijson
import (
"github.com/terorie/yt-mango/data"
"net/http"
"github.com/terorie/yt-mango/common"
"github.com/valyala/fastjson"
"io/ioutil"
"errors"
)
const videoURL = "https://www.youtube.com/watch?pbj=1&v="
const channelURL = "https://www.youtube.com/browse_ajax?ctoken="
func GrabVideo(v *data.Video) (root *fastjson.Value, err error) {
func GrabVideo(videoID string) *http.Request {
// Prepare request
req, err := http.NewRequest("GET", videoURL+ v.ID, nil)
if err != nil { return nil, err }
req, err := http.NewRequest("GET", videoURL + videoID, nil)
if err != nil { panic(err) }
setHeaders(&req.Header)
// Send request
res, err := common.Client.Do(req)
if err != nil { return }
// Download response
body, err := ioutil.ReadAll(res.Body)
if err != nil { return }
// Parse JSON
var p fastjson.Parser
root, err = p.ParseBytes(body)
if err != nil { return }
return
return req
}
func GrabChannelPage(channelID string, page uint) (root *fastjson.Value, err error) {
func GrabChannelPage(channelID string, page uint) *http.Request {
// Generate page URL
token := GenChannelPageToken(channelID, uint64(page))
url := channelURL + token
// Prepare request
req, err := http.NewRequest("GET", url, nil)
if err != nil { return nil, err }
if err != nil { panic(err) }
setHeaders(&req.Header)
// Send request
res, err := common.Client.Do(req)
if err != nil { return nil, err }
if res.StatusCode == 500 {
defer res.Body.Close()
buf, _ := ioutil.ReadAll(res.Body)
println(string(buf))
}
if res.StatusCode != 200 { return nil, errors.New("HTTP failure") }
// Download response
defer res.Body.Close()
buf, err := ioutil.ReadAll(res.Body)
if err != nil { return nil, err }
// Parse JSON
var p fastjson.Parser
root, err = p.ParseBytes(buf)
return
return req
}
func setHeaders(h *http.Header) {

19
apijson/parsechannel.go

@ -4,12 +4,29 @@ import ( @@ -4,12 +4,29 @@ import (
"github.com/valyala/fastjson"
"errors"
"strings"
"net/http"
"io/ioutil"
"fmt"
)
var MissingData = errors.New("missing data")
var ServerError = errors.New("server error")
func ParseChannelPageLinks(rootObj *fastjson.Value) ([]string, error) {
func ParseChannelVideoURLs(res *http.Response) ([]string, error) {
if res.StatusCode != 200 {
return nil, fmt.Errorf("HTTP error: %s", res.Request.URL.String())
}
// Download response
defer res.Body.Close()
buf, err := ioutil.ReadAll(res.Body)
if err != nil { return nil, err }
// Parse JSON
var p fastjson.Parser
rootObj, err := p.ParseBytes(buf)
if err != nil { return nil, err }
// Root as array
root, err := rootObj.Array()
if err != nil { return nil, err }

15
apijson/parsevideo.go

@ -4,12 +4,25 @@ import ( @@ -4,12 +4,25 @@ import (
"github.com/valyala/fastjson"
"github.com/terorie/yt-mango/data"
"errors"
"io/ioutil"
"net/http"
)
var missingData = errors.New("missing data")
var unexpectedType = errors.New("unexpected type")
func ParseVideo(v *data.Video, root *fastjson.Value) error {
func ParseVideo(v *data.Video, res *http.Response) error {
defer res.Body.Close()
// Download response
body, err := ioutil.ReadAll(res.Body)
if err != nil { return err }
// Parse JSON
var p fastjson.Parser
root, err := p.ParseBytes(body)
if err != nil { return err }
rootArray := root.GetArray()
if rootArray == nil { return unexpectedType }

4
cmd/channel.go

@ -6,7 +6,7 @@ import ( @@ -6,7 +6,7 @@ import (
)
var force bool
var offset uint32
var offset uint
var Channel = cobra.Command{
Use: "channel",
@ -17,6 +17,6 @@ var matchChannelID = regexp.MustCompile("^([\\w\\-]|(%3[dD]))+$") @@ -17,6 +17,6 @@ var matchChannelID = regexp.MustCompile("^([\\w\\-]|(%3[dD]))+$")
func init() {
channelDumpCmd.Flags().BoolVarP(&force, "force", "f", false, "Overwrite the output file if it already exists")
channelDumpCmd.Flags().Uint32Var(&offset, "page-offset", 1, "Start getting videos at this page. (A page is usually 30 videos)")
channelDumpCmd.Flags().UintVar(&offset, "page-offset", 1, "Start getting videos at this page. (A page is usually 30 videos)")
Channel.AddCommand(&channelDumpCmd)
}

104
cmd/channeldump.go

@ -10,8 +10,19 @@ import ( @@ -10,8 +10,19 @@ import (
"log"
"github.com/terorie/yt-mango/api"
"fmt"
"github.com/terorie/yt-mango/common"
"sync/atomic"
"errors"
)
var channelDumpContext = struct{
startTime time.Time
printResults bool
writer *bufio.Writer
pagesDone uint64
errorOccured int32 // Use atomic boolean here
}{}
var channelDumpCmd = cobra.Command{
Use: "dumpurls <channel ID> [file]",
Short: "Get all public video URLs from channel",
@ -26,6 +37,7 @@ var channelDumpCmd = cobra.Command{ @@ -26,6 +37,7 @@ var channelDumpCmd = cobra.Command{
} else {
fileName = args[1]
}
channelDumpContext.printResults = printResults
if !matchChannelID.MatchString(channelID) {
// Check if youtube.com domain
@ -59,7 +71,7 @@ var channelDumpCmd = cobra.Command{ @@ -59,7 +71,7 @@ var channelDumpCmd = cobra.Command{
}
log.Printf("Starting work on channel ID \"%s\".", channelID)
startTime := time.Now()
channelDumpContext.startTime = time.Now()
var flags int
if force {
@ -69,7 +81,6 @@ var channelDumpCmd = cobra.Command{ @@ -69,7 +81,6 @@ var channelDumpCmd = cobra.Command{
}
var file *os.File
var writer *bufio.Writer
if !printResults {
var err error
@ -80,37 +91,94 @@ var channelDumpCmd = cobra.Command{ @@ -80,37 +91,94 @@ var channelDumpCmd = cobra.Command{
}
defer file.Close()
writer = bufio.NewWriter(file)
writer := bufio.NewWriter(file)
defer writer.Flush()
channelDumpContext.writer = writer
}
totalURLs := 0
for i := offset; true; i++ {
channelURLs, err := api.DefaultAPI.GetChannelVideoURLs(channelID, uint(i))
if err != nil {
log.Printf("Aborting on error: %v.", err)
break
results := make(chan common.JobResult)
terminateSub := make(chan bool)
// TODO Clean up
go processResults(results, terminateSub)
page := offset
for {
// Terminate if error detected
if atomic.LoadInt32(&channelDumpContext.errorOccured) != 0 {
goto terminate
}
// Send new requests
req := api.Main.GrabChannelPage(channelID, page)
common.DoAsyncHTTP(req, results, page)
page++
}
terminate:
log.Printf("&")
// Requests sent, wait for remaining requests to finish
for {
done := atomic.LoadUint64(&channelDumpContext.pagesDone)
// Page starts at 1
target := uint64(page) - 1
if done >= target { break }
// TODO use semaphore
time.Sleep(time.Millisecond)
}
// TODO Don't ignore pending results
duration := time.Since(channelDumpContext.startTime)
log.Printf("Done in %s.", duration.String())
terminateSub <- true
},
}
// TODO combine channels into one
func processResults(results chan common.JobResult, terminateSub chan bool) {
totalURLs := 0
for {
select {
case <-terminateSub:
log.Printf("Got %d URLs", totalURLs)
os.Exit(0)
return
case res := <-results:
var err error
var channelURLs []string
page := res.ReqData.(uint)
if res.Err != nil {
err = res.Err
goto endError
}
channelURLs, err = api.Main.ParseChannelVideoURLs(res.Res)
if err != nil { goto endError }
if len(channelURLs) == 0 {
log.Printf("Page %d returned no videos.", i)
break
err = errors.New("returned no videos")
goto endError
}
totalURLs += len(channelURLs)
log.Printf("Received page %d: %d videos.", i, len(channelURLs))
log.Printf("Received page %d: %d videos.", page, len(channelURLs))
if printResults {
if channelDumpContext.printResults {
for _, _url := range channelURLs {
fmt.Println(_url)
}
} else {
for _, _url := range channelURLs {
_, err := writer.WriteString(_url + "\n")
_, err := channelDumpContext.writer.WriteString(_url + "\n")
if err != nil { panic(err) }
}
}
// Increment done pages count
atomic.AddUint64(&channelDumpContext.pagesDone, 1)
continue
endError:
atomic.AddUint64(&channelDumpContext.pagesDone, 1)
atomic.StoreInt32(&channelDumpContext.errorOccured, 1)
log.Printf("Error at page %d: %v", page, err)
}
duration := time.Since(startTime)
log.Printf("Got %d URLs in %s.", totalURLs, duration.String())
},
}
}

16
main.go

@ -10,20 +10,18 @@ import ( @@ -10,20 +10,18 @@ import (
"github.com/terorie/yt-mango/cmd"
"log"
"github.com/terorie/yt-mango/api"
"github.com/terorie/yt-mango/common"
)
const Version = "v0.1 -- dev"
func printVersion(_ *cobra.Command, _ []string) {
fmt.Println("YT-Mango archiver", Version)
}
func main() {
// All diagnostics (logging) should go to stderr
log.SetOutput(os.Stderr)
var printVersion bool
var forceAPI string
var concurrentRequests uint
rootCmd := cobra.Command{
Use: "yt-mango",
@ -37,10 +35,12 @@ func main() { @@ -37,10 +35,12 @@ func main() {
}
},
PersistentPreRun: func(cmd *cobra.Command, args []string) {
common.InitAsyncHTTP(concurrentRequests)
switch forceAPI {
case "": api.DefaultAPI = &api.TempAPI
case "classic": api.DefaultAPI = &api.ClassicAPI
case "json": api.DefaultAPI = &api.JsonAPI
case "": api.Main = &api.TempAPI
//case "classic": api.Main = &api.ClassicAPI
//case "json": api.Main = &api.JsonAPI
default:
fmt.Fprintln(os.Stderr, "Invalid API specified.\n" +
"Valid options are: \"classic\" and \"json\"")
@ -54,6 +54,8 @@ func main() { @@ -54,6 +54,8 @@ func main() {
rootCmd.Flags().StringVarP(&forceAPI, "api", "a", "",
"Use the specified API for all calls.\n" +
"Possible options: \"classic\" and \"json\"")
rootCmd.PersistentFlags().UintVarP(&concurrentRequests, "concurrency", "c", 4,
"Number of maximum concurrent HTTP requests")
rootCmd.AddCommand(&cmd.Channel)
rootCmd.AddCommand(&cmd.Video)

Loading…
Cancel
Save