AWS SDK for C++

AWS SDK for C++ Version 1.11.440

Loading...
Searching...
No Matches
Queue.h
1
5#pragma once
6
7#include <aws/core/client/ClientConfiguration.h>
8#include <aws/queues/Queues_EXPORTS.h>
9#include <thread>
10#include <atomic>
11#include <functional>
12
13namespace Aws
14{
15 namespace Queues
16 {
17 static const char* MEM_TAG = "Aws::Queues::Queue";
18
23 template<typename MESSAGE_TYPE>
24 class Queue
25 {
26 typedef std::function<void(const Queue*, const MESSAGE_TYPE&, bool&)> MessageReceivedEventHandler;
27 typedef std::function<void(const Queue*, const MESSAGE_TYPE&)> MessageDeleteFailedEventHandler;
28 typedef std::function<void(const Queue*, const MESSAGE_TYPE&)> MessageDeleteSuccessEventHandler;
29 typedef std::function<void(const Queue*, const MESSAGE_TYPE&)> MessageSendFailedEventHandler;
30 typedef std::function<void(const Queue*, const MESSAGE_TYPE&)> MessageSendSuccessEventHandler;
31
32 public:
39 Queue(unsigned pollingFrequency) :
40 m_continue(true), m_pollingFrequencyMs(pollingFrequency), m_pollingThread(nullptr)
41 {
42 }
43
44 virtual ~Queue()
45 {
47 }
48
49 virtual MESSAGE_TYPE Top() const = 0;
50 virtual void Delete(const MESSAGE_TYPE&) = 0;
51 virtual void Push(const MESSAGE_TYPE&) = 0;
52
59 {
60 if(!m_pollingThread)
61 {
62 m_continue = true;
63 m_pollingThread = Aws::MakeUnique<std::thread>(MEM_TAG, &Queue::Main, this);
64 }
65 }
66
74 {
75 m_continue = false;
76 if(m_pollingThread)
77 {
78 m_pollingThread->join();
79 m_pollingThread = nullptr;
80 }
81 }
82
83 inline void SetMessageReceivedEventHandler(const MessageReceivedEventHandler& messageHandler) { m_messageReceivedHandler = messageHandler; }
84 inline void SetMessageDeleteFailedEventHandler(const MessageDeleteFailedEventHandler& messageHandler) { m_messageDeleteFailedHandler = messageHandler; }
85 inline void SetMessageDeleteSuccessEventHandler(const MessageDeleteSuccessEventHandler& messageHandler) { m_messageDeleteSuccessHandler = messageHandler; }
86 inline void SetMessageSendFailedEventHandler(const MessageSendFailedEventHandler& messageHandler) { m_messageSendFailedHandler = messageHandler; }
87 inline void SetMessageSendSuccessEventHandler(const MessageSendSuccessEventHandler& messageHandler) { m_messageSendSuccessHandler = messageHandler; }
88
89 inline void SetMessageReceivedEventHandler(MessageReceivedEventHandler&& messageHandler) { m_messageReceivedHandler = messageHandler; }
90 inline void SetMessageDeleteFailedEventHandler(MessageDeleteFailedEventHandler&& messageHandler) { m_messageDeleteFailedHandler = messageHandler; }
91 inline void SetMessageDeleteSuccessEventHandler(MessageDeleteSuccessEventHandler&& messageHandler) { m_messageDeleteSuccessHandler = messageHandler; }
92 inline void SetMessageSendFailedEventHandler(MessageSendFailedEventHandler&& messageHandler) { m_messageSendFailedHandler = messageHandler; }
93 inline void SetMessageSendSuccessEventHandler(MessageSendSuccessEventHandler&& messageHandler) { m_messageSendSuccessHandler = messageHandler; }
94
95 inline const MessageReceivedEventHandler& GetMessageReceivedEventHandler() const { return m_messageReceivedHandler; }
96 inline const MessageDeleteFailedEventHandler& GetMessageDeleteFailedEventHandler() const { return m_messageDeleteFailedHandler; }
97 inline const MessageDeleteSuccessEventHandler& GetMessageDeleteSuccessEventHandler() const { return m_messageDeleteSuccessHandler; }
98 inline const MessageSendFailedEventHandler& GetMessageSendFailedEventHandler() const { return m_messageSendFailedHandler; }
99 inline const MessageSendSuccessEventHandler& GetMessageSendSuccessEventHandler() const { return m_messageSendSuccessHandler; }
100
101 protected:
102 std::atomic<bool> m_continue;
103
104 private:
105 void Main()
106 {
107 while(m_continue)
108 {
109 auto start = std::chrono::system_clock::now();
111 bool deleteMessage = false;
112
114 if (receivedHandler)
115 {
117 }
118
119 if (deleteMessage)
120 {
122 }
123
124 if(m_continue)
125 {
126 auto stop = std::chrono::system_clock::now();
127 auto timeTaken = std::chrono::duration_cast<std::chrono::milliseconds>(stop - start);
128
129 if (m_pollingFrequencyMs >= timeTaken.count())
130 {
131 std::this_thread::sleep_for(std::chrono::milliseconds(m_pollingFrequencyMs - timeTaken.count()));
132 }
133 }
134 }
135 }
136
137 unsigned m_pollingFrequencyMs;
138 Aws::UniquePtr<std::thread> m_pollingThread;
139
140 // Handlers
141 MessageReceivedEventHandler m_messageReceivedHandler;
142 MessageDeleteFailedEventHandler m_messageDeleteFailedHandler;
143 MessageDeleteSuccessEventHandler m_messageDeleteSuccessHandler;
144 MessageSendFailedEventHandler m_messageSendFailedHandler;
145 MessageSendSuccessEventHandler m_messageSendSuccessHandler;
146 };
147 }
148}
const MessageDeleteFailedEventHandler & GetMessageDeleteFailedEventHandler() const
Definition Queue.h:96
void SetMessageDeleteSuccessEventHandler(const MessageDeleteSuccessEventHandler &messageHandler)
Definition Queue.h:85
const MessageSendFailedEventHandler & GetMessageSendFailedEventHandler() const
Definition Queue.h:98
void StopPolling()
Definition Queue.h:73
const MessageDeleteSuccessEventHandler & GetMessageDeleteSuccessEventHandler() const
Definition Queue.h:97
virtual MESSAGE_TYPE Top() const =0
void SetMessageSendFailedEventHandler(const MessageSendFailedEventHandler &messageHandler)
Definition Queue.h:86
const MessageSendSuccessEventHandler & GetMessageSendSuccessEventHandler() const
Definition Queue.h:99
virtual void Delete(const MESSAGE_TYPE &)=0
void StartPolling()
Definition Queue.h:58
void SetMessageSendSuccessEventHandler(const MessageSendSuccessEventHandler &messageHandler)
Definition Queue.h:87
Queue(unsigned pollingFrequency)
Definition Queue.h:39
void SetMessageReceivedEventHandler(MessageReceivedEventHandler &&messageHandler)
Definition Queue.h:89
void SetMessageDeleteFailedEventHandler(const MessageDeleteFailedEventHandler &messageHandler)
Definition Queue.h:84
void SetMessageDeleteSuccessEventHandler(MessageDeleteSuccessEventHandler &&messageHandler)
Definition Queue.h:91
virtual void Push(const MESSAGE_TYPE &)=0
void SetMessageReceivedEventHandler(const MessageReceivedEventHandler &messageHandler)
Definition Queue.h:83
void SetMessageSendSuccessEventHandler(MessageSendSuccessEventHandler &&messageHandler)
Definition Queue.h:93
virtual ~Queue()
Definition Queue.h:44
std::atomic< bool > m_continue
Definition Queue.h:102
void SetMessageDeleteFailedEventHandler(MessageDeleteFailedEventHandler &&messageHandler)
Definition Queue.h:90
const MessageReceivedEventHandler & GetMessageReceivedEventHandler() const
Definition Queue.h:95
void SetMessageSendFailedEventHandler(MessageSendFailedEventHandler &&messageHandler)
Definition Queue.h:92
static const char * MEM_TAG
Definition Queue.h:17
UniquePtr< T > MakeUnique(const char *allocationTag, ArgTypes &&... args)
std::unique_ptr< T, D > UniquePtr