17 #ifndef PAGESPEED_CONTROLLER_REQUEST_RESULT_RPC_CLIENT_H_
18 #define PAGESPEED_CONTROLLER_REQUEST_RESULT_RPC_CLIENT_H_
22 #include "base/logging.h"
23 #include "pagespeed/controller/controller.grpc.pb.h"
28 #include "pagespeed/kernel/base/thread_annotations.h"
35 namespace net_instaweb {
40 template <
typename ReaderWriter>
56 rw()->Finish(&status_,
MakeFunction(
this, &RpcHolder::FinishSucceeded,
57 &RpcHolder::FinishFailed));
// Accessor for the gRPC client context stored by value in context_. The
// returned pointer refers to a member of this object, so it is only usable
// while this object is alive.
60 ::grpc::ClientContext* context() {
return &context_; }
// Accessor for the MessageHandler pointer kept in handler_ (the same handler
// that is passed to PS_LOG_WARN elsewhere in this file). Returns the stored
// pointer unchanged.
62 MessageHandler* handler() {
return handler_; }
// Accessor for the reader/writer held in rw_. Returns the raw pointer obtained
// from the unique_ptr, so ownership stays with rw_.
// NOTE(review): presumably nullptr until SetReaderWriter() has run - confirm.
64 ReaderWriter* rw() {
return rw_.get(); }
66 void SetReaderWriter(std::unique_ptr<ReaderWriter> rw) {
67 DCHECK(rw_ ==
nullptr);
72 void FinishSucceeded() {
74 if (status_.error_code() != ::grpc::StatusCode::OK &&
75 status_.error_code() != ::grpc::StatusCode::CANCELLED) {
76 MessageType severity =
78 (status_.error_code() == ::grpc::StatusCode::ABORTED) ? kFatal :
84 "Received error status from CentralController: %d (%s)",
85 status_.error_code(), status_.error_message().c_str());
91 PS_LOG_WARN(handler_,
"RpcHolder Finish failed");
96 PS_LOG_WARN(handler_,
"RpcHolder cleanup to CentralController failed");
// Handler passed to PS_LOG_WARN for the "Finish failed" / "cleanup ... failed"
// warnings and returned by handler().
// NOTE(review): raw pointer, presumably not owned here - confirm.
100 MessageHandler* handler_;
// Client context exposed to callers through context().
101 ::grpc::ClientContext context_;
// Reader/writer for the RPC, returned by rw(). SetReaderWriter() DCHECKs that
// it is still null.
102 std::unique_ptr<ReaderWriter> rw_;
// Status object whose address is handed to rw()->Finish() and whose
// error_code() / error_message() are inspected in FinishSucceeded().
103 ::grpc::Status status_;
119 template <
typename RequestT,
typename ResponseT,
typename CallbackT>
122 typedef ::grpc::ClientAsyncReaderWriterInterface<RequestT, ResponseT>
126 ::grpc::CompletionQueue* queue,
ThreadSystem* thread_system,
128 : mutex_(thread_system->
NewMutex()),
132 CHECK(callback_ !=
nullptr);
143 void Start(grpc::CentralControllerRpcService::StubInterface* stub) {
145 rpc_->SetReaderWriter(
146 StartRpc(stub, rpc_->context(), queue_,
147 MakeFunction(
this, &RequestResultRpcClient::BootStrapFinished,
148 &RequestResultRpcClient::StartUpFailed)));
157 if (rpc_ ==
nullptr) {
172 virtual std::unique_ptr<ReaderWriter> StartRpc(
173 grpc::CentralControllerRpcService::StubInterface* stub,
174 ::grpc::ClientContext* context, ::grpc::CompletionQueue* queue,
// Pure virtual hook that subclasses must implement. BootStrapFinished() calls
// it with the address of a request object.
// NOTE(review): presumably the implementation fills in *request - confirm.
181 virtual void PopulateServerRequest(RequestT* request) = 0;
184 void BootStrapFinished() {
187 PopulateServerRequest(&req);
190 MakeFunction(
this, &RequestResultRpcClient::WriteServerRequestComplete,
191 &RequestResultRpcClient::StartUpFailed));
194 void WriteServerRequestComplete() {
198 this, &RequestResultRpcClient::NotifyClientOfServerDecision,
199 &RequestResultRpcClient::StartUpFailed));
202 void NotifyClientOfServerDecision() {
204 DCHECK(rpc_ !=
nullptr);
207 bool ok_to_proceed = resp_.ok_to_proceed();
210 CallbackT* cb = callback_;
229 void StartUpFailed() LOCKS_EXCLUDED(mutex_) {
231 DCHECK(rpc_ !=
nullptr);
232 if (rpc_ !=
nullptr) {
233 PS_LOG_WARN(rpc_->handler(),
234 "Couldn't get response from CentralController");
240 RpcHolder<ReaderWriter>* rpc = rpc_.release();
243 Function* cb = callback_;
// Mutex initialized from thread_system->NewMutex(). The GUARDED_BY annotations
// below name it as the lock for callback_, rpc_ and resp_.
252 std::unique_ptr<AbstractMutex> mutex_;
// Completion queue forwarded to StartRpc() from Start().
// NOTE(review): raw pointer, presumably owned by the caller - confirm.
253 ::grpc::CompletionQueue* queue_;
// Client callback. A CHECK in this class requires it to be non-null; it is
// copied into a local in NotifyClientOfServerDecision() and StartUpFailed().
254 CallbackT* callback_ GUARDED_BY(mutex_);
// Holder for the RPC state. Start() uses it to set up the reader/writer, and
// StartUpFailed() release()s it.
255 std::unique_ptr<RpcHolder<ReaderWriter>> rpc_ GUARDED_BY(mutex_);
// Response message; NotifyClientOfServerDecision() reads ok_to_proceed() from
// it.
256 ResponseT resp_ GUARDED_BY(mutex_);
Function * MakeFunction(C *object, void(C::*run)())
Makes a Function* that calls a 0-arg class method.
Definition: function.h:291
Definition: request_result_rpc_client.h:41
void Start(grpc::CentralControllerRpcService::StubInterface *stub)
Actually start the RPC by having the client call RequestFoo on the stub.
Definition: request_result_rpc_client.h:143
virtual ~RequestResultRpcClient()
Definition: request_result_rpc_client.h:135
void SendResultToServer(const RequestT &response)
Definition: request_result_rpc_client.h:155
Definition: request_result_rpc_client.h:120
Definition: function.h:47
#define ScopedMutex(x)
Definition: abstract_mutex.h:69
Helper class for lexically scoped mutexing.
Definition: abstract_mutex.h:46
void Message(MessageType type, const char *msg,...) INSTAWEB_PRINTF_FORMAT(3
Log an info, warning, error or fatal error message.
Function * CallbackForAsyncCleanup()
Definition: request_result_rpc_client.h:48
void Finish()
Definition: request_result_rpc_client.h:55
Definition: thread_system.h:40
Definition: message_handler.h:39
virtual CondvarCapableMutex * NewMutex()=0