aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFelix Hanley <felix@userspace.com.au>2018-02-26 11:29:38 +0000
committerFelix Hanley <felix@userspace.com.au>2018-02-26 11:29:38 +0000
commitdc5701ed047482eab793c7915f252fb330427df4 (patch)
treec48fab60493ef14f3d1f38a8eda50221b1daa9b3
parentfa772a105545f80e64be3554300e5ae8548d8fd2 (diff)
downloaddhtsearch-dc5701ed047482eab793c7915f252fb330427df4.tar.gz
dhtsearch-dc5701ed047482eab793c7915f252fb330427df4.tar.bz2
Get basic DHT monitoring working with peer announces
-rw-r--r--Makefile4
-rw-r--r--cmd/main.go212
2 files changed, 120 insertions, 96 deletions
diff --git a/Makefile b/Makefile
index d13acb9..2fbc4c2 100644
--- a/Makefile
+++ b/Makefile
@@ -6,9 +6,9 @@ SRC=$(shell find . -type f -name '*.go')
build: $(BINARY)
$(BINARY): $(SRC)
- go build -ldflags "-w -s \
+ cd cmd && go build -ldflags "-w -s \
-X main.version=$(VERSION)" \
- -o $(BINARY)
+ -o ../$(BINARY)
test:
go test -short -coverprofile=coverage.out
go tool cover -html=coverage.out -o coverage.html
diff --git a/cmd/main.go b/cmd/main.go
index a08c7f5..80ef9eb 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -4,117 +4,141 @@ import (
"flag"
"fmt"
"os"
- "path/filepath"
"time"
- "github.com/felix/dhtsearch"
- "github.com/felix/dhtsearch/crawler"
+ "github.com/felix/dhtsearch/bt"
+ "github.com/felix/dhtsearch/dht"
+ "github.com/felix/dhtsearch/models"
"github.com/felix/logger"
- "github.com/jawher/mow.cli"
)
var (
version string
+ log logger.Logger
+)
+
+// DHT vars
+var (
+ debug bool
+ port int
+ ipv6 bool
+ dhtNodes int
+ showVersion bool
+)
+
+// Torrent vars
+var (
+ pool chan chan models.Peer
+ torrents chan models.Torrent
+ btNodes int
)
func main() {
- app := cli.App("dhtsearch", "Crawl the DHT network")
-
- app.Version("version", version)
-
- var (
- port = app.Int(cli.IntOpt{
- Name: "p port",
- Value: 6881,
- EnvVar: "PORT",
- Desc: "listen port (and first of multiple ports)",
- })
- nodes = app.Int(cli.IntOpt{
- Name: "n nodes",
- Vale: 1,
- EnvVar: "NODES",
- Desc: "number of nodes to start",
- })
- debug = app.Bool(cli.BoolOpt{
- Name: "d debug",
- Value: false,
- EnvVar: "DEBUG",
- Desc: "show debug output",
- })
- dsn = app.String(cli.StringOpt{
- Name: "dsn",
- Value: "postgres://dht:dht@localhost/dht?sslmode=disable",
- EnvVar: "DSN",
- Desc: "postgres DSN",
- })
- httpHost = app.String(cli.StringOpt{
- Name: "http-host",
- Value: "localhost:6880",
- EnvVar: "HTTP_HOST",
- Desc: "HTTP listen address",
- })
- httpPort = app.Int(cli.IntOpt{
- Name: "http-port",
- Value: 6880,
- EnvVar: "HTTP_PORT",
- Desc: "HTTP listen port",
- })
- httpOff = app.Bool(cli.BoolOpt{
- Name: "disable-http",
- Value: false,
- EnvVar: "DISABLE_HTTP",
- Desc: "disable HTTP",
- })
- filterOff = app.Bool(cli.BoolOpt{
- Name: "disable-filter",
- Value: false,
- EnvVar: "DISABLE_FILTER",
- Desc: "disable HTTP",
- })
- tagFile = app.String(cli.StringOpt{
- Name: "tag-file",
- Value: "",
- EnvVar: "TAG_FILE",
- Desc: "file containingn custom tags",
- })
- skipTags = app.String(cli.StringOpt{
- Name: "skip-tags",
- Value: "adult",
- EnvVar: "SKIP_TAGS",
- Desc: "comma separated list of tags to skip",
- })
- )
-
- app.Action = func() {
- logOpts := &logger.Options{
- Name: "dhtsearch",
- Level: logger.Info,
- }
+ flag.IntVar(&port, "port", 6881, "listen port (and first for multiple nodes")
+ flag.BoolVar(&debug, "debug", false, "show debug output")
+ flag.BoolVar(&ipv6, "6", false, "listen on IPv6 also")
+ flag.IntVar(&dhtNodes, "dht-nodes", 1, "number of DHT nodes to start")
+
+ flag.IntVar(&btNodes, "bt-nodes", 3, "number of BT nodes to start")
+
+ flag.BoolVar(&showVersion, "v", false, "show version")
+
+ flag.Parse()
+
+ if showVersion {
+ fmt.Println(version)
+ os.Exit(0)
+ }
+
+ logOpts := &logger.Options{
+ Name: "dhtsearch",
+ Level: logger.Info,
+ }
+
+ if debug {
+ logOpts.Level = logger.Debug
+ }
+ log = logger.New(logOpts)
+ log.Info("version", version)
+ log.Debug("debugging")
+
+ go startDHTNodes(saveInfohash)
- if debug {
- logOpts.Level = logger.Debug
+ go startBTWorkers(saveTorrent)
+
+ for {
+ select {
+ case <-time.After(300 * time.Second):
+ fmt.Println("mark")
}
- log = logger.New(logOpts)
- log.Info("version", version)
-
- crawler, err := crawler.New(
- dsn,
- dhtsearch.SetLogger(log),
- dhtsearch.SetPort(port),
- dhtsearch.SetNodes(nodes),
+ }
+}
+
+func startDHTNodes(f func(p models.Peer)) (nodes []*dht.Node, err error) {
+ nodes = make([]*dht.Node, dhtNodes)
+
+ for i := 0; i < dhtNodes; i++ {
+ dht, err := dht.NewNode(
+ dht.SetLogger(log.Named("dht")),
+ dht.SetPort(port+i),
+ dht.SetOnAnnouncePeer(f),
+ dht.SetIPv6(ipv6),
)
if err != nil {
- log.Error("failed to create crawler", "error", err)
- cli.Exit(1)
+ log.Error("failed to create node", "error", err)
+ return nodes, err
}
+ go dht.Run()
+ nodes[i] = dht
+ }
+ return nodes, err
+}
+
+func saveInfohash(p models.Peer) {
+ //log.Info("announce", "peer", p)
+ // Blocks
+ w := <-pool
+ w <- p
+ return
+}
- err = server.Run()
+func saveTorrent(t models.Torrent) {
+ fmt.Printf("Torrent added, size: %d, name: %q, tags: %s, url: magnet:?xt=urn:btih:%s\n", t.Size, t.Name, t.Tags, t.Infohash)
+ // Add tags
+ //tagTorrent(&t)
+
+ /*
+ // Not sure if I like continue labels, so this
+ var discard = false
+ for _, tag := range Config.SkipTags {
+ if hasTag(t, tag) {
+ fmt.Printf("Skipping torrent tagged '%s': %q\n", tag, t.Name)
+ discard = true
+ break
+ }
+ }
+ if discard {
+ continue
+ }
+ */
+}
+
+func startBTWorkers(f func(t models.Torrent)) {
+ pool = make(chan chan models.Peer)
+ torrents = make(chan models.Torrent)
+
+ for i := 0; i < btNodes; i++ {
+ w, err := bt.NewWorker(
+ pool,
+ bt.SetLogger(log.Named("bt")),
+ bt.SetPort(port+i),
+ bt.SetIPv6(ipv6),
+ bt.SetOnNewTorrent(f),
+ )
if err != nil {
- log.Error("failed to run c", "error", err)
- cli.Exit(1)
+ log.Error("failed to create bt worker", "error", err)
+ return
}
- cli.Exit(0)
+ go w.Run()
}
-
- app.Run(os.Args)
}