14 #include <epicsGuard.h> 15 #include <pv/thread.h> 16 #include <pv/bitSetUtil.h> 17 #include <pv/pvData.h> 18 #include <pv/pvAccess.h> 19 #include <pv/pvTimeStamp.h> 20 #include <pv/rpcService.h> 21 #include <pv/serverContext.h> 22 #include <pv/timeStamp.h> 24 #define epicsExportSharedSymbols 32 using std::tr1::static_pointer_cast;
37 namespace epics {
namespace pvDatabase {
// File-scope sentinel pointers and pre-built error Status objects shared by
// the monitor implementation in this file.
// Default-constructed MonitorPtr; no use of it is visible in this excerpt.
42 static MonitorPtr nullMonitor;
// Default-constructed element pointer; MonitorLocal::poll() returns it when
// the monitor is not in the active state.
43 static MonitorElementPtr NULLMonitorElement;
// Handed to monitorConnect() by the monitor factory function after init() is
// called. NOTE(review): presumably only on the failure path; the guarding
// condition is not visible in this excerpt.
44 static Status failedToCreateMonitorStatus(
45 Status::STATUSTYPE_ERROR,
"failed to create monitor");
// Returned by MonitorLocal::start() when the state is already active.
46 static Status alreadyStartedStatus(Status::STATUSTYPE_ERROR,
"already started");
// Returned by MonitorLocal::stop() when the state is idle.
47 static Status notStartedStatus(Status::STATUSTYPE_ERROR,
"not started");
// Returned by MonitorLocal::start() and MonitorLocal::stop() when the state
// is deleted.
48 static Status deletedStatus(Status::STATUSTYPE_ERROR,
"record is deleted");
// Forward declaration of the element queue defined immediately below.
50 class MonitorElementQueue;
// Circular queue over a fixed array of MonitorElementPtr.
// Four cursors walk the array (nextGetFree, nextSetUsed, nextGetUsed,
// nextReleaseUsed); each one wraps back to 0 once it reaches size.
// NOTE(review): the member declarations for the counters/cursors, the
// statements that maintain numberFree/numberUsed, and getFree()'s return
// statement are not visible in this excerpt.
53 class MonitorElementQueue
// Backing storage for the queue slots.
56 MonitorElementPtrArray elements;
66 POINTER_DEFINITIONS(MonitorElementQueue);
// Takes the element vector by value, initializes elements from it and
// records its length in size.
68 MonitorElementQueue(std::vector<MonitorElementPtr> monitorElementArray)
69 : elements(monitorElementArray),
70 size(monitorElementArray.size()),
80 virtual ~MonitorElementQueue() {}
// Producer side: returns an empty pointer when numberFree is 0; otherwise
// reads the slot at nextGetFree and advances that cursor with wrap-around.
92 MonitorElementPtr getFree()
94 if(numberFree==0)
return MonitorElementPtr();
// Index of the slot being handed out, saved before the cursor moves.
96 int ind = nextGetFree;
97 MonitorElementPtr queueElement = elements[nextGetFree++];
98 if(nextGetFree>=size) nextGetFree = 0;
// Producer side: marks element as filled. The caller must pass the slot at
// nextSetUsed; anything else raises std::logic_error. The cursor is
// post-incremented inside the comparison, so it advances even when the
// check fails.
102 void setUsed(MonitorElementPtr
const &element)
104 if(element!=elements[nextSetUsed++]) {
105 throw std::logic_error(
"not correct queueElement");
108 if(nextSetUsed>=size) nextSetUsed = 0;
// Consumer side: returns an empty pointer when numberUsed is 0; otherwise
// returns the slot at nextGetUsed and advances that cursor with wrap-around.
111 MonitorElementPtr getUsed()
113 if(numberUsed==0)
return MonitorElementPtr();
114 int ind = nextGetUsed;
// queueElement is not read on the lines shown; the return below goes through
// the saved index, which names the same slot.
115 MonitorElementPtr queueElement = elements[nextGetUsed++];
116 if(nextGetUsed>=size) nextGetUsed = 0;
117 return elements[ind];
// Consumer side: gives a slot back. The caller must pass the slot at
// nextReleaseUsed; anything else raises std::logic_error. As in setUsed(),
// the cursor advances as part of the comparison.
119 void releaseUsed(MonitorElementPtr
const &element)
121 if(element!=elements[nextReleaseUsed++]) {
122 throw std::logic_error(
123 "not queueElement returned by last call to getUsed");
125 if(nextReleaseUsed>=size) nextReleaseUsed = 0;
// Fragment of the MonitorLocal class declaration; the line that carries the
// class name and any other base classes is not visible in this excerpt.
// Deriving from enable_shared_from_this lets getPtrSelf() below hand out a
// shared pointer to this object.
138 public std::tr1::enable_shared_from_this<MonitorLocal>
// Lifecycle states tested by start(), stop(), poll(), release() and the
// listener callbacks.
140 enum MonitorState {idle,active,deleted};
142 POINTER_DEFINITIONS(MonitorLocal);
143 virtual ~MonitorLocal();
// Monitor-style control operations (start/stop/poll/release).
144 virtual Status start();
145 virtual Status stop();
146 virtual MonitorElementPtr poll();
// Intentionally empty: detach() does nothing in this class.
147 virtual void detach(
PVRecordPtr const & pvRecord){}
148 virtual void release(MonitorElementPtr
const & monitorElement);
// Record-listener callbacks. NOTE(review): the dataPut parameter lists are
// not visible in this excerpt.
150 virtual void dataPut(
153 virtual void beginGroupPut(
PVRecordPtr const & pvRecord);
154 virtual void endGroupPut(
PVRecordPtr const & pvRecord);
155 virtual void unlisten(
PVRecordPtr const & pvRecord);
// Accessors/helpers for the element currently collecting changes.
156 MonitorElementPtr getActiveElement();
157 void releaseActiveElement();
// Second-phase setup from the client's pvRequest; returns a bool consumed by
// the factory function.
158 bool init(PVStructurePtr
const & pvRequest);
// Constructor parameter fragment (declaration start not visible).
160 MonitorRequester::shared_pointer
const & channelMonitorRequester,
// Returns a shared pointer to this object via shared_from_this().
164 MonitorLocalPtr getPtrSelf()
166 return shared_from_this();
// Held weakly; callers lock() it and check the result before notifying.
168 MonitorRequester::weak_pointer monitorRequester;
// Ring of monitor elements shared between the producer (record callbacks)
// and the consumer (poll/release).
172 MonitorElementQueuePtr queue;
// Element currently accumulating changed/overrun bits.
173 MonitorElementPtr activeElement;
// Constructor. Stores the supplied requester in the weak monitorRequester
// member. NOTE(review): the remaining parameters, initializers and the body
// are not visible in this excerpt.
180 MonitorLocal::MonitorLocal(
181 MonitorRequester::shared_pointer
const & channelMonitorRequester,
183 : monitorRequester(channelMonitorRequester),
// Destructor. NOTE(review): the body is not visible in this excerpt.
191 MonitorLocal::~MonitorLocal()
// Begins monitoring. Returns alreadyStartedStatus when the state is active
// and deletedStatus when it is deleted; otherwise registers this object as a
// listener on the record and primes the first element.
// NOTE(review): the assignment that moves state to active and the success
// return are not visible in this excerpt.
197 Status MonitorLocal::start()
// Trace output at record trace level 1 and above.
199 if(pvRecord->getTraceLevel()>0)
201 cout <<
"MonitorLocal::start state " << state << endl;
205 if(state==active)
return alreadyStartedStatus;
206 if(state==deleted)
return deletedStatus;
// The listener is registered before the record guard below is constructed.
208 pvRecord->addListener(getPtrSelf(),pvCopy);
// Locks the record through epicsGuard; presumably held to the end of the
// enclosing scope (the braces are not visible here).
209 epicsGuard <PVRecord> guard(*pvRecord);
// NOTE(review): getFree() returns an empty pointer when no slot is free and
// the result is dereferenced below without a check — confirm the queue is
// always reset before this point.
214 activeElement = queue->getFree();
215 activeElement->changedBitSet->clear();
216 activeElement->overrunBitSet->clear();
// Flags bit 0 as changed before the first release; presumably offset 0
// stands for the whole structure — TODO confirm.
217 activeElement->changedBitSet->set(0);
// Publishes the primed element: refresh the copy, enqueue it, notify the
// requester.
218 releaseActiveElement();
// Stops monitoring. Returns notStartedStatus when the state is idle and
// deletedStatus when it is deleted; otherwise removes this object from the
// record's listeners.
// NOTE(review): the state transition and the success return are not visible
// in this excerpt.
222 Status MonitorLocal::stop()
// Trace output at record trace level 1 and above.
224 if(pvRecord->getTraceLevel()>0){
225 cout <<
"MonitorLocal::stop state " << state << endl;
229 if(state==idle)
return notStartedStatus;
230 if(state==deleted)
return deletedStatus;
// Mirrors the addListener() call made in start().
233 pvRecord->removeListener(getPtrSelf(),pvCopy);
// Returns the next filled element from the queue, or NULLMonitorElement when
// the monitor is not active. queue->getUsed() itself yields an empty pointer
// when nothing is queued.
237 MonitorElementPtr MonitorLocal::poll()
// Trace output only at record trace level 2 and above.
239 if(pvRecord->getTraceLevel()>1)
241 cout <<
"MonitorLocal::poll state " << state << endl;
245 if(state!=active)
return NULLMonitorElement;
246 return queue->getUsed();
// Gives an element obtained from poll() back to the queue. Silently ignored
// when the monitor is not active. queue->releaseUsed() throws
// std::logic_error if monitorElement is not the slot it expects next.
250 void MonitorLocal::release(MonitorElementPtr
const & monitorElement)
// Trace output only at record trace level 2 and above.
252 if(pvRecord->getTraceLevel()>1)
254 cout <<
"MonitorLocal::release state " << state << endl;
258 if(state!=active)
return;
259 queue->releaseUsed(monitorElement);
// Hands the element that has been accumulating changes to the consumer side
// of the queue, switches to a fresh element and notifies the requester.
// Does nothing unless the state is active.
263 void MonitorLocal::releaseActiveElement()
// Trace output only at record trace level 2 and above.
265 if(pvRecord->getTraceLevel()>1)
267 cout <<
"MonitorLocal::releaseActiveElement state " << state << endl;
271 if(state!=active)
return;
// Presumably refreshes the element's structure for the fields flagged in
// changedBitSet. NOTE(review): how result is used is not visible in this
// excerpt.
272 bool result = pvCopy->updateCopyFromBitSet(activeElement->pvStructurePtr,activeElement->changedBitSet);
// With no free slot the current element stays active, so later puts keep
// accumulating in it and no event is raised.
274 MonitorElementPtr newActive = queue->getFree();
275 if(!newActive)
return;
// Both bit sets are compressed against the element's structure before the
// element is queued.
276 BitSetUtil::compress(activeElement->changedBitSet,activeElement->pvStructurePtr);
277 BitSetUtil::compress(activeElement->overrunBitSet,activeElement->pvStructurePtr);
278 queue->setUsed(activeElement);
// Swap in the fresh element and start it with empty bit sets.
279 activeElement = newActive;
280 activeElement->changedBitSet->clear();
281 activeElement->overrunBitSet->clear();
// The requester is held weakly; skip the notification if it is gone.
283 MonitorRequesterPtr requester = monitorRequester.lock();
284 if(!requester)
return;
285 requester->monitorEvent(getPtrSelf());
// Body of the single-field dataPut callback (the signature line is not
// visible in this excerpt). Records that pvRecordField changed in the active
// element and then tries to publish it.
// Trace output only at record trace level 2 and above.
291 if(pvRecord->getTraceLevel()>1)
293 cout <<
"MonitorLocal::dataPut(pvRecordField)" << endl;
295 if(state!=active)
return;
// Bit position of the field inside the copy, as reported by pvCopy.
298 size_t offset = pvCopy->getCopyOffset(pvRecordField->getPVField());
299 BitSetPtr
const &changedBitSet = activeElement->changedBitSet;
300 BitSetPtr
const &overrunBitSet = activeElement->overrunBitSet;
// A bit that was already set means the field changed again before the
// element was delivered, which is recorded as an overrun.
301 bool isSet = changedBitSet->get(offset);
302 changedBitSet->set(offset);
303 if(isSet) overrunBitSet->set(offset);
// NOTE(review): any condition guarding this call is not visible here.
307 releaseActiveElement();
// dataPut overload taking both the requested structure and the field that
// actually changed (the parameter list is not visible in this excerpt).
// Marks the changed sub-field in the active element and then tries to
// publish it.
312 void MonitorLocal::dataPut(
// Trace output only at record trace level 2 and above.
316 if(pvRecord->getTraceLevel()>1)
318 cout <<
"MonitorLocal::dataPut(requested,pvRecordField)" << endl;
320 if(state!=active)
return;
323 BitSetPtr
const &changedBitSet = activeElement->changedBitSet;
324 BitSetPtr
const &overrunBitSet = activeElement->overrunBitSet;
// Copy offset of the requested structure...
325 size_t offsetCopyRequested = pvCopy->getCopyOffset(
326 requested->getPVField());
// ...plus the distance, in field offsets, from the requested structure to
// the field that changed.
327 size_t offset = offsetCopyRequested
328 + (pvRecordField->getPVField()->getFieldOffset()
329 - requested->getPVField()->getFieldOffset());
// A bit that was already set is additionally recorded as an overrun.
330 bool isSet = changedBitSet->get(offset);
331 changedBitSet->set(offset);
332 if(isSet) overrunBitSet->set(offset);
// NOTE(review): any condition guarding this call is not visible here.
336 releaseActiveElement();
// Listener callback for the start of a group put. Ignored unless the state
// is active. NOTE(review): the statements after the state check are not
// visible in this excerpt.
341 void MonitorLocal::beginGroupPut(
PVRecordPtr const & pvRecord)
// Trace output only at record trace level 2 and above.
343 if(pvRecord->getTraceLevel()>1)
345 cout <<
"MonitorLocal::beginGroupPut()" << endl;
347 if(state!=active)
return;
// Listener callback for the end of a group put. Ignored unless the state is
// active; otherwise tries to publish the active element.
// NOTE(review): any condition between the state check and the release call
// is not visible in this excerpt.
355 void MonitorLocal::endGroupPut(
PVRecordPtr const & pvRecord)
// Trace output (including the dataChanged flag) only at record trace level 2
// and above.
357 if(pvRecord->getTraceLevel()>1)
359 cout <<
"MonitorLocal::endGroupPut dataChanged " << dataChanged << endl;
361 if(state!=active)
return;
368 releaseActiveElement();
// Listener callback telling this monitor to stop listening to the record;
// the notification is forwarded to the monitor requester.
372 void MonitorLocal::unlisten(
PVRecordPtr const & pvRecord)
// Trace output only at record trace level 2 and above.
374 if(pvRecord->getTraceLevel()>1)
376 cout <<
"MonitorLocal::unlisten\n";
// NOTE(review): monitorRequester is weak, so lock() can yield an empty
// pointer; no check is visible in this excerpt before the dereference below
// — confirm one exists on the lines not shown.
382 MonitorRequesterPtr requester = monitorRequester.lock();
384 if(pvRecord->getTraceLevel()>1)
386 cout <<
"MonitorLocal::unlisten calling requester->unlisten\n";
388 requester->unlisten(getPtrSelf());
// Second-phase setup driven by the client's pvRequest: reads an optional
// queue size, builds the PVCopy, allocates the monitor elements and reports
// the result to the requester through monitorConnect().
// Returns false when the requester is no longer alive.
// NOTE(review): several guards, branches and return statements are not
// visible in this excerpt; comments below describe only the lines shown.
393 bool MonitorLocal::init(PVStructurePtr
const & pvRequest)
// Default queue length; also the enforced minimum (see the clamp below).
396 size_t queueSize = 2;
397 PVStructurePtr pvOptions = pvRequest->getSubField<PVStructure>(
"record._options");
398 MonitorRequesterPtr requester = monitorRequester.lock();
399 if(!requester)
return false;
// NOTE(review): pvOptions and pvString are dereferenced on the visible lines
// without a visible null check — confirm the guards exist on the lines not
// shown.
401 PVStringPtr pvString = pvOptions->getSubField<PVString>(
"queueSize");
// The option text is fed through a stringstream; presumably parsed into
// queueSize on a line not shown.
405 std::stringstream ss;
406 ss << pvString->get();
// Error report for a queueSize option that is rejected.
410 requester->message(
"queueSize " +pvString->get() +
" illegal",errorMessage);
// The "field" sub-field of the request decides how the copy is created.
415 pvField = pvRequest->getSubField(
"field");
417 pvCopy = PVCopy::create(
418 pvRecord->getPVRecordStructure()->getPVStructure(),
421 requester->message(
"illegal pvRequest",errorMessage);
// A "field" entry that is not a structure is reported as illegal.
425 if(pvField->getField()->getType()!=structure) {
426 requester->message(
"illegal pvRequest",errorMessage);
429 pvCopy = PVCopy::create(
430 pvRecord->getPVRecordStructure()->getPVStructure(),
433 requester->message(
"illegal pvRequest",errorMessage);
// Never run with fewer than two elements.
437 if(queueSize<2) queueSize = 2;
// One MonitorElement per queue slot, each wrapping its own structure created
// by pvCopy.
438 std::vector<MonitorElementPtr> monitorElementArray;
439 monitorElementArray.reserve(queueSize);
440 for(
size_t i=0; i<queueSize; i++) {
441 PVStructurePtr pvStructure = pvCopy->createPVStructure();
442 MonitorElementPtr monitorElement(
443 new MonitorElement(pvStructure));
444 monitorElementArray.push_back(monitorElement);
// Tells the requester the monitor is connected, passing the copy's structure
// (the other arguments are not visible in this excerpt).
447 requester->monitorConnect(
450 pvCopy->getStructure());
// Factory function: builds a MonitorLocal for pvRecord, runs init() on it
// with the client's pvRequest and reports failure to the requester.
// NOTE(review): the first line of the signature, the condition on result and
// the return statements are not visible in this excerpt.
456 MonitorRequester::shared_pointer
const & monitorRequester,
457 PVStructurePtr
const & pvRequest)
459 MonitorLocalPtr monitor(
new MonitorLocal(
460 monitorRequester,pvRecord));
461 bool result = monitor->init(pvRequest);
// Failure report: a default-constructed (empty) structure pointer is passed
// together with failedToCreateMonitorStatus.
464 StructureConstPtr structure;
465 monitorRequester->monitorConnect(
466 failedToCreateMonitorStatus,monitor,structure);
// Trace output at record trace level 1 and above.
469 if(pvRecord->getTraceLevel()>0)
471 cout <<
"MonitorFactory::createMonitor" 472 <<
" recordName " << pvRecord->getRecordName() << endl;
std::tr1::shared_ptr< PVCopy > PVCopyPtr
Listener for PVRecord::message.
std::tr1::shared_ptr< MonitorLocal > MonitorLocalPtr
std::tr1::shared_ptr< MonitorRequester > MonitorRequesterPtr
std::tr1::shared_ptr< PVRecord > PVRecordPtr
std::tr1::shared_ptr< PVRecordStructure > PVRecordStructurePtr
std::tr1::shared_ptr< PVRecordField > PVRecordFieldPtr
std::tr1::shared_ptr< MonitorElementQueue > MonitorElementQueuePtr
MonitorPtr createMonitorLocal(PVRecordPtr const &pvRecord, MonitorRequester::shared_pointer const &monitorRequester, PVStructurePtr const &pvRequest)