GNU Radio 3.6.4.2 C++ API
gr_block.h
Go to the documentation of this file.
00001 /*
00002  * Copyright 2012-2013 Free Software Foundation, Inc.
00003  *
00004  * This file is part of GNU Radio
00005  *
00006  * GNU Radio is free software; you can redistribute it and/or modify
00007  * it under the terms of the GNU General Public License as published by
00008  * the Free Software Foundation; either version 3, or (at your option)
00009  * any later version.
00010  *
00011  * GNU Radio is distributed in the hope that it will be useful,
00012  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  * GNU General Public License for more details.
00015  *
00016  * You should have received a copy of the GNU General Public License
00017  * along with GNU Radio; see the file COPYING.  If not, write to
00018  * the Free Software Foundation, Inc., 51 Franklin Street,
00019  * Boston, MA 02110-1301, USA.
00020  */
00021 
00022 #ifndef INCLUDED_GNURADIO_GR_BLOCK_H
00023 #define INCLUDED_GNURADIO_GR_BLOCK_H
00024 
00025 #include <gr_core_api.h>
00026 #include <gras/block.hpp>
00027 #include <gr_io_signature.h>
00028 #include <gr_types.h>
00029 #include <gr_tags.h>
00030 #include <string>
00031 #include <deque>
00032 #include <map>
00033 #include <boost/foreach.hpp>
00034 #include <gruel/thread.h>
00035 #include <gr_sptr_magic.h>
00036 
00037 struct GR_CORE_API gr_block : gras::Block
00038 {
00039 
00040     gr_block(void);
00041 
00042     gr_block(
00043         const std::string &name,
00044         gr_io_signature_sptr input_signature,
00045         gr_io_signature_sptr output_signature
00046     );
00047 
00048     long unique_id(void) const{return _unique_id;}
00049     std::string name(void) const{return _name;}
00050     long _unique_id;
00051     std::string _name;
00052 
00053     virtual ~gr_block(void);
00054 
00055     gr_io_signature_sptr input_signature(void) const;
00056     gr_io_signature_sptr output_signature(void) const;
00057 
00058     void set_input_signature(gr_io_signature_sptr sig);
00059     void set_output_signature(gr_io_signature_sptr sig);
00060 
00061     virtual bool check_topology(int ninputs, int noutputs);
00062 
00063     //! Overload me! I am the forecast
00064     virtual void forecast(int, std::vector<int> &);
00065 
00066     //! Return options for the work call
00067     enum
00068     {
00069         WORK_CALLED_PRODUCE = -2,
00070         WORK_DONE = -1
00071     };
00072 
00073     /*!
00074     * \brief compute output items from input items
00075     *
00076     * \param noutput_items      number of output items to write on each output stream
00077     * \param ninput_items       number of input items available on each input stream
00078     * \param input_items                vector of pointers to the input items, one entry per input stream
00079     * \param output_items       vector of pointers to the output items, one entry per output stream
00080     *
00081     * \returns number of items actually written to each output stream, or -1 on EOF.
00082     * It is OK to return a value less than noutput_items.  -1 <= return value <= noutput_items
00083     *
00084     * general_work must call consume or consume_each to indicate how many items
00085     * were consumed on each input stream.
00086     */
00087     virtual int general_work(
00088         int noutput_items,
00089         gr_vector_int &ninput_items,
00090         gr_vector_const_void_star &input_items,
00091         gr_vector_void_star &output_items
00092     );
00093 
00094     virtual bool start(void);
00095     virtual bool stop(void);
00096 
00097     //! Call during work to consume items
00098     void consume_each(const int how_many_items);
00099 
00100     void consume(const size_t i, const int how_many_items);
00101 
00102     void produce(const size_t o, const int how_many_items);
00103 
00104     //! Get absolute count of all items consumed on the given input port
00105     uint64_t nitems_read(const size_t which_input = 0);
00106 
00107     //! Get absolute count of all items produced on the given output port
00108     uint64_t nitems_written(const size_t which_output = 0);
00109 
00110     void add_item_tag(
00111         const size_t which_output, const gr_tag_t &tag
00112     );
00113 
00114     void add_item_tag(
00115         const size_t which_output,
00116         uint64_t abs_offset,
00117         const pmt::pmt_t &key,
00118         const pmt::pmt_t &value,
00119         const pmt::pmt_t &srcid=pmt::PMT_F
00120     );
00121 
00122     void get_tags_in_range(
00123         std::vector<gr_tag_t> &tags,
00124         const size_t which_input,
00125         uint64_t abs_start,
00126         uint64_t abs_end,
00127         const pmt::pmt_t &key = pmt::pmt_t()
00128     );
00129 
00130     void set_alignment(const size_t alignment);
00131 
00132     bool is_unaligned(void);
00133 
00134     size_t fixed_rate_noutput_to_ninput(const size_t noutput_items);
00135 
00136     size_t interpolation(void) const;
00137 
00138     void set_interpolation(const size_t);
00139 
00140     size_t decimation(void) const;
00141 
00142     void set_decimation(const size_t);
00143 
00144     int max_noutput_items(void) const;
00145 
00146     void set_max_noutput_items(int);
00147 
00148     void unset_max_noutput_items(void);
00149 
00150     bool is_set_max_noutput_items(void) const;
00151 
00152     /*******************************************************************
00153      * Deal with input and output port configuration
00154      ******************************************************************/
00155 
00156     unsigned history(void) const;
00157 
00158     void set_history(unsigned history);
00159 
00160     /*!
00161      * Enable fixed rate logic.
00162      * When enabled, relative rate is assumed to be set,
00163      * and forecast is automatically called.
00164      * Also, consume will be called automatically.
00165      */
00166     void set_fixed_rate(const bool fixed_rate);
00167 
00168     //! Get the fixed rate setting
00169     bool fixed_rate(void) const;
00170 
00171     /*!
00172      * The relative rate can be thought of as interpolation/decimation.
00173      * In other words, relative rate is the ratio of output items to input items.
00174      */
00175     void set_relative_rate(const double relative_rate);
00176 
00177     //! Get the relative rate setting
00178     double relative_rate(void) const;
00179 
00180     /*!
00181      * The output multiple setting controls work output buffer sizes.
00182      * Buffers will be number of items modulo rounted to the multiple.
00183      */
00184     void set_output_multiple(const size_t multiple);
00185 
00186     //! Get the output multiple setting
00187     size_t output_multiple(void) const;
00188 
00189     /*******************************************************************
00190      * Deal with tag handling and tag configuration
00191      ******************************************************************/
00192 
00193     enum tag_propagation_policy_t
00194     {
00195         TPP_DONT = 0,
00196         TPP_ALL_TO_ALL = 1,
00197         TPP_ONE_TO_ONE = 2
00198     };
00199 
00200     tag_propagation_policy_t tag_propagation_policy(void);
00201 
00202     void set_tag_propagation_policy(tag_propagation_policy_t p);
00203 
00204     ///////////// TODO //////////////////////
00205     void set_max_output_buffer(long){}
00206     void set_max_output_buffer(int, long){}
00207     long max_output_buffer(size_t){return 0;}
00208     void set_min_output_buffer(long){}
00209     void set_min_output_buffer(int, long){}
00210     long min_output_buffer(size_t){return 0;}
00211 
00212     ///////////// ALIAS stuff - is it used? //////////////////////
00213     std::string          d_symbol_alias;
00214     std::string          d_symbol_name;
00215     std::string symbol_name() const { return d_symbol_name; }
00216     bool alias_set() { return !d_symbol_alias.empty(); }
00217     std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); }
00218     pmt::pmt_t alias_pmt(){ return pmt::pmt_intern(alias()); }
00219     void set_block_alias(std::string name){d_symbol_alias = name;}
00220 
00221     ///////////// MSG stuff not implemented //////////////////////
00222     typedef std::deque<pmt::pmt_t>    msg_queue_t;
00223     typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>    msg_queue_map_t;
00224     typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>::iterator msg_queue_map_itr;
00225     msg_queue_map_t msg_queue;
00226     pmt::pmt_t message_subscribers;
00227 
00228   typedef boost::function<void(pmt::pmt_t)> msg_handler_t;
00229   typedef std::map<pmt::pmt_t , msg_handler_t, pmt::pmt_comperator> d_msg_handlers_t;
00230   d_msg_handlers_t d_msg_handlers;
00231 
00232     template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler){}
00233 
00234     void message_port_register_in(pmt::pmt_t /*port_id*/){}
00235     void message_port_register_out(pmt::pmt_t /*port_id*/){}
00236     void message_port_pub(pmt::pmt_t /*port_id*/, pmt::pmt_t /*msg*/){}
00237     void message_port_sub(pmt::pmt_t /*port_id*/, pmt::pmt_t /*target*/){}
00238     void message_port_unsub(pmt::pmt_t /*port_id*/, pmt::pmt_t /*target*/){}
00239 
00240     virtual bool message_port_is_hier(pmt::pmt_t port_id) { (void) port_id; /*std::cout << "is_hier\n";*/ return false; }
00241     virtual bool message_port_is_hier_in(pmt::pmt_t port_id) { (void) port_id; /*std::cout << "is_hier_in\n";*/ return false; }
00242     virtual bool message_port_is_hier_out(pmt::pmt_t port_id) { (void) port_id; /*std::cout << "is_hier_out\n";*/ return false; }
00243 
00244     /*!
00245     * \brief Get input message port names.
00246     *
00247     * Returns the available input message ports for a block. The
00248     * return object is a PMT vector that is filled with PMT symbols.
00249     */
00250     pmt::pmt_t message_ports_in(){return pmt::PMT_NIL;}
00251 
00252     /*!
00253     * \brief Get output message port names.
00254     *
00255     * Returns the available output message ports for a block. The
00256     * return object is a PMT vector that is filled with PMT symbols.
00257     */
00258     pmt::pmt_t message_ports_out(){return pmt::PMT_NIL;}
00259 
00260     //! is the queue empty?
00261     bool empty_p(pmt::pmt_t which_port) { 
00262         if(msg_queue.find(which_port) == msg_queue.end())
00263           throw std::runtime_error("port does not exist!");
00264         return msg_queue[which_port].empty();
00265     }
00266     bool empty_p() { 
00267         bool rv = true;
00268         BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); }
00269         return rv;
00270     }
00271 
00272     //| Acquires and release the mutex
00273     void insert_tail( pmt::pmt_t /*which_port*/, pmt::pmt_t /*msg*/){}
00274     /*!
00275     * \returns returns pmt at head of queue or pmt_t() if empty.
00276     */
00277     pmt::pmt_t delete_head_nowait( pmt::pmt_t /*which_port*/){return pmt::PMT_NIL;}
00278 
00279     /*!
00280     * \returns returns pmt at head of queue or pmt_t() if empty.
00281     */
00282     pmt::pmt_t delete_head_blocking( pmt::pmt_t /*which_port*/){return pmt::PMT_NIL;}
00283 
00284     msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){
00285         return msg_queue[which_port].begin();
00286     }
00287 
00288     void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it){
00289         msg_queue[which_port].erase(it);
00290     }
00291 
00292     virtual bool has_msg_port(pmt::pmt_t which_port){
00293         if(msg_queue.find(which_port) != msg_queue.end()){
00294             return true;
00295         }
00296         if(pmt::pmt_dict_has_key(message_subscribers, which_port)){
00297             return true;
00298         }
00299         return false;
00300     }
00301 
00302   /*!
00303    * \brief Tests if there is a handler attached to port \p which_port
00304    */
00305    bool has_msg_handler(pmt::pmt_t which_port) {
00306      return (d_msg_handlers.find(which_port) != d_msg_handlers.end());
00307    }
00308 
00309   /*
00310    * This function is called by the runtime system to dispatch messages.
00311    *
00312    * The thread-safety guarantees mentioned in set_msg_handler are implemented
00313    * by the callers of this method.
00314    */
00315   virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
00316   {
00317     // AA Update this
00318     if(has_msg_handler(which_port)) {  // Is there a handler?
00319       d_msg_handlers[which_port](msg); // Yes, invoke it.
00320     }
00321   }
00322 
00323   /*! Used by block's setters and work functions to make
00324    * setting/resetting of parameters thread-safe.
00325    *
00326    * Used by calling gruel::scoped_lock l(d_setlock);
00327    */ 
00328   gruel::mutex d_setlock;
00329 
00330   // ----------------------------------------------------------------------------
00331   // Functions to handle thread affinity
00332   std::vector<int> d_affinity;              // thread affinity proc. mask
00333 
00334   /*!
00335    * \brief Set the thread's affinity to processor core \p n.
00336    *
00337    * \param mask a vector of unsigned ints of the core numbers available to this block.
00338    */
00339   void set_processor_affinity(const std::vector<int> &mask){d_affinity=mask;}
00340 
00341   /*!
00342    * \brief Remove processor affinity to a specific core.
00343    */
00344   void unset_processor_affinity(){}
00345 
00346   /*!
00347    * \brief Get the current processor affinity.
00348    */
00349   std::vector<int> processor_affinity() { return d_affinity; }
00350 
00351     ///////////////// private vars //////////////////////
00352 
00353     gr_vector_int _work_ninput_items;
00354     gr_vector_int _fcast_ninput_items;
00355     size_t _num_outputs;
00356     ptrdiff_t _work_io_ptr_mask;
00357     size_t _output_multiple_items;
00358     double _relative_rate;
00359     bool _enable_fixed_rate;
00360     size_t _input_history_items;
00361     tag_propagation_policy_t _tag_prop_policy;
00362     size_t _interp, _decim;
00363     gr_io_signature_sptr _in_sig, _out_sig;
00364 
00365     ///////////////// the Block overloads //////////////////////
00366 
00367     //! implements work -> calls general work
00368     void work(const InputItems &, const OutputItems &);
00369 
00370     //! notifications of new topological commits
00371     void notify_topology(const size_t, const size_t);
00372 
00373     //! start notification
00374     void notify_active(void);
00375 
00376     //! stop notification
00377     void notify_inactive(void);
00378 
00379     //! implements tag_propagation_policy()
00380     virtual void propagate_tags(const size_t, const gras::TagIter &);
00381 
00382     void _update_input_reserve(void);
00383 
00384     gras::BufferQueueSptr input_buffer_allocator(const size_t, const gras::SBufferConfig &);
00385     gras::BufferQueueSptr output_buffer_allocator(const size_t, const gras::SBufferConfig &);
00386 
00387 };
00388 
00389 typedef boost::shared_ptr<gr_block> gr_block_sptr;
00390 
00391 GRAS_FORCE_INLINE void gr_block::consume_each(const int how_many_items)
00392 {
00393     if GRAS_UNLIKELY(how_many_items < 0) return;
00394     gras::Block::consume(size_t(how_many_items));
00395 }
00396 
00397 GRAS_FORCE_INLINE void gr_block::consume(const size_t i, const int how_many_items)
00398 {
00399     if GRAS_UNLIKELY(how_many_items < 0) return;
00400     gras::Block::consume(i, size_t(how_many_items));
00401 }
00402 
00403 GRAS_FORCE_INLINE void gr_block::produce(const size_t o, const int how_many_items)
00404 {
00405     if GRAS_UNLIKELY(how_many_items < 0) return;
00406     gras::Block::produce(o, size_t(how_many_items));
00407 }
00408 
00409 GRAS_FORCE_INLINE uint64_t gr_block::nitems_read(const size_t which_input)
00410 {
00411     return Block::get_consumed(which_input);
00412 }
00413 
00414 GRAS_FORCE_INLINE uint64_t gr_block::nitems_written(const size_t which_output)
00415 {
00416     return Block::get_produced(which_output);
00417 }
00418 
00419 GRAS_FORCE_INLINE size_t gr_block::interpolation(void) const
00420 {
00421     return _interp;
00422 }
00423 
00424 GRAS_FORCE_INLINE size_t gr_block::decimation(void) const
00425 {
00426     return _decim;
00427 }
00428 
00429 GRAS_FORCE_INLINE bool gr_block::is_unaligned(void)
00430 {
00431     //TODO
00432     //probably dont need this since volk dispatcher checks alignment
00433     //32 byte aligned is good enough for you
00434     return (_work_io_ptr_mask & ptrdiff_t(GRAS_MAX_ALIGNMENT-1)) != 0;
00435 }
00436 
00437 #endif /*INCLUDED_GNURADIO_GR_BLOCK_H*/