Calling the various Kafka protocol requests through the Java API

  • 2020-10-23 20:57:56
  • OfStack

As is well known, Kafka implements its own binary protocol for all of its functions, such as sending messages, fetching messages, committing offsets, and creating topics. The protocol specification is documented in the Kafka Protocol guide. The protocol is used as follows:

1. The client creates a request of the corresponding type

2. The client sends the request to the corresponding broker

3. The broker handles the request and sends a response back to the client
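
On the wire, every request and response in these steps travels as a 4-byte big-endian length followed by that many payload bytes. The sketch below shows only this framing, using in-memory streams instead of a real socket. The class name `FramingSketch` and its methods are hypothetical helpers for illustration and are not part of kafka-clients.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of kafka-clients.
class FramingSketch {

    // Prefix the payload with its length as a 4-byte big-endian integer.
    static byte[] frame(byte[] payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(payload.length);
            out.write(payload);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Read the 4-byte length first, then exactly that many payload bytes.
    static byte[] unframe(byte[] framed) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed));
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            return payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] framed = frame(payload);
        System.out.println("framed length: " + framed.length);
        System.out.println("payload: " + new String(unframe(framed), StandardCharsets.UTF_8));
    }
}
```

Against a real broker the same two operations would run over the socket's output and input streams rather than over byte arrays.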

While Kafka provides a large number of scripts for these functions, we often want to embed some of this functionality programmatically into another system, and the Java API approach is extremely flexible for that. In this article, I will present a basic framework for calling the protocol through the Java API, along with examples for two main functions: "Create topic" and "View offsets". Note in advance that the examples in this article assume a Kafka cluster without security enabled. In addition, Kafka's KIP-4 has been working continuously on improving the command-line tools and various administrative operations; interested readers can follow that KIP.

The API used in this article depends on kafka-clients, so if you are building with Maven, add:


<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.2.0</version>
</dependency>

For Gradle, add:


compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.2.0'

The underlying framework


/**
   * Main entry point for sending a request.
   * @param host      host name of the target broker
   * @param port      port of the target broker
   * @param request   the request object
   * @param apiKey    the request type
   * @return          the serialized response
   * @throws IOException if connecting, writing, or reading fails
   */
  public ByteBuffer send(String host, int port, AbstractRequest request, ApiKeys apiKey) throws IOException {
    Socket socket = connect(host, port);
    try {
      return send(request, apiKey, socket);
    } finally {
      socket.close();
    }
  }

  /**
   * Sends a serialized request and waits for the response.
   * @param socket    socket connected to the target broker
   * @param request   the serialized request
   * @return          the serialized response
   * @throws IOException if writing or reading fails
   */
  private byte[] issueRequestAndWaitForResponse(Socket socket, byte[] request) throws IOException {
    sendRequest(socket, request);
    return getResponse(socket);
  }

  /**
   * Writes a serialized request to the socket, prefixed with its 4-byte length.
   * @param socket    socket connected to the target broker
   * @param request   the serialized request
   * @throws IOException if writing fails
   */
  private void sendRequest(Socket socket, byte[] request) throws IOException {
    DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
    dos.writeInt(request.length);
    dos.write(request);
    dos.flush();
  }

  /**
   * Reads a response from the given socket: a 4-byte length followed by that many bytes.
   * @param socket    socket connected to the target broker
   * @return          the serialized response
   * @throws IOException if reading fails
   */
  private byte[] getResponse(Socket socket) throws IOException {
    DataInputStream dis = null;
    try {
      dis = new DataInputStream(socket.getInputStream());
      byte[] response = new byte[dis.readInt()];
      dis.readFully(response);
      return response;
    } finally {
      if (dis != null) {
        dis.close();
      }
    }
  }

  /**
   * Creates the socket connection.
   * @param hostName  host name of the target broker
   * @param port      service port of the target broker, such as 9092
   * @return          the newly created socket connection
   * @throws IOException if the connection cannot be established
   */
  private Socket connect(String hostName, int port) throws IOException {
    return new Socket(hostName, port);
  }

  /**
   * Sends a request over the given socket.
   * @param request   the request object
   * @param apiKey    the request type, i.e. which kind of request this is
   * @param socket    socket connected to the target broker
   * @return          the serialized response, positioned after the response header
   * @throws IOException if writing or reading fails
   */
  private ByteBuffer send(AbstractRequest request, ApiKeys apiKey, Socket socket) throws IOException {
    RequestHeader header = new RequestHeader(apiKey.id, request.version(), "client-id", 0);
    ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf());
    header.writeTo(buffer);
    request.writeTo(buffer);
    byte[] serializedRequest = buffer.array();
    byte[] response = issueRequestAndWaitForResponse(socket, serializedRequest);
    ByteBuffer responseBuffer = ByteBuffer.wrap(response);
    // Consume the response header so the buffer is positioned at the response body
    ResponseHeader.parse(responseBuffer);
    return responseBuffer;
  }
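
To make the header step in `send` more concrete, here is a minimal sketch of what a request header looks like as bytes. It assumes the layout given in the Kafka protocol guide for this generation of the protocol: a 16-bit API key, a 16-bit API version, a 32-bit correlation id, and the client id as a 16-bit length followed by UTF-8 bytes. The response header then starts with the 32-bit correlation id echoed back by the broker. The class `RequestHeaderSketch` and its methods are hypothetical and only mirror what `RequestHeader` and `ResponseHeader` do; in real code you would keep using the kafka-clients classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; kafka-clients' RequestHeader does this for you.
class RequestHeaderSketch {

    // Encode api_key, api_version, correlation_id and client_id in big-endian order.
    static ByteBuffer encode(short apiKey, short apiVersion, int correlationId, String clientId) {
        byte[] id = clientId.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(2 + 2 + 4 + 2 + id.length);
        buffer.putShort(apiKey);            // which request this is, e.g. 19 for CreateTopics
        buffer.putShort(apiVersion);        // version of that request's schema
        buffer.putInt(correlationId);       // echoed back by the broker in the response header
        buffer.putShort((short) id.length); // client id length
        buffer.put(id);                     // client id bytes
        buffer.flip();                      // make the buffer readable from the start
        return buffer;
    }

    // Read the correlation id from the start of a response and advance past it.
    static int correlationIdOf(ByteBuffer response) {
        return response.getInt();
    }

    public static void main(String[] args) {
        ByteBuffer header = encode((short) 19, (short) 0, 0, "client-id");
        System.out.println("header size in bytes: " + header.remaining());
    }
}
```

The code in this article always sends correlation id 0 and opens a new connection for each request, so matching responses to requests is trivial. If you reuse one connection for several in-flight requests, comparing the returned correlation id with the one you sent is how you would pair them up.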

With these methods in place, we can create specific requests.

Create topic


/**
   * Creates a topic.
   * Since this is only sample code, some values (such as the host name and port) are hard-coded; adjust them as needed.
   * @param topicName           name of the topic
   * @param partitions          number of partitions
   * @param replicationFactor   replication factor
   * @throws IOException if the request cannot be sent or the response cannot be read
   */
  public void createTopics(String topicName, int partitions, short replicationFactor) throws IOException {
    Map<String, CreateTopicsRequest.TopicDetails> topics = new HashMap<>();
    // Several topics can be created at once by putting more than one entry into the map
    topics.put(topicName, new CreateTopicsRequest.TopicDetails(partitions, replicationFactor));
    int creationTimeoutMs = 60000;
    CreateTopicsRequest request = new CreateTopicsRequest.Builder(topics, creationTimeoutMs).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.CREATE_TOPICS);
    CreateTopicsResponse.parse(response, request.version());
  }

View offsets


/**
   * Fetches the committed offset of one partition of a topic for a consumer group.
   * @param groupID     group id
   * @param topic       name of the topic
   * @param partition   partition number
   * @throws IOException if the request cannot be sent or the response cannot be read
   */
  public void getOffsetForPartition(String groupID, String topic, int partition) throws IOException {
    TopicPartition tp = new TopicPartition(topic, partition);
    // Collections is java.util.Collections
    OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, Collections.singletonList(tp))
        .setVersion((short)2).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
    OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
    OffsetFetchResponse.PartitionData partitionData = resp.responseData().get(tp);
    System.out.println(partitionData.offset);
  }

/**
   * Fetches the committed offsets of all topic partitions for a consumer group.
   * @param groupID   group id
   * @return          a map of (topic partition --> partition data)
   * @throws IOException if the request cannot be sent or the response cannot be read
   */
  public Map<TopicPartition, OffsetFetchResponse.PartitionData> getAllOffsetsForGroup(String groupID) throws IOException {
    OffsetFetchRequest request = new OffsetFetchRequest.Builder(groupID, null).setVersion((short)2).build();
    ByteBuffer response = send("localhost", 9092, request, ApiKeys.OFFSET_FETCH);
    OffsetFetchResponse resp = OffsetFetchResponse.parse(response, request.version());
    return resp.responseData();
  }

That is the sample code for "Create topic" and "View offsets". You can use these two examples as templates for building other types of requests.

