#define RECONNECT_TIME 60*20 // when try reconnect to PS in case of error (in sec)
#define RECONNECT_TIME_QUICK 1 // time between feed requests
+#define REACTION_TIME 60*2 // when try reconnect to PS in case of new feeds (in sec)
+#define LAUNCH_TIME 2 // wait (for starting slaves) before requesting feeds
extern SOAP_NMAC struct Namespace jp__namespaces[],jpps__namespaces[];
static int reject(int);
static int disconn(int,struct timeval *,void *);
int data_init(void **data);
+#ifndef ONETIME_FEEDS
+int feed_loop_slave(void);
+#endif
static struct glite_srvbones_service stab = {
fprintf(stderr, "WARNING: %d slaves can be too low for %d feeds\n", conf->slaves, nfeeds);
}
glite_srvbones_set_param(GLITE_SBPARAM_SLAVES_COUNT, conf->slaves);
+#ifndef ONETIME_FEEDS
+ if (feed_loop_slave() < 0) {
+ fprintf(stderr, "forking feed_loop_slave failed!\n");
+ } else
+#endif
glite_srvbones_run(data_init,&stab,1 /* XXX: entries in stab */,debug);
glite_jpis_free_db(isctx);
}
+#ifndef ONETIME_FEEDS
+int feed_loop_slave(void) {
+ pid_t pid;
+ glite_jpis_context_t isctx;
+
+ if ( (pid = fork()) ) return pid;
+
+ glite_jpis_init_context(&isctx, ctx, conf);
+ if (glite_jpis_init_db(isctx) != 0) {
+ printf("[%d] %s: DB error: %s (%s)\n", getpid(), __FUNCTION__, ctx->error->desc, ctx->error->source);
+ exit(1);
+ }
+
+ printf("[%d] %s: waiting before feed requests...\n", getpid(), __FUNCTION__);
+ sleep(LAUNCH_TIME);
+ printf("[%d] %s: feeder slave started\n", getpid(), __FUNCTION__);
+ do {
+ switch (feed_caller(isctx, conf)) {
+ case 1: break;
+ case 0:
+ sleep(REACTION_TIME);
+ break;
+ default:
+ printf("[%d] %s: feed locking error, slave terminated\n", getpid(), __FUNCTION__);
+ exit(1);
+ }
+ } while (1);
+
+ printf("[%d] %s: slave terminated\n", getpid(), __FUNCTION__);
+ exit(0);
+}
+#endif
+
+
/* slave's init comes here */
int data_init(void **data)
{
printf("[%d] slave started\n",getpid());
private->soap = soap_new();
+#if ONETIME_FEEDS
/* ask PS server for data */
do {
switch (feed_caller(private->ctx, conf)) {
return -1;
}
} while (1);
+#else
+ *data = (void *) private;
+ return 0;
+#endif
}
void glite_jp_free_conf(glite_jp_is_conf *conf)
{
- size_t i, j, k;
+ size_t i, j;
glite_jp_is_feed *feed;
if (!conf) return;
if (conf->feeds) for (i = 0; conf->feeds[i]; i++) {
feed = conf->feeds[i];
free(feed->PS_URL);
- for (j = 0; feed->query[j]; j++) {
- for (k = 0; feed->query[j][k].attr; k++) glite_jp_free_query_rec(&feed->query[j][k]);
- free(feed->query[j]);
- }
+ for (j = 0; feed->query[j].attr; j++) glite_jp_free_query_rec(&feed->query[j]);
free(feed->query);
free(feed);
}
typedef struct _glite_jp_is_feed {
char *PS_URL; //URLs of Primary Storage servers
- glite_jp_query_rec_t **query; // query to Primary Server (aka filter)
+ glite_jp_query_rec_t *query; // query to Primary Server (aka filter)
int history, // type of query
continuous;
long int uniqueid; // internal ID
#define COND_MAGIC 0x444E4F43
-static int glite_jpis_db_queries_deserialize(glite_jp_query_rec_t ***queries, void *blob, size_t blob_size) UNUSED;
+static int glite_jpis_db_queries_deserialize(glite_jp_query_rec_t **queries, void *blob, size_t blob_size) UNUSED;
static int is_indexed(glite_jp_is_conf *conf, const char *attr) {
}
-static int glite_jpis_db_queries_serialize(glite_jpis_context_t isctx, void **blob, size_t *len, glite_jp_query_rec_t **queries) {
+static int glite_jpis_db_queries_serialize(glite_jpis_context_t isctx, void **blob, size_t *len, glite_jp_query_rec_t *queries) {
size_t maxlen;
glite_jp_query_rec_t *query;
int ret;
size_t datalen;
if ((ret = array_init(blob, len, &maxlen, 1024)) != 0) return ret;
- query = queries ? *queries : NULL;
+ query = queries;
while(query && query->attr) {
if ((ret = array_add_long(blob, len, &maxlen, COND_MAGIC)) != 0) goto fail;
datalen = strlen(query->attr) + 1;
if ((ret = array_add_long(blob, len, &maxlen, datalen)) != 0) goto fail;
if (datalen)
if ((ret = array_add(blob, len, &maxlen, query->value2, datalen)) != 0) goto fail;
-
+
query++;
}
}
-static int glite_jpis_db_queries_deserialize(glite_jp_query_rec_t ***queries, void *blob, size_t blob_size) {
+static int glite_jpis_db_queries_deserialize(glite_jp_query_rec_t **queries, void *blob, size_t blob_size) {
size_t maxlen, len, datalen;
void *blob_ptr, *blob_end;
int ret;
uint32_t l;
- glite_jp_query_rec_t *query;
+ glite_jp_query_rec_t query;
int i;
- if ((ret = array_init((void **)queries, &len, &maxlen, 512)) != 0) return ret;
+ if ((ret = array_init((void *)queries, &len, &maxlen, 512)) != 0) return ret;
blob_ptr = blob;
blob_end = (char *)blob + blob_size;
while (blob_end > blob_ptr) {
ret = ENOMEM;
- if ((query = calloc(sizeof(*query), 1)) == NULL) goto fail;
+ memset(&query, 0, sizeof query);
l = array_get_long(&blob_ptr);
if (l != COND_MAGIC) {
lprintf("blob=%p, blob_ptr=%p, 0x%08" PRIX32 "\n", blob, blob_ptr, l);
datalen = array_get_long(&blob_ptr);
if (datalen) {
- if ((query->attr = malloc(datalen)) == NULL) goto fail_query;
- memcpy(query->attr, array_get(&blob_ptr, datalen), datalen);
- } else query->attr = NULL;
+ if ((query.attr = malloc(datalen)) == NULL) goto fail_query;
+ memcpy(query.attr, array_get(&blob_ptr, datalen), datalen);
+ } else query.attr = NULL;
- query->op = array_get_long(&blob_ptr);
- query->binary = array_get_long(&blob_ptr);
+ query.op = array_get_long(&blob_ptr);
+ query.binary = array_get_long(&blob_ptr);
datalen = array_get_long(&blob_ptr);
if (datalen) {
- if ((query->value = malloc(datalen)) == NULL) goto fail_query;
- memcpy(query->value, array_get(&blob_ptr, datalen), datalen);
- } else query->value = NULL;
- query->size = datalen;
+ if ((query.value = malloc(datalen)) == NULL) goto fail_query;
+ memcpy(query.value, array_get(&blob_ptr, datalen), datalen);
+ } else query.value = NULL;
+ query.size = datalen;
datalen = array_get_long(&blob_ptr);
if (datalen) {
- if ((query->value2 = malloc(datalen)) == NULL) goto fail_query;
- memcpy(query->value2, array_get(&blob_ptr, datalen), datalen);
- } else query->value2 = NULL;
- query->size2 = datalen;
+ if ((query.value2 = malloc(datalen)) == NULL) goto fail_query;
+ memcpy(query.value2, array_get(&blob_ptr, datalen), datalen);
+ } else query.value2 = NULL;
+ query.size2 = datalen;
- if ((ret = array_add((void **)queries, &len, &maxlen, &query, sizeof(query))) != 0) goto fail_query;
+ if ((ret = array_add((void *)queries, &len, &maxlen, &query, sizeof(query))) != 0) goto fail_query;
}
assert(blob_end == blob_ptr);
- query = NULL;
- if ((ret = array_add((void **)queries, &len, &maxlen, &query, sizeof(query))) != 0) goto fail;
+ memset(&query, 0, sizeof query);
+ if ((ret = array_add((void *)queries, &len, &maxlen, &query, sizeof(query))) != 0) goto fail;
return 0;
fail_query:
- free(query);
fail:
i = 0;
- query = (*queries)[i];
- while (query && query->attr) {
- free(query->attr);
- free(query->value);
- free(query->value2);
- free(query);
+ while ((*queries)[i].attr) {
+ free((*queries)[i].attr);
+ free((*queries)[i].value);
+ free((*queries)[i].value2);
i++;
- query = (*queries)[i];
}
free(*queries);
return ret;
+#include <sys/time.h>
#include <stdio.h>
#include <fcntl.h>
#include <assert.h>
glite_gsplugin_Context plugin_ctx;
glite_jp_error_t err;
char *src, *desc = NULL;
+ // preventive very long timeout
+ static const struct timeval to = {tv_sec: 7200, tv_usec: 0};
lprintf("(%ld) for %s called\n", uniqueid, dest);
glite_gsplugin_init_context(&plugin_ctx);
+ glite_gsplugin_set_timeout(plugin_ctx, &to);
if (ctx->conf->server_key) plugin_ctx->key_filename = strdup(ctx->conf->server_key);
if (ctx->conf->server_cert) plugin_ctx->cert_filename = strdup(ctx->conf->server_cert);
if ((dest_index = find_dest_index(conf, uniqueid)) < 0) goto err;
- for (i=0; conf->feeds[dest_index]->query[i]; i++);
+ for (i=0; conf->feeds[dest_index]->query[i].attr; i++);
GLITE_SECURITY_GSOAP_LIST_CREATE(soap, &in, conditions, struct jptype__primaryQuery, i);
- for (i=0; conf->feeds[dest_index]->query[i]; i++) {
- if (glite_jpis_QueryCondToSoap(soap, conf->feeds[dest_index]->query[i],
+ for (i=0; conf->feeds[dest_index]->query[i].attr; i++) {
+ if (glite_jpis_QueryCondToSoap(soap, &conf->feeds[dest_index]->query[i],
GLITE_SECURITY_GSOAP_LIST_GET(in.conditions, i)) != SOAP_OK) {
err.code = EINVAL;
err.desc = "error during conds conversion";
}
+#if 0
static int SoapToQueryCond(
struct jptype__indexQuery *in,
glite_jp_query_rec_t **out)
return 0;
}
+#endif
static int SoapToPrimaryQueryCond(
struct jptype__primaryQuery *in,
- glite_jp_query_rec_t **out)
+ glite_jp_query_rec_t *out)
{
glite_jp_query_rec_t *qr;
assert(in && in->attr); assert(out);
- *out = qr = calloc(2, sizeof(*qr));
+ qr = out;
qr[0].attr = strdup(in->attr);
glite_jpis_SoapToQueryOp(in->op, &(qr[0].op));
default:
if ( SoapToQueryRecordVal(in->value, &(qr[0].binary),
&(qr[0].size), &(qr[0].value)) ) {
- *out = NULL;
return 1;
}
break;
int glite_jpis_SoapToPrimaryQueryConds(
int size,
GLITE_SECURITY_GSOAP_LIST_TYPE(jptype, primaryQuery) in,
- glite_jp_query_rec_t ***out)
+ glite_jp_query_rec_t **out)
{
- glite_jp_query_rec_t **qr;
+ glite_jp_query_rec_t *qr;
int i;
assert(in || !size); assert(out);
qr = calloc(size+1, sizeof(*qr));
for (i=0; i<size; i++) {
- if ( SoapToPrimaryQueryCond(GLITE_SECURITY_GSOAP_LIST_GET(in, i), &(qr[i])) ) {
+ if ( SoapToPrimaryQueryCond(GLITE_SECURITY_GSOAP_LIST_GET(in, i), &qr[i]) ) {
*out = NULL;
free(qr);
return 1;
void glite_jpis_SoapToQueryOp(const enum jptype__queryOp in, glite_jp_queryop_t *out);
void glite_jpis_SoapToAttrOrig(const enum jptype__attrOrig *in, glite_jp_attr_orig_t *out);
+#if 0
int glite_jpis_SoapToQueryConds(int size, struct jptype__indexQuery **in, glite_jp_query_rec_t ***out);
-int glite_jpis_SoapToPrimaryQueryConds(int size, GLITE_SECURITY_GSOAP_LIST_TYPE(jptype, primaryQuery) in, glite_jp_query_rec_t ***out);
+#endif
+int glite_jpis_SoapToPrimaryQueryConds(int size, GLITE_SECURITY_GSOAP_LIST_TYPE(jptype, primaryQuery) in, glite_jp_query_rec_t **out);
#endif