diff --git a/configure.ac b/configure.ac index c5f31b76e82..7bf4b6f9e45 100644 --- a/configure.ac +++ b/configure.ac @@ -1973,6 +1973,7 @@ AS_IF([test "x$enable_cppapi" = xyes], [ lib/atscppapi/examples/stat_example/Makefile lib/atscppapi/examples/timeout_example/Makefile lib/atscppapi/examples/transactionhook/Makefile + lib/atscppapi/examples/async_http_fetch_streaming/Makefile lib/atscppapi/src/Makefile ])]) diff --git a/lib/atscppapi/examples/Makefile.am b/lib/atscppapi/examples/Makefile.am index 08cfd4866ac..9f56838cf5d 100644 --- a/lib/atscppapi/examples/Makefile.am +++ b/lib/atscppapi/examples/Makefile.am @@ -35,4 +35,5 @@ SUBDIRS = \ timeout_example \ internal_transaction_handling \ async_timer \ - intercept + intercept \ + async_http_fetch_streaming diff --git a/lib/atscppapi/examples/async_http_fetch/AsyncHttpFetch.cc b/lib/atscppapi/examples/async_http_fetch/AsyncHttpFetch.cc index 14ff9ccdca2..fe98748576c 100644 --- a/lib/atscppapi/examples/async_http_fetch/AsyncHttpFetch.cc +++ b/lib/atscppapi/examples/async_http_fetch/AsyncHttpFetch.cc @@ -149,7 +149,8 @@ class TransactionHookPlugin : public TransactionPlugin, public AsyncReceiver(body_size), static_cast(body)); + TS_DEBUG(TAG, "Response body is %zu bytes long and is [%.*s]", body_size, static_cast(body_size), + static_cast(body)); } else { TS_ERROR(TAG, "Fetch did not complete successfully; Result %d", static_cast(async_http_fetch.getResult())); diff --git a/lib/atscppapi/examples/async_http_fetch_streaming/AsyncHttpFetchStreaming.cc b/lib/atscppapi/examples/async_http_fetch_streaming/AsyncHttpFetchStreaming.cc new file mode 100644 index 00000000000..f570f6fd8c7 --- /dev/null +++ b/lib/atscppapi/examples/async_http_fetch_streaming/AsyncHttpFetchStreaming.cc @@ -0,0 +1,155 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace atscppapi; +using std::string; + +// This is for the -T tag debugging +// To view the debug messages ./traffic_server -T "async_http_fetch_example.*" +#define TAG "async_http_fetch_example" + +class Intercept : public InterceptPlugin, public AsyncReceiver { +public: + Intercept(Transaction &transaction) : InterceptPlugin(transaction, InterceptPlugin::SERVER_INTERCEPT), + transaction_(transaction), num_fetches_(0) { + main_url_ = transaction.getClientRequest().getUrl().getUrlString(); + } + void consume(const string &data, InterceptPlugin::RequestDataType type); + void handleInputComplete(); + void handleAsyncComplete(AsyncHttpFetch &async_http_fetch); + ~Intercept(); +private: + Transaction &transaction_; + string request_body_; + string main_url_; + string dependent_url_; + int num_fetches_; +}; + +class InterceptInstaller : public GlobalPlugin { +public: + InterceptInstaller() : GlobalPlugin(true /* ignore internal transactions */) { + GlobalPlugin::registerHook(Plugin::HOOK_READ_REQUEST_HEADERS_PRE_REMAP); + } + void handleReadRequestHeadersPreRemap(Transaction &transaction) { + transaction.addPlugin(new Intercept(transaction)); + TS_DEBUG(TAG, "Added intercept"); + transaction.resume(); + } +}; + +void TSPluginInit(int /* argc ATS_UNUSED */, const char * /* argv ATS_UNUSED */ []) { + new InterceptInstaller(); +} + +void Intercept::consume(const string &data, InterceptPlugin::RequestDataType type) { + if (type == InterceptPlugin::REQUEST_BODY) { + request_body_ += data; + } +} + +void Intercept::handleInputComplete() { + TS_DEBUG(TAG, "Request data complete"); + AsyncHttpFetch *async_http_fetch = request_body_.empty() ? + new AsyncHttpFetch(main_url_, AsyncHttpFetch::STREAMING_ENABLED, transaction_.getClientRequest().getMethod()) : + new AsyncHttpFetch(main_url_, AsyncHttpFetch::STREAMING_ENABLED, request_body_); + Async::execute(this, async_http_fetch, getMutex()); + ++num_fetches_; + size_t dependent_url_param_pos = main_url_.find("dependent_url="); + if (dependent_url_param_pos != string::npos) { + dependent_url_ = main_url_.substr(dependent_url_param_pos + 14); + Async::execute(this, new AsyncHttpFetch(dependent_url_, + AsyncHttpFetch::STREAMING_ENABLED), + getMutex()); + ++num_fetches_; + TS_DEBUG(TAG, "Started fetch for dependent URL [%s]", dependent_url_.c_str()); + } +} + +void Intercept::handleAsyncComplete(AsyncHttpFetch &async_http_fetch) { + AsyncHttpFetch::Result result = async_http_fetch.getResult(); + string url = async_http_fetch.getRequestUrl().getUrlString(); + if (result == AsyncHttpFetch::RESULT_HEADER_COMPLETE) { + TS_DEBUG(TAG, "Header completed for URL [%s]", url.c_str()); + const Response &response = async_http_fetch.getResponse(); + std::ostringstream oss; + oss << HTTP_VERSION_STRINGS[response.getVersion()] << ' ' << response.getStatusCode() << ' ' + << response.getReasonPhrase() << "\r\n"; + Headers &response_headers = response.getHeaders(); + for (Headers::iterator iter = response_headers.begin(), end = response_headers.end(); iter != end; ++iter) { + HeaderFieldName header_name = (*iter).name(); + if (header_name != "Transfer-Encoding") { + oss << header_name.str() << ": " << (*iter).values() << "\r\n"; + } + } + oss << "\r\n"; + if (url == main_url_) { + Intercept::produce(oss.str()); + } + else { + TS_DEBUG(TAG, "Response header for dependent URL\n%s", oss.str().c_str()); + } + } + else if (result == AsyncHttpFetch::RESULT_PARTIAL_BODY || result == AsyncHttpFetch::RESULT_BODY_COMPLETE) { + const void *body; + size_t body_size; + async_http_fetch.getResponseBody(body, body_size); + if (url == main_url_) { + Intercept::produce(string(static_cast(body), body_size)); + } + else { + TS_DEBUG(TAG, "Got dependent body bit; has %zu bytes and is [%.*s]", body_size, static_cast(body_size), + static_cast(body)); + } + if (result == AsyncHttpFetch::RESULT_BODY_COMPLETE) { + TS_DEBUG(TAG, "response body complete"); + } + } + else { + TS_ERROR(TAG, "Fetch did not complete successfully; Result %d", static_cast(result)); + if (url == main_url_) { + InterceptPlugin::produce("HTTP/1.1 500 Internal Server Error\r\n\r\n"); + } + } + if (result == AsyncHttpFetch::RESULT_TIMEOUT || result == AsyncHttpFetch::RESULT_FAILURE || + result == AsyncHttpFetch::RESULT_BODY_COMPLETE) { + if (--num_fetches_ == 0) { + TS_DEBUG(TAG, "Marking output as complete"); + InterceptPlugin::setOutputComplete(); + } + } +} + +Intercept::~Intercept() { + if (num_fetches_) { + TS_DEBUG(TAG, "Fetch still pending, but transaction closing"); + } + TS_DEBUG(TAG, "Shutting down"); +} diff --git a/lib/atscppapi/examples/async_http_fetch_streaming/Makefile.am b/lib/atscppapi/examples/async_http_fetch_streaming/Makefile.am new file mode 100644 index 00000000000..ecc204a8c5c --- /dev/null +++ b/lib/atscppapi/examples/async_http_fetch_streaming/Makefile.am @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +AM_CPPFLAGS = -I$(top_srcdir)/lib/atscppapi/src/include -Wno-unused-variable + +target=AsyncHttpFetchStreaming.so +pkglibdir = ${pkglibexecdir} +pkglib_LTLIBRARIES = AsyncHttpFetchStreaming.la +AsyncHttpFetchStreaming_la_SOURCES = AsyncHttpFetchStreaming.cc +AsyncHttpFetchStreaming_la_LDFLAGS = -module -avoid-version -shared -L$(top_srcdir)/lib/atscppapi/src/ -latscppapi + +all: + ln -sf .libs/$(target) + +clean-local: + rm -f $(target) diff --git a/lib/atscppapi/src/AsyncHttpFetch.cc b/lib/atscppapi/src/AsyncHttpFetch.cc index 042e457167b..af920b0a594 100644 --- a/lib/atscppapi/src/AsyncHttpFetch.cc +++ b/lib/atscppapi/src/AsyncHttpFetch.cc @@ -21,12 +21,15 @@ */ #include "atscppapi/AsyncHttpFetch.h" +#include "atscppapi/shared_ptr.h" #include +#include #include #include "logging_internal.h" #include "utils_internal.h" #include +#include using namespace atscppapi; using std::string; @@ -35,7 +38,7 @@ using std::string; * @private */ struct atscppapi::AsyncHttpFetchState : noncopyable { - Request request_; + shared_ptr request_; Response response_; string request_body_; AsyncHttpFetch::Result result_; @@ -44,11 +47,22 @@ struct atscppapi::AsyncHttpFetchState : noncopyable { TSMBuffer hdr_buf_; TSMLoc hdr_loc_; shared_ptr dispatch_controller_; + AsyncHttpFetch::StreamingFlag streaming_flag_; + TSFetchSM fetch_sm_; + static const size_t BODY_BUFFER_SIZE = 32 * 1024; + char body_buffer_[BODY_BUFFER_SIZE]; + + AsyncHttpFetchState(const string &url_str, HttpMethod http_method, string request_body, + AsyncHttpFetch::StreamingFlag streaming_flag) + : request_body_(request_body), result_(AsyncHttpFetch::RESULT_FAILURE), body_(NULL), body_size_(0), + hdr_buf_(NULL), hdr_loc_(NULL), streaming_flag_(streaming_flag), fetch_sm_(NULL) { + request_.reset(new Request(url_str, http_method, (streaming_flag_ == AsyncHttpFetch::STREAMING_DISABLED) ? + HTTP_VERSION_1_0 : HTTP_VERSION_1_1)); + if (streaming_flag_ == AsyncHttpFetch::STREAMING_ENABLED) { + body_ = body_buffer_; + } + } - AsyncHttpFetchState(const string &url_str, HttpMethod http_method, string request_body) - : request_(url_str, http_method, HTTP_VERSION_1_0), request_body_(request_body), - result_(AsyncHttpFetch::RESULT_FAILURE), body_(NULL), body_size_(0), hdr_buf_(NULL), hdr_loc_(NULL) { } - ~AsyncHttpFetchState() { if (hdr_loc_) { TSMLoc null_parent_loc = NULL; @@ -57,6 +71,9 @@ struct atscppapi::AsyncHttpFetchState : noncopyable { if (hdr_buf_) { TSMBufferDestroy(hdr_buf_); } + if (fetch_sm_) { + TSFetchDestroy(fetch_sm_); + } } }; @@ -66,63 +83,91 @@ const unsigned int LOCAL_IP_ADDRESS = 0x0100007f; const int LOCAL_PORT = 8080; static int handleFetchEvents(TSCont cont, TSEvent event, void *edata) { - LOG_DEBUG("Fetch result returned event = %d, edata = %p", event, edata); + LOG_DEBUG("Received fetch event = %d, edata = %p", event, edata); AsyncHttpFetch *fetch_provider = static_cast(TSContDataGet(cont)); AsyncHttpFetchState *state = utils::internal::getAsyncHttpFetchState(*fetch_provider); - - if (event == static_cast(AsyncHttpFetch::RESULT_SUCCESS)) { - TSHttpTxn txn = static_cast(edata); - int data_len; - const char *data_start = TSFetchRespGet(txn, &data_len); - - if (data_start && (data_len > 0)) { - const char *data_end = data_start + data_len; - TSHttpParser parser = TSHttpParserCreate(); - - state->hdr_buf_ = TSMBufferCreate(); - state->hdr_loc_ = TSHttpHdrCreate(state->hdr_buf_); - TSHttpHdrTypeSet(state->hdr_buf_, state->hdr_loc_, TS_HTTP_TYPE_RESPONSE); - if (TSHttpHdrParseResp(parser, state->hdr_buf_, state->hdr_loc_, &data_start, data_end) == TS_PARSE_DONE) { - TSHttpStatus status = TSHttpHdrStatusGet(state->hdr_buf_, state->hdr_loc_); - state->body_ = data_start; // data_start will now be pointing to body - state->body_size_ = data_end - data_start; - utils::internal::initResponse(state->response_, state->hdr_buf_, state->hdr_loc_); - LOG_DEBUG("Fetch result had a status code of %d with a body length of %ld", status, state->body_size_); - } else { - LOG_ERROR("Unable to parse response; Request URL [%s]; transaction %p", - state->request_.getUrl().getUrlString().c_str(), txn); + + if (state->streaming_flag_ == AsyncHttpFetch::STREAMING_DISABLED) { + if (event == static_cast(AsyncHttpFetch::RESULT_SUCCESS)) { + TSHttpTxn txn = static_cast(edata); + int data_len; + const char *data_start = TSFetchRespGet(txn, &data_len); + if (data_start && (data_len > 0)) { + const char *data_end = data_start + data_len; + TSHttpParser parser = TSHttpParserCreate(); + state->hdr_buf_ = TSMBufferCreate(); + state->hdr_loc_ = TSHttpHdrCreate(state->hdr_buf_); + TSHttpHdrTypeSet(state->hdr_buf_, state->hdr_loc_, TS_HTTP_TYPE_RESPONSE); + if (TSHttpHdrParseResp(parser, state->hdr_buf_, state->hdr_loc_, &data_start, data_end) == TS_PARSE_DONE) { + TSHttpStatus status = TSHttpHdrStatusGet(state->hdr_buf_, state->hdr_loc_); + state->body_ = data_start; // data_start will now be pointing to body + state->body_size_ = data_end - data_start; + utils::internal::initResponse(state->response_, state->hdr_buf_, state->hdr_loc_); + LOG_DEBUG("Fetch result had a status code of %d with a body length of %ld", status, state->body_size_); + } else { + LOG_ERROR("Unable to parse response; Request URL [%s]; transaction %p", + state->request_->getUrl().getUrlString().c_str(), txn); + event = static_cast(AsyncHttpFetch::RESULT_FAILURE); + } + TSHttpParserDestroy(parser); + } + else { + LOG_ERROR("Successful fetch did not result in any content. Assuming failure"); event = static_cast(AsyncHttpFetch::RESULT_FAILURE); } - TSHttpParserDestroy(parser); + state->result_ = static_cast(event); + } + } + else { + LOG_DEBUG("Handling streaming event %d", event); + if (event == static_cast(TS_FETCH_EVENT_EXT_HEAD_DONE)) { + utils::internal::initResponse(state->response_, TSFetchRespHdrMBufGet(state->fetch_sm_), + TSFetchRespHdrMLocGet(state->fetch_sm_)); + LOG_DEBUG("Response header initialized"); + state->result_ = AsyncHttpFetch::RESULT_HEADER_COMPLETE; } else { - LOG_ERROR("Successful fetch did not result in any content. Assuming failure"); - event = static_cast(AsyncHttpFetch::RESULT_FAILURE); + state->body_size_ = TSFetchReadData(state->fetch_sm_, state->body_buffer_, sizeof(state->body_buffer_)); + LOG_DEBUG("Read %zu bytes", state->body_size_); + state->result_ = (event == static_cast(TS_FETCH_EVENT_EXT_BODY_READY)) ? + AsyncHttpFetch::RESULT_PARTIAL_BODY : AsyncHttpFetch::RESULT_BODY_COMPLETE; } } - state->result_ = static_cast(event); if (!state->dispatch_controller_->dispatch()) { LOG_DEBUG("Unable to dispatch result from AsyncFetch because promise has died."); } - utils::internal::deleteAsyncHttpFetch(fetch_provider); // we must always cleans up when we're done. - TSContDestroy(cont); + if ((state->streaming_flag_ == AsyncHttpFetch::STREAMING_DISABLED) || + (state->result_ == AsyncHttpFetch::RESULT_BODY_COMPLETE)) { + LOG_DEBUG("Shutting down"); + utils::internal::deleteAsyncHttpFetch(fetch_provider); // we must always cleans up when we're done. + TSContDestroy(cont); + } return 0; } } AsyncHttpFetch::AsyncHttpFetch(const string &url_str, const string &request_body) { - init(url_str, HTTP_METHOD_POST, request_body); + init(url_str, HTTP_METHOD_POST, request_body, STREAMING_DISABLED); } AsyncHttpFetch::AsyncHttpFetch(const string &url_str, HttpMethod http_method) { - init(url_str, http_method, ""); + init(url_str, http_method, "", STREAMING_DISABLED); +} + +AsyncHttpFetch::AsyncHttpFetch(const string &url_str, StreamingFlag streaming_flag, const string &request_body) { + init(url_str, HTTP_METHOD_POST, request_body, streaming_flag); +} + +AsyncHttpFetch::AsyncHttpFetch(const string &url_str, StreamingFlag streaming_flag, HttpMethod http_method) { + init(url_str, http_method, "", streaming_flag); } -void AsyncHttpFetch::init(const string &url_str, HttpMethod http_method, const string &request_body) { +void AsyncHttpFetch::init(const string &url_str, HttpMethod http_method, const string &request_body, + StreamingFlag streaming_flag) { LOG_DEBUG("Created new AsyncHttpFetch object %p", this); - state_ = new AsyncHttpFetchState(url_str, http_method, request_body); + state_ = new AsyncHttpFetchState(url_str, http_method, request_body, streaming_flag); } void AsyncHttpFetch::run() { @@ -131,23 +176,12 @@ void AsyncHttpFetch::run() { TSCont fetchCont = TSContCreate(handleFetchEvents, TSMutexCreate()); TSContDataSet(fetchCont, static_cast(this)); // Providers have to clean themselves up when they are done. - TSFetchEvent event_ids; - event_ids.success_event_id = RESULT_SUCCESS; - event_ids.failure_event_id = RESULT_FAILURE; - event_ids.timeout_event_id = RESULT_TIMEOUT; - struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_addr.s_addr = LOCAL_IP_ADDRESS; addr.sin_port = LOCAL_PORT; - string request_str(HTTP_METHOD_STRINGS[state_->request_.getMethod()]); - request_str += ' '; - request_str += state_->request_.getUrl().getUrlString(); - request_str += ' '; - request_str += HTTP_VERSION_STRINGS[state_->request_.getVersion()]; - request_str += "\r\n"; - Headers &headers = state_->request_.getHeaders(); + Headers &headers = state_->request_->getHeaders(); if (headers.size()) { // remove the possibility of keep-alive headers.erase("Connection"); @@ -156,19 +190,53 @@ void AsyncHttpFetch::run() { if (!state_->request_body_.empty()) { char size_buf[128]; snprintf(size_buf, sizeof(size_buf), "%zu", state_->request_body_.size()); - state_->request_.getHeaders().set("Content-Length", size_buf); + headers.set("Content-Length", size_buf); } - request_str += headers.wireStr(); - request_str += "\r\n"; - request_str += state_->request_body_; - LOG_DEBUG("Issing TSFetchUrl with request\n[%s]", request_str.c_str()); - TSFetchUrl(request_str.c_str(), request_str.size(), reinterpret_cast(&addr), fetchCont, - AFTER_BODY, event_ids); + if (state_->streaming_flag_ == STREAMING_DISABLED) { + TSFetchEvent event_ids; + event_ids.success_event_id = RESULT_SUCCESS; + event_ids.failure_event_id = RESULT_FAILURE; + event_ids.timeout_event_id = RESULT_TIMEOUT; + + string request_str(HTTP_METHOD_STRINGS[state_->request_->getMethod()]); + request_str += ' '; + request_str += state_->request_->getUrl().getUrlString(); + request_str += ' '; + request_str += HTTP_VERSION_STRINGS[state_->request_->getVersion()]; + request_str += "\r\n"; + request_str += headers.wireStr(); + request_str += "\r\n"; + request_str += state_->request_body_; + + LOG_DEBUG("Issing (non-streaming) TSFetchUrl with request\n[%s]", request_str.c_str()); + TSFetchUrl(request_str.c_str(), request_str.size(), reinterpret_cast(&addr), fetchCont, + AFTER_BODY, event_ids); + } + else { + state_->fetch_sm_ = TSFetchCreate(fetchCont, HTTP_METHOD_STRINGS[state_->request_->getMethod()].c_str(), + state_->request_->getUrl().getUrlString().c_str(), + HTTP_VERSION_STRINGS[state_->request_->getVersion()].c_str(), + reinterpret_cast(&addr), + TS_FETCH_FLAGS_STREAM | TS_FETCH_FLAGS_DECHUNK); + string header_value; + for (Headers::iterator iter = headers.begin(), end = headers.end(); iter != end; ++iter) { + HeaderFieldName header_name = (*iter).name(); + header_value = (*iter).values(); + TSFetchHeaderAdd(state_->fetch_sm_, header_name.c_str(), header_name.length(), header_value.data(), + header_value.size()); + } + LOG_DEBUG("Launching streaming fetch"); + TSFetchLaunch(state_->fetch_sm_); + if (state_->request_body_.size()) { + TSFetchWriteData(state_->fetch_sm_, state_->request_body_.data(), state_->request_body_.size()); + LOG_DEBUG("Wrote %zu bytes of data to fetch", state_->request_body_.size()); + } + } } Headers &AsyncHttpFetch::getRequestHeaders() { - return state_->request_.getHeaders(); + return state_->request_->getHeaders(); } AsyncHttpFetch::Result AsyncHttpFetch::getResult() const { @@ -176,7 +244,7 @@ AsyncHttpFetch::Result AsyncHttpFetch::getResult() const { } const Url &AsyncHttpFetch::getRequestUrl() const { - return state_->request_.getUrl(); + return state_->request_->getUrl(); } const string &AsyncHttpFetch::getRequestBody() const { diff --git a/lib/atscppapi/src/include/atscppapi/AsyncHttpFetch.h b/lib/atscppapi/src/include/atscppapi/AsyncHttpFetch.h index 161b9572bf3..35f5091d99e 100644 --- a/lib/atscppapi/src/include/atscppapi/AsyncHttpFetch.h +++ b/lib/atscppapi/src/include/atscppapi/AsyncHttpFetch.h @@ -41,13 +41,25 @@ namespace utils { class internal; } * makes HTTP requests asynchronously. This provider automatically * self-destructs after the completion of the request. * - * See example async_http_fetch for sample usage. + * See example async_http_fetch{,_streaming} for sample usage. */ class AsyncHttpFetch : public AsyncProvider { public: + /** Deprecated. Use variant with streaming flag argument */ AsyncHttpFetch(const std::string &url_str, HttpMethod http_method = HTTP_METHOD_GET); - AsyncHttpFetch(const std::string &url_str, const std::string &request_body); + /** Deprecated. Use variant with streaming flag argument */ + AsyncHttpFetch(const std::string &url_str, const std::string &request_body); + + enum StreamingFlag { + STREAMING_DISABLED = 0, + STREAMING_ENABLED = 0x1 + }; + + AsyncHttpFetch(const std::string &url_str, StreamingFlag streaming_flag, + HttpMethod http_method = HTTP_METHOD_GET); + + AsyncHttpFetch(const std::string &url_str, StreamingFlag streaming_flag, const std::string &request_body); /** * Used to manipulate the headers of the request to be made. @@ -56,10 +68,14 @@ class AsyncHttpFetch : public AsyncProvider { */ Headers &getRequestHeaders(); - enum Result { RESULT_SUCCESS = 10000, RESULT_TIMEOUT, RESULT_FAILURE }; + enum Result { RESULT_SUCCESS = 10000, RESULT_TIMEOUT, RESULT_FAILURE, RESULT_HEADER_COMPLETE, + RESULT_PARTIAL_BODY, RESULT_BODY_COMPLETE }; /** - * Used to extract the response after request completion. + * Used to extract the response after request completion. Without + * streaming, this can result success, failure or timeout. With + * streaming, this can result failure, timeout, header complete, + * partial body or body complete. * * @return Result of the operation */ @@ -76,7 +92,8 @@ class AsyncHttpFetch : public AsyncProvider { const std::string &getRequestBody() const; /** - * Used to extract the response after request completion. + * Used to extract the response after request completion (after + * RESULT_HEADER_COMPLETE in case of streaming). * * @return Non-mutable reference to the response. */ @@ -86,22 +103,25 @@ class AsyncHttpFetch : public AsyncProvider { * Used to extract the body of the response after request completion. On * unsuccessful completion, values (NULL, 0) are set. * + * When streaming is enabled, this can be called on either body result. + * * @param body Output argument; will point to the body - * @param body_size Output argument; will contain the size of the body - * + * @param body_size Output argument; will contain the size of the body + * */ void getResponseBody(const void *&body, size_t &body_size) const; /** * Starts a HTTP fetch of the Request contained. - */ + */ virtual void run(); protected: virtual ~AsyncHttpFetch(); private: AsyncHttpFetchState *state_; - void init(const std::string &url_str, HttpMethod http_method, const std::string &request_body); + void init(const std::string &url_str, HttpMethod http_method, const std::string &request_body, + StreamingFlag streaming_flag); friend class utils::internal; };