aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile10
-rw-r--r--bt/options.go10
-rw-r--r--bt/worker.go26
-rw-r--r--cmd/main.go165
-rw-r--r--cmd/tag.go89
-rw-r--r--db/pgsql.go380
-rw-r--r--main.go31
-rw-r--r--models/infohash.go5
-rw-r--r--models/infohash_test.go9
-rw-r--r--models/peer.go7
-rw-r--r--models/storage.go26
-rw-r--r--models/torrent.go8
12 files changed, 674 insertions, 92 deletions
diff --git a/Makefile b/Makefile
index 2ddb5d0..0fd62a5 100644
--- a/Makefile
+++ b/Makefile
@@ -8,14 +8,16 @@ BINARIES = $(patsubst %,$(CMD)-%-v$(VERSION), $(TARGETS))
.DEFAULT_GOAL := help
-release: check-env $(BINARIES) ## Build all binaries
+release: $(BINARIES) ## Build all binaries
-build: check-env ## Build binary for current platform
+build: $(CMD) ## Build binary for current platform
+
+$(CMD): check-env $(SRC)
cd cmd && go build -o ../$(CMD) $(LDFLAGS)
standalone : TAGS = sqlite
-$(BINARIES): $(SRC)
+$(BINARIES): check-env $(SRC)
cd cmd && env GOOS=`echo $@ |cut -d'-' -f2` GOARCH=`echo $@ |cut -d'-' -f3 |cut -d'.' -f1` go build -o ../$@ $(LDFLAGS)
test: ## Run tests and create coverage report
@@ -38,4 +40,4 @@ endif
help:
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) |sort |awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
-.PHONY: help install build test lint clean
+.PHONY: help install test lint clean
diff --git a/bt/options.go b/bt/options.go
index 25ebc21..bb17505 100644
--- a/bt/options.go
+++ b/bt/options.go
@@ -7,7 +7,7 @@ import (
type Option func(*Worker) error
-// SetNewTorrent sets the callback
+// SetOnNewTorrent sets the callback
func SetOnNewTorrent(f func(models.Torrent)) Option {
return func(w *Worker) error {
w.OnNewTorrent = f
@@ -15,6 +15,14 @@ func SetOnNewTorrent(f func(models.Torrent)) Option {
}
}
+// SetOnBadPeer sets the callback
+func SetOnBadPeer(f func(models.Peer)) Option {
+ return func(w *Worker) error {
+ w.OnBadPeer = f
+ return nil
+ }
+}
+
// SetPort sets the port to listen on
func SetPort(p int) Option {
return func(w *Worker) error {
diff --git a/bt/worker.go b/bt/worker.go
index 6d247f4..95669a4 100644
--- a/bt/worker.go
+++ b/bt/worker.go
@@ -52,6 +52,7 @@ type Worker struct {
family string
tcpTimeout int
OnNewTorrent func(t models.Torrent)
+ OnBadPeer func(p models.Peer)
log logger.Logger
}
@@ -84,16 +85,19 @@ func (bt *Worker) Run() error {
bt.log.Debug("worker got work", "peer", p)
md, err := bt.fetchMetadata(p)
if err != nil {
- bt.log.Debug("failed to fetch metadata", "error", err)
+ //bt.log.Warn("failed to fetch metadata", "error", err)
+ if bt.OnBadPeer != nil {
+ bt.OnBadPeer(p)
+ }
continue
}
t, err := models.TorrentFromMetadata(p.Infohash, md)
if err != nil {
- bt.log.Debug("failed to load torrent", "error", err)
+ bt.log.Warn("failed to load torrent", "error", err)
continue
}
if bt.OnNewTorrent != nil {
- go bt.OnNewTorrent(*t)
+ bt.OnNewTorrent(*t)
}
}
}
@@ -115,9 +119,9 @@ func (bt *Worker) fetchMetadata(p models.Peer) (out []byte, err error) {
recover()
}()
- ll := bt.log.WithFields("address", p.Addr.String())
+ //ll := bt.log.WithFields("address", p.Addr.String())
- ll.Debug("connecting")
+ //ll.Debug("connecting")
dial, err := net.DialTimeout("tcp", p.Addr.String(), time.Second*15)
if err != nil {
return out, err
@@ -126,7 +130,7 @@ func (bt *Worker) fetchMetadata(p models.Peer) (out []byte, err error) {
conn := dial.(*net.TCPConn)
conn.SetLinger(0)
defer conn.Close()
- ll.Debug("dialed")
+ //ll.Debug("dialed")
data := bytes.NewBuffer(nil)
data.Grow(BlockSize)
@@ -134,26 +138,26 @@ func (bt *Worker) fetchMetadata(p models.Peer) (out []byte, err error) {
ih := models.GenInfohash()
// TCP handshake
- ll.Debug("sending handshake")
+ //ll.Debug("sending handshake")
_, err = sendHandshake(conn, p.Infohash, ih)
if err != nil {
return nil, err
}
// Handle the handshake response
- ll.Debug("handling handshake response")
+ //ll.Debug("handling handshake response")
err = read(conn, 68, data)
if err != nil {
return nil, err
}
next := data.Next(68)
- ll.Debug("got next data")
+ //ll.Debug("got next data")
if !(bytes.Equal(handshakePrefix[:20], next[:20]) && next[25]&0x10 != 0) {
- ll.Debug("next data does not match", "next", next)
+ //ll.Debug("next data does not match", "next", next)
return nil, errors.New("invalid handshake response")
}
- ll.Debug("sending ext handshake")
+ //ll.Debug("sending ext handshake")
_, err = sendExtHandshake(conn)
if err != nil {
return nil, err
diff --git a/cmd/main.go b/cmd/main.go
index 80ef9eb..3019ddf 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -4,9 +4,13 @@ import (
"flag"
"fmt"
"os"
+ "regexp"
+ "strings"
"time"
+ "unicode"
"github.com/felix/dhtsearch/bt"
+ "github.com/felix/dhtsearch/db"
"github.com/felix/dhtsearch/dht"
"github.com/felix/dhtsearch/models"
"github.com/felix/logger"
@@ -31,6 +35,15 @@ var (
pool chan chan models.Peer
torrents chan models.Torrent
btNodes int
+ tagREs map[string]*regexp.Regexp
+ skipTags string
+)
+
+// Store vars
+var (
+ dsn string
+ ihBlacklist map[string]bool
+ peerBlacklist map[string]bool
)
func main() {
@@ -40,6 +53,9 @@ func main() {
flag.IntVar(&dhtNodes, "dht-nodes", 1, "number of DHT nodes to start")
flag.IntVar(&btNodes, "bt-nodes", 3, "number of BT nodes to start")
+ flag.StringVar(&skipTags, "skip-tags", "xxx", "tags of torrents to skip")
+
+ flag.StringVar(&dsn, "dsn", "postgres://dhtsearch@localhost/dhtsearch", "database DSN")
flag.BoolVar(&showVersion, "v", false, "show version")
@@ -62,9 +78,25 @@ func main() {
log.Info("version", version)
log.Debug("debugging")
- go startDHTNodes(saveInfohash)
+ store, err := db.NewStore(dsn)
+ defer store.Close()
+ if err != nil {
+ log.Error("failed to connect store", "error", err)
+ os.Exit(1)
+ }
+
+ createTagRegexps()
+
+ ihBlacklist = make(map[string]bool)
+ peerBlacklist = make(map[string]bool)
+ // TODO read in existing blacklist
+ // TODO populate bloom filter
+
+ go startDHTNodes(store)
- go startBTWorkers(saveTorrent)
+ go startBTWorkers(store)
+
+ go processPendingPeers(store)
for {
select {
@@ -74,58 +106,97 @@ func main() {
}
}
-func startDHTNodes(f func(p models.Peer)) (nodes []*dht.Node, err error) {
- nodes = make([]*dht.Node, dhtNodes)
+func startDHTNodes(s models.PeerStore) {
+ log.Debug("starting dht nodes")
+ nodes := make([]*dht.Node, dhtNodes)
for i := 0; i < dhtNodes; i++ {
dht, err := dht.NewNode(
dht.SetLogger(log.Named("dht")),
dht.SetPort(port+i),
- dht.SetOnAnnouncePeer(f),
dht.SetIPv6(ipv6),
+ dht.SetOnAnnouncePeer(func(p models.Peer) {
+ if black := ihBlacklist[p.Infohash.String()]; black {
+ log.Debug("ignoring blacklisted infohash", "peer", p)
+ return
+ }
+ if black := peerBlacklist[p.Addr.String()]; black {
+ log.Debug("ignoring blacklisted peer", "peer", p)
+ return
+ }
+ log.Debug("peer announce", "peer", p)
+ err := s.SavePeer(&p)
+ if err != nil {
+ log.Error("failed to save peer", "error", err)
+ }
+ }),
)
if err != nil {
log.Error("failed to create node", "error", err)
- return nodes, err
+ //return nodes, err
}
go dht.Run()
nodes[i] = dht
}
- return nodes, err
+ //return nodes, err
}
-func saveInfohash(p models.Peer) {
- //log.Info("announce", "peer", p)
- // Blocks
- w := <-pool
- w <- p
- return
+func processPendingPeers(s models.InfohashStore) {
+ log.Debug("processing pending peers")
+ for {
+ peers, err := s.PendingInfohashes(1)
+ if err != nil {
+ log.Debug("failed to get pending peer", "error", err)
+ time.Sleep(time.Second * 1)
+ continue
+ }
+ for _, p := range peers {
+ //log.Debug("pending peer retrieved", "peer", *p)
+ select {
+ case w := <-pool:
+ //log.Debug("assigning peer to bt worker")
+ w <- *p
+ }
+ }
+ }
}
-func saveTorrent(t models.Torrent) {
- fmt.Printf("Torrent added, size: %d, name: %q, tags: %s, url: magnet:?xt=urn:btih:%s\n", t.Size, t.Name, t.Tags, t.Infohash)
- // Add tags
- //tagTorrent(&t)
+func startBTWorkers(s models.TorrentStore) {
+ log.Debug("starting bittorrent workers")
+ pool = make(chan chan models.Peer)
+ torrents = make(chan models.Torrent)
- /*
- // Not sure if I like continue labels, so this
- var discard = false
- for _, tag := range Config.SkipTags {
- if hasTag(t, tag) {
- fmt.Printf("Skipping torrent tagged '%s': %q\n", tag, t.Name)
- discard = true
- break
+ onNewTorrent := func(t models.Torrent) {
+ // Add tags
+ tags := tagTorrent(t, tagREs)
+ for _, skipTag := range strings.Split(skipTags, ",") {
+ for _, tg := range tags {
+ if skipTag == tg {
+ log.Debug("skipping torrent", "infohash", t.Infohash, "tags", tags)
+ ihBlacklist[t.Infohash.String()] = true
+ return
+ }
}
}
- if discard {
- continue
+ t.Tags = tags
+ log.Debug("torrent tagged", "infohash", t.Infohash, "tags", tags)
+ err := s.SaveTorrent(&t)
+ if err != nil {
+ log.Error("failed to save torrent", "error", err)
}
- */
-}
+ log.Info("torrent added", "name", t.Name, "size", t.Size, "tags", t.Tags)
+ }
-func startBTWorkers(f func(t models.Torrent)) {
- pool = make(chan chan models.Peer)
- torrents = make(chan models.Torrent)
+ /*
+ onBadPeer := func(p models.Peer) {
+ log.Debug("removing peer", "peer", p)
+ err := s.RemovePeer(&p)
+ if err != nil {
+ log.Error("failed to remove peer", "peer", p, "error", err)
+ }
+ peerBlacklist[p.Addr.String()] = true
+ }
+ */
for i := 0; i < btNodes; i++ {
w, err := bt.NewWorker(
@@ -133,12 +204,40 @@ func startBTWorkers(f func(t models.Torrent)) {
bt.SetLogger(log.Named("bt")),
bt.SetPort(port+i),
bt.SetIPv6(ipv6),
- bt.SetOnNewTorrent(f),
+ bt.SetOnNewTorrent(onNewTorrent),
+ //bt.SetOnBadPeer(onBadPeer),
)
if err != nil {
log.Error("failed to create bt worker", "error", err)
return
}
+ log.Debug("running bt node", "index", i)
go w.Run()
}
}
+
+// createTagRegexps builds case-insensitive regexps from the default tags and from non-Latin Unicode script classes
+func createTagRegexps() {
+ tagREs = make(map[string]*regexp.Regexp)
+ for tag, re := range tags {
+ tagREs[tag] = regexp.MustCompile("(?i)" + re)
+ }
+ // Add character classes
+ for cc, _ := range unicode.Scripts {
+ if cc == "Latin" || cc == "Common" {
+ continue
+ }
+ className := strings.ToLower(cc)
+ // Test for 3 or more characters per character class
+ tagREs[className] = regexp.MustCompile(fmt.Sprintf(`(?i)\p{%s}{3,}`, cc))
+ }
+ // Merge user tags
+ /*
+ for tag, re := range Config.Tags {
+ if !Config.Quiet {
+ fmt.Printf("Adding user tag: %s = %s\n", tag, re)
+ }
+ tagREs[tag] = regexp.MustCompile("(?i)" + re)
+ }
+ */
+}
diff --git a/cmd/tag.go b/cmd/tag.go
new file mode 100644
index 0000000..ba2799f
--- /dev/null
+++ b/cmd/tag.go
@@ -0,0 +1,89 @@
+package main
+
+import (
+ "fmt"
+ "regexp"
+ "strings"
+ "unicode"
+
+ "github.com/felix/dhtsearch/models"
+)
+
+// Default tags, can be supplemented or overwritten by config
+var tags = map[string]string{
+ "flac": `\.flac$`,
+ "episode": "(season|episode|s[0-9]{2}e[0-9]{2})",
+ "1080": "1080",
+ "720": "720",
+ "hd": "hd|720|1080",
+ "bdrip": "bdrip",
+ "xxx": `(xxx|p(orn|ussy)|censor|sex|urbat|a(ss|nal)|o(rgy|gasm)|(fu|di|co)ck|esbian|milf|lust|gay)|rotic|18(\+|yr)|hore|hemale|virgin`,
+ "dvdrip": "dvdrip",
+ "ebook": "epub",
+ "application": `\.(apk|exe|msi|dmg)$`,
+ "android": `\.apk$`,
+ "apple": `\.dmg$`,
+ "subtitles": `\.s(rt|ub)$`,
+ "archive": `\.(zip|rar|p7|tgz|bz2|iso)$`,
+ "video": `\.(3g2|3gp|amv|asf|avi|drc|f4a|f4b|f4p|f4v|flv|gif|gifv|m2v|m4p|m4v|mkv|mng|mov|mp2|mp4|mpe|mpeg|mpg|mpv|mxf|net|nsv|ogv|qt|rm|rmvb|roq|svi|vob|webm|wmv|yuv)$`,
+ "audio": `\.(aa|aac|aax|act|aiff|amr|ape|au|awb|dct|dss|dvf|flac|gsm|iklax|ivs|m4a|m4b|mmf|mp3|mpc|msv|ogg|opus|ra|raw|sln|tta|vox|wav|wma|wv)$`,
+ "document": `\.(cbr|cbz|cb7|cbt|cba|epub|djvu|fb2|ibook|azw.|lit|prc|mobi|pdb|pdb|oxps|xps)$`,
+ "font": `(font|\.(ttf|fon)$)`,
+}
+
+func mergeCharacterTagREs(tagREs map[string]*regexp.Regexp) error {
+ var err error
+ // Add character classes
+ for cc := range unicode.Scripts {
+ if cc == "Latin" || cc == "Common" {
+ continue
+ }
+ className := strings.ToLower(cc)
+ // Test for 3 or more characters per character class
+ tagREs[className], err = regexp.Compile(fmt.Sprintf(`(?i)\p{%s}{3,}`, cc))
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func mergeTagRegexps(tagREs map[string]*regexp.Regexp, tags map[string]string) error {
+ var err error
+ for tag, re := range tags {
+ tagREs[tag], err = regexp.Compile("(?i)" + re)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func tagTorrent(t models.Torrent, tagREs map[string]*regexp.Regexp) (tags []string) {
+ ttags := make(map[string]bool)
+
+ for tag, re := range tagREs {
+ if re.MatchString(t.Name) {
+ ttags[tag] = true
+ }
+ for _, f := range t.Files {
+ if re.MatchString(f.Path) {
+ ttags[tag] = true
+ }
+ }
+ }
+ // Make unique
+ for tt := range ttags {
+ tags = append(tags, tt)
+ }
+ return tags
+}
+
+func hasTag(t models.Torrent, tag string) bool {
+ for _, t := range t.Tags {
+ if tag == t {
+ return true
+ }
+ }
+ return false
+}
diff --git a/db/pgsql.go b/db/pgsql.go
new file mode 100644
index 0000000..990605c
--- /dev/null
+++ b/db/pgsql.go
@@ -0,0 +1,380 @@
+package db
+
+import (
+ "fmt"
+ "net"
+
+ "github.com/felix/dhtsearch/models"
+ "github.com/jackc/pgx"
+ "github.com/jackc/pgx/pgtype"
+)
+
+// Store is a PostgreSQL-backed data store wrapping a pgx connection pool
+type Store struct {
+ *pgx.ConnPool
+}
+
+// NewStore connects and initializes a new store
+func NewStore(dsn string) (*Store, error) {
+ cfg, err := pgx.ParseURI(dsn)
+ if err != nil {
+ return nil, err
+ }
+ c, err := pgx.NewConnPool(pgx.ConnPoolConfig{ConnConfig: cfg, MaxConnections: 10})
+ if err != nil {
+ return nil, err
+ }
+
+ s := &Store{c}
+
+ // TODO
+ var upToDate bool
+ err = s.QueryRow(sqlCheckSchema).Scan(&upToDate)
+ if err != nil || !upToDate {
+ _, err = s.Exec(sqlSchema)
+ }
+ return s, err
+}
+
+// PendingInfohashes gets the next pending infohash from the store
+func (s *Store) PendingInfohashes(n int) (peers []*models.Peer, err error) {
+ rows, err := s.Query(sqlSelectPendingInfohashes, n)
+ defer rows.Close()
+ if err != nil {
+ return nil, err
+ }
+ for rows.Next() {
+ var p models.Peer
+ var ih pgtype.Bytea
+ var addr string
+ err = rows.Scan(&addr, &ih)
+ if err != nil {
+ return nil, err
+ }
+ // TODO save peer network?
+ p.Addr, err = net.ResolveUDPAddr("udp", addr)
+ if err != nil {
+ return nil, err
+ }
+ ih.AssignTo(&p.Infohash)
+ peers = append(peers, &p)
+ }
+ return peers, nil
+}
+
+// SaveTorrent implements torrentStore
+func (s *Store) SaveTorrent(t *models.Torrent) error {
+ tx, err := s.Begin()
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+
+ var torrentID int
+ err = tx.QueryRow(sqlInsertTorrent, t.Name, t.Infohash, t.Size).Scan(&torrentID)
+ if err != nil {
+ return fmt.Errorf("insertTorrent: %s", err)
+ }
+
+ // Write tags
+ for _, tag := range t.Tags {
+ tagID, err := s.SaveTag(tag)
+ if err != nil {
+ return fmt.Errorf("saveTag: %s", err)
+ }
+ _, err = tx.Exec(sqlInsertTagTorrent, tagID, torrentID)
+ if err != nil {
+ return fmt.Errorf("insertTagTorrent: %s", err)
+ }
+ }
+
+ // Write files
+ for _, f := range t.Files {
+ _, err := tx.Exec(sqlInsertFile, torrentID, f.Path, f.Size)
+ if err != nil {
+ return fmt.Errorf("insertFile: %s", err)
+ }
+ }
+
+ // Should this be outside the transaction?
+ _, err = tx.Exec(sqlUpdateFTSVectors, torrentID)
+ if err != nil {
+ return fmt.Errorf("updateVectors: %s", err)
+ }
+ return tx.Commit()
+}
+
+// SavePeer implements PeerStore
+func (s *Store) SavePeer(p *models.Peer) (err error) {
+ _, err = s.Exec(sqlInsertPeer, p.Addr.String(), p.Infohash.Bytes())
+ return err
+}
+
+func (s *Store) RemovePeer(p *models.Peer) (err error) {
+ _, err = s.Exec(sqlRemovePeer, p.Addr.String())
+ return err
+}
+
+// TorrentByHash implements torrentStore
+func (s *Store) TorrentByHash(ih models.Infohash) (*models.Torrent, error) {
+ rows, err := s.Query(sqlGetTorrent, ih)
+ defer rows.Close()
+ if err != nil {
+ return nil, err
+ }
+ torrents, err := s.fetchTorrents(rows)
+ if err != nil {
+ return nil, err
+ }
+ return torrents[0], nil
+}
+
+// TorrentsByName implements torrentStore
+func (s *Store) TorrentsByName(query string, offset int) ([]*models.Torrent, error) {
+ rows, err := s.Query(sqlSearchTorrents, fmt.Sprintf("%%%s%%", query), offset)
+ defer rows.Close()
+ if err != nil {
+ return nil, err
+ }
+ torrents, err := s.fetchTorrents(rows)
+ if err != nil {
+ return nil, err
+ }
+ return torrents, nil
+}
+
+// TorrentsByTag implements torrentStore
+func (s *Store) TorrentsByTag(tag string, offset int) ([]*models.Torrent, error) {
+ rows, err := s.Query(sqlTorrentsByTag, tag, offset)
+ defer rows.Close()
+ if err != nil {
+ return nil, err
+ }
+ torrents, err := s.fetchTorrents(rows)
+ if err != nil {
+ return nil, err
+ }
+ return torrents, nil
+}
+
+// SaveTag implements tagStore interface
+func (s *Store) SaveTag(tag string) (tagID int, err error) {
+ err = s.QueryRow(sqlInsertTag, tag).Scan(&tagID)
+ return tagID, err
+}
+
+func (s *Store) fetchTorrents(rows *pgx.Rows) (torrents []*models.Torrent, err error) {
+ for rows.Next() {
+ var t models.Torrent
+ /*
+ t := &models.Torrent{
+ Files: []models.File{},
+ Tags: []string{},
+ }
+ */
+ err = rows.Scan(
+ &t.ID, &t.Infohash, &t.Name, &t.Size, &t.Created, &t.Updated,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ err = func() error {
+ rowsf, err := s.Query(sqlSelectFiles, t.ID)
+ defer rowsf.Close()
+ if err != nil {
+ return fmt.Errorf("failed to select files: %s", err)
+ }
+ for rowsf.Next() {
+ var f models.File
+ err = rowsf.Scan(&f.ID, &f.TorrentID, &f.Path, &f.Size)
+ if err != nil {
+ return fmt.Errorf("failed to build file: %s", err)
+ }
+ }
+ return nil
+ }()
+ if err != nil {
+ return nil, err
+ }
+
+ err = func() error {
+ rowst, err := s.Query(sqlSelectTags, t.ID)
+ defer rowst.Close()
+ if err != nil {
+ return fmt.Errorf("failed to select tags: %s", err)
+ }
+ for rowst.Next() {
+ var tg string
+ err = rowst.Scan(&tg)
+ if err != nil {
+ return fmt.Errorf("failed to build tag: %s", err)
+ }
+ t.Tags = append(t.Tags, tg)
+ }
+ return nil
+ }()
+ if err != nil {
+ return nil, err
+ }
+ torrents = append(torrents, &t)
+ }
+ return torrents, err
+}
+
+const (
+ sqlGetTorrent = `select *
+ from torrents
+ where infohash = $1 limit 1`
+
+ sqlInsertTorrent = `insert into torrents (
+ name, infohash, size, created, updated
+ ) values (
+ $1, $2, $3, now(), now()
+ ) on conflict (infohash) do
+ update set
+ name = $1,
+ size = $3,
+ updated = now()
+ returning id`
+
+ // peer.address
+ // torrent.infohash
+ sqlInsertPeer = `with save_peer as (
+ insert into peers
+ (address, created, updated) values ($1, now(), now())
+ returning id
+ ), save_torrent as (
+ insert into torrents (infohash, created, updated)
+ values ($2, now(), now())
+ on conflict (infohash) do update set
+ updated = now()
+ returning id
+ ) insert into peers_torrents
+ (peer_id, torrent_id)
+ select
+ sp.id, st.id
+ from save_peer sp, save_torrent st
+ on conflict do nothing`
+
+ sqlRemovePeer = `delete from peers
+ where address = $1`
+
+ sqlUpdateFTSVectors = `update torrents set
+ tsv = sub.tsv from (
+ select t.id,
+ setweight(to_tsvector(
+ translate(t.name, '._-', ' ')
+ ), 'A')
+ || setweight(to_tsvector(
+ translate(string_agg(coalesce(f.path, ''), ' '), './_-', ' ')
+ ), 'B') as tsv
+ from torrents t
+ left join files f on t.id = f.torrent_id
+ where t.id = $1
+ group by t.id
+ ) as sub
+ where sub.id = torrents.id`
+
+ sqlSelectPendingInfohashes = `with get_pending as (
+ select p.id
+ from peers p
+ join peers_torrents pt on p.id = pt.peer_id
+ join torrents t on pt.torrent_id = t.id
+ where t.name is null
+ order by p.updated asc
+ limit $1 for update
+ ), stamp_peer as (
+ update peers set updated = now()
+ where id in (select id from get_pending)
+ ) select
+ p.address, t.infohash
+ from peers p
+ join peers_torrents pt on p.id = pt.peer_id
+ join torrents t on pt.torrent_id = t.id
+ where p.id in (select id from get_pending)`
+
+ sqlSearchTorrents = `select
+ t.id, t.infohash, t.name, t.size, t.updated
+ from torrents t
+ where t.tsv @@ plainto_tsquery($1)
+ order by ts_rank(tsv, plainto_tsquery($1)) desc, t.updated desc
+ limit 50 offset $2`
+
+ sqlTorrentsByTag = `select
+ t.id, t.infohash, t.name, t.size, t.created, t.updated
+ from torrents t
+ inner join tags_torrents tt on t.id = tt.torrent_id
+ inner join tags ta on tt.tag_id = ta.id
+ where ta.name = $1 group by t.id
+ order by updated asc
+ limit 50 offset $2`
+
+ sqlSelectFiles = `select * from files
+ where torrent_id = $1
+ order by path asc`
+
+ sqlInsertFile = `insert into files
+ (torrent_id, path, size)
+ values
+ ($1, $2, $3)`
+
+ sqlSelectTags = `select name
+ from tags t
+ inner join tags_torrents tt on t.id = tt.tag_id
+ where tt.torrent_id = $1`
+
+ sqlInsertTagTorrent = `insert into tags_torrents
+ (tag_id, torrent_id) values ($1, $2)
+ on conflict do nothing`
+
+ sqlInsertTag = `insert into tags (name) values ($1)
+ on conflict (name) do update set name = excluded.name returning id`
+
+ sqlCheckSchema = `select exists (
+ select 1 from pg_tables
+ where schemaname = 'public'
+ and tablename = 'settings'
+ )`
+
+ sqlSchema = `create table if not exists torrents (
+ id serial primary key,
+ infohash bytea unique,
+ size bigint,
+ name text,
+ created timestamp with time zone,
+ updated timestamp with time zone,
+ tsv tsvector
+ );
+ create index tsv_idx on torrents using gin(tsv);
+ create table if not exists files (
+ id serial not null primary key,
+ torrent_id integer not null references torrents on delete cascade,
+ path text,
+ size bigint
+ );
+ create table if not exists tags (
+ id serial primary key,
+ name character varying(50) unique
+ );
+ create table if not exists tags_torrents (
+ tag_id integer not null references tags (id) on delete cascade,
+ torrent_id integer not null references torrents (id) on delete cascade,
+ primary key (tag_id, torrent_id)
+ );
+ create table if not exists peers (
+ id serial primary key,
+ address character varying(50),
+ created timestamp with time zone,
+ updated timestamp with time zone
+ );
+ create table if not exists peers_torrents (
+ peer_id integer not null references peers (id) on delete cascade,
+ torrent_id integer not null references torrents (id) on delete cascade,
+ primary key (peer_id, torrent_id)
+ );
+ create table if not exists settings (
+ schema_version integer not null
+ );
+ insert into settings (schema_version) values (1);`
+)
diff --git a/main.go b/main.go
index b6c4ea7..69ea5fe 100644
--- a/main.go
+++ b/main.go
@@ -63,9 +63,6 @@ func main() {
}
}
- // Filter torrents
- filteredPeers := make(chan peer)
-
// Create BT node
bt := &btClient{}
bt.log = log.Named("bt")
@@ -89,35 +86,9 @@ func main() {
}
// Simple cache of most recent
- cache := make(map[string]bool)
var p peer
var t Torrent
- // Filter peers
- go func() {
- for {
- select {
- case p = <-peers:
- if ok := cache[p.id]; ok {
- peersSkipped.Add(1)
- continue
- }
- peersAnnounced.Add(1)
- if len(cache) > Config.Advanced.PeerCacheSize {
- fmt.Printf("Flushing peer cache\n")
- cache = make(map[string]bool)
- }
- cache[p.id] = true
- if torrentExists(p.id) {
- peersSkipped.Add(1)
- continue
- }
- filteredPeers <- p
- dhtCachedPeers.Set(int64(len(cache)))
- }
- }
- }()
-
for {
select {
case t = <-torrents:
@@ -148,7 +119,7 @@ func main() {
continue
}
if !Config.Quiet {
- fmt.Printf("Torrrent added, length: %d, name: %q, tags: %s, url: magnet:?xt=urn:btih:%s\n", length, t.Name, t.Tags, t.InfoHash)
+ fmt.Printf("Torrrent added, length: %d, name: %q, tags: %s, url: magnet:?xt=urn:btih:%s\n", length, t.Name, t.Tags, t.Infohash)
}
torrentsSaved.Add(1)
torrentsTotal.Add(1)
diff --git a/models/infohash.go b/models/infohash.go
index e8db422..bb1bd81 100644
--- a/models/infohash.go
+++ b/models/infohash.go
@@ -37,6 +37,11 @@ func InfohashFromString(s string) (*Infohash, error) {
func (ih Infohash) String() string {
return hex.EncodeToString(ih)
}
+
+func (ih Infohash) Bytes() []byte {
+ return []byte(ih)
+}
+
func (ih Infohash) Valid() bool {
// TODO
return len(ih) == 20
diff --git a/models/infohash_test.go b/models/infohash_test.go
index e855e35..c1d9cca 100644
--- a/models/infohash_test.go
+++ b/models/infohash_test.go
@@ -30,6 +30,15 @@ func TestInfohashImport(t *testing.T) {
if !ih.Equal(ih2) {
t.Errorf("expected %s to equal %s", ih, ih2)
}
+ if ih.String() != tt.str {
+ t.Errorf("expected ih.String() to equal %s, got %s", tt.str, ih.String())
+ }
+ byt := ih.Bytes()
+ for i := range byt {
+ if byt[i] != []byte(tt.str)[i] {
+ t.Errorf("expected ih.Bytes() to equal %s, got %s", []byte(tt.str), ih.Bytes())
+ }
+ }
} else {
if err == nil {
t.Errorf("FromString should have failed for %s", tt.str)
diff --git a/models/peer.go b/models/peer.go
index 2feee03..220d979 100644
--- a/models/peer.go
+++ b/models/peer.go
@@ -3,12 +3,15 @@ package models
import (
"fmt"
"net"
+ "time"
)
// Peer on DHT network
type Peer struct {
- Addr net.Addr
- Infohash Infohash
+ Addr net.Addr `db:"address"`
+ Infohash Infohash `db:"infohash"`
+ Updated time.Time `json:"updated"`
+ Created time.Time `json:"created"`
}
// String implements fmt.Stringer
diff --git a/models/storage.go b/models/storage.go
index 84ac6aa..c8a8344 100644
--- a/models/storage.go
+++ b/models/storage.go
@@ -1,15 +1,27 @@
package models
+import ()
+
+type migratable interface {
+ MigrateSchema() error
+}
+
type torrentSearcher interface {
- torrentsByHash(hashes Infohash, offset, limit int) (*Torrent, error)
- torrentsByName(query string, offset, limit int) ([]*Torrent, error)
- torrentsByTags(tags []string, offset, limit int) ([]*Torrent, error)
+ TorrentsByHash(hash Infohash) (*Torrent, error)
+ TorrentsByName(query string, offset, limit int) ([]*Torrent, error)
+ TorrentsByTags(tags []string, offset, limit int) ([]*Torrent, error)
+}
+
+type PeerStore interface {
+ SavePeer(*Peer) error
}
-type peerStore interface {
- savePeer(*Peer) error
+type TorrentStore interface {
+ SaveTorrent(*Torrent) error
+ // TODO
+ RemovePeer(*Peer) error
}
-type torrentStore interface {
- saveTorrent(*Torrent) error
+type InfohashStore interface {
+ PendingInfohashes(int) ([]*Peer, error)
}
diff --git a/models/torrent.go b/models/torrent.go
index 4ff3143..6cae23c 100644
--- a/models/torrent.go
+++ b/models/torrent.go
@@ -3,7 +3,6 @@ package models
import (
"bytes"
"crypto/sha1"
- "encoding/hex"
"fmt"
"os"
"strings"
@@ -16,11 +15,12 @@ import (
// Data for persistent storage
type Torrent struct {
ID int `json:"-"`
- Infohash string `json:"infohash"`
+ Infohash Infohash `json:"infohash"`
Name string `json:"name"`
Files []File `json:"files" db:"-"`
Size int `json:"size"`
- Seen time.Time `json:"seen"`
+ Updated time.Time `json:"updated"`
+ Created time.Time `json:"created"`
Tags []string `json:"tags" db:"-"`
}
@@ -52,7 +52,7 @@ func TorrentFromMetadata(ih Infohash, md []byte) (*Torrent, error) {
}
bt := Torrent{
- Infohash: hex.EncodeToString([]byte(ih)),
+ Infohash: ih,
Name: name,
}