Because these methods have different semantics, as explained in the JavaDoc. add/remove are unconditional and throw an exception when they cannot succeed, while offer/poll return a special value instead:
- offer only offers a new value, but it might not be accepted, e.g. if the queue is full (it then returns false).
- poll only polls for the value, but we accept the fact that the value might not be there (it then returns null).
To complicate matters more, BlockingQueue introduces yet another pair of methods, put/take, for blocking add/remove. Of course they could have used the same name with a bunch of parameters/flags,
smellyGet(boolean blocking, boolean failOnEmpty)
but don't you think this is a better design?
        | Throws ex. | Special v. | Blocks | Times out
--------+------------+------------+--------+---------------------
Insert  | add(e)     | offer(e)   | put(e) | offer(e, time, unit)
Remove  | remove()   | poll()     | take() | poll(time, unit)
Examine | element()  | peek()     | N/A    | N/A
* https://meta.stackexchange.com/questions/73566
Answer from Tomasz Nurkiewicz on Stack Overflow (question: Java Queues - why "poll" and "offer"?)
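The difference between the "throws exception" and "special value" columns is easy to see on a bounded queue. The following is a minimal sketch, assuming an ArrayBlockingQueue with capacity 1; the class name QueueMethodsDemo and the helpers tryAdd/tryRemove are made up for illustration and only turn the exceptions into printable text.

```java
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

class QueueMethodsDemo {
    /** Calls add(e) and reports the outcome as text instead of letting the exception escape. */
    static String tryAdd(Queue<String> queue, String value) {
        try {
            queue.add(value);
            return "added";
        } catch (IllegalStateException e) {
            return "threw IllegalStateException";
        }
    }

    /** Calls remove() and reports the outcome as text instead of letting the exception escape. */
    static String tryRemove(Queue<String> queue) {
        try {
            return "removed " + queue.remove();
        } catch (NoSuchElementException e) {
            return "threw NoSuchElementException";
        }
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayBlockingQueue<>(1); // bounded: holds a single element

        System.out.println(queue.offer("a"));     // true: there was room
        System.out.println(queue.offer("b"));     // false: queue is full, no exception
        System.out.println(tryAdd(queue, "b"));   // threw IllegalStateException

        System.out.println(queue.peek());         // a (examined, not removed)
        System.out.println(queue.poll());         // a
        System.out.println(queue.poll());         // null: queue is empty, no exception
        System.out.println(tryRemove(queue));     // threw NoSuchElementException
    }
}
```

Which column to use depends on whether a full or empty queue is an error in your program (add/remove) or an ordinary outcome you want to branch on (offer/poll).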
You're confusing Queue with Stack; push and pop are associated with the latter.
Think of Queue in its proper producer/consumer context; poll and offer will make far more sense.
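To make the producer/consumer reading concrete, here is a small sketch using the timed offer/poll variants of BlockingQueue. The class name ProducerConsumerDemo, the queue capacity and the timeouts are arbitrary choices for illustration. Treating a poll timeout as "the producer is done" is a simplification; a real consumer would more likely use a sentinel value or a shutdown flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class ProducerConsumerDemo {
    /** Runs one producer and one consumer over a bounded queue and returns what the consumer received. */
    static List<Integer> run(int itemCount) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> consumed = new ArrayList<>(); // written only by the consumer, read after join()

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    // The producer offers a value: it waits for space, but the queue may still decline.
                    if (!queue.offer(i, 1, TimeUnit.SECONDS)) {
                        System.out.println("queue stayed full, dropping " + i);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                Integer item;
                // The consumer polls: null means nothing arrived within the timeout.
                while ((item = queue.poll(1, TimeUnit.SECONDS)) != null) {
                    consumed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

Neither side assumes the other is ready: the producer can be turned away and the consumer can come back empty-handed, which is what the names offer and poll convey.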
This code currently works and is obviously unfinished, but I am not entirely sure I am implementing things correctly, as I have not used SQS before. I have the queue configured for long polling with a 20-second wait. I just want to make sure I am not doing something wrong or less than optimal. Messages need to be processed immediately, and so far they are, but I haven't really loaded it up yet. I have just been sending requests via Postman to the service that puts messages in the queue that this poller feeds from. This is running in Spring Boot, by the way.
Edit: Should I be running receiveMessage() on a thread as well in that loop, or is running the processing on a thread enough to keep things flowing quickly?
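import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

@Component
public class SQSPoller {
    private static final Logger logger = LoggerFactory.getLogger(SQSPoller.class);
    private static final String QUEUE_NAME = "my_queue_name";

    private final SqsClient sqsClient;
    private final Executor executor;
    private final String queueUrl;
    // volatile so that a stop request from another thread is seen by the polling loop
    private volatile boolean pollerActive = false;

    public SQSPoller() {
        this.sqsClient = SqsClient.builder().region(Region.US_WEST_2).build();
        this.executor = Executors.newFixedThreadPool(10);
        this.queueUrl = sqsClient.getQueueUrl(GetQueueUrlRequest.builder().queueName(QUEUE_NAME).build()).queueUrl();
    }

    @PostConstruct
    public void init() {
        // Poll on a dedicated thread: looping inside @PostConstruct would block application startup.
        new Thread(this::startPoller, "sqs-poller").start();
    }

    public void startPoller() {
        if (!pollerActive) {
            pollerActive = true;
            while (pollerActive) {
                receiveMessage();
            }
        }
    }

    public void stopPoller() {
        pollerActive = false; // the loop exits after the current long poll returns
    }

    public void receiveMessage() {
        // Long poll: waits up to 20 seconds for messages before returning.
        ReceiveMessageResponse response = sqsClient.receiveMessage(ReceiveMessageRequest.builder()
                .waitTimeSeconds(20)
                .queueUrl(queueUrl)
                .build()
        );
        List<Message> messages = response.messages();
        if (messages.isEmpty()) {
            return; // the long poll timed out with nothing to process
        }
        logger.info("received {} message(s)", messages.size());
        executor.execute(() -> processMessages(messages));
    }

    public void processMessages(List<Message> messages) {
        // AvroDeserializer and TransactionEvent are application classes not shown here.
        AvroDeserializer<TransactionEvent> deserializer = new AvroDeserializer<>(TransactionEvent.class);
        for (Message m : messages) {
            try {
                TransactionEvent te = deserializer.deSerialize(m.body());
                logger.info(te.toString());
            } catch (IOException e) {
                logger.error("failed to deserialize message {}", m.messageId(), e);
            }
            logger.info(m.body());
            // Still unfinished: messages are never deleted, so SQS will redeliver them
            // once the visibility timeout expires.
        }
    }
}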
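On the question of whether the receive call itself should run on its own thread(s): one way to experiment is to separate "how many threads poll" from "how many threads process". The sketch below is not SQS code. StoppablePoller, its Supplier-based source, the fake receive helper and the pool sizes are all hypothetical stand-ins, so it runs without the AWS SDK. It shows several polling loops handing messages to a worker pool, plus a stop method that lets in-flight work finish.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

class StoppablePoller {
    private final Supplier<List<String>> source; // stands in for a long-poll receive call
    private final Consumer<String> handler;      // stands in for per-message processing
    private final int pollerCount;
    private final ExecutorService pollers;
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    private volatile boolean running = false;

    StoppablePoller(Supplier<List<String>> source, Consumer<String> handler, int pollerCount) {
        this.source = source;
        this.handler = handler;
        this.pollerCount = pollerCount;
        this.pollers = Executors.newFixedThreadPool(pollerCount);
    }

    void start() {
        running = true;
        for (int i = 0; i < pollerCount; i++) {
            pollers.execute(this::pollLoop);
        }
    }

    private void pollLoop() {
        while (running) {
            for (String message : source.get()) {
                workers.execute(() -> handler.accept(message));
            }
        }
    }

    /** Stops polling first, then waits for handed-off work. Returns true if everything drained in time. */
    boolean stop(long timeoutMillis) {
        running = false;
        pollers.shutdown();
        try {
            if (!pollers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return false;
            }
            workers.shutdown(); // safe now: no poller can submit new work
            return workers.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Fake receive call: waits up to 100 ms for one message, like a very short long poll. */
    static List<String> receive(BlockingQueue<String> inbox) {
        try {
            String message = inbox.poll(100, TimeUnit.MILLISECONDS);
            return message == null ? List.of() : List.of(message);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return List.of();
        }
    }

    /** Pushes messageCount fake messages through two pollers and returns how many were handled. */
    static int demo(int messageCount) {
        BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
        for (int i = 0; i < messageCount; i++) {
            inbox.add("msg-" + i);
        }
        Queue<String> handled = new ConcurrentLinkedQueue<>();
        CountDownLatch done = new CountDownLatch(messageCount);
        StoppablePoller poller = new StoppablePoller(
                () -> receive(inbox),
                message -> { handled.add(message); done.countDown(); },
                2);
        poller.start();
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        poller.stop(2000);
        return handled.size();
    }

    public static void main(String[] args) {
        System.out.println("handled " + demo(20) + " messages");
    }
}
```

With a single poller the loop can only have one receive call in flight at a time. Raising pollerCount is the knob to try if receiving, rather than processing, turns out to be the bottleneck under load.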