diff options
Diffstat (limited to 'dht/node.go')
| -rw-r--r-- | dht/node.go | 67 |
1 files changed, 37 insertions, 30 deletions
diff --git a/dht/node.go b/dht/node.go index 169f219..7905964 100644 --- a/dht/node.go +++ b/dht/node.go @@ -1,10 +1,9 @@ package dht import ( - //"fmt" "context" + "fmt" "net" - "strconv" "time" "github.com/felix/dhtsearch/bencode" @@ -14,6 +13,7 @@ import ( var ( routers = []string{ + "dht.libtorrent.org:25401", "router.bittorrent.com:6881", "dht.transmissionbt.com:6881", "router.utorrent.com:6881", @@ -24,6 +24,7 @@ var ( // Node joins the DHT network type Node struct { id Infohash + family string address string port int conn net.PacketConn @@ -32,6 +33,7 @@ type Node struct { udpTimeout int packetsOut chan packet log logger.Logger + limiter *rate.Limiter //table routingTable // OnAnnoucePeer is called for each peer that announces itself @@ -50,10 +52,11 @@ func NewNode(opts ...Option) (n *Node, err error) { n = &Node{ id: id, - address: "0.0.0.0", + family: "udp4", port: 6881, udpTimeout: 10, rTable: k, + limiter: rate.NewLimiter(rate.Limit(100000), 2000000), log: logger.New(&logger.Options{Name: "dht"}), } @@ -65,12 +68,24 @@ func NewNode(opts ...Option) (n *Node, err error) { } } - n.conn, err = net.ListenPacket("udp", n.address+":"+strconv.Itoa(n.port)) + if n.family != "udp4" { + n.log.Debug("trying udp6 server") + n.conn, err = net.ListenPacket("udp6", fmt.Sprintf("[%s]:%d", net.IPv6zero.String(), n.port)) + if err == nil { + n.family = "udp6" + } + } + if n.conn == nil { + n.conn, err = net.ListenPacket("udp4", fmt.Sprintf("%s:%d", net.IPv4zero.String(), n.port)) + if err == nil { + n.family = "udp4" + } + } if err != nil { n.log.Error("failed to listen", "error", err) return nil, err } - n.log.Info("listening", "id", n.id, "network", n.conn.LocalAddr().Network(), "address", n.conn.LocalAddr().String()) + n.log.Info("listening", "id", n.id, "network", n.family, "address", n.conn.LocalAddr().String()) return n, nil } @@ -136,13 +151,13 @@ func (n *Node) makeNeighbours() { } } -func (n Node) bootstrap() { +func (n *Node) bootstrap() { n.log.Debug("bootstrapping") for _, s := range routers { - addr, err := net.ResolveUDPAddr(n.conn.LocalAddr().Network(), s) + addr, err := net.ResolveUDPAddr(n.family, s) if err != nil { n.log.Error("failed to parse bootstrap address", "error", err) - return + continue } rn := &remoteNode{address: addr} n.findNode(rn, n.id) @@ -150,24 +165,26 @@ func (n Node) bootstrap() { } func (n *Node) packetWriter() { - l := rate.NewLimiter(rate.Limit(500), 100) - for p := range n.packetsOut { + if p.raddr.String() == n.conn.LocalAddr().String() { + continue + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - if err := l.Wait(ctx); err != nil { + if err := n.limiter.WaitN(ctx, len(p.data)); err != nil { n.log.Warn("rate limited", "error", err) continue } _, err := n.conn.WriteTo(p.data, p.raddr) if err != nil { // TODO remove from routing or add to blacklist? + // TODO reduce limit n.log.Warn("failed to write packet", "error", err) } } } -func (n Node) findNode(rn *remoteNode, id Infohash) { +func (n *Node) findNode(rn *remoteNode, id Infohash) { target := randomInfoHash() n.sendQuery(rn, "find_node", map[string]interface{}{ "id": string(id), @@ -183,7 +200,7 @@ func (n *Node) ping(rn *remoteNode) { }) } -func (n Node) sendQuery(rn *remoteNode, qType string, a map[string]interface{}) error { +func (n *Node) sendQuery(rn *remoteNode, qType string, a map[string]interface{}) error { // Stop if sending to self if rn.id.Equal(n.id) { return nil @@ -328,7 +345,7 @@ func (n *Node) handleResponse(addr net.Addr, m map[string]interface{}) error { if err == nil { n.log.Debug("get_peers response", "source", rn) for _, v := range values { - addr := compactNodeInfoToString(v.(string)) + addr := decodeCompactNodeAddr(v.(string)) n.log.Debug("unhandled get_peer request", "addres", addr) // TODO new peer needs to be matched to previous get_peers request @@ -359,19 +376,9 @@ func (n *Node) handleError(addr net.Addr, m map[string]interface{}) bool { // Process another node's response to a find_node query. func (n *Node) processFindNodeResults(rn remoteNode, nodeList string) { nodeLength := 26 - /* - if d.config.proto == "udp6" { - nodeList = m.R.Nodes6 - nodeLength = 38 - } else { - nodeList = m.R.Nodes - } - - // Not much to do - if nodeList == "" { - return - } - */ + if n.family == "udp6" { + nodeLength = 38 + } if len(nodeList)%nodeLength != 0 { n.log.Error("node list is wrong length", "length", len(nodeList)) @@ -383,7 +390,7 @@ func (n *Node) processFindNodeResults(rn remoteNode, nodeList string) { // We got a byte array in groups of 26 or 38 for i := 0; i < len(nodeList); i += nodeLength { id := nodeList[i : i+ihLength] - addrStr := compactNodeInfoToString(nodeList[i+ihLength : i+nodeLength]) + addrStr := decodeCompactNodeAddr(nodeList[i+ihLength : i+nodeLength]) ih, err := InfohashFromString(id) if err != nil { @@ -391,9 +398,9 @@ func (n *Node) processFindNodeResults(rn remoteNode, nodeList string) { continue } - addr, err := net.ResolveUDPAddr("udp", addrStr) + addr, err := net.ResolveUDPAddr(n.family, addrStr) if err != nil || addr.Port == 0 { - n.log.Warn("unable to resolve", "address", addrStr, "error", err) + //n.log.Warn("unable to resolve", "address", addrStr, "error", err) continue } |
