builtin-programs/collect.folk

# Matches statements of the form `when the collected results for
# /clause/ are /resultsVar/ /body/ with environment /e/` (presumably the
# reified form of a When that asks for collected results -- confirm)
# and wishes for that clause to be collected with a settle time of 0ms.
When when the collected results for /clause/ are /resultsVar/ \
          /body/ with environment /e/ {
    Wish to collect results for $clause with settle 0ms
}
# Same as the no-settle form, but for statements that also carry
# `with settle /settle/`: the requested settle time is passed through
# unchanged to the collection wish.
When when the collected results for /clause/ with settle /settle/ are /resultsVar/ \
          /body/ with environment /e/ {
    Wish to collect results for $clause with settle $settle
}

# Set up the C compiler object that the recollect machinery below is
# registered on; it is compiled into $collectLib further down.
# NOTE(review): -I. presumably lets the quoted includes (trie.h, db.h)
# resolve relative to the working directory -- confirm.
set cc [C]
$cc cflags -I.
$cc include <string.h>
$cc include <pthread.h>
$cc include "trie.h"
$cc include "db.h"
$cc code {
    extern Db* db;
    extern Clause* jimObjToClause(Jim_Interp* interp, Jim_Obj* obj);
    extern Statement* HoldStatementGlobally(const char *key, double version,
                                            Clause *clause, long keepMs,
                                            const char *destructorCode,
                                            const char *sourceFileName, int sourceLineNumber);
    extern Statement* HoldStatementGloballyAcquiring(const char *key, double version,
                                                     Clause *clause, long keepMs,
                                                     const char *destructorCode,
                                                     const char *sourceFileName, int sourceLineNumber);
    extern Clause* claimizeClause(Clause* clause);

    // Result type of clauseUnify: a counted, variable-length list of
    // name/value bindings.
    // NOTE(review): these layouts are declared locally rather than
    // taken from an included header, so they presumably have to match
    // the definitions used by clauseUnify's implementation -- confirm.
    typedef struct EnvironmentBinding {
        char name[100];    // binding name; read as a NUL-terminated string
        Jim_Obj* value;    // value bound to that name
    } EnvironmentBinding;
    typedef struct Environment {
        int nBindings;                  // number of entries in bindings
        EnvironmentBinding bindings[];  // flexible array member
    } Environment;
    extern Environment* clauseUnify(Jim_Interp* interp, Clause* a, Clause* b);

    // One pending (settle-delayed) recollect.
    typedef struct Collect {
        // Heap-allocated copy of the pattern string; NULL marks the
        // slot as free.
        char* _Atomic patternStr;
        // CLOCK_MONOTONIC time, in nanoseconds, at or after which the
        // recollect should run.
        uint64_t _Atomic collectAtTime;
    } Collect;

    // Fixed-size table of pending recollects. It has static storage,
    // so every slot starts out zeroed (that is, free). Both
    // ScheduleRecollect! and RunScheduledRecollects! lock collectsMutex
    // around their accesses to it.
    #define COLLECTS_MAX 1000
    Collect collects[COLLECTS_MAX];
    pthread_mutex_t collectsMutex;

    // Returns the current reading of clock `clk_id` in nanoseconds.
    //
    // If clock_gettime fails, the failure is reported with perror and
    // 0 is returned. Without the early return the result would be
    // computed from an uninitialized timespec.
    static int64_t timestamp_get(clockid_t clk_id) {
        struct timespec ts;
        if (clock_gettime(clk_id, &ts) != 0) {
            perror("can't even get the time :'-(");
            return 0;
        }
        return (int64_t)ts.tv_sec * 1000000000 + (int64_t)ts.tv_nsec;
    }

    // Builds the key under which the collected results for a pattern
    // are held: the literal prefix "collect " followed by the string
    // form of patternObj, including its terminating NUL.
    //
    // The returned buffer comes from malloc; the caller frees it.
    char* makeCollectKey(Jim_Obj* patternObj) {
        static const char prefix[] = "collect ";
        const int prefixLen = sizeof(prefix) - 1;

        const int patternLen = Jim_Length(patternObj);
        char *key = malloc(prefixLen + patternLen + 1);

        // Prefix first (without its NUL), then the pattern with its NUL.
        char *tail = key + prefixLen;
        memcpy(key, prefix, prefixLen);
        memcpy(tail, Jim_String(patternObj), patternLen + 1);
        return key;
    }

    // Creates a Term, via termNew, from the string representation of
    // a Jim object and that string's length.
    Term* jimObjToTerm(Jim_Obj* obj) {
        int length;
        const char* bytes = Jim_GetString(obj, &length);
        return termNew(bytes, length);
    }

    // Counter that is post-incremented and passed as the `version`
    // argument of every HoldStatementGlobally and
    // HoldStatementGloballyAcquiring call made by Recollect!.
    int _Atomic version = 0;
}

# Initializes collectsMutex with default attributes. Has to run once
# before ScheduleRecollect! or RunScheduledRecollects! is called, since
# both of them lock that mutex.
$cc proc init {} void {
    int err = pthread_mutex_init(&collectsMutex, NULL);
    if (err != 0) {
        // pthread functions return the error number rather than
        // setting errno, so format it with strerror instead of perror.
        fprintf(stderr, "collect: pthread_mutex_init failed: %s\n",
                strerror(err));
    }
}
# Recomputes the collected results for patternObj and holds them as a
# global statement under the key "collect <pattern>".
#
#   patternObj   - the pattern whose matching statements are collected.
#   isAtomically - when true, matching statements that carry an
#                  atomically-version which has not converged yet are
#                  left out of the results.
#
# If no `/someone/ wishes to collect results for <pattern> with settle
# /settle/` statement exists, an empty clause is held under the key
# instead of a results statement.
#
# NOTE(review): `interp` is not declared in this body; it is presumably
# supplied by the `$cc proc` wrapper -- confirm.
$cc proc Recollect! {Jim_Obj* patternObj bool isAtomically} void {
    // First, query for collect patterns.
    Clause* collectorPattern = clauseNew(10);
    collectorPattern->terms[0] = termNew("/someone/", -1);
    collectorPattern->terms[1] = termNew("wishes", -1);
    collectorPattern->terms[2] = termNew("to", -1);
    collectorPattern->terms[3] = termNew("collect", -1);
    collectorPattern->terms[4] = termNew("results", -1);
    collectorPattern->terms[5] = termNew("for", -1);
    collectorPattern->terms[6] = jimObjToTerm(patternObj);
    collectorPattern->terms[7] = termNew("with", -1);
    collectorPattern->terms[8] = termNew("settle", -1);
    collectorPattern->terms[9] = termNew("/settle/", -1);

    // malloc'd by makeCollectKey; freed at the end of this proc.
    char* collectKey = makeCollectKey(patternObj);

    ResultSet* collectorsSet = dbQuery(db, collectorPattern);
    if (collectorsSet->nResults == 0) {
        // Nobody wishes to collect this pattern: hold an empty clause
        // under the key. Presumably this replaces whatever results
        // statement was held under the same key before -- confirm.
        Clause* emptyClause = clauseNew(0);
        HoldStatementGlobally(collectKey, version++,
                              emptyClause, 0, NULL, NULL, 0);
    } else {
        Clause* pattern = jimObjToClause(interp, patternObj);

        ResultSet* rs = dbQuery(db, pattern);
        // Results at index >= unclaimizedNResults (appended below) came
        // from the claimized pattern and are unified against it.
        size_t unclaimizedNResults = rs->nResults;

        // NOTE(review): claimizeClause presumably produces the
        // `/someone/ claims ...` variant of the pattern and returns
        // NULL when there is none -- confirm.
        Clause* claimizedPattern = claimizeClause(pattern);
        if (claimizedPattern != NULL) {
            ResultSet* rs1 = dbQuery(db, claimizedPattern);

            // Append rs1's results to rs.
            // NOTE(review): the realloc result is not checked for NULL.
            rs = realloc(rs, SIZEOF_RESULTSET(rs->nResults + rs1->nResults));
            for (size_t i = 0; i < rs1->nResults; i++) {
                rs->results[rs->nResults + i] = rs1->results[i];
            }
            rs->nResults += rs1->nResults;
            free(rs1);
        }

        // Now acquire all the statements in rs, unify each statement's
        // clause with the pattern, and build up a results Jim object.

        // Statements that were acquired and kept; they are released
        // only after the new collection statement has been held.
        Statement** resultStmts = malloc(sizeof(Statement*) * rs->nResults);
        int resultStmtsCount = 0;

        Jim_Obj* resultsObj = Jim_NewListObj(interp, NULL, 0);
        for (size_t i = 0; i < rs->nResults; i++) {
            Statement* result = statementAcquire(db, rs->results[i]);
            // Skip results that could not be acquired.
            if (result == NULL) { continue; }

            // If `isAtomically` is on, then throw away any
            // statement that has an AtomicallyVersion _and_ that
            // AtomicallyVersion isn't converged yet.
            if (isAtomically &&
                statementAtomicallyVersion(result) != NULL &&
                !dbAtomicallyVersionHasConverged(statementAtomicallyVersion(result))) {

                // fprintf(stderr, "DISCARD %.100s\n",
                //   clauseToString(statementClause(result)));
                statementRelease(db, result);
                continue;
            }

            resultStmts[resultStmtsCount++] = result;

            // NOTE(review): env is dereferenced without a NULL check;
            // this assumes every statement returned by the query
            // unifies with the pattern it was queried with -- confirm.
            Environment* env = clauseUnify(interp,
                                           i >= unclaimizedNResults ? claimizedPattern : pattern,
                                           statementClause(result));
            // Flat name, value, name, value, ... array for Jim_NewDictObj.
            Jim_Obj* envDict[env->nBindings * 2];
            for (int j = 0; j < env->nBindings; j++) {
                envDict[j*2] = Jim_NewStringObj(interp, env->bindings[j].name, -1);
                envDict[j*2+1] = env->bindings[j].value;
            }

            // One dict of bindings per matching statement.
            Jim_Obj *resultObj = Jim_NewDictObj(interp, envDict, env->nBindings * 2);
            Jim_ListAppendElement(interp, resultsObj, resultObj);

            free(env);
        }
        // NOTE(review): claimizedPattern is released with plain free
        // while pattern goes through clauseFree; presumably the
        // claimized clause shares its terms with pattern -- confirm.
        if (claimizedPattern != NULL) {
            free(claimizedPattern);
        }
        clauseFree(pattern);
        free(rs);

        // Note that at this point, we've still acquired all the
        // statements in rs. We need to hang onto them to inherit
        // their destructors.

        Clause* collectedClause = clauseNew(9);
        collectedClause->terms[0] = termNew("builtin-programs/collect.folk", -1);
        collectedClause->terms[1] = termNew("claims", -1);
        collectedClause->terms[2] = termNew("the", -1);
        collectedClause->terms[3] = termNew("collected", -1);
        collectedClause->terms[4] = termNew("results", -1);
        collectedClause->terms[5] = termNew("for", -1);
        collectedClause->terms[6] = jimObjToTerm(patternObj);
        collectedClause->terms[7] = termNew("are", -1);
        collectedClause->terms[8] = jimObjToTerm(resultsObj);
        // NOTE(review): resultsObj never had its reference count
        // incremented here, so this decrement presumably drops it
        // below one and frees the list -- confirm against Jim's
        // reference-counting rules.
        Jim_DecrRefCount(interp, resultsObj);

        Statement* stmt =
            HoldStatementGloballyAcquiring(collectKey, version++,
                                           collectedClause, 0, NULL,
                                           NULL, 0);

        // Now inherit the destructors of all the statements in rs
        // into the new collection statement, and release all the
        // statements in rs.
        for (int i = 0; i < resultStmtsCount; i++) {
            if (stmt != NULL) {
                statementInheritDestructors(stmt, resultStmts[i]);
            }
            // FIXME: what to do if stmt is NULL?

            statementRelease(db, resultStmts[i]);
        }
        free(resultStmts);

        // NOTE(review): presumably balances an in-flight count and an
        // acquisition taken by HoldStatementGloballyAcquiring -- confirm.
        if (stmt != NULL) {
            dbInflightDecr(db, stmt);
            statementRelease(db, stmt);
        }
    }

    free(collectKey);
    clauseFree(collectorPattern);
    free(collectorsSet);
}
# Schedules a recollect of patternObj.
#
#   patternObj - the pattern to recollect.
#   settle     - settle time in nanoseconds.
#
# If the recollect doesn't have a settle time (settle is 0), we
# recollect immediately. If it has a settle time, its next occurrence
# is set to now + settle: an existing pending entry for the same
# pattern is pushed back, otherwise a free slot in `collects` is
# claimed. FOLK_ENSURE fails if the table has no free slot.
$cc proc ScheduleRecollect! {Jim_Obj* patternObj uint64_t settle} void {
    if (settle == 0) {
        Recollect_(patternObj, false);
        return;
    }

    uint64_t collectAtTime = timestamp_get(CLOCK_MONOTONIC) + settle;

    // Copy the pattern before taking the lock; the copy is either
    // handed over to the table or freed below.
    char* patternStr = strdup(Jim_String(patternObj));
    FOLK_ENSURE(patternStr != NULL);

    bool storedPatternStr = false;

    pthread_mutex_lock(&collectsMutex);

    // Already pending? Then just bump its time forward.
    int i;
    for (i = 0; i < COLLECTS_MAX; i++) {
        if (collects[i].patternStr != NULL &&
            strcmp(collects[i].patternStr, patternStr) == 0) {

            collects[i].collectAtTime = collectAtTime;
            break;
        }
    }

    // Not pending: claim the first free slot, which takes ownership
    // of patternStr.
    if (i == COLLECTS_MAX) {
        for (i = 0; i < COLLECTS_MAX; i++) {
            if (collects[i].patternStr == NULL) {
                collects[i].patternStr = patternStr;
                collects[i].collectAtTime = collectAtTime;
                storedPatternStr = true;
                break;
            }
        }
    }

    pthread_mutex_unlock(&collectsMutex);

    if (!storedPatternStr) {
        free(patternStr);
    }

    // Checked only after the mutex has been released and the copy has
    // been freed, so that a full table can neither leave collectsMutex
    // locked nor leak patternStr, however FOLK_ENSURE exits.
    FOLK_ENSURE(i < COLLECTS_MAX);
}
# Runs every pending recollect whose collectAtTime has been reached
# (with isAtomically set to true) and frees its slot in `collects`.
$cc proc RunScheduledRecollects! {} void {
    uint64_t now = timestamp_get(CLOCK_MONOTONIC);

    // NOTE(review): collectsMutex stays locked across the Recollect_
    // calls, so ScheduleRecollect! callers on other threads wait until
    // all due recollects have finished.
    pthread_mutex_lock(&collectsMutex);
    for (int i = 0; i < COLLECTS_MAX; i++) {
        if (collects[i].patternStr == NULL ||
            collects[i].collectAtTime > now) {
            continue;
        }

        // Take a reference for the duration of the call and drop it
        // afterwards, so the temporary pattern object is freed instead
        // of being left behind with a reference count of zero.
        Jim_Obj* patternObj =
            Jim_NewStringObj(interp, collects[i].patternStr, -1);
        Jim_IncrRefCount(patternObj);
        Recollect_(patternObj, true);
        Jim_DecrRefCount(interp, patternObj);

        // Mark the slot free again.
        free(collects[i].patternStr);
        collects[i].patternStr = NULL;
        collects[i].collectAtTime = 0;
    }
    pthread_mutex_unlock(&collectsMutex);
}
# Compile the C code registered on $cc and run its init proc once, so
# that collectsMutex is initialized before any recollect is scheduled.
set collectLib [$cc compile]
$collectLib init

# FIXME: If the results haven't settled for a while, we should just
# take whatever we can get.

# FIXME: If we're just booting now, we should sample whatever's in
# place already.

# For every wish to collect results for /pattern/, schedule a recollect
# each time a statement matching the pattern appears or goes away, and
# also when the wish itself appears or goes away.
#
# /settle/ must be written with an `ms` suffix (for example 0ms or
# 20ms); it is converted to nanoseconds for ScheduleRecollect!.
When /someone/ wishes to collect results for /pattern/ with settle /settle/ {
    # Fail with a clear message instead of an undefined-variable error
    # on settleNs further down.
    if {![string match {*ms} $settle]} {
        error "collect: settle time '$settle' must end in ms (e.g. 20ms)"
    }
    set settleMs [string range $settle 0 end-2]
    set settleNs [* $settleMs 1000000]

    When {*}$pattern {
        $collectLib ScheduleRecollect! $pattern $settleNs
        On unmatch [list $collectLib ScheduleRecollect! $pattern $settleNs]
    }
    $collectLib ScheduleRecollect! $pattern $settleNs
    On unmatch [list $collectLib ScheduleRecollect! $pattern $settleNs]
}

# Run any settle-delayed recollects that have come due. This body runs
# for each statement matching `the internal time is /t/` (presumably a
# periodic clock tick -- confirm); the value of /t/ itself is not used.
When the internal time is /t/ {
    $collectLib RunScheduledRecollects!
}