-rw-r--r-- | rsync/tube.c | 264 |
1 files changed, 264 insertions, 0 deletions
diff --git a/rsync/tube.c b/rsync/tube.c new file mode 100644 index 0000000..0b82adc --- a/dev/null +++ b/rsync/tube.c | |||
@@ -0,0 +1,264 @@ | |||
1 | /*= -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- | ||
2 | * | ||
3 | * librsync -- dynamic caching and delta update in HTTP | ||
4 | * $Id$ | ||
5 | * | ||
6 | * Copyright (C) 2000, 2001 by Martin Pool <mbp@samba.org> | ||
7 | * | ||
8 | * This program is free software; you can redistribute it and/or modify | ||
9 | * it under the terms of the GNU Lesser General Public License as published by | ||
10 | * the Free Software Foundation; either version 2.1 of the License, or | ||
11 | * (at your option) any later version. | ||
12 | * | ||
13 | * This program is distributed in the hope that it will be useful, | ||
14 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
15 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
16 | * GNU Lesser General Public License for more details. | ||
17 | * | ||
18 | * You should have received a copy of the GNU Lesser General Public License | ||
19 | * along with this program; if not, write to the Free Software | ||
20 | * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. | ||
21 | */ | ||
22 | |||
23 | /* | ||
24 | | Where a calculator on the ENIAC is | ||
25 | | equpped with 18,000 vaccuum tubes and | ||
26 | | weighs 30 tons, computers in the | ||
27 | | future may have only 1,000 vaccuum | ||
28 | | tubes and perhaps weigh 1 1/2 | ||
29 | | tons. | ||
30 | | -- Popular Mechanics, March 1949 | ||
31 | */ | ||
32 | |||
33 | |||
34 | /* tube: a somewhat elastic but fairly small buffer for data passing | ||
35 | * through a stream. | ||
36 | * | ||
37 | * In most cases the iter can adjust to send just as much data will | ||
38 | * fit. In some cases that would be too complicated, because it has | ||
39 | * to transmit an integer or something similar. So in that case we | ||
40 | * stick whatever won't fit into a small buffer. | ||
41 | * | ||
42 | * A tube can contain some literal data to go out (typically command | ||
43 | * bytes), and also an instruction to copy data from the stream's | ||
44 | * input or from some other location. Both literal data and a copy | ||
45 | * command can be queued at the same time, but only in that order and | ||
46 | * at most one of each. */ | ||
47 | |||
48 | |||
49 | /* | ||
50 | * TODO: As an optimization, write it directly to the stream if | ||
51 | * possible. But for simplicity don't do that yet. | ||
52 | * | ||
53 | * TODO: I think our current copy code will lock up if the application | ||
54 | * only ever calls us with either input or output buffers, and not | ||
55 | * both. So I guess in that case we might need to copy into some | ||
56 | * temporary buffer space, and then back out again later. | ||
57 | */ | ||
58 | |||
59 | |||
60 | #include <config_rsync.h> | ||
61 | |||
62 | #include <assert.h> | ||
63 | #include <stdlib.h> | ||
64 | #include <string.h> | ||
65 | #include <stdio.h> | ||
66 | |||
67 | #include "rsync.h" | ||
68 | #include "trace.h" | ||
69 | #include "util.h" | ||
70 | #include "job.h" | ||
71 | #include "stream.h" | ||
72 | |||
73 | |||
74 | static void rs_tube_catchup_write(rs_job_t *job) | ||
75 | { | ||
76 | rs_buffers_t *stream = job->stream; | ||
77 | int len, remain; | ||
78 | |||
79 | len = job->write_len; | ||
80 | assert(len > 0); | ||
81 | |||
82 | assert(len > 0); | ||
83 | if ((size_t) len > stream->avail_out) | ||
84 | len = stream->avail_out; | ||
85 | |||
86 | if (!stream->avail_out) { | ||
87 | rs_trace("no output space available"); | ||
88 | return; | ||
89 | } | ||
90 | |||
91 | memcpy(stream->next_out, job->write_buf, len); | ||
92 | stream->next_out += len; | ||
93 | stream->avail_out -= len; | ||
94 | |||
95 | remain = job->write_len - len; | ||
96 | rs_trace("transmitted %d write bytes from tube, " | ||
97 | "%d remain to be sent", | ||
98 | len, remain); | ||
99 | |||
100 | if (remain > 0) { | ||
101 | /* Still something left in the tube... */ | ||
102 | memmove(job->write_buf, job->write_buf + len, remain); | ||
103 | } else { | ||
104 | assert(remain == 0); | ||
105 | } | ||
106 | |||
107 | job->write_len = remain; | ||
108 | } | ||
109 | |||
110 | |||
111 | /** | ||
112 | * Execute a copy command, taking data from the scoop. | ||
113 | * | ||
114 | * \sa rs_tube_catchup_copy() | ||
115 | */ | ||
116 | static void | ||
117 | rs_tube_copy_from_scoop(rs_job_t *job) | ||
118 | { | ||
119 | size_t this_len; | ||
120 | rs_buffers_t *stream = job->stream; | ||
121 | |||
122 | this_len = job->copy_len; | ||
123 | if (this_len > job->scoop_avail) { | ||
124 | this_len = job->scoop_avail; | ||
125 | } | ||
126 | if (this_len > stream->avail_out) { | ||
127 | this_len = stream->avail_out; | ||
128 | } | ||
129 | |||
130 | memcpy(stream->next_out, job->scoop_next, this_len); | ||
131 | |||
132 | stream->next_out += this_len; | ||
133 | stream->avail_out -= this_len; | ||
134 | |||
135 | job->scoop_avail -= this_len; | ||
136 | job->scoop_next += this_len; | ||
137 | |||
138 | job->copy_len -= this_len; | ||
139 | |||
140 | rs_trace("caught up on %ld copied bytes from scoop, %ld remain there, " | ||
141 | "%ld remain to be copied", | ||
142 | (long) this_len, (long) job->scoop_avail, (long) job->copy_len); | ||
143 | } | ||
144 | |||
145 | |||
146 | |||
147 | /** | ||
148 | * Catch up on an outstanding copy command. | ||
149 | * | ||
150 | * Takes data from the scoop, and the input (in that order), and | ||
151 | * writes as much as will fit to the output, up to the limit of the | ||
152 | * outstanding copy. | ||
153 | */ | ||
154 | static void rs_tube_catchup_copy(rs_job_t *job) | ||
155 | { | ||
156 | rs_buffers_t *stream = job->stream; | ||
157 | |||
158 | assert(job->write_len == 0); | ||
159 | assert(job->copy_len > 0); | ||
160 | |||
161 | if (job->scoop_avail && job->copy_len) { | ||
162 | /* there's still some data in the scoop, so we should use that. */ | ||
163 | rs_tube_copy_from_scoop(job); | ||
164 | } | ||
165 | |||
166 | if (job->copy_len) { | ||
167 | size_t this_copy; | ||
168 | |||
169 | this_copy = rs_buffers_copy(stream, job->copy_len); | ||
170 | |||
171 | job->copy_len -= this_copy; | ||
172 | |||
173 | rs_trace("copied %.0f bytes from input buffer, %.0f remain to be copied", | ||
174 | (double) this_copy, (double) job->copy_len); | ||
175 | } | ||
176 | } | ||
177 | |||
178 | |||
179 | /* | ||
180 | * Put whatever will fit from the tube into the output of the stream. | ||
181 | * Return RS_DONE if the tube is now empty and ready to accept another | ||
182 | * command, RS_BLOCKED if there is still stuff waiting to go out. | ||
183 | */ | ||
184 | int rs_tube_catchup(rs_job_t *job) | ||
185 | { | ||
186 | if (job->write_len) | ||
187 | rs_tube_catchup_write(job); | ||
188 | |||
189 | if (job->write_len) { | ||
190 | /* there is still write data queued, so we can't send | ||
191 | * anything else. */ | ||
192 | return RS_BLOCKED; | ||
193 | } | ||
194 | |||
195 | if (job->copy_len) | ||
196 | rs_tube_catchup_copy(job); | ||
197 | |||
198 | if (job->copy_len) { | ||
199 | if (job->stream->eof_in && !job->stream->avail_in && !job->scoop_avail) { | ||
200 | rs_log(RS_LOG_ERR, | ||
201 | "reached end of file while copying literal data through buffers"); | ||
202 | return RS_INPUT_ENDED; | ||
203 | } | ||
204 | |||
205 | return RS_BLOCKED; | ||
206 | } | ||
207 | |||
208 | return RS_DONE; | ||
209 | } | ||
210 | |||
211 | |||
212 | /* Check whether there is data in the tube waiting to go out. So if true | ||
213 | * this basically means that the previous command has finished doing all its | ||
214 | * output. */ | ||
215 | int rs_tube_is_idle(rs_job_t const *job) | ||
216 | { | ||
217 | return job->write_len == 0 && job->copy_len == 0; | ||
218 | } | ||
219 | |||
220 | |||
221 | /** | ||
222 | * Queue up a request to copy through \p len bytes from the input to | ||
223 | * the output of the stream. | ||
224 | * | ||
225 | * The data is copied from the scoop (if there is anything there) or | ||
226 | * from the input, on the next call to rs_tube_write(). | ||
227 | * | ||
228 | * We can only accept this request if there is no copy command already | ||
229 | * pending. | ||
230 | */ | ||
231 | /* TODO: Try to do the copy immediately, and return a result. Then, | ||
232 | * people can try to continue if possible. Is this really required? | ||
233 | * Callers can just go out and back in again after flushing the | ||
234 | * tube. */ | ||
235 | void rs_tube_copy(rs_job_t *job, int len) | ||
236 | { | ||
237 | assert(job->copy_len == 0); | ||
238 | |||
239 | job->copy_len = len; | ||
240 | } | ||
241 | |||
242 | |||
243 | |||
244 | /* | ||
245 | * Push some data into the tube for storage. The tube's never | ||
246 | * supposed to get very big, so this will just pop loudly if you do | ||
247 | * that. | ||
248 | * | ||
249 | * We can't accept write data if there's already a copy command in the | ||
250 | * tube, because the write data comes out first. | ||
251 | */ | ||
252 | void | ||
253 | rs_tube_write(rs_job_t *job, const void *buf, size_t len) | ||
254 | { | ||
255 | assert(job->copy_len == 0); | ||
256 | |||
257 | if (len > sizeof(job->write_buf) - job->write_len) { | ||
258 | rs_fatal("tube popped when trying to write %ld bytes!", | ||
259 | (long) len); | ||
260 | } | ||
261 | |||
262 | memcpy(job->write_buf + job->write_len, buf, len); | ||
263 | job->write_len += len; | ||
264 | } | ||