aboutsummaryrefslogtreecommitdiff
path: root/cmd/main.go
diff options
context:
space:
mode:
authorFelix Hanley <felix@userspace.com.au>2018-03-13 11:29:49 +0000
committerFelix Hanley <felix@userspace.com.au>2018-03-13 11:29:49 +0000
commit79c6a22d81e6ceadb23636e90e5d53f2a86e090d (patch)
tree33776c9bedc99ccd87c1c7061b6195500c05f54d /cmd/main.go
parenta67075e332458002ce4c56c0aa07e03807dc22ea (diff)
downloaddhtsearch-79c6a22d81e6ceadb23636e90e5d53f2a86e090d.tar.gz
dhtsearch-79c6a22d81e6ceadb23636e90e5d53f2a86e090d.tar.bz2
Fix tagging and announce callbacks
Diffstat (limited to 'cmd/main.go')
-rw-r--r--cmd/main.go165
1 files changed, 132 insertions, 33 deletions
diff --git a/cmd/main.go b/cmd/main.go
index 80ef9eb..3019ddf 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -4,9 +4,13 @@ import (
"flag"
"fmt"
"os"
+ "regexp"
+ "strings"
"time"
+ "unicode"
"github.com/felix/dhtsearch/bt"
+ "github.com/felix/dhtsearch/db"
"github.com/felix/dhtsearch/dht"
"github.com/felix/dhtsearch/models"
"github.com/felix/logger"
@@ -31,6 +35,15 @@ var (
pool chan chan models.Peer
torrents chan models.Torrent
btNodes int
+ tagREs map[string]*regexp.Regexp
+ skipTags string
+)
+
+// Store vars
+var (
+ dsn string
+ ihBlacklist map[string]bool
+ peerBlacklist map[string]bool
)
func main() {
@@ -40,6 +53,9 @@ func main() {
flag.IntVar(&dhtNodes, "dht-nodes", 1, "number of DHT nodes to start")
flag.IntVar(&btNodes, "bt-nodes", 3, "number of BT nodes to start")
+ flag.StringVar(&skipTags, "skip-tags", "xxx", "tags of torrents to skip")
+
+ flag.StringVar(&dsn, "dsn", "postgres://dhtsearch@localhost/dhtsearch", "database DSN")
flag.BoolVar(&showVersion, "v", false, "show version")
@@ -62,9 +78,25 @@ func main() {
log.Info("version", version)
log.Debug("debugging")
- go startDHTNodes(saveInfohash)
+ store, err := db.NewStore(dsn)
+ defer store.Close()
+ if err != nil {
+ log.Error("failed to connect store", "error", err)
+ os.Exit(1)
+ }
+
+ createTagRegexps()
+
+ ihBlacklist = make(map[string]bool)
+ peerBlacklist = make(map[string]bool)
+ // TODO read in existing blacklist
+ // TODO populate bloom filter
+
+ go startDHTNodes(store)
- go startBTWorkers(saveTorrent)
+ go startBTWorkers(store)
+
+ go processPendingPeers(store)
for {
select {
@@ -74,58 +106,97 @@ func main() {
}
}
-func startDHTNodes(f func(p models.Peer)) (nodes []*dht.Node, err error) {
- nodes = make([]*dht.Node, dhtNodes)
+func startDHTNodes(s models.PeerStore) {
+ log.Debug("starting dht nodes")
+ nodes := make([]*dht.Node, dhtNodes)
for i := 0; i < dhtNodes; i++ {
dht, err := dht.NewNode(
dht.SetLogger(log.Named("dht")),
dht.SetPort(port+i),
- dht.SetOnAnnouncePeer(f),
dht.SetIPv6(ipv6),
+ dht.SetOnAnnouncePeer(func(p models.Peer) {
+ if black := ihBlacklist[p.Infohash.String()]; black {
+ log.Debug("ignoring blacklisted infohash", "peer", p)
+ return
+ }
+ if black := peerBlacklist[p.Addr.String()]; black {
+ log.Debug("ignoring blacklisted peer", "peer", p)
+ return
+ }
+ log.Debug("peer announce", "peer", p)
+ err := s.SavePeer(&p)
+ if err != nil {
+ log.Error("failed to save peer", "error", err)
+ }
+ }),
)
if err != nil {
log.Error("failed to create node", "error", err)
- return nodes, err
+ //return nodes, err
}
go dht.Run()
nodes[i] = dht
}
- return nodes, err
+ //return nodes, err
}
-func saveInfohash(p models.Peer) {
- //log.Info("announce", "peer", p)
- // Blocks
- w := <-pool
- w <- p
- return
+func processPendingPeers(s models.InfohashStore) {
+ log.Debug("processing pending peers")
+ for {
+ peers, err := s.PendingInfohashes(1)
+ if err != nil {
+ log.Debug("failed to get pending peer", "error", err)
+ time.Sleep(time.Second * 1)
+ continue
+ }
+ for _, p := range peers {
+ //log.Debug("pending peer retrieved", "peer", *p)
+ select {
+ case w := <-pool:
+ //log.Debug("assigning peer to bt worker")
+ w <- *p
+ }
+ }
+ }
}
-func saveTorrent(t models.Torrent) {
- fmt.Printf("Torrent added, size: %d, name: %q, tags: %s, url: magnet:?xt=urn:btih:%s\n", t.Size, t.Name, t.Tags, t.Infohash)
- // Add tags
- //tagTorrent(&t)
+func startBTWorkers(s models.TorrentStore) {
+ log.Debug("starting bittorrent workers")
+ pool = make(chan chan models.Peer)
+ torrents = make(chan models.Torrent)
- /*
- // Not sure if I like continue labels, so this
- var discard = false
- for _, tag := range Config.SkipTags {
- if hasTag(t, tag) {
- fmt.Printf("Skipping torrent tagged '%s': %q\n", tag, t.Name)
- discard = true
- break
+ onNewTorrent := func(t models.Torrent) {
+ // Add tags
+ tags := tagTorrent(t, tagREs)
+ for _, skipTag := range strings.Split(skipTags, ",") {
+ for _, tg := range tags {
+ if skipTag == tg {
+ log.Debug("skipping torrent", "infohash", t.Infohash, "tags", tags)
+ ihBlacklist[t.Infohash.String()] = true
+ return
+ }
}
}
- if discard {
- continue
+ t.Tags = tags
+ log.Debug("torrent tagged", "infohash", t.Infohash, "tags", tags)
+ err := s.SaveTorrent(&t)
+ if err != nil {
+ log.Error("failed to save torrent", "error", err)
}
- */
-}
+ log.Info("torrent added", "name", t.Name, "size", t.Size, "tags", t.Tags)
+ }
-func startBTWorkers(f func(t models.Torrent)) {
- pool = make(chan chan models.Peer)
- torrents = make(chan models.Torrent)
+ /*
+ onBadPeer := func(p models.Peer) {
+ log.Debug("removing peer", "peer", p)
+ err := s.RemovePeer(&p)
+ if err != nil {
+ log.Error("failed to remove peer", "peer", p, "error", err)
+ }
+ peerBlacklist[p.Addr.String()] = true
+ }
+ */
for i := 0; i < btNodes; i++ {
w, err := bt.NewWorker(
@@ -133,12 +204,40 @@ func startBTWorkers(f func(t models.Torrent)) {
bt.SetLogger(log.Named("bt")),
bt.SetPort(port+i),
bt.SetIPv6(ipv6),
- bt.SetOnNewTorrent(f),
+ bt.SetOnNewTorrent(onNewTorrent),
+ //bt.SetOnBadPeer(onBadPeer),
)
if err != nil {
log.Error("failed to create bt worker", "error", err)
return
}
+ log.Debug("running bt node", "index", i)
go w.Run()
}
}
+
+// Filter on words, existing
+func createTagRegexps() {
+ tagREs = make(map[string]*regexp.Regexp)
+ for tag, re := range tags {
+ tagREs[tag] = regexp.MustCompile("(?i)" + re)
+ }
+ // Add character classes
+ for cc, _ := range unicode.Scripts {
+ if cc == "Latin" || cc == "Common" {
+ continue
+ }
+ className := strings.ToLower(cc)
+ // Test for 3 or more characters per character class
+ tagREs[className] = regexp.MustCompile(fmt.Sprintf(`(?i)\p{%s}{3,}`, cc))
+ }
+ // Merge user tags
+ /*
+ for tag, re := range Config.Tags {
+ if !Config.Quiet {
+ fmt.Printf("Adding user tag: %s = %s\n", tag, re)
+ }
+ tagREs[tag] = regexp.MustCompile("(?i)" + re)
+ }
+ */
+}