Rizin
unix-like reverse engineering framework and cli tools
stream_encoder_mt.c
Go to the documentation of this file.
1 //
5 //
6 // Author: Lasse Collin
7 //
8 // This file has been put into the public domain.
9 // You can do whatever you want with this file.
10 //
12 
13 #include "filter_encoder.h"
14 #include "easy_preset.h"
15 #include "block_encoder.h"
16 #include "block_buffer_encoder.h"
17 #include "index_encoder.h"
18 #include "outqueue.h"
19 
20 
23 #define BLOCK_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX)
24 
25 
26 typedef enum {
29 
32 
36 
40 
44 
45 } worker_state;
46 
48 
49 typedef struct worker_thread_s worker_thread;
52 
57 
60  size_t in_size;
61 
66 
70 
75 
78 
81 
84 
87 
90 
91  mythread_mutex mutex;
92  mythread_cond cond;
93 
96  mythread thread_id;
97 };
98 
99 
101  enum {
107 
110  size_t block_size;
111 
114 
115 
118 
121 
122 
125 
128 
130  size_t header_pos;
131 
132 
135 
136 
140 
141 
144 
147 
151 
155 
160 
164 
165 
169 
173 
174 
175  mythread_mutex mutex;
176  mythread_cond cond;
177 };
178 
179 
181 static void
183 {
184  assert(ret != LZMA_OK);
185  assert(ret != LZMA_STREAM_END);
186 
187  mythread_sync(thr->coder->mutex) {
188  if (thr->coder->thread_error == LZMA_OK)
189  thr->coder->thread_error = ret;
190 
191  mythread_cond_signal(&thr->coder->cond);
192  }
193 
194  return;
195 }
196 
197 
198 static worker_state
200 {
201  assert(thr->progress_in == 0);
202  assert(thr->progress_out == 0);
203 
204  // Set the Block options.
205  thr->block_options = (lzma_block){
206  .version = 0,
207  .check = thr->coder->stream_flags.check,
208  .compressed_size = thr->coder->outq.buf_size_max,
209  .uncompressed_size = thr->coder->block_size,
210 
211  // TODO: To allow changing the filter chain, the filters
212  // array must be copied to each worker_thread.
213  .filters = thr->coder->filters,
214  };
215 
216  // Calculate maximum size of the Block Header. This amount is
217  // reserved in the beginning of the buffer so that Block Header
218  // along with Compressed Size and Uncompressed Size can be
219  // written there.
220  lzma_ret ret = lzma_block_header_size(&thr->block_options);
221  if (ret != LZMA_OK) {
222  worker_error(thr, ret);
223  return THR_STOP;
224  }
225 
226  // Initialize the Block encoder.
228  thr->allocator, &thr->block_options);
229  if (ret != LZMA_OK) {
230  worker_error(thr, ret);
231  return THR_STOP;
232  }
233 
234  size_t in_pos = 0;
235  size_t in_size = 0;
236 
237  thr->outbuf->size = thr->block_options.header_size;
238  const size_t out_size = thr->coder->outq.buf_size_max;
239 
240  do {
241  mythread_sync(thr->mutex) {
242  // Store in_pos and out_pos into *thr so that
243  // an application may read them via
244  // lzma_get_progress() to get progress information.
245  //
246  // NOTE: These aren't updated when the encoding
247  // finishes. Instead, the final values are taken
248  // later from thr->outbuf.
249  thr->progress_in = in_pos;
250  thr->progress_out = thr->outbuf->size;
251 
252  while (in_size == thr->in_size
253  && thr->state == THR_RUN)
254  mythread_cond_wait(&thr->cond, &thr->mutex);
255 
256  state = thr->state;
257  in_size = thr->in_size;
258  }
259 
260  // Return if we were asked to stop or exit.
261  if (state >= THR_STOP)
262  return state;
263 
265  ? LZMA_FINISH : LZMA_RUN;
266 
267  // Limit the amount of input given to the Block encoder
268  // at once. This way this thread can react fairly quickly
269  // if the main thread wants us to stop or exit.
270  static const size_t in_chunk_max = 16384;
271  size_t in_limit = in_size;
272  if (in_size - in_pos > in_chunk_max) {
273  in_limit = in_pos + in_chunk_max;
274  action = LZMA_RUN;
275  }
276 
277  ret = thr->block_encoder.code(
278  thr->block_encoder.coder, thr->allocator,
279  thr->in, &in_pos, in_limit, thr->outbuf->buf,
280  &thr->outbuf->size, out_size, action);
281  } while (ret == LZMA_OK && thr->outbuf->size < out_size);
282 
283  switch (ret) {
284  case LZMA_STREAM_END:
285  assert(state == THR_FINISH);
286 
287  // Encode the Block Header. By doing it after
288  // the compression, we can store the Compressed Size
289  // and Uncompressed Size fields.
290  ret = lzma_block_header_encode(&thr->block_options,
291  thr->outbuf->buf);
292  if (ret != LZMA_OK) {
293  worker_error(thr, ret);
294  return THR_STOP;
295  }
296 
297  break;
298 
299  case LZMA_OK:
300  // The data was incompressible. Encode it using uncompressed
301  // LZMA2 chunks.
302  //
303  // First wait that we have gotten all the input.
304  mythread_sync(thr->mutex) {
305  while (thr->state == THR_RUN)
306  mythread_cond_wait(&thr->cond, &thr->mutex);
307 
308  state = thr->state;
309  in_size = thr->in_size;
310  }
311 
312  if (state >= THR_STOP)
313  return state;
314 
315  // Do the encoding. This takes care of the Block Header too.
316  thr->outbuf->size = 0;
317  ret = lzma_block_uncomp_encode(&thr->block_options,
318  thr->in, in_size, thr->outbuf->buf,
319  &thr->outbuf->size, out_size);
320 
321  // It shouldn't fail.
322  if (ret != LZMA_OK) {
324  return THR_STOP;
325  }
326 
327  break;
328 
329  default:
330  worker_error(thr, ret);
331  return THR_STOP;
332  }
333 
334  // Set the size information that will be read by the main thread
335  // to write the Index field.
336  thr->outbuf->unpadded_size
337  = lzma_block_unpadded_size(&thr->block_options);
338  assert(thr->outbuf->unpadded_size != 0);
340 
341  return THR_FINISH;
342 }
343 
344 
345 static MYTHREAD_RET_TYPE
346 worker_start(void *thr_ptr)
347 {
348  worker_thread *thr = thr_ptr;
349  worker_state state = THR_IDLE; // Init to silence a warning
350 
351  while (true) {
352  // Wait for work.
353  mythread_sync(thr->mutex) {
354  while (true) {
355  // The thread is already idle so if we are
356  // requested to stop, just set the state.
357  if (thr->state == THR_STOP) {
358  thr->state = THR_IDLE;
359  mythread_cond_signal(&thr->cond);
360  }
361 
362  state = thr->state;
363  if (state != THR_IDLE)
364  break;
365 
366  mythread_cond_wait(&thr->cond, &thr->mutex);
367  }
368  }
369 
370  assert(state != THR_IDLE);
371  assert(state != THR_STOP);
372 
373  if (state <= THR_FINISH)
374  state = worker_encode(thr, state);
375 
376  if (state == THR_EXIT)
377  break;
378 
379  // Mark the thread as idle unless the main thread has
380  // told us to exit. Signal is needed for the case
381  // where the main thread is waiting for the threads to stop.
382  mythread_sync(thr->mutex) {
383  if (thr->state != THR_EXIT) {
384  thr->state = THR_IDLE;
385  mythread_cond_signal(&thr->cond);
386  }
387  }
388 
389  mythread_sync(thr->coder->mutex) {
390  // Mark the output buffer as finished if
391  // no errors occurred.
392  thr->outbuf->finished = state == THR_FINISH;
393 
394  // Update the main progress info.
395  thr->coder->progress_in
396  += thr->outbuf->uncompressed_size;
397  thr->coder->progress_out += thr->outbuf->size;
398  thr->progress_in = 0;
399  thr->progress_out = 0;
400 
401  // Return this thread to the stack of free threads.
402  thr->next = thr->coder->threads_free;
403  thr->coder->threads_free = thr;
404 
405  mythread_cond_signal(&thr->coder->cond);
406  }
407  }
408 
409  // Exiting, free the resources.
410  mythread_mutex_destroy(&thr->mutex);
411  mythread_cond_destroy(&thr->cond);
412 
414  lzma_free(thr->in, thr->allocator);
415  return MYTHREAD_RET_VALUE;
416 }
417 
418 
420 static void
421 threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
422 {
423  // Tell the threads to stop.
424  for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
425  mythread_sync(coder->threads[i].mutex) {
426  coder->threads[i].state = THR_STOP;
427  mythread_cond_signal(&coder->threads[i].cond);
428  }
429  }
430 
431  if (!wait_for_threads)
432  return;
433 
434  // Wait for the threads to settle in the idle state.
435  for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
436  mythread_sync(coder->threads[i].mutex) {
437  while (coder->threads[i].state != THR_IDLE)
438  mythread_cond_wait(&coder->threads[i].cond,
439  &coder->threads[i].mutex);
440  }
441  }
442 
443  return;
444 }
445 
446 
449 static void
451 {
452  for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
453  mythread_sync(coder->threads[i].mutex) {
454  coder->threads[i].state = THR_EXIT;
455  mythread_cond_signal(&coder->threads[i].cond);
456  }
457  }
458 
459  for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
460  int ret = mythread_join(coder->threads[i].thread_id);
461  assert(ret == 0);
462  (void)ret;
463  }
464 
465  lzma_free(coder->threads, allocator);
466  return;
467 }
468 
469 
471 static lzma_ret
473  const lzma_allocator *allocator)
474 {
475  worker_thread *thr = &coder->threads[coder->threads_initialized];
476 
477  thr->in = lzma_alloc(coder->block_size, allocator);
478  if (thr->in == NULL)
479  return LZMA_MEM_ERROR;
480 
481  if (mythread_mutex_init(&thr->mutex))
482  goto error_mutex;
483 
484  if (mythread_cond_init(&thr->cond))
485  goto error_cond;
486 
487  thr->state = THR_IDLE;
488  thr->allocator = allocator;
489  thr->coder = coder;
490  thr->progress_in = 0;
491  thr->progress_out = 0;
493 
494  if (mythread_create(&thr->thread_id, &worker_start, thr))
495  goto error_thread;
496 
497  ++coder->threads_initialized;
498  coder->thr = thr;
499 
500  return LZMA_OK;
501 
502 error_thread:
503  mythread_cond_destroy(&thr->cond);
504 
505 error_cond:
506  mythread_mutex_destroy(&thr->mutex);
507 
508 error_mutex:
509  lzma_free(thr->in, allocator);
510  return LZMA_MEM_ERROR;
511 }
512 
513 
514 static lzma_ret
516 {
517  // If there are no free output subqueues, there is no
518  // point to try getting a thread.
519  if (!lzma_outq_has_buf(&coder->outq))
520  return LZMA_OK;
521 
522  // If there is a free structure on the stack, use it.
523  mythread_sync(coder->mutex) {
524  if (coder->threads_free != NULL) {
525  coder->thr = coder->threads_free;
526  coder->threads_free = coder->threads_free->next;
527  }
528  }
529 
530  if (coder->thr == NULL) {
531  // If there are no uninitialized structures left, return.
532  if (coder->threads_initialized == coder->threads_max)
533  return LZMA_OK;
534 
535  // Initialize a new thread.
537  }
538 
539  // Reset the parts of the thread state that have to be done
540  // in the main thread.
541  mythread_sync(coder->thr->mutex) {
542  coder->thr->state = THR_RUN;
543  coder->thr->in_size = 0;
544  coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
545  mythread_cond_signal(&coder->thr->cond);
546  }
547 
548  return LZMA_OK;
549 }
550 
551 
552 static lzma_ret
554  const uint8_t *restrict in, size_t *restrict in_pos,
555  size_t in_size, lzma_action action)
556 {
557  while (*in_pos < in_size
558  || (coder->thr != NULL && action != LZMA_RUN)) {
559  if (coder->thr == NULL) {
560  // Get a new thread.
561  const lzma_ret ret = get_thread(coder, allocator);
562  if (coder->thr == NULL)
563  return ret;
564  }
565 
566  // Copy the input data to thread's buffer.
567  size_t thr_in_size = coder->thr->in_size;
568  lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
569  &thr_in_size, coder->block_size);
570 
571  // Tell the Block encoder to finish if
572  // - it has got block_size bytes of input; or
573  // - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
574  // or LZMA_FULL_BARRIER was used.
575  //
576  // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
577  const bool finish = thr_in_size == coder->block_size
578  || (*in_pos == in_size && action != LZMA_RUN);
579 
580  bool block_error = false;
581 
582  mythread_sync(coder->thr->mutex) {
583  if (coder->thr->state == THR_IDLE) {
584  // Something has gone wrong with the Block
585  // encoder. It has set coder->thread_error
586  // which we will read a few lines later.
587  block_error = true;
588  } else {
589  // Tell the Block encoder its new amount
590  // of input and update the state if needed.
591  coder->thr->in_size = thr_in_size;
592 
593  if (finish)
594  coder->thr->state = THR_FINISH;
595 
596  mythread_cond_signal(&coder->thr->cond);
597  }
598  }
599 
600  if (block_error) {
601  lzma_ret ret;
602 
603  mythread_sync(coder->mutex) {
604  ret = coder->thread_error;
605  }
606 
607  return ret;
608  }
609 
610  if (finish)
611  coder->thr = NULL;
612  }
613 
614  return LZMA_OK;
615 }
616 
617 
620 static bool
621 wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs,
622  bool *has_blocked, bool has_input)
623 {
624  if (coder->timeout != 0 && !*has_blocked) {
625  // Every time when stream_encode_mt() is called via
626  // lzma_code(), *has_blocked starts as false. We set it
627  // to true here and calculate the absolute time when
628  // we must return if there's nothing to do.
629  //
630  // The idea of *has_blocked is to avoid unneeded calls
631  // to mythread_condtime_set(), which may do a syscall
632  // depending on the operating system.
633  *has_blocked = true;
634  mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
635  }
636 
637  bool timed_out = false;
638 
639  mythread_sync(coder->mutex) {
640  // There are four things that we wait. If one of them
641  // becomes possible, we return.
642  // - If there is input left, we need to get a free
643  // worker thread and an output buffer for it.
644  // - Data ready to be read from the output queue.
645  // - A worker thread indicates an error.
646  // - Time out occurs.
647  while ((!has_input || coder->threads_free == NULL
648  || !lzma_outq_has_buf(&coder->outq))
649  && !lzma_outq_is_readable(&coder->outq)
650  && coder->thread_error == LZMA_OK
651  && !timed_out) {
652  if (coder->timeout != 0)
653  timed_out = mythread_cond_timedwait(
654  &coder->cond, &coder->mutex,
655  wait_abs) != 0;
656  else
657  mythread_cond_wait(&coder->cond,
658  &coder->mutex);
659  }
660  }
661 
662  return timed_out;
663 }
664 
665 
666 static lzma_ret
667 stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator,
668  const uint8_t *restrict in, size_t *restrict in_pos,
669  size_t in_size, uint8_t *restrict out,
670  size_t *restrict out_pos, size_t out_size, lzma_action action)
671 {
672  lzma_stream_coder *coder = coder_ptr;
673 
674  switch (coder->sequence) {
675  case SEQ_STREAM_HEADER:
676  lzma_bufcpy(coder->header, &coder->header_pos,
677  sizeof(coder->header),
678  out, out_pos, out_size);
679  if (coder->header_pos < sizeof(coder->header))
680  return LZMA_OK;
681 
682  coder->header_pos = 0;
683  coder->sequence = SEQ_BLOCK;
684 
685  // Fall through
686 
687  case SEQ_BLOCK: {
688  // Initialized to silence warnings.
691  lzma_ret ret = LZMA_OK;
692 
693  // These are for wait_for_work().
694  bool has_blocked = false;
695  mythread_condtime wait_abs;
696 
697  while (true) {
698  mythread_sync(coder->mutex) {
699  // Check for Block encoder errors.
700  ret = coder->thread_error;
701  if (ret != LZMA_OK) {
702  assert(ret != LZMA_STREAM_END);
703  break; // Break out of mythread_sync.
704  }
705 
706  // Try to read compressed data to out[].
707  ret = lzma_outq_read(&coder->outq,
708  out, out_pos, out_size,
709  &unpadded_size,
711  }
712 
713  if (ret == LZMA_STREAM_END) {
714  // End of Block. Add it to the Index.
715  ret = lzma_index_append(coder->index,
718 
719  // If we didn't fill the output buffer yet,
720  // try to read more data. Maybe the next
721  // outbuf has been finished already too.
722  if (*out_pos < out_size)
723  continue;
724  }
725 
726  if (ret != LZMA_OK) {
727  // coder->thread_error was set or
728  // lzma_index_append() failed.
729  threads_stop(coder, false);
730  return ret;
731  }
732 
733  // Try to give uncompressed data to a worker thread.
734  ret = stream_encode_in(coder, allocator,
735  in, in_pos, in_size, action);
736  if (ret != LZMA_OK) {
737  threads_stop(coder, false);
738  return ret;
739  }
740 
741  // See if we should wait or return.
742  //
743  // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
744  if (*in_pos == in_size) {
745  // LZMA_RUN: More data is probably coming
746  // so return to let the caller fill the
747  // input buffer.
748  if (action == LZMA_RUN)
749  return LZMA_OK;
750 
751  // LZMA_FULL_BARRIER: The same as with
752  // LZMA_RUN but tell the caller that the
753  // barrier was completed.
754  if (action == LZMA_FULL_BARRIER)
755  return LZMA_STREAM_END;
756 
757  // Finishing or flushing isn't completed until
758  // all input data has been encoded and copied
759  // to the output buffer.
760  if (lzma_outq_is_empty(&coder->outq)) {
761  // LZMA_FINISH: Continue to encode
762  // the Index field.
763  if (action == LZMA_FINISH)
764  break;
765 
766  // LZMA_FULL_FLUSH: Return to tell
767  // the caller that flushing was
768  // completed.
769  if (action == LZMA_FULL_FLUSH)
770  return LZMA_STREAM_END;
771  }
772  }
773 
774  // Return if there is no output space left.
775  // This check must be done after testing the input
776  // buffer, because we might want to use a different
777  // return code.
778  if (*out_pos == out_size)
779  return LZMA_OK;
780 
781  // Neither in nor out has been used completely.
782  // Wait until there's something we can do.
783  if (wait_for_work(coder, &wait_abs, &has_blocked,
784  *in_pos < in_size))
785  return LZMA_TIMED_OUT;
786  }
787 
788  // All Blocks have been encoded and the threads have stopped.
789  // Prepare to encode the Index field.
791  &coder->index_encoder, allocator,
792  coder->index));
793  coder->sequence = SEQ_INDEX;
794 
795  // Update the progress info to take the Index and
796  // Stream Footer into account. Those are very fast to encode
797  // so in terms of progress information they can be thought
798  // to be ready to be copied out.
799  coder->progress_out += lzma_index_size(coder->index)
801  }
802 
803  // Fall through
804 
805  case SEQ_INDEX: {
806  // Call the Index encoder. It doesn't take any input, so
807  // those pointers can be NULL.
808  const lzma_ret ret = coder->index_encoder.code(
809  coder->index_encoder.coder, allocator,
810  NULL, NULL, 0,
811  out, out_pos, out_size, LZMA_RUN);
812  if (ret != LZMA_STREAM_END)
813  return ret;
814 
815  // Encode the Stream Footer into coder->buffer.
817  = lzma_index_size(coder->index);
818  if (lzma_stream_footer_encode(&coder->stream_flags,
819  coder->header) != LZMA_OK)
820  return LZMA_PROG_ERROR;
821 
822  coder->sequence = SEQ_STREAM_FOOTER;
823  }
824 
825  // Fall through
826 
827  case SEQ_STREAM_FOOTER:
828  lzma_bufcpy(coder->header, &coder->header_pos,
829  sizeof(coder->header),
830  out, out_pos, out_size);
831  return coder->header_pos < sizeof(coder->header)
833  }
834 
835  assert(0);
836  return LZMA_PROG_ERROR;
837 }
838 
839 
840 static void
842 {
843  lzma_stream_coder *coder = coder_ptr;
844 
845  // Threads must be killed before the output queue can be freed.
846  threads_end(coder, allocator);
847  lzma_outq_end(&coder->outq, allocator);
848 
849  for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
850  lzma_free(coder->filters[i].options, allocator);
851 
853  lzma_index_end(coder->index, allocator);
854 
855  mythread_cond_destroy(&coder->cond);
856  mythread_mutex_destroy(&coder->mutex);
857 
858  lzma_free(coder, allocator);
859  return;
860 }
861 
862 
865 static lzma_ret
867  const lzma_filter **filters, uint64_t *block_size,
868  uint64_t *outbuf_size_max)
869 {
870  // Validate some of the options.
871  if (options == NULL)
872  return LZMA_PROG_ERROR;
873 
874  if (options->flags != 0 || options->threads == 0
875  || options->threads > LZMA_THREADS_MAX)
876  return LZMA_OPTIONS_ERROR;
877 
878  if (options->filters != NULL) {
879  // Filter chain was given, use it as is.
880  *filters = options->filters;
881  } else {
882  // Use a preset.
883  if (lzma_easy_preset(opt_easy, options->preset))
884  return LZMA_OPTIONS_ERROR;
885 
886  *filters = opt_easy->filters;
887  }
888 
889  // Block size
890  if (options->block_size > 0) {
891  if (options->block_size > BLOCK_SIZE_MAX)
892  return LZMA_OPTIONS_ERROR;
893 
894  *block_size = options->block_size;
895  } else {
896  // Determine the Block size from the filter chain.
897  *block_size = lzma_mt_block_size(*filters);
898  if (*block_size == 0)
899  return LZMA_OPTIONS_ERROR;
900 
901  assert(*block_size <= BLOCK_SIZE_MAX);
902  }
903 
904  // Calculate the maximum amount output that a single output buffer
905  // may need to hold. This is the same as the maximum total size of
906  // a Block.
907  *outbuf_size_max = lzma_block_buffer_bound64(*block_size);
908  if (*outbuf_size_max == 0)
909  return LZMA_MEM_ERROR;
910 
911  return LZMA_OK;
912 }
913 
914 
915 static void
916 get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
917 {
918  lzma_stream_coder *coder = coder_ptr;
919 
920  // Lock coder->mutex to prevent finishing threads from moving their
921  // progress info from the worker_thread structure to lzma_stream_coder.
922  mythread_sync(coder->mutex) {
923  *progress_in = coder->progress_in;
924  *progress_out = coder->progress_out;
925 
926  for (size_t i = 0; i < coder->threads_initialized; ++i) {
927  mythread_sync(coder->threads[i].mutex) {
928  *progress_in += coder->threads[i].progress_in;
929  *progress_out += coder->threads[i]
930  .progress_out;
931  }
932  }
933  }
934 
935  return;
936 }
937 
938 
939 static lzma_ret
941  const lzma_mt *options)
942 {
944 
945  // Get the filter chain.
946  lzma_options_easy easy;
947  const lzma_filter *filters;
948  uint64_t block_size;
949  uint64_t outbuf_size_max;
951  &block_size, &outbuf_size_max));
952 
953 #if SIZE_MAX < UINT64_MAX
954  if (block_size > SIZE_MAX)
955  return LZMA_MEM_ERROR;
956 #endif
957 
958  // Validate the filter chain so that we can give an error in this
959  // function instead of delaying it to the first call to lzma_code().
960  // The memory usage calculation verifies the filter chain as
961  // a side effect so we take advantage of that.
962  if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
963  return LZMA_OPTIONS_ERROR;
964 
965  // Validate the Check ID.
966  if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
967  return LZMA_PROG_ERROR;
968 
969  if (!lzma_check_is_supported(options->check))
970  return LZMA_UNSUPPORTED_CHECK;
971 
972  // Allocate and initialize the base structure if needed.
973  lzma_stream_coder *coder = next->coder;
974  if (coder == NULL) {
975  coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
976  if (coder == NULL)
977  return LZMA_MEM_ERROR;
978 
979  next->coder = coder;
980 
981  // For the mutex and condition variable initializations
982  // the error handling has to be done here because
983  // stream_encoder_mt_end() doesn't know if they have
984  // already been initialized or not.
985  if (mythread_mutex_init(&coder->mutex)) {
986  lzma_free(coder, allocator);
987  next->coder = NULL;
988  return LZMA_MEM_ERROR;
989  }
990 
991  if (mythread_cond_init(&coder->cond)) {
992  mythread_mutex_destroy(&coder->mutex);
993  lzma_free(coder, allocator);
994  next->coder = NULL;
995  return LZMA_MEM_ERROR;
996  }
997 
998  next->code = &stream_encode_mt;
999  next->end = &stream_encoder_mt_end;
1000  next->get_progress = &get_progress;
1001 // next->update = &stream_encoder_mt_update;
1002 
1003  coder->filters[0].id = LZMA_VLI_UNKNOWN;
1005  coder->index = NULL;
1006  memzero(&coder->outq, sizeof(coder->outq));
1007  coder->threads = NULL;
1008  coder->threads_max = 0;
1009  coder->threads_initialized = 0;
1010  }
1011 
1012  // Basic initializations
1013  coder->sequence = SEQ_STREAM_HEADER;
1014  coder->block_size = (size_t)(block_size);
1015  coder->thread_error = LZMA_OK;
1016  coder->thr = NULL;
1017 
1018  // Allocate the thread-specific base structures.
1019  assert(options->threads > 0);
1020  if (coder->threads_max != options->threads) {
1021  threads_end(coder, allocator);
1022 
1023  coder->threads = NULL;
1024  coder->threads_max = 0;
1025 
1026  coder->threads_initialized = 0;
1027  coder->threads_free = NULL;
1028 
1029  coder->threads = lzma_alloc(
1030  options->threads * sizeof(worker_thread),
1031  allocator);
1032  if (coder->threads == NULL)
1033  return LZMA_MEM_ERROR;
1034 
1035  coder->threads_max = options->threads;
1036  } else {
1037  // Reuse the old structures and threads. Tell the running
1038  // threads to stop and wait until they have stopped.
1039  threads_stop(coder, true);
1040  }
1041 
1042  // Output queue
1044  outbuf_size_max, options->threads));
1045 
1046  // Timeout
1047  coder->timeout = options->timeout;
1048 
1049  // Free the old filter chain and copy the new one.
1050  for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
1051  lzma_free(coder->filters[i].options, allocator);
1052 
1053  return_if_error(lzma_filters_copy(
1054  filters, coder->filters, allocator));
1055 
1056  // Index
1057  lzma_index_end(coder->index, allocator);
1058  coder->index = lzma_index_init(allocator);
1059  if (coder->index == NULL)
1060  return LZMA_MEM_ERROR;
1061 
1062  // Stream Header
1063  coder->stream_flags.version = 0;
1064  coder->stream_flags.check = options->check;
1065  return_if_error(lzma_stream_header_encode(
1066  &coder->stream_flags, coder->header));
1067 
1068  coder->header_pos = 0;
1069 
1070  // Progress info
1071  coder->progress_in = 0;
1072  coder->progress_out = LZMA_STREAM_HEADER_SIZE;
1073 
1074  return LZMA_OK;
1075 }
1076 
1077 
1079 lzma_stream_encoder_mt(lzma_stream *strm, const lzma_mt *options)
1080 {
1082 
1084 // strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
1088 
1089  return LZMA_OK;
1090 }
1091 
1092 
1093 // This function name is a monster but it's consistent with the older
1094 // monster names. :-( 31 chars is the max that C99 requires so in that
1095 // sense it's not too long. ;-)
1097 lzma_stream_encoder_mt_memusage(const lzma_mt *options)
1098 {
1099  lzma_options_easy easy;
1100  const lzma_filter *filters;
1101  uint64_t block_size;
1102  uint64_t outbuf_size_max;
1103 
1104  if (get_options(options, &easy, &filters, &block_size,
1105  &outbuf_size_max) != LZMA_OK)
1106  return UINT64_MAX;
1107 
1108  // Memory usage of the input buffers
1109  const uint64_t inbuf_memusage = options->threads * block_size;
1110 
1111  // Memory usage of the filter encoders
1112  uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
1113  if (filters_memusage == UINT64_MAX)
1114  return UINT64_MAX;
1115 
1116  filters_memusage *= options->threads;
1117 
1118  // Memory usage of the output queue
1119  const uint64_t outq_memusage = lzma_outq_memusage(
1120  outbuf_size_max, options->threads);
1121  if (outq_memusage == UINT64_MAX)
1122  return UINT64_MAX;
1123 
1124  // Sum them with overflow checking.
1125  uint64_t total_memusage = LZMA_MEMUSAGE_BASE
1126  + sizeof(lzma_stream_coder)
1127  + options->threads * sizeof(worker_thread);
1128 
1129  if (UINT64_MAX - total_memusage < inbuf_memusage)
1130  return UINT64_MAX;
1131 
1132  total_memusage += inbuf_memusage;
1133 
1134  if (UINT64_MAX - total_memusage < filters_memusage)
1135  return UINT64_MAX;
1136 
1137  total_memusage += filters_memusage;
1138 
1139  if (UINT64_MAX - total_memusage < outq_memusage)
1140  return UINT64_MAX;
1141 
1142  return total_memusage + outq_memusage;
1143 }
static bool finish(void *user)
Definition: analysis_pyc.c:133
#define LZMA_CHECK_ID_MAX
Maximum valid Check ID.
Definition: check.h:68
lzma_index ** i
Definition: index.h:629
const lzma_allocator lzma_vli unpadded_size
Definition: index.h:345
const lzma_allocator const uint8_t size_t uint8_t size_t * out_pos
Definition: block.h:528
const lzma_allocator const uint8_t size_t * in_pos
Definition: block.h:579
const lzma_allocator const uint8_t size_t in_size
Definition: block.h:527
const lzma_allocator * allocator
Definition: block.h:377
const lzma_allocator const uint8_t * in
Definition: block.h:527
const lzma_allocator const uint8_t size_t uint8_t * out
Definition: block.h:528
uint64_t lzma_block_buffer_bound64(uint64_t uncompressed_size)
Single-call .xz Block encoder.
lzma_ret lzma_block_encoder_init(lzma_next_coder *next, const lzma_allocator *allocator, lzma_block *block)
Encodes .xz Blocks.
const lzma_filter * filters
Definition: container.h:315
#define NULL
Definition: cris-opc.c:27
bool lzma_easy_preset(lzma_options_easy *opt_easy, uint32_t preset)
Definition: easy_preset.c:17
Preset handling for easy encoder and decoder.
#define LZMA_FILTERS_MAX
Maximum number of filters in a chain.
Definition: filter.h:26
uint64_t lzma_mt_block_size(const lzma_filter *filters)
static lzma_stream strm
Definition: full_flush.c:20
lzma_ret lzma_index_encoder_init(lzma_next_coder *next, const lzma_allocator *allocator, const lzma_index *i)
Encodes the Index field.
#define restrict
static const char struct stat static buf struct stat static buf static vhangup int options
Definition: sflib.h:145
assert(limit<=UINT32_MAX/2)
lzma_outbuf * lzma_outq_get_buf(lzma_outq *outq)
Get a new buffer.
Definition: outqueue.c:114
void lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator)
Free the memory associated with the output queue.
Definition: outqueue.c:101
bool lzma_outq_is_readable(const lzma_outq *outq)
Test if there is data ready to be read.
Definition: outqueue.c:136
lzma_ret lzma_outq_read(lzma_outq *restrict outq, uint8_t *restrict out, size_t *restrict out_pos, size_t out_size, lzma_vli *restrict unpadded_size, lzma_vli *restrict uncompressed_size)
Read finished data.
Definition: outqueue.c:147
uint64_t lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads)
Calculate the memory usage of an output queue.
Definition: outqueue.c:42
lzma_ret lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator, uint64_t buf_size_max, uint32_t threads)
Initialize an output queue.
Definition: outqueue.c:57
Output queue handling in multithreaded coding.
static bool lzma_outq_has_buf(const lzma_outq *outq)
Test if there is at least one buffer free.
Definition: outqueue.h:145
static bool lzma_outq_is_empty(const lzma_outq *outq)
Test if the queue is completely empty.
Definition: outqueue.h:153
int size_t
Definition: sftypes.h:40
unsigned int uint32_t
Definition: sftypes.h:29
unsigned long uint64_t
Definition: sftypes.h:28
unsigned char uint8_t
Definition: sftypes.h:31
#define UINT64_MAX
#define SIZE_MAX
static lzma_ret stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, const lzma_mt *options)
static void threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
Make the threads stop but not exit. Optionally wait for them to stop.
LZMA_API(lzma_ret)
static lzma_ret stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator, const uint8_t *restrict in, size_t *restrict in_pos, size_t in_size, uint8_t *restrict out, size_t *restrict out_pos, size_t out_size, lzma_action action)
static worker_state worker_encode(worker_thread *thr, worker_state state)
static void stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
static lzma_ret get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
struct lzma_stream_coder_s lzma_stream_coder
static void worker_error(worker_thread *thr, lzma_ret ret)
Tell the main thread that something has gone wrong.
worker_state
@ THR_RUN
Encoding is in progress.
@ THR_EXIT
@ THR_FINISH
@ THR_STOP
@ THR_IDLE
Waiting for work.
static void get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
#define BLOCK_SIZE_MAX
static lzma_ret initialize_new_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
Initialize a new worker_thread structure and create a new thread.
static void threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
static MYTHREAD_RET_TYPE worker_start(void *thr_ptr)
static bool wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs, bool *has_blocked, bool has_input)
static lzma_ret stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator, const uint8_t *restrict in, size_t *restrict in_pos, size_t in_size, lzma_action action)
static lzma_ret get_options(const lzma_mt *options, lzma_options_easy *opt_easy, const lzma_filter **filters, uint64_t *block_size, uint64_t *outbuf_size_max)
#define LZMA_STREAM_HEADER_SIZE
Size of Stream Header and Stream Footer.
Definition: stream_flags.h:27
Custom functions for memory handling.
Definition: base.h:372
Options for the Block and Block Header encoders and decoders.
Definition: block.h:30
lzma_vli uncompressed_size
Uncompressed Size in bytes.
Definition: block.h:172
uint32_t header_size
Size of the Block Header field.
Definition: block.h:72
uint32_t version
Block format version.
Definition: block.h:52
Filter options.
Definition: filter.h:43
void * options
Pointer to filter-specific options structure.
Definition: filter.h:63
lzma_vli id
Filter ID.
Definition: filter.h:54
bool supported_actions[LZMA_ACTION_MAX+1]
Indicates which lzma_action values are allowed by next.code.
Definition: common.h:220
Multithreading options.
Definition: container.h:66
Hold data and function pointers of the next filter in the chain.
Definition: common.h:135
void(* get_progress)(void *coder, uint64_t *progress_in, uint64_t *progress_out)
Definition: common.h:159
lzma_code_function code
Pointer to function to do the actual coding.
Definition: common.h:150
void * coder
Pointer to coder-specific data.
Definition: common.h:137
lzma_end_function end
Definition: common.h:155
lzma_filter filters[LZMA_FILTERS_MAX+1]
Definition: easy_preset.h:19
Output buffer for a single thread.
Definition: outqueue.h:17
bool finished
Definition: outqueue.h:32
size_t size
Amount of data written to buf.
Definition: outqueue.h:22
lzma_vli uncompressed_size
Definition: outqueue.h:26
uint8_t * buf
Pointer to the output buffer of lzma_outq.buf_size_max bytes.
Definition: outqueue.h:19
lzma_vli unpadded_size
Additional size information.
Definition: outqueue.h:25
uint8_t header[LZMA_STREAM_HEADER_SIZE]
Buffer to hold Stream Header and Stream Footer.
enum lzma_stream_coder_s::@654 sequence
lzma_next_coder index_encoder
Index encoder.
lzma_outq outq
Output buffer queue for compressed data.
lzma_filter filters[LZMA_FILTERS_MAX+1]
The filter chain currently in use.
lzma_index * index
Index to hold sizes of the Blocks.
worker_thread * threads_free
lzma_ret thread_error
Error code from a worker thread.
worker_thread * threads
Array of allocated thread-specific structures.
lzma_stream_flags stream_flags
Stream Flags for encoding the Stream Header and Stream Footer.
size_t header_pos
Read position in header[].
lzma_next_coder index_encoder
lzma_index * index
Index to hold sizes of the Blocks.
enum lzma_stream_coder::@652 sequence
lzma_stream_flags stream_flags
Stream Flags from Stream Header.
lzma_filter filters[LZMA_FILTERS_MAX+1]
The filter chain currently in use.
Options for encoding/decoding Stream Header and Stream Footer.
Definition: stream_flags.h:33
uint32_t version
Stream Flags format version.
Definition: stream_flags.h:51
lzma_vli backward_size
Backward Size.
Definition: stream_flags.h:69
lzma_check check
Check ID.
Definition: stream_flags.h:79
Passing data to and from liblzma.
Definition: base.h:485
lzma_internal * internal
Definition: base.h:505
Definition: dis.h:43
lzma_outbuf * outbuf
worker_thread * next
Next structure in the stack of free worker threads.
lzma_block block_options
Compression options for this Block.
uint64_t progress_in
Amount of uncompressed data that has already been compressed.
lzma_stream_coder * coder
lzma_next_coder block_encoder
Block encoder.
mythread_mutex mutex
const lzma_allocator * allocator
uint64_t progress_out
Amount of compressed data that is ready.
worker_state state
mythread_cond cond
#define LZMA_NEXT_CODER_INIT
Macro to initialize lzma_next_coder structure.
Definition: common.h:180
#define LZMA_MEMUSAGE_BASE
Definition: common.h:63
#define return_if_error(expr)
Return if expression doesn't evaluate to LZMA_OK.
Definition: common.h:278
#define lzma_next_coder_init(func, next, allocator)
Definition: common.h:291
#define LZMA_TIMED_OUT
Definition: common.h:88
#define lzma_next_strm_init(func, strm,...)
Definition: common.h:303
void * lzma_alloc(size_t size, const lzma_allocator *allocator) lzma_attribute((__malloc__)) lzma_attr_alloc_size(1)
Allocates memory.
#define LZMA_THREADS_MAX
Definition: common.h:56
uint64_t uncompressed_size
Definition: list.c:106
#define memzero(s, n)
Definition: sysdefs.h:180
uint64_t lzma_vli
Variable-length integer type.
Definition: vli.h:63
#define LZMA_VLI_UNKNOWN
VLI value to denote that the value is unknown.
Definition: vli.h:39
lzma_ret
Return values used by several functions in liblzma.
Definition: base.h:57
@ LZMA_PROG_ERROR
Programming error.
Definition: base.h:218
@ LZMA_MEM_ERROR
Cannot allocate memory.
Definition: base.h:128
@ LZMA_STREAM_END
End of stream was reached.
Definition: base.h:63
@ LZMA_UNSUPPORTED_CHECK
Cannot calculate the integrity check.
Definition: base.h:90
@ LZMA_OPTIONS_ERROR
Invalid or unsupported options.
Definition: base.h:160
@ LZMA_OK
Operation completed successfully.
Definition: base.h:58
lzma_action
The ‘action’ argument for lzma_code()
Definition: base.h:250
@ LZMA_FINISH
Finish the coding operation.
Definition: base.h:328
@ LZMA_RUN
Continue coding.
Definition: base.h:251
@ LZMA_FULL_FLUSH
Finish encoding of the current Block.
Definition: base.h:290
@ LZMA_FULL_BARRIER
Finish encoding of the current Block.
Definition: base.h:305
uint64_t * progress_in
Definition: base.h:599
void lzma_free(void *ptr, const lzma_allocator *allocator)
Frees memory.
Definition: common.c:78
size_t lzma_bufcpy(const uint8_t *restrict in, size_t *restrict in_pos, size_t in_size, uint8_t *restrict out, size_t *restrict out_pos, size_t out_size)
Definition: common.c:94
void lzma_next_end(lzma_next_coder *next, const lzma_allocator *allocator)
Definition: common.c:145