记录下银行对账中使用CountDownLatch的场景
题目
使用线程池(核心线程20个)模拟存款、取款(金额不足时,取款暂停,等到余额足够时再取(重新放回线程池),若存款结束,金额仍然不够,则终止取款)。同时存款/取款各1000次,每次由一个线程去处理,每次金额在10~100之间
解决方案
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
| package com.example.springbootdemo.demos.demo;
import java.util.Random; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger;
class BankAccount1 { private volatile int balance; public BankAccount1(int initialBalance) { this.balance = initialBalance; } public synchronized void deposit(int amount) { balance += amount; }
public synchronized boolean withdraw(int amount, boolean ignoreBalance) { if (ignoreBalance || balance >= amount) { balance -= amount; return true; } return false; }
public int getBalance() { return balance; } public boolean reconcile(int initial, int deposits, int withdrawals) { return initial + deposits - withdrawals == balance; } }
public class AdvancedBankSystem1 {
private static final int INITIAL_BALANCE = 1000; private static final int TOTAL_OPERATIONS = 1000; private static final int MAX_RETRIES = 3;
public static void main(String[] args) throws InterruptedException { BankAccount1 account = new BankAccount1(INITIAL_BALANCE); Random rand = new Random();
ExecutorService executor = Executors.newFixedThreadPool(20);
CountDownLatch latch = new CountDownLatch(2 * TOTAL_OPERATIONS);
AtomicInteger totalDeposits = new AtomicInteger(0); AtomicInteger totalWithdrawals = new AtomicInteger(0);
BlockingQueue<Runnable> pendingWithdrawals = new LinkedBlockingQueue<>();
for (int i = 0; i < TOTAL_OPERATIONS; i++) { final int amount = rand.nextInt(91) + 10; executor.submit(() -> { account.deposit(amount); totalDeposits.addAndGet(amount); latch.countDown(); }); }
for (int i = 0; i < TOTAL_OPERATIONS; i++) { final int amount = rand.nextInt(91) + 10; pendingWithdrawals.add(new WithdrawTask( account, amount, latch, totalWithdrawals, pendingWithdrawals, MAX_RETRIES )); }
for (int i = 0; i < TOTAL_OPERATIONS; i++) { executor.submit(pendingWithdrawals.take()); }
while (!pendingWithdrawals.isEmpty()) { Runnable task = pendingWithdrawals.poll(); if (task != null) { executor.submit(task); } }
latch.await(); executor.shutdown();
System.out.println("最终余额: " + account.getBalance()); System.out.println("初始余额: " + INITIAL_BALANCE); System.out.println("总存款额: " + totalDeposits.get()); System.out.println("总取款额: " + totalWithdrawals.get());
boolean result = account.reconcile(INITIAL_BALANCE, totalDeposits.get(), totalWithdrawals.get()); System.out.println("对账完毕,结果:" + result); }
static class WithdrawTask implements Runnable { private final BankAccount1 account; private final int amount; private final CountDownLatch latch; private final AtomicInteger totalWithdrawals; private final BlockingQueue<Runnable> queue; private final int maxRetries; private int retryCount = 0;
public WithdrawTask(BankAccount1 account, int amount, CountDownLatch latch, AtomicInteger totalWithdrawals, BlockingQueue<Runnable> queue, int maxRetries) { this.account = account; this.amount = amount; this.latch = latch; this.totalWithdrawals = totalWithdrawals; this.queue = queue; this.maxRetries = maxRetries; }
@Override public void run() { boolean success = account.withdraw(amount, false); if (!success && retryCount < maxRetries) { retryCount++; try { queue.put(this); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } else if (success) { totalWithdrawals.addAndGet(amount); } latch.countDown(); } } }
|
主要学的是CountDownLatch的使用方式,以及需要重试的时候,应该先将任务放入队列中,然后再提交到线程池执行,线程池本身是不支持重试的,异步的话,需要将任务放在mq中,由mq来将任务提交到线程池中这样不仅异步还能重试