17 #ifndef PAGESPEED_CONTROLLER_REQUEST_RESULT_RPC_HANDLER_H_
18 #define PAGESPEED_CONTROLLER_REQUEST_RESULT_RPC_HANDLER_H_
20 #include "base/logging.h"
42 namespace net_instaweb {
44 template <
typename HandlerT,
typename ControllerT,
typename AsyncServiceT,
45 typename RequestT,
typename ResponseT>
47 :
public RpcHandler<AsyncServiceT, RequestT, ResponseT> {
56 ::grpc::ServerCompletionQueue* cq,
57 ControllerT* controller) {
58 (
new HandlerT(service, cq, controller))->
Start();
63 ::grpc::ServerCompletionQueue* cq,
64 ControllerT* controller)
66 controller_(controller),
70 void Finish(const ::grpc::Status& status) {
75 ControllerT* controller() {
return controller_; }
83 WAITING_FOR_CONTROLLER,
90 class NotifyClientCallback :
public Function {
92 NotifyClientCallback(RequestResultRpcHandler* handler)
93 : handler_(handler) {}
95 void Run()
override { handler_->NotifyClient(
true ); }
96 void Cancel()
override { handler_->NotifyClient(
false ); }
102 RequestResultRpcHandler::RefPtr handler_;
109 virtual void HandleClientRequest(
const RequestT& req, Function* cb) = 0;
110 virtual void HandleClientResult(
const RequestT& req) = 0;
115 virtual void HandleOperationFailed() = 0;
119 void NotifyClient(
bool ok_to_rewrite);
122 void HandleRequest(
const RequestT& req)
override;
123 void HandleError()
override;
125 RpcHandler<AsyncServiceT, RequestT, ResponseT>* CreateHandler(
126 AsyncServiceT* service, ::grpc::ServerCompletionQueue* cq)
override {
127 return new HandlerT(service, cq, controller());
131 void InitResponder(AsyncServiceT* service, ::grpc::ServerContext* ctx,
133 ResponseT>::ReaderWriterT* responder,
134 ::grpc::ServerCompletionQueue* cq,
135 void* callback)
override = 0;
139 ControllerT* controller_;
142 friend class NotifyClientCallback;
143 friend class RequestResultRpcHandlerTest;
148 template <
typename HandlerT,
typename ControllerT,
typename AsyncServiceT,
149 typename RequestT,
typename ResponseT>
150 void RequestResultRpcHandler<HandlerT, ControllerT, AsyncServiceT, RequestT,
151 ResponseT>::HandleRequest(
const RequestT& req) {
154 state_ = WAITING_FOR_CONTROLLER;
155 HandleClientRequest(req,
new NotifyClientCallback(
this));
158 case OPERATION_RUNNING:
159 HandleClientResult(req);
162 Finish(::grpc::Status());
166 LOG(ERROR) <<
"HandleRequest in unexpected state: " << state_;
167 Finish(::grpc::Status(::grpc::StatusCode::ABORTED,
168 "State machine error (HandleRequest)"));
172 template <
typename HandlerT,
typename ControllerT,
typename AsyncServiceT,
173 typename RequestT,
typename ResponseT>
174 void RequestResultRpcHandler<HandlerT, ControllerT, AsyncServiceT, RequestT,
175 ResponseT>::HandleError() {
176 if (state_ == OPERATION_RUNNING) {
177 HandleOperationFailed();
184 template <
typename HandlerT,
typename ControllerT,
typename AsyncServiceT,
185 typename RequestT,
typename ResponseT>
186 void RequestResultRpcHandler<HandlerT, ControllerT, AsyncServiceT, RequestT,
187 ResponseT>::NotifyClient(
bool ok_to_proceed) {
188 if (state_ != WAITING_FOR_CONTROLLER) {
193 HandleOperationFailed();
195 if (state_ != DONE) {
198 Finish(::grpc::Status(::grpc::StatusCode::ABORTED,
199 "State machine error (NotifyClient)"));
200 LOG(DFATAL) <<
"NotifyClient in unexpected state: " << state_;
209 resp.set_ok_to_proceed(ok_to_proceed);
210 bool write_ok = this->Write(resp);
211 if (write_ok && ok_to_proceed) {
212 state_ = OPERATION_RUNNING;
213 }
else if (write_ok && !ok_to_proceed) {
215 Finish(::grpc::Status());
218 HandleOperationFailed();
RpcHandler(AsyncServiceT *service,::grpc::ServerCompletionQueue *cq)
Definition: rpc_handler.h:55
bool Finish(const ::grpc::Status &status)
Definition: rpc_handler.h:226
static void CreateAndStart(AsyncServiceT *service,::grpc::ServerCompletionQueue *cq, ControllerT *controller)
Definition: request_result_rpc_handler.h:55
void Finish(const ::grpc::Status &status)
Hide the parent implementation so we can frob our own state machine.
Definition: request_result_rpc_handler.h:70
Definition: request_result_rpc_handler.h:46