Python和Java之间相互调用是怎么做到的?
需求使用背景, 业务系统自定义了一套数据处理语言,支持sql,elasticsearch查询数据,然后经过一系列自定义处理,自定义处理支持执行python脚本,最终返回给调用端。具体执行流程,java服务执行页面输入的python语句,python语句调用java方法得到结果,python进行自定义处理后再返回给java服务,java服务最终返回具体的结果给客户端。
语法示例 其中, | dbquery , | python 为自定义语法示例,最终是通过java执行返回结果。
| dbquery sql="select id,num,create_time from security_log"
| python
import sys
from py4j.java_gateway import JavaGateway, GatewayParameters
gateway = JavaGateway(gateway_parameters=GatewayParameters(port=int(sys.argv)))
SESSION_ID = sys.argv
# 获取java 实例类
entry= gateway.entry_point
# 调用 java具体方法,java返回具体的json
preDataJson = entry.getPreDateJson(SESSION_ID)
# 以下开始处理java返回的结果json
import pandas as pd
df = pd.read_json(preDataJson)
rtn_data = df[:2].to_json(orient='records')
# 处理完成的rtn_data 再设置给java 实例
entry.setDatasetJson(SESSION_ID,rtn_data )
首先java服务中引用依赖。
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.10.7</version>
</dependency>
首先java端定义使用pythonServer网关入口。
public class PythonServer {
/**
* python 网关服务
*/
private GatewayServer gatewayServer;
/**
* python 服务启动端口
*/
private int port = GatewayServer.DEFAULT_PORT;
/**
* 缓存当前几个 python reader
*/
private Map<String, PythonComd> pythonComdMap = new ConcurrentHashMap<>();
public PythonServer() {
for (int i = 0; i < 10; i++) {
if (gatewayServer == null) {
try {
gatewayServer = new GatewayServer(new EntryPoint(), port);
gatewayServer.start();
log.info("Python 网关启动成功,端口:" + port);
} catch (Exception e) {
log.warn("Python 网关启动失败,端口:" + port);
port += 2;//每次加2
log.warn("Python 网关尝试下一个启动端口:" + port);
}
}
}
if (gatewayServer == null) {
log.error("Python 网关启动失败 10 次,不再进行重试,终止启动。");
System.exit(-1);
}
}
/**
* 打开一个 java to python 的会话,实际只是 put 一个识别自身的 uuid
* @param uuid
* @param pythonComd
*/
public void openSession(String uuid, PythonComd pythonComd) {
pythonComdMap.put(uuid, pythonComd);
}
/**
* 关闭一个 java to python 的会话
* @param uuid
*/
public void closeSession(String uuid) {
pythonComdMap.remove(uuid);
}
/**
* 用于 python 与 java GatewayServer 通信的端口号
* @return
*/
public int getPort() {
return port;
}
/**
* 当 java 进程关闭时,结束 python 网关。
*/
@PreDestroy
public void shutdown() {
gatewayServer.shutdown();
}
class EntryPoint {
/**
* 可以传递 json 给 python,但限制最大加载条数为 ENGINE_QUERY_LIMIT
* 优点:结构化数据,支持嵌套数组。缺点:空数组时没有字段名。
* @param uuid
* @return
*/
public String getPreDataJson(String uuid) {
PythonComd pythonComd = pythonComdMap.get(uuid);
if (pythonComd == null) {
return null;
}
Dataset<Row> preDataset = pythonComd.getPreDateset();
if (preDataset == null) {
return null;
}
try {
SparkContext sparkContext = preDataset.sparkSession().sparkContext();
PythonConf conf = pythonComd.getConf();
String taskId = conf.getTaskId();
String description = AppConfig.ENGINE_ADDR + "/search?taskId=" + taskId;// 仅用于 spark web ui 显示,暂时没有扩展此接口。
sparkContext.setJobGroup(uuid, description, true);
sparkContext.setLocalProperty(CallSite.SHORT_FORM(), "Python 进程获取 JSON 格式结果集。");
List<Map<String, Object>> list = DatasetUtil.datasetToList(preDataset, AppConfig.ENGINE_QUERY_LIMIT, taskId, "python 进程获取 json 格式结果集。");
return JsonUtil.objToJson(list);
} catch (GplException e) {
log.error(e.getMessage(), e);//2021-02-04,暂时没想明白 python 拉取 java 时,异常传递给前端
}
return null;
}
/**
* 接收 python 返回的 json。
* @param uuid
* @param json
*/
public void setDataJson(String uuid, String json) {
PythonComd pythonComd = pythonComdMap.get(uuid);
if (pythonComd == null) {
return;
}
if (json == null || json.isEmpty()) {
PythonConf conf = pythonComd.getConf();
log.error("任务 id:" + conf.getTaskId() + "," + conf.getComdName() + " 命令,Python 进程返回的 JSON 数据为空!");
return;
}
List<String> list = new ArrayList<>();
list.add(json);
SparkSession sparkSession = BeanFactory.getSparkSession();
Dataset<String> datasetTmp = sparkSession.createDataset(list, Encoders.STRING());
Dataset<Row> dataset = sparkSession.read().json(datasetTmp);
pythonComd.setReturnResult(dataset);
}
}
}
java端执行python代码。
public class PythonComd extends BaseComd<PythonConf> {
/**
* 单例,用于和 python 进程通信。
*/
@Resource
private PythonServer pythonServer;
/**
* python 进程执行结束后,回调 pythonComd.setReturnResult(dataset);
* execSparkDataset() 方法,返回 this.returnResult;
*/
@Setter
private Dataset<Row> returnResult;
/**
* 输出流字符串。
*/
private StringBuilder outputStringBuilder = new StringBuilder();
/**
* 错误流字符串。
*/
private StringBuilder errorStringBuilder = new StringBuilder();
/**
* 构造方法,传入配置类。
* @param conf
*/
public PythonComd(PythonConf conf) {
super(conf);
}
/**
* 以命令行的方式,执行 python 进程。
* @param preDataset 上一条命令生成的结果集对象
* @return
* @throws GplException
*/
@Override
public Dataset<Row> execSparkDataset(Dataset<Row> preDataset) throws GplException {
if (preDataset == null) {
throw new GplException("python 命令,未找到结果集!");
}
String script = conf.getExpression();
String fileName = conf.getFilename();
if (StringUtils.isEmpty(script) && StringUtils.isEmpty(fileName)) {
throw new GplException("python 命令,脚本为空!");
}
if (StringUtils.isEmpty(script) && !StringUtils.isEmpty(fileName)) {//如果是执行py文件
File pyFile = new File(NConst.PYTHON_PATH + fileName);
if (!pyFile.exists()) {
throw new GplException("python 命令,GPL 的 python 目录找不到脚本文件:" + fileName);
}
}
if (!StringUtils.isEmpty(AppConfig.PYTHON_ENV)) {
File file = new File(AppConfig.PYTHON_ENV);
if (!file.exists()) {
throw new GplException("python 命令,自定义环境变量,路径不存在:" + AppConfig.PYTHON_ENV);
}
}
String uuid = conf.getUuid();
pythonServer.openSession(uuid, this);
String containerName = uuid;
try {
String scriptPath = null;
if (!StringUtils.isEmpty(script)) {//优先执行脚本,如果没有脚本语句赐执行文件
String md5 = DigestUtils.md5Hex(script);
String tmpName = md5 + ".py";
scriptPath = NConst.PYTHON_PATH + tmpName;
File tmpFile = new File(scriptPath);//临时文件不再结束调用后删除,以后完善成定期清理
FileUtils.write(tmpFile, script, NConst.CHARSET_UTF_8);//把脚本写成临时文件
} else {
scriptPath = NConst.PYTHON_PATH + fileName;
}
List<String> command = new ArrayList<>();
if (!StringUtils.isEmpty(conf.getDocker())) {
// docker run --name containerName -v /home/nyx:/home/nyx --net host python:slim python /home/nyx/test.py 25333 uuid
command.add("docker");
command.add("run");
command.add("--name");
command.add(containerName);//容器名
command.add("-e");
command.add("TZ=Asia/Shanghai");//设置时区
command.add("-v");
command.add(NConst.PYTHON_PATH + ":" + NConst.PYTHON_PATH);// 绑定挂载目录
command.add("--net");
command.add("host");
command.add(conf.getDocker());// docker 镜像名
}
if (StringUtils.isEmpty(AppConfig.PYTHON_ENV)) {
command.add("python");
} else {
command.add(AppConfig.PYTHON_ENV);
}
command.add(scriptPath);//py文件路径
command.add(String.valueOf(pythonServer.getPort()));//端口号
command.add(uuid);
String cmd = String.join(" ", command);
log.info("任务 id:" + conf.getTaskId() + ",python 执行命令:" + cmd);
ProcessBuilder processBuilder = new ProcessBuilder(command);
Process process = processBuilder.start();
this.printProcessStream(process);
int timeout = conf.getTimeout();
log.info("任务 id:" + conf.getTaskId() + ",python 脚本执行超时参数为:" + timeout);
if (timeout > 0) {
boolean isExited = process.waitFor(timeout, TimeUnit.MILLISECONDS);//脚本执行超时退出
if (!isExited) {//如果命令行子进程没有退出,销毁子进程。
process.destroy();
}
} else {
process.waitFor();
}
this.clearTmpFile();//最后执行一下清理目录
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
if (!StringUtils.isEmpty(conf.getDocker())) {
try {
Process process = new ProcessBuilder("docker", "rm", "-f", containerName).start();
this.printProcessStream(process);
process.waitFor();
log.info("任务 id:" + conf.getTaskId() + ",docker 执行 python 结束!删除容器:" + containerName);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
pythonServer.closeSession(uuid);
}
if (errorStringBuilder.length() > 0) {
String error = errorStringBuilder.toString();
throw new GplException("python 命令,脚本执行异常!", error);
}
if (outputStringBuilder.length() > 0) {
String input = outputStringBuilder.toString();
comdTask.addWarnMsg("python 命令,控制台打印:" + input);
}
if (returnResult == null) {
throw new RuntimeException("python 命令,脚本执行结束,脚本返回结果集为空!");
}
// 此处不要计算 count(),会消耗掉几十秒,直接打印一下字段名就行了。
Map<String, String> fieldMap = new HashMap<>();
StructType structType = returnResult.schema();
StructField[] structFieldArray = structType.fields();
for (StructField structField : structFieldArray) {
fieldMap.put(structField.name(), structField.dataType().simpleString());
}
log.info("任务 id:" + conf.getTaskId() + ",python 命令,返回结果集字段:" + fieldMap.toString());
return this.returnResult;
}
/**
* 清理生成的临时文件
*/
private void clearTmpFile() {
File pyPath = new File(NConst.PYTHON_PATH);
File[] fileArray = pyPath.listFiles(f -> {
return f.getName().length() == 35//文件名长度 35 的是临时文件
&& System.currentTimeMillis() - f.lastModified() > 86400000;//删除大于 1 天的文件
});
for (File file : fileArray) {
file.delete();
}
}
/**
* 启动两个后台线程,用于接收 python 进程的输出流。
* @param process
*/
private void printProcessStream(Process process) {
Thread inputThread = new Thread(() -> {
try (BufferedReader inputReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
String input = null;
while ((input = inputReader.readLine()) != null) {
outputStringBuilder.append("\n");
outputStringBuilder.append(input);
}
if (outputStringBuilder.length() > 0) {
log.info("任务 id:" + conf.getTaskId() + ",python print(): " + outputStringBuilder.toString());
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
});
inputThread.setName("gpl-py-input-stream-task-id=" + conf.getTaskId());
inputThread.setDaemon(true);
inputThread.start();
Thread errorThread = new Thread(() -> {
try (BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
String error = null;
while ((error = errorReader.readLine()) != null) {
errorStringBuilder.append("\n");
errorStringBuilder.append(error);
}
if (errorStringBuilder.length() > 0) {
log.error("任务 id:" + conf.getTaskId() + ",python error: " + errorStringBuilder.toString());
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
});
errorThread.setName("py-error-stream-task-id=" + conf.getTaskId());
errorThread.setDaemon(true);
errorThread.start();
}
}
有用
页:
[1]