aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFelix Hanley <felix@userspace.com.au>2018-03-20 11:37:09 +0000
committerFelix Hanley <felix@userspace.com.au>2018-03-20 11:37:09 +0000
commite2e746843dca7874d287420336b3ec4830203ff5 (patch)
treec806afa152d33f1124ea03e943db60ead0497b9e
parentef4aec50a8f6ace8076d4bfeef0ba64cf8598048 (diff)
downloaddhtsearch-e2e746843dca7874d287420336b3ec4830203ff5.tar.gz
dhtsearch-e2e746843dca7874d287420336b3ec4830203ff5.tar.bz2
Start working on SQL to relieve load
-rw-r--r--bt/worker.go2
-rw-r--r--cmd/main.go29
-rw-r--r--db/pgsql.go28
-rw-r--r--dht/node.go10
-rw-r--r--dht/options.go13
-rw-r--r--models/peer.go4
-rw-r--r--models/storage.go3
7 files changed, 52 insertions, 37 deletions
diff --git a/bt/worker.go b/bt/worker.go
index 95669a4..1ae64a9 100644
--- a/bt/worker.go
+++ b/bt/worker.go
@@ -85,7 +85,7 @@ func (bt *Worker) Run() error {
bt.log.Debug("worker got work", "peer", p)
md, err := bt.fetchMetadata(p)
if err != nil {
- //bt.log.Warn("failed to fetch metadata", "error", err)
+ bt.log.Debug("failed to fetch metadata", "error", err)
if bt.OnBadPeer != nil {
bt.OnBadPeer(p)
}
diff --git a/cmd/main.go b/cmd/main.go
index 62a5826..95e6315 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -14,6 +14,7 @@ import (
"github.com/felix/dhtsearch/dht"
"github.com/felix/dhtsearch/models"
"github.com/felix/logger"
+ "github.com/hashicorp/golang-lru"
)
var (
@@ -43,7 +44,7 @@ var (
var (
dsn string
ihBlacklist map[string]bool
- peerBlacklist map[string]bool
+ peerBlacklist *lru.ARCCache
)
func main() {
@@ -88,7 +89,11 @@ func main() {
createTagRegexps()
ihBlacklist = make(map[string]bool)
- peerBlacklist = make(map[string]bool)
+ peerBlacklist, err = lru.NewARC(1000)
+ if err != nil {
+ log.Error("failed to create blacklist", "error", err)
+ os.Exit(1)
+ }
// TODO read in existing blacklist
// TODO populate bloom filter
@@ -115,43 +120,45 @@ func startDHTNodes(s models.PeerStore) {
dht.SetLogger(log.Named("dht")),
dht.SetPort(port+i),
dht.SetIPv6(ipv6),
+ dht.SetBlacklist(peerBlacklist),
dht.SetOnAnnouncePeer(func(p models.Peer) {
if black := ihBlacklist[p.Infohash.String()]; black {
log.Debug("ignoring blacklisted infohash", "peer", p)
return
}
- if black := peerBlacklist[p.Addr.String()]; black {
- log.Debug("ignoring blacklisted peer", "peer", p)
- return
- }
log.Debug("peer announce", "peer", p)
err := s.SavePeer(&p)
if err != nil {
log.Error("failed to save peer", "error", err)
}
}),
+ dht.SetOnBadPeer(func(p models.Peer) {
+ err := s.RemovePeer(&p)
+ if err != nil {
+ log.Error("failed to remove peer", "error", err)
+ }
+ }),
)
if err != nil {
log.Error("failed to create node", "error", err)
- //return nodes, err
+ continue
}
go dht.Run()
nodes[i] = dht
}
- //return nodes, err
}
func processPendingPeers(s models.InfohashStore) {
log.Debug("processing pending peers")
for {
- peers, err := s.PendingInfohashes(10)
+ peers, err := s.PendingInfohashes(100)
if err != nil {
- log.Debug("failed to get pending peer", "error", err)
+ log.Warn("failed to get pending peer", "error", err)
time.Sleep(time.Second * 1)
continue
}
for _, p := range peers {
- //log.Debug("pending peer retrieved", "peer", *p)
+ log.Debug("pending peer retrieved", "peer", *p)
select {
case w := <-pool:
//log.Debug("assigning peer to bt worker")
diff --git a/db/pgsql.go b/db/pgsql.go
index 990605c..e263d8f 100644
--- a/db/pgsql.go
+++ b/db/pgsql.go
@@ -276,23 +276,19 @@ const (
) as sub
where sub.id = torrents.id`
- sqlSelectPendingInfohashes = `with get_pending as (
- select p.id
- from peers p
- join peers_torrents pt on p.id = pt.peer_id
- join torrents t on pt.torrent_id = t.id
+ sqlSelectPendingInfohashes = `with get_order as (
+ select t.id as torrent_id, min(pt.peer_id) as peer_id, count(pt.peer_id) as c
+ from torrents t
+ join peers_torrents pt on pt.torrent_id = t.id
where t.name is null
- order by p.updated asc
- limit $1 for update
- ), stamp_peer as (
- update peers set updated = now()
- where id in (select id from get_pending)
- ) select
- p.address, t.infohash
- from peers p
- join peers_torrents pt on p.id = pt.peer_id
- join torrents t on pt.torrent_id = t.id
- where p.id in (select id from get_pending)`
+ group by t.id
+ -- order by c desc
+ order by t.updated desc
+ limit $1
+ ) select p.address, t.infohash
+ from get_order go
+ join torrents t on t.id = go.torrent_id
+ join peers p on p.id = go.peer_id`
sqlSearchTorrents = `select
t.id, t.infohash, t.name, t.size, t.updated
diff --git a/dht/node.go b/dht/node.go
index 829f244..b0dd9ec 100644
--- a/dht/node.go
+++ b/dht/node.go
@@ -40,7 +40,9 @@ type Node struct {
blacklist *lru.ARCCache
// OnAnnoucePeer is called for each peer that announces itself
- OnAnnouncePeer func(p models.Peer)
+ OnAnnouncePeer func(models.Peer)
+ // OnBadPeer is called for each bad peer
+ OnBadPeer func(models.Peer)
}
// NewNode creates a new DHT node
@@ -191,6 +193,10 @@ func (n *Node) packetWriter() {
n.blacklist.Add(p.raddr.String(), true)
// TODO reduce limit
n.log.Warn("failed to write packet", "error", err)
+ if n.OnBadPeer != nil {
+ peer := models.Peer{Addr: p.raddr}
+ go n.OnBadPeer(peer)
+ }
}
}
}
@@ -245,7 +251,7 @@ func (n *Node) processPacket(p packet) error {
}
if _, black := n.blacklist.Get(p.raddr.String()); black {
- return fmt.Errorf("blacklisted", "address", p.raddr.String())
+ return fmt.Errorf("blacklisted: %s", p.raddr.String())
}
switch y {
diff --git a/dht/options.go b/dht/options.go
index 094d8f7..86d7af7 100644
--- a/dht/options.go
+++ b/dht/options.go
@@ -15,6 +15,13 @@ func SetOnAnnouncePeer(f func(models.Peer)) Option {
}
}
+func SetOnBadPeer(f func(models.Peer)) Option {
+ return func(n *Node) error {
+ n.OnBadPeer = f
+ return nil
+ }
+}
+
// SetAddress sets the IP address to listen on
func SetAddress(ip string) Option {
return func(n *Node) error {
@@ -57,10 +64,10 @@ func SetLogger(l logger.Logger) Option {
}
}
-// SetBlacklistSize sets the size of the node blacklist
-func SetBlacklistSize(s int) Option {
+// SetBlacklist sets the size of the node blacklist
+func SetBlacklist(bl *lru.ARCCache) Option {
return func(n *Node) (err error) {
- n.blacklist, err = lru.NewARC(s)
+ n.blacklist = bl
return err
}
}
diff --git a/models/peer.go b/models/peer.go
index 220d979..2810fd4 100644
--- a/models/peer.go
+++ b/models/peer.go
@@ -10,8 +10,8 @@ import (
type Peer struct {
Addr net.Addr `db:"address"`
Infohash Infohash `db:"infohash"`
- Updated time.Time `json:"updated"`
- Created time.Time `json:"created"`
+ Created time.Time `db:"created" json:"created"`
+ Updated time.Time `db:"updated" json:"updated"`
}
// String implements fmt.Stringer
diff --git a/models/storage.go b/models/storage.go
index c8a8344..92806ba 100644
--- a/models/storage.go
+++ b/models/storage.go
@@ -14,12 +14,11 @@ type torrentSearcher interface {
type PeerStore interface {
SavePeer(*Peer) error
+ RemovePeer(*Peer) error
}
type TorrentStore interface {
SaveTorrent(*Torrent) error
- // TODO
- RemovePeer(*Peer) error
}
type InfohashStore interface {