summaryrefslogtreecommitdiff
path: root/rsync/scoop.c
blob: 9f68a6054a7ec9c7d05387ddb010e25e224fbec1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
/*=                    -*- c-basic-offset: 4; indent-tabs-mode: nil; -*-
 *
 * librsync -- the library for network deltas
 * $Id$
 * 
 * Copyright (C) 2000, 2001 by Martin Pool <mbp@samba.org>
 * 
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public License
 * as published by the Free Software Foundation; either version 2.1 of
 * the License, or (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public
 * License along with this program; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

/*
 * scoop.c -- This file deals with readahead from caller-supplied
 * buffers.
 *
 * Many functions require a certain minimum amount of input to do their
 * processing.  For example, to calculate a strong checksum of a block
 * we need at least a block of input.
 *
 * Since we put the buffers completely under the control of the caller,
 * we can't count on ever getting this much data all in one go.  We
 * can't simply wait, because the caller might have a smaller buffer
 * than we require and so we'll never get it.  For the same reason we
 * must always accept all the data we're given.
 *
 * So, stream input data that's required for readahead is put into a
 * special buffer, from which the caller can then read.  It's
 * essentially like an internal pipe, which on any given read request
 * may or may not be able to actually supply the data.
 *
 * As an optimization, when nothing is queued in the scoop and the input
 * buffer already holds enough data, readahead returns a pointer directly
 * into the input buffer rather than copying the data into the scoop.
 */

/*
 * TODO: We probably know a maximum amount of data that can be scooped
 * up, so we could just avoid dynamic allocation.  However that can't
 * be fixed at compile time, because when generating a delta it needs
 * to be large enough to hold one full block.  Perhaps we can set it
 * up when the job is allocated?  It would be nice to avoid doing any
 * memory allocation after startup, as bzlib does.
 */


                              /*
                               | To walk on water you've gotta sink 
                               | in the ice.
                               |   -- Shihad, `The General Electric'.
                               */ 

#include <config_rsync.h>

#include <assert.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>

#include "rsync.h"
#include "job.h"
#include "stream.h"
#include "trace.h"
#include "util.h"


#if 0
#  undef rs_trace
#  define rs_trace(s...)
#endif


/**
 * Try to accept data from the input buffer so that the scoop holds
 * \p len bytes.
 *
 * Bytes already queued in the scoop are preserved and moved to the
 * front of the scoop buffer.  If the buffer is smaller than \p len it is
 * replaced by a new one of (normally) 2 * \p len bytes.  Then as many
 * bytes as the stream input can supply, up to the amount needed to reach
 * \p len, are copied into the scoop and consumed from the input.
 *
 * The scoop may still hold fewer than \p len bytes afterwards if the
 * input ran short; callers must check job->scoop_avail.
 *
 * \p len must be strictly greater than the number of bytes already in
 * the scoop.
 */
void rs_scoop_input(rs_job_t *job, size_t len)
{
    rs_buffers_t *stream = job->stream;
    size_t tocopy;

    assert(len > job->scoop_avail);

    if (job->scoop_alloc < len) {
        /* need to allocate a new buffer, too */
        char *newbuf;
        /* Over-allocate so we don't have to grow on every small increase,
         * but keep the size in size_t (an int would truncate large sizes)
         * and don't let the doubling wrap around. */
        size_t newsize = (len > ((size_t) -1) / 2) ? len : 2 * len;

        newbuf = rs_alloc(newsize, "scoop buffer");
        if (job->scoop_avail)
            memcpy(newbuf, job->scoop_next, job->scoop_avail);
        free(job->scoop_buf);   /* free(NULL) is a no-op */
        job->scoop_buf = job->scoop_next = newbuf;
        rs_trace("resized scoop buffer to %.0f bytes from %.0f",
                 (double) newsize, (double) job->scoop_alloc);
        job->scoop_alloc = newsize;
    } else {
        /* this buffer size is fine, but move the existing
         * data down to the front. */
        memmove(job->scoop_buf, job->scoop_next, job->scoop_avail);
        job->scoop_next = job->scoop_buf;
    }

    /* take as much input as is available, to give up to LEN bytes
     * in the scoop. */
    tocopy = len - job->scoop_avail;
    if (tocopy > stream->avail_in)
        tocopy = stream->avail_in;
    assert(tocopy + job->scoop_avail <= job->scoop_alloc);

    /* When no input is available next_in may be NULL; memcpy and pointer
     * arithmetic on a NULL pointer are undefined even for length 0, so
     * only touch the input when there is something to copy. */
    if (tocopy > 0) {
        memcpy(job->scoop_next + job->scoop_avail, stream->next_in, tocopy);
        job->scoop_avail += tocopy;
        stream->next_in += tocopy;
        stream->avail_in -= tocopy;
    }
    rs_trace("accepted %.0f bytes from input to scoop", (double) tocopy);
}


/**
 * Advance the input cursor forward \p len bytes.  This is used after
 * doing readahead, when you decide you want to keep it.  \p len must
 * be no more than the amount of available data, so you can't cheat.
 *
 * If the scoop holds any data, the bytes are consumed from the scoop;
 * otherwise they are consumed directly from the stream's input buffer.
 *
 * So when creating a delta, we require one block of readahead.  But
 * after examining that block, we might decide to advance over all of
 * it (if there is a match), or just one byte (if not).
 */
void rs_scoop_advance(rs_job_t *job, size_t len)
{
    rs_buffers_t *stream = job->stream;

    /* It never makes sense to advance over a mixture of bytes from
     * the scoop and input, because you couldn't possibly have looked
     * at them all at the same time. */
    if (job->scoop_avail) {
        /* reading from the scoop buffer */
        /* len is a size_t, so print it via double like the rest of this
         * file; "%d" with a size_t argument is undefined behaviour. */
        rs_trace("advance over %.0f bytes from scoop", (double) len);
        assert(len <= job->scoop_avail);
        job->scoop_avail -= len;
        job->scoop_next += len;
    } else {
        rs_trace("advance over %.0f bytes from input buffer", (double) len);
        assert(len <= stream->avail_in);
        stream->avail_in -= len;
        stream->next_in += len;
    }
}



/**
 * \brief Read from scoop without advancing.
 *
 * Ask for \p len bytes of input.  If that much data is available, either
 * already queued in the scoop or directly in the stream's input buffer,
 * set \p *ptr to point at it and return RS_DONE.  On success nothing is
 * consumed: the scoop and input cursors are left where they were.
 *
 * If there's not enough data, accept whatever input is available into
 * the scoop buffer and return RS_BLOCKED.  Scooping does consume those
 * bytes from the stream input, so they are not seen twice.  If the scoop
 * already held some data and scooping brings it up to \p len, RS_DONE is
 * returned instead.
 *
 * If the scoop is empty, no input is available, and the stream is at
 * end of input, return RS_INPUT_ENDED.
 *
 * Because the data is not removed, this function lets you do readahead.
 * If you want to keep any of the data, you should also call
 * rs_scoop_advance() to skip over it.
 */
rs_result rs_scoop_readahead(rs_job_t *job, size_t len, void **ptr)
{
    rs_buffers_t *stream = job->stream;
    rs_job_check(job);

    if (job->scoop_avail >= len) {
        /* We have enough data queued to satisfy the request,
         * so go straight from the scoop buffer. */
        rs_trace("got %.0f bytes direct from scoop", (double) len);
        *ptr = job->scoop_next;
        return RS_DONE;
    } else if (job->scoop_avail) {
        /* We have some data in the scoop, but not enough to
         * satisfy the request.  It must be used first, so top the
         * scoop up from the input rather than reading input directly. */
        rs_trace("data is present in the scoop and must be used");
        rs_scoop_input(job, len);

        if (job->scoop_avail < len) {
            rs_trace("still have only %.0f bytes in scoop",
                     (double) job->scoop_avail);
            return RS_BLOCKED;
        } else {
            rs_trace("scoop now has %.0f bytes, this is enough",
                     (double) job->scoop_avail);
            *ptr = job->scoop_next;
            return RS_DONE;
        }
    } else if (stream->avail_in >= len) {
        /* There's enough data in the stream's input */
        *ptr = stream->next_in;
        rs_trace("got %.0f bytes from input buffer", (double) len);
        return RS_DONE;
    } else if (stream->avail_in > 0) {
        /* Nothing was queued before, but we don't have enough
         * data to satisfy the request.  So queue what little
         * we have, and try again next time.  The scoop is empty here,
         * so the amount scooped is everything in the input buffer. */
        rs_trace("couldn't satisfy request for %.0f, scooping %.0f bytes",
                 (double) len, (double) stream->avail_in);
        rs_scoop_input(job, len);
        return RS_BLOCKED;
    } else if (stream->eof_in) {
        /* Nothing is queued before, and nothing is in the input
         * buffer at the moment. */
        rs_trace("reached end of input stream");
        return RS_INPUT_ENDED;
    } else {
        /* Nothing queued at the moment. */
        rs_trace("blocked with no data in scoop or input buffer");
        return RS_BLOCKED;
    }
}



/**
 * Read \p len bytes if possible, and consume them from the input.
 *
 * This is rs_scoop_readahead() followed, on success only, by
 * rs_scoop_advance() over the same \p len bytes.
 *
 * \param ptr will be updated to point to a read-only buffer holding
 * the data, if enough is available.
 *
 * \return RS_DONE if all the data was available; otherwise whatever
 * rs_scoop_readahead() reported (e.g. RS_BLOCKED), with nothing advanced.
 */
rs_result rs_scoop_read(rs_job_t *job, size_t len, void **ptr)
{
    rs_result status = rs_scoop_readahead(job, len, ptr);

    /* Only consume the bytes once the caller actually has all of them. */
    if (status != RS_DONE)
        return status;

    rs_scoop_advance(job, len);
    return status;
}



/*
 * Read everything currently available (scoop plus input buffer),
 * on the assumption that it runs up to the end of the file.
 *
 * \p *len is set to the number of bytes requested, i.e. the total
 * available; the result of rs_scoop_read() for that length is returned.
 */
rs_result rs_scoop_read_rest(rs_job_t *job, size_t *len, void **ptr)
{
    size_t remaining = job->scoop_avail;

    remaining += job->stream->avail_in;
    *len = remaining;

    return rs_scoop_read(job, remaining, ptr);
}



/**
 * Return how many bytes can be read right now: those queued in the
 * scoop plus those still waiting in the stream's input buffer.
 */
size_t rs_scoop_total_avail(rs_job_t *job)
{
    const rs_buffers_t *in = job->stream;
    size_t queued = job->scoop_avail;

    return queued + in->avail_in;
}