diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 2a62c782d0ba..4dfeb3fccdbd 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -460,11 +460,10 @@ impl IoFactory for UvIoFactory { } fn fs_from_raw_fd(&mut self, fd: c_int, close_on_drop: bool) -> ~RtioFileStream { - ~UvFileStream { - loop_: Loop{handle:self.uv_loop().native_handle()}, - fd: file::FileDescriptor(fd), - close_on_drop: close_on_drop, - } as ~RtioFileStream + let loop_ = Loop {handle: self.uv_loop().native_handle()}; + let fd = file::FileDescriptor(fd); + let home = get_handle_to_current_scheduler!(); + ~UvFileStream::new(loop_, fd, close_on_drop, home) as ~RtioFileStream } fn fs_open(&mut self, path: &P, flags: int, mode: int) @@ -480,10 +479,11 @@ impl IoFactory for UvIoFactory { let path = path_cell.take(); do file::FileDescriptor::open(loop_, path, flags, mode) |req,err| { if err.is_none() { - let res = Ok(~UvFileStream { - loop_: loop_, - fd: file::FileDescriptor(req.get_result()), - close_on_drop: true} as ~RtioFileStream); + let home = get_handle_to_current_scheduler!(); + let fd = file::FileDescriptor(req.get_result()); + let fs = ~UvFileStream::new( + loop_, fd, true, home) as ~RtioFileStream; + let res = Ok(fs); unsafe { (*result_cell_ptr).put_back(res); } let scheduler = Local::take::(); scheduler.resume_blocked_task_immediately(task_cell.take()); @@ -1061,46 +1061,64 @@ impl RtioTimer for UvTimer { pub struct UvFileStream { loop_: Loop, fd: file::FileDescriptor, - close_on_drop: bool + close_on_drop: bool, + home: SchedHandle +} + +impl HomingIO for UvFileStream { + fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home } } impl UvFileStream { + fn new(loop_: Loop, fd: file::FileDescriptor, close_on_drop: bool, + home: SchedHandle) -> UvFileStream { + UvFileStream { + loop_: loop_, + fd: fd, + close_on_drop: close_on_drop, + home: home + } + } fn base_read(&mut self, buf: &mut [u8], offset: i64) -> Result { - let scheduler = Local::take::(); let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |_, task| { - let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; - let task_cell = Cell::new(task); - do self.fd.read(self.loop_, buf, offset) |req, uverr| { - let res = match uverr { - None => Ok(req.get_result() as int), - Some(err) => Err(uv_error_to_io_error(err)) + do self.home_for_io |self_| { + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; + let task_cell = Cell::new(task); + do self_.fd.read(self.loop_, buf, offset) |req, uverr| { + let res = match uverr { + None => Ok(req.get_result() as int), + Some(err) => Err(uv_error_to_io_error(err)) + }; + unsafe { (*result_cell_ptr).put_back(res); } + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); }; - unsafe { (*result_cell_ptr).put_back(res); } - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); }; }; result_cell.take() } fn base_write(&mut self, buf: &[u8], offset: i64) -> Result<(), IoError> { - let scheduler = Local::take::(); let result_cell = Cell::new_empty(); let result_cell_ptr: *Cell> = &result_cell; let buf_ptr: *&[u8] = &buf; - do scheduler.deschedule_running_task_and_then |_, task| { - let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; - let task_cell = Cell::new(task); - do self.fd.write(self.loop_, buf, offset) |_, uverr| { - let res = match uverr { - None => Ok(()), - Some(err) => Err(uv_error_to_io_error(err)) + do self.home_for_io |self_| { + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let buf = unsafe { slice_to_uv_buf(*buf_ptr) }; + let task_cell = Cell::new(task); + do self_.fd.write(self.loop_, buf, offset) |_, uverr| { + let res = match uverr { + None => Ok(()), + Some(err) => Err(uv_error_to_io_error(err)) + }; + unsafe { (*result_cell_ptr).put_back(res); } + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); }; - unsafe { (*result_cell_ptr).put_back(res); } - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); }; }; result_cell.take() @@ -1109,15 +1127,18 @@ impl UvFileStream { impl Drop for UvFileStream { fn drop(&self) { + let self_ = unsafe { transmute::<&UvFileStream, &mut UvFileStream>(self) }; if self.close_on_drop { - let scheduler = Local::take::(); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell::new(task); - do self.fd.close(self.loop_) |_,_| { - let scheduler = Local::take::(); - scheduler.resume_blocked_task_immediately(task_cell.take()); + do self_.home_for_io |self_| { + let scheduler = Local::take::(); + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell::new(task); + do self_.fd.close(self.loop_) |_,_| { + let scheduler = Local::take::(); + scheduler.resume_blocked_task_immediately(task_cell.take()); + }; }; - }; + } } } }