diff options
| author | Felix Hanley <felix@userspace.com.au> | 2018-02-15 11:42:34 +0000 |
|---|---|---|
| committer | Felix Hanley <felix@userspace.com.au> | 2018-02-15 11:42:40 +0000 |
| commit | 32a655f042a3752d93c4507b4c128b21bf6aa602 (patch) | |
| tree | 224c0d7e51efccac3b32dc5d0662baa2ab7304a5 /dht/messages.go | |
| parent | 2ded0704c8f675c3d92cf2b4874a32c65faf2553 (diff) | |
| download | dhtsearch-32a655f042a3752d93c4507b4c128b21bf6aa602.tar.gz dhtsearch-32a655f042a3752d93c4507b4c128b21bf6aa602.tar.bz2 | |
Refactor DHT code into separate package
Diffstat (limited to 'dht/messages.go')
| -rw-r--r-- | dht/messages.go | 105 |
1 files changed, 105 insertions, 0 deletions
diff --git a/dht/messages.go b/dht/messages.go new file mode 100644 index 0000000..94b15ee --- /dev/null +++ b/dht/messages.go @@ -0,0 +1,105 @@ +package dht + +import ( + "fmt" + "net" +) + +func (n *Node) onPingQuery(rn remoteNode, msg map[string]interface{}) { + t := msg["t"].(string) + //n.log.Debug("ping", "source", rn) + n.queueMsg(rn, makeResponse(t, map[string]interface{}{ + "id": string(n.id), + })) +} + +func (n *Node) onGetPeersQuery(rn remoteNode, msg map[string]interface{}) { + a := msg["a"].(map[string]interface{}) + if err := checkKey(a, "info_hash", "string"); err != nil { + //n.queueMsg(addr, makeError(t, protocolError, err.Error())) + return + } + + // This is the ih of the torrent + th, err := InfohashFromString(a["info_hash"].(string)) + if err != nil { + n.log.Warn("invalid torrent", "infohash", a["info_hash"]) + } + n.log.Debug("get_peers query", "source", rn, "torrent", th) + + token := []byte(*th)[:2] + + id := generateNeighbour(n.id, *th) + t := msg["t"].(string) + n.queueMsg(rn, makeResponse(t, map[string]interface{}{ + "id": string(id), + "token": token, + "nodes": "", + })) + + nodes := n.rTable.get(50) + fmt.Printf("sending get_peers for %s to %d nodes\n", *th, len(nodes)) + q := makeQuery(newTransactionID(), "get_peers", map[string]interface{}{ + "id": string(id), + "info_hash": string(*th), + }) + for _, o := range nodes { + n.queueMsg(*o, q) + } +} + +func (n *Node) onAnnouncePeerQuery(rn remoteNode, msg map[string]interface{}) { + a := msg["a"].(map[string]interface{}) + err := checkKeys(a, [][]string{ + {"info_hash", "string"}, + {"port", "int"}, + {"token", "string"}, + }) + if err != nil { + //n.queueMsg(addr, makeError(t, protocolError, err.Error())) + return + } + + n.log.Debug("announce_peer", "source", rn) + + // TODO + if impliedPort, ok := a["implied_port"]; ok && impliedPort.(int) != 0 { + // Use the port from the network + } else { + // Use the port in the message + host, _, err := net.SplitHostPort(rn.address.String()) + if err != nil { + n.log.Warn("failed to split host/port", "error", err) + return + } + newPort := a["port"] + if newPort == 0 { + n.log.Warn("sent port 0", "source", rn) + return + } + addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host, newPort)) + rn = remoteNode{address: addr, id: rn.id} + } + + // TODO do we reply? + + ih, err := InfohashFromString(a["info_hash"].(string)) + if err != nil { + n.log.Warn("invalid torrent", "infohash", a["info_hash"]) + } + + p := Peer{Node: rn, Infohash: *ih} + n.log.Info("anounce_peer", p) + if n.OnAnnouncePeer != nil { + go n.OnAnnouncePeer(p) + } +} + +func (n *Node) onFindNodeResponse(rn remoteNode, msg map[string]interface{}) { + r := msg["r"].(map[string]interface{}) + if err := checkKey(r, "id", "string"); err != nil { + return + } + nodes := r["nodes"].(string) + n.processFindNodeResults(rn, nodes) +} |
