酒店预订平台
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

255 rivejä
7.8 KiB

  1. <?php
  2. namespace GuzzleHttp\Promise;
  3. /**
  4. * Represents a promise that iterates over many promises and invokes
  5. * side-effect functions in the process.
  6. */
  7. class EachPromise implements PromisorInterface
  8. {
  9. private $pending = [];
  10. private $nextPendingIndex = 0;
  11. /** @var \Iterator|null */
  12. private $iterable;
  13. /** @var callable|int|null */
  14. private $concurrency;
  15. /** @var callable|null */
  16. private $onFulfilled;
  17. /** @var callable|null */
  18. private $onRejected;
  19. /** @var Promise|null */
  20. private $aggregate;
  21. /** @var bool|null */
  22. private $mutex;
  23. /**
  24. * Configuration hash can include the following key value pairs:
  25. *
  26. * - fulfilled: (callable) Invoked when a promise fulfills. The function
  27. * is invoked with three arguments: the fulfillment value, the index
  28. * position from the iterable list of the promise, and the aggregate
  29. * promise that manages all of the promises. The aggregate promise may
  30. * be resolved from within the callback to short-circuit the promise.
  31. * - rejected: (callable) Invoked when a promise is rejected. The
  32. * function is invoked with three arguments: the rejection reason, the
  33. * index position from the iterable list of the promise, and the
  34. * aggregate promise that manages all of the promises. The aggregate
  35. * promise may be resolved from within the callback to short-circuit
  36. * the promise.
  37. * - concurrency: (integer) Pass this configuration option to limit the
  38. * allowed number of outstanding concurrently executing promises,
  39. * creating a capped pool of promises. There is no limit by default.
  40. *
  41. * @param mixed $iterable Promises or values to iterate.
  42. * @param array $config Configuration options
  43. */
  44. public function __construct($iterable, array $config = [])
  45. {
  46. $this->iterable = Create::iterFor($iterable);
  47. if (isset($config['concurrency'])) {
  48. $this->concurrency = $config['concurrency'];
  49. }
  50. if (isset($config['fulfilled'])) {
  51. $this->onFulfilled = $config['fulfilled'];
  52. }
  53. if (isset($config['rejected'])) {
  54. $this->onRejected = $config['rejected'];
  55. }
  56. }
  57. /** @psalm-suppress InvalidNullableReturnType */
  58. public function promise()
  59. {
  60. if ($this->aggregate) {
  61. return $this->aggregate;
  62. }
  63. try {
  64. $this->createPromise();
  65. /** @psalm-assert Promise $this->aggregate */
  66. $this->iterable->rewind();
  67. if (!$this->checkIfFinished()) {
  68. $this->refillPending();
  69. }
  70. } catch (\Throwable $e) {
  71. /**
  72. * @psalm-suppress NullReference
  73. * @phpstan-ignore-next-line
  74. */
  75. $this->aggregate->reject($e);
  76. } catch (\Exception $e) {
  77. /**
  78. * @psalm-suppress NullReference
  79. * @phpstan-ignore-next-line
  80. */
  81. $this->aggregate->reject($e);
  82. }
  83. /**
  84. * @psalm-suppress NullableReturnStatement
  85. * @phpstan-ignore-next-line
  86. */
  87. return $this->aggregate;
  88. }
  89. private function createPromise()
  90. {
  91. $this->mutex = false;
  92. $this->aggregate = new Promise(function () {
  93. reset($this->pending);
  94. // Consume a potentially fluctuating list of promises while
  95. // ensuring that indexes are maintained (precluding array_shift).
  96. while ($promise = current($this->pending)) {
  97. next($this->pending);
  98. $promise->wait();
  99. if (Is::settled($this->aggregate)) {
  100. return;
  101. }
  102. }
  103. });
  104. // Clear the references when the promise is resolved.
  105. $clearFn = function () {
  106. $this->iterable = $this->concurrency = $this->pending = null;
  107. $this->onFulfilled = $this->onRejected = null;
  108. $this->nextPendingIndex = 0;
  109. };
  110. $this->aggregate->then($clearFn, $clearFn);
  111. }
  112. private function refillPending()
  113. {
  114. if (!$this->concurrency) {
  115. // Add all pending promises.
  116. while ($this->addPending() && $this->advanceIterator());
  117. return;
  118. }
  119. // Add only up to N pending promises.
  120. $concurrency = is_callable($this->concurrency)
  121. ? call_user_func($this->concurrency, count($this->pending))
  122. : $this->concurrency;
  123. $concurrency = max($concurrency - count($this->pending), 0);
  124. // Concurrency may be set to 0 to disallow new promises.
  125. if (!$concurrency) {
  126. return;
  127. }
  128. // Add the first pending promise.
  129. $this->addPending();
  130. // Note this is special handling for concurrency=1 so that we do
  131. // not advance the iterator after adding the first promise. This
  132. // helps work around issues with generators that might not have the
  133. // next value to yield until promise callbacks are called.
  134. while (--$concurrency
  135. && $this->advanceIterator()
  136. && $this->addPending());
  137. }
  138. private function addPending()
  139. {
  140. if (!$this->iterable || !$this->iterable->valid()) {
  141. return false;
  142. }
  143. $promise = Create::promiseFor($this->iterable->current());
  144. $key = $this->iterable->key();
  145. // Iterable keys may not be unique, so we use a counter to
  146. // guarantee uniqueness
  147. $idx = $this->nextPendingIndex++;
  148. $this->pending[$idx] = $promise->then(
  149. function ($value) use ($idx, $key) {
  150. if ($this->onFulfilled) {
  151. call_user_func(
  152. $this->onFulfilled,
  153. $value,
  154. $key,
  155. $this->aggregate
  156. );
  157. }
  158. $this->step($idx);
  159. },
  160. function ($reason) use ($idx, $key) {
  161. if ($this->onRejected) {
  162. call_user_func(
  163. $this->onRejected,
  164. $reason,
  165. $key,
  166. $this->aggregate
  167. );
  168. }
  169. $this->step($idx);
  170. }
  171. );
  172. return true;
  173. }
  174. private function advanceIterator()
  175. {
  176. // Place a lock on the iterator so that we ensure to not recurse,
  177. // preventing fatal generator errors.
  178. if ($this->mutex) {
  179. return false;
  180. }
  181. $this->mutex = true;
  182. try {
  183. $this->iterable->next();
  184. $this->mutex = false;
  185. return true;
  186. } catch (\Throwable $e) {
  187. $this->aggregate->reject($e);
  188. $this->mutex = false;
  189. return false;
  190. } catch (\Exception $e) {
  191. $this->aggregate->reject($e);
  192. $this->mutex = false;
  193. return false;
  194. }
  195. }
  196. private function step($idx)
  197. {
  198. // If the promise was already resolved, then ignore this step.
  199. if (Is::settled($this->aggregate)) {
  200. return;
  201. }
  202. unset($this->pending[$idx]);
  203. // Only refill pending promises if we are not locked, preventing the
  204. // EachPromise to recursively invoke the provided iterator, which
  205. // cause a fatal error: "Cannot resume an already running generator"
  206. if ($this->advanceIterator() && !$this->checkIfFinished()) {
  207. // Add more pending promises if possible.
  208. $this->refillPending();
  209. }
  210. }
  211. private function checkIfFinished()
  212. {
  213. if (!$this->pending && !$this->iterable->valid()) {
  214. // Resolve the promise if there's nothing left to do.
  215. $this->aggregate->resolve(null);
  216. return true;
  217. }
  218. return false;
  219. }
  220. }