You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

functions.php 12 KiB

4 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457
  1. <?php
  2. namespace GuzzleHttp\Promise;
  3. /**
  4. * Get the global task queue used for promise resolution.
  5. *
  6. * This task queue MUST be run in an event loop in order for promises to be
  7. * settled asynchronously. It will be automatically run when synchronously
  8. * waiting on a promise.
  9. *
  10. * <code>
  11. * while ($eventLoop->isRunning()) {
  12. * GuzzleHttp\Promise\queue()->run();
  13. * }
  14. * </code>
  15. *
  16. * @param TaskQueueInterface $assign Optionally specify a new queue instance.
  17. *
  18. * @return TaskQueueInterface
  19. */
  20. function queue(TaskQueueInterface $assign = null)
  21. {
  22. static $queue;
  23. if ($assign) {
  24. $queue = $assign;
  25. } elseif (!$queue) {
  26. $queue = new TaskQueue();
  27. }
  28. return $queue;
  29. }
  30. /**
  31. * Adds a function to run in the task queue when it is next `run()` and returns
  32. * a promise that is fulfilled or rejected with the result.
  33. *
  34. * @param callable $task Task function to run.
  35. *
  36. * @return PromiseInterface
  37. */
  38. function task(callable $task)
  39. {
  40. $queue = queue();
  41. $promise = new Promise([$queue, 'run']);
  42. $queue->add(function () use ($task, $promise) {
  43. try {
  44. $promise->resolve($task());
  45. } catch (\Throwable $e) {
  46. $promise->reject($e);
  47. } catch (\Exception $e) {
  48. $promise->reject($e);
  49. }
  50. });
  51. return $promise;
  52. }
  53. /**
  54. * Creates a promise for a value if the value is not a promise.
  55. *
  56. * @param mixed $value Promise or value.
  57. *
  58. * @return PromiseInterface
  59. */
  60. function promise_for($value)
  61. {
  62. if ($value instanceof PromiseInterface) {
  63. return $value;
  64. }
  65. // Return a Guzzle promise that shadows the given promise.
  66. if (method_exists($value, 'then')) {
  67. $wfn = method_exists($value, 'wait') ? [$value, 'wait'] : null;
  68. $cfn = method_exists($value, 'cancel') ? [$value, 'cancel'] : null;
  69. $promise = new Promise($wfn, $cfn);
  70. $value->then([$promise, 'resolve'], [$promise, 'reject']);
  71. return $promise;
  72. }
  73. return new FulfilledPromise($value);
  74. }
  75. /**
  76. * Creates a rejected promise for a reason if the reason is not a promise. If
  77. * the provided reason is a promise, then it is returned as-is.
  78. *
  79. * @param mixed $reason Promise or reason.
  80. *
  81. * @return PromiseInterface
  82. */
  83. function rejection_for($reason)
  84. {
  85. if ($reason instanceof PromiseInterface) {
  86. return $reason;
  87. }
  88. return new RejectedPromise($reason);
  89. }
  90. /**
  91. * Create an exception for a rejected promise value.
  92. *
  93. * @param mixed $reason
  94. *
  95. * @return \Exception|\Throwable
  96. */
  97. function exception_for($reason)
  98. {
  99. return $reason instanceof \Exception || $reason instanceof \Throwable
  100. ? $reason
  101. : new RejectionException($reason);
  102. }
  103. /**
  104. * Returns an iterator for the given value.
  105. *
  106. * @param mixed $value
  107. *
  108. * @return \Iterator
  109. */
  110. function iter_for($value)
  111. {
  112. if ($value instanceof \Iterator) {
  113. return $value;
  114. } elseif (is_array($value)) {
  115. return new \ArrayIterator($value);
  116. } else {
  117. return new \ArrayIterator([$value]);
  118. }
  119. }
  120. /**
  121. * Synchronously waits on a promise to resolve and returns an inspection state
  122. * array.
  123. *
  124. * Returns a state associative array containing a "state" key mapping to a
  125. * valid promise state. If the state of the promise is "fulfilled", the array
  126. * will contain a "value" key mapping to the fulfilled value of the promise. If
  127. * the promise is rejected, the array will contain a "reason" key mapping to
  128. * the rejection reason of the promise.
  129. *
  130. * @param PromiseInterface $promise Promise or value.
  131. *
  132. * @return array
  133. */
  134. function inspect(PromiseInterface $promise)
  135. {
  136. try {
  137. return [
  138. 'state' => PromiseInterface::FULFILLED,
  139. 'value' => $promise->wait()
  140. ];
  141. } catch (RejectionException $e) {
  142. return ['state' => PromiseInterface::REJECTED, 'reason' => $e->getReason()];
  143. } catch (\Throwable $e) {
  144. return ['state' => PromiseInterface::REJECTED, 'reason' => $e];
  145. } catch (\Exception $e) {
  146. return ['state' => PromiseInterface::REJECTED, 'reason' => $e];
  147. }
  148. }
  149. /**
  150. * Waits on all of the provided promises, but does not unwrap rejected promises
  151. * as thrown exception.
  152. *
  153. * Returns an array of inspection state arrays.
  154. *
  155. * @param PromiseInterface[] $promises Traversable of promises to wait upon.
  156. *
  157. * @return array
  158. * @see GuzzleHttp\Promise\inspect for the inspection state array format.
  159. */
  160. function inspect_all($promises)
  161. {
  162. $results = [];
  163. foreach ($promises as $key => $promise) {
  164. $results[$key] = inspect($promise);
  165. }
  166. return $results;
  167. }
  168. /**
  169. * Waits on all of the provided promises and returns the fulfilled values.
  170. *
  171. * Returns an array that contains the value of each promise (in the same order
  172. * the promises were provided). An exception is thrown if any of the promises
  173. * are rejected.
  174. *
  175. * @param mixed $promises Iterable of PromiseInterface objects to wait on.
  176. *
  177. * @return array
  178. * @throws \Exception on error
  179. * @throws \Throwable on error in PHP >=7
  180. */
  181. function unwrap($promises)
  182. {
  183. $results = [];
  184. foreach ($promises as $key => $promise) {
  185. $results[$key] = $promise->wait();
  186. }
  187. return $results;
  188. }
  189. /**
  190. * Given an array of promises, return a promise that is fulfilled when all the
  191. * items in the array are fulfilled.
  192. *
  193. * The promise's fulfillment value is an array with fulfillment values at
  194. * respective positions to the original array. If any promise in the array
  195. * rejects, the returned promise is rejected with the rejection reason.
  196. *
  197. * @param mixed $promises Promises or values.
  198. *
  199. * @return PromiseInterface
  200. */
  201. function all($promises)
  202. {
  203. $results = [];
  204. return each(
  205. $promises,
  206. function ($value, $idx) use (&$results) {
  207. $results[$idx] = $value;
  208. },
  209. function ($reason, $idx, Promise $aggregate) {
  210. $aggregate->reject($reason);
  211. }
  212. )->then(function () use (&$results) {
  213. ksort($results);
  214. return $results;
  215. });
  216. }
  217. /**
  218. * Initiate a competitive race between multiple promises or values (values will
  219. * become immediately fulfilled promises).
  220. *
  221. * When count amount of promises have been fulfilled, the returned promise is
  222. * fulfilled with an array that contains the fulfillment values of the winners
  223. * in order of resolution.
  224. *
  225. * This prommise is rejected with a {@see GuzzleHttp\Promise\AggregateException}
  226. * if the number of fulfilled promises is less than the desired $count.
  227. *
  228. * @param int $count Total number of promises.
  229. * @param mixed $promises Promises or values.
  230. *
  231. * @return PromiseInterface
  232. */
  233. function some($count, $promises)
  234. {
  235. $results = [];
  236. $rejections = [];
  237. return each(
  238. $promises,
  239. function ($value, $idx, PromiseInterface $p) use (&$results, $count) {
  240. if ($p->getState() !== PromiseInterface::PENDING) {
  241. return;
  242. }
  243. $results[$idx] = $value;
  244. if (count($results) >= $count) {
  245. $p->resolve(null);
  246. }
  247. },
  248. function ($reason) use (&$rejections) {
  249. $rejections[] = $reason;
  250. }
  251. )->then(
  252. function () use (&$results, &$rejections, $count) {
  253. if (count($results) !== $count) {
  254. throw new AggregateException(
  255. 'Not enough promises to fulfill count',
  256. $rejections
  257. );
  258. }
  259. ksort($results);
  260. return array_values($results);
  261. }
  262. );
  263. }
  264. /**
  265. * Like some(), with 1 as count. However, if the promise fulfills, the
  266. * fulfillment value is not an array of 1 but the value directly.
  267. *
  268. * @param mixed $promises Promises or values.
  269. *
  270. * @return PromiseInterface
  271. */
  272. function any($promises)
  273. {
  274. return some(1, $promises)->then(function ($values) { return $values[0]; });
  275. }
  276. /**
  277. * Returns a promise that is fulfilled when all of the provided promises have
  278. * been fulfilled or rejected.
  279. *
  280. * The returned promise is fulfilled with an array of inspection state arrays.
  281. *
  282. * @param mixed $promises Promises or values.
  283. *
  284. * @return PromiseInterface
  285. * @see GuzzleHttp\Promise\inspect for the inspection state array format.
  286. */
  287. function settle($promises)
  288. {
  289. $results = [];
  290. return each(
  291. $promises,
  292. function ($value, $idx) use (&$results) {
  293. $results[$idx] = ['state' => PromiseInterface::FULFILLED, 'value' => $value];
  294. },
  295. function ($reason, $idx) use (&$results) {
  296. $results[$idx] = ['state' => PromiseInterface::REJECTED, 'reason' => $reason];
  297. }
  298. )->then(function () use (&$results) {
  299. ksort($results);
  300. return $results;
  301. });
  302. }
  303. /**
  304. * Given an iterator that yields promises or values, returns a promise that is
  305. * fulfilled with a null value when the iterator has been consumed or the
  306. * aggregate promise has been fulfilled or rejected.
  307. *
  308. * $onFulfilled is a function that accepts the fulfilled value, iterator
  309. * index, and the aggregate promise. The callback can invoke any necessary side
  310. * effects and choose to resolve or reject the aggregate promise if needed.
  311. *
  312. * $onRejected is a function that accepts the rejection reason, iterator
  313. * index, and the aggregate promise. The callback can invoke any necessary side
  314. * effects and choose to resolve or reject the aggregate promise if needed.
  315. *
  316. * @param mixed $iterable Iterator or array to iterate over.
  317. * @param callable $onFulfilled
  318. * @param callable $onRejected
  319. *
  320. * @return PromiseInterface
  321. */
  322. function each(
  323. $iterable,
  324. callable $onFulfilled = null,
  325. callable $onRejected = null
  326. ) {
  327. return (new EachPromise($iterable, [
  328. 'fulfilled' => $onFulfilled,
  329. 'rejected' => $onRejected
  330. ]))->promise();
  331. }
  332. /**
  333. * Like each, but only allows a certain number of outstanding promises at any
  334. * given time.
  335. *
  336. * $concurrency may be an integer or a function that accepts the number of
  337. * pending promises and returns a numeric concurrency limit value to allow for
  338. * dynamic a concurrency size.
  339. *
  340. * @param mixed $iterable
  341. * @param int|callable $concurrency
  342. * @param callable $onFulfilled
  343. * @param callable $onRejected
  344. *
  345. * @return PromiseInterface
  346. */
  347. function each_limit(
  348. $iterable,
  349. $concurrency,
  350. callable $onFulfilled = null,
  351. callable $onRejected = null
  352. ) {
  353. return (new EachPromise($iterable, [
  354. 'fulfilled' => $onFulfilled,
  355. 'rejected' => $onRejected,
  356. 'concurrency' => $concurrency
  357. ]))->promise();
  358. }
  359. /**
  360. * Like each_limit, but ensures that no promise in the given $iterable argument
  361. * is rejected. If any promise is rejected, then the aggregate promise is
  362. * rejected with the encountered rejection.
  363. *
  364. * @param mixed $iterable
  365. * @param int|callable $concurrency
  366. * @param callable $onFulfilled
  367. *
  368. * @return PromiseInterface
  369. */
  370. function each_limit_all(
  371. $iterable,
  372. $concurrency,
  373. callable $onFulfilled = null
  374. ) {
  375. return each_limit(
  376. $iterable,
  377. $concurrency,
  378. $onFulfilled,
  379. function ($reason, $idx, PromiseInterface $aggregate) {
  380. $aggregate->reject($reason);
  381. }
  382. );
  383. }
  384. /**
  385. * Returns true if a promise is fulfilled.
  386. *
  387. * @param PromiseInterface $promise
  388. *
  389. * @return bool
  390. */
  391. function is_fulfilled(PromiseInterface $promise)
  392. {
  393. return $promise->getState() === PromiseInterface::FULFILLED;
  394. }
  395. /**
  396. * Returns true if a promise is rejected.
  397. *
  398. * @param PromiseInterface $promise
  399. *
  400. * @return bool
  401. */
  402. function is_rejected(PromiseInterface $promise)
  403. {
  404. return $promise->getState() === PromiseInterface::REJECTED;
  405. }
  406. /**
  407. * Returns true if a promise is fulfilled or rejected.
  408. *
  409. * @param PromiseInterface $promise
  410. *
  411. * @return bool
  412. */
  413. function is_settled(PromiseInterface $promise)
  414. {
  415. return $promise->getState() !== PromiseInterface::PENDING;
  416. }
  417. /**
  418. * @see Coroutine
  419. *
  420. * @param callable $generatorFn
  421. *
  422. * @return PromiseInterface
  423. */
  424. function coroutine(callable $generatorFn)
  425. {
  426. return new Coroutine($generatorFn);
  427. }