feat(storage): add multistream support to MRD #13792
krishnamd-jkp wants to merge 3 commits into googleapis:main
Conversation
Summary of Changes

Hello @krishnamd-jkp, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request upgrades the MultiRangeDownloader (MRD) by integrating multistream capabilities. This enhancement allows the MRD to leverage multiple concurrent gRPC streams for downloading object ranges, which can improve throughput and reduce latency, especially for large objects or high-concurrency scenarios. The changes include mechanisms for dynamic stream creation, intelligent load distribution of range requests across these streams, and new configuration options to fine-tune multistream behavior based on application requirements.
Code Review
This pull request introduces multistream support for the Multi-Range Downloader (MRD), refactoring the single-stream architecture to manage multiple concurrent download streams. However, it introduces a critical race condition that can lead to a nil pointer dereference and a medium-severity data race. This occurs because a new stream can send responses to the manager before it's fully tracked, and the gRPC read specification is shared and modified across multiple goroutines without proper synchronization. Additionally, there is a suggestion regarding the public API for configuring the new multistream options to improve consistency.
Force-pushed from 2d77a84 to 23cf756
/gemini review
Code Review
This pull request adds support for multiple concurrent streams to the Multi-Range Downloader (MRD). However, a critical data race was identified in the addNewStream method, which needs to be addressed for reliability. Additionally, a significant resilience issue exists where a single stream failure can bring down the entire downloader, and clarifications to the documentation for new configuration options are suggested.
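One common way to close this kind of race is to make the stream visible to the manager before any goroutine that can produce responses is started. The sketch below is illustrative only (the type and field names are assumptions, not the PR's actual types): the stream is registered in the map under the lock first, and only then is its goroutine launched.

```go
package main

import "sync"

type stream struct {
	id   int64
	done chan struct{}
}

type manager struct {
	mu      sync.Mutex
	streams map[int64]*stream
}

// addNewStream registers the stream in the tracking map BEFORE starting the
// goroutine, so any response routed by id always finds a tracked stream.
func (m *manager) addNewStream(id int64) *stream {
	s := &stream{id: id, done: make(chan struct{})}

	// Track the stream first, under the lock.
	m.mu.Lock()
	m.streams[id] = s
	m.mu.Unlock()

	// Only then start the goroutine that may produce responses.
	go func() {
		defer close(s.done)
		// ... open the gRPC stream and forward responses to the manager ...
	}()
	return s
}
```

The goroutine-creation happens-before rule then guarantees the registration is visible to the new goroutine, and the mutex makes it visible to any other goroutine that looks up the stream by id.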
```go
// This should never be hit in practice, but is a safety valve to prevent
// unbounded memory usage if the user is adding ranges faster than they
// can be processed.
mrdAddInternalQueueMaxSize = 50000
defaultTargetPendingBytes  = 1 << 30 // 1 GiB
```
Would it be better to tune this based on machine spec, i.e., a slightly more intelligent default rather than a fixed constant?
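For illustration, one such machine-derived default could scale the pending-bytes budget with CPU count and clamp it. Everything here is an assumption, not from the PR: the 256 MiB-per-CPU rule and the clamp bounds are placeholders.

```go
package main

// defaultTargetPendingBytesFor derives a pending-bytes target from the
// machine's CPU count instead of a fixed 1 GiB. The scaling rule and the
// clamp bounds are illustrative assumptions.
func defaultTargetPendingBytesFor(numCPU int) int64 {
	const (
		perCPU = int64(256 << 20) // assumed: 256 MiB of pending data per CPU
		floor  = int64(256 << 20) // never below the single-CPU budget
		ceil   = int64(4 << 30)   // cap to bound memory use on large hosts
	)
	v := int64(numCPU) * perCPU
	if v < floor {
		v = floor
	}
	if v > ceil {
		v = ceil
	}
	return v
}
```

A caller would presumably seed it with `runtime.NumCPU()`; a memory-aware variant would need platform-specific probing, which is why a fixed default is also a defensible choice.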
```go
		pendingRanges: make(map[int64]*rangeRequest),
	},
}
}(id, proto.Clone(m.readSpec).(*storagepb.BidiReadObjectSpec))
```
Should we add the updated read handle (lastReadHandle) to this spec?
m.readSpec stores the latest read handle (up to that point); it is updated whenever the server sends a new read handle. So the clone above already starts from the freshest handle.
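A minimal sketch of that update path (the struct here is a simplified stand-in for storagepb.BidiReadObjectSpec, and the method name is illustrative): whenever a response carries a fresh read handle, it is stored on the shared spec so later per-stream clones pick it up.

```go
package main

import "sync"

// readSpec is a simplified stand-in for storagepb.BidiReadObjectSpec.
type readSpec struct {
	ReadHandle []byte
}

type manager struct {
	mu       sync.Mutex
	readSpec *readSpec
}

// updateReadHandle records the latest server-sent handle on the shared spec,
// so that subsequently cloned per-stream specs start from the newest handle.
func (m *manager) updateReadHandle(h []byte) {
	if h == nil {
		return // responses without a handle leave the current one in place
	}
	m.mu.Lock()
	m.readSpec.ReadHandle = h
	m.mu.Unlock()
}
```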
```diff
@@ -515,21 +639,29 @@ func (m *multiRangeDownloaderManager) handleAddCmd(ctx context.Context, cmd *mrd
 	}
 	m.readIDCounter++

 	// Attributes should be ready if we are processing Add commands
```
Nit: Period after comments.
```go
// Beyond minConnections, we only add if all existing active streams are at capacity.
allAtCapacity := true
for _, stream := range m.streams {
```
Can we do something better than iterating to check whether streams are at capacity? What if we keep track of the number of streams at capacity: mark a stream when it reaches targetPendingRanges or targetPendingBytes, and clear the mark when its results are processed. If the number of streams at capacity equals the number of current streams, add one.

This looks like too many iterations for each Add command.
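The suggested bookkeeping could be sketched as follows. The names and thresholds are illustrative, not the PR's actual fields: a counter is maintained incrementally, so the add-a-stream decision becomes O(1) per Add command instead of a scan over all streams.

```go
package main

type stream struct {
	atCapacity    bool
	pendingRanges int
	pendingBytes  int64
}

type manager struct {
	numStreams    int
	numAtCapacity int
}

// Thresholds are placeholders for the PR's targetPendingRanges/targetPendingBytes.
const (
	targetPendingRanges = 100
	targetPendingBytes  = 1 << 30
)

// markCapacity is called whenever a stream's pending work changes; it keeps
// the at-capacity counter consistent without rescanning all streams.
func (m *manager) markCapacity(s *stream) {
	at := s.pendingRanges >= targetPendingRanges || s.pendingBytes >= targetPendingBytes
	if at && !s.atCapacity {
		s.atCapacity = true
		m.numAtCapacity++
	} else if !at && s.atCapacity {
		s.atCapacity = false
		m.numAtCapacity--
	}
}

// shouldAddStream is now O(1) per Add command.
func (m *manager) shouldAddStream() bool {
	return m.numAtCapacity == m.numStreams
}
```

The trade-off is that markCapacity must be called at every point pending work changes (add, send, result, failure-reset), which is more call sites to keep correct than one scan.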
```go
}
m.pendingRanges = make(map[int64]*rangeRequest)
m.unsentRequests = newRequestQueue()
```
Why are we re-initializing unsentRequests with a new queue here?
This is to clear any pending requests left in the queue upon failure, just like we clear stream.pendingRanges with make(map[int64]*rangeRequest) on line 924.
Nit: add a function like Clear() that calls list.Init(), to make the intent clearer.
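Something like the following sketch, assuming (per the mention of list.Init()) that requestQueue wraps a container/list.List; the wrapper's layout and methods here are illustrative, not the PR's actual implementation.

```go
package main

import "container/list"

// requestQueue is an assumed thin wrapper over container/list.List.
type requestQueue struct {
	l *list.List
}

func newRequestQueue() *requestQueue {
	return &requestQueue{l: list.New()}
}

func (q *requestQueue) Push(v interface{}) { q.l.PushBack(v) }
func (q *requestQueue) Len() int           { return q.l.Len() }

// Clear drops all pending requests in place. Calling q.Clear() at the
// failure-handling site reads as intent, unlike assigning a fresh queue.
func (q *requestQueue) Clear() { q.l.Init() }
```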
```diff
 }
 err := result.err
 var ensureErr error

 if result.redirect != nil {
 	m.readSpec.RoutingToken = result.redirect.RoutingToken
 	m.readSpec.ReadHandle = result.redirect.ReadHandle
-	ensureErr = m.ensureSession(m.ctx)
+	ensureErr = m.ensureSession(m.ctx, stream)
```
Since we now have multiple streams, re-establishing an existing stream should not block the other streams from processing new commands. This synchronous call could become a bottleneck.