This commit is contained in:
Jaehoon Jeong 2026-04-22 23:38:02 -05:00 committed by GitHub
commit a29e0a8b5b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -60,9 +60,6 @@ type blobDownloadPart struct {
Size int64
Completed atomic.Int64
lastUpdatedMu sync.Mutex
lastUpdated time.Time
*blobDownload `json:"-"`
}
@ -102,6 +99,8 @@ const (
maxDownloadPartSize int64 = 1000 * format.MegaByte
)
// lastUpdated holds the time.Time of the most recent download progress event.
// It is stored by blobDownloadPart.Write on every write, by run just before the
// part goroutines are started, and by downloadChunk when it reports a stalled
// part. Readers type-assert the loaded value to time.Time; a Load before any
// Store returns nil, and that assertion would panic.
// NOTE(review): being package-level, this single timestamp is shared by every
// part and, presumably, by every blobDownload running in the process — confirm
// that sharing across downloads is intended.
var lastUpdated atomic.Value
func (p *blobDownloadPart) Name() string {
return strings.Join([]string{
p.blobDownload.Name, "partial", strconv.Itoa(p.N),
@ -119,9 +118,7 @@ func (p *blobDownloadPart) StopsAt() int64 {
// Write has the io.Writer signature but is used for progress accounting only:
// it does not copy b anywhere. It adds len(b) to the parent blobDownload's
// Completed counter, refreshes the last-updated timestamps, and reports all of
// b as written. It always returns len(b) and a nil error.
// NOTE(review): both the mutex-guarded per-part p.lastUpdated update and the
// package-level lastUpdated.Store appear in this span; this looks like a diff
// rendering with removed and added lines interleaved — confirm which is live.
func (p *blobDownloadPart) Write(b []byte) (n int, err error) {
n = len(b)
p.blobDownload.Completed.Add(int64(n))
// Per-part timestamp, guarded by the part's own mutex.
p.lastUpdatedMu.Lock()
p.lastUpdated = time.Now()
p.lastUpdatedMu.Unlock()
// Package-level timestamp, not specific to this part.
lastUpdated.Store(time.Now())
return n, nil
}
@ -272,6 +269,7 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *regis
return err
}
lastUpdated.Store(time.Now())
g, inner := errgroup.WithContext(ctx)
g.SetLimit(numDownloadParts)
for i := range b.Parts {
@ -282,9 +280,11 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *regis
g.Go(func() error {
var err error
var realTry = 0
for try := 0; try < maxRetries; try++ {
w := io.NewOffsetWriter(file, part.StartsAt())
err = b.downloadChunk(inner, directURL, w, part)
realTry++
switch {
case errors.Is(err, context.Canceled), errors.Is(err, syscall.ENOSPC):
// return immediately if the context is canceled or the device is out of space
@ -294,8 +294,17 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *regis
continue
case err != nil:
sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
slog.Info(fmt.Sprintf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep))
slog.Info(fmt.Sprintf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, realTry, err, sleep))
time.Sleep(sleep)
if time.Since(lastUpdated.Load().(time.Time)) < 5*time.Second {
try--
slog.Info(fmt.Sprintf("%s part %d wait to finish to download other part", b.Digest[7:19], part.N))
for time.Since(lastUpdated.Load().(time.Time)) < 5*time.Second {
time.Sleep(5 * time.Second)
}
slog.Info(fmt.Sprintf("%s part %d continue to download", b.Digest[7:19], part.N))
}
continue
default:
return nil
@ -367,17 +376,10 @@ func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w
return nil
}
part.lastUpdatedMu.Lock()
lastUpdated := part.lastUpdated
part.lastUpdatedMu.Unlock()
if !lastUpdated.IsZero() && time.Since(lastUpdated) > 30*time.Second {
if time.Since(lastUpdated.Load().(time.Time)) > 5*time.Second {
const msg = "%s part %d stalled; retrying. If this persists, press ctrl-c to exit, then 'ollama pull' to find a faster connection."
slog.Info(fmt.Sprintf(msg, b.Digest[7:19], part.N))
// reset last updated
part.lastUpdatedMu.Lock()
part.lastUpdated = time.Time{}
part.lastUpdatedMu.Unlock()
lastUpdated.Store(time.Now())
return errPartStalled
}
case <-ctx.Done():