aboutsummaryrefslogtreecommitdiff
path: root/dht
diff options
context:
space:
mode:
Diffstat (limited to 'dht')
-rw-r--r--dht/infohash.go2
-rw-r--r--dht/krpc.go12
-rw-r--r--dht/messages.go110
-rw-r--r--dht/node.go38
4 files changed, 93 insertions, 69 deletions
diff --git a/dht/infohash.go b/dht/infohash.go
index cd12446..6d4596d 100644
--- a/dht/infohash.go
+++ b/dht/infohash.go
@@ -78,7 +78,7 @@ func (ih Infohash) Distance(other Infohash) int {
}
func generateNeighbour(first, second Infohash) Infohash {
- s := append(first[:10], second[10:]...)
+ s := append(second[:10], first[10:]...)
return Infohash(s)
}
diff --git a/dht/krpc.go b/dht/krpc.go
index bf66e20..2a7c103 100644
--- a/dht/krpc.go
+++ b/dht/krpc.go
@@ -49,6 +49,18 @@ func getStringKey(data map[string]interface{}, key string) (string, error) {
return out, nil
}
+func getIntKey(data map[string]interface{}, key string) (int, error) {
+ val, ok := data[key]
+ if !ok {
+ return 0, fmt.Errorf("krpc: missing key %s", key)
+ }
+ out, ok := val.(int)
+ if !ok {
+ return 0, fmt.Errorf("krpc: key type mismatch")
+ }
+ return out, nil
+}
+
func getMapKey(data map[string]interface{}, key string) (map[string]interface{}, error) {
val, ok := data[key]
if !ok {
diff --git a/dht/messages.go b/dht/messages.go
index be7d6b6..023e27d 100644
--- a/dht/messages.go
+++ b/dht/messages.go
@@ -3,50 +3,58 @@ package dht
import (
"fmt"
"net"
- "strings"
+ //"strings"
)
-func (n *Node) onPingQuery(rn remoteNode, msg map[string]interface{}) {
- t := msg["t"].(string)
- //n.log.Debug("ping", "source", rn)
+func (n *Node) onPingQuery(rn remoteNode, msg map[string]interface{}) error {
+ t, err := getStringKey(msg, "t")
+ if err != nil {
+ return err
+ }
n.queueMsg(rn, makeResponse(t, map[string]interface{}{
"id": string(n.id),
}))
+ return nil
}
-func (n *Node) onGetPeersQuery(rn remoteNode, msg map[string]interface{}) {
- a := msg["a"].(map[string]interface{})
- if err := checkKey(a, "info_hash", "string"); err != nil {
- //n.queueMsg(addr, makeError(t, protocolError, err.Error()))
- return
+func (n *Node) onGetPeersQuery(rn remoteNode, msg map[string]interface{}) error {
+ a, err := getMapKey(msg, "a")
+ if err != nil {
+ return err
}
// This is the ih of the torrent
- th, err := InfohashFromString(a["info_hash"].(string))
+ torrent, err := getStringKey(a, "info_hash")
if err != nil {
- n.log.Warn("invalid torrent", "infohash", a["info_hash"])
+ return err
+ }
+ th, err := InfohashFromString(torrent)
+ if err != nil {
+ return err
}
n.log.Debug("get_peers query", "source", rn, "torrent", th)
- token := []byte(*th)[:2]
-
- id := generateNeighbour(n.id, *th)
- nodes := n.rTable.get(8)
- compactNS := []string{}
- for _, rn := range nodes {
- ns := encodeCompactNodeAddr(rn.address.String())
- if ns == "" {
- n.log.Warn("failed to compact node", "address", rn.address.String())
- continue
+ token := torrent[:2]
+ neighbour := generateNeighbour(n.id, *th)
+ /*
+ nodes := n.rTable.get(8)
+ compactNS := []string{}
+ for _, rn := range nodes {
+ ns := encodeCompactNodeAddr(rn.address.String())
+ if ns == "" {
+ n.log.Warn("failed to compact node", "address", rn.address.String())
+ continue
+ }
+ compactNS = append(compactNS, ns)
}
- compactNS = append(compactNS, ns)
- }
+ */
t := msg["t"].(string)
n.queueMsg(rn, makeResponse(t, map[string]interface{}{
- "id": string(id),
+ "id": string(neighbour),
"token": token,
- "nodes": strings.Join(compactNS, ""),
+ "nodes": "",
+ //"nodes": strings.Join(compactNS, ""),
}))
//nodes := n.rTable.get(50)
@@ -60,46 +68,47 @@ func (n *Node) onGetPeersQuery(rn remoteNode, msg map[string]interface{}) {
n.queueMsg(*o, q)
}
*/
+ return nil
}
-func (n *Node) onAnnouncePeerQuery(rn remoteNode, msg map[string]interface{}) {
- a := msg["a"].(map[string]interface{})
- err := checkKeys(a, [][]string{
+func (n *Node) onAnnouncePeerQuery(rn remoteNode, msg map[string]interface{}) error {
+ a, err := getMapKey(msg, "a")
+ if err != nil {
+ return err
+ }
+ err = checkKeys(a, [][]string{
{"info_hash", "string"},
{"port", "int"},
{"token", "string"},
})
- if err != nil {
- //n.queueMsg(addr, makeError(t, protocolError, err.Error()))
- return
- }
n.log.Debug("announce_peer", "source", rn)
- // TODO
- if impliedPort, ok := a["implied_port"]; ok && impliedPort.(int) != 0 {
- // Use the port from the network
- } else {
- // Use the port in the message
- host, _, err := net.SplitHostPort(rn.address.String())
- if err != nil {
- n.log.Warn("failed to split host/port", "error", err)
- return
- }
- newPort := a["port"]
- if newPort == 0 {
- n.log.Warn("sent port 0", "source", rn)
- return
+ if impliedPort, err := getIntKey(a, "implied_port"); err == nil {
+ if impliedPort != 0 {
+ // Use the port in the message
+ host, _, err := net.SplitHostPort(rn.address.String())
+ if err != nil {
+ return err
+ }
+ newPort := a["port"]
+ if newPort == 0 {
+ return fmt.Errorf("ignoring port 0")
+ }
+ addr, err := net.ResolveUDPAddr(n.family, fmt.Sprintf("%s:%d", host, newPort))
+ rn = remoteNode{address: addr, id: rn.id}
}
- addr, err := net.ResolveUDPAddr(n.family, fmt.Sprintf("%s:%d", host, newPort))
- rn = remoteNode{address: addr, id: rn.id}
}
// TODO do we reply?
- ih, err := InfohashFromString(a["info_hash"].(string))
+ ihStr, err := getStringKey(a, "info_hash")
+ if err != nil {
+ return err
+ }
+ ih, err := InfohashFromString(ihStr)
if err != nil {
- n.log.Warn("invalid torrent", "infohash", a["info_hash"])
+ n.log.Warn("invalid torrent", "infohash", ihStr)
}
p := Peer{Node: rn, Infohash: *ih}
@@ -107,6 +116,7 @@ func (n *Node) onAnnouncePeerQuery(rn remoteNode, msg map[string]interface{}) {
if n.OnAnnouncePeer != nil {
go n.OnAnnouncePeer(p)
}
+ return nil
}
func (n *Node) onFindNodeResponse(rn remoteNode, msg map[string]interface{}) {
diff --git a/dht/node.go b/dht/node.go
index 7905964..db61eed 100644
--- a/dht/node.go
+++ b/dht/node.go
@@ -175,6 +175,7 @@ func (n *Node) packetWriter() {
n.log.Warn("rate limited", "error", err)
continue
}
+ //n.log.Debug("writing packet", "dest", p.raddr.String())
_, err := n.conn.WriteTo(p.data, p.raddr)
if err != nil {
// TODO remove from routing or add to blacklist?
@@ -242,7 +243,7 @@ func (n *Node) processPacket(p packet) {
switch response["y"].(string) {
case "q":
- n.handleRequest(p.raddr, response)
+ err = n.handleRequest(p.raddr, response)
case "r":
err = n.handleResponse(p.raddr, response)
case "e":
@@ -270,48 +271,49 @@ func (n *Node) queueMsg(rn remoteNode, data map[string]interface{}) error {
}
// handleRequest handles the requests received from udp.
-func (n *Node) handleRequest(addr net.Addr, m map[string]interface{}) (success bool) {
- if err := checkKeys(m, [][]string{{"q", "string"}, {"a", "map"}}); err != nil {
-
- //d.queueMsg(addr, makeError(t, protocolError, err.Error()))
- return
+func (n *Node) handleRequest(addr net.Addr, m map[string]interface{}) error {
+ q, err := getStringKey(m, "q")
+ if err != nil {
+ return err
}
- a := m["a"].(map[string]interface{})
+ a, err := getMapKey(m, "a")
+ if err != nil {
+ return err
+ }
- if err := checkKey(a, "id", "string"); err != nil {
- //d.queueMsg(addr, makeError(t, protocolError, err.Error()))
- return
+ id, err := getStringKey(a, "id")
+ if err != nil {
+ return err
}
- ih, err := InfohashFromString(a["id"].(string))
+ ih, err := InfohashFromString(id)
if err != nil {
- n.log.Warn("invalid request", "infohash", a["id"].(string))
+ return err
}
if n.id.Equal(*ih) {
- return
+ return nil
}
rn := &remoteNode{address: addr, id: *ih}
- q := m["q"].(string)
switch q {
case "ping":
- n.onPingQuery(*rn, m)
+ err = n.onPingQuery(*rn, m)
case "get_peers":
- n.onGetPeersQuery(*rn, m)
+ err = n.onGetPeersQuery(*rn, m)
case "announce_peer":
n.onAnnouncePeerQuery(*rn, m)
default:
//n.queueMsg(addr, makeError(t, protocolError, "invalid q"))
- return
+ return nil
}
n.rTable.add(rn)
- return true
+ return err
}
// handleResponse handles responses received from udp.