interface change due to the parallel delivery implementation
authorMichal Voců <michal@ruk.cuni.cz>
Tue, 26 Oct 2010 07:01:14 +0000 (07:01 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Tue, 26 Oct 2010 07:01:14 +0000 (07:01 +0000)
org.glite.lb.logger-msg/src/activemq_cpp_plugin.cpp

index 8bab957..5daa51f 100644 (file)
@@ -289,7 +289,7 @@ OutputPlugin::initialize(const std::string &brokerURI)
 
 extern "C"
 int
-event_queue_connect(struct event_queue *eq)
+event_queue_connect(struct event_queue *eq, struct queue_thread *me)
 {
        OutputPlugin *output;
        std::string topicName(eq->dest_name);
@@ -310,17 +310,17 @@ event_queue_connect(struct event_queue *eq)
                output->connect(topicName);
        } catch(cms::CMSException &e) {
                set_error(IL_DL, 0, (char*)e.what());
-               eq->timeout = TIMEOUT;
+               me->timeout = TIMEOUT;
                return 0;
        }
-       eq->first_event_sent = 0;
+       me->first_event_sent = 0;
        return 1;
 }
 
 
 extern "C"
 int
-event_queue_send(struct event_queue *eq)
+event_queue_send(struct event_queue *eq, struct queue_thread *me)
 {
        OutputPlugin *output = (OutputPlugin*)eq->plugin_data;
        edg_wll_Context context;
@@ -338,8 +338,8 @@ event_queue_send(struct event_queue *eq)
            struct server_msg *msg;
            cms::Message *cms_msg;
 
-           if(event_queue_get(eq, &msg) < 0) {
-               goto err;
+           if(event_queue_get(eq, me, &msg) == 0) {
+                   break;
            }
 
            glite_common_log(IL_LOG_CATEGORY, LOG_PRIORITY_DEBUG, 
@@ -388,13 +388,13 @@ event_queue_send(struct event_queue *eq)
                    if(cms_msg) {
                            delete cms_msg;
                    }
-                   eq->timeout = TIMEOUT;
+                   me->timeout = TIMEOUT;
                    edg_wll_FreeContext(context);
                    set_error(IL_DL, 0, (char*)e.what());
                    return 0;
            }
-           event_queue_remove(eq);
-           eq->first_event_sent = 1;
+           event_queue_remove(eq, me);
+           me->first_event_sent = 1;
        }
        edg_wll_FreeContext(context);
        return 1;
@@ -418,7 +418,7 @@ err:
 
 extern "C"
 int
-event_queue_close(struct event_queue *eq)
+event_queue_close(struct event_queue *eq, struct queue_thread *me)
 {
        OutputPlugin *output = (OutputPlugin*)eq->plugin_data;
 
@@ -433,7 +433,7 @@ event_queue_close(struct event_queue *eq)
                set_error(IL_DL, 0, (char*)e.what());
                return -1;
        }
-       eq->first_event_sent = 0;
+       me->first_event_sent = 0;
        return 0;
 }