当前位置:网站首页>[Flink source code practice (I)] add a rest API to Flink

[Flink source code practice (I)] add a rest API to Flink

2022-06-24 11:02:00 Pipi bear

This paper refers to flink committer tison The article , be based on flink 1.13 Version source code change implementation .

One 、 summary

https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/rest_api/

Flink The government has realized a large number of REST API Interface , Available for Flink UI Display data 、 Also used for respective monitoring panels . these REST API Of webserver As JobManager Part of is running . The default port is 8081, Can pass flink-conf.yaml Of rest.port Parameters to configure .

There are more than one JobManager Under the circumstances (HA scenario ), Every JobManager Will run its own REST API example , And selected by leader Of JobManager Instances provide information about completed and running jobs .

Two 、 Development of guidelines

REST API be located flink-runtime Under the project , The core to realize org.apache.flink.runtime.webmonitor.WebMonitorEndpoint ( because Flink In the early REST API Are used for monitoring , So the name is WebMonitorEndpoint. Now its work function also includes some non monitoring scenarios such as task start and stop ), It is mainly responsible for server Implement and request routing .

image.png

( The main :2 individual pierre package It is customized by the author below REST API The place of )

Of course Flink REST API The implementation is based on Netty and Netty Router , Because the implementation is light , So the performance is quite good .

And the whole REST API These four modules are required :

image.png

3、 ... and 、 Develop your own REST API!

0、 Design and planning

1) demand

towards http link http://${jobmaster-host}:8081/pierre/foo launch get request , Return to one json strand {"response":"bar"}

2) Achieve planning

When we want to add a new REST API When , We need at least :

  • Achieve one MessageHeaders, As an interface for new requests
  • Achieve one ResponseBody, As a result of Body
  • Achieve one AbstractRestHandler, According to the added MessageHeaders Class handles requests
  • take handler Sign up to org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#initializeHandlers()

1、 Realization MessageHeaders

package org.apache.flink.runtime.rest.messages.pierre;

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

public class FooHeaders
        implements MessageHeaders<EmptyRequestBody, BarResponseBody, EmptyMessageParameters> {

    //  Single instance mode 
    private static final FooHeaders INSTANCE = new FooHeaders();

    public static FooHeaders getInstance() {
        return INSTANCE;
    }

    @Override
    public Class<BarResponseBody> getResponseClass() {
        return BarResponseBody.class;
    }

    @Override
    public HttpResponseStatus HttpResponseStatus() {
        return HttpResponseStatus.OK;
    }

    @Override
    public String getDescription() {
        return "pierre foobar service";
    }

    @Override
    public Class<EmptyRequestBody> getRequestClass() {
        return EmptyRequestBody.class;
    }

    //  analysis url The parameters inside 
    @Override
    public EmptyMessageParameters getUnresolvedMessageParameters() {
        return EmptyMessageParameters.getInstance();
    }

    @Override
    public HttpMethodWrapper getHttpMethod() {
        return HttpMethodWrapper.GET;
    }

	// URL Routing information 
    @Override
    public String getTargetRestEndpointURL() {
        return "/pierre/foo";
    }
}

Note here :

  • Must be single instance mode
  • HttpResponseStatusgetResponseClass And so on return null, Otherwise there will be a NullPointerException
    image.png

2、 Realization ResponseBody

package org.apache.flink.runtime.rest.messages.pierre;

import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

public class BarResponseBody implements ResponseBody {
    private static final String FIELD_BAR = "response";

    @JsonProperty(FIELD_BAR)
    public final String response = "bar";

    private static final BarResponseBody INSTANCE = new BarResponseBody();

    public static BarResponseBody getInstance() {
        return INSTANCE;
    }
}

It's used here jackson annotation , need import FLINK shaded Version of , To avoid conflict .

3、 Realization `AbstractRestHandler

package org.apache.flink.runtime.rest.handler.pierre;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.pierre.BarResponseBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import javax.annotation.Nonnull;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class FooHandler
        extends AbstractRestHandler<
                RestfulGateway, EmptyRequestBody, BarResponseBody, EmptyMessageParameters> {

    public FooHandler(
            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
            Time timeout,
            Map<String, String> responseHeaders,
            MessageHeaders<EmptyRequestBody, BarResponseBody, EmptyMessageParameters>
                    messageHeaders) {
        super(leaderRetriever, timeout, responseHeaders, messageHeaders);
    }

    @Override
    protected CompletableFuture<BarResponseBody> handleRequest(
            @Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request,
            @Nonnull RestfulGateway gateway)
            throws RestHandlerException {
        return CompletableFuture.completedFuture(BarResponseBody.getInstance());
    }
}

4、 register handler

    //  Their own handler
    final FooHandler fooHandler =
                new FooHandler(leaderRetriever, timeout, responseHeaders, FooHeaders.getInstance());
    ……
	handlers.add(Tuple2.of(fooHandler.getMessageHeaders(), fooHandler));

5、 Compile the package

Initial renovation Flink Code , Not very familiar , The following steps are listed for your reference :

  • maven-checkstyle-plugin Of failOnViolation Set to false, Because some of our minor changes don't fully comply with flink Code engineering specification for . Of course, if you want to give Flink Official contribution code , It must conform to the specifications .
  • mvn spotless:apply It will automatically format the code
  • mvn clean package -DskipTests Into the long package in

It is estimated that ten minutes :flink-dist/target Directory to generate the latest executable file

image.png

6、 ... and 、 effect

  • Start a local loacl colony ./bin/start-cluster.sh
  • request http://${jobmaster-host}:8081/pierre/foo
image.png

Be accomplished , The first step to perfection !

More exciting :https://github.com/pierre94/flink-notes

原网站

版权声明
本文为[Pipi bear]所创,转载请带上原文链接,感谢
https://yzsam.com/2021/06/20210610140820962q.html