Page Speed Optimization Libraries  1.13.35.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
request_result_rpc_handler.h
Go to the documentation of this file.
1 // Copyright 2016 Google Inc.
16 
17 #ifndef PAGESPEED_CONTROLLER_REQUEST_RESULT_RPC_HANDLER_H_
18 #define PAGESPEED_CONTROLLER_REQUEST_RESULT_RPC_HANDLER_H_
19 
20 #include "base/logging.h"
26 
41 
42 namespace net_instaweb {
43 
// Template base for RPC handlers that run a small state machine (see State
// below) on top of RpcHandler<AsyncServiceT, RequestT, ResponseT>.
//   HandlerT     - concrete handler type instantiated by CreateAndStart() and
//                  CreateHandler().
//   ControllerT  - type of the controller pointer stored in controller_.
// NOTE(review): this listing is an extract. Its embedded line numbers have
// gaps (e.g. 46, 49, 72); the "Definition: request_result_rpc_handler.h:46"
// entry at the end of the page suggests line 46 held the
// `class RequestResultRpcHandler` head. Confirm against the real header
// before changing any code here.
44 template <typename HandlerT, typename ControllerT, typename AsyncServiceT,
45  typename RequestT, typename ResponseT>
47  : public RpcHandler<AsyncServiceT, RequestT, ResponseT> {
48  public:
50 
51  virtual ~RequestResultRpcHandler() {}
52 
// Allocates a HandlerT with `new`, passing (service, cq, controller), and
// calls Start() on it. The raw pointer is not stored by this function.
// NOTE(review): presumably the handler manages its own lifetime through the
// RefPtr used further down - confirm in RpcHandler.
55  static void CreateAndStart(AsyncServiceT* service,
56  ::grpc::ServerCompletionQueue* cq,
57  ControllerT* controller) {
58  (new HandlerT(service, cq, controller))->Start();
59  }
60 
61  protected:
// Forwards service and cq to the RpcHandler base, stores the controller
// pointer and starts the state machine in INIT.
62  RequestResultRpcHandler(AsyncServiceT* service,
63  ::grpc::ServerCompletionQueue* cq,
64  ControllerT* controller)
65  : RpcHandler<AsyncServiceT, RequestT, ResponseT>::RpcHandler(service, cq),
66  controller_(controller),
67  state_(INIT) {}
68 
// Marks the state machine as DONE. The page's own description of this method
// says it hides the parent implementation.
// NOTE(review): listing line 72 is absent from this extract; presumably it
// forwards `status` to the parent's Finish() - confirm against the real
// header.
70  void Finish(const ::grpc::Status& status) {
71  state_ = DONE;
73  }
74 
// Returns the controller pointer supplied at construction.
75  ControllerT* controller() { return controller_; }
76 
77  private:
// States of the handler:
//   INIT                   - initial state set by the constructor.
//   WAITING_FOR_CONTROLLER - set by HandleRequest() just before it calls
//                            HandleClientRequest().
//   OPERATION_RUNNING      - set by NotifyClient() after a successful Write()
//                            with ok_to_proceed == true.
//   DONE                   - set by Finish(), HandleError() and the
//                            write-failure path of NotifyClient().
81  enum State {
82  INIT,
83  WAITING_FOR_CONTROLLER,
84  OPERATION_RUNNING,
85  DONE,
86  };
87 
// Function whose Run() calls handler_->NotifyClient(true) and whose Cancel()
// calls handler_->NotifyClient(false). HandleRequest() allocates one of
// these and passes it to HandleClientRequest().
// NOTE(review): the single-argument constructor is not marked `explicit`.
90  class NotifyClientCallback : public Function {
91  public:
92  NotifyClientCallback(RequestResultRpcHandler* handler)
93  : handler_(handler) {}
94 
95  void Run() override { handler_->NotifyClient(true ); }
96  void Cancel() override { handler_->NotifyClient(false ); }
97 
98  private:
// NOTE(review): RefPtr is not declared in the visible part of this extract;
// presumably a ref-counting pointer that keeps the handler alive until the
// callback fires - confirm.
102  RequestResultRpcHandler::RefPtr handler_;
103  };
104 
// Hooks implemented by the concrete handler.
// HandleClientRequest: called by HandleRequest() for a message received in
// INIT; `cb` is the NotifyClientCallback allocated for that request.
// HandleClientResult: called by HandleRequest() for a message received in
// OPERATION_RUNNING, immediately before Finish().
109  virtual void HandleClientRequest(const RequestT& req, Function* cb) = 0;
110  virtual void HandleClientResult(const RequestT& req) = 0;
111 
// Called by HandleError() when the state is OPERATION_RUNNING, and by
// NotifyClient() on its failure paths.
115  virtual void HandleOperationFailed() = 0;
116 
// Invoked by NotifyClientCallback with true from Run() and false from
// Cancel(). NOTE(review): the parameter is named ok_to_rewrite here but
// ok_to_proceed in the out-of-class definition.
119  void NotifyClient(bool ok_to_rewrite);
120 
// Overrides of the RpcHandler interface; both are defined after the class.
122  void HandleRequest(const RequestT& req) override;
123  void HandleError() override;
124 
// Returns a newly allocated HandlerT built from service, cq and this
// handler's controller().
125  RpcHandler<AsyncServiceT, RequestT, ResponseT>* CreateHandler(
126  AsyncServiceT* service, ::grpc::ServerCompletionQueue* cq) override {
127  return new HandlerT(service, cq, controller());
128  }
129 
// Re-declared as pure virtual (`override = 0`), so this class provides no
// implementation and the concrete handler has to.
131  void InitResponder(AsyncServiceT* service, ::grpc::ServerContext* ctx,
132  typename RpcHandler<AsyncServiceT, RequestT,
133  ResponseT>::ReaderWriterT* responder,
134  ::grpc::ServerCompletionQueue* cq,
135  void* callback) override = 0;
136 
// Controller pointer passed to the constructor, and the current state of the
// state machine.
139  ControllerT* controller_;
140  State state_;
141 
// NotifyClientCallback calls the private NotifyClient(); the test class is
// granted access to the private members as well.
142  friend class NotifyClientCallback;
143  friend class RequestResultRpcHandlerTest;
144 
145 
146 };
147 
148 template <typename HandlerT, typename ControllerT, typename AsyncServiceT,
149  typename RequestT, typename ResponseT>
150 void RequestResultRpcHandler<HandlerT, ControllerT, AsyncServiceT, RequestT,
151  ResponseT>::HandleRequest(const RequestT& req) {
152  switch (state_) {
153  case INIT:
154  state_ = WAITING_FOR_CONTROLLER;
155  HandleClientRequest(req, new NotifyClientCallback(this));
156  break;
157 
158  case OPERATION_RUNNING:
159  HandleClientResult(req);
162  Finish(::grpc::Status());
163  break;
164 
165  default:
166  LOG(ERROR) << "HandleRequest in unexpected state: " << state_;
167  Finish(::grpc::Status(::grpc::StatusCode::ABORTED,
168  "State machine error (HandleRequest)"));
169  }
170 }
171 
172 template <typename HandlerT, typename ControllerT, typename AsyncServiceT,
173  typename RequestT, typename ResponseT>
174 void RequestResultRpcHandler<HandlerT, ControllerT, AsyncServiceT, RequestT,
175  ResponseT>::HandleError() {
176  if (state_ == OPERATION_RUNNING) {
177  HandleOperationFailed();
178  }
181  state_ = DONE;
182 }
183 
184 template <typename HandlerT, typename ControllerT, typename AsyncServiceT,
185  typename RequestT, typename ResponseT>
186 void RequestResultRpcHandler<HandlerT, ControllerT, AsyncServiceT, RequestT,
187  ResponseT>::NotifyClient(bool ok_to_proceed) {
188  if (state_ != WAITING_FOR_CONTROLLER) {
192  if (ok_to_proceed) {
193  HandleOperationFailed();
194  }
195  if (state_ != DONE) {
198  Finish(::grpc::Status(::grpc::StatusCode::ABORTED,
199  "State machine error (NotifyClient)"));
200  LOG(DFATAL) << "NotifyClient in unexpected state: " << state_;
201  }
202  return;
203  }
204 
208  ResponseT resp;
209  resp.set_ok_to_proceed(ok_to_proceed);
210  bool write_ok = this->Write(resp);
211  if (write_ok && ok_to_proceed) {
212  state_ = OPERATION_RUNNING;
213  } else if (write_ok && !ok_to_proceed) {
215  Finish(::grpc::Status());
216  } else {
218  HandleOperationFailed();
219  state_ = DONE;
220  }
221 }
222 
223 }
224 
225 #endif
RpcHandler(AsyncServiceT *service,::grpc::ServerCompletionQueue *cq)
Definition: rpc_handler.h:55
bool Finish(const ::grpc::Status &status)
Definition: rpc_handler.h:226
static void CreateAndStart(AsyncServiceT *service,::grpc::ServerCompletionQueue *cq, ControllerT *controller)
Definition: request_result_rpc_handler.h:55
void Finish(const ::grpc::Status &status)
Hides the parent implementation so this class can update its own state machine.
Definition: request_result_rpc_handler.h:70
Definition: request_result_rpc_handler.h:46