aboutsummaryrefslogtreecommitdiff
path: root/dht/node.go
diff options
context:
space:
mode:
Diffstat (limited to 'dht/node.go')
-rw-r--r--dht/node.go67
1 files changed, 37 insertions, 30 deletions
diff --git a/dht/node.go b/dht/node.go
index 169f219..7905964 100644
--- a/dht/node.go
+++ b/dht/node.go
@@ -1,10 +1,9 @@
package dht
import (
- //"fmt"
"context"
+ "fmt"
"net"
- "strconv"
"time"
"github.com/felix/dhtsearch/bencode"
@@ -14,6 +13,7 @@ import (
var (
routers = []string{
+ "dht.libtorrent.org:25401",
"router.bittorrent.com:6881",
"dht.transmissionbt.com:6881",
"router.utorrent.com:6881",
@@ -24,6 +24,7 @@ var (
// Node joins the DHT network
type Node struct {
id Infohash
+ family string
address string
port int
conn net.PacketConn
@@ -32,6 +33,7 @@ type Node struct {
udpTimeout int
packetsOut chan packet
log logger.Logger
+ limiter *rate.Limiter
//table routingTable
// OnAnnoucePeer is called for each peer that announces itself
@@ -50,10 +52,11 @@ func NewNode(opts ...Option) (n *Node, err error) {
n = &Node{
id: id,
- address: "0.0.0.0",
+ family: "udp4",
port: 6881,
udpTimeout: 10,
rTable: k,
+ limiter: rate.NewLimiter(rate.Limit(100000), 2000000),
log: logger.New(&logger.Options{Name: "dht"}),
}
@@ -65,12 +68,24 @@ func NewNode(opts ...Option) (n *Node, err error) {
}
}
- n.conn, err = net.ListenPacket("udp", n.address+":"+strconv.Itoa(n.port))
+ if n.family != "udp4" {
+ n.log.Debug("trying udp6 server")
+ n.conn, err = net.ListenPacket("udp6", fmt.Sprintf("[%s]:%d", net.IPv6zero.String(), n.port))
+ if err == nil {
+ n.family = "udp6"
+ }
+ }
+ if n.conn == nil {
+ n.conn, err = net.ListenPacket("udp4", fmt.Sprintf("%s:%d", net.IPv4zero.String(), n.port))
+ if err == nil {
+ n.family = "udp4"
+ }
+ }
if err != nil {
n.log.Error("failed to listen", "error", err)
return nil, err
}
- n.log.Info("listening", "id", n.id, "network", n.conn.LocalAddr().Network(), "address", n.conn.LocalAddr().String())
+ n.log.Info("listening", "id", n.id, "network", n.family, "address", n.conn.LocalAddr().String())
return n, nil
}
@@ -136,13 +151,13 @@ func (n *Node) makeNeighbours() {
}
}
-func (n Node) bootstrap() {
+func (n *Node) bootstrap() {
n.log.Debug("bootstrapping")
for _, s := range routers {
- addr, err := net.ResolveUDPAddr(n.conn.LocalAddr().Network(), s)
+ addr, err := net.ResolveUDPAddr(n.family, s)
if err != nil {
n.log.Error("failed to parse bootstrap address", "error", err)
- return
+ continue
}
rn := &remoteNode{address: addr}
n.findNode(rn, n.id)
@@ -150,24 +165,26 @@ func (n Node) bootstrap() {
}
func (n *Node) packetWriter() {
- l := rate.NewLimiter(rate.Limit(500), 100)
-
for p := range n.packetsOut {
+ if p.raddr.String() == n.conn.LocalAddr().String() {
+ continue
+ }
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- if err := l.Wait(ctx); err != nil {
+ if err := n.limiter.WaitN(ctx, len(p.data)); err != nil {
n.log.Warn("rate limited", "error", err)
continue
}
_, err := n.conn.WriteTo(p.data, p.raddr)
if err != nil {
// TODO remove from routing or add to blacklist?
+ // TODO reduce limit
n.log.Warn("failed to write packet", "error", err)
}
}
}
-func (n Node) findNode(rn *remoteNode, id Infohash) {
+func (n *Node) findNode(rn *remoteNode, id Infohash) {
target := randomInfoHash()
n.sendQuery(rn, "find_node", map[string]interface{}{
"id": string(id),
@@ -183,7 +200,7 @@ func (n *Node) ping(rn *remoteNode) {
})
}
-func (n Node) sendQuery(rn *remoteNode, qType string, a map[string]interface{}) error {
+func (n *Node) sendQuery(rn *remoteNode, qType string, a map[string]interface{}) error {
// Stop if sending to self
if rn.id.Equal(n.id) {
return nil
@@ -328,7 +345,7 @@ func (n *Node) handleResponse(addr net.Addr, m map[string]interface{}) error {
if err == nil {
n.log.Debug("get_peers response", "source", rn)
for _, v := range values {
- addr := compactNodeInfoToString(v.(string))
+ addr := decodeCompactNodeAddr(v.(string))
n.log.Debug("unhandled get_peer request", "addres", addr)
// TODO new peer needs to be matched to previous get_peers request
@@ -359,19 +376,9 @@ func (n *Node) handleError(addr net.Addr, m map[string]interface{}) bool {
// Process another node's response to a find_node query.
func (n *Node) processFindNodeResults(rn remoteNode, nodeList string) {
nodeLength := 26
- /*
- if d.config.proto == "udp6" {
- nodeList = m.R.Nodes6
- nodeLength = 38
- } else {
- nodeList = m.R.Nodes
- }
-
- // Not much to do
- if nodeList == "" {
- return
- }
- */
+ if n.family == "udp6" {
+ nodeLength = 38
+ }
if len(nodeList)%nodeLength != 0 {
n.log.Error("node list is wrong length", "length", len(nodeList))
@@ -383,7 +390,7 @@ func (n *Node) processFindNodeResults(rn remoteNode, nodeList string) {
// We got a byte array in groups of 26 or 38
for i := 0; i < len(nodeList); i += nodeLength {
id := nodeList[i : i+ihLength]
- addrStr := compactNodeInfoToString(nodeList[i+ihLength : i+nodeLength])
+ addrStr := decodeCompactNodeAddr(nodeList[i+ihLength : i+nodeLength])
ih, err := InfohashFromString(id)
if err != nil {
@@ -391,9 +398,9 @@ func (n *Node) processFindNodeResults(rn remoteNode, nodeList string) {
continue
}
- addr, err := net.ResolveUDPAddr("udp", addrStr)
+ addr, err := net.ResolveUDPAddr(n.family, addrStr)
if err != nil || addr.Port == 0 {
- n.log.Warn("unable to resolve", "address", addrStr, "error", err)
+ //n.log.Warn("unable to resolve", "address", addrStr, "error", err)
continue
}