/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ using System; using System.Collections.Generic; using System.IO ; using System.Reflection ; using System.Threading ; using log4net ; using org.apache.qpid.client ; using org.apache.qpid.transport.util; using org.apache.qpid.transport.codec ; namespace org.apache.qpid.console { /** * Main Interaction point for interaction with the bus. Can be used to locate objects * on the bus, and invoke messages on them. */ public class Session { public static ILog log = LogManager.GetLogger(typeof(Session)) ; public static int CONTEXT_SYNC = 1 ; public static int CONTEXT_STARTUP = 2 ; public static int CONTEXT_MULTIGET = 3 ; public static int DEFAULT_GET_WAIT_TIME = 60000 ; public bool RecieveObjects = true ; public bool RecieveEvents = true ; public bool RecieveHeartbeat = true ; public bool UserBindings = false ; public Console Console ; protected Dictionary> Packages = new Dictionary>() ; protected List Brokers = new List() ; protected SequenceManager SequenceManager = new SequenceManager() ; protected object LockObject = new Object(); protected List SyncSequenceList = new List() ; protected List GetResult ; protected Object SyncResult ; public Session() { } public Session(Console console) { Console = console ; } public void AddBroker(string url) { BrokerURL brokerurl = GetBrokerURL(url) ; Broker broker = new Broker(this, brokerurl) ; Brokers.Add(broker) ; Dictionary args = new Dictionary() ; args.Add("_class", "agent") ; args.Add("_broker", broker) ; this.GetObjects(args) ; } public void RemoveBroker(Broker broker) { if (Brokers.Contains(broker)) { Brokers.Remove(broker) ; } broker.Shutdown() ; } public void Close() { foreach (Broker broker in Brokers.ToArray()) { this.RemoveBroker(broker) ; } } protected BrokerURL GetBrokerURL(string url) { return new BrokerURL(url) ; } public List GetObjects(Dictionary args) { List brokerList = null ; List agentList = new List() ; if (args.ContainsKey("_broker")) { brokerList = new List() ; brokerList.Add((Broker)args["_broker"]) ; } else { brokerList = this.Brokers ; } foreach (Broker broker in brokerList) { broker.WaitForStable() ; } if (args.ContainsKey("_agent")) { Agent agent = (Agent)args["_agent"] ; if (brokerList.Contains(agent.Broker)) { agentList.Add(agent) ; } else { throw new Exception("Agent is not managed by this console or the supplied broker") ; } } else { if (args.ContainsKey("_objectId")) { ObjectID oid = (ObjectID) args["_objectId"] ; foreach (Broker broker in Brokers) { foreach (Agent agent in broker.Agents.Values) { if ((agent.AgentBank == oid.AgentBank()) && (agent.BrokerBank == oid.BrokerBank())) { agentList.Add(agent) ; } } } } else { foreach (Broker broker in brokerList) { foreach (Agent agent in broker.Agents.Values) { if (agent.Broker.IsConnected()) { agentList.Add(agent) ; } } } } } GetResult = new List() ; if (agentList.Count > 0) { //FIXME Add a bunch of other suff too foreach (Agent agent in agentList) { Dictionary getParameters = new Dictionary() ; Broker broker = agent.Broker ; long seq = -1 ; lock(LockObject) { seq = SequenceManager.Reserve(Session.CONTEXT_MULTIGET) ; SyncSequenceList.Add(seq) ; } object packageName = null ; object className = null ; object key = null ; object sClass = null ; object oid = null ; object hash = null ; args.TryGetValue("_schema", out sClass) ; args.TryGetValue("_key", out key) ; args.TryGetValue("_class", out className) ; args.TryGetValue("_package", out packageName) ; args.TryGetValue("_objectID", out oid) ; args.TryGetValue("_hash", out hash) ; if ((className == null) && (oid == null) && (oid == null)) { throw new Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument") ; } if (oid != null) { getParameters.Add("_objectID", oid) ; } else { if (sClass != null) { key = key ?? ((SchemaClass)sClass).Key ; } if (key != null) { ClassKey cKey = (ClassKey)key ; className = className ?? cKey.ClassName ; packageName = packageName ?? cKey.PackageName ; hash = hash ?? cKey.Hash ; } if (packageName != null) { getParameters.Add("_package", packageName) ; } if (className != null) { getParameters.Add("_class", className) ; } if (hash != null) { getParameters.Add("_hash", hash) ; } foreach (KeyValuePair pair in args) { if (!pair.Key.StartsWith("_")) { getParameters.Add(pair.Key, pair.Value) ; } } } IEncoder enc = broker.CreateEncoder('G', seq) ; enc.WriteMap(getParameters) ; string routingKey = String.Format("agent.{0}.{1}", agent.BrokerBank, agent.AgentBank) ; Message msg = broker.CreateMessage(enc, routingKey) ; log.Debug("Get Object Keys: ") ; foreach (string pKey in getParameters.Keys) { log.Debug(String.Format("\tKey: '{0}' Value: '{1}'", pKey, getParameters[pKey])) ; } broker.Send(msg) ; } int waittime = DEFAULT_GET_WAIT_TIME ; bool timeout = false ; if (args.ContainsKey("_timeout")) { waittime = (int) args["_timeout"] ; } DateTime start = DateTime.Now ; lock (LockObject) { // FIXME ERROR while (SyncSequenceList.Count > 0){ Monitor.Wait(LockObject,waittime) ; TimeSpan duration = DateTime.Now - start; if (duration.TotalMilliseconds > waittime) { foreach (long pendingSeq in SyncSequenceList) { SequenceManager.Release(pendingSeq) ; } SyncSequenceList.Clear() ; timeout = true ; } } } //FIXME Add the error logic if ((GetResult.Count == 0) && timeout) { throw new Exception ("Get Request timed out") ; } } return GetResult ; } public List GetPackages() { this.WaitForStable() ; List returnValue = new List() ; foreach (String name in Packages.Keys) { returnValue.Add(name) ; } return returnValue ; } public List GetClasses(string packageName) { List returnValue = new List() ; this.WaitForStable() ; if (Packages.ContainsKey(packageName)) { foreach (SchemaClass sClass in Packages[packageName].Values) { returnValue.Add(sClass.Key) ; } } return returnValue ; } public SchemaClass GetSchema(ClassKey key) { return GetSchema(key, true) ; } protected SchemaClass GetSchema(ClassKey key, bool waitForStable) { if (waitForStable) { this.WaitForStable() ; } SchemaClass returnValue = null ; try { returnValue = Packages[key.PackageName][key.GetKeyString()] ; } catch (KeyNotFoundException) { // eat it } return returnValue ; } protected void WaitForStable() { foreach (Broker broker in Brokers) { broker.WaitForStable() ; } } public Broker GetBroker(long BrokerBank) { Broker returnValue = null ; foreach (Broker broker in Brokers) { if (broker.BrokerBank() == BrokerBank) { returnValue = broker ; break ; } } return returnValue ; } public MethodResult InvokeMethod(QMFObject obj, string name, List args, bool synchronous, int timeToLive) { Broker aBroker = this.GetBroker(obj.BrokerBank()) ; long seq = this.SendMethodRequest(obj, aBroker, name, args, synchronous, timeToLive) ; if (seq != 0) { if (!synchronous) { return null ; } try { aBroker.WaitForSync(timeToLive) ; } catch (Exception e) { SequenceManager.Release(seq) ; throw e ; } // FIXME missing error logic in the broker return (MethodResult) SyncResult ; } return null ; } protected long SendMethodRequest(QMFObject obj, Broker aBroker, string name, List args, bool synchronous, int timeToLive) { SchemaMethod method = obj.Schema.GetMethod(name) ; if (args == null) { args = new List() ; } long seq = 0 ; if (method != null) { KeyValuePair pair = new KeyValuePair(method, synchronous) ; seq = SequenceManager.Reserve(pair) ; IEncoder enc = aBroker.CreateEncoder('M', seq) ; obj.ObjectID.encode(enc) ; obj.Schema.Key.encode(enc) ; enc.WriteStr8(name) ; if (args.Count < method.InputArgCount) { throw new Exception(String.Format("Incorrect number of arguments: expected {0}, got{1}", method.InputArgCount, args.Count)) ; } int argIndex = 0 ; foreach (SchemaArgument arg in method.Arguments) { if (arg.IsInput()) {; this.EncodeValue(enc, arg.Type, args[argIndex]) ; argIndex += 1 ; } } Message msg = aBroker.CreateMessage(enc,obj.RoutingKey(),timeToLive) ; if (synchronous) { aBroker.SetSyncInFlight(true) ; } aBroker.Send(msg) ; } return seq ; } public QMFObject MakeObject(ClassKey key) { SchemaClass sClass = this.GetSchema(key) ; if (sClass == null) { throw new Exception("No schema found for class " + key.ToString()) ; } return this.CreateQMFObject(sClass, true, true, false) ; } public QMFObject MakeObject(String keyString) { return this.MakeObject(new ClassKey(keyString)) ; } // Callback Methods public void HandleNewAgent(Agent agent) { if (Console != null) { Console.NewAgent(agent) ; } } public void HandleAgentRemoved(Agent agent) { if (Console != null) { Console.AgentRemoved(agent) ; } } public void HandleBrokerConnect(Broker broker) { if (Console != null) { Console.BrokerConnected(broker) ; } } public void HandleBrokerDisconnect(Broker broker) { if (Console != null) { Console.BrokerDisconnected(broker) ; } } public void HandleBrokerResponse(Broker broker, IDecoder decoder, long sequence) { if (Console != null) { Console.BrokerInformation(broker) ; } long seq = SequenceManager.Reserve(CONTEXT_STARTUP) ; IEncoder endocder = broker.CreateEncoder('P', seq) ; broker.Send(endocder) ; } public void HandlePackageIndicator(Broker broker, IDecoder decoder, long sequence) { string packageName = decoder.ReadStr8() ; bool notify = false ; if (!Packages.ContainsKey(packageName)) { lock (LockObject) { Packages[packageName] = new Dictionary() ; notify = true ; } } if (notify && Console != null) { Console.NewPackage(packageName) ; } broker.IncrementOutstanding() ; long seq = SequenceManager.Reserve(Session.CONTEXT_STARTUP) ; IEncoder enc = broker.CreateEncoder('Q', seq) ; enc.WriteStr8(packageName) ; broker.Send(enc) ; } public void HandleCommandComplete(Broker broker, IDecoder decoder, long sequence) { long code = decoder.ReadUint32() ; string text = decoder.ReadStr8() ; Object context = this.SequenceManager.Release(sequence) ; if (context.Equals(CONTEXT_STARTUP)) { broker.DecrementOutstanding() ; } else { if ((context.Equals(CONTEXT_SYNC)) & broker.GetSyncInFlight()) { broker.SetSyncInFlight(false) ; } else { if (context.Equals(CONTEXT_MULTIGET) && SyncSequenceList.Contains(sequence)) { lock(LockObject) { SyncSequenceList.Remove(sequence) ; if (SyncSequenceList.Count == 0) { Monitor.PulseAll(LockObject) ; } } } } } } public void HandleClassIndicator(Broker broker, IDecoder decoder, long sequence) { short kind = decoder.ReadUint8() ; ClassKey classKey = new ClassKey(decoder) ; bool unknown = false ; lock (LockObject) { if (Packages.ContainsKey(classKey.PackageName)) { if (!Packages[classKey.PackageName].ContainsKey(classKey.GetKeyString())) { unknown = true ; } } } if (unknown) { broker.IncrementOutstanding() ; long seq = SequenceManager.Reserve(Session.CONTEXT_STARTUP) ; IEncoder enc = broker.CreateEncoder('S', seq) ; classKey.encode(enc) ; broker.Send(enc) ; } } public void HandleMethodResponse(Broker broker, IDecoder decoder, long sequence) { long code = decoder.ReadUint32() ; string text = decoder.ReadStr16() ; Dictionary outArgs = new Dictionary() ; object obj = SequenceManager.Release(sequence) ; if (obj == null) { return ; } KeyValuePair pair = (KeyValuePair) obj ; if (code == 0) { foreach (SchemaArgument arg in pair.Key.Arguments) { if (arg.IsOutput()) { outArgs.Add(arg.Name, this.DecodeValue(decoder, arg.Type)) ; } } } MethodResult result = new MethodResult(code, text, outArgs) ; if (pair.Value) { this.SyncResult = result; broker.SetSyncInFlight(false) ; } if (Console != null) { Console.MethodResponse(broker, sequence, result) ; } } public void HandleHeartbeatIndicator(Broker broker, IDecoder decoder, long sequence, IMessage msg) { if (Console != null) { long brokerBank = 1 ; long agentBank = 0 ; try { string routingKey = msg.DeliveryProperties.GetRoutingKey() ; if (routingKey != null) { agentBank = Agent.GetBrokerBank(routingKey) ; brokerBank = Agent.GetBrokerBank(routingKey) ; } } catch (Exception e) { log.Warn("Internal QPID error", e) ; } string agentKey = Agent.AgentKey(agentBank, brokerBank) ; long timestamp = decoder.ReadUint64() ; if (broker.Agents.ContainsKey(agentKey)) { Agent agent = broker.Agents[agentKey] ; Console.HearbeatRecieved(agent, timestamp) ; } } } public void HandleEventIndicator(Broker broker, IDecoder decoder, long sequence) { if (Console != null) { QMFEvent newEvent = new QMFEvent(this, decoder) ; Console.EventRecieved(broker, newEvent) ; } } public void HandleSchemaResponse(Broker broker, IDecoder decoder, long sequence) { short kind = decoder.ReadUint8() ; ClassKey classKey = new ClassKey(decoder) ; SchemaClass sClass = new SchemaClass(kind, classKey, decoder, this) ; lock(LockObject) { Dictionary classMappings = Packages[sClass.PackageName] ; classMappings.Remove(sClass.ClassKeyString) ; classMappings.Add(sClass.ClassKeyString, sClass) ; } SequenceManager.Release(sequence) ; broker.DecrementOutstanding() ; if (Console != null) { this.Console.NewClass(kind, classKey) ; } } public void HandleContentIndicator(Broker broker, IDecoder decoder, long sequence, bool hasProperties, bool hasStatistics) { ClassKey key = new ClassKey(decoder) ; SchemaClass sClass = null ;; lock (LockObject) { sClass = GetSchema(key, false) ; } if (sClass != null) { QMFObject obj = this.CreateQMFObject(sClass, decoder, hasProperties, hasStatistics, true) ; if (key.PackageName.Equals("org.apache.qpid.broker") && key.ClassName.Equals("agent") && hasProperties) { broker.UpdateAgent(obj) ; } lock (LockObject) { if (SyncSequenceList.Contains(sequence)) { if (!obj.IsDeleted() && this.SelectMatch(obj)) { GetResult.Add(obj) ; } } } if (Console != null) { if (hasProperties) { Console.ObjectProperties(broker, obj) ; } if (hasStatistics) { Console.ObjectStatistics(broker, obj) ; } } } } public bool SelectMatch(QMFObject obj) { return true ; } public object DecodeValue(IDecoder dec, short type) { switch (type) { case 1: return dec.ReadUint8() ; // U8 case 2: return dec.ReadUint16() ; // U16 case 3: return dec.ReadUint32() ; // U32 case 4: return dec.ReadUint64() ; // U64 case 6: return dec.ReadStr8() ; // SSTR case 7: return dec.ReadStr16() ; // LSTR case 8: return dec.ReadDatetime() ; // ABSTIME case 9: return dec.ReadUint32() ; // DELTATIME case 10: return new ObjectID(dec) ; // ref case 11: return dec.ReadUint8() != 0 ; // bool case 12: return dec.ReadFloat() ; // float case 13: return dec.ReadDouble() ; // double case 14: return dec.ReadUuid() ; // UUID case 15: return dec.ReadMap() ; // Ftable case 16: return dec.ReadInt8() ; // int8 case 17: return dec.ReadInt16() ; // int16 case 18: return dec.ReadInt32() ; // int32 case 19: return dec.ReadInt64() ; // int64 case 20: // Object // Peek into the inner type code, make sure // it is actually an object object returnValue = null ; short innerTypeCode = dec.ReadUint8() ; if (innerTypeCode != 20) { returnValue = this.DecodeValue(dec, innerTypeCode) ; } else { ClassKey classKey = new ClassKey(dec) ; lock(LockObject) { SchemaClass sClass = GetSchema(classKey) ; if (sClass != null) { returnValue = this.CreateQMFObject(sClass, dec, true, true, false) ; } } } return returnValue; case 21: // List { MSDecoder lDec = new MSDecoder(); lDec.Init(new MemoryStream(dec.ReadVbin32())); long count = lDec.ReadUint32(); List newList = new List(); while (count > 0) { short innerType = lDec.ReadUint8(); newList.Add(this.DecodeValue(lDec, innerType)); count -= 1; } return newList; } case 22: // Array { MSDecoder aDec = new MSDecoder(); aDec.Init(new MemoryStream(dec.ReadVbin32())); long cnt = aDec.ReadUint32(); short innerType = aDec.ReadUint8(); List aList = new List(); while (cnt > 0) { aList.Add(this.DecodeValue(aDec, innerType)); cnt -= 1; } return aList; } default: throw new Exception(String.Format("Invalid Type Code: {0}", type)) ; } } public void EncodeValue(IEncoder enc, short type, object val) { try { switch ((int)type) { case 1: enc.WriteUint8((short) val) ; break; // U8 case 2: enc.WriteUint16((int) val) ; break; // U16 case 3: enc.WriteUint32((long) val) ; break; // U32 case 4: enc.WriteUint64((long) val) ; break; // U64 case 6: enc.WriteStr8((string) val) ; break; // SSTR case 7: enc.WriteStr16((string) val) ; break; // LSTR case 8: enc.WriteDatetime((long) val); break; // ABSTIME case 9: enc.WriteUint32((long) val); break; // DELTATIME case 10: ((ObjectID)val).encode(enc) ; break; // ref case 11: if ((bool) val) { enc.WriteUint8(1) ; } else { enc.WriteUint8(0) ; } break ; case 12: enc.WriteFloat((float) val); break; // FLOAT case 13: enc.WriteDouble((double) val); break; // DOUBLE case 14: enc.WriteUuid((UUID) val) ; break ; // UUID case 15: enc.WriteMap((Dictionary) val) ; break ; // Ftable case 16: enc.WriteInt8((short) val) ; break; // int8 case 17: enc.WriteInt16((int) val) ; break; // int16 case 18: enc.WriteInt32(long.Parse(""+ val)) ; break; // int32 case 19: enc.WriteInt64(long.Parse("" + val)) ; break; // int64 case 20: // Object // Check that the object has a session, if not // take ownership of it QMFObject qObj = (QMFObject) val ; if (qObj.Session == null) { qObj.Session = this ; } qObj.Encode(enc) ; break; case 21: // List List items = (List) val ; MSEncoder lEnc = new MSEncoder(1) ; lEnc.Init() ; lEnc.WriteUint32(items.Count) ; foreach (object obj in items) { short innerType = Util.QMFType(obj) ; lEnc.WriteUint8(innerType) ; this.EncodeValue(lEnc,innerType,obj) ; } enc.WriteVbin32(lEnc.Segment().ToArray()) ; break ; case 22: // Array List aItems = (List) val ; MSEncoder aEnc = new MSEncoder(1) ; aEnc.Init() ; long aCount = aItems.Count ; aEnc.WriteUint32(aCount) ; if (aCount > 0) { Object anObj = aItems[0] ; short innerType = Util.QMFType(anObj) ; aEnc.WriteUint8(innerType) ; foreach (object obj in aItems) { this.EncodeValue(aEnc,innerType,obj) ; } } enc.WriteVbin32(aEnc.Segment().ToArray()) ; break ; default: throw new Exception(String.Format("Invalid Type Code: {0}", type)) ; } } catch (System.InvalidCastException e) { string msg = String.Format("Class cast exception for typecode {0}, type {1} ", type, val.GetType()) ; log.Error(msg) ; throw new Exception(msg + type, e) ; } } public List BindingKeys() { List bindings = new List() ; bindings.Add("schema.#") ; if (RecieveObjects & RecieveEvents & RecieveHeartbeat & !UserBindings) { bindings.Add("console.#") ; } else { if (RecieveObjects & !UserBindings) { bindings.Add("console.obj.#") ; } else { bindings.Add("console.obj.*.*.org.apache.qpid.broker.agent") ; } if (RecieveEvents) { bindings.Add("console.event.#") ; } if (RecieveHeartbeat) { bindings.Add("console.heartbeat.#") ; } } return bindings ; } protected QMFObject CreateQMFObject(SchemaClass schema, bool hasProperties, bool hasStats , bool isManaged) { Type realClass = typeof(QMFObject) ; if (Console != null) { realClass = Console.TypeMapping(schema.Key) ; } Type[] types = new Type[] {typeof(Session), typeof(SchemaClass), typeof(bool), typeof(bool),typeof(bool)} ; object[] args = new object[] {this, schema, hasProperties, hasStats, isManaged} ; ConstructorInfo ci = realClass.GetConstructor(types); return (QMFObject) ci.Invoke(args) ; } protected QMFObject CreateQMFObject(SchemaClass schema, IDecoder dec, bool hasProperties, bool hasStats , bool isManaged) { Type realClass = typeof(QMFObject) ; if (Console != null) { realClass = Console.TypeMapping(schema.Key) ; } Type[] types = new Type[] {typeof(Session), typeof(SchemaClass), typeof(IDecoder), typeof(bool), typeof(bool),typeof(bool)} ; object[] args = new object[] {this, schema, dec, hasProperties, hasStats, isManaged} ; ConstructorInfo ci = realClass.GetConstructor(types); return (QMFObject) ci.Invoke(args) ; } } }