diff options
| -rw-r--r-- | Makefile | 10 | ||||
| -rw-r--r-- | bt/options.go | 10 | ||||
| -rw-r--r-- | bt/worker.go | 26 | ||||
| -rw-r--r-- | cmd/main.go | 165 | ||||
| -rw-r--r-- | cmd/tag.go | 89 | ||||
| -rw-r--r-- | db/pgsql.go | 380 | ||||
| -rw-r--r-- | main.go | 31 | ||||
| -rw-r--r-- | models/infohash.go | 5 | ||||
| -rw-r--r-- | models/infohash_test.go | 9 | ||||
| -rw-r--r-- | models/peer.go | 7 | ||||
| -rw-r--r-- | models/storage.go | 26 | ||||
| -rw-r--r-- | models/torrent.go | 8 |
12 files changed, 674 insertions, 92 deletions
@@ -8,14 +8,16 @@ BINARIES = $(patsubst %,$(CMD)-%-v$(VERSION), $(TARGETS)) .DEFAULT_GOAL := help -release: check-env $(BINARIES) ## Build all binaries +release: $(BINARIES) ## Build all binaries -build: check-env ## Build binary for current platform +build: $(CMD) ## Build binary for current platform + +$(CMD): check-env $(SRC) cd cmd && go build -o ../$(CMD) $(LDFLAGS) standalone : TAGS = sqlite -$(BINARIES): $(SRC) +$(BINARIES): check-env $(SRC) cd cmd && env GOOS=`echo $@ |cut -d'-' -f2` GOARCH=`echo $@ |cut -d'-' -f3 |cut -d'.' -f1` go build -o ../$@ $(LDFLAGS) test: ## Run tests and create coverage report @@ -38,4 +40,4 @@ endif help: @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) |sort |awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' -.PHONY: help install build test lint clean +.PHONY: help install test lint clean diff --git a/bt/options.go b/bt/options.go index 25ebc21..bb17505 100644 --- a/bt/options.go +++ b/bt/options.go @@ -7,7 +7,7 @@ import ( type Option func(*Worker) error -// SetNewTorrent sets the callback +// SetOnNewTorrent sets the callback func SetOnNewTorrent(f func(models.Torrent)) Option { return func(w *Worker) error { w.OnNewTorrent = f @@ -15,6 +15,14 @@ func SetOnNewTorrent(f func(models.Torrent)) Option { } } +// SetOnBadPeer sets the callback +func SetOnBadPeer(f func(models.Peer)) Option { + return func(w *Worker) error { + w.OnBadPeer = f + return nil + } +} + // SetPort sets the port to listen on func SetPort(p int) Option { return func(w *Worker) error { diff --git a/bt/worker.go b/bt/worker.go index 6d247f4..95669a4 100644 --- a/bt/worker.go +++ b/bt/worker.go @@ -52,6 +52,7 @@ type Worker struct { family string tcpTimeout int OnNewTorrent func(t models.Torrent) + OnBadPeer func(p models.Peer) log logger.Logger } @@ -84,16 +85,19 @@ func (bt *Worker) Run() error { bt.log.Debug("worker got work", "peer", p) md, err := bt.fetchMetadata(p) if err != nil { - bt.log.Debug("failed to fetch metadata", "error", err) + //bt.log.Warn("failed to fetch metadata", "error", err) + if bt.OnBadPeer != nil { + bt.OnBadPeer(p) + } continue } t, err := models.TorrentFromMetadata(p.Infohash, md) if err != nil { - bt.log.Debug("failed to load torrent", "error", err) + bt.log.Warn("failed to load torrent", "error", err) continue } if bt.OnNewTorrent != nil { - go bt.OnNewTorrent(*t) + bt.OnNewTorrent(*t) } } } @@ -115,9 +119,9 @@ func (bt *Worker) fetchMetadata(p models.Peer) (out []byte, err error) { recover() }() - ll := bt.log.WithFields("address", p.Addr.String()) + //ll := bt.log.WithFields("address", p.Addr.String()) - ll.Debug("connecting") + //ll.Debug("connecting") dial, err := net.DialTimeout("tcp", p.Addr.String(), time.Second*15) if err != nil { return out, err @@ -126,7 +130,7 @@ func (bt *Worker) fetchMetadata(p models.Peer) (out []byte, err error) { conn := dial.(*net.TCPConn) conn.SetLinger(0) defer conn.Close() - ll.Debug("dialed") + //ll.Debug("dialed") data := bytes.NewBuffer(nil) data.Grow(BlockSize) @@ -134,26 +138,26 @@ func (bt *Worker) fetchMetadata(p models.Peer) (out []byte, err error) { ih := models.GenInfohash() // TCP handshake - ll.Debug("sending handshake") + //ll.Debug("sending handshake") _, err = sendHandshake(conn, p.Infohash, ih) if err != nil { return nil, err } // Handle the handshake response - ll.Debug("handling handshake response") + //ll.Debug("handling handshake response") err = read(conn, 68, data) if err != nil { return nil, err } next := data.Next(68) - ll.Debug("got next data") + //ll.Debug("got next data") if !(bytes.Equal(handshakePrefix[:20], next[:20]) && next[25]&0x10 != 0) { - ll.Debug("next data does not match", "next", next) + //ll.Debug("next data does not match", "next", next) return nil, errors.New("invalid handshake response") } - ll.Debug("sending ext handshake") + //ll.Debug("sending ext handshake") _, err = sendExtHandshake(conn) if err != nil { return nil, err diff --git a/cmd/main.go b/cmd/main.go index 80ef9eb..3019ddf 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -4,9 +4,13 @@ import ( "flag" "fmt" "os" + "regexp" + "strings" "time" + "unicode" "github.com/felix/dhtsearch/bt" + "github.com/felix/dhtsearch/db" "github.com/felix/dhtsearch/dht" "github.com/felix/dhtsearch/models" "github.com/felix/logger" @@ -31,6 +35,15 @@ var ( pool chan chan models.Peer torrents chan models.Torrent btNodes int + tagREs map[string]*regexp.Regexp + skipTags string +) + +// Store vars +var ( + dsn string + ihBlacklist map[string]bool + peerBlacklist map[string]bool ) func main() { @@ -40,6 +53,9 @@ func main() { flag.IntVar(&dhtNodes, "dht-nodes", 1, "number of DHT nodes to start") flag.IntVar(&btNodes, "bt-nodes", 3, "number of BT nodes to start") + flag.StringVar(&skipTags, "skip-tags", "xxx", "tags of torrents to skip") + + flag.StringVar(&dsn, "dsn", "postgres://dhtsearch@localhost/dhtsearch", "database DSN") flag.BoolVar(&showVersion, "v", false, "show version") @@ -62,9 +78,25 @@ func main() { log.Info("version", version) log.Debug("debugging") - go startDHTNodes(saveInfohash) + store, err := db.NewStore(dsn) + defer store.Close() + if err != nil { + log.Error("failed to connect store", "error", err) + os.Exit(1) + } + + createTagRegexps() + + ihBlacklist = make(map[string]bool) + peerBlacklist = make(map[string]bool) + // TODO read in existing blacklist + // TODO populate bloom filter + + go startDHTNodes(store) - go startBTWorkers(saveTorrent) + go startBTWorkers(store) + + go processPendingPeers(store) for { select { @@ -74,58 +106,97 @@ func main() { } } -func startDHTNodes(f func(p models.Peer)) (nodes []*dht.Node, err error) { - nodes = make([]*dht.Node, dhtNodes) +func startDHTNodes(s models.PeerStore) { + log.Debug("starting dht nodes") + nodes := make([]*dht.Node, dhtNodes) for i := 0; i < dhtNodes; i++ { dht, err := dht.NewNode( dht.SetLogger(log.Named("dht")), dht.SetPort(port+i), - dht.SetOnAnnouncePeer(f), dht.SetIPv6(ipv6), + dht.SetOnAnnouncePeer(func(p models.Peer) { + if black := ihBlacklist[p.Infohash.String()]; black { + log.Debug("ignoring blacklisted infohash", "peer", p) + return + } + if black := peerBlacklist[p.Addr.String()]; black { + log.Debug("ignoring blacklisted peer", "peer", p) + return + } + log.Debug("peer announce", "peer", p) + err := s.SavePeer(&p) + if err != nil { + log.Error("failed to save peer", "error", err) + } + }), ) if err != nil { log.Error("failed to create node", "error", err) - return nodes, err + //return nodes, err } go dht.Run() nodes[i] = dht } - return nodes, err + //return nodes, err } -func saveInfohash(p models.Peer) { - //log.Info("announce", "peer", p) - // Blocks - w := <-pool - w <- p - return +func processPendingPeers(s models.InfohashStore) { + log.Debug("processing pending peers") + for { + peers, err := s.PendingInfohashes(1) + if err != nil { + log.Debug("failed to get pending peer", "error", err) + time.Sleep(time.Second * 1) + continue + } + for _, p := range peers { + //log.Debug("pending peer retrieved", "peer", *p) + select { + case w := <-pool: + //log.Debug("assigning peer to bt worker") + w <- *p + } + } + } } -func saveTorrent(t models.Torrent) { - fmt.Printf("Torrent added, size: %d, name: %q, tags: %s, url: magnet:?xt=urn:btih:%s\n", t.Size, t.Name, t.Tags, t.Infohash) - // Add tags - //tagTorrent(&t) +func startBTWorkers(s models.TorrentStore) { + log.Debug("starting bittorrent workers") + pool = make(chan chan models.Peer) + torrents = make(chan models.Torrent) - /* - // Not sure if I like continue labels, so this - var discard = false - for _, tag := range Config.SkipTags { - if hasTag(t, tag) { - fmt.Printf("Skipping torrent tagged '%s': %q\n", tag, t.Name) - discard = true - break + onNewTorrent := func(t models.Torrent) { + // Add tags + tags := tagTorrent(t, tagREs) + for _, skipTag := range strings.Split(skipTags, ",") { + for _, tg := range tags { + if skipTag == tg { + log.Debug("skipping torrent", "infohash", t.Infohash, "tags", tags) + ihBlacklist[t.Infohash.String()] = true + return + } } } - if discard { - continue + t.Tags = tags + log.Debug("torrent tagged", "infohash", t.Infohash, "tags", tags) + err := s.SaveTorrent(&t) + if err != nil { + log.Error("failed to save torrent", "error", err) } - */ -} + log.Info("torrent added", "name", t.Name, "size", t.Size, "tags", t.Tags) + } -func startBTWorkers(f func(t models.Torrent)) { - pool = make(chan chan models.Peer) - torrents = make(chan models.Torrent) + /* + onBadPeer := func(p models.Peer) { + log.Debug("removing peer", "peer", p) + err := s.RemovePeer(&p) + if err != nil { + log.Error("failed to remove peer", "peer", p, "error", err) + } + peerBlacklist[p.Addr.String()] = true + } + */ for i := 0; i < btNodes; i++ { w, err := bt.NewWorker( @@ -133,12 +204,40 @@ func startBTWorkers(f func(t models.Torrent)) { bt.SetLogger(log.Named("bt")), bt.SetPort(port+i), bt.SetIPv6(ipv6), - bt.SetOnNewTorrent(f), + bt.SetOnNewTorrent(onNewTorrent), + //bt.SetOnBadPeer(onBadPeer), ) if err != nil { log.Error("failed to create bt worker", "error", err) return } + log.Debug("running bt node", "index", i) go w.Run() } } + +// Filter on words, existing +func createTagRegexps() { + tagREs = make(map[string]*regexp.Regexp) + for tag, re := range tags { + tagREs[tag] = regexp.MustCompile("(?i)" + re) + } + // Add character classes + for cc, _ := range unicode.Scripts { + if cc == "Latin" || cc == "Common" { + continue + } + className := strings.ToLower(cc) + // Test for 3 or more characters per character class + tagREs[className] = regexp.MustCompile(fmt.Sprintf(`(?i)\p{%s}{3,}`, cc)) + } + // Merge user tags + /* + for tag, re := range Config.Tags { + if !Config.Quiet { + fmt.Printf("Adding user tag: %s = %s\n", tag, re) + } + tagREs[tag] = regexp.MustCompile("(?i)" + re) + } + */ +} diff --git a/cmd/tag.go b/cmd/tag.go new file mode 100644 index 0000000..ba2799f --- /dev/null +++ b/cmd/tag.go @@ -0,0 +1,89 @@ +package main + +import ( + "fmt" + "regexp" + "strings" + "unicode" + + "github.com/felix/dhtsearch/models" +) + +// Default tags, can be supplimented or overwritten by config +var tags = map[string]string{ + "flac": `\.flac$`, + "episode": "(season|episode|s[0-9]{2}e[0-9]{2})", + "1080": "1080", + "720": "720", + "hd": "hd|720|1080", + "bdrip": "bdrip", + "xxx": `(xxx|p(orn|ussy)|censor|sex|urbat|a(ss|nal)|o(rgy|gasm)|(fu|di|co)ck|esbian|milf|lust|gay)|rotic|18(\+|yr)|hore|hemale|virgin`, + "dvdrip": "dvdrip", + "ebook": "epub", + "application": `\.(apk|exe|msi|dmg)$`, + "android": `\.apk$`, + "apple": `\.dmg$`, + "subtitles": `\.s(rt|ub)$`, + "archive": `\.(zip|rar|p7|tgz|bz2|iso)$`, + "video": `\.(3g2|3gp|amv|asf|avi|drc|f4a|f4b|f4p|f4v|flv|gif|gifv|m2v|m4p|m4v|mkv|mng|mov|mp2|mp4|mpe|mpeg|mpg|mpv|mxf|net|nsv|ogv|qt|rm|rmvb|roq|svi|vob|webm|wmv|yuv)$`, + "audio": `\.(aa|aac|aax|act|aiff|amr|ape|au|awb|dct|dss|dvf|flac|gsm|iklax|ivs|m4a|m4b|mmf|mp3|mpc|msv|ogg|opus|ra|raw|sln|tta|vox|wav|wma|wv)$`, + "document": `\.(cbr|cbz|cb7|cbt|cba|epub|djvu|fb2|ibook|azw.|lit|prc|mobi|pdb|pdb|oxps|xps)$`, + "font": `(font|\.(ttf|fon)$)`, +} + +func mergeCharacterTagREs(tagREs map[string]*regexp.Regexp) error { + var err error + // Add character classes + for cc := range unicode.Scripts { + if cc == "Latin" || cc == "Common" { + continue + } + className := strings.ToLower(cc) + // Test for 3 or more characters per character class + tagREs[className], err = regexp.Compile(fmt.Sprintf(`(?i)\p{%s}{3,}`, cc)) + if err != nil { + return err + } + } + return nil +} + +func mergeTagRegexps(tagREs map[string]*regexp.Regexp, tags map[string]string) error { + var err error + for tag, re := range tags { + tagREs[tag], err = regexp.Compile("(?i)" + re) + if err != nil { + return err + } + } + return nil +} + +func tagTorrent(t models.Torrent, tagREs map[string]*regexp.Regexp) (tags []string) { + ttags := make(map[string]bool) + + for tag, re := range tagREs { + if re.MatchString(t.Name) { + ttags[tag] = true + } + for _, f := range t.Files { + if re.MatchString(f.Path) { + ttags[tag] = true + } + } + } + // Make unique + for tt := range ttags { + tags = append(tags, tt) + } + return tags +} + +func hasTag(t models.Torrent, tag string) bool { + for _, t := range t.Tags { + if tag == t { + return true + } + } + return false +} diff --git a/db/pgsql.go b/db/pgsql.go new file mode 100644 index 0000000..990605c --- /dev/null +++ b/db/pgsql.go @@ -0,0 +1,380 @@ +package db + +import ( + "fmt" + "net" + + "github.com/felix/dhtsearch/models" + "github.com/jackc/pgx" + "github.com/jackc/pgx/pgtype" +) + +// Store is a store +type Store struct { + *pgx.ConnPool +} + +// NewStore connects and initializes a new store +func NewStore(dsn string) (*Store, error) { + cfg, err := pgx.ParseURI(dsn) + if err != nil { + return nil, err + } + c, err := pgx.NewConnPool(pgx.ConnPoolConfig{ConnConfig: cfg, MaxConnections: 10}) + if err != nil { + return nil, err + } + + s := &Store{c} + + // TODO + var upToDate bool + err = s.QueryRow(sqlCheckSchema).Scan(&upToDate) + if err != nil || !upToDate { + _, err = s.Exec(sqlSchema) + } + return s, err +} + +// PendingInfohashes gets the next pending infohash from the store +func (s *Store) PendingInfohashes(n int) (peers []*models.Peer, err error) { + rows, err := s.Query(sqlSelectPendingInfohashes, n) + defer rows.Close() + if err != nil { + return nil, err + } + for rows.Next() { + var p models.Peer + var ih pgtype.Bytea + var addr string + err = rows.Scan(&addr, &ih) + if err != nil { + return nil, err + } + // TODO save peer network? + p.Addr, err = net.ResolveUDPAddr("udp", addr) + if err != nil { + return nil, err + } + ih.AssignTo(&p.Infohash) + peers = append(peers, &p) + } + return peers, nil +} + +// SaveTorrent implements torrentStore +func (s *Store) SaveTorrent(t *models.Torrent) error { + tx, err := s.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + var torrentID int + err = tx.QueryRow(sqlInsertTorrent, t.Name, t.Infohash, t.Size).Scan(&torrentID) + if err != nil { + return fmt.Errorf("insertTorrent: %s", err) + } + + // Write tags + for _, tag := range t.Tags { + tagID, err := s.SaveTag(tag) + if err != nil { + return fmt.Errorf("saveTag: %s", err) + } + _, err = tx.Exec(sqlInsertTagTorrent, tagID, torrentID) + if err != nil { + return fmt.Errorf("insertTagTorrent: %s", err) + } + } + + // Write files + for _, f := range t.Files { + _, err := tx.Exec(sqlInsertFile, torrentID, f.Path, f.Size) + if err != nil { + return fmt.Errorf("insertFile: %s", err) + } + } + + // Should this be outside the transaction? + _, err = tx.Exec(sqlUpdateFTSVectors, torrentID) + if err != nil { + return fmt.Errorf("updateVectors: %s", err) + } + return tx.Commit() +} + +// SavePeer implements torrentStore +func (s *Store) SavePeer(p *models.Peer) (err error) { + _, err = s.Exec(sqlInsertPeer, p.Addr.String(), p.Infohash.Bytes()) + return err +} + +func (s *Store) RemovePeer(p *models.Peer) (err error) { + _, err = s.Exec(sqlRemovePeer, p.Addr.String()) + return err +} + +// TorrentsByHash implements torrentStore +func (s *Store) TorrentByHash(ih models.Infohash) (*models.Torrent, error) { + rows, err := s.Query(sqlGetTorrent, ih) + defer rows.Close() + if err != nil { + return nil, err + } + torrents, err := s.fetchTorrents(rows) + if err != nil { + return nil, err + } + return torrents[0], nil +} + +// TorrentsByName implements torrentStore +func (s *Store) TorrentsByName(query string, offset int) ([]*models.Torrent, error) { + rows, err := s.Query(sqlSearchTorrents, fmt.Sprintf("%%%s%%", query), offset) + defer rows.Close() + if err != nil { + return nil, err + } + torrents, err := s.fetchTorrents(rows) + if err != nil { + return nil, err + } + return torrents, nil +} + +// TorrentsByTag implements torrentStore +func (s *Store) TorrentsByTag(tag string, offset int) ([]*models.Torrent, error) { + rows, err := s.Query(sqlTorrentsByTag, tag, offset) + defer rows.Close() + if err != nil { + return nil, err + } + torrents, err := s.fetchTorrents(rows) + if err != nil { + return nil, err + } + return torrents, nil +} + +// SaveTag implements tagStore interface +func (s *Store) SaveTag(tag string) (tagID int, err error) { + err = s.QueryRow(sqlInsertTag, tag).Scan(&tagID) + return tagID, err +} + +func (s *Store) fetchTorrents(rows *pgx.Rows) (torrents []*models.Torrent, err error) { + for rows.Next() { + var t models.Torrent + /* + t := &models.Torrent{ + Files: []models.File{}, + Tags: []string{}, + } + */ + err = rows.Scan( + &t.ID, &t.Infohash, &t.Name, &t.Size, &t.Created, &t.Updated, + ) + if err != nil { + return nil, err + } + + err = func() error { + rowsf, err := s.Query(sqlSelectFiles, t.ID) + defer rowsf.Close() + if err != nil { + return fmt.Errorf("failed to select files: %s", err) + } + for rowsf.Next() { + var f models.File + err = rowsf.Scan(&f.ID, &f.TorrentID, &f.Path, &f.Size) + if err != nil { + return fmt.Errorf("failed to build file: %s", err) + } + } + return nil + }() + if err != nil { + return nil, err + } + + err = func() error { + rowst, err := s.Query(sqlSelectTags, t.ID) + defer rowst.Close() + if err != nil { + return fmt.Errorf("failed to select tags: %s", err) + } + for rowst.Next() { + var tg string + err = rowst.Scan(&tg) + if err != nil { + return fmt.Errorf("failed to build tag: %s", err) + } + t.Tags = append(t.Tags, tg) + } + return nil + }() + if err != nil { + return nil, err + } + torrents = append(torrents, &t) + } + return torrents, err +} + +const ( + sqlGetTorrent = `select * + from torrents + where infohash = $1 limit 1` + + sqlInsertTorrent = `insert into torrents ( + name, infohash, size, created, updated + ) values ( + $1, $2, $3, now(), now() + ) on conflict (infohash) do + update set + name = $1, + size = $3, + updated = now() + returning id` + + // peer.address + // torrent.infohash + sqlInsertPeer = `with save_peer as ( + insert into peers + (address, created, updated) values ($1, now(), now()) + returning id + ), save_torrent as ( + insert into torrents (infohash, created, updated) + values ($2, now(), now()) + on conflict (infohash) do update set + updated = now() + returning id + ) insert into peers_torrents + (peer_id, torrent_id) + select + sp.id, st.id + from save_peer sp, save_torrent st + on conflict do nothing` + + sqlRemovePeer = `delete from peers + where address = $1` + + sqlUpdateFTSVectors = `update torrents set + tsv = sub.tsv from ( + select t.id, + setweight(to_tsvector( + translate(t.name, '._-', ' ') + ), 'A') + || setweight(to_tsvector( + translate(string_agg(coalesce(f.path, ''), ' '), './_-', ' ') + ), 'B') as tsv + from torrents t + left join files f on t.id = f.torrent_id + where t.id = $1 + group by t.id + ) as sub + where sub.id = torrents.id` + + sqlSelectPendingInfohashes = `with get_pending as ( + select p.id + from peers p + join peers_torrents pt on p.id = pt.peer_id + join torrents t on pt.torrent_id = t.id + where t.name is null + order by p.updated asc + limit $1 for update + ), stamp_peer as ( + update peers set updated = now() + where id in (select id from get_pending) + ) select + p.address, t.infohash + from peers p + join peers_torrents pt on p.id = pt.peer_id + join torrents t on pt.torrent_id = t.id + where p.id in (select id from get_pending)` + + sqlSearchTorrents = `select + t.id, t.infohash, t.name, t.size, t.updated + from torrents t + where t.tsv @@ plainto_tsquery($1) + order by ts_rank(tsv, plainto_tsquery($1)) desc, t.updated desc + limit 50 offset $2` + + sqlTorrentsByTag = `select + t.id, t.infohash, t.name, t.size, t.created, t.updated + from torrents t + inner join tags_torrents tt on t.id = tt.torrent_id + inner join tags ta on tt.tag_id = ta.id + where ta.name = $1 group by t.id + order by updated asc + limit 50 offset $2` + + sqlSelectFiles = `select * from files + where torrent_id = $1 + order by path asc` + + sqlInsertFile = `insert into files + (torrent_id, path, size) + values + ($1, $2, $3)` + + sqlSelectTags = `select name + from tags t + inner join tags_torrents tt on t.id = tt.tag_id + where tt.torrent_id = $1` + + sqlInsertTagTorrent = `insert into tags_torrents + (tag_id, torrent_id) values ($1, $2) + on conflict do nothing` + + sqlInsertTag = `insert into tags (name) values ($1) + on conflict (name) do update set name = excluded.name returning id` + + sqlCheckSchema = `select exists ( + select 1 from pg_tables + where schemaname = 'public' + and tablename = 'settings' + )` + + sqlSchema = `create table if not exists torrents ( + id serial primary key, + infohash bytea unique, + size bigint, + name text, + created timestamp with time zone, + updated timestamp with time zone, + tsv tsvector + ); + create index tsv_idx on torrents using gin(tsv); + create table if not exists files ( + id serial not null primary key, + torrent_id integer not null references torrents on delete cascade, + path text, + size bigint + ); + create table if not exists tags ( + id serial primary key, + name character varying(50) unique + ); + create table if not exists tags_torrents ( + tag_id integer not null references tags (id) on delete cascade, + torrent_id integer not null references torrents (id) on delete cascade, + primary key (tag_id, torrent_id) + ); + create table if not exists peers ( + id serial primary key, + address character varying(50), + created timestamp with time zone, + updated timestamp with time zone + ); + create table if not exists peers_torrents ( + peer_id integer not null references peers (id) on delete cascade, + torrent_id integer not null references torrents (id) on delete cascade, + primary key (peer_id, torrent_id) + ); + create table if not exists settings ( + schema_version integer not null + ); + insert into settings (schema_version) values (1);` +) @@ -63,9 +63,6 @@ func main() { } } - // Filter torrents - filteredPeers := make(chan peer) - // Create BT node bt := &btClient{} bt.log = log.Named("bt") @@ -89,35 +86,9 @@ func main() { } // Simple cache of most recent - cache := make(map[string]bool) var p peer var t Torrent - // Filter peers - go func() { - for { - select { - case p = <-peers: - if ok := cache[p.id]; ok { - peersSkipped.Add(1) - continue - } - peersAnnounced.Add(1) - if len(cache) > Config.Advanced.PeerCacheSize { - fmt.Printf("Flushing peer cache\n") - cache = make(map[string]bool) - } - cache[p.id] = true - if torrentExists(p.id) { - peersSkipped.Add(1) - continue - } - filteredPeers <- p - dhtCachedPeers.Set(int64(len(cache))) - } - } - }() - for { select { case t = <-torrents: @@ -148,7 +119,7 @@ func main() { continue } if !Config.Quiet { - fmt.Printf("Torrrent added, length: %d, name: %q, tags: %s, url: magnet:?xt=urn:btih:%s\n", length, t.Name, t.Tags, t.InfoHash) + fmt.Printf("Torrrent added, length: %d, name: %q, tags: %s, url: magnet:?xt=urn:btih:%s\n", length, t.Name, t.Tags, t.Infohash) } torrentsSaved.Add(1) torrentsTotal.Add(1) diff --git a/models/infohash.go b/models/infohash.go index e8db422..bb1bd81 100644 --- a/models/infohash.go +++ b/models/infohash.go @@ -37,6 +37,11 @@ func InfohashFromString(s string) (*Infohash, error) { func (ih Infohash) String() string { return hex.EncodeToString(ih) } + +func (ih Infohash) Bytes() []byte { + return []byte(ih) +} + func (ih Infohash) Valid() bool { // TODO return len(ih) == 20 diff --git a/models/infohash_test.go b/models/infohash_test.go index e855e35..c1d9cca 100644 --- a/models/infohash_test.go +++ b/models/infohash_test.go @@ -30,6 +30,15 @@ func TestInfohashImport(t *testing.T) { if !ih.Equal(ih2) { t.Errorf("expected %s to equal %s", ih, ih2) } + if ih.String() != tt.str { + t.Errorf("expected ih.String() to equal %s, got %s", tt.str, ih.String()) + } + byt := ih.Bytes() + for i := range byt { + if byt[i] != []byte(tt.str)[i] { + t.Errorf("expected ih.Bytes() to equal %s, got %s", []byte(tt.str), ih.Bytes()) + } + } } else { if err == nil { t.Errorf("FromString should have failed for %s", tt.str) diff --git a/models/peer.go b/models/peer.go index 2feee03..220d979 100644 --- a/models/peer.go +++ b/models/peer.go @@ -3,12 +3,15 @@ package models import ( "fmt" "net" + "time" ) // Peer on DHT network type Peer struct { - Addr net.Addr - Infohash Infohash + Addr net.Addr `db:"address"` + Infohash Infohash `db:"infohash"` + Updated time.Time `json:"updated"` + Created time.Time `json:"created"` } // String implements fmt.Stringer diff --git a/models/storage.go b/models/storage.go index 84ac6aa..c8a8344 100644 --- a/models/storage.go +++ b/models/storage.go @@ -1,15 +1,27 @@ package models +import () + +type migratable interface { + MigrateSchema() error +} + type torrentSearcher interface { - torrentsByHash(hashes Infohash, offset, limit int) (*Torrent, error) - torrentsByName(query string, offset, limit int) ([]*Torrent, error) - torrentsByTags(tags []string, offset, limit int) ([]*Torrent, error) + TorrentsByHash(hash Infohash) (*Torrent, error) + TorrentsByName(query string, offset, limit int) ([]*Torrent, error) + TorrentsByTags(tags []string, offset, limit int) ([]*Torrent, error) +} + +type PeerStore interface { + SavePeer(*Peer) error } -type peerStore interface { - savePeer(*Peer) error +type TorrentStore interface { + SaveTorrent(*Torrent) error + // TODO + RemovePeer(*Peer) error } -type torrentStore interface { - saveTorrent(*Torrent) error +type InfohashStore interface { + PendingInfohashes(int) ([]*Peer, error) } diff --git a/models/torrent.go b/models/torrent.go index 4ff3143..6cae23c 100644 --- a/models/torrent.go +++ b/models/torrent.go @@ -3,7 +3,6 @@ package models import ( "bytes" "crypto/sha1" - "encoding/hex" "fmt" "os" "strings" @@ -16,11 +15,12 @@ import ( // Data for persistent storage type Torrent struct { ID int `json:"-"` - Infohash string `json:"infohash"` + Infohash Infohash `json:"infohash"` Name string `json:"name"` Files []File `json:"files" db:"-"` Size int `json:"size"` - Seen time.Time `json:"seen"` + Updated time.Time `json:"updated"` + Created time.Time `json:"created"` Tags []string `json:"tags" db:"-"` } @@ -52,7 +52,7 @@ func TorrentFromMetadata(ih Infohash, md []byte) (*Torrent, error) { } bt := Torrent{ - Infohash: hex.EncodeToString([]byte(ih)), + Infohash: ih, Name: name, } |
