How to Install and Run Apache RocketMQ in Linux

Captain Salem 3 min read

Apache RocketMQ is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity, and flexible scalability.

This tutorial will show you how to install and deploy a RocketMQ cluster on your local machine. You will also learn how to send and receive messages to and from the cluster.

Prerequisites

  1. Any Linux or Unix-like system.
  2. JDK 1.8 or later installed.

Check how to install JDK on Ubuntu:

https://www.geekbits.io/how-to-install-amazon-corretto-jdk-on-ubuntu/

Step 1 - Download the Apache RocketMQ Binary

The first step is to download the RocketMQ binary on your machine. Keep in mind that RocketMQ is also available as a source package if you need to compile it.

Open your browser and navigate to the link below:

https://www.apache.org/dyn/closer.cgi?path=rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip

Locate your target RocketMQ binary and download it.

Or run the command:

wget https://dlcdn.apache.org/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip
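Before extracting, it can be worth confirming that the download is intact. Apache projects publish a SHA-512 checksum alongside each release archive. The sketch below is not part of the original tutorial: the class name ArchiveChecksum and its sha512Hex helper are illustrative names. It prints the archive's SHA-512 digest so you can compare it with the published value.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, not part of RocketMQ: prints a file's SHA-512 digest.
class ArchiveChecksum {

    // Streams the file through SHA-512 and returns the digest as lowercase hex.
    static String sha512Hex(Path file) throws IOException, NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("SHA-512");
        try (InputStream in = Files.newInputStream(file)) {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                digest.update(buffer, 0, read);
            }
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : digest.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws Exception {
        // Defaults to the archive name used in this tutorial.
        String name = args.length > 0 ? args[0] : "rocketmq-all-5.0.0-bin-release.zip";
        System.out.println(sha512Hex(Paths.get(name)));
    }
}
```

Run it from the directory that holds the downloaded zip file. If the printed digest differs from the published one, download the archive again.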

Step 2 - Extract the Downloaded Archive

Once the download is complete, extract the archive with the command:

unzip rocketmq-all-5.0.0-bin-release.zip

Navigate into the extracted directory:

cd rocketmq-all-5.0.0-bin-release

Step 3 - Start the NameServer

Once extracted, run the command below to start the NameServer:

nohup sh ./bin/mqnamesrv &

Check the status of the NameServer with the command:

cat nohup.out

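If the startup script reports that JAVA_HOME is not set, export it so that it points at your JDK installation. The path below is where Amazon Corretto 17 is installed on macOS; on Linux, JDKs are typically installed under /usr/lib/jvm, so adjust the path to match your system: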

export JAVA_HOME=/Library/Java/JavaVirtualMachines/amazon-corretto-17.jdk/Contents/Home/
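If you are unsure where your JDK is installed, the JVM can report it. The following is a minimal sketch, assuming a java binary is already on your PATH; the class name ShowJavaHome is illustrative. It prints the standard java.version and java.home system properties.

```java
// Hypothetical helper: prints where the running JVM is installed.
class ShowJavaHome {

    // Returns the installation directory of the JVM running this program.
    static String javaHome() {
        return System.getProperty("java.home");
    }

    public static void main(String[] args) {
        System.out.println("java.version = " + System.getProperty("java.version"));
        System.out.println("java.home    = " + javaHome());
    }
}
```

On JDK 9 and later, java.home is the JDK root and can be used as JAVA_HOME directly. On JDK 8 it points at the jre subdirectory, so JAVA_HOME would be its parent directory.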

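On macOS (Darwin), you may also see the message “Create RAMDisk /Volumes/RAMDisk for gc logging on Darwin OS.” This message does not appear on Linux. The startup script normally creates this RAM disk itself; if that fails, you can create one manually. The size in the command below (2097152 sectors of 512 bytes, about 1 GB) is an example value:

diskutil erasevolume HFS+ 'RAMDisk' `hdiutil attach -nobrowse -nomount ram://2097152`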

Once completed, cat the nohup.out file until you see the message:

The Name Server boot success. serializeType=JSON

This means the NameServer has been started successfully.
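Rather than running cat repeatedly, you can poll the log file until the success message appears. The sketch below is an illustration only: the class WaitForLogLine and its methods are hypothetical names, and the 30-second timeout and 500 ms polling interval are arbitrary choices. It looks for the text "The Name Server boot success" in nohup.out in the current directory.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: waits until a marker string shows up in a log file.
class WaitForLogLine {

    // Returns true if the file exists and its content contains the marker.
    static boolean containsMarker(Path log, String marker) throws IOException {
        if (!Files.exists(log)) {
            return false;
        }
        String content = new String(Files.readAllBytes(log), StandardCharsets.UTF_8);
        return content.contains(marker);
    }

    // Re-reads the file every pollMillis until the marker appears
    // or timeoutMillis has elapsed.
    static boolean waitFor(Path log, String marker, long timeoutMillis, long pollMillis)
            throws IOException, InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            if (containsMarker(log, marker)) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            Thread.sleep(pollMillis);
        }
    }

    public static void main(String[] args) throws Exception {
        boolean up = waitFor(Paths.get("nohup.out"), "The Name Server boot success", 30_000, 500);
        System.out.println(up ? "NameServer is up" : "Timed out waiting for NameServer");
    }
}
```

Run it from the RocketMQ directory, where nohup writes nohup.out. Because nohup appends to the same file on every start, a marker left over from an earlier run also counts as a match.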

Step 4 - Start Broker and Proxy

Once the NameServer is up, we need to start the broker and proxy. Run the command:

nohup sh ./bin/mqbroker -n localhost:9876 --enable-proxy &

Check the nohup.out file again until you see a message similar to:

Mon Nov 28 03:14:52 EAT 2022 rocketmq-proxy startup successfully

We have now deployed a single-node RocketMQ cluster on our local machine. We can now send and receive messages.
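Before writing any client code, a quick connectivity check can confirm that both services are listening. The sketch below uses only the Java standard library. The class name PortCheck is illustrative; port 9876 is the NameServer address passed to the broker above, and port 8081 is the proxy endpoint used by the client examples in this tutorial.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("NameServer (9876): " + isPortOpen("localhost", 9876, 1000));
        System.out.println("Proxy (8081): " + isPortOpen("localhost", 8081, 1000));
    }
}
```

A result of false for either port usually means the corresponding process has not finished starting or exited with an error, in which case nohup.out is the first place to look.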

Step 5 - Send and Receive Message with SDK

  1. Start by creating a Java project.
  2. Add the SDK dependency to the Maven pom.xml file:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.0</version>
</dependency>

  3. Create the topic using the mqadmin tool. The -c option names the cluster in which to create the topic; DefaultCluster is the default cluster name for a broker started as above:

sh ./bin/mqadmin updatetopic -n localhost:9876 -t SampleTopic -c DefaultCluster

  4. In the Java project, create a program to send messages and add the code:

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ProducerExample {
    public static void main(String[] args) throws IOException {
        // Address and port of the RocketMQ proxy (not the NameServer).
        String endpoint = "localhost:8081";
        // Topic created earlier with mqadmin.
        String topic = "SampleTopic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfiguration configuration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoint)
                .build();
        // try-with-resources closes the producer once the message has been sent.
        try (Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build()) {
            Message message = provider.newMessageBuilder()
                    .setTopic(topic)
                    // Keys and tag can later be used to look up or filter messages.
                    .setKeys("messageKey")
                    .setTag("messageTag")
                    .setBody("messageBody".getBytes(StandardCharsets.UTF_8))
                    .build();
            // send() blocks until the broker acknowledges the message.
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }
    }
}

Next, create a consumer program and add the code:

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;

import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // Address and port of the RocketMQ proxy.
        String endpoints = "localhost:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoints)
                .build();
        // "*" subscribes to every tag on the topic.
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        String consumerGroup = "ConsumerGroup";
        String topic = "SampleTopic";
        // The consumer starts receiving messages as soon as it is built.
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setConsumerGroup(consumerGroup)
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .setMessageListener(messageView -> {
                    // Called for each delivered message; SUCCESS acknowledges it.
                    System.out.println("Consume message, messageId=" + messageView.getMessageId());
                    return ConsumeResult.SUCCESS;
                })
                .build();
        // Keep the main thread alive so the listener keeps running; stop with Ctrl+C.
        Thread.sleep(Long.MAX_VALUE);
        // pushConsumer.close();
    }
}

Run the sender and consumer programs to test.

Step 6 - Shutdown Servers

Run the commands below to stop the broker first, then the NameServer:

sh ./bin/mqshutdown broker
sh ./bin/mqshutdown namesrv

End.