东莞电商建站,常州市住房和城乡建设局网站,软件项目管理软件,携程的网站建设项目规划书前言
最近突然冒出一个想法#xff1a;能不能用SpringBoot自己实现一个类似AWS Lambda或阿里云函数计算的执行引擎#xff1f;
说干就干#xff0c;于是从零开始设计了一套基于SpringBoot的Serverless执行框架。
这套框架支持函数动态加载、按需执行、资源隔离#xff0c;甚…前言
最近突然冒出一个想法能不能用SpringBoot自己实现一个类似AWS Lambda或阿里云函数计算的执行引擎
说干就干于是从零开始设计了一套基于SpringBoot的Serverless执行框架。
这套框架支持函数动态加载、按需执行、资源隔离甚至还实现了简单的冷启动优化。
今天分享给大家看看如何用SpringBoot的强大能力打造一个属于自己的Serverless引擎。
设计思路
核心特性
我们要实现的Serverless引擎包含以下特性
动态函数加载支持运行时加载新的函数代码
函数隔离执行每个函数在独立的上下文中运行
生命周期管理自动管理函数的创建、执行和销毁
资源限制控制函数的执行时间
函数调用支持HTTP、定时器等多种触发方式
监控统计记录函数执行次数、耗时、成功率等指标
架构设计
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Function API │ │ Event Trigger │ │ Management UI │
└─────────┬───────┘ └─────────┬───────┘ └─────────┬───────┘│ │ │└──────────────────────┼──────────────────────┘│┌────────────┴──────────────┐│ Serverless Engine │└────────────┬──────────────┘│┌────────────────────────┼──────────────────────┐│ │ │
┌───────▼───────┐ ┌───────────▼─────────┐ ┌───────▼───────┐
│ Function Pool │ │ Execution Manager │ │ Resource Pool │
└───────────────┘ └─────────────────────┘ └───────────────┘核心实现
项目结构
src/
├── main/
│ ├── java/
│ │ └── com/
│ │ └── example/
│ │ ├── ServerlessEngine.java
│ │ ├── core/
│ │ │ ├── FunctionManager.java
│ │ │ ├── ExecutionEngine.java
│ │ │ ├── ResourceManager.java
│ │ │ └── EventDispatcher.java
│ │ ├── model/
│ │ │ ├── ServerlessFunction.java
│ │ │ ├── ExecutionContext.java
│ │ │ ├── ExecutionResult.java
│ │ │ └── FunctionMetrics.java
│ │ ├── executor/
│ │ │ ├── FunctionExecutor.java
│ │ │ └── IsolatedClassLoader.java
│ │ ├── trigger/
│ │ │ ├── HttpTrigger.java
│ │ │ ├── TimerTrigger.java
│ │ │ └── EventTrigger.java
│ │ ├── api/
│ │ │ └── ServerlessController.java
│ └── resources/
│ ├── application.yml
│ └── functions/
│ ├── demo-function.jar
│ └── user-function.jar函数接口定义
package com.example.model;import java.util.Map;/*** Serverless函数接口* 所有用户函数都需要实现这个接口*/
FunctionalInterface
public interface ServerlessFunction {/*** 函数执行入口* param input 输入参数* param context 执行上下文* return 执行结果*/Object handle(MapString, Object input, ExecutionContext context) throws Exception;
}执行上下文
package com.example.model;import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** 函数执行上下文*/
public class ExecutionContext {private String requestId;private String functionName;private String functionVersion;private LocalDateTime startTime;private long timeoutMs;private MapString, Object environment;private MapString, Object attributes;public ExecutionContext(String requestId, String functionName) {this.requestId requestId;this.functionName functionName;this.functionVersion 1.0;this.startTime LocalDateTime.now();this.timeoutMs 30000; // 默认30秒超时this.environment new ConcurrentHashMap();this.attributes new ConcurrentHashMap();}// 获取剩余执行时间public long getRemainingTimeMs() {long elapsed System.currentTimeMillis() - java.sql.Timestamp.valueOf(startTime).getTime();return Math.max(0, timeoutMs - elapsed);}Overridepublic String toString() {return ExecutionContext{ requestId requestId , functionName functionName , functionVersion functionVersion , startTime startTime , timeoutMs timeoutMs };}
}执行结果
package com.example.model;import java.time.LocalDateTime;/*** 函数执行结果*/
public class ExecutionResult {private String requestId;private String functionName;private boolean success;private Object result;private String errorMessage;private String errorType;private LocalDateTime startTime;private LocalDateTime endTime;private long executionTime;public ExecutionResult(String requestId, String functionName) {this.requestId requestId;this.functionName functionName;this.startTime LocalDateTime.now();}// 标记执行成功public void markSuccess(Object result) {this.success true;this.result result;this.endTime LocalDateTime.now();this.executionTime calculateExecutionTime();}// 标记执行失败public void markFailure(String errorType, String errorMessage) {this.success false;this.errorType errorType;this.errorMessage errorMessage;this.endTime LocalDateTime.now();this.executionTime calculateExecutionTime();}// 计算执行时间private long calculateExecutionTime() {if (startTime ! null endTime ! null) {return java.sql.Timestamp.valueOf(endTime).getTime() - java.sql.Timestamp.valueOf(startTime).getTime();}return 0;}// Getter和Setter方法省略Overridepublic String toString() {return ExecutionResult{ requestId requestId , functionName functionName , success success , executionTime executionTime };}
}函数指标统计
package com.example.model;import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;/*** 函数执行指标*/
public class FunctionMetrics {private String functionName;private AtomicLong invocationCount new AtomicLong(0);private AtomicLong successCount new AtomicLong(0);private AtomicLong errorCount new AtomicLong(0);private AtomicLong totalExecutionTime new AtomicLong(0);private AtomicLong minExecutionTime new AtomicLong(Long.MAX_VALUE);private AtomicLong maxExecutionTime new AtomicLong(0);private AtomicReferenceLocalDateTime lastInvocation new AtomicReference();private AtomicReferenceLocalDateTime createTime new AtomicReference(LocalDateTime.now());public FunctionMetrics(String functionName) {this.functionName functionName;}// 记录函数调用public void recordInvocation(ExecutionResult result) {invocationCount.incrementAndGet();lastInvocation.set(LocalDateTime.now());if (result.isSuccess()) {successCount.incrementAndGet();} else {errorCount.incrementAndGet();}long executionTime result.getExecutionTime();totalExecutionTime.addAndGet(executionTime);// 更新最小执行时间minExecutionTime.updateAndGet(current - Math.min(current, executionTime));// 更新最大执行时间maxExecutionTime.updateAndGet(current - Math.max(current, executionTime));}// 获取平均执行时间public double getAvgExecutionTime() {long count invocationCount.get();if (count 0) {return 0.0;}return (double) totalExecutionTime.get() / count;}// 获取成功率public double getSuccessRate() {long total invocationCount.get();if (total 0) {return 0.0;}return (double) successCount.get() / total * 100;}// 获取错误率public double getErrorRate() {return 100.0 - getSuccessRate();}Overridepublic String toString() {return FunctionMetrics{ functionName functionName , invocationCount invocationCount.get() , successCount successCount.get() , errorCount errorCount.get() , avgExecutionTime String.format(%.2f, getAvgExecutionTime()) , successRate String.format(%.2f, getSuccessRate()) % };}
}隔离类加载器
package com.example.executor;import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.HashMap;
import java.util.Map;/*** 隔离类加载器* 为每个函数提供独立的类加载环境*/
public class IsolatedClassLoader extends URLClassLoader {private final String functionName;private final MapString, Class? loadedClasses new HashMap();private final ClassLoader parentClassLoader;public IsolatedClassLoader(String functionName, URL[] urls, ClassLoader parent) {super(urls, parent);this.functionName functionName;this.parentClassLoader parent;}Overrideprotected Class? loadClass(String name, boolean resolve) throws ClassNotFoundException {// 检查是否已经加载过Class? loadedClass loadedClasses.get(name);if (loadedClass ! null) {return loadedClass;}// 对于Java系统类使用父类加载器if (name.startsWith(java.) || name.startsWith(javax.) || name.startsWith(sun.) || name.startsWith(com.sun.)) {return super.loadClass(name, resolve);}// 对于Spring相关类使用父类加载器if (name.startsWith(org.springframework.) || name.startsWith(org.apache.) ||name.startsWith(com.fasterxml.)) {return super.loadClass(name, resolve);}try {// 尝试自己加载类Class? clazz findClass(name);loadedClasses.put(name, clazz);if (resolve) {resolveClass(clazz);}return clazz;} catch (ClassNotFoundException e) {// 如果找不到使用父类加载器return super.loadClass(name, resolve);}}Overrideprotected Class? findClass(String name) throws ClassNotFoundException {try {String path name.replace(., /) .class;InputStream is getResourceAsStream(path);if (is null) {throw new ClassNotFoundException(name);}byte[] classData readClassData(is);return defineClass(name, classData, 0, classData.length);} catch (IOException e) {throw new ClassNotFoundException(name, e);}}private byte[] readClassData(InputStream is) throws IOException {ByteArrayOutputStream buffer new ByteArrayOutputStream();byte[] data new byte[1024];int bytesRead;while ((bytesRead is.read(data)) ! -1) {buffer.write(data, 0, bytesRead);}return buffer.toByteArray();}public String getFunctionName() {return functionName;}public int getLoadedClassCount() {return loadedClasses.size();}Overridepublic void close() throws IOException {loadedClasses.clear();super.close();}
}函数执行器
package com.example.executor;import com.example.model.ExecutionContext;
import com.example.model.ExecutionResult;
import com.example.model.ServerlessFunction;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.*;/*** 函数执行器* 负责在隔离环境中执行函数*/
Component
Slf4j
public class FunctionExecutor {Autowiredprivate ClassLoaderPool classLoaderPool;private final ExecutorService executorService;public FunctionExecutor() {// 创建线程池用于执行函数this.executorService Executors.newCachedThreadPool(r - {Thread t new Thread(r);t.setName(function-executor- System.currentTimeMillis());t.setDaemon(true);return t;});}/*** 执行函数*/public ExecutionResult execute(String functionName, String jarPath, String className, MapString, Object input, ExecutionContext context) {ExecutionResult result new ExecutionResult(context.getRequestId(), functionName);FutureObject future executorService.submit(() - {// 从池中获取ClassLoader不需要每次创建IsolatedClassLoader classLoader classLoaderPool.getClassLoader(functionName, jarPath, className);// 加载函数类Class? functionClass classLoader.loadClass(className);Object functionInstance functionClass.getDeclaredConstructor().newInstance();// 检查是否实现了ServerlessFunction接口if (!(functionInstance instanceof ServerlessFunction)) {throw new IllegalArgumentException(Function class must implement ServerlessFunction interface);}ServerlessFunction function (ServerlessFunction) functionInstance;// 执行函数return function.handle(input, context);});try {// 等待执行结果支持超时Object functionResult future.get(context.getTimeoutMs(), TimeUnit.MILLISECONDS);result.markSuccess(functionResult);} catch (TimeoutException e) {future.cancel(true);result.markFailure(TIMEOUT, Function execution timeout);} catch (ExecutionException e) {Throwable cause e.getCause();log.error(cause.getMessage(),cause);result.markFailure(cause.getClass().getSimpleName(), cause.getMessage());} catch (Exception e) {result.markFailure(e.getClass().getSimpleName(), e.getMessage());}return result;}/*** 关闭执行器*/public void shutdown() {executorService.shutdown();try {if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {executorService.shutdownNow();}} catch (InterruptedException e) {executorService.shutdownNow();Thread.currentThread().interrupt();}}
}函数管理器
package com.example.core;import com.example.model.FunctionMetrics;
import org.springframework.stereotype.Component;import java.io.File;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;/*** 函数管理器* 负责函数的注册、查找、生命周期管理*/
Component
public class FunctionManager {// 函数注册表private final MapString, FunctionDefinition functions new ConcurrentHashMap();// 函数指标private final MapString, FunctionMetrics metrics new ConcurrentHashMap();/*** 函数定义*/public static class FunctionDefinition {private String name;private String description;private String jarPath;private String className;private long timeoutMs;private MapString, Object environment;private Date createTime;private Date updateTime;public FunctionDefinition(String name, String jarPath, String className) {this.name name;this.jarPath jarPath;this.className className;this.timeoutMs 30000; // 默认30秒this.environment new HashMap();this.createTime new Date();this.updateTime new Date();}// Getter和Setter方法public String getName() { return name; }public void setName(String name) { this.name name; }public String getDescription() { return description; }public void setDescription(String description) { this.description description; }public String getJarPath() { return jarPath; }public void setJarPath(String jarPath) { this.jarPath jarPath; }public String getClassName() { return className; }public void setClassName(String className) { this.className className; }public long getTimeoutMs() { return timeoutMs; }public void setTimeoutMs(long timeoutMs) { this.timeoutMs timeoutMs; }public MapString, Object getEnvironment() { return environment; }public void setEnvironment(MapString, Object environment) { this.environment environment; }public Date getCreateTime() { return createTime; }public Date getUpdateTime() { return updateTime; }public void setUpdateTime(Date updateTime) { this.updateTime updateTime; }}/*** 注册函数*/public void registerFunction(String name, String jarPath, String className) {// 验证jar文件是否存在File jarFile new File(jarPath);if (!jarFile.exists()) {throw new IllegalArgumentException(JAR file not found: jarPath);}FunctionDefinition definition new FunctionDefinition(name, jarPath, className);functions.put(name, definition);// 初始化指标metrics.put(name, new FunctionMetrics(name));System.out.println(Function registered: name - className);}/*** 注册函数带配置*/public void registerFunction(String name, String jarPath, String className, long timeoutMs, MapString, Object environment) {registerFunction(name, jarPath, className);FunctionDefinition definition functions.get(name);definition.setTimeoutMs(timeoutMs);if (environment ! null) {definition.setEnvironment(new HashMap(environment));}}/*** 获取函数定义*/public FunctionDefinition getFunction(String name) {return functions.get(name);}/*** 检查函数是否存在*/public boolean functionExists(String name) {return functions.containsKey(name);}/*** 获取所有函数名称*/public SetString getAllFunctionNames() {return new HashSet(functions.keySet());}/*** 获取所有函数定义*/public CollectionFunctionDefinition getAllFunctions() {return new ArrayList(functions.values());}/*** 更新函数*/public void updateFunction(String name, String jarPath, String className) {if (!functionExists(name)) {throw new IllegalArgumentException(Function not found: name);}FunctionDefinition definition functions.get(name);definition.setJarPath(jarPath);definition.setClassName(className);definition.setUpdateTime(new Date());System.out.println(Function updated: name);}/*** 删除函数*/public void removeFunction(String name) {if (functions.remove(name) ! null) {metrics.remove(name);System.out.println(Function removed: name);}}/*** 获取函数指标*/public FunctionMetrics getFunctionMetrics(String name) {return metrics.get(name);}/*** 获取所有函数指标*/public CollectionFunctionMetrics getAllMetrics() {return new ArrayList(metrics.values());}/*** 清理所有函数*/public void clear() {functions.clear();metrics.clear();}/*** 获取函数数量*/public int getFunctionCount() {return functions.size();}
}执行引擎
package com.example;import cn.hutool.core.io.FileUtil;
import com.example.core.FunctionManager;
import com.example.trigger.TimerTrigger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;import java.util.HashMap;
import java.util.Map;/*** Serverless引擎启动类*/
SpringBootApplication
EnableScheduling
public class ServerlessEngine implements CommandLineRunner {Autowiredprivate FunctionManager functionManager;Autowiredprivate TimerTrigger timerTrigger;public static void main(String[] args) {FileUtil.writeBytes(123.getBytes(),functions/function.txt);SpringApplication.run(ServerlessEngine.class, args);}Overridepublic void run(String... args) throws Exception {System.out.println( Serverless Engine Started );// 注册示例函数registerDemoFunctions();// 注册示例定时任务registerDemoTimerTasks();System.out.println( Demo Functions and Tasks Registered );System.out.println(API available at: http://localhost:8080/serverless);}/*** 注册演示函数*/private void registerDemoFunctions() {// 注册Hello World函数functionManager.registerFunction(hello-world,functions/demo-function.jar,com.example.functions.HelloWorldFunction);// 注册用户服务函数MapString, Object userEnv new HashMap();userEnv.put(DB_URL, jdbc:h2:mem:testdb);userEnv.put(MAX_USERS, 1000);functionManager.registerFunction(user-service,functions/user-function.jar,com.example.functions.UserServiceFunction,60000, // 60秒超时userEnv);}/*** 注册演示定时任务*/private void registerDemoTimerTasks() {// 注册清理任务timerTrigger.registerTimerTask(cleanup-task,user-service,0 0 2 * * ? // 每天凌晨2点执行);// 注册健康检查任务timerTrigger.registerTimerTask(health-check,hello-world,0/10 * * * * ? // 每10秒执行一次);}
}HTTP触发器
package com.example.trigger;import com.example.core.ExecutionEngine;
import com.example.model.ExecutionResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;/*** HTTP触发器* 处理HTTP请求触发的函数调用*/
Component
public class HttpTrigger {Autowiredprivate ExecutionEngine executionEngine;/*** 处理HTTP请求*/public ExecutionResult handleRequest(String functionName, HttpServletRequest request, MapString, Object body) {// 构建输入参数MapString, Object input new HashMap();// 添加HTTP相关信息MapString, Object httpInfo new HashMap();httpInfo.put(method, request.getMethod());httpInfo.put(path, request.getRequestURI());httpInfo.put(queryString, request.getQueryString());httpInfo.put(remoteAddr, request.getRemoteAddr());httpInfo.put(userAgent, request.getHeader(User-Agent));// 添加请求头MapString, String headers new HashMap();EnumerationString headerNames request.getHeaderNames();if (headerNames ! null) {while (headerNames.hasMoreElements()) {String headerName headerNames.nextElement();headers.put(headerName, request.getHeader(headerName));}}httpInfo.put(headers, headers);// 添加查询参数MapString, String[] queryParams request.getParameterMap();MapString, Object params new HashMap();queryParams.forEach((key, values) - {if (values.length 1) {params.put(key, values[0]);} else {params.put(key, values);}});httpInfo.put(queryParams, params);input.put(http, httpInfo);// 添加请求体if (body ! null) {input.put(body, body);}// 调用函数return executionEngine.invoke(functionName, input);}/*** 简化的GET请求处理*/public ExecutionResult handleGetRequest(String functionName, HttpServletRequest request) {return handleRequest(functionName, request, null);}/*** 简化的POST请求处理*/public ExecutionResult handlePostRequest(String functionName, HttpServletRequest request, MapString, Object body) {return handleRequest(functionName, request, body);}
}定时触发器
package com.example.trigger;import com.example.core.ExecutionEngine;
import com.example.model.ExecutionResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** 定时触发器* 支持cron表达式定时触发函数*/
Component
public class TimerTrigger {Autowiredprivate ExecutionEngine executionEngine;// 定时任务注册表private final MapString, TimerTask timerTasks new ConcurrentHashMap();/*** 定时任务定义*/public static class TimerTask {private String name;private String functionName;private String cronExpression;private boolean enabled;private LocalDateTime lastExecution;private LocalDateTime nextExecution;private long executionCount;public TimerTask(String name, String functionName, String cronExpression) {this.name name;this.functionName functionName;this.cronExpression cronExpression;this.enabled true;this.executionCount 0;}// Getter和Setter方法public String getName() { return name; }public String getFunctionName() { return functionName; }public String getCronExpression() { return cronExpression; }public boolean isEnabled() { return enabled; }public void setEnabled(boolean enabled) { this.enabled enabled; }public LocalDateTime getLastExecution() { return lastExecution; }public void setLastExecution(LocalDateTime lastExecution) { this.lastExecution lastExecution; }public LocalDateTime getNextExecution() { return nextExecution; }public void setNextExecution(LocalDateTime nextExecution) { this.nextExecution nextExecution; }public long getExecutionCount() { return executionCount; }public void incrementExecutionCount() { this.executionCount; }}/*** 注册定时任务*/public void registerTimerTask(String taskName, String functionName, String cronExpression) {TimerTask task new TimerTask(taskName, functionName, cronExpression);timerTasks.put(taskName, task);System.out.println(Timer task registered: taskName - functionName ( cronExpression ));}/*** 移除定时任务*/public void removeTimerTask(String taskName) {if (timerTasks.remove(taskName) ! null) {System.out.println(Timer task removed: taskName);}}/*** 启用/禁用定时任务*/public void setTimerTaskEnabled(String taskName, boolean enabled) {TimerTask task timerTasks.get(taskName);if (task ! null) {task.setEnabled(enabled);System.out.println(Timer task taskName (enabled ? enabled : disabled));}}/*** 获取所有定时任务*/public MapString, TimerTask getAllTimerTasks() {return new HashMap(timerTasks);}/*** 手动执行定时任务*/public ExecutionResult executeTimerTask(String taskName) {TimerTask task timerTasks.get(taskName);if (task null) {throw new IllegalArgumentException(Timer task not found: taskName);}return executeTask(task);}/*** 定时执行 - 每分钟检查一次*/Scheduled(fixedRate 60000) // 每分钟执行一次public void checkAndExecuteTimerTasks() {LocalDateTime now LocalDateTime.now();timerTasks.values().stream().filter(TimerTask::isEnabled).forEach(task - {// 这里简化处理实际应该解析cron表达式// 为了演示我们每5分钟执行一次if (task.getLastExecution() null || task.getLastExecution().isBefore(now.minusMinutes(5))) {executeTask(task);}});}/*** 执行定时任务*/private ExecutionResult executeTask(TimerTask task) {// 构建输入参数MapString, Object input new HashMap();MapString, Object timerInfo new HashMap();timerInfo.put(taskName, task.getName());timerInfo.put(cronExpression, task.getCronExpression());timerInfo.put(executionTime, LocalDateTime.now().toString());timerInfo.put(executionCount, task.getExecutionCount());input.put(timer, timerInfo);// 执行函数ExecutionResult result executionEngine.invoke(task.getFunctionName(), input);// 更新任务信息task.setLastExecution(LocalDateTime.now());task.incrementExecutionCount();System.out.println(Timer task executed: task.getName() - task.getFunctionName() , success: result.isSuccess());return result;}
}Serverless控制器
package com.example.api;import com.example.core.ExecutionEngine;
import com.example.core.FunctionManager;
import com.example.model.ExecutionResult;
import com.example.model.FunctionMetrics;
import com.example.trigger.HttpTrigger;
import com.example.trigger.TimerTrigger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;import javax.servlet.http.HttpServletRequest;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;/*** Serverless API控制器*/
RestController
RequestMapping(/serverless)
public class ServerlessController {Autowiredprivate FunctionManager functionManager;Autowiredprivate ExecutionEngine executionEngine;Autowiredprivate HttpTrigger httpTrigger;Autowiredprivate TimerTrigger timerTrigger;/*** 调用函数*/PostMapping(/functions/{functionName}/invoke)public ResponseEntityMapString, Object invokeFunction(PathVariable String functionName,RequestBody(required false) MapString, Object input,HttpServletRequest request) {ExecutionResult result httpTrigger.handlePostRequest(functionName, request, input);MapString, Object response new HashMap();response.put(requestId, result.getRequestId());response.put(functionName, result.getFunctionName());response.put(success, result.isSuccess());response.put(executionTime, result.getExecutionTime());response.put(memoryUsed, result.getMemoryUsed());if (result.isSuccess()) {response.put(result, result.getResult());} else {response.put(errorType, result.getErrorType());response.put(errorMessage, result.getErrorMessage());}return ResponseEntity.ok(response);}/*** GET方式调用函数*/GetMapping(/functions/{functionName}/invoke)public ResponseEntityMapString, Object invokeFunctionGet(PathVariable String functionName,HttpServletRequest request) {ExecutionResult result httpTrigger.handleGetRequest(functionName, request);MapString, Object response new HashMap();response.put(requestId, result.getRequestId());response.put(functionName, result.getFunctionName());response.put(success, result.isSuccess());response.put(executionTime, result.getExecutionTime());if (result.isSuccess()) {response.put(result, result.getResult());} else {response.put(errorType, result.getErrorType());response.put(errorMessage, result.getErrorMessage());}return ResponseEntity.ok(response);}/*** 注册函数*/PostMapping(/functions/{functionName})public ResponseEntityMapString, String registerFunction(PathVariable String functionName,RequestBody MapString, Object config) {String jarPath (String) config.get(jarPath);String className (String) config.get(className);Long timeoutMs config.containsKey(timeoutMs) ? ((Number) config.get(timeoutMs)).longValue() : 30000L;Long maxMemory config.containsKey(maxMemory) ? ((Number) config.get(maxMemory)).longValue() : 128 * 1024 * 1024L;SuppressWarnings(unchecked)MapString, Object environment (MapString, Object) config.get(environment);functionManager.registerFunction(functionName, jarPath, className, timeoutMs, maxMemory, environment);MapString, String response new HashMap();response.put(message, Function registered successfully);response.put(functionName, functionName);return ResponseEntity.ok(response);}/*** 获取所有函数列表*/GetMapping(/functions)public ResponseEntityMapString, Object getAllFunctions() {CollectionFunctionManager.FunctionDefinition functions functionManager.getAllFunctions();MapString, Object response new HashMap();response.put(functions, functions);response.put(count, functions.size());return ResponseEntity.ok(response);}/*** 获取函数详情*/GetMapping(/functions/{functionName})public ResponseEntityFunctionManager.FunctionDefinition getFunctionDetail(PathVariable String functionName) {FunctionManager.FunctionDefinition function functionManager.getFunction(functionName);if (function null) {return ResponseEntity.notFound().build();}return ResponseEntity.ok(function);}/*** 删除函数*/DeleteMapping(/functions/{functionName})public ResponseEntityMapString, String deleteFunction(PathVariable String functionName) {functionManager.removeFunction(functionName);MapString, String response new HashMap();response.put(message, Function deleted successfully);response.put(functionName, functionName);return ResponseEntity.ok(response);}/*** 获取函数指标*/GetMapping(/functions/{functionName}/metrics)public ResponseEntityFunctionMetrics getFunctionMetrics(PathVariable String functionName) {FunctionMetrics metrics functionManager.getFunctionMetrics(functionName);if (metrics null) {return ResponseEntity.notFound().build();}return ResponseEntity.ok(metrics);}/*** 获取所有函数指标*/GetMapping(/metrics)public ResponseEntityMapString, Object getAllMetrics() {CollectionFunctionMetrics metrics functionManager.getAllMetrics();MapString, Object response new HashMap();response.put(metrics, metrics);response.put(count, metrics.size());return ResponseEntity.ok(response);}/*** 注册定时任务*/PostMapping(/timer-tasks/{taskName})public ResponseEntityMapString, String registerTimerTask(PathVariable String taskName,RequestBody MapString, String config) {String functionName config.get(functionName);String cronExpression config.get(cronExpression);timerTrigger.registerTimerTask(taskName, functionName, cronExpression);MapString, String response new HashMap();response.put(message, Timer task registered successfully);response.put(taskName, taskName);return ResponseEntity.ok(response);}/*** 获取所有定时任务*/GetMapping(/timer-tasks)public ResponseEntityMapString, Object getAllTimerTasks() {MapString, TimerTrigger.TimerTask tasks timerTrigger.getAllTimerTasks();MapString, Object response new HashMap();response.put(tasks, tasks);response.put(count, tasks.size());return ResponseEntity.ok(response);}/*** 手动执行定时任务*/PostMapping(/timer-tasks/{taskName}/execute)public ResponseEntityMapString, Object executeTimerTask(PathVariable String taskName) {ExecutionResult result timerTrigger.executeTimerTask(taskName);MapString, Object response new HashMap();response.put(requestId, result.getRequestId());response.put(success, result.isSuccess());response.put(executionTime, result.getExecutionTime());if (result.isSuccess()) {response.put(result, result.getResult());} else {response.put(errorType, result.getErrorType());response.put(errorMessage, result.getErrorMessage());}return ResponseEntity.ok(response);}/*** 系统状态*/GetMapping(/status)public ResponseEntityMapString, Object getSystemStatus() {MapString, Object status new HashMap();// 系统信息Runtime runtime Runtime.getRuntime();status.put(totalMemory, runtime.totalMemory());status.put(freeMemory, runtime.freeMemory());status.put(usedMemory, runtime.totalMemory() - runtime.freeMemory());status.put(maxMemory, runtime.maxMemory());status.put(availableProcessors, runtime.availableProcessors());// 函数统计status.put(functionCount, functionManager.getFunctionCount());status.put(timerTaskCount, timerTrigger.getAllTimerTasks().size());// 总执行次数long totalInvocations functionManager.getAllMetrics().stream().mapToLong(FunctionMetrics::getInvocationCount).sum();status.put(totalInvocations, totalInvocations);return ResponseEntity.ok(status);}
}主启动类
package com.example;import cn.hutool.core.io.FileUtil;
import com.example.core.FunctionManager;
import com.example.trigger.TimerTrigger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;import java.util.HashMap;
import java.util.Map;/*** Serverless引擎启动类*/
SpringBootApplication
EnableScheduling
public class ServerlessEngine implements CommandLineRunner {Autowiredprivate FunctionManager functionManager;Autowiredprivate TimerTrigger timerTrigger;public static void main(String[] args) {FileUtil.writeBytes(123.getBytes(),functions/function.txt);SpringApplication.run(ServerlessEngine.class, args);}Overridepublic void run(String... args) throws Exception {System.out.println( Serverless Engine Started );// 注册示例函数registerDemoFunctions();// 注册示例定时任务registerDemoTimerTasks();System.out.println( Demo Functions and Tasks Registered );System.out.println(API available at: http://localhost:8080/serverless);}/*** 注册演示函数*/private void registerDemoFunctions() {// 注册Hello World函数functionManager.registerFunction(hello-world,functions/demo-function.jar,com.example.functions.HelloWorldFunction);// 注册用户服务函数MapString, Object userEnv new HashMap();userEnv.put(DB_URL, jdbc:h2:mem:testdb);userEnv.put(MAX_USERS, 1000);functionManager.registerFunction(user-service,functions/user-function.jar,com.example.functions.UserServiceFunction,60000, // 60秒超时userEnv);}/*** 注册演示定时任务*/private void registerDemoTimerTasks() {// 注册清理任务timerTrigger.registerTimerTask(cleanup-task,user-service,0 0 2 * * ? // 每天凌晨2点执行);// 注册健康检查任务timerTrigger.registerTimerTask(health-check,hello-world,0/10 * * * * ? // 每10秒执行一次);}
}配置文件
# application.yml
server:port: 8080spring:application:name: serverless-enginejackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT8default-property-inclusion: non_null# Serverless引擎配置
serverless:function:# 函数存储目录function-dir: ./functions/# 默认超时时间毫秒default-timeout: 30000# 最大并发执行数max-concurrent-executions: 100executor:# 核心线程数core-pool-size: 10# 最大线程数max-pool-size: 50# 线程存活时间秒keep-alive-time: 60# 队列容量queue-capacity: 1000logging:level:com.example: DEBUGpattern:console: %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%nmanagement:endpoints:web:exposure:include: health,info,metrics,envendpoint:health:show-details: always示例函数
Hello World函数
package com.example.functions;import com.example.model.ExecutionContext;
import com.example.model.ServerlessFunction;import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;/*** Hello World示例函数*/
public class HelloWorldFunction implements ServerlessFunction {Overridepublic Object handle(MapString, Object input, ExecutionContext context) throws Exception {MapString, Object result new HashMap();result.put(message, Hello from Serverless Engine!);result.put(timestamp, LocalDateTime.now().toString());result.put(requestId, context.getRequestId());result.put(functionName, context.getFunctionName());result.put(input, input);// 模拟一些处理时间Thread.sleep(100);return result;}
}用户服务函数
package com.example.functions;import com.example.model.ExecutionContext;
import com.example.model.ServerlessFunction;import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;/*** 用户服务示例函数*/
public class UserServiceFunction implements ServerlessFunction {// 模拟用户存储private static final MapLong, MapString, Object users new ConcurrentHashMap();private static final AtomicLong idGenerator new AtomicLong(1);static {// 初始化一些测试数据MapString, Object user1 new HashMap();user1.put(id, 1L);user1.put(name, John Doe);user1.put(email, johnexample.com);users.put(1L, user1);MapString, Object user2 new HashMap();user2.put(id, 2L);user2.put(name, Jane Smith);user2.put(email, janeexample.com);users.put(2L, user2);idGenerator.set(3);}Overridepublic Object handle(MapString, Object input, ExecutionContext context) throws Exception {String action (String) ((Map)input.get(body)).get(action);if (action null) {action list;}MapString, Object result new HashMap();switch (action.toLowerCase()) {case list:result.put(users, users.values());result.put(count, users.size());break;case get:Long userId Long.valueOf(input.get(userId).toString());MapString, Object user users.get(userId);if (user ! null) {result.put(user, user);} else {result.put(error, User not found);}break;case create:SuppressWarnings(unchecked)MapString, Object userData (MapString, Object) ((Map)input.get(body)).get(user);Long newId idGenerator.getAndIncrement();userData.put(id, newId);users.put(newId, userData);result.put(user, userData);result.put(message, User created successfully);break;case delete:Long deleteId Long.valueOf(input.get(userId).toString());MapString, Object deletedUser users.remove(deleteId);if (deletedUser ! null) {result.put(message, User deleted successfully);} else {result.put(error, User not found);}break;default:result.put(error, Unknown action: action);}result.put(action, action);result.put(timestamp, System.currentTimeMillis());return result;}
}功能测试
#!/bin/bash
# test-serverless-engine.shBASE_URLhttp://localhost:8080/serverlessecho Testing Serverless Engine # 1. 获取系统状态
echo 1. Getting system status...
curl -s ${BASE_URL}/status | jq .
echo# 2. 获取所有函数
echo 2. Getting all functions...
curl -s ${BASE_URL}/functions | jq .
echo# 3. 调用Hello World函数
echo 3. Invoking hello-world function...
curl -s -X POST ${BASE_URL}/functions/hello-world/invoke \-H Content-Type: application/json \-d {name: Serverless Test} | jq .
echo# 4. 调用用户服务函数 - 列出用户
echo 4. Invoking user-service function - list users...
curl -s -X POST ${BASE_URL}/functions/user-service/invoke \-H Content-Type: application/json \-d {action: list} | jq .
echo# 5. 调用用户服务函数 - 创建用户
echo 5. Invoking user-service function - create user...
curl -s -X POST ${BASE_URL}/functions/user-service/invoke \-H Content-Type: application/json \-d {action: create,user: {name: Bob Wilson,email: bobexample.com}} | jq .
echo# 6. 获取函数指标
echo 6. Getting function metrics...
curl -s ${BASE_URL}/metrics | jq .
echo# 7. 获取定时任务
echo 7. Getting timer tasks...
curl -s ${BASE_URL}/timer-tasks | jq .
echoecho Test Completed Maven配置
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.example/groupIdartifactIdserverless-engine/artifactIdversion1.0.0/versionpackagingjar/packagingnameSpringBoot Serverless Engine/namedescriptionA serverless execution engine built with SpringBoot/descriptionparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.7.18/versionrelativePath//parentpropertiesjava.version11/java.version/propertiesdependencies!-- Spring Boot Starter Web --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!-- Spring Boot Starter Actuator --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-actuator/artifactId/dependency!-- Jackson for JSON processing --dependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactId/dependency/dependencies/project总结
通过SpringBoot我们成功实现了一个功能完整的Serverless执行引擎。这个引擎具备了以下核心能力
核心特性
函数隔离每个函数在独立的类加载器中运行
生命周期管理自动管理函数的创建、执行和销毁
多种触发方式支持HTTP和定时器触发
监控统计完整的执行指标和性能统计
RESTful API完整的管理和调用接口
技术亮点
动态类加载使用自定义ClassLoader实现函数隔离
异步执行基于线程池的并发执行机制
资源控制支持超时和内存限制
指标收集实时统计函数执行情况
这套自研的Serverless引擎展示了SpringBoot强大的扩展能力不仅能快速构建业务应用还能打造底层基础设施。
希望这个实现能给大家一些启发在技术架构设计上有所帮助。