C++ Static Memory Buffer Template Class Container Multithread

Aus Software Entwicklung Projekte
Wechseln zu: Navigation, Suche

Implementierung eines Buffers mit statischem Speicher - keine memory allocations

Muss aber noch optimiert werden - sind mir aktuell zu viele Locks

Was ist mit Callbacks?

Die Beispielimplementierung wird auch nur unter Windows ab Vista funktionieren, da die SRWLOCKS und die CONDITION_VARIABLE erst seit dieser Version unterstützt werden. Die erste Implementierung der Locks mittels boost::interprocess hat eine deutlich höhere CPU Last erzeugt und war auch deutlich langsamer, siehe Thread Synchronisation

#pragma once
 
template<typename BaseType>
class MemoryBuffer
{
public:
 
  //------------------------------------------------------------
  //------------------------------------------------------------
  MemoryBuffer(size_t Size)
  {
    buffer = new unsigned char[Size];
    BufferSize = Size;
    WriteRequestSize = 0;
    ReadRequestSize = 0;
 
    WriteBegin = buffer;
    WriteIntermediateEnd = buffer;
    WriteEnd = buffer;
    ReadEnd = buffer;
 
    InitializeSRWLock(&reentrant_block);
    InitializeSRWLock(&cond_get_read);
    InitializeSRWLock(&cond_get_write);
 
    InitializeSRWLock(&cond_write_srw);
    InitializeConditionVariable(&cond_write_possible);
 
    InitializeSRWLock(&cond_read_srw);
    InitializeConditionVariable(&cond_read_possible);
  }
  //------------------------------------------------------------
  //------------------------------------------------------------
  ~MemoryBuffer()
  {
    try
    {
      delete[] buffer;
      buffer = NULL;
 
      //DeleteCriticalSection(&reentrant_block);
    }
    catch (...)
    {
 
    }
  }
  bool WaitForFree(size_t RequestSize, int timeoutMS)
  {
    assert(RequestSize <= 0xFFFF);
    assert(timeoutMS >= 0);
    return _WaitForFree(RequestSize, timeoutMS);
  }
 
  void Get_w(BaseType **block, size_t *RequestSize)  //, int timeoutMS)
  {
    // Habe ich auch die korrekten Parameter
    assert(RequestSize != NULL);
    assert(block != NULL);
    assert(*RequestSize <= 0xFFFF);
 
    AcquireSRWLockShared(&cond_get_write);
    AcquireSRWLockShared(&reentrant_block);
    _Get_w(block, RequestSize);
    ReleaseSRWLockShared(&reentrant_block);
    if (*block == NULL) ReleaseSRWLockShared(&cond_get_write);
  }
 
  void Release_w(BaseType *block, size_t ReleaseSize, bool FreeUnused)
  {
    assert(block != NULL);
    assert(ReleaseSize <= 0xFFFF);
 
    AcquireSRWLockExclusive(&reentrant_block);
    _Release_w(block, ReleaseSize, FreeUnused);
    ReleaseSRWLockExclusive(&reentrant_block);
    ReleaseSRWLockShared(&cond_get_write);
  }
 
  size_t SizeWriteAvailable(size_t minSize)
  {
    assert(minSize <= 0xFFFF);
 
    AcquireSRWLockShared(&reentrant_block);
    size_t ret = _SizeWriteAvailable(minSize);
    ReleaseSRWLockShared(&reentrant_block);
    return ret;
  }
 
  bool WaitForData(size_t RequestSize, int timeoutMS)
  {
    assert(RequestSize <= 0xFFFF);
    assert(timeoutMS >= 0);
    return _WaitForData(RequestSize, timeoutMS);
  }
 
  void Get_r(BaseType **block, size_t *RequestSize)
  {
    AcquireSRWLockShared(&cond_get_read);
    AcquireSRWLockShared(&reentrant_block);
    _Get_r(block, RequestSize);
    ReleaseSRWLockShared(&reentrant_block);
    if (block == NULL) ReleaseSRWLockShared(&cond_get_read);
  }
  void Release_r(BaseType *block, size_t ReleaseSize)
  {
    AcquireSRWLockExclusive(&reentrant_block);
    _Release_r(block, ReleaseSize);
    ReleaseSRWLockExclusive(&reentrant_block);
    ReleaseSRWLockShared(&cond_get_read);
  }
 
  size_t SizeReadAvailable()
  {
    AcquireSRWLockShared(&reentrant_block);
    size_t ret = _SizeReadAvailable();
    ReleaseSRWLockShared(&reentrant_block);
    return ret;
  }
 
 
private:
  //------------------------------------------------------------
  //------------------------------------------------------------
  // For the writer
  bool _WaitForFree(size_t RequestSize, int timeoutMS)
  {
    while (true)
    {
      size_t size = SizeWriteAvailable(RequestSize);
      if (RequestSize <= size)
        return true;
 
      // Es gibt also keinen Block der gross genug ist, also warten
      WriteRequestSize = RequestSize;
      AcquireSRWLockShared(&cond_write_srw);
 
      BOOL ret = SleepConditionVariableSRW(&cond_write_possible, &cond_write_srw, timeoutMS, CONDITION_VARIABLE_LOCKMODE_SHARED);
      ReleaseSRWLockShared(&cond_write_srw);
      WriteRequestSize = 0;
      if (ret == FALSE)  // dann ist der Timeoutfall aufgetreten, in allen anderen Faellen, die Schleife einfach nochmal durchlaufen
      {
        return false;
      }
    }
  }
  //------------------------------------------------------------
  //------------------------------------------------------------
  // Max Request Size ist 64k
  void _Get_w(BaseType **block, size_t *RequestSize)  //, int timeoutMS)
  {
    // Jetzt versuche ich einen Block der Groesse RquestSize zu finden, ansonsten schmeisse ich eine Exception
    // Sollte vorher kontrolliert sein, dass das nicht passieren kann mittels WaitForFree
 
    if (WriteEnd == WriteIntermediateEnd) // dann gibt es kein Intermediate End
    {
      size_t SpaceEnd = Writer_NI_SpaceEnd();
      size_t SpaceBegin = Writer_NI_SpaceBegin();
 
      if (SpaceEnd >= *RequestSize)  // dann ist genug Platz am Ende
      {
        *block = WriteEnd;  // Gib einen Ptr auf das Ende zurueck
        WriteEnd += *RequestSize;  // und verschiebe den Ende Ptr weit genug
        WriteIntermediateEnd = WriteEnd;  // und bewege auch den Intermediate Ptr weiter				        
        return;
      }
      if (SpaceBegin >= *RequestSize)  // dann ist genug Platz am Anfang
      {
        // Jetzt muss ich aufteilen
        *block = buffer;
        WriteEnd = buffer + *RequestSize;
        // Intermediate bleibt da, wo es ist				
        return;
      }
    }
    else
    {
      // Es ist bereits der Intermediate Fall, also nicht mehr ans Ende schreiben, sondern nur noch dazwischen
      size_t SpaceBetween = Writer_I_SpaceBetween();  // Im Falle intermediate ist begin > als end
      if (SpaceBetween >= *RequestSize)  // dann ist genug Platz in der Mitte
      {
        // Jetzt muss ich aufteilen
        *block = WriteEnd;
        WriteEnd = WriteEnd + *RequestSize;
        // Intermediate bleibt da, wo es ist
        return;
      }
    }
    // Anscheinend gibt es doch nicht genug Platz, also Fehler zurueck
    *block = NULL;
    *RequestSize = 0;
  }
 
  //------------------------------------------------------------
  // @FreeUnused, falls vorher mehr requested wurde, kann trotzdem wieder alles abgegeben werden
  // oder aber halt weitervendet werden
  //------------------------------------------------------------
  void _Release_w(BaseType *block, size_t ReleaseSize, bool FreeUnused)
  {
    // Hiermit gebe ich den Write fuer das Lesen frei, indem ich das Ende verschiebe
    ReadEnd = block + ReleaseSize;
    if (FreeUnused)
    {
      if (WriteEnd == WriteIntermediateEnd)
      {
        WriteEnd = ReadEnd;
        WriteIntermediateEnd = ReadEnd;
      }
      else
      {
        WriteEnd = ReadEnd;
      }
    }
 
    if (_SizeReadAvailable() >= ReadRequestSize)  // dann ist jetzt wieder genug Platz zum schreiben da
    {
      WakeAllConditionVariable(&cond_read_possible);
    }
  }
 
  //------------------------------------------------------------
  //------------------------------------------------------------
  // minSize = minimale Block groesse
  size_t _SizeWriteAvailable(size_t minSize)
  {
    if (WriteEnd == WriteIntermediateEnd) // dann gibt es kein Intermediate End
    {
      // Platz am Ende
      size_t SpaceEnd = Writer_NI_SpaceEnd();
      size_t SpaceBegin = Writer_NI_SpaceBegin();
 
      if (minSize <= SpaceEnd)
      {
        return SpaceEnd;// dann ist genug Platz am Ende enthalten, sofort zurueck
      }
      if (minSize <= SpaceBegin)
      {
        return SpaceBegin; // dann ist genug Platz am Anfang enthalte
      }
 
      // dann gibt es keinen Platz mit minsize
      return 0;
    }
    else  // Spezialfall, es gibt zwei Enden und es kann nur ein dazwischen geben
    {
      size_t SpaceBetween = Writer_I_SpaceBetween();  // Im Falle intermediate ist begin > als end
 
      if (minSize <= SpaceBetween)
      {
        return SpaceBetween; // dann ist genug Platz in der Mitte enthalten
      }
      // dann gibt es keinen Platz mit minsize
      return 0;
    }
  }
 
  //------------------------------------------------------------
  //------------------------------------------------------------
  // For the reader
  bool _WaitForData(size_t RequestSize, int timeoutMS)
  {
    while (true)
    {
      size_t ReadAvail = SizeReadAvailable();
      if (ReadAvail >= RequestSize)
      {
        return true;
      }
      // Dann sind nicht genug Daten vorhanden
      ReadRequestSize = RequestSize;
      AcquireSRWLockShared(&cond_read_srw);
 
      BOOL ret = SleepConditionVariableSRW(&cond_read_possible, &cond_read_srw, timeoutMS, CONDITION_VARIABLE_LOCKMODE_SHARED);
      ReleaseSRWLockShared(&cond_read_srw);
      ReadRequestSize = 0;
      if (ret == FALSE)  // Dann ist ein Timeout aufgetreten
      {
        return false;
      }
    }
  }
  //------------------------------------------------------------
  //------------------------------------------------------------  
  void _Get_r(BaseType **block, size_t *RequestSize)
  {
    if (WriteBegin != ReadEnd)  // Sind Daten ueberhaupt zum Lesen vorhanden
    {
      if (WriteEnd == WriteIntermediateEnd)  // dann haben wir keinen Intermediatefall
      {
        size_t maxsize = (ReadEnd - WriteBegin);
        *block = WriteBegin;
        if (maxsize < *RequestSize)  // Begrenze Maxsize, falls nicht mehr in einem Aufruf geht
        {
          *RequestSize = maxsize;
        }
 
        return;
      }
      else
      {
        if (ReadEnd > WriteBegin) // Intermediate, aber nicht aktiv zum Lesen
        {
          size_t maxsize = (ReadEnd - WriteBegin);
          *block = WriteBegin;
          if (maxsize < *RequestSize)  // Begrenze Maxsize, falls nicht mehr in einem Aufruf geht
          {
            *RequestSize = maxsize;
          }
 
          return;
        }
        else
        {
          size_t maxsize = (WriteIntermediateEnd - WriteBegin);
          *block = WriteBegin;
          if (maxsize < *RequestSize)  // Begrenze Maxsize, falls nicht mehr in einem Aufruf geht
          {
            *RequestSize = maxsize;
          }
          return;
        }
      }
    }
    else  // dann sind keine Daten da und kann auch direkt wieder entsperren
    {
      *block = NULL;
      *RequestSize = 0;
      return;
    }
  }
  //------------------------------------------------------------
  //------------------------------------------------------------
  void _Release_r(BaseType *block, size_t ReleaseSize)
  {
    WriteBegin = block + ReleaseSize;
    if (WriteBegin == WriteIntermediateEnd) // und loesche das Intermeditate Ende, da verbraucht
    {
      if (WriteBegin == WriteEnd)  // dann ist sogar alles verbraucht
      {
        WriteEnd = buffer;
        ReadEnd = buffer;
      }
      WriteBegin = buffer;
      WriteIntermediateEnd = WriteEnd;
    }
    if (_SizeWriteAvailable(WriteRequestSize) >= WriteRequestSize)  // dann ist jetzt wieder genug Platz zum schreiben da
    {
      WakeAllConditionVariable(&cond_write_possible);
    }
  }
  //------------------------------------------------------------
  //------------------------------------------------------------
  size_t _SizeReadAvailable()
  {
    if (WriteBegin != ReadEnd)  // Sind Daten ueberhaupt zum Lesen vorhanden
    {
      size_t t = (ReadEnd - WriteBegin);
      //if (t > 0) t--;
 
      if (WriteEnd == WriteIntermediateEnd)  // dann haben wir keinen Intermediatefall
      {
        return t;
      }
      else
      {
        if (ReadEnd > WriteBegin) // Intermediate, aber nicht aktiv zum Lesen
        {
          return t;
        }
        else
        {
          return (WriteIntermediateEnd - WriteBegin) + (ReadEnd - buffer);
        }
      }
    }
    else
    {
      return 0;
    }
  }
  //------------------------------------------------------------
  //------------------------------------------------------------
private:
  size_t BufferSize;
  BaseType *buffer;
 
 
  // Sollte nur vom lesenden aus veraendert werden
  BaseType *WriteBegin;
 
  // wird vom schreibenden veraendert?!? Was ist mit dem Intermediate, wenn Begin ueber Intermediate sich ueberscheneidet?!?
  BaseType *WriteIntermediateEnd;  // falls das Ende nicht am Ende des buffers liegt
  BaseType *WriteEnd;
  BaseType *ReadEnd;
 
  // Protects alls functions, making them reentrant
  SRWLOCK reentrant_block;
 
  SRWLOCK cond_write_srw;
  CONDITION_VARIABLE cond_write_possible;
  size_t WriteRequestSize;
 
 
  SRWLOCK cond_read_srw;
  CONDITION_VARIABLE cond_read_possible;
  size_t ReadRequestSize;
 
  SRWLOCK cond_get_read;
  SRWLOCK cond_get_write;
 
  //------------------------------------------------------------
  //------------------------------------------------------------
  inline size_t  Writer_NI_SpaceEnd()
  {
    // Kann ein Feld hinter das Ende zeigen
    return (buffer + BufferSize) - WriteEnd;
  }
 
  //------------------------------------------------------------
  //------------------------------------------------------------
  inline size_t  Writer_NI_SpaceBegin()
  {
    size_t t = (WriteBegin - buffer);
    if (t > 0) t--;  // Immer einer weniger
    return t;  // Immer einer weniger, damit Begin nicht blockiert wird bzw. end begin einholt
  }
 
  //------------------------------------------------------------
  //------------------------------------------------------------
  inline size_t  Writer_I_SpaceBetween()
  {
    size_t t = (WriteBegin - WriteEnd);
    if (t > 0) t--;  // Immer einer weniger
    return t; // Immer einer weniger, damit Begin nicht blockiert wird bzw. end begin einholt
  }
};