Last Value Queues are useful youUser Documentation are only interested in the latest value entered into a queue. LVQ semantics are typically used for things like stock symbol updates when all you care about is the latest value for example.
Qpid C++ M4 or later supports two types of LVQ semantics:
LVQ
LVQ_NO_BROWSE
LVQ uses a header for a key, if the key matches it replaces the message in-place in the queue except a.) if the message with the matching key has been acquired b.) if the message with the matching key has been browsed In these two cases the message is placed into the queue in FIFO, if another message with the same key is received it will the 'un-accessed' message with the same key will be replaced
These two exceptions protect the consumer from missing the last update where a consumer or browser accesses a message and an update comes with the same key.
An example
[localhost tests]$ ./lvqtest --mode create_lvq [localhost tests]$ ./lvqtest --mode write Sending Data: key1=key1.0x7fffdf3f3180 Sending Data: key2=key2.0x7fffdf3f3180 Sending Data: key3=key3.0x7fffdf3f3180 Sending Data: key1=key1.0x7fffdf3f3180 Sending Data: last=last [localhost tests]$ ./lvqtest --mode browse Receiving Data:key1.0x7fffdf3f3180 Receiving Data:key2.0x7fffdf3f3180 Receiving Data:key3.0x7fffdf3f3180 Receiving Data:last [localhost tests]$ ./lvqtest --mode write Sending Data: key1=key1.0x7fffe4c7fa10 Sending Data: key2=key2.0x7fffe4c7fa10 Sending Data: key3=key3.0x7fffe4c7fa10 Sending Data: key1=key1.0x7fffe4c7fa10 Sending Data: last=last [localhost tests]$ ./lvqtest --mode browse Receiving Data:key1.0x7fffe4c7fa10 Receiving Data:key2.0x7fffe4c7fa10 Receiving Data:key3.0x7fffe4c7fa10 Receiving Data:last [localhost tests]$ ./lvqtest --mode consume Receiving Data:key1.0x7fffdf3f3180 Receiving Data:key2.0x7fffdf3f3180 Receiving Data:key3.0x7fffdf3f3180 Receiving Data:last Receiving Data:key1.0x7fffe4c7fa10 Receiving Data:key2.0x7fffe4c7fa10 Receiving Data:key3.0x7fffe4c7fa10 Receiving Data:last
LVQ uses a header for a key, if the key matches it replaces the message in-place in the queue except a.) if the message with the matching key has been acquired In these two cases the message is placed into the queue in FIFO, if another message with the same key is received it will the 'un-accessed' message with the same key will be replaced
Note, in this case browsed messaged are not invalidated, so updates can be missed.
An example
[localhost tests]$ ./lvqtest --mode create_lvq_no_browse [localhost tests]$ ./lvqtest --mode write Sending Data: key1=key1.0x7fffce5fb390 Sending Data: key2=key2.0x7fffce5fb390 Sending Data: key3=key3.0x7fffce5fb390 Sending Data: key1=key1.0x7fffce5fb390 Sending Data: last=last [localhost tests]$ ./lvqtest --mode write Sending Data: key1=key1.0x7fff346ae440 Sending Data: key2=key2.0x7fff346ae440 Sending Data: key3=key3.0x7fff346ae440 Sending Data: key1=key1.0x7fff346ae440 Sending Data: last=last [localhost tests]$ ./lvqtest --mode browse Receiving Data:key1.0x7fff346ae440 Receiving Data:key2.0x7fff346ae440 Receiving Data:key3.0x7fff346ae440 Receiving Data:last [localhost tests]$ ./lvqtest --mode browse Receiving Data:key1.0x7fff346ae440 Receiving Data:key2.0x7fff346ae440 Receiving Data:key3.0x7fff346ae440 Receiving Data:last [localhost tests]$ ./lvqtest --mode write Sending Data: key1=key1.0x7fff606583e0 Sending Data: key2=key2.0x7fff606583e0 Sending Data: key3=key3.0x7fff606583e0 Sending Data: key1=key1.0x7fff606583e0 Sending Data: last=last [localhost tests]$ ./lvqtest --mode consume Receiving Data:key1.0x7fff606583e0 Receiving Data:key2.0x7fff606583e0 Receiving Data:key3.0x7fff606583e0 Receiving Data:last [localhost tests]$
/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include <qpid/client/AsyncSession.h> #include <qpid/client/Connection.h> #include <qpid/client/SubscriptionManager.h> #include <qpid/client/Session.h> #include <qpid/client/Message.h> #include <qpid/client/MessageListener.h> #include <qpid/client/QueueOptions.h> #include <iostream> using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; using namespace qpid; using namespace std; enum Mode { CREATE_LVQ, CREATE_LVQ_NO_BROWSE, WRITE, BROWSE, CONSUME}; const char* modeNames[] = { "create_lvq","create_lvq_no_browse","write","browse","consume" }; // istream/ostream ops so Options can read/display Mode. istream& operator>>(istream& in, Mode& mode) { string s; in >> s; int i = find(modeNames, modeNames+5, s) - modeNames; if (i >= 5) throw Exception("Invalid mode: "+s); mode = Mode(i); return in; } ostream& operator<<(ostream& out, Mode mode) { return out << modeNames[mode]; } struct Args : public qpid::Options, public qpid::client::ConnectionSettings { bool help; Mode mode; Args() : qpid::Options("Simple latency test optins"), help(false), mode(BROWSE) { using namespace qpid; addOptions() ("help", optValue(help), "Print this usage statement") ("broker,b", optValue(host, "HOST"), "Broker host to connect to") ("port,p", optValue(port, "PORT"), "Broker port to connect to") ("username", optValue(username, "USER"), "user name for broker log in.") ("password", optValue(password, "PASSWORD"), "password for broker log in.") ("mechanism", optValue(mechanism, "MECH"), "SASL mechanism to use when authenticating.") ("tcp-nodelay", optValue(tcpNoDelay), "Turn on tcp-nodelay") ("mode", optValue(mode, "'see below'"), "Action mode." "\ncreate_lvq: create a new queue of type lvq.\n" "\ncreate_lvq_no_browse: create a new queue of type lvq with no lvq on browse.\n" "\nwrite: write a bunch of data & keys.\n" "\nbrowse: browse the queue.\n" "\nconsume: consume from the queue.\n"); } }; class Listener : public MessageListener { private: Session session; SubscriptionManager subscriptions; std::string queue; Message request; QueueOptions args; public: Listener(Session& session); void setup(bool browse); void send(std::string kv); void received(Message& message); void browse(); void consume(); }; Listener::Listener(Session& s) : session(s), subscriptions(s), queue("LVQtester") {} void Listener::setup(bool browse) { // set queue mode args.setOrdering(browse?LVQ_NO_BROWSE:LVQ); session.queueDeclare(arg::queue=queue, arg::exclusive=false, arg::autoDelete=false, arg::arguments=args); } void Listener::browse() { subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE, ACQUIRE_MODE_NOT_ACQUIRED)); subscriptions.run(); } void Listener::consume() { subscriptions.subscribe(*this, queue, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE, ACQUIRE_MODE_PRE_ACQUIRED)); subscriptions.run(); } void Listener::send(std::string kv) { request.getDeliveryProperties().setRoutingKey(queue); std::string key; args.getLVQKey(key); request.getHeaders().setString(key, kv); std::ostringstream data; data << kv; if (kv != "last") data << "." << hex << this; request.setData(data.str()); cout << "Sending Data: " << kv << "=" << data.str() << std::endl; async(session).messageTransfer(arg::content=request); } void Listener::received(Message& response) { cout << "Receiving Data:" << response.getData() << std::endl; /* if (response.getData() == "last"){ subscriptions.cancel(queue); } */ } int main(int argc, char** argv) { Args opts; opts.parse(argc, argv); if (opts.help) { std::cout << opts << std::endl; return 0; } Connection connection; try { connection.open(opts); Session session = connection.newSession(); Listener listener(session); switch (opts.mode) { case CONSUME: listener.consume(); break; case BROWSE: listener.browse(); break; case CREATE_LVQ: listener.setup(false); break; case CREATE_LVQ_NO_BROWSE: listener.setup(true); break; case WRITE: listener.send("key1"); listener.send("key2"); listener.send("key3"); listener.send("key1"); listener.send("last"); break; } connection.close(); return 0; } catch(const std::exception& error) { std::cout << error.what() << std::endl; } return 1; }