From 7bdce49c72dc1a9a966629ab3b688757e21c50d1 Mon Sep 17 00:00:00 2001 From: velvettear Date: Fri, 22 Sep 2023 15:50:03 +0200 Subject: [PATCH] added support for file transfers from remote to local --- .vscode/launch.json | 17 +++++++++ README.md | 17 +++++---- settings/arguments.go | 33 +++++++++++++---- settings/variables.go | 9 +++++ tools/rsync.go | 81 ++++++++++++++++++++++++++---------------- tools/scanner.go | 82 +++++++++++++++++++++++++++++++++++++++---- tools/ssh.go | 5 ++- 7 files changed, 191 insertions(+), 53 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 415d164..46226a0 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -18,6 +18,23 @@ ], "console": "integratedTerminal" }, + { + "name": "gosync-remote", + "type": "go", + "request": "launch", + "mode": "auto", + "program": "${workspaceFolder}/main.go", + "args": [ + "192.168.1.11:/share/music/Fu Manchu/*", + "/tmp", + "--password", + "$Velvet90", + "--concurrency", + "4", + "--verbose", + ], + "console": "integratedTerminal" + }, { "name": "gosync-root", "type": "go", diff --git a/README.md b/README.md index fa53692..929d095 100644 --- a/README.md +++ b/README.md @@ -14,9 +14,14 @@ a simple wrapper for concurrent rsync processes written in golang ### options -| short | long | description | -| ----- | ------------- | ------------------------------------------- | -| -u | --user | set user for ssh / rsync | -| -p | --password | set password for ssh / rsync | -| -c | --concurrency | set limit for concurrent rsync processes | -| -v | --verbose | enable verbose / debug output | \ No newline at end of file +| short | long | description | default | +| ----- | ------------- | ----------------------------------------------- | ------------------- | +| -u | --user | set user for ssh / rsync | | +| -p | --password | set password for ssh / rsync | | +| -c | --concurrency | set limit for concurrent rsync processes | number of cpu cores | +| -d | --delay | set the delay between rsync connections (in ms) | 100 | +| -v | --verbose | enable verbose / debug output | | + +### troubleshooting + +if you experience errors like `kex_exchange_identification: read: Connection reset by peer` it may be helpful to increase the default delay (100ms) between rsync connections or decrease the concurrency. \ No newline at end of file diff --git a/settings/arguments.go b/settings/arguments.go index 5fc5380..f290c66 100644 --- a/settings/arguments.go +++ b/settings/arguments.go @@ -5,6 +5,7 @@ import ( "runtime" "strconv" "strings" + "time" "velvettear/gosync/help" "velvettear/gosync/log" ) @@ -31,17 +32,24 @@ func Initialize() { fallthrough case "--concurrency": index++ - var concurrency int if index < len(os.Args) { - tmp, error := strconv.Atoi(os.Args[index]) - if error == nil { - concurrency = tmp + concurrency, error := strconv.Atoi(os.Args[index]) + if error != nil { + break } + setConcurrency(concurrency) } - if concurrency == 0 { - concurrency = runtime.NumCPU() + case "-d": + fallthrough + case "--delay": + index++ + if index < len(os.Args) { + delay, error := strconv.Atoi(os.Args[index]) + if error != nil { + break + } + setDelay(time.Duration(delay) * time.Millisecond) } - setConcurrency(concurrency) case "-p": fallthrough case "--password": @@ -74,4 +82,15 @@ func Initialize() { log.Fatal("given source does not exist", source) } } + setDefaults() +} + +// unexported function(s) +func setDefaults() { + if Concurrency == 0 { + setConcurrency(runtime.NumCPU()) + } + if Delay == 0 { + setDelay(100) + } } diff --git a/settings/variables.go b/settings/variables.go index 045859d..fbd66df 100644 --- a/settings/variables.go +++ b/settings/variables.go @@ -3,6 +3,7 @@ package settings import ( "strconv" "strings" + "time" "velvettear/gosync/log" ) @@ -11,6 +12,7 @@ var Verbose bool var Source string var Target string var Concurrency int +var Delay time.Duration var Password string var User string @@ -57,6 +59,13 @@ func setConcurrency(concurrency int) { log.Debug("set concurrency", strconv.Itoa(Concurrency)) } +func setDelay(delay time.Duration) { + Delay = delay + ms := Delay.Milliseconds() + derp := strconv.FormatInt(ms, 10) + log.Debug("set delay", derp) +} + func setPassword(password string) { Password = password log.Debug("set password", Password) diff --git a/tools/rsync.go b/tools/rsync.go index 75378e2..04c9dfb 100644 --- a/tools/rsync.go +++ b/tools/rsync.go @@ -17,17 +17,20 @@ import ( "github.com/vbauerster/mpb/v8/decor" ) -var transferSize float64 +var transferSize int64 + +const byteToMegabyte = 1048576 +const megabyteToGigabyte = 1024 // exported function(s) func Transfer() { - transferSize = 0 sourcefiles := getSourceFiles() sourcefilesCount := len(sourcefiles) if sourcefilesCount <= 0 { log.Info("nothing to do - exiting...") os.Exit(0) } + sourceFileSize, sourceFileSizeName := humanizeBytes(getSourceFileSize()) var waitgroup sync.WaitGroup waitgroup.Add(sourcefilesCount) barcontainer := mpb.New( @@ -35,45 +38,37 @@ func Transfer() { ) timestamp := time.Now() counter := 0 - totalbar := createProgressBar(barcontainer, "Total", int64(0), int64(sourcefilesCount), sourcefilesCount+1, true) + totalbar := createProgressBar(barcontainer, strconv.FormatFloat(sourceFileSize, 'f', 2, 64)+sourceFileSizeName+" 󰇙 Total", int64(0), int64(sourcefilesCount), sourcefilesCount+1, true) concurrency := settings.Concurrency if concurrency == 0 { concurrency = sourcefilesCount } + first := true channel := make(chan struct{}, concurrency) - for index, file := range sourcefiles { + for file, size := range sourcefiles { channel <- struct{}{} - if index > 0 { - time.Sleep(100 * time.Millisecond) + if !first { + time.Sleep(settings.Delay) + } else { + first = false } - go func(index int, file string) { + go func(file string, size int64) { defer waitgroup.Done() - stats, error := os.Stat(file) - if error != nil { - log.Warning("encountered an error getting file size", error.Error()) - return - } - bar := createProgressBar(barcontainer, filepath.Base(file), stats.Size(), int64(100), index, false) + bar := createProgressBar(barcontainer, filepath.Base(file), size, int64(100), counter, false) if transferFile(bar, file) { counter++ } totalbar.Increment() <-channel - }(index, file) + }(file, size) } barcontainer.Wait() timeDifference := time.Since(timestamp) - transferSpeedName := "bytes/sec" - transferSpeed := transferSize / timeDifference.Seconds() - if transferSpeed > 1048576 { - transferSpeed = transferSpeed / 1048576 - transferSpeedName = "mb/s" - } - transferSizeName := "bytes" - if transferSize > 1048576 { - transferSize = transferSize / 1048576 - transferSizeName = "mb" - } + transferSpeedName := "B/sec" + transferSpeed := float64(transferSize) / timeDifference.Seconds() + transferSpeed, transferSpeedName = humanizeBytes(int64(transferSpeed)) + transferSpeedName += "/s" + transferSize, transferSizeName := humanizeBytes(transferSize) log.InfoTimed("transferred "+strconv.Itoa(counter)+" files, "+strconv.Itoa(int(transferSize))+" "+transferSizeName+" ("+strconv.FormatFloat(transferSpeed, 'f', 2, 64)+" "+transferSpeedName+")", timestamp.UnixMilli()) } @@ -84,11 +79,18 @@ func transferFile(bar *mpb.Bar, file string) bool { if len(settings.Password) > 0 { arguments = append(arguments, "-p", settings.Password) } - arguments = append(arguments, "rsync", "-avz", "--mkpath", file) - if len(settings.User) > 0 { - target = settings.User + "@" + target + if settings.SourceIsRemote() { + remote, _, _ := strings.Cut(settings.Source, ":") + file = remote + ":" + file + if len(settings.User) > 0 { + file = settings.User + "@" + file + } + } else { + if len(settings.User) > 0 { + target = settings.User + "@" + target + } } - arguments = append(arguments, target, "--progress") + arguments = append(arguments, "rsync", "-avz", "--mkpath", file, target, "--progress") cmd := exec.Command("sshpass", arguments...) stdout, stdoutError := cmd.StdoutPipe() stderr, stderrError := cmd.StderrPipe() @@ -123,7 +125,7 @@ func transferFile(bar *mpb.Bar, file string) bool { _, tmp, _ := strings.Cut(lowerline, "sent") tmp, _, _ = strings.Cut(tmp, "bytes") tmp = strings.ReplaceAll(strings.TrimSpace(tmp), ".", "") - bytes, error := strconv.ParseFloat(tmp, 64) + bytes, error := strconv.ParseInt(tmp, 10, 64) if error != nil { log.Fatal("encountered an error converting the transferred bytes to int", error.Error()) } @@ -173,6 +175,9 @@ func getTargetLocation(sourceFile string) string { return filepath.Join(settings.Target, filepath.Base(sourceFile)) } source, _ := strings.CutSuffix(settings.Source, "/*") + if settings.SourceIsRemote() { + _, source, _ = strings.Cut(settings.Source, ":") + } return filepath.Join(settings.Target, strings.Replace(sourceFile, filepath.Dir(source), "", 1)) } @@ -191,7 +196,7 @@ func createProgressBar(barcontainer *mpb.Progress, name string, size int64, max defaultBarAppend := mpb.AppendDecorators( decor.OnCompleteMeta(decor.Elapsed(decor.ET_STYLE_GO, decor.WCSyncSpaceR), colorize(yellow)), decor.OnComplete( - decor.Name("|", decor.WCSyncSpaceR), "", + decor.Name("󰇙", decor.WCSyncSpaceR), "", ), decor.OnComplete( decor.Percentage(decor.WCSyncSpaceR), "", @@ -233,3 +238,17 @@ func colorize(c *color.Color) func(string) string { return c.Sprint(s) } } + +func humanizeBytes(bytes int64) (float64, string) { + name := "B" + size := float64(bytes) + if size >= byteToMegabyte { + name = "MB" + size = size / byteToMegabyte + } + if size >= megabyteToGigabyte { + name = "GB" + size = size / megabyteToGigabyte + } + return size, name +} diff --git a/tools/scanner.go b/tools/scanner.go index f77e1f4..d7f0316 100644 --- a/tools/scanner.go +++ b/tools/scanner.go @@ -1,8 +1,11 @@ package tools import ( + "errors" + "io" "io/fs" "os" + "os/exec" "path/filepath" "strconv" "strings" @@ -11,30 +14,84 @@ import ( "velvettear/gosync/settings" ) -var sourceFiles []string +var sourceFiles map[string]int64 // unexported function(s) -func getSourceFiles() []string { +func getSourceFiles() map[string]int64 { timestamp := time.Now() + sourceFiles = map[string]int64{} if settings.SourceIsRemote() { - + log.Info("getting the list of remote source files...") + error := fillRemoteSourceFiles() + if error != nil { + log.Fatal("encountered an error getting the list of remote source files", error.Error()) + } } else { source, _ := strings.CutSuffix(settings.Source, "/*") stats, error := os.Stat(source) if error != nil { - log.Error("encountered an error getting the stats for the source", error.Error()) + log.Fatal("encountered an error getting stats for source", error.Error()) } if stats.IsDir() { log.Info("scanning source...", source) filepath.WalkDir(source, fillSourceFiles) } else { - sourceFiles = append(sourceFiles, settings.Source) + sourceFiles[settings.Source] = stats.Size() } } log.InfoTimed("found "+strconv.Itoa(len(sourceFiles))+" source files", timestamp.UnixMilli()) return sourceFiles } +func fillRemoteSourceFiles() error { + var arguments []string + if len(settings.Password) > 0 { + arguments = append(arguments, "-p", settings.Password) + } + arguments = append(arguments, "ssh") + remote, path, _ := strings.Cut(settings.Source, ":") + path, _ = strings.CutSuffix(path, "/*") + if len(settings.User) > 0 { + remote = settings.User + "@" + remote + } + arguments = append(arguments, remote, "find", "\""+path+"\"", "-type", "f", "-exec", "du", "-b", "{}", "\\;") + cmd := exec.Command("sshpass", arguments...) + stdout, stdoutError := cmd.StdoutPipe() + stderr, stderrError := cmd.StderrPipe() + cmd.Start() + if stdoutError != nil { + return stdoutError + } + if stderrError != nil { + return stderrError + } + outBytes, stdoutError := io.ReadAll(stdout) + if stdoutError != nil { + return stdoutError + } + errorBytes, stderrError := io.ReadAll(stderr) + if stderrError != nil { + return stderrError + } + cmd.Wait() + error := strings.TrimSpace(string(errorBytes)) + if len(error) > 0 { + return errors.New(error) + } + for _, line := range strings.Split(string(outBytes), "\n") { + if !strings.Contains(line, "\t") { + continue + } + parts := strings.Split(line, "\t") + size, error := strconv.ParseInt(parts[0], 10, 64) + if error != nil { + log.Warning("encountered an error getting the file size for file '"+path+"'", error.Error()) + } + sourceFiles[parts[1]] = size + } + return nil +} + func fillSourceFiles(path string, dir fs.DirEntry, err error) error { if err != nil { return err @@ -42,6 +99,19 @@ func fillSourceFiles(path string, dir fs.DirEntry, err error) error { if dir.IsDir() { return nil } - sourceFiles = append(sourceFiles, path) + stats, error := os.Stat(path) + if error != nil { + log.Fatal("encountered an error getting stats for file", error.Error()) + return nil + } + sourceFiles[path] = stats.Size() return nil } + +func getSourceFileSize() int64 { + var total int64 + for _, size := range sourceFiles { + total += size + } + return total +} diff --git a/tools/ssh.go b/tools/ssh.go index fd8a997..da9533d 100644 --- a/tools/ssh.go +++ b/tools/ssh.go @@ -13,7 +13,7 @@ import ( func TestConnection() error { var remote string if settings.SourceIsRemote() { - remote, _, _ = strings.Cut(settings.Target, ":") + remote, _, _ = strings.Cut(settings.Source, ":") } else if settings.TargetIsRemote() { remote, _, _ = strings.Cut(settings.Target, ":") } else { @@ -24,11 +24,10 @@ func TestConnection() error { arguments = append(arguments, "-p", settings.Password) } arguments = append(arguments, "ssh") - if len(settings.User) > 0 { remote = settings.User + "@" + remote } - arguments = append(arguments, remote, "'exit'") + arguments = append(arguments, remote, "exit") cmd := exec.Command("sshpass", arguments...) stdout, stdoutError := cmd.StdoutPipe() stderr, stderrError := cmd.StderrPipe()