diff options
| author | Felix Hanley <felix@userspace.com.au> | 2018-02-26 11:28:02 +0000 |
|---|---|---|
| committer | Felix Hanley <felix@userspace.com.au> | 2018-02-26 11:28:02 +0000 |
| commit | 9c8cbbfa6ecad37533c99c94adc9e0c7e8aa88b8 (patch) | |
| tree | b0decacd30fc0e9b0136a7a06b0d04d0872cb890 /dht | |
| parent | 19f63bb03bf2a83515fd47e6cf10a4db18a923d7 (diff) | |
| download | dhtsearch-9c8cbbfa6ecad37533c99c94adc9e0c7e8aa88b8.tar.gz dhtsearch-9c8cbbfa6ecad37533c99c94adc9e0c7e8aa88b8.tar.bz2 | |
Update references to shared structs
Diffstat (limited to 'dht')
| -rw-r--r-- | dht/messages.go | 33 | ||||
| -rw-r--r-- | dht/node.go | 47 | ||||
| -rw-r--r-- | dht/options.go | 3 | ||||
| -rw-r--r-- | dht/remote_node.go | 4 | ||||
| -rw-r--r-- | dht/routing_table.go | 6 | ||||
| -rw-r--r-- | dht/routing_table_test.go | 6 |
6 files changed, 56 insertions, 43 deletions
diff --git a/dht/messages.go b/dht/messages.go index 9305b6f..d972d65 100644 --- a/dht/messages.go +++ b/dht/messages.go @@ -5,6 +5,7 @@ import ( "net" "github.com/felix/dhtsearch/krpc" + "github.com/felix/dhtsearch/models" ) func (n *Node) onPingQuery(rn remoteNode, msg map[string]interface{}) error { @@ -29,14 +30,14 @@ func (n *Node) onGetPeersQuery(rn remoteNode, msg map[string]interface{}) error if err != nil { return err } - th, err := InfohashFromString(torrent) + th, err := models.InfohashFromString(torrent) if err != nil { return err } - n.log.Debug("get_peers query", "source", rn, "torrent", th) + //n.log.Debug("get_peers query", "source", rn, "torrent", th) token := torrent[:2] - neighbour := generateNeighbour(n.id, *th) + neighbour := models.GenerateNeighbour(n.id, *th) /* nodes := n.rTable.get(8) compactNS := []string{} @@ -78,20 +79,24 @@ func (n *Node) onAnnouncePeerQuery(rn remoteNode, msg map[string]interface{}) er return err } - n.log.Debug("announce_peer", "source", rn) + //n.log.Debug("announce_peer", "source", rn) - if impliedPort, err := krpc.GetInt(a, "implied_port"); err == nil { - if impliedPort != 0 { + host, port, err := net.SplitHostPort(rn.addr.String()) + if err != nil { + return err + } + if port == "0" { + return fmt.Errorf("ignoring port 0") + } + + newPort, err := krpc.GetInt(a, "port") + if err == nil { + if iPort, err := krpc.GetInt(a, "implied_port"); err == nil && iPort == 0 { // Use the port in the message - host, _, err := net.SplitHostPort(rn.addr.String()) + addr, err := net.ResolveUDPAddr(n.family, fmt.Sprintf("%s:%d", host, newPort)) if err != nil { return err } - newPort := a["port"] - if newPort == 0 { - return fmt.Errorf("ignoring port 0") - } - addr, err := net.ResolveUDPAddr(n.family, fmt.Sprintf("%s:%d", host, newPort)) rn = remoteNode{addr: addr, id: rn.id} } } @@ -102,12 +107,12 @@ func (n *Node) onAnnouncePeerQuery(rn remoteNode, msg map[string]interface{}) er if err != nil { return err } - ih, err := InfohashFromString(ihStr) + ih, err := models.InfohashFromString(ihStr) if err != nil { n.log.Warn("invalid torrent", "infohash", ihStr) } - p := Peer{Addr: rn.addr, ID: rn.id, Infohash: *ih} + p := models.Peer{Addr: rn.addr, Infohash: *ih} if n.OnAnnouncePeer != nil { go n.OnAnnouncePeer(p) } diff --git a/dht/node.go b/dht/node.go index f8e6113..64b3fd3 100644 --- a/dht/node.go +++ b/dht/node.go @@ -8,6 +8,7 @@ import ( "github.com/felix/dhtsearch/bencode" "github.com/felix/dhtsearch/krpc" + "github.com/felix/dhtsearch/models" "github.com/felix/logger" "golang.org/x/time/rate" ) @@ -24,7 +25,7 @@ var ( // Node joins the DHT network type Node struct { - id Infohash + id models.Infohash family string address string port int @@ -38,29 +39,29 @@ type Node struct { //table routingTable // OnAnnoucePeer is called for each peer that announces itself - OnAnnouncePeer func(p Peer) + OnAnnouncePeer func(p models.Peer) } // NewNode creates a new DHT node -func NewNode(opts ...Option) (n *Node, err error) { - id := GenInfohash() +func NewNode(opts ...Option) (*Node, error) { + var err error + id := models.GenInfohash() - k, err := newRoutingTable(id, 2000) - if err != nil { - n.log.Error("failed to create routing table", "error", err) - return nil, err - } - - n = &Node{ + n := &Node{ id: id, family: "udp4", port: 6881, udpTimeout: 10, - rTable: k, limiter: rate.NewLimiter(rate.Limit(100000), 2000000), log: logger.New(&logger.Options{Name: "dht"}), } + n.rTable, err = newRoutingTable(id, 2000) + if err != nil { + n.log.Error("failed to create routing table", "error", err) + return nil, err + } + // Set variadic options passed for _, option := range opts { err = option(n) @@ -144,7 +145,7 @@ func (n *Node) makeNeighbours() { // Send to all nodes nodes := n.rTable.get(0) for _, rn := range nodes { - n.findNode(rn, generateNeighbour(n.id, rn.id)) + n.findNode(rn, models.GenerateNeighbour(n.id, rn.id)) } n.rTable.flush() } @@ -186,8 +187,8 @@ func (n *Node) packetWriter() { } } -func (n *Node) findNode(rn *remoteNode, id Infohash) { - target := GenInfohash() +func (n *Node) findNode(rn *remoteNode, id models.Infohash) { + target := models.GenInfohash() n.sendQuery(rn, "find_node", map[string]interface{}{ "id": string(id), "target": string(target), @@ -196,7 +197,7 @@ func (n *Node) findNode(rn *remoteNode, id Infohash) { // ping sends ping query to the chan. func (n *Node) ping(rn *remoteNode) { - id := generateNeighbour(n.id, rn.id) + id := models.GenerateNeighbour(n.id, rn.id) n.sendQuery(rn, "ping", map[string]interface{}{ "id": string(id), }) @@ -282,7 +283,7 @@ func (n *Node) handleRequest(addr net.Addr, m map[string]interface{}) error { return err } - ih, err := InfohashFromString(id) + ih, err := models.InfohashFromString(id) if err != nil { return err } @@ -321,7 +322,7 @@ func (n *Node) handleResponse(addr net.Addr, m map[string]interface{}) error { if err != nil { return err } - ih, err := InfohashFromString(id) + ih, err := models.InfohashFromString(id) if err != nil { return err } @@ -372,9 +373,9 @@ func (n *Node) handleError(addr net.Addr, m map[string]interface{}) error { // Process another node's response to a find_node query. func (n *Node) processFindNodeResults(rn remoteNode, nodeList string) { - nodeLength := 26 + nodeLength := krpc.IPv4NodeAddrLen if n.family == "udp6" { - nodeLength = 38 + nodeLength = krpc.IPv6NodeAddrLen } if len(nodeList)%nodeLength != 0 { @@ -386,10 +387,10 @@ func (n *Node) processFindNodeResults(rn remoteNode, nodeList string) { // We got a byte array in groups of 26 or 38 for i := 0; i < len(nodeList); i += nodeLength { - id := nodeList[i : i+ihLength] - addrStr := krpc.DecodeCompactNodeAddr(nodeList[i+ihLength : i+nodeLength]) + id := nodeList[i : i+models.InfohashLength] + addrStr := krpc.DecodeCompactNodeAddr(nodeList[i+models.InfohashLength : i+nodeLength]) - ih, err := InfohashFromString(id) + ih, err := models.InfohashFromString(id) if err != nil { n.log.Warn("invalid infohash in node list") continue diff --git a/dht/options.go b/dht/options.go index b7ded8a..12a64a6 100644 --- a/dht/options.go +++ b/dht/options.go @@ -1,12 +1,13 @@ package dht import ( + "github.com/felix/dhtsearch/models" "github.com/felix/logger" ) type Option func(*Node) error -func SetOnAnnouncePeer(f func(Peer)) Option { +func SetOnAnnouncePeer(f func(models.Peer)) Option { return func(n *Node) error { n.OnAnnouncePeer = f return nil diff --git a/dht/remote_node.go b/dht/remote_node.go index 4bb9319..7dbf893 100644 --- a/dht/remote_node.go +++ b/dht/remote_node.go @@ -3,11 +3,13 @@ package dht import ( "fmt" "net" + + "github.com/felix/dhtsearch/models" ) type remoteNode struct { addr net.Addr - id Infohash + id models.Infohash } // String implements fmt.Stringer diff --git a/dht/routing_table.go b/dht/routing_table.go index b10574c..3c9f72c 100644 --- a/dht/routing_table.go +++ b/dht/routing_table.go @@ -3,6 +3,8 @@ package dht import ( "container/heap" "sync" + + "github.com/felix/dhtsearch/models" ) type rItem struct { @@ -14,14 +16,14 @@ type rItem struct { type priorityQueue []*rItem type routingTable struct { - id Infohash + id models.Infohash max int items priorityQueue addresses map[string]*remoteNode sync.Mutex } -func newRoutingTable(id Infohash, max int) (*routingTable, error) { +func newRoutingTable(id models.Infohash, max int) (*routingTable, error) { k := &routingTable{ id: id, max: max, diff --git a/dht/routing_table_test.go b/dht/routing_table_test.go index 1eeeca3..e251791 100644 --- a/dht/routing_table_test.go +++ b/dht/routing_table_test.go @@ -4,6 +4,8 @@ import ( "fmt" "net" "testing" + + "github.com/felix/dhtsearch/models" ) func TestPriorityQueue(t *testing.T) { @@ -16,7 +18,7 @@ func TestPriorityQueue(t *testing.T) { "d1c5676ae7ac98e8b19f63565905105e3c4c37a3", // distance of 159 } - ih, err := InfohashFromString(id) + ih, err := models.InfohashFromString(id) if err != nil { t.Errorf("failed to create infohash: %s\n", err) } @@ -27,7 +29,7 @@ func TestPriorityQueue(t *testing.T) { } for i, idt := range tests { - iht, err := InfohashFromString(idt) + iht, err := models.InfohashFromString(idt) if err != nil { t.Errorf("failed to create infohash: %s\n", err) } |
