aboutsummaryrefslogtreecommitdiff
path: root/db/pgsql.go
diff options
context:
space:
mode:
Diffstat (limited to 'db/pgsql.go')
-rw-r--r--db/pgsql.go455
1 files changed, 280 insertions, 175 deletions
diff --git a/db/pgsql.go b/db/pgsql.go
index b37b13d..db850c2 100644
--- a/db/pgsql.go
+++ b/db/pgsql.go
@@ -1,3 +1,5 @@
+// +build ignore postgres
+
package db
import (
@@ -27,22 +29,27 @@ func NewStore(dsn string) (*Store, error) {
s := &Store{c}
- // TODO
- var upToDate bool
- err = s.QueryRow(sqlCheckSchema).Scan(&upToDate)
- if err != nil || !upToDate {
- _, err = s.Exec(sqlSchema)
+ err = s.migrate()
+ if err != nil {
+ return nil, err
}
+
+ err = s.prepareStatements()
+ if err != nil {
+ return nil, err
+ }
+
return s, err
}
// PendingInfohashes gets the next pending infohash from the store
func (s *Store) PendingInfohashes(n int) (peers []*models.Peer, err error) {
- rows, err := s.Query(sqlSelectPendingInfohashes, n)
- defer rows.Close()
+
+ rows, err := s.Query("selectPendingInfohashes", n)
if err != nil {
return nil, err
}
+ defer rows.Close()
for rows.Next() {
var p models.Peer
var ih pgtype.Bytea
@@ -71,7 +78,7 @@ func (s *Store) SaveTorrent(t *models.Torrent) error {
defer tx.Rollback()
var torrentID int
- err = tx.QueryRow(sqlInsertTorrent, t.Name, t.Infohash, t.Size).Scan(&torrentID)
+ err = tx.QueryRow("insertTorrent", t.Name, t.Infohash, t.Size).Scan(&torrentID)
if err != nil {
return fmt.Errorf("insertTorrent: %s", err)
}
@@ -82,7 +89,7 @@ func (s *Store) SaveTorrent(t *models.Torrent) error {
if err != nil {
return fmt.Errorf("saveTag: %s", err)
}
- _, err = tx.Exec(sqlInsertTagTorrent, tagID, torrentID)
+ _, err = tx.Exec("insertTagTorrent", tagID, torrentID)
if err != nil {
return fmt.Errorf("insertTagTorrent: %s", err)
}
@@ -90,14 +97,14 @@ func (s *Store) SaveTorrent(t *models.Torrent) error {
// Write files
for _, f := range t.Files {
- _, err := tx.Exec(sqlInsertFile, torrentID, f.Path, f.Size)
+ _, err := tx.Exec("insertFile", torrentID, f.Path, f.Size)
if err != nil {
return fmt.Errorf("insertFile: %s", err)
}
}
// Should this be outside the transaction?
- _, err = tx.Exec(sqlUpdateFTSVectors, torrentID)
+ _, err = tx.Exec("updateFTSVectors", torrentID)
if err != nil {
return fmt.Errorf("updateVectors: %s", err)
}
@@ -105,28 +112,28 @@ func (s *Store) SaveTorrent(t *models.Torrent) error {
}
func (s *Store) RemoveTorrent(t *models.Torrent) (err error) {
- _, err = s.Exec(sqlRemoveTorrent, t.Infohash.Bytes())
+ _, err = s.Exec("removeTorrent", t.Infohash.Bytes())
return err
}
// SavePeer implements torrentStore
func (s *Store) SavePeer(p *models.Peer) (err error) {
- _, err = s.Exec(sqlInsertPeer, p.Addr.String(), p.Infohash.Bytes())
+ _, err = s.Exec("insertPeer", p.Addr.String(), p.Infohash.Bytes())
return err
}
func (s *Store) RemovePeer(p *models.Peer) (err error) {
- _, err = s.Exec(sqlRemovePeer, p.Addr.String())
+ _, err = s.Exec("removePeer", p.Addr.String())
return err
}
// TorrentsByHash implements torrentStore
func (s *Store) TorrentByHash(ih models.Infohash) (*models.Torrent, error) {
- rows, err := s.Query(sqlGetTorrent, ih)
- defer rows.Close()
+ rows, err := s.Query("getTorrent", ih)
if err != nil {
return nil, err
}
+ defer rows.Close()
torrents, err := s.fetchTorrents(rows)
if err != nil {
return nil, err
@@ -136,11 +143,11 @@ func (s *Store) TorrentByHash(ih models.Infohash) (*models.Torrent, error) {
// TorrentsByName implements torrentStore
func (s *Store) TorrentsByName(query string, offset int) ([]*models.Torrent, error) {
- rows, err := s.Query(sqlSearchTorrents, fmt.Sprintf("%%%s%%", query), offset)
- defer rows.Close()
+ rows, err := s.Query("searchTorrents", fmt.Sprintf("%%%s%%", query), offset)
if err != nil {
return nil, err
}
+ defer rows.Close()
torrents, err := s.fetchTorrents(rows)
if err != nil {
return nil, err
@@ -150,11 +157,11 @@ func (s *Store) TorrentsByName(query string, offset int) ([]*models.Torrent, err
// TorrentsByTag implements torrentStore
func (s *Store) TorrentsByTag(tag string, offset int) ([]*models.Torrent, error) {
- rows, err := s.Query(sqlTorrentsByTag, tag, offset)
- defer rows.Close()
+ rows, err := s.Query("torrentsByTag", tag, offset)
if err != nil {
return nil, err
}
+ defer rows.Close()
torrents, err := s.fetchTorrents(rows)
if err != nil {
return nil, err
@@ -164,7 +171,7 @@ func (s *Store) TorrentsByTag(tag string, offset int) ([]*models.Torrent, error)
// SaveTag implements tagStore interface
func (s *Store) SaveTag(tag string) (tagID int, err error) {
- err = s.QueryRow(sqlInsertTag, tag).Scan(&tagID)
+ err = s.QueryRow("insertTag", tag).Scan(&tagID)
return tagID, err
}
@@ -185,7 +192,7 @@ func (s *Store) fetchTorrents(rows *pgx.Rows) (torrents []*models.Torrent, err e
}
err = func() error {
- rowsf, err := s.Query(sqlSelectFiles, t.ID)
+ rowsf, err := s.Query("selectFiles", t.ID)
defer rowsf.Close()
if err != nil {
return fmt.Errorf("failed to select files: %s", err)
@@ -204,7 +211,7 @@ func (s *Store) fetchTorrents(rows *pgx.Rows) (torrents []*models.Torrent, err e
}
err = func() error {
- rowst, err := s.Query(sqlSelectTags, t.ID)
+ rowst, err := s.Query("selectTags", t.ID)
defer rowst.Close()
if err != nil {
return fmt.Errorf("failed to select tags: %s", err)
@@ -227,158 +234,256 @@ func (s *Store) fetchTorrents(rows *pgx.Rows) (torrents []*models.Torrent, err e
return torrents, err
}
-const (
- sqlGetTorrent = `select *
- from torrents
- where infohash = $1 limit 1`
-
- sqlInsertTorrent = `insert into torrents (
- name, infohash, size, created, updated
- ) values (
- $1, $2, $3, now(), now()
- ) on conflict (infohash) do
- update set
- name = $1,
- size = $3,
- updated = now()
- returning id`
-
- sqlRemoveTorrent = `delete from torrents
- where infohash = $1`
-
- // peer.address
- // torrent.infohash
- sqlInsertPeer = `with save_peer as (
- insert into peers
- (address, created, updated) values ($1, now(), now())
- returning id
- ), save_torrent as (
- insert into torrents (infohash, created, updated)
- values ($2, now(), now())
- on conflict (infohash) do update set
- updated = now()
- returning id
- ) insert into peers_torrents
- (peer_id, torrent_id)
- select
- sp.id, st.id
- from save_peer sp, save_torrent st
- on conflict do nothing`
-
- sqlRemovePeer = `delete from peers
- where address = $1`
-
- sqlUpdateFTSVectors = `update torrents set
- tsv = sub.tsv from (
- select t.id,
- setweight(to_tsvector(
- translate(t.name, '._-', ' ')
- ), 'A')
- || setweight(to_tsvector(
- translate(string_agg(coalesce(f.path, ''), ' '), './_-', ' ')
- ), 'B') as tsv
- from torrents t
- left join files f on t.id = f.torrent_id
- where t.id = $1
- group by t.id
- ) as sub
- where sub.id = torrents.id`
-
- sqlSelectPendingInfohashes = `with get_order as (
- select t.id as torrent_id, min(pt.peer_id) as peer_id, count(pt.peer_id) as c
- from torrents t
- join peers_torrents pt on pt.torrent_id = t.id
- where t.name is null
- group by t.id
- -- order by c desc
- order by t.updated desc
- limit $1
- ) select p.address, t.infohash
- from get_order go
- join torrents t on t.id = go.torrent_id
- join peers p on p.id = go.peer_id`
-
- sqlSearchTorrents = `select
- t.id, t.infohash, t.name, t.size, t.updated
- from torrents t
- where t.tsv @@ plainto_tsquery($1)
- order by ts_rank(tsv, plainto_tsquery($1)) desc, t.updated desc
- limit 50 offset $2`
-
- sqlTorrentsByTag = `select
- t.id, t.infohash, t.name, t.size, t.created, t.updated
- from torrents t
- inner join tags_torrents tt on t.id = tt.torrent_id
- inner join tags ta on tt.tag_id = ta.id
- where ta.name = $1 group by t.id
- order by updated asc
- limit 50 offset $2`
-
- sqlSelectFiles = `select * from files
- where torrent_id = $1
- order by path asc`
-
- sqlInsertFile = `insert into files
- (torrent_id, path, size)
- values
- ($1, $2, $3)`
-
- sqlSelectTags = `select name
- from tags t
- inner join tags_torrents tt on t.id = tt.tag_id
- where tt.torrent_id = $1`
-
- sqlInsertTagTorrent = `insert into tags_torrents
- (tag_id, torrent_id) values ($1, $2)
- on conflict do nothing`
-
- sqlInsertTag = `insert into tags (name) values ($1)
- on conflict (name) do update set name = excluded.name returning id`
-
- sqlCheckSchema = `select exists (
+func (s *Store) migrate() error {
+ tx, err := s.Begin()
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+
+ var initialized bool
+ err = tx.QueryRow(`select exists (
select 1 from pg_tables
where schemaname = 'public'
and tablename = 'settings'
- )`
-
- sqlSchema = `create table if not exists torrents (
- id serial primary key,
- infohash bytea unique,
- size bigint,
- name text,
- created timestamp with time zone,
- updated timestamp with time zone,
- tsv tsvector
- );
- create index tsv_idx on torrents using gin(tsv);
- create table if not exists files (
- id serial not null primary key,
- torrent_id integer not null references torrents on delete cascade,
- path text,
- size bigint
- );
- create table if not exists tags (
- id serial primary key,
- name character varying(50) unique
- );
- create table if not exists tags_torrents (
- tag_id integer not null references tags (id) on delete cascade,
- torrent_id integer not null references torrents (id) on delete cascade,
- primary key (tag_id, torrent_id)
- );
- create table if not exists peers (
- id serial primary key,
- address character varying(50),
- created timestamp with time zone,
- updated timestamp with time zone
- );
- create table if not exists peers_torrents (
- peer_id integer not null references peers (id) on delete cascade,
- torrent_id integer not null references torrents (id) on delete cascade,
- primary key (peer_id, torrent_id)
- );
- create table if not exists settings (
- schema_version integer not null
- );
- insert into settings (schema_version) values (1);`
-)
+ )`).Scan(&initialized)
+ if err != nil {
+ return err
+ }
+
+ if !initialized {
+ _, err = tx.Exec("baseSchema")
+ }
+
+ // Start migrations
+ var currentVersion int
+ err = tx.QueryRow("select schema_version from settings").Scan(currentVersion)
+ if err != nil {
+ return err
+ }
+
+ switch currentVersion {
+ case 1:
+ default:
+ }
+
+ return nil
+}
+
+func (s *Store) prepareStatements() error {
+ if _, err := s.Prepare(
+ "removeTorrent",
+ `delete from torrents
+ where infohash = $1`,
+ ); err != nil {
+ return err
+ }
+ if _, err := s.Prepare(
+ "selectPendingInfohashes",
+ `with get_order as (
+ select t.id as torrent_id, min(pt.peer_id) as peer_id, count(pt.peer_id) as c
+ from torrents t
+ join peers_torrents pt on pt.torrent_id = t.id
+ where t.name is null
+ group by t.id
+ -- order by c desc
+ order by t.updated desc
+ limit $1
+ ) select p.address, t.infohash
+ from get_order go
+ join torrents t on t.id = go.torrent_id
+ join peers p on p.id = go.peer_id`,
+ ); err != nil {
+ return err
+ }
+
+ if _, err := s.Prepare(
+ "selectFiles",
+ `select * from files
+ where torrent_id = $1
+ order by path asc`,
+ ); err != nil {
+ return err
+ }
+
+ if _, err := s.Prepare(
+ "insertPeer",
+ `with save_peer as (
+ insert into peers
+ (address, created, updated) values ($1, now(), now())
+ returning id
+ ), save_torrent as (
+ insert into torrents (infohash, created, updated)
+ values ($2, now(), now())
+ on conflict (infohash) do update set
+ updated = now()
+ returning id
+ ) insert into peers_torrents
+ (peer_id, torrent_id)
+ select
+ sp.id, st.id
+ from save_peer sp, save_torrent st
+ on conflict do nothing`,
+ ); err != nil {
+ return err
+ }
+
+ if _, err := s.Prepare(
+ "getTorrent",
+ `select * from torrents where infohash = $1 limit 1`,
+ ); err != nil {
+ return err
+ }
+
+ if _, err := s.Prepare(
+ "insertFile",
+ `insert into files
+ (torrent_id, path, size)
+ values
+ ($1, $2, $3)`,
+ ); err != nil {
+ return err
+ }
+
+ if _, err := s.Prepare(
+ "selectTags",
+ `select name
+ from tags t
+ inner join tags_torrents tt on t.id = tt.tag_id
+ where tt.torrent_id = $1`,
+ ); err != nil {
+ return err
+ }
+
+ if _, err := s.Prepare(
+ "removePeer",
+ `delete from peers where address = $1`,
+ ); err != nil {
+ return err
+ }
+
+ if _, err := s.Prepare(
+ "insertTagTorrent",
+ `insert into tags_torrents
+ (tag_id, torrent_id) values ($1, $2)
+ on conflict do nothing`,
+ ); err != nil {
+ return err
+ }
+
+ if _, err := s.Prepare(
+ "insertTag",
+ `insert into tags (name) values ($1)
+ on conflict (name) do update set name = excluded.name returning id`,
+ ); err != nil {
+ return err
+ }
+
+ if _, err := s.Prepare(
+ "torrentsByTag",
+ `select t.id, t.infohash, t.name, t.size, t.created, t.updated
+ from torrents t
+ inner join tags_torrents tt on t.id = tt.torrent_id
+ inner join tags ta on tt.tag_id = ta.id
+ where ta.name = $1 group by t.id
+ order by updated asc
+ limit 50 offset $2`,
+ ); err != nil {
+ return err
+ }
+
+ if _, err := s.Prepare(
+ "searchTorrents",
+ `select t.id, t.infohash, t.name, t.size, t.updated
+ from torrents t
+ where t.tsv @@ plainto_tsquery($1)
+ order by ts_rank(tsv, plainto_tsquery($1)) desc, t.updated desc
+ limit 50 offset $2`,
+ ); err != nil {
+ return err
+ }
+
+ if _, err := s.Prepare(
+ "insertTorrent",
+ `insert into torrents (
+ name, infohash, size, created, updated
+ ) values (
+ $1, $2, $3, now(), now()
+ ) on conflict (infohash) do
+ update set
+ name = $1,
+ size = $3,
+ updated = now()
+ returning id`,
+ ); err != nil {
+ return err
+ }
+
+ if _, err := s.Prepare(
+ "updateFTSVectors",
+ `update torrents set
+ tsv = sub.tsv from (
+ select t.id,
+ setweight(to_tsvector(
+ translate(t.name, '._-', ' ')
+ ), 'A')
+ || setweight(to_tsvector(
+ translate(string_agg(coalesce(f.path, ''), ' '), './_-', ' ')
+ ), 'B') as tsv
+ from torrents t
+ left join files f on t.id = f.torrent_id
+ where t.id = $1
+ group by t.id
+ ) as sub
+ where sub.id = torrents.id`,
+ ); err != nil {
+ return err
+ }
+
+ if _, err := s.Prepare(
+ "baseSchema",
+ `create table if not exists torrents (
+ id serial primary key,
+ infohash bytea not null unique,
+ size bigint,
+ name text,
+ created timestamp with time zone,
+ updated timestamp with time zone,
+ tsv tsvector
+ );
+ create index tsv_idx on torrents using gin(tsv);
+ create table if not exists files (
+ id serial not null primary key,
+ torrent_id integer not null references torrents on delete cascade,
+ path text,
+ size bigint
+ );
+ create table if not exists tags (
+ id serial primary key,
+ name character varying(50) unique
+ );
+ create table if not exists tags_torrents (
+ tag_id integer not null references tags (id) on delete cascade,
+ torrent_id integer not null references torrents (id) on delete cascade,
+ primary key (tag_id, torrent_id)
+ );
+ create table if not exists peers (
+ id serial primary key,
+ address character varying(50) not null,
+ created timestamp with time zone,
+ updated timestamp with time zone
+ );
+ create table if not exists peers_torrents (
+ peer_id integer not null references peers (id) on delete cascade,
+ torrent_id integer not null references torrents (id) on delete cascade,
+ primary key (peer_id, torrent_id)
+ );
+ create table if not exists settings (
+ schema_version integer not null
+ );
+ insert into settings (schema_version) values (1);`,
+ ); err != nil {
+ return err
+ }
+ return nil
+}