aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFelix Hanley <felix@userspace.com.au>2018-03-23 04:37:46 +0000
committerFelix Hanley <felix@userspace.com.au>2018-03-23 04:37:57 +0000
commit082c7506f302c93d7e2126f611058b536d262e31 (patch)
treeee45b2823e543f262756a5da5535424388d72372
parent2cb7f7a926d844b1fff65ccd386d774ba83bd561 (diff)
downloaddhtsearch-082c7506f302c93d7e2126f611058b536d262e31.tar.gz
dhtsearch-082c7506f302c93d7e2126f611058b536d262e31.tar.bz2
Basic sqlite functionality
-rw-r--r--.gitignore2
-rw-r--r--Makefile28
-rw-r--r--bt/options.go8
-rw-r--r--bt/worker.go8
-rw-r--r--db/sqlite.go519
-rw-r--r--dht/messages.go21
6 files changed, 547 insertions, 39 deletions
diff --git a/.gitignore b/.gitignore
index 8eee1ff..05c7f8d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,2 @@
-dht-search
+dhtsearch*
config.toml
diff --git a/Makefile b/Makefile
index 0fd62a5..1d41c0b 100644
--- a/Makefile
+++ b/Makefile
@@ -3,33 +3,32 @@ TARGETS = freebsd-amd64 linux-386 linux-amd64 linux-arm linux-arm64 darwin-amd64
CMD = dhtsearch
VERSION ?= $(shell git describe --tags --always)
SRC = $(shell find . -type f -name '*.go')
-LDFLAGS = -ldflags="-w -s -X=main.version=$(VERSION)"
+FLAGS = --tags fts5
BINARIES = $(patsubst %,$(CMD)-%-v$(VERSION), $(TARGETS))
.DEFAULT_GOAL := help
-release: $(BINARIES) ## Build all binaries
-
-build: $(CMD) ## Build binary for current platform
-
-$(CMD): check-env $(SRC)
- cd cmd && go build -o ../$(CMD) $(LDFLAGS)
-
-standalone : TAGS = sqlite
+build: sqlite $(BINARIES) ## Build all binaries
$(BINARIES): check-env $(SRC)
- cd cmd && env GOOS=`echo $@ |cut -d'-' -f2` GOARCH=`echo $@ |cut -d'-' -f3 |cut -d'.' -f1` go build -o ../$@ $(LDFLAGS)
+ cd cmd && env GOOS=`echo $@ |cut -d'-' -f2` \
+ GOARCH=`echo $@ |cut -d'-' -f3 |cut -d'.' -f1` \
+ go build -o ../$@ $(FLAGS) -ldflags="-w -s -X=main.version=$(VERSION)"
+
+sqlite:
+ go get -u $(FLAGS) github.com/mattn/go-sqlite3
+ go install $(FLAGS) github.com/mattn/go-sqlite3
test: ## Run tests and create coverage report
go test -short -coverprofile=coverage.out ./...
go tool cover -html=coverage.out -o coverage.html
lint:
- @for file in $$(find . -name 'vendor' -prune -o -type f -name '*.go'); do golint $$file; done
+ @for file in $$(find . -name 'vendor' -prune -o -type f -name '*.go'); do \
+ golint $$file; done
clean: check-env ## Clean up temp files and binaries
- rm -f $(BINARIES)
- rm -f $(CMD)
+ rm -f $(CMD)-*-v*
rm -rf coverage*
check-env:
@@ -38,6 +37,7 @@ ifndef VERSION
endif
help:
- @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) |sort |awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
+ @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) |sort \
+ |awk 'BEGIN{FS=":.*?## "};{printf "\033[36m%-30s\033[0m %s\n",$$1,$$2}'
.PHONY: help install test lint clean
diff --git a/bt/options.go b/bt/options.go
index bb17505..ef015ce 100644
--- a/bt/options.go
+++ b/bt/options.go
@@ -41,14 +41,6 @@ func SetIPv6(b bool) Option {
}
}
-// SetUDPTimeout sets the number of seconds to wait for UDP connections
-func SetTCPTimeout(s int) Option {
- return func(w *Worker) error {
- w.tcpTimeout = s
- return nil
- }
-}
-
// SetLogger sets the logger
func SetLogger(l logger.Logger) Option {
return func(w *Worker) error {
diff --git a/bt/worker.go b/bt/worker.go
index 1ae64a9..b00d1e5 100644
--- a/bt/worker.go
+++ b/bt/worker.go
@@ -17,11 +17,6 @@ import (
)
const (
- TCPTimeout = 5
- UDPTimeout = 5
-)
-
-const (
// MsgRequest marks a request message type
MsgRequest = iota
// MsgData marks a data message type
@@ -39,6 +34,8 @@ const (
MaxMetadataSize = BlockSize * 1000
// HandshakeBit represents handshake bit
HandshakeBit = 0
+ // TCPTimeout for BT connections
+ TCPTimeout = 5
)
var handshakePrefix = []byte{
@@ -50,7 +47,6 @@ type Worker struct {
pool chan chan models.Peer
port int
family string
- tcpTimeout int
OnNewTorrent func(t models.Torrent)
OnBadPeer func(p models.Peer)
log logger.Logger
diff --git a/db/sqlite.go b/db/sqlite.go
new file mode 100644
index 0000000..1b068cd
--- /dev/null
+++ b/db/sqlite.go
@@ -0,0 +1,519 @@
+package db
+
+import (
+ "database/sql"
+ "fmt"
+ "net"
+ "sync"
+
+ "github.com/felix/dhtsearch/models"
+ _ "github.com/mattn/go-sqlite3"
+)
+
+// Store is a store
+type Store struct {
+ stmts map[string]*sql.Stmt
+ conn *sql.DB
+ lock sync.RWMutex
+}
+
+// NewStore connects and initializes a new store
+func NewStore(dsn string) (*Store, error) {
+ conn, err := sql.Open("sqlite3", dsn)
+ if err != nil {
+ return nil, fmt.Errorf("failed to open store: %s", err)
+ }
+
+ s := &Store{conn: conn, stmts: make(map[string]*sql.Stmt)}
+
+ err = s.migrate()
+ if err != nil {
+ return nil, err
+ }
+
+ err = s.prepareStatements()
+ if err != nil {
+ return nil, err
+ }
+
+ return s, err
+}
+
+func (s *Store) Close() error {
+ return s.conn.Close()
+}
+
+// PendingInfohashes gets the next pending infohash from the store
+func (s *Store) PendingInfohashes(n int) (peers []*models.Peer, err error) {
+ s.lock.RLock()
+ defer s.lock.RUnlock()
+
+ rows, err := s.stmts["selectPendingInfohashes"].Query(n)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ for rows.Next() {
+ var p models.Peer
+ var ih models.Infohash
+ var addr string
+ err = rows.Scan(&addr, &ih)
+ if err != nil {
+ return nil, err
+ }
+ // TODO save peer network?
+ p.Addr, err = net.ResolveUDPAddr("udp", addr)
+ if err != nil {
+ return nil, err
+ }
+ p.Infohash = ih
+ peers = append(peers, &p)
+ }
+ return peers, nil
+}
+
+// SaveTorrent implements torrentStore
+func (s *Store) SaveTorrent(t *models.Torrent) error {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ tx, err := s.conn.Begin()
+ if err != nil {
+ return fmt.Errorf("saveTorrent: %s", err)
+ }
+ defer tx.Rollback()
+
+ var torrentID int64
+ var res sql.Result
+ res, err = tx.Stmt(s.stmts["insertTorrent"]).Exec(t.Name, t.Infohash, t.Size)
+ if err != nil {
+ return fmt.Errorf("insertTorrent: %s", err)
+ }
+ if torrentID, err = res.LastInsertId(); err != nil {
+ return fmt.Errorf("insertTorrent: %s", err)
+ }
+
+ // Write tags
+ for _, tag := range t.Tags {
+ tagID, err := s.SaveTag(tag)
+ if err != nil {
+ return fmt.Errorf("saveTag: %s", err)
+ }
+ _, err = tx.Stmt(s.stmts["insertTagTorrent"]).Exec(tagID, torrentID)
+ if err != nil {
+ return fmt.Errorf("insertTagTorrent: %s", err)
+ }
+ }
+
+ // Write files
+ for _, f := range t.Files {
+ _, err := tx.Stmt(s.stmts["insertFile"]).Exec(torrentID, f.Path, f.Size)
+ if err != nil {
+ return fmt.Errorf("insertFile: %s", err)
+ }
+ }
+
+ return tx.Commit()
+}
+
+func (s *Store) RemoveTorrent(t *models.Torrent) (err error) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ _, err = s.stmts["removeTorrent"].Exec(t.Infohash.Bytes())
+ return fmt.Errorf("removeTorrent: %s", err)
+}
+
+// SavePeer implements torrentStore
+func (s *Store) SavePeer(p *models.Peer) (err error) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ var peerID int64
+ var torrentID int64
+ var res sql.Result
+
+ tx, err := s.conn.Begin()
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+
+ if res, err = tx.Stmt(s.stmts["insertPeer"]).Exec(p.Addr.String()); err != nil {
+ return fmt.Errorf("savePeer: %s", err)
+ }
+ if peerID, err = res.LastInsertId(); err != nil {
+ return fmt.Errorf("savePeer: %s", err)
+ }
+
+ if res, err = tx.Stmt(s.stmts["insertTorrent"]).Exec(nil, p.Infohash.Bytes(), 0); err != nil {
+ return fmt.Errorf("savePeer: %s", err)
+ }
+ if torrentID, err = res.LastInsertId(); err != nil {
+ return fmt.Errorf("savePeer: %s", err)
+ }
+
+ if _, err = tx.Stmt(s.stmts["insertPeerTorrent"]).Exec(peerID, torrentID); err != nil {
+ return fmt.Errorf("savePeer: %s", err)
+ }
+ return tx.Commit()
+}
+
+func (s *Store) RemovePeer(p *models.Peer) (err error) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ _, err = s.stmts["removePeer"].Exec(p.Addr.String())
+ return err
+}
+
+// TorrentsByHash implements torrentStore
+func (s *Store) TorrentByHash(ih models.Infohash) (*models.Torrent, error) {
+ s.lock.RLock()
+ defer s.lock.RUnlock()
+
+ rows, err := s.stmts["getTorrent"].Query(ih)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ torrents, err := s.fetchTorrents(rows)
+ if err != nil {
+ return nil, err
+ }
+ return torrents[0], nil
+}
+
+// TorrentsByName implements torrentStore
+func (s *Store) TorrentsByName(query string, offset int) ([]*models.Torrent, error) {
+ s.lock.RLock()
+ defer s.lock.RUnlock()
+
+ rows, err := s.stmts["searchTorrents"].Query(fmt.Sprintf("%%%s%%", query), offset)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ torrents, err := s.fetchTorrents(rows)
+ if err != nil {
+ return nil, err
+ }
+ return torrents, nil
+}
+
+// TorrentsByTag implements torrentStore
+func (s *Store) TorrentsByTag(tag string, offset int) ([]*models.Torrent, error) {
+ s.lock.RLock()
+ defer s.lock.RUnlock()
+
+ rows, err := s.stmts["torrentsByTag"].Query(tag, offset)
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+ torrents, err := s.fetchTorrents(rows)
+ if err != nil {
+ return nil, err
+ }
+ return torrents, nil
+}
+
+// SaveTag implements tagStore interface
+func (s *Store) SaveTag(tag string) (int, error) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ res, err := s.stmts["insertTag"].Exec(tag)
+ if err != nil {
+ return 0, fmt.Errorf("saveTag: %s", err)
+ }
+ tagID, err := res.LastInsertId()
+ if err != nil {
+ return 0, fmt.Errorf("saveTag: %s", err)
+ }
+ return int(tagID), nil
+}
+
+func (s *Store) fetchTorrents(rows *sql.Rows) (torrents []*models.Torrent, err error) {
+ for rows.Next() {
+ var t models.Torrent
+ /*
+ t := &models.Torrent{
+ Files: []models.File{},
+ Tags: []string{},
+ }
+ */
+ err = rows.Scan(
+ &t.ID, &t.Infohash, &t.Name, &t.Size, &t.Created, &t.Updated,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ err = func() error {
+ rowsf, err := s.stmts["selectFiles"].Query(t.ID)
+ defer rowsf.Close()
+ if err != nil {
+ return fmt.Errorf("failed to select files: %s", err)
+ }
+ for rowsf.Next() {
+ var f models.File
+ err = rowsf.Scan(&f.ID, &f.TorrentID, &f.Path, &f.Size)
+ if err != nil {
+ return fmt.Errorf("failed to build file: %s", err)
+ }
+ }
+ return nil
+ }()
+ if err != nil {
+ return nil, err
+ }
+
+ err = func() error {
+ rowst, err := s.stmts["selectTags"].Query(t.ID)
+ defer rowst.Close()
+ if err != nil {
+ return fmt.Errorf("failed to select tags: %s", err)
+ }
+ for rowst.Next() {
+ var tg string
+ err = rowst.Scan(&tg)
+ if err != nil {
+ return fmt.Errorf("failed to build tag: %s", err)
+ }
+ t.Tags = append(t.Tags, tg)
+ }
+ return nil
+ }()
+ if err != nil {
+ return nil, err
+ }
+ torrents = append(torrents, &t)
+ }
+ return torrents, err
+}
+
+func (s *Store) migrate() error {
+ _, err := s.conn.Exec(`
+ pragma journal_mode=wal;
+ pragma temp_store=1;
+ pragma foreign_keys=on;
+ pragma encoding='utf-8';
+ `)
+ if err != nil {
+ return err
+ }
+
+ tx, err := s.conn.Begin()
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+
+ var version int
+ err = tx.QueryRow("pragma user_version;").Scan(&version)
+ if err != nil {
+ return err
+ }
+
+ if version == 0 {
+ _, err = tx.Exec(sqliteSchema)
+ if err != nil {
+ return err
+ }
+ }
+ tx.Commit()
+
+ return nil
+}
+
+func (s *Store) prepareStatements() error {
+ var err error
+ if s.stmts["removeTorrent"], err = s.conn.Prepare(
+ `delete from torrents
+ where infohash = ?`,
+ ); err != nil {
+ return err
+ }
+
+ if s.stmts["selectPendingInfohashes"], err = s.conn.Prepare(
+ `with get_order as (
+ select t.id as torrent_id, min(pt.peer_id) as peer_id, count(pt.peer_id) as c
+ from torrents t
+ join peers_torrents pt on pt.torrent_id = t.id
+ where t.name is null
+ group by t.id
+ -- order by c desc
+ order by t.updated desc
+ limit ?
+ ) select p.address, t.infohash
+ from get_order go
+ join torrents t on t.id = go.torrent_id
+ join peers p on p.id = go.peer_id`,
+ ); err != nil {
+ return err
+ }
+
+ if s.stmts["selectFiles"], err = s.conn.Prepare(
+ `select * from files
+ where torrent_id = ?
+ order by path asc`,
+ ); err != nil {
+ return err
+ }
+
+ if s.stmts["insertPeer"], err = s.conn.Prepare(
+ `insert or replace into peers
+ (address, created, updated)
+ values
+ (?, date('now'), date('now'))`,
+ ); err != nil {
+ return err
+ }
+
+ if s.stmts["insertPeerTorrent"], err = s.conn.Prepare(
+ `insert or ignore into peers_torrents
+ (peer_id, torrent_id)
+ values
+ (?, ?)`,
+ ); err != nil {
+ return err
+ }
+
+ if s.stmts["insertTorrent"], err = s.conn.Prepare(
+ `insert or replace into torrents (
+ name, infohash, size, created, updated
+ ) values (
+ ?, ?, ?, date('now'), date('now')
+ )`,
+ ); err != nil {
+ return err
+ }
+
+ if s.stmts["getTorrent"], err = s.conn.Prepare(
+ `select * from torrents where infohash = ? limit 1`,
+ ); err != nil {
+ return err
+ }
+
+ if s.stmts["insertFile"], err = s.conn.Prepare(
+ `insert into files
+ (torrent_id, path, size)
+ values
+ (?, ?, ?)`,
+ ); err != nil {
+ return err
+ }
+
+ if s.stmts["selectTags"], err = s.conn.Prepare(
+ `select name
+ from tags t
+ inner join tags_torrents tt on t.id = tt.tag_id
+ where tt.torrent_id = ?`,
+ ); err != nil {
+ return err
+ }
+
+ if s.stmts["removePeer"], err = s.conn.Prepare(
+ `delete from peers where address = ?`,
+ ); err != nil {
+ return err
+ }
+
+ if s.stmts["insertTagTorrent"], err = s.conn.Prepare(
+ `insert or ignore into tags_torrents
+ (tag_id, torrent_id) values (?, ?)`,
+ ); err != nil {
+ return err
+ }
+
+ if s.stmts["insertTag"], err = s.conn.Prepare(
+ `insert or replace into tags (name) values (?)`,
+ ); err != nil {
+ return err
+ }
+
+ if s.stmts["torrentsByTag"], err = s.conn.Prepare(
+ `select t.id, t.infohash, t.name, t.size, t.created, t.updated
+ from torrents t
+ inner join tags_torrents tt on t.id = tt.torrent_id
+ inner join tags ta on tt.tag_id = ta.id
+ where ta.name = ? group by t.id
+ order by updated asc
+ limit 50 offset ?`,
+ ); err != nil {
+ return err
+ }
+
+ if s.stmts["searchTorrents"], err = s.conn.Prepare(
+ `select id, infohash, name, size, updated
+ from torrents
+ where id in (
+ select * from torrents_fts
+ where torrents_fts match ?
+ order by rank desc
+ )
+ order by updated desc
+ limit 50 offset ?`,
+ ); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+const sqliteSchema = `create table if not exists torrents (
+ id integer primary key,
+ infohash blob unique,
+ size bigint,
+ name text,
+ created timestamp with time zone,
+ updated timestamp with time zone,
+ tsv tsvector
+);
+create virtual table torrents_fts using fts5(
+ name, content='torrents', content_rowid='id',
+ tokenize="porter unicode61 separators ' !""#$%&''()*+,-./:;<=>?@[\]^_` + "`" + `{|}~'"
+);
+create trigger torrents_after_insert after insert on torrents begin
+insert into torrents_fts(rowid, name) values (new.id, new.name);
+end;
+create trigger torrents_ad after delete on torrents begin
+insert into torrents_fts(torrents_fts, rowid, name) values('delete', old.id, old.name);
+end;
+create trigger torrents_au after update on torrents begin
+insert into torrents_fts(torrents_fts, rowid, name) values('delete', old.id, old.name);
+insert into torrents_fts(rowid, name) values (new.id, new.name);
+end;
+create table if not exists files (
+ id integer primary key,
+ torrent_id integer not null references torrents on delete cascade,
+ path text,
+ size bigint
+);
+create index files_torrent_idx on files (torrent_id);
+create table if not exists tags (
+ id integer primary key,
+ name character varying(50) unique
+);
+create table if not exists tags_torrents (
+ tag_id integer not null references tags on delete cascade,
+ torrent_id integer not null references torrents on delete cascade,
+ primary key (tag_id, torrent_id)
+);
+create index tags_torrents_tag_idx on tags_torrents (tag_id);
+create index tags_torrents_torrent_idx on tags_torrents (torrent_id);
+create table if not exists peers (
+ id integer primary key,
+ address character varying(50),
+ created timestamp with time zone,
+ updated timestamp with time zone
+);
+create table if not exists peers_torrents (
+ peer_id integer not null references peers on delete cascade,
+ torrent_id integer not null references torrents on delete cascade,
+ primary key (peer_id, torrent_id)
+);
+create index peers_torrents_peer_idx on peers_torrents (peer_id);
+create index peers_torrents_torrent_idx on peers_torrents (torrent_id);
+pragma user_version = 1;`
diff --git a/dht/messages.go b/dht/messages.go
index d972d65..4063f01 100644
--- a/dht/messages.go
+++ b/dht/messages.go
@@ -79,7 +79,7 @@ func (n *Node) onAnnouncePeerQuery(rn remoteNode, msg map[string]interface{}) er
return err
}
- //n.log.Debug("announce_peer", "source", rn)
+ n.log.Debug("announce_peer", "source", rn)
host, port, err := net.SplitHostPort(rn.addr.String())
if err != nil {
@@ -89,6 +89,15 @@ func (n *Node) onAnnouncePeerQuery(rn remoteNode, msg map[string]interface{}) er
return fmt.Errorf("ignoring port 0")
}
+ ihStr, err := krpc.GetString(a, "info_hash")
+ if err != nil {
+ return err
+ }
+ ih, err := models.InfohashFromString(ihStr)
+ if err != nil {
+ return fmt.Errorf("invalid torrent: %s", err)
+ }
+
newPort, err := krpc.GetInt(a, "port")
if err == nil {
if iPort, err := krpc.GetInt(a, "implied_port"); err == nil && iPort == 0 {
@@ -97,21 +106,13 @@ func (n *Node) onAnnouncePeerQuery(rn remoteNode, msg map[string]interface{}) er
if err != nil {
return err
}
+ n.log.Debug("implied port", "infohash", ih, "original", rn.addr.String(), "new", addr.String())
rn = remoteNode{addr: addr, id: rn.id}
}
}
// TODO do we reply?
- ihStr, err := krpc.GetString(a, "info_hash")
- if err != nil {
- return err
- }
- ih, err := models.InfohashFromString(ihStr)
- if err != nil {
- n.log.Warn("invalid torrent", "infohash", ihStr)
- }
-
p := models.Peer{Addr: rn.addr, Infohash: *ih}
if n.OnAnnouncePeer != nil {
go n.OnAnnouncePeer(p)