19 #ifndef PAGESPEED_SYSTEM_REDIS_CACHE_H_
20 #define PAGESPEED_SYSTEM_REDIS_CACHE_H_
25 #include <initializer_list>
37 #include "pagespeed/kernel/base/thread_annotations.h"
40 #include "pagespeed/kernel/cache/cache_interface.h"
43 #include "third_party/hiredis/src/hiredis.h"
45 namespace net_instaweb {
82 int64 reconnection_delay_ms, int64 timeout_us,
90 void StartUp(
bool connect_now =
true);
98 static GoogleString FormatName() {
return "RedisCache"; }
110 static int HashSlot(StringPiece key);
115 return redirections_->Get();
121 return cluster_slots_fetches_->Get();
125 struct RedisReplyDeleter {
126 void operator()(redisReply* ptr) {
127 if (ptr !=
nullptr) {
128 freeReplyObject(ptr);
132 typedef std::unique_ptr<redisReply, RedisReplyDeleter> RedisReply;
134 struct RedisContextDeleter {
135 void operator()(redisContext* ptr) {
136 if (ptr !=
nullptr) {
141 typedef std::unique_ptr<redisContext, RedisContextDeleter> RedisContext;
145 Connection(
RedisCache* redis_cache, StringPiece host,
int port,
148 void StartUp(
bool connect_now =
true)
149 LOCKS_EXCLUDED(redis_mutex_, state_mutex_);
150 bool IsHealthy() const LOCKS_EXCLUDED(redis_mutex_, state_mutex_);
151 void ShutDown() LOCKS_EXCLUDED(redis_mutex_, state_mutex_);
154 return StrCat(host_,
":", IntegerToString(port_));
157 AbstractMutex* GetOperationMutex() const LOCK_RETURNED(redis_mutex_) {
158 return redis_mutex_.get();
162 RedisReply RedisCommand(
const char* format, va_list args)
163 EXCLUSIVE_LOCKS_REQUIRED(redis_mutex_) LOCKS_EXCLUDED(state_mutex_);
164 RedisReply RedisCommand(const
char* format, ...)
165 EXCLUSIVE_LOCKS_REQUIRED(redis_mutex_) LOCKS_EXCLUDED(state_mutex_);
167 bool ValidateRedisReply(const RedisReply& reply,
168 std::initializer_list<
int> valid_types,
169 const
char* command_executed)
170 EXCLUSIVE_LOCKS_REQUIRED(redis_mutex_) LOCKS_EXCLUDED(state_mutex_);
180 bool IsHealthyLockHeld() const EXCLUSIVE_LOCKS_REQUIRED(state_mutex_);
181 void UpdateState() EXCLUSIVE_LOCKS_REQUIRED(redis_mutex_, state_mutex_);
184 bool EnsureConnectionAndDatabaseSelection()
185 EXCLUSIVE_LOCKS_REQUIRED(redis_mutex_)
186 LOCKS_EXCLUDED(state_mutex_);
187 bool EnsureConnection() EXCLUSIVE_LOCKS_REQUIRED(redis_mutex_)
188 LOCKS_EXCLUDED(state_mutex_);
189 bool EnsureDatabaseSelection() EXCLUSIVE_LOCKS_REQUIRED(state_mutex_)
190 LOCKS_EXCLUDED(state_mutex_);
192 RedisContext TryConnect() LOCKS_EXCLUDED(redis_mutex_, state_mutex_);
194 void LogRedisContextError(redisContext* redis, const
char* cause);
199 const scoped_ptr<AbstractMutex> redis_mutex_;
200 const scoped_ptr<AbstractMutex> state_mutex_;
202 RedisContext redis_ GUARDED_BY(redis_mutex_);
203 State state_ GUARDED_BY(state_mutex_);
204 int64 next_reconnect_at_ms_ GUARDED_BY(state_mutex_);
208 const
int database_index_;
212 typedef std::map<
GoogleString, std::unique_ptr<Connection>> ConnectionsMap;
214 struct ClusterMapping {
216 ClusterMapping(
int start_slot_range,
218 Connection* connection) :
219 start_slot_range_(start_slot_range),
220 end_slot_range_(end_slot_range),
221 connection_(connection) {}
222 int start_slot_range_;
224 Connection* connection_;
236 RedisReply RedisCommand(Connection* connection,
const char* format,
237 std::initializer_list<int> valid_reply_types, ...);
239 ThreadSynchronizer* GetThreadSynchronizerForTesting()
const {
240 return thread_synchronizer_.get();
243 ExternalServerSpec ParseRedirectionError(StringPiece error);
247 Connection* GetOrCreateConnection(ExternalServerSpec spec,
248 const int database_index);
251 void FetchClusterSlotMapping(Connection* connection)
252 LOCKS_EXCLUDED(cluster_map_lock_);
258 Connection* LookupConnection(StringPiece key)
259 LOCKS_EXCLUDED(cluster_map_lock_);
262 const int main_port_;
263 ThreadSystem* thread_system_;
264 MessageHandler* message_handler_;
266 const int64 reconnection_delay_ms_;
267 const int64 timeout_us_;
268 const scoped_ptr<ThreadSynchronizer> thread_synchronizer_;
269 const scoped_ptr<ThreadSystem::RWLock> connections_lock_;
270 const scoped_ptr<ThreadSystem::RWLock> cluster_map_lock_;
271 Variable* redirections_;
272 Variable* cluster_slots_fetches_;
276 ConnectionsMap connections_ GUARDED_BY(connections_lock_);
277 std::vector<ClusterMapping> cluster_mappings_ GUARDED_BY(cluster_map_lock_);
280 Connection* main_connection_;
282 const int database_index_;
284 friend class RedisCacheTest;
void Get(const GoogleString &key, Callback *callback) override
CacheInterface implementations.
int64 ClusterSlotsFetches()
Definition: redis_cache.h:120
Abstract interface for a cache.
Definition: cache_interface.h:32
bool IsHealthy() const override
bool IsBlocking() const override
Definition: redis_cache.h:94
Base class for implementations of monitoring statistics.
Definition: statistics.h:342
GoogleString Name() const override
CacheInterface implementations.
Definition: redis_cache.h:93
Definition: redis_cache.h:75
RedisCache(StringPiece host, int port, ThreadSystem *thread_system, MessageHandler *message_handler, Timer *timer, int64 reconnection_delay_ms, int64 timeout_us, Statistics *stats, int database_index)
std::string GoogleString
PAGESPEED_KERNEL_BASE_STRING_H_.
Definition: string.h:24
static int HashSlot(StringPiece key)
Redis spec defined hasher for keys. Static, since it's a pure function.
void Put(const GoogleString &key, const SharedString &value) override
Definition: thread_system.h:40
Definition: message_handler.h:39
void GetStatus(GoogleString *status_string)
int64 Redirections()
Definition: redis_cache.h:114
Timer interface, made virtual so it can be mocked for tests.
Definition: timer.h:27