/*
 * Copyright 2015 Google Inc.
 *
 * Use of this source code is governed by a BSD-style license that can be
 * found in the LICENSE file.
 */

#include "SkRWBuffer.h"

#include "SkAtomics.h"
#include "SkMalloc.h"
#include "SkMakeUnique.h"
#include "SkStream.h"

#include <atomic>

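// SkRWBuffer is an append-only buffer with a single writer; the writer can take an immutable
// SkROBuffer snapshot at any point, and other threads may read that snapshot while appending
// continues. Rough usage sketch, based only on the API exercised in this file ('data' and
// 'size' are placeholders):
//
//     SkRWBuffer writer(4096);
//     writer.append(data, size, 0);
//     sk_sp<SkROBuffer> snapshot = writer.makeROBufferSnapshot();
//     // hand 'snapshot' to a reader thread; it sees exactly the bytes appended so far
//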
// Force small chunks to be a page's worth
static const size_t kMinAllocSize = 4096;

struct SkBufferBlock {
    SkBufferBlock*  fNext;      // updated by the writer
    size_t          fUsed;      // updated by the writer
    const size_t    fCapacity;

    SkBufferBlock(size_t capacity) : fNext(nullptr), fUsed(0), fCapacity(capacity) {}

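    // The payload bytes live immediately after this header in the same allocation made by
    // Alloc() below, which is why the data pointer is simply 'this + 1'.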
    const void* startData() const { return this + 1; }

    size_t avail() const { return fCapacity - fUsed; }
    void* availData() { return (char*)this->startData() + fUsed; }

    static SkBufferBlock* Alloc(size_t length) {
        size_t capacity = LengthToCapacity(length);
        void* buffer = sk_malloc_throw(sizeof(SkBufferBlock) + capacity);
        return new (buffer) SkBufferBlock(capacity);
    }

    // Return number of bytes actually appended. Important that we always completely fill this
    // block before spilling into the next, since the reader uses fCapacity to know how many
    // bytes it can read.
    //
    size_t append(const void* src, size_t length) {
        this->validate();
        size_t amount = SkTMin(this->avail(), length);
        memcpy(this->availData(), src, amount);
        fUsed += amount;
        this->validate();
        return amount;
    }

    // Do not call in the reader thread, since the writer may be updating fUsed.
    // (The assertion is still true, but TSAN still may complain about its raciness.)
    void validate() const {
#ifdef SK_DEBUG
        SkASSERT(fCapacity > 0);
        SkASSERT(fUsed <= fCapacity);
#endif
    }

private:
    static size_t LengthToCapacity(size_t length) {
        const size_t minSize = kMinAllocSize - sizeof(SkBufferBlock);
        return SkTMax(length, minSize);
    }
};

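// The ref-counted head of the block chain. It embeds the first SkBufferBlock directly, so a
// small buffer needs only one allocation; later blocks are linked through fBlock.fNext and are
// freed along with the head in unref().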
struct SkBufferHead {
    mutable std::atomic<int32_t> fRefCnt;
    SkBufferBlock   fBlock;

    SkBufferHead(size_t capacity) : fRefCnt(1), fBlock(capacity) {}

    static size_t LengthToCapacity(size_t length) {
        const size_t minSize = kMinAllocSize - sizeof(SkBufferHead);
        return SkTMax(length, minSize);
    }

    static SkBufferHead* Alloc(size_t length) {
        size_t capacity = LengthToCapacity(length);
        size_t size = sizeof(SkBufferHead) + capacity;
        void* buffer = sk_malloc_throw(size);
        return new (buffer) SkBufferHead(capacity);
    }

    void ref() const {
        SkAssertResult(fRefCnt.fetch_add(+1, std::memory_order_relaxed));
    }

    void unref() const {
        // A release here acts in place of all releases we "should" have been doing in ref().
        int32_t oldRefCnt = fRefCnt.fetch_add(-1, std::memory_order_acq_rel);
        SkASSERT(oldRefCnt);
        if (1 == oldRefCnt) {
            // Like unique(), the acquire is only needed on success.
            SkBufferBlock* block = fBlock.fNext;
            sk_free((void*)this);
            while (block) {
                SkBufferBlock* next = block->fNext;
                sk_free(block);
                block = next;
            }
        }
    }

    void validate(size_t minUsed, const SkBufferBlock* tail = nullptr) const {
#ifdef SK_DEBUG
        SkASSERT(fRefCnt.load(std::memory_order_relaxed) > 0);
        size_t totalUsed = 0;
        const SkBufferBlock* block = &fBlock;
        const SkBufferBlock* lastBlock = block;
        while (block) {
            block->validate();
            totalUsed += block->fUsed;
            lastBlock = block;
            block = block->fNext;
        }
        SkASSERT(minUsed <= totalUsed);
        if (tail) {
            SkASSERT(tail == lastBlock);
        }
#endif
    }
};

///////////////////////////////////////////////////////////////////////////////////////////////////
// The reader can only access block.fCapacity (which never changes), and cannot access
// block.fUsed, which may be updated by the writer.
//
SkROBuffer::SkROBuffer(const SkBufferHead* head, size_t available, const SkBufferBlock* tail)
    : fHead(head), fAvailable(available), fTail(tail)
{
    if (head) {
        fHead->ref();
        SkASSERT(available > 0);
        head->validate(available, tail);
    } else {
        SkASSERT(0 == available);
        SkASSERT(!tail);
    }
}

SkROBuffer::~SkROBuffer() {
    if (fHead) {
        fHead->unref();
    }
}

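// Iter walks a snapshot block by block. Illustrative traversal (a sketch; 'consume' is a
// hypothetical callback, and an empty buffer yields a single pass with size() == 0):
//
//     SkROBuffer::Iter iter(buffer);
//     do {
//         consume(iter.data(), iter.size());
//     } while (iter.next());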
SkROBuffer::Iter::Iter(const SkROBuffer* buffer) {
    this->reset(buffer);
}

SkROBuffer::Iter::Iter(const sk_sp<SkROBuffer>& buffer) {
    this->reset(buffer.get());
}

void SkROBuffer::Iter::reset(const SkROBuffer* buffer) {
    fBuffer = buffer;
    if (buffer && buffer->fHead) {
        fBlock = &buffer->fHead->fBlock;
        fRemaining = buffer->fAvailable;
    } else {
        fBlock = nullptr;
        fRemaining = 0;
    }
}

const void* SkROBuffer::Iter::data() const {
    return fRemaining ? fBlock->startData() : nullptr;
}

size_t SkROBuffer::Iter::size() const {
    if (!fBlock) {
        return 0;
    }
    return SkTMin(fBlock->fCapacity, fRemaining);
}

bool SkROBuffer::Iter::next() {
    if (fRemaining) {
        fRemaining -= this->size();
        if (fBuffer->fTail == fBlock) {
            // There are more blocks, but fBuffer does not know about them.
            SkASSERT(0 == fRemaining);
            fBlock = nullptr;
        } else {
            fBlock = fBlock->fNext;
        }
    }
    return fRemaining != 0;
}

///////////////////////////////////////////////////////////////////////////////////////////////////

SkRWBuffer::SkRWBuffer(size_t initialCapacity) : fHead(nullptr), fTail(nullptr), fTotalUsed(0) {
    if (initialCapacity) {
        fHead = SkBufferHead::Alloc(initialCapacity);
        fTail = &fHead->fBlock;
    }
}

SkRWBuffer::~SkRWBuffer() {
    this->validate();
    if (fHead) {
        fHead->unref();
    }
}

// It is important that we always completely fill the current block before spilling over to the
// next, since our reader will be using fCapacity (min'd against its total available) to know how
// many bytes to read from a given block.
//
void SkRWBuffer::append(const void* src, size_t length, size_t reserve) {
    this->validate();
    if (0 == length) {
        return;
    }

    fTotalUsed += length;

    if (nullptr == fHead) {
        fHead = SkBufferHead::Alloc(length + reserve);
        fTail = &fHead->fBlock;
    }

    size_t written = fTail->append(src, length);
    SkASSERT(written <= length);
    src = (const char*)src + written;
    length -= written;

    if (length) {
        SkBufferBlock* block = SkBufferBlock::Alloc(length + reserve);
        fTail->fNext = block;
        fTail = block;
        written = fTail->append(src, length);
        SkASSERT(written == length);
    }
    this->validate();
}

#ifdef SK_DEBUG
void SkRWBuffer::validate() const {
    if (fHead) {
        fHead->validate(fTotalUsed, fTail);
    } else {
        SkASSERT(nullptr == fTail);
        SkASSERT(0 == fTotalUsed);
    }
}
#endif

///////////////////////////////////////////////////////////////////////////////////////////////////

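// Adapts an immutable SkROBuffer snapshot to the SkStreamAsset interface. fGlobalOffset is the
// read position within the whole buffer; fLocalOffset is the position within the block that
// fIter currently points at.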
class SkROBufferStreamAsset : public SkStreamAsset {
    void validate() const {
#ifdef SK_DEBUG
        SkASSERT(fGlobalOffset <= fBuffer->size());
        SkASSERT(fLocalOffset <= fIter.size());
        SkASSERT(fLocalOffset <= fGlobalOffset);
#endif
    }

#ifdef SK_DEBUG
    class AutoValidate {
        SkROBufferStreamAsset* fStream;
    public:
        AutoValidate(SkROBufferStreamAsset* stream) : fStream(stream) { stream->validate(); }
        ~AutoValidate() { fStream->validate(); }
    };
    #define AUTO_VALIDATE   AutoValidate av(this);
#else
    #define AUTO_VALIDATE
#endif

public:
    SkROBufferStreamAsset(sk_sp<SkROBuffer> buffer) : fBuffer(std::move(buffer)), fIter(fBuffer) {
        fGlobalOffset = fLocalOffset = 0;
    }

    size_t getLength() const override { return fBuffer->size(); }

    bool rewind() override {
        AUTO_VALIDATE
        fIter.reset(fBuffer.get());
        fGlobalOffset = fLocalOffset = 0;
        return true;
    }

    size_t read(void* dst, size_t request) override {
        AUTO_VALIDATE
        size_t bytesRead = 0;
        for (;;) {
            size_t size = fIter.size();
            SkASSERT(fLocalOffset <= size);
            size_t avail = SkTMin(size - fLocalOffset, request - bytesRead);
            if (dst) {
                memcpy(dst, (const char*)fIter.data() + fLocalOffset, avail);
                dst = (char*)dst + avail;
            }
            bytesRead += avail;
            fLocalOffset += avail;
            SkASSERT(bytesRead <= request);
            if (bytesRead == request) {
                break;
            }
            // If we get here, we've exhausted the current iter
            SkASSERT(fLocalOffset == size);
            fLocalOffset = 0;
            if (!fIter.next()) {
                break;   // ran out of data
            }
        }
        fGlobalOffset += bytesRead;
        SkASSERT(fGlobalOffset <= fBuffer->size());
        return bytesRead;
    }

    bool isAtEnd() const override {
        return fBuffer->size() == fGlobalOffset;
    }

    size_t getPosition() const override {
        return fGlobalOffset;
    }

    bool seek(size_t position) override {
        AUTO_VALIDATE
        if (position < fGlobalOffset) {
            this->rewind();
        }
        (void)this->skip(position - fGlobalOffset);
        return true;
    }

    bool move(long offset) override {
        AUTO_VALIDATE
        offset += fGlobalOffset;
        if (offset <= 0) {
            this->rewind();
        } else {
            (void)this->seek(SkToSizeT(offset));
        }
        return true;
    }

private:
    SkStreamAsset* onDuplicate() const override {
        return new SkROBufferStreamAsset(fBuffer);
    }

    SkStreamAsset* onFork() const override {
        auto clone = this->duplicate();
        clone->seek(this->getPosition());
        return clone.release();
    }

    sk_sp<SkROBuffer> fBuffer;
    SkROBuffer::Iter  fIter;
    size_t            fLocalOffset;
    size_t            fGlobalOffset;
};

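// The returned stream reads from a snapshot taken at the time of this call; bytes appended to
// the SkRWBuffer afterwards are not visible through it.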
std::unique_ptr<SkStreamAsset> SkRWBuffer::makeStreamSnapshot() const {
    return skstd::make_unique<SkROBufferStreamAsset>(this->makeROBufferSnapshot());
}