builtin-programs/collect.folk
When when the collected results for /clause/ are /resultsVar/ \
/body/ with environment /e/ {
Wish to collect results for $clause with settle 0ms
}
When when the collected results for /clause/ with settle /settle/ are /resultsVar/ \
/body/ with environment /e/ {
Wish to collect results for $clause with settle $settle
}
set cc [C]
$cc cflags -I.
$cc include <string.h>
$cc include <pthread.h>
$cc include "trie.h"
$cc include "db.h"
$cc code {
extern Db* db;
extern Clause* jimObjToClause(Jim_Interp* interp, Jim_Obj* obj);
extern Statement* HoldStatementGlobally(const char *key, double version,
Clause *clause, long keepMs,
const char *destructorCode,
const char *sourceFileName, int sourceLineNumber);
extern Statement* HoldStatementGloballyAcquiring(const char *key, double version,
Clause *clause, long keepMs,
const char *destructorCode,
const char *sourceFileName, int sourceLineNumber);
extern Clause* claimizeClause(Clause* clause);
typedef struct EnvironmentBinding {
char name[100];
Jim_Obj* value;
} EnvironmentBinding;
typedef struct Environment {
int nBindings;
EnvironmentBinding bindings[];
} Environment;
extern Environment* clauseUnify(Jim_Interp* interp, Clause* a, Clause* b);
typedef struct Collect {
char* _Atomic patternStr;
uint64_t _Atomic collectAtTime;
} Collect;
#define COLLECTS_MAX 1000
Collect collects[COLLECTS_MAX];
pthread_mutex_t collectsMutex;
static int64_t timestamp_get(clockid_t clk_id) {
// Returns timestamp in nanoseconds.
struct timespec ts;
if (clock_gettime(clk_id, &ts)) {
perror("can't even get the time :'-(");
}
return (int64_t)ts.tv_sec * 1000000000 + (int64_t)ts.tv_nsec;
}
char* makeCollectKey(Jim_Obj* patternObj) {
const char COLLECT[] = "collect ";
int collectLen = sizeof(COLLECT) - 1;
int keyLen = Jim_Length(patternObj);
char *collectKey = malloc(collectLen + keyLen + 1);
memcpy(collectKey, COLLECT, collectLen);
memcpy(collectKey + collectLen, Jim_String(patternObj), keyLen + 1);
return collectKey;
}
Term* jimObjToTerm(Jim_Obj* obj) {
int len; const char* s;
s = Jim_GetString(obj, &len);
return termNew(s, len);
}
int _Atomic version = 0;
}
$cc proc init {} void {
pthread_mutex_init(&collectsMutex, NULL);
}
$cc proc Recollect! {Jim_Obj* patternObj bool isAtomically} void {
// First, query for collect patterns.
Clause* collectorPattern = clauseNew(10);
collectorPattern->terms[0] = termNew("/someone/", -1);
collectorPattern->terms[1] = termNew("wishes", -1);
collectorPattern->terms[2] = termNew("to", -1);
collectorPattern->terms[3] = termNew("collect", -1);
collectorPattern->terms[4] = termNew("results", -1);
collectorPattern->terms[5] = termNew("for", -1);
collectorPattern->terms[6] = jimObjToTerm(patternObj);
collectorPattern->terms[7] = termNew("with", -1);
collectorPattern->terms[8] = termNew("settle", -1);
collectorPattern->terms[9] = termNew("/settle/", -1);
char* collectKey = makeCollectKey(patternObj);
ResultSet* collectorsSet = dbQuery(db, collectorPattern);
if (collectorsSet->nResults == 0) {
Clause* emptyClause = clauseNew(0);
HoldStatementGlobally(collectKey, version++,
emptyClause, 0, NULL, NULL, 0);
} else {
Clause* pattern = jimObjToClause(interp, patternObj);
ResultSet* rs = dbQuery(db, pattern);
size_t unclaimizedNResults = rs->nResults;
Clause* claimizedPattern = claimizeClause(pattern);
if (claimizedPattern != NULL) {
ResultSet* rs1 = dbQuery(db, claimizedPattern);
rs = realloc(rs, SIZEOF_RESULTSET(rs->nResults + rs1->nResults));
for (size_t i = 0; i < rs1->nResults; i++) {
rs->results[rs->nResults + i] = rs1->results[i];
}
rs->nResults += rs1->nResults;
free(rs1);
}
// Now acquire all the statements in rs, unify each statement's
// clause with the pattern, and build up a results Jim object.
Statement** resultStmts = malloc(sizeof(Statement*) * rs->nResults);
int resultStmtsCount = 0;
Jim_Obj* resultsObj = Jim_NewListObj(interp, NULL, 0);
for (size_t i = 0; i < rs->nResults; i++) {
Statement* result = statementAcquire(db, rs->results[i]);
if (result == NULL) { continue; }
// If `isAtomically` is on, then throw away any
// statement that has an AtomicallyVersion _and_ that
// AtomicallyVersion isn't converged yet.
if (isAtomically &&
statementAtomicallyVersion(result) != NULL &&
!dbAtomicallyVersionHasConverged(statementAtomicallyVersion(result))) {
// fprintf(stderr, "DISCARD %.100s\n",
// clauseToString(statementClause(result)));
statementRelease(db, result);
continue;
}
resultStmts[resultStmtsCount++] = result;
Environment* env = clauseUnify(interp,
i >= unclaimizedNResults ? claimizedPattern : pattern,
statementClause(result));
Jim_Obj* envDict[env->nBindings * 2];
for (int j = 0; j < env->nBindings; j++) {
envDict[j*2] = Jim_NewStringObj(interp, env->bindings[j].name, -1);
envDict[j*2+1] = env->bindings[j].value;
}
Jim_Obj *resultObj = Jim_NewDictObj(interp, envDict, env->nBindings * 2);
Jim_ListAppendElement(interp, resultsObj, resultObj);
free(env);
}
if (claimizedPattern != NULL) {
free(claimizedPattern);
}
clauseFree(pattern);
free(rs);
// Note that at this point, we've still acquired all the
// statements in rs. We need to hang onto them to inherit
// their destructors.
Clause* collectedClause = clauseNew(9);
collectedClause->terms[0] = termNew("builtin-programs/collect.folk", -1);
collectedClause->terms[1] = termNew("claims", -1);
collectedClause->terms[2] = termNew("the", -1);
collectedClause->terms[3] = termNew("collected", -1);
collectedClause->terms[4] = termNew("results", -1);
collectedClause->terms[5] = termNew("for", -1);
collectedClause->terms[6] = jimObjToTerm(patternObj);
collectedClause->terms[7] = termNew("are", -1);
collectedClause->terms[8] = jimObjToTerm(resultsObj);
Jim_DecrRefCount(interp, resultsObj);
Statement* stmt =
HoldStatementGloballyAcquiring(collectKey, version++,
collectedClause, 0, NULL,
NULL, 0);
// Now inherit the destructors of all the statements in rs
// into the new collection statement, and release all the
// statements in rs.
for (int i = 0; i < resultStmtsCount; i++) {
if (stmt != NULL) {
statementInheritDestructors(stmt, resultStmts[i]);
}
// FIXME: what to do if stmt is NULL?
statementRelease(db, resultStmts[i]);
}
free(resultStmts);
if (stmt != NULL) {
dbInflightDecr(db, stmt);
statementRelease(db, stmt);
}
}
free(collectKey);
clauseFree(collectorPattern);
free(collectorsSet);
}
# If the recollect doesn't have a settle time, we should recollect
# immediately. If the recollect has a settle time, then we should bump
# its next occurrence forward by the settle time.
$cc proc ScheduleRecollect! {Jim_Obj* patternObj uint64_t settle} void {
if (settle == 0) {
Recollect_(patternObj, false);
return;
}
uint64_t now = timestamp_get(CLOCK_MONOTONIC);
char* patternStr = strdup(Jim_String(patternObj));
pthread_mutex_lock(&collectsMutex);
int i;
for (i = 0; i < COLLECTS_MAX; i++) {
if (collects[i].patternStr != NULL &&
strcmp(collects[i].patternStr, patternStr) == 0) {
collects[i].collectAtTime = now + settle;
free(patternStr);
break;
}
}
if (i < COLLECTS_MAX) { goto done; }
for (i = 0; i < COLLECTS_MAX; i++) {
if (collects[i].patternStr == NULL) {
collects[i].patternStr = patternStr;
collects[i].collectAtTime = now + settle;
break;
}
}
FOLK_ENSURE(i < COLLECTS_MAX);
done:
pthread_mutex_unlock(&collectsMutex);
}
$cc proc RunScheduledRecollects! {} void {
uint64_t now = timestamp_get(CLOCK_MONOTONIC);
pthread_mutex_lock(&collectsMutex);
for (int i = 0; i < COLLECTS_MAX; i++) {
if (collects[i].patternStr != NULL &&
collects[i].collectAtTime <= now) {
Recollect_(Jim_NewStringObj(interp, collects[i].patternStr, -1), true);
free(collects[i].patternStr);
collects[i].patternStr = NULL;
collects[i].collectAtTime = 0;
}
}
pthread_mutex_unlock(&collectsMutex);
}
set collectLib [$cc compile]
$collectLib init
# FIXME: If there's not settlement for a while, we should just take
# whatever we can get.
# FIXME: If we're just booting now, we should sample whatever's in
# place already.
When /someone/ wishes to collect results for /pattern/ with settle /settle/ {
if {[string match {*ms} $settle]} {
set settleMs [string range $settle 0 end-2]
set settleNs [* $settleMs 1000000]
}
When {*}$pattern {
$collectLib ScheduleRecollect! $pattern $settleNs
On unmatch [list $collectLib ScheduleRecollect! $pattern $settleNs]
}
$collectLib ScheduleRecollect! $pattern $settleNs
On unmatch [list $collectLib ScheduleRecollect! $pattern $settleNs]
}
When the internal time is /t/ {
$collectLib RunScheduledRecollects!
}