Spring Integration
概念
消息通道 (Message Channel)
消息传递的管道,连接各个组件:
例如:
- 直接通道 (DirectChannel):同步,同一线程执行,发送者会阻塞直到消息被接收
- 队列通道 (QueueChannel):异步,基于队列实现
集成流 (Integration Flow)
定义消息从输入到输出的完整处理路径
网关 (Gateway)
集成流的入口,将数据传递给集成流。业务代码可以通过 Gateway 调用集成流
Transformer
将消息转为其他格式
file
将字符串转为大写写入文件
流程:
Gateway -> textInChannel -> upper case transformer -> fileWriteChannel -> file write handler
依赖
plugins {
id 'java'
id 'org.springframework.boot' version '3.3.10'
id 'io.spring.dependency-management' version '1.1.7'
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-integration'
implementation 'org.springframework.integration:spring-integration-file'
}
定义 Flow
@Configuration
public class FileWriterIntegrationConfig {
@Value("${file.path}")
private String filePath;
public FileWritingMessageHandler fileWriter() {
FileWritingMessageHandler handler =
new FileWritingMessageHandler(new File(dirPath));
handler.setExpectReply(false);
handler.setFileExistsMode(FileExistsMode.APPEND);
handler.setAppendNewLine(true);
handler.setExpectReply(false);
handler.setFileExistsMode(FileExistsMode.APPEND);
handler.setAppendNewLine(true);
handler.setFileNameGenerator(message -> "output.txt");
handler.setAutoCreateDirectory(true);
handler.setCharset("UTF-8");
return handler;
}
@Bean
public IntegrationFlow fileWriterFlow() {
return IntegrationFlow
.from(MessageChannels.direct("textInChannel"))
.<String, String>transform(t -> t.toUpperCase())
.channel(MessageChannels.direct("fileWriterChannel"))
.handle(fileWriter())
.get();
}
}
定义 Gateway
@MessagingGateway
public interface MyStringGateway {
@Gateway(requestChannel = "textInChannel")
void send(String message);
}
传入字符串到 Gateway
@SpringBootTest
class TestFile {
@Autowired
MyStringGateway myStringGateway;
@Test
void test() {
myStringGateway.send("hello");
myStringGateway.send("integration");
myStringGateway.send("hello world");
}
}
输出到 output.txt 文件
HELLO
INTEGRATION
HELLO WORLD
sftp
一个自动从远程 sftp 拉取文件的示例
依赖
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-integration'
implementation 'org.springframework.integration:spring-integration-sftp'
}
定义 session factory 。
@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUser(user);
factory.setPassword(password);
factory.setAllowUnknownKeys(true);
return factory;
}
生存环境建议用密钥,而不是密码,且关闭 allow unknown keys
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUser(user);
factory.setPrivateKey(new ByteArrayResource(privateKey.getBytes()));
factory.setPrivateKeyPassphrase(passPhrase);
factory.setAllowUnknownKeys(false);
Spring Boot 3.3 对应的 Apache MINA SSHD 支持 RSA ECDSA ,但需要引入 Bouncy Castle 才能支持 ED25519
FileWritingMessageHandler 负责写文件,即使没有手动创建,也有自动配置的
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer synchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
synchronizer.setDeleteRemoteFiles(false);
synchronizer.setRemoteDirectory(sftpRemoteDirectory);
synchronizer.setFilter(new SftpSimplePatternFileListFilter("*.txt"));
synchronizer.setDeleteRemoteFiles(true);
return synchronizer;
}
@Bean
public SftpInboundFileSynchronizingMessageSource sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File(sftpLocalDirectory));
source.setAutoCreateLocalDirectory(true);
source.setMaxFetchSize(10);
return source;
}
@Bean
public MessageHandler fileWritingMessageHandler() {
FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(sftpLocalDirectory));
handler.setAutoCreateDirectory(true);
handler.setExpectReply(false);
return handler;
}
@Bean
public IntegrationFlow sftpInboundFlow() {
return IntegrationFlow
.from(sftpMessageSource(), e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(7)).maxMessagesPerPoll(2)))
.handle(message -> {
System.out.println("Fetched file: " + message.getPayload());
})
.get();
}