
Clean up API and fix channel URL dump

terorie committed 9 months ago (commit 43063ec2fe)
10 changed files with 212 additions and 150 deletions:

  1. api/escape.go                   +1   -1
  2. api/markdown.go                 +1   -1
  3. apiclassic/grab.go              +3   -3
  4. apiclassic/parsedescription.go  +3   -3
  5. cmd/channel.go                  +0   -2
  6. cmd/channeldump.go              +125 -102
  7. common/httpasync.go             +0   -35
  8. main.go                         +2   -2
  9. net/asynchttp.go                +76  -0
 10. net/transport.go                +1   -1

common/escape.go → api/escape.go (+1 -1)

@@ -1,4 +1,4 @@
-package common
+package api
 
 import "bytes"
 

common/markdown.go → api/markdown.go (+1 -1)

@@ -1,4 +1,4 @@
-package common
+package api
 
 var MarkdownTextEscape EscapeMap
 var MarkdownLinkEscape EscapeMap

apiclassic/grab.go (+3 -3)

@@ -5,7 +5,7 @@ import (
 	"errors"
 	"encoding/xml"
 	"github.com/PuerkitoBio/goquery"
-	"github.com/terorie/yt-mango/common"
+	"github.com/terorie/yt-mango/net"
 )
 
 const mainURL = "https://www.youtube.com/watch?has_verified=1&bpctr=6969696969&v="
@@ -17,7 +17,7 @@ func GrabVideo(videoID string) (doc *goquery.Document, err error) {
 	if err != nil { return }
 	setHeaders(&req.Header)
 
-	res, err := common.Client.Do(req)
+	res, err := net.Client.Do(req)
 	if err != nil { return }
 	if res.StatusCode != 200 { return nil, errors.New("HTTP failure") }
 
@@ -34,7 +34,7 @@ func GrabSubtitleList(videoID string) (tracks *XMLSubTrackList, err error) {
 	if err != nil { return }
 	setHeaders(&req.Header)
 
-	res, err := common.Client.Do(req)
+	res, err := net.Client.Do(req)
 	if err != nil { return }
 	if res.StatusCode != 200 { return nil, errors.New("HTTP failure") }
 

apiclassic/parsedescription.go (+3 -3)

@@ -4,7 +4,7 @@ import (
 	"errors"
 	"golang.org/x/net/html"
 	"bytes"
-	"github.com/terorie/yt-mango/common"
+	"github.com/terorie/yt-mango/net"
 	"strings"
 )
 
@@ -24,7 +24,7 @@ func (p *parseInfo) parseDescription() error {
 		case html.TextNode:
 			// FIXME: "&amp;lt;" gets parsed to => "<"
 			// Write text to buffer, escaping markdown
-			err := common.MarkdownTextEscape.ToBuffer(c.Data, &buffer)
+			err := net.MarkdownTextEscape.ToBuffer(c.Data, &buffer)
 			if err != nil { return err }
 		case html.ElementNode:
 			switch c.Data {
@@ -70,7 +70,7 @@ func parseLink(c *html.Node, dest *bytes.Buffer) error {
 				link, err := decodeLink(attr.Val)
 				if err != nil { return err }
 				// Escape to markdown
-				link, err = common.MarkdownLinkEscape.ToString(link)
+				link, err = net.MarkdownLinkEscape.ToString(link)
 				if err != nil { return err }
 				// Write to buffer
 				dest.WriteString(fmt.Sprintf("[%s](%s)\n", text, link))
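The FIXME carried over in this hunk points at a double-unescaping bug: description text that already went through one round of HTML entity decoding gets decoded again, so an escaped "&lt;" collapses into a bare "<". A standalone illustration of that effect with the standard library (not project code, just the behavior the comment describes):

    package main

    import (
    	"fmt"
    	"html"
    )

    func main() {
    	s := "&amp;lt;"                    // wire form of a literal "&lt;"
    	once := html.UnescapeString(s)     // "&lt;" (correct, decoded once)
    	twice := html.UnescapeString(once) // "<" (the bug: decoded twice)
    	fmt.Println(once, twice)
    }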

cmd/channel.go (+0 -2)

@@ -5,7 +5,6 @@ import (
 )
 
 var force bool
-var offset uint
 
 var Channel = cobra.Command{
 	Use: "channel",
@@ -14,6 +13,5 @@ var Channel = cobra.Command{
 
 func init() {
 	channelDumpCmd.Flags().BoolVarP(&force, "force", "f", false, "Overwrite the output file if it already exists")
-	channelDumpCmd.Flags().UintVar(&offset, "page-offset", 1, "Start getting videos at this page. (A page is usually 30 videos)")
 	Channel.AddCommand(&channelDumpCmd)
 }

cmd/channeldump.go (+125 -102)

@@ -8,148 +8,171 @@ import (
 	"log"
 	"github.com/terorie/yt-mango/api"
 	"fmt"
-	"github.com/terorie/yt-mango/common"
+	"github.com/terorie/yt-mango/net"
 	"sync/atomic"
 	"errors"
+	"sync"
 )
 
-var channelDumpContext = struct{
+var offset uint
+
+func init() {
+	channelDumpCmd.Flags().UintVar(&offset, "page-offset", 1, "Start getting videos at this page. (A page is usually 30 videos)")
+}
+
+// The shared context of the request and response threads
+var channelDumpContext = struct {
 	startTime time.Time
 	printResults bool
 	writer *bufio.Writer
-	pagesDone uint64
-	errorOccured int32 // Use atomic boolean here
+	// Number of pages that have been
+	// requested but not yet received.
+	// An additional +1 is added while more
+	// pages are planned to be requested.
+	pagesToReceive sync.WaitGroup
+	// If set to non-zero, an error was received
+	errorOccurred int32
 }{}
 
+// The channel dump command lists all video URLs of a channel
var channelDumpCmd = cobra.Command{
 	Use: "dumpurls <channel ID> [file]",
 	Short: "Get all public video URLs from channel",
 	Long: "Write all videos URLs of a channel to a file",
 	Args: cobra.RangeArgs(1, 2),
-	Run: func(cmd *cobra.Command, args []string) {
-		printResults := false
-		fileName := ""
-		channelID := args[0]
-		if len(args) != 2 {
-			printResults = true
-		} else {
-			fileName = args[1]
-		}
-		channelDumpContext.printResults = printResults
+	Run: doChannelDump,
+}
 
-		channelID, err := api.GetChannelID(channelID)
-		if err != nil {
-			log.Print(err)
-			os.Exit(1)
-		}
+func doChannelDump(_ *cobra.Command, args []string) {
+	if offset == 0 { offset = 1 }
 
-		log.Printf("Starting work on channel ID \"%s\".", channelID)
-		channelDumpContext.startTime = time.Now()
+	printResults := false
+	fileName := ""
+	channelID := args[0]
+	if len(args) != 2 {
+		printResults = true
+	} else {
+		fileName = args[1]
+	}
+	channelDumpContext.printResults = printResults
 
-		var flags int
-		if force {
-			flags = os.O_WRONLY | os.O_CREATE | os.O_TRUNC
-		} else {
-			flags = os.O_WRONLY | os.O_CREATE | os.O_EXCL
-		}
+	channelID, err := api.GetChannelID(channelID)
+	if err != nil {
+		log.Print(err)
+		os.Exit(1)
+	}
 
-		var file *os.File
+	log.Printf("Starting work on channel ID \"%s\".", channelID)
+	channelDumpContext.startTime = time.Now()
 
-		if !printResults {
-			var err error
-			file, err = os.OpenFile(fileName, flags, 0640)
-			if err != nil {
-				log.Fatal(err)
-				os.Exit(1)
-			}
-			defer file.Close()
+	var flags int
+	if force {
+		flags = os.O_WRONLY | os.O_CREATE | os.O_TRUNC
+	} else {
+		flags = os.O_WRONLY | os.O_CREATE | os.O_EXCL
+	}
+
+	var file *os.File
 
-			writer := bufio.NewWriter(file)
-			defer writer.Flush()
-			channelDumpContext.writer = writer
+	if !printResults {
+		var err error
+		file, err = os.OpenFile(fileName, flags, 0640)
+		if err != nil {
+			log.Fatal(err)
+			os.Exit(1)
 		}
+		defer file.Close()
 
-		results := make(chan common.JobResult)
-		terminateSub := make(chan bool)
+		writer := bufio.NewWriter(file)
+		defer writer.Flush()
+		channelDumpContext.writer = writer
+	}
 
-		// TODO Clean up
-		go processResults(results, terminateSub)
+	results := make(chan net.JobResult)
+	terminateSub := make(chan bool)
 
-		page := offset
-		for {
-			// Terminate if error detected
-			if atomic.LoadInt32(&channelDumpContext.errorOccured) != 0 {
-				goto terminate
-			}
-			// Send new requests
-			req := api.Main.GrabChannelPage(channelID, page)
-			common.DoAsyncHTTP(req, results, page)
+	// TODO Clean up
+	go channelDumpResults(results, terminateSub)
 
-			page++
+	page := offset
+	for {
+		// Terminate if error detected
+		if atomic.LoadInt32(&channelDumpContext.errorOccurred) != 0 {
+			goto terminate
 		}
-		terminate:
+		// Send new requests
+		req := api.Main.GrabChannelPage(channelID, page)
+		channelDumpContext.pagesToReceive.Add(1)
+		net.DoAsyncHTTP(req, results, page)
 
-		// Requests sent, wait for remaining requests to finish
-		for {
-			done := uint64(offset) + atomic.LoadUint64(&channelDumpContext.pagesDone)
-			target := uint64(page)
-			if done >= target { break }
-
-			// TODO use semaphore
-			time.Sleep(time.Millisecond)
-		}
+		page++
+	}
+	terminate:
 
-		// TODO Don't ignore pending results
-		duration := time.Since(channelDumpContext.startTime)
-		log.Printf("Done in %s.", duration.String())
+	// Requests sent, wait for remaining requests to finish
+	channelDumpContext.pagesToReceive.Wait()
 
-		terminateSub <- true
-	},
+	terminateSub <- true
 }
 
-// TODO combine channels into one
-func processResults(results chan common.JobResult, terminateSub chan bool) {
+// Helper goroutine that processes HTTP results.
+// HTTP results are received on "results".
+// The routine exits if a value on "terminateSub" is received.
+// For every incoming result (error or response),
+// the "pagesToReceive" counter is decreased.
+// If an error is received, the "errorOccurred" flag is set.
+func channelDumpResults(results chan net.JobResult, terminateSub chan bool) {
 	totalURLs := 0
 	for {
 		select {
 		case <-terminateSub:
-			log.Printf("Got %d URLs", totalURLs)
+			duration := time.Since(channelDumpContext.startTime)
+			log.Printf("Got %d URLs in %s.", totalURLs, duration.String())
 			os.Exit(0)
 			return
 		case res := <-results:
-			var err error
-			var channelURLs []string
-			page := res.ReqData.(uint)
-			if res.Err != nil {
-				err = res.Err
-				goto endError
-			}
-			channelURLs, err = api.Main.ParseChannelVideoURLs(res.Res)
-			if err != nil { goto endError }
-			if len(channelURLs) == 0 {
-				err = errors.New("returned no videos")
-				goto endError
-			}
-			totalURLs += len(channelURLs)
-			log.Printf("Received page %d: %d videos.", page, len(channelURLs))
-
-			if channelDumpContext.printResults {
-				for _, _url := range channelURLs {
-					fmt.Println(_url)
-				}
+			page, numURLs, err := channelDumpResult(&res)
+			// Mark page as processed
+			channelDumpContext.pagesToReceive.Done()
+			// Report back error
+			if err != nil {
+				atomic.StoreInt32(&channelDumpContext.errorOccurred, 1)
+				log.Printf("Error at page %d: %v", page, err)
 			} else {
-				for _, _url := range channelURLs {
-					_, err := channelDumpContext.writer.WriteString(_url + "\n")
-					if err != nil { panic(err) }
-				}
+				totalURLs += numURLs
 			}
-			// Increment done pages count
-			atomic.AddUint64(&channelDumpContext.pagesDone, 1)
-			continue
-			endError:
-				atomic.AddUint64(&channelDumpContext.pagesDone, 1)
-				atomic.StoreInt32(&channelDumpContext.errorOccured, 1)
-				log.Printf("Error at page %d: %v", page, err)
 		}
 	}
 }
+
+// Processes an HTTP result
+func channelDumpResult(res *net.JobResult) (page uint, numURLs int, err error) {
+	var channelURLs []string
+
+	// Extra data is page number
+	page = res.ReqData.(uint)
+	// Abort if request failed
+	if res.Err != nil { return page, 0, res.Err }
+
+	// Parse response
+	channelURLs, err = api.Main.ParseChannelVideoURLs(res.Res)
+	if err != nil { return }
+	numURLs = len(channelURLs)
+	if numURLs == 0 { return page, 0, errors.New("returned no videos") }
+
+	// Print results
+	log.Printf("Received page %d: %d videos.", page, numURLs)
+
+	if channelDumpContext.printResults {
+		for _, _url := range channelURLs {
+			fmt.Println(_url)
+		}
+	} else {
+		for _, _url := range channelURLs {
+			_, err := channelDumpContext.writer.WriteString(_url + "\n")
+			if err != nil { panic(err) }
+		}
+	}
+
+	return
+}
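The important change above is dropping the millisecond polling loop (and the pagesDone counter) in favor of a sync.WaitGroup: the request loop calls Add(1) per page, the result handler calls Done() per received page, and the main routine blocks in Wait(). A minimal standalone sketch of that pattern, with a fake page fetch standing in for the real HTTP call:

    package main

    import (
    	"fmt"
    	"sync"
    )

    func main() {
    	var pagesToReceive sync.WaitGroup
    	results := make(chan int, 8)

    	// Request loop: one Add per in-flight page
    	for page := 1; page <= 8; page++ {
    		pagesToReceive.Add(1)
    		go func(p int) {
    			results <- p * 30 // pretend each page yields 30 URLs
    		}(page)
    	}

    	// Result handler: one Done per processed page
    	go func() {
    		for n := range results {
    			fmt.Println("got", n, "URLs")
    			pagesToReceive.Done()
    		}
    	}()

    	pagesToReceive.Wait() // returns once every page is accounted for
    }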

common/httpasync.go (+0 -35)

@@ -1,35 +0,0 @@
-package common
-
-import "net/http"
-
-type JobResult struct {
-	Res *http.Response
-	Err error
-	ReqData interface{} // job.data
-}
-
-type job struct {
-	req *http.Request
-	c chan JobResult
-	data interface{}
-}
-
-var jobs = make(chan job)
-
-func InitAsyncHTTP(nWorkers uint) {
-	for i := uint(0); i < nWorkers; i++ {
-		go asyncHTTPWorker()
-	}
-}
-
-func DoAsyncHTTP(r *http.Request, c chan JobResult, data interface{}) {
-	jobs <- job{r, c, data}
-}
-
-func asyncHTTPWorker() {
-	for {
-		job := <-jobs
-		res, err := Client.Do(job.req)
-		job.c <- JobResult{res, err, job.data}
-	}
-}

main.go (+2 -2)

@@ -10,7 +10,7 @@ import (
 	"github.com/terorie/yt-mango/cmd"
 	"log"
 	"github.com/terorie/yt-mango/api"
-	"github.com/terorie/yt-mango/common"
+	"github.com/terorie/yt-mango/net"
 )
 
 const Version = "v0.1 -- dev"
@@ -35,7 +35,7 @@ func main() {
 			}
 		},
 		PersistentPreRun: func(cmd *cobra.Command, args []string) {
-			common.InitAsyncHTTP(concurrentRequests)
+			net.MaxWorkers = uint32(concurrentRequests)
 
 			switch forceAPI {
 			case "": api.Main = &api.TempAPI

net/asynchttp.go (+76 -0)

@@ -0,0 +1,76 @@
+package net
+
+import (
+	"net/http"
+	"sync/atomic"
+	"time"
+)
+
+// Max number of HTTP workers
+var MaxWorkers uint32 = 4
+
+// Current number of HTTP workers
+// (atomic variable, don't use directly)
+var activeWorkers int32
+
+// Kill a worker routine if it
+// doesn't get any jobs after "timeOut"
+const timeOut = 10 * time.Second
+
+// Result of the HTTP request
+type JobResult struct {
+	// HTTP response (can be nil)
+	Res *http.Response
+	// HTTP error (can be nil)
+	Err error
+	// data parameter from DoAsyncHTTP
+	ReqData interface{} // job.data
+}
+
+type job struct {
+	req  *http.Request
+	c    chan JobResult
+	data interface{}
+}
+
+// Job queue
+var jobs = make(chan job)
+
+// Enqueue a new HTTP request; the result is guaranteed to be sent to "c".
+// Additional data like an ID can be passed in "data" to be returned with the result.
+func DoAsyncHTTP(r *http.Request, c chan JobResult, data interface{}) {
+	newJob := job{r, c, data}
+	select {
+	// Try to send to the channel and
+	// see if an idle worker picks the job up
+	case jobs <- newJob:
+		break
+
+	// Every routine is busy
+	default:
+		if atomic.LoadInt32(&activeWorkers) < int32(MaxWorkers) {
+			// Another worker is allowed to spawn
+			// TODO Race condition here: DoAsyncHTTP is not thread safe!
+			atomic.AddInt32(&activeWorkers, 1)
+			go asyncHTTPWorker()
+		}
+		// Block until another routine finishes
+		jobs <- newJob
+	}
+}
+
+// Routine that continually reads requests from "jobs"
+// and quits if it doesn't find any jobs for some time
+func asyncHTTPWorker() {
+	for {
+		select {
+		// Get a new job from the queue and process it
+		case job := <-jobs:
+			res, err := Client.Do(job.req)
+			job.c <- JobResult{res, err, job.data}
+		// Timeout, kill the routine
+		case <-time.After(timeOut):
+			atomic.AddInt32(&activeWorkers, -1)
+			return
+		}
+	}
+}
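Unlike the deleted common/httpasync.go, which needed an up-front InitAsyncHTTP call to start a fixed pool, workers here spawn lazily up to MaxWorkers and retire after ten idle seconds. A caller sketch mirroring cmd/channeldump.go above (the URL and page tag are placeholders):

    package main

    import (
    	"log"
    	"net/http"

    	"github.com/terorie/yt-mango/net"
    )

    func main() {
    	results := make(chan net.JobResult)

    	req, err := http.NewRequest("GET", "https://www.youtube.com/", nil)
    	if err != nil { log.Fatal(err) }

    	// The last argument is echoed back in JobResult.ReqData,
    	// e.g. a page number in the channel dump.
    	net.DoAsyncHTTP(req, results, uint(1))

    	res := <-results
    	if res.Err != nil {
    		log.Fatal(res.Err)
    	}
    	defer res.Res.Body.Close()
    	log.Printf("request %v: HTTP %d", res.ReqData, res.Res.StatusCode)
    }

The TODO in DoAsyncHTTP is worth taking seriously: two goroutines calling it concurrently can both pass the activeWorkers check before either increments the counter, briefly exceeding MaxWorkers; a compare-and-swap loop on activeWorkers would close that window.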

common/http.go → net/transport.go (+1 -1)

@@ -1,4 +1,4 @@
-package common
+package net
 
 import "net/http"
 
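The file body isn't shown beyond the package rename, but the net.Client used by grab.go and asynchttp.go above must be exported here. A minimal stand-in consistent with those call sites (an assumption about the file, not its actual contents):

    package net

    import "net/http"

    // Shared HTTP client; exact transport settings are not shown in this diff.
    var Client = http.Client{}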
