Page Speed Optimization Libraries  1.13.35.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
request_result_rpc_client.h
Go to the documentation of this file.
1 // Copyright 2016 Google Inc.
16 
17 #ifndef PAGESPEED_CONTROLLER_REQUEST_RESULT_RPC_CLIENT_H_
18 #define PAGESPEED_CONTROLLER_REQUEST_RESULT_RPC_CLIENT_H_
19 
20 #include <memory>
21 
22 #include "base/logging.h"
23 #include "pagespeed/controller/controller.grpc.pb.h"
28 #include "pagespeed/kernel/base/thread_annotations.h"
30 
34 
35 namespace net_instaweb {
36 
40 template <typename ReaderWriter>
41 class RpcHolder {
42  public:
43  RpcHolder(MessageHandler* handler) : handler_(handler) {}
44 
49  return MakeFunction(this, &RpcHolder::Finish, &RpcHolder::Error);
50  }
51 
55  void Finish() {
56  rw()->Finish(&status_, MakeFunction(this, &RpcHolder::FinishSucceeded,
57  &RpcHolder::FinishFailed));
58  }
59 
60  ::grpc::ClientContext* context() { return &context_; }
61 
62  MessageHandler* handler() { return handler_; }
63 
64  ReaderWriter* rw() { return rw_.get(); }
65 
66  void SetReaderWriter(std::unique_ptr<ReaderWriter> rw) {
67  DCHECK(rw_ == nullptr);
68  rw_ = std::move(rw);
69  }
70 
71  private:
72  void FinishSucceeded() {
74  if (status_.error_code() != ::grpc::StatusCode::OK &&
75  status_.error_code() != ::grpc::StatusCode::CANCELLED) {
76  MessageType severity =
77 #ifndef NDEBUG
78  (status_.error_code() == ::grpc::StatusCode::ABORTED) ? kFatal :
81 #endif
82  kWarning;
83  handler_->Message(severity,
84  "Received error status from CentralController: %d (%s)",
85  status_.error_code(), status_.error_message().c_str());
86  }
87  delete this;
88  }
89 
90  void FinishFailed() {
91  PS_LOG_WARN(handler_, "RpcHolder Finish failed");
92  delete this;
93  }
94 
95  void Error() {
96  PS_LOG_WARN(handler_, "RpcHolder cleanup to CentralController failed");
97  Finish();
98  }
99 
100  MessageHandler* handler_;
101  ::grpc::ClientContext context_;
102  std::unique_ptr<ReaderWriter> rw_;
103  ::grpc::Status status_;
104 };
105 
119 template <typename RequestT, typename ResponseT, typename CallbackT>
121  public:
122  typedef ::grpc::ClientAsyncReaderWriterInterface<RequestT, ResponseT>
123  ReaderWriter;
124 
126  ::grpc::CompletionQueue* queue, ThreadSystem* thread_system,
127  MessageHandler* handler, CallbackT* callback)
128  : mutex_(thread_system->NewMutex()),
129  queue_(queue),
130  callback_(callback),
131  rpc_(new RpcHolder<ReaderWriter>(handler)) {
132  CHECK(callback_ != nullptr);
133  }
134 
140  }
141 
143  void Start(grpc::CentralControllerRpcService::StubInterface* stub) {
144  ScopedMutex lock(mutex_.get());
145  rpc_->SetReaderWriter(
146  StartRpc(stub, rpc_->context(), queue_,
147  MakeFunction(this, &RequestResultRpcClient::BootStrapFinished,
148  &RequestResultRpcClient::StartUpFailed)));
149  }
150 
155  void SendResultToServer(const RequestT& response) {
156  ScopedMutex lock(mutex_.get());
157  if (rpc_ == nullptr) {
158  return;
159  }
165  RpcHolder<ReaderWriter>* rpc = rpc_.release();
166  rpc->rw()->Write(response, rpc->CallbackForAsyncCleanup());
167  }
168 
169  private:
172  virtual std::unique_ptr<ReaderWriter> StartRpc(
173  grpc::CentralControllerRpcService::StubInterface* stub,
174  ::grpc::ClientContext* context, ::grpc::CompletionQueue* queue,
175  void* tag) = 0;
176 
181  virtual void PopulateServerRequest(RequestT* request) = 0;
182 
184  void BootStrapFinished() {
185  ScopedMutex lock(mutex_.get());
186  RequestT req;
187  PopulateServerRequest(&req);
188  rpc_->rw()->Write(
189  req,
190  MakeFunction(this, &RequestResultRpcClient::WriteServerRequestComplete,
191  &RequestResultRpcClient::StartUpFailed));
192  }
193 
194  void WriteServerRequestComplete() {
195  ScopedMutex lock(mutex_.get());
196  rpc_->rw()->Read(
197  &resp_, MakeFunction(
198  this, &RequestResultRpcClient::NotifyClientOfServerDecision,
199  &RequestResultRpcClient::StartUpFailed));
200  }
201 
202  void NotifyClientOfServerDecision() {
203  ScopedMutex lock(mutex_.get());
204  DCHECK(rpc_ != nullptr);
207  bool ok_to_proceed = resp_.ok_to_proceed();
208  resp_.Clear();
209 
210  CallbackT* cb = callback_;
211  callback_ = nullptr;
212 
213  if (ok_to_proceed) {
214  lock.Release();
215  cb->CallRun();
216  return;
218  } else {
220  rpc_.reset();
221  lock.Release();
222  cb->CallCancel();
223  return;
224  }
225  }
226 
229  void StartUpFailed() LOCKS_EXCLUDED(mutex_) {
230  ScopedMutex lock(mutex_.get());
231  DCHECK(rpc_ != nullptr);
232  if (rpc_ != nullptr) {
233  PS_LOG_WARN(rpc_->handler(),
234  "Couldn't get response from CentralController");
240  RpcHolder<ReaderWriter>* rpc = rpc_.release();
241  rpc->Finish();
242 
243  Function* cb = callback_;
244  callback_ = nullptr;
245 
246  lock.Release();
247  cb->CallCancel();
248  return;
249  }
250  }
251 
252  std::unique_ptr<AbstractMutex> mutex_;
253  ::grpc::CompletionQueue* queue_;
254  CallbackT* callback_ GUARDED_BY(mutex_);
255  std::unique_ptr<RpcHolder<ReaderWriter>> rpc_ GUARDED_BY(mutex_);
256  ResponseT resp_ GUARDED_BY(mutex_);
257 };
258 
259 }
260 
261 #endif
Function * MakeFunction(C *object, void(C::*run)())
Makes a Function* that calls a 0-arg class method.
Definition: function.h:291
Definition: request_result_rpc_client.h:41
void Start(grpc::CentralControllerRpcService::StubInterface *stub)
Actually start the RPC by having the client call RequestFoo on the stub.
Definition: request_result_rpc_client.h:143
virtual ~RequestResultRpcClient()
Definition: request_result_rpc_client.h:135
void SendResultToServer(const RequestT &response)
Definition: request_result_rpc_client.h:155
Definition: request_result_rpc_client.h:120
Definition: function.h:47
#define ScopedMutex(x)
Definition: abstract_mutex.h:69
Helper class for lexically scoped mutexing.
Definition: abstract_mutex.h:46
void Message(MessageType type, const char *msg,...) INSTAWEB_PRINTF_FORMAT(3
Log an info, warning, error or fatal error message.
Function * CallbackForAsyncCleanup()
Definition: request_result_rpc_client.h:48
void Finish()
Definition: request_result_rpc_client.h:55
Definition: thread_system.h:40
Definition: message_handler.h:39
virtual CondvarCapableMutex * NewMutex()=0