From 632a1785e4a9965082421c9a710f3606807d0548 Mon Sep 17 00:00:00 2001 From: Dvir Volk Date: Sun, 2 Jul 2017 16:57:16 +0300 Subject: [PATCH] added periodic task runner --- rmutil/periodic.c | 76 ++++++++++++++++++++++++++++++++++++++++++ rmutil/periodic.h | 27 +++++++++++++++ rmutil/test_periodic.c | 27 +++++++++++++++ 3 files changed, 130 insertions(+) create mode 100644 rmutil/periodic.c create mode 100644 rmutil/periodic.h create mode 100644 rmutil/test_periodic.c diff --git a/rmutil/periodic.c b/rmutil/periodic.c new file mode 100644 index 0000000..3cdd997 --- /dev/null +++ b/rmutil/periodic.c @@ -0,0 +1,76 @@ +#include "periodic.h" +#include +#include +#include + +typedef struct RMUtilTimer { + RMutilTimerFunc cb; + void *privdata; + struct timespec interval; + pthread_t thread; + pthread_mutex_t lock; + pthread_cond_t cond; +} RMUtilTimer; + +static struct timespec timespecAdd(struct timespec *a, struct timespec *b) { + struct timespec ret; + ret.tv_sec = a->tv_sec + b->tv_sec; + + long long ns = a->tv_nsec + b->tv_nsec; + ret.tv_sec += ns / 1000000000; + ret.tv_nsec = ns % 1000000000; + return ret; +} + +static void *rmutilTimer_Loop(void *ctx) { + RMUtilTimer *tm = ctx; + + int rc = ETIMEDOUT; + struct timespec ts; + + pthread_mutex_lock(&tm->lock); + while (rc != 0) { + clock_gettime(CLOCK_REALTIME, &ts); + struct timespec timeout = timespecAdd(&ts, &tm->interval); + if ((rc = pthread_cond_timedwait(&tm->cond, &tm->lock, &timeout)) == ETIMEDOUT) { + + // Create a thread safe context if we're running inside redis + RedisModuleCtx *rctx = NULL; + if (RedisModule_GetThreadSafeContext) rctx = RedisModule_GetThreadSafeContext(NULL); + + // call our callback... + tm->cb(rctx, tm->privdata); + + // If needed - free the thread safe context. + // It's up to the user to decide whether automemory is active there + if (rctx) RedisModule_FreeThreadSafeContext(rctx); + } + } + // RedisModule_Log(tm->redisCtx, "notice", "Timer cancelled"); + + return NULL; +} + +RMUtilTimer *RMUtil_NewPeriodicTimer(RMutilTimerFunc cb, void *privdata, struct timespec interval) { + RMUtilTimer *ret = malloc(sizeof(*ret)); + *ret = (RMUtilTimer){ + .privdata = privdata, .interval = interval, .cb = cb, + }; + pthread_cond_init(&ret->cond, NULL); + pthread_mutex_init(&ret->lock, NULL); + + pthread_create(&ret->thread, NULL, rmutilTimer_Loop, ret); + return ret; +} + +int RMUtilTimer_Stop(RMUtilTimer *t) { + int rc; + if (0 == (rc = pthread_cond_signal(&t->cond))) { + rc = pthread_join(t->thread, NULL); + } + return rc; +} + +void RMUtilTimer_Free(RMUtilTimer *t) { + free(t); +} diff --git a/rmutil/periodic.h b/rmutil/periodic.h new file mode 100644 index 0000000..ddd2aa8 --- /dev/null +++ b/rmutil/periodic.h @@ -0,0 +1,27 @@ +#ifndef RMUTIL_PERIODIC_H_ +#define RMUTIL_PERIODIC_H_ +#include +#include + +/** periodic.h - Utility periodic timer running a task repeatedly every given time interval */ + +/* RMUtilTimer - opaque context for the timer */ +struct RMUtilTimer; + +/* RMutilTimerFunc - callback type for timer tasks. The ctx is a thread-safe redis module context + * that should be locked/unlocked by the callback when running stuff against redis. privdata is + * pre-existing private data */ +typedef void (*RMutilTimerFunc)(RedisModuleCtx *ctx, void *privdata); + +/* Create and start a new periodic timer. Each timer has its own thread and can only be run and + * stopped once. The timer runs `cb` every `interval` with `privdata` passed to the callback. */ +struct RMUtilTimer *RMUtil_NewPeriodicTimer(RMutilTimerFunc cb, void *privdata, + struct timespec interval); + +/* Stop the timer loop. This should return immediately and join the thread */ +int RMUtilTimer_Stop(struct RMUtilTimer *t); + +/* Free the timer context. The caller should be responsible for freeing the private data at this + * point */ +void RMUtilTimer_Free(struct RMUtilTimer *t); +#endif \ No newline at end of file diff --git a/rmutil/test_periodic.c b/rmutil/test_periodic.c new file mode 100644 index 0000000..030a021 --- /dev/null +++ b/rmutil/test_periodic.c @@ -0,0 +1,27 @@ +#include +#include +#include +#include "periodic.h" +#include "assert.h" +#include "test.h" + +void timerCb(RedisModuleCtx *ctx, void *p) { + int *x = p; + (*x)++; +} + +int testPeriodic() { + int x = 0; + struct RMUtilTimer *tm = + RMUtil_NewPeriodicTimer(timerCb, &x, (struct timespec){.tv_sec = 0, .tv_nsec = 10000000}); + + sleep(1); + + ASSERT_EQUAL(0, RMUtilTimer_Stop(tm)); + ASSERT(x > 0); + ASSERT(x <= 100); + RMUtilTimer_Free(tm); + return 0; +} + +TEST_MAIN({ TESTFUNC(testPeriodic); });