
feat(storage): add multistream support to MRD #13792

Open
krishnamd-jkp wants to merge 3 commits into googleapis:main from krishnamd-jkp:multistream_func

Conversation

@krishnamd-jkp
Contributor

No description provided.

@krishnamd-jkp krishnamd-jkp requested review from a team as code owners February 5, 2026 10:47
@product-auto-label product-auto-label bot added the api: storage label (Issues related to the Cloud Storage API) Feb 5, 2026
@gemini-code-assist
Contributor

Summary of Changes

Hello @krishnamd-jkp, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly upgrades the MultiRangeDownloader (MRD) by integrating multistream capabilities. This enhancement allows the MRD to leverage multiple concurrent gRPC streams for downloading object ranges, which can lead to improved throughput and reduced latency, especially for large objects or high-concurrency scenarios. The changes include mechanisms for dynamic stream creation, intelligent load distribution of range requests across these streams, and new configuration options to fine-tune the multistream behavior based on application requirements.

Highlights

  • Multistream Support for MRD: The MultiRangeDownloader (MRD) now supports using multiple concurrent bidirectional gRPC streams to download object ranges, enhancing download efficiency.
  • Dynamic Stream Management: The system can dynamically create and manage multiple streams, adding new streams when existing ones reach configurable pending range or byte thresholds, up to a maximum connection limit.
  • Configurable Stream Behavior: New options (minConnections, maxConnections, targetPendingRanges, targetPendingBytes) are introduced to allow users to control the number of streams and the conditions under which new streams are added.
  • Load Balancing: A minBytesPicker strategy is implemented to distribute range requests among active streams, aiming to balance the outstanding bytes across connections.
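For intuition, here is a minimal sketch of what a min-bytes picking strategy could look like; the type and field names (mrdStream, pendingBytes) are assumptions based on this summary, not the PR's actual code:

```go
// Hypothetical sketch of a "least pending bytes" picker.
type mrdStream struct {
	id           int64
	pendingBytes int64
}

// pickMinBytes routes the next range request to the stream with the
// fewest outstanding bytes, keeping load roughly balanced.
func pickMinBytes(streams map[int64]*mrdStream) *mrdStream {
	var best *mrdStream
	for _, s := range streams {
		if best == nil || s.pendingBytes < best.pendingBytes {
			best = s
		}
	}
	return best
}
```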
Changelog
  • storage/client.go
    • Added minConnections, maxConnections, targetPendingRanges, and targetPendingBytes fields to newMultiRangeDownloaderParams for configuring multistream behavior.
  • storage/grpc_reader_multi_range.go
    • Introduced new constants: mrdSendChannelSize, defaultTargetPendingBytes, defaultTargetPendingRanges.
    • Defined streamPickerStrategy interface and minBytesPicker implementation for selecting streams for range requests.
    • Added mrdStream struct to manage individual stream state, including pending ranges and byte counts.
    • Refactored multiRangeDownloaderManager to maintain a map of mrdStream instances, replacing the single currentSession.
    • Implemented shouldAddStream and addNewStream functions to dynamically create new streams based on connection limits and pending load thresholds.
    • Updated the eventLoop to handle commands and session results from multiple streams, and to trigger new stream creation.
    • Modified createNewSession to accept a stream id and to optionally wait for the first result.
    • Adjusted handleAddCmd to use the streamPicker for routing range requests to specific streams and updating stream-specific metrics.
    • Updated processSessionResult, handleStreamEnd, ensureSession, failRange, and failAllPending to correctly manage state and operations across multiple streams.
    • Introduced addStreamCmd and mrdAddStreamErrorCmd for internal stream management.
    • Modified mrdSessionResult to include the stream id.
    • Changed bidiReadStreamSession to store its id.
    • Introduced mrdQueuedRequest to associate queued requests with their target stream id.
  • storage/reader.go
    • Defined mrdOption interface and concrete option types (minConnections, maxConnections, targetPendingRanges, targetPendingBytes) for configuring MultiRangeDownloader.
    • Updated NewMultiRangeDownloader to accept and apply these new mrdOptions.
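A hedged usage sketch of the new configuration surface; withMinConnections and the other constructor names below are placeholders for whatever wraps the mrdOption types listed above, not a confirmed exported API:

```go
import (
	"context"

	"cloud.google.com/go/storage"
)

// downloadWithMultistream shows hypothetical option usage only.
func downloadWithMultistream(ctx context.Context, obj *storage.ObjectHandle) error {
	mrd, err := obj.NewMultiRangeDownloader(ctx,
		withMinConnections(1),         // start with a single stream
		withMaxConnections(4),         // cap concurrent streams
		withTargetPendingRanges(32),   // per-stream range threshold before adding a stream
		withTargetPendingBytes(1<<30), // per-stream byte threshold (1 GiB)
	)
	if err != nil {
		return err
	}
	defer mrd.Close()
	// Add ranges as usual; the manager spreads them across the streams.
	return nil
}
```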
Activity
  • No specific activity (comments, reviews, etc.) was provided in the context. The pull request was created by krishnamd-jkp.

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces multistream support for the Multi-Range Downloader (MRD), refactoring the single-stream architecture to manage multiple concurrent download streams. However, it introduces a critical race condition that can lead to a nil pointer dereference and a medium-severity data race. This occurs because a new stream can send responses to the manager before it's fully tracked, and the gRPC read specification is shared and modified across multiple goroutines without proper synchronization. Additionally, there is a suggestion regarding the public API for configuring the new multistream options to improve consistency.
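One general pattern that avoids the "responses arrive before the stream is tracked" ordering problem is to register the stream in the manager's map before starting the goroutine that forwards its results. The sketch below is illustrative only; all names are hypothetical and it is not the PR's fix:

```go
// Illustrative only: track the stream before any result can be delivered,
// so the event loop can always resolve the stream id carried by a result.
type sessionResult struct{ streamID int64 }

type trackedStream struct {
	id      int64
	results chan sessionResult
}

type streamManager struct {
	streams  map[int64]*trackedStream
	resultCh chan sessionResult
}

func (m *streamManager) addStream(s *trackedStream) {
	m.streams[s.id] = s // register first, while still on the event loop
	go func() {
		for r := range s.results {
			m.resultCh <- r // no result can ever reference an untracked id
		}
	}()
}
```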

@krishnamd-jkp
Contributor Author

/gemini review

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request adds support for multiple concurrent streams to the Multi-Range Downloader (MRD). However, a critical data race was identified in the addNewStream method, which needs to be addressed for reliability. Additionally, a significant resilience issue exists where a single stream failure can bring down the entire downloader, and clarifications to the documentation for new configuration options are suggested.

@cpriti-os cpriti-os left a comment

Initial review

// This should never be hit in practice, but is a safety valve to prevent
// unbounded memory usage if the user is adding ranges faster than they
// can be processed.
mrdAddInternalQueueMaxSize = 50000
defaultTargetPendingBytes = 1 << 30 // 1 GiB

Would it be better to tune this based on the machine spec, i.e., a slightly more intelligent default rather than a fixed standard one?
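For illustration only, a sketch of what a machine-aware default could look like: scale the pending-byte budget with CPU count and keep the current 1 GiB value as a ceiling. This is a hypothetical heuristic, not something the PR implements:

```go
import "runtime"

// defaultPendingBytesForMachine is a hypothetical heuristic: allow roughly
// 256 MiB of in-flight data per CPU, capped at the existing 1 GiB default.
func defaultPendingBytesForMachine() int64 {
	const perCPU = 256 << 20 // 256 MiB per CPU
	const ceiling = 1 << 30  // keep the current 1 GiB default as an upper bound
	v := int64(runtime.NumCPU()) * perCPU
	if v > ceiling {
		return ceiling
	}
	return v
}
```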

pendingRanges: make(map[int64]*rangeRequest),
},
}
}(id, proto.Clone(m.readSpec).(*storagepb.BidiReadObjectSpec))

Should we add the updated read handle to this spec (lastReadHandle)?

@krishnamd-jkp
Contributor Author

m.readSpec stores the latest read handle received up to that point; it is updated whenever the server sends a new read handle.

@@ -515,21 +639,29 @@ func (m *multiRangeDownloaderManager) handleAddCmd(ctx context.Context, cmd *mrd
}
m.readIDCounter++

// Attributes should be ready if we are processing Add commands

Nit: end comments with a period.


// Beyond minConnections, we only add if all existing active streams are at capacity.
allAtCapacity := true
for _, stream := range m.streams {

Can we do something better than iterating to determine whether streams are at capacity? What if we keep track of the number of streams at capacity: mark a stream at capacity whenever it reaches targetPendingRanges or targetPendingBytes, and take it out of the count when its results are processed. If the number of streams at capacity equals the number of current streams, add one.
Iterating over every stream on each Add command looks like too many iterations.
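A sketch of the counter-based bookkeeping suggested above, with hypothetical type and field names; the point is that the add-stream check becomes an O(1) comparison instead of a per-Add iteration:

```go
// Hypothetical shapes, for illustration only.
type stream struct {
	pendingRanges int64
	pendingBytes  int64
	atCapacity    bool
}

type manager struct {
	streams             map[int64]*stream
	targetPendingRanges int64
	targetPendingBytes  int64
	maxConnections      int
	atCapacityCount     int
}

// Called after ranges are assigned to a stream.
func (m *manager) markAtCapacity(s *stream) {
	if !s.atCapacity && (s.pendingRanges >= m.targetPendingRanges ||
		s.pendingBytes >= m.targetPendingBytes) {
		s.atCapacity = true
		m.atCapacityCount++
	}
}

// Called after results are processed and the stream's counters drop.
func (m *manager) markBelowCapacity(s *stream) {
	if s.atCapacity && s.pendingRanges < m.targetPendingRanges &&
		s.pendingBytes < m.targetPendingBytes {
		s.atCapacity = false
		m.atCapacityCount--
	}
}

// shouldAddStream is now O(1): add a stream only when every current
// stream is at capacity and we are still under the connection limit.
func (m *manager) shouldAddStream() bool {
	return len(m.streams) < m.maxConnections &&
		m.atCapacityCount == len(m.streams)
}
```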

}
m.pendingRanges = make(map[int64]*rangeRequest)
m.unsentRequests = newRequestQueue()

Why are we re-initializing unsentRequests with a new queue here?

@krishnamd-jkp
Contributor Author

This is to clear any pending requests left in the queue upon failure, just like we clear stream.pendingRanges with make(map[int64]*rangeRequest) on line 924.

@cpriti-os cpriti-os Feb 13, 2026

Nit: add a function like Clear() that calls list.Init() to make this clearer.
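A minimal sketch of the suggested helper, assuming the queue wraps a container/list.List (the field name list and the requestQueue layout are assumptions, not the PR's code):

```go
// Hypothetical sketch: Clear empties the queue in place instead of
// allocating a fresh one on the failure path.
func (q *requestQueue) Clear() {
	q.list.Init() // container/list: Init reinitializes the list to empty
}
```

The failure path could then read m.unsentRequests.Clear() instead of m.unsentRequests = newRequestQueue().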

}
err := result.err
var ensureErr error

if result.redirect != nil {
m.readSpec.RoutingToken = result.redirect.RoutingToken
m.readSpec.ReadHandle = result.redirect.ReadHandle
ensureErr = m.ensureSession(m.ctx)
ensureErr = m.ensureSession(m.ctx, stream)

Now that we have multiple streams, re-establishing an existing stream should not halt other streams from processing new commands. This synchronous call could become a bottleneck.
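One illustrative way to avoid blocking the event loop on a single stream's redirect is to run the re-establishment in a goroutine and hand the outcome back to the event loop as a command. In this sketch, streamReadyCmd, cmdCh, and the exact createNewSession signature are assumptions, not the PR's code:

```go
// Hypothetical sketch: reconnect one stream without stalling the others.
func (m *multiRangeDownloaderManager) reconnectAsync(streamID int64) {
	go func() {
		// Assumed signature: createNewSession(ctx, streamID, waitForFirstResult).
		sess, err := m.createNewSession(m.ctx, streamID, false)
		// Hand the outcome back to the single-threaded event loop, which can
		// install the new session or fail the stream's pending ranges.
		m.cmdCh <- &streamReadyCmd{streamID: streamID, session: sess, err: err}
	}()
}
```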
