aboutsummaryrefslogtreecommitdiff
path: root/dht
diff options
context:
space:
mode:
authorFelix Hanley <felix@userspace.com.au>2018-02-26 11:28:02 +0000
committerFelix Hanley <felix@userspace.com.au>2018-02-26 11:28:02 +0000
commit9c8cbbfa6ecad37533c99c94adc9e0c7e8aa88b8 (patch)
treeb0decacd30fc0e9b0136a7a06b0d04d0872cb890 /dht
parent19f63bb03bf2a83515fd47e6cf10a4db18a923d7 (diff)
downloaddhtsearch-9c8cbbfa6ecad37533c99c94adc9e0c7e8aa88b8.tar.gz
dhtsearch-9c8cbbfa6ecad37533c99c94adc9e0c7e8aa88b8.tar.bz2
Update references to shared structs
Diffstat (limited to 'dht')
-rw-r--r--dht/messages.go33
-rw-r--r--dht/node.go47
-rw-r--r--dht/options.go3
-rw-r--r--dht/remote_node.go4
-rw-r--r--dht/routing_table.go6
-rw-r--r--dht/routing_table_test.go6
6 files changed, 56 insertions, 43 deletions
diff --git a/dht/messages.go b/dht/messages.go
index 9305b6f..d972d65 100644
--- a/dht/messages.go
+++ b/dht/messages.go
@@ -5,6 +5,7 @@ import (
"net"
"github.com/felix/dhtsearch/krpc"
+ "github.com/felix/dhtsearch/models"
)
func (n *Node) onPingQuery(rn remoteNode, msg map[string]interface{}) error {
@@ -29,14 +30,14 @@ func (n *Node) onGetPeersQuery(rn remoteNode, msg map[string]interface{}) error
if err != nil {
return err
}
- th, err := InfohashFromString(torrent)
+ th, err := models.InfohashFromString(torrent)
if err != nil {
return err
}
- n.log.Debug("get_peers query", "source", rn, "torrent", th)
+ //n.log.Debug("get_peers query", "source", rn, "torrent", th)
token := torrent[:2]
- neighbour := generateNeighbour(n.id, *th)
+ neighbour := models.GenerateNeighbour(n.id, *th)
/*
nodes := n.rTable.get(8)
compactNS := []string{}
@@ -78,20 +79,24 @@ func (n *Node) onAnnouncePeerQuery(rn remoteNode, msg map[string]interface{}) er
return err
}
- n.log.Debug("announce_peer", "source", rn)
+ //n.log.Debug("announce_peer", "source", rn)
- if impliedPort, err := krpc.GetInt(a, "implied_port"); err == nil {
- if impliedPort != 0 {
+ host, port, err := net.SplitHostPort(rn.addr.String())
+ if err != nil {
+ return err
+ }
+ if port == "0" {
+ return fmt.Errorf("ignoring port 0")
+ }
+
+ newPort, err := krpc.GetInt(a, "port")
+ if err == nil {
+ if iPort, err := krpc.GetInt(a, "implied_port"); err == nil && iPort == 0 {
// Use the port in the message
- host, _, err := net.SplitHostPort(rn.addr.String())
+ addr, err := net.ResolveUDPAddr(n.family, fmt.Sprintf("%s:%d", host, newPort))
if err != nil {
return err
}
- newPort := a["port"]
- if newPort == 0 {
- return fmt.Errorf("ignoring port 0")
- }
- addr, err := net.ResolveUDPAddr(n.family, fmt.Sprintf("%s:%d", host, newPort))
rn = remoteNode{addr: addr, id: rn.id}
}
}
@@ -102,12 +107,12 @@ func (n *Node) onAnnouncePeerQuery(rn remoteNode, msg map[string]interface{}) er
if err != nil {
return err
}
- ih, err := InfohashFromString(ihStr)
+ ih, err := models.InfohashFromString(ihStr)
if err != nil {
n.log.Warn("invalid torrent", "infohash", ihStr)
}
- p := Peer{Addr: rn.addr, ID: rn.id, Infohash: *ih}
+ p := models.Peer{Addr: rn.addr, Infohash: *ih}
if n.OnAnnouncePeer != nil {
go n.OnAnnouncePeer(p)
}
diff --git a/dht/node.go b/dht/node.go
index f8e6113..64b3fd3 100644
--- a/dht/node.go
+++ b/dht/node.go
@@ -8,6 +8,7 @@ import (
"github.com/felix/dhtsearch/bencode"
"github.com/felix/dhtsearch/krpc"
+ "github.com/felix/dhtsearch/models"
"github.com/felix/logger"
"golang.org/x/time/rate"
)
@@ -24,7 +25,7 @@ var (
// Node joins the DHT network
type Node struct {
- id Infohash
+ id models.Infohash
family string
address string
port int
@@ -38,29 +39,29 @@ type Node struct {
//table routingTable
// OnAnnoucePeer is called for each peer that announces itself
- OnAnnouncePeer func(p Peer)
+ OnAnnouncePeer func(p models.Peer)
}
// NewNode creates a new DHT node
-func NewNode(opts ...Option) (n *Node, err error) {
- id := GenInfohash()
+func NewNode(opts ...Option) (*Node, error) {
+ var err error
+ id := models.GenInfohash()
- k, err := newRoutingTable(id, 2000)
- if err != nil {
- n.log.Error("failed to create routing table", "error", err)
- return nil, err
- }
-
- n = &Node{
+ n := &Node{
id: id,
family: "udp4",
port: 6881,
udpTimeout: 10,
- rTable: k,
limiter: rate.NewLimiter(rate.Limit(100000), 2000000),
log: logger.New(&logger.Options{Name: "dht"}),
}
+ n.rTable, err = newRoutingTable(id, 2000)
+ if err != nil {
+ n.log.Error("failed to create routing table", "error", err)
+ return nil, err
+ }
+
// Set variadic options passed
for _, option := range opts {
err = option(n)
@@ -144,7 +145,7 @@ func (n *Node) makeNeighbours() {
// Send to all nodes
nodes := n.rTable.get(0)
for _, rn := range nodes {
- n.findNode(rn, generateNeighbour(n.id, rn.id))
+ n.findNode(rn, models.GenerateNeighbour(n.id, rn.id))
}
n.rTable.flush()
}
@@ -186,8 +187,8 @@ func (n *Node) packetWriter() {
}
}
-func (n *Node) findNode(rn *remoteNode, id Infohash) {
- target := GenInfohash()
+func (n *Node) findNode(rn *remoteNode, id models.Infohash) {
+ target := models.GenInfohash()
n.sendQuery(rn, "find_node", map[string]interface{}{
"id": string(id),
"target": string(target),
@@ -196,7 +197,7 @@ func (n *Node) findNode(rn *remoteNode, id Infohash) {
// ping sends ping query to the chan.
func (n *Node) ping(rn *remoteNode) {
- id := generateNeighbour(n.id, rn.id)
+ id := models.GenerateNeighbour(n.id, rn.id)
n.sendQuery(rn, "ping", map[string]interface{}{
"id": string(id),
})
@@ -282,7 +283,7 @@ func (n *Node) handleRequest(addr net.Addr, m map[string]interface{}) error {
return err
}
- ih, err := InfohashFromString(id)
+ ih, err := models.InfohashFromString(id)
if err != nil {
return err
}
@@ -321,7 +322,7 @@ func (n *Node) handleResponse(addr net.Addr, m map[string]interface{}) error {
if err != nil {
return err
}
- ih, err := InfohashFromString(id)
+ ih, err := models.InfohashFromString(id)
if err != nil {
return err
}
@@ -372,9 +373,9 @@ func (n *Node) handleError(addr net.Addr, m map[string]interface{}) error {
// Process another node's response to a find_node query.
func (n *Node) processFindNodeResults(rn remoteNode, nodeList string) {
- nodeLength := 26
+ nodeLength := krpc.IPv4NodeAddrLen
if n.family == "udp6" {
- nodeLength = 38
+ nodeLength = krpc.IPv6NodeAddrLen
}
if len(nodeList)%nodeLength != 0 {
@@ -386,10 +387,10 @@ func (n *Node) processFindNodeResults(rn remoteNode, nodeList string) {
// We got a byte array in groups of 26 or 38
for i := 0; i < len(nodeList); i += nodeLength {
- id := nodeList[i : i+ihLength]
- addrStr := krpc.DecodeCompactNodeAddr(nodeList[i+ihLength : i+nodeLength])
+ id := nodeList[i : i+models.InfohashLength]
+ addrStr := krpc.DecodeCompactNodeAddr(nodeList[i+models.InfohashLength : i+nodeLength])
- ih, err := InfohashFromString(id)
+ ih, err := models.InfohashFromString(id)
if err != nil {
n.log.Warn("invalid infohash in node list")
continue
diff --git a/dht/options.go b/dht/options.go
index b7ded8a..12a64a6 100644
--- a/dht/options.go
+++ b/dht/options.go
@@ -1,12 +1,13 @@
package dht
import (
+ "github.com/felix/dhtsearch/models"
"github.com/felix/logger"
)
type Option func(*Node) error
-func SetOnAnnouncePeer(f func(Peer)) Option {
+func SetOnAnnouncePeer(f func(models.Peer)) Option {
return func(n *Node) error {
n.OnAnnouncePeer = f
return nil
diff --git a/dht/remote_node.go b/dht/remote_node.go
index 4bb9319..7dbf893 100644
--- a/dht/remote_node.go
+++ b/dht/remote_node.go
@@ -3,11 +3,13 @@ package dht
import (
"fmt"
"net"
+
+ "github.com/felix/dhtsearch/models"
)
type remoteNode struct {
addr net.Addr
- id Infohash
+ id models.Infohash
}
// String implements fmt.Stringer
diff --git a/dht/routing_table.go b/dht/routing_table.go
index b10574c..3c9f72c 100644
--- a/dht/routing_table.go
+++ b/dht/routing_table.go
@@ -3,6 +3,8 @@ package dht
import (
"container/heap"
"sync"
+
+ "github.com/felix/dhtsearch/models"
)
type rItem struct {
@@ -14,14 +16,14 @@ type rItem struct {
type priorityQueue []*rItem
type routingTable struct {
- id Infohash
+ id models.Infohash
max int
items priorityQueue
addresses map[string]*remoteNode
sync.Mutex
}
-func newRoutingTable(id Infohash, max int) (*routingTable, error) {
+func newRoutingTable(id models.Infohash, max int) (*routingTable, error) {
k := &routingTable{
id: id,
max: max,
diff --git a/dht/routing_table_test.go b/dht/routing_table_test.go
index 1eeeca3..e251791 100644
--- a/dht/routing_table_test.go
+++ b/dht/routing_table_test.go
@@ -4,6 +4,8 @@ import (
"fmt"
"net"
"testing"
+
+ "github.com/felix/dhtsearch/models"
)
func TestPriorityQueue(t *testing.T) {
@@ -16,7 +18,7 @@ func TestPriorityQueue(t *testing.T) {
"d1c5676ae7ac98e8b19f63565905105e3c4c37a3", // distance of 159
}
- ih, err := InfohashFromString(id)
+ ih, err := models.InfohashFromString(id)
if err != nil {
t.Errorf("failed to create infohash: %s\n", err)
}
@@ -27,7 +29,7 @@ func TestPriorityQueue(t *testing.T) {
}
for i, idt := range tests {
- iht, err := InfohashFromString(idt)
+ iht, err := models.InfohashFromString(idt)
if err != nil {
t.Errorf("failed to create infohash: %s\n", err)
}