diff options
| author | Felix Hanley <felix@userspace.com.au> | 2018-02-21 04:20:06 +0000 |
|---|---|---|
| committer | Felix Hanley <felix@userspace.com.au> | 2018-02-21 04:21:39 +0000 |
| commit | e9adf3a2bf8b81615275a6705b7957e43753f0ec (patch) | |
| tree | 1eaeb5081f3914a8ffa936d96ad1f1548c9aeb2f | |
| parent | 020a8f9ec7e541d284ddb65111aafe42547927e5 (diff) | |
| download | dhtsearch-e9adf3a2bf8b81615275a6705b7957e43753f0ec.tar.gz dhtsearch-e9adf3a2bf8b81615275a6705b7957e43753f0ec.tar.bz2 | |
Seperate shared packages
| -rw-r--r-- | db.go | 28 | ||||
| -rw-r--r-- | dht/infohash.go | 2 | ||||
| -rw-r--r-- | dht/infohash_test.go | 2 | ||||
| -rw-r--r-- | dht/messages.go | 38 | ||||
| -rw-r--r-- | dht/node.go | 77 | ||||
| -rw-r--r-- | dht/peer.go | 11 | ||||
| -rw-r--r-- | dht/remote_node.go | 7 | ||||
| -rw-r--r-- | dht/routing_table.go | 6 | ||||
| -rw-r--r-- | dht/routing_table_test.go | 2 | ||||
| -rw-r--r-- | http.go | 299 | ||||
| -rw-r--r-- | infohash_test.go | 43 | ||||
| -rw-r--r-- | krpc/krpc.go (renamed from dht/krpc.go) | 30 | ||||
| -rw-r--r-- | krpc/krpc_test.go (renamed from dht/krpc_test.go) | 2 | ||||
| -rw-r--r-- | models/tag.go | 5 | ||||
| -rw-r--r-- | models/torrent.go | 98 | ||||
| -rw-r--r-- | torrent.go | 190 | ||||
| -rw-r--r-- | util.go | 92 |
17 files changed, 187 insertions, 745 deletions
@@ -1,28 +0,0 @@ -package dhtsearch - -import ( - "fmt" - _ "github.com/jackc/pgx/stdlib" - "github.com/jmoiron/sqlx" -) - -type database struct { - *sqlx.DB -} - -// Global -var DB *database - -func newDB(dsn string) (*database, error) { - d, err := sqlx.Connect("pgx", dsn) - if err != nil { - fmt.Printf("Error creating DB %q\n", err) - return nil, err - } - var count int - err = d.QueryRow("select count(*) from torrents").Scan(&count) - if err != nil { - return nil, err - } - return &database{d}, nil -} diff --git a/dht/infohash.go b/dht/infohash.go index 6d4596d..cb5170e 100644 --- a/dht/infohash.go +++ b/dht/infohash.go @@ -82,7 +82,7 @@ func generateNeighbour(first, second Infohash) Infohash { return Infohash(s) } -func randomInfoHash() (ih Infohash) { +func GenInfohash() (ih Infohash) { random := rand.New(rand.NewSource(time.Now().UnixNano())) hash := sha1.New() io.WriteString(hash, time.Now().String()) diff --git a/dht/infohash_test.go b/dht/infohash_test.go index 1574b19..6d627fc 100644 --- a/dht/infohash_test.go +++ b/dht/infohash_test.go @@ -39,7 +39,7 @@ func TestInfohashImport(t *testing.T) { } func TestInfohashLength(t *testing.T) { - ih := randomInfoHash() + ih := GenInfohash() if len(ih) != 20 { t.Errorf("%s as string should be length 20, got %d", ih, len(ih)) } diff --git a/dht/messages.go b/dht/messages.go index 023e27d..9305b6f 100644 --- a/dht/messages.go +++ b/dht/messages.go @@ -3,28 +3,29 @@ package dht import ( "fmt" "net" - //"strings" + + "github.com/felix/dhtsearch/krpc" ) func (n *Node) onPingQuery(rn remoteNode, msg map[string]interface{}) error { - t, err := getStringKey(msg, "t") + t, err := krpc.GetString(msg, "t") if err != nil { return err } - n.queueMsg(rn, makeResponse(t, map[string]interface{}{ + n.queueMsg(rn, krpc.MakeResponse(t, map[string]interface{}{ "id": string(n.id), })) return nil } func (n *Node) onGetPeersQuery(rn remoteNode, msg map[string]interface{}) error { - a, err := getMapKey(msg, "a") + a, err := krpc.GetMap(msg, "a") if err != nil { return err } // This is the ih of the torrent - torrent, err := getStringKey(a, "info_hash") + torrent, err := krpc.GetString(a, "info_hash") if err != nil { return err } @@ -40,7 +41,7 @@ func (n *Node) onGetPeersQuery(rn remoteNode, msg map[string]interface{}) error nodes := n.rTable.get(8) compactNS := []string{} for _, rn := range nodes { - ns := encodeCompactNodeAddr(rn.address.String()) + ns := encodeCompactNodeAddr(rn.addr.String()) if ns == "" { n.log.Warn("failed to compact node", "address", rn.address.String()) continue @@ -50,7 +51,7 @@ func (n *Node) onGetPeersQuery(rn remoteNode, msg map[string]interface{}) error */ t := msg["t"].(string) - n.queueMsg(rn, makeResponse(t, map[string]interface{}{ + n.queueMsg(rn, krpc.MakeResponse(t, map[string]interface{}{ "id": string(neighbour), "token": token, "nodes": "", @@ -60,7 +61,7 @@ func (n *Node) onGetPeersQuery(rn remoteNode, msg map[string]interface{}) error //nodes := n.rTable.get(50) /* fmt.Printf("sending get_peers for %s to %d nodes\n", *th, len(nodes)) - q := makeQuery(newTransactionID(), "get_peers", map[string]interface{}{ + q := krpc.MakeQuery(newTransactionID(), "get_peers", map[string]interface{}{ "id": string(id), "info_hash": string(*th), }) @@ -72,22 +73,17 @@ func (n *Node) onGetPeersQuery(rn remoteNode, msg map[string]interface{}) error } func (n *Node) onAnnouncePeerQuery(rn remoteNode, msg map[string]interface{}) error { - a, err := getMapKey(msg, "a") + a, err := krpc.GetMap(msg, "a") if err != nil { return err } - err = checkKeys(a, [][]string{ - {"info_hash", "string"}, - {"port", "int"}, - {"token", "string"}, - }) n.log.Debug("announce_peer", "source", rn) - if impliedPort, err := getIntKey(a, "implied_port"); err == nil { + if impliedPort, err := krpc.GetInt(a, "implied_port"); err == nil { if impliedPort != 0 { // Use the port in the message - host, _, err := net.SplitHostPort(rn.address.String()) + host, _, err := net.SplitHostPort(rn.addr.String()) if err != nil { return err } @@ -96,13 +92,13 @@ func (n *Node) onAnnouncePeerQuery(rn remoteNode, msg map[string]interface{}) er return fmt.Errorf("ignoring port 0") } addr, err := net.ResolveUDPAddr(n.family, fmt.Sprintf("%s:%d", host, newPort)) - rn = remoteNode{address: addr, id: rn.id} + rn = remoteNode{addr: addr, id: rn.id} } } // TODO do we reply? - ihStr, err := getStringKey(a, "info_hash") + ihStr, err := krpc.GetString(a, "info_hash") if err != nil { return err } @@ -111,8 +107,7 @@ func (n *Node) onAnnouncePeerQuery(rn remoteNode, msg map[string]interface{}) er n.log.Warn("invalid torrent", "infohash", ihStr) } - p := Peer{Node: rn, Infohash: *ih} - n.log.Info("anounce_peer", p) + p := Peer{Addr: rn.addr, ID: rn.id, Infohash: *ih} if n.OnAnnouncePeer != nil { go n.OnAnnouncePeer(p) } @@ -121,9 +116,6 @@ func (n *Node) onAnnouncePeerQuery(rn remoteNode, msg map[string]interface{}) er func (n *Node) onFindNodeResponse(rn remoteNode, msg map[string]interface{}) { r := msg["r"].(map[string]interface{}) - if err := checkKey(r, "id", "string"); err != nil { - return - } nodes := r["nodes"].(string) n.processFindNodeResults(rn, nodes) } diff --git a/dht/node.go b/dht/node.go index db61eed..f8e6113 100644 --- a/dht/node.go +++ b/dht/node.go @@ -7,6 +7,7 @@ import ( "time" "github.com/felix/dhtsearch/bencode" + "github.com/felix/dhtsearch/krpc" "github.com/felix/logger" "golang.org/x/time/rate" ) @@ -42,7 +43,7 @@ type Node struct { // NewNode creates a new DHT node func NewNode(opts ...Option) (n *Node, err error) { - id := randomInfoHash() + id := GenInfohash() k, err := newRoutingTable(id, 2000) if err != nil { @@ -159,7 +160,7 @@ func (n *Node) bootstrap() { n.log.Error("failed to parse bootstrap address", "error", err) continue } - rn := &remoteNode{address: addr} + rn := &remoteNode{addr: addr} n.findNode(rn, n.id) } } @@ -186,7 +187,7 @@ func (n *Node) packetWriter() { } func (n *Node) findNode(rn *remoteNode, id Infohash) { - target := randomInfoHash() + target := GenInfohash() n.sendQuery(rn, "find_node", map[string]interface{}{ "id": string(id), "target": string(target), @@ -207,10 +208,9 @@ func (n *Node) sendQuery(rn *remoteNode, qType string, a map[string]interface{}) return nil } - t := newTransactionID() - //n.log.Debug("sending message", "type", qType, "remote", rn) + t := krpc.NewTransactionID() - data := makeQuery(t, qType, a) + data := krpc.MakeQuery(t, qType, a) b, err := bencode.Encode(data) if err != nil { return err @@ -218,43 +218,38 @@ func (n *Node) sendQuery(rn *remoteNode, qType string, a map[string]interface{}) //fmt.Printf("sending %s to %s\n", qType, rn.String()) n.packetsOut <- packet{ data: b, - raddr: rn.address, + raddr: rn.addr, } return nil } // Parse a KRPC packet into a message -func (n *Node) processPacket(p packet) { - data, err := bencode.Decode(p.data) +func (n *Node) processPacket(p packet) error { + response, _, err := bencode.DecodeDict(p.data, 0) if err != nil { - return - } - - response, ok := data.(map[string]interface{}) - if !ok { - n.log.Debug("failed to parse packet", "error", "response is not dict") - return + return err } - if err := checkKeys(response, [][]string{{"t", "string"}, {"y", "string"}}); err != nil { - n.log.Debug("failed to parse packet", "error", err) - return + y, err := krpc.GetString(response, "y") + if err != nil { + return err } - switch response["y"].(string) { + switch y { case "q": err = n.handleRequest(p.raddr, response) case "r": err = n.handleResponse(p.raddr, response) case "e": - n.handleError(p.raddr, response) + err = n.handleError(p.raddr, response) default: n.log.Warn("missing request type") - return + return nil } if err != nil { n.log.Warn("failed to process packet", "error", err) } + return err } // bencode data and send @@ -265,24 +260,24 @@ func (n *Node) queueMsg(rn remoteNode, data map[string]interface{}) error { } n.packetsOut <- packet{ data: b, - raddr: rn.address, + raddr: rn.addr, } return nil } // handleRequest handles the requests received from udp. func (n *Node) handleRequest(addr net.Addr, m map[string]interface{}) error { - q, err := getStringKey(m, "q") + q, err := krpc.GetString(m, "q") if err != nil { return err } - a, err := getMapKey(m, "a") + a, err := krpc.GetMap(m, "a") if err != nil { return err } - id, err := getStringKey(a, "id") + id, err := krpc.GetString(a, "id") if err != nil { return err } @@ -296,7 +291,7 @@ func (n *Node) handleRequest(addr net.Addr, m map[string]interface{}) error { return nil } - rn := &remoteNode{address: addr, id: *ih} + rn := &remoteNode{addr: addr, id: *ih} switch q { case "ping": @@ -318,11 +313,11 @@ func (n *Node) handleRequest(addr net.Addr, m map[string]interface{}) error { // handleResponse handles responses received from udp. func (n *Node) handleResponse(addr net.Addr, m map[string]interface{}) error { - r, err := getMapKey(m, "r") + r, err := krpc.GetMap(m, "r") if err != nil { return err } - id, err := getStringKey(r, "id") + id, err := krpc.GetString(r, "id") if err != nil { return err } @@ -331,9 +326,9 @@ func (n *Node) handleResponse(addr net.Addr, m map[string]interface{}) error { return err } - rn := &remoteNode{address: addr, id: *ih} + rn := &remoteNode{addr: addr, id: *ih} - nodes, err := getStringKey(r, "nodes") + nodes, err := krpc.GetString(r, "nodes") // find_nodes/get_peers response with nodes if err == nil { n.onFindNodeResponse(*rn, m) @@ -342,12 +337,12 @@ func (n *Node) handleResponse(addr net.Addr, m map[string]interface{}) error { return nil } - values, err := getListKey(r, "values") + values, err := krpc.GetList(r, "values") // get_peers response if err == nil { n.log.Debug("get_peers response", "source", rn) for _, v := range values { - addr := decodeCompactNodeAddr(v.(string)) + addr := krpc.DecodeCompactNodeAddr(v.(string)) n.log.Debug("unhandled get_peer request", "addres", addr) // TODO new peer needs to be matched to previous get_peers request @@ -359,20 +354,20 @@ func (n *Node) handleResponse(addr net.Addr, m map[string]interface{}) error { } // handleError handles errors received from udp. -func (n *Node) handleError(addr net.Addr, m map[string]interface{}) bool { - if err := checkKey(m, "e", "list"); err != nil { - return false +func (n *Node) handleError(addr net.Addr, m map[string]interface{}) error { + e, err := krpc.GetList(m, "e") + if err != nil { + return err } - e := m["e"].([]interface{}) if len(e) != 2 { - return false + return fmt.Errorf("error packet wrong length %d", len(e)) } code := e[0].(int64) msg := e[1].(string) n.log.Debug("error packet", "address", addr.String(), "code", code, "error", msg) - return true + return nil } // Process another node's response to a find_node query. @@ -392,7 +387,7 @@ func (n *Node) processFindNodeResults(rn remoteNode, nodeList string) { // We got a byte array in groups of 26 or 38 for i := 0; i < len(nodeList); i += nodeLength { id := nodeList[i : i+ihLength] - addrStr := decodeCompactNodeAddr(nodeList[i+ihLength : i+nodeLength]) + addrStr := krpc.DecodeCompactNodeAddr(nodeList[i+ihLength : i+nodeLength]) ih, err := InfohashFromString(id) if err != nil { @@ -406,7 +401,7 @@ func (n *Node) processFindNodeResults(rn remoteNode, nodeList string) { continue } - rn := &remoteNode{address: addr, id: *ih} + rn := &remoteNode{addr: addr, id: *ih} n.rTable.add(rn) } } diff --git a/dht/peer.go b/dht/peer.go index f9669ba..42e8438 100644 --- a/dht/peer.go +++ b/dht/peer.go @@ -1,13 +1,18 @@ package dht -import "fmt" +import ( + "fmt" + "net" +) // Peer on DHT network type Peer struct { - Node remoteNode + Addr net.Addr + ID Infohash Infohash Infohash } +// String implements fmt.Stringer func (p Peer) String() string { - return fmt.Sprintf("%s (%s)", p.Infohash, p.Node) + return fmt.Sprintf("%s (%s)", p.Infohash, p.Addr.String()) } diff --git a/dht/remote_node.go b/dht/remote_node.go index 5bb2585..4bb9319 100644 --- a/dht/remote_node.go +++ b/dht/remote_node.go @@ -6,12 +6,11 @@ import ( ) type remoteNode struct { - address net.Addr - id Infohash - //lastSeen time.Time + addr net.Addr + id Infohash } // String implements fmt.Stringer func (r remoteNode) String() string { - return fmt.Sprintf("%s (%s)", r.id.String(), r.address.String()) + return fmt.Sprintf("%s (%s)", r.id.String(), r.addr.String()) } diff --git a/dht/routing_table.go b/dht/routing_table.go index 3c1f2d2..b10574c 100644 --- a/dht/routing_table.go +++ b/dht/routing_table.go @@ -73,10 +73,10 @@ func (k *routingTable) add(rn *remoteNode) { k.Lock() defer k.Unlock() - if _, ok := k.addresses[rn.address.String()]; ok { + if _, ok := k.addresses[rn.addr.String()]; ok { return } - k.addresses[rn.address.String()] = rn + k.addresses[rn.addr.String()] = rn item := &rItem{ value: rn, @@ -88,7 +88,7 @@ func (k *routingTable) add(rn *remoteNode) { if len(k.items) > k.max { for i := k.max - 1; i < len(k.items); i++ { old := k.items[i] - delete(k.addresses, old.value.address.String()) + delete(k.addresses, old.value.addr.String()) heap.Remove(&k.items, i) } } diff --git a/dht/routing_table_test.go b/dht/routing_table_test.go index 77c0a17..1eeeca3 100644 --- a/dht/routing_table_test.go +++ b/dht/routing_table_test.go @@ -32,7 +32,7 @@ func TestPriorityQueue(t *testing.T) { t.Errorf("failed to create infohash: %s\n", err) } addr, _ := net.ResolveUDPAddr("udp", fmt.Sprintf("0.0.0.0:%d", i)) - pq.add(&remoteNode{id: *iht, address: addr}) + pq.add(&remoteNode{id: *iht, addr: addr}) } if len(pq.items) != len(pq.addresses) { diff --git a/http.go b/http.go deleted file mode 100644 index 32fdae9..0000000 --- a/http.go +++ /dev/null @@ -1,299 +0,0 @@ -package dhtsearch - -import ( - "encoding/json" - "expvar" - "fmt" - "net/http" - "strconv" -) - -type results struct { - Page int `json:"page"` - PageSize int `json:"page_size"` - Torrents []Torrent `json:"torrents"` -} - -func indexHandler(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Cache-Control", "public") - if r.URL.Path != "/" { - w.WriteHeader(404) - return - } - w.Header().Set("Content-Type", "text/html; charset=utf-8") - w.WriteHeader(200) - w.Write(html) -} - -func statsHandler(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json; charset=utf-8") - w.Header().Set("Cache-Control", "no-cache") - w.WriteHeader(200) - fmt.Fprintf(w, "{") - first := true - expvar.Do(func(kv expvar.KeyValue) { - if kv.Key == "cmdline" || kv.Key == "memstats" { - return - } - if !first { - fmt.Fprintf(w, ",") - } - first = false - fmt.Fprintf(w, "%q: %s", kv.Key, kv.Value) - }) - fmt.Fprintf(w, "}") -} - -func searchHandler(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json; charset=utf-8") - w.Header().Set("Cache-Control", "no-cache") - - offset := 0 - page := 1 - var err error - pStr := r.URL.Query().Get("page") - if pStr != "" { - page, err = strconv.Atoi(pStr) - if err != nil { - fmt.Printf("Failed to parse page: %q\n", err) - } - offset = (page - 1) * 50 - } - - if q := r.URL.Query().Get("q"); q != "" { - torrents, err := torrentsByName(q, offset) - if err != nil { - w.WriteHeader(500) - fmt.Printf("Error: %q\n", err) - return - } - w.WriteHeader(200) - json.NewEncoder(w).Encode(results{Page: page, PageSize: Config.ResultsPageSize, Torrents: torrents}) - return - } - - if tag := r.URL.Query().Get("tag"); tag != "" { - torrents, err := torrentsByTag(tag, offset) - if err != nil { - w.WriteHeader(500) - fmt.Printf("Error: %q\n", err) - return - } - w.WriteHeader(200) - json.NewEncoder(w).Encode(results{Page: page, PageSize: Config.ResultsPageSize, Torrents: torrents}) - return - } - - w.WriteHeader(406) - json.NewEncoder(w).Encode("Query required") -} - -var html = []byte(` -<!doctype html> -<html> - <head> - <title>DHT search</title> - <meta charset="UTF-8"> - <meta name="viewport" content="width=device-width, initial-scale=1.0, maximum-scale=1"> - <style> - * { box-sizing:border-box; } - body { padding:0;margin:0;color:#666;line-height:1.5;font-size:16px;font-family:sans-serif; } - ul { list-style:none;padding:0;margin:0; } - a { color:#000;text-decoration:none; } - .header { padding:1em;border-bottom:1px solid #555;background-color:#eee; } - .search { display:flex;float:left;margin:0;padding:0; } - .search__input { min-width:0;font-size:1.25em;padding:2px; } - .page { padding:1em; } - .torrent { margin-bottom:1em; } - .torrent__name { display:block;overflow:hidden; } - .search__button, .torrent__magnet { height:32px;width:32px;background: url() no-repeat top left; } - .search__button { flex-shrink:0;display:inline-block;border:none;margin-left:5px;margin-right:5px; } - *:focus { outline:none; } - .search__button--active { animation-name:spin;animation-duration:3s;animation-iteration-count:infinite;-webkit-animation-name:spin;-webkit-animation-duration:3s;-webkit-animation-iteration-count:infinite; } - .torrent__magnet { display:block;float:left;margin-top:4px;margin-right:.5em; } - .torrent__size, .torrent__file-count, .torrent__seen, .torrent__tags { display:inline-block;padding-right:1em;white-space:nowrap; } - .tag { display:inline; } - .files { padding-left:2em;font-family:monospace;font-size:.75em; } - .files { display:none; } - .files--active { display:block; } - .file__size { margin-left:.5em;font-size:.875em; } - .stats { display:block;margin:0;margin-top:1em; } - .stats__key, .stats__value { float:left;padding:0;margin:0; } - .stats__key { clear:left;margin-right:.25em;color:#222;white-space:nowrap; } - .stats__key:after { content:':'; } - .stats__value { margin-right:.5em;color:#888; } - - .site-nav:after { content:'';display:table;clear:both; } - .site-nav__open, .site-nav__close { position:absolute;top:1.25em;right:1em;color:#666; } - .site-nav__close { left:1em; } - .site-nav__open:before { content:'☰ ' } - .site-nav__target { position:fixed;top:0;left:0; } - .site-nav__target:target + .site-nav__drawer { transform:none; } - .site-nav__drawer { position:fixed;top:0;right:0;bottom:0;margin:0;padding:1em;padding-top:2em;width:300px;float:right;background-color:#e8e8e8;overflow:visible;z-index:1;transition:0.2s;will-change:tranform;transform:translateX(100%); } - @keyframes spin { from {transform:rotate(0deg);} to {transform:rotate(360deg);} } - @-webkit-keyframes spin { from {transform:rotate(0deg);} to {transform:rotate(360deg);} } - </style> - </head> - <body id="body"> - <div class="header"> - <nav class="site-nav"> - <div class="search"> - <input id="search" class="search__input" type="text" name="search" placeholder="Search" /> - <button id="go" class="search__button"></button> - </div> - <a href="#trigger:nav" class="site-nav__open">Stats</a> - <a class="site-nav__target" id="trigger:nav"></a> - <div class="site-nav__drawer"> - <a href="#0" class="site-nav__close">Close</a> - <dl id="stats" class="stats"></dl> - </div> - </nav> - </div> - <div id="page" class="page"> - </div> - <script> -var interval = 5000 -var showSize = function (bytes) { - if (bytes === 0) { - return '0B' - } - var sizes = ['B', 'KB', 'MB', 'GB', 'TB'] - var sizeIndex = parseInt(Math.floor(Math.log(bytes) / Math.log(1024)), 10) - if (sizeIndex >= sizes.length) { - sizeIndex = sizes.length - 1 - } - return (bytes / Math.pow(1024, sizeIndex)).toFixed(0) + sizes[sizeIndex] -} -var showDuration = function (seconds) { - var s = parseInt(seconds, 10) - var d = Math.floor(s / 86400) - var h = Math.floor((s -= (d * 86400)) / 3600) - var m = Math.floor((s -= (h * 3600)) / 60) - return d + 'd ' + h + 'h ' + m + 'm ' + (s - (m * 60)) + 's' -} -var showRate = function (o, n) { - o = o || 0 - var rate = (n - o) / (interval / 1000) - return showSize(rate) + '/s' -} -var processResponse = function (resp) { - if (resp.status === 200 || resp.status === 0) { - return resp.json() - } else { - return new Error(resp.statusText) - } -} -var search = function (term, page) { - if (term.length < 3) { - return - } - bEl.classList.add('search__button--active') - page = parseInt(page, 10) || 1 - var query = '' - var tIdx = term.indexOf('tag:') - if (tIdx >= 0) { - query = 'tag=' + term.slice(tIdx+4).trim() - } else { - query = 'q=' + term.trim() - } - query += '&page=' + page - fetch('/search?' + query) - .then(processResponse) - .then(function (data) { - var pre = [ - '<p>Displaying ', data.torrents.length, ' torrents. ', - (data.page > 1) ? '<a class="pager prev" href="#">Previous</a> ' : '', - (data.torrents.length === data.page_size) ? '<a class="pager next" href="#">Next</a>' : '', - '</p>', - '<ul id="results">' - ].join('') - var torrents = data.torrents.map(function (t) { - var magnet = 'magnet:?xt=urn:btih:' + t.infohash - return [ - '<li class="torrent">', - '<a class="torrent__magnet" href="', magnet, '"></a>', - '<a class="torrent__name" href="', magnet, '">', t.name, '</a>', - '<span class="torrent__size">Size: ', showSize(t.size), '</span>', - '<span class="torrent__seen">Last seen: <time datetime="', t.seen, '">', new Date(t.seen).toLocaleString(), '</time></span>', - '<span class="torrent__tags">Tags: ', - t.tags.map(function (g) { return '<a class="tag" href="/tags/' + g + '">' + g + '</a>' }).join(', '), - '</span>', - t.files.length === 0 ? '' : ['<a class="torrent__file-count toggler" href="#">Files: ', t.files.length, '</a>'].join(''), - '<ul class="files">', - t.files.map(function (f) { - return [ - '<li class="files__file file">', - '<span class="file__path">', f.path, '</span>', - '<span class="file__size">[', showSize(f.size), ']</span>', - '</li>' - ].join('') - }).join(''), - '</ul>', - '</li>' - ].join('') - }).join('') - var post = [ - '</ul><p>', - (data.page > 1) ? '<a class="pager prev" href="#">Previous</a> ' : '', - (data.torrents.length === data.page_size) ? '<a class="pager next" href="#">Next</a>' : '', - '</p>'].join('') - pEl.innerHTML = pre + torrents + post - bEl.classList.remove('search__button--active') - var togglers = document.getElementsByClassName('toggler') - for (var i = 0; i < togglers.length; i += 1) { - var el = togglers[i] - el.addEventListener('click', function (e) { - e.preventDefault() - e.target.nextElementSibling.classList.toggle('files--active') - }) - } - var pagers = document.getElementsByClassName('pager') - for (var i = 0; i < pagers.length; i += 1) { - var el = pagers[i] - el.addEventListener('click', function (e) { - e.preventDefault() - if (el.classList.contains('next')) { - search(term, data.page + 1) - } else { - search(term, data.page - 1) - } - }) - } - }) -} -var pEl = document.getElementById('page') -var sEl = document.getElementById('search') -var bEl = document.getElementById('go') -bEl.addEventListener('click', function () { - var term = sEl.value - search(term) -}) -sEl.addEventListener('keyup', function (e) { - if (e.keyCode === 13) { - bEl.click() - } -}) -var statsEl = document.getElementById('stats') -var oldStats = {} -var getStats = function () { - fetch('/stats') - .then(processResponse) - .then(function (data) { - statsEl.innerHTML = Object.keys(data).map(function (k) { - return [ - '<dt class="stats__key">', - k.replace(/_/g,' '), - '</dt><dd class="stats__value">', - k.indexOf('bytes') < 0 ? k.indexOf('time') < 0 ? data[k] : showDuration(data[k]) : showSize(data[k]) + ' (' + showRate(oldStats[k], data[k]) + ')', - '</dd>' - ].join('') - }).join('') - oldStats = data - setTimeout(getStats, interval) - }) -} -getStats() - </script> - </body> -</html> -`) diff --git a/infohash_test.go b/infohash_test.go deleted file mode 100644 index b62f3e2..0000000 --- a/infohash_test.go +++ /dev/null @@ -1,43 +0,0 @@ -package dhtsearch - -import ( - "testing" -) - -var hashes = []struct { - s string - valid bool -}{ - {"59066769b9ad42da2e508611c33d7c4480b3857b", true}, - {"59066769b9ad42da2e508611c33d7c4480b3857", false}, - {"59066769b9ad42da2e508611c33d7c4480b385", false}, - {"59066769b9ad42da2e508611c33d7c4480b3857k", false}, - {"5906676b99a4d2d2ae506811c33d7c4480b8357b", true}, -} - -func TestGenNeighbour(t *testing.T) { - for _, test := range hashes { - r := genNeighbour(test.s) - if r != test.valid { - t.Errorf("isValidInfoHash(%q) => %v expected %v", test.s, r, test.valid) - } - } -} - -func TestIsValidInfoHash(t *testing.T) { - for _, test := range hashes { - r := isValidInfoHash(test.s) - if r != test.valid { - t.Errorf("isValidInfoHash(%q) => %v, expected %v", test.s, r, test.valid) - } - } -} - -func TestDecodeInfoHash(t *testing.T) { - for _, test := range hashes { - _, err := decodeInfoHash(test.s) - if (err == nil) != test.valid { - t.Errorf("decodeInfoHash(%q) => %v expected %v", test.s, err, test.valid) - } - } -} diff --git a/dht/krpc.go b/krpc/krpc.go index 2a7c103..a766fcf 100644 --- a/dht/krpc.go +++ b/krpc/krpc.go @@ -1,4 +1,4 @@ -package dht +package krpc import ( "errors" @@ -10,7 +10,7 @@ import ( const transIDBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" -func newTransactionID() string { +func NewTransactionID() string { b := make([]byte, 2) for i := range b { b[i] = transIDBytes[rand.Int63()%int64(len(transIDBytes))] @@ -19,25 +19,25 @@ func newTransactionID() string { } // makeQuery returns a query-formed data. -func makeQuery(t, q string, a map[string]interface{}) map[string]interface{} { +func MakeQuery(transaction, query string, data map[string]interface{}) map[string]interface{} { return map[string]interface{}{ - "t": t, + "t": transaction, "y": "q", - "q": q, - "a": a, + "q": query, + "a": data, } } // makeResponse returns a response-formed data. -func makeResponse(t string, r map[string]interface{}) map[string]interface{} { +func MakeResponse(transaction string, data map[string]interface{}) map[string]interface{} { return map[string]interface{}{ - "t": t, + "t": transaction, "y": "r", - "r": r, + "r": data, } } -func getStringKey(data map[string]interface{}, key string) (string, error) { +func GetString(data map[string]interface{}, key string) (string, error) { val, ok := data[key] if !ok { return "", fmt.Errorf("krpc: missing key %s", key) @@ -49,7 +49,7 @@ func getStringKey(data map[string]interface{}, key string) (string, error) { return out, nil } -func getIntKey(data map[string]interface{}, key string) (int, error) { +func GetInt(data map[string]interface{}, key string) (int, error) { val, ok := data[key] if !ok { return 0, fmt.Errorf("krpc: missing key %s", key) @@ -61,7 +61,7 @@ func getIntKey(data map[string]interface{}, key string) (int, error) { return out, nil } -func getMapKey(data map[string]interface{}, key string) (map[string]interface{}, error) { +func GetMap(data map[string]interface{}, key string) (map[string]interface{}, error) { val, ok := data[key] if !ok { return nil, fmt.Errorf("krpc: missing key %s", key) @@ -73,7 +73,7 @@ func getMapKey(data map[string]interface{}, key string) (map[string]interface{}, return out, nil } -func getListKey(data map[string]interface{}, key string) ([]interface{}, error) { +func GetList(data map[string]interface{}, key string) ([]interface{}, error) { val, ok := data[key] if !ok { return nil, fmt.Errorf("krpc: missing key %s", key) @@ -125,7 +125,7 @@ func checkKey(data map[string]interface{}, key string, t string) error { } // Swiped from nictuku -func decodeCompactNodeAddr(cni string) string { +func DecodeCompactNodeAddr(cni string) string { if len(cni) == 6 { return fmt.Sprintf("%d.%d.%d.%d:%d", cni[0], cni[1], cni[2], cni[3], (uint16(cni[4])<<8)|uint16(cni[5])) } else if len(cni) == 18 { @@ -136,7 +136,7 @@ func decodeCompactNodeAddr(cni string) string { } } -func encodeCompactNodeAddr(addr string) string { +func EncodeCompactNodeAddr(addr string) string { var a []uint8 host, port, _ := net.SplitHostPort(addr) ip := net.ParseIP(host) diff --git a/dht/krpc_test.go b/krpc/krpc_test.go index 5bc8373..c46d70c 100644 --- a/dht/krpc_test.go +++ b/krpc/krpc_test.go @@ -1,4 +1,4 @@ -package dht +package krpc import ( "encoding/hex" diff --git a/models/tag.go b/models/tag.go new file mode 100644 index 0000000..3675c75 --- /dev/null +++ b/models/tag.go @@ -0,0 +1,5 @@ +package models + +type tagStore interface { + saveTag(string) (int, error) +} diff --git a/models/torrent.go b/models/torrent.go new file mode 100644 index 0000000..960a6de --- /dev/null +++ b/models/torrent.go @@ -0,0 +1,98 @@ +package models + +import ( + "bytes" + "crypto/sha1" + "encoding/hex" + "fmt" + "os" + "strings" + "time" + + "github.com/felix/dhtsearch/bencode" + "github.com/felix/dhtsearch/dht" + "github.com/felix/dhtsearch/krpc" +) + +// Data for persistent storage +type Torrent struct { + ID int `json:"-"` + InfoHash string `json:"infohash"` + Name string `json:"name"` + Files []File `json:"files" db:"-"` + Size int `json:"size"` + Seen time.Time `json:"seen"` + Tags []string `json:"tags" db:"-"` +} + +type File struct { + ID int `json:"-"` + Path string `json:"path"` + Size int `json:"size"` + TorrentID int `json:"torrent_id" db:"torrent_id"` +} + +type torrentStore interface { + saveTorrent(*Torrent) error + torrentsByHash(hashes dht.Infohash, offset, limit int) (*Torrent, error) + torrentsByName(query string, offset, limit int) ([]*Torrent, error) + torrentsByTags(tags []string, offset, limit int) ([]*Torrent, error) +} + +func validMetadata(ih dht.Infohash, md []byte) bool { + info := sha1.Sum(md) + return bytes.Equal([]byte(ih), info[:]) +} + +func TorrentFromMetadata(ih dht.Infohash, md []byte) (*Torrent, error) { + if !validMetadata(ih, md) { + return nil, fmt.Errorf("infohash does not match metadata") + } + info, _, err := bencode.DecodeDict(md, 0) + if err != nil { + return nil, err + } + + // Get the directory or advisory filename + name, err := krpc.GetString(info, "name") + if err != nil { + return nil, err + } + + bt := Torrent{ + InfoHash: hex.EncodeToString([]byte(ih)), + Name: name, + } + + if files, err := krpc.GetList(info, "files"); err == nil { + // Multiple file mode + bt.Files = make([]File, len(files)) + + // Files is a list of dicts + for i, item := range files { + file := item.(map[string]interface{}) + + // Paths is a list of strings + paths := file["path"].([]interface{}) + path := make([]string, len(paths)) + for j, p := range paths { + path[j] = p.(string) + } + + fSize := file["length"].(int) + bt.Files[i] = File{ + // Assume Unix path sep? + Path: strings.Join(path[:], string(os.PathSeparator)), + Size: fSize, + } + // Ensure the torrent size totals all files' + bt.Size = bt.Size + fSize + } + } else if length, err := krpc.GetInt(info, "length"); err == nil { + // Single file mode + bt.Size = length + } else { + return nil, fmt.Errorf("found neither length or files") + } + return &bt, nil +} diff --git a/torrent.go b/torrent.go deleted file mode 100644 index bb8dcff..0000000 --- a/torrent.go +++ /dev/null @@ -1,190 +0,0 @@ -package dhtsearch - -import ( - "fmt" - "time" -) - -// Data for persistent storage -type Torrent struct { - Id int `json:"-"` - InfoHash string `json:"infohash"` - Name string `json:"name"` - Files []File `json:"files" db:"-"` - Size int `json:"size"` - Seen time.Time `json:"seen"` - Tags []string `json:"tags" db:"-"` -} - -type File struct { - Id int `json:"-"` - Path string `json:"path"` - Size int `json:"size"` - TorrentId int `json:"torrent_id" db:"torrent_id"` -} - -func torrentExists(ih string) bool { - rows, err := DB.Query(sqlGetTorrent, fmt.Sprintf("%s", ih)) - defer rows.Close() - if err != nil { - fmt.Printf("Failed to exec SQL: %q\n", err) - return false - } - return rows.Next() -} - -func (t *Torrent) save() error { - tx, err := DB.Begin() - if err != nil { - fmt.Printf("Transaction err %q\n", err) - } - defer tx.Commit() - - var torrentId int - - // Need to turn infohash into string here - err = tx.QueryRow(sqlInsertTorrent, t.Name, fmt.Sprintf("%s", t.InfoHash), t.Size).Scan(&torrentId) - if err != nil { - tx.Rollback() - return err - } - - // Write tags - for _, tag := range t.Tags { - tagId, err := createTag(tag) - if err != nil { - tx.Rollback() - return err - } - _, err = tx.Exec(sqlInsertTagTorrent, tagId, torrentId) - if err != nil { - tx.Rollback() - return err - } - } - - // Write files - for _, f := range t.Files { - _, err := tx.Exec(sqlInsertFile, torrentId, f.Path, f.Size) - if err != nil { - tx.Rollback() - return err - } - } - - // Should this be outside the transaction? - tx.Exec(sqlUpdateFTSVectors, torrentId) - if err != nil { - tx.Rollback() - return err - } - return nil -} - -// Fill in a torrents dependant data -func (t *Torrent) load() (err error) { - // Files - t.Files = []File{} - err = DB.Select(&t.Files, sqlSelectFiles, t.Id) - if err != nil { - fmt.Printf("Error selecting files %s\n", err) - } - // t.Files = files - - // Tags - t.Tags = []string{} - err = DB.Select(&t.Tags, sqlSelectTags, t.Id) - if err != nil { - fmt.Printf("Error selecting tags %s\n", err) - } - return -} - -func torrentsByName(query string, offset int) ([]Torrent, error) { - torrents := []Torrent{} - err := DB.Select(&torrents, sqlSearchTorrents, fmt.Sprintf("%%%s%%", query), offset) - if err != nil { - return nil, err - } - fmt.Printf("Search for %q returned %d torrents\n", query, len(torrents)) - - for idx, _ := range torrents { - torrents[idx].load() - } - return torrents, nil -} - -func torrentsByTag(tag string, offset int) ([]Torrent, error) { - torrents := []Torrent{} - err := DB.Select(&torrents, sqlTorrentsByTag, tag, offset) - if err != nil { - return nil, err - } - fmt.Printf("Search for tag %q returned %d torrents\n", tag, len(torrents)) - - for idx, _ := range torrents { - torrents[idx].load() - } - return torrents, nil -} - -const ( - sqlGetTorrent = `update torrents - set seen = now() - where infohash = $1 - returning id` - - sqlInsertTorrent = `insert into torrents ( - name, infohash, size, seen - ) values ( - $1, $2, $3, now() - ) on conflict (infohash) do - update set seen = now() - returning id` - - sqlUpdateFTSVectors = `update torrents - set tsv = sub.tsv from ( - select t.id, - setweight(to_tsvector(translate(t.name, '._-', ' ')), 'A') || - setweight(to_tsvector(translate(string_agg(coalesce(f.path, ''), ' '), './_-', ' ')), 'B') as tsv - from torrents t - left join files f on t.id = f.torrent_id - where t.id = $1 - group by t.id - ) as sub - where sub.id = torrents.id` - - sqlSearchTorrents = ` - select t.id, t.infohash, t.name, t.size, t.seen - from torrents t - where t.tsv @@ plainto_tsquery($1) - order by ts_rank(tsv, plainto_tsquery($1)) desc, t.seen desc - limit 50 offset $2` - - sqlTorrentsByTag = ` - select t.id, t.infohash, t.name, t.size, t.seen - from torrents t - inner join tags_torrents tt on t.id = tt.torrent_id - inner join tags ta on tt.tag_id = ta.id - where ta.name = $1 group by t.id - order by seen desc - limit 50 offset $2` - - sqlSelectFiles = `select * from files - where torrent_id = $1 - order by path asc` - - sqlInsertFile = `insert into files ( - torrent_id, path, size - ) values($1, $2, $3)` - - sqlSelectTags = `select name - from tags t - inner join tags_torrents tt on t.id = tt.tag_id - where tt.torrent_id = $1` - - sqlInsertTagTorrent = `insert into tags_torrents ( - tag_id, torrent_id - ) values ($1, $2) - on conflict do nothing` -) diff --git a/util.go b/util.go deleted file mode 100644 index 1548fab..0000000 --- a/util.go +++ /dev/null @@ -1,92 +0,0 @@ -package dhtsearch - -import ( - "errors" - "fmt" - "net" -) - -// makeQuery returns a query-formed data. -func makeQuery(t, q string, a map[string]interface{}) map[string]interface{} { - return map[string]interface{}{ - "t": t, - "y": "q", - "q": q, - "a": a, - } -} - -// makeResponse returns a response-formed data. -func makeResponse(t string, r map[string]interface{}) map[string]interface{} { - return map[string]interface{}{ - "t": t, - "y": "r", - "r": r, - } -} - -// parseKeys parses keys. It just wraps parseKey. -func parseKeys(data map[string]interface{}, pairs [][]string) error { - for _, args := range pairs { - key, t := args[0], args[1] - if err := parseKey(data, key, t); err != nil { - return err - } - } - return nil -} - -// parseKey parses the key in dict data. `t` is type of the keyed value. -// It's one of "int", "string", "map", "list". -func parseKey(data map[string]interface{}, key string, t string) error { - val, ok := data[key] - if !ok { - return errors.New("lack of key") - } - - switch t { - case "string": - _, ok = val.(string) - case "int": - _, ok = val.(int) - case "map": - _, ok = val.(map[string]interface{}) - case "list": - _, ok = val.([]interface{}) - default: - panic("invalid type") - } - - if !ok { - return errors.New("invalid key type") - } - - return nil -} - -// parseMessage parses the basic data received from udp. -// It returns a map value. -func parseMessage(data interface{}) (map[string]interface{}, error) { - response, ok := data.(map[string]interface{}) - if !ok { - return nil, errors.New("response is not dict") - } - - if err := parseKeys(response, [][]string{{"t", "string"}, {"y", "string"}}); err != nil { - return nil, err - } - - return response, nil -} - -// Swiped from nictuku -func compactNodeInfoToString(cni string) string { - if len(cni) == 6 { - return fmt.Sprintf("%d.%d.%d.%d:%d", cni[0], cni[1], cni[2], cni[3], (uint16(cni[4])<<8)|uint16(cni[5])) - } else if len(cni) == 18 { - b := []byte(cni[:16]) - return fmt.Sprintf("[%s]:%d", net.IP.String(b), (uint16(cni[16])<<8)|uint16(cni[17])) - } else { - return "" - } -} |
