diff options
| author | Felix Hanley <felix@userspace.com.au> | 2018-03-20 11:37:09 +0000 |
|---|---|---|
| committer | Felix Hanley <felix@userspace.com.au> | 2018-03-20 11:37:09 +0000 |
| commit | e2e746843dca7874d287420336b3ec4830203ff5 (patch) | |
| tree | c806afa152d33f1124ea03e943db60ead0497b9e | |
| parent | ef4aec50a8f6ace8076d4bfeef0ba64cf8598048 (diff) | |
| download | dhtsearch-e2e746843dca7874d287420336b3ec4830203ff5.tar.gz dhtsearch-e2e746843dca7874d287420336b3ec4830203ff5.tar.bz2 | |
Start working on SQL to relieve load
| -rw-r--r-- | bt/worker.go | 2 | ||||
| -rw-r--r-- | cmd/main.go | 29 | ||||
| -rw-r--r-- | db/pgsql.go | 28 | ||||
| -rw-r--r-- | dht/node.go | 10 | ||||
| -rw-r--r-- | dht/options.go | 13 | ||||
| -rw-r--r-- | models/peer.go | 4 | ||||
| -rw-r--r-- | models/storage.go | 3 |
7 files changed, 52 insertions, 37 deletions
diff --git a/bt/worker.go b/bt/worker.go index 95669a4..1ae64a9 100644 --- a/bt/worker.go +++ b/bt/worker.go @@ -85,7 +85,7 @@ func (bt *Worker) Run() error { bt.log.Debug("worker got work", "peer", p) md, err := bt.fetchMetadata(p) if err != nil { - //bt.log.Warn("failed to fetch metadata", "error", err) + bt.log.Debug("failed to fetch metadata", "error", err) if bt.OnBadPeer != nil { bt.OnBadPeer(p) } diff --git a/cmd/main.go b/cmd/main.go index 62a5826..95e6315 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -14,6 +14,7 @@ import ( "github.com/felix/dhtsearch/dht" "github.com/felix/dhtsearch/models" "github.com/felix/logger" + "github.com/hashicorp/golang-lru" ) var ( @@ -43,7 +44,7 @@ var ( var ( dsn string ihBlacklist map[string]bool - peerBlacklist map[string]bool + peerBlacklist *lru.ARCCache ) func main() { @@ -88,7 +89,11 @@ func main() { createTagRegexps() ihBlacklist = make(map[string]bool) - peerBlacklist = make(map[string]bool) + peerBlacklist, err = lru.NewARC(1000) + if err != nil { + log.Error("failed to create blacklist", "error", err) + os.Exit(1) + } // TODO read in existing blacklist // TODO populate bloom filter @@ -115,43 +120,45 @@ func startDHTNodes(s models.PeerStore) { dht.SetLogger(log.Named("dht")), dht.SetPort(port+i), dht.SetIPv6(ipv6), + dht.SetBlacklist(peerBlacklist), dht.SetOnAnnouncePeer(func(p models.Peer) { if black := ihBlacklist[p.Infohash.String()]; black { log.Debug("ignoring blacklisted infohash", "peer", p) return } - if black := peerBlacklist[p.Addr.String()]; black { - log.Debug("ignoring blacklisted peer", "peer", p) - return - } log.Debug("peer announce", "peer", p) err := s.SavePeer(&p) if err != nil { log.Error("failed to save peer", "error", err) } }), + dht.SetOnBadPeer(func(p models.Peer) { + err := s.RemovePeer(&p) + if err != nil { + log.Error("failed to remove peer", "error", err) + } + }), ) if err != nil { log.Error("failed to create node", "error", err) - //return nodes, err + continue } go dht.Run() nodes[i] = dht } - //return nodes, err } func processPendingPeers(s models.InfohashStore) { log.Debug("processing pending peers") for { - peers, err := s.PendingInfohashes(10) + peers, err := s.PendingInfohashes(100) if err != nil { - log.Debug("failed to get pending peer", "error", err) + log.Warn("failed to get pending peer", "error", err) time.Sleep(time.Second * 1) continue } for _, p := range peers { - //log.Debug("pending peer retrieved", "peer", *p) + log.Debug("pending peer retrieved", "peer", *p) select { case w := <-pool: //log.Debug("assigning peer to bt worker") diff --git a/db/pgsql.go b/db/pgsql.go index 990605c..e263d8f 100644 --- a/db/pgsql.go +++ b/db/pgsql.go @@ -276,23 +276,19 @@ const ( ) as sub where sub.id = torrents.id` - sqlSelectPendingInfohashes = `with get_pending as ( - select p.id - from peers p - join peers_torrents pt on p.id = pt.peer_id - join torrents t on pt.torrent_id = t.id + sqlSelectPendingInfohashes = `with get_order as ( + select t.id as torrent_id, min(pt.peer_id) as peer_id, count(pt.peer_id) as c + from torrents t + join peers_torrents pt on pt.torrent_id = t.id where t.name is null - order by p.updated asc - limit $1 for update - ), stamp_peer as ( - update peers set updated = now() - where id in (select id from get_pending) - ) select - p.address, t.infohash - from peers p - join peers_torrents pt on p.id = pt.peer_id - join torrents t on pt.torrent_id = t.id - where p.id in (select id from get_pending)` + group by t.id + -- order by c desc + order by t.updated desc + limit $1 + ) select p.address, t.infohash + from get_order go + join torrents t on t.id = go.torrent_id + join peers p on p.id = go.peer_id` sqlSearchTorrents = `select t.id, t.infohash, t.name, t.size, t.updated diff --git a/dht/node.go b/dht/node.go index 829f244..b0dd9ec 100644 --- a/dht/node.go +++ b/dht/node.go @@ -40,7 +40,9 @@ type Node struct { blacklist *lru.ARCCache // OnAnnoucePeer is called for each peer that announces itself - OnAnnouncePeer func(p models.Peer) + OnAnnouncePeer func(models.Peer) + // OnBadPeer is called for each bad peer + OnBadPeer func(models.Peer) } // NewNode creates a new DHT node @@ -191,6 +193,10 @@ func (n *Node) packetWriter() { n.blacklist.Add(p.raddr.String(), true) // TODO reduce limit n.log.Warn("failed to write packet", "error", err) + if n.OnBadPeer != nil { + peer := models.Peer{Addr: p.raddr} + go n.OnBadPeer(peer) + } } } } @@ -245,7 +251,7 @@ func (n *Node) processPacket(p packet) error { } if _, black := n.blacklist.Get(p.raddr.String()); black { - return fmt.Errorf("blacklisted", "address", p.raddr.String()) + return fmt.Errorf("blacklisted: %s", p.raddr.String()) } switch y { diff --git a/dht/options.go b/dht/options.go index 094d8f7..86d7af7 100644 --- a/dht/options.go +++ b/dht/options.go @@ -15,6 +15,13 @@ func SetOnAnnouncePeer(f func(models.Peer)) Option { } } +func SetOnBadPeer(f func(models.Peer)) Option { + return func(n *Node) error { + n.OnBadPeer = f + return nil + } +} + // SetAddress sets the IP address to listen on func SetAddress(ip string) Option { return func(n *Node) error { @@ -57,10 +64,10 @@ func SetLogger(l logger.Logger) Option { } } -// SetBlacklistSize sets the size of the node blacklist -func SetBlacklistSize(s int) Option { +// SetBlacklist sets the size of the node blacklist +func SetBlacklist(bl *lru.ARCCache) Option { return func(n *Node) (err error) { - n.blacklist, err = lru.NewARC(s) + n.blacklist = bl return err } } diff --git a/models/peer.go b/models/peer.go index 220d979..2810fd4 100644 --- a/models/peer.go +++ b/models/peer.go @@ -10,8 +10,8 @@ import ( type Peer struct { Addr net.Addr `db:"address"` Infohash Infohash `db:"infohash"` - Updated time.Time `json:"updated"` - Created time.Time `json:"created"` + Created time.Time `db:"created" json:"created"` + Updated time.Time `db:"updated" json:"updated"` } // String implements fmt.Stringer diff --git a/models/storage.go b/models/storage.go index c8a8344..92806ba 100644 --- a/models/storage.go +++ b/models/storage.go @@ -14,12 +14,11 @@ type torrentSearcher interface { type PeerStore interface { SavePeer(*Peer) error + RemovePeer(*Peer) error } type TorrentStore interface { SaveTorrent(*Torrent) error - // TODO - RemovePeer(*Peer) error } type InfohashStore interface { |
