added periodic task runner
parent
dc85ee44bf
commit
632a1785e4
@ -0,0 +1,76 @@
|
|||||||
|
#include "periodic.h"
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <errno.h>
|
||||||
|
|
||||||
|
typedef struct RMUtilTimer {
|
||||||
|
RMutilTimerFunc cb;
|
||||||
|
void *privdata;
|
||||||
|
struct timespec interval;
|
||||||
|
pthread_t thread;
|
||||||
|
pthread_mutex_t lock;
|
||||||
|
pthread_cond_t cond;
|
||||||
|
} RMUtilTimer;
|
||||||
|
|
||||||
|
static struct timespec timespecAdd(struct timespec *a, struct timespec *b) {
|
||||||
|
struct timespec ret;
|
||||||
|
ret.tv_sec = a->tv_sec + b->tv_sec;
|
||||||
|
|
||||||
|
long long ns = a->tv_nsec + b->tv_nsec;
|
||||||
|
ret.tv_sec += ns / 1000000000;
|
||||||
|
ret.tv_nsec = ns % 1000000000;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *rmutilTimer_Loop(void *ctx) {
|
||||||
|
RMUtilTimer *tm = ctx;
|
||||||
|
|
||||||
|
int rc = ETIMEDOUT;
|
||||||
|
struct timespec ts;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&tm->lock);
|
||||||
|
while (rc != 0) {
|
||||||
|
clock_gettime(CLOCK_REALTIME, &ts);
|
||||||
|
struct timespec timeout = timespecAdd(&ts, &tm->interval);
|
||||||
|
if ((rc = pthread_cond_timedwait(&tm->cond, &tm->lock, &timeout)) == ETIMEDOUT) {
|
||||||
|
|
||||||
|
// Create a thread safe context if we're running inside redis
|
||||||
|
RedisModuleCtx *rctx = NULL;
|
||||||
|
if (RedisModule_GetThreadSafeContext) rctx = RedisModule_GetThreadSafeContext(NULL);
|
||||||
|
|
||||||
|
// call our callback...
|
||||||
|
tm->cb(rctx, tm->privdata);
|
||||||
|
|
||||||
|
// If needed - free the thread safe context.
|
||||||
|
// It's up to the user to decide whether automemory is active there
|
||||||
|
if (rctx) RedisModule_FreeThreadSafeContext(rctx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// RedisModule_Log(tm->redisCtx, "notice", "Timer cancelled");
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
RMUtilTimer *RMUtil_NewPeriodicTimer(RMutilTimerFunc cb, void *privdata, struct timespec interval) {
|
||||||
|
RMUtilTimer *ret = malloc(sizeof(*ret));
|
||||||
|
*ret = (RMUtilTimer){
|
||||||
|
.privdata = privdata, .interval = interval, .cb = cb,
|
||||||
|
};
|
||||||
|
pthread_cond_init(&ret->cond, NULL);
|
||||||
|
pthread_mutex_init(&ret->lock, NULL);
|
||||||
|
|
||||||
|
pthread_create(&ret->thread, NULL, rmutilTimer_Loop, ret);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int RMUtilTimer_Stop(RMUtilTimer *t) {
|
||||||
|
int rc;
|
||||||
|
if (0 == (rc = pthread_cond_signal(&t->cond))) {
|
||||||
|
rc = pthread_join(t->thread, NULL);
|
||||||
|
}
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
void RMUtilTimer_Free(RMUtilTimer *t) {
|
||||||
|
free(t);
|
||||||
|
}
|
@ -0,0 +1,27 @@
|
|||||||
|
#ifndef RMUTIL_PERIODIC_H_
|
||||||
|
#define RMUTIL_PERIODIC_H_
|
||||||
|
#include <time.h>
|
||||||
|
#include <redismodule.h>
|
||||||
|
|
||||||
|
/** periodic.h - Utility periodic timer running a task repeatedly every given time interval */
|
||||||
|
|
||||||
|
/* RMUtilTimer - opaque context for the timer */
|
||||||
|
struct RMUtilTimer;
|
||||||
|
|
||||||
|
/* RMutilTimerFunc - callback type for timer tasks. The ctx is a thread-safe redis module context
|
||||||
|
* that should be locked/unlocked by the callback when running stuff against redis. privdata is
|
||||||
|
* pre-existing private data */
|
||||||
|
typedef void (*RMutilTimerFunc)(RedisModuleCtx *ctx, void *privdata);
|
||||||
|
|
||||||
|
/* Create and start a new periodic timer. Each timer has its own thread and can only be run and
|
||||||
|
* stopped once. The timer runs `cb` every `interval` with `privdata` passed to the callback. */
|
||||||
|
struct RMUtilTimer *RMUtil_NewPeriodicTimer(RMutilTimerFunc cb, void *privdata,
|
||||||
|
struct timespec interval);
|
||||||
|
|
||||||
|
/* Stop the timer loop. This should return immediately and join the thread */
|
||||||
|
int RMUtilTimer_Stop(struct RMUtilTimer *t);
|
||||||
|
|
||||||
|
/* Free the timer context. The caller should be responsible for freeing the private data at this
|
||||||
|
* point */
|
||||||
|
void RMUtilTimer_Free(struct RMUtilTimer *t);
|
||||||
|
#endif
|
@ -0,0 +1,27 @@
|
|||||||
|
#include <stdio.h>
|
||||||
|
#include <redismodule.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include "periodic.h"
|
||||||
|
#include "assert.h"
|
||||||
|
#include "test.h"
|
||||||
|
|
||||||
|
void timerCb(RedisModuleCtx *ctx, void *p) {
|
||||||
|
int *x = p;
|
||||||
|
(*x)++;
|
||||||
|
}
|
||||||
|
|
||||||
|
int testPeriodic() {
|
||||||
|
int x = 0;
|
||||||
|
struct RMUtilTimer *tm =
|
||||||
|
RMUtil_NewPeriodicTimer(timerCb, &x, (struct timespec){.tv_sec = 0, .tv_nsec = 10000000});
|
||||||
|
|
||||||
|
sleep(1);
|
||||||
|
|
||||||
|
ASSERT_EQUAL(0, RMUtilTimer_Stop(tm));
|
||||||
|
ASSERT(x > 0);
|
||||||
|
ASSERT(x <= 100);
|
||||||
|
RMUtilTimer_Free(tm);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_MAIN({ TESTFUNC(testPeriodic); });
|
Loading…
Reference in New Issue