How to Install and Run Apache RocketMQ on Linux
Apache RocketMQ is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity, and flexible scalability.
This tutorial will show you how to install and deploy a RocketMQ cluster on your local machine. You will also learn how to send and receive messages to and from the cluster.
Prerequisites
- Any Linux or Unix-like system.
- JDK 1.8 or later installed.
Check how to install JDK on Ubuntu:
https://www.geekbits.io/how-to-install-amazon-corretto-jdk-on-ubuntu/
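Before continuing, it can be worth confirming that the installed JDK meets the 1.8 minimum. The sketch below is one way to do that; the helper name java_major is hypothetical, and the parsing assumes the usual `version "..."` line printed by `java -version`.

```shell
#!/bin/sh
# Print the major Java version for a string such as "1.8.0_352" or "17.0.5".
java_major() {
  v="$1"
  case "$v" in
    1.*) v="${v#1.}" ;;  # legacy scheme: "1.8.x" means Java 8
  esac
  echo "${v%%.*}"
}

if command -v java >/dev/null 2>&1; then
  # `java -version` writes to stderr, so redirect it before parsing.
  version=$(java -version 2>&1 | sed -n 's/.*version "\([^"]*\)".*/\1/p' | head -n 1)
  major=$(java_major "$version")
  if [ "${major:-0}" -ge 8 ] 2>/dev/null; then
    echo "Found Java $version (major $major): OK"
  else
    echo "Found Java '$version', but JDK 1.8 or later is required"
  fi
else
  echo "java not found on PATH; install a JDK first"
fi
```

If the script reports that java is missing or too old, install a JDK using the guide linked above and run the check again.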
Step 1 - Download the Apache RocketMQ Binary
The first step is to download the RocketMQ binary on your machine. Keep in mind that RocketMQ is also available as a source package if you need to compile it.
Open your browser and navigate to the link below:
https://www.apache.org/dyn/closer.cgi?path=rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip
Locate your target RocketMQ binary and download it.
Or run the command:
wget https://dlcdn.apache.org/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip
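Optionally, you can check the integrity of the downloaded archive. The sketch below assumes the sha512sum tool is available and that you have copied the expected digest from the release's download page; the helper name verify_sha512 is hypothetical.

```shell
# Compare a file's SHA-512 digest with an expected hex string.
# Succeeds (exit status 0) only when the two digests match.
verify_sha512() {
  file="$1"
  # Normalise the expected value: lowercase, no spaces or newlines.
  expected=$(printf '%s' "$2" | tr 'A-F' 'a-f' | tr -d ' \n')
  actual=$(sha512sum "$file" | awk '{print $1}')
  [ "$actual" = "$expected" ]
}
```

A call would look like `verify_sha512 rocketmq-all-5.0.0-bin-release.zip "<expected digest>" && echo "checksum OK"`, where the digest placeholder is the value you copied from the download page.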
Step 2 - Extract the Downloaded Archive
Once the download is complete, extract the archive with the command:
unzip rocketmq-all-5.0.0-bin-release.zip
Navigate into the extracted directory:
cd rocketmq-all-5.0.0-bin-release
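As a quick sanity check, you can confirm that the launcher scripts used in the rest of this tutorial were extracted. This is a minimal sketch; the function name check_rocketmq_home is hypothetical, and the script names are the ones this tutorial calls.

```shell
# Report whether the launcher scripts used in this tutorial exist under <dir>/bin.
# Prints each missing script and returns 1 if any is absent, 0 otherwise.
check_rocketmq_home() {
  home="$1"
  missing=0
  for script in mqnamesrv mqbroker mqadmin mqshutdown; do
    if [ ! -f "$home/bin/$script" ]; then
      echo "missing: $home/bin/$script"
      missing=1
    fi
  done
  return "$missing"
}
```

From inside the extracted directory, `check_rocketmq_home . && echo "layout looks complete"` should print the confirmation; any "missing" line suggests the archive was not fully extracted.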
Step 3 - Start the NameServer
Once extracted, run the command below to start the NameServer:
nohup sh ./bin/mqnamesrv &
Check the status of the NameServer with the command:
cat nohup.out
If you get an error asking you to set JAVA_HOME, point the variable at your JDK installation. For example, with Amazon Corretto 17 installed on macOS (adjust the path for your system):
export JAVA_HOME=/Library/Java/JavaVirtualMachines/amazon-corretto-17.jdk/Contents/Home/
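Similarly, on macOS (Darwin) you may see the message “Create RAMDisk /Volumes/RAMDisk for gc logging on Darwin OS.” If the RAM disk is not created automatically, you can create it manually. The size below (2097152 sectors of 512 bytes, about 1 GB) is an assumption; adjust it as needed:
diskutil erasevolume HFS+ 'RAMDisk' `hdiutil attach -nobrowse -nomount ram://2097152`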
Once completed, cat the nohup.out file until you see the message:
The Name Server boot success. serializeType=JSON
This means the NameServer has been started successfully.
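Rather than running cat repeatedly, you could poll the log until the success message appears. The sketch below is one way to do it; the function name wait_for_log and the 30-second default timeout are assumptions, not part of RocketMQ.

```shell
# Poll <file> once per second until a line containing <text> appears
# or <timeout> seconds have passed. Returns 0 on success, 1 on timeout.
wait_for_log() {
  file="$1"
  text="$2"
  timeout="${3:-30}"
  elapsed=0
  while [ "$elapsed" -lt "$timeout" ]; do
    # -F treats the text as a fixed string rather than a regular expression.
    if [ -f "$file" ] && grep -qF -- "$text" "$file"; then
      return 0
    fi
    sleep 1
    elapsed=$((elapsed + 1))
  done
  return 1
}
```

For the NameServer, a call such as `wait_for_log nohup.out "The Name Server boot success" 30 && echo "NameServer is up"` would block until the message shows up or the timeout expires.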
Step 4 - Start Broker and Proxy
Once the NameServer is up, we need to start the broker and proxy. Run the command:
nohup sh ./bin/mqbroker -n localhost:9876 --enable-proxy &
Check nohup.out again until you see a message like:
Mon Nov 28 03:14:52 EAT 2022 rocketmq-proxy startup successfully
We have now deployed a single-node RocketMQ cluster on our local machine and can send and receive messages.
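If you want an extra check beyond the log messages, you can probe the two ports this tutorial relies on: 9876 for the NameServer and 8081 for the proxy endpoint used by the Java examples in Step 5. This is a minimal sketch that assumes bash is installed (it uses bash's /dev/tcp redirection); the function name port_open is hypothetical.

```shell
# Return 0 if a TCP connection to <host>:<port> succeeds.
# Relies on bash's /dev/tcp redirection, so bash must be installed.
port_open() {
  bash -c 'exec 3<>"/dev/tcp/$0/$1"' "$1" "$2" 2>/dev/null
}

for port in 9876 8081; do
  if port_open localhost "$port"; then
    echo "port $port: listening"
  else
    echo "port $port: not reachable"
  fi
done
```

A port reported as not reachable usually means the corresponding process is still starting or failed to start; nohup.out is the first place to look.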
Step 5 - Send and Receive Message with SDK
- Start by creating a Java Project.
- Add the SDK dependency to the Maven pom.xml file:
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.0</version>
</dependency>
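- Create the topic using the mqadmin tool. The -c option names the target cluster; DefaultCluster is the default cluster name:
sh ./bin/mqadmin updatetopic -n localhost:9876 -t SampleTopic -c DefaultCluster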
- In the Java project, create a producer program to send messages and add the code:
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import java.io.IOException;

public class ProducerExample {
    public static void main(String[] args) throws ClientException, IOException {
        // Address of the proxy started in Step 4.
        String endpoint = "localhost:8081";
        // Topic created with mqadmin above.
        String topic = "SampleTopic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();
        // Build a producer bound to the topic.
        Producer producer = provider.newProducerBuilder()
            .setTopics(topic)
            .setClientConfiguration(configuration)
            .build();
        // Build a message with a key, a tag, and a body.
        Message message = provider.newMessageBuilder()
            .setTopic(topic)
            .setKeys("messageKey")
            .setTag("messageTag")
            .setBody("messageBody".getBytes())
            .build();
        try {
            // Send the message and print the ID assigned to it.
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        } finally {
            // Release the producer's resources.
            producer.close();
        }
    }
}
- Create a consumer program and add the code:
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // Address of the proxy started in Step 4.
        String endpoints = "localhost:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .build();
        // "*" subscribes to messages with any tag.
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        String consumerGroup = "ConsumerGroup";
        String topic = "SampleTopic";
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            .setConsumerGroup(consumerGroup)
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            .setMessageListener(messageView -> {
                // Called for every message delivered to this consumer.
                System.out.println("Consumed message: " + messageView.getMessageId());
                return ConsumeResult.SUCCESS;
            })
            .build();
        // Keep the process alive so the listener keeps receiving messages.
        Thread.sleep(Long.MAX_VALUE);
        // Call pushConsumer.close() when you no longer need the consumer.
        // pushConsumer.close();
    }
}
Run the sender and consumer programs to test.
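If the producer fails with a topic-related error, one thing worth checking is whether SampleTopic was actually created. The sketch below pipes the output of mqadmin's topicList command through a small filter; the helper name has_topic is hypothetical, and the check assumes topicList prints one topic name per line.

```shell
# Read a topic listing on stdin (one name per line); succeed if <topic> is present.
# -x requires a whole-line match, -F treats the name as a fixed string.
has_topic() {
  grep -qxF -- "$1"
}

# Only query the cluster when run from the RocketMQ directory.
if [ -f ./bin/mqadmin ]; then
  if sh ./bin/mqadmin topicList -n localhost:9876 | has_topic SampleTopic; then
    echo "SampleTopic exists"
  else
    echo "SampleTopic not found; re-run the updatetopic command"
  fi
fi
```

Matching whole lines keeps similarly named topics, or stray log output from the tool, from producing a false positive.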
Step 6 - Shutdown Servers
Run the commands below from the RocketMQ directory to stop the servers, broker first and NameServer second:
sh ./bin/mqshutdown broker
sh ./bin/mqshutdown namesrv