A Discrete-Event Network Simulator
API
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends Macros Groups Pages
distributed-simulator-impl.cc
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * This program is free software; you can redistribute it and/or modify
4  * it under the terms of the GNU General Public License version 2 as
5  * published by the Free Software Foundation;
6  *
7  * This program is distributed in the hope that it will be useful,
8  * but WITHOUT ANY WARRANTY; without even the implied warranty of
9  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  * GNU General Public License for more details.
11  *
12  * You should have received a copy of the GNU General Public License
13  * along with this program; if not, write to the Free Software
14  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
15  *
16  * Author: George Riley <riley@ece.gatech.edu>
17  *
18  */
19 
22 #include "mpi-interface.h"
23 
24 #include "ns3/simulator.h"
25 #include "ns3/scheduler.h"
26 #include "ns3/event-impl.h"
27 #include "ns3/channel.h"
28 #include "ns3/node-container.h"
29 #include "ns3/ptr.h"
30 #include "ns3/pointer.h"
31 #include "ns3/assert.h"
32 #include "ns3/log.h"
33 
34 #include <cmath>
35 
36 #ifdef NS3_MPI
37 #include <mpi.h>
38 #endif
39 
40 namespace ns3 {
41 
42 NS_LOG_COMPONENT_DEFINE ("DistributedSimulatorImpl");
43 
44 NS_OBJECT_ENSURE_REGISTERED (DistributedSimulatorImpl);
45 
47 {
48 }
49 
50 Time
52 {
53  return m_smallestTime;
54 }
55 
56 uint32_t
58 {
59  return m_txCount;
60 }
61 
62 uint32_t
64 {
65  return m_rxCount;
66 }
67 uint32_t
69 {
70  return m_myId;
71 }
72 
73 bool
75 {
76  return m_isFinished;
77 }
78 
80 
81 TypeId
83 {
84  static TypeId tid = TypeId ("ns3::DistributedSimulatorImpl")
86  .SetGroupName ("Mpi")
87  .AddConstructor<DistributedSimulatorImpl> ()
88  ;
89  return tid;
90 }
91 
93 {
94  NS_LOG_FUNCTION (this);
95 
96 #ifdef NS3_MPI
99 
100  // Allocate the LBTS message buffer
102  m_grantedTime = Seconds (0);
103 #else
105  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
106 #endif
107 
108  m_stop = false;
109  m_globalFinished = false;
110  // uids are allocated from 4.
111  // uid 0 is "invalid" events
112  // uid 1 is "now" events
113  // uid 2 is "destroy" events
114  m_uid = 4;
115  // before ::Run is entered, the m_currentUid will be zero
116  m_currentUid = 0;
117  m_currentTs = 0;
118  m_currentContext = 0xffffffff;
120  m_events = 0;
121 }
122 
124 {
125  NS_LOG_FUNCTION (this);
126 }
127 
128 void
130 {
131  NS_LOG_FUNCTION (this);
132 
133  while (!m_events->IsEmpty ())
134  {
135  Scheduler::Event next = m_events->RemoveNext ();
136  next.impl->Unref ();
137  }
138  m_events = 0;
139  delete [] m_pLBTS;
141 }
142 
143 void
145 {
146  NS_LOG_FUNCTION (this);
147 
148  while (!m_destroyEvents.empty ())
149  {
150  Ptr<EventImpl> ev = m_destroyEvents.front ().PeekEventImpl ();
151  m_destroyEvents.pop_front ();
152  NS_LOG_LOGIC ("handle destroy " << ev);
153  if (!ev->IsCancelled ())
154  {
155  ev->Invoke ();
156  }
157  }
158 
160 }
161 
162 
163 void
165 {
166  NS_LOG_FUNCTION (this);
167 
168 #ifdef NS3_MPI
169  if (MpiInterface::GetSize () <= 1)
170  {
171  m_lookAhead = Seconds (0);
172  }
173  else
174  {
175  if (m_lookAhead == Seconds (-1))
176  {
178  }
179  // else it was already set by SetLookAhead
180 
182  for (NodeContainer::Iterator iter = c.Begin (); iter != c.End (); ++iter)
183  {
184  if ((*iter)->GetSystemId () != MpiInterface::GetSystemId ())
185  {
186  continue;
187  }
188 
189  for (uint32_t i = 0; i < (*iter)->GetNDevices (); ++i)
190  {
191  Ptr<NetDevice> localNetDevice = (*iter)->GetDevice (i);
192  // only works for p2p links currently
193  if (!localNetDevice->IsPointToPoint ())
194  {
195  continue;
196  }
197  Ptr<Channel> channel = localNetDevice->GetChannel ();
198  if (channel == 0)
199  {
200  continue;
201  }
202 
203  // grab the adjacent node
204  Ptr<Node> remoteNode;
205  if (channel->GetDevice (0) == localNetDevice)
206  {
207  remoteNode = (channel->GetDevice (1))->GetNode ();
208  }
209  else
210  {
211  remoteNode = (channel->GetDevice (0))->GetNode ();
212  }
213 
214  // if it's not remote, don't consider it
215  if (remoteNode->GetSystemId () == MpiInterface::GetSystemId ())
216  {
217  continue;
218  }
219 
220  // compare delay on the channel with current value of
221  // m_lookAhead. if delay on channel is smaller, make
222  // it the new lookAhead.
223  TimeValue delay;
224  channel->GetAttribute ("Delay", delay);
225 
226  if (delay.Get () < m_lookAhead)
227  {
228  m_lookAhead = delay.Get ();
229  }
230  }
231  }
232  }
233 
234  // m_lookAhead is now set
236 
237  /*
238  * Compute the maximum inter-task latency and use that value
239  * for tasks with no inter-task links.
240  *
241  * Special processing for edge cases. Tasks that have no
242  * nodes still need a reasonable lookAhead value. Infinity
243  * would work correctly but introduces a performance issue; tasks
244  * with an infinite lookAhead would execute all their events
245  * before doing an AllGather resulting in very bad load balance
246  * during the first time window. Since all tasks participate in
247  * the AllGather it is desirable to have all the tasks advance in
248  * simulation time at a similar rate assuming roughly equal events
249  * per unit of simulation time in order to equalize the amount of
250  * work per time window.
251  */
252  long sendbuf;
253  long recvbuf;
254 
255  /* Tasks with no inter-task links do not contribute to max */
257  {
258  sendbuf = 0;
259  }
260  else
261  {
262  sendbuf = m_lookAhead.GetInteger ();
263  }
264 
265  MPI_Allreduce (&sendbuf, &recvbuf, 1, MPI_LONG, MPI_MAX, MPI_COMM_WORLD);
266 
267  /* For nodes that did not compute a lookahead use max from ranks
268  * that did compute a value. An edge case occurs if all nodes have
269  * no inter-task links (max will be 0 in this case). Use infinity so all tasks
270  * will proceed without synchronization until a single AllGather
271  * occurs when all tasks have finished.
272  */
273  if (m_lookAhead == GetMaximumSimulationTime () && recvbuf != 0)
274  {
275  m_lookAhead = Time (recvbuf);
277  }
278 
279 #else
280  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
281 #endif
282 }
283 
284 void
286 {
287  if (lookAhead > 0)
288  {
289  NS_LOG_FUNCTION (this << lookAhead);
290  m_lookAhead = lookAhead;
291  }
292  else
293  {
294  NS_LOG_WARN ("attempted to set look ahead negative: " << lookAhead);
295  }
296 }
297 
298 void
300 {
301  NS_LOG_FUNCTION (this << schedulerFactory);
302 
303  Ptr<Scheduler> scheduler = schedulerFactory.Create<Scheduler> ();
304 
305  if (m_events != 0)
306  {
307  while (!m_events->IsEmpty ())
308  {
309  Scheduler::Event next = m_events->RemoveNext ();
310  scheduler->Insert (next);
311  }
312  }
313  m_events = scheduler;
314 }
315 
316 void
318 {
319  NS_LOG_FUNCTION (this);
320 
321  Scheduler::Event next = m_events->RemoveNext ();
322 
323  NS_ASSERT (next.key.m_ts >= m_currentTs);
325 
326  NS_LOG_LOGIC ("handle " << next.key.m_ts);
327  m_currentTs = next.key.m_ts;
329  m_currentUid = next.key.m_uid;
330  next.impl->Invoke ();
331  next.impl->Unref ();
332 }
333 
334 bool
336 {
337  return m_globalFinished;
338 }
339 
340 bool
342 {
343  return m_events->IsEmpty () || m_stop;
344 }
345 
346 uint64_t
348 {
349  // If the local MPI task has no more events or stop was called
350  // next event time is infinity.
351  if (IsLocalFinished ())
352  {
354  }
355  else
356  {
357  Scheduler::Event ev = m_events->PeekNext ();
358  return ev.key.m_ts;
359  }
360 }
361 
362 Time
364 {
365  return TimeStep (NextTs ());
366 }
367 
368 void
370 {
371  NS_LOG_FUNCTION (this);
372 
373 #ifdef NS3_MPI
375  m_stop = false;
376  while (!m_globalFinished)
377  {
378  Time nextTime = Next ();
379 
380  // If local event is beyond grantedTime then need to synchronize
381  // with other tasks to determine new time window. If local task
382  // is finished then continue to participate in allgather
383  // synchronizations with other tasks until all tasks have
384  // completed.
385  if (nextTime > m_grantedTime || IsLocalFinished () )
386  {
387  // Can't process next event, calculate a new LBTS
388  // First receive any pending messages
390  // reset next time
391  nextTime = Next ();
392  // And check for send completes
394  // Finally calculate the lbts
396  m_myId, IsLocalFinished (), nextTime);
397  m_pLBTS[m_myId] = lMsg;
398  MPI_Allgather (&lMsg, sizeof (LbtsMessage), MPI_BYTE, m_pLBTS,
399  sizeof (LbtsMessage), MPI_BYTE, MPI_COMM_WORLD);
400  Time smallestTime = m_pLBTS[0].GetSmallestTime ();
401  // The totRx and totTx counts ensure there are no transient
402  // messages; if totRx != totTx, there are transients,
403  // so we don't update the granted time.
404  uint32_t totRx = m_pLBTS[0].GetRxCount ();
405  uint32_t totTx = m_pLBTS[0].GetTxCount ();
407 
408  for (uint32_t i = 1; i < m_systemCount; ++i)
409  {
410  if (m_pLBTS[i].GetSmallestTime () < smallestTime)
411  {
412  smallestTime = m_pLBTS[i].GetSmallestTime ();
413  }
414  totRx += m_pLBTS[i].GetRxCount ();
415  totTx += m_pLBTS[i].GetTxCount ();
417  }
418  if (totRx == totTx)
419  {
420  // If lookahead is infinite then granted time should be as well.
421  // Covers the edge case if all the tasks have no inter tasks
422  // links, prevents overflow of granted time.
424  {
426  }
427  else
428  {
429  // Overflow is possible here if near end of representable time.
430  m_grantedTime = smallestTime + m_lookAhead;
431  }
432  }
433  }
434 
435  // Execute next event if it is within the current time window.
436  // Local task may be completed.
437  if ( (nextTime <= m_grantedTime) && (!IsLocalFinished ()) )
438  { // Safe to process
439  ProcessOneEvent ();
440  }
441  }
442 
443  // If the simulator stopped naturally by lack of events, make a
444  // consistency test to check that we didn't lose any events along the way.
445  NS_ASSERT (!m_events->IsEmpty () || m_unscheduledEvents == 0);
446 #else
447  NS_FATAL_ERROR ("Can't use distributed simulator without MPI compiled in");
448 #endif
449 }
450 
452 {
453  return m_myId;
454 }
455 
456 void
458 {
459  NS_LOG_FUNCTION (this);
460 
461  m_stop = true;
462 }
463 
464 void
466 {
467  NS_LOG_FUNCTION (this << delay.GetTimeStep ());
468 
470 }
471 
472 //
473 // Schedule an event for a _relative_ time in the future.
474 //
475 EventId
477 {
478  NS_LOG_FUNCTION (this << delay.GetTimeStep () << event);
479 
480  Time tAbsolute = delay + TimeStep (m_currentTs);
481 
482  NS_ASSERT (tAbsolute.IsPositive ());
483  NS_ASSERT (tAbsolute >= TimeStep (m_currentTs));
484  Scheduler::Event ev;
485  ev.impl = event;
486  ev.key.m_ts = static_cast<uint64_t> (tAbsolute.GetTimeStep ());
487  ev.key.m_context = GetContext ();
488  ev.key.m_uid = m_uid;
489  m_uid++;
491  m_events->Insert (ev);
492  return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
493 }
494 
// Schedule an event under an explicitly supplied context.
// The event's timestamp is the current timestamp (m_currentTs) plus the
// delay expressed in time steps; the event is tagged with the caller's
// context and the current uid, then inserted into the event list (m_events).
// Nothing is returned to the caller.
495 void
496 DistributedSimulatorImpl::ScheduleWithContext (uint32_t context, Time const &delay, EventImpl *event)
497 {
498  NS_LOG_FUNCTION (this << context << delay.GetTimeStep () << m_currentTs << event);
499 
500  Scheduler::Event ev;
501  ev.impl = event;
502  ev.key.m_ts = m_currentTs + delay.GetTimeStep (); // relative delay -> absolute timestamp
503  ev.key.m_context = context; // caller-supplied context
504  ev.key.m_uid = m_uid;
505  m_uid++; // advance so the next scheduled event gets a different uid
// NOTE(review): the listing jumps from line 505 to 507 here; confirm
// against the original file that no statement is missing.
507  m_events->Insert (ev);
508 }
509 
510 EventId
512 {
513  NS_LOG_FUNCTION (this << event);
514 
515  Scheduler::Event ev;
516  ev.impl = event;
517  ev.key.m_ts = m_currentTs;
518  ev.key.m_context = GetContext ();
519  ev.key.m_uid = m_uid;
520  m_uid++;
522  m_events->Insert (ev);
523  return EventId (event, ev.key.m_ts, ev.key.m_context, ev.key.m_uid);
524 }
525 
526 EventId
528 {
529  NS_LOG_FUNCTION (this << event);
530 
531  EventId id (Ptr<EventImpl> (event, false), m_currentTs, 0xffffffff, 2);
532  m_destroyEvents.push_back (id);
533  m_uid++;
534  return id;
535 }
536 
537 Time
539 {
540  return TimeStep (m_currentTs);
541 }
542 
543 Time
545 {
546  if (IsExpired (id))
547  {
548  return TimeStep (0);
549  }
550  else
551  {
552  return TimeStep (id.GetTs () - m_currentTs);
553  }
554 }
555 
556 void
558 {
559  if (id.GetUid () == 2)
560  {
561  // destroy events.
562  for (DestroyEvents::iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
563  {
564  if (*i == id)
565  {
566  m_destroyEvents.erase (i);
567  break;
568  }
569  }
570  return;
571  }
572  if (IsExpired (id))
573  {
574  return;
575  }
576  Scheduler::Event event;
577  event.impl = id.PeekEventImpl ();
578  event.key.m_ts = id.GetTs ();
579  event.key.m_context = id.GetContext ();
580  event.key.m_uid = id.GetUid ();
581  m_events->Remove (event);
582  event.impl->Cancel ();
583  // whenever we remove an event from the event list, we have to unref it.
584  event.impl->Unref ();
585 
587 }
588 
589 void
591 {
592  if (!IsExpired (id))
593  {
594  id.PeekEventImpl ()->Cancel ();
595  }
596 }
597 
598 bool
600 {
601  if (id.GetUid () == 2)
602  {
603  if (id.PeekEventImpl () == 0
604  || id.PeekEventImpl ()->IsCancelled ())
605  {
606  return true;
607  }
608  // destroy events.
609  for (DestroyEvents::const_iterator i = m_destroyEvents.begin (); i != m_destroyEvents.end (); i++)
610  {
611  if (*i == id)
612  {
613  return false;
614  }
615  }
616  return true;
617  }
618  if (id.PeekEventImpl () == 0
619  || id.GetTs () < m_currentTs
620  || (id.GetTs () == m_currentTs
621  && id.GetUid () <= m_currentUid)
622  || id.PeekEventImpl ()->IsCancelled ())
623  {
624  return true;
625  }
626  else
627  {
628  return false;
629  }
630 }
631 
632 Time
634 {
637  return TimeStep (0x7fffffffffffffffLL);
638 }
639 
640 uint32_t
642 {
643  return m_currentContext;
644 }
645 
646 } // namespace ns3
tuple channel
Definition: third.py:85
Time Get(void) const
Definition: time.cc:443
Simulation virtual time values and global simulation resolution.
Definition: nstime.h:102
Smart pointer class similar to boost::intrusive_ptr.
Definition: ptr.h:73
#define NS_LOG_FUNCTION(parameters)
If log level LOG_FUNCTION is enabled, this macro will output all input parameters separated by "...
virtual void SetScheduler(ObjectFactory schedulerFactory)
Set the Scheduler to be used to manage the event list.
#define NS_OBJECT_ENSURE_REGISTERED(type)
Register an Object subclass with the TypeId system.
Definition: object-base.h:44
std::vector< Ptr< Node > >::const_iterator Iterator
Node container iterator.
uint64_t m_ts
Event time stamp.
Definition: scheduler.h:81
virtual EventId Schedule(Time const &delay, EventImpl *event)
Schedule a future event execution (in the same context).
EventImpl * impl
Pointer to the event implementation.
Definition: scheduler.h:94
#define NS_ASSERT(condition)
At runtime, in debugging builds, if this condition is not true, the program prints the source file...
Definition: assert.h:67
#define NS_LOG_COMPONENT_DEFINE(name)
Define a Log component with a specific name.
Definition: log.h:201
virtual EventId ScheduleDestroy(EventImpl *event)
Schedule an event to run at the end of the simulation, after the Stop() time or condition has been re...
#define NS_UNUSED(x)
Mark a local variable as unused.
Definition: unused.h:36
virtual Time GetMaximumSimulationTime(void) const
Get the maximum representable simulation time.
Iterator End(void) const
Get an iterator which indicates past-the-last Node in the container.
#define NS_FATAL_ERROR(msg)
Report a fatal error with a message and terminate.
Definition: fatal-error.h:162
virtual void DoDispose(void)
Destructor implementation.
Definition: object.cc:340
virtual void SetMaximumLookAhead(const Time lookAhead)
uint32_t GetSystemId(void) const
Definition: node.cc:121
virtual Time Now(void) const
Return the current simulation virtual time.
virtual void DoDispose(void)
Destructor implementation.
void Invoke(void)
Called by the simulation engine to notify the event that it is time to execute.
Definition: event-impl.cc:46
static void TestSendComplete()
Check for completed sends.
virtual Time GetDelayLeft(const EventId &id) const
Get the remaining time until this event will execute.
static EventId Schedule(Time const &delay, MEM mem_ptr, OBJ obj)
Schedule an event to expire after delay.
Definition: simulator.h:1221
EventKey key
Key for sorting and ordering Events.
Definition: scheduler.h:95
static void Destroy()
Deletes storage used by the parallel environment.
AttributeValue implementation for Time.
Definition: nstime.h:957
Ptr< Object > Create(void) const
Create an Object instance of the configured TypeId.
uint32_t m_uid
Event unique id.
Definition: scheduler.h:82
virtual void Cancel(const EventId &id)
Set the cancel bit on this event: the event's associated function will not be invoked when it expires...
virtual EventId ScheduleNow(EventImpl *event)
Schedule an event to run at the current virtual time.
void Unref(void) const
Decrement the reference count.
Maintain the event list.
Definition: scheduler.h:66
#define NS_LOG_LOGIC(msg)
Use NS_LOG to output a message of level LOG_LOGIC.
Definition: log.h:252
virtual bool IsExpired(const EventId &id) const
Check if an event has already run or been cancelled.
Scheduler event.
Definition: scheduler.h:92
Distributed simulator implementation using lookahead.
keep track of a set of node pointers.
virtual void Destroy()
Execute the events scheduled with ScheduleDestroy().
Iterator Begin(void) const
Get an iterator which refers to the first Node in the container.
int64_t GetTimeStep(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:377
Time TimeStep(uint64_t ts)
Definition: nstime.h:952
int64_t GetInteger(void) const
Get the raw time value, in the current resolution unit.
Definition: nstime.h:385
virtual uint32_t GetSystemId(void) const
Get the system id of this simulator.
virtual uint32_t GetContext(void) const
Get the current simulation context.
Structure used for all-reduce LBTS computation.
static NodeContainer GetGlobal(void)
Create a NodeContainer that contains a list of all nodes created through NodeContainer::Create() and ...
virtual void Remove(const EventId &id)
Remove an event from the event list.
Instantiate subclasses of ns3::Object.
A simulation event.
Definition: event-impl.h:44
static void ReceiveMessages()
Check for received messages complete.
An identifier for simulation events.
Definition: event-id.h:53
static uint32_t GetSystemId()
#define NS_LOG_WARN(msg)
Use NS_LOG to output a message of level LOG_WARN.
Definition: log.h:228
static void Stop(void)
Tell the Simulator the calling event should be the last one executed.
Definition: simulator.cc:208
Time Seconds(double value)
Construct a Time in the indicated unit.
Definition: nstime.h:895
virtual void ScheduleWithContext(uint32_t context, Time const &delay, EventImpl *event)
Schedule a future event execution (in a different context).
virtual void Run(void)
Run the simulation.
virtual void Stop(void)
Tell the Simulator the calling event should be the last one executed.
virtual bool IsFinished(void) const
Check if the simulation should finish.
a unique identifier for an interface.
Definition: type-id.h:58
TypeId SetParent(TypeId tid)
Set the parent TypeId.
Definition: type-id.cc:827
static uint32_t GetSize()
void(* Time)(Time oldValue, Time newValue)
TracedValue callback signature for Time.
Definition: nstime.h:719
uint32_t m_context
Event context.
Definition: scheduler.h:83
The SimulatorImpl base class.