1/*
2 * QEMU buffered QEMUFile
3 *
4 * Copyright IBM, Corp. 2008
5 *
6 * Authors:
7 *  Anthony Liguori   <aliguori@us.ibm.com>
8 *
9 * This work is licensed under the terms of the GNU GPL, version 2.  See
10 * the COPYING file in the top-level directory.
11 *
12 */
13
14#include "qemu-common.h"
15#include "hw/hw.h"
16#include "qemu/timer.h"
17#include "sysemu/char.h"
18#include "buffered_file.h"
19
20//#define DEBUG_BUFFERED_FILE
21
22typedef struct QEMUFileBuffered
23{
24    BufferedPutFunc *put_buffer;
25    BufferedPutReadyFunc *put_ready;
26    BufferedWaitForUnfreezeFunc *wait_for_unfreeze;
27    BufferedCloseFunc *close;
28    void *opaque;
29    QEMUFile *file;
30    int has_error;
31    int freeze_output;
32    size_t bytes_xfer;
33    size_t xfer_limit;
34    uint8_t *buffer;
35    size_t buffer_size;
36    size_t buffer_capacity;
37    QEMUTimer *timer;
38} QEMUFileBuffered;
39
/*
 * DPRINTF() - debug trace helper.
 *
 * Without DEBUG_BUFFERED_FILE it expands to an empty statement, so its
 * arguments are never evaluated.  With DEBUG_BUFFERED_FILE defined it
 * prints the message through printf() with a "buffered-file: " prefix.
 */
#ifndef DEBUG_BUFFERED_FILE
#define DPRINTF(fmt, ...) \
    do { } while (0)
#else
#define DPRINTF(fmt, ...) \
    do { printf("buffered-file: " fmt, ## __VA_ARGS__); } while (0)
#endif
47
48static void buffered_append(QEMUFileBuffered *s,
49                            const uint8_t *buf, size_t size)
50{
51    if (size > (s->buffer_capacity - s->buffer_size)) {
52        void *tmp;
53
54        DPRINTF("increasing buffer capacity from %zu by %zu\n",
55                s->buffer_capacity, size + 1024);
56
57        s->buffer_capacity += size + 1024;
58
59        tmp = g_realloc(s->buffer, s->buffer_capacity);
60        if (tmp == NULL) {
61            fprintf(stderr, "qemu file buffer expansion failed\n");
62            exit(1);
63        }
64
65        s->buffer = tmp;
66    }
67
68    memcpy(s->buffer + s->buffer_size, buf, size);
69    s->buffer_size += size;
70}
71
72static void buffered_flush(QEMUFileBuffered *s)
73{
74    size_t offset = 0;
75
76    if (s->has_error) {
77        DPRINTF("flush when error, bailing\n");
78        return;
79    }
80
81    DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
82
83    while (offset < s->buffer_size) {
84        ssize_t ret;
85
86        ret = s->put_buffer(s->opaque, s->buffer + offset,
87                            s->buffer_size - offset);
88        if (ret == -EAGAIN) {
89            DPRINTF("backend not ready, freezing\n");
90            s->freeze_output = 1;
91            break;
92        }
93
94        if (ret <= 0) {
95            DPRINTF("error flushing data, %zd\n", ret);
96            s->has_error = 1;
97            break;
98        } else {
99            DPRINTF("flushed %zd byte(s)\n", ret);
100            offset += ret;
101        }
102    }
103
104    DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
105    memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
106    s->buffer_size -= offset;
107}
108
109static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
110{
111    QEMUFileBuffered *s = opaque;
112    int offset = 0;
113    ssize_t ret;
114
115    DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
116
117    if (s->has_error) {
118        DPRINTF("flush when error, bailing\n");
119        return -EINVAL;
120    }
121
122    DPRINTF("unfreezing output\n");
123    s->freeze_output = 0;
124
125    buffered_flush(s);
126
127    while (!s->freeze_output && offset < size) {
128        if (s->bytes_xfer > s->xfer_limit) {
129            DPRINTF("transfer limit exceeded when putting\n");
130            break;
131        }
132
133        ret = s->put_buffer(s->opaque, buf + offset, size - offset);
134        if (ret == -EAGAIN) {
135            DPRINTF("backend not ready, freezing\n");
136            s->freeze_output = 1;
137            break;
138        }
139
140        if (ret <= 0) {
141            DPRINTF("error putting\n");
142            s->has_error = 1;
143            offset = -EINVAL;
144            break;
145        }
146
147        DPRINTF("put %zd byte(s)\n", ret);
148        offset += ret;
149        s->bytes_xfer += ret;
150    }
151
152    if (offset >= 0) {
153        DPRINTF("buffering %d bytes\n", size - offset);
154        buffered_append(s, buf + offset, size - offset);
155        offset = size;
156    }
157
158    if (pos == 0 && size == 0) {
159        DPRINTF("file is ready\n");
160        if (s->bytes_xfer <= s->xfer_limit) {
161            DPRINTF("notifying client\n");
162            s->put_ready(s->opaque);
163        }
164    }
165
166    return offset;
167}
168
169static int buffered_close(void *opaque)
170{
171    QEMUFileBuffered *s = opaque;
172    int ret;
173
174    DPRINTF("closing\n");
175
176    while (!s->has_error && s->buffer_size) {
177        buffered_flush(s);
178        if (s->freeze_output)
179            s->wait_for_unfreeze(s);
180    }
181
182    ret = s->close(s->opaque);
183
184    timer_del(s->timer);
185    timer_free(s->timer);
186    g_free(s->buffer);
187    g_free(s);
188
189    return ret;
190}
191
192static int buffered_rate_limit(void *opaque)
193{
194    QEMUFileBuffered *s = opaque;
195
196    if (s->has_error)
197        return 0;
198
199    if (s->freeze_output)
200        return 1;
201
202    if (s->bytes_xfer > s->xfer_limit)
203        return 1;
204
205    return 0;
206}
207
208static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate)
209{
210    QEMUFileBuffered *s = opaque;
211    if (s->has_error)
212        goto out;
213
214    if (new_rate > SIZE_MAX) {
215        new_rate = SIZE_MAX;
216    }
217
218    s->xfer_limit = new_rate / 10;
219
220out:
221    return s->xfer_limit;
222}
223
224static int64_t buffered_get_rate_limit(void *opaque)
225{
226    QEMUFileBuffered *s = opaque;
227
228    return s->xfer_limit;
229}
230
231static void buffered_rate_tick(void *opaque)
232{
233    QEMUFileBuffered *s = opaque;
234
235    if (s->has_error) {
236        buffered_close(s);
237        return;
238    }
239
240    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) + 100);
241
242    if (s->freeze_output)
243        return;
244
245    s->bytes_xfer = 0;
246
247    buffered_flush(s);
248
249    /* Add some checks around this */
250    s->put_ready(s->opaque);
251}
252
253static const QEMUFileOps buffered_file_ops = {
254    .put_buffer = buffered_put_buffer,
255    .close = buffered_close,
256    .rate_limit = buffered_rate_limit,
257    .set_rate_limit = buffered_set_rate_limit,
258    .get_rate_limit = buffered_get_rate_limit,
259};
260
261QEMUFile *qemu_fopen_ops_buffered(void *opaque,
262                                  size_t bytes_per_sec,
263                                  BufferedPutFunc *put_buffer,
264                                  BufferedPutReadyFunc *put_ready,
265                                  BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
266                                  BufferedCloseFunc *close)
267{
268    QEMUFileBuffered *s;
269
270    s = g_malloc0(sizeof(*s));
271
272    s->opaque = opaque;
273    s->xfer_limit = bytes_per_sec / 10;
274    s->put_buffer = put_buffer;
275    s->put_ready = put_ready;
276    s->wait_for_unfreeze = wait_for_unfreeze;
277    s->close = close;
278
279    s->file = qemu_fopen_ops(s, &buffered_file_ops);
280    s->timer = timer_new(QEMU_CLOCK_REALTIME, SCALE_MS, buffered_rate_tick, s);
281
282    timer_mod(s->timer, qemu_clock_get_ms(QEMU_CLOCK_REALTIME) + 100);
283
284    return s->file;
285}
286