1 package cz.cuni.amis.pogamut.ut2004multi.communication.worldview.stubs; 2 3 import java.util.concurrent.BlockingQueue; 4 import java.util.concurrent.LinkedBlockingQueue; 5 import java.util.concurrent.TimeUnit; 6 7 import junit.framework.Assert; 8 import cz.cuni.amis.pogamut.base.communication.exception.CommunicationException; 9 import cz.cuni.amis.pogamut.base.communication.mediator.IMediator; 10 import cz.cuni.amis.pogamut.base.communication.translator.event.IWorldChangeEvent; 11 import cz.cuni.amis.pogamut.base.communication.worldview.IWorldChangeEventInput; 12 import cz.cuni.amis.pogamut.base.utils.logging.IAgentLogger; 13 import cz.cuni.amis.pogamut.base.utils.logging.LogCategory; 14 import cz.cuni.amis.utils.Job; 15 import cz.cuni.amis.utils.collections.MyCollections; 16 import cz.cuni.amis.utils.flag.Flag; 17 import cz.cuni.amis.utils.flag.ImmutableFlag; 18 import cz.cuni.amis.utils.token.IToken; 19 import cz.cuni.amis.utils.token.Tokens; 20 import java.util.logging.Level; 21 import java.util.logging.Logger; 22 23 public class MediatorStub implements IMediator { 24 25 private static int num = 0; 26 27 private Flag<Boolean> running = new Flag<Boolean>(false); 28 private BlockingQueue<IWorldChangeEvent> eventsQueue = new LinkedBlockingQueue<IWorldChangeEvent>(); 29 private LogCategory log; 30 private IToken token; 31 private IWorldChangeEventInput consumer; 32 33 public MediatorStub(IAgentLogger log, IWorldChangeEvent[] events) { 34 pushEvent(events); 35 this.token = Tokens.get("MediatorStub" + (++num)); 36 this.log = log.getCategory(this); 37 } 38 39 public MediatorStub(IAgentLogger logger) { 40 this.token = Tokens.get("MediatorStub" + (++num)); 41 this.log = logger.getCategory(this); 42 } 43 44 public int getEventQueueLength() { 45 return eventsQueue.size(); 46 } 47 48 public void pushEvent(IWorldChangeEvent event) { 49 eventsQueue.add(event); 50 } 51 52 public void pushEvent(IWorldChangeEvent[] events) { 53 eventsQueue.addAll(MyCollections.asList(events)); 54 } 55 56 public void clearEventsQueue() { 57 if (log.isLoggable(Level.WARNING)) log.warning("clearing events queue"); 58 eventsQueue.clear(); 59 } 60 61 public ImmutableFlag<Boolean> getRunning() { 62 return running.getImmutable(); 63 } 64 65 public void setConsumer(IWorldChangeEventInput consumer) { 66 this.consumer = consumer; 67 } 68 69 public IToken getComponentId() { 70 return token; 71 } 72 73 public LogCategory getLog() { 74 return log; 75 } 76 77 public void kill() { 78 running.setFlag(false); 79 } 80 81 public void start() throws CommunicationException { 82 running.setFlag(true); 83 new Job() { 84 85 @Override 86 protected void job() throws Exception { 87 try { 88 while (eventsQueue.size() > 0) { 89 if (!running.getFlag()) { 90 if (log.isLoggable(Level.WARNING)) log.warning("Me: Stop requested, stopping mediator."); 91 break; 92 } 93 IWorldChangeEvent event = eventsQueue.poll(100, TimeUnit.MILLISECONDS); 94 if (!running.getFlag()) { 95 if (log.isLoggable(Level.WARNING)) log.warning("Me: Stop requested, stopping mediator."); 96 break; 97 } 98 if (event == null) continue; 99 if (log.isLoggable(Level.INFO)) log.info("event " + event); 100 consumer.notify(event); 101 } 102 stop(); 103 clearEventsQueue(); 104 } catch (Exception e) { 105 e.printStackTrace(); 106 Assert.fail("WorldView failed to process event..."); 107 } 108 } 109 110 }.startJob(); 111 } 112 113 public void stop() { 114 running.setFlag(false); 115 } 116 117 }