summaryrefslogtreecommitdiffstats
path: root/vendor/guzzlehttp/guzzle/src/Pool.php
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/guzzlehttp/guzzle/src/Pool.php')
-rw-r--r--vendor/guzzlehttp/guzzle/src/Pool.php134
1 files changed, 134 insertions, 0 deletions
diff --git a/vendor/guzzlehttp/guzzle/src/Pool.php b/vendor/guzzlehttp/guzzle/src/Pool.php
new file mode 100644
index 0000000..5838db4
--- /dev/null
+++ b/vendor/guzzlehttp/guzzle/src/Pool.php
@@ -0,0 +1,134 @@
+<?php
+namespace GuzzleHttp;
+
+use GuzzleHttp\Promise\EachPromise;
+use GuzzleHttp\Promise\PromiseInterface;
+use GuzzleHttp\Promise\PromisorInterface;
+use Psr\Http\Message\RequestInterface;
+
+/**
+ * Sends an iterator of requests concurrently using a capped pool size.
+ *
+ * The pool will read from an iterator until it is cancelled or until the
+ * iterator is consumed. When a request is yielded, the request is sent after
+ * applying the "request_options" request options (if provided in the ctor).
+ *
+ * When a function is yielded by the iterator, the function is provided the
+ * "request_options" array that should be merged on top of any existing
+ * options, and the function MUST then return a wait-able promise.
+ */
+class Pool implements PromisorInterface
+{
+ /** @var EachPromise */
+ private $each;
+
+ /**
+ * @param ClientInterface $client Client used to send the requests.
+ * @param array|\Iterator $requests Requests or functions that return
+ * requests to send concurrently.
+ * @param array $config Associative array of options
+ * - concurrency: (int) Maximum number of requests to send concurrently
+ * - options: Array of request options to apply to each request.
+ * - fulfilled: (callable) Function to invoke when a request completes.
+ * - rejected: (callable) Function to invoke when a request is rejected.
+ */
+ public function __construct(
+ ClientInterface $client,
+ $requests,
+ array $config = []
+ ) {
+ // Backwards compatibility.
+ if (isset($config['pool_size'])) {
+ $config['concurrency'] = $config['pool_size'];
+ } elseif (!isset($config['concurrency'])) {
+ $config['concurrency'] = 25;
+ }
+
+ if (isset($config['options'])) {
+ $opts = $config['options'];
+ unset($config['options']);
+ } else {
+ $opts = [];
+ }
+
+ $iterable = \GuzzleHttp\Promise\iter_for($requests);
+ $requests = function () use ($iterable, $client, $opts) {
+ foreach ($iterable as $key => $rfn) {
+ if ($rfn instanceof RequestInterface) {
+ yield $key => $client->sendAsync($rfn, $opts);
+ } elseif (is_callable($rfn)) {
+ yield $key => $rfn($opts);
+ } else {
+ throw new \InvalidArgumentException('Each value yielded by '
+ . 'the iterator must be a Psr7\Http\Message\RequestInterface '
+ . 'or a callable that returns a promise that fulfills '
+ . 'with a Psr7\Message\Http\ResponseInterface object.');
+ }
+ }
+ };
+
+ $this->each = new EachPromise($requests(), $config);
+ }
+
+ /**
+ * Get promise
+ *
+ * @return PromiseInterface
+ */
+ public function promise()
+ {
+ return $this->each->promise();
+ }
+
+ /**
+ * Sends multiple requests concurrently and returns an array of responses
+ * and exceptions that uses the same ordering as the provided requests.
+ *
+ * IMPORTANT: This method keeps every request and response in memory, and
+ * as such, is NOT recommended when sending a large number or an
+ * indeterminate number of requests concurrently.
+ *
+ * @param ClientInterface $client Client used to send the requests
+ * @param array|\Iterator $requests Requests to send concurrently.
+ * @param array $options Passes through the options available in
+ * {@see GuzzleHttp\Pool::__construct}
+ *
+ * @return array Returns an array containing the response or an exception
+ * in the same order that the requests were sent.
+ * @throws \InvalidArgumentException if the event format is incorrect.
+ */
+ public static function batch(
+ ClientInterface $client,
+ $requests,
+ array $options = []
+ ) {
+ $res = [];
+ self::cmpCallback($options, 'fulfilled', $res);
+ self::cmpCallback($options, 'rejected', $res);
+ $pool = new static($client, $requests, $options);
+ $pool->promise()->wait();
+ ksort($res);
+
+ return $res;
+ }
+
+ /**
+ * Execute callback(s)
+ *
+ * @return void
+ */
+ private static function cmpCallback(array &$options, $name, array &$results)
+ {
+ if (!isset($options[$name])) {
+ $options[$name] = function ($v, $k) use (&$results) {
+ $results[$k] = $v;
+ };
+ } else {
+ $currentFn = $options[$name];
+ $options[$name] = function ($v, $k) use (&$results, $currentFn) {
+ $currentFn($v, $k);
+ $results[$k] = $v;
+ };
+ }
+ }
+}