From 9b72d1065ca106c98b01b62caefdd2cf8f08178c Mon Sep 17 00:00:00 2001
From: Felix Hanley
Date: Mon, 9 Nov 2020 10:33:08 +1100
Subject: Clean up build and docs

---
 cmd/dhtsearch/main.go | 257 ++++++++++++++++++++++++++++++++++++++++++++++++++
 cmd/dhtsearch/tag.go  |  90 ++++++++++++++++++
 cmd/main.go           | 257 --------------------------------------------------
 cmd/tag.go            |  90 ------------------
 4 files changed, 347 insertions(+), 347 deletions(-)
 create mode 100644 cmd/dhtsearch/main.go
 create mode 100644 cmd/dhtsearch/tag.go
 delete mode 100644 cmd/main.go
 delete mode 100644 cmd/tag.go
(limited to 'cmd')

diff --git a/cmd/dhtsearch/main.go b/cmd/dhtsearch/main.go
new file mode 100644
index 0000000..921de77
--- /dev/null
+++ b/cmd/dhtsearch/main.go
@@ -0,0 +1,257 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"os"
+	"regexp"
+	"strings"
+	"time"
+	"unicode"
+
+	"github.com/hashicorp/golang-lru"
+	"src.userspace.com.au/dhtsearch/bt"
+	"src.userspace.com.au/dhtsearch/db"
+	"src.userspace.com.au/dhtsearch/dht"
+	"src.userspace.com.au/dhtsearch/models"
+	"src.userspace.com.au/logger"
+	//"github.com/pkg/profile"
+)
+
+var (
+	version string
+	log     logger.Logger
+)
+
+// DHT vars
+var (
+	debug       bool
+	port        int
+	ipv6        bool
+	dhtNodes    int
+	showVersion bool
+)
+
+// Torrent vars
+var (
+	pool     chan chan models.Peer
+	torrents chan models.Torrent
+	btNodes  int
+	tagREs   map[string]*regexp.Regexp
+	skipTags string
+)
+
+// Store vars
+var (
+	dsn           string
+	ihBlacklist   *lru.ARCCache
+	peerBlacklist *lru.ARCCache
+)
+
+func main() {
+	//defer profile.Start(profile.MemProfile).Stop()
+	flag.IntVar(&port, "port", 6881, "listen port (and first for multiple nodes)")
+	flag.BoolVar(&debug, "debug", false, "show debug output")
+	flag.BoolVar(&ipv6, "6", false, "listen on IPv6 also")
+	flag.IntVar(&dhtNodes, "dht-nodes", 1, "number of DHT nodes to start")
+
+	flag.IntVar(&btNodes, "bt-nodes", 3, "number of BT nodes to start")
+	flag.StringVar(&skipTags, "skip-tags", "xxx", "tags of torrents to skip")
+
+	flag.StringVar(&dsn, "dsn", "file:dhtsearch.db?cache=shared&mode=memory", "database DSN")
+
+	flag.BoolVar(&showVersion, "v", false, "show version")
+
+	flag.Parse()
+
+	if showVersion {
+		fmt.Println(version)
+		os.Exit(0)
+	}
+
+	logOpts := &logger.Options{
+		Name:  "dhtsearch",
+		Level: logger.Info,
+	}
+
+	if debug {
+		logOpts.Level = logger.Debug
+	}
+	log = logger.New(logOpts)
+	log.Info("version", version)
+	log.Debug("debugging")
+
+	store, err := db.NewStore(dsn)
+	if err != nil {
+		log.Error("failed to connect store", "error", err)
+		os.Exit(1)
+	}
+	defer store.Close()
+
+	createTagRegexps()
+
+	ihBlacklist, err = lru.NewARC(1000)
+	if err != nil {
+		log.Error("failed to create infohash blacklist", "error", err)
+		os.Exit(1)
+	}
+	peerBlacklist, err = lru.NewARC(1000)
+	if err != nil {
+		log.Error("failed to create blacklist", "error", err)
+		os.Exit(1)
+	}
+	// TODO read in existing blacklist
+	// TODO populate bloom filter
+
+	go startDHTNodes(store)
+
+	go startBTWorkers(store)
+
+	go processPendingPeers(store)
+
+	for {
+		select {
+		case <-time.After(300 * time.Second):
+			log.Info("---- mark ----")
+		}
+	}
+}
+
+func startDHTNodes(s models.PeerStore) {
+	log.Debug("starting dht nodes")
+	nodes := make([]*dht.Node, dhtNodes)
+
+	for i := 0; i < dhtNodes; i++ {
+		dht, err := dht.NewNode(
+			dht.SetLogger(log.Named("dht")),
+			dht.SetPort(port+i),
+			dht.SetIPv6(ipv6),
+			dht.SetBlacklist(peerBlacklist),
+			dht.SetOnAnnouncePeer(func(p models.Peer) {
+				if _, black := ihBlacklist.Get(p.Infohash.String()); black {
+					log.Debug("ignoring blacklisted infohash", "peer", p)
+					return
+				}
+				//log.Debug("peer announce", "peer", p)
+				err := s.SavePeer(&p)
+				if err != nil {
+					log.Error("failed to save peer", "error", err)
+				}
+			}),
+			dht.SetOnBadPeer(func(p models.Peer) {
+				err := s.RemovePeer(&p)
+				if err != nil {
+					log.Error("failed to remove peer", "error", err)
+				}
+			}),
+		)
+		if err != nil {
+			log.Error("failed to create node", "error", err)
+			continue
+		}
+		go dht.Run()
+		nodes[i] = dht
+	}
+}
+
+func processPendingPeers(s models.InfohashStore) {
+	log.Debug("processing pending peers")
+	for {
+		peers, err := s.PendingInfohashes(10)
+		if err != nil {
+			log.Warn("failed to get pending peer", "error", err)
+			time.Sleep(time.Second * 1)
+			continue
+		}
+		for _, p := range peers {
+			log.Debug("pending peer retrieved", "peer", *p)
+			select {
+			case w := <-pool:
+				//log.Debug("assigning peer to bt worker")
+				w <- *p
+			}
+		}
+	}
+}
+
+func startBTWorkers(s models.TorrentStore) {
+	log.Debug("starting bittorrent workers")
+	pool = make(chan chan models.Peer)
+	torrents = make(chan models.Torrent)
+
+	onNewTorrent := func(t models.Torrent) {
+		// Add tags
+		tags := tagTorrent(t, tagREs)
+		for _, skipTag := range strings.Split(skipTags, ",") {
+			for _, tg := range tags {
+				if skipTag == tg {
+					log.Debug("skipping torrent", "infohash", t.Infohash, "tags", tags)
+					ihBlacklist.Add(t.Infohash.String(), true)
+					s.RemoveTorrent(&t)
+					return
+				}
+			}
+		}
+		t.Tags = tags
+		log.Debug("torrent tagged", "infohash", t.Infohash, "tags", tags)
+		err := s.SaveTorrent(&t)
+		if err != nil {
+			log.Error("failed to save torrent", "error", err)
+			ihBlacklist.Add(t.Infohash.String(), true)
+			s.RemoveTorrent(&t)
+		}
+		log.Info("torrent added", "name", t.Name, "size", t.Size, "tags", t.Tags)
+	}
+
+	onBadPeer := func(p models.Peer) {
+		log.Debug("removing peer", "peer", p)
+		err := s.RemovePeer(&p)
+		if err != nil {
+			log.Error("failed to remove peer", "peer", p, "error", err)
+		}
+		peerBlacklist.Add(p.Addr.String(), true)
+	}

+	for i := 0; i < btNodes; i++ {
+		w, err := bt.NewWorker(
+			pool,
+			bt.SetLogger(log.Named("bt")),
+			bt.SetPort(port+i),
+			bt.SetIPv6(ipv6),
+			bt.SetOnNewTorrent(onNewTorrent),
+			bt.SetOnBadPeer(onBadPeer),
+		)
+		if err != nil {
+			log.Error("failed to create bt worker", "error", err)
+			return
+		}
+		log.Debug("running bt node", "index", i)
+		go w.Run()
+	}
+}
+
+// Build tag regexps from the default tags plus non-Latin character classes
+func createTagRegexps() {
+	tagREs = make(map[string]*regexp.Regexp)
+	for tag, re := range tags {
+		tagREs[tag] = regexp.MustCompile("(?i)" + re)
+	}
+	// Add character classes
+	for cc := range unicode.Scripts {
+		if cc == "Latin" || cc == "Common" {
+			continue
+		}
+		className := strings.ToLower(cc)
+		// Test for 3 or more characters per character class
+		tagREs[className] = regexp.MustCompile(fmt.Sprintf(`(?i)\p{%s}{3,}`, cc))
+	}
+	// Merge user tags
+	/*
+		for tag, re := range Config.Tags {
+			if !Config.Quiet {
+				fmt.Printf("Adding user tag: %s = %s\n", tag, re)
+			}
+			tagREs[tag] = regexp.MustCompile("(?i)" + re)
+		}
+	*/
+}
diff --git a/cmd/dhtsearch/tag.go b/cmd/dhtsearch/tag.go
new file mode 100644
index 0000000..6d5c0ea
--- /dev/null
+++ b/cmd/dhtsearch/tag.go
@@ -0,0 +1,90 @@
+package main
+
+import (
+	"fmt"
+	"regexp"
+	"strings"
+	"unicode"
+
+	"src.userspace.com.au/dhtsearch/models"
+)
+
+// Default tags, can be supplemented or overwritten by config
+var tags = map[string]string{
+	"flac":        `\.flac$`,
+	"episode":     "(season|episode|s[0-9]{2}e[0-9]{2})",
+	"1080":        "1080",
+	"720":         "720",
+	"hd":          "hd|720|1080|4k",
+ "rip": "(bdrip|dvd[- ]?rip|dvdmux|br[- ]?rip|dvd[-]?r|web[- ]?dl|hdrip)", + "xxx": `(xxx|p(orn|ussy)|censor|sex|urbat|a(ss|nal)|o(rgy|gasm)|(fu|di|co)ck|esbian|milf|lust|gay)|rotic|18(\+|yr)|hore|hemale|virgin`, + "ebook": "epub", + "application": `\.(apk|exe|msi|dmg)$`, + "android": `\.apk$`, + "apple": `\.dmg$`, + "subtitles": `\.s(rt|ub)$`, + "archive": `\.(zip|rar|p7|tgz|bz2|iso)$`, + "video": `(\.(3g2|3gp|amv|asf|avi|drc|f4a|f4b|f4p|f4v|flv|gif|gifv|m2v|m4p|m4v|mkv|mng|mov|mp2|mp4|mpe|mpeg|mpg|mpv|mxf|net|nsv|ogv|qt|rm|rmvb|roq|svi|vob|webm|wmv|yuv)$|divx|x264|x265)`, + "audio": `\.(aa|aac|aax|act|aiff|amr|ape|au|awb|dct|dss|dvf|flac|gsm|iklax|ivs|m4a|m4b|mmf|mp3|mpc|msv|ogg|opus|ra|raw|sln|tta|vox|wav|wma|wv)$`, + "document": `\.(cbr|cbz|cb7|cbt|cba|epub|djvu|fb2|ibook|azw.|lit|prc|mobi|pdb|pdb|oxps|xps|pdf)$`, + "bootleg": `(camrip|hdts|[-. ](ts|tc)[-. ]|hdtc)`, + "screener": `(bd[-]?scr|screener|dvd[-]?scr|r5)`, + "font": `(font|\.(ttf|fon|otf)$)`, +} + +func mergeCharacterTagREs(tagREs map[string]*regexp.Regexp) error { + var err error + // Add character classes + for cc := range unicode.Scripts { + if cc == "Latin" || cc == "Common" { + continue + } + className := strings.ToLower(cc) + // Test for 3 or more characters per character class + tagREs[className], err = regexp.Compile(fmt.Sprintf(`(?i)\p{%s}{3,}`, cc)) + if err != nil { + return err + } + } + return nil +} + +func mergeTagRegexps(tagREs map[string]*regexp.Regexp, tags map[string]string) error { + var err error + for tag, re := range tags { + tagREs[tag], err = regexp.Compile("(?i)" + re) + if err != nil { + return err + } + } + return nil +} + +func tagTorrent(t models.Torrent, tagREs map[string]*regexp.Regexp) (tags []string) { + ttags := make(map[string]bool) + + for tag, re := range tagREs { + if re.MatchString(t.Name) { + ttags[tag] = true + } + for _, f := range t.Files { + if re.MatchString(f.Path) { + ttags[tag] = true + } + } + } + // Make unique + for tt := range ttags { + tags = append(tags, tt) + } + return tags +} + +func hasTag(t models.Torrent, tag string) bool { + for _, t := range t.Tags { + if tag == t { + return true + } + } + return false +} diff --git a/cmd/main.go b/cmd/main.go deleted file mode 100644 index b81ea4c..0000000 --- a/cmd/main.go +++ /dev/null @@ -1,257 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "os" - "regexp" - "strings" - "time" - "unicode" - - "github.com/hashicorp/golang-lru" - "src.userspace.com.au/dhtsearch/bt" - "src.userspace.com.au/dhtsearch/db" - "src.userspace.com.au/dhtsearch/dht" - "src.userspace.com.au/dhtsearch/models" - "src.userspace.com.au/logger" - //"github.com/pkg/profile" -) - -var ( - version string - log logger.Logger -) - -// DHT vars -var ( - debug bool - port int - ipv6 bool - dhtNodes int - showVersion bool -) - -// Torrent vars -var ( - pool chan chan models.Peer - torrents chan models.Torrent - btNodes int - tagREs map[string]*regexp.Regexp - skipTags string -) - -// Store vars -var ( - dsn string - ihBlacklist *lru.ARCCache - peerBlacklist *lru.ARCCache -) - -func main() { - //defer profile.Start(profile.MemProfile).Stop() - flag.IntVar(&port, "port", 6881, "listen port (and first for multiple nodes") - flag.BoolVar(&debug, "debug", false, "show debug output") - flag.BoolVar(&ipv6, "6", false, "listen on IPv6 also") - flag.IntVar(&dhtNodes, "dht-nodes", 1, "number of DHT nodes to start") - - flag.IntVar(&btNodes, "bt-nodes", 3, "number of BT nodes to start") - flag.StringVar(&skipTags, "skip-tags", "xxx", "tags of torrents to skip") - - 
flag.StringVar(&dsn, "dsn", "postgres://dhtsearch@localhost/dhtsearch", "database DSN") - - flag.BoolVar(&showVersion, "v", false, "show version") - - flag.Parse() - - if showVersion { - fmt.Println(version) - os.Exit(0) - } - - logOpts := &logger.Options{ - Name: "dhtsearch", - Level: logger.Info, - } - - if debug { - logOpts.Level = logger.Debug - } - log = logger.New(logOpts) - log.Info("version", version) - log.Debug("debugging") - - store, err := db.NewStore(dsn) - if err != nil { - log.Error("failed to connect store", "error", err) - os.Exit(1) - } - defer store.Close() - - createTagRegexps() - - ihBlacklist, err = lru.NewARC(1000) - if err != nil { - log.Error("failed to create infohash blacklist", "error", err) - os.Exit(1) - } - peerBlacklist, err = lru.NewARC(1000) - if err != nil { - log.Error("failed to create blacklist", "error", err) - os.Exit(1) - } - // TODO read in existing blacklist - // TODO populate bloom filter - - go startDHTNodes(store) - - go startBTWorkers(store) - - go processPendingPeers(store) - - for { - select { - case <-time.After(300 * time.Second): - log.Info("---- mark ----") - } - } -} - -func startDHTNodes(s models.PeerStore) { - log.Debug("starting dht nodes") - nodes := make([]*dht.Node, dhtNodes) - - for i := 0; i < dhtNodes; i++ { - dht, err := dht.NewNode( - dht.SetLogger(log.Named("dht")), - dht.SetPort(port+i), - dht.SetIPv6(ipv6), - dht.SetBlacklist(peerBlacklist), - dht.SetOnAnnouncePeer(func(p models.Peer) { - if _, black := ihBlacklist.Get(p.Infohash.String()); black { - log.Debug("ignoring blacklisted infohash", "peer", p) - return - } - //log.Debug("peer announce", "peer", p) - err := s.SavePeer(&p) - if err != nil { - log.Error("failed to save peer", "error", err) - } - }), - dht.SetOnBadPeer(func(p models.Peer) { - err := s.RemovePeer(&p) - if err != nil { - log.Error("failed to remove peer", "error", err) - } - }), - ) - if err != nil { - log.Error("failed to create node", "error", err) - continue - } - go dht.Run() - nodes[i] = dht - } -} - -func processPendingPeers(s models.InfohashStore) { - log.Debug("processing pending peers") - for { - peers, err := s.PendingInfohashes(10) - if err != nil { - log.Warn("failed to get pending peer", "error", err) - time.Sleep(time.Second * 1) - continue - } - for _, p := range peers { - log.Debug("pending peer retrieved", "peer", *p) - select { - case w := <-pool: - //log.Debug("assigning peer to bt worker") - w <- *p - } - } - } -} - -func startBTWorkers(s models.TorrentStore) { - log.Debug("starting bittorrent workers") - pool = make(chan chan models.Peer) - torrents = make(chan models.Torrent) - - onNewTorrent := func(t models.Torrent) { - // Add tags - tags := tagTorrent(t, tagREs) - for _, skipTag := range strings.Split(skipTags, ",") { - for _, tg := range tags { - if skipTag == tg { - log.Debug("skipping torrent", "infohash", t.Infohash, "tags", tags) - ihBlacklist.Add(t.Infohash.String(), true) - s.RemoveTorrent(&t) - return - } - } - } - t.Tags = tags - log.Debug("torrent tagged", "infohash", t.Infohash, "tags", tags) - err := s.SaveTorrent(&t) - if err != nil { - log.Error("failed to save torrent", "error", err) - ihBlacklist.Add(t.Infohash.String(), true) - s.RemoveTorrent(&t) - } - log.Info("torrent added", "name", t.Name, "size", t.Size, "tags", t.Tags) - } - - onBadPeer := func(p models.Peer) { - log.Debug("removing peer", "peer", p) - err := s.RemovePeer(&p) - if err != nil { - log.Error("failed to remove peer", "peer", p, "error", err) - } - peerBlacklist.Add(p.Addr.String(), true) - } 
- - for i := 0; i < btNodes; i++ { - w, err := bt.NewWorker( - pool, - bt.SetLogger(log.Named("bt")), - bt.SetPort(port+i), - bt.SetIPv6(ipv6), - bt.SetOnNewTorrent(onNewTorrent), - bt.SetOnBadPeer(onBadPeer), - ) - if err != nil { - log.Error("failed to create bt worker", "error", err) - return - } - log.Debug("running bt node", "index", i) - go w.Run() - } -} - -// Filter on words, existing -func createTagRegexps() { - tagREs = make(map[string]*regexp.Regexp) - for tag, re := range tags { - tagREs[tag] = regexp.MustCompile("(?i)" + re) - } - // Add character classes - for cc, _ := range unicode.Scripts { - if cc == "Latin" || cc == "Common" { - continue - } - className := strings.ToLower(cc) - // Test for 3 or more characters per character class - tagREs[className] = regexp.MustCompile(fmt.Sprintf(`(?i)\p{%s}{3,}`, cc)) - } - // Merge user tags - /* - for tag, re := range Config.Tags { - if !Config.Quiet { - fmt.Printf("Adding user tag: %s = %s\n", tag, re) - } - tagREs[tag] = regexp.MustCompile("(?i)" + re) - } - */ -} diff --git a/cmd/tag.go b/cmd/tag.go deleted file mode 100644 index 6d5c0ea..0000000 --- a/cmd/tag.go +++ /dev/null @@ -1,90 +0,0 @@ -package main - -import ( - "fmt" - "regexp" - "strings" - "unicode" - - "src.userspace.com.au/dhtsearch/models" -) - -// Default tags, can be supplimented or overwritten by config -var tags = map[string]string{ - "flac": `\.flac$`, - "episode": "(season|episode|s[0-9]{2}e[0-9]{2})", - "1080": "1080", - "720": "720", - "hd": "hd|720|1080|4k", - "rip": "(bdrip|dvd[- ]?rip|dvdmux|br[- ]?rip|dvd[-]?r|web[- ]?dl|hdrip)", - "xxx": `(xxx|p(orn|ussy)|censor|sex|urbat|a(ss|nal)|o(rgy|gasm)|(fu|di|co)ck|esbian|milf|lust|gay)|rotic|18(\+|yr)|hore|hemale|virgin`, - "ebook": "epub", - "application": `\.(apk|exe|msi|dmg)$`, - "android": `\.apk$`, - "apple": `\.dmg$`, - "subtitles": `\.s(rt|ub)$`, - "archive": `\.(zip|rar|p7|tgz|bz2|iso)$`, - "video": `(\.(3g2|3gp|amv|asf|avi|drc|f4a|f4b|f4p|f4v|flv|gif|gifv|m2v|m4p|m4v|mkv|mng|mov|mp2|mp4|mpe|mpeg|mpg|mpv|mxf|net|nsv|ogv|qt|rm|rmvb|roq|svi|vob|webm|wmv|yuv)$|divx|x264|x265)`, - "audio": `\.(aa|aac|aax|act|aiff|amr|ape|au|awb|dct|dss|dvf|flac|gsm|iklax|ivs|m4a|m4b|mmf|mp3|mpc|msv|ogg|opus|ra|raw|sln|tta|vox|wav|wma|wv)$`, - "document": `\.(cbr|cbz|cb7|cbt|cba|epub|djvu|fb2|ibook|azw.|lit|prc|mobi|pdb|pdb|oxps|xps|pdf)$`, - "bootleg": `(camrip|hdts|[-. ](ts|tc)[-. 
]|hdtc)`, - "screener": `(bd[-]?scr|screener|dvd[-]?scr|r5)`, - "font": `(font|\.(ttf|fon|otf)$)`, -} - -func mergeCharacterTagREs(tagREs map[string]*regexp.Regexp) error { - var err error - // Add character classes - for cc := range unicode.Scripts { - if cc == "Latin" || cc == "Common" { - continue - } - className := strings.ToLower(cc) - // Test for 3 or more characters per character class - tagREs[className], err = regexp.Compile(fmt.Sprintf(`(?i)\p{%s}{3,}`, cc)) - if err != nil { - return err - } - } - return nil -} - -func mergeTagRegexps(tagREs map[string]*regexp.Regexp, tags map[string]string) error { - var err error - for tag, re := range tags { - tagREs[tag], err = regexp.Compile("(?i)" + re) - if err != nil { - return err - } - } - return nil -} - -func tagTorrent(t models.Torrent, tagREs map[string]*regexp.Regexp) (tags []string) { - ttags := make(map[string]bool) - - for tag, re := range tagREs { - if re.MatchString(t.Name) { - ttags[tag] = true - } - for _, f := range t.Files { - if re.MatchString(f.Path) { - ttags[tag] = true - } - } - } - // Make unique - for tt := range ttags { - tags = append(tags, tt) - } - return tags -} - -func hasTag(t models.Torrent, tag string) bool { - for _, t := range t.Tags { - if tag == t { - return true - } - } - return false -} -- cgit v1.2.3