GNU Radio 3.6.4.2 C++ API
gr_block.h
Go to the documentation of this file.
1 /*
2  * Copyright 2012-2013 Free Software Foundation, Inc.
3  *
4  * This file is part of GNU Radio
5  *
6  * GNU Radio is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3, or (at your option)
9  * any later version.
10  *
11  * GNU Radio is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with GNU Radio; see the file COPYING. If not, write to
18  * the Free Software Foundation, Inc., 51 Franklin Street,
19  * Boston, MA 02110-1301, USA.
20  */
21 
22 #ifndef INCLUDED_GNURADIO_GR_BLOCK_H
23 #define INCLUDED_GNURADIO_GR_BLOCK_H
24 
25 #include <gr_core_api.h>
26 #include <gras/block.hpp>
27 #include <gr_io_signature.h>
28 #include <gr_types.h>
29 #include <gr_tags.h>
30 #include <string>
31 #include <deque>
32 #include <map>
33 #include <boost/foreach.hpp>
34 #include <gruel/thread.h>
35 #include <gr_sptr_magic.h>
36 
37 struct GR_CORE_API gr_block : gras::Block
38 {
39  //! gr_block interface built on gras::Block; per the overload notes below, work() calls general_work().
40  gr_block(void);
41 
42  gr_block(
43  const std::string &name,
44  gr_io_signature_sptr input_signature,
45  gr_io_signature_sptr output_signature
46  );
47 
48  long unique_id(void) const{return _unique_id;}
49  std::string name(void) const{return _name;}
50  long _unique_id;
51  std::string _name;
52 
53  virtual ~gr_block(void);
54 
55  gr_io_signature_sptr input_signature(void) const;
56  gr_io_signature_sptr output_signature(void) const;
57 
58  void set_input_signature(gr_io_signature_sptr sig);
59  void set_output_signature(gr_io_signature_sptr sig);
60 
61  virtual bool check_topology(int ninputs, int noutputs);
62 
63  //! Overload me! I am the forecast
64  virtual void forecast(int, std::vector<int> &);
65 
66  //! Return options for the work call
67  enum
68  {
69  WORK_CALLED_PRODUCE = -2,
70  WORK_DONE = -1
71  };
72 
73  /*!
74  * \brief compute output items from input items
75  *
76  * \param noutput_items number of output items to write on each output stream
77  * \param ninput_items number of input items available on each input stream
78  * \param input_items vector of pointers to the input items, one entry per input stream
79  * \param output_items vector of pointers to the output items, one entry per output stream
80  *
81  * \returns number of items actually written to each output stream, or -1 (WORK_DONE) on EOF.
82  * It is OK to return a value less than noutput_items. -1 <= return value <= noutput_items
83  * NOTE(review): the enum above also lists WORK_CALLED_PRODUCE (-2) as a work return option, outside this range - confirm.
84  * general_work must call consume or consume_each to indicate how many items
85  * were consumed on each input stream.
86  */
87  virtual int general_work(
88  int noutput_items,
89  gr_vector_int &ninput_items,
90  gr_vector_const_void_star &input_items,
91  gr_vector_void_star &output_items
92  );
93 
94  virtual bool start(void);
95  virtual bool stop(void);
96 
97  //! Call during work to consume items
98  void consume_each(const int how_many_items);
99 
100  void consume(const size_t i, const int how_many_items);
101 
102  void produce(const size_t o, const int how_many_items);
103 
104  //! Get absolute count of all items consumed on the given input port
105  uint64_t nitems_read(const size_t which_input = 0);
106 
107  //! Get absolute count of all items produced on the given output port
108  uint64_t nitems_written(const size_t which_output = 0);
109 
110  void add_item_tag(
111  const size_t which_output, const gr_tag_t &tag
112  );
113 
114  void add_item_tag(
115  const size_t which_output,
116  uint64_t abs_offset,
117  const pmt::pmt_t &key,
118  const pmt::pmt_t &value,
119  const pmt::pmt_t &srcid=pmt::PMT_F
120  );
121 
122  void get_tags_in_range(
123  std::vector<gr_tag_t> &tags,
124  const size_t which_input,
125  uint64_t abs_start,
126  uint64_t abs_end,
127  const pmt::pmt_t &key = pmt::pmt_t()
128  );
129 
130  void set_alignment(const size_t alignment);
131 
132  bool is_unaligned(void);
133 
134  size_t fixed_rate_noutput_to_ninput(const size_t noutput_items);
135 
136  size_t interpolation(void) const;
137 
138  void set_interpolation(const size_t);
139 
140  size_t decimation(void) const;
141 
142  void set_decimation(const size_t);
143 
144  int max_noutput_items(void) const;
145 
146  void set_max_noutput_items(int);
147 
148  void unset_max_noutput_items(void);
149 
150  bool is_set_max_noutput_items(void) const;
151 
152  /*******************************************************************
153  * Deal with input and output port configuration
154  ******************************************************************/
155 
156  unsigned history(void) const;
157 
158  void set_history(unsigned history);
159 
160  /*!
161  * Enable fixed rate logic.
162  * When enabled, relative rate is assumed to be set,
163  * and forecast is automatically called.
164  * Also, consume will be called automatically.
165  */
166  void set_fixed_rate(const bool fixed_rate);
167 
168  //! Get the fixed rate setting
169  bool fixed_rate(void) const;
170 
171  /*!
172  * The relative rate can be thought of as interpolation/decimation.
173  * In other words, relative rate is the ratio of output items to input items.
174  */
175  void set_relative_rate(const double relative_rate);
176 
177  //! Get the relative rate setting
178  double relative_rate(void) const;
179 
180  /*!
181  * The output multiple setting controls work output buffer sizes.
182  * Buffer sizes, in items, will be rounded to a multiple of this value.
183  */
184  void set_output_multiple(const size_t multiple);
185 
186  //! Get the output multiple setting
187  size_t output_multiple(void) const;
188 
189  /*******************************************************************
190  * Deal with tag handling and tag configuration
191  ******************************************************************/
192 
194  {
195  TPP_DONT = 0,
196  TPP_ALL_TO_ALL = 1,
197  TPP_ONE_TO_ONE = 2
198  };
199 
200  tag_propagation_policy_t tag_propagation_policy(void);
201 
202  void set_tag_propagation_policy(tag_propagation_policy_t p);
203 
204  ///////////// TODO: the buffer-size setters below are no-ops and the getters return 0 //////////////////////
206  void set_max_output_buffer(int, long){}
207  long max_output_buffer(size_t){return 0;}
209  void set_min_output_buffer(int, long){}
210  long min_output_buffer(size_t){return 0;}
211 
212  ///////////// ALIAS stuff - is it used? //////////////////////
213  std::string d_symbol_alias;
214  std::string d_symbol_name;
215  std::string symbol_name() const { return d_symbol_name; }
216  bool alias_set() { return !d_symbol_alias.empty(); }
217  std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); }
218  pmt::pmt_t alias_pmt(){ return pmt::pmt_intern(alias()); }
219  void set_block_alias(std::string name){d_symbol_alias = name;}
220 
221  ///////////// MSG stuff not implemented //////////////////////
222  typedef std::deque<pmt::pmt_t> msg_queue_t;
223  typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator> msg_queue_map_t;
224  typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>::iterator msg_queue_map_itr;
227 
228  typedef boost::function<void(pmt::pmt_t)> msg_handler_t;
229  typedef std::map<pmt::pmt_t , msg_handler_t, pmt::pmt_comperator> d_msg_handlers_t;
231 
232  template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler){}
233 
236  void message_port_pub(pmt::pmt_t /*port_id*/, pmt::pmt_t /*msg*/){}
237  void message_port_sub(pmt::pmt_t /*port_id*/, pmt::pmt_t /*target*/){}
238  void message_port_unsub(pmt::pmt_t /*port_id*/, pmt::pmt_t /*target*/){}
239 
240  virtual bool message_port_is_hier(pmt::pmt_t port_id) { (void) port_id; /*std::cout << "is_hier\n";*/ return false; }
241  virtual bool message_port_is_hier_in(pmt::pmt_t port_id) { (void) port_id; /*std::cout << "is_hier_in\n";*/ return false; }
242  virtual bool message_port_is_hier_out(pmt::pmt_t port_id) { (void) port_id; /*std::cout << "is_hier_out\n";*/ return false; }
243 
244  /*!
245  * \brief Get input message port names.
246  *
247  * Returns the available input message ports for a block. The
248  * return object is a PMT vector that is filled with PMT symbols.
249  */
251 
252  /*!
253  * \brief Get output message port names.
254  *
255  * Returns the available output message ports for a block. The
256  * return object is a PMT vector that is filled with PMT symbols.
257  */
259 
260  //! is the queue empty? Throws std::runtime_error if \p which_port is not present in msg_queue.
261  bool empty_p(pmt::pmt_t which_port) {
262  if(msg_queue.find(which_port) == msg_queue.end())
263  throw std::runtime_error("port does not exist!");
264  return msg_queue[which_port].empty();
265  }
266  bool empty_p() { // true only when the queue of every port in msg_queue is empty
267  bool rv = true;
268  BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); }
269  return rv;
270  }
271 
272  //! Stub: the body is empty, so nothing is queued and no mutex is acquired or released here.
273  void insert_tail( pmt::pmt_t /*which_port*/, pmt::pmt_t /*msg*/){}
274  /*!
275  * \returns the pmt at the head of the queue, or pmt_t() if empty.
276  */
278 
279  /*!
280  * \returns the pmt at the head of the queue, or pmt_t() if empty.
281  */
283 
284  msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){
285  return msg_queue[which_port].begin(); // std::map::operator[] default-inserts an empty queue when which_port is absent
286  }
287 
288  void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it){
289  msg_queue[which_port].erase(it);
290  }
291 
292  virtual bool has_msg_port(pmt::pmt_t which_port){
293  if(msg_queue.find(which_port) != msg_queue.end()){
294  return true;
295  }
296  if(pmt::pmt_dict_has_key(message_subscribers, which_port)){
297  return true;
298  }
299  return false;
300  }
301 
302  /*!
303  * \brief Tests if there is a handler attached to port \p which_port
304  */
305  bool has_msg_handler(pmt::pmt_t which_port) {
306  return (d_msg_handlers.find(which_port) != d_msg_handlers.end());
307  }
308 
309  /*
310  * This function is called by the runtime system to dispatch messages.
311  *
312  * The thread-safety guarantees mentioned in set_msg_handler are implemented
313  * by the callers of this method.
314  */
315  virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
316  {
317  // AA Update this
318  if(has_msg_handler(which_port)) { // Is there a handler?
319  d_msg_handlers[which_port](msg); // Yes, invoke it.
320  }
321  }
322 
323  /*! Used by block's setters and work functions to make
324  * setting/resetting of parameters thread-safe.
325  *
326  * Used by calling gruel::scoped_lock l(d_setlock);
327  */
329 
330  // ----------------------------------------------------------------------------
331  // Functions to handle thread affinity
332  std::vector<int> d_affinity; // thread affinity proc. mask
333 
334  /*!
335  * \brief Set the processor affinity of this block to the cores in \p mask (stored in d_affinity).
336  *
337  * \param mask a vector of ints of the core numbers available to this block.
338  */
339  void set_processor_affinity(const std::vector<int> &mask){d_affinity=mask;}
340 
341  /*!
342  * \brief Remove processor affinity to a specific core.
343  */
345 
346  /*!
347  * \brief Get the current processor affinity.
348  */
349  std::vector<int> processor_affinity() { return d_affinity; }
350 
351  ///////////////// private vars //////////////////////
352 
353  gr_vector_int _work_ninput_items;
354  gr_vector_int _fcast_ninput_items;
355  size_t _num_outputs;
356  ptrdiff_t _work_io_ptr_mask;
362  size_t _interp, _decim;
364 
365  ///////////////// the Block overloads //////////////////////
366 
367  //! implements work -> calls general work
368  void work(const InputItems &, const OutputItems &);
369 
370  //! notifications of new topological commits
371  void notify_topology(const size_t, const size_t);
372 
373  //! start notification
374  void notify_active(void);
375 
376  //! stop notification
377  void notify_inactive(void);
378 
379  //! implements tag_propagation_policy()
380  virtual void propagate_tags(const size_t, const gras::TagIter &);
381 
382  void _update_input_reserve(void);
383 
384  gras::BufferQueueSptr input_buffer_allocator(const size_t, const gras::SBufferConfig &);
385  gras::BufferQueueSptr output_buffer_allocator(const size_t, const gras::SBufferConfig &);
386 
387 };
388 
390 
391 GRAS_FORCE_INLINE void gr_block::consume_each(const int how_many_items)
392 { //int-typed front end for gras::Block::consume(size_t)
393  if GRAS_UNLIKELY(how_many_items < 0) return; //negative counts are ignored rather than converted to size_t
394  gras::Block::consume(size_t(how_many_items)); //base-class overload that takes only an item count
395 }
396 
397 GRAS_FORCE_INLINE void gr_block::consume(const size_t i, const int how_many_items)
398 { //int-typed front end for gras::Block::consume(size_t, size_t) on input index i
399  if GRAS_UNLIKELY(how_many_items < 0) return; //negative counts are ignored rather than converted to size_t
400  gras::Block::consume(i, size_t(how_many_items));
401 }
402 
403 GRAS_FORCE_INLINE void gr_block::produce(const size_t o, const int how_many_items)
404 { //int-typed front end for gras::Block::produce(size_t, size_t) on output index o
405  if GRAS_UNLIKELY(how_many_items < 0) return; //negative counts are ignored rather than converted to size_t
406  gras::Block::produce(o, size_t(how_many_items));
407 }
408 
409 GRAS_FORCE_INLINE uint64_t gr_block::nitems_read(const size_t which_input)
410 { //returns whatever Block::get_consumed() reports for input index which_input
411  return Block::get_consumed(which_input);
412 }
413 
414 GRAS_FORCE_INLINE uint64_t gr_block::nitems_written(const size_t which_output)
415 { //returns whatever Block::get_produced() reports for output index which_output
416  return Block::get_produced(which_output);
417 }
418 
419 GRAS_FORCE_INLINE size_t gr_block::interpolation(void) const
420 { //plain accessor for the _interp member
421  return _interp;
422 }
423 
424 GRAS_FORCE_INLINE size_t gr_block::decimation(void) const
425 { //plain accessor for the _decim member
426  return _decim;
427 }
428 
429 GRAS_FORCE_INLINE bool gr_block::is_unaligned(void)
430 { //reports unaligned when _work_io_ptr_mask has any bit set under the (GRAS_MAX_ALIGNMENT-1) mask
431  //TODO
432  //probably dont need this since volk dispatcher checks alignment
433  //32 byte aligned is good enough for you
434  return (_work_io_ptr_mask & ptrdiff_t(GRAS_MAX_ALIGNMENT-1)) != 0; //assumes GRAS_MAX_ALIGNMENT is a power of two - TODO confirm
435 }
436 
437 #endif /*INCLUDED_GNURADIO_GR_BLOCK_H*/
GRUEL_API pmt_t pmt_intern(const std::string &s)
Alias for pmt_string_to_symbol.
bool _enable_fixed_rate
Definition: gr_block.h:359
void message_port_register_out(pmt::pmt_t)
Definition: gr_block.h:235
pmt::pmt_t alias_pmt()
Definition: gr_block.h:218
long min_output_buffer(size_t)
Definition: gr_block.h:210
bool has_msg_handler(pmt::pmt_t which_port)
Tests if there is a handler attached to port which_port.
Definition: gr_block.h:305
size_t _input_history_items
Definition: gr_block.h:360
pmt::pmt_t message_ports_out()
Get output message port names.
Definition: gr_block.h:258
tag_propagation_policy_t
Definition: gr_block.h:193
d_msg_handlers_t d_msg_handlers
Definition: gr_block.h:230
std::map< pmt::pmt_t, msg_handler_t, pmt::pmt_comperator > d_msg_handlers_t
Definition: gr_block.h:229
bool empty_p()
Definition: gr_block.h:266
size_t _decim
Definition: gr_block.h:362
pmt::pmt_t delete_head_blocking(pmt::pmt_t)
Definition: gr_block.h:282
GRUEL_API const pmt_t PMT_F
bool is_unaligned(void)
Definition: gr_block.h:429
boost::mutex mutex
Definition: thread.h:44
uint64_t nitems_read(const size_t which_input=0)
Get absolute count of all items consumed on the given input port.
Definition: gr_block.h:409
size_t decimation(void) const
Definition: gr_block.h:424
std::vector< int > processor_affinity()
Get the current processor affinity.
Definition: gr_block.h:349
Definition: gr_tags.h:28
void set_min_output_buffer(int, long)
Definition: gr_block.h:209
long unique_id(void) const
Definition: gr_block.h:48
double _relative_rate
Definition: gr_block.h:358
void set_min_output_buffer(long)
Definition: gr_block.h:208
long max_output_buffer(size_t)
Definition: gr_block.h:207
void insert_tail(pmt::pmt_t, pmt::pmt_t)
Definition: gr_block.h:273
Definition: gr_block.h:37
void consume_each(const int how_many_items)
Call during work to consume items.
Definition: gr_block.h:391
void message_port_sub(pmt::pmt_t, pmt::pmt_t)
Definition: gr_block.h:237
void set_max_output_buffer(int, long)
Definition: gr_block.h:206
gr_vector_int _work_ninput_items
Definition: gr_block.h:353
pmt::pmt_t message_ports_in()
Get input message port names.
Definition: gr_block.h:250
gruel::mutex d_setlock
Definition: gr_block.h:328
#define GR_CORE_API
Definition: gr_core_api.h:30
virtual bool message_port_is_hier_out(pmt::pmt_t port_id)
Definition: gr_block.h:242
size_t interpolation(void) const
Definition: gr_block.h:419
void message_port_register_in(pmt::pmt_t)
Definition: gr_block.h:234
size_t _output_multiple_items
Definition: gr_block.h:357
std::string _name
Definition: gr_block.h:51
pmt::pmt_t delete_head_nowait(pmt::pmt_t)
Definition: gr_block.h:277
void set_block_alias(std::string name)
Definition: gr_block.h:219
std::deque< pmt::pmt_t > msg_queue_t
Definition: gr_block.h:222
unsigned __int64 uint64_t
Definition: stdint.h:90
std::string d_symbol_alias
Definition: gr_block.h:213
GRUEL_API bool pmt_dict_has_key(const pmt_t &dict, const pmt_t &key)
Return true if key exists in dict.
void consume(const size_t i, const int how_many_items)
Definition: gr_block.h:397
void set_processor_affinity(const std::vector< int > &mask)
Set the thread's affinity to the processor cores listed in mask.
Definition: gr_block.h:339
std::map< pmt::pmt_t, msg_queue_t, pmt::pmt_comperator >::iterator msg_queue_map_itr
Definition: gr_block.h:224
tag_propagation_policy_t _tag_prop_policy
Definition: gr_block.h:361
std::string d_symbol_name
Definition: gr_block.h:214
boost::function< void(pmt::pmt_t)> msg_handler_t
Definition: gr_block.h:228
VOLK_API void(kern.name) _manual($kern.arglist_full
Call into a specific implementation given by name.
void produce(const size_t o, const int how_many_items)
Definition: gr_block.h:403
void set_msg_handler(pmt::pmt_t which_port, T msg_handler)
Definition: gr_block.h:232
std::map< pmt::pmt_t, msg_queue_t, pmt::pmt_comperator > msg_queue_map_t
Definition: gr_block.h:223
virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
Definition: gr_block.h:315
size_t _interp
Definition: gr_block.h:362
size_t _num_outputs
Definition: gr_block.h:355
std::string alias()
Definition: gr_block.h:217
GRUEL_API const pmt_t PMT_NIL
msg_queue_t::iterator get_iterator(pmt::pmt_t which_port)
Definition: gr_block.h:284
pmt::pmt_t message_subscribers
Definition: gr_block.h:226
VOLK_API $kern pname $kern name
A function pointer to the dispatcher implementation.
gr_io_signature_sptr _out_sig
Definition: gr_block.h:363
virtual bool message_port_is_hier(pmt::pmt_t port_id)
Definition: gr_block.h:240
virtual bool has_msg_port(pmt::pmt_t which_port)
Definition: gr_block.h:292
uint64_t nitems_written(const size_t which_output=0)
Get absolute count of all items produced on the given output port.
Definition: gr_block.h:414
void message_port_unsub(pmt::pmt_t, pmt::pmt_t)
Definition: gr_block.h:238
ptrdiff_t _work_io_ptr_mask
Definition: gr_block.h:356
bool empty_p(pmt::pmt_t which_port)
is the queue empty?
Definition: gr_block.h:261
boost::intrusive_ptr< pmt_base > pmt_t
typedef for shared pointer (transparent reference counting). See http://www.boost.org/libs/smart_ptr/smart_ptr.htm
Definition: pmt.h:54
gr_vector_int _fcast_ninput_items
Definition: gr_block.h:354
std::string symbol_name() const
Definition: gr_block.h:215
void set_max_output_buffer(long)
Definition: gr_block.h:205
msg_queue_map_t msg_queue
Definition: gr_block.h:225
virtual bool message_port_is_hier_in(pmt::pmt_t port_id)
Definition: gr_block.h:241
void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it)
Definition: gr_block.h:288
void unset_processor_affinity()
Remove processor affinity to a specific core.
Definition: gr_block.h:344
long _unique_id
Definition: gr_block.h:50
void message_port_pub(pmt::pmt_t, pmt::pmt_t)
Definition: gr_block.h:236
bool alias_set()
Definition: gr_block.h:216
std::string name(void) const
Definition: gr_block.h:49
std::vector< int > d_affinity
Definition: gr_block.h:332