added support for file transfers from remote to local

This commit is contained in:
Daniel Sommer 2023-09-22 15:50:03 +02:00
parent bf8eadffaf
commit 7bdce49c72
7 changed files with 191 additions and 53 deletions

17
.vscode/launch.json vendored
View file

@ -18,6 +18,23 @@
], ],
"console": "integratedTerminal" "console": "integratedTerminal"
}, },
{
"name": "gosync-remote",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/main.go",
"args": [
"192.168.1.11:/share/music/Fu Manchu/*",
"/tmp",
"--password",
"$Velvet90",
"--concurrency",
"4",
"--verbose",
],
"console": "integratedTerminal"
},
{ {
"name": "gosync-root", "name": "gosync-root",
"type": "go", "type": "go",

View file

@ -14,9 +14,14 @@ a simple wrapper for concurrent rsync processes written in golang
### options ### options
| short | long | description | | short | long | description | default |
| ----- | ------------- | ------------------------------------------- | | ----- | ------------- | ----------------------------------------------- | ------------------- |
| -u | --user | set user for ssh / rsync | | -u | --user | set user for ssh / rsync | |
| -p | --password | set password for ssh / rsync | | -p | --password | set password for ssh / rsync | |
| -c | --concurrency | set limit for concurrent rsync processes | | -c | --concurrency | set limit for concurrent rsync processes | number of cpu cores |
| -v | --verbose | enable verbose / debug output | | -d | --delay | set the delay between rsync connections (in ms) | 100 |
| -v | --verbose | enable verbose / debug output | |
### troubleshooting
if you experience errors like `kex_exchange_identification: read: Connection reset by peer` it may be helpful to increase the default delay (100ms) between rsync connections or decrease the concurrency.

View file

@ -5,6 +5,7 @@ import (
"runtime" "runtime"
"strconv" "strconv"
"strings" "strings"
"time"
"velvettear/gosync/help" "velvettear/gosync/help"
"velvettear/gosync/log" "velvettear/gosync/log"
) )
@ -31,17 +32,24 @@ func Initialize() {
fallthrough fallthrough
case "--concurrency": case "--concurrency":
index++ index++
var concurrency int
if index < len(os.Args) { if index < len(os.Args) {
tmp, error := strconv.Atoi(os.Args[index]) concurrency, error := strconv.Atoi(os.Args[index])
if error == nil { if error != nil {
concurrency = tmp break
} }
setConcurrency(concurrency)
} }
if concurrency == 0 { case "-d":
concurrency = runtime.NumCPU() fallthrough
case "--delay":
index++
if index < len(os.Args) {
delay, error := strconv.Atoi(os.Args[index])
if error != nil {
break
}
setDelay(time.Duration(delay) * time.Millisecond)
} }
setConcurrency(concurrency)
case "-p": case "-p":
fallthrough fallthrough
case "--password": case "--password":
@ -74,4 +82,15 @@ func Initialize() {
log.Fatal("given source does not exist", source) log.Fatal("given source does not exist", source)
} }
} }
setDefaults()
}
// unexported function(s)
func setDefaults() {
if Concurrency == 0 {
setConcurrency(runtime.NumCPU())
}
if Delay == 0 {
setDelay(100)
}
} }

View file

@ -3,6 +3,7 @@ package settings
import ( import (
"strconv" "strconv"
"strings" "strings"
"time"
"velvettear/gosync/log" "velvettear/gosync/log"
) )
@ -11,6 +12,7 @@ var Verbose bool
var Source string var Source string
var Target string var Target string
var Concurrency int var Concurrency int
var Delay time.Duration
var Password string var Password string
var User string var User string
@ -57,6 +59,13 @@ func setConcurrency(concurrency int) {
log.Debug("set concurrency", strconv.Itoa(Concurrency)) log.Debug("set concurrency", strconv.Itoa(Concurrency))
} }
func setDelay(delay time.Duration) {
Delay = delay
ms := Delay.Milliseconds()
derp := strconv.FormatInt(ms, 10)
log.Debug("set delay", derp)
}
func setPassword(password string) { func setPassword(password string) {
Password = password Password = password
log.Debug("set password", Password) log.Debug("set password", Password)

View file

@ -17,17 +17,20 @@ import (
"github.com/vbauerster/mpb/v8/decor" "github.com/vbauerster/mpb/v8/decor"
) )
var transferSize float64 var transferSize int64
const byteToMegabyte = 1048576
const megabyteToGigabyte = 1024
// exported function(s) // exported function(s)
func Transfer() { func Transfer() {
transferSize = 0
sourcefiles := getSourceFiles() sourcefiles := getSourceFiles()
sourcefilesCount := len(sourcefiles) sourcefilesCount := len(sourcefiles)
if sourcefilesCount <= 0 { if sourcefilesCount <= 0 {
log.Info("nothing to do - exiting...") log.Info("nothing to do - exiting...")
os.Exit(0) os.Exit(0)
} }
sourceFileSize, sourceFileSizeName := humanizeBytes(getSourceFileSize())
var waitgroup sync.WaitGroup var waitgroup sync.WaitGroup
waitgroup.Add(sourcefilesCount) waitgroup.Add(sourcefilesCount)
barcontainer := mpb.New( barcontainer := mpb.New(
@ -35,45 +38,37 @@ func Transfer() {
) )
timestamp := time.Now() timestamp := time.Now()
counter := 0 counter := 0
totalbar := createProgressBar(barcontainer, "Total", int64(0), int64(sourcefilesCount), sourcefilesCount+1, true) totalbar := createProgressBar(barcontainer, strconv.FormatFloat(sourceFileSize, 'f', 2, 64)+sourceFileSizeName+" 󰇙 Total", int64(0), int64(sourcefilesCount), sourcefilesCount+1, true)
concurrency := settings.Concurrency concurrency := settings.Concurrency
if concurrency == 0 { if concurrency == 0 {
concurrency = sourcefilesCount concurrency = sourcefilesCount
} }
first := true
channel := make(chan struct{}, concurrency) channel := make(chan struct{}, concurrency)
for index, file := range sourcefiles { for file, size := range sourcefiles {
channel <- struct{}{} channel <- struct{}{}
if index > 0 { if !first {
time.Sleep(100 * time.Millisecond) time.Sleep(settings.Delay)
} else {
first = false
} }
go func(index int, file string) { go func(file string, size int64) {
defer waitgroup.Done() defer waitgroup.Done()
stats, error := os.Stat(file) bar := createProgressBar(barcontainer, filepath.Base(file), size, int64(100), counter, false)
if error != nil {
log.Warning("encountered an error getting file size", error.Error())
return
}
bar := createProgressBar(barcontainer, filepath.Base(file), stats.Size(), int64(100), index, false)
if transferFile(bar, file) { if transferFile(bar, file) {
counter++ counter++
} }
totalbar.Increment() totalbar.Increment()
<-channel <-channel
}(index, file) }(file, size)
} }
barcontainer.Wait() barcontainer.Wait()
timeDifference := time.Since(timestamp) timeDifference := time.Since(timestamp)
transferSpeedName := "bytes/sec" transferSpeedName := "B/sec"
transferSpeed := transferSize / timeDifference.Seconds() transferSpeed := float64(transferSize) / timeDifference.Seconds()
if transferSpeed > 1048576 { transferSpeed, transferSpeedName = humanizeBytes(int64(transferSpeed))
transferSpeed = transferSpeed / 1048576 transferSpeedName += "/s"
transferSpeedName = "mb/s" transferSize, transferSizeName := humanizeBytes(transferSize)
}
transferSizeName := "bytes"
if transferSize > 1048576 {
transferSize = transferSize / 1048576
transferSizeName = "mb"
}
log.InfoTimed("transferred "+strconv.Itoa(counter)+" files, "+strconv.Itoa(int(transferSize))+" "+transferSizeName+" ("+strconv.FormatFloat(transferSpeed, 'f', 2, 64)+" "+transferSpeedName+")", timestamp.UnixMilli()) log.InfoTimed("transferred "+strconv.Itoa(counter)+" files, "+strconv.Itoa(int(transferSize))+" "+transferSizeName+" ("+strconv.FormatFloat(transferSpeed, 'f', 2, 64)+" "+transferSpeedName+")", timestamp.UnixMilli())
} }
@ -84,11 +79,18 @@ func transferFile(bar *mpb.Bar, file string) bool {
if len(settings.Password) > 0 { if len(settings.Password) > 0 {
arguments = append(arguments, "-p", settings.Password) arguments = append(arguments, "-p", settings.Password)
} }
arguments = append(arguments, "rsync", "-avz", "--mkpath", file) if settings.SourceIsRemote() {
if len(settings.User) > 0 { remote, _, _ := strings.Cut(settings.Source, ":")
target = settings.User + "@" + target file = remote + ":" + file
if len(settings.User) > 0 {
file = settings.User + "@" + file
}
} else {
if len(settings.User) > 0 {
target = settings.User + "@" + target
}
} }
arguments = append(arguments, target, "--progress") arguments = append(arguments, "rsync", "-avz", "--mkpath", file, target, "--progress")
cmd := exec.Command("sshpass", arguments...) cmd := exec.Command("sshpass", arguments...)
stdout, stdoutError := cmd.StdoutPipe() stdout, stdoutError := cmd.StdoutPipe()
stderr, stderrError := cmd.StderrPipe() stderr, stderrError := cmd.StderrPipe()
@ -123,7 +125,7 @@ func transferFile(bar *mpb.Bar, file string) bool {
_, tmp, _ := strings.Cut(lowerline, "sent") _, tmp, _ := strings.Cut(lowerline, "sent")
tmp, _, _ = strings.Cut(tmp, "bytes") tmp, _, _ = strings.Cut(tmp, "bytes")
tmp = strings.ReplaceAll(strings.TrimSpace(tmp), ".", "") tmp = strings.ReplaceAll(strings.TrimSpace(tmp), ".", "")
bytes, error := strconv.ParseFloat(tmp, 64) bytes, error := strconv.ParseInt(tmp, 10, 64)
if error != nil { if error != nil {
log.Fatal("encountered an error converting the transferred bytes to int", error.Error()) log.Fatal("encountered an error converting the transferred bytes to int", error.Error())
} }
@ -173,6 +175,9 @@ func getTargetLocation(sourceFile string) string {
return filepath.Join(settings.Target, filepath.Base(sourceFile)) return filepath.Join(settings.Target, filepath.Base(sourceFile))
} }
source, _ := strings.CutSuffix(settings.Source, "/*") source, _ := strings.CutSuffix(settings.Source, "/*")
if settings.SourceIsRemote() {
_, source, _ = strings.Cut(settings.Source, ":")
}
return filepath.Join(settings.Target, strings.Replace(sourceFile, filepath.Dir(source), "", 1)) return filepath.Join(settings.Target, strings.Replace(sourceFile, filepath.Dir(source), "", 1))
} }
@ -191,7 +196,7 @@ func createProgressBar(barcontainer *mpb.Progress, name string, size int64, max
defaultBarAppend := mpb.AppendDecorators( defaultBarAppend := mpb.AppendDecorators(
decor.OnCompleteMeta(decor.Elapsed(decor.ET_STYLE_GO, decor.WCSyncSpaceR), colorize(yellow)), decor.OnCompleteMeta(decor.Elapsed(decor.ET_STYLE_GO, decor.WCSyncSpaceR), colorize(yellow)),
decor.OnComplete( decor.OnComplete(
decor.Name("|", decor.WCSyncSpaceR), "", decor.Name("󰇙", decor.WCSyncSpaceR), "",
), ),
decor.OnComplete( decor.OnComplete(
decor.Percentage(decor.WCSyncSpaceR), "", decor.Percentage(decor.WCSyncSpaceR), "",
@ -233,3 +238,17 @@ func colorize(c *color.Color) func(string) string {
return c.Sprint(s) return c.Sprint(s)
} }
} }
func humanizeBytes(bytes int64) (float64, string) {
name := "B"
size := float64(bytes)
if size >= byteToMegabyte {
name = "MB"
size = size / byteToMegabyte
}
if size >= megabyteToGigabyte {
name = "GB"
size = size / megabyteToGigabyte
}
return size, name
}

View file

@ -1,8 +1,11 @@
package tools package tools
import ( import (
"errors"
"io"
"io/fs" "io/fs"
"os" "os"
"os/exec"
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
@ -11,30 +14,84 @@ import (
"velvettear/gosync/settings" "velvettear/gosync/settings"
) )
var sourceFiles []string var sourceFiles map[string]int64
// unexported function(s) // unexported function(s)
func getSourceFiles() []string { func getSourceFiles() map[string]int64 {
timestamp := time.Now() timestamp := time.Now()
sourceFiles = map[string]int64{}
if settings.SourceIsRemote() { if settings.SourceIsRemote() {
log.Info("getting the list of remote source files...")
error := fillRemoteSourceFiles()
if error != nil {
log.Fatal("encountered an error getting the list of remote source files", error.Error())
}
} else { } else {
source, _ := strings.CutSuffix(settings.Source, "/*") source, _ := strings.CutSuffix(settings.Source, "/*")
stats, error := os.Stat(source) stats, error := os.Stat(source)
if error != nil { if error != nil {
log.Error("encountered an error getting the stats for the source", error.Error()) log.Fatal("encountered an error getting stats for source", error.Error())
} }
if stats.IsDir() { if stats.IsDir() {
log.Info("scanning source...", source) log.Info("scanning source...", source)
filepath.WalkDir(source, fillSourceFiles) filepath.WalkDir(source, fillSourceFiles)
} else { } else {
sourceFiles = append(sourceFiles, settings.Source) sourceFiles[settings.Source] = stats.Size()
} }
} }
log.InfoTimed("found "+strconv.Itoa(len(sourceFiles))+" source files", timestamp.UnixMilli()) log.InfoTimed("found "+strconv.Itoa(len(sourceFiles))+" source files", timestamp.UnixMilli())
return sourceFiles return sourceFiles
} }
func fillRemoteSourceFiles() error {
var arguments []string
if len(settings.Password) > 0 {
arguments = append(arguments, "-p", settings.Password)
}
arguments = append(arguments, "ssh")
remote, path, _ := strings.Cut(settings.Source, ":")
path, _ = strings.CutSuffix(path, "/*")
if len(settings.User) > 0 {
remote = settings.User + "@" + remote
}
arguments = append(arguments, remote, "find", "\""+path+"\"", "-type", "f", "-exec", "du", "-b", "{}", "\\;")
cmd := exec.Command("sshpass", arguments...)
stdout, stdoutError := cmd.StdoutPipe()
stderr, stderrError := cmd.StderrPipe()
cmd.Start()
if stdoutError != nil {
return stdoutError
}
if stderrError != nil {
return stderrError
}
outBytes, stdoutError := io.ReadAll(stdout)
if stdoutError != nil {
return stdoutError
}
errorBytes, stderrError := io.ReadAll(stderr)
if stderrError != nil {
return stderrError
}
cmd.Wait()
error := strings.TrimSpace(string(errorBytes))
if len(error) > 0 {
return errors.New(error)
}
for _, line := range strings.Split(string(outBytes), "\n") {
if !strings.Contains(line, "\t") {
continue
}
parts := strings.Split(line, "\t")
size, error := strconv.ParseInt(parts[0], 10, 64)
if error != nil {
log.Warning("encountered an error getting the file size for file '"+path+"'", error.Error())
}
sourceFiles[parts[1]] = size
}
return nil
}
func fillSourceFiles(path string, dir fs.DirEntry, err error) error { func fillSourceFiles(path string, dir fs.DirEntry, err error) error {
if err != nil { if err != nil {
return err return err
@ -42,6 +99,19 @@ func fillSourceFiles(path string, dir fs.DirEntry, err error) error {
if dir.IsDir() { if dir.IsDir() {
return nil return nil
} }
sourceFiles = append(sourceFiles, path) stats, error := os.Stat(path)
if error != nil {
log.Fatal("encountered an error getting stats for file", error.Error())
return nil
}
sourceFiles[path] = stats.Size()
return nil return nil
} }
func getSourceFileSize() int64 {
var total int64
for _, size := range sourceFiles {
total += size
}
return total
}

View file

@ -13,7 +13,7 @@ import (
func TestConnection() error { func TestConnection() error {
var remote string var remote string
if settings.SourceIsRemote() { if settings.SourceIsRemote() {
remote, _, _ = strings.Cut(settings.Target, ":") remote, _, _ = strings.Cut(settings.Source, ":")
} else if settings.TargetIsRemote() { } else if settings.TargetIsRemote() {
remote, _, _ = strings.Cut(settings.Target, ":") remote, _, _ = strings.Cut(settings.Target, ":")
} else { } else {
@ -24,11 +24,10 @@ func TestConnection() error {
arguments = append(arguments, "-p", settings.Password) arguments = append(arguments, "-p", settings.Password)
} }
arguments = append(arguments, "ssh") arguments = append(arguments, "ssh")
if len(settings.User) > 0 { if len(settings.User) > 0 {
remote = settings.User + "@" + remote remote = settings.User + "@" + remote
} }
arguments = append(arguments, remote, "'exit'") arguments = append(arguments, remote, "exit")
cmd := exec.Command("sshpass", arguments...) cmd := exec.Command("sshpass", arguments...)
stdout, stdoutError := cmd.StdoutPipe() stdout, stdoutError := cmd.StdoutPipe()
stderr, stderrError := cmd.StderrPipe() stderr, stderrError := cmd.StderrPipe()