AJA NTV2 SDK  18.0.0.2717
NTV2 SDK 18.0.0.2717
pullpin.cpp
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // File: PullPin.cpp
3 //
4 // Desc: DirectShow base classes - implements CPullPin class that pulls data
5 // from IAsyncReader.
6 //
7 // Copyright (c) 1992-2001 Microsoft Corporation. All rights reserved.
8 //------------------------------------------------------------------------------
9 
10 
11 #include <streams.h>
12 #include "pullpin.h"
13 
14 #ifdef DXMPERF
15 #include "dxmperf.h"
16 #endif // DXMPERF
17 
18 
20  : m_pReader(NULL),
21  m_pAlloc(NULL),
22  m_State(TM_Exit)
23 {
24 #ifdef DXMPERF
25  PERFLOG_CTOR( L"CPullPin", this );
26 #endif // DXMPERF
27 
28 }
29 
31 {
32  Disconnect();
33 
34 #ifdef DXMPERF
35  PERFLOG_DTOR( L"CPullPin", this );
36 #endif // DXMPERF
37 
38 }
39 
40 // returns S_OK if successfully connected to an IAsyncReader interface
41 // from this object
42 // Optional allocator should be proposed as a preferred allocator if
43 // necessary
44 HRESULT
45 CPullPin::Connect(IUnknown* pUnk, IMemAllocator* pAlloc, BOOL bSync)
46 {
47  CAutoLock lock(&m_AccessLock);
48 
49  if (m_pReader) {
50  return VFW_E_ALREADY_CONNECTED;
51  }
52 
53  HRESULT hr = pUnk->QueryInterface(IID_IAsyncReader, (void**)&m_pReader);
54  if (FAILED(hr)) {
55 
56 #ifdef DXMPERF
57  {
58  AM_MEDIA_TYPE * pmt = NULL;
59  PERFLOG_CONNECT( this, pUnk, hr, pmt );
60  }
61 #endif // DXMPERF
62 
63  return(hr);
64  }
65 
66  hr = DecideAllocator(pAlloc, NULL);
67  if (FAILED(hr)) {
68  Disconnect();
69 
70 #ifdef DXMPERF
71  {
72  AM_MEDIA_TYPE * pmt = NULL;
73  PERFLOG_CONNECT( this, pUnk, hr, pmt );
74  }
75 #endif // DXMPERF
76 
77  return hr;
78  }
79 
80  LONGLONG llTotal, llAvail;
81  hr = m_pReader->Length(&llTotal, &llAvail);
82  if (FAILED(hr)) {
83  Disconnect();
84 
85 #ifdef DXMPERF
86  {
87  AM_MEDIA_TYPE * pmt = NULL;
88  PERFLOG_CONNECT( this, pUnk, hr, pmt );
89  }
90 #endif
91 
92  return hr;
93  }
94 
95  // convert from file position to reference time
96  m_tDuration = llTotal * UNITS;
97  m_tStop = m_tDuration;
98  m_tStart = 0;
99 
100  m_bSync = bSync;
101 
102 #ifdef DXMPERF
103  {
104  AM_MEDIA_TYPE * pmt = NULL;
105  PERFLOG_CONNECT( this, pUnk, S_OK, pmt );
106  }
107 #endif // DXMPERF
108 
109 
110  return S_OK;
111 }
112 
113 // disconnect any connection made in Connect
114 HRESULT
116 {
117  CAutoLock lock(&m_AccessLock);
118 
119  StopThread();
120 
121 
122 #ifdef DXMPERF
123  PERFLOG_DISCONNECT( this, m_pReader, S_OK );
124 #endif // DXMPERF
125 
126 
127  if (m_pReader) {
128  m_pReader->Release();
129  m_pReader = NULL;
130  }
131 
132  if (m_pAlloc) {
133  m_pAlloc->Release();
134  m_pAlloc = NULL;
135  }
136 
137  return S_OK;
138 }
139 
140 // agree an allocator using RequestAllocator - optional
141 // props param specifies your requirements (non-zero fields).
142 // returns an error code if fail to match requirements.
143 // optional IMemAllocator interface is offered as a preferred allocator
144 // but no error occurs if it can't be met.
145 HRESULT
147  IMemAllocator * pAlloc,
148  __inout_opt ALLOCATOR_PROPERTIES * pProps)
149 {
150  ALLOCATOR_PROPERTIES *pRequest;
151  ALLOCATOR_PROPERTIES Request;
152  if (pProps == NULL) {
153  Request.cBuffers = 3;
154  Request.cbBuffer = 64*1024;
155  Request.cbAlign = 0;
156  Request.cbPrefix = 0;
157  pRequest = &Request;
158  } else {
159  pRequest = pProps;
160  }
161  HRESULT hr = m_pReader->RequestAllocator(
162  pAlloc,
163  pRequest,
164  &m_pAlloc);
165  return hr;
166 }
167 
168 // start pulling data
169 HRESULT
171 {
172  ASSERT(!ThreadExists());
173  return StartThread();
174 }
175 
176 // stop pulling data
177 HRESULT
179 {
180  StopThread();
181 
182  return S_OK;
183 }
184 
185 HRESULT
186 CPullPin::Seek(REFERENCE_TIME tStart, REFERENCE_TIME tStop)
187 {
188  CAutoLock lock(&m_AccessLock);
189 
190  ThreadMsg AtStart = m_State;
191 
192  if (AtStart == TM_Start) {
193  BeginFlush();
194  PauseThread();
195  EndFlush();
196  }
197 
198  m_tStart = tStart;
199  m_tStop = tStop;
200 
201  HRESULT hr = S_OK;
202  if (AtStart == TM_Start) {
203  hr = StartThread();
204  }
205 
206  return hr;
207 }
208 
209 HRESULT
210 CPullPin::Duration(__out REFERENCE_TIME* ptDuration)
211 {
212  *ptDuration = m_tDuration;
213  return S_OK;
214 }
215 
216 
217 HRESULT
218 CPullPin::StartThread()
219 {
220  CAutoLock lock(&m_AccessLock);
221 
222  if (!m_pAlloc || !m_pReader) {
223  return E_UNEXPECTED;
224  }
225 
226  HRESULT hr;
227  if (!ThreadExists()) {
228 
229  // commit allocator
230  hr = m_pAlloc->Commit();
231  if (FAILED(hr)) {
232  return hr;
233  }
234 
235  // start thread
236  if (!Create()) {
237  return E_FAIL;
238  }
239  }
240 
241  m_State = TM_Start;
242  hr = (HRESULT) CallWorker(m_State);
243  return hr;
244 }
245 
246 HRESULT
247 CPullPin::PauseThread()
248 {
249  CAutoLock lock(&m_AccessLock);
250 
251  if (!ThreadExists()) {
252  return E_UNEXPECTED;
253  }
254 
255  // need to flush to ensure the thread is not blocked
256  // in WaitForNext
257  HRESULT hr = m_pReader->BeginFlush();
258  if (FAILED(hr)) {
259  return hr;
260  }
261 
262  m_State = TM_Pause;
263  hr = CallWorker(TM_Pause);
264 
265  m_pReader->EndFlush();
266  return hr;
267 }
268 
269 HRESULT
270 CPullPin::StopThread()
271 {
272  CAutoLock lock(&m_AccessLock);
273 
274  if (!ThreadExists()) {
275  return S_FALSE;
276  }
277 
278  // need to flush to ensure the thread is not blocked
279  // in WaitForNext
280  HRESULT hr = m_pReader->BeginFlush();
281  if (FAILED(hr)) {
282  return hr;
283  }
284 
285  m_State = TM_Exit;
286  hr = CallWorker(TM_Exit);
287 
288  m_pReader->EndFlush();
289 
290  // wait for thread to completely exit
291  Close();
292 
293  // decommit allocator
294  if (m_pAlloc) {
295  m_pAlloc->Decommit();
296  }
297 
298  return S_OK;
299 }
300 
301 
302 DWORD
303 CPullPin::ThreadProc(void)
304 {
305  while(1) {
306  DWORD cmd = GetRequest();
307  switch(cmd) {
308  case TM_Exit:
309  Reply(S_OK);
310  return 0;
311 
312  case TM_Pause:
313  // we are paused already
314  Reply(S_OK);
315  break;
316 
317  case TM_Start:
318  Reply(S_OK);
319  Process();
320  break;
321  }
322 
323  // at this point, there should be no outstanding requests on the
324  // upstream filter.
325  // We should force begin/endflush to ensure that this is true.
326  // !!!Note that we may currently be inside a BeginFlush/EndFlush pair
327  // on another thread, but the premature EndFlush will do no harm now
328  // that we are idle.
329  m_pReader->BeginFlush();
330  CleanupCancelled();
331  m_pReader->EndFlush();
332  }
333 }
334 
335 HRESULT
336 CPullPin::QueueSample(
337  __inout REFERENCE_TIME& tCurrent,
338  REFERENCE_TIME tAlignStop,
339  BOOL bDiscontinuity
340  )
341 {
342  IMediaSample* pSample;
343 
344  HRESULT hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0);
345  if (FAILED(hr)) {
346  return hr;
347  }
348 
349  LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS);
350  if (tStopThis > tAlignStop) {
351  tStopThis = tAlignStop;
352  }
353  pSample->SetTime(&tCurrent, &tStopThis);
354  tCurrent = tStopThis;
355 
356  pSample->SetDiscontinuity(bDiscontinuity);
357 
358  hr = m_pReader->Request(
359  pSample,
360  0);
361  if (FAILED(hr)) {
362  pSample->Release();
363 
364  CleanupCancelled();
365  OnError(hr);
366  }
367  return hr;
368 }
369 
370 HRESULT
371 CPullPin::CollectAndDeliver(
372  REFERENCE_TIME tStart,
373  REFERENCE_TIME tStop)
374 {
375  IMediaSample* pSample = NULL; // better be sure pSample is set
376  DWORD_PTR dwUnused;
377  HRESULT hr = m_pReader->WaitForNext(
378  INFINITE,
379  &pSample,
380  &dwUnused);
381  if (FAILED(hr)) {
382  if (pSample) {
383  pSample->Release();
384  }
385  } else {
386  hr = DeliverSample(pSample, tStart, tStop);
387  }
388  if (FAILED(hr)) {
389  CleanupCancelled();
390  OnError(hr);
391  }
392  return hr;
393 
394 }
395 
396 HRESULT
397 CPullPin::DeliverSample(
398  IMediaSample* pSample,
399  REFERENCE_TIME tStart,
400  REFERENCE_TIME tStop
401  )
402 {
403  // fix up sample if past actual stop (for sector alignment)
404  REFERENCE_TIME t1, t2;
405  if (S_OK == pSample->GetTime(&t1, &t2)) {
406  if (t2 > tStop) {
407  t2 = tStop;
408  }
409 
410  // adjust times to be relative to (aligned) start time
411  t1 -= tStart;
412  t2 -= tStart;
413  HRESULT hr = pSample->SetTime(&t1, &t2);
414  if (FAILED(hr)) {
415  return hr;
416  }
417  }
418 
419 #ifdef DXMPERF
420  {
421  AM_MEDIA_TYPE * pmt = NULL;
422  pSample->GetMediaType( &pmt );
423  PERFLOG_RECEIVE( L"CPullPin", m_pReader, this, pSample, pmt );
424  }
425 #endif
426 
427  HRESULT hr = Receive(pSample);
428  pSample->Release();
429  return hr;
430 }
431 
432 void
433 CPullPin::Process(void)
434 {
435  // is there anything to do?
436  if (m_tStop <= m_tStart) {
437  EndOfStream();
438  return;
439  }
440 
441  BOOL bDiscontinuity = TRUE;
442 
443  // if there is more than one sample at the allocator,
444  // then try to queue 2 at once in order to overlap.
445  // -- get buffer count and required alignment
446  ALLOCATOR_PROPERTIES Actual;
447  HRESULT hr = m_pAlloc->GetProperties(&Actual);
448 
449  // align the start position downwards
450  REFERENCE_TIME tStart = AlignDown(m_tStart / UNITS, Actual.cbAlign) * UNITS;
451  REFERENCE_TIME tCurrent = tStart;
452 
453  REFERENCE_TIME tStop = m_tStop;
454  if (tStop > m_tDuration) {
455  tStop = m_tDuration;
456  }
457 
458  // align the stop position - may be past stop, but that
459  // doesn't matter
460  REFERENCE_TIME tAlignStop = AlignUp(tStop / UNITS, Actual.cbAlign) * UNITS;
461 
462 
463  DWORD dwRequest;
464 
465  if (!m_bSync) {
466 
467  // Break out of the loop either if we get to the end or we're asked
468  // to do something else
469  while (tCurrent < tAlignStop) {
470 
471  // Break out without calling EndOfStream if we're asked to
472  // do something different
473  if (CheckRequest(&dwRequest)) {
474  return;
475  }
476 
477  // queue a first sample
478  if (Actual.cBuffers > 1) {
479 
480  hr = QueueSample(tCurrent, tAlignStop, TRUE);
481  bDiscontinuity = FALSE;
482 
483  if (FAILED(hr)) {
484  return;
485  }
486  }
487 
488 
489 
490  // loop queueing second and waiting for first..
491  while (tCurrent < tAlignStop) {
492 
493  hr = QueueSample(tCurrent, tAlignStop, bDiscontinuity);
494  bDiscontinuity = FALSE;
495 
496  if (FAILED(hr)) {
497  return;
498  }
499 
500  hr = CollectAndDeliver(tStart, tStop);
501  if (S_OK != hr) {
502 
503  // stop if error, or if downstream filter said
504  // to stop.
505  return;
506  }
507  }
508 
509  if (Actual.cBuffers > 1) {
510  hr = CollectAndDeliver(tStart, tStop);
511  if (FAILED(hr)) {
512  return;
513  }
514  }
515  }
516  } else {
517 
518  // sync version of above loop
519  while (tCurrent < tAlignStop) {
520 
521  // Break out without calling EndOfStream if we're asked to
522  // do something different
523  if (CheckRequest(&dwRequest)) {
524  return;
525  }
526 
527  IMediaSample* pSample;
528 
529  hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0);
530  if (FAILED(hr)) {
531  OnError(hr);
532  return;
533  }
534 
535  LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS);
536  if (tStopThis > tAlignStop) {
537  tStopThis = tAlignStop;
538  }
539  pSample->SetTime(&tCurrent, &tStopThis);
540  tCurrent = tStopThis;
541 
542  if (bDiscontinuity) {
543  pSample->SetDiscontinuity(TRUE);
544  bDiscontinuity = FALSE;
545  }
546 
547  hr = m_pReader->SyncReadAligned(pSample);
548 
549  if (FAILED(hr)) {
550  pSample->Release();
551  OnError(hr);
552  return;
553  }
554 
555  hr = DeliverSample(pSample, tStart, tStop);
556  if (hr != S_OK) {
557  if (FAILED(hr)) {
558  OnError(hr);
559  }
560  return;
561  }
562  }
563  }
564 
565  EndOfStream();
566 }
567 
568 // after a flush, cancelled i/o will be waiting for collection
569 // and release
570 void
571 CPullPin::CleanupCancelled(void)
572 {
573  while (1) {
574  IMediaSample * pSample;
575  DWORD_PTR dwUnused;
576 
577  HRESULT hr = m_pReader->WaitForNext(
578  0, // no wait
579  &pSample,
580  &dwUnused);
581  if(pSample) {
582  pSample->Release();
583  } else {
584  // no more samples
585  return;
586  }
587  }
588 }
CPullPin::EndOfStream
virtual HRESULT EndOfStream(void) PURE
CPullPin::EndFlush
virtual HRESULT EndFlush() PURE
streams.h
NULL
#define NULL
Definition: ntv2caption608types.h:19
PERFLOG_RECEIVE
#define PERFLOG_RECEIVE(name, source, dest, sample, pmt)
Definition: dxmperf.h:61
CAutoLock
Definition: wxutil.h:83
CPullPin::Connect
HRESULT Connect(IUnknown *pUnk, IMemAllocator *pAlloc, BOOL bSync)
Definition: pullpin.cpp:45
PERFLOG_CONNECT
#define PERFLOG_CONNECT(connector, connectee, status, pmt)
Definition: dxmperf.h:68
CPullPin::DecideAllocator
virtual HRESULT DecideAllocator(IMemAllocator *pAlloc, __inout_opt ALLOCATOR_PROPERTIES *pProps)
Definition: pullpin.cpp:146
CPullPin::m_pAlloc
IMemAllocator * m_pAlloc
Definition: pullpin.h:74
CPullPin::Active
HRESULT Active(void)
Definition: pullpin.cpp:170
CPullPin::BeginFlush
virtual HRESULT BeginFlush() PURE
dxmperf.h
CPullPin::AlignDown
LONGLONG AlignDown(LONGLONG ll, LONG lAlign)
Definition: pullpin.h:114
CPullPin::Duration
HRESULT Duration(__out REFERENCE_TIME *ptDuration)
Definition: pullpin.cpp:210
PERFLOG_DTOR
#define PERFLOG_DTOR(name, iface)
Definition: dxmperf.h:59
CPullPin::~CPullPin
virtual ~CPullPin()
Definition: pullpin.cpp:30
CPullPin::CPullPin
CPullPin()
Definition: pullpin.cpp:19
CPullPin::Inactive
HRESULT Inactive(void)
Definition: pullpin.cpp:178
PERFLOG_CTOR
#define PERFLOG_CTOR(name, iface)
Definition: dxmperf.h:58
CPullPin::Seek
HRESULT Seek(REFERENCE_TIME tStart, REFERENCE_TIME tStop)
Definition: pullpin.cpp:186
UNITS
const LONGLONG UNITS
Definition: reftime.h:41
CPullPin::OnError
virtual void OnError(HRESULT hr) PURE
CPullPin::AlignUp
LONGLONG AlignUp(LONGLONG ll, LONG lAlign)
Definition: pullpin.h:119
CPullPin::Disconnect
HRESULT Disconnect()
Definition: pullpin.cpp:115
CPullPin::Receive
virtual HRESULT Receive(IMediaSample *) PURE
pullpin.h
ASSERT
#define ASSERT(_x_)
Definition: wxdebug.h:205
hr
__out HRESULT & hr
Definition: pstream.cpp:145
PERFLOG_DISCONNECT
#define PERFLOG_DISCONNECT(disconnector, disconnectee, status)
Definition: dxmperf.h:70