summaryrefslogtreecommitdiff
path: root/rsync/tube.c
authorkergoth <kergoth>2002-01-25 22:14:26 (UTC)
committer kergoth <kergoth>2002-01-25 22:14:26 (UTC)
commit15318cad33835e4e2dc620d033e43cd930676cdd (patch) (unidiff)
treec2fa0399a2c47fda8e2cd0092c73a809d17f68eb /rsync/tube.c
downloadopie-15318cad33835e4e2dc620d033e43cd930676cdd.zip
opie-15318cad33835e4e2dc620d033e43cd930676cdd.tar.gz
opie-15318cad33835e4e2dc620d033e43cd930676cdd.tar.bz2
Initial revision
Diffstat (limited to 'rsync/tube.c') (more/less context) (ignore whitespace changes)
-rw-r--r--rsync/tube.c264
1 files changed, 264 insertions, 0 deletions
diff --git a/rsync/tube.c b/rsync/tube.c
new file mode 100644
index 0000000..0b82adc
--- a/dev/null
+++ b/rsync/tube.c
@@ -0,0 +1,264 @@
1/*= -*- c-basic-offset: 4; indent-tabs-mode: nil; -*-
2 *
3 * librsync -- dynamic caching and delta update in HTTP
4 * $Id$
5 *
6 * Copyright (C) 2000, 2001 by Martin Pool <mbp@samba.org>
7 *
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU Lesser General Public License as published by
10 * the Free Software Foundation; either version 2.1 of the License, or
11 * (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23 /*
24 | Where a calculator on the ENIAC is
25 | equpped with 18,000 vaccuum tubes and
26 | weighs 30 tons, computers in the
27 | future may have only 1,000 vaccuum
28 | tubes and perhaps weigh 1 1/2
29 | tons.
30 | -- Popular Mechanics, March 1949
31 */
32
33
34/* tube: a somewhat elastic but fairly small buffer for data passing
35 * through a stream.
36 *
37 * In most cases the iter can adjust to send just as much data will
38 * fit. In some cases that would be too complicated, because it has
39 * to transmit an integer or something similar. So in that case we
40 * stick whatever won't fit into a small buffer.
41 *
42 * A tube can contain some literal data to go out (typically command
43 * bytes), and also an instruction to copy data from the stream's
44 * input or from some other location. Both literal data and a copy
45 * command can be queued at the same time, but only in that order and
46 * at most one of each. */
47
48
49/*
50 * TODO: As an optimization, write it directly to the stream if
51 * possible. But for simplicity don't do that yet.
52 *
53 * TODO: I think our current copy code will lock up if the application
54 * only ever calls us with either input or output buffers, and not
55 * both. So I guess in that case we might need to copy into some
56 * temporary buffer space, and then back out again later.
57 */
58
59
60#include <config_rsync.h>
61
62#include <assert.h>
63#include <stdlib.h>
64#include <string.h>
65#include <stdio.h>
66
67#include "rsync.h"
68#include "trace.h"
69#include "util.h"
70#include "job.h"
71#include "stream.h"
72
73
74static void rs_tube_catchup_write(rs_job_t *job)
75{
76 rs_buffers_t *stream = job->stream;
77 int len, remain;
78
79 len = job->write_len;
80 assert(len > 0);
81
82 assert(len > 0);
83 if ((size_t) len > stream->avail_out)
84 len = stream->avail_out;
85
86 if (!stream->avail_out) {
87 rs_trace("no output space available");
88 return;
89 }
90
91 memcpy(stream->next_out, job->write_buf, len);
92 stream->next_out += len;
93 stream->avail_out -= len;
94
95 remain = job->write_len - len;
96 rs_trace("transmitted %d write bytes from tube, "
97 "%d remain to be sent",
98 len, remain);
99
100 if (remain > 0) {
101 /* Still something left in the tube... */
102 memmove(job->write_buf, job->write_buf + len, remain);
103 } else {
104 assert(remain == 0);
105 }
106
107 job->write_len = remain;
108}
109
110
111/**
112 * Execute a copy command, taking data from the scoop.
113 *
114 * \sa rs_tube_catchup_copy()
115 */
116static void
117rs_tube_copy_from_scoop(rs_job_t *job)
118{
119 size_t this_len;
120 rs_buffers_t *stream = job->stream;
121
122 this_len = job->copy_len;
123 if (this_len > job->scoop_avail) {
124 this_len = job->scoop_avail;
125 }
126 if (this_len > stream->avail_out) {
127 this_len = stream->avail_out;
128 }
129
130 memcpy(stream->next_out, job->scoop_next, this_len);
131
132 stream->next_out += this_len;
133 stream->avail_out -= this_len;
134
135 job->scoop_avail -= this_len;
136 job->scoop_next += this_len;
137
138 job->copy_len -= this_len;
139
140 rs_trace("caught up on %ld copied bytes from scoop, %ld remain there, "
141 "%ld remain to be copied",
142 (long) this_len, (long) job->scoop_avail, (long) job->copy_len);
143}
144
145
146
147/**
148 * Catch up on an outstanding copy command.
149 *
150 * Takes data from the scoop, and the input (in that order), and
151 * writes as much as will fit to the output, up to the limit of the
152 * outstanding copy.
153 */
154static void rs_tube_catchup_copy(rs_job_t *job)
155{
156 rs_buffers_t *stream = job->stream;
157
158 assert(job->write_len == 0);
159 assert(job->copy_len > 0);
160
161 if (job->scoop_avail && job->copy_len) {
162 /* there's still some data in the scoop, so we should use that. */
163 rs_tube_copy_from_scoop(job);
164 }
165
166 if (job->copy_len) {
167 size_t this_copy;
168
169 this_copy = rs_buffers_copy(stream, job->copy_len);
170
171 job->copy_len -= this_copy;
172
173 rs_trace("copied %.0f bytes from input buffer, %.0f remain to be copied",
174 (double) this_copy, (double) job->copy_len);
175 }
176}
177
178
179/*
180 * Put whatever will fit from the tube into the output of the stream.
181 * Return RS_DONE if the tube is now empty and ready to accept another
182 * command, RS_BLOCKED if there is still stuff waiting to go out.
183 */
184int rs_tube_catchup(rs_job_t *job)
185{
186 if (job->write_len)
187 rs_tube_catchup_write(job);
188
189 if (job->write_len) {
190 /* there is still write data queued, so we can't send
191 * anything else. */
192 return RS_BLOCKED;
193 }
194
195 if (job->copy_len)
196 rs_tube_catchup_copy(job);
197
198 if (job->copy_len) {
199 if (job->stream->eof_in && !job->stream->avail_in && !job->scoop_avail) {
200 rs_log(RS_LOG_ERR,
201 "reached end of file while copying literal data through buffers");
202 return RS_INPUT_ENDED;
203 }
204
205 return RS_BLOCKED;
206 }
207
208 return RS_DONE;
209}
210
211
212/* Check whether there is data in the tube waiting to go out. So if true
213 * this basically means that the previous command has finished doing all its
214 * output. */
215int rs_tube_is_idle(rs_job_t const *job)
216{
217 return job->write_len == 0 && job->copy_len == 0;
218}
219
220
221/**
222 * Queue up a request to copy through \p len bytes from the input to
223 * the output of the stream.
224 *
225 * The data is copied from the scoop (if there is anything there) or
226 * from the input, on the next call to rs_tube_write().
227 *
228 * We can only accept this request if there is no copy command already
229 * pending.
230 */
231/* TODO: Try to do the copy immediately, and return a result. Then,
232 * people can try to continue if possible. Is this really required?
233 * Callers can just go out and back in again after flushing the
234 * tube. */
235void rs_tube_copy(rs_job_t *job, int len)
236{
237 assert(job->copy_len == 0);
238
239 job->copy_len = len;
240}
241
242
243
244/*
245 * Push some data into the tube for storage. The tube's never
246 * supposed to get very big, so this will just pop loudly if you do
247 * that.
248 *
249 * We can't accept write data if there's already a copy command in the
250 * tube, because the write data comes out first.
251 */
252void
253rs_tube_write(rs_job_t *job, const void *buf, size_t len)
254{
255 assert(job->copy_len == 0);
256
257 if (len > sizeof(job->write_buf) - job->write_len) {
258 rs_fatal("tube popped when trying to write %ld bytes!",
259 (long) len);
260 }
261
262 memcpy(job->write_buf + job->write_len, buf, len);
263 job->write_len += len;
264}