server: download: replace mutax to atomic.Value

This commit is contained in:
Jaehoon Jeong 2025-03-26 22:53:42 +09:00
parent e306d8e032
commit bfd79d152b

View file

@ -97,10 +97,7 @@ const (
maxDownloadPartSize int64 = 1000 * format.MegaByte
)
var (
lastUpdatedMu sync.Mutex
lastUpdated time.Time = time.Now()
)
var lastUpdated atomic.Value
func (p *blobDownloadPart) Name() string {
return strings.Join([]string{
@ -119,9 +116,7 @@ func (p *blobDownloadPart) StopsAt() int64 {
func (p *blobDownloadPart) Write(b []byte) (n int, err error) {
n = len(b)
p.blobDownload.Completed.Add(int64(n))
lastUpdatedMu.Lock()
lastUpdated = time.Now()
lastUpdatedMu.Unlock()
lastUpdated.Store(time.Now())
return n, nil
}
@ -272,6 +267,7 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *regis
return err
}
lastUpdated.Store(time.Now())
g, inner := errgroup.WithContext(ctx)
g.SetLimit(numDownloadParts)
for i := range b.Parts {
@ -299,17 +295,11 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *regis
slog.Info(fmt.Sprintf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, realTry, err, sleep))
time.Sleep(sleep)
lastUpdatedMu.Lock()
localLastUpdated := lastUpdated
lastUpdatedMu.Unlock()
if time.Since(localLastUpdated) < 5*time.Second {
if time.Since(lastUpdated.Load().(time.Time)) < 5*time.Second {
try--
slog.Info(fmt.Sprintf("%s part %d wait to finish to download other part", b.Digest[7:19], part.N))
for time.Since(localLastUpdated) < 5*time.Second {
for time.Since(lastUpdated.Load().(time.Time)) < 5*time.Second {
time.Sleep(5 * time.Second)
lastUpdatedMu.Lock()
localLastUpdated = lastUpdated
lastUpdatedMu.Unlock()
}
slog.Info(fmt.Sprintf("%s part %d continue to download", b.Digest[7:19], part.N))
}
@ -384,16 +374,10 @@ func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w
return nil
}
lastUpdatedMu.Lock()
localLastUpdated := lastUpdated
lastUpdatedMu.Unlock()
if time.Since(localLastUpdated) > 5*time.Second {
if time.Since(lastUpdated.Load().(time.Time)) > 5*time.Second {
const msg = "%s part %d stalled; retrying. If this persists, press ctrl-c to exit, then 'ollama pull' to find a faster connection."
slog.Info(fmt.Sprintf(msg, b.Digest[7:19], part.N))
lastUpdatedMu.Lock()
lastUpdated = time.Now()
lastUpdatedMu.Unlock()
lastUpdated.Store(time.Now())
return errPartStalled
}
case <-ctx.Done():