summaryrefslogtreecommitdiff
path: root/rsync/job.c
authorkergoth <kergoth>2002-01-25 22:14:26 (UTC)
committer kergoth <kergoth>2002-01-25 22:14:26 (UTC)
commit15318cad33835e4e2dc620d033e43cd930676cdd (patch) (unidiff)
treec2fa0399a2c47fda8e2cd0092c73a809d17f68eb /rsync/job.c
downloadopie-15318cad33835e4e2dc620d033e43cd930676cdd.zip
opie-15318cad33835e4e2dc620d033e43cd930676cdd.tar.gz
opie-15318cad33835e4e2dc620d033e43cd930676cdd.tar.bz2
Initial revision
Diffstat (limited to 'rsync/job.c') (more/less context) (ignore whitespace changes)
-rw-r--r--rsync/job.c251
1 files changed, 251 insertions, 0 deletions
diff --git a/rsync/job.c b/rsync/job.c
new file mode 100644
index 0000000..680982d
--- a/dev/null
+++ b/rsync/job.c
@@ -0,0 +1,251 @@
1/*= -*- c-basic-offset: 4; indent-tabs-mode: nil; -*-
2 *
3 * librsync -- the library for network deltas
4 * $Id$
5 *
6 * Copyright (C) 2000, 2001 by Martin Pool <mbp@samba.org>
7 *
8 * This program is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public License
10 * as published by the Free Software Foundation; either version 2.1 of
11 * the License, or (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful, but
14 * WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this program; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23
24 /*
25 | The hard, lifeless I covered up the
26 | warm, pulsing It; protecting and
27 | sheltering.
28 */
29
30/*
31 * job.c -- Generic state-machine interface. The point of this is
32 * that we need to be able to suspend and resume processing at any
33 * point at which the buffers may block. We could do that using
34 * setjmp or similar tricks, but this is probably simpler.
35 *
36 * TODO: We have a few functions to do with reading a netint, stashing
37 * it somewhere, then moving into a different state. Is it worth
38 * writing generic functions fo r that, or would it be too confusing?
39 */
40
41
42#include <config_rsync.h>
43
44#include <stdlib.h>
45#include <assert.h>
46#include <stdio.h>
47
48#include "rsync.h"
49#include "stream.h"
50#include "util.h"
51#include "sumset.h"
52#include "job.h"
53#include "trace.h"
54
55
56static const int rs_job_tag = 20010225;
57
58static rs_result rs_job_work(rs_job_t *job, rs_buffers_t *buffers);
59
60
61rs_job_t * rs_job_new(char const *job_name, rs_result (*statefn)(rs_job_t *))
62{
63 rs_job_t *job;
64
65 job = rs_alloc_struct(rs_job_t);
66
67 job->job_name = job_name;
68 job->dogtag = rs_job_tag;
69 job->statefn = statefn;
70
71 job->stats.op = job_name;
72
73 rs_trace("start %s job", job_name);
74
75 return job;
76}
77
78
79void rs_job_check(rs_job_t *job)
80{
81 assert(job->dogtag == rs_job_tag);
82}
83
84
85rs_result rs_job_free(rs_job_t *job)
86{
87 rs_bzero(job, sizeof *job);
88 free(job);
89
90 return RS_DONE;
91}
92
93
94
95static rs_result rs_job_s_complete(rs_job_t *job)
96{
97 rs_fatal("should not be reached");
98 return RS_INTERNAL_ERROR;
99}
100
101
102static rs_result rs_job_complete(rs_job_t *job, rs_result result)
103{
104 rs_job_check(job);
105
106 job->statefn = rs_job_s_complete;
107 job->final_result = result;
108
109 if (result != RS_DONE) {
110 rs_error("%s job failed: %s", job->job_name, rs_strerror(result));
111 } else {
112 rs_trace("%s job complete", job->job_name);
113 }
114
115 if (result == RS_DONE && !rs_tube_is_idle(job))
116 /* Processing is finished, but there is still some data
117 * waiting to get into the output buffer. */
118 return RS_BLOCKED;
119 else
120 return result;
121}
122
123
124/**
125 * \brief Run a ::rs_job_t state machine until it blocks
126 * (::RS_BLOCKED), returns an error, or completes (::RS_COMPLETE).
127 *
128 * \return The ::rs_result that caused iteration to stop.
129 *
130 * \param ending True if there is no more data after what's in the
131 * input buffer. The final block checksum will run across whatever's
132 * in there, without trying to accumulate anything else.
133 */
134rs_result rs_job_iter(rs_job_t *job, rs_buffers_t *buffers)
135{
136 rs_result result;
137 rs_long_t orig_in, orig_out;
138
139 orig_in = buffers->avail_in;
140 orig_out = buffers->avail_out;
141
142 result = rs_job_work(job, buffers);
143
144 if (result == RS_BLOCKED || result == RS_DONE)
145 if ((orig_in == buffers->avail_in) && (orig_out == buffers->avail_out)
146 && orig_in && orig_out) {
147 rs_log(RS_LOG_ERR, "internal error: job made no progress "
148 "[orig_in=%.0f, orig_out=%.0f, final_in=%.0f, final_out=%.0f]",
149 (double) orig_in, (double) orig_out, (double) buffers->avail_in,
150 (double) buffers->avail_out);
151 return RS_INTERNAL_ERROR;
152 }
153
154 return result;
155}
156
157
158static rs_result
159rs_job_work(rs_job_t *job, rs_buffers_t *buffers)
160{
161 rs_result result;
162
163 rs_job_check(job);
164
165 if (!buffers) {
166 rs_error("NULL buffer passed to rs_job_iter");
167 return RS_PARAM_ERROR;
168 }
169 job->stream = buffers;
170
171 while (1) {
172 result = rs_tube_catchup(job);
173 if (result == RS_BLOCKED)
174 return result;
175 else if (result != RS_DONE)
176 return rs_job_complete(job, result);
177
178 if (job->statefn == rs_job_s_complete) {
179 if (rs_tube_is_idle(job))
180 return RS_DONE;
181 else
182 return RS_BLOCKED;
183 } else {
184 result = job->statefn(job);
185 if (result == RS_RUNNING)
186 continue;
187 else if (result == RS_BLOCKED)
188 return result;
189 else
190 return rs_job_complete(job, result);
191 }
192 }
193
194 /* TODO: Before returning, check that we actually made some
195 * progress. If not, and we're not returning an error, this is a
196 * bug. */
197}
198
199
200/**
201 * Return pointer to statistics accumulated about this job.
202 */
203const rs_stats_t *
204rs_job_statistics(rs_job_t *job)
205{
206 return &job->stats;
207}
208
209
210int
211rs_job_input_is_ending(rs_job_t *job)
212{
213 return job->stream->eof_in;
214}
215
216
217
218/**
219 * Actively process a job, by making callbacks to fill and empty the
220 * buffers until the job is done.
221 */
222rs_result
223rs_job_drive(rs_job_t *job, rs_buffers_t *buf,
224 rs_driven_cb in_cb, void *in_opaque,
225 rs_driven_cb out_cb, void *out_opaque)
226{
227 rs_result result, iores;
228
229 rs_bzero(buf, sizeof *buf);
230
231 do {
232 if (!buf->eof_in && in_cb) {
233 iores = in_cb(job, buf, in_opaque);
234 if (iores != RS_DONE)
235 return iores;
236 }
237
238 result = rs_job_iter(job, buf);
239 if (result != RS_DONE && result != RS_BLOCKED)
240 return result;
241
242 if (out_cb) {
243 iores = (out_cb)(job, buf, out_opaque);
244 if (iores != RS_DONE)
245 return iores;
246 }
247 } while (result != RS_DONE);
248
249 return result;
250}
251