| author | Felix Hanley <felix@userspace.com.au> | 2018-03-23 04:37:46 +0000 |
|---|---|---|
| committer | Felix Hanley <felix@userspace.com.au> | 2018-03-23 04:37:57 +0000 |
| commit | 082c7506f302c93d7e2126f611058b536d262e31 (patch) | |
| tree | ee45b2823e543f262756a5da5535424388d72372 /db/sqlite.go | |
| parent | 2cb7f7a926d844b1fff65ccd386d774ba83bd561 (diff) | |
| download | dhtsearch-082c7506f302c93d7e2126f611058b536d262e31.tar.gz dhtsearch-082c7506f302c93d7e2126f611058b536d262e31.tar.bz2 | |
Basic sqlite functionality
Diffstat (limited to 'db/sqlite.go')
| -rw-r--r-- | db/sqlite.go | 519 |
1 file changed, 519 insertions, 0 deletions
diff --git a/db/sqlite.go b/db/sqlite.go
new file mode 100644
index 0000000..1b068cd
--- /dev/null
+++ b/db/sqlite.go
@@ -0,0 +1,519 @@
+package db
+
+import (
+    "database/sql"
+    "fmt"
+    "net"
+    "sync"
+
+    "github.com/felix/dhtsearch/models"
+    _ "github.com/mattn/go-sqlite3"
+)
+
+// Store is an sqlite-backed store for torrents, peers and tags
+type Store struct {
+    stmts map[string]*sql.Stmt
+    conn  *sql.DB
+    lock  sync.RWMutex
+}
+
+// NewStore connects and initializes a new store
+func NewStore(dsn string) (*Store, error) {
+    conn, err := sql.Open("sqlite3", dsn)
+    if err != nil {
+        return nil, fmt.Errorf("failed to open store: %s", err)
+    }
+
+    s := &Store{conn: conn, stmts: make(map[string]*sql.Stmt)}
+
+    err = s.migrate()
+    if err != nil {
+        return nil, err
+    }
+
+    err = s.prepareStatements()
+    if err != nil {
+        return nil, err
+    }
+
+    return s, nil
+}
+
+func (s *Store) Close() error {
+    return s.conn.Close()
+}
+
+// PendingInfohashes gets up to n peers with pending infohashes from the store
+func (s *Store) PendingInfohashes(n int) (peers []*models.Peer, err error) {
+    s.lock.RLock()
+    defer s.lock.RUnlock()
+
+    rows, err := s.stmts["selectPendingInfohashes"].Query(n)
+    if err != nil {
+        return nil, err
+    }
+    defer rows.Close()
+    for rows.Next() {
+        var p models.Peer
+        var ih models.Infohash
+        var addr string
+        err = rows.Scan(&addr, &ih)
+        if err != nil {
+            return nil, err
+        }
+        // TODO save peer network?
+        p.Addr, err = net.ResolveUDPAddr("udp", addr)
+        if err != nil {
+            return nil, err
+        }
+        p.Infohash = ih
+        peers = append(peers, &p)
+    }
+    return peers, nil
+}
+
+// SaveTorrent implements torrentStore
+func (s *Store) SaveTorrent(t *models.Torrent) error {
+    s.lock.Lock()
+    defer s.lock.Unlock()
+
+    tx, err := s.conn.Begin()
+    if err != nil {
+        return fmt.Errorf("saveTorrent: %s", err)
+    }
+    defer tx.Rollback()
+
+    var torrentID int64
+    var res sql.Result
+    res, err = tx.Stmt(s.stmts["insertTorrent"]).Exec(t.Name, t.Infohash, t.Size)
+    if err != nil {
+        return fmt.Errorf("insertTorrent: %s", err)
+    }
+    if torrentID, err = res.LastInsertId(); err != nil {
+        return fmt.Errorf("insertTorrent: %s", err)
+    }
+
+    // Write tags
+    for _, tag := range t.Tags {
+        // use the unlocked saveTag; SaveTag would deadlock while we hold the lock
+        tagID, err := s.saveTag(tag)
+        if err != nil {
+            return fmt.Errorf("saveTag: %s", err)
+        }
+        _, err = tx.Stmt(s.stmts["insertTagTorrent"]).Exec(tagID, torrentID)
+        if err != nil {
+            return fmt.Errorf("insertTagTorrent: %s", err)
+        }
+    }
+
+    // Write files
+    for _, f := range t.Files {
+        _, err := tx.Stmt(s.stmts["insertFile"]).Exec(torrentID, f.Path, f.Size)
+        if err != nil {
+            return fmt.Errorf("insertFile: %s", err)
+        }
+    }
+
+    return tx.Commit()
+}
+
+func (s *Store) RemoveTorrent(t *models.Torrent) (err error) {
+    s.lock.Lock()
+    defer s.lock.Unlock()
+
+    if _, err = s.stmts["removeTorrent"].Exec(t.Infohash.Bytes()); err != nil {
+        return fmt.Errorf("removeTorrent: %s", err)
+    }
+    return nil
+}
+
+// SavePeer implements torrentStore
+func (s *Store) SavePeer(p *models.Peer) (err error) {
+    s.lock.Lock()
+    defer s.lock.Unlock()
+
+    var peerID int64
+    var torrentID int64
+    var res sql.Result
+
+    tx, err := s.conn.Begin()
+    if err != nil {
+        return err
+    }
+    defer tx.Rollback()
+
+    if res, err = tx.Stmt(s.stmts["insertPeer"]).Exec(p.Addr.String()); err != nil {
+        return fmt.Errorf("savePeer: %s", err)
+    }
+    if peerID, err = res.LastInsertId(); err != nil {
+        return fmt.Errorf("savePeer: %s", err)
+    }
+
+    if res, err = tx.Stmt(s.stmts["insertTorrent"]).Exec(nil, p.Infohash.Bytes(), 0); err != nil {
+        return fmt.Errorf("savePeer: %s", err)
+    }
+    if torrentID, err = res.LastInsertId(); err != nil {
+        return fmt.Errorf("savePeer: %s", err)
+    }
+
+    if _, err = tx.Stmt(s.stmts["insertPeerTorrent"]).Exec(peerID, torrentID); err != nil {
+        return fmt.Errorf("savePeer: %s", err)
+    }
+    return tx.Commit()
+}
+
+func (s *Store) RemovePeer(p *models.Peer) (err error) {
+    s.lock.Lock()
+    defer s.lock.Unlock()
+
+    _, err = s.stmts["removePeer"].Exec(p.Addr.String())
+    return err
+}
+
+// TorrentByHash implements torrentStore
+func (s *Store) TorrentByHash(ih models.Infohash) (*models.Torrent, error) {
+    s.lock.RLock()
+    defer s.lock.RUnlock()
+
+    rows, err := s.stmts["getTorrent"].Query(ih)
+    if err != nil {
+        return nil, err
+    }
+    defer rows.Close()
+    torrents, err := s.fetchTorrents(rows)
+    if err != nil {
+        return nil, err
+    }
+    if len(torrents) == 0 {
+        return nil, nil
+    }
+    return torrents[0], nil
+}
+
+// TorrentsByName implements torrentStore
+func (s *Store) TorrentsByName(query string, offset int) ([]*models.Torrent, error) {
+    s.lock.RLock()
+    defer s.lock.RUnlock()
+
+    rows, err := s.stmts["searchTorrents"].Query(fmt.Sprintf("%%%s%%", query), offset)
+    if err != nil {
+        return nil, err
+    }
+    defer rows.Close()
+    torrents, err := s.fetchTorrents(rows)
+    if err != nil {
+        return nil, err
+    }
+    return torrents, nil
+}
+
+// TorrentsByTag implements torrentStore
+func (s *Store) TorrentsByTag(tag string, offset int) ([]*models.Torrent, error) {
+    s.lock.RLock()
+    defer s.lock.RUnlock()
+
+    rows, err := s.stmts["torrentsByTag"].Query(tag, offset)
+    if err != nil {
+        return nil, err
+    }
+    defer rows.Close()
+    torrents, err := s.fetchTorrents(rows)
+    if err != nil {
+        return nil, err
+    }
+    return torrents, nil
+}
+
+// SaveTag implements tagStore interface
+func (s *Store) SaveTag(tag string) (int, error) {
+    s.lock.Lock()
+    defer s.lock.Unlock()
+
+    return s.saveTag(tag)
+}
+
+// saveTag inserts a tag without taking the lock, for callers that already hold it
+func (s *Store) saveTag(tag string) (int, error) {
+    res, err := s.stmts["insertTag"].Exec(tag)
+    if err != nil {
+        return 0, fmt.Errorf("saveTag: %s", err)
+    }
+    tagID, err := res.LastInsertId()
+    if err != nil {
+        return 0, fmt.Errorf("saveTag: %s", err)
+    }
+    return int(tagID), nil
+}
+
+func (s *Store) fetchTorrents(rows *sql.Rows) (torrents []*models.Torrent, err error) {
+    for rows.Next() {
+        var t models.Torrent
+        err = rows.Scan(
+            &t.ID, &t.Infohash, &t.Name, &t.Size, &t.Created, &t.Updated,
+        )
+        if err != nil {
+            return nil, err
+        }
+
+        err = func() error {
+            rowsf, err := s.stmts["selectFiles"].Query(t.ID)
+            if err != nil {
+                return fmt.Errorf("failed to select files: %s", err)
+            }
+            defer rowsf.Close()
+            for rowsf.Next() {
+                var f models.File
+                err = rowsf.Scan(&f.ID, &f.TorrentID, &f.Path, &f.Size)
+                if err != nil {
+                    return fmt.Errorf("failed to build file: %s", err)
+                }
+                t.Files = append(t.Files, f)
+            }
+            return nil
+        }()
+        if err != nil {
+            return nil, err
+        }
+
+        err = func() error {
+            rowst, err := s.stmts["selectTags"].Query(t.ID)
+            if err != nil {
+                return fmt.Errorf("failed to select tags: %s", err)
+            }
+            defer rowst.Close()
+            for rowst.Next() {
+                var tg string
+                err = rowst.Scan(&tg)
+                if err != nil {
+                    return fmt.Errorf("failed to build tag: %s", err)
+                }
+                t.Tags = append(t.Tags, tg)
+            }
+            return nil
+        }()
+        if err != nil {
+            return nil, err
+        }
+        torrents = append(torrents, &t)
+    }
+    return torrents, err
+}
+
+func (s *Store) migrate() error {
+    _, err := s.conn.Exec(`
+    pragma journal_mode=wal;
+    pragma temp_store=1;
+    pragma foreign_keys=on;
+    pragma encoding='utf-8';
+    `)
+    if err != nil {
+        return err
+    }
+
+    tx, err := s.conn.Begin()
+    if err != nil {
+        return err
+    }
+    defer tx.Rollback()
+
+    var version int
+    err = tx.QueryRow("pragma user_version;").Scan(&version)
+    if err != nil {
+        return err
+    }
+
+    if version == 0 {
+        _, err = tx.Exec(sqliteSchema)
+        if err != nil {
+            return err
+        }
+    }
+    return tx.Commit()
+}
+
+func (s *Store) prepareStatements() error {
+    var err error
+    if s.stmts["removeTorrent"], err = s.conn.Prepare(
+        `delete from torrents
+        where infohash = ?`,
+    ); err != nil {
+        return err
+    }
+
+    if s.stmts["selectPendingInfohashes"], err = s.conn.Prepare(
+        `with get_order as (
+            select t.id as torrent_id, min(pt.peer_id) as peer_id, count(pt.peer_id) as c
+            from torrents t
+            join peers_torrents pt on pt.torrent_id = t.id
+            where t.name is null
+            group by t.id
+            -- order by c desc
+            order by t.updated desc
+            limit ?
+        ) select p.address, t.infohash
+        from get_order go
+        join torrents t on t.id = go.torrent_id
+        join peers p on p.id = go.peer_id`,
+    ); err != nil {
+        return err
+    }
+
+    if s.stmts["selectFiles"], err = s.conn.Prepare(
+        `select * from files
+        where torrent_id = ?
+        order by path asc`,
+    ); err != nil {
+        return err
+    }
+
+    if s.stmts["insertPeer"], err = s.conn.Prepare(
+        `insert or replace into peers
+        (address, created, updated)
+        values
+        (?, date('now'), date('now'))`,
+    ); err != nil {
+        return err
+    }
+
+    if s.stmts["insertPeerTorrent"], err = s.conn.Prepare(
+        `insert or ignore into peers_torrents
+        (peer_id, torrent_id)
+        values
+        (?, ?)`,
+    ); err != nil {
+        return err
+    }
+
+    if s.stmts["insertTorrent"], err = s.conn.Prepare(
+        `insert or replace into torrents (
+            name, infohash, size, created, updated
+        ) values (
+            ?, ?, ?, date('now'), date('now')
+        )`,
+    ); err != nil {
+        return err
+    }
+
+    if s.stmts["getTorrent"], err = s.conn.Prepare(
+        `select id, infohash, name, size, created, updated
+        from torrents where infohash = ? limit 1`,
+    ); err != nil {
+        return err
+    }
+
+    if s.stmts["insertFile"], err = s.conn.Prepare(
+        `insert into files
+        (torrent_id, path, size)
+        values
+        (?, ?, ?)`,
+    ); err != nil {
+        return err
+    }
+
+    if s.stmts["selectTags"], err = s.conn.Prepare(
+        `select name
+        from tags t
+        inner join tags_torrents tt on t.id = tt.tag_id
+        where tt.torrent_id = ?`,
+    ); err != nil {
+        return err
+    }
+
+    if s.stmts["removePeer"], err = s.conn.Prepare(
+        `delete from peers where address = ?`,
+    ); err != nil {
+        return err
+    }
+
+    if s.stmts["insertTagTorrent"], err = s.conn.Prepare(
+        `insert or ignore into tags_torrents
+        (tag_id, torrent_id) values (?, ?)`,
+    ); err != nil {
+        return err
+    }
+
+    if s.stmts["insertTag"], err = s.conn.Prepare(
+        `insert or replace into tags (name) values (?)`,
+    ); err != nil {
+        return err
+    }
+
+    if s.stmts["torrentsByTag"], err = s.conn.Prepare(
+        `select t.id, t.infohash, t.name, t.size, t.created, t.updated
+        from torrents t
+        inner join tags_torrents tt on t.id = tt.torrent_id
+        inner join tags ta on tt.tag_id = ta.id
+        where ta.name = ?
+        group by t.id
+        order by updated asc
+        limit 50 offset ?`,
+    ); err != nil {
+        return err
+    }
+
+    if s.stmts["searchTorrents"], err = s.conn.Prepare(
+        `select id, infohash, name, size, created, updated
+        from torrents
+        where id in (
+            select rowid from torrents_fts
+            where torrents_fts match ?
+            order by rank desc
+        )
+        order by updated desc
+        limit 50 offset ?`,
+    ); err != nil {
+        return err
+    }
+
+    return nil
+}
+
+const sqliteSchema = `create table if not exists torrents (
+    id integer primary key,
+    infohash blob unique,
+    size bigint,
+    name text,
+    created timestamp with time zone,
+    updated timestamp with time zone,
+    tsv tsvector
+);
+create virtual table torrents_fts using fts5(
+    name, content='torrents', content_rowid='id',
+    tokenize="porter unicode61 separators ' !""#$%&''()*+,-./:;<=>?@[\]^_` + "`" + `{|}~'"
+);
+create trigger torrents_after_insert after insert on torrents begin
+insert into torrents_fts(rowid, name) values (new.id, new.name);
+end;
+create trigger torrents_ad after delete on torrents begin
+insert into torrents_fts(torrents_fts, rowid, name) values('delete', old.id, old.name);
+end;
+create trigger torrents_au after update on torrents begin
+insert into torrents_fts(torrents_fts, rowid, name) values('delete', old.id, old.name);
+insert into torrents_fts(rowid, name) values (new.id, new.name);
+end;
+create table if not exists files (
+    id integer primary key,
+    torrent_id integer not null references torrents on delete cascade,
+    path text,
+    size bigint
+);
+create index files_torrent_idx on files (torrent_id);
+create table if not exists tags (
+    id integer primary key,
+    name character varying(50) unique
+);
+create table if not exists tags_torrents (
+    tag_id integer not null references tags on delete cascade,
+    torrent_id integer not null references torrents on delete cascade,
+    primary key (tag_id, torrent_id)
+);
+create index tags_torrents_tag_idx on tags_torrents (tag_id);
+create index tags_torrents_torrent_idx on tags_torrents (torrent_id);
+create table if not exists peers (
+    id integer primary key,
+    address character varying(50),
+    created timestamp with time zone,
+    updated timestamp with time zone
+);
+create table if not exists peers_torrents (
+    peer_id integer not null references peers on delete cascade,
+    torrent_id integer not null references torrents on delete cascade,
+    primary key (peer_id, torrent_id)
+);
+create index peers_torrents_peer_idx on peers_torrents (peer_id);
+create index peers_torrents_torrent_idx on peers_torrents (torrent_id);
+pragma user_version = 1;`
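To give a sense of how this new store might be wired up, here is a minimal, hypothetical usage sketch. It assumes the package lives at `github.com/felix/dhtsearch/db`, that `models` exposes the `Torrent`, `File` and `Infohash` types with the fields used in the diff above, and that the DSN and field values shown are placeholders; it is not part of this commit.

```go
package main

import (
	"fmt"
	"log"

	"github.com/felix/dhtsearch/db"      // assumed import path for this package
	"github.com/felix/dhtsearch/models"
)

func main() {
	// Open (or create) the sqlite database; the DSN is any path go-sqlite3 accepts.
	store, err := db.NewStore("dhtsearch.db")
	if err != nil {
		log.Fatalf("open store: %s", err)
	}
	defer store.Close()

	// Save a torrent with one file and a tag (placeholder values).
	t := &models.Torrent{
		Name:     "example",
		Infohash: models.Infohash{}, // placeholder infohash
		Size:     1024,
		Files:    []models.File{{Path: "example/readme.txt", Size: 1024}},
		Tags:     []string{"document"},
	}
	if err := store.SaveTorrent(t); err != nil {
		log.Fatalf("save torrent: %s", err)
	}

	// Full-text search by name, first page of results.
	results, err := store.TorrentsByName("example", 0)
	if err != nil {
		log.Fatalf("search: %s", err)
	}
	for _, r := range results {
		fmt.Println(r.Name, r.Size)
	}
}
```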
