mirror of
https://github.com/ollama/ollama
synced 2026-04-23 08:45:14 +00:00
server: fix download stall watchdog not firing when no bytes arrive
The chunk watchdog in `downloadChunk` gated its timeout check behind `!lastUpdated.IsZero()` and, on detecting a stall, reset `lastUpdated` back to the zero value. Together these caused the watchdog to permanently skip stall detection on any retry whose new connection produced zero bytes before stalling — the reader goroutine would then block forever on a dead TCP stream with no recovery. This is the pattern described upstream as "certain parts stall completely and zero data is received from the backend. The connection itself is still healthy so it doesn't trigger a retry", and matches user reports of hangs near 99% that only recover via ctrl-c and `ollama pull`. Fix: initialize `part.lastUpdated` at watchdog entry so the timer always has a reference point, drop the zero-time guard, and remove the zero-time reset. The stall window (previously the hardcoded literal `30*time.Second`) is now a package var `stallDuration` so tests can shorten it. Tested with a new `TestDownloadChunkStallWatchdogFiresWithoutProgress` that reproduces the bug: an httptest server that sends response headers and flushes, then holds the body without writing. Before the fix the watchdog never fires and the call hangs until the caller's context deadline. After the fix the watchdog fires inside one ticker interval and `downloadChunk` returns `errPartStalled`, letting the existing retry loop open a fresh connection. Refs #1736
This commit is contained in:
parent
ff23dd343f
commit
3b754dde9b
|
|
@ -30,6 +30,10 @@ import (
|
|||
|
||||
const maxRetries = 6
|
||||
|
||||
// stallDuration is the no-progress window after which a part is declared
|
||||
// stalled. A package var (not a const) so tests can shorten it.
|
||||
var stallDuration = 30 * time.Second
|
||||
|
||||
var (
|
||||
errMaxRetriesExceeded = errors.New("max retries exceeded")
|
||||
errPartStalled = errors.New("part stalled")
|
||||
|
|
@ -359,6 +363,13 @@ func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w
|
|||
})
|
||||
|
||||
g.Go(func() error {
|
||||
// Initialize at watchdog entry so the stall timer fires even when
|
||||
// no bytes ever arrive from the server (otherwise lastUpdated stays
|
||||
// zero and the stall check never triggers on that retry).
|
||||
part.lastUpdatedMu.Lock()
|
||||
part.lastUpdated = time.Now()
|
||||
part.lastUpdatedMu.Unlock()
|
||||
|
||||
ticker := time.NewTicker(time.Second)
|
||||
for {
|
||||
select {
|
||||
|
|
@ -371,13 +382,9 @@ func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w
|
|||
lastUpdated := part.lastUpdated
|
||||
part.lastUpdatedMu.Unlock()
|
||||
|
||||
if !lastUpdated.IsZero() && time.Since(lastUpdated) > 30*time.Second {
|
||||
if time.Since(lastUpdated) > stallDuration {
|
||||
const msg = "%s part %d stalled; retrying. If this persists, press ctrl-c to exit, then 'ollama pull' to find a faster connection."
|
||||
slog.Info(fmt.Sprintf(msg, b.Digest[7:19], part.N))
|
||||
// reset last updated
|
||||
part.lastUpdatedMu.Lock()
|
||||
part.lastUpdated = time.Time{}
|
||||
part.lastUpdatedMu.Unlock()
|
||||
return errPartStalled
|
||||
}
|
||||
case <-ctx.Done():
|
||||
|
|
|
|||
63
server/download_test.go
Normal file
63
server/download_test.go
Normal file
|
|
@ -0,0 +1,63 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestDownloadChunkStallWatchdogFiresWithoutProgress verifies that the
|
||||
// chunk watchdog fires when the server sends response headers but never
|
||||
// writes any body bytes. Previously the watchdog's IsZero guard caused
|
||||
// it to permanently skip stall detection in this case, leaving the
|
||||
// reader goroutine blocked on a dead TCP stream.
|
||||
func TestDownloadChunkStallWatchdogFiresWithoutProgress(t *testing.T) {
|
||||
origStall := stallDuration
|
||||
stallDuration = 200 * time.Millisecond
|
||||
t.Cleanup(func() { stallDuration = origStall })
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Length", "1024")
|
||||
w.Header().Set("Content-Range", "bytes 0-1023/1024")
|
||||
w.WriteHeader(http.StatusPartialContent)
|
||||
w.(http.Flusher).Flush()
|
||||
<-r.Context().Done()
|
||||
}))
|
||||
t.Cleanup(srv.Close)
|
||||
|
||||
u, err := url.Parse(srv.URL)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
b := &blobDownload{
|
||||
Name: filepath.Join(t.TempDir(), "blob"),
|
||||
Digest: "sha256:deadbeef1234567890abcdef",
|
||||
}
|
||||
part := &blobDownloadPart{
|
||||
blobDownload: b,
|
||||
N: 0,
|
||||
Offset: 0,
|
||||
Size: 1024,
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
start := time.Now()
|
||||
err = b.downloadChunk(ctx, u, io.Discard, part)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
if !errors.Is(err, errPartStalled) {
|
||||
t.Fatalf("want errPartStalled, got %v (elapsed %v)", err, elapsed)
|
||||
}
|
||||
if elapsed > 2*time.Second {
|
||||
t.Fatalf("watchdog took too long to fire: %v (want ~stallDuration=%v)", elapsed, stallDuration)
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue