diff options
Diffstat (limited to 'dht')
| -rw-r--r-- | dht/infohash.go | 2 | ||||
| -rw-r--r-- | dht/krpc.go | 12 | ||||
| -rw-r--r-- | dht/messages.go | 110 | ||||
| -rw-r--r-- | dht/node.go | 38 |
4 files changed, 93 insertions, 69 deletions
diff --git a/dht/infohash.go b/dht/infohash.go index cd12446..6d4596d 100644 --- a/dht/infohash.go +++ b/dht/infohash.go @@ -78,7 +78,7 @@ func (ih Infohash) Distance(other Infohash) int { } func generateNeighbour(first, second Infohash) Infohash { - s := append(first[:10], second[10:]...) + s := append(second[:10], first[10:]...) return Infohash(s) } diff --git a/dht/krpc.go b/dht/krpc.go index bf66e20..2a7c103 100644 --- a/dht/krpc.go +++ b/dht/krpc.go @@ -49,6 +49,18 @@ func getStringKey(data map[string]interface{}, key string) (string, error) { return out, nil } +func getIntKey(data map[string]interface{}, key string) (int, error) { + val, ok := data[key] + if !ok { + return 0, fmt.Errorf("krpc: missing key %s", key) + } + out, ok := val.(int) + if !ok { + return 0, fmt.Errorf("krpc: key type mismatch") + } + return out, nil +} + func getMapKey(data map[string]interface{}, key string) (map[string]interface{}, error) { val, ok := data[key] if !ok { diff --git a/dht/messages.go b/dht/messages.go index be7d6b6..023e27d 100644 --- a/dht/messages.go +++ b/dht/messages.go @@ -3,50 +3,58 @@ package dht import ( "fmt" "net" - "strings" + //"strings" ) -func (n *Node) onPingQuery(rn remoteNode, msg map[string]interface{}) { - t := msg["t"].(string) - //n.log.Debug("ping", "source", rn) +func (n *Node) onPingQuery(rn remoteNode, msg map[string]interface{}) error { + t, err := getStringKey(msg, "t") + if err != nil { + return err + } n.queueMsg(rn, makeResponse(t, map[string]interface{}{ "id": string(n.id), })) + return nil } -func (n *Node) onGetPeersQuery(rn remoteNode, msg map[string]interface{}) { - a := msg["a"].(map[string]interface{}) - if err := checkKey(a, "info_hash", "string"); err != nil { - //n.queueMsg(addr, makeError(t, protocolError, err.Error())) - return +func (n *Node) onGetPeersQuery(rn remoteNode, msg map[string]interface{}) error { + a, err := getMapKey(msg, "a") + if err != nil { + return err } // This is the ih of the torrent - th, err := InfohashFromString(a["info_hash"].(string)) + torrent, err := getStringKey(a, "info_hash") if err != nil { - n.log.Warn("invalid torrent", "infohash", a["info_hash"]) + return err + } + th, err := InfohashFromString(torrent) + if err != nil { + return err } n.log.Debug("get_peers query", "source", rn, "torrent", th) - token := []byte(*th)[:2] - - id := generateNeighbour(n.id, *th) - nodes := n.rTable.get(8) - compactNS := []string{} - for _, rn := range nodes { - ns := encodeCompactNodeAddr(rn.address.String()) - if ns == "" { - n.log.Warn("failed to compact node", "address", rn.address.String()) - continue + token := torrent[:2] + neighbour := generateNeighbour(n.id, *th) + /* + nodes := n.rTable.get(8) + compactNS := []string{} + for _, rn := range nodes { + ns := encodeCompactNodeAddr(rn.address.String()) + if ns == "" { + n.log.Warn("failed to compact node", "address", rn.address.String()) + continue + } + compactNS = append(compactNS, ns) } - compactNS = append(compactNS, ns) - } + */ t := msg["t"].(string) n.queueMsg(rn, makeResponse(t, map[string]interface{}{ - "id": string(id), + "id": string(neighbour), "token": token, - "nodes": strings.Join(compactNS, ""), + "nodes": "", + //"nodes": strings.Join(compactNS, ""), })) //nodes := n.rTable.get(50) @@ -60,46 +68,47 @@ func (n *Node) onGetPeersQuery(rn remoteNode, msg map[string]interface{}) { n.queueMsg(*o, q) } */ + return nil } -func (n *Node) onAnnouncePeerQuery(rn remoteNode, msg map[string]interface{}) { - a := msg["a"].(map[string]interface{}) - err := checkKeys(a, [][]string{ +func (n *Node) onAnnouncePeerQuery(rn remoteNode, msg map[string]interface{}) error { + a, err := getMapKey(msg, "a") + if err != nil { + return err + } + err = checkKeys(a, [][]string{ {"info_hash", "string"}, {"port", "int"}, {"token", "string"}, }) - if err != nil { - //n.queueMsg(addr, makeError(t, protocolError, err.Error())) - return - } n.log.Debug("announce_peer", "source", rn) - // TODO - if impliedPort, ok := a["implied_port"]; ok && impliedPort.(int) != 0 { - // Use the port from the network - } else { - // Use the port in the message - host, _, err := net.SplitHostPort(rn.address.String()) - if err != nil { - n.log.Warn("failed to split host/port", "error", err) - return - } - newPort := a["port"] - if newPort == 0 { - n.log.Warn("sent port 0", "source", rn) - return + if impliedPort, err := getIntKey(a, "implied_port"); err == nil { + if impliedPort != 0 { + // Use the port in the message + host, _, err := net.SplitHostPort(rn.address.String()) + if err != nil { + return err + } + newPort := a["port"] + if newPort == 0 { + return fmt.Errorf("ignoring port 0") + } + addr, err := net.ResolveUDPAddr(n.family, fmt.Sprintf("%s:%d", host, newPort)) + rn = remoteNode{address: addr, id: rn.id} } - addr, err := net.ResolveUDPAddr(n.family, fmt.Sprintf("%s:%d", host, newPort)) - rn = remoteNode{address: addr, id: rn.id} } // TODO do we reply? - ih, err := InfohashFromString(a["info_hash"].(string)) + ihStr, err := getStringKey(a, "info_hash") + if err != nil { + return err + } + ih, err := InfohashFromString(ihStr) if err != nil { - n.log.Warn("invalid torrent", "infohash", a["info_hash"]) + n.log.Warn("invalid torrent", "infohash", ihStr) } p := Peer{Node: rn, Infohash: *ih} @@ -107,6 +116,7 @@ func (n *Node) onAnnouncePeerQuery(rn remoteNode, msg map[string]interface{}) { if n.OnAnnouncePeer != nil { go n.OnAnnouncePeer(p) } + return nil } func (n *Node) onFindNodeResponse(rn remoteNode, msg map[string]interface{}) { diff --git a/dht/node.go b/dht/node.go index 7905964..db61eed 100644 --- a/dht/node.go +++ b/dht/node.go @@ -175,6 +175,7 @@ func (n *Node) packetWriter() { n.log.Warn("rate limited", "error", err) continue } + //n.log.Debug("writing packet", "dest", p.raddr.String()) _, err := n.conn.WriteTo(p.data, p.raddr) if err != nil { // TODO remove from routing or add to blacklist? @@ -242,7 +243,7 @@ func (n *Node) processPacket(p packet) { switch response["y"].(string) { case "q": - n.handleRequest(p.raddr, response) + err = n.handleRequest(p.raddr, response) case "r": err = n.handleResponse(p.raddr, response) case "e": @@ -270,48 +271,49 @@ func (n *Node) queueMsg(rn remoteNode, data map[string]interface{}) error { } // handleRequest handles the requests received from udp. -func (n *Node) handleRequest(addr net.Addr, m map[string]interface{}) (success bool) { - if err := checkKeys(m, [][]string{{"q", "string"}, {"a", "map"}}); err != nil { - - //d.queueMsg(addr, makeError(t, protocolError, err.Error())) - return +func (n *Node) handleRequest(addr net.Addr, m map[string]interface{}) error { + q, err := getStringKey(m, "q") + if err != nil { + return err } - a := m["a"].(map[string]interface{}) + a, err := getMapKey(m, "a") + if err != nil { + return err + } - if err := checkKey(a, "id", "string"); err != nil { - //d.queueMsg(addr, makeError(t, protocolError, err.Error())) - return + id, err := getStringKey(a, "id") + if err != nil { + return err } - ih, err := InfohashFromString(a["id"].(string)) + ih, err := InfohashFromString(id) if err != nil { - n.log.Warn("invalid request", "infohash", a["id"].(string)) + return err } if n.id.Equal(*ih) { - return + return nil } rn := &remoteNode{address: addr, id: *ih} - q := m["q"].(string) switch q { case "ping": - n.onPingQuery(*rn, m) + err = n.onPingQuery(*rn, m) case "get_peers": - n.onGetPeersQuery(*rn, m) + err = n.onGetPeersQuery(*rn, m) case "announce_peer": n.onAnnouncePeerQuery(*rn, m) default: //n.queueMsg(addr, makeError(t, protocolError, "invalid q")) - return + return nil } n.rTable.add(rn) - return true + return err } // handleResponse handles responses received from udp. |
