常见的Java并发编程框架有哪些?

  • Post category:Java

常见的Java并发编程框架有以下几个:

  1. Java并发包 – java.util.concurrent:Java标准库的一部分,提供了各种并发编程相关的工具类和接口,如线程池、阻塞队列、原子操作等,可以让开发人员在编写并发代码时更加轻松。

  2. Akka:一种基于Actor模型的高并发编程框架,能够轻松地构建分布式、高容错性的应用。通过利用Actor模型的特性,Akka可以提供高效的轻量级并发管理,还可以通过Akka Cluster将应用分布式部署。

  3. Netty:一种高性能的网络编程框架,可以轻松地构建网络通信应用,如服务器、客户端,还可以提供异步、事件驱动的编程模型,以及高效的网络IO操作支持。

  4. Disruptor:一种高性能、低延迟的并发框架,可以支持数百万并发事件的处理,适用于需要低延迟和高吞吐量的业务场景,例如金融交易系统等。

下面分别对以上几个框架进行使用攻略:

Java并发包

Java并发包主要提供了线程池、阻塞队列、原子操作等工具,以下是一个基于线程池的示例:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolDemo {

    public static void main(String[] args) {
        // 创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(5);

        for (int i = 0; i < 10; i++) {
            // 提交任务
            executor.submit(new Task(i));
        }

        // 关闭线程池
        executor.shutdown();
    }

    static class Task implements Runnable {
        private int num;

        public Task(int num) {
            this.num = num;
        }

        @Override
        public void run() {
            System.out.println("Task " + num + " is running.");
        }
    }
}

上面代码中,通过Executors.newFixedThreadPool(5)创建了一个线程池,容量为5,然后通过executor.submit(new Task(i))来提交任务。最后通过executor.shutdown()关闭线程池。

Akka

Akka基于Actor模型,以下是一个简单的使用示例:

import akka.actor._

case object Message

class MyActor extends Actor {

  def receive = {
    case Message => println("Hello Akka!")
    case _ => println("Unknown message.")
  }
}

object AkkaDemo extends App{

  val system = ActorSystem("mySystem")

  val myActor = system.actorOf(Props[MyActor], "myActor")

  myActor ! Message

  // 关闭ActorSystem
  system.terminate()
}

上面代码中,首先定义了一个MyActor,然后通过ActorSystem创建Actor,并向Actor发送一个Message消息。最后通过system.terminate()关闭ActorSystem。

Netty

Netty封装了Java NIO,提供了更高效的网络编程解决方案。以下是一个简单的Echo Server示例:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class EchoServer {

    private int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(
                         new StringDecoder(),
                         new StringEncoder(),
                         new EchoServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)
             .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync();

            System.out.println("Echo Server started on port " + port);

            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoServer(8888).run();
    }
}

上面代码中,通过ServerBootstrap来设置服务端Channel,并且设置了childHandler来添加业务处理逻辑,最后通过f.channel().closeFuture().sync()来阻塞主线程,等待服务端关闭。

Disruptor

以下是一个基于Disruptor实现的简单消费者生产者模型的示例:

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.nio.ByteBuffer;
import java.util.concurrent.ThreadFactory;

public class DisruptorDemo {

    public static void main(String[] args) throws InterruptedException {
        // 创建一个线程工厂
        ThreadFactory threadFactory = r -> new Thread(r, "disruptor-thread");

        // 创建一个事件类
        class Event {
            private ByteBuffer buffer = ByteBuffer.allocate(8);

            public ByteBuffer getBuffer() {
                return buffer;
            }
        }

        // 创建一个事件生产者
        class EventProducer {
            private RingBuffer<Event> ringBuffer;

            public EventProducer(RingBuffer<Event> ringBuffer) {
                this.ringBuffer = ringBuffer;
            }

            public void onData(ByteBuffer buffer) {
                long sequence = ringBuffer.next();
                try {
                    Event event = ringBuffer.get(sequence);
                    event.getBuffer().put(buffer.array());
                } finally {
                    ringBuffer.publish(sequence);
                }
            }
        }

        // 定义一个事件处理器
        class EventConsumer {
            private String name;

            public EventConsumer(String name) {
                this.name = name;
            }

            public void onEvent(Event event) {
                byte[] bytes = event.getBuffer().array();
                System.out.println(name + " consume " + bytes[0] + bytes[1]);
            }
        }

        // 创建一个Disruptor
        Disruptor<Event> disruptor = new Disruptor<>(Event::new, 1024, threadFactory);

        // 创建事件消费者
        EventConsumer consumer1 = new EventConsumer("consumer1");
        EventConsumer consumer2 = new EventConsumer("consumer2");

        // 注册事件消费者
        disruptor.handleEventsWith(consumer1, consumer2);

        // 启动Disruptor
        disruptor.start();

        // 获取RingBuffer
        RingBuffer<Event> ringBuffer = disruptor.getRingBuffer();

        // 创建事件生产者
        EventProducer producer = new EventProducer(ringBuffer);

        // 生产事件
        ByteBuffer buffer = ByteBuffer.allocate(8);
        for (byte i = 0; i < 10; i++) {
            buffer.clear();
            buffer.put(i);
            buffer.put((byte) (i + 1));
            producer.onData(buffer);
        }

        // 关闭Disruptor
        disruptor.shutdown();
    }
}

上面代码中,通过Disruptor.createSingleProducer方法创建了一个Disruptor,然后定义了一个Event类,并通过Disruptor.handleEventsWith方法注册了两个事件消费者。最后通过创建一个事件生产者,来生产事件,并通过disruptor.shutdown()关闭Disruptor。