Open !45 opened 2 hours ago by alice

feat: Implement REFT-native object streaming

feature/object-streaming into main · Labels: enhancement, core
Description

Summary

This PR adds support for streaming large objects directly via the REFT wire protocol, avoiding the need to buffer entire objects in memory.

Changes

  • Added ObjectStream trait for streaming object access
  • Implemented chunked transfer for blobs larger than 1 MB
  • Added progress callbacks for upload/download operations
  • Updated tests and documentation
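
The chunked read path and the progress callback listed above can be sketched roughly as follows. This is a minimal illustration rather than the code in this MR: `std::io::Result` stands in for `crate::Result`, `SliceStream` and `copy_stream` are hypothetical names, the chunk size is caller-chosen, and the callback arguments are assumed to mean (bytes transferred, total bytes).

```rust
use std::io::{self, Write};

// Assumption: std::io::Result stands in for crate::Result.
type Result<T> = io::Result<T>;

/// Same shape as the trait added in src/stream.rs.
pub trait ObjectStream {
    fn read_chunk(&mut self, buf: &mut [u8]) -> Result<usize>;
    fn total_size(&self) -> Option<u64>;
}

/// Hypothetical in-memory stream, used only to exercise the loop below.
pub struct SliceStream<'a> {
    pub data: &'a [u8],
    pub pos: usize,
}

impl ObjectStream for SliceStream<'_> {
    fn read_chunk(&mut self, buf: &mut [u8]) -> Result<usize> {
        // Copy as many bytes as fit in `buf`; 0 signals end of stream.
        let n = buf.len().min(self.data.len() - self.pos);
        buf[..n].copy_from_slice(&self.data[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }

    fn total_size(&self) -> Option<u64> {
        Some(self.data.len() as u64)
    }
}

/// Hypothetical helper: drain `stream` into `out` through one reusable
/// buffer of `chunk_size` bytes, so memory use does not grow with the
/// object. Reports (transferred, total) after each chunk; total is 0
/// when the stream does not know its size.
pub fn copy_stream<S, W, F>(
    stream: &mut S,
    out: &mut W,
    chunk_size: usize,
    on_progress: F,
) -> Result<u64>
where
    S: ObjectStream,
    W: Write,
    F: Fn(u64, u64),
{
    let total = stream.total_size().unwrap_or(0);
    let mut buf = vec![0u8; chunk_size];
    let mut transferred = 0u64;
    loop {
        let n = stream.read_chunk(&mut buf)?;
        if n == 0 {
            break;
        }
        out.write_all(&buf[..n])?;
        transferred += n as u64;
        on_progress(transferred, total);
    }
    Ok(transferred)
}
```

A caller would pass any `Write` sink (a file, a socket, a `Vec<u8>`) and a closure that updates a progress bar or log line.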


Testing

  • Unit tests for stream handling
  • Integration tests with large files (up to 1 GB)
  • Manual testing with slow network simulation

Related Issues

Fixes #123
    +342 -45 3 files changed
    src/stream.rs
    +156 -0
    @@ -0,0 +1,156 @@
    +//! Object streaming support for REFT protocol
    +
    +use std::io::{Read, Write};
    +use crate::Result;
    +
    +/// Trait for streaming object access
    +pub trait ObjectStream {
    +    /// Read a chunk of the object
    +    fn read_chunk(&mut self, buf: &mut [u8]) -> Result<usize>;
    +
    +    /// Get the total size if known
    +    fn total_size(&self) -> Option<u64>;
    +}
    src/client.rs
    +86 -12
    @@ -45,12 +45,24 @@
     impl ReftClient {
         /// Fetch an object by hash
    -    pub async fn get_object(&self, hash: &Hash) -> Result<Vec<u8>> {
    -        let response = self.wire.fetch_object(hash).await?;
    -        Ok(response.data)
    +    pub async fn get_object(&self, hash: &Hash) -> Result<ObjectReader> {
    +        let stream = self.wire.fetch_object_stream(hash).await?;
    +        Ok(ObjectReader::new(stream))
         }
     
    +    /// Fetch an object with progress callback
    +    pub async fn get_object_with_progress<F>(
    +        &self,
    +        hash: &Hash,
    +        on_progress: F,
    +    ) -> Result<ObjectReader>
    +    where
    +        F: Fn(u64, u64) + Send + Sync,
    +    {
    +        let stream = self.wire.fetch_object_stream(hash).await?;
    +        Ok(ObjectReader::with_progress(stream, on_progress))
    +    }
    tests/streaming_test.rs
    +100 -0

    Checks

    Approvals 1/2
    Pipeline Passed
    Conflicts None

    Reviewers

    bob
    charlie