package tools import ( "errors" "io" "os" "os/exec" "path/filepath" "strconv" "strings" "sync" "time" "velvettear/gosync/log" "velvettear/gosync/settings" "github.com/fatih/color" "github.com/vbauerster/mpb/v8" "github.com/vbauerster/mpb/v8/decor" ) var transferSize int64 const byteToMegabyte = 1048576 const megabyteToGigabyte = 1024 // exported function(s) func Transfer() { sourcefiles := getSourceFiles() sourcefilesCount := len(sourcefiles) if sourcefilesCount <= 0 { log.Info("nothing to do - exiting...") os.Exit(0) } sourceFileSize, sourceFileSizeName := humanizeBytes(getSourceFileSize()) var waitgroup sync.WaitGroup waitgroup.Add(sourcefilesCount) barcontainer := mpb.New( mpb.WithWaitGroup(&waitgroup), ) timestamp := time.Now() totalbar := createProgressBar(barcontainer, strconv.FormatFloat(sourceFileSize, 'f', 2, 64)+sourceFileSizeName+" 󰇙 Total", int64(0), int64(sourcefilesCount), sourcefilesCount+1, true) concurrency := settings.Concurrency if concurrency == 0 { concurrency = sourcefilesCount } counter := 0 index := 0 errors := make(map[string]error) channel := make(chan struct{}, concurrency) for file, size := range sourcefiles { channel <- struct{}{} if index > 0 { time.Sleep(settings.Delay) } go func(file string, size int64, index int) { defer waitgroup.Done() bar := createProgressBar(barcontainer, filepath.Base(file), size, int64(100), index, false) error := transferFile(bar, file) if error != nil { errors[file] = error bar.Abort(false) bar.Wait() } else { counter++ } totalbar.Increment() <-channel }(file, size, index) index++ } barcontainer.Wait() timeDifference := time.Since(timestamp) transferSpeedName := "B/sec" transferSpeed := float64(transferSize) / timeDifference.Seconds() transferSpeed, transferSpeedName = humanizeBytes(int64(transferSpeed)) transferSpeedName += "/s" transferSize, transferSizeName := humanizeBytes(transferSize) log.InfoTimed("transferred "+strconv.Itoa(counter)+" files, "+strconv.Itoa(int(transferSize))+" "+transferSizeName+" ("+strconv.FormatFloat(transferSpeed, 'f', 2, 64)+" "+transferSpeedName+")", timestamp.UnixMilli()) if len(errors) > 0 { log.Info("encountered " + strconv.Itoa(len(errors)) + " errors") for file, error := range errors { log.Info(file, error.Error()) } } } // unexported function(s) func transferFile(bar *mpb.Bar, file string) error { target := getTargetLocation(file) var arguments []string if len(settings.Password) > 0 { arguments = append(arguments, "-p", settings.Password) } if settings.SourceIsRemote() { remote, _, _ := strings.Cut(settings.Source, ":") file = remote + ":" + file if len(settings.User) > 0 { file = settings.User + "@" + file } } else { if len(settings.User) > 0 { target = settings.User + "@" + target } } arguments = append(arguments, "rsync", "-avz", "--secluded-args", "--mkpath", file, target, "--progress") cmd := exec.Command("sshpass", arguments...) stdout, stdoutError := cmd.StdoutPipe() stderr, stderrError := cmd.StderrPipe() cmd.Start() if stdoutError != nil { log.Warning("encountered an error opening remote stdout pipe", "file: "+file, stdoutError.Error()) return stderrError } if stderrError != nil { log.Warning("encountered an error opening remote stderr pipe", "file: "+file, stdoutError.Error()) return stderrError } var resultBytes []byte var waitgroup sync.WaitGroup waitgroup.Add(1) go func() { defer waitgroup.Done() for { tmp := make([]byte, 1024) readBytes, error := stdout.Read(tmp) if error != nil { if error != io.EOF { log.Warning("encountered an error reading stdout", "file: "+file, error.Error()) } break } if readBytes == 0 { break } resultBytes = append(resultBytes, tmp...) line := string(tmp) lowerline := strings.ToLower(line) if strings.Contains(lowerline, "sent") && strings.Contains(lowerline, "received") && strings.Contains(lowerline, "bytes") { _, tmp, _ := strings.Cut(lowerline, "sent") tmp, _, _ = strings.Cut(tmp, "bytes") tmp = strings.ReplaceAll(strings.TrimSpace(tmp), ".", "") bytes, error := strconv.ParseInt(tmp, 10, 64) if error != nil { log.Fatal("encountered an error converting the transferred bytes to int", "file: "+file, error.Error()) } transferSize += bytes } if strings.Contains(lowerline, "total size is") && strings.Contains(lowerline, "speedup is") { bar.SetCurrent(100) break } cut := strings.Index(line, "%") if cut <= 0 { continue } line = line[:cut] var percent string for index := len(line) - 1; index > 0; index-- { char := string(line[index]) if char == " " { break } percent = char + percent } value, error := strconv.Atoi(percent) if error != nil { continue } bar.SetCurrent(int64(value)) } }() errorBytes, stderrError := io.ReadAll(stderr) if stderrError != nil { log.Warning("encountered an error reading from remote stderr", "file: "+file, stdoutError.Error()) return stderrError } cmd.Wait() error := strings.Trim(string(errorBytes), "\n") if len(error) > 0 { log.Warning("encountered an remote error", "file: "+file, error) return errors.New(error) } waitgroup.Wait() stdout.Close() stderr.Close() return nil } func getTargetLocation(sourceFile string) string { if sourceFile == settings.Source { return filepath.Join(settings.Target, filepath.Base(sourceFile)) } source, _ := strings.CutSuffix(settings.Source, "/*") if settings.SourceIsRemote() { _, source, _ = strings.Cut(source, ":") source = filepath.Dir(source) } return filepath.Join(settings.Target, strings.Replace(sourceFile, source, "", 1)) } func createProgressBar(barcontainer *mpb.Progress, name string, size int64, max int64, priority int, total bool) *mpb.Bar { _, green, magenta, yellow, blue := color.New(color.FgRed), color.New(color.FgGreen), color.New(color.FgMagenta), color.New(color.FgYellow), color.New(color.FgBlue) barstyle := mpb.BarStyle().Lbound("").Filler("󰝤").Tip("").Padding(" ").Rbound("") defaultBarPrepend := mpb.PrependDecorators( decor.Name("[info] > "), decor.OnCompleteMeta( decor.OnComplete( decor.Meta(decor.Name(name, decor.WCSyncSpaceR), colorize(blue)), ""+name, ), colorize(green), ), // decor.OnAbortMeta( // decor.OnAbort( // decor.Meta(decor.Name(name, decor.WCSyncSpaceR), colorize(blue)), ""+name, // ), // colorize(red), // ), ) defaultBarAppend := mpb.AppendDecorators( decor.OnCompleteMeta(decor.Elapsed(decor.ET_STYLE_GO, decor.WCSyncSpaceR), colorize(yellow)), decor.OnComplete( decor.Name("󰇙", decor.WCSyncSpaceR), "", ), decor.OnComplete( decor.Percentage(decor.WCSyncSpaceR), "", ), // decor.OnAbort( // decor.Name("󰇙", decor.WCSyncSpaceR), "", // ), // decor.OnAbort( // decor.Percentage(decor.WCSyncSpace), "", // ), ) totalBarPrepend := mpb.PrependDecorators( decor.Name("[info] > "), decor.OnCompleteMeta( decor.OnComplete( decor.Meta(decor.Name(name, decor.WC{W: len(name) + 1, C: decor.DidentRight}), colorize(yellow)), ""+name, ), colorize(magenta), ), decor.CountersNoUnit("%d / %d"), ) if total { return barcontainer.New( max, barstyle, mpb.BarPriority(priority), mpb.BarFillerClearOnComplete(), totalBarPrepend, defaultBarAppend, ) } else { return barcontainer.New( max, barstyle, mpb.BarPriority(priority), mpb.BarFillerClearOnComplete(), defaultBarPrepend, defaultBarAppend, ) } } func colorize(c *color.Color) func(string) string { return func(s string) string { return c.Sprint(s) } } func humanizeBytes(bytes int64) (float64, string) { name := "B" size := float64(bytes) if size >= byteToMegabyte { name = "MB" size = size / byteToMegabyte } if size >= megabyteToGigabyte { name = "GB" size = size / megabyteToGigabyte } return size, name }