/* MtCoder.c -- Multi-thread Coder
2010-09-24 : Igor Pavlov : Public domain */
3
4#include <stdio.h>
5
6#include "MtCoder.h"
7
8void LoopThread_Construct(CLoopThread *p)
9{
10  Thread_Construct(&p->thread);
11  Event_Construct(&p->startEvent);
12  Event_Construct(&p->finishedEvent);
13}
14
15void LoopThread_Close(CLoopThread *p)
16{
17  Thread_Close(&p->thread);
18  Event_Close(&p->startEvent);
19  Event_Close(&p->finishedEvent);
20}
21
22static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE LoopThreadFunc(void *pp)
23{
24  CLoopThread *p = (CLoopThread *)pp;
25  for (;;)
26  {
27    if (Event_Wait(&p->startEvent) != 0)
28      return SZ_ERROR_THREAD;
29    if (p->stop)
30      return 0;
31    p->res = p->func(p->param);
32    if (Event_Set(&p->finishedEvent) != 0)
33      return SZ_ERROR_THREAD;
34  }
35}
36
37WRes LoopThread_Create(CLoopThread *p)
38{
39  p->stop = 0;
40  RINOK(AutoResetEvent_CreateNotSignaled(&p->startEvent));
41  RINOK(AutoResetEvent_CreateNotSignaled(&p->finishedEvent));
42  return Thread_Create(&p->thread, LoopThreadFunc, p);
43}
44
45WRes LoopThread_StopAndWait(CLoopThread *p)
46{
47  p->stop = 1;
48  if (Event_Set(&p->startEvent) != 0)
49    return SZ_ERROR_THREAD;
50  return Thread_Wait(&p->thread);
51}
52
53WRes LoopThread_StartSubThread(CLoopThread *p) { return Event_Set(&p->startEvent); }
54WRes LoopThread_WaitSubThread(CLoopThread *p) { return Event_Wait(&p->finishedEvent); }
55
56static SRes Progress(ICompressProgress *p, UInt64 inSize, UInt64 outSize)
57{
58  return (p && p->Progress(p, inSize, outSize) != SZ_OK) ? SZ_ERROR_PROGRESS : SZ_OK;
59}
60
61static void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
62{
63  unsigned i;
64  for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
65    p->inSizes[i] = p->outSizes[i] = 0;
66  p->totalInSize = p->totalOutSize = 0;
67  p->progress = progress;
68  p->res = SZ_OK;
69}
70
71static void MtProgress_Reinit(CMtProgress *p, unsigned index)
72{
73  p->inSizes[index] = 0;
74  p->outSizes[index] = 0;
75}
76
77#define UPDATE_PROGRESS(size, prev, total) \
78  if (size != (UInt64)(Int64)-1) { total += size - prev; prev = size; }
79
80SRes MtProgress_Set(CMtProgress *p, unsigned index, UInt64 inSize, UInt64 outSize)
81{
82  SRes res;
83  CriticalSection_Enter(&p->cs);
84  UPDATE_PROGRESS(inSize, p->inSizes[index], p->totalInSize)
85  UPDATE_PROGRESS(outSize, p->outSizes[index], p->totalOutSize)
86  if (p->res == SZ_OK)
87    p->res = Progress(p->progress, p->totalInSize, p->totalOutSize);
88  res = p->res;
89  CriticalSection_Leave(&p->cs);
90  return res;
91}
92
93static void MtProgress_SetError(CMtProgress *p, SRes res)
94{
95  CriticalSection_Enter(&p->cs);
96  if (p->res == SZ_OK)
97    p->res = res;
98  CriticalSection_Leave(&p->cs);
99}
100
101static void MtCoder_SetError(CMtCoder* p, SRes res)
102{
103  CriticalSection_Enter(&p->cs);
104  if (p->res == SZ_OK)
105    p->res = res;
106  CriticalSection_Leave(&p->cs);
107}
108
/* ---------- MtThread ---------- */
110
111void CMtThread_Construct(CMtThread *p, CMtCoder *mtCoder)
112{
113  p->mtCoder = mtCoder;
114  p->outBuf = 0;
115  p->inBuf = 0;
116  Event_Construct(&p->canRead);
117  Event_Construct(&p->canWrite);
118  LoopThread_Construct(&p->thread);
119}
120
/* Evaluates x once; a non-zero result makes the enclosing function return
   SZ_ERROR_THREAD. */
#define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
122
123static void CMtThread_CloseEvents(CMtThread *p)
124{
125  Event_Close(&p->canRead);
126  Event_Close(&p->canWrite);
127}
128
129static void CMtThread_Destruct(CMtThread *p)
130{
131  CMtThread_CloseEvents(p);
132
133  if (Thread_WasCreated(&p->thread.thread))
134  {
135    LoopThread_StopAndWait(&p->thread);
136    LoopThread_Close(&p->thread);
137  }
138
139  if (p->mtCoder->alloc)
140    IAlloc_Free(p->mtCoder->alloc, p->outBuf);
141  p->outBuf = 0;
142
143  if (p->mtCoder->alloc)
144    IAlloc_Free(p->mtCoder->alloc, p->inBuf);
145  p->inBuf = 0;
146}
147
148#define MY_BUF_ALLOC(buf, size, newSize) \
149  if (buf == 0 || size != newSize) \
150  { IAlloc_Free(p->mtCoder->alloc, buf); \
151    size = newSize; buf = (Byte *)IAlloc_Alloc(p->mtCoder->alloc, size); \
152    if (buf == 0) return SZ_ERROR_MEM; }
153
154static SRes CMtThread_Prepare(CMtThread *p)
155{
156  MY_BUF_ALLOC(p->inBuf, p->inBufSize, p->mtCoder->blockSize)
157  MY_BUF_ALLOC(p->outBuf, p->outBufSize, p->mtCoder->destBlockSize)
158
159  p->stopReading = False;
160  p->stopWriting = False;
161  RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canRead));
162  RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canWrite));
163
164  return SZ_OK;
165}
166
167static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
168{
169  size_t size = *processedSize;
170  *processedSize = 0;
171  while (size != 0)
172  {
173    size_t curSize = size;
174    SRes res = stream->Read(stream, data, &curSize);
175    *processedSize += curSize;
176    data += curSize;
177    size -= curSize;
178    RINOK(res);
179    if (curSize == 0)
180      return SZ_OK;
181  }
182  return SZ_OK;
183}
184
/* Address of the worker slot that follows p in the coder's threads array;
   the slot with index numThreads - 1 wraps around to threads[0]. */
#define GET_NEXT_THREAD(p) \
  (&(p)->mtCoder->threads[(p)->index != (p)->mtCoder->numThreads - 1 ? (p)->index + 1 : 0])
186
187static SRes MtThread_Process(CMtThread *p, Bool *stop)
188{
189  CMtThread *next;
190  *stop = True;
191  if (Event_Wait(&p->canRead) != 0)
192    return SZ_ERROR_THREAD;
193
194  next = GET_NEXT_THREAD(p);
195
196  if (p->stopReading)
197  {
198    next->stopReading = True;
199    return Event_Set(&next->canRead) == 0 ? SZ_OK : SZ_ERROR_THREAD;
200  }
201
202  {
203    size_t size = p->mtCoder->blockSize;
204    size_t destSize = p->outBufSize;
205
206    RINOK(FullRead(p->mtCoder->inStream, p->inBuf, &size));
207    next->stopReading = *stop = (size != p->mtCoder->blockSize);
208    if (Event_Set(&next->canRead) != 0)
209      return SZ_ERROR_THREAD;
210
211    RINOK(p->mtCoder->mtCallback->Code(p->mtCoder->mtCallback, p->index,
212        p->outBuf, &destSize, p->inBuf, size, *stop));
213
214    MtProgress_Reinit(&p->mtCoder->mtProgress, p->index);
215
216    if (Event_Wait(&p->canWrite) != 0)
217      return SZ_ERROR_THREAD;
218    if (p->stopWriting)
219      return SZ_ERROR_FAIL;
220    if (p->mtCoder->outStream->Write(p->mtCoder->outStream, p->outBuf, destSize) != destSize)
221      return SZ_ERROR_WRITE;
222    return Event_Set(&next->canWrite) == 0 ? SZ_OK : SZ_ERROR_THREAD;
223  }
224}
225
226static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
227{
228  CMtThread *p = (CMtThread *)pp;
229  for (;;)
230  {
231    Bool stop;
232    CMtThread *next = GET_NEXT_THREAD(p);
233    SRes res = MtThread_Process(p, &stop);
234    if (res != SZ_OK)
235    {
236      MtCoder_SetError(p->mtCoder, res);
237      MtProgress_SetError(&p->mtCoder->mtProgress, res);
238      next->stopReading = True;
239      next->stopWriting = True;
240      Event_Set(&next->canRead);
241      Event_Set(&next->canWrite);
242      return res;
243    }
244    if (stop)
245      return 0;
246  }
247}
248
249void MtCoder_Construct(CMtCoder* p)
250{
251  unsigned i;
252  p->alloc = 0;
253  for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
254  {
255    CMtThread *t = &p->threads[i];
256    t->index = i;
257    CMtThread_Construct(t, p);
258  }
259  CriticalSection_Init(&p->cs);
260  CriticalSection_Init(&p->mtProgress.cs);
261}
262
263void MtCoder_Destruct(CMtCoder* p)
264{
265  unsigned i;
266  for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
267    CMtThread_Destruct(&p->threads[i]);
268  CriticalSection_Delete(&p->cs);
269  CriticalSection_Delete(&p->mtProgress.cs);
270}
271
272SRes MtCoder_Code(CMtCoder *p)
273{
274  unsigned i, numThreads = p->numThreads;
275  SRes res = SZ_OK;
276  p->res = SZ_OK;
277
278  MtProgress_Init(&p->mtProgress, p->progress);
279
280  for (i = 0; i < numThreads; i++)
281  {
282    RINOK(CMtThread_Prepare(&p->threads[i]));
283  }
284
285  for (i = 0; i < numThreads; i++)
286  {
287    CMtThread *t = &p->threads[i];
288    CLoopThread *lt = &t->thread;
289
290    if (!Thread_WasCreated(&lt->thread))
291    {
292      lt->func = ThreadFunc;
293      lt->param = t;
294
295      if (LoopThread_Create(lt) != SZ_OK)
296      {
297        res = SZ_ERROR_THREAD;
298        break;
299      }
300    }
301  }
302
303  if (res == SZ_OK)
304  {
305    unsigned j;
306    for (i = 0; i < numThreads; i++)
307    {
308      CMtThread *t = &p->threads[i];
309      if (LoopThread_StartSubThread(&t->thread) != SZ_OK)
310      {
311        res = SZ_ERROR_THREAD;
312        p->threads[0].stopReading = True;
313        break;
314      }
315    }
316
317    Event_Set(&p->threads[0].canWrite);
318    Event_Set(&p->threads[0].canRead);
319
320    for (j = 0; j < i; j++)
321      LoopThread_WaitSubThread(&p->threads[j].thread);
322  }
323
324  for (i = 0; i < numThreads; i++)
325    CMtThread_CloseEvents(&p->threads[i]);
326  return (res == SZ_OK) ? p->res : res;
327}
328