diff options
| author | Felix Hanley <felix@userspace.com.au> | 2018-03-13 11:29:49 +0000 |
|---|---|---|
| committer | Felix Hanley <felix@userspace.com.au> | 2018-03-13 11:29:49 +0000 |
| commit | 79c6a22d81e6ceadb23636e90e5d53f2a86e090d (patch) | |
| tree | 33776c9bedc99ccd87c1c7061b6195500c05f54d /cmd/main.go | |
| parent | a67075e332458002ce4c56c0aa07e03807dc22ea (diff) | |
| download | dhtsearch-79c6a22d81e6ceadb23636e90e5d53f2a86e090d.tar.gz dhtsearch-79c6a22d81e6ceadb23636e90e5d53f2a86e090d.tar.bz2 | |
Fix tagging and announce callbacks
Diffstat (limited to 'cmd/main.go')
| -rw-r--r-- | cmd/main.go | 165 |
1 files changed, 132 insertions, 33 deletions
diff --git a/cmd/main.go b/cmd/main.go index 80ef9eb..3019ddf 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -4,9 +4,13 @@ import ( "flag" "fmt" "os" + "regexp" + "strings" "time" + "unicode" "github.com/felix/dhtsearch/bt" + "github.com/felix/dhtsearch/db" "github.com/felix/dhtsearch/dht" "github.com/felix/dhtsearch/models" "github.com/felix/logger" @@ -31,6 +35,15 @@ var ( pool chan chan models.Peer torrents chan models.Torrent btNodes int + tagREs map[string]*regexp.Regexp + skipTags string +) + +// Store vars +var ( + dsn string + ihBlacklist map[string]bool + peerBlacklist map[string]bool ) func main() { @@ -40,6 +53,9 @@ func main() { flag.IntVar(&dhtNodes, "dht-nodes", 1, "number of DHT nodes to start") flag.IntVar(&btNodes, "bt-nodes", 3, "number of BT nodes to start") + flag.StringVar(&skipTags, "skip-tags", "xxx", "tags of torrents to skip") + + flag.StringVar(&dsn, "dsn", "postgres://dhtsearch@localhost/dhtsearch", "database DSN") flag.BoolVar(&showVersion, "v", false, "show version") @@ -62,9 +78,25 @@ func main() { log.Info("version", version) log.Debug("debugging") - go startDHTNodes(saveInfohash) + store, err := db.NewStore(dsn) + defer store.Close() + if err != nil { + log.Error("failed to connect store", "error", err) + os.Exit(1) + } + + createTagRegexps() + + ihBlacklist = make(map[string]bool) + peerBlacklist = make(map[string]bool) + // TODO read in existing blacklist + // TODO populate bloom filter + + go startDHTNodes(store) - go startBTWorkers(saveTorrent) + go startBTWorkers(store) + + go processPendingPeers(store) for { select { @@ -74,58 +106,97 @@ func main() { } } -func startDHTNodes(f func(p models.Peer)) (nodes []*dht.Node, err error) { - nodes = make([]*dht.Node, dhtNodes) +func startDHTNodes(s models.PeerStore) { + log.Debug("starting dht nodes") + nodes := make([]*dht.Node, dhtNodes) for i := 0; i < dhtNodes; i++ { dht, err := dht.NewNode( dht.SetLogger(log.Named("dht")), dht.SetPort(port+i), - dht.SetOnAnnouncePeer(f), dht.SetIPv6(ipv6), + dht.SetOnAnnouncePeer(func(p models.Peer) { + if black := ihBlacklist[p.Infohash.String()]; black { + log.Debug("ignoring blacklisted infohash", "peer", p) + return + } + if black := peerBlacklist[p.Addr.String()]; black { + log.Debug("ignoring blacklisted peer", "peer", p) + return + } + log.Debug("peer announce", "peer", p) + err := s.SavePeer(&p) + if err != nil { + log.Error("failed to save peer", "error", err) + } + }), ) if err != nil { log.Error("failed to create node", "error", err) - return nodes, err + //return nodes, err } go dht.Run() nodes[i] = dht } - return nodes, err + //return nodes, err } -func saveInfohash(p models.Peer) { - //log.Info("announce", "peer", p) - // Blocks - w := <-pool - w <- p - return +func processPendingPeers(s models.InfohashStore) { + log.Debug("processing pending peers") + for { + peers, err := s.PendingInfohashes(1) + if err != nil { + log.Debug("failed to get pending peer", "error", err) + time.Sleep(time.Second * 1) + continue + } + for _, p := range peers { + //log.Debug("pending peer retrieved", "peer", *p) + select { + case w := <-pool: + //log.Debug("assigning peer to bt worker") + w <- *p + } + } + } } -func saveTorrent(t models.Torrent) { - fmt.Printf("Torrent added, size: %d, name: %q, tags: %s, url: magnet:?xt=urn:btih:%s\n", t.Size, t.Name, t.Tags, t.Infohash) - // Add tags - //tagTorrent(&t) +func startBTWorkers(s models.TorrentStore) { + log.Debug("starting bittorrent workers") + pool = make(chan chan models.Peer) + torrents = make(chan models.Torrent) - /* - // Not sure if I like continue labels, so this - var discard = false - for _, tag := range Config.SkipTags { - if hasTag(t, tag) { - fmt.Printf("Skipping torrent tagged '%s': %q\n", tag, t.Name) - discard = true - break + onNewTorrent := func(t models.Torrent) { + // Add tags + tags := tagTorrent(t, tagREs) + for _, skipTag := range strings.Split(skipTags, ",") { + for _, tg := range tags { + if skipTag == tg { + log.Debug("skipping torrent", "infohash", t.Infohash, "tags", tags) + ihBlacklist[t.Infohash.String()] = true + return + } } } - if discard { - continue + t.Tags = tags + log.Debug("torrent tagged", "infohash", t.Infohash, "tags", tags) + err := s.SaveTorrent(&t) + if err != nil { + log.Error("failed to save torrent", "error", err) } - */ -} + log.Info("torrent added", "name", t.Name, "size", t.Size, "tags", t.Tags) + } -func startBTWorkers(f func(t models.Torrent)) { - pool = make(chan chan models.Peer) - torrents = make(chan models.Torrent) + /* + onBadPeer := func(p models.Peer) { + log.Debug("removing peer", "peer", p) + err := s.RemovePeer(&p) + if err != nil { + log.Error("failed to remove peer", "peer", p, "error", err) + } + peerBlacklist[p.Addr.String()] = true + } + */ for i := 0; i < btNodes; i++ { w, err := bt.NewWorker( @@ -133,12 +204,40 @@ func startBTWorkers(f func(t models.Torrent)) { bt.SetLogger(log.Named("bt")), bt.SetPort(port+i), bt.SetIPv6(ipv6), - bt.SetOnNewTorrent(f), + bt.SetOnNewTorrent(onNewTorrent), + //bt.SetOnBadPeer(onBadPeer), ) if err != nil { log.Error("failed to create bt worker", "error", err) return } + log.Debug("running bt node", "index", i) go w.Run() } } + +// Filter on words, existing +func createTagRegexps() { + tagREs = make(map[string]*regexp.Regexp) + for tag, re := range tags { + tagREs[tag] = regexp.MustCompile("(?i)" + re) + } + // Add character classes + for cc, _ := range unicode.Scripts { + if cc == "Latin" || cc == "Common" { + continue + } + className := strings.ToLower(cc) + // Test for 3 or more characters per character class + tagREs[className] = regexp.MustCompile(fmt.Sprintf(`(?i)\p{%s}{3,}`, cc)) + } + // Merge user tags + /* + for tag, re := range Config.Tags { + if !Config.Quiet { + fmt.Printf("Adding user tag: %s = %s\n", tag, re) + } + tagREs[tag] = regexp.MustCompile("(?i)" + re) + } + */ +} |
