The VisAO Camera
frameserver.cpp
Go to the documentation of this file.
1 /************************************************************
2 * frameserver.cpp
3 *
4 * Author: Jared R. Males (jrmales@email.arizona.edu)
5 *
6 * Definitions for a class for serving frames over UDP.
7 *
8 * Developed as part of the Magellan Adaptive Optics system.
9 ************************************************************/
10 
11 /** \file frameserver.cpp
12  * \author Jared R. Males
13  * \brief Definitions for a class for serving frames over UDP.
14  *
15  *
16 */
17 
18 #include "frameserver.h"
19 
20 
21 #ifndef ERROR_REPORT
22 #define ERROR_REPORT(er) if(global_error_report) (*global_error_report)(er,__FILE__,__LINE__);
23 #endif
24 
25 #ifndef LOG_INFO
26 #define LOG_INFO(li) if(global_log_info) (*global_log_info)(li);
27 #endif
28 
29 namespace VisAO
30 {
31 
32 
33 frameserver::frameserver(int argc, char **argv) throw (AOException) : VisAOApp_standalone(argc, argv)
34 {
35  Create();
36 }
37 
38 frameserver::frameserver(std::string name, const std::string &conffile) throw (AOException) : VisAOApp_standalone(name, conffile)
39 {
40  Create();
41 }
42 
43 int frameserver::Create()
44 {
45  std::string pathtmp;
46 
47  std::string visao_root = getenv("VISAO_ROOT");
48 
49  //Set up the ping_fifo_path
50  try
51  {
52  pathtmp = (std::string)(ConfigDictionary())["pingFifoPath"];
53  }
54  catch(Config_File_Exception)
55  {
56  pathtmp = "fifos";
57  }
58  pingFifoPath = visao_root + "/" + pathtmp +"/";
59  _logger->log(Logger::LOG_LEV_INFO, "Set pingFifoPath: %s", pingFifoPath.c_str());
60 
61 
62  try
63  {
64  remoteIP = (std::string)(ConfigDictionary())["remoteIP"];
65  }
66  catch(Config_File_Exception)
67  {
68  remoteIP = "127.0.0.1";
69  }
70  _logger->log(Logger::LOG_LEV_INFO, "Set remoteIP: %s", remoteIP.c_str());
71 
72  try
73  {
74  localPort =(ConfigDictionary())["localPort"];
75  }
76  catch(Config_File_Exception)
77  {
78  localPort = 2000;
79  }
80  _logger->log(Logger::LOG_LEV_INFO, "Set localPort: %i", localPort);
81 
82  try
83  {
84  remotePort = (ConfigDictionary())["remotePort"];
85  }
86  catch(Config_File_Exception)
87  {
88  remotePort = 2001;
89  }
90  _logger->log(Logger::LOG_LEV_INFO, "Set remotePort: %i", remotePort);
91 
92 
93  setup_fifo_list(3);
94  setup_baseApp(1, 1, 0, 0, false);
95  //setup_baseApp();
96 
97  //set_fifo_list_channel(&fl, 0, RWBUFF_SZ, (char *)std::string(pingFifoPath + "frameserver47_ping_in").c_str(), (char *)std::string(pingFifoPath + "frameserver47_ping_out").c_str(), &frame_ready, (void *)this);
98 
99  //No fifo handler, rather we assign it its own signal handler
100  set_fifo_list_channel(&fl, FRAME_READY_FIFO_CH, RWBUFF_SZ, (char *)std::string(pingFifoPath + "frameserver47_ping_in").c_str(), (char *)std::string(pingFifoPath + "frameserver47_ping_out").c_str(), 0, (void *)this);
101 
102  shmem_key = 5000;
103  attached_to_shmem = false;
104 
105  total_skipped = 0;
106  behind = 0;
107 
108  //Init the status board
110  if(create_statusboard(sizeof(basic_status_board)) != 0)
111  {
113  _logger->log(Logger::LOG_LEV_ERROR, "Could not create status board.");
114  }
115  else
116  {
118  strncpy(bsb->appname, MyFullName().c_str(), 25);
119  bsb->max_update_interval = 1;
120  }
121 
122  //_MAX_TDP_PACKET_SIZE = 6500; //Constants::MAX_TDP_PACKET_SIZE
123  //_MAX_ETH_PACKET_SIZE = 6500; //Constants::MAX_ETH_PACKET_SIZE
124 
125  _MAX_TDP_PACKET_SIZE = Constants::MAX_TDP_PACKET_SIZE;
126  _MAX_ETH_PACKET_SIZE = Constants::MAX_ETH_PACKET_SIZE;
127 
128  return 0;
129 }
130 
131 int frameserver::setPingFifoPath(std::string &pfp)
132 {
133  pingFifoPath = pfp;
134  return 0;
135 }
136 
137 int frameserver::setRemoteIp(std::string &ip)
138 {
139  remoteIP = ip;
140  return 0;
141 }
142 
143 int frameserver::setLocalPort(int lp)
144 {
145  localPort = lp;
146  return 0;
147 }
148 
149 int frameserver::setRemotePort(int rp)
150 {
151  remotePort = rp;
152  return 0;
153 }
154 
155 int frameserver::set_sharedim_stack(sharedim_stackS * s)
156 {
157  sis = s;
158  return 0;
159 }
160 
161 int frameserver::set_shmem_key(int sk)
162 {
163  shmem_key = sk;
164  return 0;
165 }
166 
167 int frameserver::connect_shmem()
168 {
169  sis = new sharedim_stackS;
170  if(sis->attach_shm(shmem_key) != 0)
171  {
172  ERROR_REPORT("Error attaching to shared memory.");
173  attached_to_shmem = false;
174  delete sis;
175  return -1;
176  }
177  attached_to_shmem = true;
178  return 0;
179 }
180 
181 int frameserver::set_sim(sharedimS s)
182 {
183  sim = s;
184  return 0;
185 }
186 
188 {
189  /*struct sigaction act;
190  sigset_t sset;
191 
192  connect_shmem();
193 
194  global_fifo_list = &fl;
195 
196  signal(SIGIO, SIG_IGN);
197  if(connect_fifo_list() == 0)
198  {
199  LOG_INFO("fifo_list connected.");
200  }
201  else
202  {
203  ERROR_REPORT("Error connecting the fifo list.");
204  return -1;
205  }
206 
207  act.sa_handler = &catch_fifo_response_list;
208  act.sa_flags = 0;
209  sigemptyset(&sset);
210  act.sa_mask = sset;
211 
212  sigaction(SIGIO, &act, 0);*/
213 
214  signal(SIGIO, SIG_IGN);
215  signal(RTSIGIO, SIG_IGN);
216 
217  //Install the main thread handler
219  {
220  ERROR_REPORT("Error installing main thread catcher.");
221  return -1;
222  }
223 
224  //Startup the I/O signal handling thread, without INHERIT_SCHED
225  if(start_signal_catcher() != 0)
226  {
227  ERROR_REPORT("Error starting signal catching thread.");
228  return -1;
229  }
230 
231  sleep(1); //let signal thread get started
232 
233  //Now Block all I/O signals in this thread.
234  if(block_sigio() != 0)
235  {
236  ERROR_REPORT("Error blicking SIGIO.");
237  return -1;
238  }
239 
240  //Setup to catch the ping in this thread
241  //The low signal catcher has already blocked it.
242 
243  fcntl(fl.fifo_ch[FRAME_READY_FIFO_CH].fd_in, F_SETOWN, getpid());
244 
245  int rv = fcntl(fl.fifo_ch[FRAME_READY_FIFO_CH].fd_in, F_SETSIG, RTSIGPING);
246  if(rv < 0)
247  {
248  logss.str("");
249  logss << "Error changing signal: " << strerror(errno);
250  ERROR_REPORT(logss.str().c_str());
251  return -1;
252  }
253 
254  struct sigaction act;
255  sigset_t sset;
256 
257  act.sa_sigaction = &frame_ready_handler;
258  act.sa_flags = SA_SIGINFO;
259  sigemptyset(&sset);
260  act.sa_mask = sset;
261 
262  errno = 0;
263 
264  if(sigaction(RTSIGPING, &act, 0) < 0)
265  {
266  logss.str("");
267  logss << "Error setting signal handler for RTSIGPING. Errno says: " << strerror(errno) << ".";
268  error_report(Logger::LOG_LEV_ERROR, logss.str());
269  return -1;
270  }
271 
272  conn = new UdpConnection(localPort, remoteIP, remotePort, 0);
273 
274  LOG_INFO("starting up . . .");
275 
276  while(!TimeToDie)
277  {
278  sleep(1);
279  }
280 
281  return 0;
282 }
283 
284 
285 int frameserver::calc_packetsPerFrame(int frameSizePixels, int bitPerPix)
286 {
287  frameSizeDw = (frameSizePixels * bitPerPix) / (Constants::DWORD_SIZE*8) + 8; // 4 DW for header and 4 for footer
288 
289  int frameSizeBytes = frameSizeDw * Constants::DWORD_SIZE;
290 
291  packetsPerFrame = frameSizeBytes / _MAX_TDP_PACKET_SIZE;
292 
293  if(frameSizeBytes % _MAX_TDP_PACKET_SIZE)
294  {
295  packetsPerFrame++;
296  }
297 
298  return 0;
299 }
300 
301 
303 {
304  static int last_image_abs = -1, last_image = -1, last_save_sequence = -1;
305  int skipped;
306  //timeval tv0, tv1;
307  //double dt;
308  static int frames = 0;
309  struct timespec ts;
310  ts.tv_sec = 0;
311  //send_skipped = 0;
312 
313  if(attached_to_shmem == false)
314  {
315  if(connect_shmem() != 0) return -1;
316  }
317 
318  //Detect a framegrabber restart
319  if(sis->get_last_image_abs() < last_image_abs || last_save_sequence != sis->header->save_sequence)
320  {
321  last_image_abs = sis->get_last_image_abs();
322  last_image = sis->get_last_image();
323  last_save_sequence = sis->header->save_sequence;
324 
325  //If we restarted, log it:
326  if(sis->get_last_image_abs() < last_image_abs) LOG_INFO("Detected framegrabber restart, resetting");
327  }
328 
329  if(last_image_abs < 0)
330  {
331  last_image = sis->get_last_image();
332  last_save_sequence = sis->header->save_sequence;
333  last_image_abs = sis->get_last_image_abs()-1;
334  }
335 
336  DiagnosticUdpHeader* header = (DiagnosticUdpHeader*) &((TDPHeader*)sendBuff)->header;
337 
338 
339  while(last_image_abs < sis->get_last_image_abs() && !TimeToDie)
340  {
341  last_image_abs = sis->get_last_image_abs();
342 
343  sim = sis->get_image(last_image);
344  int tosend, maxpcksz, pcktsz, bytesent;
345  if(sim.nx)
346  {
347  calc_packetsPerFrame(sim.nx*sim.ny, 16);//sim.depth);
348 
349  tosend = frameSizeDw*Constants::DWORD_SIZE;
350 
351  maxpcksz = Constants::MAX_ETH_PACKET_SIZE;
352 
353  bytesent = 0;
354 
355  header->tot_len = tosend;
356  header->saddr = 0;
357  header->packetId = 0;
358  header->frameId = sim.frameNo;
359 
360  #ifdef _debug
361  std::cout << "frameSizeDw = " << frameSizeDw << "\n";
362  std::cout << "packetsPerFrame = " << packetsPerFrame << "\n";
363  std::cout << "bytes tosend = " << tosend << "\n";
364  std::cout << "frameId = " << header->frameId << std::endl;
365  #endif
366 
367  //Disaster waiting to happen: this assumes that there will be at least 16 bytes in the last frame.
368  int i;
369  int sleeps = 0;
370  for(i=0; i< packetsPerFrame; i++)
371  {
372 
373  if(tosend - bytesent + Constants::TDP_PACKET_HEADER_SIZE > maxpcksz) pcktsz = maxpcksz;
374  else pcktsz = tosend - bytesent+Constants::TDP_PACKET_HEADER_SIZE;
375 
376  if(i == 0) //First frame needs a header
377  {
378  for(int j= 0; j < 16; j++)
379  {
380  sendBuff[Constants::TDP_PACKET_HEADER_SIZE+j] = 0;
381  }
382  for(int j = 16; j < pcktsz - Constants::TDP_PACKET_HEADER_SIZE; j++)
383  {
384  sendBuff[Constants::TDP_PACKET_HEADER_SIZE + j] = ((BYTE *)sim.imdata)[bytesent + j-16];
385  }
386  }
387  else if(i == packetsPerFrame -1) //Last needs a footer
388  {
389  int j;
390  for(j = 0; j < pcktsz - Constants::TDP_PACKET_HEADER_SIZE - 16; j++)
391  {
392  sendBuff[Constants::TDP_PACKET_HEADER_SIZE + j] = ((BYTE *)sim.imdata)[bytesent + j-16];
393  }
394  for(; j < pcktsz - Constants::TDP_PACKET_HEADER_SIZE; j++)
395  {
396  sendBuff[Constants::TDP_PACKET_HEADER_SIZE+j] = 0;
397  }
398  }
399  else
400  {
401  for(int j = 0; j < pcktsz - Constants::TDP_PACKET_HEADER_SIZE; j++)
402  {
403  sendBuff[Constants::TDP_PACKET_HEADER_SIZE + j] = ((BYTE *)sim.imdata)[bytesent + j-16];
404  }
405  }
406 
407  int old_packet = header->packetId;
408 
409  while(header->packetId == old_packet && !TimeToDie)
410  {
411  try
412  {
413  conn->send(sendBuff, pcktsz);
414  if(sleeps > 0)
415  {
416 
417  ts.tv_nsec = 100;
418  nanosleep(&ts, 0);
419  sleeps = 0;
420  }
421  else sleeps++;
422 
423  header->packetId++;
424  bytesent += pcktsz-Constants::TDP_PACKET_HEADER_SIZE;
425  }
426  catch (UdpFatalException u)
427  {
428  //Wait for resource to become available. Re-throw otherwise.
429  if(errno != EAGAIN)
430  {
431  std::cout << u.what() << "\n";
432  std::cout << frames << " " << bytesent << "\n";
433  ERROR_REPORT("Exception thrown sending data via udp");
434  throw(u);
435  }
436  }
437  }
438 
439  }
440 
441  #ifdef _debug
442  std::cout << "bytes sent = " << bytesent << "\n";
443  std::cout << "packets sent = " << i << "\n" << std::endl;
444  #endif
445 
446  frames++;
447  last_image_abs++;
448  //last_image++;
449  last_image = sis->get_last_image();
450  if(last_image >= sis->get_max_n_images()) last_image = 0;
451 
452 
453  //This is here to avoid errors. Should be handled in separate thread, etc. Also should macro-ize the channel number.
454  while(read_fifo_channel(&fl.fifo_ch[0]) > 0);
455  }
456  else
457  {
458  #ifdef _debug
459  std::cout << "last_image " << last_image << "\n";
460  std::cout << "last_image_abs " << last_image_abs << "\n";
461  std::cout << "max_n_images " << sis->get_max_n_images() << "\n";
462  #endif
463  ERROR_REPORT("Error getting image in send_frame().");
464  exit(0);
465  }
466  behind = sis->get_last_image_abs() - last_image_abs;
467 
468  //if(behind) std::cout << "Behind by: " << sis->get_last_image_abs() - last_image_abs << "\n";
469 
470 
471  if(behind >= sis->get_max_n_images())
472  {
473  skipped = behind - (int) .5*sis->get_max_n_images();
474  total_skipped += skipped;
475  _logger->log(Logger::LOG_LEV_ERROR,"Behind %i frames, skipping %i frames. Total skipped: %i", behind, skipped, total_skipped);
476 
477  last_image_abs += skipped;
478  last_image += skipped;
479  if(last_image >= sis->get_max_n_images()) last_image = last_image - sis->get_max_n_images();
480 
481  }
482 
483  }
484 
485  return 0;
486 }
487 
488 void frame_ready_handler(int signum __attribute__((unused)), siginfo_t *siginf, void *ucont __attribute__((unused)))
489 {
490  fifo_channel * fc;
491  frameserver *fw;
492 
493  if(siginf->si_code == POLL_IN)
494  {
495  fc = &global_fifo_list->fifo_ch[FRAME_READY_FIFO_CH];
496 
497  fw = (frameserver *)fc->auxdata;
498 
499  fw->send_frame();
500 
501  while(read_fifo_channel(fc) > 0 && !TimeToDie); //We don't do anything with this, just clear it out.
502  }
503 }
504 
505 
506 int frame_ready(fifo_channel *fc)
507 {
508  frameserver *fw;
509 
510  fw = (frameserver *)fc->auxdata;
511 
512  if(!TimeToDie)
513  {
514  fw->send_frame();
515 
516  while(read_fifo_channel(fc) > 0); //We don't do anything with this, just clear it out.
517  }
518  return 0;
519 }
520 
521 } //namespace VisAO
522 
virtual int Run()
The application main loop, to be re-implemented in derived classes.
fifo_list * global_fifo_list
The global fifo_list, for signal handling.
Definition: dioserver.cpp:19
virtual int start_signal_catcher(bool inherit_sched=true)
Starts the signal catching loop.
Declarations for a class for serving frames over UDP.
int packetsPerFrame
No of UDP messages per frame.
Definition: frameserver.h:73
int read_fifo_channel(fifo_channel *fc)
Read data from the input fifo channel.
Definition: fifoutils.c:246
int send_frame()
Send the frame, packet by packet, as if we're a BCU 47.
std::ostringstream logss
Convenience string stream for building log messages.
void * statusboard_shmemptr
The pointer to the shared memory block for the statusboard.
virtual void error_report(int LogLevel, std::string emsg)
Report an error. Also calls log_msg.
sharedim_stack_header * header
Convenient pointer to the stack header, has same value as shmemptr.
virtual int setup_baseApp(bool usethreads=false)
Install fifo channels.
sharedim_stackS * sis
Manages a VisAO shared memory image stack.
Definition: frameserver.h:61
fifo_channel * fifo_ch
An array of fifo_channels.
Definition: fifoutils.h:207
int setup_fifo_list(int nfifos)
Allocate the fifo_list.
a class for serving frames over UDP.
Definition: frameserver.h:39
key_t statusboard_shmemkey
The key used to lookup the shared memory.
virtual int block_sigio()
Sets the signal mask to block SIGIO and RTSIGIO.
int TimeToDie
Global set by SIGTERM.
int attach_shm(key_t mkey)
Attaches the shared memory. A Reader process should start here.
int frameSizeDw
Size of the frame in DWORD.
Definition: frameserver.h:72
fifo_list fl
The list of named-pipe fifos used for inter-process comms.
int behind
The number of frames currently behind.
Definition: frameserver.h:69
#define STATUS_frameserver47
Shared memory key for the ccd47 frameserver status board.
Definition: statusboard.h:47
sharedim< IMDATA_TYPE > get_image(int imno)
Gets an image, as a sharedim structure, with the imdata pointer properly set for the calling process...
int total_skipped
The total number of frames skipped.
Definition: frameserver.h:70
int create_statusboard(size_t sz)
Creates and attaches to the statusboard shared memory.
virtual int install_sig_mainthread_catcher()
Install the SIG_MAINTHREAD signal catcher.
int calc_packetsPerFrame(int frameSizeBytes, int bitsPerPix)
Calculate frameSizeDW and packetsPerFrame.
sharedimS sim
The sharedim structure retrieved from the stack.
Definition: frameserver.h:67
The namespace of VisAO software.
int get_last_image()
Returns the value of last_image currently in the header.
#define RWBUFF_SZ
The size of the i/o buffer.
Definition: fifoutils.h:43
int set_fifo_list_channel(fifo_list *fl, int nch, int buffsz, const char *fin, const char *fout, int(*inp_hand)(fifo_channel *), void *adata)
Set the details of one channel in the list.
Definition: fifoutils.c:373
int get_max_n_images()
Returns the value of max_n_images currently in the header.