Rizin
unix-like reverse engineering framework and cli tools
stream_encoder_mt.c File Reference

Multithreaded .xz Stream encoder. More...

#include "filter_encoder.h"
#include "easy_preset.h"
#include "block_encoder.h"
#include "block_buffer_encoder.h"
#include "index_encoder.h"
#include "outqueue.h"

Go to the source code of this file.

Classes

struct  worker_thread_s
 
struct  lzma_stream_coder_s
 

Macros

#define BLOCK_SIZE_MAX   (UINT64_MAX / LZMA_THREADS_MAX)
 

Typedefs

typedef struct lzma_stream_coder_s lzma_stream_coder
 
typedef struct worker_thread_s worker_thread
 

Enumerations

enum  worker_state {
  THR_IDLE , THR_RUN , THR_FINISH , THR_STOP ,
  THR_EXIT
}
 

Functions

static void worker_error (worker_thread *thr, lzma_ret ret)
 Tell the main thread that something has gone wrong. More...
 
static worker_state worker_encode (worker_thread *thr, worker_state state)
 
static MYTHREAD_RET_TYPE worker_start (void *thr_ptr)
 
static void threads_stop (lzma_stream_coder *coder, bool wait_for_threads)
 Make the threads stop but not exit. Optionally wait for them to stop. More...
 
static void threads_end (lzma_stream_coder *coder, const lzma_allocator *allocator)
 
static lzma_ret initialize_new_thread (lzma_stream_coder *coder, const lzma_allocator *allocator)
 Initialize a new worker_thread structure and create a new thread. More...
 
static lzma_ret get_thread (lzma_stream_coder *coder, const lzma_allocator *allocator)
 
static lzma_ret stream_encode_in (lzma_stream_coder *coder, const lzma_allocator *allocator, const uint8_t *restrict in, size_t *restrict in_pos, size_t in_size, lzma_action action)
 
static bool wait_for_work (lzma_stream_coder *coder, mythread_condtime *wait_abs, bool *has_blocked, bool has_input)
 
static lzma_ret stream_encode_mt (void *coder_ptr, const lzma_allocator *allocator, const uint8_t *restrict in, size_t *restrict in_pos, size_t in_size, uint8_t *restrict out, size_t *restrict out_pos, size_t out_size, lzma_action action)
 
static void stream_encoder_mt_end (void *coder_ptr, const lzma_allocator *allocator)
 
static lzma_ret get_options (const lzma_mt *options, lzma_options_easy *opt_easy, const lzma_filter **filters, uint64_t *block_size, uint64_t *outbuf_size_max)
 
static void get_progress (void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
 
static lzma_ret stream_encoder_mt_init (lzma_next_coder *next, const lzma_allocator *allocator, const lzma_mt *options)
 
 LZMA_API (lzma_ret)
 
 LZMA_API (uint64_t)
 Calculate approximate memory usage of multithreaded .xz encoder. More...
 

Detailed Description

Multithreaded .xz Stream encoder.

Definition in file stream_encoder_mt.c.

Macro Definition Documentation

◆ BLOCK_SIZE_MAX

#define BLOCK_SIZE_MAX   (UINT64_MAX / LZMA_THREADS_MAX)

Maximum supported block size. This makes it simpler to prevent integer overflows if we are given an unusually large block size.

Definition at line 23 of file stream_encoder_mt.c.

Typedef Documentation

◆ lzma_stream_coder

Definition at line 1 of file stream_encoder_mt.c.

◆ worker_thread

Definition at line 1 of file stream_encoder_mt.c.

Enumeration Type Documentation

◆ worker_state

Enumerator
THR_IDLE 

Waiting for work.

THR_RUN 

Encoding is in progress.

THR_FINISH 

Encoding is in progress but no more input data will be read.

THR_STOP 

The main thread wants the thread to stop whatever it was doing but not exit.

THR_EXIT 

The main thread wants the thread to exit. We could use cancellation, but since the stop mechanism (THR_STOP) exists anyway, this is the lazier approach.

Definition at line 26 of file stream_encoder_mt.c.

26  {
28  THR_IDLE,
29 
31  THR_RUN,
32 
35  THR_FINISH,
36 
39  THR_STOP,
40 
43  THR_EXIT,
44 
45 } worker_state;
worker_state
@ THR_RUN
Encoding is in progress.
@ THR_EXIT
@ THR_FINISH
@ THR_STOP
@ THR_IDLE
Waiting for work.

Function Documentation

◆ get_options()

static lzma_ret get_options ( const lzma_mt options,
lzma_options_easy opt_easy,
const lzma_filter **  filters,
uint64_t block_size,
uint64_t outbuf_size_max 
)
static

Options handling for lzma_stream_encoder_mt_init() and lzma_stream_encoder_mt_memusage().

Definition at line 866 of file stream_encoder_mt.c.

869 {
870  // Validate some of the options.
871  if (options == NULL)
872  return LZMA_PROG_ERROR;
873 
874  if (options->flags != 0 || options->threads == 0
875  || options->threads > LZMA_THREADS_MAX)
876  return LZMA_OPTIONS_ERROR;
877 
878  if (options->filters != NULL) {
879  // Filter chain was given, use it as is.
880  *filters = options->filters;
881  } else {
882  // Use a preset.
883  if (lzma_easy_preset(opt_easy, options->preset))
884  return LZMA_OPTIONS_ERROR;
885 
886  *filters = opt_easy->filters;
887  }
888 
889  // Block size
890  if (options->block_size > 0) {
891  if (options->block_size > BLOCK_SIZE_MAX)
892  return LZMA_OPTIONS_ERROR;
893 
894  *block_size = options->block_size;
895  } else {
896  // Determine the Block size from the filter chain.
897  *block_size = lzma_mt_block_size(*filters);
898  if (*block_size == 0)
899  return LZMA_OPTIONS_ERROR;
900 
901  assert(*block_size <= BLOCK_SIZE_MAX);
902  }
903 
904  // Calculate the maximum amount of output that a single output buffer
905  // may need to hold. This is the same as the maximum total size of
906  // a Block.
907  *outbuf_size_max = lzma_block_buffer_bound64(*block_size);
908  if (*outbuf_size_max == 0)
909  return LZMA_MEM_ERROR;
910 
911  return LZMA_OK;
912 }
uint64_t lzma_block_buffer_bound64(uint64_t uncompressed_size)
const lzma_filter * filters
Definition: container.h:315
#define NULL
Definition: cris-opc.c:27
bool lzma_easy_preset(lzma_options_easy *opt_easy, uint32_t preset)
Definition: easy_preset.c:17
uint64_t lzma_mt_block_size(const lzma_filter *filters)
static const char struct stat static buf struct stat static buf static vhangup int options
Definition: sflib.h:145
assert(limit<=UINT32_MAX/2)
#define BLOCK_SIZE_MAX
lzma_filter filters[LZMA_FILTERS_MAX+1]
Definition: easy_preset.h:19
#define LZMA_THREADS_MAX
Definition: common.h:56
@ LZMA_PROG_ERROR
Programming error.
Definition: base.h:218
@ LZMA_MEM_ERROR
Cannot allocate memory.
Definition: base.h:128
@ LZMA_OPTIONS_ERROR
Invalid or unsupported options.
Definition: base.h:160
@ LZMA_OK
Operation completed successfully.
Definition: base.h:58

References assert(), BLOCK_SIZE_MAX, filters, lzma_options_easy::filters, lzma_block_buffer_bound64(), lzma_easy_preset(), LZMA_MEM_ERROR, lzma_mt_block_size(), LZMA_OK, LZMA_OPTIONS_ERROR, LZMA_PROG_ERROR, LZMA_THREADS_MAX, NULL, and options.

Referenced by LZMA_API(), and stream_encoder_mt_init().

◆ get_progress()

static void get_progress ( void *  coder_ptr,
uint64_t progress_in,
uint64_t progress_out 
)
static

Definition at line 916 of file stream_encoder_mt.c.

917 {
918  lzma_stream_coder *coder = coder_ptr;
919 
920  // Lock coder->mutex to prevent finishing threads from moving their
921  // progress info from the worker_thread structure to lzma_stream_coder.
922  mythread_sync(coder->mutex) {
923  *progress_in = coder->progress_in;
924  *progress_out = coder->progress_out;
925 
926  for (size_t i = 0; i < coder->threads_initialized; ++i) {
927  mythread_sync(coder->threads[i].mutex) {
928  *progress_in += coder->threads[i].progress_in;
929  *progress_out += coder->threads[i]
930  .progress_out;
931  }
932  }
933  }
934 
935  return;
936 }
lzma_index ** i
Definition: index.h:629
uint64_t * progress_in
Definition: base.h:599

References i, and progress_in.

Referenced by stream_encoder_mt_init().

◆ get_thread()

static lzma_ret get_thread ( lzma_stream_coder coder,
const lzma_allocator allocator 
)
static

Definition at line 515 of file stream_encoder_mt.c.

516 {
517  // If there are no free output subqueues, there is no
518  // point to try getting a thread.
519  if (!lzma_outq_has_buf(&coder->outq))
520  return LZMA_OK;
521 
522  // If there is a free structure on the stack, use it.
523  mythread_sync(coder->mutex) {
524  if (coder->threads_free != NULL) {
525  coder->thr = coder->threads_free;
526  coder->threads_free = coder->threads_free->next;
527  }
528  }
529 
530  if (coder->thr == NULL) {
531  // If there are no uninitialized structures left, return.
532  if (coder->threads_initialized == coder->threads_max)
533  return LZMA_OK;
534 
535  // Initialize a new thread.
537  }
538 
539  // Reset the parts of the thread state that have to be done
540  // in the main thread.
541  mythread_sync(coder->thr->mutex) {
542  coder->thr->state = THR_RUN;
543  coder->thr->in_size = 0;
544  coder->thr->outbuf = lzma_outq_get_buf(&coder->outq);
545  mythread_cond_signal(&coder->thr->cond);
546  }
547 
548  return LZMA_OK;
549 }
const lzma_allocator * allocator
Definition: block.h:377
lzma_outbuf * lzma_outq_get_buf(lzma_outq *outq)
Get a new buffer.
Definition: outqueue.c:114
static bool lzma_outq_has_buf(const lzma_outq *outq)
Test if there is at least one buffer free.
Definition: outqueue.h:145
static lzma_ret initialize_new_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
Initialize a new worker_thread structure and create a new thread.
#define return_if_error(expr)
Return if expression doesn't evaluate to LZMA_OK.
Definition: common.h:278

References allocator, initialize_new_thread(), LZMA_OK, lzma_outq_get_buf(), lzma_outq_has_buf(), NULL, return_if_error, and THR_RUN.

Referenced by stream_encode_in().

◆ initialize_new_thread()

static lzma_ret initialize_new_thread ( lzma_stream_coder coder,
const lzma_allocator allocator 
)
static

Initialize a new worker_thread structure and create a new thread.

Definition at line 472 of file stream_encoder_mt.c.

474 {
475  worker_thread *thr = &coder->threads[coder->threads_initialized];
476 
477  thr->in = lzma_alloc(coder->block_size, allocator);
478  if (thr->in == NULL)
479  return LZMA_MEM_ERROR;
480 
481  if (mythread_mutex_init(&thr->mutex))
482  goto error_mutex;
483 
484  if (mythread_cond_init(&thr->cond))
485  goto error_cond;
486 
487  thr->state = THR_IDLE;
488  thr->allocator = allocator;
489  thr->coder = coder;
490  thr->progress_in = 0;
491  thr->progress_out = 0;
493 
494  if (mythread_create(&thr->thread_id, &worker_start, thr))
495  goto error_thread;
496 
497  ++coder->threads_initialized;
498  coder->thr = thr;
499 
500  return LZMA_OK;
501 
502 error_thread:
503  mythread_cond_destroy(&thr->cond);
504 
505 error_cond:
506  mythread_mutex_destroy(&thr->mutex);
507 
508 error_mutex:
509  lzma_free(thr->in, allocator);
510  return LZMA_MEM_ERROR;
511 }
static MYTHREAD_RET_TYPE worker_start(void *thr_ptr)
uint64_t progress_in
Amount of uncompressed data that has already been compressed.
lzma_stream_coder * coder
lzma_next_coder block_encoder
Block encoder.
mythread_mutex mutex
const lzma_allocator * allocator
uint64_t progress_out
Amount of compressed data that is ready.
worker_state state
mythread_cond cond
#define LZMA_NEXT_CODER_INIT
Macro to initialize lzma_next_coder structure.
Definition: common.h:180
void * lzma_alloc(size_t size, const lzma_allocator *allocator) lzma_attribute((__malloc__)) lzma_attr_alloc_size(1)
Allocates memory.
void lzma_free(void *ptr, const lzma_allocator *allocator)
Frees memory.
Definition: common.c:78

References allocator, worker_thread_s::allocator, worker_thread_s::block_encoder, worker_thread_s::coder, worker_thread_s::cond, worker_thread_s::in, lzma_alloc(), lzma_free(), LZMA_MEM_ERROR, LZMA_NEXT_CODER_INIT, LZMA_OK, worker_thread_s::mutex, NULL, worker_thread_s::progress_in, worker_thread_s::progress_out, worker_thread_s::state, THR_IDLE, worker_thread_s::thread_id, and worker_start().

Referenced by get_thread().

◆ LZMA_API() [1/2]

LZMA_API ( lzma_ret  )

Definition at line 1078 of file stream_encoder_mt.c.

1080 {
1082 
1084 // strm->internal->supported_actions[LZMA_SYNC_FLUSH] = true;
1088 
1089  return LZMA_OK;
1090 }
static lzma_stream strm
Definition: full_flush.c:20
static lzma_ret stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, const lzma_mt *options)
bool supported_actions[LZMA_ACTION_MAX+1]
Indicates which lzma_action values are allowed by next.code.
Definition: common.h:220
lzma_internal * internal
Definition: base.h:505
#define lzma_next_strm_init(func, strm,...)
Definition: common.h:303
@ LZMA_FINISH
Finish the coding operation.
Definition: base.h:328
@ LZMA_RUN
Continue coding.
Definition: base.h:251
@ LZMA_FULL_FLUSH
Finish encoding of the current Block.
Definition: base.h:290
@ LZMA_FULL_BARRIER
Finish encoding of the current Block.
Definition: base.h:305

References lzma_stream::internal, LZMA_FINISH, LZMA_FULL_BARRIER, LZMA_FULL_FLUSH, lzma_next_strm_init, LZMA_OK, LZMA_RUN, options, stream_encoder_mt_init(), strm, and lzma_internal_s::supported_actions.

◆ LZMA_API() [2/2]

LZMA_API ( uint64_t  )

Calculate approximate memory usage of easy encoder.

Get the total amount of physical memory (RAM) in bytes.

Calculate approximate memory usage of multithreaded .xz encoder.

Calculate approximate decoder memory usage of a preset.

This function is a wrapper for lzma_raw_encoder_memusage().

Parameters
preset: Compression preset (level and possible flags)
Returns
Number of bytes of memory required for the given preset when encoding. If an error occurs, for example due to unsupported preset, UINT64_MAX is returned.

This function is a wrapper for lzma_raw_decoder_memusage().

Parameters
preset: Compression preset (level and possible flags)
Returns
Number of bytes of memory required to decompress a file that was compressed using the given preset. If an error occurs, for example due to unsupported preset, UINT64_MAX is returned.

Since doing the encoding in threaded mode doesn't affect the memory requirements of single-threaded decompressor, you can use lzma_easy_decoder_memusage(options->preset) or lzma_raw_decoder_memusage(options->filters) to calculate the decompressor memory requirements.

Parameters
options: Compression options
Returns
Number of bytes of memory required for encoding with the given options. If an error occurs, for example due to unsupported preset or filter chain, UINT64_MAX is returned.

Calculate approximate memory usage of easy encoder.

Get the uncompressed size of the file.

Get the total size of the file.

Get the total size of the Blocks.

Get the total size of the Stream.

Get the size of the Index field as bytes.

Get the number of Blocks.

Get the number of Streams.

Calculate the memory usage of an existing lzma_index.

On disk, the size of the Index field depends on both the number of Records stored and how big values the Records store (due to variable-length integer encoding). When the Index is kept in lzma_index structure, the memory usage depends only on the number of Records/Blocks stored in the Index(es), and in case of concatenated lzma_indexes, the number of Streams. The size in RAM is almost always significantly bigger than in the encoded form on disk.

This function calculates an approximate amount of memory needed to hold the given number of Streams and Blocks in lzma_index structure. This value may vary between CPU architectures and also between liblzma versions if the internal implementation is modified.

This is a shorthand for lzma_index_memusage(lzma_index_stream_count(i), lzma_index_block_count(i)).

This returns the total number of Blocks in lzma_index. To get number of Blocks in individual Streams, use lzma_index_iter.

This is needed to verify the Backward Size field in the Stream Footer.

If multiple lzma_indexes have been combined, this works as if the Blocks were in a single Stream. This is useful if you are going to combine Blocks from multiple Streams into a single new Stream.

This doesn't include the Stream Header, Stream Footer, Stream Padding, or Index fields.

When no lzma_indexes have been combined with lzma_index_cat() and there is no Stream Padding, this function is identical to lzma_index_stream_size(). If multiple lzma_indexes have been combined, this includes also the headers of each separate Stream and the possible Stream Padding fields.

Definition at line 1096 of file stream_encoder_mt.c.

1098 {
1099  lzma_options_easy easy;
1100  const lzma_filter *filters;
1101  uint64_t block_size;
1102  uint64_t outbuf_size_max;
1103 
1104  if (get_options(options, &easy, &filters, &block_size,
1105  &outbuf_size_max) != LZMA_OK)
1106  return UINT64_MAX;
1107 
1108  // Memory usage of the input buffers
1109  const uint64_t inbuf_memusage = options->threads * block_size;
1110 
1111  // Memory usage of the filter encoders
1112  uint64_t filters_memusage = lzma_raw_encoder_memusage(filters);
1113  if (filters_memusage == UINT64_MAX)
1114  return UINT64_MAX;
1115 
1116  filters_memusage *= options->threads;
1117 
1118  // Memory usage of the output queue
1119  const uint64_t outq_memusage = lzma_outq_memusage(
1120  outbuf_size_max, options->threads);
1121  if (outq_memusage == UINT64_MAX)
1122  return UINT64_MAX;
1123 
1124  // Sum them with overflow checking.
1125  uint64_t total_memusage = LZMA_MEMUSAGE_BASE
1126  + sizeof(lzma_stream_coder)
1127  + options->threads * sizeof(worker_thread);
1128 
1129  if (UINT64_MAX - total_memusage < inbuf_memusage)
1130  return UINT64_MAX;
1131 
1132  total_memusage += inbuf_memusage;
1133 
1134  if (UINT64_MAX - total_memusage < filters_memusage)
1135  return UINT64_MAX;
1136 
1137  total_memusage += filters_memusage;
1138 
1139  if (UINT64_MAX - total_memusage < outq_memusage)
1140  return UINT64_MAX;
1141 
1142  return total_memusage + outq_memusage;
1143 }
uint64_t lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads)
Calculate the memory usage of an output queue.
Definition: outqueue.c:42
unsigned long uint64_t
Definition: sftypes.h:28
#define UINT64_MAX
struct lzma_stream_coder_s lzma_stream_coder
static lzma_ret get_options(const lzma_mt *options, lzma_options_easy *opt_easy, const lzma_filter **filters, uint64_t *block_size, uint64_t *outbuf_size_max)
Filter options.
Definition: filter.h:43
#define LZMA_MEMUSAGE_BASE
Definition: common.h:63

References A, A1, aligned_read32ne(), B, bswap64, C, D, filters, get_options(), limit, lzma_crc64_table, LZMA_MEMUSAGE_BASE, LZMA_OK, lzma_outq_memusage(), options, S32, S8, autogen_x86imm::tmp, and UINT64_MAX.

◆ stream_encode_in()

static lzma_ret stream_encode_in ( lzma_stream_coder coder,
const lzma_allocator allocator,
const uint8_t *restrict  in,
size_t *restrict  in_pos,
size_t  in_size,
lzma_action  action 
)
static

Definition at line 553 of file stream_encoder_mt.c.

556 {
557  while (*in_pos < in_size
558  || (coder->thr != NULL && action != LZMA_RUN)) {
559  if (coder->thr == NULL) {
560  // Get a new thread.
561  const lzma_ret ret = get_thread(coder, allocator);
562  if (coder->thr == NULL)
563  return ret;
564  }
565 
566  // Copy the input data to thread's buffer.
567  size_t thr_in_size = coder->thr->in_size;
568  lzma_bufcpy(in, in_pos, in_size, coder->thr->in,
569  &thr_in_size, coder->block_size);
570 
571  // Tell the Block encoder to finish if
572  // - it has got block_size bytes of input; or
573  // - all input was used and LZMA_FINISH, LZMA_FULL_FLUSH,
574  // or LZMA_FULL_BARRIER was used.
575  //
576  // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
577  const bool finish = thr_in_size == coder->block_size
578  || (*in_pos == in_size && action != LZMA_RUN);
579 
580  bool block_error = false;
581 
582  mythread_sync(coder->thr->mutex) {
583  if (coder->thr->state == THR_IDLE) {
584  // Something has gone wrong with the Block
585  // encoder. It has set coder->thread_error
586  // which we will read a few lines later.
587  block_error = true;
588  } else {
589  // Tell the Block encoder its new amount
590  // of input and update the state if needed.
591  coder->thr->in_size = thr_in_size;
592 
593  if (finish)
594  coder->thr->state = THR_FINISH;
595 
596  mythread_cond_signal(&coder->thr->cond);
597  }
598  }
599 
600  if (block_error) {
601  lzma_ret ret;
602 
603  mythread_sync(coder->mutex) {
604  ret = coder->thread_error;
605  }
606 
607  return ret;
608  }
609 
610  if (finish)
611  coder->thr = NULL;
612  }
613 
614  return LZMA_OK;
615 }
static bool finish(void *user)
Definition: analysis_pyc.c:133
const lzma_allocator const uint8_t size_t * in_pos
Definition: block.h:579
const lzma_allocator const uint8_t size_t in_size
Definition: block.h:527
const lzma_allocator const uint8_t * in
Definition: block.h:527
static lzma_ret get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator)
lzma_ret
Return values used by several functions in liblzma.
Definition: base.h:57
size_t lzma_bufcpy(const uint8_t *restrict in, size_t *restrict in_pos, size_t in_size, uint8_t *restrict out, size_t *restrict out_pos, size_t out_size)
Definition: common.c:94

References test-lz4-speed::action, allocator, finish(), get_thread(), in, in_pos, in_size, lzma_bufcpy(), LZMA_OK, LZMA_RUN, NULL, THR_FINISH, and THR_IDLE.

Referenced by stream_encode_mt().

◆ stream_encode_mt()

static lzma_ret stream_encode_mt ( void *  coder_ptr,
const lzma_allocator allocator,
const uint8_t *restrict  in,
size_t *restrict  in_pos,
size_t  in_size,
uint8_t *restrict  out,
size_t *restrict  out_pos,
size_t  out_size,
lzma_action  action 
)
static

Definition at line 667 of file stream_encoder_mt.c.

671 {
672  lzma_stream_coder *coder = coder_ptr;
673 
674  switch (coder->sequence) {
675  case SEQ_STREAM_HEADER:
676  lzma_bufcpy(coder->header, &coder->header_pos,
677  sizeof(coder->header),
678  out, out_pos, out_size);
679  if (coder->header_pos < sizeof(coder->header))
680  return LZMA_OK;
681 
682  coder->header_pos = 0;
683  coder->sequence = SEQ_BLOCK;
684 
685  // Fall through
686 
687  case SEQ_BLOCK: {
688  // Initialized to silence warnings.
691  lzma_ret ret = LZMA_OK;
692 
693  // These are for wait_for_work().
694  bool has_blocked = false;
695  mythread_condtime wait_abs;
696 
697  while (true) {
698  mythread_sync(coder->mutex) {
699  // Check for Block encoder errors.
700  ret = coder->thread_error;
701  if (ret != LZMA_OK) {
702  assert(ret != LZMA_STREAM_END);
703  break; // Break out of mythread_sync.
704  }
705 
706  // Try to read compressed data to out[].
707  ret = lzma_outq_read(&coder->outq,
708  out, out_pos, out_size,
709  &unpadded_size,
711  }
712 
713  if (ret == LZMA_STREAM_END) {
714  // End of Block. Add it to the Index.
715  ret = lzma_index_append(coder->index,
718 
719  // If we didn't fill the output buffer yet,
720  // try to read more data. Maybe the next
721  // outbuf has been finished already too.
722  if (*out_pos < out_size)
723  continue;
724  }
725 
726  if (ret != LZMA_OK) {
727  // coder->thread_error was set or
728  // lzma_index_append() failed.
729  threads_stop(coder, false);
730  return ret;
731  }
732 
733  // Try to give uncompressed data to a worker thread.
734  ret = stream_encode_in(coder, allocator,
735  in, in_pos, in_size, action);
736  if (ret != LZMA_OK) {
737  threads_stop(coder, false);
738  return ret;
739  }
740 
741  // See if we should wait or return.
742  //
743  // TODO: LZMA_SYNC_FLUSH and LZMA_SYNC_BARRIER.
744  if (*in_pos == in_size) {
745  // LZMA_RUN: More data is probably coming
746  // so return to let the caller fill the
747  // input buffer.
748  if (action == LZMA_RUN)
749  return LZMA_OK;
750 
751  // LZMA_FULL_BARRIER: The same as with
752  // LZMA_RUN but tell the caller that the
753  // barrier was completed.
754  if (action == LZMA_FULL_BARRIER)
755  return LZMA_STREAM_END;
756 
757  // Finishing or flushing isn't completed until
758  // all input data has been encoded and copied
759  // to the output buffer.
760  if (lzma_outq_is_empty(&coder->outq)) {
761  // LZMA_FINISH: Continue to encode
762  // the Index field.
763  if (action == LZMA_FINISH)
764  break;
765 
766  // LZMA_FULL_FLUSH: Return to tell
767  // the caller that flushing was
768  // completed.
769  if (action == LZMA_FULL_FLUSH)
770  return LZMA_STREAM_END;
771  }
772  }
773 
774  // Return if there is no output space left.
775  // This check must be done after testing the input
776  // buffer, because we might want to use a different
777  // return code.
778  if (*out_pos == out_size)
779  return LZMA_OK;
780 
781  // Neither in nor out has been used completely.
782  // Wait until there's something we can do.
783  if (wait_for_work(coder, &wait_abs, &has_blocked,
784  *in_pos < in_size))
785  return LZMA_TIMED_OUT;
786  }
787 
788  // All Blocks have been encoded and the threads have stopped.
789  // Prepare to encode the Index field.
791  &coder->index_encoder, allocator,
792  coder->index));
793  coder->sequence = SEQ_INDEX;
794 
795  // Update the progress info to take the Index and
796  // Stream Footer into account. Those are very fast to encode
797  // so in terms of progress information they can be thought
798  // to be ready to be copied out.
799  coder->progress_out += lzma_index_size(coder->index)
801  }
802 
803  // Fall through
804 
805  case SEQ_INDEX: {
806  // Call the Index encoder. It doesn't take any input, so
807  // those pointers can be NULL.
808  const lzma_ret ret = coder->index_encoder.code(
809  coder->index_encoder.coder, allocator,
810  NULL, NULL, 0,
811  out, out_pos, out_size, LZMA_RUN);
812  if (ret != LZMA_STREAM_END)
813  return ret;
814 
815  // Encode the Stream Footer into coder->header.
817  = lzma_index_size(coder->index);
818  if (lzma_stream_footer_encode(&coder->stream_flags,
819  coder->header) != LZMA_OK)
820  return LZMA_PROG_ERROR;
821 
822  coder->sequence = SEQ_STREAM_FOOTER;
823  }
824 
825  // Fall through
826 
827  case SEQ_STREAM_FOOTER:
828  lzma_bufcpy(coder->header, &coder->header_pos,
829  sizeof(coder->header),
830  out, out_pos, out_size);
831  return coder->header_pos < sizeof(coder->header)
833  }
834 
835  assert(0);
836  return LZMA_PROG_ERROR;
837 }
const lzma_allocator lzma_vli unpadded_size
Definition: index.h:345
const lzma_allocator const uint8_t size_t uint8_t size_t * out_pos
Definition: block.h:528
const lzma_allocator const uint8_t size_t uint8_t * out
Definition: block.h:528
lzma_ret lzma_index_encoder_init(lzma_next_coder *next, const lzma_allocator *allocator, const lzma_index *i)
lzma_ret lzma_outq_read(lzma_outq *restrict outq, uint8_t *restrict out, size_t *restrict out_pos, size_t out_size, lzma_vli *restrict unpadded_size, lzma_vli *restrict uncompressed_size)
Read finished data.
Definition: outqueue.c:147
static bool lzma_outq_is_empty(const lzma_outq *outq)
Test if the queue is completely empty.
Definition: outqueue.h:153
static void threads_stop(lzma_stream_coder *coder, bool wait_for_threads)
Make the threads stop but not exit. Optionally wait for them to stop.
static bool wait_for_work(lzma_stream_coder *coder, mythread_condtime *wait_abs, bool *has_blocked, bool has_input)
static lzma_ret stream_encode_in(lzma_stream_coder *coder, const lzma_allocator *allocator, const uint8_t *restrict in, size_t *restrict in_pos, size_t in_size, lzma_action action)
#define LZMA_STREAM_HEADER_SIZE
Size of Stream Header and Stream Footer.
Definition: stream_flags.h:27
lzma_code_function code
Pointer to function to do the actual coding.
Definition: common.h:150
void * coder
Pointer to coder-specific data.
Definition: common.h:137
lzma_next_coder index_encoder
lzma_index * index
Index to hold sizes of the Blocks.
enum lzma_stream_coder::@652 sequence
lzma_stream_flags stream_flags
Stream Flags from Stream Header.
lzma_vli backward_size
Backward Size.
Definition: stream_flags.h:69
#define LZMA_TIMED_OUT
Definition: common.h:88
uint64_t uncompressed_size
Definition: list.c:106
uint64_t lzma_vli
Variable-length integer type.
Definition: vli.h:63
@ LZMA_STREAM_END
End of stream was reached.
Definition: base.h:63

References test-lz4-speed::action, allocator, assert(), lzma_stream_flags::backward_size, lzma_next_coder_s::code, lzma_next_coder_s::coder, in, in_pos, in_size, lzma_stream_coder::index, lzma_stream_coder::index_encoder, lzma_bufcpy(), LZMA_FINISH, LZMA_FULL_BARRIER, LZMA_FULL_FLUSH, lzma_index_encoder_init(), LZMA_OK, lzma_outq_is_empty(), lzma_outq_read(), LZMA_PROG_ERROR, LZMA_RUN, LZMA_STREAM_END, LZMA_STREAM_HEADER_SIZE, LZMA_TIMED_OUT, NULL, out, out_pos, return_if_error, lzma_stream_coder::sequence, stream_encode_in(), lzma_stream_coder::stream_flags, threads_stop(), uncompressed_size, unpadded_size, and wait_for_work().

Referenced by stream_encoder_mt_init().

◆ stream_encoder_mt_end()

static void stream_encoder_mt_end ( void *  coder_ptr,
const lzma_allocator allocator 
)
static

Definition at line 841 of file stream_encoder_mt.c.

842 {
843  lzma_stream_coder *coder = coder_ptr;
844 
845  // Threads must be killed before the output queue can be freed.
846  threads_end(coder, allocator);
847  lzma_outq_end(&coder->outq, allocator);
848 
849  for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
850  lzma_free(coder->filters[i].options, allocator);
851 
853  lzma_index_end(coder->index, allocator);
854 
855  mythread_cond_destroy(&coder->cond);
856  mythread_mutex_destroy(&coder->mutex);
857 
858  lzma_free(coder, allocator);
859  return;
860 }
void lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator)
Free the memory associated with the output queue.
Definition: outqueue.c:101
static void threads_end(lzma_stream_coder *coder, const lzma_allocator *allocator)
void * options
Pointer to filter-specific options structure.
Definition: filter.h:63
lzma_vli id
Filter ID.
Definition: filter.h:54
lzma_filter filters[LZMA_FILTERS_MAX+1]
The filter chain currently in use.
#define LZMA_VLI_UNKNOWN
VLI value to denote that the value is unknown.
Definition: vli.h:39
void lzma_next_end(lzma_next_coder *next, const lzma_allocator *allocator)
Definition: common.c:145

References allocator, lzma_stream_coder::filters, i, lzma_filter::id, lzma_stream_coder::index, lzma_stream_coder::index_encoder, lzma_free(), lzma_next_end(), lzma_outq_end(), LZMA_VLI_UNKNOWN, lzma_filter::options, and threads_end().

Referenced by stream_encoder_mt_init().

◆ stream_encoder_mt_init()

static lzma_ret stream_encoder_mt_init ( lzma_next_coder *  next,
const lzma_allocator *  allocator,
const lzma_mt *  options 
)
static

Definition at line 940 of file stream_encoder_mt.c.

942 {
943  lzma_next_coder_init(&stream_encoder_mt_init, next, allocator);
944 
945  // Get the filter chain.
946  lzma_options_easy easy;
947  const lzma_filter *filters;
948  uint64_t block_size;
949  uint64_t outbuf_size_max;
950  return_if_error(get_options(options, &easy, &filters,
951  &block_size, &outbuf_size_max));
952 
953 #if SIZE_MAX < UINT64_MAX
954  if (block_size > SIZE_MAX)
955  return LZMA_MEM_ERROR;
956 #endif
957 
958  // Validate the filter chain so that we can give an error in this
959  // function instead of delaying it to the first call to lzma_code().
960  // The memory usage calculation verifies the filter chain as
961  // a side effect so we take advantage of that.
962  if (lzma_raw_encoder_memusage(filters) == UINT64_MAX)
963  return LZMA_OPTIONS_ERROR;
964 
965  // Validate the Check ID.
966  if ((unsigned int)(options->check) > LZMA_CHECK_ID_MAX)
967  return LZMA_PROG_ERROR;
968 
969  if (!lzma_check_is_supported(options->check))
970  return LZMA_UNSUPPORTED_CHECK;
971 
972  // Allocate and initialize the base structure if needed.
973  lzma_stream_coder *coder = next->coder;
974  if (coder == NULL) {
975  coder = lzma_alloc(sizeof(lzma_stream_coder), allocator);
976  if (coder == NULL)
977  return LZMA_MEM_ERROR;
978 
979  next->coder = coder;
980 
981  // For the mutex and condition variable initializations
982  // the error handling has to be done here because
983  // stream_encoder_mt_end() doesn't know if they have
984  // already been initialized or not.
985  if (mythread_mutex_init(&coder->mutex)) {
986  lzma_free(coder, allocator);
987  next->coder = NULL;
988  return LZMA_MEM_ERROR;
989  }
990 
991  if (mythread_cond_init(&coder->cond)) {
992  mythread_mutex_destroy(&coder->mutex);
993  lzma_free(coder, allocator);
994  next->coder = NULL;
995  return LZMA_MEM_ERROR;
996  }
997 
998  next->code = &stream_encode_mt;
999  next->end = &stream_encoder_mt_end;
1000  next->get_progress = &get_progress;
1001 // next->update = &stream_encoder_mt_update;
1002 
1003  coder->filters[0].id = LZMA_VLI_UNKNOWN;
1004  coder->index_encoder = LZMA_NEXT_CODER_INIT;
1005  coder->index = NULL;
1006  memzero(&coder->outq, sizeof(coder->outq));
1007  coder->threads = NULL;
1008  coder->threads_max = 0;
1009  coder->threads_initialized = 0;
1010  }
1011 
1012  // Basic initializations
1013  coder->sequence = SEQ_STREAM_HEADER;
1014  coder->block_size = (size_t)(block_size);
1015  coder->thread_error = LZMA_OK;
1016  coder->thr = NULL;
1017 
1018  // Allocate the thread-specific base structures.
1019  assert(options->threads > 0);
1020  if (coder->threads_max != options->threads) {
1021  threads_end(coder, allocator);
1022 
1023  coder->threads = NULL;
1024  coder->threads_max = 0;
1025 
1026  coder->threads_initialized = 0;
1027  coder->threads_free = NULL;
1028 
1029  coder->threads = lzma_alloc(
1030  options->threads * sizeof(worker_thread),
1031  allocator);
1032  if (coder->threads == NULL)
1033  return LZMA_MEM_ERROR;
1034 
1035  coder->threads_max = options->threads;
1036  } else {
1037  // Reuse the old structures and threads. Tell the running
1038  // threads to stop and wait until they have stopped.
1039  threads_stop(coder, true);
1040  }
1041 
1042  // Output queue
1043  return_if_error(lzma_outq_init(&coder->outq, allocator,
1044  outbuf_size_max, options->threads));
1045 
1046  // Timeout
1047  coder->timeout = options->timeout;
1048 
1049  // Free the old filter chain and copy the new one.
1050  for (size_t i = 0; coder->filters[i].id != LZMA_VLI_UNKNOWN; ++i)
1051  lzma_free(coder->filters[i].options, allocator);
1052 
1053  return_if_error(lzma_filters_copy(
1054  filters, coder->filters, allocator));
1055 
1056  // Index
1057  lzma_index_end(coder->index, allocator);
1058  coder->index = lzma_index_init(allocator);
1059  if (coder->index == NULL)
1060  return LZMA_MEM_ERROR;
1061 
1062  // Stream Header
1063  coder->stream_flags.version = 0;
1064  coder->stream_flags.check = options->check;
1065  return_if_error(lzma_stream_header_encode(
1066  &coder->stream_flags, coder->header));
1067 
1068  coder->header_pos = 0;
1069 
1070  // Progress info
1071  coder->progress_in = 0;
1072  coder->progress_out = LZMA_STREAM_HEADER_SIZE;
1073 
1074  return LZMA_OK;
1075 }
#define LZMA_CHECK_ID_MAX
Maximum valid Check ID.
Definition: check.h:68
lzma_ret lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator, uint64_t buf_size_max, uint32_t threads)
Initialize an output queue.
Definition: outqueue.c:57
int size_t
Definition: sftypes.h:40
#define SIZE_MAX
static lzma_ret stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator, const uint8_t *restrict in, size_t *restrict in_pos, size_t in_size, uint8_t *restrict out, size_t *restrict out_pos, size_t out_size, lzma_action action)
static void stream_encoder_mt_end(void *coder_ptr, const lzma_allocator *allocator)
static void get_progress(void *coder_ptr, uint64_t *progress_in, uint64_t *progress_out)
void(* get_progress)(void *coder, uint64_t *progress_in, uint64_t *progress_out)
Definition: common.h:159
lzma_end_function end
Definition: common.h:155
uint32_t version
Stream Flags format version.
Definition: stream_flags.h:51
lzma_check check
Check ID.
Definition: stream_flags.h:79
#define lzma_next_coder_init(func, next, allocator)
Definition: common.h:291
#define memzero(s, n)
Definition: sysdefs.h:180
@ LZMA_UNSUPPORTED_CHECK
Cannot calculate the integrity check.
Definition: base.h:90

References allocator, assert(), lzma_stream_flags::check, lzma_next_coder_s::code, lzma_next_coder_s::coder, lzma_next_coder_s::end, filters, lzma_stream_coder::filters, get_options(), get_progress(), lzma_next_coder_s::get_progress, i, lzma_filter::id, lzma_stream_coder::index, lzma_stream_coder::index_encoder, lzma_alloc(), LZMA_CHECK_ID_MAX, lzma_free(), LZMA_MEM_ERROR, LZMA_NEXT_CODER_INIT, lzma_next_coder_init, LZMA_OK, LZMA_OPTIONS_ERROR, lzma_outq_init(), LZMA_PROG_ERROR, LZMA_STREAM_HEADER_SIZE, LZMA_UNSUPPORTED_CHECK, LZMA_VLI_UNKNOWN, memzero, NULL, options, lzma_filter::options, return_if_error, lzma_stream_coder::sequence, SIZE_MAX, stream_encode_mt(), stream_encoder_mt_end(), lzma_stream_coder::stream_flags, threads_end(), threads_stop(), UINT64_MAX, and lzma_stream_flags::version.

Referenced by LZMA_API().

◆ threads_end()

static void threads_end ( lzma_stream_coder *  coder,
const lzma_allocator *  allocator 
)
static

Stop the threads and free the resources associated with them. Wait until the threads have exited.

Definition at line 450 of file stream_encoder_mt.c.

451 {
452  for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
453  mythread_sync(coder->threads[i].mutex) {
454  coder->threads[i].state = THR_EXIT;
455  mythread_cond_signal(&coder->threads[i].cond);
456  }
457  }
458 
459  for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
460  int ret = mythread_join(coder->threads[i].thread_id);
461  assert(ret == 0);
462  (void)ret;
463  }
464 
465  lzma_free(coder->threads, allocator);
466  return;
467 }
unsigned int uint32_t
Definition: sftypes.h:29

References allocator, assert(), i, lzma_free(), and THR_EXIT.

Referenced by stream_encoder_mt_end(), and stream_encoder_mt_init().

◆ threads_stop()

static void threads_stop ( lzma_stream_coder *  coder,
bool  wait_for_threads 
)
static

Make the threads stop but not exit. Optionally wait for them to stop.

Definition at line 421 of file stream_encoder_mt.c.

422 {
423  // Tell the threads to stop.
424  for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
425  mythread_sync(coder->threads[i].mutex) {
426  coder->threads[i].state = THR_STOP;
427  mythread_cond_signal(&coder->threads[i].cond);
428  }
429  }
430 
431  if (!wait_for_threads)
432  return;
433 
434  // Wait for the threads to settle in the idle state.
435  for (uint32_t i = 0; i < coder->threads_initialized; ++i) {
436  mythread_sync(coder->threads[i].mutex) {
437  while (coder->threads[i].state != THR_IDLE)
438  mythread_cond_wait(&coder->threads[i].cond,
439  &coder->threads[i].mutex);
440  }
441  }
442 
443  return;
444 }

References i, THR_IDLE, and THR_STOP.

Referenced by stream_encode_mt(), and stream_encoder_mt_init().

◆ wait_for_work()

static bool wait_for_work ( lzma_stream_coder *  coder,
mythread_condtime *  wait_abs,
bool *  has_blocked,
bool  has_input 
)
static

Wait until more input can be consumed, more output can be read, or an optional timeout is reached.

Definition at line 621 of file stream_encoder_mt.c.

623 {
624  if (coder->timeout != 0 && !*has_blocked) {
625  // Every time when stream_encode_mt() is called via
626  // lzma_code(), *has_blocked starts as false. We set it
627  // to true here and calculate the absolute time when
628  // we must return if there's nothing to do.
629  //
630  // The idea of *has_blocked is to avoid unneeded calls
631  // to mythread_condtime_set(), which may do a syscall
632  // depending on the operating system.
633  *has_blocked = true;
634  mythread_condtime_set(wait_abs, &coder->cond, coder->timeout);
635  }
636 
637  bool timed_out = false;
638 
639  mythread_sync(coder->mutex) {
640  // There are four things that we wait. If one of them
641  // becomes possible, we return.
642  // - If there is input left, we need to get a free
643  // worker thread and an output buffer for it.
644  // - Data ready to be read from the output queue.
645  // - A worker thread indicates an error.
646  // - Time out occurs.
647  while ((!has_input || coder->threads_free == NULL
648  || !lzma_outq_has_buf(&coder->outq))
649  && !lzma_outq_is_readable(&coder->outq)
650  && coder->thread_error == LZMA_OK
651  && !timed_out) {
652  if (coder->timeout != 0)
653  timed_out = mythread_cond_timedwait(
654  &coder->cond, &coder->mutex,
655  wait_abs) != 0;
656  else
657  mythread_cond_wait(&coder->cond,
658  &coder->mutex);
659  }
660  }
661 
662  return timed_out;
663 }
bool lzma_outq_is_readable(const lzma_outq *outq)
Test if there is data ready to be read.
Definition: outqueue.c:136

References LZMA_OK, lzma_outq_has_buf(), lzma_outq_is_readable(), and NULL.

Referenced by stream_encode_mt().

◆ worker_encode()

static worker_state worker_encode ( worker_thread *  thr,
worker_state  state 
)
static

Definition at line 199 of file stream_encoder_mt.c.

200 {
201  assert(thr->progress_in == 0);
202  assert(thr->progress_out == 0);
203 
204  // Set the Block options.
205  thr->block_options = (lzma_block){
206  .version = 0,
207  .check = thr->coder->stream_flags.check,
208  .compressed_size = thr->coder->outq.buf_size_max,
209  .uncompressed_size = thr->coder->block_size,
210 
211  // TODO: To allow changing the filter chain, the filters
212  // array must be copied to each worker_thread.
213  .filters = thr->coder->filters,
214  };
215 
216  // Calculate maximum size of the Block Header. This amount is
217  // reserved in the beginning of the buffer so that Block Header
218  // along with Compressed Size and Uncompressed Size can be
219  // written there.
220  lzma_ret ret = lzma_block_header_size(&thr->block_options);
221  if (ret != LZMA_OK) {
222  worker_error(thr, ret);
223  return THR_STOP;
224  }
225 
226  // Initialize the Block encoder.
227  ret = lzma_block_encoder_init(&thr->block_encoder,
228  thr->allocator, &thr->block_options);
229  if (ret != LZMA_OK) {
230  worker_error(thr, ret);
231  return THR_STOP;
232  }
233 
234  size_t in_pos = 0;
235  size_t in_size = 0;
236 
237  thr->outbuf->size = thr->block_options.header_size;
238  const size_t out_size = thr->coder->outq.buf_size_max;
239 
240  do {
241  mythread_sync(thr->mutex) {
242  // Store in_pos and out_pos into *thr so that
243  // an application may read them via
244  // lzma_get_progress() to get progress information.
245  //
246  // NOTE: These aren't updated when the encoding
247  // finishes. Instead, the final values are taken
248  // later from thr->outbuf.
249  thr->progress_in = in_pos;
250  thr->progress_out = thr->outbuf->size;
251 
252  while (in_size == thr->in_size
253  && thr->state == THR_RUN)
254  mythread_cond_wait(&thr->cond, &thr->mutex);
255 
256  state = thr->state;
257  in_size = thr->in_size;
258  }
259 
260  // Return if we were asked to stop or exit.
261  if (state >= THR_STOP)
262  return state;
263 
264  lzma_action action = state == THR_FINISH
265  ? LZMA_FINISH : LZMA_RUN;
266 
267  // Limit the amount of input given to the Block encoder
268  // at once. This way this thread can react fairly quickly
269  // if the main thread wants us to stop or exit.
270  static const size_t in_chunk_max = 16384;
271  size_t in_limit = in_size;
272  if (in_size - in_pos > in_chunk_max) {
273  in_limit = in_pos + in_chunk_max;
274  action = LZMA_RUN;
275  }
276 
277  ret = thr->block_encoder.code(
278  thr->block_encoder.coder, thr->allocator,
279  thr->in, &in_pos, in_limit, thr->outbuf->buf,
280  &thr->outbuf->size, out_size, action);
281  } while (ret == LZMA_OK && thr->outbuf->size < out_size);
282 
283  switch (ret) {
284  case LZMA_STREAM_END:
285  assert(state == THR_FINISH);
286 
287  // Encode the Block Header. By doing it after
288  // the compression, we can store the Compressed Size
289  // and Uncompressed Size fields.
290  ret = lzma_block_header_encode(&thr->block_options,
291  thr->outbuf->buf);
292  if (ret != LZMA_OK) {
293  worker_error(thr, ret);
294  return THR_STOP;
295  }
296 
297  break;
298 
299  case LZMA_OK:
300  // The data was incompressible. Encode it using uncompressed
301  // LZMA2 chunks.
302  //
303  // First wait that we have gotten all the input.
304  mythread_sync(thr->mutex) {
305  while (thr->state == THR_RUN)
306  mythread_cond_wait(&thr->cond, &thr->mutex);
307 
308  state = thr->state;
309  in_size = thr->in_size;
310  }
311 
312  if (state >= THR_STOP)
313  return state;
314 
315  // Do the encoding. This takes care of the Block Header too.
316  thr->outbuf->size = 0;
317  ret = lzma_block_uncomp_encode(&thr->block_options,
318  thr->in, in_size, thr->outbuf->buf,
319  &thr->outbuf->size, out_size);
320 
321  // It shouldn't fail.
322  if (ret != LZMA_OK) {
323  worker_error(thr, LZMA_PROG_ERROR);
324  return THR_STOP;
325  }
326 
327  break;
328 
329  default:
330  worker_error(thr, ret);
331  return THR_STOP;
332  }
333 
334  // Set the size information that will be read by the main thread
335  // to write the Index field.
336  thr->outbuf->unpadded_size
337  = lzma_block_unpadded_size(&thr->block_options);
338  assert(thr->outbuf->unpadded_size != 0);
339  thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size;
340 
341  return THR_FINISH;
342 }
lzma_ret lzma_block_encoder_init(lzma_next_coder *next, const lzma_allocator *allocator, lzma_block *block)
static void worker_error(worker_thread *thr, lzma_ret ret)
Tell the main thread that something has gone wrong.
Options for the Block and Block Header encoders and decoders.
Definition: block.h:30
lzma_vli uncompressed_size
Uncompressed Size in bytes.
Definition: block.h:172
uint32_t header_size
Size of the Block Header field.
Definition: block.h:72
uint32_t version
Block format version.
Definition: block.h:52
size_t size
Amount of data written to buf.
Definition: outqueue.h:22
lzma_vli uncompressed_size
Definition: outqueue.h:26
uint8_t * buf
Pointer to the output buffer of lzma_outq.buf_size_max bytes.
Definition: outqueue.h:19
lzma_vli unpadded_size
Additional size information.
Definition: outqueue.h:25
Definition: dis.h:43
lzma_outbuf * outbuf
lzma_block block_options
Compression options for this Block.
lzma_action
The ‘action’ argument for lzma_code()
Definition: base.h:250

References test-lz4-speed::action, worker_thread_s::allocator, assert(), worker_thread_s::block_encoder, worker_thread_s::block_options, lzma_outbuf::buf, lzma_stream_flags::check, lzma_next_coder_s::code, lzma_next_coder_s::coder, worker_thread_s::coder, worker_thread_s::cond, lzma_stream_coder::filters, lzma_block::header_size, worker_thread_s::in, in_pos, in_size, worker_thread_s::in_size, lzma_block_encoder_init(), LZMA_FINISH, LZMA_OK, LZMA_PROG_ERROR, LZMA_RUN, LZMA_STREAM_END, worker_thread_s::mutex, worker_thread_s::outbuf, worker_thread_s::progress_in, worker_thread_s::progress_out, lzma_outbuf::size, worker_thread_s::state, lzma_stream_coder::stream_flags, THR_FINISH, THR_RUN, THR_STOP, lzma_block::uncompressed_size, lzma_outbuf::uncompressed_size, lzma_outbuf::unpadded_size, lzma_block::version, and worker_error().

Referenced by worker_start().

◆ worker_error()

static void worker_error ( worker_thread *  thr,
lzma_ret  ret 
)
static

Tell the main thread that something has gone wrong.

Definition at line 182 of file stream_encoder_mt.c.

183 {
184  assert(ret != LZMA_OK);
185  assert(ret != LZMA_STREAM_END);
186 
187  mythread_sync(thr->coder->mutex) {
188  if (thr->coder->thread_error == LZMA_OK)
189  thr->coder->thread_error = ret;
190 
191  mythread_cond_signal(&thr->coder->cond);
192  }
193 
194  return;
195 }

References assert(), worker_thread_s::coder, LZMA_OK, and LZMA_STREAM_END.

Referenced by worker_encode().

◆ worker_start()

static MYTHREAD_RET_TYPE worker_start ( void *  thr_ptr)
static

Definition at line 346 of file stream_encoder_mt.c.

347 {
348  worker_thread *thr = thr_ptr;
349  worker_state state = THR_IDLE; // Init to silence a warning
350 
351  while (true) {
352  // Wait for work.
353  mythread_sync(thr->mutex) {
354  while (true) {
355  // The thread is already idle so if we are
356  // requested to stop, just set the state.
357  if (thr->state == THR_STOP) {
358  thr->state = THR_IDLE;
359  mythread_cond_signal(&thr->cond);
360  }
361 
362  state = thr->state;
363  if (state != THR_IDLE)
364  break;
365 
366  mythread_cond_wait(&thr->cond, &thr->mutex);
367  }
368  }
369 
370  assert(state != THR_IDLE);
371  assert(state != THR_STOP);
372 
373  if (state <= THR_FINISH)
374  state = worker_encode(thr, state);
375 
376  if (state == THR_EXIT)
377  break;
378 
379  // Mark the thread as idle unless the main thread has
380  // told us to exit. Signal is needed for the case
381  // where the main thread is waiting for the threads to stop.
382  mythread_sync(thr->mutex) {
383  if (thr->state != THR_EXIT) {
384  thr->state = THR_IDLE;
385  mythread_cond_signal(&thr->cond);
386  }
387  }
388 
389  mythread_sync(thr->coder->mutex) {
390  // Mark the output buffer as finished if
391  // no errors occurred.
392  thr->outbuf->finished = state == THR_FINISH;
393 
394  // Update the main progress info.
395  thr->coder->progress_in
396  += thr->outbuf->uncompressed_size;
397  thr->coder->progress_out += thr->outbuf->size;
398  thr->progress_in = 0;
399  thr->progress_out = 0;
400 
401  // Return this thread to the stack of free threads.
402  thr->next = thr->coder->threads_free;
403  thr->coder->threads_free = thr;
404 
405  mythread_cond_signal(&thr->coder->cond);
406  }
407  }
408 
409  // Exiting, free the resources.
410  mythread_mutex_destroy(&thr->mutex);
411  mythread_cond_destroy(&thr->cond);
412 
413  lzma_next_end(&thr->block_encoder, thr->allocator);
414  lzma_free(thr->in, thr->allocator);
415  return MYTHREAD_RET_VALUE;
416 }
static worker_state worker_encode(worker_thread *thr, worker_state state)
bool finished
Definition: outqueue.h:32
worker_thread * next
Next structure in the stack of free worker threads.

References worker_thread_s::allocator, assert(), worker_thread_s::block_encoder, worker_thread_s::coder, worker_thread_s::cond, lzma_outbuf::finished, worker_thread_s::in, lzma_free(), lzma_next_end(), worker_thread_s::mutex, worker_thread_s::next, worker_thread_s::outbuf, worker_thread_s::progress_in, worker_thread_s::progress_out, lzma_outbuf::size, worker_thread_s::state, THR_EXIT, THR_FINISH, THR_IDLE, THR_STOP, lzma_outbuf::uncompressed_size, and worker_encode().

Referenced by initialize_new_thread().